polkadot_runtime_parachains/on_demand/
mod.rs1use core::mem;
30
31use sp_runtime::traits::Zero;
32mod benchmarking;
33pub mod migration;
34
35extern crate alloc;
36
37use crate::{configuration, paras};
38use alloc::{collections::BTreeSet, vec::Vec};
39use frame_support::{
40 pallet_prelude::*,
41 traits::{
42 defensive_prelude::*,
43 Currency,
44 ExistenceRequirement::{self, AllowDeath, KeepAlive},
45 WithdrawReasons,
46 },
47 PalletId,
48};
49use frame_system::{pallet_prelude::*, Pallet as System};
50use polkadot_primitives::{Id as ParaId, ON_DEMAND_MAX_QUEUE_MAX_SIZE};
51use sp_runtime::{
52 traits::{AccountIdConversion, One, SaturatedConversion},
53 FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
54};
55
56pub use pallet::*;
57
58mod mock_helpers;
59#[cfg(test)]
60mod tests;
61
62const LOG_TARGET: &str = "runtime::parachains::on-demand";
63
64pub trait WeightInfo {
65 fn place_order_allow_death() -> Weight;
66 fn place_order_keep_alive() -> Weight;
67 fn place_order_with_credits() -> Weight;
68}
69
70pub struct TestWeightInfo;
72
73impl WeightInfo for TestWeightInfo {
74 fn place_order_allow_death() -> Weight {
75 Weight::MAX
76 }
77
78 fn place_order_keep_alive() -> Weight {
79 Weight::MAX
80 }
81
82 fn place_order_with_credits() -> Weight {
83 Weight::MAX
84 }
85}
86
87#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
89enum PaymentType {
90 Credits,
92 Balance,
94}
95
96pub type BalanceOf<T> =
98 <<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
99
100#[derive(Encode, Decode, TypeInfo)]
102pub struct OrderQueue<N> {
103 queue: BoundedVec<EnqueuedOrder<N>, ConstU32<ON_DEMAND_MAX_QUEUE_MAX_SIZE>>,
104}
105
106impl<N> OrderQueue<N> {
107 pub fn pop_assignment_for_cores<T: Config>(
109 &mut self,
110 now: N,
111 mut num_cores: u32,
112 ) -> impl Iterator<Item = ParaId>
113 where
114 N: Saturating + Ord + One + Copy,
115 {
116 let mut popped = BTreeSet::new();
117 let mut remaining_orders = Vec::with_capacity(self.queue.len());
118 for order in mem::take(&mut self.queue) {
119 let ready_at = order.ordered_at.saturating_plus_one().saturating_plus_one();
121 let is_ready = ready_at <= now;
122
123 if num_cores > 0 && is_ready && popped.insert(order.para_id) {
124 num_cores -= 1;
125 } else {
126 remaining_orders.push(order);
127 }
128 }
129 self.queue = BoundedVec::truncate_from(remaining_orders);
130 popped.into_iter()
131 }
132
133 fn new() -> Self {
134 OrderQueue { queue: BoundedVec::new() }
135 }
136
137 fn try_push(&mut self, now: N, para_id: ParaId) -> Result<(), ParaId> {
141 self.queue
142 .try_push(EnqueuedOrder { para_id, ordered_at: now })
143 .map_err(|o| o.para_id)
144 }
145
146 fn len(&self) -> usize {
147 self.queue.len()
148 }
149}
150
151#[derive(Encode, Decode, TypeInfo)]
153struct EnqueuedOrder<N> {
154 para_id: ParaId,
156 ordered_at: N,
158}
159
160#[derive(Encode, Decode, TypeInfo)]
162struct OrderStatus<N> {
163 traffic: FixedU128,
165
166 queue: OrderQueue<N>,
168}
169
170impl<N> Default for OrderStatus<N> {
171 fn default() -> OrderStatus<N> {
172 OrderStatus { traffic: FixedU128::default(), queue: OrderQueue::new() }
173 }
174}
175
176#[derive(PartialEq, Debug)]
178pub enum SpotTrafficCalculationErr {
179 QueueCapacityIsZero,
181 QueueSizeLargerThanCapacity,
183 Division,
185}
186
187#[frame_support::pallet]
188pub mod pallet {
189
190 use super::*;
191 use polkadot_primitives::Id as ParaId;
192
193 const STORAGE_VERSION: StorageVersion = StorageVersion::new(2);
194
195 #[pallet::pallet]
196 #[pallet::without_storage_info]
197 #[pallet::storage_version(STORAGE_VERSION)]
198 pub struct Pallet<T>(_);
199
200 #[pallet::config]
201 pub trait Config: frame_system::Config + configuration::Config + paras::Config {
202 #[allow(deprecated)]
204 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
205
206 type Currency: Currency<Self::AccountId>;
208
209 type WeightInfo: WeightInfo;
211
212 #[pallet::constant]
214 type TrafficDefaultValue: Get<FixedU128>;
215
216 #[pallet::constant]
219 type MaxHistoricalRevenue: Get<u32>;
220
221 #[pallet::constant]
223 type PalletId: Get<PalletId>;
224 }
225
226 #[pallet::storage]
228 pub(super) type OrderStatus<T: Config> =
229 StorageValue<_, super::OrderStatus<BlockNumberFor<T>>, ValueQuery>;
230
231 #[pallet::storage]
233 pub(super) type Revenue<T: Config> =
234 StorageValue<_, BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>, ValueQuery>;
235
236 #[pallet::storage]
238 pub type Credits<T: Config> =
239 StorageMap<_, Blake2_128Concat, T::AccountId, BalanceOf<T>, ValueQuery>;
240
241 #[pallet::event]
242 #[pallet::generate_deposit(pub(super) fn deposit_event)]
243 pub enum Event<T: Config> {
244 OnDemandOrderPlaced { para_id: ParaId, spot_price: BalanceOf<T>, ordered_by: T::AccountId },
246 SpotPriceSet { spot_price: BalanceOf<T> },
248 AccountCredited { who: T::AccountId, amount: BalanceOf<T> },
250 }
251
252 #[pallet::error]
253 pub enum Error<T> {
254 QueueFull,
256 SpotPriceHigherThanMaxAmount,
259 InsufficientCredits,
261 }
262
263 #[pallet::hooks]
264 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
265 fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
266 Revenue::<T>::mutate(|revenue| {
268 if let Some(overdue) =
269 revenue.force_insert_keep_left(0, 0u32.into()).defensive_unwrap_or(None)
270 {
271 if let Some(last) = revenue.last_mut() {
274 *last = last.saturating_add(overdue);
275 }
276 }
277 });
278
279 let config = configuration::ActiveConfig::<T>::get();
280 OrderStatus::<T>::mutate(|order_status| {
283 Self::update_spot_traffic(&config, order_status);
284 });
285
286 T::DbWeight::get().reads_writes(3, 2)
289 }
290 }
291
292 #[pallet::call]
293 impl<T: Config> Pallet<T> {
294 #[pallet::call_index(0)]
310 #[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death())]
311 #[allow(deprecated)]
312 #[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
313 pub fn place_order_allow_death(
314 origin: OriginFor<T>,
315 max_amount: BalanceOf<T>,
316 para_id: ParaId,
317 ) -> DispatchResult {
318 let sender = ensure_signed(origin)?;
319 Pallet::<T>::do_place_order(
320 sender,
321 max_amount,
322 para_id,
323 AllowDeath,
324 PaymentType::Balance,
325 )
326 }
327
328 #[pallet::call_index(1)]
344 #[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive())]
345 #[allow(deprecated)]
346 #[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
347 pub fn place_order_keep_alive(
348 origin: OriginFor<T>,
349 max_amount: BalanceOf<T>,
350 para_id: ParaId,
351 ) -> DispatchResult {
352 let sender = ensure_signed(origin)?;
353 Pallet::<T>::do_place_order(
354 sender,
355 max_amount,
356 para_id,
357 KeepAlive,
358 PaymentType::Balance,
359 )
360 }
361
362 #[pallet::call_index(2)]
380 #[pallet::weight(<T as Config>::WeightInfo::place_order_with_credits())]
381 pub fn place_order_with_credits(
382 origin: OriginFor<T>,
383 max_amount: BalanceOf<T>,
384 para_id: ParaId,
385 ) -> DispatchResult {
386 let sender = ensure_signed(origin)?;
387 Pallet::<T>::do_place_order(
388 sender,
389 max_amount,
390 para_id,
391 KeepAlive,
392 PaymentType::Credits,
393 )
394 }
395 }
396}
397
398impl<T: Config> Pallet<T>
400where
401 BalanceOf<T>: FixedPointOperand,
402{
403 pub fn pop_assignment_for_cores(
405 now: BlockNumberFor<T>,
406 num_cores: u32,
407 ) -> impl Iterator<Item = ParaId> {
408 pallet::OrderStatus::<T>::mutate(|order_status| {
409 order_status.queue.pop_assignment_for_cores::<T>(now, num_cores)
410 })
411 }
412
413 pub fn peek_order_queue() -> OrderQueue<BlockNumberFor<T>> {
423 pallet::OrderStatus::<T>::get().queue
424 }
425
426 pub fn push_back_order(para_id: ParaId) {
433 pallet::OrderStatus::<T>::mutate(|order_status| {
434 let now = <frame_system::Pallet<T>>::block_number();
435 if let Err(e) = order_status.queue.try_push(now, para_id) {
436 log::debug!(target: LOG_TARGET, "Pushing back order failed (queue too long): {:?}", e);
437 };
438 });
439 }
440
441 pub fn credit_account(who: T::AccountId, amount: BalanceOf<T>) {
447 Credits::<T>::mutate(who.clone(), |credits| {
448 *credits = credits.saturating_add(amount);
449 });
450 Pallet::<T>::deposit_event(Event::<T>::AccountCredited { who, amount });
451 }
452
453 fn do_place_order(
472 sender: <T as frame_system::Config>::AccountId,
473 max_amount: BalanceOf<T>,
474 para_id: ParaId,
475 existence_requirement: ExistenceRequirement,
476 payment_type: PaymentType,
477 ) -> DispatchResult {
478 let config = configuration::ActiveConfig::<T>::get();
479
480 pallet::OrderStatus::<T>::mutate(|order_status| {
481 Self::update_spot_traffic(&config, order_status);
482 let traffic = order_status.traffic;
483
484 let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
486 config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
487 );
488
489 ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
491
492 ensure!(
493 order_status.queue.len() <
494 config.scheduler_params.on_demand_queue_max_size as usize,
495 Error::<T>::QueueFull
496 );
497
498 match payment_type {
499 PaymentType::Balance => {
500 let amt = T::Currency::withdraw(
503 &sender,
504 spot_price,
505 WithdrawReasons::FEE,
506 existence_requirement,
507 )?;
508
509 let pot = Self::account_id();
512 if !System::<T>::account_exists(&pot) {
513 System::<T>::inc_providers(&pot);
514 }
515 T::Currency::resolve_creating(&pot, amt);
516 },
517 PaymentType::Credits => {
518 let credits = Credits::<T>::get(&sender);
519
520 let new_credits_value =
522 credits.checked_sub(&spot_price).ok_or(Error::<T>::InsufficientCredits)?;
523
524 if new_credits_value.is_zero() {
525 Credits::<T>::remove(&sender);
526 } else {
527 Credits::<T>::insert(&sender, new_credits_value);
528 }
529 },
530 }
531
532 Revenue::<T>::mutate(|bounded_revenue| {
534 if let Some(current_block) = bounded_revenue.get_mut(0) {
535 *current_block = current_block.saturating_add(spot_price);
536 } else {
537 bounded_revenue.try_push(spot_price).defensive_ok();
541 }
542 });
543
544 let now = <frame_system::Pallet<T>>::block_number();
545 order_status
546 .queue
547 .try_push(now, para_id)
548 .defensive_map_err(|_| Error::<T>::QueueFull)?;
549
550 Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced {
551 para_id,
552 spot_price,
553 ordered_by: sender,
554 });
555
556 Ok(())
557 })
558 }
559
560 fn update_spot_traffic(
562 config: &configuration::HostConfiguration<BlockNumberFor<T>>,
563 order_status: &mut OrderStatus<BlockNumberFor<T>>,
564 ) {
565 let old_traffic = order_status.traffic;
566 match Self::calculate_spot_traffic(
567 old_traffic,
568 config.scheduler_params.on_demand_queue_max_size,
569 order_status.queue.len() as u32,
570 config.scheduler_params.on_demand_target_queue_utilization,
571 config.scheduler_params.on_demand_fee_variability,
572 ) {
573 Ok(new_traffic) => {
574 if new_traffic != old_traffic {
576 order_status.traffic = new_traffic;
577
578 let spot_price: BalanceOf<T> = new_traffic.saturating_mul_int(
580 config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
581 );
582
583 Pallet::<T>::deposit_event(Event::<T>::SpotPriceSet { spot_price });
585 }
586 },
587 Err(err) => {
588 log::debug!(
589 target: LOG_TARGET,
590 "Error calculating spot traffic: {:?}", err
591 );
592 },
593 };
594 }
595
596 fn calculate_spot_traffic(
619 traffic: FixedU128,
620 queue_capacity: u32,
621 queue_size: u32,
622 target_queue_utilisation: Perbill,
623 variability: Perbill,
624 ) -> Result<FixedU128, SpotTrafficCalculationErr> {
625 if queue_capacity == 0 {
627 return Err(SpotTrafficCalculationErr::QueueCapacityIsZero);
628 }
629
630 if queue_size > queue_capacity {
632 return Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity);
633 }
634
635 let queue_util_ratio = FixedU128::from_rational(queue_size.into(), queue_capacity.into());
637 let positive = queue_util_ratio >= target_queue_utilisation.into();
638 let queue_util_diff = queue_util_ratio.max(target_queue_utilisation.into()) -
639 queue_util_ratio.min(target_queue_utilisation.into());
640
641 let var_times_qud = queue_util_diff.saturating_mul(variability.into());
643
644 let var_times_qud_pow = var_times_qud.saturating_mul(var_times_qud);
646
647 let div_by_two: FixedU128;
649 match var_times_qud_pow.const_checked_div(2.into()) {
650 Some(dbt) => div_by_two = dbt,
651 None => return Err(SpotTrafficCalculationErr::Division),
652 }
653
654 if positive {
656 let new_traffic = queue_util_diff
657 .saturating_add(div_by_two)
658 .saturating_add(One::one())
659 .saturating_mul(traffic);
660 Ok(new_traffic.max(<T as Config>::TrafficDefaultValue::get()))
661 } else {
662 let new_traffic = queue_util_diff.saturating_sub(div_by_two).saturating_mul(traffic);
663 Ok(new_traffic.max(<T as Config>::TrafficDefaultValue::get()))
664 }
665 }
666
667 pub fn claim_revenue_until(when: BlockNumberFor<T>) -> BalanceOf<T> {
669 let now = <frame_system::Pallet<T>>::block_number();
670 let mut amount: BalanceOf<T> = BalanceOf::<T>::zero();
671 Revenue::<T>::mutate(|revenue| {
672 while !revenue.is_empty() {
673 let index = (revenue.len() - 1) as u32;
674 if when > now.saturating_sub(index.into()) {
675 amount = amount.saturating_add(revenue.pop().defensive_unwrap_or(0u32.into()));
676 } else {
677 break;
678 }
679 }
680 });
681
682 amount
683 }
684
685 pub fn account_id() -> T::AccountId {
687 T::PalletId::get().into_account_truncating()
688 }
689
690 #[cfg(feature = "runtime-benchmarks")]
691 pub fn populate_queue(para_id: ParaId, num: u32) {
692 let now = <frame_system::Pallet<T>>::block_number();
693 pallet::OrderStatus::<T>::mutate(|order_status| {
694 for _ in 0..num {
695 order_status.queue.try_push(now, para_id).unwrap();
696 }
697 });
698 }
699
700 #[cfg(feature = "runtime-benchmarks")]
701 pub(crate) fn set_revenue(rev: BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>) {
702 Revenue::<T>::put(rev);
703 }
704
705 #[cfg(test)]
706 fn set_order_status(new_status: OrderStatus<BlockNumberFor<T>>) {
707 pallet::OrderStatus::<T>::set(new_status);
708 }
709
710 #[cfg(test)]
711 fn get_order_status() -> OrderStatus<BlockNumberFor<T>> {
712 pallet::OrderStatus::<T>::get()
713 }
714
715 #[cfg(test)]
716 fn get_traffic_default_value() -> FixedU128 {
717 <T as Config>::TrafficDefaultValue::get()
718 }
719
720 #[cfg(test)]
721 fn get_revenue() -> Vec<BalanceOf<T>> {
722 Revenue::<T>::get().to_vec()
723 }
724}