// polkadot_runtime_parachains/scheduler.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The scheduler module for parachains and parathreads.
//!
//! This module is responsible for two main tasks:
//!   - Partitioning validators into groups and assigning groups to parachains and parathreads
//!   - Scheduling parachains and parathreads
//!
//! It aims to achieve these tasks with these goals in mind:
//! - It should be possible to know at least a block ahead of time, ideally more, which validators
//!   are going to be assigned to which parachains.
//! - Parachains that have a candidate pending availability in this fork of the chain should not be
//!   assigned.
//! - Validator assignments should not be gameable. Malicious cartels should not be able to
//!   manipulate the scheduler to assign themselves as desired.
//! - High or close to optimal throughput of parachains and parathreads. Work among validator groups
//!   should be balanced.
//!
//! The Scheduler manages resource allocation using the concept of "Availability Cores".
//! There will be one availability core for each parachain, and a fixed number of cores
//! used for multiplexing parathreads. Validators will be partitioned into groups, with the same
//! number of groups as availability cores. Validator groups will be assigned to different
//! availability cores over time.
//!
//! With Coretime, the number of cores comes from the active configuration and the assignment of
//! cores is provided by the `assigner_coretime` submodule.

39use crate::{configuration, initializer::SessionChangeNotification, paras::AssignCoretime};
40use alloc::{
41	collections::{btree_map::BTreeMap, vec_deque::VecDeque},
42	vec,
43	vec::Vec,
44};
45use frame_support::{pallet_prelude::*, traits::Defensive};
46use frame_system::pallet_prelude::BlockNumberFor;
47use polkadot_primitives::{CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ValidatorIndex};
48use sp_runtime::traits::{One, Saturating};
49
/// Logging target used for log messages of the scheduler pallet.
const LOG_TARGET: &str = "runtime::parachains::scheduler";
51
52pub use assigner_coretime::{CoreAssignment, PartsOf57600};
53pub use pallet::*;
54pub use polkadot_core_primitives::v2::BlockNumber;
55
56#[cfg(test)]
57mod tests;
58
59/// Implements core assignments as coming from the Coretime chain.
60///
61/// Depends on the ondemand pallet to assign pool cores.
62mod assigner_coretime;
63
64/// Storage migrations for the scheduler pallet.
65pub mod migration;
66
67#[frame_support::pallet]
68pub mod pallet {
69
70	use crate::on_demand;
71
72	use super::*;
73
74	const STORAGE_VERSION: StorageVersion = StorageVersion::new(4);
75
76	#[pallet::pallet]
77	#[pallet::without_storage_info]
78	#[pallet::storage_version(STORAGE_VERSION)]
79	pub struct Pallet<T>(_);
80
81	#[pallet::config]
82	pub trait Config: frame_system::Config + configuration::Config + on_demand::Config {}
83
84	#[pallet::error]
85	pub enum Error<T> {
86		/// assign_core was called with no assignments.
87		AssignmentsEmpty,
88		/// assign_core with non allowed insertion.
89		DisallowedInsert,
90	}
91
92	impl<T> From<assigner_coretime::Error> for Error<T> {
93		fn from(e: assigner_coretime::Error) -> Self {
94			match e {
95				assigner_coretime::Error::AssignmentsEmpty => Error::AssignmentsEmpty,
96				assigner_coretime::Error::DisallowedInsert => Error::DisallowedInsert,
97			}
98		}
99	}
100
101	/// All the validator groups. One for each core. Indices are into `ActiveValidators` - not the
102	/// broader set of Polkadot validators, but instead just the subset used for parachains during
103	/// this session.
104	///
105	/// Bound: The number of cores is the sum of the numbers of parachains and parathread
106	/// multiplexers. Reasonably, 100-1000. The dominant factor is the number of validators: safe
107	/// upper bound at 10k.
108	#[pallet::storage]
109	pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;
110
111	/// The block number where the session start occurred. Used to track how many group rotations
112	/// have occurred.
113	///
114	/// Note that in the context of parachains modules the session change is signaled during
115	/// the block and enacted at the end of the block (at the finalization stage, to be exact).
116	/// Thus for all intents and purposes the effect of the session change is observed at the
117	/// block following the session change, block number of which we save in this storage value.
118	#[pallet::storage]
119	pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;
120
121	/// Scheduled assignment sets for coretime cores.
122	///
123	/// Assignments as of the given block number. They will go into state once the block number is
124	/// reached (and replace whatever was in there before).
125	///
126	/// Managed by the `assigner_coretime` submodule.
127	#[pallet::storage]
128	pub(super) type CoreSchedules<T: Config> = StorageMap<
129		_,
130		Twox64Concat,
131		(BlockNumberFor<T>, CoreIndex),
132		assigner_coretime::Schedule<BlockNumberFor<T>>,
133		OptionQuery,
134	>;
135
136	/// Assignments which are currently active for each core.
137	///
138	/// They will be picked from `CoreSchedules` once we reach the scheduled block number.
139	///
140	/// Managed by the `assigner_coretime` submodule.
141	#[pallet::storage]
142	pub(super) type CoreDescriptors<T: Config> = StorageValue<
143		_,
144		BTreeMap<CoreIndex, assigner_coretime::CoreDescriptor<BlockNumberFor<T>>>,
145		ValueQuery,
146	>;
147
148	/// Availability timeout status of a core.
149	pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
150		/// Is the core already timed out?
151		///
152		/// If this is true the core will be freed at this block.
153		pub timed_out: bool,
154
155		/// When does this core timeout.
156		///
157		/// The block number the core times out. If `timed_out` is true, this will correspond to
158		/// now (current block number).
159		pub live_until: BlockNumber,
160	}
161}
162
163impl<T: Config> AssignCoretime for Pallet<T> {
164	// Only for testing purposes.
165	fn assign_coretime(id: ParaId) -> DispatchResult {
166		let current_block = frame_system::Pallet::<T>::block_number();
167
168		// Add a new core and assign the para to it.
169		let mut config = configuration::ActiveConfig::<T>::get();
170		let core = config.scheduler_params.num_cores;
171		config.scheduler_params.num_cores.saturating_inc();
172
173		// `assign_coretime` is only called at genesis or by root, so setting the active
174		// config here is fine.
175		configuration::Pallet::<T>::force_set_active_config(config);
176
177		let begin = current_block + One::one();
178		let assignment = vec![(pallet_broker::CoreAssignment::Task(id.into()), PartsOf57600::FULL)];
179		assigner_coretime::assign_core::<T>(CoreIndex(core), begin, assignment, None)
180			.map_err(Error::<T>::from)?;
181		Ok(())
182	}
183}
184
185impl<T: Config> Pallet<T> {
186	/// Assign a particular core ala Coretime.
187	pub(crate) fn assign_core(
188		core: CoreIndex,
189		begin: BlockNumberFor<T>,
190		assignment: Vec<(CoreAssignment, PartsOf57600)>,
191		end_hint: Option<BlockNumberFor<T>>,
192	) -> DispatchResult {
193		assigner_coretime::assign_core::<T>(core, begin, assignment, end_hint)
194			.map_err(Error::<T>::from)?;
195		Ok(())
196	}
197
198	/// Advance claim queue.
199	///
200	/// Parameters:
201	/// - is_blocked: Inform whether a given core is currently blocked (schedules can not be
202	/// served).
203	///
204	/// Returns: The `ParaId`s that had been scheduled next, blocked ones are filtered out.
205	pub(crate) fn advance_claim_queue<F: Fn(CoreIndex) -> bool>(
206		is_blocked: F,
207	) -> BTreeMap<CoreIndex, ParaId> {
208		let mut assignments = assigner_coretime::advance_assignments::<T, F>(is_blocked);
209		assignments.split_off(&CoreIndex(Self::num_availability_cores() as _));
210		assignments
211	}
212
213	/// Retrieve upcoming claims for each core.
214	///
215	/// To be called from runtime APIs.
216	pub(crate) fn claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
217		// Since this is being called from a runtime API, we need to workaround for #64.
218		if Self::on_chain_storage_version() == StorageVersion::new(3) {
219			return migration::v3::ClaimQueue::<T>::get()
220				.into_iter()
221				.map(|(core_index, paras)| {
222					(core_index, paras.into_iter().map(|e| e.para_id()).collect())
223				})
224				.collect();
225		}
226
227		let config = configuration::ActiveConfig::<T>::get();
228		let lookahead = config.scheduler_params.lookahead;
229		let mut queue = assigner_coretime::peek_next_block::<T>(lookahead);
230		queue.split_off(&CoreIndex(Self::num_availability_cores() as _));
231		queue
232	}
233
234	/// Called by the initializer to initialize the scheduler pallet.
235	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
236		Weight::zero()
237	}
238
239	/// Called by the initializer to finalize the scheduler pallet.
240	pub(crate) fn initializer_finalize() {}
241
242	/// Called by the initializer to note that a new session has started.
243	pub(crate) fn initializer_on_new_session(
244		notification: &SessionChangeNotification<BlockNumberFor<T>>,
245	) {
246		let SessionChangeNotification { validators, new_config, .. } = notification;
247		let config = new_config;
248		let assigner_cores = config.scheduler_params.num_cores;
249
250		let n_cores = core::cmp::max(
251			assigner_cores,
252			match config.scheduler_params.max_validators_per_core {
253				Some(x) if x != 0 => validators.len() as u32 / x,
254				_ => 0,
255			},
256		);
257
258		// shuffle validators into groups.
259		if n_cores == 0 || validators.is_empty() {
260			ValidatorGroups::<T>::set(Vec::new());
261		} else {
262			let group_base_size = validators
263				.len()
264				.checked_div(n_cores as usize)
265				.defensive_proof("n_cores should not be 0")
266				.unwrap_or(0);
267			let n_larger_groups = validators
268				.len()
269				.checked_rem(n_cores as usize)
270				.defensive_proof("n_cores should not be 0")
271				.unwrap_or(0);
272
273			// Groups contain indices into the validators from the session change notification,
274			// which are already shuffled.
275
276			let mut groups: Vec<Vec<ValidatorIndex>> = Vec::new();
277			for i in 0..n_larger_groups {
278				let offset = (group_base_size + 1) * i;
279				groups.push(
280					(0..group_base_size + 1)
281						.map(|j| offset + j)
282						.map(|j| ValidatorIndex(j as _))
283						.collect(),
284				);
285			}
286
287			for i in 0..(n_cores as usize - n_larger_groups) {
288				let offset = (n_larger_groups * (group_base_size + 1)) + (i * group_base_size);
289				groups.push(
290					(0..group_base_size)
291						.map(|j| offset + j)
292						.map(|j| ValidatorIndex(j as _))
293						.collect(),
294				);
295			}
296
297			ValidatorGroups::<T>::set(groups);
298		}
299		let now = frame_system::Pallet::<T>::block_number() + One::one();
300		SessionStartBlock::<T>::set(now);
301	}
302
303	/// Get the validators in the given group, if the group index is valid for this session.
304	pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
305		ValidatorGroups::<T>::get().get(group_index.0 as usize).map(|g| g.clone())
306	}
307
308	/// Get the number of cores.
309	pub(crate) fn num_availability_cores() -> usize {
310		ValidatorGroups::<T>::decode_len().unwrap_or(0)
311	}
312
313	/// Get the group assigned to a specific core by index at the current block number. Result
314	/// undefined if the core index is unknown or the block number is less than the session start
315	/// index.
316	pub(crate) fn group_assigned_to_core(
317		core: CoreIndex,
318		at: BlockNumberFor<T>,
319	) -> Option<GroupIndex> {
320		let config = configuration::ActiveConfig::<T>::get();
321		let session_start_block = SessionStartBlock::<T>::get();
322
323		if at < session_start_block {
324			return None;
325		}
326
327		let validator_groups = ValidatorGroups::<T>::get();
328
329		if core.0 as usize >= validator_groups.len() {
330			return None;
331		}
332
333		let rotations_since_session_start: BlockNumberFor<T> =
334			(at - session_start_block) / config.scheduler_params.group_rotation_frequency;
335
336		let rotations_since_session_start =
337			<BlockNumberFor<T> as TryInto<u32>>::try_into(rotations_since_session_start)
338				.unwrap_or(0);
339		// Error case can only happen if rotations occur only once every u32::max(),
340		// so functionally no difference in behavior.
341
342		let group_idx =
343			(core.0 as usize + rotations_since_session_start as usize) % validator_groups.len();
344		Some(GroupIndex(group_idx as u32))
345	}
346
347	/// Returns a predicate that should be used for timing out occupied cores.
348	///
349	/// This only ever times out cores that have been occupied across a group rotation boundary.
350	pub(crate) fn availability_timeout_predicate(
351	) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
352		let config = configuration::ActiveConfig::<T>::get();
353		let now = frame_system::Pallet::<T>::block_number();
354		let rotation_info = Self::group_rotation_info(now);
355
356		let next_rotation = rotation_info.next_rotation_at();
357
358		let times_out = Self::availability_timeout_check_required();
359
360		move |pending_since| {
361			let time_out_at = if times_out {
362				// We are at the beginning of the rotation, here availability period is relevant.
363				// Note: blocks backed in this rotation will never time out here as backed_in +
364				// config.paras_availability_period will always be > now for these blocks, as
365				// otherwise above condition would not be true.
366				pending_since + config.scheduler_params.paras_availability_period
367			} else {
368				next_rotation + config.scheduler_params.paras_availability_period
369			};
370
371			AvailabilityTimeoutStatus { timed_out: time_out_at <= now, live_until: time_out_at }
372		}
373	}
374
375	/// Is evaluation of `availability_timeout_predicate` necessary at the current block?
376	///
377	/// This can be used to avoid calling `availability_timeout_predicate` for each core in case
378	/// this function returns false.
379	pub(crate) fn availability_timeout_check_required() -> bool {
380		let config = configuration::ActiveConfig::<T>::get();
381		let now = frame_system::Pallet::<T>::block_number() + One::one();
382		let rotation_info = Self::group_rotation_info(now);
383
384		let current_window =
385			rotation_info.last_rotation_at() + config.scheduler_params.paras_availability_period;
386		now < current_window
387	}
388
389	/// Returns a helper for determining group rotation.
390	pub(crate) fn group_rotation_info(
391		now: BlockNumberFor<T>,
392	) -> GroupRotationInfo<BlockNumberFor<T>> {
393		let session_start_block = SessionStartBlock::<T>::get();
394		let group_rotation_frequency = configuration::ActiveConfig::<T>::get()
395			.scheduler_params
396			.group_rotation_frequency;
397
398		GroupRotationInfo { session_start_block, now, group_rotation_frequency }
399	}
400
401	#[cfg(test)]
402	fn claim_queue_len() -> usize {
403		Self::claim_queue().iter().map(|la_vec| la_vec.1.len()).sum()
404	}
405
406	#[cfg(test)]
407	#[allow(dead_code)]
408	pub(crate) fn claim_queue_is_empty() -> bool {
409		Self::claim_queue_len() == 0
410	}
411
412	#[cfg(test)]
413	pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
414		ValidatorGroups::<T>::set(validator_groups);
415	}
416}