1use sp_runtime::traits::Zero;
35mod benchmarking;
36pub mod migration;
37mod mock_helpers;
38mod types;
39
40extern crate alloc;
41
42#[cfg(test)]
43mod tests;
44
45use crate::{configuration, paras, scheduler::common::Assignment};
46use alloc::collections::BinaryHeap;
47use core::mem::take;
48use frame_support::{
49 pallet_prelude::*,
50 traits::{
51 defensive_prelude::*,
52 Currency,
53 ExistenceRequirement::{self, AllowDeath, KeepAlive},
54 WithdrawReasons,
55 },
56 PalletId,
57};
58use frame_system::{pallet_prelude::*, Pallet as System};
59use polkadot_primitives::{CoreIndex, Id as ParaId};
60use sp_runtime::{
61 traits::{AccountIdConversion, One, SaturatedConversion},
62 FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
63};
64use types::{
65 BalanceOf, CoreAffinityCount, EnqueuedOrder, QueuePushDirection, QueueStatusType,
66 SpotTrafficCalculationErr,
67};
68
69const LOG_TARGET: &str = "runtime::parachains::on-demand";
70
71pub use pallet::*;
72
/// Weight functions for the extrinsics of the on-demand pallet.
///
/// Every function takes a `u32` parameter `s`; the `#[pallet::weight]` attributes in this file
/// pass the current `QueueStatus` size for it.
pub trait WeightInfo {
    /// Weight of the `place_order_allow_death` extrinsic for parameter `s`.
    fn place_order_allow_death(s: u32) -> Weight;
    /// Weight of the `place_order_keep_alive` extrinsic for parameter `s`.
    fn place_order_keep_alive(s: u32) -> Weight;
    /// Weight of the `place_order_with_credits` extrinsic for parameter `s`.
    fn place_order_with_credits(s: u32) -> Weight;
}
78
/// A [`WeightInfo`] implementation that reports `Weight::MAX` for every extrinsic.
pub struct TestWeightInfo;
81
82impl WeightInfo for TestWeightInfo {
83 fn place_order_allow_death(_: u32) -> Weight {
84 Weight::MAX
85 }
86
87 fn place_order_keep_alive(_: u32) -> Weight {
88 Weight::MAX
89 }
90
91 fn place_order_with_credits(_: u32) -> Weight {
92 Weight::MAX
93 }
94}
95
/// Selects how `do_place_order` charges the spot price of an order.
#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
enum PaymentType {
    /// The spot price is subtracted from the sender's entry in the `Credits` storage map.
    Credits,
    /// The spot price is withdrawn from the sender's account through `T::Currency`.
    Balance,
}
104
105#[frame_support::pallet]
106pub mod pallet {
107
108 use super::*;
109
    /// Storage version of this pallet, registered via `#[pallet::storage_version]` below.
    const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

    /// The on-demand pallet type that all storage items, hooks and calls in this file hang off.
    #[pallet::pallet]
    #[pallet::without_storage_info]
    #[pallet::storage_version(STORAGE_VERSION)]
    pub struct Pallet<T>(_);
116
    /// Configuration of the on-demand pallet.
    #[pallet::config]
    pub trait Config: frame_system::Config + configuration::Config + paras::Config {
        /// The runtime's overarching event type; this pallet's `Event` must convert into it.
        #[allow(deprecated)]
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// Currency used to charge the spot price of orders paid from the sender's balance.
        type Currency: Currency<Self::AccountId>;

        /// Weight information for the extrinsics of this pallet.
        type WeightInfo: WeightInfo;

        /// Traffic value used while `QueueStatus` holds no value; it is also the lower bound
        /// applied to every newly calculated traffic value.
        #[pallet::constant]
        type TrafficDefaultValue: Get<FixedU128>;

        /// Maximum number of entries kept in the `Revenue` storage vector.
        #[pallet::constant]
        type MaxHistoricalRevenue: Get<u32>;

        /// Identifier from which the pallet's own account is derived (see `account_id`).
        #[pallet::constant]
        type PalletId: Get<PalletId>;
    }
142
143 #[pallet::type_value]
145 pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
146 QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() }
147 }
148
149 #[pallet::type_value]
150 pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> {
151 BinaryHeap::new()
152 }
153
    /// Maps a `ParaId` to the core it currently has affinity to, together with a count
    /// (`CoreAffinityCount`). No entry means the para has no affinity to any core.
    #[pallet::storage]
    pub(super) type ParaIdAffinity<T: Config> =
        StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
160
    /// Overall status of the order queue, including the current traffic value. Falls back to
    /// `QueueStatusOnEmpty` when nothing is stored.
    #[pallet::storage]
    pub(super) type QueueStatus<T: Config> =
        StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>;
165
    /// Heap of enqueued orders that are not bound to a core, i.e. orders whose para had no
    /// `ParaIdAffinity` entry when they were enqueued or whose affinity has since dropped to zero.
    #[pallet::storage]
    pub(super) type FreeEntries<T: Config> =
        StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>;
170
    /// Per-core heaps of enqueued orders whose para has affinity to that core.
    #[pallet::storage]
    pub(super) type AffinityEntries<T: Config> = StorageMap<
        _,
        Twox64Concat,
        CoreIndex,
        BinaryHeap<EnqueuedOrder>,
        ValueQuery,
        EntriesOnEmpty<T>,
    >;
181
    /// Revenue collected from orders, one entry per block. `on_initialize` inserts a fresh zero
    /// entry at index 0 every block, so index 0 is the current block and higher indices are
    /// older blocks. Holds at most `T::MaxHistoricalRevenue` entries.
    #[pallet::storage]
    pub type Revenue<T: Config> =
        StorageValue<_, BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>, ValueQuery>;
186
    /// Credits held by each account, usable to pay for orders via `place_order_with_credits`.
    /// Accounts without an entry read as zero (`ValueQuery`).
    #[pallet::storage]
    pub type Credits<T: Config> =
        StorageMap<_, Blake2_128Concat, T::AccountId, BalanceOf<T>, ValueQuery>;
191
    /// Events deposited by the on-demand pallet.
    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// An order for `para_id` was enqueued; `ordered_by` was charged `spot_price`.
        OnDemandOrderPlaced { para_id: ParaId, spot_price: BalanceOf<T>, ordered_by: T::AccountId },
        /// The traffic value changed; `spot_price` is the price derived from the new traffic.
        SpotPriceSet { spot_price: BalanceOf<T> },
        /// `amount` credits were added to the account `who`.
        AccountCredited { who: T::AccountId, amount: BalanceOf<T> },
    }
202
    /// Errors returned when placing an order.
    #[pallet::error]
    pub enum Error<T> {
        /// The queue size is not below the configured `on_demand_queue_max_size`.
        QueueFull,
        /// The current spot price is higher than the `max_amount` given by the sender.
        SpotPriceHigherThanMaxAmount,
        /// The sender holds fewer credits than the current spot price.
        InsufficientCredits,
    }
213
214 #[pallet::hooks]
215 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
216 fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
217 Revenue::<T>::mutate(|revenue| {
219 if let Some(overdue) =
220 revenue.force_insert_keep_left(0, 0u32.into()).defensive_unwrap_or(None)
221 {
222 if let Some(last) = revenue.last_mut() {
225 *last = last.saturating_add(overdue);
226 }
227 }
228 });
229
230 let config = configuration::ActiveConfig::<T>::get();
231 QueueStatus::<T>::mutate(|queue_status| {
234 Self::update_spot_traffic(&config, queue_status);
235 });
236
237 T::DbWeight::get().reads_writes(3, 2)
240 }
241 }
242
243 #[pallet::call]
244 impl<T: Config> Pallet<T> {
245 #[pallet::call_index(0)]
261 #[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))]
262 #[allow(deprecated)]
263 #[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
264 pub fn place_order_allow_death(
265 origin: OriginFor<T>,
266 max_amount: BalanceOf<T>,
267 para_id: ParaId,
268 ) -> DispatchResult {
269 let sender = ensure_signed(origin)?;
270 Pallet::<T>::do_place_order(
271 sender,
272 max_amount,
273 para_id,
274 AllowDeath,
275 PaymentType::Balance,
276 )
277 }
278
279 #[pallet::call_index(1)]
295 #[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))]
296 #[allow(deprecated)]
297 #[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
298 pub fn place_order_keep_alive(
299 origin: OriginFor<T>,
300 max_amount: BalanceOf<T>,
301 para_id: ParaId,
302 ) -> DispatchResult {
303 let sender = ensure_signed(origin)?;
304 Pallet::<T>::do_place_order(
305 sender,
306 max_amount,
307 para_id,
308 KeepAlive,
309 PaymentType::Balance,
310 )
311 }
312
313 #[pallet::call_index(2)]
331 #[pallet::weight(<T as Config>::WeightInfo::place_order_with_credits(QueueStatus::<T>::get().size()))]
332 pub fn place_order_with_credits(
333 origin: OriginFor<T>,
334 max_amount: BalanceOf<T>,
335 para_id: ParaId,
336 ) -> DispatchResult {
337 let sender = ensure_signed(origin)?;
338 Pallet::<T>::do_place_order(
339 sender,
340 max_amount,
341 para_id,
342 KeepAlive,
343 PaymentType::Credits,
344 )
345 }
346 }
347}
348
349impl<T: Config> Pallet<T>
351where
352 BalanceOf<T>: FixedPointOperand,
353{
    /// Pops the next on-demand order that can be served on `core_index` and returns it as an
    /// `Assignment::Pool`.
    ///
    /// Candidates are the top of this core's `AffinityEntries` heap and the top of the shared
    /// `FreeEntries` heap. Returns `None` when neither yields an order. On success the order's
    /// index is consumed in `QueueStatus` and the para's affinity to `core_index` is increased.
    pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> {
        // All three storage items are accessed through `try_mutate`, which only persists the
        // closure's changes when it returns `Ok`.
        let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| {
            AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| {
                let free_entry = FreeEntries::<T>::try_mutate(|free_entries| {
                    let affinity_next = affinity_entries.peek();
                    let free_next = free_entries.peek();
                    // Decide which heap to take from. With both present, the comparison follows
                    // `EnqueuedOrder`'s `Ord` implementation (defined in `types`, not shown here).
                    let pick_free = match (affinity_next, free_next) {
                        (None, _) => true,
                        (Some(_), None) => false,
                        (Some(a), Some(f)) => f < a,
                    };
                    if pick_free {
                        let entry = free_entries.pop().ok_or(())?;
                        // Move every remaining free order of the same para to this core's
                        // affinity heap; the para gains affinity to this core further down.
                        let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) =
                            take(free_entries)
                                .into_iter()
                                .partition(|e| e.para_id == entry.para_id);
                        affinity_entries.append(&mut affinities);
                        *free_entries = free;
                        Ok(entry)
                    } else {
                        Err(())
                    }
                });
                // Fall back to the affinity heap when no free entry was taken.
                let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?;
                queue_status.consume_index(entry.idx);
                Ok(entry)
            })
        });

        let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;

        Pallet::<T>::increase_affinity(assignment.para_id(), core_index);
        Some(assignment)
    }
393
394 pub fn assignment_duplicated(para_id: ParaId, core_index: CoreIndex) {
396 Pallet::<T>::increase_affinity(para_id, core_index);
397 }
398
399 pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
406 Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
407 }
408
409 pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
420 Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
421 QueueStatus::<T>::mutate(|queue_status| {
422 Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front);
423 });
424 }
425
426 pub fn credit_account(who: T::AccountId, amount: BalanceOf<T>) {
432 Credits::<T>::mutate(who.clone(), |credits| {
433 *credits = credits.saturating_add(amount);
434 });
435 Pallet::<T>::deposit_event(Event::<T>::AccountCredited { who, amount });
436 }
437
438 fn do_place_order(
457 sender: <T as frame_system::Config>::AccountId,
458 max_amount: BalanceOf<T>,
459 para_id: ParaId,
460 existence_requirement: ExistenceRequirement,
461 payment_type: PaymentType,
462 ) -> DispatchResult {
463 let config = configuration::ActiveConfig::<T>::get();
464
465 QueueStatus::<T>::mutate(|queue_status| {
466 Self::update_spot_traffic(&config, queue_status);
467 let traffic = queue_status.traffic;
468
469 let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
471 config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
472 );
473
474 ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
476
477 ensure!(
478 queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
479 Error::<T>::QueueFull
480 );
481
482 match payment_type {
483 PaymentType::Balance => {
484 let amt = T::Currency::withdraw(
487 &sender,
488 spot_price,
489 WithdrawReasons::FEE,
490 existence_requirement,
491 )?;
492
493 let pot = Self::account_id();
496 if !System::<T>::account_exists(&pot) {
497 System::<T>::inc_providers(&pot);
498 }
499 T::Currency::resolve_creating(&pot, amt);
500 },
501 PaymentType::Credits => {
502 let credits = Credits::<T>::get(&sender);
503
504 let new_credits_value =
506 credits.checked_sub(&spot_price).ok_or(Error::<T>::InsufficientCredits)?;
507
508 if new_credits_value.is_zero() {
509 Credits::<T>::remove(&sender);
510 } else {
511 Credits::<T>::insert(&sender, new_credits_value);
512 }
513 },
514 }
515
516 Revenue::<T>::mutate(|bounded_revenue| {
518 if let Some(current_block) = bounded_revenue.get_mut(0) {
519 *current_block = current_block.saturating_add(spot_price);
520 } else {
521 bounded_revenue.try_push(spot_price).defensive_ok();
525 }
526 });
527
528 Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
529 Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced {
530 para_id,
531 spot_price,
532 ordered_by: sender,
533 });
534
535 Ok(())
536 })
537 }
538
539 fn update_spot_traffic(
541 config: &configuration::HostConfiguration<BlockNumberFor<T>>,
542 queue_status: &mut QueueStatusType,
543 ) {
544 let old_traffic = queue_status.traffic;
545 match Self::calculate_spot_traffic(
546 old_traffic,
547 config.scheduler_params.on_demand_queue_max_size,
548 queue_status.size(),
549 config.scheduler_params.on_demand_target_queue_utilization,
550 config.scheduler_params.on_demand_fee_variability,
551 ) {
552 Ok(new_traffic) => {
553 if new_traffic != old_traffic {
555 queue_status.traffic = new_traffic;
556
557 let spot_price: BalanceOf<T> = new_traffic.saturating_mul_int(
559 config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
560 );
561
562 Pallet::<T>::deposit_event(Event::<T>::SpotPriceSet { spot_price });
564 }
565 },
566 Err(err) => {
567 log::debug!(
568 target: LOG_TARGET,
569 "Error calculating spot traffic: {:?}", err
570 );
571 },
572 };
573 }
574
575 fn calculate_spot_traffic(
598 traffic: FixedU128,
599 queue_capacity: u32,
600 queue_size: u32,
601 target_queue_utilisation: Perbill,
602 variability: Perbill,
603 ) -> Result<FixedU128, SpotTrafficCalculationErr> {
604 if queue_capacity == 0 {
606 return Err(SpotTrafficCalculationErr::QueueCapacityIsZero)
607 }
608
609 if queue_size > queue_capacity {
611 return Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity)
612 }
613
614 let queue_util_ratio = FixedU128::from_rational(queue_size.into(), queue_capacity.into());
616 let positive = queue_util_ratio >= target_queue_utilisation.into();
617 let queue_util_diff = queue_util_ratio.max(target_queue_utilisation.into()) -
618 queue_util_ratio.min(target_queue_utilisation.into());
619
620 let var_times_qud = queue_util_diff.saturating_mul(variability.into());
622
623 let var_times_qud_pow = var_times_qud.saturating_mul(var_times_qud);
625
626 let div_by_two: FixedU128;
628 match var_times_qud_pow.const_checked_div(2.into()) {
629 Some(dbt) => div_by_two = dbt,
630 None => return Err(SpotTrafficCalculationErr::Division),
631 }
632
633 if positive {
635 let new_traffic = queue_util_diff
636 .saturating_add(div_by_two)
637 .saturating_add(One::one())
638 .saturating_mul(traffic);
639 Ok(new_traffic.max(<T as Config>::TrafficDefaultValue::get()))
640 } else {
641 let new_traffic = queue_util_diff.saturating_sub(div_by_two).saturating_mul(traffic);
642 Ok(new_traffic.max(<T as Config>::TrafficDefaultValue::get()))
643 }
644 }
645
646 fn add_on_demand_order(
653 queue_status: &mut QueueStatusType,
654 para_id: ParaId,
655 location: QueuePushDirection,
656 ) {
657 let idx = match location {
658 QueuePushDirection::Back => queue_status.push_back(),
659 QueuePushDirection::Front => queue_status.push_front(),
660 };
661
662 let affinity = ParaIdAffinity::<T>::get(para_id);
663 let order = EnqueuedOrder::new(idx, para_id);
664 #[cfg(test)]
665 log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location);
666
667 match affinity {
668 None => FreeEntries::<T>::mutate(|entries| entries.push(order)),
669 Some(affinity) =>
670 AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order)),
671 }
672 }
673
674 fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) {
678 let affinity = Pallet::<T>::decrease_affinity(para_id, core_index);
679 #[cfg(not(test))]
680 debug_assert_ne!(
681 affinity, None,
682 "Decreased affinity for a para that has not been served on a core?"
683 );
684 if affinity != Some(0) {
685 return;
686 }
687 AffinityEntries::<T>::mutate(core_index, |affinity_entries| {
691 FreeEntries::<T>::mutate(|free_entries| {
692 let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) =
693 take(affinity_entries).into_iter().partition(|e| e.para_id == para_id);
694 free_entries.append(&mut freed);
695 *affinity_entries = affinities;
696 })
697 });
698 }
699
700 fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> {
709 ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| {
710 let affinity = maybe_affinity.as_mut()?;
711 if affinity.core_index == core_index {
712 let new_count = affinity.count.saturating_sub(1);
713 if new_count > 0 {
714 *maybe_affinity = Some(CoreAffinityCount { core_index, count: new_count });
715 } else {
716 *maybe_affinity = None;
717 }
718 return Some(new_count);
719 } else {
720 None
721 }
722 })
723 }
724
725 fn increase_affinity(para_id: ParaId, core_index: CoreIndex) {
730 ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity {
731 Some(affinity) =>
732 if affinity.core_index == core_index {
733 *maybe_affinity = Some(CoreAffinityCount {
734 core_index,
735 count: affinity.count.saturating_add(1),
736 });
737 },
738 None => {
739 *maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 });
740 },
741 })
742 }
743
744 pub fn claim_revenue_until(when: BlockNumberFor<T>) -> BalanceOf<T> {
746 let now = <frame_system::Pallet<T>>::block_number();
747 let mut amount: BalanceOf<T> = BalanceOf::<T>::zero();
748 Revenue::<T>::mutate(|revenue| {
749 while !revenue.is_empty() {
750 let index = (revenue.len() - 1) as u32;
751 if when > now.saturating_sub(index.into()) {
752 amount = amount.saturating_add(revenue.pop().defensive_unwrap_or(0u32.into()));
753 } else {
754 break
755 }
756 }
757 });
758
759 amount
760 }
761
762 pub fn account_id() -> T::AccountId {
764 T::PalletId::get().into_account_truncating()
765 }
766
767 #[cfg(test)]
769 fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
770 ParaIdAffinity::<T>::get(para_id)
771 }
772
773 #[cfg(test)]
775 fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
776 AffinityEntries::<T>::get(core_index)
777 }
778
779 #[cfg(test)]
781 fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
782 FreeEntries::<T>::get()
783 }
784
785 #[cfg(feature = "runtime-benchmarks")]
786 pub fn populate_queue(para_id: ParaId, num: u32) {
787 QueueStatus::<T>::mutate(|queue_status| {
788 for _ in 0..num {
789 Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
790 }
791 });
792 }
793
794 #[cfg(test)]
795 fn set_queue_status(new_status: QueueStatusType) {
796 QueueStatus::<T>::set(new_status);
797 }
798
799 #[cfg(test)]
800 fn get_queue_status() -> QueueStatusType {
801 QueueStatus::<T>::get()
802 }
803
804 #[cfg(test)]
805 fn get_traffic_default_value() -> FixedU128 {
806 <T as Config>::TrafficDefaultValue::get()
807 }
808
809 #[cfg(test)]
810 fn get_revenue() -> Vec<BalanceOf<T>> {
811 Revenue::<T>::get().to_vec()
812 }
813}