1use super::*;
19use crate::on_demand::LOG_TARGET;
20use frame_support::{
21 migrations::VersionedMigration, storage_alias, traits::UncheckedOnRuntimeUpgrade,
22 weights::Weight,
23};
24use polkadot_primitives::CoreIndex;
25
26mod v1 {
28 use super::*;
29 use alloc::collections::BinaryHeap;
30 use core::cmp::Ordering;
31
32 const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;
34
35 #[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
37 pub(super) struct QueueIndex(pub u32);
38
39 #[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
41 pub(super) struct ReverseQueueIndex(pub u32);
42
43 impl Ord for QueueIndex {
44 fn cmp(&self, other: &Self) -> Ordering {
45 let diff = self.0.overflowing_sub(other.0).0;
46 if diff == 0 {
47 Ordering::Equal
48 } else if diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE {
49 Ordering::Greater
50 } else {
51 Ordering::Less
52 }
53 }
54 }
55
56 impl PartialOrd for QueueIndex {
57 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
58 Some(self.cmp(other))
59 }
60 }
61
62 impl Ord for ReverseQueueIndex {
63 fn cmp(&self, other: &Self) -> Ordering {
64 QueueIndex(other.0).cmp(&QueueIndex(self.0))
65 }
66 }
67
68 impl PartialOrd for ReverseQueueIndex {
69 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
70 Some(self.cmp(other))
71 }
72 }
73
74 #[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
76 pub(super) struct OldEnqueuedOrder {
77 pub para_id: ParaId,
78 pub idx: QueueIndex,
79 }
80
81 impl PartialOrd for OldEnqueuedOrder {
82 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
83 match other.idx.partial_cmp(&self.idx) {
84 Some(Ordering::Equal) => other.para_id.partial_cmp(&self.para_id),
85 o => o,
86 }
87 }
88 }
89
90 impl Ord for OldEnqueuedOrder {
91 fn cmp(&self, other: &Self) -> Ordering {
92 match other.idx.cmp(&self.idx) {
94 Ordering::Equal => other.para_id.cmp(&self.para_id),
95 o => o,
96 }
97 }
98 }
99
100 #[derive(Encode, Decode, TypeInfo)]
102 pub(super) struct CoreAffinityCount {
103 pub core_index: CoreIndex,
104 pub count: u32,
105 }
106
107 #[derive(Encode, Decode, TypeInfo)]
109 pub(super) struct OldQueueStatus {
110 pub traffic: FixedU128,
111 pub next_index: QueueIndex,
112 pub smallest_index: QueueIndex,
113 pub freed_indices: BinaryHeap<ReverseQueueIndex>,
114 }
115
116 impl Default for OldQueueStatus {
117 fn default() -> Self {
118 Self {
119 traffic: FixedU128::default(),
120 next_index: QueueIndex(0),
121 smallest_index: QueueIndex(0),
122 freed_indices: BinaryHeap::new(),
123 }
124 }
125 }
126
127 #[storage_alias]
128 pub(super) type ParaIdAffinity<T: Config> =
129 StorageMap<Pallet<T>, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
130
131 #[storage_alias]
132 pub(super) type QueueStatus<T: Config> = StorageValue<Pallet<T>, OldQueueStatus, OptionQuery>;
133
134 #[storage_alias]
135 pub(super) type FreeEntries<T: Config> =
136 StorageValue<Pallet<T>, BinaryHeap<OldEnqueuedOrder>, OptionQuery>;
137
138 #[storage_alias]
139 pub(super) type AffinityEntries<T: Config> =
140 StorageMap<Pallet<T>, Twox64Concat, CoreIndex, BinaryHeap<OldEnqueuedOrder>, OptionQuery>;
141}
142
/// Storage migration of the on-demand pallet from the v1 layout (see the `v1` module) to v2.
///
/// "Unchecked" because it implements `UncheckedOnRuntimeUpgrade` and performs no storage
/// version check itself; it is wrapped in a `VersionedMigration` by [`MigrateV1ToV2`].
pub struct UncheckedMigrateToV2<T>(core::marker::PhantomData<T>);
145
#[cfg(any(feature = "try-runtime", test))]
impl<T: Config> UncheckedMigrateToV2<T> {
	/// Snapshots the v1 state before the migration runs.
	///
	/// Returns the SCALE encoding of `(total order count, ParaIdAffinity entry count,
	/// old traffic if a QueueStatus existed)`.
	///
	/// # Errors
	///
	/// Fails if the number of queued v1 orders exceeds
	/// `polkadot_primitives::ON_DEMAND_MAX_QUEUE_MAX_SIZE`.
	pub fn pre_upgrade() -> Result<alloc::vec::Vec<u8>, sp_runtime::TryRuntimeError> {
		let old_traffic = v1::QueueStatus::<T>::get().map(|status| status.traffic);
		let free_count = v1::FreeEntries::<T>::get().unwrap_or_default().len();

		// Collect the keys first, then look up each per-core heap to count its orders.
		let affinity_cores: alloc::vec::Vec<_> = v1::AffinityEntries::<T>::iter_keys().collect();
		let in_affinity_queues: usize = affinity_cores
			.iter()
			.map(|core| v1::AffinityEntries::<T>::get(core).unwrap_or_default().len())
			.sum();
		let total_orders = free_count + in_affinity_queues;

		let affinity_count = v1::ParaIdAffinity::<T>::iter().count();

		log::info!(
			target: LOG_TARGET,
			"Before migration: {} total orders ({} free, {} in affinity queues), {} affinity mappings, traffic: {:?}",
			total_orders,
			free_count,
			in_affinity_queues,
			affinity_count,
			old_traffic
		);

		let capacity = polkadot_primitives::ON_DEMAND_MAX_QUEUE_MAX_SIZE;
		if total_orders > capacity as usize {
			log::error!(
				target: LOG_TARGET,
				"Migration would lose orders: {} total orders exceeds V2 capacity of {}",
				total_orders,
				capacity
			);
			return Err("Too many orders to migrate - queue capacity exceeded".into());
		}

		let snapshot = (total_orders as u32, affinity_count as u32, old_traffic);
		Ok(snapshot.encode())
	}

	/// Verifies the post-migration state against the snapshot produced by
	/// [`Self::pre_upgrade`].
	///
	/// Checks that all v1 storage items are gone, that the traffic value was carried over (or
	/// equals `T::TrafficDefaultValue` when no old `QueueStatus` existed) and that the new
	/// queue holds exactly as many orders as were counted before.
	///
	/// # Errors
	///
	/// Fails if `state` cannot be decoded or any of the checks above does not hold.
	pub fn post_upgrade(state: alloc::vec::Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
		log::info!(target: LOG_TARGET, "Running post_upgrade() for v2");

		type Snapshot = (u32, u32, Option<FixedU128>);
		let (expected_orders, expected_affinity_count, expected_traffic) =
			<Snapshot as Decode>::decode(&mut &state[..])
				.map_err(|_| "Failed to decode pre_upgrade state")?;

		// Every v1 storage item must have been cleaned up.
		ensure!(!v1::QueueStatus::<T>::exists(), "Old QueueStatus should be removed");
		ensure!(!v1::FreeEntries::<T>::exists(), "FreeEntries should be removed");
		ensure!(v1::AffinityEntries::<T>::iter().count() == 0, "AffinityEntries should be empty");
		ensure!(v1::ParaIdAffinity::<T>::iter().count() == 0, "ParaIdAffinity should be empty");

		let new_order_status = super::pallet::OrderStatus::<T>::get();

		if let Some(expected) = expected_traffic {
			ensure!(new_order_status.traffic == expected, "Traffic value should be preserved");
		} else {
			let default_traffic = T::TrafficDefaultValue::get();
			ensure!(
				new_order_status.traffic == default_traffic,
				"Traffic value should be set to default when no old QueueStatus existed"
			);
		}

		let migrated_orders = new_order_status.queue.len() as u32;
		ensure!(migrated_orders == expected_orders, "Migrated order count mismatch");

		log::info!(
			target: LOG_TARGET,
			"Successfully migrated {} orders (expected {}), removed {} affinity mappings, traffic preserved: {:?}",
			migrated_orders,
			expected_orders,
			expected_affinity_count,
			new_order_status.traffic
		);

		Ok(())
	}
}
236
237impl<T: Config> UncheckedOnRuntimeUpgrade for UncheckedMigrateToV2<T> {
238 fn on_runtime_upgrade() -> Weight {
239 let mut weight: Weight = Weight::zero();
240
241 let now = frame_system::Pallet::<T>::block_number();
242 let old_queue_status = v1::QueueStatus::<T>::take().unwrap_or_else(|| v1::OldQueueStatus {
243 traffic: T::TrafficDefaultValue::get(),
244 ..Default::default()
245 });
246 weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
247
248 let mut all_orders = alloc::vec::Vec::new();
250
251 let free_entries = v1::FreeEntries::<T>::take().unwrap_or_default();
253 weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
254 for order in free_entries.into_iter() {
255 all_orders.push(order);
256 }
257
258 let mut affinity_count = 0u64;
261 for (_core_idx, affinity_heap) in v1::AffinityEntries::<T>::drain() {
262 affinity_count += 1;
263 for order in affinity_heap.into_iter() {
264 all_orders.push(order);
265 }
266 }
267 weight.saturating_accrue(T::DbWeight::get().reads_writes(affinity_count, affinity_count));
269
270 all_orders.sort_by_key(|o| o.idx);
272
273 let affinity_count = v1::ParaIdAffinity::<T>::iter().count();
275 let _ = v1::ParaIdAffinity::<T>::clear(u32::MAX, None);
276 weight.saturating_accrue(
277 T::DbWeight::get().reads_writes(affinity_count as u64, affinity_count as u64),
278 );
279
280 super::pallet::OrderStatus::<T>::mutate(|order_status| {
282 order_status.traffic = old_queue_status.traffic;
284
285 for old_order in all_orders.iter() {
287 if let Err(para_id) = order_status.queue.try_push(now, old_order.para_id) {
288 log::warn!(
289 target: LOG_TARGET,
290 "Failed to migrate order for para_id {:?} - queue full, stopping migration of remaining orders",
291 para_id
292 );
293 break;
295 }
296 }
297 });
298 weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
299
300 log::info!(
301 target: LOG_TARGET,
302 "Migrated on demand assigner storage to v2: {} orders migrated, {} affinity entries removed",
303 all_orders.len(),
304 affinity_count
305 );
306
307 weight
308 }
309
310 #[cfg(feature = "try-runtime")]
311 fn pre_upgrade() -> Result<alloc::vec::Vec<u8>, sp_runtime::TryRuntimeError> {
312 Self::pre_upgrade()
313 }
314
315 #[cfg(feature = "try-runtime")]
316 fn post_upgrade(state: alloc::vec::Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
317 Self::post_upgrade(state)
318 }
319}
320
/// The v1 → v2 migration of the on-demand pallet, for inclusion in a runtime's migrations.
///
/// Wraps [`UncheckedMigrateToV2`] in `frame_support`'s `VersionedMigration` with source
/// version `1` and target version `2`; see the `VersionedMigration` documentation for the
/// exact version-gating behaviour.
pub type MigrateV1ToV2<T> = VersionedMigration<
	1,
	2,
	UncheckedMigrateToV2<T>,
	Pallet<T>,
	<T as frame_system::Config>::DbWeight,
>;
329
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		mock::{new_test_ext, MockGenesisConfig, Test},
		on_demand,
	};
	use alloc::collections::BinaryHeap;
	use frame_support::pallet_prelude::*;
	use polkadot_primitives::{CoreIndex, Id as ParaId};
	use sp_runtime::FixedU128;

	/// Builds a v1 order heap by pushing the given `(para_id, queue index)` pairs one by one,
	/// in iteration order.
	fn heap_of(
		entries: impl IntoIterator<Item = (ParaId, u32)>,
	) -> BinaryHeap<v1::OldEnqueuedOrder> {
		let mut heap = BinaryHeap::new();
		for (para_id, idx) in entries {
			heap.push(v1::OldEnqueuedOrder { para_id, idx: v1::QueueIndex(idx) });
		}
		heap
	}

	/// Stores `status` as the v1 `QueueStatus` and marks the pallet as being at storage
	/// version 1.
	fn seed_v1_status(status: v1::OldQueueStatus) {
		v1::QueueStatus::<Test>::put(status);
		StorageVersion::new(1).put::<on_demand::Pallet<Test>>();
	}

	/// Runs `pre_upgrade`, the migration itself and `post_upgrade`, panicking if either check
	/// fails.
	fn run_checked_migration() {
		let snapshot =
			UncheckedMigrateToV2::<Test>::pre_upgrade().expect("pre_upgrade should succeed");
		let _weight = UncheckedMigrateToV2::<Test>::on_runtime_upgrade();
		UncheckedMigrateToV2::<Test>::post_upgrade(snapshot).expect("post_upgrade should succeed");
	}

	/// Orders spread over two per-core affinity queues end up in the single new queue and the
	/// traffic value is carried over.
	#[test]
	fn affinity_queue_merging_works() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			let para_1 = ParaId::from(1000);
			let para_2 = ParaId::from(1001);
			let para_3 = ParaId::from(1002);

			let core_0_queue = heap_of([(para_1, 1), (para_2, 3)]);
			let core_1_queue = heap_of([(para_3, 2)]);
			v1::AffinityEntries::<Test>::insert(CoreIndex(0), core_0_queue);
			v1::AffinityEntries::<Test>::insert(CoreIndex(1), core_1_queue);

			seed_v1_status(v1::OldQueueStatus {
				traffic: FixedU128::from_rational(5, 10),
				next_index: v1::QueueIndex(4),
				smallest_index: v1::QueueIndex(1),
				freed_indices: BinaryHeap::new(),
			});

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert_eq!(migrated.traffic, FixedU128::from_rational(5, 10));
			assert_eq!(migrated.queue.len(), 3);

			assert!(!v1::QueueStatus::<Test>::exists());
			assert_eq!(v1::AffinityEntries::<Test>::iter_keys().count(), 0);
		});
	}

	/// Orders from the free queue and from an affinity queue are combined.
	#[test]
	fn free_and_affinity_queues_merged() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			let para_1 = ParaId::from(1000);
			let para_2 = ParaId::from(1001);
			let para_3 = ParaId::from(1002);
			let para_4 = ParaId::from(1003);

			v1::FreeEntries::<Test>::put(heap_of([(para_1, 1), (para_2, 5)]));
			v1::AffinityEntries::<Test>::insert(
				CoreIndex(0),
				heap_of([(para_3, 3), (para_4, 7)]),
			);

			seed_v1_status(v1::OldQueueStatus::default());

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert_eq!(migrated.queue.len(), 4);

			assert!(!v1::FreeEntries::<Test>::exists());
			assert_eq!(v1::AffinityEntries::<Test>::iter_keys().count(), 0);
		});
	}

	/// The new queue lists orders by ascending old queue index, regardless of insertion order.
	#[test]
	fn order_preservation_by_queue_index() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			let para_1 = ParaId::from(1000);
			let para_2 = ParaId::from(1001);
			let para_3 = ParaId::from(1002);

			// Deliberately pushed out of index order.
			v1::FreeEntries::<Test>::put(heap_of([(para_2, 5), (para_1, 2), (para_3, 10)]));

			seed_v1_status(v1::OldQueueStatus::default());

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert_eq!(migrated.queue.len(), 3);

			assert_eq!(migrated.queue.queue[0].para_id, para_1);
			assert_eq!(migrated.queue.queue[1].para_id, para_2);
			assert_eq!(migrated.queue.queue[2].para_id, para_3);
		});
	}

	/// The traffic value of the old `QueueStatus` shows up unchanged in the new status.
	#[test]
	fn traffic_value_preserved() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			let traffic_value = FixedU128::from_rational(75, 100);

			seed_v1_status(v1::OldQueueStatus {
				traffic: traffic_value,
				next_index: v1::QueueIndex(1),
				smallest_index: v1::QueueIndex(1),
				freed_indices: BinaryHeap::new(),
			});

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert_eq!(migrated.traffic, traffic_value);
		});
	}

	/// All `ParaIdAffinity` entries are removed by the migration.
	#[test]
	fn para_id_affinity_removed() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			let para_1 = ParaId::from(1000);
			let para_2 = ParaId::from(1001);

			v1::ParaIdAffinity::<Test>::insert(
				para_1,
				v1::CoreAffinityCount { core_index: CoreIndex(0), count: 5 },
			);
			v1::ParaIdAffinity::<Test>::insert(
				para_2,
				v1::CoreAffinityCount { core_index: CoreIndex(1), count: 3 },
			);

			seed_v1_status(v1::OldQueueStatus::default());

			run_checked_migration();

			assert_eq!(v1::ParaIdAffinity::<Test>::iter().count(), 0);
		});
	}

	/// With only a default `QueueStatus` present, the migration yields an empty queue.
	#[test]
	fn empty_storage_migration() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			seed_v1_status(v1::OldQueueStatus::default());

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert_eq!(migrated.queue.len(), 0);

			assert!(!v1::QueueStatus::<Test>::exists());
		});
	}

	/// One order on each of five cores results in five orders in the new queue.
	#[test]
	fn multiple_affinity_cores_merged() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			for core_idx in 0..5 {
				let single_order = heap_of([(ParaId::from(1000 + core_idx), core_idx)]);
				v1::AffinityEntries::<Test>::insert(CoreIndex(core_idx), single_order);
			}

			seed_v1_status(v1::OldQueueStatus::default());

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert_eq!(migrated.queue.len(), 5);

			assert_eq!(v1::AffinityEntries::<Test>::iter_keys().count(), 0);
		});
	}

	/// Migrates 1000 free orders and checks that the new queue is not empty afterwards.
	#[test]
	fn queue_full_handling() {
		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
			let _now = frame_system::Pallet::<Test>::block_number();

			let many_orders = heap_of((0..1000).map(|i| (ParaId::from(i), i)));
			v1::FreeEntries::<Test>::put(many_orders);

			seed_v1_status(v1::OldQueueStatus::default());

			run_checked_migration();

			let migrated = on_demand::pallet::OrderStatus::<Test>::get();
			assert!(migrated.queue.len() > 0);
		});
	}
}