referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/
host.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Validation host - is the primary interface for this crate. It allows the clients to enqueue
18//! jobs for PVF execution or preparation.
19//!
20//! The validation host is represented by a future/task that runs an event-loop and by a handle,
21//! [`ValidationHost`], that allows communication with that event-loop.
22
23use crate::{
24	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
25	execute::{self, PendingExecutionRequest},
26	metrics::Metrics,
27	prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
28};
29use always_assert::never;
30use futures::{
31	channel::{mpsc, oneshot},
32	Future, FutureExt, SinkExt, StreamExt,
33};
34#[cfg(feature = "test-utils")]
35use polkadot_node_core_pvf_common::ArtifactChecksum;
36use polkadot_node_core_pvf_common::{
37	error::{PrecheckResult, PrepareError},
38	prepare::PrepareSuccess,
39	pvf::PvfPrepData,
40};
41use polkadot_node_primitives::PoV;
42use polkadot_node_subsystem::{
43	messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult,
44};
45use polkadot_parachain_primitives::primitives::ValidationResult;
46use polkadot_primitives::{Hash, PersistedValidationData};
47use std::{
48	collections::HashMap,
49	path::PathBuf,
50	sync::Arc,
51	time::{Duration, SystemTime},
52};
53
54/// The time period after which a failed preparation artifact is considered ready to be retried.
55/// Note that we will only retry if another request comes in after this cooldown has passed.
56#[cfg(not(test))]
57pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
58#[cfg(test)]
59pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
60
61/// The amount of times we will retry failed prepare jobs.
62pub const NUM_PREPARE_RETRIES: u32 = 5;
63
64/// The name of binary spawned to prepare a PVF artifact
65pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
66
67/// The name of binary spawned to execute a PVF
68pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";
69
70/// The size of incoming message queue
71pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;
72
73/// An alias to not spell the type for the oneshot sender for the PVF execution result.
74pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
75
76/// Transmission end used for sending the PVF preparation result.
77pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
78
79/// A handle to the async process serving the validation host requests.
80#[derive(Clone)]
81pub struct ValidationHost {
82	to_host_tx: mpsc::Sender<ToHost>,
83	/// Available security features, detected by the host during startup.
84	pub security_status: SecurityStatus,
85}
86
87impl ValidationHost {
88	/// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time
89	/// limit. This will prepare the PVF. The result of preparation will be sent to the provided
90	/// result sender.
91	///
92	/// This is async to accommodate the possibility of back-pressure. In the vast majority of
93	/// situations this function should return immediately.
94	///
95	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
96	pub async fn precheck_pvf(
97		&mut self,
98		pvf: PvfPrepData,
99		result_tx: PrecheckResultSender,
100	) -> Result<(), String> {
101		self.to_host_tx
102			.send(ToHost::PrecheckPvf { pvf, result_tx })
103			.await
104			.map_err(|_| "the inner loop hung up".to_string())
105	}
106
107	/// Execute PVF with the given code, execution timeout, parameters and priority.
108	/// The result of execution will be sent to the provided result sender.
109	///
110	/// This is async to accommodate the possibility of back-pressure. In the vast majority of
111	/// situations this function should return immediately.
112	///
113	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
114	pub async fn execute_pvf(
115		&mut self,
116		pvf: PvfPrepData,
117		exec_timeout: Duration,
118		pvd: Arc<PersistedValidationData>,
119		pov: Arc<PoV>,
120		priority: Priority,
121		exec_kind: PvfExecKind,
122		result_tx: ResultSender,
123	) -> Result<(), String> {
124		self.to_host_tx
125			.send(ToHost::ExecutePvf(ExecutePvfInputs {
126				pvf,
127				exec_timeout,
128				pvd,
129				pov,
130				priority,
131				exec_kind,
132				result_tx,
133			}))
134			.await
135			.map_err(|_| "the inner loop hung up".to_string())
136	}
137
138	/// Sends a signal to the validation host requesting to prepare a list of the given PVFs.
139	///
140	/// This is async to accommodate the possibility of back-pressure. In the vast majority of
141	/// situations this function should return immediately.
142	///
143	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
144	pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
145		self.to_host_tx
146			.send(ToHost::HeadsUp { active_pvfs })
147			.await
148			.map_err(|_| "the inner loop hung up".to_string())
149	}
150
151	/// Sends a signal to the validation host requesting to update best block.
152	///
153	/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
154	pub async fn update_active_leaves(
155		&mut self,
156		update: ActiveLeavesUpdate,
157		ancestors: Vec<Hash>,
158	) -> Result<(), String> {
159		self.to_host_tx
160			.send(ToHost::UpdateActiveLeaves { update, ancestors })
161			.await
162			.map_err(|_| "the inner loop hung up".to_string())
163	}
164
165	/// Replace the artifact checksum with a new one.
166	///
167	/// Only for test purposes to imitate a corruption of the artifact on disk.
168	#[cfg(feature = "test-utils")]
169	pub async fn replace_artifact_checksum(
170		&mut self,
171		checksum: ArtifactChecksum,
172		new_checksum: ArtifactChecksum,
173	) -> Result<(), String> {
174		self.to_host_tx
175			.send(ToHost::ReplaceArtifactChecksum { checksum, new_checksum })
176			.await
177			.map_err(|_| "the inner loop hung up".to_string())
178	}
179}
180
181enum ToHost {
182	PrecheckPvf {
183		pvf: PvfPrepData,
184		result_tx: PrecheckResultSender,
185	},
186	ExecutePvf(ExecutePvfInputs),
187	HeadsUp {
188		active_pvfs: Vec<PvfPrepData>,
189	},
190	UpdateActiveLeaves {
191		update: ActiveLeavesUpdate,
192		ancestors: Vec<Hash>,
193	},
194	#[cfg(feature = "test-utils")]
195	ReplaceArtifactChecksum {
196		checksum: ArtifactChecksum,
197		new_checksum: ArtifactChecksum,
198	},
199}
200
201struct ExecutePvfInputs {
202	pvf: PvfPrepData,
203	exec_timeout: Duration,
204	pvd: Arc<PersistedValidationData>,
205	pov: Arc<PoV>,
206	priority: Priority,
207	exec_kind: PvfExecKind,
208	result_tx: ResultSender,
209}
210
211/// Configuration for the validation host.
212#[derive(Debug)]
213pub struct Config {
214	/// The root directory where the prepared artifacts can be stored.
215	pub cache_path: PathBuf,
216	/// The version of the node. `None` can be passed to skip the version check (only for tests).
217	pub node_version: Option<String>,
218	/// Whether the node is attempting to run as a secure validator.
219	pub secure_validator_mode: bool,
220
221	/// The path to the program that can be used to spawn the prepare workers.
222	pub prepare_worker_program_path: PathBuf,
223	/// The time allotted for a prepare worker to spawn and report to the host.
224	pub prepare_worker_spawn_timeout: Duration,
225	/// The maximum number of workers that can be spawned in the prepare pool for tasks with the
226	/// priority below critical.
227	pub prepare_workers_soft_max_num: usize,
228	/// The absolute number of workers that can be spawned in the prepare pool.
229	pub prepare_workers_hard_max_num: usize,
230
231	/// The path to the program that can be used to spawn the execute workers.
232	pub execute_worker_program_path: PathBuf,
233	/// The time allotted for an execute worker to spawn and report to the host.
234	pub execute_worker_spawn_timeout: Duration,
235	/// The maximum number of execute workers that can run at the same time.
236	pub execute_workers_max_num: usize,
237}
238
239impl Config {
240	/// Create a new instance of the configuration.
241	pub fn new(
242		cache_path: PathBuf,
243		node_version: Option<String>,
244		secure_validator_mode: bool,
245		prepare_worker_program_path: PathBuf,
246		execute_worker_program_path: PathBuf,
247		execute_workers_max_num: usize,
248		prepare_workers_soft_max_num: usize,
249		prepare_workers_hard_max_num: usize,
250	) -> Self {
251		Self {
252			cache_path,
253			node_version,
254			secure_validator_mode,
255
256			prepare_worker_program_path,
257			prepare_worker_spawn_timeout: Duration::from_secs(3),
258			prepare_workers_soft_max_num,
259			prepare_workers_hard_max_num,
260
261			execute_worker_program_path,
262			execute_worker_spawn_timeout: Duration::from_secs(3),
263			execute_workers_max_num,
264		}
265	}
266}
267
268/// Start the validation host.
269///
270/// Returns a [handle][`ValidationHost`] to the started validation host and the future. The future
271/// must be polled in order for validation host to function.
272///
273/// The future should not return normally but if it does then that indicates an unrecoverable error.
274/// In that case all pending requests will be canceled, dropping the result senders and new ones
275/// will be rejected.
276pub async fn start(
277	config: Config,
278	metrics: Metrics,
279) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
280	gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
281
282	// Make sure the cache is initialized before doing anything else.
283	let artifacts = Artifacts::new(&config.cache_path).await;
284
285	// Run checks for supported security features once per host startup. If some checks fail, warn
286	// if Secure Validator Mode is disabled and return an error otherwise.
287	#[cfg(target_os = "linux")]
288	let security_status = match crate::security::check_security_status(&config).await {
289		Ok(ok) => ok,
290		Err(err) => return Err(SubsystemError::Context(err)),
291	};
292	#[cfg(not(target_os = "linux"))]
293	let security_status = if config.secure_validator_mode {
294		gum::error!(
295			target: LOG_TARGET,
296			"{}{}{}",
297			crate::SECURE_MODE_ERROR,
298			crate::SECURE_LINUX_NOTE,
299			crate::IGNORE_SECURE_MODE_TIP
300		);
301		return Err(SubsystemError::Context(
302			"could not enable Secure Validator Mode for non-Linux; check logs".into(),
303		));
304	} else {
305		gum::warn!(
306			target: LOG_TARGET,
307			"{}{}",
308			crate::SECURE_MODE_WARNING,
309			crate::SECURE_LINUX_NOTE,
310		);
311		SecurityStatus::default()
312	};
313
314	let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);
315
316	let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };
317
318	let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
319		metrics.clone(),
320		config.prepare_worker_program_path.clone(),
321		config.cache_path.clone(),
322		config.prepare_worker_spawn_timeout,
323		config.node_version.clone(),
324		security_status.clone(),
325	);
326
327	let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
328		metrics.clone(),
329		config.prepare_workers_soft_max_num,
330		config.prepare_workers_hard_max_num,
331		config.cache_path.clone(),
332		to_prepare_pool,
333		from_prepare_pool,
334	);
335
336	let (to_execute_queue_tx, from_execute_queue_rx, run_execute_queue) = execute::start(
337		metrics,
338		config.execute_worker_program_path.to_owned(),
339		config.cache_path.clone(),
340		config.execute_workers_max_num,
341		config.execute_worker_spawn_timeout,
342		config.node_version,
343		security_status,
344	);
345
346	let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
347	let run_sweeper = sweeper_task(to_sweeper_rx);
348
349	let run_host = async move {
350		run(Inner {
351			cleanup_pulse_interval: Duration::from_secs(3600),
352			cleanup_config: ArtifactsCleanupConfig::default(),
353			artifacts,
354			to_host_rx,
355			to_prepare_queue_tx,
356			from_prepare_queue_rx,
357			to_execute_queue_tx,
358			from_execute_queue_rx,
359			to_sweeper_tx,
360			awaiting_prepare: AwaitingPrepare::default(),
361		})
362		.await
363	};
364
365	let task = async move {
366		// Bundle the sub-components' tasks together into a single future.
367		futures::select! {
368			_ = run_host.fuse() => {},
369			_ = run_prepare_queue.fuse() => {},
370			_ = run_prepare_pool.fuse() => {},
371			_ = run_execute_queue.fuse() => {},
372			_ = run_sweeper.fuse() => {},
373		};
374	};
375
376	Ok((validation_host, task))
377}
378
379/// A mapping from an artifact ID which is in preparation state to the list of pending execution
380/// requests that should be executed once the artifact's preparation is finished.
381#[derive(Default)]
382struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
383
384impl AwaitingPrepare {
385	fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
386		self.0.entry(artifact_id).or_default().push(pending_execution_request);
387	}
388
389	fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
390		self.0.remove(artifact_id).unwrap_or_default()
391	}
392}
393
394struct Inner {
395	cleanup_pulse_interval: Duration,
396	cleanup_config: ArtifactsCleanupConfig,
397	artifacts: Artifacts,
398
399	to_host_rx: mpsc::Receiver<ToHost>,
400
401	to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
402	from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,
403
404	to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
405	from_execute_queue_rx: mpsc::UnboundedReceiver<execute::FromQueue>,
406
407	to_sweeper_tx: mpsc::Sender<PathBuf>,
408
409	awaiting_prepare: AwaitingPrepare,
410}
411
412#[derive(Debug)]
413struct Fatal;
414
415async fn run(
416	Inner {
417		cleanup_pulse_interval,
418		cleanup_config,
419		mut artifacts,
420		to_host_rx,
421		from_prepare_queue_rx,
422		mut to_prepare_queue_tx,
423		from_execute_queue_rx,
424		mut to_execute_queue_tx,
425		mut to_sweeper_tx,
426		mut awaiting_prepare,
427	}: Inner,
428) {
429	macro_rules! break_if_fatal {
430		($expr:expr) => {
431			match $expr {
432				Err(Fatal) => {
433					gum::error!(
434						target: LOG_TARGET,
435						"Fatal error occurred, terminating the host. Line: {}",
436						line!(),
437					);
438					break
439				},
440				Ok(v) => v,
441			}
442		};
443	}
444
445	let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
446	futures::pin_mut!(cleanup_pulse);
447
448	let mut to_host_rx = to_host_rx.fuse();
449	let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
450	let mut from_execute_queue_rx = from_execute_queue_rx.fuse();
451
452	loop {
453		// biased to make it behave deterministically for tests.
454		futures::select_biased! {
455			from_execute_queue_rx = from_execute_queue_rx.next() => {
456				let from_queue = break_if_fatal!(from_execute_queue_rx.ok_or(Fatal));
457				let execute::FromQueue::RemoveArtifact { artifact, reply_to } = from_queue;
458				break_if_fatal!(handle_artifact_removal(
459					&mut to_sweeper_tx,
460					&mut artifacts,
461					artifact,
462					reply_to,
463				).await);
464			},
465			() = cleanup_pulse.select_next_some() => {
466				// `select_next_some` because we don't expect this to fail, but if it does, we
467				// still don't fail. The trade-off is that the compiled cache will start growing
468				// in size. That is, however, rather a slow process and hopefully the operator
469				// will notice it.
470
471				break_if_fatal!(handle_cleanup_pulse(
472					&mut to_sweeper_tx,
473					&mut artifacts,
474					&cleanup_config,
475				).await);
476			},
477			to_host = to_host_rx.next() => {
478				let to_host = match to_host {
479					None => {
480						// The sending half of the channel has been closed, meaning the
481						// `ValidationHost` struct was dropped. Shutting down gracefully.
482						break;
483					},
484					Some(to_host) => to_host,
485				};
486
487				// If the artifact failed before, it could be re-scheduled for preparation here if
488				// the preparation failure cooldown has elapsed.
489				break_if_fatal!(handle_to_host(
490					&mut artifacts,
491					&mut to_prepare_queue_tx,
492					&mut to_execute_queue_tx,
493					&mut awaiting_prepare,
494					to_host,
495				)
496				.await);
497			},
498			from_prepare_queue = from_prepare_queue_rx.next() => {
499				let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
500
501				// Note that the preparation outcome is always reported as concluded.
502				//
503				// That's because the error conditions are written into the artifact and will be
504				// reported at the time of the execution. It potentially, but not necessarily, can
505				// be scheduled for execution as a result of this function call, in case there are
506				// pending executions.
507				//
508				// We could be eager in terms of reporting and plumb the result from the preparation
509				// worker but we don't for the sake of simplicity.
510				break_if_fatal!(handle_prepare_done(
511					&mut artifacts,
512					&mut to_execute_queue_tx,
513					&mut awaiting_prepare,
514					from_queue,
515				).await);
516			},
517		}
518	}
519}
520
521async fn handle_to_host(
522	artifacts: &mut Artifacts,
523	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
524	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
525	awaiting_prepare: &mut AwaitingPrepare,
526	to_host: ToHost,
527) -> Result<(), Fatal> {
528	match to_host {
529		ToHost::PrecheckPvf { pvf, result_tx } => {
530			handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
531		},
532		ToHost::ExecutePvf(inputs) => {
533			handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
534				.await?;
535		},
536		ToHost::HeadsUp { active_pvfs } =>
537			handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
538		ToHost::UpdateActiveLeaves { update, ancestors } =>
539			handle_update_active_leaves(execute_queue, update, ancestors).await?,
540		#[cfg(feature = "test-utils")]
541		ToHost::ReplaceArtifactChecksum { checksum, new_checksum } => {
542			artifacts.replace_artifact_checksum(checksum, new_checksum);
543		},
544	}
545
546	Ok(())
547}
548
549/// Handles PVF prechecking requests.
550///
551/// This tries to prepare the PVF by compiling the WASM blob within a timeout set in
552/// `PvfPrepData`.
553///
554/// We don't retry artifacts that previously failed preparation. We don't expect multiple
555/// pre-checking requests.
556async fn handle_precheck_pvf(
557	artifacts: &mut Artifacts,
558	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
559	pvf: PvfPrepData,
560	result_sender: PrecheckResultSender,
561) -> Result<(), Fatal> {
562	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
563
564	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
565		match state {
566			ArtifactState::Prepared { last_time_needed, .. } => {
567				*last_time_needed = SystemTime::now();
568				let _ = result_sender.send(Ok(()));
569			},
570			ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
571				waiting_for_response.push(result_sender),
572			ArtifactState::FailedToProcess { error, .. } => {
573				// Do not retry an artifact that previously failed preparation.
574				let _ = result_sender.send(PrecheckResult::Err(error.clone()));
575			},
576		}
577	} else {
578		artifacts.insert_preparing(artifact_id, vec![result_sender]);
579		send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
580			.await?;
581	}
582	Ok(())
583}
584
585/// Handles PVF execution.
586///
587/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is
588/// already a preparation job, we coalesce the two preparation jobs.
589///
590/// If the prepare job succeeded previously, we will enqueue an execute job right away.
591///
592/// If the prepare job failed previously, we may retry it under certain conditions.
593///
594/// When preparing for execution, we use a more lenient timeout
595/// ([`DEFAULT_LENIENT_PREPARATION_TIMEOUT`](polkadot_primitives::executor_params::DEFAULT_LENIENT_PREPARATION_TIMEOUT))
596/// than when prechecking.
597async fn handle_execute_pvf(
598	artifacts: &mut Artifacts,
599	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
600	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
601	awaiting_prepare: &mut AwaitingPrepare,
602	inputs: ExecutePvfInputs,
603) -> Result<(), Fatal> {
604	let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, exec_kind, result_tx } = inputs;
605	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
606	let executor_params = (*pvf.executor_params()).clone();
607
608	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
609		match state {
610			ArtifactState::Prepared { ref path, checksum, last_time_needed, .. } => {
611				let file_metadata = std::fs::metadata(path);
612
613				if file_metadata.is_ok() {
614					*last_time_needed = SystemTime::now();
615
616					// This artifact has already been prepared, send it to the execute queue.
617					send_execute(
618						execute_queue,
619						execute::ToQueue::Enqueue {
620							artifact: ArtifactPathId::new(artifact_id, path, *checksum),
621							pending_execution_request: PendingExecutionRequest {
622								exec_timeout,
623								pvd,
624								pov,
625								executor_params,
626								exec_kind,
627								result_tx,
628							},
629						},
630					)
631					.await?;
632				} else {
633					gum::warn!(
634						target: LOG_TARGET,
635						?pvf,
636						?artifact_id,
637						"handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
638					);
639
640					// The artifact has been prepared previously but the file is missing, prepare it
641					// again.
642					*state = ArtifactState::Preparing {
643						waiting_for_response: Vec::new(),
644						num_failures: 0,
645					};
646					enqueue_prepare_for_execute(
647						prepare_queue,
648						awaiting_prepare,
649						pvf,
650						priority,
651						artifact_id,
652						PendingExecutionRequest {
653							exec_timeout,
654							pvd,
655							pov,
656							executor_params,
657							exec_kind,
658							result_tx,
659						},
660					)
661					.await?;
662				}
663			},
664			ArtifactState::Preparing { .. } => {
665				awaiting_prepare.add(
666					artifact_id,
667					PendingExecutionRequest {
668						exec_timeout,
669						pvd,
670						pov,
671						executor_params,
672						result_tx,
673						exec_kind,
674					},
675				);
676			},
677			ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
678				if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
679					gum::warn!(
680						target: LOG_TARGET,
681						?pvf,
682						?artifact_id,
683						?last_time_failed,
684						%num_failures,
685						%error,
686						"handle_execute_pvf: Re-trying failed PVF preparation."
687					);
688
689					// If we are allowed to retry the failed prepare job, change the state to
690					// Preparing and re-queue this job.
691					*state = ArtifactState::Preparing {
692						waiting_for_response: Vec::new(),
693						num_failures: *num_failures,
694					};
695					enqueue_prepare_for_execute(
696						prepare_queue,
697						awaiting_prepare,
698						pvf,
699						priority,
700						artifact_id,
701						PendingExecutionRequest {
702							exec_timeout,
703							pvd,
704							pov,
705							executor_params,
706							exec_kind,
707							result_tx,
708						},
709					)
710					.await?;
711				} else {
712					let _ = result_tx.send(Err(ValidationError::from(error.clone())));
713				}
714			},
715		}
716	} else {
717		// Artifact is unknown: register it and enqueue a job with the corresponding priority and
718		// PVF.
719		artifacts.insert_preparing(artifact_id.clone(), Vec::new());
720		enqueue_prepare_for_execute(
721			prepare_queue,
722			awaiting_prepare,
723			pvf,
724			priority,
725			artifact_id,
726			PendingExecutionRequest {
727				exec_timeout,
728				pvd,
729				pov,
730				executor_params,
731				result_tx,
732				exec_kind,
733			},
734		)
735		.await?;
736	}
737
738	Ok(())
739}
740
741async fn handle_heads_up(
742	artifacts: &mut Artifacts,
743	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
744	active_pvfs: Vec<PvfPrepData>,
745) -> Result<(), Fatal> {
746	let now = SystemTime::now();
747
748	for active_pvf in active_pvfs {
749		let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
750		if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
751			match state {
752				ArtifactState::Prepared { last_time_needed, .. } => {
753					*last_time_needed = now;
754				},
755				ArtifactState::Preparing { .. } => {
756					// The artifact is already being prepared, so we don't need to do anything.
757				},
758				ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
759					if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
760						gum::warn!(
761							target: LOG_TARGET,
762							?active_pvf,
763							?artifact_id,
764							?last_time_failed,
765							%num_failures,
766							%error,
767							"handle_heads_up: Re-trying failed PVF preparation."
768						);
769
770						// If we are allowed to retry the failed prepare job, change the state to
771						// Preparing and re-queue this job.
772						*state = ArtifactState::Preparing {
773							waiting_for_response: vec![],
774							num_failures: *num_failures,
775						};
776						send_prepare(
777							prepare_queue,
778							prepare::ToQueue::Enqueue {
779								priority: Priority::Normal,
780								pvf: active_pvf,
781							},
782						)
783						.await?;
784					}
785				},
786			}
787		} else {
788			// It's not in the artifacts, so we need to enqueue a job to prepare it.
789			artifacts.insert_preparing(artifact_id.clone(), Vec::new());
790
791			send_prepare(
792				prepare_queue,
793				prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
794			)
795			.await?;
796		}
797	}
798
799	Ok(())
800}
801
802async fn handle_prepare_done(
803	artifacts: &mut Artifacts,
804	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
805	awaiting_prepare: &mut AwaitingPrepare,
806	from_queue: prepare::FromQueue,
807) -> Result<(), Fatal> {
808	let prepare::FromQueue { artifact_id, result } = from_queue;
809
810	// Make some sanity checks and extract the current state.
811	let state = match artifacts.artifact_state_mut(&artifact_id) {
812		None => {
813			// before sending request to prepare, the artifact is inserted with `preparing` state;
814			// the requests are deduplicated for the same artifact id;
815			// there is only one possible state change: prepare is done;
816			// thus the artifact cannot be unknown, only preparing;
817			// qed.
818			never!("an unknown artifact was prepared: {:?}", artifact_id);
819			return Ok(())
820		},
821		Some(ArtifactState::Prepared { .. }) => {
822			// before sending request to prepare, the artifact is inserted with `preparing` state;
823			// the requests are deduplicated for the same artifact id;
824			// there is only one possible state change: prepare is done;
825			// thus the artifact cannot be prepared, only preparing;
826			// qed.
827			never!("the artifact is already prepared: {:?}", artifact_id);
828			return Ok(())
829		},
830		Some(ArtifactState::FailedToProcess { .. }) => {
831			// The reasoning is similar to the above, the artifact cannot be
832			// processed at this point.
833			never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
834			return Ok(())
835		},
836		Some(state @ ArtifactState::Preparing { .. }) => state,
837	};
838
839	let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
840		state
841	{
842		for result_sender in waiting_for_response.drain(..) {
843			let result = result.clone().map(|_| ());
844			let _ = result_sender.send(result);
845		}
846		num_failures
847	} else {
848		never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
849		return Ok(())
850	};
851
852	// It's finally time to dispatch all the execution requests that were waiting for this artifact
853	// to be prepared.
854	let pending_requests = awaiting_prepare.take(&artifact_id);
855	for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } in
856		pending_requests
857	{
858		if result_tx.is_canceled() {
859			// Preparation could've taken quite a bit of time and the requester may be not
860			// interested in execution anymore, in which case we just skip the request.
861			continue
862		}
863
864		let (path, checksum) = match &result {
865			Ok(success) => (success.path.clone(), success.checksum),
866			Err(error) => {
867				let _ = result_tx.send(Err(ValidationError::from(error.clone())));
868				continue
869			},
870		};
871
872		send_execute(
873			execute_queue,
874			execute::ToQueue::Enqueue {
875				artifact: ArtifactPathId::new(artifact_id.clone(), &path, checksum),
876				pending_execution_request: PendingExecutionRequest {
877					exec_timeout,
878					pvd,
879					pov,
880					executor_params,
881					exec_kind,
882					result_tx,
883				},
884			},
885		)
886		.await?;
887	}
888
889	*state = match result {
890		Ok(PrepareSuccess { checksum, path, size, .. }) =>
891			ArtifactState::Prepared { checksum, path, last_time_needed: SystemTime::now(), size },
892		Err(error) => {
893			let last_time_failed = SystemTime::now();
894			let num_failures = *num_failures + 1;
895
896			gum::error!(
897				target: LOG_TARGET,
898				?artifact_id,
899				time_failed = ?last_time_failed,
900				%num_failures,
901				"artifact preparation failed: {}",
902				error
903			);
904			ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
905		},
906	};
907
908	Ok(())
909}
910
911async fn handle_update_active_leaves(
912	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
913	update: ActiveLeavesUpdate,
914	ancestors: Vec<Hash>,
915) -> Result<(), Fatal> {
916	send_execute(execute_queue, execute::ToQueue::UpdateActiveLeaves { update, ancestors }).await
917}
918
919async fn send_prepare(
920	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
921	to_queue: prepare::ToQueue,
922) -> Result<(), Fatal> {
923	prepare_queue.send(to_queue).await.map_err(|_| Fatal)
924}
925
926async fn send_execute(
927	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
928	to_queue: execute::ToQueue,
929) -> Result<(), Fatal> {
930	execute_queue.send(to_queue).await.map_err(|_| Fatal)
931}
932
933/// Sends a job to the preparation queue, and adds an execution request that will wait to run after
934/// this prepare job has finished.
935async fn enqueue_prepare_for_execute(
936	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
937	awaiting_prepare: &mut AwaitingPrepare,
938	pvf: PvfPrepData,
939	priority: Priority,
940	artifact_id: ArtifactId,
941	pending_execution_request: PendingExecutionRequest,
942) -> Result<(), Fatal> {
943	send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
944
945	// Add an execution request that will wait to run after this prepare job has finished.
946	awaiting_prepare.add(artifact_id, pending_execution_request);
947
948	Ok(())
949}
950
951async fn handle_cleanup_pulse(
952	sweeper_tx: &mut mpsc::Sender<PathBuf>,
953	artifacts: &mut Artifacts,
954	cleanup_config: &ArtifactsCleanupConfig,
955) -> Result<(), Fatal> {
956	let to_remove = artifacts.prune(cleanup_config);
957	gum::debug!(
958		target: LOG_TARGET,
959		"PVF pruning: {} artifacts reached their end of life",
960		to_remove.len(),
961	);
962	for (artifact_id, path) in to_remove {
963		gum::debug!(
964			target: LOG_TARGET,
965			validation_code_hash = ?artifact_id.code_hash,
966			"pruning artifact",
967		);
968		sweeper_tx.send(path).await.map_err(|_| Fatal)?;
969	}
970
971	Ok(())
972}
973
974async fn handle_artifact_removal(
975	sweeper_tx: &mut mpsc::Sender<PathBuf>,
976	artifacts: &mut Artifacts,
977	artifact_id: ArtifactId,
978	reply_to: oneshot::Sender<()>,
979) -> Result<(), Fatal> {
980	let (artifact_id, path) = if let Some(artifact) = artifacts.remove(artifact_id) {
981		artifact
982	} else {
983		// if we haven't found the artifact by its id,
984		// it has been probably removed
985		// anyway with the randomness of the artifact name
986		// it is safe to ignore
987		return Ok(());
988	};
989	reply_to
990		.send(())
991		.expect("the execute queue waits for the artifact remove confirmation; qed");
992	// Thanks to the randomness of the artifact name (see
993	// `artifacts::generate_artifact_path`) there is no issue with any name conflict on
994	// future repreparation.
995	// So we can confirm the artifact removal already
996	gum::debug!(
997		target: LOG_TARGET,
998		validation_code_hash = ?artifact_id.code_hash,
999		"PVF pruning: pruning artifact by request from the execute queue",
1000	);
1001	sweeper_tx.send(path).await.map_err(|_| Fatal)?;
1002	Ok(())
1003}
1004
1005/// A simple task which sole purpose is to delete files thrown at it.
1006async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
1007	loop {
1008		match sweeper_rx.next().await {
1009			None => break,
1010			Some(condemned) => {
1011				let result = tokio::fs::remove_file(&condemned).await;
1012				gum::trace!(
1013					target: LOG_TARGET,
1014					?result,
1015					"Swept the artifact file {}",
1016					condemned.display(),
1017				);
1018			},
1019		}
1020	}
1021}
1022
1023/// Check if the conditions to retry a prepare job have been met.
1024fn can_retry_prepare_after_failure(
1025	last_time_failed: SystemTime,
1026	num_failures: u32,
1027	error: &PrepareError,
1028) -> bool {
1029	if error.is_deterministic() {
1030		// This error is considered deterministic, so it will probably be reproducible. Don't retry.
1031		return false
1032	}
1033
1034	// Retry if the retry cooldown has elapsed and if we have already retried less than
1035	// `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
1036	SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
1037		num_failures <= NUM_PREPARE_RETRIES
1038}
1039
1040/// A stream that yields a pulse continuously at a given interval.
1041fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
1042	futures::stream::unfold(interval, {
1043		|interval| async move {
1044			futures_timer::Delay::new(interval).await;
1045			Some(((), interval))
1046		}
1047	})
1048	.map(|_| ())
1049}
1050
1051#[cfg(test)]
1052pub(crate) mod tests {
1053	use super::*;
1054	use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
1055	use assert_matches::assert_matches;
1056	use futures::future::BoxFuture;
1057	use polkadot_node_primitives::BlockData;
1058	use sp_core::H256;
1059
1060	const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
1061	pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
1062
1063	#[tokio::test]
1064	async fn pulse_test() {
1065		let pulse = pulse_every(Duration::from_millis(100));
1066		futures::pin_mut!(pulse);
1067
1068		for _ in 0..5 {
1069			let start = std::time::Instant::now();
1070			let _ = pulse.next().await.unwrap();
1071
1072			let el = start.elapsed().as_millis();
1073			assert!(el > 50 && el < 150, "pulse duration: {}", el);
1074		}
1075	}
1076
1077	struct Builder {
1078		cleanup_pulse_interval: Duration,
1079		cleanup_config: ArtifactsCleanupConfig,
1080		artifacts: Artifacts,
1081	}
1082
1083	impl Builder {
1084		fn default() -> Self {
1085			Self {
1086				// these are selected high to not interfere in tests in which pruning is irrelevant.
1087				cleanup_pulse_interval: Duration::from_secs(3600),
1088				cleanup_config: ArtifactsCleanupConfig::default(),
1089				artifacts: Artifacts::empty(),
1090			}
1091		}
1092
1093		fn build(self) -> Test {
1094			Test::new(self)
1095		}
1096	}
1097
1098	struct Test {
1099		to_host_tx: Option<mpsc::Sender<ToHost>>,
1100
1101		to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
1102		from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
1103		to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
1104		#[allow(unused)]
1105		from_execute_queue_tx: mpsc::UnboundedSender<execute::FromQueue>,
1106		to_sweeper_rx: mpsc::Receiver<PathBuf>,
1107
1108		run: BoxFuture<'static, ()>,
1109	}
1110
1111	impl Test {
1112		fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
1113			let (to_host_tx, to_host_rx) = mpsc::channel(10);
1114			let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
1115			let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
1116			let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
1117			let (from_execute_queue_tx, from_execute_queue_rx) = mpsc::unbounded();
1118			let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
1119
1120			let run = run(Inner {
1121				cleanup_pulse_interval,
1122				cleanup_config,
1123				artifacts,
1124				to_host_rx,
1125				to_prepare_queue_tx,
1126				from_prepare_queue_rx,
1127				to_execute_queue_tx,
1128				from_execute_queue_rx,
1129				to_sweeper_tx,
1130				awaiting_prepare: AwaitingPrepare::default(),
1131			})
1132			.boxed();
1133
1134			Self {
1135				to_host_tx: Some(to_host_tx),
1136				to_prepare_queue_rx,
1137				from_prepare_queue_tx,
1138				to_execute_queue_rx,
1139				from_execute_queue_tx,
1140				to_sweeper_rx,
1141				run,
1142			}
1143		}
1144
1145		fn host_handle(&mut self) -> ValidationHost {
1146			let to_host_tx = self.to_host_tx.take().unwrap();
1147			let security_status = Default::default();
1148			ValidationHost { to_host_tx, security_status }
1149		}
1150
1151		async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
1152		where
1153			T: Send,
1154		{
1155			run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
1156		}
1157
1158		async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
1159			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1160			run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
1161				.await
1162		}
1163
1164		async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
1165			let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1166			run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
1167				.await
1168		}
1169
1170		async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
1171			use futures_timer::Delay;
1172
1173			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1174			run_until(
1175				&mut self.run,
1176				async {
1177					futures::select! {
1178						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1179						_ = to_prepare_queue_rx.next().fuse() => {
1180							panic!("the prepare queue is supposed to be empty")
1181						}
1182					}
1183				}
1184				.boxed(),
1185			)
1186			.await
1187		}
1188
1189		async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
1190			use futures_timer::Delay;
1191
1192			let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1193			run_until(
1194				&mut self.run,
1195				async {
1196					futures::select! {
1197						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1198						_ = to_execute_queue_rx.next().fuse() => {
1199							panic!("the execute queue is supposed to be empty")
1200						}
1201					}
1202				}
1203				.boxed(),
1204			)
1205			.await
1206		}
1207
1208		async fn poll_ensure_to_sweeper_is_empty(&mut self) {
1209			use futures_timer::Delay;
1210
1211			let to_sweeper_rx = &mut self.to_sweeper_rx;
1212			run_until(
1213				&mut self.run,
1214				async {
1215					futures::select! {
1216						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1217						msg = to_sweeper_rx.next().fuse() => {
1218							panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
1219						}
1220					}
1221				}
1222				.boxed(),
1223			)
1224			.await
1225		}
1226	}
1227
1228	async fn run_until<R>(
1229		task: &mut (impl Future<Output = ()> + Unpin),
1230		mut fut: (impl Future<Output = R> + Unpin),
1231	) -> R {
1232		use std::task::Poll;
1233
1234		let start = std::time::Instant::now();
1235		let fut = &mut fut;
1236		loop {
1237			if start.elapsed() > std::time::Duration::from_secs(2) {
1238				// We expect that this will take only a couple of iterations and thus to take way
1239				// less than a second.
1240				panic!("timeout");
1241			}
1242
1243			if let Poll::Ready(r) = futures::poll!(&mut *fut) {
1244				break r
1245			}
1246
1247			if futures::poll!(&mut *task).is_ready() {
1248				panic!()
1249			}
1250		}
1251	}
1252
1253	#[tokio::test]
1254	async fn shutdown_on_handle_drop() {
1255		let test = Builder::default().build();
1256
1257		let join_handle = tokio::task::spawn(test.run);
1258
1259		// Dropping the handle will lead to conclusion of the read part and thus will make the event
1260		// loop to stop, which in turn will resolve the join handle.
1261		drop(test.to_host_tx);
1262		join_handle.await.unwrap();
1263	}
1264
1265	#[tokio::test]
1266	async fn pruning() {
1267		let mock_now = SystemTime::now() - Duration::from_millis(1000);
1268		let tempdir = tempfile::tempdir().unwrap();
1269		let cache_path = tempdir.path();
1270
1271		let mut builder = Builder::default();
1272		builder.cleanup_pulse_interval = Duration::from_millis(100);
1273		builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
1274		let path1 = generate_artifact_path(cache_path);
1275		let path2 = generate_artifact_path(cache_path);
1276		builder.artifacts.insert_prepared(
1277			artifact_id(1),
1278			path1.clone(),
1279			Default::default(),
1280			mock_now,
1281			1024,
1282		);
1283		builder.artifacts.insert_prepared(
1284			artifact_id(2),
1285			path2.clone(),
1286			Default::default(),
1287			mock_now,
1288			1024,
1289		);
1290		let mut test = builder.build();
1291		let mut host = test.host_handle();
1292
1293		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1294
1295		let to_sweeper_rx = &mut test.to_sweeper_rx;
1296		run_until(
1297			&mut test.run,
1298			async {
1299				assert_eq!(to_sweeper_rx.next().await.unwrap(), path2);
1300			}
1301			.boxed(),
1302		)
1303		.await;
1304
1305		// Extend TTL for the first artifact and make sure we don't receive another file removal
1306		// request.
1307		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1308		test.poll_ensure_to_sweeper_is_empty().await;
1309	}
1310
1311	#[tokio::test]
1312	async fn execute_pvf_requests() {
1313		let mut test = Builder::default().build();
1314		let mut host = test.host_handle();
1315		let pvd = Arc::new(PersistedValidationData {
1316			parent_head: Default::default(),
1317			relay_parent_number: 1u32,
1318			relay_parent_storage_root: H256::default(),
1319			max_pov_size: 4096 * 1024,
1320		});
1321		let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
1322		let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
1323
1324		let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
1325		host.execute_pvf(
1326			PvfPrepData::from_discriminator(1),
1327			TEST_EXECUTION_TIMEOUT,
1328			pvd.clone(),
1329			pov1.clone(),
1330			Priority::Normal,
1331			PvfExecKind::Backing(H256::default()),
1332			result_tx,
1333		)
1334		.await
1335		.unwrap();
1336
1337		let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
1338		host.execute_pvf(
1339			PvfPrepData::from_discriminator(1),
1340			TEST_EXECUTION_TIMEOUT,
1341			pvd.clone(),
1342			pov1,
1343			Priority::Critical,
1344			PvfExecKind::Backing(H256::default()),
1345			result_tx,
1346		)
1347		.await
1348		.unwrap();
1349
1350		let (result_tx, result_rx_pvf_2) = oneshot::channel();
1351		host.execute_pvf(
1352			PvfPrepData::from_discriminator(2),
1353			TEST_EXECUTION_TIMEOUT,
1354			pvd,
1355			pov2,
1356			Priority::Normal,
1357			PvfExecKind::Backing(H256::default()),
1358			result_tx,
1359		)
1360		.await
1361		.unwrap();
1362
1363		assert_matches!(
1364			test.poll_and_recv_to_prepare_queue().await,
1365			prepare::ToQueue::Enqueue { .. }
1366		);
1367		assert_matches!(
1368			test.poll_and_recv_to_prepare_queue().await,
1369			prepare::ToQueue::Enqueue { .. }
1370		);
1371
1372		test.from_prepare_queue_tx
1373			.send(prepare::FromQueue {
1374				artifact_id: artifact_id(1),
1375				result: Ok(PrepareSuccess::default()),
1376			})
1377			.await
1378			.unwrap();
1379		let result_tx_pvf_1_1 = assert_matches!(
1380			test.poll_and_recv_to_execute_queue().await,
1381			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1382		);
1383		let result_tx_pvf_1_2 = assert_matches!(
1384			test.poll_and_recv_to_execute_queue().await,
1385			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1386		);
1387
1388		test.from_prepare_queue_tx
1389			.send(prepare::FromQueue {
1390				artifact_id: artifact_id(2),
1391				result: Ok(PrepareSuccess::default()),
1392			})
1393			.await
1394			.unwrap();
1395		let result_tx_pvf_2 = assert_matches!(
1396			test.poll_and_recv_to_execute_queue().await,
1397			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1398		);
1399
1400		result_tx_pvf_1_1
1401			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1402			.unwrap();
1403		assert_matches!(
1404			result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
1405			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1406		);
1407
1408		result_tx_pvf_1_2
1409			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1410			.unwrap();
1411		assert_matches!(
1412			result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
1413			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1414		);
1415
1416		result_tx_pvf_2
1417			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1418			.unwrap();
1419		assert_matches!(
1420			result_rx_pvf_2.now_or_never().unwrap().unwrap(),
1421			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1422		);
1423	}
1424
1425	#[tokio::test]
1426	async fn precheck_pvf() {
1427		let mut test = Builder::default().build();
1428		let mut host = test.host_handle();
1429
1430		// First, test a simple precheck request.
1431		let (result_tx, result_rx) = oneshot::channel();
1432		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1433			.await
1434			.unwrap();
1435
1436		// The queue received the prepare request.
1437		assert_matches!(
1438			test.poll_and_recv_to_prepare_queue().await,
1439			prepare::ToQueue::Enqueue { .. }
1440		);
1441		// Send `Ok` right away and poll the host.
1442		test.from_prepare_queue_tx
1443			.send(prepare::FromQueue {
1444				artifact_id: artifact_id(1),
1445				result: Ok(PrepareSuccess::default()),
1446			})
1447			.await
1448			.unwrap();
1449		// No pending execute requests.
1450		test.poll_ensure_to_execute_queue_is_empty().await;
1451		// Received the precheck result.
1452		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1453
1454		// Send multiple requests for the same PVF.
1455		let mut precheck_receivers = Vec::new();
1456		for _ in 0..3 {
1457			let (result_tx, result_rx) = oneshot::channel();
1458			host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1459				.await
1460				.unwrap();
1461			precheck_receivers.push(result_rx);
1462		}
1463		// Received prepare request.
1464		assert_matches!(
1465			test.poll_and_recv_to_prepare_queue().await,
1466			prepare::ToQueue::Enqueue { .. }
1467		);
1468		test.from_prepare_queue_tx
1469			.send(prepare::FromQueue {
1470				artifact_id: artifact_id(2),
1471				result: Err(PrepareError::TimedOut),
1472			})
1473			.await
1474			.unwrap();
1475		test.poll_ensure_to_execute_queue_is_empty().await;
1476		for result_rx in precheck_receivers {
1477			assert_matches!(
1478				result_rx.now_or_never().unwrap().unwrap(),
1479				Err(PrepareError::TimedOut)
1480			);
1481		}
1482	}
1483
1484	#[tokio::test]
1485	async fn test_prepare_done() {
1486		let mut test = Builder::default().build();
1487		let mut host = test.host_handle();
1488		let pvd = Arc::new(PersistedValidationData {
1489			parent_head: Default::default(),
1490			relay_parent_number: 1u32,
1491			relay_parent_storage_root: H256::default(),
1492			max_pov_size: 4096 * 1024,
1493		});
1494		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1495
1496		// Test mixed cases of receiving execute and precheck requests
1497		// for the same PVF.
1498
1499		// Send PVF for the execution and request the prechecking for it.
1500		let (result_tx, result_rx_execute) = oneshot::channel();
1501		host.execute_pvf(
1502			PvfPrepData::from_discriminator(1),
1503			TEST_EXECUTION_TIMEOUT,
1504			pvd.clone(),
1505			pov.clone(),
1506			Priority::Critical,
1507			PvfExecKind::Backing(H256::default()),
1508			result_tx,
1509		)
1510		.await
1511		.unwrap();
1512
1513		assert_matches!(
1514			test.poll_and_recv_to_prepare_queue().await,
1515			prepare::ToQueue::Enqueue { .. }
1516		);
1517
1518		let (result_tx, result_rx) = oneshot::channel();
1519		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1520			.await
1521			.unwrap();
1522
1523		// Suppose the preparation failed, the execution queue is empty and both
1524		// "clients" receive their results.
1525		test.from_prepare_queue_tx
1526			.send(prepare::FromQueue {
1527				artifact_id: artifact_id(1),
1528				result: Err(PrepareError::TimedOut),
1529			})
1530			.await
1531			.unwrap();
1532		test.poll_ensure_to_execute_queue_is_empty().await;
1533		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
1534		assert_matches!(
1535			result_rx_execute.now_or_never().unwrap().unwrap(),
1536			Err(ValidationError::Internal(_))
1537		);
1538
1539		// Reversed case: first send multiple precheck requests, then ask for an execution.
1540		let mut precheck_receivers = Vec::new();
1541		for _ in 0..3 {
1542			let (result_tx, result_rx) = oneshot::channel();
1543			host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1544				.await
1545				.unwrap();
1546			precheck_receivers.push(result_rx);
1547		}
1548
1549		let (result_tx, _result_rx_execute) = oneshot::channel();
1550		host.execute_pvf(
1551			PvfPrepData::from_discriminator(2),
1552			TEST_EXECUTION_TIMEOUT,
1553			pvd,
1554			pov,
1555			Priority::Critical,
1556			PvfExecKind::Backing(H256::default()),
1557			result_tx,
1558		)
1559		.await
1560		.unwrap();
1561		// Received prepare request.
1562		assert_matches!(
1563			test.poll_and_recv_to_prepare_queue().await,
1564			prepare::ToQueue::Enqueue { .. }
1565		);
1566		test.from_prepare_queue_tx
1567			.send(prepare::FromQueue {
1568				artifact_id: artifact_id(2),
1569				result: Ok(PrepareSuccess::default()),
1570			})
1571			.await
1572			.unwrap();
1573		// The execute queue receives new request, preckecking is finished and we can
1574		// fetch results.
1575		assert_matches!(
1576			test.poll_and_recv_to_execute_queue().await,
1577			execute::ToQueue::Enqueue { .. }
1578		);
1579		for result_rx in precheck_receivers {
1580			assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1581		}
1582	}
1583
1584	// Test that multiple prechecking requests do not trigger preparation retries if the first one
1585	// failed.
1586	#[tokio::test]
1587	async fn test_precheck_prepare_no_retry() {
1588		let mut test = Builder::default().build();
1589		let mut host = test.host_handle();
1590
1591		// Submit a precheck request that fails.
1592		let (result_tx, result_rx) = oneshot::channel();
1593		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1594			.await
1595			.unwrap();
1596
1597		// The queue received the prepare request.
1598		assert_matches!(
1599			test.poll_and_recv_to_prepare_queue().await,
1600			prepare::ToQueue::Enqueue { .. }
1601		);
1602		// Send a PrepareError.
1603		test.from_prepare_queue_tx
1604			.send(prepare::FromQueue {
1605				artifact_id: artifact_id(1),
1606				result: Err(PrepareError::TimedOut),
1607			})
1608			.await
1609			.unwrap();
1610
1611		// The result should contain the error.
1612		let result = test.poll_and_recv_result(result_rx).await;
1613		assert_matches!(result, Err(PrepareError::TimedOut));
1614
1615		// Submit another precheck request.
1616		let (result_tx_2, result_rx_2) = oneshot::channel();
1617		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_2)
1618			.await
1619			.unwrap();
1620
1621		// Assert the prepare queue is empty.
1622		test.poll_ensure_to_prepare_queue_is_empty().await;
1623
1624		// The result should contain the original error.
1625		let result = test.poll_and_recv_result(result_rx_2).await;
1626		assert_matches!(result, Err(PrepareError::TimedOut));
1627
1628		// Pause for enough time to reset the cooldown for this failed prepare request.
1629		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1630
1631		// Submit another precheck request.
1632		let (result_tx_3, result_rx_3) = oneshot::channel();
1633		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_3)
1634			.await
1635			.unwrap();
1636
1637		// Assert the prepare queue is empty - we do not retry for precheck requests.
1638		test.poll_ensure_to_prepare_queue_is_empty().await;
1639
1640		// The result should still contain the original error.
1641		let result = test.poll_and_recv_result(result_rx_3).await;
1642		assert_matches!(result, Err(PrepareError::TimedOut));
1643	}
1644
1645	// Test that multiple execution requests trigger preparation retries if the first one failed due
1646	// to a potentially non-reproducible error.
1647	#[tokio::test]
1648	async fn test_execute_prepare_retry() {
1649		let mut test = Builder::default().build();
1650		let mut host = test.host_handle();
1651		let pvd = Arc::new(PersistedValidationData {
1652			parent_head: Default::default(),
1653			relay_parent_number: 1u32,
1654			relay_parent_storage_root: H256::default(),
1655			max_pov_size: 4096 * 1024,
1656		});
1657		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1658
1659		// Submit a execute request that fails.
1660		let (result_tx, result_rx) = oneshot::channel();
1661		host.execute_pvf(
1662			PvfPrepData::from_discriminator(1),
1663			TEST_EXECUTION_TIMEOUT,
1664			pvd.clone(),
1665			pov.clone(),
1666			Priority::Critical,
1667			PvfExecKind::Backing(H256::default()),
1668			result_tx,
1669		)
1670		.await
1671		.unwrap();
1672
1673		// The queue received the prepare request.
1674		assert_matches!(
1675			test.poll_and_recv_to_prepare_queue().await,
1676			prepare::ToQueue::Enqueue { .. }
1677		);
1678		// Send a PrepareError.
1679		test.from_prepare_queue_tx
1680			.send(prepare::FromQueue {
1681				artifact_id: artifact_id(1),
1682				result: Err(PrepareError::TimedOut),
1683			})
1684			.await
1685			.unwrap();
1686
1687		// The result should contain the error.
1688		let result = test.poll_and_recv_result(result_rx).await;
1689		assert_matches!(result, Err(ValidationError::Internal(_)));

		// Submit another execute request. We shouldn't try to prepare again, yet.
		let (result_tx_2, result_rx_2) = oneshot::channel();
		host.execute_pvf(
			PvfPrepData::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			pvd.clone(),
			pov.clone(),
			Priority::Critical,
			PvfExecKind::Backing(H256::default()),
			result_tx_2,
		)
		.await
		.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should contain the original error.
		let result = test.poll_and_recv_result(result_rx_2).await;
		assert_matches!(result, Err(ValidationError::Internal(_)));

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another execute request.
		let (result_tx_3, result_rx_3) = oneshot::channel();
		host.execute_pvf(
			PvfPrepData::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			pvd.clone(),
			pov.clone(),
			Priority::Critical,
			PvfExecKind::Backing(H256::default()),
			result_tx_3,
		)
		.await
		.unwrap();

		// Assert the prepare queue contains the request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);

		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Ok(PrepareSuccess::default()),
			})
			.await
			.unwrap();

		// Preparation should have been retried and succeeded this time.
		let result_tx_3 = assert_matches!(
			test.poll_and_recv_to_execute_queue().await,
			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
		);

		// Send an error for the execution here, just so we can check the result receiver is still
		// alive.
		result_tx_3
			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
			.unwrap();
		assert_matches!(
			result_rx_3.now_or_never().unwrap().unwrap(),
			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
		);
	}

	// Test that multiple execution requests don't trigger preparation retries if the first one
	// failed due to a reproducible error (e.g. Prevalidation).
	#[tokio::test]
	async fn test_execute_prepare_no_retry() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();
		let pvd = Arc::new(PersistedValidationData {
			parent_head: Default::default(),
			relay_parent_number: 1u32,
			relay_parent_storage_root: H256::default(),
			max_pov_size: 4096 * 1024,
		});
		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });

		// Submit an execute request that fails.
		let (result_tx, result_rx) = oneshot::channel();
		host.execute_pvf(
			PvfPrepData::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			pvd.clone(),
			pov.clone(),
			Priority::Critical,
			PvfExecKind::Backing(H256::default()),
			result_tx,
		)
		.await
		.unwrap();

		// The queue received the prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		// Send a PrepareError.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::Prevalidation("reproducible error".into())),
			})
			.await
			.unwrap();

		// The result should contain the error.
		let result = test.poll_and_recv_result(result_rx).await;
		assert_matches!(result, Err(ValidationError::Preparation(_)));
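		// A prevalidation failure is deterministic, so it is reported as a preparation error and
		// the artifact is not re-prepared for the requests below.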

		// Submit another execute request.
		let (result_tx_2, result_rx_2) = oneshot::channel();
		host.execute_pvf(
			PvfPrepData::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			pvd.clone(),
			pov.clone(),
			Priority::Critical,
			PvfExecKind::Backing(H256::default()),
			result_tx_2,
		)
		.await
		.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should contain the original error.
		let result = test.poll_and_recv_result(result_rx_2).await;
		assert_matches!(result, Err(ValidationError::Preparation(_)));

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another execute request.
		let (result_tx_3, result_rx_3) = oneshot::channel();
		host.execute_pvf(
			PvfPrepData::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			pvd.clone(),
			pov.clone(),
			Priority::Critical,
			PvfExecKind::Backing(H256::default()),
			result_tx_3,
		)
		.await
		.unwrap();

		// Assert the prepare queue is empty - we do not retry for prevalidation errors.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should still contain the original error.
		let result = test.poll_and_recv_result(result_rx_3).await;
		assert_matches!(result, Err(ValidationError::Preparation(_)));
	}

	// Test that multiple heads-up requests trigger preparation retries if the first one failed.
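	// Heads-up requests carry no result sender, so the retry behaviour can only be observed
	// through the traffic to the prepare queue.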
	#[tokio::test]
	async fn test_heads_up_prepare_retry() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();

		// Submit a heads-up request that fails.
		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();

		// The queue received the prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		// Send a PrepareError.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::TimedOut),
			})
			.await
			.unwrap();

		// Submit another heads-up request.
		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another heads-up request.
		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();

		// Assert the prepare queue contains the request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
	}

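	// Test that an execution request is dropped if its result receiver is no longer alive: after
	// the receiver is dropped and preparation succeeds, nothing is sent to the execute queue.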
	#[tokio::test]
	async fn cancellation() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();
		let pvd = Arc::new(PersistedValidationData {
			parent_head: Default::default(),
			relay_parent_number: 1u32,
			relay_parent_storage_root: H256::default(),
			max_pov_size: 4096 * 1024,
		});
		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });

		let (result_tx, result_rx) = oneshot::channel();
		host.execute_pvf(
			PvfPrepData::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			pvd,
			pov,
			Priority::Normal,
			PvfExecKind::Backing(H256::default()),
			result_tx,
		)
		.await
		.unwrap();

		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);

		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Ok(PrepareSuccess::default()),
			})
			.await
			.unwrap();

		drop(result_rx);

		test.poll_ensure_to_execute_queue_is_empty().await;
	}
}