//! A queue that handles requests for PVF execution.

use super::worker_interface::{Error as WorkerInterfaceError, Response as WorkerInterfaceResponse};
use crate::{
	artifacts::{ArtifactId, ArtifactPathId},
	host::ResultSender,
	metrics::Metrics,
	worker_interface::{IdleWorker, WorkerHandle},
	InvalidCandidate, PossiblyInvalidError, ValidationError, LOG_TARGET,
};
use futures::{
	channel::{mpsc, oneshot},
	future::BoxFuture,
	stream::{FuturesUnordered, StreamExt as _},
	Future, FutureExt,
};
use polkadot_node_core_pvf_common::{
	execute::{JobResponse, WorkerError, WorkerResponse},
	SecurityStatus,
};
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem::{messages::PvfExecKind, ActiveLeavesUpdate};
use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, Hash, PersistedValidationData};
use slotmap::HopSlotMap;
use std::{
	collections::{HashMap, VecDeque},
	fmt,
	path::PathBuf,
	sync::Arc,
	time::{Duration, Instant},
};
use strum::{EnumIter, IntoEnumIterator};

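/// The amount of time a job may wait in the queue for a worker with compatible executor
/// parameters. Once exceeded, the queue stops skipping the job in favour of jobs matching an
/// already-running worker and instead frees a slot (retiring an idle worker if necessary) to
/// spawn one with the parameters the job needs.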
const MAX_KEEP_WAITING: Duration = Duration::from_secs(4);

slotmap::new_key_type! { struct Worker; }

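/// A request from the host to the execution queue.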
#[derive(Debug)]
pub enum ToQueue {
	UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec<Hash> },
	Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}

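/// A message sent from the execution queue back to the host.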
#[derive(Debug)]
pub enum FromQueue {
	RemoveArtifact { artifact: ArtifactId, reply_to: oneshot::Sender<()> },
}

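/// Everything needed to execute a PVF: the validation inputs, executor parameters, execution
/// kind, timeout, and the channel on which to report the result.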
#[derive(Debug)]
pub struct PendingExecutionRequest {
	pub exec_timeout: Duration,
	pub pvd: Arc<PersistedValidationData>,
	pub pov: Arc<PoV>,
	pub executor_params: ExecutorParams,
	pub result_tx: ResultSender,
	pub exec_kind: PvfExecKind,
}

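/// A job queued for execution: the artifact to run together with everything needed to run it
/// and to report the outcome.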
struct ExecuteJob {
	artifact: ArtifactPathId,
	exec_timeout: Duration,
	exec_kind: PvfExecKind,
	pvd: Arc<PersistedValidationData>,
	pov: Arc<PoV>,
	executor_params: ExecutorParams,
	result_tx: ResultSender,
	waiting_since: Instant,
}

struct WorkerData {
	idle: Option<IdleWorker>,
	handle: WorkerHandle,
	executor_params_hash: ExecutorParamsHash,
}

impl fmt::Debug for WorkerData {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		write!(f, "WorkerData(pid={})", self.handle.id())
	}
}

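/// Bookkeeping for execute workers: the currently running workers, the number of spawns in
/// flight, and the capacity that bounds their sum (see `can_afford_one_more`).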
struct Workers {
	running: HopSlotMap<Worker, WorkerData>,
	spawn_inflight: usize,
	capacity: usize,
}

impl Workers {
	fn can_afford_one_more(&self) -> bool {
		self.spawn_inflight + self.running.len() < self.capacity
	}

	fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option<Worker> {
		self.running.iter().find_map(|d| {
			if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash {
				Some(d.0)
			} else {
				None
			}
		})
	}

	fn find_idle(&self) -> Option<Worker> {
		self.running
			.iter()
			.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
	}

	fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
		self.running.get_mut(worker)?.idle.take()
	}
}

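/// An event produced by one of the futures multiplexed in [`Mux`]: either a worker finished
/// spawning or a worker finished a job.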
enum QueueEvent {
	Spawn(IdleWorker, WorkerHandle, ExecuteJob),
	FinishWork(
		Worker,
		Result<WorkerInterfaceResponse, WorkerInterfaceError>,
		ArtifactId,
		ResultSender,
	),
}

type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;

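/// The execution queue: owns the pending jobs, the worker pool and the in-flight worker futures,
/// and drives all of them from a single event loop (see [`Queue::run`]).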
struct Queue {
	metrics: Metrics,

	to_queue_rx: mpsc::Receiver<ToQueue>,
	from_queue_tx: mpsc::UnboundedSender<FromQueue>,

	program_path: PathBuf,
	cache_path: PathBuf,
	spawn_timeout: Duration,
	node_version: Option<String>,
	security_status: SecurityStatus,

	unscheduled: Unscheduled,
	workers: Workers,
	mux: Mux,

	active_leaves: HashMap<Hash, Vec<Hash>>,
}

impl Queue {
	fn new(
		metrics: Metrics,
		program_path: PathBuf,
		cache_path: PathBuf,
		worker_capacity: usize,
		spawn_timeout: Duration,
		node_version: Option<String>,
		security_status: SecurityStatus,
		to_queue_rx: mpsc::Receiver<ToQueue>,
		from_queue_tx: mpsc::UnboundedSender<FromQueue>,
	) -> Self {
		Self {
			metrics,
			program_path,
			cache_path,
			spawn_timeout,
			node_version,
			security_status,
			to_queue_rx,
			from_queue_tx,
			unscheduled: Unscheduled::new(),
			mux: Mux::new(),
			workers: Workers {
				running: HopSlotMap::with_capacity_and_key(10),
				spawn_inflight: 0,
				capacity: worker_capacity,
			},
			active_leaves: Default::default(),
		}
	}

	async fn run(mut self) {
		loop {
			futures::select! {
				to_queue = self.to_queue_rx.next() => {
					if let Some(to_queue) = to_queue {
						handle_to_queue(&mut self, to_queue);
					} else {
						break;
					}
				}
				ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
			}

			purge_dead(&self.metrics, &mut self.workers).await;
		}
	}

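	/// Tries to assign a job from the highest-priority non-empty queue to a worker.
	///
	/// The strategy, in order:
	/// 1. If a worker has just finished and the eldest job has not waited longer than
	///    `MAX_KEEP_WAITING`, prefer a queued job whose executor parameters match that worker.
	/// 2. Otherwise, look for any idle worker with matching executor parameters.
	/// 3. Otherwise, retire the first idle worker found (its parameters don't match) and, if the
	///    pool can afford it, spawn a fresh worker for the job.
	/// 4. If no worker could be found and the pool is at capacity, leave the job queued.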
	fn try_assign_next_job(&mut self, finished_worker: Option<Worker>) {
		let priority = self.unscheduled.select_next_priority();
		let Some(queue) = self.unscheduled.get_mut(priority) else { return };

		let eldest = if let Some(eldest) = queue.get(0) { eldest } else { return };

		let mut worker = None;
		let mut job_index = 0;

		if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING {
			if let Some(finished_worker) = finished_worker {
				if let Some(worker_data) = self.workers.running.get(finished_worker) {
					for (i, job) in queue.iter().enumerate() {
						if worker_data.executor_params_hash == job.executor_params.hash() {
							(worker, job_index) = (Some(finished_worker), i);
							break
						}
					}
				}
			}
		}

		if worker.is_none() {
			worker = self.workers.find_available(queue[job_index].executor_params.hash());
		}

		if worker.is_none() {
			if let Some(idle) = self.workers.find_idle() {
				if self.workers.running.remove(idle).is_some() {
					self.metrics.execute_worker().on_retired();
				}
			}
		}

		if worker.is_none() && !self.workers.can_afford_one_more() {
			return
		}

		let job = queue.remove(job_index).expect("Job is just checked to be in queue; qed");
		let exec_kind = job.exec_kind;

		if let Some(worker) = worker {
			assign(self, worker, job);
		} else {
			spawn_extra_worker(self, job);
		}
		self.metrics.on_execute_kind(exec_kind);
		self.unscheduled.mark_scheduled(priority);
	}

	fn update_active_leaves(&mut self, update: ActiveLeavesUpdate, ancestors: Vec<Hash>) {
		self.prune_deactivated_leaves(&update);
		self.insert_active_leaf(update, ancestors);
		self.prune_old_jobs();
	}

	fn prune_deactivated_leaves(&mut self, update: &ActiveLeavesUpdate) {
		for hash in &update.deactivated {
			let _ = self.active_leaves.remove(hash);
		}
	}

	fn insert_active_leaf(&mut self, update: ActiveLeavesUpdate, ancestors: Vec<Hash>) {
		let Some(leaf) = update.activated else { return };
		let _ = self.active_leaves.insert(leaf.hash, ancestors);
	}

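	/// Drops queued `Backing` and `BackingSystemParas` jobs whose relay parent is neither an
	/// active leaf nor an ancestor of one, answering them with
	/// `ValidationError::ExecutionDeadline`.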
	fn prune_old_jobs(&mut self) {
		for &priority in &[Priority::Backing, Priority::BackingSystemParas] {
			let Some(queue) = self.unscheduled.get_mut(priority) else { continue };
			let to_remove: Vec<usize> = queue
				.iter()
				.enumerate()
				.filter_map(|(index, job)| {
					let relay_parent = match job.exec_kind {
						PvfExecKind::Backing(x) | PvfExecKind::BackingSystemParas(x) => x,
						_ => return None,
					};
					let in_active_fork = self.active_leaves.iter().any(|(hash, ancestors)| {
						*hash == relay_parent || ancestors.contains(&relay_parent)
					});
					if in_active_fork {
						None
					} else {
						Some(index)
					}
				})
				.collect();

			for &index in to_remove.iter().rev() {
				if index >= queue.len() {
					continue
				}

				let Some(job) = queue.remove(index) else { continue };
				let _ = job.result_tx.send(Err(ValidationError::ExecutionDeadline));
				gum::warn!(
					target: LOG_TARGET,
					?priority,
					exec_kind = ?job.exec_kind,
					"Job exceeded its deadline and was dropped without execution",
				);
			}
		}
	}
}

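/// Polls every worker handle and retires the entries whose worker process has already exited.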
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
	let mut to_remove = vec![];
	for (worker, data) in workers.running.iter_mut() {
		if futures::poll!(&mut data.handle).is_ready() {
			to_remove.push(worker);
		}
	}
	for w in to_remove {
		if workers.running.remove(w).is_some() {
			metrics.execute_worker().on_retired();
		}
	}
}

fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
	match to_queue {
		ToQueue::UpdateActiveLeaves { update, ancestors } => {
			queue.update_active_leaves(update, ancestors);
		},
		ToQueue::Enqueue { artifact, pending_execution_request } => {
			let PendingExecutionRequest {
				exec_timeout,
				pvd,
				pov,
				executor_params,
				result_tx,
				exec_kind,
			} = pending_execution_request;
			gum::debug!(
				target: LOG_TARGET,
				validation_code_hash = ?artifact.id.code_hash,
				"enqueueing an artifact for execution",
			);
			queue.metrics.observe_pov_size(pov.block_data.0.len(), true);
			queue.metrics.execute_enqueued();
			let job = ExecuteJob {
				artifact,
				exec_timeout,
				exec_kind,
				pvd,
				pov,
				executor_params,
				result_tx,
				waiting_since: Instant::now(),
			};
			queue.unscheduled.add(job, exec_kind.into());
			queue.try_assign_next_job(None);
		},
	}
}

async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
	match event {
		QueueEvent::Spawn(idle, handle, job) => {
			handle_worker_spawned(queue, idle, handle, job);
		},
		QueueEvent::FinishWork(worker, outcome, artifact_id, result_tx) => {
			handle_job_finish(queue, worker, outcome, artifact_id, result_tx).await;
		},
	}
}

fn handle_worker_spawned(
	queue: &mut Queue,
	idle: IdleWorker,
	handle: WorkerHandle,
	job: ExecuteJob,
) {
	queue.metrics.execute_worker().on_spawned();
	queue.workers.spawn_inflight -= 1;
	let worker = queue.workers.running.insert(WorkerData {
		idle: Some(idle),
		handle,
		executor_params_hash: job.executor_params.hash(),
	});

	gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");

	assign(queue, worker, job);
}

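/// Processes the outcome of a finished job: maps the worker response onto a validation result,
/// asks the host to remove the artifact when the worker reported it corrupted or unusable,
/// replies on the job's result channel, and finally either returns the worker to the idle pool
/// (scheduling the next compatible job onto it) or retires it if it is gone.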
async fn handle_job_finish(
	queue: &mut Queue,
	worker: Worker,
	worker_result: Result<WorkerInterfaceResponse, WorkerInterfaceError>,
	artifact_id: ArtifactId,
	result_tx: ResultSender,
) {
	let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result {
		Ok(WorkerInterfaceResponse {
			worker_response:
				WorkerResponse {
					job_response: JobResponse::Ok { result_descriptor },
					duration,
					pov_size,
				},
			idle_worker,
		}) => {
			(Some(idle_worker), Ok(result_descriptor), Some(duration), None, Some(pov_size))
		},
		Ok(WorkerInterfaceResponse {
			worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. },
			idle_worker,
		}) => (
			Some(idle_worker),
			Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
			None,
			None,
			None,
		),
		Ok(WorkerInterfaceResponse {
			worker_response:
				WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. },
			idle_worker,
		}) => (
			Some(idle_worker),
			Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)),
			None,
			None,
			None,
		),
		Ok(WorkerInterfaceResponse {
			worker_response:
				WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. },
			idle_worker,
		}) => {
			let (result_tx, result_rx) = oneshot::channel();
			queue
				.from_queue_tx
				.unbounded_send(FromQueue::RemoveArtifact {
					artifact: artifact_id.clone(),
					reply_to: result_tx,
				})
				.expect("from execute queue receiver is listened by the host; qed");
			(
				Some(idle_worker),
				Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(
					err,
				))),
				None,
				Some(result_rx),
				None,
			)
		},
		Ok(WorkerInterfaceResponse {
			worker_response: WorkerResponse { job_response: JobResponse::CorruptedArtifact, .. },
			idle_worker,
		}) => {
			let (tx, rx) = oneshot::channel();
			queue
				.from_queue_tx
				.unbounded_send(FromQueue::RemoveArtifact {
					artifact: artifact_id.clone(),
					reply_to: tx,
				})
				.expect("from execute queue receiver is listened by the host; qed");
			(
				Some(idle_worker),
				Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::CorruptedArtifact)),
				None,
				Some(rx),
				None,
			)
		},

		Err(WorkerInterfaceError::InternalError(err)) |
		Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) =>
			(None, Err(ValidationError::Internal(err)), None, None, None),
		Err(WorkerInterfaceError::HardTimeout) |
		Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) =>
			(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None),
		Err(WorkerInterfaceError::CommunicationErr(_err)) => (
			None,
			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
			None,
			None,
			None,
		),
		Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => (
			None,
			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
			None,
			None,
			None,
		),
		Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => (
			None,
			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))),
			None,
			None,
			None,
		),
	};

	queue.metrics.execute_finished();
	if let Some(pov_size) = pov_size {
		queue.metrics.observe_pov_size(pov_size as usize, false);
	}
	if let Err(ref err) = result {
		gum::warn!(
			target: LOG_TARGET,
			?artifact_id,
			?worker,
			worker_rip = idle_worker.is_none(),
			"execution worker concluded, error occurred: {}",
			err
		);
	} else {
		gum::trace!(
			target: LOG_TARGET,
			?artifact_id,
			?worker,
			worker_rip = idle_worker.is_none(),
			?duration,
			"execute worker concluded successfully",
		);
	}

	// If the artifact was asked to be removed, wait for the host to confirm the removal before
	// reporting the result.
	if let Some(sync_channel) = sync_channel {
		let _ = sync_channel.await;
	}

	// Sending the result may fail if the other end of the channel was dropped; that is legitimate
	// and not treated as an error.
	let _ = result_tx.send(result);

	// If the idle token was returned, put the worker back and reuse it for the next job;
	// otherwise remove all metadata pertaining to the dead worker.
	if let Some(idle_worker) = idle_worker {
		if let Some(data) = queue.workers.running.get_mut(worker) {
			data.idle = Some(idle_worker);
			return queue.try_assign_next_job(Some(worker))
		}
	} else if queue.workers.running.remove(worker).is_some() {
		queue.metrics.execute_worker().on_retired();
	}

	queue.try_assign_next_job(None);
}

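/// Requests one more worker to be spawned for the given job, accounting for it in
/// `spawn_inflight` until `handle_worker_spawned` picks up the resulting `QueueEvent::Spawn`.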
fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
	queue.metrics.execute_worker().on_begin_spawn();
	gum::debug!(target: LOG_TARGET, "spawning an extra worker");

	queue.mux.push(
		spawn_worker_task(
			queue.program_path.clone(),
			queue.cache_path.clone(),
			job,
			queue.spawn_timeout,
			queue.node_version.clone(),
			queue.security_status.clone(),
		)
		.boxed(),
	);
	queue.workers.spawn_inflight += 1;
}

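/// Spawns a new worker to execute the given job, retrying every few seconds for as long as the
/// spawn keeps failing. The future resolves only once a worker is up, yielding
/// `QueueEvent::Spawn`.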
async fn spawn_worker_task(
	program_path: PathBuf,
	cache_path: PathBuf,
	job: ExecuteJob,
	spawn_timeout: Duration,
	node_version: Option<String>,
	security_status: SecurityStatus,
) -> QueueEvent {
	use futures_timer::Delay;

	loop {
		match super::worker_interface::spawn(
			&program_path,
			&cache_path,
			job.executor_params.clone(),
			spawn_timeout,
			node_version.as_deref(),
			security_status.clone(),
		)
		.await
		{
			Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
			Err(err) => {
				gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);

				Delay::new(Duration::from_secs(3)).await;
			},
		}
	}
}

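/// Ships the job to the given worker, pushing onto the mux a future that resolves to
/// `QueueEvent::FinishWork` once execution concludes. The worker must be running and idle, with
/// executor parameters matching the job (checked by a debug assertion).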
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
	gum::debug!(
		target: LOG_TARGET,
		validation_code_hash = ?job.artifact.id,
		?worker,
		"assigning the execute worker",
	);

	debug_assert_eq!(
		queue
			.workers
			.running
			.get(worker)
			.expect("caller must provide existing worker; qed")
			.executor_params_hash,
		job.executor_params.hash()
	);

	let idle = queue.workers.claim_idle(worker).expect(
		"this caller must supply a worker which is idle and running;
			thus claim_idle cannot return None;
			qed.",
	);
	queue
		.metrics
		.observe_execution_queued_time(job.waiting_since.elapsed().as_millis() as u32);
	let execution_timer = queue.metrics.time_execution();
	queue.mux.push(
		async move {
			let _timer = execution_timer;
			let result = super::worker_interface::start_work(
				idle,
				job.artifact.clone(),
				job.exec_timeout,
				job.pvd,
				job.pov,
			)
			.await;
			QueueEvent::FinishWork(worker, result, job.artifact.id, job.result_tx)
		}
		.boxed(),
	);
}

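/// Creates an execution queue and returns the channel for feeding it requests, the channel on
/// which it sends requests back to the host, and the future driving its event loop.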
pub fn start(
	metrics: Metrics,
	program_path: PathBuf,
	cache_path: PathBuf,
	worker_capacity: usize,
	spawn_timeout: Duration,
	node_version: Option<String>,
	security_status: SecurityStatus,
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
	let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
	let (from_queue_tx, from_queue_rx) = mpsc::unbounded();

	let run = Queue::new(
		metrics,
		program_path,
		cache_path,
		worker_capacity,
		spawn_timeout,
		node_version,
		security_status,
		to_queue_rx,
		from_queue_tx,
	)
	.run();
	(to_queue_tx, from_queue_rx, run)
}

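/// Execution priorities, listed from the most to the least prioritized.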
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, EnumIter)]
enum Priority {
	Dispute,
	Approval,
	BackingSystemParas,
	Backing,
}

impl From<PvfExecKind> for Priority {
	fn from(kind: PvfExecKind) -> Self {
		match kind {
			PvfExecKind::Dispute => Priority::Dispute,
			PvfExecKind::Approval => Priority::Approval,
			PvfExecKind::BackingSystemParas(_) => Priority::BackingSystemParas,
			PvfExecKind::Backing(_) => Priority::Backing,
		}
	}
}

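/// Pending jobs bucketed by priority, together with per-priority counters over a sliding
/// scheduling window that enforce the allocation thresholds below.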
struct Unscheduled {
	unscheduled: HashMap<Priority, VecDeque<ExecuteJob>>,
	counter: HashMap<Priority, usize>,
}

impl Unscheduled {
	const SCHEDULING_WINDOW_SIZE: usize = 12;

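	/// Upper bounds, in percent, on the share of the scheduling window each priority may consume,
	/// measured against all jobs scheduled at that priority or lower (see
	/// `has_reached_threshold`). With `SCHEDULING_WINDOW_SIZE = 12` and every queue saturated,
	/// this settles at roughly 8 dispute jobs, 3 approval jobs and 1 backing-system-paras job per
	/// window, which is what the distribution tests below assert.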
	const PRIORITY_ALLOCATION_THRESHOLDS: &'static [(Priority, usize)] = &[
		(Priority::Dispute, 70),
		(Priority::Approval, 80),
		(Priority::BackingSystemParas, 100),
		(Priority::Backing, 100),
	];

	fn new() -> Self {
		Self {
			unscheduled: Priority::iter().map(|priority| (priority, VecDeque::new())).collect(),
			counter: Priority::iter().map(|priority| (priority, 0)).collect(),
		}
	}

	fn select_next_priority(&self) -> Priority {
		gum::debug!(
			target: LOG_TARGET,
			unscheduled = ?self.unscheduled.iter().map(|(p, q)| (*p, q.len())).collect::<HashMap<Priority, usize>>(),
			counter = ?self.counter,
			"Selecting next execution priority...",
		);

		let priority = Priority::iter()
			.find(|priority| self.has_pending(priority) && !self.has_reached_threshold(priority))
			.unwrap_or_else(|| {
				Priority::iter()
					.find(|priority| self.has_pending(priority))
					.unwrap_or(Priority::Backing)
			});

		gum::debug!(
			target: LOG_TARGET,
			?priority,
			"Selected next execution priority",
		);

		priority
	}

	fn get_mut(&mut self, priority: Priority) -> Option<&mut VecDeque<ExecuteJob>> {
		self.unscheduled.get_mut(&priority)
	}

	fn add(&mut self, job: ExecuteJob, priority: Priority) {
		self.unscheduled.entry(priority).or_default().push_back(job);
	}

	fn has_pending(&self, priority: &Priority) -> bool {
		!self.unscheduled.get(priority).unwrap_or(&VecDeque::new()).is_empty()
	}

	fn priority_allocation_threshold(priority: &Priority) -> Option<usize> {
		Self::PRIORITY_ALLOCATION_THRESHOLDS.iter().find_map(|&(p, value)| {
			if p == *priority {
				Some(value)
			} else {
				None
			}
		})
	}

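	/// A priority has reached its threshold when its share of all jobs scheduled at that priority
	/// or lower within the current window is at least its configured percentage. Once reached,
	/// `select_next_priority` passes it over in favour of lower priorities, unless nothing else
	/// is pending.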
	fn has_reached_threshold(&self, priority: &Priority) -> bool {
		let Some(threshold) = Self::priority_allocation_threshold(priority) else { return false };
		let Some(count) = self.counter.get(priority) else { return false };
		let total_scheduled_at_priority_or_lower: usize = self
			.counter
			.iter()
			.filter_map(|(p, c)| if *p >= *priority { Some(c) } else { None })
			.sum();
		if total_scheduled_at_priority_or_lower == 0 {
			return false
		}

		let has_reached_threshold = count * 100 / total_scheduled_at_priority_or_lower >= threshold;

		gum::debug!(
			target: LOG_TARGET,
			?priority,
			?count,
			?total_scheduled_at_priority_or_lower,
			"Execution priority has {}reached threshold: {}/{}%",
			if has_reached_threshold {""} else {"not "},
			count * 100 / total_scheduled_at_priority_or_lower,
			threshold
		);

		has_reached_threshold
	}

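	/// Records that one more job was scheduled at `priority`; once a full scheduling window worth
	/// of jobs has been counted, all counters are reset.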
	fn mark_scheduled(&mut self, priority: Priority) {
		*self.counter.entry(priority).or_default() += 1;

		if self.counter.values().sum::<usize>() >= Self::SCHEDULING_WINDOW_SIZE {
			self.reset_counter();
		}
		gum::debug!(
			target: LOG_TARGET,
			?priority,
			"Job marked as scheduled",
		);
	}

	fn reset_counter(&mut self) {
		self.counter = Priority::iter().map(|kind| (kind, 0)).collect();
	}
}

#[cfg(test)]
mod tests {
	use polkadot_node_primitives::BlockData;
	use polkadot_node_subsystem_test_helpers::mock::new_leaf;
	use sp_core::H256;

	use super::*;
	use crate::testing::artifact_id;
	use std::time::Duration;

	fn create_execution_job() -> ExecuteJob {
		let (result_tx, _result_rx) = oneshot::channel();
		let pvd = Arc::new(PersistedValidationData {
			parent_head: Default::default(),
			relay_parent_number: 1u32,
			relay_parent_storage_root: H256::default(),
			max_pov_size: 4096 * 1024,
		});
		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
		ExecuteJob {
			artifact: ArtifactPathId {
				id: artifact_id(0),
				path: PathBuf::new(),
				checksum: Default::default(),
			},
			exec_timeout: Duration::from_secs(10),
			exec_kind: PvfExecKind::Approval,
			pvd,
			pov,
			executor_params: ExecutorParams::default(),
			result_tx,
			waiting_since: Instant::now(),
		}
	}

	#[test]
	fn test_unscheduled_add() {
		let mut unscheduled = Unscheduled::new();

		Priority::iter().for_each(|priority| {
			unscheduled.add(create_execution_job(), priority);
		});

		Priority::iter().for_each(|priority| {
			let queue = unscheduled.unscheduled.get(&priority).unwrap();
			assert_eq!(queue.len(), 1);
		});
	}

	#[test]
	fn test_unscheduled_priority_distribution() {
		use Priority::*;

		let mut priorities = vec![];

		let mut unscheduled = Unscheduled::new();
		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			unscheduled.add(create_execution_job(), Dispute);
			unscheduled.add(create_execution_job(), Approval);
			unscheduled.add(create_execution_job(), BackingSystemParas);
			unscheduled.add(create_execution_job(), Backing);
		}

		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			let priority = unscheduled.select_next_priority();
			priorities.push(priority);
			unscheduled.mark_scheduled(priority);
		}

		assert_eq!(priorities.iter().filter(|v| **v == Dispute).count(), 8);
		assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 3);
		assert_eq!(priorities.iter().filter(|v| **v == BackingSystemParas).count(), 1);
	}

	#[test]
	fn test_unscheduled_priority_distribution_without_backing_system_paras() {
		use Priority::*;

		let mut priorities = vec![];

		let mut unscheduled = Unscheduled::new();
		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			unscheduled.add(create_execution_job(), Dispute);
			unscheduled.add(create_execution_job(), Approval);
			unscheduled.add(create_execution_job(), Backing);
		}

		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			let priority = unscheduled.select_next_priority();
			priorities.push(priority);
			unscheduled.mark_scheduled(priority);
		}

		assert_eq!(priorities.iter().filter(|v| **v == Dispute).count(), 8);
		assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 3);
		assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1);
	}

	#[test]
	fn test_unscheduled_priority_distribution_without_disputes() {
		use Priority::*;

		let mut priorities = vec![];

		let mut unscheduled = Unscheduled::new();
		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			unscheduled.add(create_execution_job(), Approval);
			unscheduled.add(create_execution_job(), BackingSystemParas);
			unscheduled.add(create_execution_job(), Backing);
		}

		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			let priority = unscheduled.select_next_priority();
			priorities.push(priority);
			unscheduled.mark_scheduled(priority);
		}

		assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 9);
		assert_eq!(priorities.iter().filter(|v| **v == BackingSystemParas).count(), 2);
		assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1);
	}

	#[test]
	fn test_unscheduled_priority_distribution_without_disputes_and_only_one_backing() {
		use Priority::*;

		let mut priorities = vec![];

		let mut unscheduled = Unscheduled::new();
		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			unscheduled.add(create_execution_job(), Approval);
		}
		unscheduled.add(create_execution_job(), Backing);

		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			let priority = unscheduled.select_next_priority();
			priorities.push(priority);
			unscheduled.mark_scheduled(priority);
		}

		assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 11);
		assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1);
	}

	#[test]
	fn test_unscheduled_does_not_postpone_backing() {
		use Priority::*;

		let mut priorities = vec![];

		let mut unscheduled = Unscheduled::new();
		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			unscheduled.add(create_execution_job(), Approval);
		}
		unscheduled.add(create_execution_job(), Backing);

		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			let priority = unscheduled.select_next_priority();
			priorities.push(priority);
			unscheduled.mark_scheduled(priority);
		}

		assert_eq!(&priorities[..4], &[Approval, Backing, Approval, Approval]);
	}

	#[tokio::test]
	async fn test_prunes_old_jobs_on_active_leaves_update() {
		// Construct a queue directly; its `run` loop is never spawned, so no jobs get executed.
		let (_, to_queue_rx) = mpsc::channel(1);
		let (from_queue_tx, _) = mpsc::unbounded();
		let mut queue = Queue::new(
			Metrics::default(),
			PathBuf::new(),
			PathBuf::new(),
			1,
			Duration::from_secs(1),
			None,
			SecurityStatus::default(),
			to_queue_rx,
			from_queue_tx,
		);
		let old_relay_parent = Hash::random();
		let relevant_relay_parent = Hash::random();

		assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::<usize>(), 0);
		let mut result_rxs = vec![];
		let (result_tx, _result_rx) = oneshot::channel();
		let relevant_job = ExecuteJob {
			artifact: ArtifactPathId {
				id: artifact_id(0),
				path: PathBuf::new(),
				checksum: Default::default(),
			},
			exec_timeout: Duration::from_secs(1),
			exec_kind: PvfExecKind::Backing(relevant_relay_parent),
			pvd: Arc::new(PersistedValidationData::default()),
			pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }),
			executor_params: ExecutorParams::default(),
			result_tx,
			waiting_since: Instant::now(),
		};
		queue.unscheduled.add(relevant_job, Priority::Backing);
		for _ in 0..10 {
			let (result_tx, result_rx) = oneshot::channel();
			let expired_job = ExecuteJob {
				artifact: ArtifactPathId {
					id: artifact_id(0),
					path: PathBuf::new(),
					checksum: Default::default(),
				},
				exec_timeout: Duration::from_secs(1),
				exec_kind: PvfExecKind::Backing(old_relay_parent),
				pvd: Arc::new(PersistedValidationData::default()),
				pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }),
				executor_params: ExecutorParams::default(),
				result_tx,
				waiting_since: Instant::now(),
			};
			queue.unscheduled.add(expired_job, Priority::Backing);
			result_rxs.push(result_rx);
		}
		assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::<usize>(), 11);

		// Activate a new leaf whose ancestry contains only the relevant relay parent.
		queue.update_active_leaves(
			ActiveLeavesUpdate::start_work(new_leaf(Hash::random(), 1)),
			vec![relevant_relay_parent],
		);

		// All jobs anchored to the old relay parent are dropped with an `ExecutionDeadline` error.
		for rx in result_rxs {
			assert!(matches!(rx.await, Ok(Err(ValidationError::ExecutionDeadline))));
		}
		assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::<usize>(), 1);
	}
}