// polkadot_runtime_parachains/scheduler.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The scheduler module for parachains and parathreads.
//!
//! This module is responsible for two main tasks:
//!   - Partitioning validators into groups and assigning groups to parachains and parathreads
//!   - Scheduling parachains and parathreads
//!
//! It aims to achieve these tasks with these goals in mind:
//! - It should be possible to know at least a block ahead-of-time, ideally more, which validators
//!   are going to be assigned to which parachains.
//! - Parachains that have a candidate pending availability in this fork of the chain should not be
//!   assigned.
//! - Validator assignments should not be gameable. Malicious cartels should not be able to
//!   manipulate the scheduler to assign themselves as desired.
//! - High or close to optimal throughput of parachains and parathreads. Work among validator groups
//!   should be balanced.
//!
//! The Scheduler manages resource allocation using the concept of "Availability Cores".
//! There will be one availability core for each parachain, and a fixed number of cores
//! used for multiplexing parathreads. Validators will be partitioned into groups, with the same
//! number of groups as availability cores. Validator groups will be assigned to different
//! availability cores over time.
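//!
//! As a rough illustration of the rotation scheme (the exact logic lives in
//! `group_assigned_to_core` below), the group serving a core advances once per rotation period:
//!
//! ```text
//! group(core, now) = (core + (now - session_start) / rotation_frequency) % n_groups
//! ```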

use crate::{configuration, initializer::SessionChangeNotification, paras};
use alloc::{
	collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
	vec::Vec,
};
use frame_support::{pallet_prelude::*, traits::Defensive};
use frame_system::pallet_prelude::BlockNumberFor;
pub use polkadot_core_primitives::v2::BlockNumber;
use polkadot_primitives::{
	CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ScheduledCore, SchedulerParams,
	ValidatorIndex,
};
use sp_runtime::traits::One;

pub mod common;

use common::{Assignment, AssignmentProvider};

pub use pallet::*;

#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "runtime::parachains::scheduler";

pub mod migration;

#[frame_support::pallet]
pub mod pallet {
	use super::*;

	const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);

	#[pallet::pallet]
	#[pallet::without_storage_info]
	#[pallet::storage_version(STORAGE_VERSION)]
	pub struct Pallet<T>(_);

	#[pallet::config]
	pub trait Config: frame_system::Config + configuration::Config + paras::Config {
		type AssignmentProvider: AssignmentProvider<BlockNumberFor<Self>>;
	}

	/// All the validator groups. One for each core. Indices are into `ActiveValidators` - not the
	/// broader set of Polkadot validators, but instead just the subset used for parachains during
	/// this session.
	///
	/// Bound: The number of cores is the sum of the numbers of parachains and parathread
	/// multiplexers. Reasonably, 100-1000. The dominant factor is the number of validators: safe
	/// upper bound at 10k.
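	///
	/// For example (illustrative values only): with six active validators and three cores, the
	/// stored groups might be `[[0, 1], [2, 3], [4, 5]]`, each inner `Vec` holding indices into
	/// the active validator set.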
	#[pallet::storage]
	pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;

	/// The block number where the session start occurred. Used to track how many group rotations
	/// have occurred.
	///
	/// Note that in the context of parachains modules the session change is signaled during
	/// the block and enacted at the end of the block (at the finalization stage, to be exact).
	/// Thus for all intents and purposes the effect of the session change is observed at the
	/// block following the session change, whose block number is what we save in this storage
	/// value.
	#[pallet::storage]
	pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;

	/// One entry for each availability core. The `VecDeque` represents the assignments to be
	/// scheduled on that core.
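	///
	/// For example (illustrative): a claim queue of `{0: [A, B], 1: [C]}` means core 0 will serve
	/// assignments for para `A` and then para `B`, while core 1 will serve para `C`.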
	#[pallet::storage]
	pub type ClaimQueue<T> = StorageValue<_, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;

	/// Availability timeout status of a core.
	pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
		/// Is the core already timed out?
		///
		/// If this is true the core will be freed at this block.
		pub timed_out: bool,

		/// When does this core time out.
		///
		/// The block number at which the core times out. If `timed_out` is true, this will
		/// correspond to now (the current block number).
		pub live_until: BlockNumber,
	}
}

impl<T: Config> Pallet<T> {
	/// Called by the initializer to initialize the scheduler pallet.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}

	/// Called by the initializer to finalize the scheduler pallet.
	pub(crate) fn initializer_finalize() {}

	/// Called by the initializer to note that a new session has started.
	pub(crate) fn initializer_on_new_session(
		notification: &SessionChangeNotification<BlockNumberFor<T>>,
	) {
		let SessionChangeNotification { validators, new_config, .. } = notification;
		let config = new_config;
		let assigner_cores = config.scheduler_params.num_cores;

		let n_cores = core::cmp::max(
			assigner_cores,
			match config.scheduler_params.max_validators_per_core {
				Some(x) if x != 0 => validators.len() as u32 / x,
				_ => 0,
			},
		);

		// shuffle validators into groups.
		if n_cores == 0 || validators.is_empty() {
			ValidatorGroups::<T>::set(Vec::new());
		} else {
			let group_base_size = validators
				.len()
				.checked_div(n_cores as usize)
				.defensive_proof("n_cores should not be 0")
				.unwrap_or(0);
			let n_larger_groups = validators
				.len()
				.checked_rem(n_cores as usize)
				.defensive_proof("n_cores should not be 0")
				.unwrap_or(0);

			// Groups contain indices into the validators from the session change notification,
			// which are already shuffled.
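			// For example (illustrative numbers): 10 validators over 3 cores yield
			// `group_base_size = 3` and `n_larger_groups = 1`, i.e. group sizes 4, 3, 3.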

			let mut groups: Vec<Vec<ValidatorIndex>> = Vec::new();
			for i in 0..n_larger_groups {
				let offset = (group_base_size + 1) * i;
				groups.push(
					(0..group_base_size + 1)
						.map(|j| offset + j)
						.map(|j| ValidatorIndex(j as _))
						.collect(),
				);
			}

			for i in 0..(n_cores as usize - n_larger_groups) {
				let offset = (n_larger_groups * (group_base_size + 1)) + (i * group_base_size);
				groups.push(
					(0..group_base_size)
						.map(|j| offset + j)
						.map(|j| ValidatorIndex(j as _))
						.collect(),
				);
			}

			ValidatorGroups::<T>::set(groups);
		}

		// Resize and populate claim queue.
		Self::maybe_resize_claim_queue();
		Self::populate_claim_queue_after_session_change();

		let now = frame_system::Pallet::<T>::block_number() + One::one();
		SessionStartBlock::<T>::set(now);
	}

	/// Get the validators in the given group, if the group index is valid for this session.
	pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
		ValidatorGroups::<T>::get().get(group_index.0 as usize).map(|g| g.clone())
	}

	/// Get the number of cores.
	pub(crate) fn num_availability_cores() -> usize {
		ValidatorGroups::<T>::decode_len().unwrap_or(0)
	}

	/// Expected claim queue len. Can be different from the real length if, for example, we don't
	/// have assignments for a core.
	fn expected_claim_queue_len(config: &SchedulerParams<BlockNumberFor<T>>) -> u32 {
		core::cmp::min(config.num_cores, Self::num_availability_cores() as u32)
	}

	/// Get the group assigned to a specific core by index at the current block number. The result
	/// is undefined if the core index is unknown or the block number is less than the session
	/// start block.
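	///
	/// For example (illustrative numbers): with 4 validator groups, a rotation frequency of 10
	/// and a session start at block 0, core 1 at block 25 has seen two rotations, so it is
	/// assigned group `(1 + 2) % 4 = 3`.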
	pub(crate) fn group_assigned_to_core(
		core: CoreIndex,
		at: BlockNumberFor<T>,
	) -> Option<GroupIndex> {
		let config = configuration::ActiveConfig::<T>::get();
		let session_start_block = SessionStartBlock::<T>::get();

		if at < session_start_block {
			return None
		}

		let validator_groups = ValidatorGroups::<T>::get();

		if core.0 as usize >= validator_groups.len() {
			return None
		}

		let rotations_since_session_start: BlockNumberFor<T> =
			(at - session_start_block) / config.scheduler_params.group_rotation_frequency;

		let rotations_since_session_start =
			<BlockNumberFor<T> as TryInto<u32>>::try_into(rotations_since_session_start)
				.unwrap_or(0);
		// The error case can only occur after more than `u32::MAX` group rotations in a single
		// session, so there is functionally no difference in behavior.

		let group_idx =
			(core.0 as usize + rotations_since_session_start as usize) % validator_groups.len();
		Some(GroupIndex(group_idx as u32))
	}

	/// Returns a predicate that should be used for timing out occupied cores.
	///
	/// This only ever times out cores that have been occupied across a group rotation boundary.
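	///
	/// For example (illustrative numbers, rotation frequency 10, availability period 4, session
	/// start at block 0): at block 12 the timeout check is active, so a core pending since
	/// block 7 times out (`7 + 4 <= 12`); at block 17 the check is inactive and the core stays
	/// live until the next rotation plus the availability period (`20 + 4 = 24`).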
	pub(crate) fn availability_timeout_predicate(
	) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
		let config = configuration::ActiveConfig::<T>::get();
		let now = frame_system::Pallet::<T>::block_number();
		let rotation_info = Self::group_rotation_info(now);

		let next_rotation = rotation_info.next_rotation_at();

		let times_out = Self::availability_timeout_check_required();

		move |pending_since| {
			let time_out_at = if times_out {
				// We are at the beginning of the rotation; here the availability period is
				// relevant. Note: blocks backed in this rotation will never time out here, as
				// `backed_in + config.paras_availability_period` will always be greater than
				// `now` for these blocks - otherwise the above condition would not be true.
				pending_since + config.scheduler_params.paras_availability_period
			} else {
				next_rotation + config.scheduler_params.paras_availability_period
			};

			AvailabilityTimeoutStatus { timed_out: time_out_at <= now, live_until: time_out_at }
		}
	}

	/// Is evaluation of `availability_timeout_predicate` necessary at the current block?
	///
	/// This can be used to avoid calling `availability_timeout_predicate` for each core in case
	/// this function returns false.
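	///
	/// With illustrative numbers (rotation frequency 10, availability period 4), this returns
	/// true only during roughly the first `paras_availability_period` blocks after a rotation.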
	pub(crate) fn availability_timeout_check_required() -> bool {
		let config = configuration::ActiveConfig::<T>::get();
		let now = frame_system::Pallet::<T>::block_number() + One::one();
		let rotation_info = Self::group_rotation_info(now);

		let current_window =
			rotation_info.last_rotation_at() + config.scheduler_params.paras_availability_period;
		now < current_window
	}

	/// Returns a helper for determining group rotation.
	pub(crate) fn group_rotation_info(
		now: BlockNumberFor<T>,
	) -> GroupRotationInfo<BlockNumberFor<T>> {
		let session_start_block = SessionStartBlock::<T>::get();
		let group_rotation_frequency = configuration::ActiveConfig::<T>::get()
			.scheduler_params
			.group_rotation_frequency;

		GroupRotationInfo { session_start_block, now, group_rotation_frequency }
	}

	/// Return the next thing that will be scheduled on this core assuming it is currently
	/// occupied and the candidate occupying it became available.
	pub(crate) fn next_up_on_available(core: CoreIndex) -> Option<ScheduledCore> {
		// Since this is being called from a runtime API, we need to work around #64.
		if Self::on_chain_storage_version() == StorageVersion::new(2) {
			migration::v2::ClaimQueue::<T>::get()
				.get(&core)
				.and_then(|a| a.front().map(|entry| entry.assignment.para_id()))
		} else {
			ClaimQueue::<T>::get()
				.get(&core)
				.and_then(|a| a.front().map(|assignment| assignment.para_id()))
		}
		.map(|para_id| ScheduledCore { para_id, collator: None })
	}

	// Since this is being called from a runtime API, we need to work around #64.
	pub(crate) fn get_claim_queue() -> BTreeMap<CoreIndex, VecDeque<Assignment>> {
		if Self::on_chain_storage_version() == StorageVersion::new(2) {
			migration::v2::ClaimQueue::<T>::get()
				.into_iter()
				.map(|(core_index, entries)| {
					(core_index, entries.into_iter().map(|e| e.assignment).collect())
				})
				.collect()
		} else {
			ClaimQueue::<T>::get()
		}
	}

	/// For each core that isn't part of the `except_for` set, pop the first item of the claim queue
	/// and fill the queue from the assignment provider.
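	///
	/// For example (illustrative): with a claim queue of `{0: [A, B]}` and an empty `except_for`
	/// set, the assignment for para `A` is popped and reported as processed, and the queue is
	/// refilled from the assignment provider behind `B`.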
	pub(crate) fn advance_claim_queue(except_for: &BTreeSet<CoreIndex>) {
		let config = configuration::ActiveConfig::<T>::get();
		let expected_claim_queue_len = Self::expected_claim_queue_len(&config.scheduler_params);
		// Extra sanity check; the configured value should never be smaller than 1 anyway:
		let n_lookahead = config.scheduler_params.lookahead.max(1);

		for core_idx in 0..expected_claim_queue_len {
			let core_idx = CoreIndex::from(core_idx);

			if !except_for.contains(&core_idx) {
				if let Some(dropped_para) = Self::pop_front_of_claim_queue(&core_idx) {
					T::AssignmentProvider::report_processed(dropped_para);
				}

				Self::fill_claim_queue(core_idx, n_lookahead);
			}
		}
	}

	// Resize the claim queue on a new session, returning the assignments of removed cores to the
	// provider.
	fn maybe_resize_claim_queue() {
		let cq = ClaimQueue::<T>::get();
		let Some((old_max_core, _)) = cq.last_key_value() else { return };
		let config = configuration::ActiveConfig::<T>::get();
		let new_core_count = Self::expected_claim_queue_len(&config.scheduler_params);

		if new_core_count < (old_max_core.0 + 1) {
			ClaimQueue::<T>::mutate(|cq| {
				let to_remove: Vec<_> =
					cq.range(CoreIndex(new_core_count)..=*old_max_core).map(|(k, _)| *k).collect();
				for key in to_remove {
					if let Some(dropped_assignments) = cq.remove(&key) {
						Self::push_back_to_assignment_provider(dropped_assignments.into_iter());
					}
				}
			});
		}
	}

	// Populate the claim queue. To be called on new session, after all the other modules were
	// initialized.
	fn populate_claim_queue_after_session_change() {
		let config = configuration::ActiveConfig::<T>::get();
		// Extra sanity check; the configured value should never be smaller than 1 anyway:
		let n_lookahead = config.scheduler_params.lookahead.max(1);
		let expected_claim_queue_len = Self::expected_claim_queue_len(&config.scheduler_params);

		for core_idx in 0..expected_claim_queue_len {
			let core_idx = CoreIndex::from(core_idx);
			Self::fill_claim_queue(core_idx, n_lookahead);
		}
	}

	/// Push some assignments back to the provider.
	fn push_back_to_assignment_provider(
		assignments: impl core::iter::DoubleEndedIterator<Item = Assignment>,
	) {
		// Push back in reverse order so that when we pop from the provider again,
		// the entries in the claim queue are in the same order as they are right
		// now.
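		// For example (illustrative): a dropped queue `[A, B]` is pushed back as `B`, then `A`,
		// so popping from the provider yields `A`, then `B` again.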
		for assignment in assignments.rev() {
			T::AssignmentProvider::push_back_assignment(assignment);
		}
	}

	fn fill_claim_queue(core_idx: CoreIndex, n_lookahead: u32) {
		ClaimQueue::<T>::mutate(|la| {
			let cq = la.entry(core_idx).or_default();

			let mut n_lookahead_used = cq.len() as u32;

			// If the claim queue used to be empty, we need to double the first assignment.
			// Otherwise, the para will only be able to get its collation in at the very next
			// block (synchronous backing). Only do this if the configured lookahead is greater
			// than 1; otherwise it doesn't make sense.
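			// For example (illustrative): with `n_lookahead = 3` and an empty queue, a single
			// popped assignment `A` is queued twice (`[A, A]`), leaving room for one more
			// distinct assignment from the provider.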
			if n_lookahead_used == 0 && n_lookahead > 1 {
				if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
					T::AssignmentProvider::assignment_duplicated(&assignment);
					cq.push_back(assignment.clone());
					cq.push_back(assignment);
					n_lookahead_used += 2;
				}
			}

			for _ in n_lookahead_used..n_lookahead {
				if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
					cq.push_back(assignment);
				} else {
					break
				}
			}

			// If we didn't end up pushing anything, remove the entry. We don't want to waste
			// space if we have no assignments.
			if cq.is_empty() {
				la.remove(&core_idx);
			}
		});
	}

	fn pop_front_of_claim_queue(core_idx: &CoreIndex) -> Option<Assignment> {
		ClaimQueue::<T>::mutate(|cq| cq.get_mut(core_idx)?.pop_front())
	}

	#[cfg(any(feature = "try-runtime", test))]
	fn claim_queue_len() -> usize {
		ClaimQueue::<T>::get().iter().map(|la_vec| la_vec.1.len()).sum()
	}

	#[cfg(all(not(feature = "runtime-benchmarks"), test))]
	pub(crate) fn claim_queue_is_empty() -> bool {
		Self::claim_queue_len() == 0
	}

	#[cfg(test)]
	pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
		ValidatorGroups::<T>::set(validator_groups);
	}

	#[cfg(test)]
	pub(crate) fn set_claim_queue(claim_queue: BTreeMap<CoreIndex, VecDeque<Assignment>>) {
		ClaimQueue::<T>::set(claim_queue);
	}
}