1use crate::{
24	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
25	execute::{self, PendingExecutionRequest},
26	metrics::Metrics,
27	prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
28};
29use always_assert::never;
30use futures::{
31	channel::{mpsc, oneshot},
32	Future, FutureExt, SinkExt, StreamExt,
33};
34#[cfg(feature = "test-utils")]
35use polkadot_node_core_pvf_common::ArtifactChecksum;
36use polkadot_node_core_pvf_common::{
37	error::{PrecheckResult, PrepareError},
38	prepare::PrepareSuccess,
39	pvf::PvfPrepData,
40};
41use polkadot_node_primitives::PoV;
42use polkadot_node_subsystem::{
43	messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult,
44};
45use polkadot_parachain_primitives::primitives::ValidationResult;
46use polkadot_primitives::{Hash, PersistedValidationData};
47use std::{
48	collections::HashMap,
49	path::PathBuf,
50	sync::Arc,
51	time::{Duration, SystemTime},
52};
53
54#[cfg(not(test))]
57pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
58#[cfg(test)]
59pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
60
61pub const NUM_PREPARE_RETRIES: u32 = 5;
63
64pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
66
67pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";
69
70pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;
72
73pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
75
76pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
78
79#[derive(Clone)]
81pub struct ValidationHost {
82	to_host_tx: mpsc::Sender<ToHost>,
83	pub security_status: SecurityStatus,
85}
86
87impl ValidationHost {
88	pub async fn precheck_pvf(
97		&mut self,
98		pvf: PvfPrepData,
99		result_tx: PrecheckResultSender,
100	) -> Result<(), String> {
101		self.to_host_tx
102			.send(ToHost::PrecheckPvf { pvf, result_tx })
103			.await
104			.map_err(|_| "the inner loop hung up".to_string())
105	}
106
107	pub async fn execute_pvf(
115		&mut self,
116		pvf: PvfPrepData,
117		exec_timeout: Duration,
118		pvd: Arc<PersistedValidationData>,
119		pov: Arc<PoV>,
120		priority: Priority,
121		exec_kind: PvfExecKind,
122		result_tx: ResultSender,
123	) -> Result<(), String> {
124		self.to_host_tx
125			.send(ToHost::ExecutePvf(ExecutePvfInputs {
126				pvf,
127				exec_timeout,
128				pvd,
129				pov,
130				priority,
131				exec_kind,
132				result_tx,
133			}))
134			.await
135			.map_err(|_| "the inner loop hung up".to_string())
136	}
137
138	pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
145		self.to_host_tx
146			.send(ToHost::HeadsUp { active_pvfs })
147			.await
148			.map_err(|_| "the inner loop hung up".to_string())
149	}
150
151	pub async fn update_active_leaves(
155		&mut self,
156		update: ActiveLeavesUpdate,
157		ancestors: Vec<Hash>,
158	) -> Result<(), String> {
159		self.to_host_tx
160			.send(ToHost::UpdateActiveLeaves { update, ancestors })
161			.await
162			.map_err(|_| "the inner loop hung up".to_string())
163	}
164
165	#[cfg(feature = "test-utils")]
169	pub async fn replace_artifact_checksum(
170		&mut self,
171		checksum: ArtifactChecksum,
172		new_checksum: ArtifactChecksum,
173	) -> Result<(), String> {
174		self.to_host_tx
175			.send(ToHost::ReplaceArtifactChecksum { checksum, new_checksum })
176			.await
177			.map_err(|_| "the inner loop hung up".to_string())
178	}
179}
180
181enum ToHost {
182	PrecheckPvf {
183		pvf: PvfPrepData,
184		result_tx: PrecheckResultSender,
185	},
186	ExecutePvf(ExecutePvfInputs),
187	HeadsUp {
188		active_pvfs: Vec<PvfPrepData>,
189	},
190	UpdateActiveLeaves {
191		update: ActiveLeavesUpdate,
192		ancestors: Vec<Hash>,
193	},
194	#[cfg(feature = "test-utils")]
195	ReplaceArtifactChecksum {
196		checksum: ArtifactChecksum,
197		new_checksum: ArtifactChecksum,
198	},
199}
200
201struct ExecutePvfInputs {
202	pvf: PvfPrepData,
203	exec_timeout: Duration,
204	pvd: Arc<PersistedValidationData>,
205	pov: Arc<PoV>,
206	priority: Priority,
207	exec_kind: PvfExecKind,
208	result_tx: ResultSender,
209}
210
211#[derive(Debug)]
213pub struct Config {
214	pub cache_path: PathBuf,
216	pub node_version: Option<String>,
218	pub secure_validator_mode: bool,
220
221	pub prepare_worker_program_path: PathBuf,
223	pub prepare_worker_spawn_timeout: Duration,
225	pub prepare_workers_soft_max_num: usize,
228	pub prepare_workers_hard_max_num: usize,
230
231	pub execute_worker_program_path: PathBuf,
233	pub execute_worker_spawn_timeout: Duration,
235	pub execute_workers_max_num: usize,
237}
238
239impl Config {
240	pub fn new(
242		cache_path: PathBuf,
243		node_version: Option<String>,
244		secure_validator_mode: bool,
245		prepare_worker_program_path: PathBuf,
246		execute_worker_program_path: PathBuf,
247		execute_workers_max_num: usize,
248		prepare_workers_soft_max_num: usize,
249		prepare_workers_hard_max_num: usize,
250	) -> Self {
251		Self {
252			cache_path,
253			node_version,
254			secure_validator_mode,
255
256			prepare_worker_program_path,
257			prepare_worker_spawn_timeout: Duration::from_secs(3),
258			prepare_workers_soft_max_num,
259			prepare_workers_hard_max_num,
260
261			execute_worker_program_path,
262			execute_worker_spawn_timeout: Duration::from_secs(3),
263			execute_workers_max_num,
264		}
265	}
266}
267
268pub async fn start(
277	config: Config,
278	metrics: Metrics,
279) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
280	gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
281
282	let artifacts = Artifacts::new(&config.cache_path).await;
284
285	#[cfg(target_os = "linux")]
288	let security_status = match crate::security::check_security_status(&config).await {
289		Ok(ok) => ok,
290		Err(err) => return Err(SubsystemError::Context(err)),
291	};
292	#[cfg(not(target_os = "linux"))]
293	let security_status = if config.secure_validator_mode {
294		gum::error!(
295			target: LOG_TARGET,
296			"{}{}{}",
297			crate::SECURE_MODE_ERROR,
298			crate::SECURE_LINUX_NOTE,
299			crate::IGNORE_SECURE_MODE_TIP
300		);
301		return Err(SubsystemError::Context(
302			"could not enable Secure Validator Mode for non-Linux; check logs".into(),
303		));
304	} else {
305		gum::warn!(
306			target: LOG_TARGET,
307			"{}{}",
308			crate::SECURE_MODE_WARNING,
309			crate::SECURE_LINUX_NOTE,
310		);
311		SecurityStatus::default()
312	};
313
314	let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);
315
316	let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };
317
318	let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
319		metrics.clone(),
320		config.prepare_worker_program_path.clone(),
321		config.cache_path.clone(),
322		config.prepare_worker_spawn_timeout,
323		config.node_version.clone(),
324		security_status.clone(),
325	);
326
327	let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
328		metrics.clone(),
329		config.prepare_workers_soft_max_num,
330		config.prepare_workers_hard_max_num,
331		config.cache_path.clone(),
332		to_prepare_pool,
333		from_prepare_pool,
334	);
335
336	let (to_execute_queue_tx, from_execute_queue_rx, run_execute_queue) = execute::start(
337		metrics,
338		config.execute_worker_program_path.to_owned(),
339		config.cache_path.clone(),
340		config.execute_workers_max_num,
341		config.execute_worker_spawn_timeout,
342		config.node_version,
343		security_status,
344	);
345
346	let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
347	let run_sweeper = sweeper_task(to_sweeper_rx);
348
349	let run_host = async move {
350		run(Inner {
351			cleanup_pulse_interval: Duration::from_secs(3600),
352			cleanup_config: ArtifactsCleanupConfig::default(),
353			artifacts,
354			to_host_rx,
355			to_prepare_queue_tx,
356			from_prepare_queue_rx,
357			to_execute_queue_tx,
358			from_execute_queue_rx,
359			to_sweeper_tx,
360			awaiting_prepare: AwaitingPrepare::default(),
361		})
362		.await
363	};
364
365	let task = async move {
366		futures::select! {
368			_ = run_host.fuse() => {},
369			_ = run_prepare_queue.fuse() => {},
370			_ = run_prepare_pool.fuse() => {},
371			_ = run_execute_queue.fuse() => {},
372			_ = run_sweeper.fuse() => {},
373		};
374	};
375
376	Ok((validation_host, task))
377}
378
379#[derive(Default)]
382struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
383
384impl AwaitingPrepare {
385	fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
386		self.0.entry(artifact_id).or_default().push(pending_execution_request);
387	}
388
389	fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
390		self.0.remove(artifact_id).unwrap_or_default()
391	}
392}
393
394struct Inner {
395	cleanup_pulse_interval: Duration,
396	cleanup_config: ArtifactsCleanupConfig,
397	artifacts: Artifacts,
398
399	to_host_rx: mpsc::Receiver<ToHost>,
400
401	to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
402	from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,
403
404	to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
405	from_execute_queue_rx: mpsc::UnboundedReceiver<execute::FromQueue>,
406
407	to_sweeper_tx: mpsc::Sender<PathBuf>,
408
409	awaiting_prepare: AwaitingPrepare,
410}
411
412#[derive(Debug)]
413struct Fatal;
414
415async fn run(
416	Inner {
417		cleanup_pulse_interval,
418		cleanup_config,
419		mut artifacts,
420		to_host_rx,
421		from_prepare_queue_rx,
422		mut to_prepare_queue_tx,
423		from_execute_queue_rx,
424		mut to_execute_queue_tx,
425		mut to_sweeper_tx,
426		mut awaiting_prepare,
427	}: Inner,
428) {
429	macro_rules! break_if_fatal {
430		($expr:expr) => {
431			match $expr {
432				Err(Fatal) => {
433					gum::error!(
434						target: LOG_TARGET,
435						"Fatal error occurred, terminating the host. Line: {}",
436						line!(),
437					);
438					break
439				},
440				Ok(v) => v,
441			}
442		};
443	}
444
445	let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
446	futures::pin_mut!(cleanup_pulse);
447
448	let mut to_host_rx = to_host_rx.fuse();
449	let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
450	let mut from_execute_queue_rx = from_execute_queue_rx.fuse();
451
452	loop {
453		futures::select_biased! {
455			from_execute_queue_rx = from_execute_queue_rx.next() => {
456				let from_queue = break_if_fatal!(from_execute_queue_rx.ok_or(Fatal));
457				let execute::FromQueue::RemoveArtifact { artifact, reply_to } = from_queue;
458				break_if_fatal!(handle_artifact_removal(
459					&mut to_sweeper_tx,
460					&mut artifacts,
461					artifact,
462					reply_to,
463				).await);
464			},
465			() = cleanup_pulse.select_next_some() => {
466				break_if_fatal!(handle_cleanup_pulse(
472					&mut to_sweeper_tx,
473					&mut artifacts,
474					&cleanup_config,
475				).await);
476			},
477			to_host = to_host_rx.next() => {
478				let to_host = match to_host {
479					None => {
480						break;
483					},
484					Some(to_host) => to_host,
485				};
486
487				break_if_fatal!(handle_to_host(
490					&mut artifacts,
491					&mut to_prepare_queue_tx,
492					&mut to_execute_queue_tx,
493					&mut awaiting_prepare,
494					to_host,
495				)
496				.await);
497			},
498			from_prepare_queue = from_prepare_queue_rx.next() => {
499				let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
500
501				break_if_fatal!(handle_prepare_done(
511					&mut artifacts,
512					&mut to_execute_queue_tx,
513					&mut awaiting_prepare,
514					from_queue,
515				).await);
516			},
517		}
518	}
519}
520
521async fn handle_to_host(
522	artifacts: &mut Artifacts,
523	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
524	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
525	awaiting_prepare: &mut AwaitingPrepare,
526	to_host: ToHost,
527) -> Result<(), Fatal> {
528	match to_host {
529		ToHost::PrecheckPvf { pvf, result_tx } => {
530			handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
531		},
532		ToHost::ExecutePvf(inputs) => {
533			handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
534				.await?;
535		},
536		ToHost::HeadsUp { active_pvfs } =>
537			handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
538		ToHost::UpdateActiveLeaves { update, ancestors } =>
539			handle_update_active_leaves(execute_queue, update, ancestors).await?,
540		#[cfg(feature = "test-utils")]
541		ToHost::ReplaceArtifactChecksum { checksum, new_checksum } => {
542			artifacts.replace_artifact_checksum(checksum, new_checksum);
543		},
544	}
545
546	Ok(())
547}
548
549async fn handle_precheck_pvf(
557	artifacts: &mut Artifacts,
558	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
559	pvf: PvfPrepData,
560	result_sender: PrecheckResultSender,
561) -> Result<(), Fatal> {
562	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
563
564	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
565		match state {
566			ArtifactState::Prepared { last_time_needed, .. } => {
567				*last_time_needed = SystemTime::now();
568				let _ = result_sender.send(Ok(()));
569			},
570			ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
571				waiting_for_response.push(result_sender),
572			ArtifactState::FailedToProcess { error, .. } => {
573				let _ = result_sender.send(PrecheckResult::Err(error.clone()));
575			},
576		}
577	} else {
578		artifacts.insert_preparing(artifact_id, vec![result_sender]);
579		send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
580			.await?;
581	}
582	Ok(())
583}
584
585async fn handle_execute_pvf(
598	artifacts: &mut Artifacts,
599	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
600	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
601	awaiting_prepare: &mut AwaitingPrepare,
602	inputs: ExecutePvfInputs,
603) -> Result<(), Fatal> {
604	let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, exec_kind, result_tx } = inputs;
605	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
606	let executor_params = (*pvf.executor_params()).clone();
607
608	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
609		match state {
610			ArtifactState::Prepared { ref path, checksum, last_time_needed, .. } => {
611				let file_metadata = std::fs::metadata(path);
612
613				if file_metadata.is_ok() {
614					*last_time_needed = SystemTime::now();
615
616					send_execute(
618						execute_queue,
619						execute::ToQueue::Enqueue {
620							artifact: ArtifactPathId::new(artifact_id, path, *checksum),
621							pending_execution_request: PendingExecutionRequest {
622								exec_timeout,
623								pvd,
624								pov,
625								executor_params,
626								exec_kind,
627								result_tx,
628							},
629						},
630					)
631					.await?;
632				} else {
633					gum::warn!(
634						target: LOG_TARGET,
635						?pvf,
636						?artifact_id,
637						"handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
638					);
639
640					*state = ArtifactState::Preparing {
643						waiting_for_response: Vec::new(),
644						num_failures: 0,
645					};
646					enqueue_prepare_for_execute(
647						prepare_queue,
648						awaiting_prepare,
649						pvf,
650						priority,
651						artifact_id,
652						PendingExecutionRequest {
653							exec_timeout,
654							pvd,
655							pov,
656							executor_params,
657							exec_kind,
658							result_tx,
659						},
660					)
661					.await?;
662				}
663			},
664			ArtifactState::Preparing { .. } => {
665				awaiting_prepare.add(
666					artifact_id,
667					PendingExecutionRequest {
668						exec_timeout,
669						pvd,
670						pov,
671						executor_params,
672						result_tx,
673						exec_kind,
674					},
675				);
676			},
677			ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
678				if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
679					gum::warn!(
680						target: LOG_TARGET,
681						?pvf,
682						?artifact_id,
683						?last_time_failed,
684						%num_failures,
685						%error,
686						"handle_execute_pvf: Re-trying failed PVF preparation."
687					);
688
689					*state = ArtifactState::Preparing {
692						waiting_for_response: Vec::new(),
693						num_failures: *num_failures,
694					};
695					enqueue_prepare_for_execute(
696						prepare_queue,
697						awaiting_prepare,
698						pvf,
699						priority,
700						artifact_id,
701						PendingExecutionRequest {
702							exec_timeout,
703							pvd,
704							pov,
705							executor_params,
706							exec_kind,
707							result_tx,
708						},
709					)
710					.await?;
711				} else {
712					let _ = result_tx.send(Err(ValidationError::from(error.clone())));
713				}
714			},
715		}
716	} else {
717		artifacts.insert_preparing(artifact_id.clone(), Vec::new());
720		enqueue_prepare_for_execute(
721			prepare_queue,
722			awaiting_prepare,
723			pvf,
724			priority,
725			artifact_id,
726			PendingExecutionRequest {
727				exec_timeout,
728				pvd,
729				pov,
730				executor_params,
731				result_tx,
732				exec_kind,
733			},
734		)
735		.await?;
736	}
737
738	Ok(())
739}
740
741async fn handle_heads_up(
742	artifacts: &mut Artifacts,
743	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
744	active_pvfs: Vec<PvfPrepData>,
745) -> Result<(), Fatal> {
746	let now = SystemTime::now();
747
748	for active_pvf in active_pvfs {
749		let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
750		if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
751			match state {
752				ArtifactState::Prepared { last_time_needed, .. } => {
753					*last_time_needed = now;
754				},
755				ArtifactState::Preparing { .. } => {
756					},
758				ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
759					if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
760						gum::warn!(
761							target: LOG_TARGET,
762							?active_pvf,
763							?artifact_id,
764							?last_time_failed,
765							%num_failures,
766							%error,
767							"handle_heads_up: Re-trying failed PVF preparation."
768						);
769
770						*state = ArtifactState::Preparing {
773							waiting_for_response: vec![],
774							num_failures: *num_failures,
775						};
776						send_prepare(
777							prepare_queue,
778							prepare::ToQueue::Enqueue {
779								priority: Priority::Normal,
780								pvf: active_pvf,
781							},
782						)
783						.await?;
784					}
785				},
786			}
787		} else {
788			artifacts.insert_preparing(artifact_id.clone(), Vec::new());
790
791			send_prepare(
792				prepare_queue,
793				prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
794			)
795			.await?;
796		}
797	}
798
799	Ok(())
800}
801
802async fn handle_prepare_done(
803	artifacts: &mut Artifacts,
804	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
805	awaiting_prepare: &mut AwaitingPrepare,
806	from_queue: prepare::FromQueue,
807) -> Result<(), Fatal> {
808	let prepare::FromQueue { artifact_id, result } = from_queue;
809
810	let state = match artifacts.artifact_state_mut(&artifact_id) {
812		None => {
813			never!("an unknown artifact was prepared: {:?}", artifact_id);
819			return Ok(())
820		},
821		Some(ArtifactState::Prepared { .. }) => {
822			never!("the artifact is already prepared: {:?}", artifact_id);
828			return Ok(())
829		},
830		Some(ArtifactState::FailedToProcess { .. }) => {
831			never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
834			return Ok(())
835		},
836		Some(state @ ArtifactState::Preparing { .. }) => state,
837	};
838
839	let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
840		state
841	{
842		for result_sender in waiting_for_response.drain(..) {
843			let result = result.clone().map(|_| ());
844			let _ = result_sender.send(result);
845		}
846		num_failures
847	} else {
848		never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
849		return Ok(())
850	};
851
852	let pending_requests = awaiting_prepare.take(&artifact_id);
855	for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } in
856		pending_requests
857	{
858		if result_tx.is_canceled() {
859			continue
862		}
863
864		let (path, checksum) = match &result {
865			Ok(success) => (success.path.clone(), success.checksum),
866			Err(error) => {
867				let _ = result_tx.send(Err(ValidationError::from(error.clone())));
868				continue
869			},
870		};
871
872		send_execute(
873			execute_queue,
874			execute::ToQueue::Enqueue {
875				artifact: ArtifactPathId::new(artifact_id.clone(), &path, checksum),
876				pending_execution_request: PendingExecutionRequest {
877					exec_timeout,
878					pvd,
879					pov,
880					executor_params,
881					exec_kind,
882					result_tx,
883				},
884			},
885		)
886		.await?;
887	}
888
889	*state = match result {
890		Ok(PrepareSuccess { checksum, path, size, .. }) =>
891			ArtifactState::Prepared { checksum, path, last_time_needed: SystemTime::now(), size },
892		Err(error) => {
893			let last_time_failed = SystemTime::now();
894			let num_failures = *num_failures + 1;
895
896			gum::error!(
897				target: LOG_TARGET,
898				?artifact_id,
899				time_failed = ?last_time_failed,
900				%num_failures,
901				"artifact preparation failed: {}",
902				error
903			);
904			ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
905		},
906	};
907
908	Ok(())
909}
910
911async fn handle_update_active_leaves(
912	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
913	update: ActiveLeavesUpdate,
914	ancestors: Vec<Hash>,
915) -> Result<(), Fatal> {
916	send_execute(execute_queue, execute::ToQueue::UpdateActiveLeaves { update, ancestors }).await
917}
918
919async fn send_prepare(
920	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
921	to_queue: prepare::ToQueue,
922) -> Result<(), Fatal> {
923	prepare_queue.send(to_queue).await.map_err(|_| Fatal)
924}
925
926async fn send_execute(
927	execute_queue: &mut mpsc::Sender<execute::ToQueue>,
928	to_queue: execute::ToQueue,
929) -> Result<(), Fatal> {
930	execute_queue.send(to_queue).await.map_err(|_| Fatal)
931}
932
933async fn enqueue_prepare_for_execute(
936	prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
937	awaiting_prepare: &mut AwaitingPrepare,
938	pvf: PvfPrepData,
939	priority: Priority,
940	artifact_id: ArtifactId,
941	pending_execution_request: PendingExecutionRequest,
942) -> Result<(), Fatal> {
943	send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
944
945	awaiting_prepare.add(artifact_id, pending_execution_request);
947
948	Ok(())
949}
950
951async fn handle_cleanup_pulse(
952	sweeper_tx: &mut mpsc::Sender<PathBuf>,
953	artifacts: &mut Artifacts,
954	cleanup_config: &ArtifactsCleanupConfig,
955) -> Result<(), Fatal> {
956	let to_remove = artifacts.prune(cleanup_config);
957	gum::debug!(
958		target: LOG_TARGET,
959		"PVF pruning: {} artifacts reached their end of life",
960		to_remove.len(),
961	);
962	for (artifact_id, path) in to_remove {
963		gum::debug!(
964			target: LOG_TARGET,
965			validation_code_hash = ?artifact_id.code_hash,
966			"pruning artifact",
967		);
968		sweeper_tx.send(path).await.map_err(|_| Fatal)?;
969	}
970
971	Ok(())
972}
973
974async fn handle_artifact_removal(
975	sweeper_tx: &mut mpsc::Sender<PathBuf>,
976	artifacts: &mut Artifacts,
977	artifact_id: ArtifactId,
978	reply_to: oneshot::Sender<()>,
979) -> Result<(), Fatal> {
980	let (artifact_id, path) = if let Some(artifact) = artifacts.remove(artifact_id) {
981		artifact
982	} else {
983		return Ok(());
988	};
989	reply_to
990		.send(())
991		.expect("the execute queue waits for the artifact remove confirmation; qed");
992	gum::debug!(
997		target: LOG_TARGET,
998		validation_code_hash = ?artifact_id.code_hash,
999		"PVF pruning: pruning artifact by request from the execute queue",
1000	);
1001	sweeper_tx.send(path).await.map_err(|_| Fatal)?;
1002	Ok(())
1003}
1004
1005async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
1007	loop {
1008		match sweeper_rx.next().await {
1009			None => break,
1010			Some(condemned) => {
1011				let result = tokio::fs::remove_file(&condemned).await;
1012				gum::trace!(
1013					target: LOG_TARGET,
1014					?result,
1015					"Swept the artifact file {}",
1016					condemned.display(),
1017				);
1018			},
1019		}
1020	}
1021}
1022
1023fn can_retry_prepare_after_failure(
1025	last_time_failed: SystemTime,
1026	num_failures: u32,
1027	error: &PrepareError,
1028) -> bool {
1029	if error.is_deterministic() {
1030		return false
1032	}
1033
1034	SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
1037		num_failures <= NUM_PREPARE_RETRIES
1038}
1039
1040fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
1042	futures::stream::unfold(interval, {
1043		|interval| async move {
1044			futures_timer::Delay::new(interval).await;
1045			Some(((), interval))
1046		}
1047	})
1048	.map(|_| ())
1049}
1050
1051#[cfg(test)]
1052pub(crate) mod tests {
1053	use super::*;
1054	use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
1055	use assert_matches::assert_matches;
1056	use futures::future::BoxFuture;
1057	use polkadot_node_primitives::BlockData;
1058	use sp_core::H256;
1059
1060	const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
1061	pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
1062
1063	#[tokio::test]
1064	async fn pulse_test() {
1065		let pulse = pulse_every(Duration::from_millis(100));
1066		futures::pin_mut!(pulse);
1067
1068		for _ in 0..5 {
1069			let start = std::time::Instant::now();
1070			let _ = pulse.next().await.unwrap();
1071
1072			let el = start.elapsed().as_millis();
1073			assert!(el > 50 && el < 150, "pulse duration: {}", el);
1074		}
1075	}
1076
1077	struct Builder {
1078		cleanup_pulse_interval: Duration,
1079		cleanup_config: ArtifactsCleanupConfig,
1080		artifacts: Artifacts,
1081	}
1082
1083	impl Builder {
1084		fn default() -> Self {
1085			Self {
1086				cleanup_pulse_interval: Duration::from_secs(3600),
1088				cleanup_config: ArtifactsCleanupConfig::default(),
1089				artifacts: Artifacts::empty(),
1090			}
1091		}
1092
1093		fn build(self) -> Test {
1094			Test::new(self)
1095		}
1096	}
1097
1098	struct Test {
1099		to_host_tx: Option<mpsc::Sender<ToHost>>,
1100
1101		to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
1102		from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
1103		to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
1104		#[allow(unused)]
1105		from_execute_queue_tx: mpsc::UnboundedSender<execute::FromQueue>,
1106		to_sweeper_rx: mpsc::Receiver<PathBuf>,
1107
1108		run: BoxFuture<'static, ()>,
1109	}
1110
1111	impl Test {
1112		fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
1113			let (to_host_tx, to_host_rx) = mpsc::channel(10);
1114			let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
1115			let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
1116			let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
1117			let (from_execute_queue_tx, from_execute_queue_rx) = mpsc::unbounded();
1118			let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
1119
1120			let run = run(Inner {
1121				cleanup_pulse_interval,
1122				cleanup_config,
1123				artifacts,
1124				to_host_rx,
1125				to_prepare_queue_tx,
1126				from_prepare_queue_rx,
1127				to_execute_queue_tx,
1128				from_execute_queue_rx,
1129				to_sweeper_tx,
1130				awaiting_prepare: AwaitingPrepare::default(),
1131			})
1132			.boxed();
1133
1134			Self {
1135				to_host_tx: Some(to_host_tx),
1136				to_prepare_queue_rx,
1137				from_prepare_queue_tx,
1138				to_execute_queue_rx,
1139				from_execute_queue_tx,
1140				to_sweeper_rx,
1141				run,
1142			}
1143		}
1144
1145		fn host_handle(&mut self) -> ValidationHost {
1146			let to_host_tx = self.to_host_tx.take().unwrap();
1147			let security_status = Default::default();
1148			ValidationHost { to_host_tx, security_status }
1149		}
1150
1151		async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
1152		where
1153			T: Send,
1154		{
1155			run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
1156		}
1157
1158		async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
1159			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1160			run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
1161				.await
1162		}
1163
1164		async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
1165			let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1166			run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
1167				.await
1168		}
1169
1170		async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
1171			use futures_timer::Delay;
1172
1173			let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1174			run_until(
1175				&mut self.run,
1176				async {
1177					futures::select! {
1178						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1179						_ = to_prepare_queue_rx.next().fuse() => {
1180							panic!("the prepare queue is supposed to be empty")
1181						}
1182					}
1183				}
1184				.boxed(),
1185			)
1186			.await
1187		}
1188
1189		async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
1190			use futures_timer::Delay;
1191
1192			let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1193			run_until(
1194				&mut self.run,
1195				async {
1196					futures::select! {
1197						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1198						_ = to_execute_queue_rx.next().fuse() => {
1199							panic!("the execute queue is supposed to be empty")
1200						}
1201					}
1202				}
1203				.boxed(),
1204			)
1205			.await
1206		}
1207
1208		async fn poll_ensure_to_sweeper_is_empty(&mut self) {
1209			use futures_timer::Delay;
1210
1211			let to_sweeper_rx = &mut self.to_sweeper_rx;
1212			run_until(
1213				&mut self.run,
1214				async {
1215					futures::select! {
1216						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
1217						msg = to_sweeper_rx.next().fuse() => {
1218							panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
1219						}
1220					}
1221				}
1222				.boxed(),
1223			)
1224			.await
1225		}
1226	}
1227
1228	async fn run_until<R>(
1229		task: &mut (impl Future<Output = ()> + Unpin),
1230		mut fut: (impl Future<Output = R> + Unpin),
1231	) -> R {
1232		use std::task::Poll;
1233
1234		let start = std::time::Instant::now();
1235		let fut = &mut fut;
1236		loop {
1237			if start.elapsed() > std::time::Duration::from_secs(2) {
1238				panic!("timeout");
1241			}
1242
1243			if let Poll::Ready(r) = futures::poll!(&mut *fut) {
1244				break r
1245			}
1246
1247			if futures::poll!(&mut *task).is_ready() {
1248				panic!()
1249			}
1250		}
1251	}
1252
1253	#[tokio::test]
1254	async fn shutdown_on_handle_drop() {
1255		let test = Builder::default().build();
1256
1257		let join_handle = tokio::task::spawn(test.run);
1258
1259		drop(test.to_host_tx);
1262		join_handle.await.unwrap();
1263	}
1264
1265	#[tokio::test]
1266	async fn pruning() {
1267		let mock_now = SystemTime::now() - Duration::from_millis(1000);
1268		let tempdir = tempfile::tempdir().unwrap();
1269		let cache_path = tempdir.path();
1270
1271		let mut builder = Builder::default();
1272		builder.cleanup_pulse_interval = Duration::from_millis(100);
1273		builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
1274		let path1 = generate_artifact_path(cache_path);
1275		let path2 = generate_artifact_path(cache_path);
1276		builder.artifacts.insert_prepared(
1277			artifact_id(1),
1278			path1.clone(),
1279			Default::default(),
1280			mock_now,
1281			1024,
1282		);
1283		builder.artifacts.insert_prepared(
1284			artifact_id(2),
1285			path2.clone(),
1286			Default::default(),
1287			mock_now,
1288			1024,
1289		);
1290		let mut test = builder.build();
1291		let mut host = test.host_handle();
1292
1293		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1294
1295		let to_sweeper_rx = &mut test.to_sweeper_rx;
1296		run_until(
1297			&mut test.run,
1298			async {
1299				assert_eq!(to_sweeper_rx.next().await.unwrap(), path2);
1300			}
1301			.boxed(),
1302		)
1303		.await;
1304
1305		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1308		test.poll_ensure_to_sweeper_is_empty().await;
1309	}
1310
1311	#[tokio::test]
1312	async fn execute_pvf_requests() {
1313		let mut test = Builder::default().build();
1314		let mut host = test.host_handle();
1315		let pvd = Arc::new(PersistedValidationData {
1316			parent_head: Default::default(),
1317			relay_parent_number: 1u32,
1318			relay_parent_storage_root: H256::default(),
1319			max_pov_size: 4096 * 1024,
1320		});
1321		let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
1322		let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
1323
1324		let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
1325		host.execute_pvf(
1326			PvfPrepData::from_discriminator(1),
1327			TEST_EXECUTION_TIMEOUT,
1328			pvd.clone(),
1329			pov1.clone(),
1330			Priority::Normal,
1331			PvfExecKind::Backing(H256::default()),
1332			result_tx,
1333		)
1334		.await
1335		.unwrap();
1336
1337		let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
1338		host.execute_pvf(
1339			PvfPrepData::from_discriminator(1),
1340			TEST_EXECUTION_TIMEOUT,
1341			pvd.clone(),
1342			pov1,
1343			Priority::Critical,
1344			PvfExecKind::Backing(H256::default()),
1345			result_tx,
1346		)
1347		.await
1348		.unwrap();
1349
1350		let (result_tx, result_rx_pvf_2) = oneshot::channel();
1351		host.execute_pvf(
1352			PvfPrepData::from_discriminator(2),
1353			TEST_EXECUTION_TIMEOUT,
1354			pvd,
1355			pov2,
1356			Priority::Normal,
1357			PvfExecKind::Backing(H256::default()),
1358			result_tx,
1359		)
1360		.await
1361		.unwrap();
1362
1363		assert_matches!(
1364			test.poll_and_recv_to_prepare_queue().await,
1365			prepare::ToQueue::Enqueue { .. }
1366		);
1367		assert_matches!(
1368			test.poll_and_recv_to_prepare_queue().await,
1369			prepare::ToQueue::Enqueue { .. }
1370		);
1371
1372		test.from_prepare_queue_tx
1373			.send(prepare::FromQueue {
1374				artifact_id: artifact_id(1),
1375				result: Ok(PrepareSuccess::default()),
1376			})
1377			.await
1378			.unwrap();
1379		let result_tx_pvf_1_1 = assert_matches!(
1380			test.poll_and_recv_to_execute_queue().await,
1381			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1382		);
1383		let result_tx_pvf_1_2 = assert_matches!(
1384			test.poll_and_recv_to_execute_queue().await,
1385			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1386		);
1387
1388		test.from_prepare_queue_tx
1389			.send(prepare::FromQueue {
1390				artifact_id: artifact_id(2),
1391				result: Ok(PrepareSuccess::default()),
1392			})
1393			.await
1394			.unwrap();
1395		let result_tx_pvf_2 = assert_matches!(
1396			test.poll_and_recv_to_execute_queue().await,
1397			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1398		);
1399
1400		result_tx_pvf_1_1
1401			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1402			.unwrap();
1403		assert_matches!(
1404			result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
1405			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1406		);
1407
1408		result_tx_pvf_1_2
1409			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1410			.unwrap();
1411		assert_matches!(
1412			result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
1413			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1414		);
1415
1416		result_tx_pvf_2
1417			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1418			.unwrap();
1419		assert_matches!(
1420			result_rx_pvf_2.now_or_never().unwrap().unwrap(),
1421			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1422		);
1423	}
1424
1425	#[tokio::test]
1426	async fn precheck_pvf() {
1427		let mut test = Builder::default().build();
1428		let mut host = test.host_handle();
1429
1430		let (result_tx, result_rx) = oneshot::channel();
1432		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1433			.await
1434			.unwrap();
1435
1436		assert_matches!(
1438			test.poll_and_recv_to_prepare_queue().await,
1439			prepare::ToQueue::Enqueue { .. }
1440		);
1441		test.from_prepare_queue_tx
1443			.send(prepare::FromQueue {
1444				artifact_id: artifact_id(1),
1445				result: Ok(PrepareSuccess::default()),
1446			})
1447			.await
1448			.unwrap();
1449		test.poll_ensure_to_execute_queue_is_empty().await;
1451		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1453
1454		let mut precheck_receivers = Vec::new();
1456		for _ in 0..3 {
1457			let (result_tx, result_rx) = oneshot::channel();
1458			host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1459				.await
1460				.unwrap();
1461			precheck_receivers.push(result_rx);
1462		}
1463		assert_matches!(
1465			test.poll_and_recv_to_prepare_queue().await,
1466			prepare::ToQueue::Enqueue { .. }
1467		);
1468		test.from_prepare_queue_tx
1469			.send(prepare::FromQueue {
1470				artifact_id: artifact_id(2),
1471				result: Err(PrepareError::TimedOut),
1472			})
1473			.await
1474			.unwrap();
1475		test.poll_ensure_to_execute_queue_is_empty().await;
1476		for result_rx in precheck_receivers {
1477			assert_matches!(
1478				result_rx.now_or_never().unwrap().unwrap(),
1479				Err(PrepareError::TimedOut)
1480			);
1481		}
1482	}
1483
1484	#[tokio::test]
1485	async fn test_prepare_done() {
1486		let mut test = Builder::default().build();
1487		let mut host = test.host_handle();
1488		let pvd = Arc::new(PersistedValidationData {
1489			parent_head: Default::default(),
1490			relay_parent_number: 1u32,
1491			relay_parent_storage_root: H256::default(),
1492			max_pov_size: 4096 * 1024,
1493		});
1494		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1495
1496		let (result_tx, result_rx_execute) = oneshot::channel();
1501		host.execute_pvf(
1502			PvfPrepData::from_discriminator(1),
1503			TEST_EXECUTION_TIMEOUT,
1504			pvd.clone(),
1505			pov.clone(),
1506			Priority::Critical,
1507			PvfExecKind::Backing(H256::default()),
1508			result_tx,
1509		)
1510		.await
1511		.unwrap();
1512
1513		assert_matches!(
1514			test.poll_and_recv_to_prepare_queue().await,
1515			prepare::ToQueue::Enqueue { .. }
1516		);
1517
1518		let (result_tx, result_rx) = oneshot::channel();
1519		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1520			.await
1521			.unwrap();
1522
1523		test.from_prepare_queue_tx
1526			.send(prepare::FromQueue {
1527				artifact_id: artifact_id(1),
1528				result: Err(PrepareError::TimedOut),
1529			})
1530			.await
1531			.unwrap();
1532		test.poll_ensure_to_execute_queue_is_empty().await;
1533		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
1534		assert_matches!(
1535			result_rx_execute.now_or_never().unwrap().unwrap(),
1536			Err(ValidationError::Internal(_))
1537		);
1538
1539		let mut precheck_receivers = Vec::new();
1541		for _ in 0..3 {
1542			let (result_tx, result_rx) = oneshot::channel();
1543			host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1544				.await
1545				.unwrap();
1546			precheck_receivers.push(result_rx);
1547		}
1548
1549		let (result_tx, _result_rx_execute) = oneshot::channel();
1550		host.execute_pvf(
1551			PvfPrepData::from_discriminator(2),
1552			TEST_EXECUTION_TIMEOUT,
1553			pvd,
1554			pov,
1555			Priority::Critical,
1556			PvfExecKind::Backing(H256::default()),
1557			result_tx,
1558		)
1559		.await
1560		.unwrap();
1561		assert_matches!(
1563			test.poll_and_recv_to_prepare_queue().await,
1564			prepare::ToQueue::Enqueue { .. }
1565		);
1566		test.from_prepare_queue_tx
1567			.send(prepare::FromQueue {
1568				artifact_id: artifact_id(2),
1569				result: Ok(PrepareSuccess::default()),
1570			})
1571			.await
1572			.unwrap();
1573		assert_matches!(
1576			test.poll_and_recv_to_execute_queue().await,
1577			execute::ToQueue::Enqueue { .. }
1578		);
1579		for result_rx in precheck_receivers {
1580			assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1581		}
1582	}
1583
1584	#[tokio::test]
1587	async fn test_precheck_prepare_no_retry() {
1588		let mut test = Builder::default().build();
1589		let mut host = test.host_handle();
1590
1591		let (result_tx, result_rx) = oneshot::channel();
1593		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1594			.await
1595			.unwrap();
1596
1597		assert_matches!(
1599			test.poll_and_recv_to_prepare_queue().await,
1600			prepare::ToQueue::Enqueue { .. }
1601		);
1602		test.from_prepare_queue_tx
1604			.send(prepare::FromQueue {
1605				artifact_id: artifact_id(1),
1606				result: Err(PrepareError::TimedOut),
1607			})
1608			.await
1609			.unwrap();
1610
1611		let result = test.poll_and_recv_result(result_rx).await;
1613		assert_matches!(result, Err(PrepareError::TimedOut));
1614
1615		let (result_tx_2, result_rx_2) = oneshot::channel();
1617		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_2)
1618			.await
1619			.unwrap();
1620
1621		test.poll_ensure_to_prepare_queue_is_empty().await;
1623
1624		let result = test.poll_and_recv_result(result_rx_2).await;
1626		assert_matches!(result, Err(PrepareError::TimedOut));
1627
1628		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1630
1631		let (result_tx_3, result_rx_3) = oneshot::channel();
1633		host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_3)
1634			.await
1635			.unwrap();
1636
1637		test.poll_ensure_to_prepare_queue_is_empty().await;
1639
1640		let result = test.poll_and_recv_result(result_rx_3).await;
1642		assert_matches!(result, Err(PrepareError::TimedOut));
1643	}
1644
1645	#[tokio::test]
1648	async fn test_execute_prepare_retry() {
1649		let mut test = Builder::default().build();
1650		let mut host = test.host_handle();
1651		let pvd = Arc::new(PersistedValidationData {
1652			parent_head: Default::default(),
1653			relay_parent_number: 1u32,
1654			relay_parent_storage_root: H256::default(),
1655			max_pov_size: 4096 * 1024,
1656		});
1657		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1658
1659		let (result_tx, result_rx) = oneshot::channel();
1661		host.execute_pvf(
1662			PvfPrepData::from_discriminator(1),
1663			TEST_EXECUTION_TIMEOUT,
1664			pvd.clone(),
1665			pov.clone(),
1666			Priority::Critical,
1667			PvfExecKind::Backing(H256::default()),
1668			result_tx,
1669		)
1670		.await
1671		.unwrap();
1672
1673		assert_matches!(
1675			test.poll_and_recv_to_prepare_queue().await,
1676			prepare::ToQueue::Enqueue { .. }
1677		);
1678		test.from_prepare_queue_tx
1680			.send(prepare::FromQueue {
1681				artifact_id: artifact_id(1),
1682				result: Err(PrepareError::TimedOut),
1683			})
1684			.await
1685			.unwrap();
1686
1687		let result = test.poll_and_recv_result(result_rx).await;
1689		assert_matches!(result, Err(ValidationError::Internal(_)));
1690
1691		let (result_tx_2, result_rx_2) = oneshot::channel();
1693		host.execute_pvf(
1694			PvfPrepData::from_discriminator(1),
1695			TEST_EXECUTION_TIMEOUT,
1696			pvd.clone(),
1697			pov.clone(),
1698			Priority::Critical,
1699			PvfExecKind::Backing(H256::default()),
1700			result_tx_2,
1701		)
1702		.await
1703		.unwrap();
1704
1705		test.poll_ensure_to_prepare_queue_is_empty().await;
1707
1708		let result = test.poll_and_recv_result(result_rx_2).await;
1710		assert_matches!(result, Err(ValidationError::Internal(_)));
1711
1712		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1714
1715		let (result_tx_3, result_rx_3) = oneshot::channel();
1717		host.execute_pvf(
1718			PvfPrepData::from_discriminator(1),
1719			TEST_EXECUTION_TIMEOUT,
1720			pvd.clone(),
1721			pov.clone(),
1722			Priority::Critical,
1723			PvfExecKind::Backing(H256::default()),
1724			result_tx_3,
1725		)
1726		.await
1727		.unwrap();
1728
1729		assert_matches!(
1731			test.poll_and_recv_to_prepare_queue().await,
1732			prepare::ToQueue::Enqueue { .. }
1733		);
1734
1735		test.from_prepare_queue_tx
1736			.send(prepare::FromQueue {
1737				artifact_id: artifact_id(1),
1738				result: Ok(PrepareSuccess::default()),
1739			})
1740			.await
1741			.unwrap();
1742
1743		let result_tx_3 = assert_matches!(
1745			test.poll_and_recv_to_execute_queue().await,
1746			execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1747		);
1748
1749		result_tx_3
1752			.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1753			.unwrap();
1754		assert_matches!(
1755			result_rx_3.now_or_never().unwrap().unwrap(),
1756			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1757		);
1758	}
1759
1760	#[tokio::test]
1763	async fn test_execute_prepare_no_retry() {
1764		let mut test = Builder::default().build();
1765		let mut host = test.host_handle();
1766		let pvd = Arc::new(PersistedValidationData {
1767			parent_head: Default::default(),
1768			relay_parent_number: 1u32,
1769			relay_parent_storage_root: H256::default(),
1770			max_pov_size: 4096 * 1024,
1771		});
1772		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1773
1774		let (result_tx, result_rx) = oneshot::channel();
1776		host.execute_pvf(
1777			PvfPrepData::from_discriminator(1),
1778			TEST_EXECUTION_TIMEOUT,
1779			pvd.clone(),
1780			pov.clone(),
1781			Priority::Critical,
1782			PvfExecKind::Backing(H256::default()),
1783			result_tx,
1784		)
1785		.await
1786		.unwrap();
1787
1788		assert_matches!(
1790			test.poll_and_recv_to_prepare_queue().await,
1791			prepare::ToQueue::Enqueue { .. }
1792		);
1793		test.from_prepare_queue_tx
1795			.send(prepare::FromQueue {
1796				artifact_id: artifact_id(1),
1797				result: Err(PrepareError::Prevalidation("reproducible error".into())),
1798			})
1799			.await
1800			.unwrap();
1801
1802		let result = test.poll_and_recv_result(result_rx).await;
1804		assert_matches!(result, Err(ValidationError::Preparation(_)));
1805
1806		let (result_tx_2, result_rx_2) = oneshot::channel();
1808		host.execute_pvf(
1809			PvfPrepData::from_discriminator(1),
1810			TEST_EXECUTION_TIMEOUT,
1811			pvd.clone(),
1812			pov.clone(),
1813			Priority::Critical,
1814			PvfExecKind::Backing(H256::default()),
1815			result_tx_2,
1816		)
1817		.await
1818		.unwrap();
1819
1820		test.poll_ensure_to_prepare_queue_is_empty().await;
1822
1823		let result = test.poll_and_recv_result(result_rx_2).await;
1825		assert_matches!(result, Err(ValidationError::Preparation(_)));
1826
1827		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1829
1830		let (result_tx_3, result_rx_3) = oneshot::channel();
1832		host.execute_pvf(
1833			PvfPrepData::from_discriminator(1),
1834			TEST_EXECUTION_TIMEOUT,
1835			pvd.clone(),
1836			pov.clone(),
1837			Priority::Critical,
1838			PvfExecKind::Backing(H256::default()),
1839			result_tx_3,
1840		)
1841		.await
1842		.unwrap();
1843
1844		test.poll_ensure_to_prepare_queue_is_empty().await;
1846
1847		let result = test.poll_and_recv_result(result_rx_3).await;
1849		assert_matches!(result, Err(ValidationError::Preparation(_)));
1850	}
1851
1852	#[tokio::test]
1854	async fn test_heads_up_prepare_retry() {
1855		let mut test = Builder::default().build();
1856		let mut host = test.host_handle();
1857
1858		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1860
1861		assert_matches!(
1863			test.poll_and_recv_to_prepare_queue().await,
1864			prepare::ToQueue::Enqueue { .. }
1865		);
1866		test.from_prepare_queue_tx
1868			.send(prepare::FromQueue {
1869				artifact_id: artifact_id(1),
1870				result: Err(PrepareError::TimedOut),
1871			})
1872			.await
1873			.unwrap();
1874
1875		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1877
1878		test.poll_ensure_to_prepare_queue_is_empty().await;
1880
1881		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1883
1884		host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1886
1887		assert_matches!(
1889			test.poll_and_recv_to_prepare_queue().await,
1890			prepare::ToQueue::Enqueue { .. }
1891		);
1892	}
1893
1894	#[tokio::test]
1895	async fn cancellation() {
1896		let mut test = Builder::default().build();
1897		let mut host = test.host_handle();
1898		let pvd = Arc::new(PersistedValidationData {
1899			parent_head: Default::default(),
1900			relay_parent_number: 1u32,
1901			relay_parent_storage_root: H256::default(),
1902			max_pov_size: 4096 * 1024,
1903		});
1904		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1905
1906		let (result_tx, result_rx) = oneshot::channel();
1907		host.execute_pvf(
1908			PvfPrepData::from_discriminator(1),
1909			TEST_EXECUTION_TIMEOUT,
1910			pvd,
1911			pov,
1912			Priority::Normal,
1913			PvfExecKind::Backing(H256::default()),
1914			result_tx,
1915		)
1916		.await
1917		.unwrap();
1918
1919		assert_matches!(
1920			test.poll_and_recv_to_prepare_queue().await,
1921			prepare::ToQueue::Enqueue { .. }
1922		);
1923
1924		test.from_prepare_queue_tx
1925			.send(prepare::FromQueue {
1926				artifact_id: artifact_id(1),
1927				result: Ok(PrepareSuccess::default()),
1928			})
1929			.await
1930			.unwrap();
1931
1932		drop(result_rx);
1933
1934		test.poll_ensure_to_execute_queue_is_empty().await;
1935	}
1936}