referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/
host.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Validation host - is the primary interface for this crate. It allows the clients to enqueue
18//! jobs for PVF execution or preparation.
19//!
20//! The validation host is represented by a future/task that runs an event-loop and by a handle,
21//! [`ValidationHost`], that allows communication with that event-loop.
22
23use crate::{
24	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
25	execute::{self, PendingExecutionRequest},
26	metrics::Metrics,
27	prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
28};
29use always_assert::never;
30use futures::{
31	channel::{mpsc, oneshot},
32	Future, FutureExt, SinkExt, StreamExt,
33};
34#[cfg(feature = "test-utils")]
35use polkadot_node_core_pvf_common::ArtifactChecksum;
36use polkadot_node_core_pvf_common::{
37	error::{PrecheckResult, PrepareError},
38	prepare::PrepareSuccess,
39	pvf::PvfPrepData,
40};
41use polkadot_node_subsystem::{
42	messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult,
43};
44use polkadot_parachain_primitives::primitives::ValidationResult;
45use polkadot_primitives::Hash;
46use std::{
47	collections::HashMap,
48	path::PathBuf,
49	time::{Duration, SystemTime},
50};
51
/// How long a failed preparation has to rest before the artifact becomes eligible for a retry.
/// The retry is not proactive: it only happens if a new request arrives after the cooldown has
/// passed.
#[cfg(not(test))]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
/// Much shorter cooldown that replaces the regular one in test builds.
#[cfg(test)]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);

/// How many times a failed prepare job will be retried.
pub const NUM_PREPARE_RETRIES: u32 = 5;

/// File name of the binary that is spawned to prepare a PVF artifact.
pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";

/// File name of the binary that is spawned to execute a PVF.
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";

/// Capacity of the queue holding incoming messages for the host.
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;
70
71/// An alias to not spell the type for the oneshot sender for the PVF execution result.
72pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
73
74/// Transmission end used for sending the PVF preparation result.
75pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
76
77/// A handle to the async process serving the validation host requests.
78#[derive(Clone)]
79pub struct ValidationHost {
80	to_host_tx: mpsc::Sender<ToHost>,
81	/// Available security features, detected by the host during startup.
82	pub security_status: SecurityStatus,
83}
84
85impl ValidationHost {
86	/// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time
87	/// limit. This will prepare the PVF. The result of preparation will be sent to the provided
88	/// result sender.
89	///
90	/// This is async to accommodate the possibility of back-pressure. In the vast majority of
91	/// situations this function should return immediately.
92	///
93	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
94	pub async fn precheck_pvf(
95		&mut self,
96		pvf: PvfPrepData,
97		result_tx: PrecheckResultSender,
98	) -> Result<(), String> {
99		self.to_host_tx
100			.send(ToHost::PrecheckPvf { pvf, result_tx })
101			.await
102			.map_err(|_| "the inner loop hung up".to_string())
103	}
104
105	/// Execute PVF with the given code, execution timeout, parameters and priority.
106	/// The result of execution will be sent to the provided result sender.
107	///
108	/// This is async to accommodate the possibility of back-pressure. In the vast majority of
109	/// situations this function should return immediately.
110	///
111	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
112	pub async fn execute_pvf(
113		&mut self,
114		pvf: PvfPrepData,
115		validation_context: polkadot_node_core_pvf_common::execute::ValidationContext,
116		priority: Priority,
117		exec_kind: PvfExecKind,
118		result_tx: ResultSender,
119	) -> Result<(), String> {
120		self.to_host_tx
121			.send(ToHost::ExecutePvf(ExecutePvfInputs {
122				pvf,
123				validation_context,
124				priority,
125				exec_kind,
126				result_tx,
127			}))
128			.await
129			.map_err(|_| "the inner loop hung up".to_string())
130	}
131
132	/// Sends a signal to the validation host requesting to prepare a list of the given PVFs.
133	///
134	/// This is async to accommodate the possibility of back-pressure. In the vast majority of
135	/// situations this function should return immediately.
136	///
137	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
138	pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
139		self.to_host_tx
140			.send(ToHost::HeadsUp { active_pvfs })
141			.await
142			.map_err(|_| "the inner loop hung up".to_string())
143	}
144
145	/// Sends a signal to the validation host requesting to update best block.
146	///
147	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
148	pub async fn update_active_leaves(
149		&mut self,
150		update: ActiveLeavesUpdate,
151		ancestors: Vec<Hash>,
152	) -> Result<(), String> {
153		self.to_host_tx
154			.send(ToHost::UpdateActiveLeaves { update, ancestors })
155			.await
156			.map_err(|_| "the inner loop hung up".to_string())
157	}
158
159	/// Replace the artifact checksum with a new one.
160	///
161	/// Only for test purposes to imitate a corruption of the artifact on disk.
162	#[cfg(feature = "test-utils")]
163	pub async fn replace_artifact_checksum(
164		&mut self,
165		checksum: ArtifactChecksum,
166		new_checksum: ArtifactChecksum,
167	) -> Result<(), String> {
168		self.to_host_tx
169			.send(ToHost::ReplaceArtifactChecksum { checksum, new_checksum })
170			.await
171			.map_err(|_| "the inner loop hung up".to_string())
172	}
173}
174
175enum ToHost {
176	PrecheckPvf {
177		pvf: PvfPrepData,
178		result_tx: PrecheckResultSender,
179	},
180	ExecutePvf(ExecutePvfInputs),
181	HeadsUp {
182		active_pvfs: Vec<PvfPrepData>,
183	},
184	UpdateActiveLeaves {
185		update: ActiveLeavesUpdate,
186		ancestors: Vec<Hash>,
187	},
188	#[cfg(feature = "test-utils")]
189	ReplaceArtifactChecksum {
190		checksum: ArtifactChecksum,
191		new_checksum: ArtifactChecksum,
192	},
193}
194
195struct ExecutePvfInputs {
196	pvf: PvfPrepData,
197	validation_context: polkadot_node_core_pvf_common::execute::ValidationContext,
198	priority: Priority,
199	exec_kind: PvfExecKind,
200	result_tx: ResultSender,
201}
202
/// Settings the validation host is started with.
#[derive(Debug)]
pub struct Config {
	/// Root directory in which prepared artifacts can be stored.
	pub cache_path: PathBuf,
	/// Version of the node. Passing `None` skips the version check (meant for tests only).
	pub node_version: Option<String>,
	/// Tells whether the node attempts to run as a secure validator.
	pub secure_validator_mode: bool,

	/// Path to the program used for spawning prepare workers.
	pub prepare_worker_program_path: PathBuf,
	/// Time a prepare worker is given to spawn and report to the host.
	pub prepare_worker_spawn_timeout: Duration,
	/// Maximum number of workers that can be spawned in the prepare pool for tasks with a
	/// priority below critical.
	pub prepare_workers_soft_max_num: usize,
	/// Absolute number of workers that can be spawned in the prepare pool.
	pub prepare_workers_hard_max_num: usize,

	/// Path to the program used for spawning execute workers.
	pub execute_worker_program_path: PathBuf,
	/// Time an execute worker is given to spawn and report to the host.
	pub execute_worker_spawn_timeout: Duration,
	/// Maximum number of execute workers that can run at the same time.
	pub execute_workers_max_num: usize,
}

impl Config {
	/// Builds a configuration from the caller-supplied values.
	///
	/// Both worker spawn timeouts are not configurable here and are set to three seconds.
	pub fn new(
		cache_path: PathBuf,
		node_version: Option<String>,
		secure_validator_mode: bool,
		prepare_worker_program_path: PathBuf,
		execute_worker_program_path: PathBuf,
		execute_workers_max_num: usize,
		prepare_workers_soft_max_num: usize,
		prepare_workers_hard_max_num: usize,
	) -> Self {
		// Prepare and execute workers get the same amount of time to come up.
		let worker_spawn_timeout = Duration::from_secs(3);

		Config {
			cache_path,
			node_version,
			secure_validator_mode,

			prepare_worker_program_path,
			prepare_worker_spawn_timeout: worker_spawn_timeout,
			prepare_workers_soft_max_num,
			prepare_workers_hard_max_num,

			execute_worker_program_path,
			execute_worker_spawn_timeout: worker_spawn_timeout,
			execute_workers_max_num,
		}
	}
}
259
260/// Start the validation host.
261///
262/// Returns a [handle][`ValidationHost`] to the started validation host and the future. The future
263/// must be polled in order for validation host to function.
264///
265/// The future should not return normally but if it does then that indicates an unrecoverable error.
266/// In that case all pending requests will be canceled, dropping the result senders and new ones
267/// will be rejected.
268pub async fn start(
269	config: Config,
270	metrics: Metrics,
271) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
272	gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
273
274	// Make sure the cache is initialized before doing anything else.
275	let artifacts = Artifacts::new(&config.cache_path).await;
276
277	// Run checks for supported security features once per host startup. If some checks fail, warn
278	// if Secure Validator Mode is disabled and return an error otherwise.
279	#[cfg(target_os = "linux")]
280	let security_status = match crate::security::check_security_status(&config).await {
281		Ok(ok) => ok,
282		Err(err) => return Err(SubsystemError::Context(err)),
283	};
284	#[cfg(not(target_os = "linux"))]
285	let security_status = if config.secure_validator_mode {
286		gum::error!(
287			target: LOG_TARGET,
288			"{}{}{}",
289			crate::SECURE_MODE_ERROR,
290			crate::SECURE_LINUX_NOTE,
291			crate::IGNORE_SECURE_MODE_TIP
292		);
293		return Err(SubsystemError::Context(
294			"could not enable Secure Validator Mode for non-Linux; check logs".into(),
295		));
296	} else {
297		gum::warn!(
298			target: LOG_TARGET,
299			"{}{}",
300			crate::SECURE_MODE_WARNING,
301			crate::SECURE_LINUX_NOTE,
302		);
303		SecurityStatus::default()
304	};
305
306	let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);
307
308	let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };
309
310	let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
311		metrics.clone(),
312		config.prepare_worker_program_path.clone(),
313		config.cache_path.clone(),
314		config.prepare_worker_spawn_timeout,
315		config.node_version.clone(),
316		security_status.clone(),
317	);
318
319	let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
320		metrics.clone(),
321		config.prepare_workers_soft_max_num,
322		config.prepare_workers_hard_max_num,
323		config.cache_path.clone(),
324		to_prepare_pool,
325		from_prepare_pool,
326	);
327
328	let (to_execute_queue_tx, from_execute_queue_rx, run_execute_queue) = execute::start(
329		metrics,
330		config.execute_worker_program_path.to_owned(),
331		config.cache_path.clone(),
332		config.execute_workers_max_num,
333		config.execute_worker_spawn_timeout,
334		config.node_version,
335		security_status,
336	);
337
338	let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
339	let run_sweeper = sweeper_task(to_sweeper_rx);
340
341	let run_host = async move {
342		run(Inner {
343			cleanup_pulse_interval: Duration::from_secs(3600),
344			cleanup_config: ArtifactsCleanupConfig::default(),
345			artifacts,
346			to_host_rx,
347			to_prepare_queue_tx,
348			from_prepare_queue_rx,
349			to_execute_queue_tx,
350			from_execute_queue_rx,
351			to_sweeper_tx,
352			awaiting_prepare: AwaitingPrepare::default(),
353		})
354		.await
355	};
356
357	let task = async move {
358		// Bundle the sub-components' tasks together into a single future.
359		futures::select! {
360			_ = run_host.fuse() => {},
361			_ = run_prepare_queue.fuse() => {},
362			_ = run_prepare_pool.fuse() => {},
363			_ = run_execute_queue.fuse() => {},
364			_ = run_sweeper.fuse() => {},
365		};
366	};
367
368	Ok((validation_host, task))
369}
370
371/// A mapping from an artifact ID which is in preparation state to the list of pending execution
372/// requests that should be executed once the artifact's preparation is finished.
373#[derive(Default)]
374struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
375
376impl AwaitingPrepare {
377	fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
378		self.0.entry(artifact_id).or_default().push(pending_execution_request);
379	}
380
381	fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
382		self.0.remove(artifact_id).unwrap_or_default()
383	}
384}
385
386struct Inner {
387	cleanup_pulse_interval: Duration,
388	cleanup_config: ArtifactsCleanupConfig,
389	artifacts: Artifacts,
390
391	to_host_rx: mpsc::Receiver<ToHost>,
392
393	to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
394	from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,
395
396	to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
397	from_execute_queue_rx: mpsc::UnboundedReceiver<execute::FromQueue>,
398
399	to_sweeper_tx: mpsc::Sender<PathBuf>,
400
401	awaiting_prepare: AwaitingPrepare,
402}
403
/// Marker error for a failure the host cannot recover from; the event loop terminates when a
/// handler returns it.
#[derive(Debug)]
struct Fatal;
406
/// The event loop of the validation host.
///
/// Keeps selecting over four event sources until one of them forces a stop:
/// - requests from the execute queue to remove an artifact,
/// - the periodic cleanup pulse,
/// - requests from the [`ValidationHost`] handles,
/// - reports from the prepare queue about finished jobs.
///
/// The loop ends when all handles were dropped (graceful shutdown), when the execute or prepare
/// queue channel is closed, or when any handler returns [`Fatal`].
async fn run(
	Inner {
		cleanup_pulse_interval,
		cleanup_config,
		mut artifacts,
		to_host_rx,
		from_prepare_queue_rx,
		mut to_prepare_queue_tx,
		from_execute_queue_rx,
		mut to_execute_queue_tx,
		mut to_sweeper_tx,
		mut awaiting_prepare,
	}: Inner,
) {
	// Unwraps an `Ok` value; on `Err(Fatal)` logs the line of the invocation and leaves the
	// event loop below. Has to be a macro because `break` must apply to the enclosing `loop`.
	macro_rules! break_if_fatal {
		($expr:expr) => {
			match $expr {
				Err(Fatal) => {
					gum::error!(
						target: LOG_TARGET,
						"Fatal error occurred, terminating the host. Line: {}",
						line!(),
					);
					break
				},
				Ok(v) => v,
			}
		};
	}

	let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
	futures::pin_mut!(cleanup_pulse);

	let mut to_host_rx = to_host_rx.fuse();
	let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
	let mut from_execute_queue_rx = from_execute_queue_rx.fuse();

	loop {
		// biased to make it behave deterministically for tests.
		futures::select_biased! {
			// Note: the binding shadows the stream with the item it yielded for this arm.
			from_execute_queue_rx = from_execute_queue_rx.next() => {
				// A closed channel from the execute queue is treated as fatal.
				let from_queue = break_if_fatal!(from_execute_queue_rx.ok_or(Fatal));
				let execute::FromQueue::RemoveArtifact { artifact, reply_to } = from_queue;
				break_if_fatal!(handle_artifact_removal(
					&mut to_sweeper_tx,
					&mut artifacts,
					artifact,
					reply_to,
				).await);
			},
			() = cleanup_pulse.select_next_some() => {
				// `select_next_some` because we don't expect this to fail, but if it does, we
				// still don't fail. The trade-off is that the compiled cache will start growing
				// in size. That is, however, rather a slow process and hopefully the operator
				// will notice it.

				break_if_fatal!(handle_cleanup_pulse(
					&mut to_sweeper_tx,
					&mut artifacts,
					&cleanup_config,
				).await);
			},
			to_host = to_host_rx.next() => {
				let to_host = match to_host {
					None => {
						// The sending half of the channel has been closed, meaning the
						// `ValidationHost` struct was dropped. Shutting down gracefully.
						break;
					},
					Some(to_host) => to_host,
				};

				// If the artifact failed before, it could be re-scheduled for preparation here if
				// the preparation failure cooldown has elapsed.
				break_if_fatal!(handle_to_host(
					&mut artifacts,
					&mut to_prepare_queue_tx,
					&mut to_execute_queue_tx,
					&mut awaiting_prepare,
					to_host,
				)
				.await);
			},
			from_prepare_queue = from_prepare_queue_rx.next() => {
				// A closed channel from the prepare queue is treated as fatal.
				let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));

				// Note that the preparation outcome is always reported as concluded.
				//
				// That's because the error conditions are written into the artifact and will be
				// reported at the time of the execution. It potentially, but not necessarily, can
				// be scheduled for execution as a result of this function call, in case there are
				// pending executions.
				//
				// We could be eager in terms of reporting and plumb the result from the preparation
				// worker but we don't for the sake of simplicity.
				break_if_fatal!(handle_prepare_done(
					&mut artifacts,
					&mut to_execute_queue_tx,
					&mut awaiting_prepare,
					from_queue,
				).await);
			},
		}
	}
}
512
513async fn handle_to_host(
514	artifacts: &mut Artifacts,
515	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
516	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
517	awaiting_prepare: &mut AwaitingPrepare,
518	to_host: ToHost,
519) -> Result<(), Fatal> {
520	match to_host {
521		ToHost::PrecheckPvf { pvf, result_tx } => {
522			handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
523		},
524		ToHost::ExecutePvf(inputs) => {
525			handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
526				.await?;
527		},
528		ToHost::HeadsUp { active_pvfs } => {
529			handle_heads_up(artifacts, prepare_queue, active_pvfs).await?
530		},
531		ToHost::UpdateActiveLeaves { update, ancestors } => {
532			handle_update_active_leaves(execute_queue, update, ancestors).await?
533		},
534		#[cfg(feature = "test-utils")]
535		ToHost::ReplaceArtifactChecksum { checksum, new_checksum } => {
536			artifacts.replace_artifact_checksum(checksum, new_checksum);
537		},
538	}
539
540	Ok(())
541}
542
543/// Handles PVF prechecking requests.
544///
545/// This tries to prepare the PVF by compiling the WASM blob within a timeout set in
546/// `PvfPrepData`.
547///
548/// We don't retry artifacts that previously failed preparation. We don't expect multiple
549/// pre-checking requests.
550async fn handle_precheck_pvf(
551	artifacts: &mut Artifacts,
552	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
553	pvf: PvfPrepData,
554	result_sender: PrecheckResultSender,
555) -> Result<(), Fatal> {
556	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
557
558	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
559		match state {
560			ArtifactState::Prepared { last_time_needed, .. } => {
561				*last_time_needed = SystemTime::now();
562				let _ = result_sender.send(Ok(()));
563			},
564			ArtifactState::Preparing { waiting_for_response, num_failures: _ } => {
565				waiting_for_response.push(result_sender)
566			},
567			ArtifactState::FailedToProcess { error, .. } => {
568				// Do not retry an artifact that previously failed preparation.
569				let _ = result_sender.send(PrecheckResult::Err(error.clone()));
570			},
571		}
572	} else {
573		artifacts.insert_preparing(artifact_id, vec![result_sender]);
574		send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
575			.await?;
576	}
577	Ok(())
578}
579
580/// Handles PVF execution.
581///
582/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is
583/// already a preparation job, we coalesce the two preparation jobs.
584///
585/// If the prepare job succeeded previously, we will enqueue an execute job right away.
586///
587/// If the prepare job failed previously, we may retry it under certain conditions.
588///
589/// When preparing for execution, we use a more lenient timeout
590/// ([`DEFAULT_LENIENT_PREPARATION_TIMEOUT`](polkadot_primitives::executor_params::DEFAULT_LENIENT_PREPARATION_TIMEOUT))
591/// than when prechecking.
592async fn handle_execute_pvf(
593	artifacts: &mut Artifacts,
594	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
595	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
596	awaiting_prepare: &mut AwaitingPrepare,
597	inputs: ExecutePvfInputs,
598) -> Result<(), Fatal> {
599	let ExecutePvfInputs { pvf, validation_context, priority, exec_kind, result_tx } = inputs;
600	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
601	let exec_timeout = validation_context.exec_timeout;
602
603	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
604		match state {
605			ArtifactState::Prepared { ref path, checksum, last_time_needed, .. } => {
606				let file_metadata = std::fs::metadata(path);
607
608				if file_metadata.is_ok() {
609					*last_time_needed = SystemTime::now();
610
611					// This artifact has already been prepared, send it to the execute queue.
612					send_execute(
613						execute_queue,
614						execute::ToQueue::Enqueue {
615							artifact: ArtifactPathId::new(artifact_id, path, *checksum),
616							pending_execution_request: PendingExecutionRequest {
617								exec_timeout,
618								validation_context,
619								exec_kind,
620								result_tx,
621							},
622						},
623					)
624					.await?;
625				} else {
626					gum::warn!(
627						target: LOG_TARGET,
628						?pvf,
629						?artifact_id,
630						"handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
631					);
632
633					// The artifact has been prepared previously but the file is missing, prepare it
634					// again.
635					*state = ArtifactState::Preparing {
636						waiting_for_response: Vec::new(),
637						num_failures: 0,
638					};
639					enqueue_prepare_for_execute(
640						prepare_queue,
641						awaiting_prepare,
642						pvf,
643						priority,
644						artifact_id,
645						PendingExecutionRequest {
646							exec_timeout,
647							validation_context,
648							exec_kind,
649							result_tx,
650						},
651					)
652					.await?;
653				}
654			},
655			ArtifactState::Preparing { .. } => {
656				awaiting_prepare.add(
657					artifact_id,
658					PendingExecutionRequest {
659						exec_timeout,
660						validation_context,
661						result_tx,
662						exec_kind,
663					},
664				);
665			},
666			ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
667				if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
668					gum::warn!(
669						target: LOG_TARGET,
670						?pvf,
671						?artifact_id,
672						?last_time_failed,
673						%num_failures,
674						%error,
675						"handle_execute_pvf: Re-trying failed PVF preparation."
676					);
677
678					// If we are allowed to retry the failed prepare job, change the state to
679					// Preparing and re-queue this job.
680					*state = ArtifactState::Preparing {
681						waiting_for_response: Vec::new(),
682						num_failures: *num_failures,
683					};
684					enqueue_prepare_for_execute(
685						prepare_queue,
686						awaiting_prepare,
687						pvf,
688						priority,
689						artifact_id,
690						PendingExecutionRequest {
691							exec_timeout,
692							validation_context,
693							exec_kind,
694							result_tx,
695						},
696					)
697					.await?;
698				} else {
699					let _ = result_tx.send(Err(ValidationError::from(error.clone())));
700				}
701			},
702		}
703	} else {
704		// Artifact is unknown: register it and enqueue a job with the corresponding priority and
705		// PVF.
706		artifacts.insert_preparing(artifact_id.clone(), Vec::new());
707		enqueue_prepare_for_execute(
708			prepare_queue,
709			awaiting_prepare,
710			pvf,
711			priority,
712			artifact_id,
713			PendingExecutionRequest { exec_timeout, validation_context, result_tx, exec_kind },
714		)
715		.await?;
716	}
717
718	Ok(())
719}
720
721async fn handle_heads_up(
722	artifacts: &mut Artifacts,
723	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
724	active_pvfs: Vec<PvfPrepData>,
725) -> Result<(), Fatal> {
726	let now = SystemTime::now();
727
728	for active_pvf in active_pvfs {
729		let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
730		if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
731			match state {
732				ArtifactState::Prepared { last_time_needed, .. } => {
733					*last_time_needed = now;
734				},
735				ArtifactState::Preparing { .. } => {
736					// The artifact is already being prepared, so we don't need to do anything.
737				},
738				ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
739					if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
740						gum::warn!(
741							target: LOG_TARGET,
742							?active_pvf,
743							?artifact_id,
744							?last_time_failed,
745							%num_failures,
746							%error,
747							"handle_heads_up: Re-trying failed PVF preparation."
748						);
749
750						// If we are allowed to retry the failed prepare job, change the state to
751						// Preparing and re-queue this job.
752						*state = ArtifactState::Preparing {
753							waiting_for_response: vec![],
754							num_failures: *num_failures,
755						};
756						send_prepare(
757							prepare_queue,
758							prepare::ToQueue::Enqueue {
759								priority: Priority::Normal,
760								pvf: active_pvf,
761							},
762						)
763						.await?;
764					}
765				},
766			}
767		} else {
768			// It's not in the artifacts, so we need to enqueue a job to prepare it.
769			artifacts.insert_preparing(artifact_id.clone(), Vec::new());
770
771			send_prepare(
772				prepare_queue,
773				prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
774			)
775			.await?;
776		}
777	}
778
779	Ok(())
780}
781
/// Processes a report from the prepare queue about a finished preparation job.
///
/// In order, this:
/// 1. checks that the artifact is known and in the `Preparing` state (anything else trips a
///    `never!` assertion and the report is ignored),
/// 2. forwards the outcome, with the success payload dropped, to all senders waiting for a
///    pre-check response,
/// 3. takes the execution requests that were waiting for this artifact and either enqueues them
///    for execution (preparation succeeded) or answers them with the preparation error,
/// 4. stores the new state of the artifact: `Prepared`, or `FailedToProcess` with the failure
///    counter incremented.
///
/// Returns `Err(Fatal)` only if sending to the execute queue fails.
async fn handle_prepare_done(
	artifacts: &mut Artifacts,
	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
	awaiting_prepare: &mut AwaitingPrepare,
	from_queue: prepare::FromQueue,
) -> Result<(), Fatal> {
	let prepare::FromQueue { artifact_id, result } = from_queue;

	// Make some sanity checks and extract the current state.
	let state = match artifacts.artifact_state_mut(&artifact_id) {
		None => {
			// before sending request to prepare, the artifact is inserted with `preparing` state;
			// the requests are deduplicated for the same artifact id;
			// there is only one possible state change: prepare is done;
			// thus the artifact cannot be unknown, only preparing;
			// qed.
			never!("an unknown artifact was prepared: {:?}", artifact_id);
			return Ok(());
		},
		Some(ArtifactState::Prepared { .. }) => {
			// before sending request to prepare, the artifact is inserted with `preparing` state;
			// the requests are deduplicated for the same artifact id;
			// there is only one possible state change: prepare is done;
			// thus the artifact cannot be prepared, only preparing;
			// qed.
			never!("the artifact is already prepared: {:?}", artifact_id);
			return Ok(());
		},
		Some(ArtifactState::FailedToProcess { .. }) => {
			// The reasoning is similar to the above, the artifact cannot be
			// processed at this point.
			never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
			return Ok(());
		},
		Some(state @ ArtifactState::Preparing { .. }) => state,
	};

	// Answer everybody waiting for a pre-check response; `num_failures` stays borrowed from the
	// state because it is needed again when the new state is computed at the end.
	let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
		state
	{
		for result_sender in waiting_for_response.drain(..) {
			// Pre-check requesters only learn success or the error, not the artifact details.
			let result = result.clone().map(|_| ());
			let _ = result_sender.send(result);
		}
		num_failures
	} else {
		never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
		return Ok(());
	};

	// It's finally time to dispatch all the execution requests that were waiting for this artifact
	// to be prepared.
	let pending_requests = awaiting_prepare.take(&artifact_id);
	for PendingExecutionRequest { exec_timeout, validation_context, result_tx, exec_kind } in
		pending_requests
	{
		if result_tx.is_canceled() {
			// Preparation could've taken quite a bit of time and the requester may be not
			// interested in execution anymore, in which case we just skip the request.
			continue;
		}

		// On a failed preparation the request is answered with the error instead of being
		// executed.
		let (path, checksum) = match &result {
			Ok(success) => (success.path.clone(), success.checksum),
			Err(error) => {
				let _ = result_tx.send(Err(ValidationError::from(error.clone())));
				continue;
			},
		};

		send_execute(
			execute_queue,
			execute::ToQueue::Enqueue {
				artifact: ArtifactPathId::new(artifact_id.clone(), &path, checksum),
				pending_execution_request: PendingExecutionRequest {
					exec_timeout,
					validation_context,
					result_tx,
					exec_kind,
				},
			},
		)
		.await?;
	}

	// Finally record the outcome as the new state of the artifact.
	*state = match result {
		Ok(PrepareSuccess { checksum, path, size, .. }) => {
			ArtifactState::Prepared { checksum, path, last_time_needed: SystemTime::now(), size }
		},
		Err(error) => {
			let last_time_failed = SystemTime::now();
			let num_failures = *num_failures + 1;

			gum::error!(
				target: LOG_TARGET,
				?artifact_id,
				time_failed = ?last_time_failed,
				%num_failures,
				"artifact preparation failed: {}",
				error
			);
			ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
		},
	};

	Ok(())
}
889
890async fn handle_update_active_leaves(
891	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
892	update: ActiveLeavesUpdate,
893	ancestors: Vec<Hash>,
894) -> Result<(), Fatal> {
895	send_execute(execute_queue, execute::ToQueue::UpdateActiveLeaves { update, ancestors }).await
896}
897
898async fn send_prepare(
899	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
900	to_queue: prepare::ToQueue,
901) -> Result<(), Fatal> {
902	prepare_queue.send(to_queue).await.map_err(|_| Fatal)
903}
904
905async fn send_execute(
906	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
907	to_queue: execute::ToQueue,
908) -> Result<(), Fatal> {
909	execute_queue.send(to_queue).await.map_err(|_| Fatal)
910}
911
912/// Sends a job to the preparation queue, and adds an execution request that will wait to run after
913/// this prepare job has finished.
914async fn enqueue_prepare_for_execute(
915	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
916	awaiting_prepare: &mut AwaitingPrepare,
917	pvf: PvfPrepData,
918	priority: Priority,
919	artifact_id: ArtifactId,
920	pending_execution_request: PendingExecutionRequest,
921) -> Result<(), Fatal> {
922	send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
923
924	// Add an execution request that will wait to run after this prepare job has finished.
925	awaiting_prepare.add(artifact_id, pending_execution_request);
926
927	Ok(())
928}
929
930async fn handle_cleanup_pulse(
931	sweeper_tx: &mut mpsc::Sender<PathBuf>,
932	artifacts: &mut Artifacts,
933	cleanup_config: &ArtifactsCleanupConfig,
934) -> Result<(), Fatal> {
935	let to_remove = artifacts.prune(cleanup_config);
936	gum::debug!(
937		target: LOG_TARGET,
938		"PVF pruning: {} artifacts reached their end of life",
939		to_remove.len(),
940	);
941	for (artifact_id, path) in to_remove {
942		gum::debug!(
943			target: LOG_TARGET,
944			validation_code_hash = ?artifact_id.code_hash,
945			"pruning artifact",
946		);
947		sweeper_tx.send(path).await.map_err(|_| Fatal)?;
948	}
949
950	Ok(())
951}
952
953async fn handle_artifact_removal(
954	sweeper_tx: &mut mpsc::Sender<PathBuf>,
955	artifacts: &mut Artifacts,
956	artifact_id: ArtifactId,
957	reply_to: oneshot::Sender<()>,
958) -> Result<(), Fatal> {
959	let (artifact_id, path) = if let Some(artifact) = artifacts.remove(artifact_id) {
960		artifact
961	} else {
962		// if we haven't found the artifact by its id,
963		// it has been probably removed
964		// anyway with the randomness of the artifact name
965		// it is safe to ignore
966		return Ok(());
967	};
968	reply_to
969		.send(())
970		.expect("the execute queue waits for the artifact remove confirmation; qed");
971	// Thanks to the randomness of the artifact name (see
972	// `artifacts::generate_artifact_path`) there is no issue with any name conflict on
973	// future repreparation.
974	// So we can confirm the artifact removal already
975	gum::debug!(
976		target: LOG_TARGET,
977		validation_code_hash = ?artifact_id.code_hash,
978		"PVF pruning: pruning artifact by request from the execute queue",
979	);
980	sweeper_tx.send(path).await.map_err(|_| Fatal)?;
981	Ok(())
982}
983
984/// A simple task which sole purpose is to delete files thrown at it.
985async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
986	loop {
987		match sweeper_rx.next().await {
988			None => break,
989			Some(condemned) => {
990				let result = tokio::fs::remove_file(&condemned).await;
991				gum::trace!(
992					target: LOG_TARGET,
993					?result,
994					"Swept the artifact file {}",
995					condemned.display(),
996				);
997			},
998		}
999	}
1000}
1001
1002/// Check if the conditions to retry a prepare job have been met.
1003fn can_retry_prepare_after_failure(
1004	last_time_failed: SystemTime,
1005	num_failures: u32,
1006	error: &PrepareError,
1007) -> bool {
1008	if error.is_deterministic() {
1009		// This error is considered deterministic, so it will probably be reproducible. Don't retry.
1010		return false;
1011	}
1012
1013	// Retry if the retry cooldown has elapsed and if we have already retried less than
1014	// `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
1015	SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
1016		num_failures <= NUM_PREPARE_RETRIES
1017}
1018
1019/// A stream that yields a pulse continuously at a given interval.
1020fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
1021	futures::stream::unfold(interval, {
1022		|interval| async move {
1023			futures_timer::Delay::new(interval).await;
1024			Some(((), interval))
1025		}
1026	})
1027	.map(|_| ())
1028}
1029
1030#[cfg(test)]
1031pub(crate) mod tests {
1032	use super::*;
1033	use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
1034	use assert_matches::assert_matches;
1035	use futures::future::BoxFuture;
1036	use polkadot_node_core_pvf_common::execute::ValidationContext;
1037	use polkadot_node_primitives::{BlockData, PoV};
1038	use polkadot_primitives::{
1039		CandidateReceiptV2 as CandidateReceipt, ExecutorParams, PersistedValidationData,
1040	};
1041	use polkadot_primitives_test_helpers::dummy_candidate_receipt;
1042	use sp_core::H256;
1043	use std::sync::Arc;
1044
	/// Execution timeout put into the validation contexts built by `test_validation_context`.
	const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
	/// Preparation timeout for tests.
	// NOTE(review): not referenced in this module; presumably used by other test modules of
	// the crate, hence `pub(crate)` — confirm.
	pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
1047
1048	#[tokio::test]
1049	async fn pulse_test() {
1050		let pulse = pulse_every(Duration::from_millis(100));
1051		futures::pin_mut!(pulse);
1052
1053		for _ in 0..5 {
1054			let start = std::time::Instant::now();
1055			let _ = pulse.next().await.unwrap();
1056
1057			let el = start.elapsed().as_millis();
1058			assert!(el > 50 && el < 150, "pulse duration: {}", el);
1059		}
1060	}
1061
1062	struct Builder {
1063		cleanup_pulse_interval: Duration,
1064		cleanup_config: ArtifactsCleanupConfig,
1065		artifacts: Artifacts,
1066	}
1067
1068	impl Builder {
1069		fn default() -> Self {
1070			Self {
1071				// these are selected high to not interfere in tests in which pruning is irrelevant.
1072				cleanup_pulse_interval: Duration::from_secs(3600),
1073				cleanup_config: ArtifactsCleanupConfig::default(),
1074				artifacts: Artifacts::empty(),
1075			}
1076		}
1077
1078		fn build(self) -> Test {
1079			Test::new(self)
1080		}
1081	}
1082
1083	struct Test {
1084		to_host_tx: Option<mpsc::Sender<ToHost>>,
1085
1086		to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
1087		from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
1088		to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
1089		#[allow(unused)]
1090		from_execute_queue_tx: mpsc::UnboundedSender<execute::FromQueue>,
1091		to_sweeper_rx: mpsc::Receiver<PathBuf>,
1092
1093		run: BoxFuture<'static, ()>,
1094	}
1095
1096	impl Test {
1097		fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
1098			let (to_host_tx, to_host_rx) = mpsc::channel(10);
1099			let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
1100			let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
1101			let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
1102			let (from_execute_queue_tx, from_execute_queue_rx) = mpsc::unbounded();
1103			let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
1104
1105			let run = run(Inner {
1106				cleanup_pulse_interval,
1107				cleanup_config,
1108				artifacts,
1109				to_host_rx,
1110				to_prepare_queue_tx,
1111				from_prepare_queue_rx,
1112				to_execute_queue_tx,
1113				from_execute_queue_rx,
1114				to_sweeper_tx,
1115				awaiting_prepare: AwaitingPrepare::default(),
1116			})
1117			.boxed();
1118
1119			Self {
1120				to_host_tx: Some(to_host_tx),
1121				to_prepare_queue_rx,
1122				from_prepare_queue_tx,
1123				to_execute_queue_rx,
1124				from_execute_queue_tx,
1125				to_sweeper_rx,
1126				run,
1127			}
1128		}
1129
1130		fn host_handle(&mut self) -> ValidationHost {
1131			let to_host_tx = self.to_host_tx.take().unwrap();
1132			let security_status = Default::default();
1133			ValidationHost { to_host_tx, security_status }
1134		}
1135
1136		async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
1137		where
1138			T: Send,
1139		{
1140			run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
1141		}
1142
1143		async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
1144			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1145			run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
1146				.await
1147		}
1148
1149		async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
1150			let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1151			run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
1152				.await
1153		}
1154
1155		async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
1156			use futures_timer::Delay;
1157
1158			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1159			run_until(
1160				&mut self.run,
1161				async {
1162					futures::select! {
1163						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1164						_ = to_prepare_queue_rx.next().fuse() => {
1165							panic!("the prepare queue is supposed to be empty")
1166						}
1167					}
1168				}
1169				.boxed(),
1170			)
1171			.await
1172		}
1173
1174		async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
1175			use futures_timer::Delay;
1176
1177			let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1178			run_until(
1179				&mut self.run,
1180				async {
1181					futures::select! {
1182						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1183						_ = to_execute_queue_rx.next().fuse() => {
1184							panic!("the execute queue is supposed to be empty")
1185						}
1186					}
1187				}
1188				.boxed(),
1189			)
1190			.await
1191		}
1192
1193		async fn poll_ensure_to_sweeper_is_empty(&mut self) {
1194			use futures_timer::Delay;
1195
1196			let to_sweeper_rx = &mut self.to_sweeper_rx;
1197			run_until(
1198				&mut self.run,
1199				async {
1200					futures::select! {
1201						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1202						msg = to_sweeper_rx.next().fuse() => {
1203							panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
1204						}
1205					}
1206				}
1207				.boxed(),
1208			)
1209			.await
1210		}
1211	}
1212
1213	async fn run_until<R>(
1214		task: &mut (impl Future<Output = ()> + Unpin),
1215		mut fut: impl Future<Output = R> + Unpin,
1216	) -> R {
1217		use std::task::Poll;
1218
1219		let start = std::time::Instant::now();
1220		let fut = &mut fut;
1221		loop {
1222			if start.elapsed() > std::time::Duration::from_secs(2) {
1223				// We expect that this will take only a couple of iterations and thus to take way
1224				// less than a second.
1225				panic!("timeout");
1226			}
1227
1228			if let Poll::Ready(r) = futures::poll!(&mut *fut) {
1229				break r;
1230			}
1231
1232			if futures::poll!(&mut *task).is_ready() {
1233				panic!()
1234			}
1235		}
1236	}
1237
1238	/// Helper to create a standard validation context for tests.
1239	fn test_validation_context(
1240		pvd: Arc<PersistedValidationData>,
1241		pov: Arc<PoV>,
1242	) -> ValidationContext {
1243		let candidate_receipt: CandidateReceipt = dummy_candidate_receipt(H256::default()).into();
1244		ValidationContext {
1245			candidate_receipt,
1246			pvd,
1247			pov,
1248			executor_params: ExecutorParams::default(),
1249			exec_timeout: TEST_EXECUTION_TIMEOUT,
1250			v3_seen: false,
1251		}
1252	}
1253
1254	#[tokio::test]
1255	async fn shutdown_on_handle_drop() {
1256		let test = Builder::default().build();
1257
1258		let join_handle = tokio::task::spawn(test.run);
1259
1260		// Dropping the handle will lead to conclusion of the read part and thus will make the event
1261		// loop to stop, which in turn will resolve the join handle.
1262		drop(test.to_host_tx);
1263		join_handle.await.unwrap();
1264	}
1265
1266	#[tokio::test]
1267	async fn pruning() {
1268		let mock_now = SystemTime::now() - Duration::from_millis(1000);
1269		let tempdir = tempfile::tempdir().unwrap();
1270		let cache_path = tempdir.path();
1271
1272		let mut builder = Builder::default();
1273		builder.cleanup_pulse_interval = Duration::from_millis(100);
1274		builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
1275		let path1 = generate_artifact_path(cache_path);
1276		let path2 = generate_artifact_path(cache_path);
1277		builder.artifacts.insert_prepared(
1278			artifact_id(1),
1279			path1.clone(),
1280			Default::default(),
1281			mock_now,
1282			1024,
1283		);
1284		builder.artifacts.insert_prepared(
1285			artifact_id(2),
1286			path2.clone(),
1287			Default::default(),
1288			mock_now,
1289			1024,
1290		);
1291		let mut test = builder.build();
1292		let mut host = test.host_handle();
1293
1294		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1295
1296		let to_sweeper_rx = &mut test.to_sweeper_rx;
1297		run_until(
1298			&mut test.run,
1299			async {
1300				assert_eq!(to_sweeper_rx.next().await.unwrap(), path2);
1301			}
1302			.boxed(),
1303		)
1304		.await;
1305
1306		// Extend TTL for the first artifact and make sure we don't receive another file removal
1307		// request.
1308		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1309		test.poll_ensure_to_sweeper_is_empty().await;
1310	}
1311
1312	#[tokio::test]
1313	async fn execute_pvf_requests() {
1314		let mut test = Builder::default().build();
1315		let mut host = test.host_handle();
1316		let pvd = Arc::new(PersistedValidationData {
1317			parent_head: Default::default(),
1318			relay_parent_number: 1u32,
1319			relay_parent_storage_root: H256::default(),
1320			max_pov_size: 4096 * 1024,
1321		});
1322		let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
1323		let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
1324
1325		// Execute the same PVF with the same validation context but different priorities.
1326		// This tests that priority handling works correctly for identical requests.
1327		let validation_context_1 = test_validation_context(pvd.clone(), pov1.clone());
1328
1329		let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
1330		host.execute_pvf(
1331			PvfPrepData::from_discriminator(1),
1332			validation_context_1.clone(),
1333			Priority::Normal,
1334			PvfExecKind::Backing(H256::default()),
1335			result_tx,
1336		)
1337		.await
1338		.unwrap();
1339
1340		let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
1341		host.execute_pvf(
1342			PvfPrepData::from_discriminator(1),
1343			validation_context_1,
1344			Priority::Critical,
1345			PvfExecKind::Backing(H256::default()),
1346			result_tx,
1347		)
1348		.await
1349		.unwrap();
1350
1351		let (result_tx, result_rx_pvf_2) = oneshot::channel();
1352		let validation_context_2 = test_validation_context(pvd, pov2);
1353		host.execute_pvf(
1354			PvfPrepData::from_discriminator(2),
1355			validation_context_2,
1356			Priority::Normal,
1357			PvfExecKind::Backing(H256::default()),
1358			result_tx,
1359		)
1360		.await
1361		.unwrap();
1362
1363		assert_matches!(
1364			test.poll_and_recv_to_prepare_queue().await,
1365			prepare::ToQueue::Enqueue { .. }
1366		);
1367		assert_matches!(
1368			test.poll_and_recv_to_prepare_queue().await,
1369			prepare::ToQueue::Enqueue { .. }
1370		);
1371
1372		test.from_prepare_queue_tx
1373			.send(prepare::FromQueue {
1374				artifact_id: artifact_id(1),
1375				result: Ok(PrepareSuccess::default()),
1376			})
1377			.await
1378			.unwrap();
1379		let result_tx_pvf_1_1 = assert_matches!(
1380			test.poll_and_recv_to_execute_queue().await,
1381			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1382		);
1383		let result_tx_pvf_1_2 = assert_matches!(
1384			test.poll_and_recv_to_execute_queue().await,
1385			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1386		);
1387
1388		test.from_prepare_queue_tx
1389			.send(prepare::FromQueue {
1390				artifact_id: artifact_id(2),
1391				result: Ok(PrepareSuccess::default()),
1392			})
1393			.await
1394			.unwrap();
1395		let result_tx_pvf_2 = assert_matches!(
1396			test.poll_and_recv_to_execute_queue().await,
1397			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1398		);
1399
1400		result_tx_pvf_1_1
1401			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1402			.unwrap();
1403		assert_matches!(
1404			result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
1405			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1406		);
1407
1408		result_tx_pvf_1_2
1409			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1410			.unwrap();
1411		assert_matches!(
1412			result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
1413			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1414		);
1415
1416		result_tx_pvf_2
1417			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1418			.unwrap();
1419		assert_matches!(
1420			result_rx_pvf_2.now_or_never().unwrap().unwrap(),
1421			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1422		);
1423	}
1424
1425	#[tokio::test]
1426	async fn precheck_pvf() {
1427		let mut test = Builder::default().build();
1428		let mut host = test.host_handle();
1429
1430		// First, test a simple precheck request.
1431		let (result_tx, result_rx) = oneshot::channel();
1432		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1433			.await
1434			.unwrap();
1435
1436		// The queue received the prepare request.
1437		assert_matches!(
1438			test.poll_and_recv_to_prepare_queue().await,
1439			prepare::ToQueue::Enqueue { .. }
1440		);
1441		// Send `Ok` right away and poll the host.
1442		test.from_prepare_queue_tx
1443			.send(prepare::FromQueue {
1444				artifact_id: artifact_id(1),
1445				result: Ok(PrepareSuccess::default()),
1446			})
1447			.await
1448			.unwrap();
1449		// No pending execute requests.
1450		test.poll_ensure_to_execute_queue_is_empty().await;
1451		// Received the precheck result.
1452		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1453
1454		// Send multiple requests for the same PVF.
1455		let mut precheck_receivers = Vec::new();
1456		for _ in 0..3 {
1457			let (result_tx, result_rx) = oneshot::channel();
1458			host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1459				.await
1460				.unwrap();
1461			precheck_receivers.push(result_rx);
1462		}
1463		// Received prepare request.
1464		assert_matches!(
1465			test.poll_and_recv_to_prepare_queue().await,
1466			prepare::ToQueue::Enqueue { .. }
1467		);
1468		test.from_prepare_queue_tx
1469			.send(prepare::FromQueue {
1470				artifact_id: artifact_id(2),
1471				result: Err(PrepareError::TimedOut),
1472			})
1473			.await
1474			.unwrap();
1475		test.poll_ensure_to_execute_queue_is_empty().await;
1476		for result_rx in precheck_receivers {
1477			assert_matches!(
1478				result_rx.now_or_never().unwrap().unwrap(),
1479				Err(PrepareError::TimedOut)
1480			);
1481		}
1482	}
1483
1484	#[tokio::test]
1485	async fn test_prepare_done() {
1486		let mut test = Builder::default().build();
1487		let mut host = test.host_handle();
1488		let pvd = Arc::new(PersistedValidationData {
1489			parent_head: Default::default(),
1490			relay_parent_number: 1u32,
1491			relay_parent_storage_root: H256::default(),
1492			max_pov_size: 4096 * 1024,
1493		});
1494		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1495
1496		// Test mixed cases of receiving execute and precheck requests
1497		// for the same PVF.
1498
1499		// Send PVF for the execution and request the prechecking for it.
1500		let (result_tx, result_rx_execute) = oneshot::channel();
1501		let validation_context = test_validation_context(pvd.clone(), pov.clone());
1502		host.execute_pvf(
1503			PvfPrepData::from_discriminator(1),
1504			validation_context,
1505			Priority::Critical,
1506			PvfExecKind::Backing(H256::default()),
1507			result_tx,
1508		)
1509		.await
1510		.unwrap();
1511
1512		assert_matches!(
1513			test.poll_and_recv_to_prepare_queue().await,
1514			prepare::ToQueue::Enqueue { .. }
1515		);
1516
1517		let (result_tx, result_rx) = oneshot::channel();
1518		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1519			.await
1520			.unwrap();
1521
1522		// Suppose the preparation failed, the execution queue is empty and both
1523		// "clients" receive their results.
1524		test.from_prepare_queue_tx
1525			.send(prepare::FromQueue {
1526				artifact_id: artifact_id(1),
1527				result: Err(PrepareError::TimedOut),
1528			})
1529			.await
1530			.unwrap();
1531		test.poll_ensure_to_execute_queue_is_empty().await;
1532		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
1533		assert_matches!(
1534			result_rx_execute.now_or_never().unwrap().unwrap(),
1535			Err(ValidationError::Internal(_))
1536		);
1537
1538		// Reversed case: first send multiple precheck requests, then ask for an execution.
1539		let mut precheck_receivers = Vec::new();
1540		for _ in 0..3 {
1541			let (result_tx, result_rx) = oneshot::channel();
1542			host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1543				.await
1544				.unwrap();
1545			precheck_receivers.push(result_rx);
1546		}
1547
1548		let (result_tx, _result_rx_execute) = oneshot::channel();
1549		let validation_context = test_validation_context(pvd, pov);
1550		host.execute_pvf(
1551			PvfPrepData::from_discriminator(2),
1552			validation_context,
1553			Priority::Critical,
1554			PvfExecKind::Backing(H256::default()),
1555			result_tx,
1556		)
1557		.await
1558		.unwrap();
1559		// Received prepare request.
1560		assert_matches!(
1561			test.poll_and_recv_to_prepare_queue().await,
1562			prepare::ToQueue::Enqueue { .. }
1563		);
1564		test.from_prepare_queue_tx
1565			.send(prepare::FromQueue {
1566				artifact_id: artifact_id(2),
1567				result: Ok(PrepareSuccess::default()),
1568			})
1569			.await
1570			.unwrap();
1571		// The execute queue receives new request, preckecking is finished and we can
1572		// fetch results.
1573		assert_matches!(
1574			test.poll_and_recv_to_execute_queue().await,
1575			execute::ToQueue::Enqueue { .. }
1576		);
1577		for result_rx in precheck_receivers {
1578			assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1579		}
1580	}
1581
1582	// Test that multiple prechecking requests do not trigger preparation retries if the first one
1583	// failed.
1584	#[tokio::test]
1585	async fn test_precheck_prepare_no_retry() {
1586		let mut test = Builder::default().build();
1587		let mut host = test.host_handle();
1588
1589		// Submit a precheck request that fails.
1590		let (result_tx, result_rx) = oneshot::channel();
1591		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1592			.await
1593			.unwrap();
1594
1595		// The queue received the prepare request.
1596		assert_matches!(
1597			test.poll_and_recv_to_prepare_queue().await,
1598			prepare::ToQueue::Enqueue { .. }
1599		);
1600		// Send a PrepareError.
1601		test.from_prepare_queue_tx
1602			.send(prepare::FromQueue {
1603				artifact_id: artifact_id(1),
1604				result: Err(PrepareError::TimedOut),
1605			})
1606			.await
1607			.unwrap();
1608
1609		// The result should contain the error.
1610		let result = test.poll_and_recv_result(result_rx).await;
1611		assert_matches!(result, Err(PrepareError::TimedOut));
1612
1613		// Submit another precheck request.
1614		let (result_tx_2, result_rx_2) = oneshot::channel();
1615		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_2)
1616			.await
1617			.unwrap();
1618
1619		// Assert the prepare queue is empty.
1620		test.poll_ensure_to_prepare_queue_is_empty().await;
1621
1622		// The result should contain the original error.
1623		let result = test.poll_and_recv_result(result_rx_2).await;
1624		assert_matches!(result, Err(PrepareError::TimedOut));
1625
1626		// Pause for enough time to reset the cooldown for this failed prepare request.
1627		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1628
1629		// Submit another precheck request.
1630		let (result_tx_3, result_rx_3) = oneshot::channel();
1631		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_3)
1632			.await
1633			.unwrap();
1634
1635		// Assert the prepare queue is empty - we do not retry for precheck requests.
1636		test.poll_ensure_to_prepare_queue_is_empty().await;
1637
1638		// The result should still contain the original error.
1639		let result = test.poll_and_recv_result(result_rx_3).await;
1640		assert_matches!(result, Err(PrepareError::TimedOut));
1641	}
1642
1643	// Test that multiple execution requests trigger preparation retries if the first one failed due
1644	// to a potentially non-reproducible error.
1645	#[tokio::test]
1646	async fn test_execute_prepare_retry() {
1647		let mut test = Builder::default().build();
1648		let mut host = test.host_handle();
1649		let pvd = Arc::new(PersistedValidationData {
1650			parent_head: Default::default(),
1651			relay_parent_number: 1u32,
1652			relay_parent_storage_root: H256::default(),
1653			max_pov_size: 4096 * 1024,
1654		});
1655		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1656
1657		// Submit a execute request that fails.
1658		let (result_tx, result_rx) = oneshot::channel();
1659		let validation_context = test_validation_context(pvd.clone(), pov.clone());
1660		host.execute_pvf(
1661			PvfPrepData::from_discriminator(1),
1662			validation_context.clone(),
1663			Priority::Critical,
1664			PvfExecKind::Backing(H256::default()),
1665			result_tx,
1666		)
1667		.await
1668		.unwrap();
1669
1670		// The queue received the prepare request.
1671		assert_matches!(
1672			test.poll_and_recv_to_prepare_queue().await,
1673			prepare::ToQueue::Enqueue { .. }
1674		);
1675		// Send a PrepareError.
1676		test.from_prepare_queue_tx
1677			.send(prepare::FromQueue {
1678				artifact_id: artifact_id(1),
1679				result: Err(PrepareError::TimedOut),
1680			})
1681			.await
1682			.unwrap();
1683
1684		// The result should contain the error.
1685		let result = test.poll_and_recv_result(result_rx).await;
1686		assert_matches!(result, Err(ValidationError::Internal(_)));
1687
1688		// Submit another execute request. We shouldn't try to prepare again, yet.
1689		let (result_tx_2, result_rx_2) = oneshot::channel();
1690		host.execute_pvf(
1691			PvfPrepData::from_discriminator(1),
1692			validation_context.clone(),
1693			Priority::Critical,
1694			PvfExecKind::Backing(H256::default()),
1695			result_tx_2,
1696		)
1697		.await
1698		.unwrap();
1699
1700		// Assert the prepare queue is empty.
1701		test.poll_ensure_to_prepare_queue_is_empty().await;
1702
1703		// The result should contain the original error.
1704		let result = test.poll_and_recv_result(result_rx_2).await;
1705		assert_matches!(result, Err(ValidationError::Internal(_)));
1706
1707		// Pause for enough time to reset the cooldown for this failed prepare request.
1708		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1709
1710		// Submit another execute request.
1711		let (result_tx_3, result_rx_3) = oneshot::channel();
1712		host.execute_pvf(
1713			PvfPrepData::from_discriminator(1),
1714			validation_context,
1715			Priority::Critical,
1716			PvfExecKind::Backing(H256::default()),
1717			result_tx_3,
1718		)
1719		.await
1720		.unwrap();
1721
1722		// Assert the prepare queue contains the request.
1723		assert_matches!(
1724			test.poll_and_recv_to_prepare_queue().await,
1725			prepare::ToQueue::Enqueue { .. }
1726		);
1727
1728		test.from_prepare_queue_tx
1729			.send(prepare::FromQueue {
1730				artifact_id: artifact_id(1),
1731				result: Ok(PrepareSuccess::default()),
1732			})
1733			.await
1734			.unwrap();
1735
1736		// Preparation should have been retried and succeeded this time.
1737		let result_tx_3 = assert_matches!(
1738			test.poll_and_recv_to_execute_queue().await,
1739			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1740		);
1741
1742		// Send an error for the execution here, just so we can check the result receiver is still
1743		// alive.
1744		result_tx_3
1745			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1746			.unwrap();
1747		assert_matches!(
1748			result_rx_3.now_or_never().unwrap().unwrap(),
1749			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1750		);
1751	}
1752
1753	// Test that multiple execution requests don't trigger preparation retries if the first one
1754	// failed due to a reproducible error (e.g. Prevalidation).
1755	#[tokio::test]
1756	async fn test_execute_prepare_no_retry() {
1757		let mut test = Builder::default().build();
1758		let mut host = test.host_handle();
1759		let pvd = Arc::new(PersistedValidationData {
1760			parent_head: Default::default(),
1761			relay_parent_number: 1u32,
1762			relay_parent_storage_root: H256::default(),
1763			max_pov_size: 4096 * 1024,
1764		});
1765		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1766
1767		// Submit an execute request that fails.
1768		let (result_tx, result_rx) = oneshot::channel();
1769		let validation_context = test_validation_context(pvd.clone(), pov.clone());
1770		host.execute_pvf(
1771			PvfPrepData::from_discriminator(1),
1772			validation_context.clone(),
1773			Priority::Critical,
1774			PvfExecKind::Backing(H256::default()),
1775			result_tx,
1776		)
1777		.await
1778		.unwrap();
1779
1780		// The queue received the prepare request.
1781		assert_matches!(
1782			test.poll_and_recv_to_prepare_queue().await,
1783			prepare::ToQueue::Enqueue { .. }
1784		);
1785		// Send a PrepareError.
1786		test.from_prepare_queue_tx
1787			.send(prepare::FromQueue {
1788				artifact_id: artifact_id(1),
1789				result: Err(PrepareError::Prevalidation("reproducible error".into())),
1790			})
1791			.await
1792			.unwrap();
1793
1794		// The result should contain the error.
1795		let result = test.poll_and_recv_result(result_rx).await;
1796		assert_matches!(result, Err(ValidationError::Preparation(_)));
1797
1798		// Submit another execute request.
1799		let (result_tx_2, result_rx_2) = oneshot::channel();
1800		host.execute_pvf(
1801			PvfPrepData::from_discriminator(1),
1802			validation_context.clone(),
1803			Priority::Critical,
1804			PvfExecKind::Backing(H256::default()),
1805			result_tx_2,
1806		)
1807		.await
1808		.unwrap();
1809
1810		// Assert the prepare queue is empty.
1811		test.poll_ensure_to_prepare_queue_is_empty().await;
1812
1813		// The result should contain the original error.
1814		let result = test.poll_and_recv_result(result_rx_2).await;
1815		assert_matches!(result, Err(ValidationError::Preparation(_)));
1816
1817		// Pause for enough time to reset the cooldown for this failed prepare request.
1818		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1819
1820		// Submit another execute request.
1821		let (result_tx_3, result_rx_3) = oneshot::channel();
1822		host.execute_pvf(
1823			PvfPrepData::from_discriminator(1),
1824			validation_context,
1825			Priority::Critical,
1826			PvfExecKind::Backing(H256::default()),
1827			result_tx_3,
1828		)
1829		.await
1830		.unwrap();
1831
1832		// Assert the prepare queue is empty - we do not retry for prevalidation errors.
1833		test.poll_ensure_to_prepare_queue_is_empty().await;
1834
1835		// The result should still contain the original error.
1836		let result = test.poll_and_recv_result(result_rx_3).await;
1837		assert_matches!(result, Err(ValidationError::Preparation(_)));
1838	}
1839
1840	// Test that multiple heads-up requests trigger preparation retries if the first one failed.
1841	#[tokio::test]
1842	async fn test_heads_up_prepare_retry() {
1843		let mut test = Builder::default().build();
1844		let mut host = test.host_handle();
1845
1846		// Submit a heads-up request that fails.
1847		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1848
1849		// The queue received the prepare request.
1850		assert_matches!(
1851			test.poll_and_recv_to_prepare_queue().await,
1852			prepare::ToQueue::Enqueue { .. }
1853		);
1854		// Send a PrepareError.
1855		test.from_prepare_queue_tx
1856			.send(prepare::FromQueue {
1857				artifact_id: artifact_id(1),
1858				result: Err(PrepareError::TimedOut),
1859			})
1860			.await
1861			.unwrap();
1862
1863		// Submit another heads-up request.
1864		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1865
1866		// Assert the prepare queue is empty.
1867		test.poll_ensure_to_prepare_queue_is_empty().await;
1868
1869		// Pause for enough time to reset the cooldown for this failed prepare request.
1870		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1871
1872		// Submit another heads-up request.
1873		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1874
1875		// Assert the prepare queue contains the request.
1876		assert_matches!(
1877			test.poll_and_recv_to_prepare_queue().await,
1878			prepare::ToQueue::Enqueue { .. }
1879		);
1880	}
1881
1882	#[tokio::test]
1883	async fn cancellation() {
1884		let mut test = Builder::default().build();
1885		let mut host = test.host_handle();
1886		let pvd = Arc::new(PersistedValidationData {
1887			parent_head: Default::default(),
1888			relay_parent_number: 1u32,
1889			relay_parent_storage_root: H256::default(),
1890			max_pov_size: 4096 * 1024,
1891		});
1892		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1893
1894		let (result_tx, result_rx) = oneshot::channel();
1895		let validation_context = test_validation_context(pvd, pov);
1896		host.execute_pvf(
1897			PvfPrepData::from_discriminator(1),
1898			validation_context,
1899			Priority::Normal,
1900			PvfExecKind::Backing(H256::default()),
1901			result_tx,
1902		)
1903		.await
1904		.unwrap();
1905
1906		assert_matches!(
1907			test.poll_and_recv_to_prepare_queue().await,
1908			prepare::ToQueue::Enqueue { .. }
1909		);
1910
1911		test.from_prepare_queue_tx
1912			.send(prepare::FromQueue {
1913				artifact_id: artifact_id(1),
1914				result: Ok(PrepareSuccess::default()),
1915			})
1916			.await
1917			.unwrap();
1918
1919		drop(result_rx);
1920
1921		test.poll_ensure_to_execute_queue_is_empty().await;
1922	}
1923}