// Source file: polkadot_runtime_parachains/on_demand/mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The parachain on demand assignment module.
18//!
19//! Implements a mechanism for taking in orders for on-demand parachain (previously parathreads)
20//! assignments. This module is not handled by the initializer but is instead instantiated in the
21//! `construct_runtime` macro.
22//!
23//! The module currently limits parallel execution of blocks from the same `ParaId` via
24//! a core affinity mechanism. As long as there exists an affinity for a `CoreIndex` for
25//! a specific `ParaId`, orders for blockspace for that `ParaId` will only be assigned to
26//! that `CoreIndex`.
27//!
//! NOTE: Once we have elastic scaling implemented we might want to extend this module to support
//! ignoring core affinity up to a certain extent. This should be opt-in though, as the parachain
//! needs to support multiple cores in the same block. If we want to enable a single parachain
//! occupying multiple cores in on-demand, we will likely add a separate order type, where the
//! intent can be made explicit.
33
34use sp_runtime::traits::Zero;
35mod benchmarking;
36pub mod migration;
37mod mock_helpers;
38mod types;
39
40extern crate alloc;
41
42#[cfg(test)]
43mod tests;
44
45use crate::{configuration, paras, scheduler::common::Assignment};
46use alloc::collections::BinaryHeap;
47use core::mem::take;
48use frame_support::{
49	pallet_prelude::*,
50	traits::{
51		defensive_prelude::*,
52		Currency,
53		ExistenceRequirement::{self, AllowDeath, KeepAlive},
54		WithdrawReasons,
55	},
56	PalletId,
57};
58use frame_system::{pallet_prelude::*, Pallet as System};
59use polkadot_primitives::{CoreIndex, Id as ParaId};
60use sp_runtime::{
61	traits::{AccountIdConversion, One, SaturatedConversion},
62	FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
63};
64use types::{
65	BalanceOf, CoreAffinityCount, EnqueuedOrder, QueuePushDirection, QueueStatusType,
66	SpotTrafficCalculationErr,
67};
68
/// Log target used by every `log` call in this module.
const LOG_TARGET: &str = "runtime::parachains::on-demand";
70
71pub use pallet::*;
72
73pub trait WeightInfo {
74	fn place_order_allow_death(s: u32) -> Weight;
75	fn place_order_keep_alive(s: u32) -> Weight;
76	fn place_order_with_credits(s: u32) -> Weight;
77}
78
79/// A weight info that is only suitable for testing.
80pub struct TestWeightInfo;
81
82impl WeightInfo for TestWeightInfo {
83	fn place_order_allow_death(_: u32) -> Weight {
84		Weight::MAX
85	}
86
87	fn place_order_keep_alive(_: u32) -> Weight {
88		Weight::MAX
89	}
90
91	fn place_order_with_credits(_: u32) -> Weight {
92		Weight::MAX
93	}
94}
95
96/// Defines how the account wants to pay for on-demand.
97#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
98enum PaymentType {
99	/// Use credits to purchase on-demand coretime.
100	Credits,
101	/// Use account's free balance to purchase on-demand coretime.
102	Balance,
103}
104
105#[frame_support::pallet]
106pub mod pallet {
107
108	use super::*;
109
110	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
111
112	#[pallet::pallet]
113	#[pallet::without_storage_info]
114	#[pallet::storage_version(STORAGE_VERSION)]
115	pub struct Pallet<T>(_);
116
117	#[pallet::config]
118	pub trait Config: frame_system::Config + configuration::Config + paras::Config {
119		/// The runtime's definition of an event.
120		#[allow(deprecated)]
121		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
122
123		/// The runtime's definition of a Currency.
124		type Currency: Currency<Self::AccountId>;
125
126		/// Something that provides the weight of this pallet.
127		type WeightInfo: WeightInfo;
128
129		/// The default value for the spot traffic multiplier.
130		#[pallet::constant]
131		type TrafficDefaultValue: Get<FixedU128>;
132
133		/// The maximum number of blocks some historical revenue
134		/// information stored for.
135		#[pallet::constant]
136		type MaxHistoricalRevenue: Get<u32>;
137
138		/// Identifier for the internal revenue balance.
139		#[pallet::constant]
140		type PalletId: Get<PalletId>;
141	}
142
143	/// Creates an empty queue status for an empty queue with initial traffic value.
144	#[pallet::type_value]
145	pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
146		QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() }
147	}
148
149	#[pallet::type_value]
150	pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> {
151		BinaryHeap::new()
152	}
153
154	/// Maps a `ParaId` to `CoreIndex` and keeps track of how many assignments the scheduler has in
155	/// it's lookahead. Keeping track of this affinity prevents parallel execution of the same
156	/// `ParaId` on two or more `CoreIndex`es.
157	#[pallet::storage]
158	pub(super) type ParaIdAffinity<T: Config> =
159		StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
160
161	/// Overall status of queue (both free + affinity entries)
162	#[pallet::storage]
163	pub(super) type QueueStatus<T: Config> =
164		StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>;
165
166	/// Priority queue for all orders which don't yet (or not any more) have any core affinity.
167	#[pallet::storage]
168	pub(super) type FreeEntries<T: Config> =
169		StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>;
170
171	/// Queue entries that are currently bound to a particular core due to core affinity.
172	#[pallet::storage]
173	pub(super) type AffinityEntries<T: Config> = StorageMap<
174		_,
175		Twox64Concat,
176		CoreIndex,
177		BinaryHeap<EnqueuedOrder>,
178		ValueQuery,
179		EntriesOnEmpty<T>,
180	>;
181
182	/// Keeps track of accumulated revenue from on demand order sales.
183	#[pallet::storage]
184	pub type Revenue<T: Config> =
185		StorageValue<_, BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>, ValueQuery>;
186
187	/// Keeps track of credits owned by each account.
188	#[pallet::storage]
189	pub type Credits<T: Config> =
190		StorageMap<_, Blake2_128Concat, T::AccountId, BalanceOf<T>, ValueQuery>;
191
192	#[pallet::event]
193	#[pallet::generate_deposit(pub(super) fn deposit_event)]
194	pub enum Event<T: Config> {
195		/// An order was placed at some spot price amount by orderer ordered_by
196		OnDemandOrderPlaced { para_id: ParaId, spot_price: BalanceOf<T>, ordered_by: T::AccountId },
197		/// The value of the spot price has likely changed
198		SpotPriceSet { spot_price: BalanceOf<T> },
199		/// An account was given credits.
200		AccountCredited { who: T::AccountId, amount: BalanceOf<T> },
201	}
202
203	#[pallet::error]
204	pub enum Error<T> {
205		/// The order queue is full, `place_order` will not continue.
206		QueueFull,
207		/// The current spot price is higher than the max amount specified in the `place_order`
208		/// call, making it invalid.
209		SpotPriceHigherThanMaxAmount,
210		/// The account doesn't have enough credits to purchase on-demand coretime.
211		InsufficientCredits,
212	}
213
214	#[pallet::hooks]
215	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
216		fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
217			// Update revenue information storage.
218			Revenue::<T>::mutate(|revenue| {
219				if let Some(overdue) =
220					revenue.force_insert_keep_left(0, 0u32.into()).defensive_unwrap_or(None)
221				{
222					// We have some overdue revenue not claimed by the Coretime Chain, let's
223					// accumulate it at the oldest stored block
224					if let Some(last) = revenue.last_mut() {
225						*last = last.saturating_add(overdue);
226					}
227				}
228			});
229
230			let config = configuration::ActiveConfig::<T>::get();
231			// We need to update the spot traffic on block initialize in order to account for idle
232			// blocks.
233			QueueStatus::<T>::mutate(|queue_status| {
234				Self::update_spot_traffic(&config, queue_status);
235			});
236
237			// Reads: `Revenue`, `ActiveConfig`, `QueueStatus`
238			// Writes: `Revenue`, `QueueStatus`
239			T::DbWeight::get().reads_writes(3, 2)
240		}
241	}
242
243	#[pallet::call]
244	impl<T: Config> Pallet<T> {
245		/// Create a single on demand core order.
246		/// Will use the spot price for the current block and will reap the account if needed.
247		///
248		/// Parameters:
249		/// - `origin`: The sender of the call, funds will be withdrawn from this account.
250		/// - `max_amount`: The maximum balance to withdraw from the origin to place an order.
251		/// - `para_id`: A `ParaId` the origin wants to provide blockspace for.
252		///
253		/// Errors:
254		/// - `InsufficientBalance`: from the Currency implementation
255		/// - `QueueFull`
256		/// - `SpotPriceHigherThanMaxAmount`
257		///
258		/// Events:
259		/// - `OnDemandOrderPlaced`
260		#[pallet::call_index(0)]
261		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))]
262		#[allow(deprecated)]
263		#[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
264		pub fn place_order_allow_death(
265			origin: OriginFor<T>,
266			max_amount: BalanceOf<T>,
267			para_id: ParaId,
268		) -> DispatchResult {
269			let sender = ensure_signed(origin)?;
270			Pallet::<T>::do_place_order(
271				sender,
272				max_amount,
273				para_id,
274				AllowDeath,
275				PaymentType::Balance,
276			)
277		}
278
279		/// Same as the [`place_order_allow_death`](Self::place_order_allow_death) call , but with a
280		/// check that placing the order will not reap the account.
281		///
282		/// Parameters:
283		/// - `origin`: The sender of the call, funds will be withdrawn from this account.
284		/// - `max_amount`: The maximum balance to withdraw from the origin to place an order.
285		/// - `para_id`: A `ParaId` the origin wants to provide blockspace for.
286		///
287		/// Errors:
288		/// - `InsufficientBalance`: from the Currency implementation
289		/// - `QueueFull`
290		/// - `SpotPriceHigherThanMaxAmount`
291		///
292		/// Events:
293		/// - `OnDemandOrderPlaced`
294		#[pallet::call_index(1)]
295		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))]
296		#[allow(deprecated)]
297		#[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
298		pub fn place_order_keep_alive(
299			origin: OriginFor<T>,
300			max_amount: BalanceOf<T>,
301			para_id: ParaId,
302		) -> DispatchResult {
303			let sender = ensure_signed(origin)?;
304			Pallet::<T>::do_place_order(
305				sender,
306				max_amount,
307				para_id,
308				KeepAlive,
309				PaymentType::Balance,
310			)
311		}
312
313		/// Create a single on demand core order with credits.
314		/// Will charge the owner's on-demand credit account the spot price for the current block.
315		///
316		/// Parameters:
317		/// - `origin`: The sender of the call, on-demand credits will be withdrawn from this
318		///   account.
319		/// - `max_amount`: The maximum number of credits to spend from the origin to place an
320		///   order.
321		/// - `para_id`: A `ParaId` the origin wants to provide blockspace for.
322		///
323		/// Errors:
324		/// - `InsufficientCredits`
325		/// - `QueueFull`
326		/// - `SpotPriceHigherThanMaxAmount`
327		///
328		/// Events:
329		/// - `OnDemandOrderPlaced`
330		#[pallet::call_index(2)]
331		#[pallet::weight(<T as Config>::WeightInfo::place_order_with_credits(QueueStatus::<T>::get().size()))]
332		pub fn place_order_with_credits(
333			origin: OriginFor<T>,
334			max_amount: BalanceOf<T>,
335			para_id: ParaId,
336		) -> DispatchResult {
337			let sender = ensure_signed(origin)?;
338			Pallet::<T>::do_place_order(
339				sender,
340				max_amount,
341				para_id,
342				KeepAlive,
343				PaymentType::Credits,
344			)
345		}
346	}
347}
348
349// Internal functions and interface to scheduler/wrapping assignment provider.
350impl<T: Config> Pallet<T>
351where
352	BalanceOf<T>: FixedPointOperand,
353{
354	/// Take the next queued entry that is available for a given core index.
355	///
356	/// Parameters:
357	/// - `core_index`: The core index
358	pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> {
359		let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| {
360			AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| {
361				let free_entry = FreeEntries::<T>::try_mutate(|free_entries| {
362					let affinity_next = affinity_entries.peek();
363					let free_next = free_entries.peek();
364					let pick_free = match (affinity_next, free_next) {
365						(None, _) => true,
366						(Some(_), None) => false,
367						(Some(a), Some(f)) => f < a,
368					};
369					if pick_free {
370						let entry = free_entries.pop().ok_or(())?;
371						let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) =
372							take(free_entries)
373								.into_iter()
374								.partition(|e| e.para_id == entry.para_id);
375						affinity_entries.append(&mut affinities);
376						*free_entries = free;
377						Ok(entry)
378					} else {
379						Err(())
380					}
381				});
382				let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?;
383				queue_status.consume_index(entry.idx);
384				Ok(entry)
385			})
386		});
387
388		let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;
389
390		Pallet::<T>::increase_affinity(assignment.para_id(), core_index);
391		Some(assignment)
392	}
393
394	/// Report that an assignment was duplicated by the scheduler.
395	pub fn assignment_duplicated(para_id: ParaId, core_index: CoreIndex) {
396		Pallet::<T>::increase_affinity(para_id, core_index);
397	}
398
399	/// Report that the `para_id` & `core_index` combination was processed.
400	///
401	/// This should be called once it is clear that the assignment won't get pushed back anymore.
402	///
403	/// In other words for each `pop_assignment_for_core` a call to this function or
404	/// `push_back_assignment` must follow, but only one.
405	pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
406		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
407	}
408
409	/// Push an assignment back to the front of the queue.
410	///
411	/// The assignment has not been processed yet. Typically used on session boundaries.
412	///
413	/// NOTE: We are not checking queue size here. So due to push backs it is possible that we
414	/// exceed the maximum queue size slightly.
415	///
416	/// Parameters:
417	/// - `para_id`: The para that did not make it.
418	/// - `core_index`: The core the para was scheduled on.
419	pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
420		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
421		QueueStatus::<T>::mutate(|queue_status| {
422			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front);
423		});
424	}
425
426	/// Adds credits to the specified account.
427	///
428	/// Parameters:
429	/// - `who`: Credit receiver.
430	/// - `amount`: The amount of new credits the account will receive.
431	pub fn credit_account(who: T::AccountId, amount: BalanceOf<T>) {
432		Credits::<T>::mutate(who.clone(), |credits| {
433			*credits = credits.saturating_add(amount);
434		});
435		Pallet::<T>::deposit_event(Event::<T>::AccountCredited { who, amount });
436	}
437
438	/// Helper function for `place_order_*` calls. Used to differentiate between placing orders
439	/// with a keep alive check or to allow the account to be reaped. The amount charged is
440	/// stored to the pallet account to be later paid out as revenue.
441	///
442	/// Parameters:
443	/// - `sender`: The sender of the call, funds will be withdrawn from this account.
444	/// - `max_amount`: The maximum balance to withdraw from the origin to place an order.
445	/// - `para_id`: A `ParaId` the origin wants to provide blockspace for.
446	/// - `existence_requirement`: Whether or not to ensure that the account will not be reaped.
447	/// - `payment_type`: Defines how the user wants to pay for on-demand.
448	///
449	/// Errors:
450	/// - `InsufficientBalance`: from the Currency implementation
451	/// - `QueueFull`
452	/// - `SpotPriceHigherThanMaxAmount`
453	///
454	/// Events:
455	/// - `OnDemandOrderPlaced`
456	fn do_place_order(
457		sender: <T as frame_system::Config>::AccountId,
458		max_amount: BalanceOf<T>,
459		para_id: ParaId,
460		existence_requirement: ExistenceRequirement,
461		payment_type: PaymentType,
462	) -> DispatchResult {
463		let config = configuration::ActiveConfig::<T>::get();
464
465		QueueStatus::<T>::mutate(|queue_status| {
466			Self::update_spot_traffic(&config, queue_status);
467			let traffic = queue_status.traffic;
468
469			// Calculate spot price
470			let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
471				config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
472			);
473
474			// Is the current price higher than `max_amount`
475			ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
476
477			ensure!(
478				queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
479				Error::<T>::QueueFull
480			);
481
482			match payment_type {
483				PaymentType::Balance => {
484					// Charge the sending account the spot price. The amount will be teleported to
485					// the broker chain once it requests revenue information.
486					let amt = T::Currency::withdraw(
487						&sender,
488						spot_price,
489						WithdrawReasons::FEE,
490						existence_requirement,
491					)?;
492
493					// Consume the negative imbalance and deposit it into the pallet account. Make
494					// sure the account preserves even without the existential deposit.
495					let pot = Self::account_id();
496					if !System::<T>::account_exists(&pot) {
497						System::<T>::inc_providers(&pot);
498					}
499					T::Currency::resolve_creating(&pot, amt);
500				},
501				PaymentType::Credits => {
502					let credits = Credits::<T>::get(&sender);
503
504					// Charge the sending account the spot price in credits.
505					let new_credits_value =
506						credits.checked_sub(&spot_price).ok_or(Error::<T>::InsufficientCredits)?;
507
508					if new_credits_value.is_zero() {
509						Credits::<T>::remove(&sender);
510					} else {
511						Credits::<T>::insert(&sender, new_credits_value);
512					}
513				},
514			}
515
516			// Add the amount to the current block's (index 0) revenue information.
517			Revenue::<T>::mutate(|bounded_revenue| {
518				if let Some(current_block) = bounded_revenue.get_mut(0) {
519					*current_block = current_block.saturating_add(spot_price);
520				} else {
521					// Revenue has already been claimed in the same block, including the block
522					// itself. It shouldn't normally happen as revenue claims in the future are
523					// not allowed.
524					bounded_revenue.try_push(spot_price).defensive_ok();
525				}
526			});
527
528			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
529			Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced {
530				para_id,
531				spot_price,
532				ordered_by: sender,
533			});
534
535			Ok(())
536		})
537	}
538
539	/// Calculate and update spot traffic.
540	fn update_spot_traffic(
541		config: &configuration::HostConfiguration<BlockNumberFor<T>>,
542		queue_status: &mut QueueStatusType,
543	) {
544		let old_traffic = queue_status.traffic;
545		match Self::calculate_spot_traffic(
546			old_traffic,
547			config.scheduler_params.on_demand_queue_max_size,
548			queue_status.size(),
549			config.scheduler_params.on_demand_target_queue_utilization,
550			config.scheduler_params.on_demand_fee_variability,
551		) {
552			Ok(new_traffic) => {
553				// Only update storage on change
554				if new_traffic != old_traffic {
555					queue_status.traffic = new_traffic;
556
557					// calculate the new spot price
558					let spot_price: BalanceOf<T> = new_traffic.saturating_mul_int(
559						config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
560					);
561
562					// emit the event for updated new price
563					Pallet::<T>::deposit_event(Event::<T>::SpotPriceSet { spot_price });
564				}
565			},
566			Err(err) => {
567				log::debug!(
568					target: LOG_TARGET,
569					"Error calculating spot traffic: {:?}", err
570				);
571			},
572		};
573	}
574
575	/// The spot price multiplier. This is based on the transaction fee calculations defined in:
576	/// https://research.web3.foundation/Polkadot/overview/token-economics#setting-transaction-fees
577	///
578	/// Parameters:
579	/// - `traffic`: The previously calculated multiplier, can never go below 1.0.
580	/// - `queue_capacity`: The max size of the order book.
581	/// - `queue_size`: How many orders are currently in the order book.
582	/// - `target_queue_utilisation`: How much of the queue_capacity should be ideally occupied,
583	///   expressed in percentages(perbill).
584	/// - `variability`: A variability factor, i.e. how quickly the spot price adjusts. This number
585	///   can be chosen by p/(k*(1-s)) where p is the desired ratio increase in spot price over k
586	///   number of blocks. s is the target_queue_utilisation. A concrete example: v =
587	///   0.05/(20*(1-0.25)) = 0.0033.
588	///
589	/// Returns:
590	/// - A `FixedU128` in the range of  `Config::TrafficDefaultValue` - `FixedU128::MAX` on
591	///   success.
592	///
593	/// Errors:
594	/// - `SpotTrafficCalculationErr::QueueCapacityIsZero`
595	/// - `SpotTrafficCalculationErr::QueueSizeLargerThanCapacity`
596	/// - `SpotTrafficCalculationErr::Division`
597	fn calculate_spot_traffic(
598		traffic: FixedU128,
599		queue_capacity: u32,
600		queue_size: u32,
601		target_queue_utilisation: Perbill,
602		variability: Perbill,
603	) -> Result<FixedU128, SpotTrafficCalculationErr> {
604		// Return early if queue has no capacity.
605		if queue_capacity == 0 {
606			return Err(SpotTrafficCalculationErr::QueueCapacityIsZero)
607		}
608
609		// Return early if queue size is greater than capacity.
610		if queue_size > queue_capacity {
611			return Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity)
612		}
613
614		// (queue_size / queue_capacity) - target_queue_utilisation
615		let queue_util_ratio = FixedU128::from_rational(queue_size.into(), queue_capacity.into());
616		let positive = queue_util_ratio >= target_queue_utilisation.into();
617		let queue_util_diff = queue_util_ratio.max(target_queue_utilisation.into()) -
618			queue_util_ratio.min(target_queue_utilisation.into());
619
620		// variability * queue_util_diff
621		let var_times_qud = queue_util_diff.saturating_mul(variability.into());
622
623		// variability^2 * queue_util_diff^2
624		let var_times_qud_pow = var_times_qud.saturating_mul(var_times_qud);
625
626		// (variability^2 * queue_util_diff^2)/2
627		let div_by_two: FixedU128;
628		match var_times_qud_pow.const_checked_div(2.into()) {
629			Some(dbt) => div_by_two = dbt,
630			None => return Err(SpotTrafficCalculationErr::Division),
631		}
632
633		// traffic * (1 + queue_util_diff) + div_by_two
634		if positive {
635			let new_traffic = queue_util_diff
636				.saturating_add(div_by_two)
637				.saturating_add(One::one())
638				.saturating_mul(traffic);
639			Ok(new_traffic.max(<T as Config>::TrafficDefaultValue::get()))
640		} else {
641			let new_traffic = queue_util_diff.saturating_sub(div_by_two).saturating_mul(traffic);
642			Ok(new_traffic.max(<T as Config>::TrafficDefaultValue::get()))
643		}
644	}
645
646	/// Adds an order to the on demand queue.
647	///
648	/// Parameters:
649	/// - `location`: Whether to push this entry to the back or the front of the queue. Pushing an
650	///   entry to the front of the queue is only used when the scheduler wants to push back an
651	///   entry it has already popped.
652	fn add_on_demand_order(
653		queue_status: &mut QueueStatusType,
654		para_id: ParaId,
655		location: QueuePushDirection,
656	) {
657		let idx = match location {
658			QueuePushDirection::Back => queue_status.push_back(),
659			QueuePushDirection::Front => queue_status.push_front(),
660		};
661
662		let affinity = ParaIdAffinity::<T>::get(para_id);
663		let order = EnqueuedOrder::new(idx, para_id);
664		#[cfg(test)]
665		log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location);
666
667		match affinity {
668			None => FreeEntries::<T>::mutate(|entries| entries.push(order)),
669			Some(affinity) =>
670				AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order)),
671		}
672	}
673
674	/// Decrease core affinity for para and update queue
675	///
676	/// if affinity dropped to 0, moving entries back to `FreeEntries`.
677	fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) {
678		let affinity = Pallet::<T>::decrease_affinity(para_id, core_index);
679		#[cfg(not(test))]
680		debug_assert_ne!(
681			affinity, None,
682			"Decreased affinity for a para that has not been served on a core?"
683		);
684		if affinity != Some(0) {
685			return;
686		}
687		// No affinity more for entries on this core, free any entries:
688		//
689		// This is necessary to ensure them being served as the core might no longer exist at all.
690		AffinityEntries::<T>::mutate(core_index, |affinity_entries| {
691			FreeEntries::<T>::mutate(|free_entries| {
692				let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) =
693					take(affinity_entries).into_iter().partition(|e| e.para_id == para_id);
694				free_entries.append(&mut freed);
695				*affinity_entries = affinities;
696			})
697		});
698	}
699
700	/// Decreases the affinity of a `ParaId` to a specified `CoreIndex`.
701	///
702	/// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_index
703	/// matches. When the count reaches 0, the entry is removed.
704	/// A non-existent entry is a no-op.
705	///
706	/// Returns: The new affinity of the para on that core. `None` if there is no affinity on this
707	/// core.
708	fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> {
709		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| {
710			let affinity = maybe_affinity.as_mut()?;
711			if affinity.core_index == core_index {
712				let new_count = affinity.count.saturating_sub(1);
713				if new_count > 0 {
714					*maybe_affinity = Some(CoreAffinityCount { core_index, count: new_count });
715				} else {
716					*maybe_affinity = None;
717				}
718				return Some(new_count);
719			} else {
720				None
721			}
722		})
723	}
724
725	/// Increases the affinity of a `ParaId` to a specified `CoreIndex`.
726	/// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_index
727	/// matches. A non-existent entry will be initialized with a count of 1 and uses the supplied
728	/// `CoreIndex`.
729	fn increase_affinity(para_id: ParaId, core_index: CoreIndex) {
730		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity {
731			Some(affinity) =>
732				if affinity.core_index == core_index {
733					*maybe_affinity = Some(CoreAffinityCount {
734						core_index,
735						count: affinity.count.saturating_add(1),
736					});
737				},
738			None => {
739				*maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 });
740			},
741		})
742	}
743
744	/// Collect the revenue from the `when` blockheight
745	pub fn claim_revenue_until(when: BlockNumberFor<T>) -> BalanceOf<T> {
746		let now = <frame_system::Pallet<T>>::block_number();
747		let mut amount: BalanceOf<T> = BalanceOf::<T>::zero();
748		Revenue::<T>::mutate(|revenue| {
749			while !revenue.is_empty() {
750				let index = (revenue.len() - 1) as u32;
751				if when > now.saturating_sub(index.into()) {
752					amount = amount.saturating_add(revenue.pop().defensive_unwrap_or(0u32.into()));
753				} else {
754					break
755				}
756			}
757		});
758
759		amount
760	}
761
762	/// Account of the pallet pot, where the funds from instantaneous coretime sale are accumulated.
763	pub fn account_id() -> T::AccountId {
764		T::PalletId::get().into_account_truncating()
765	}
766
767	/// Getter for the affinity tracker.
768	#[cfg(test)]
769	fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
770		ParaIdAffinity::<T>::get(para_id)
771	}
772
773	/// Getter for the affinity entries.
774	#[cfg(test)]
775	fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
776		AffinityEntries::<T>::get(core_index)
777	}
778
779	/// Getter for the free entries.
780	#[cfg(test)]
781	fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
782		FreeEntries::<T>::get()
783	}
784
785	#[cfg(feature = "runtime-benchmarks")]
786	pub fn populate_queue(para_id: ParaId, num: u32) {
787		QueueStatus::<T>::mutate(|queue_status| {
788			for _ in 0..num {
789				Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
790			}
791		});
792	}
793
794	#[cfg(test)]
795	fn set_queue_status(new_status: QueueStatusType) {
796		QueueStatus::<T>::set(new_status);
797	}
798
799	#[cfg(test)]
800	fn get_queue_status() -> QueueStatusType {
801		QueueStatus::<T>::get()
802	}
803
804	#[cfg(test)]
805	fn get_traffic_default_value() -> FixedU128 {
806		<T as Config>::TrafficDefaultValue::get()
807	}
808
809	#[cfg(test)]
810	fn get_revenue() -> Vec<BalanceOf<T>> {
811		Revenue::<T>::get().to_vec()
812	}
813}