referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/execute/
queue.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! A queue that handles requests for PVF execution.
18
19use super::worker_interface::{Error as WorkerInterfaceError, Response as WorkerInterfaceResponse};
20use crate::{
21	artifacts::{ArtifactId, ArtifactPathId},
22	host::ResultSender,
23	metrics::Metrics,
24	worker_interface::{IdleWorker, WorkerHandle},
25	InvalidCandidate, PossiblyInvalidError, ValidationError, LOG_TARGET,
26};
27use futures::{
28	channel::{mpsc, oneshot},
29	future::BoxFuture,
30	stream::{FuturesUnordered, StreamExt as _},
31	Future, FutureExt,
32};
33use polkadot_node_core_pvf_common::{
34	execute::{JobResponse, WorkerError, WorkerResponse},
35	SecurityStatus,
36};
37use polkadot_node_primitives::PoV;
38use polkadot_node_subsystem::{messages::PvfExecKind, ActiveLeavesUpdate};
39use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, Hash, PersistedValidationData};
40use slotmap::HopSlotMap;
41use std::{
42	collections::{HashMap, VecDeque},
43	fmt,
44	path::PathBuf,
45	sync::Arc,
46	time::{Duration, Instant},
47};
48use strum::{EnumIter, IntoEnumIterator};
49
/// The amount of time a job for which the queue does not have a compatible worker may wait in the
/// queue. After that time passes, the queue will kill the first worker which becomes idle to
/// re-spawn a new worker to execute the job immediately.
///
/// To make any sense and not to break things, the value should be greater than the minimal
/// execution timeout in use, and less than the block time.
const MAX_KEEP_WAITING: Duration = Duration::from_secs(4);
56
// Key type identifying a worker stored in the `Workers::running` slot map.
slotmap::new_key_type! { struct Worker; }
58
/// A message sent to the execution queue.
#[derive(Debug)]
pub enum ToQueue {
	/// Reports a change of the active leaves. Deactivated leaves are forgotten, the activated
	/// leaf (if any) is stored together with `ancestors`, and unscheduled backing jobs whose
	/// relay parent is not part of any remembered fork are dropped.
	UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec<Hash> },
	/// Requests the execution of `artifact` as described by `pending_execution_request`.
	Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}
64
/// A message sent from the queue back to the validation host.
#[derive(Debug)]
pub enum FromQueue {
	/// Asks for `artifact` to be removed. The queue waits on the receiving side of `reply_to`
	/// before it forwards the execution result of the job that triggered the removal.
	RemoveArtifact { artifact: ArtifactId, reply_to: oneshot::Sender<()> },
}
70
/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
pub struct PendingExecutionRequest {
	/// Execution timeout handed to the worker together with the job.
	pub exec_timeout: Duration,
	/// Persisted validation data handed to the worker.
	pub pvd: Arc<PersistedValidationData>,
	/// Proof-of-validity handed to the worker; its block data size is reported to the metrics
	/// when the request is enqueued.
	pub pov: Arc<PoV>,
	/// Executor parameters; a worker can only take the job if it was spawned with parameters of
	/// the same hash.
	pub executor_params: ExecutorParams,
	/// Where the outcome of the execution is sent.
	pub result_tx: ResultSender,
	/// Kind of the execution; determines the priority of the job in the queue.
	pub exec_kind: PvfExecKind,
}
82
/// A job kept by the queue: the contents of a [`PendingExecutionRequest`] together with the
/// artifact to execute and the moment the job entered the queue.
struct ExecuteJob {
	artifact: ArtifactPathId,
	exec_timeout: Duration,
	exec_kind: PvfExecKind,
	pvd: Arc<PersistedValidationData>,
	pov: Arc<PoV>,
	executor_params: ExecutorParams,
	result_tx: ResultSender,
	/// Set when the job is enqueued; compared against `MAX_KEEP_WAITING` during scheduling and
	/// used to report the time spent in the queue.
	waiting_since: Instant,
}
93
/// Bookkeeping data of a single running worker.
struct WorkerData {
	/// The idle token. `Some` while the worker has no job; taken out when a job is assigned and
	/// put back when the job finishes and the worker returns the token.
	idle: Option<IdleWorker>,
	/// Handle of the worker; polled to detect that the worker has terminated.
	handle: WorkerHandle,
	/// Hash of the executor parameters the worker was spawned with.
	executor_params_hash: ExecutorParamsHash,
}
99
100impl fmt::Debug for WorkerData {
101	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102		write!(f, "WorkerData(pid={})", self.handle.id())
103	}
104}
105
/// The set of workers managed by the queue.
struct Workers {
	/// The registry of running workers.
	running: HopSlotMap<Worker, WorkerData>,

	/// The number of spawning but not yet spawned workers.
	spawn_inflight: usize,

	/// The maximum number of workers queue can have at once.
	capacity: usize,
}
116
117impl Workers {
118	fn can_afford_one_more(&self) -> bool {
119		self.spawn_inflight + self.running.len() < self.capacity
120	}
121
122	fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option<Worker> {
123		self.running.iter().find_map(|d| {
124			if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash {
125				Some(d.0)
126			} else {
127				None
128			}
129		})
130	}
131
132	fn find_idle(&self) -> Option<Worker> {
133		self.running
134			.iter()
135			.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
136	}
137
138	/// Find the associated data by the worker token and extract it's [`IdleWorker`] token.
139	///
140	/// Returns `None` if either worker is not recognized or idle token is absent.
141	fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
142		self.running.get_mut(worker)?.idle.take()
143	}
144}
145
/// An event produced by one of the futures multiplexed in [`Mux`].
enum QueueEvent {
	/// A worker has been spawned for the given job: its idle token, its handle and the job.
	Spawn(IdleWorker, WorkerHandle, ExecuteJob),
	/// A worker has finished working on a job: the worker key, the outcome reported by the worker
	/// interface, the id of the executed artifact and the sender awaiting the result.
	FinishWork(
		Worker,
		Result<WorkerInterfaceResponse, WorkerInterfaceError>,
		ArtifactId,
		ResultSender,
	),
}
155
/// The set of in-flight futures (worker spawns and running jobs), each resolving to a
/// [`QueueEvent`].
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
157
/// The state of the execution queue: its channels, the parameters used to spawn workers, the
/// pending jobs and the workers executing them.
struct Queue {
	metrics: Metrics,

	/// The receiver that receives messages to the pool.
	to_queue_rx: mpsc::Receiver<ToQueue>,
	/// The sender to send messages back to validation host.
	from_queue_tx: mpsc::UnboundedSender<FromQueue>,

	// Some variables related to the current session.
	// All of them are passed along whenever a new worker is spawned.
	program_path: PathBuf,
	cache_path: PathBuf,
	spawn_timeout: Duration,
	node_version: Option<String>,
	security_status: SecurityStatus,

	/// The queue of jobs that are waiting for a worker to pick up.
	unscheduled: Unscheduled,
	/// Running workers and the accounting of workers being spawned.
	workers: Workers,
	/// In-flight spawn and execution futures.
	mux: Mux,

	/// Active leaves and their ancestors to check the viability of backing jobs.
	active_leaves: HashMap<Hash, Vec<Hash>>,
}
181
182impl Queue {
183	fn new(
184		metrics: Metrics,
185		program_path: PathBuf,
186		cache_path: PathBuf,
187		worker_capacity: usize,
188		spawn_timeout: Duration,
189		node_version: Option<String>,
190		security_status: SecurityStatus,
191		to_queue_rx: mpsc::Receiver<ToQueue>,
192		from_queue_tx: mpsc::UnboundedSender<FromQueue>,
193	) -> Self {
194		Self {
195			metrics,
196			program_path,
197			cache_path,
198			spawn_timeout,
199			node_version,
200			security_status,
201			to_queue_rx,
202			from_queue_tx,
203			unscheduled: Unscheduled::new(),
204			mux: Mux::new(),
205			workers: Workers {
206				running: HopSlotMap::with_capacity_and_key(10),
207				spawn_inflight: 0,
208				capacity: worker_capacity,
209			},
210			active_leaves: Default::default(),
211		}
212	}
213
214	async fn run(mut self) {
215		loop {
216			futures::select! {
217				to_queue = self.to_queue_rx.next() => {
218					if let Some(to_queue) = to_queue {
219						handle_to_queue(&mut self, to_queue);
220					} else {
221						break;
222					}
223				}
224				ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
225			}
226
227			purge_dead(&self.metrics, &mut self.workers).await;
228		}
229	}
230
231	/// Tries to assign a job in the queue to a worker. If an idle worker is provided, it does its
232	/// best to find a job with a compatible execution environment unless there are jobs in the
233	/// queue waiting too long. In that case, it kills an existing idle worker and spawns a new
234	/// one. It may spawn an additional worker if that is affordable.
235	/// If all the workers are busy or the queue is empty, it does nothing.
236	/// Should be called every time a new job arrives to the queue or a job finishes.
237	fn try_assign_next_job(&mut self, finished_worker: Option<Worker>) {
238		// We always work at the same priority level
239		let priority = self.unscheduled.select_next_priority();
240		let Some(queue) = self.unscheduled.get_mut(priority) else { return };
241
242		// New jobs are always pushed to the tail of the queue based on their priority;
243		// the one at its head of each queue is always the eldest one.
244		let eldest = if let Some(eldest) = queue.get(0) { eldest } else { return };
245
246		// By default, we're going to execute the eldest job on any worker slot available, even if
247		// we have to kill and re-spawn a worker
248		let mut worker = None;
249		let mut job_index = 0;
250
251		// But if we're not pressed for time, we can try to find a better job-worker pair not
252		// requiring the expensive kill-spawn operation
253		if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING {
254			if let Some(finished_worker) = finished_worker {
255				if let Some(worker_data) = self.workers.running.get(finished_worker) {
256					for (i, job) in queue.iter().enumerate() {
257						if worker_data.executor_params_hash == job.executor_params.hash() {
258							(worker, job_index) = (Some(finished_worker), i);
259							break
260						}
261					}
262				}
263			}
264		}
265
266		if worker.is_none() {
267			// Try to obtain a worker for the job
268			worker = self.workers.find_available(queue[job_index].executor_params.hash());
269		}
270
271		if worker.is_none() {
272			if let Some(idle) = self.workers.find_idle() {
273				// No available workers of required type but there are some idle ones of other
274				// types, have to kill one and re-spawn with the correct type
275				if self.workers.running.remove(idle).is_some() {
276					self.metrics.execute_worker().on_retired();
277				}
278			}
279		}
280
281		if worker.is_none() && !self.workers.can_afford_one_more() {
282			// Bad luck, no worker slot can be used to execute the job
283			return
284		}
285
286		let job = queue.remove(job_index).expect("Job is just checked to be in queue; qed");
287		let exec_kind = job.exec_kind;
288
289		if let Some(worker) = worker {
290			assign(self, worker, job);
291		} else {
292			spawn_extra_worker(self, job);
293		}
294		self.metrics.on_execute_kind(exec_kind);
295		self.unscheduled.mark_scheduled(priority);
296	}
297
298	fn update_active_leaves(&mut self, update: ActiveLeavesUpdate, ancestors: Vec<Hash>) {
299		self.prune_deactivated_leaves(&update);
300		self.insert_active_leaf(update, ancestors);
301		self.prune_old_jobs();
302	}
303
304	fn prune_deactivated_leaves(&mut self, update: &ActiveLeavesUpdate) {
305		for hash in &update.deactivated {
306			let _ = self.active_leaves.remove(&hash);
307		}
308	}
309
310	fn insert_active_leaf(&mut self, update: ActiveLeavesUpdate, ancestors: Vec<Hash>) {
311		let Some(leaf) = update.activated else { return };
312		let _ = self.active_leaves.insert(leaf.hash, ancestors);
313	}
314
315	fn prune_old_jobs(&mut self) {
316		for &priority in &[Priority::Backing, Priority::BackingSystemParas] {
317			let Some(queue) = self.unscheduled.get_mut(priority) else { continue };
318			let to_remove: Vec<usize> = queue
319				.iter()
320				.enumerate()
321				.filter_map(|(index, job)| {
322					let relay_parent = match job.exec_kind {
323						PvfExecKind::Backing(x) | PvfExecKind::BackingSystemParas(x) => x,
324						_ => return None,
325					};
326					let in_active_fork = self.active_leaves.iter().any(|(hash, ancestors)| {
327						*hash == relay_parent || ancestors.contains(&relay_parent)
328					});
329					if in_active_fork {
330						None
331					} else {
332						Some(index)
333					}
334				})
335				.collect();
336
337			for &index in to_remove.iter().rev() {
338				if index > queue.len() {
339					continue
340				}
341
342				let Some(job) = queue.remove(index) else { continue };
343				let _ = job.result_tx.send(Err(ValidationError::ExecutionDeadline));
344				gum::warn!(
345					target: LOG_TARGET,
346					?priority,
347					exec_kind = ?job.exec_kind,
348					"Job exceeded its deadline and was dropped without execution",
349				);
350			}
351		}
352	}
353}
354
355async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
356	let mut to_remove = vec![];
357	for (worker, data) in workers.running.iter_mut() {
358		if futures::poll!(&mut data.handle).is_ready() {
359			// a resolved future means that the worker has terminated. Weed it out.
360			to_remove.push(worker);
361		}
362	}
363	for w in to_remove {
364		if workers.running.remove(w).is_some() {
365			metrics.execute_worker().on_retired();
366		}
367	}
368}
369
370fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
371	match to_queue {
372		ToQueue::UpdateActiveLeaves { update, ancestors } => {
373			queue.update_active_leaves(update, ancestors);
374		},
375		ToQueue::Enqueue { artifact, pending_execution_request } => {
376			let PendingExecutionRequest {
377				exec_timeout,
378				pvd,
379				pov,
380				executor_params,
381				result_tx,
382				exec_kind,
383			} = pending_execution_request;
384			gum::debug!(
385				target: LOG_TARGET,
386				validation_code_hash = ?artifact.id.code_hash,
387				"enqueueing an artifact for execution",
388			);
389			queue.metrics.observe_pov_size(pov.block_data.0.len(), true);
390			queue.metrics.execute_enqueued();
391			let job = ExecuteJob {
392				artifact,
393				exec_timeout,
394				exec_kind,
395				pvd,
396				pov,
397				executor_params,
398				result_tx,
399				waiting_since: Instant::now(),
400			};
401			queue.unscheduled.add(job, exec_kind.into());
402			queue.try_assign_next_job(None);
403		},
404	}
405}
406
407async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
408	match event {
409		QueueEvent::Spawn(idle, handle, job) => {
410			handle_worker_spawned(queue, idle, handle, job);
411		},
412		QueueEvent::FinishWork(worker, outcome, artifact_id, result_tx) => {
413			handle_job_finish(queue, worker, outcome, artifact_id, result_tx).await;
414		},
415	}
416}
417
418fn handle_worker_spawned(
419	queue: &mut Queue,
420	idle: IdleWorker,
421	handle: WorkerHandle,
422	job: ExecuteJob,
423) {
424	queue.metrics.execute_worker().on_spawned();
425	queue.workers.spawn_inflight -= 1;
426	let worker = queue.workers.running.insert(WorkerData {
427		idle: Some(idle),
428		handle,
429		executor_params_hash: job.executor_params.hash(),
430	});
431
432	gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
433
434	assign(queue, worker, job);
435}
436
437/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
438/// worker. Otherwise, puts back into the available workers list.
439async fn handle_job_finish(
440	queue: &mut Queue,
441	worker: Worker,
442	worker_result: Result<WorkerInterfaceResponse, WorkerInterfaceError>,
443	artifact_id: ArtifactId,
444	result_tx: ResultSender,
445) {
446	let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result {
447		Ok(WorkerInterfaceResponse {
448			worker_response:
449				WorkerResponse {
450					job_response: JobResponse::Ok { result_descriptor },
451					duration,
452					pov_size,
453				},
454			idle_worker,
455		}) => {
456			// TODO: propagate the soft timeout
457
458			(Some(idle_worker), Ok(result_descriptor), Some(duration), None, Some(pov_size))
459		},
460		Ok(WorkerInterfaceResponse {
461			worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. },
462			idle_worker,
463		}) => (
464			Some(idle_worker),
465			Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
466			None,
467			None,
468			None,
469		),
470		Ok(WorkerInterfaceResponse {
471			worker_response:
472				WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. },
473			idle_worker,
474		}) => (
475			Some(idle_worker),
476			Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)),
477			None,
478			None,
479			None,
480		),
481		Ok(WorkerInterfaceResponse {
482			worker_response:
483				WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. },
484			idle_worker,
485		}) => {
486			// The task for artifact removal is executed concurrently with
487			// the message to the host on the execution result.
488			let (result_tx, result_rx) = oneshot::channel();
489			queue
490				.from_queue_tx
491				.unbounded_send(FromQueue::RemoveArtifact {
492					artifact: artifact_id.clone(),
493					reply_to: result_tx,
494				})
495				.expect("from execute queue receiver is listened by the host; qed");
496			(
497				Some(idle_worker),
498				Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(
499					err,
500				))),
501				None,
502				Some(result_rx),
503				None,
504			)
505		},
506		Ok(WorkerInterfaceResponse {
507			worker_response: WorkerResponse { job_response: JobResponse::CorruptedArtifact, .. },
508			idle_worker,
509		}) => {
510			let (tx, rx) = oneshot::channel();
511			queue
512				.from_queue_tx
513				.unbounded_send(FromQueue::RemoveArtifact {
514					artifact: artifact_id.clone(),
515					reply_to: tx,
516				})
517				.expect("from execute queue receiver is listened by the host; qed");
518			(
519				Some(idle_worker),
520				Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::CorruptedArtifact)),
521				None,
522				Some(rx),
523				None,
524			)
525		},
526
527		Err(WorkerInterfaceError::InternalError(err)) |
528		Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) =>
529			(None, Err(ValidationError::Internal(err)), None, None, None),
530		// Either the worker or the job timed out. Kill the worker in either case. Treated as
531		// definitely-invalid, because if we timed out, there's no time left for a retry.
532		Err(WorkerInterfaceError::HardTimeout) |
533		Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) =>
534			(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None),
535		// "Maybe invalid" errors (will retry).
536		Err(WorkerInterfaceError::CommunicationErr(_err)) => (
537			None,
538			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
539			None,
540			None,
541			None,
542		),
543		Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => (
544			None,
545			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
546			None,
547			None,
548			None,
549		),
550		Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => (
551			None,
552			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))),
553			None,
554			None,
555			None,
556		),
557	};
558
559	queue.metrics.execute_finished();
560	if let Some(pov_size) = pov_size {
561		queue.metrics.observe_pov_size(pov_size as usize, false)
562	}
563	if let Err(ref err) = result {
564		gum::warn!(
565			target: LOG_TARGET,
566			?artifact_id,
567			?worker,
568			worker_rip = idle_worker.is_none(),
569			"execution worker concluded, error occurred: {}",
570			err
571		);
572	} else {
573		gum::trace!(
574			target: LOG_TARGET,
575			?artifact_id,
576			?worker,
577			worker_rip = idle_worker.is_none(),
578			?duration,
579			"execute worker concluded successfully",
580		);
581	}
582
583	if let Some(sync_channel) = sync_channel {
584		// err means the sender is dropped (the artifact is already removed from the cache)
585		// so that's legitimate to ignore the result
586		let _ = sync_channel.await;
587	}
588
589	// First we send the result. It may fail due to the other end of the channel being dropped,
590	// that's legitimate and we don't treat that as an error.
591	let _ = result_tx.send(result);
592
593	// Then, we should deal with the worker:
594	//
595	// - if the `idle_worker` token was returned we should either schedule the next task or just put
596	//   it back so that the next incoming job will be able to claim it
597	//
598	// - if the `idle_worker` token was consumed, all the metadata pertaining to that worker should
599	//   be removed.
600	if let Some(idle_worker) = idle_worker {
601		if let Some(data) = queue.workers.running.get_mut(worker) {
602			data.idle = Some(idle_worker);
603			return queue.try_assign_next_job(Some(worker))
604		}
605	} else {
606		// Note it's possible that the worker was purged already by `purge_dead`
607		if queue.workers.running.remove(worker).is_some() {
608			queue.metrics.execute_worker().on_retired();
609		}
610	}
611
612	queue.try_assign_next_job(None);
613}
614
615fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
616	queue.metrics.execute_worker().on_begin_spawn();
617	gum::debug!(target: LOG_TARGET, "spawning an extra worker");
618
619	queue.mux.push(
620		spawn_worker_task(
621			queue.program_path.clone(),
622			queue.cache_path.clone(),
623			job,
624			queue.spawn_timeout,
625			queue.node_version.clone(),
626			queue.security_status.clone(),
627		)
628		.boxed(),
629	);
630	queue.workers.spawn_inflight += 1;
631}
632
633/// Spawns a new worker to execute a pre-assigned job.
634/// A worker is never spawned as idle; a job to be executed by the worker has to be determined
635/// beforehand. In such a way, a race condition is avoided: during the worker being spawned,
636/// another job in the queue, with an incompatible execution environment, may become stale, and
637/// the queue would have to kill a newly started worker and spawn another one.
638/// Nevertheless, if the worker finishes executing the job, it becomes idle and may be used to
639/// execute other jobs with a compatible execution environment.
640async fn spawn_worker_task(
641	program_path: PathBuf,
642	cache_path: PathBuf,
643	job: ExecuteJob,
644	spawn_timeout: Duration,
645	node_version: Option<String>,
646	security_status: SecurityStatus,
647) -> QueueEvent {
648	use futures_timer::Delay;
649
650	loop {
651		match super::worker_interface::spawn(
652			&program_path,
653			&cache_path,
654			job.executor_params.clone(),
655			spawn_timeout,
656			node_version.as_deref(),
657			security_status.clone(),
658		)
659		.await
660		{
661			Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
662			Err(err) => {
663				gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
664
665				// Assume that the failure is intermittent and retry after a delay.
666				Delay::new(Duration::from_secs(3)).await;
667			},
668		}
669	}
670}
671
672/// Ask the given worker to perform the given job.
673///
674/// The worker must be running and idle. The job and the worker must share the same execution
675/// environment parameter set.
676fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
677	gum::debug!(
678		target: LOG_TARGET,
679		validation_code_hash = ?job.artifact.id,
680		?worker,
681		"assigning the execute worker",
682	);
683
684	debug_assert_eq!(
685		queue
686			.workers
687			.running
688			.get(worker)
689			.expect("caller must provide existing worker; qed")
690			.executor_params_hash,
691		job.executor_params.hash()
692	);
693
694	let idle = queue.workers.claim_idle(worker).expect(
695		"this caller must supply a worker which is idle and running;
696			thus claim_idle cannot return None;
697			qed.",
698	);
699	queue
700		.metrics
701		.observe_execution_queued_time(job.waiting_since.elapsed().as_millis() as u32);
702	let execution_timer = queue.metrics.time_execution();
703	queue.mux.push(
704		async move {
705			let _timer = execution_timer;
706			let result = super::worker_interface::start_work(
707				idle,
708				job.artifact.clone(),
709				job.exec_timeout,
710				job.pvd,
711				job.pov,
712			)
713			.await;
714			QueueEvent::FinishWork(worker, result, job.artifact.id, job.result_tx)
715		}
716		.boxed(),
717	);
718}
719
720pub fn start(
721	metrics: Metrics,
722	program_path: PathBuf,
723	cache_path: PathBuf,
724	worker_capacity: usize,
725	spawn_timeout: Duration,
726	node_version: Option<String>,
727	security_status: SecurityStatus,
728) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
729	let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
730	let (from_queue_tx, from_queue_rx) = mpsc::unbounded();
731
732	let run = Queue::new(
733		metrics,
734		program_path,
735		cache_path,
736		worker_capacity,
737		spawn_timeout,
738		node_version,
739		security_status,
740		to_queue_rx,
741		from_queue_tx,
742	)
743	.run();
744	(to_queue_tx, from_queue_rx, run)
745}
746
/// Priority of execution jobs based on PvfExecKind.
///
/// The order of the variants is important: the values are iterated in declaration order and are
/// assumed to go from the highest to the lowest priority. The derived ordering is also used to
/// select the counters of "this priority or lower" when checking allocation thresholds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, EnumIter)]
enum Priority {
	Dispute,
	Approval,
	BackingSystemParas,
	Backing,
}
758
759impl From<PvfExecKind> for Priority {
760	fn from(kind: PvfExecKind) -> Self {
761		match kind {
762			PvfExecKind::Dispute => Priority::Dispute,
763			PvfExecKind::Approval => Priority::Approval,
764			PvfExecKind::BackingSystemParas(_) => Priority::BackingSystemParas,
765			PvfExecKind::Backing(_) => Priority::Backing,
766		}
767	}
768}
769
/// Jobs waiting for a worker, grouped by priority, together with the bookkeeping needed to share
/// the workers between the priorities.
struct Unscheduled {
	/// Waiting jobs per priority; new jobs are pushed to the back of their queue.
	unscheduled: HashMap<Priority, VecDeque<ExecuteJob>>,
	/// Number of jobs scheduled per priority since the counters were last reset.
	counter: HashMap<Priority, usize>,
}
774
775impl Unscheduled {
776	/// We keep track of every scheduled job in the `counter`, but reset it if the total number of
777	/// counted jobs reaches the threshold. This number is set as the maximum amount of jobs per
778	/// relay chain block possible with 4 CPU cores and 2 seconds of execution time. Under normal
779	/// conditions, the maximum expected queue size is at least vrf_module_samples(6) + 1 for
780	/// backing a parachain candidate. A buffer is added to cover situations where more work
781	/// arrives in the queue.
782	const SCHEDULING_WINDOW_SIZE: usize = 12;
783
784	/// A threshold in percentages indicates how much time a current priority can "steal" from lower
785	/// priorities. Given the `SCHEDULING_WINDOW_SIZE` is 12 and all job priorities are present:
786	/// - Disputes consume 70% or 8 jobs in a row.
787	/// - The remaining 30% of original 100% is allocated for approval and all backing jobs.
788	/// - 80% or 3 jobs of the remaining goes to approvals.
789	/// - The remaining 6% of original 100% is allocated for all backing jobs.
790	/// - 100% or 1 job of the remaining goes to backing system parachains.
791	/// - Nothing is left for backing.
792	/// - The counter is restarted and the distribution starts from the beginning.
793	///
794	/// This system might seem complex, but we operate with the remaining percentages because:
795	/// - Not all job types are present in each block. If we used parts of the original 100%,
796	///   approvals could not exceed 24%, even if there are no disputes.
797	/// - We cannot fully prioritize backing system parachains over backing other parachains based
798	///   on the distribution of the original 100%.
799	const PRIORITY_ALLOCATION_THRESHOLDS: &'static [(Priority, usize)] = &[
800		(Priority::Dispute, 70),
801		(Priority::Approval, 80),
802		(Priority::BackingSystemParas, 100),
803		(Priority::Backing, 100),
804	];
805
806	fn new() -> Self {
807		Self {
808			unscheduled: Priority::iter().map(|priority| (priority, VecDeque::new())).collect(),
809			counter: Priority::iter().map(|priority| (priority, 0)).collect(),
810		}
811	}
812
813	fn select_next_priority(&self) -> Priority {
814		gum::debug!(
815			target: LOG_TARGET,
816			unscheduled = ?self.unscheduled.iter().map(|(p, q)| (*p, q.len())).collect::<HashMap<Priority, usize>>(),
817			counter = ?self.counter,
818			"Selecting next execution priority...",
819		);
820
821		let priority = Priority::iter()
822			.find(|priority| self.has_pending(priority) && !self.has_reached_threshold(priority))
823			.unwrap_or_else(|| {
824				Priority::iter()
825					.find(|priority| self.has_pending(priority))
826					.unwrap_or(Priority::Backing)
827			});
828
829		gum::debug!(
830			target: LOG_TARGET,
831			?priority,
832			"Selected next execution priority",
833		);
834
835		priority
836	}
837
838	fn get_mut(&mut self, priority: Priority) -> Option<&mut VecDeque<ExecuteJob>> {
839		self.unscheduled.get_mut(&priority)
840	}
841
842	fn add(&mut self, job: ExecuteJob, priority: Priority) {
843		self.unscheduled.entry(priority).or_default().push_back(job);
844	}
845
846	fn has_pending(&self, priority: &Priority) -> bool {
847		!self.unscheduled.get(priority).unwrap_or(&VecDeque::new()).is_empty()
848	}
849
850	fn priority_allocation_threshold(priority: &Priority) -> Option<usize> {
851		Self::PRIORITY_ALLOCATION_THRESHOLDS.iter().find_map(|&(p, value)| {
852			if p == *priority {
853				Some(value)
854			} else {
855				None
856			}
857		})
858	}
859
860	/// Checks if a given priority has reached its allocated threshold
861	/// The thresholds are defined in `PRIORITY_ALLOCATION_THRESHOLDS`.
862	fn has_reached_threshold(&self, priority: &Priority) -> bool {
863		let Some(threshold) = Self::priority_allocation_threshold(priority) else { return false };
864		let Some(count) = self.counter.get(&priority) else { return false };
865		// Every time we iterate by lower level priorities
866		let total_scheduled_at_priority_or_lower: usize = self
867			.counter
868			.iter()
869			.filter_map(|(p, c)| if *p >= *priority { Some(c) } else { None })
870			.sum();
871		if total_scheduled_at_priority_or_lower == 0 {
872			return false
873		}
874
875		let has_reached_threshold = count * 100 / total_scheduled_at_priority_or_lower >= threshold;
876
877		gum::debug!(
878			target: LOG_TARGET,
879			?priority,
880			?count,
881			?total_scheduled_at_priority_or_lower,
882			"Execution priority has {}reached threshold: {}/{}%",
883			if has_reached_threshold {""} else {"not "},
884			count * 100 / total_scheduled_at_priority_or_lower,
885			threshold
886		);
887
888		has_reached_threshold
889	}
890
891	fn mark_scheduled(&mut self, priority: Priority) {
892		*self.counter.entry(priority).or_default() += 1;
893
894		if self.counter.values().sum::<usize>() >= Self::SCHEDULING_WINDOW_SIZE {
895			self.reset_counter();
896		}
897		gum::debug!(
898			target: LOG_TARGET,
899			?priority,
900			"Job marked as scheduled",
901		);
902	}
903
904	fn reset_counter(&mut self) {
905		self.counter = Priority::iter().map(|kind| (kind, 0)).collect();
906	}
907}
908
#[cfg(test)]
mod tests {
	use polkadot_node_primitives::BlockData;
	use polkadot_node_subsystem_test_helpers::mock::new_leaf;
	use sp_core::H256;

	use super::*;
	use crate::testing::artifact_id;
	use std::time::Duration;

	/// Builds an [`ExecuteJob`] with fixed placeholder contents for the scheduling tests.
	///
	/// The receiving half of the result channel is dropped when this function returns, so
	/// nothing ever observes a result sent for the returned job.
	fn create_execution_job() -> ExecuteJob {
		let (result_tx, _result_rx) = oneshot::channel();
		let pvd = Arc::new(PersistedValidationData {
			parent_head: Default::default(),
			relay_parent_number: 1u32,
			relay_parent_storage_root: H256::default(),
			max_pov_size: 4096 * 1024,
		});
		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
		ExecuteJob {
			artifact: ArtifactPathId {
				id: artifact_id(0),
				path: PathBuf::new(),
				checksum: Default::default(),
			},
			exec_timeout: Duration::from_secs(10),
			exec_kind: PvfExecKind::Approval,
			pvd,
			pov,
			executor_params: ExecutorParams::default(),
			result_tx,
			waiting_since: Instant::now(),
		}
	}

	/// Adds `Unscheduled::SCHEDULING_WINDOW_SIZE` jobs for each entry of `priorities`.
	///
	/// On every round one fresh job is added per entry, visiting `priorities` in slice order.
	fn fill_window(unscheduled: &mut Unscheduled, priorities: &[Priority]) {
		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
			for &priority in priorities {
				unscheduled.add(create_execution_job(), priority);
			}
		}
	}

	/// Performs `Unscheduled::SCHEDULING_WINDOW_SIZE` scheduling steps.
	///
	/// Each step asks for the next priority and immediately marks it as scheduled. The selected
	/// priorities are returned in the order they were chosen.
	fn schedule_window(unscheduled: &mut Unscheduled) -> Vec<Priority> {
		(0..Unscheduled::SCHEDULING_WINDOW_SIZE)
			.map(|_| {
				let priority = unscheduled.select_next_priority();
				unscheduled.mark_scheduled(priority);
				priority
			})
			.collect()
	}

	/// Returns how many entries of `scheduled` are equal to `wanted`.
	fn count_of(scheduled: &[Priority], wanted: Priority) -> usize {
		scheduled.iter().filter(|priority| **priority == wanted).count()
	}

	/// Returns the total number of jobs held across all per-priority queues of `queue`.
	fn queued_jobs(queue: &Queue) -> usize {
		queue.unscheduled.unscheduled.values().map(|jobs| jobs.len()).sum()
	}

	/// Adding one job per priority leaves exactly one job in every per-priority queue.
	#[test]
	fn test_unscheduled_add() {
		let mut unscheduled = Unscheduled::new();

		for priority in Priority::iter() {
			unscheduled.add(create_execution_job(), priority);
		}

		for priority in Priority::iter() {
			let queue = unscheduled.unscheduled.get(&priority).unwrap();
			assert_eq!(queue.len(), 1);
		}
	}

	/// With jobs of all four priorities queued, one window picks 8 disputes, 3 approvals and 1
	/// system-parachain backing job.
	#[test]
	fn test_unscheduled_priority_distribution() {
		use Priority::*;

		let mut unscheduled = Unscheduled::new();
		fill_window(&mut unscheduled, &[Dispute, Approval, BackingSystemParas, Backing]);

		let priorities = schedule_window(&mut unscheduled);

		assert_eq!(count_of(&priorities, Dispute), 8);
		assert_eq!(count_of(&priorities, Approval), 3);
		assert_eq!(count_of(&priorities, BackingSystemParas), 1);
	}

	/// Without system-parachain backing jobs, one window picks 8 disputes, 3 approvals and 1
	/// regular backing job.
	#[test]
	fn test_unscheduled_priority_distribution_without_backing_system_paras() {
		use Priority::*;

		let mut unscheduled = Unscheduled::new();
		fill_window(&mut unscheduled, &[Dispute, Approval, Backing]);

		let priorities = schedule_window(&mut unscheduled);

		assert_eq!(count_of(&priorities, Dispute), 8);
		assert_eq!(count_of(&priorities, Approval), 3);
		assert_eq!(count_of(&priorities, Backing), 1);
	}

	/// Without disputes, one window picks 9 approvals, 2 system-parachain backing jobs and 1
	/// regular backing job.
	#[test]
	fn test_unscheduled_priority_distribution_without_disputes() {
		use Priority::*;

		let mut unscheduled = Unscheduled::new();
		fill_window(&mut unscheduled, &[Approval, BackingSystemParas, Backing]);

		let priorities = schedule_window(&mut unscheduled);

		assert_eq!(count_of(&priorities, Approval), 9);
		assert_eq!(count_of(&priorities, BackingSystemParas), 2);
		assert_eq!(count_of(&priorities, Backing), 1);
	}

	/// With a window's worth of approvals and a single backing job queued, one window picks 11
	/// approvals and that 1 backing job.
	#[test]
	fn test_unscheduled_priority_distribution_without_disputes_and_only_one_backing() {
		use Priority::*;

		let mut unscheduled = Unscheduled::new();
		fill_window(&mut unscheduled, &[Approval]);
		unscheduled.add(create_execution_job(), Backing);

		let priorities = schedule_window(&mut unscheduled);

		assert_eq!(count_of(&priorities, Approval), 11);
		assert_eq!(count_of(&priorities, Backing), 1);
	}

	/// With a window's worth of approvals and a single backing job queued, the backing job is
	/// the second selection rather than being pushed to the end of the window.
	#[test]
	fn test_unscheduled_does_not_postpone_backing() {
		use Priority::*;

		let mut unscheduled = Unscheduled::new();
		fill_window(&mut unscheduled, &[Approval]);
		unscheduled.add(create_execution_job(), Backing);

		let priorities = schedule_window(&mut unscheduled);

		assert_eq!(&priorities[..4], &[Approval, Backing, Approval, Approval]);
	}

	/// After an active-leaves update that is given only the relevant relay parent, the ten
	/// backing jobs for the old relay parent resolve with `ExecutionDeadline` and only the job
	/// for the relevant relay parent stays queued.
	#[tokio::test]
	async fn test_prunes_old_jobs_on_active_leaves_update() {
		// Set up a queue, but without a real worker, we won't execute any jobs.
		let (_, to_queue_rx) = mpsc::channel(1);
		let (from_queue_tx, _) = mpsc::unbounded();
		let mut queue = Queue::new(
			Metrics::default(),
			PathBuf::new(),
			PathBuf::new(),
			1,
			Duration::from_secs(1),
			None,
			SecurityStatus::default(),
			to_queue_rx,
			from_queue_tx,
		);
		let old_relay_parent = Hash::random();
		let relevant_relay_parent = Hash::random();

		// The jobs in this test differ only in the relay parent they back on and in their
		// result channel, so they are all built through this one closure.
		let backing_job = |relay_parent, result_tx| ExecuteJob {
			artifact: ArtifactPathId {
				id: artifact_id(0),
				path: PathBuf::new(),
				checksum: Default::default(),
			},
			exec_timeout: Duration::from_secs(1),
			exec_kind: PvfExecKind::Backing(relay_parent),
			pvd: Arc::new(PersistedValidationData::default()),
			pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }),
			executor_params: ExecutorParams::default(),
			result_tx,
			waiting_since: Instant::now(),
		};

		assert_eq!(queued_jobs(&queue), 0);

		// The receiver of the relevant job is bound to a name, so it lives until the end of the
		// test; its result is never awaited.
		let (result_tx, _result_rx) = oneshot::channel();
		queue
			.unscheduled
			.add(backing_job(relevant_relay_parent, result_tx), Priority::Backing);

		// Queue ten jobs for the old relay parent and keep their receivers for inspection.
		let result_rxs: Vec<_> = (0..10)
			.map(|_| {
				let (result_tx, result_rx) = oneshot::channel();
				queue
					.unscheduled
					.add(backing_job(old_relay_parent, result_tx), Priority::Backing);
				result_rx
			})
			.collect();
		assert_eq!(queued_jobs(&queue), 11);

		// Add an active leaf
		queue.update_active_leaves(
			ActiveLeavesUpdate::start_work(new_leaf(Hash::random(), 1)),
			vec![relevant_relay_parent],
		);

		// It prunes all old jobs and drops them with an `ExecutionDeadline` error.
		for rx in result_rxs {
			assert!(matches!(rx.await, Ok(Err(ValidationError::ExecutionDeadline))));
		}
		assert_eq!(queued_jobs(&queue), 1);
	}
}