//! A queue that handles requests for PVF preparation.

use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, LOG_TARGET};
use always_assert::{always, never};
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use polkadot_node_core_pvf_common::{error::PrepareResult, pvf::PvfPrepData};
use std::{
	collections::{HashMap, VecDeque},
	path::PathBuf,
};

#[cfg(test)]
use std::time::Duration;

/// A request to the preparation queue.
#[derive(Debug)]
pub enum ToQueue {
	/// Schedule preparation of the given PVF with the given priority.
	///
	/// Note that it is incorrect to enqueue the same PVF again before the corresponding
	/// [`FromQueue`] response has been received.
	Enqueue { priority: Priority, pvf: PvfPrepData },
}

/// A response sent by the preparation queue.
#[derive(Debug)]
pub struct FromQueue {
	/// Identifier of the artifact that was prepared.
	pub(crate) artifact_id: ArtifactId,
	/// Outcome of the preparation.
	pub(crate) result: PrepareResult,
}

#[derive(Default)]
struct Limits {
	/// The absolute maximum number of workers the queue may ask the pool to keep. Only critical
	/// jobs may grow the worker set up to this number.
	hard_capacity: usize,

	/// The number of workers the queue aims to have under normal circumstances. Idle workers in
	/// excess of this number are culled.
	soft_capacity: usize,
}

impl Limits {
	/// Returns `true` if one more worker can be requested, given that `spawned_num` workers are
	/// already spawned or in the process of spawning.
	fn can_afford_one_more(&self, spawned_num: usize, critical: bool) -> bool {
		let cap = if critical { self.hard_capacity } else { self.soft_capacity };
		spawned_num < cap
	}

	/// Returns `true` if the given number of workers exceeds the soft capacity, i.e. an idle
	/// worker should be killed.
	fn should_cull(&mut self, spawned_num: usize) -> bool {
		spawned_num > self.soft_capacity
	}
}

slotmap::new_key_type! { pub struct Job; }

struct JobData {
	/// The priority this job was enqueued with.
	priority: Priority,
	pvf: PvfPrepData,
	/// The worker currently assigned to this job, if any.
	worker: Option<Worker>,
}

#[derive(Default)]
struct WorkerData {
	/// The job this worker is currently processing, if any.
	job: Option<Job>,
}

impl WorkerData {
	fn is_idle(&self) -> bool {
		self.job.is_none()
	}
}

/// Jobs that are waiting for a free worker, bucketed by priority. Critical jobs are always
/// drained before normal ones.
#[derive(Default)]
struct Unscheduled {
	normal: VecDeque<Job>,
	critical: VecDeque<Job>,
}

impl Unscheduled {
	fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
		match prio {
			Priority::Normal => &mut self.normal,
			Priority::Critical => &mut self.critical,
		}
	}

	fn add(&mut self, prio: Priority, job: Job) {
		self.queue_mut(prio).push_back(job);
	}

	/// Puts a job back at the front of its priority bucket, e.g. after its worker died before
	/// the job could conclude.
	fn readd(&mut self, prio: Priority, job: Job) {
		self.queue_mut(prio).push_front(job);
	}

	fn is_empty(&self) -> bool {
		self.normal.is_empty() && self.critical.is_empty()
	}

	fn next(&mut self) -> Option<Job> {
		let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
		check(Priority::Critical).or_else(|| check(Priority::Normal))
	}
}

struct Queue {
	metrics: Metrics,

	to_queue_rx: mpsc::Receiver<ToQueue>,
	from_queue_tx: mpsc::UnboundedSender<FromQueue>,

	to_pool_tx: mpsc::Sender<pool::ToPool>,
	from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,

	cache_path: PathBuf,
	limits: Limits,

	jobs: slotmap::SlotMap<Job, JobData>,

	/// A mapping from artifact id to the job preparing it.
	artifact_id_to_job: HashMap<ArtifactId, Job>,
	/// The registry of all spawned workers.
	workers: slotmap::SparseSecondaryMap<Worker, WorkerData>,
	/// The number of workers that were requested from the pool but have not reported back as
	/// spawned yet.
	spawn_inflight: usize,

	/// Jobs that are waiting for a free worker.
	unscheduled: Unscheduled,
}

/// A fatal error that warrants stopping the queue's event loop.
struct Fatal;

impl Queue {
	fn new(
		metrics: Metrics,
		soft_capacity: usize,
		hard_capacity: usize,
		cache_path: PathBuf,
		to_queue_rx: mpsc::Receiver<ToQueue>,
		from_queue_tx: mpsc::UnboundedSender<FromQueue>,
		to_pool_tx: mpsc::Sender<pool::ToPool>,
		from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
	) -> Self {
		Self {
			metrics,
			to_queue_rx,
			from_queue_tx,
			to_pool_tx,
			from_pool_rx,
			cache_path,
			spawn_inflight: 0,
			limits: Limits { hard_capacity, soft_capacity },
			jobs: slotmap::SlotMap::with_key(),
			unscheduled: Unscheduled::default(),
			artifact_id_to_job: HashMap::new(),
			workers: slotmap::SparseSecondaryMap::new(),
		}
	}

	async fn run(mut self) {
		macro_rules! break_if_fatal {
			($expr:expr) => {
				if let Err(Fatal) = $expr {
					break
				}
			};
		}

		loop {
			// `select_biased!` polls the branches in order, which keeps the event loop
			// deterministic for the tests below.
			futures::select_biased! {
				to_queue = self.to_queue_rx.select_next_some() =>
					break_if_fatal!(handle_to_queue(&mut self, to_queue).await),
				from_pool = self.from_pool_rx.select_next_some() =>
					break_if_fatal!(handle_from_pool(&mut self, from_pool).await),
			}
		}
	}
}

async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
	match to_queue {
		ToQueue::Enqueue { priority, pvf } => {
			handle_enqueue(queue, priority, pvf).await?;
		},
	}
	Ok(())
}

async fn handle_enqueue(
	queue: &mut Queue,
	priority: Priority,
	pvf: PvfPrepData,
) -> Result<(), Fatal> {
	gum::debug!(
		target: LOG_TARGET,
		validation_code_hash = ?pvf.code_hash(),
		?priority,
		preparation_timeout = ?pvf.prep_timeout(),
		"PVF is enqueued for preparation.",
	);
	queue.metrics.prepare_enqueued();

	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
	if never!(
		queue.artifact_id_to_job.contains_key(&artifact_id),
		"second Enqueue sent for a known artifact"
	) {
		// The precondition of `Enqueue` is that it is sent only once per PVF, so this branch
		// should never be taken.
		gum::warn!(
			target: LOG_TARGET,
			"duplicate `enqueue` command received for {:?}",
			artifact_id,
		);
		return Ok(())
	}

	let job = queue.jobs.insert(JobData { priority, pvf, worker: None });
	queue.artifact_id_to_job.insert(artifact_id, job);

	if let Some(available) = find_idle_worker(queue) {
		// An idle worker is available: assign the job to it right away.
		assign(queue, available, job).await?;
	} else {
		// No idle workers. Ask the pool for one more (if the limits allow it) and park the job
		// until a worker frees up or the new one is spawned.
		spawn_extra_worker(queue, priority.is_critical()).await?;
		queue.unscheduled.add(priority, job);
	}

	Ok(())
}

fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
	queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next()
}

async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
	use pool::FromPool;
	match from_pool {
		FromPool::Spawned(worker) => handle_worker_spawned(queue, worker).await?,
		FromPool::Concluded { worker, rip, result } =>
			handle_worker_concluded(queue, worker, rip, result).await?,
		FromPool::Rip(worker) => handle_worker_rip(queue, worker).await?,
	}
	Ok(())
}

async fn handle_worker_spawned(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
	queue.workers.insert(worker, WorkerData::default());
	queue.spawn_inflight -= 1;

	if let Some(job) = queue.unscheduled.next() {
		assign(queue, worker, job).await?;
	}

	Ok(())
}

async fn handle_worker_concluded(
	queue: &mut Queue,
	worker: Worker,
	rip: bool,
	result: PrepareResult,
) -> Result<(), Fatal> {
	queue.metrics.prepare_concluded();

	macro_rules! never_none {
		($expr:expr) => {
			match $expr {
				Some(v) => v,
				None => {
					// By the invariants of the queue, `$expr` is never expected to be `None`
					// here.
					never!("never_none, {}", stringify!($expr));
					return Ok(())
				},
			}
		};
	}

	// The worker is expected to still be registered: it is only deregistered when it rips or is
	// culled while idle, and neither can be followed by a conclusion.
	let worker_data = never_none!(queue.workers.get_mut(worker));

	// The worker can only conclude a job that was previously assigned to it.
	let job = never_none!(worker_data.job.take());

	// The job is inserted on enqueue and removed only here, upon conclusion.
	let job_data = never_none!(queue.jobs.remove(job));
	let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf);

	queue.artifact_id_to_job.remove(&artifact_id);

	gum::debug!(
		target: LOG_TARGET,
		validation_code_hash = ?artifact_id.code_hash,
		?worker,
		?rip,
		"prepare worker concluded",
	);

	reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?;

	if rip {
		// The worker died along with the job. Deregister it and, if there is still work left,
		// ask the pool for a replacement.
		let worker_data = queue.workers.remove(worker);
		always!(worker_data.is_some());

		if !queue.unscheduled.is_empty() {
			// The replacement is requested as non-critical to avoid accidentally growing the
			// worker set up to the hard capacity.
			spawn_extra_worker(queue, false).await?;
		}
	} else if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
		// There are more workers than the soft capacity allows; this one is idle now, so kill it.
		queue.workers.remove(worker);
		send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
	} else {
		// The worker stays around; give it the next unscheduled job, if any.
		if let Some(job) = queue.unscheduled.next() {
			assign(queue, worker, job).await?;
		}
	}

	Ok(())
}

async fn handle_worker_rip(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
	gum::debug!(target: LOG_TARGET, ?worker, "prepare worker ripped");

	let worker_data = queue.workers.remove(worker);
	if let Some(WorkerData { job: Some(job), .. }) = worker_data {
		// The worker was assigned a job when it died, so the job has to be rescheduled.
		let priority = queue.jobs.get(job).map(|data| data.priority).unwrap_or_else(|| {
			// Jobs are removed only on conclusion, and the conclusion for this job was never
			// received, so it must still be known.
			never!("the job of the ripped worker must be known but it is not");
			Priority::Normal
		});
		queue.unscheduled.readd(priority, job);
	}

	// If the ripped worker was still registered and there is work left, request a replacement.
	if worker_data.is_some() && !queue.unscheduled.is_empty() {
		spawn_extra_worker(queue, false).await?;
	}
	Ok(())
}

/// Requests one more worker from the pool, unless the relevant capacity limit has already been
/// reached.
async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fatal> {
	if queue
		.limits
		.can_afford_one_more(queue.workers.len() + queue.spawn_inflight, critical)
	{
		queue.spawn_inflight += 1;
		send_pool(&mut queue.to_pool_tx, pool::ToPool::Spawn).await?;
	}

	Ok(())
}

/// Attaches the job to the given worker and tells the pool to start working on it.
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
	let job_data = &mut queue.jobs[job];
	job_data.worker = Some(worker);

	queue.workers[worker].job = Some(job);

	send_pool(
		&mut queue.to_pool_tx,
		pool::ToPool::StartWork {
			worker,
			pvf: job_data.pvf.clone(),
			cache_path: queue.cache_path.clone(),
		},
	)
	.await?;

	Ok(())
}

fn reply(from_queue_tx: &mut mpsc::UnboundedSender<FromQueue>, m: FromQueue) -> Result<(), Fatal> {
	from_queue_tx.unbounded_send(m).map_err(|_| {
		// The receiving side is dropped, so the queue cannot do anything useful anymore.
		Fatal
	})
}

async fn send_pool(
	to_pool_tx: &mut mpsc::Sender<pool::ToPool>,
	m: pool::ToPool,
) -> Result<(), Fatal> {
	to_pool_tx.send(m).await.map_err(|_| {
		// The pool has gone away, so the queue cannot make progress anymore.
		Fatal
	})
}

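/// Creates a preparation queue.
///
/// Returns the sender for [`ToQueue`] messages, the receiver for [`FromQueue`] responses and the
/// future running the queue's event loop. The future must be polled (e.g. spawned onto an
/// executor) for the queue to make progress.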
pub fn start(
	metrics: Metrics,
	soft_capacity: usize,
	hard_capacity: usize,
	cache_path: PathBuf,
	to_pool_tx: mpsc::Sender<pool::ToPool>,
	from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
	let (to_queue_tx, to_queue_rx) = mpsc::channel(150);
	let (from_queue_tx, from_queue_rx) = mpsc::unbounded();

	let run = Queue::new(
		metrics,
		soft_capacity,
		hard_capacity,
		cache_path,
		to_queue_rx,
		from_queue_tx,
		to_pool_tx,
		from_pool_rx,
	)
	.run();

	(to_queue_tx, from_queue_rx, run)
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::host::tests::TEST_PREPARATION_TIMEOUT;
	use assert_matches::assert_matches;
	use futures::{future::BoxFuture, FutureExt};
	use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareSuccess};
	use slotmap::SlotMap;
	use std::task::Poll;

	/// Creates a test PVF whose artifact id is uniquely determined by the given discriminator.
	fn pvf(discriminator: u32) -> PvfPrepData {
		PvfPrepData::from_discriminator(discriminator)
	}

	/// Polls `task` (the queue's event loop) and `fut` concurrently until `fut` resolves,
	/// panicking if `task` finishes first or a one second timeout is hit.
	async fn run_until<R>(
		task: &mut (impl Future<Output = ()> + Unpin),
		mut fut: (impl Future<Output = R> + Unpin),
	) -> R {
		let start = std::time::Instant::now();
		let fut = &mut fut;
		loop {
			if start.elapsed() > std::time::Duration::from_secs(1) {
				panic!("timeout");
			}

			if let Poll::Ready(r) = futures::poll!(&mut *fut) {
				break r
			}

			if futures::poll!(&mut *task).is_ready() {
				panic!()
			}
		}
	}

	struct Test {
		_tempdir: tempfile::TempDir,
		run: BoxFuture<'static, ()>,
		workers: SlotMap<Worker, ()>,
		from_pool_tx: mpsc::UnboundedSender<pool::FromPool>,
		to_pool_rx: mpsc::Receiver<pool::ToPool>,
		to_queue_tx: mpsc::Sender<ToQueue>,
		from_queue_rx: mpsc::UnboundedReceiver<FromQueue>,
	}

	impl Test {
		fn new(soft_capacity: usize, hard_capacity: usize) -> Self {
			let tempdir = tempfile::tempdir().unwrap();

			let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
			let (from_pool_tx, from_pool_rx) = mpsc::unbounded();

			let workers: SlotMap<Worker, ()> = SlotMap::with_key();

			let (to_queue_tx, from_queue_rx, run) = start(
				Metrics::default(),
				soft_capacity,
				hard_capacity,
				tempdir.path().to_owned().into(),
				to_pool_tx,
				from_pool_rx,
			);

			Self {
				_tempdir: tempdir,
				run: run.boxed(),
				workers,
				from_pool_tx,
				to_pool_rx,
				to_queue_tx,
				from_queue_rx,
			}
		}

		fn send_queue(&mut self, to_queue: ToQueue) {
			self.to_queue_tx.send(to_queue).now_or_never().unwrap().unwrap();
		}

		async fn poll_and_recv_from_queue(&mut self) -> FromQueue {
			let from_queue_rx = &mut self.from_queue_rx;
			run_until(&mut self.run, async { from_queue_rx.next().await.unwrap() }.boxed()).await
		}

		fn send_from_pool(&mut self, from_pool: pool::FromPool) {
			self.from_pool_tx.send(from_pool).now_or_never().unwrap().unwrap();
		}

		async fn poll_and_recv_to_pool(&mut self) -> pool::ToPool {
			let to_pool_rx = &mut self.to_pool_rx;
			run_until(&mut self.run, async { to_pool_rx.next().await.unwrap() }.boxed()).await
		}

		async fn poll_ensure_to_pool_is_empty(&mut self) {
			use futures_timer::Delay;

			let to_pool_rx = &mut self.to_pool_rx;
			run_until(
				&mut self.run,
				async {
					futures::select! {
						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
						_ = to_pool_rx.next().fuse() => {
							panic!("to pool supposed to be empty")
						}
					}
				}
				.boxed(),
			)
			.await
		}
	}

	#[tokio::test]
	async fn properly_concludes() {
		let mut test = Test::new(2, 2);

		test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w));
		test.send_from_pool(pool::FromPool::Concluded {
			worker: w,
			rip: false,
			result: Ok(PrepareSuccess::default()),
		});

		assert_eq!(
			test.poll_and_recv_from_queue().await.artifact_id,
			ArtifactId::from_pvf_prep_data(&pvf(1))
		);
	}

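	// A focused check of `Limits` on its own: non-critical jobs may only grow the worker set up
	// to the soft capacity, critical jobs up to the hard capacity, and workers in excess of the
	// soft capacity are flagged for culling.
	#[test]
	fn limits_soft_and_hard_capacity() {
		let mut limits = Limits { soft_capacity: 1, hard_capacity: 2 };

		// With one worker already spawned, a normal job cannot afford another one...
		assert!(limits.can_afford_one_more(0, false));
		assert!(!limits.can_afford_one_more(1, false));

		// ...but a critical job can, up to the hard capacity.
		assert!(limits.can_afford_one_more(1, true));
		assert!(!limits.can_afford_one_more(2, true));

		// Only workers in excess of the soft capacity are culled.
		assert!(!limits.should_cull(1));
		assert!(limits.should_cull(2));
	}
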
	#[tokio::test]
	async fn dont_spawn_over_soft_limit_unless_critical() {
		let mut test = Test::new(2, 3);

		let priority = Priority::Normal;
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
		test.send_queue(ToQueue::Enqueue {
			priority,
			pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
		});

		// Only two workers are spawned for the three normal jobs: the soft capacity is 2.
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		let w2 = test.workers.insert(());

		test.send_from_pool(pool::FromPool::Spawned(w1));
		test.send_from_pool(pool::FromPool::Spawned(w2));

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: false,
			result: Ok(PrepareSuccess::default()),
		});

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// A critical job arrives while both workers are busy: the queue may now spawn up to the
		// hard capacity.
		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Critical,
			pvf: PvfPrepData::from_discriminator(4),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
	}

	#[tokio::test]
	async fn cull_unwanted() {
		let mut test = Test::new(1, 2);

		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Normal,
			pvf: PvfPrepData::from_discriminator(1),
		});
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		let w1 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w1));
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// A critical job pushes the queue over the soft capacity (1) up to the hard capacity (2).
		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Critical,
			pvf: PvfPrepData::from_discriminator(2),
		});
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		// Once the first worker concludes, the queue holds more workers than the soft capacity
		// allows, so the now idle worker gets culled.
		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: false,
			result: Ok(PrepareSuccess::default()),
		});
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
	}

	#[tokio::test]
	async fn worker_mass_die_out_doesnt_stall_queue() {
		let mut test = Test::new(2, 2);

		let priority = Priority::Normal;
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
		test.send_queue(ToQueue::Enqueue {
			priority,
			pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		let w2 = test.workers.insert(());

		test.send_from_pool(pool::FromPool::Spawned(w1));
		test.send_from_pool(pool::FromPool::Spawned(w2));

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// The first worker concludes but rips. Since there is still unscheduled work, the queue
		// should request a replacement worker, and the result should still be reported.
		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: true,
			result: Ok(PrepareSuccess::default()),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		assert_eq!(
			test.poll_and_recv_from_queue().await.artifact_id,
			ArtifactId::from_pvf_prep_data(&pvf(1))
		);
	}

	#[tokio::test]
	async fn doesnt_resurrect_ripped_worker_if_no_work() {
		let mut test = Test::new(2, 2);

		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Normal,
			pvf: PvfPrepData::from_discriminator(1),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w1));

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: true,
			result: Err(PrepareError::IoErr("test".into())),
		});
		test.poll_ensure_to_pool_is_empty().await;
	}

	#[tokio::test]
	async fn rip_for_start_work() {
		let mut test = Test::new(2, 2);

		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Normal,
			pvf: PvfPrepData::from_discriminator(1),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w1));

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// The worker rips right after it was told to start working. The job should be rescheduled
		// onto a freshly spawned worker.
		test.send_from_pool(pool::FromPool::Rip(w1));

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w2 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w2));
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
	}
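
	// A minimal check of the `Unscheduled` ordering: critical jobs are drained before normal
	// ones, and `readd` puts a job back at the front of its bucket.
	#[test]
	fn unscheduled_drains_critical_first() {
		let mut jobs: SlotMap<Job, ()> = SlotMap::with_key();
		let normal_a = jobs.insert(());
		let normal_b = jobs.insert(());
		let critical = jobs.insert(());

		let mut unscheduled = Unscheduled::default();
		unscheduled.add(Priority::Normal, normal_a);
		unscheduled.add(Priority::Normal, normal_b);
		unscheduled.add(Priority::Critical, critical);

		// The critical job goes first even though it was added last.
		assert_eq!(unscheduled.next(), Some(critical));
		assert_eq!(unscheduled.next(), Some(normal_a));

		// Re-adding puts the job back at the front of its bucket.
		unscheduled.readd(Priority::Normal, normal_a);
		assert_eq!(unscheduled.next(), Some(normal_a));
		assert_eq!(unscheduled.next(), Some(normal_b));
		assert!(unscheduled.is_empty());
	}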
}