1use crate::{
24 artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
25 execute::{self, PendingExecutionRequest},
26 metrics::Metrics,
27 prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
28};
29use always_assert::never;
30use futures::{
31 channel::{mpsc, oneshot},
32 Future, FutureExt, SinkExt, StreamExt,
33};
34#[cfg(feature = "test-utils")]
35use polkadot_node_core_pvf_common::ArtifactChecksum;
36use polkadot_node_core_pvf_common::{
37 error::{PrecheckResult, PrepareError},
38 prepare::PrepareSuccess,
39 pvf::PvfPrepData,
40};
41use polkadot_node_subsystem::{
42 messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult,
43};
44use polkadot_parachain_primitives::primitives::ValidationResult;
45use polkadot_primitives::Hash;
46use std::{
47 collections::HashMap,
48 path::PathBuf,
49 time::{Duration, SystemTime},
50};
51
52#[cfg(not(test))]
55pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
56#[cfg(test)]
57pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
58
59pub const NUM_PREPARE_RETRIES: u32 = 5;
61
62pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
64
65pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";
67
68pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;
70
71pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
73
74pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
76
77#[derive(Clone)]
79pub struct ValidationHost {
80 to_host_tx: mpsc::Sender<ToHost>,
81 pub security_status: SecurityStatus,
83}
84
85impl ValidationHost {
86 pub async fn precheck_pvf(
95 &mut self,
96 pvf: PvfPrepData,
97 result_tx: PrecheckResultSender,
98 ) -> Result<(), String> {
99 self.to_host_tx
100 .send(ToHost::PrecheckPvf { pvf, result_tx })
101 .await
102 .map_err(|_| "the inner loop hung up".to_string())
103 }
104
105 pub async fn execute_pvf(
113 &mut self,
114 pvf: PvfPrepData,
115 validation_context: polkadot_node_core_pvf_common::execute::ValidationContext,
116 priority: Priority,
117 exec_kind: PvfExecKind,
118 result_tx: ResultSender,
119 ) -> Result<(), String> {
120 self.to_host_tx
121 .send(ToHost::ExecutePvf(ExecutePvfInputs {
122 pvf,
123 validation_context,
124 priority,
125 exec_kind,
126 result_tx,
127 }))
128 .await
129 .map_err(|_| "the inner loop hung up".to_string())
130 }
131
132 pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
139 self.to_host_tx
140 .send(ToHost::HeadsUp { active_pvfs })
141 .await
142 .map_err(|_| "the inner loop hung up".to_string())
143 }
144
145 pub async fn update_active_leaves(
149 &mut self,
150 update: ActiveLeavesUpdate,
151 ancestors: Vec<Hash>,
152 ) -> Result<(), String> {
153 self.to_host_tx
154 .send(ToHost::UpdateActiveLeaves { update, ancestors })
155 .await
156 .map_err(|_| "the inner loop hung up".to_string())
157 }
158
159 #[cfg(feature = "test-utils")]
163 pub async fn replace_artifact_checksum(
164 &mut self,
165 checksum: ArtifactChecksum,
166 new_checksum: ArtifactChecksum,
167 ) -> Result<(), String> {
168 self.to_host_tx
169 .send(ToHost::ReplaceArtifactChecksum { checksum, new_checksum })
170 .await
171 .map_err(|_| "the inner loop hung up".to_string())
172 }
173}
174
175enum ToHost {
176 PrecheckPvf {
177 pvf: PvfPrepData,
178 result_tx: PrecheckResultSender,
179 },
180 ExecutePvf(ExecutePvfInputs),
181 HeadsUp {
182 active_pvfs: Vec<PvfPrepData>,
183 },
184 UpdateActiveLeaves {
185 update: ActiveLeavesUpdate,
186 ancestors: Vec<Hash>,
187 },
188 #[cfg(feature = "test-utils")]
189 ReplaceArtifactChecksum {
190 checksum: ArtifactChecksum,
191 new_checksum: ArtifactChecksum,
192 },
193}
194
195struct ExecutePvfInputs {
196 pvf: PvfPrepData,
197 validation_context: polkadot_node_core_pvf_common::execute::ValidationContext,
198 priority: Priority,
199 exec_kind: PvfExecKind,
200 result_tx: ResultSender,
201}
202
203#[derive(Debug)]
205pub struct Config {
206 pub cache_path: PathBuf,
208 pub node_version: Option<String>,
210 pub secure_validator_mode: bool,
212
213 pub prepare_worker_program_path: PathBuf,
215 pub prepare_worker_spawn_timeout: Duration,
217 pub prepare_workers_soft_max_num: usize,
220 pub prepare_workers_hard_max_num: usize,
222
223 pub execute_worker_program_path: PathBuf,
225 pub execute_worker_spawn_timeout: Duration,
227 pub execute_workers_max_num: usize,
229}
230
231impl Config {
232 pub fn new(
234 cache_path: PathBuf,
235 node_version: Option<String>,
236 secure_validator_mode: bool,
237 prepare_worker_program_path: PathBuf,
238 execute_worker_program_path: PathBuf,
239 execute_workers_max_num: usize,
240 prepare_workers_soft_max_num: usize,
241 prepare_workers_hard_max_num: usize,
242 ) -> Self {
243 Self {
244 cache_path,
245 node_version,
246 secure_validator_mode,
247
248 prepare_worker_program_path,
249 prepare_worker_spawn_timeout: Duration::from_secs(3),
250 prepare_workers_soft_max_num,
251 prepare_workers_hard_max_num,
252
253 execute_worker_program_path,
254 execute_worker_spawn_timeout: Duration::from_secs(3),
255 execute_workers_max_num,
256 }
257 }
258}
259
260pub async fn start(
269 config: Config,
270 metrics: Metrics,
271) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
272 gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
273
274 let artifacts = Artifacts::new(&config.cache_path).await;
276
277 #[cfg(target_os = "linux")]
280 let security_status = match crate::security::check_security_status(&config).await {
281 Ok(ok) => ok,
282 Err(err) => return Err(SubsystemError::Context(err)),
283 };
284 #[cfg(not(target_os = "linux"))]
285 let security_status = if config.secure_validator_mode {
286 gum::error!(
287 target: LOG_TARGET,
288 "{}{}{}",
289 crate::SECURE_MODE_ERROR,
290 crate::SECURE_LINUX_NOTE,
291 crate::IGNORE_SECURE_MODE_TIP
292 );
293 return Err(SubsystemError::Context(
294 "could not enable Secure Validator Mode for non-Linux; check logs".into(),
295 ));
296 } else {
297 gum::warn!(
298 target: LOG_TARGET,
299 "{}{}",
300 crate::SECURE_MODE_WARNING,
301 crate::SECURE_LINUX_NOTE,
302 );
303 SecurityStatus::default()
304 };
305
306 let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);
307
308 let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };
309
310 let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
311 metrics.clone(),
312 config.prepare_worker_program_path.clone(),
313 config.cache_path.clone(),
314 config.prepare_worker_spawn_timeout,
315 config.node_version.clone(),
316 security_status.clone(),
317 );
318
319 let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
320 metrics.clone(),
321 config.prepare_workers_soft_max_num,
322 config.prepare_workers_hard_max_num,
323 config.cache_path.clone(),
324 to_prepare_pool,
325 from_prepare_pool,
326 );
327
328 let (to_execute_queue_tx, from_execute_queue_rx, run_execute_queue) = execute::start(
329 metrics,
330 config.execute_worker_program_path.to_owned(),
331 config.cache_path.clone(),
332 config.execute_workers_max_num,
333 config.execute_worker_spawn_timeout,
334 config.node_version,
335 security_status,
336 );
337
338 let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
339 let run_sweeper = sweeper_task(to_sweeper_rx);
340
341 let run_host = async move {
342 run(Inner {
343 cleanup_pulse_interval: Duration::from_secs(3600),
344 cleanup_config: ArtifactsCleanupConfig::default(),
345 artifacts,
346 to_host_rx,
347 to_prepare_queue_tx,
348 from_prepare_queue_rx,
349 to_execute_queue_tx,
350 from_execute_queue_rx,
351 to_sweeper_tx,
352 awaiting_prepare: AwaitingPrepare::default(),
353 })
354 .await
355 };
356
357 let task = async move {
358 futures::select! {
360 _ = run_host.fuse() => {},
361 _ = run_prepare_queue.fuse() => {},
362 _ = run_prepare_pool.fuse() => {},
363 _ = run_execute_queue.fuse() => {},
364 _ = run_sweeper.fuse() => {},
365 };
366 };
367
368 Ok((validation_host, task))
369}
370
371#[derive(Default)]
374struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
375
376impl AwaitingPrepare {
377 fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
378 self.0.entry(artifact_id).or_default().push(pending_execution_request);
379 }
380
381 fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
382 self.0.remove(artifact_id).unwrap_or_default()
383 }
384}
385
386struct Inner {
387 cleanup_pulse_interval: Duration,
388 cleanup_config: ArtifactsCleanupConfig,
389 artifacts: Artifacts,
390
391 to_host_rx: mpsc::Receiver<ToHost>,
392
393 to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
394 from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,
395
396 to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
397 from_execute_queue_rx: mpsc::UnboundedReceiver<execute::FromQueue>,
398
399 to_sweeper_tx: mpsc::Sender<PathBuf>,
400
401 awaiting_prepare: AwaitingPrepare,
402}
403
404#[derive(Debug)]
405struct Fatal;
406
407async fn run(
408 Inner {
409 cleanup_pulse_interval,
410 cleanup_config,
411 mut artifacts,
412 to_host_rx,
413 from_prepare_queue_rx,
414 mut to_prepare_queue_tx,
415 from_execute_queue_rx,
416 mut to_execute_queue_tx,
417 mut to_sweeper_tx,
418 mut awaiting_prepare,
419 }: Inner,
420) {
421 macro_rules! break_if_fatal {
422 ($expr:expr) => {
423 match $expr {
424 Err(Fatal) => {
425 gum::error!(
426 target: LOG_TARGET,
427 "Fatal error occurred, terminating the host. Line: {}",
428 line!(),
429 );
430 break
431 },
432 Ok(v) => v,
433 }
434 };
435 }
436
437 let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
438 futures::pin_mut!(cleanup_pulse);
439
440 let mut to_host_rx = to_host_rx.fuse();
441 let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
442 let mut from_execute_queue_rx = from_execute_queue_rx.fuse();
443
444 loop {
445 futures::select_biased! {
447 from_execute_queue_rx = from_execute_queue_rx.next() => {
448 let from_queue = break_if_fatal!(from_execute_queue_rx.ok_or(Fatal));
449 let execute::FromQueue::RemoveArtifact { artifact, reply_to } = from_queue;
450 break_if_fatal!(handle_artifact_removal(
451 &mut to_sweeper_tx,
452 &mut artifacts,
453 artifact,
454 reply_to,
455 ).await);
456 },
457 () = cleanup_pulse.select_next_some() => {
458 break_if_fatal!(handle_cleanup_pulse(
464 &mut to_sweeper_tx,
465 &mut artifacts,
466 &cleanup_config,
467 ).await);
468 },
469 to_host = to_host_rx.next() => {
470 let to_host = match to_host {
471 None => {
472 break;
475 },
476 Some(to_host) => to_host,
477 };
478
479 break_if_fatal!(handle_to_host(
482 &mut artifacts,
483 &mut to_prepare_queue_tx,
484 &mut to_execute_queue_tx,
485 &mut awaiting_prepare,
486 to_host,
487 )
488 .await);
489 },
490 from_prepare_queue = from_prepare_queue_rx.next() => {
491 let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
492
493 break_if_fatal!(handle_prepare_done(
503 &mut artifacts,
504 &mut to_execute_queue_tx,
505 &mut awaiting_prepare,
506 from_queue,
507 ).await);
508 },
509 }
510 }
511}
512
513async fn handle_to_host(
514 artifacts: &mut Artifacts,
515 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
516 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
517 awaiting_prepare: &mut AwaitingPrepare,
518 to_host: ToHost,
519) -> Result<(), Fatal> {
520 match to_host {
521 ToHost::PrecheckPvf { pvf, result_tx } => {
522 handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
523 },
524 ToHost::ExecutePvf(inputs) => {
525 handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
526 .await?;
527 },
528 ToHost::HeadsUp { active_pvfs } => {
529 handle_heads_up(artifacts, prepare_queue, active_pvfs).await?
530 },
531 ToHost::UpdateActiveLeaves { update, ancestors } => {
532 handle_update_active_leaves(execute_queue, update, ancestors).await?
533 },
534 #[cfg(feature = "test-utils")]
535 ToHost::ReplaceArtifactChecksum { checksum, new_checksum } => {
536 artifacts.replace_artifact_checksum(checksum, new_checksum);
537 },
538 }
539
540 Ok(())
541}
542
543async fn handle_precheck_pvf(
551 artifacts: &mut Artifacts,
552 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
553 pvf: PvfPrepData,
554 result_sender: PrecheckResultSender,
555) -> Result<(), Fatal> {
556 let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
557
558 if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
559 match state {
560 ArtifactState::Prepared { last_time_needed, .. } => {
561 *last_time_needed = SystemTime::now();
562 let _ = result_sender.send(Ok(()));
563 },
564 ArtifactState::Preparing { waiting_for_response, num_failures: _ } => {
565 waiting_for_response.push(result_sender)
566 },
567 ArtifactState::FailedToProcess { error, .. } => {
568 let _ = result_sender.send(PrecheckResult::Err(error.clone()));
570 },
571 }
572 } else {
573 artifacts.insert_preparing(artifact_id, vec![result_sender]);
574 send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
575 .await?;
576 }
577 Ok(())
578}
579
580async fn handle_execute_pvf(
593 artifacts: &mut Artifacts,
594 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
595 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
596 awaiting_prepare: &mut AwaitingPrepare,
597 inputs: ExecutePvfInputs,
598) -> Result<(), Fatal> {
599 let ExecutePvfInputs { pvf, validation_context, priority, exec_kind, result_tx } = inputs;
600 let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
601 let exec_timeout = validation_context.exec_timeout;
602
603 if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
604 match state {
605 ArtifactState::Prepared { ref path, checksum, last_time_needed, .. } => {
606 let file_metadata = std::fs::metadata(path);
607
608 if file_metadata.is_ok() {
609 *last_time_needed = SystemTime::now();
610
611 send_execute(
613 execute_queue,
614 execute::ToQueue::Enqueue {
615 artifact: ArtifactPathId::new(artifact_id, path, *checksum),
616 pending_execution_request: PendingExecutionRequest {
617 exec_timeout,
618 validation_context,
619 exec_kind,
620 result_tx,
621 },
622 },
623 )
624 .await?;
625 } else {
626 gum::warn!(
627 target: LOG_TARGET,
628 ?pvf,
629 ?artifact_id,
630 "handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
631 );
632
633 *state = ArtifactState::Preparing {
636 waiting_for_response: Vec::new(),
637 num_failures: 0,
638 };
639 enqueue_prepare_for_execute(
640 prepare_queue,
641 awaiting_prepare,
642 pvf,
643 priority,
644 artifact_id,
645 PendingExecutionRequest {
646 exec_timeout,
647 validation_context,
648 exec_kind,
649 result_tx,
650 },
651 )
652 .await?;
653 }
654 },
655 ArtifactState::Preparing { .. } => {
656 awaiting_prepare.add(
657 artifact_id,
658 PendingExecutionRequest {
659 exec_timeout,
660 validation_context,
661 result_tx,
662 exec_kind,
663 },
664 );
665 },
666 ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
667 if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
668 gum::warn!(
669 target: LOG_TARGET,
670 ?pvf,
671 ?artifact_id,
672 ?last_time_failed,
673 %num_failures,
674 %error,
675 "handle_execute_pvf: Re-trying failed PVF preparation."
676 );
677
678 *state = ArtifactState::Preparing {
681 waiting_for_response: Vec::new(),
682 num_failures: *num_failures,
683 };
684 enqueue_prepare_for_execute(
685 prepare_queue,
686 awaiting_prepare,
687 pvf,
688 priority,
689 artifact_id,
690 PendingExecutionRequest {
691 exec_timeout,
692 validation_context,
693 exec_kind,
694 result_tx,
695 },
696 )
697 .await?;
698 } else {
699 let _ = result_tx.send(Err(ValidationError::from(error.clone())));
700 }
701 },
702 }
703 } else {
704 artifacts.insert_preparing(artifact_id.clone(), Vec::new());
707 enqueue_prepare_for_execute(
708 prepare_queue,
709 awaiting_prepare,
710 pvf,
711 priority,
712 artifact_id,
713 PendingExecutionRequest { exec_timeout, validation_context, result_tx, exec_kind },
714 )
715 .await?;
716 }
717
718 Ok(())
719}
720
721async fn handle_heads_up(
722 artifacts: &mut Artifacts,
723 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
724 active_pvfs: Vec<PvfPrepData>,
725) -> Result<(), Fatal> {
726 let now = SystemTime::now();
727
728 for active_pvf in active_pvfs {
729 let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
730 if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
731 match state {
732 ArtifactState::Prepared { last_time_needed, .. } => {
733 *last_time_needed = now;
734 },
735 ArtifactState::Preparing { .. } => {
736 },
738 ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
739 if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
740 gum::warn!(
741 target: LOG_TARGET,
742 ?active_pvf,
743 ?artifact_id,
744 ?last_time_failed,
745 %num_failures,
746 %error,
747 "handle_heads_up: Re-trying failed PVF preparation."
748 );
749
750 *state = ArtifactState::Preparing {
753 waiting_for_response: vec![],
754 num_failures: *num_failures,
755 };
756 send_prepare(
757 prepare_queue,
758 prepare::ToQueue::Enqueue {
759 priority: Priority::Normal,
760 pvf: active_pvf,
761 },
762 )
763 .await?;
764 }
765 },
766 }
767 } else {
768 artifacts.insert_preparing(artifact_id.clone(), Vec::new());
770
771 send_prepare(
772 prepare_queue,
773 prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
774 )
775 .await?;
776 }
777 }
778
779 Ok(())
780}
781
782async fn handle_prepare_done(
783 artifacts: &mut Artifacts,
784 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
785 awaiting_prepare: &mut AwaitingPrepare,
786 from_queue: prepare::FromQueue,
787) -> Result<(), Fatal> {
788 let prepare::FromQueue { artifact_id, result } = from_queue;
789
790 let state = match artifacts.artifact_state_mut(&artifact_id) {
792 None => {
793 never!("an unknown artifact was prepared: {:?}", artifact_id);
799 return Ok(());
800 },
801 Some(ArtifactState::Prepared { .. }) => {
802 never!("the artifact is already prepared: {:?}", artifact_id);
808 return Ok(());
809 },
810 Some(ArtifactState::FailedToProcess { .. }) => {
811 never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
814 return Ok(());
815 },
816 Some(state @ ArtifactState::Preparing { .. }) => state,
817 };
818
819 let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
820 state
821 {
822 for result_sender in waiting_for_response.drain(..) {
823 let result = result.clone().map(|_| ());
824 let _ = result_sender.send(result);
825 }
826 num_failures
827 } else {
828 never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
829 return Ok(());
830 };
831
832 let pending_requests = awaiting_prepare.take(&artifact_id);
835 for PendingExecutionRequest { exec_timeout, validation_context, result_tx, exec_kind } in
836 pending_requests
837 {
838 if result_tx.is_canceled() {
839 continue;
842 }
843
844 let (path, checksum) = match &result {
845 Ok(success) => (success.path.clone(), success.checksum),
846 Err(error) => {
847 let _ = result_tx.send(Err(ValidationError::from(error.clone())));
848 continue;
849 },
850 };
851
852 send_execute(
853 execute_queue,
854 execute::ToQueue::Enqueue {
855 artifact: ArtifactPathId::new(artifact_id.clone(), &path, checksum),
856 pending_execution_request: PendingExecutionRequest {
857 exec_timeout,
858 validation_context,
859 result_tx,
860 exec_kind,
861 },
862 },
863 )
864 .await?;
865 }
866
867 *state = match result {
868 Ok(PrepareSuccess { checksum, path, size, .. }) => {
869 ArtifactState::Prepared { checksum, path, last_time_needed: SystemTime::now(), size }
870 },
871 Err(error) => {
872 let last_time_failed = SystemTime::now();
873 let num_failures = *num_failures + 1;
874
875 gum::error!(
876 target: LOG_TARGET,
877 ?artifact_id,
878 time_failed = ?last_time_failed,
879 %num_failures,
880 "artifact preparation failed: {}",
881 error
882 );
883 ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
884 },
885 };
886
887 Ok(())
888}
889
890async fn handle_update_active_leaves(
891 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
892 update: ActiveLeavesUpdate,
893 ancestors: Vec<Hash>,
894) -> Result<(), Fatal> {
895 send_execute(execute_queue, execute::ToQueue::UpdateActiveLeaves { update, ancestors }).await
896}
897
898async fn send_prepare(
899 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
900 to_queue: prepare::ToQueue,
901) -> Result<(), Fatal> {
902 prepare_queue.send(to_queue).await.map_err(|_| Fatal)
903}
904
905async fn send_execute(
906 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
907 to_queue: execute::ToQueue,
908) -> Result<(), Fatal> {
909 execute_queue.send(to_queue).await.map_err(|_| Fatal)
910}
911
912async fn enqueue_prepare_for_execute(
915 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
916 awaiting_prepare: &mut AwaitingPrepare,
917 pvf: PvfPrepData,
918 priority: Priority,
919 artifact_id: ArtifactId,
920 pending_execution_request: PendingExecutionRequest,
921) -> Result<(), Fatal> {
922 send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
923
924 awaiting_prepare.add(artifact_id, pending_execution_request);
926
927 Ok(())
928}
929
930async fn handle_cleanup_pulse(
931 sweeper_tx: &mut mpsc::Sender<PathBuf>,
932 artifacts: &mut Artifacts,
933 cleanup_config: &ArtifactsCleanupConfig,
934) -> Result<(), Fatal> {
935 let to_remove = artifacts.prune(cleanup_config);
936 gum::debug!(
937 target: LOG_TARGET,
938 "PVF pruning: {} artifacts reached their end of life",
939 to_remove.len(),
940 );
941 for (artifact_id, path) in to_remove {
942 gum::debug!(
943 target: LOG_TARGET,
944 validation_code_hash = ?artifact_id.code_hash,
945 "pruning artifact",
946 );
947 sweeper_tx.send(path).await.map_err(|_| Fatal)?;
948 }
949
950 Ok(())
951}
952
953async fn handle_artifact_removal(
954 sweeper_tx: &mut mpsc::Sender<PathBuf>,
955 artifacts: &mut Artifacts,
956 artifact_id: ArtifactId,
957 reply_to: oneshot::Sender<()>,
958) -> Result<(), Fatal> {
959 let (artifact_id, path) = if let Some(artifact) = artifacts.remove(artifact_id) {
960 artifact
961 } else {
962 return Ok(());
967 };
968 reply_to
969 .send(())
970 .expect("the execute queue waits for the artifact remove confirmation; qed");
971 gum::debug!(
976 target: LOG_TARGET,
977 validation_code_hash = ?artifact_id.code_hash,
978 "PVF pruning: pruning artifact by request from the execute queue",
979 );
980 sweeper_tx.send(path).await.map_err(|_| Fatal)?;
981 Ok(())
982}
983
984async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
986 loop {
987 match sweeper_rx.next().await {
988 None => break,
989 Some(condemned) => {
990 let result = tokio::fs::remove_file(&condemned).await;
991 gum::trace!(
992 target: LOG_TARGET,
993 ?result,
994 "Swept the artifact file {}",
995 condemned.display(),
996 );
997 },
998 }
999 }
1000}
1001
1002fn can_retry_prepare_after_failure(
1004 last_time_failed: SystemTime,
1005 num_failures: u32,
1006 error: &PrepareError,
1007) -> bool {
1008 if error.is_deterministic() {
1009 return false;
1011 }
1012
1013 SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
1016 num_failures <= NUM_PREPARE_RETRIES
1017}
1018
1019fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
1021 futures::stream::unfold(interval, {
1022 |interval| async move {
1023 futures_timer::Delay::new(interval).await;
1024 Some(((), interval))
1025 }
1026 })
1027 .map(|_| ())
1028}
1029
1030#[cfg(test)]
1031pub(crate) mod tests {
1032 use super::*;
1033 use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
1034 use assert_matches::assert_matches;
1035 use futures::future::BoxFuture;
1036 use polkadot_node_core_pvf_common::execute::ValidationContext;
1037 use polkadot_node_primitives::{BlockData, PoV};
1038 use polkadot_primitives::{
1039 CandidateReceiptV2 as CandidateReceipt, ExecutorParams, PersistedValidationData,
1040 };
1041 use polkadot_primitives_test_helpers::dummy_candidate_receipt;
1042 use sp_core::H256;
1043 use std::sync::Arc;
1044
1045 const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
1046 pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
1047
1048 #[tokio::test]
1049 async fn pulse_test() {
1050 let pulse = pulse_every(Duration::from_millis(100));
1051 futures::pin_mut!(pulse);
1052
1053 for _ in 0..5 {
1054 let start = std::time::Instant::now();
1055 let _ = pulse.next().await.unwrap();
1056
1057 let el = start.elapsed().as_millis();
1058 assert!(el > 50 && el < 150, "pulse duration: {}", el);
1059 }
1060 }
1061
1062 struct Builder {
1063 cleanup_pulse_interval: Duration,
1064 cleanup_config: ArtifactsCleanupConfig,
1065 artifacts: Artifacts,
1066 }
1067
1068 impl Builder {
1069 fn default() -> Self {
1070 Self {
1071 cleanup_pulse_interval: Duration::from_secs(3600),
1073 cleanup_config: ArtifactsCleanupConfig::default(),
1074 artifacts: Artifacts::empty(),
1075 }
1076 }
1077
1078 fn build(self) -> Test {
1079 Test::new(self)
1080 }
1081 }
1082
1083 struct Test {
1084 to_host_tx: Option<mpsc::Sender<ToHost>>,
1085
1086 to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
1087 from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
1088 to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
1089 #[allow(unused)]
1090 from_execute_queue_tx: mpsc::UnboundedSender<execute::FromQueue>,
1091 to_sweeper_rx: mpsc::Receiver<PathBuf>,
1092
1093 run: BoxFuture<'static, ()>,
1094 }
1095
1096 impl Test {
1097 fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
1098 let (to_host_tx, to_host_rx) = mpsc::channel(10);
1099 let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
1100 let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
1101 let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
1102 let (from_execute_queue_tx, from_execute_queue_rx) = mpsc::unbounded();
1103 let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
1104
1105 let run = run(Inner {
1106 cleanup_pulse_interval,
1107 cleanup_config,
1108 artifacts,
1109 to_host_rx,
1110 to_prepare_queue_tx,
1111 from_prepare_queue_rx,
1112 to_execute_queue_tx,
1113 from_execute_queue_rx,
1114 to_sweeper_tx,
1115 awaiting_prepare: AwaitingPrepare::default(),
1116 })
1117 .boxed();
1118
1119 Self {
1120 to_host_tx: Some(to_host_tx),
1121 to_prepare_queue_rx,
1122 from_prepare_queue_tx,
1123 to_execute_queue_rx,
1124 from_execute_queue_tx,
1125 to_sweeper_rx,
1126 run,
1127 }
1128 }
1129
1130 fn host_handle(&mut self) -> ValidationHost {
1131 let to_host_tx = self.to_host_tx.take().unwrap();
1132 let security_status = Default::default();
1133 ValidationHost { to_host_tx, security_status }
1134 }
1135
1136 async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
1137 where
1138 T: Send,
1139 {
1140 run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
1141 }
1142
1143 async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
1144 let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1145 run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
1146 .await
1147 }
1148
1149 async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
1150 let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1151 run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
1152 .await
1153 }
1154
1155 async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
1156 use futures_timer::Delay;
1157
1158 let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
1159 run_until(
1160 &mut self.run,
1161 async {
1162 futures::select! {
1163 _ = Delay::new(Duration::from_millis(500)).fuse() => (),
1164 _ = to_prepare_queue_rx.next().fuse() => {
1165 panic!("the prepare queue is supposed to be empty")
1166 }
1167 }
1168 }
1169 .boxed(),
1170 )
1171 .await
1172 }
1173
1174 async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
1175 use futures_timer::Delay;
1176
1177 let to_execute_queue_rx = &mut self.to_execute_queue_rx;
1178 run_until(
1179 &mut self.run,
1180 async {
1181 futures::select! {
1182 _ = Delay::new(Duration::from_millis(500)).fuse() => (),
1183 _ = to_execute_queue_rx.next().fuse() => {
1184 panic!("the execute queue is supposed to be empty")
1185 }
1186 }
1187 }
1188 .boxed(),
1189 )
1190 .await
1191 }
1192
1193 async fn poll_ensure_to_sweeper_is_empty(&mut self) {
1194 use futures_timer::Delay;
1195
1196 let to_sweeper_rx = &mut self.to_sweeper_rx;
1197 run_until(
1198 &mut self.run,
1199 async {
1200 futures::select! {
1201 _ = Delay::new(Duration::from_millis(500)).fuse() => (),
1202 msg = to_sweeper_rx.next().fuse() => {
1203 panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
1204 }
1205 }
1206 }
1207 .boxed(),
1208 )
1209 .await
1210 }
1211 }
1212
1213 async fn run_until<R>(
1214 task: &mut (impl Future<Output = ()> + Unpin),
1215 mut fut: impl Future<Output = R> + Unpin,
1216 ) -> R {
1217 use std::task::Poll;
1218
1219 let start = std::time::Instant::now();
1220 let fut = &mut fut;
1221 loop {
1222 if start.elapsed() > std::time::Duration::from_secs(2) {
1223 panic!("timeout");
1226 }
1227
1228 if let Poll::Ready(r) = futures::poll!(&mut *fut) {
1229 break r;
1230 }
1231
1232 if futures::poll!(&mut *task).is_ready() {
1233 panic!()
1234 }
1235 }
1236 }
1237
1238 fn test_validation_context(
1240 pvd: Arc<PersistedValidationData>,
1241 pov: Arc<PoV>,
1242 ) -> ValidationContext {
1243 let candidate_receipt: CandidateReceipt = dummy_candidate_receipt(H256::default()).into();
1244 ValidationContext {
1245 candidate_receipt,
1246 pvd,
1247 pov,
1248 executor_params: ExecutorParams::default(),
1249 exec_timeout: TEST_EXECUTION_TIMEOUT,
1250 v3_seen: false,
1251 }
1252 }
1253
1254 #[tokio::test]
1255 async fn shutdown_on_handle_drop() {
1256 let test = Builder::default().build();
1257
1258 let join_handle = tokio::task::spawn(test.run);
1259
1260 drop(test.to_host_tx);
1263 join_handle.await.unwrap();
1264 }
1265
1266 #[tokio::test]
1267 async fn pruning() {
1268 let mock_now = SystemTime::now() - Duration::from_millis(1000);
1269 let tempdir = tempfile::tempdir().unwrap();
1270 let cache_path = tempdir.path();
1271
1272 let mut builder = Builder::default();
1273 builder.cleanup_pulse_interval = Duration::from_millis(100);
1274 builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
1275 let path1 = generate_artifact_path(cache_path);
1276 let path2 = generate_artifact_path(cache_path);
1277 builder.artifacts.insert_prepared(
1278 artifact_id(1),
1279 path1.clone(),
1280 Default::default(),
1281 mock_now,
1282 1024,
1283 );
1284 builder.artifacts.insert_prepared(
1285 artifact_id(2),
1286 path2.clone(),
1287 Default::default(),
1288 mock_now,
1289 1024,
1290 );
1291 let mut test = builder.build();
1292 let mut host = test.host_handle();
1293
1294 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1295
1296 let to_sweeper_rx = &mut test.to_sweeper_rx;
1297 run_until(
1298 &mut test.run,
1299 async {
1300 assert_eq!(to_sweeper_rx.next().await.unwrap(), path2);
1301 }
1302 .boxed(),
1303 )
1304 .await;
1305
1306 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1309 test.poll_ensure_to_sweeper_is_empty().await;
1310 }
1311
1312 #[tokio::test]
1313 async fn execute_pvf_requests() {
1314 let mut test = Builder::default().build();
1315 let mut host = test.host_handle();
1316 let pvd = Arc::new(PersistedValidationData {
1317 parent_head: Default::default(),
1318 relay_parent_number: 1u32,
1319 relay_parent_storage_root: H256::default(),
1320 max_pov_size: 4096 * 1024,
1321 });
1322 let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
1323 let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
1324
1325 let validation_context_1 = test_validation_context(pvd.clone(), pov1.clone());
1328
1329 let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
1330 host.execute_pvf(
1331 PvfPrepData::from_discriminator(1),
1332 validation_context_1.clone(),
1333 Priority::Normal,
1334 PvfExecKind::Backing(H256::default()),
1335 result_tx,
1336 )
1337 .await
1338 .unwrap();
1339
1340 let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
1341 host.execute_pvf(
1342 PvfPrepData::from_discriminator(1),
1343 validation_context_1,
1344 Priority::Critical,
1345 PvfExecKind::Backing(H256::default()),
1346 result_tx,
1347 )
1348 .await
1349 .unwrap();
1350
1351 let (result_tx, result_rx_pvf_2) = oneshot::channel();
1352 let validation_context_2 = test_validation_context(pvd, pov2);
1353 host.execute_pvf(
1354 PvfPrepData::from_discriminator(2),
1355 validation_context_2,
1356 Priority::Normal,
1357 PvfExecKind::Backing(H256::default()),
1358 result_tx,
1359 )
1360 .await
1361 .unwrap();
1362
1363 assert_matches!(
1364 test.poll_and_recv_to_prepare_queue().await,
1365 prepare::ToQueue::Enqueue { .. }
1366 );
1367 assert_matches!(
1368 test.poll_and_recv_to_prepare_queue().await,
1369 prepare::ToQueue::Enqueue { .. }
1370 );
1371
1372 test.from_prepare_queue_tx
1373 .send(prepare::FromQueue {
1374 artifact_id: artifact_id(1),
1375 result: Ok(PrepareSuccess::default()),
1376 })
1377 .await
1378 .unwrap();
1379 let result_tx_pvf_1_1 = assert_matches!(
1380 test.poll_and_recv_to_execute_queue().await,
1381 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1382 );
1383 let result_tx_pvf_1_2 = assert_matches!(
1384 test.poll_and_recv_to_execute_queue().await,
1385 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1386 );
1387
1388 test.from_prepare_queue_tx
1389 .send(prepare::FromQueue {
1390 artifact_id: artifact_id(2),
1391 result: Ok(PrepareSuccess::default()),
1392 })
1393 .await
1394 .unwrap();
1395 let result_tx_pvf_2 = assert_matches!(
1396 test.poll_and_recv_to_execute_queue().await,
1397 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1398 );
1399
1400 result_tx_pvf_1_1
1401 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1402 .unwrap();
1403 assert_matches!(
1404 result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
1405 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1406 );
1407
1408 result_tx_pvf_1_2
1409 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1410 .unwrap();
1411 assert_matches!(
1412 result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
1413 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1414 );
1415
1416 result_tx_pvf_2
1417 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1418 .unwrap();
1419 assert_matches!(
1420 result_rx_pvf_2.now_or_never().unwrap().unwrap(),
1421 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1422 );
1423 }
1424
1425 #[tokio::test]
1426 async fn precheck_pvf() {
1427 let mut test = Builder::default().build();
1428 let mut host = test.host_handle();
1429
1430 let (result_tx, result_rx) = oneshot::channel();
1432 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1433 .await
1434 .unwrap();
1435
1436 assert_matches!(
1438 test.poll_and_recv_to_prepare_queue().await,
1439 prepare::ToQueue::Enqueue { .. }
1440 );
1441 test.from_prepare_queue_tx
1443 .send(prepare::FromQueue {
1444 artifact_id: artifact_id(1),
1445 result: Ok(PrepareSuccess::default()),
1446 })
1447 .await
1448 .unwrap();
1449 test.poll_ensure_to_execute_queue_is_empty().await;
1451 assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1453
1454 let mut precheck_receivers = Vec::new();
1456 for _ in 0..3 {
1457 let (result_tx, result_rx) = oneshot::channel();
1458 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1459 .await
1460 .unwrap();
1461 precheck_receivers.push(result_rx);
1462 }
1463 assert_matches!(
1465 test.poll_and_recv_to_prepare_queue().await,
1466 prepare::ToQueue::Enqueue { .. }
1467 );
1468 test.from_prepare_queue_tx
1469 .send(prepare::FromQueue {
1470 artifact_id: artifact_id(2),
1471 result: Err(PrepareError::TimedOut),
1472 })
1473 .await
1474 .unwrap();
1475 test.poll_ensure_to_execute_queue_is_empty().await;
1476 for result_rx in precheck_receivers {
1477 assert_matches!(
1478 result_rx.now_or_never().unwrap().unwrap(),
1479 Err(PrepareError::TimedOut)
1480 );
1481 }
1482 }
1483
1484 #[tokio::test]
1485 async fn test_prepare_done() {
1486 let mut test = Builder::default().build();
1487 let mut host = test.host_handle();
1488 let pvd = Arc::new(PersistedValidationData {
1489 parent_head: Default::default(),
1490 relay_parent_number: 1u32,
1491 relay_parent_storage_root: H256::default(),
1492 max_pov_size: 4096 * 1024,
1493 });
1494 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1495
1496 let (result_tx, result_rx_execute) = oneshot::channel();
1501 let validation_context = test_validation_context(pvd.clone(), pov.clone());
1502 host.execute_pvf(
1503 PvfPrepData::from_discriminator(1),
1504 validation_context,
1505 Priority::Critical,
1506 PvfExecKind::Backing(H256::default()),
1507 result_tx,
1508 )
1509 .await
1510 .unwrap();
1511
1512 assert_matches!(
1513 test.poll_and_recv_to_prepare_queue().await,
1514 prepare::ToQueue::Enqueue { .. }
1515 );
1516
1517 let (result_tx, result_rx) = oneshot::channel();
1518 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1519 .await
1520 .unwrap();
1521
1522 test.from_prepare_queue_tx
1525 .send(prepare::FromQueue {
1526 artifact_id: artifact_id(1),
1527 result: Err(PrepareError::TimedOut),
1528 })
1529 .await
1530 .unwrap();
1531 test.poll_ensure_to_execute_queue_is_empty().await;
1532 assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
1533 assert_matches!(
1534 result_rx_execute.now_or_never().unwrap().unwrap(),
1535 Err(ValidationError::Internal(_))
1536 );
1537
1538 let mut precheck_receivers = Vec::new();
1540 for _ in 0..3 {
1541 let (result_tx, result_rx) = oneshot::channel();
1542 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1543 .await
1544 .unwrap();
1545 precheck_receivers.push(result_rx);
1546 }
1547
1548 let (result_tx, _result_rx_execute) = oneshot::channel();
1549 let validation_context = test_validation_context(pvd, pov);
1550 host.execute_pvf(
1551 PvfPrepData::from_discriminator(2),
1552 validation_context,
1553 Priority::Critical,
1554 PvfExecKind::Backing(H256::default()),
1555 result_tx,
1556 )
1557 .await
1558 .unwrap();
1559 assert_matches!(
1561 test.poll_and_recv_to_prepare_queue().await,
1562 prepare::ToQueue::Enqueue { .. }
1563 );
1564 test.from_prepare_queue_tx
1565 .send(prepare::FromQueue {
1566 artifact_id: artifact_id(2),
1567 result: Ok(PrepareSuccess::default()),
1568 })
1569 .await
1570 .unwrap();
1571 assert_matches!(
1574 test.poll_and_recv_to_execute_queue().await,
1575 execute::ToQueue::Enqueue { .. }
1576 );
1577 for result_rx in precheck_receivers {
1578 assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1579 }
1580 }
1581
1582 #[tokio::test]
1585 async fn test_precheck_prepare_no_retry() {
1586 let mut test = Builder::default().build();
1587 let mut host = test.host_handle();
1588
1589 let (result_tx, result_rx) = oneshot::channel();
1591 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1592 .await
1593 .unwrap();
1594
1595 assert_matches!(
1597 test.poll_and_recv_to_prepare_queue().await,
1598 prepare::ToQueue::Enqueue { .. }
1599 );
1600 test.from_prepare_queue_tx
1602 .send(prepare::FromQueue {
1603 artifact_id: artifact_id(1),
1604 result: Err(PrepareError::TimedOut),
1605 })
1606 .await
1607 .unwrap();
1608
1609 let result = test.poll_and_recv_result(result_rx).await;
1611 assert_matches!(result, Err(PrepareError::TimedOut));
1612
1613 let (result_tx_2, result_rx_2) = oneshot::channel();
1615 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_2)
1616 .await
1617 .unwrap();
1618
1619 test.poll_ensure_to_prepare_queue_is_empty().await;
1621
1622 let result = test.poll_and_recv_result(result_rx_2).await;
1624 assert_matches!(result, Err(PrepareError::TimedOut));
1625
1626 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1628
1629 let (result_tx_3, result_rx_3) = oneshot::channel();
1631 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_3)
1632 .await
1633 .unwrap();
1634
1635 test.poll_ensure_to_prepare_queue_is_empty().await;
1637
1638 let result = test.poll_and_recv_result(result_rx_3).await;
1640 assert_matches!(result, Err(PrepareError::TimedOut));
1641 }
1642
1643 #[tokio::test]
1646 async fn test_execute_prepare_retry() {
1647 let mut test = Builder::default().build();
1648 let mut host = test.host_handle();
1649 let pvd = Arc::new(PersistedValidationData {
1650 parent_head: Default::default(),
1651 relay_parent_number: 1u32,
1652 relay_parent_storage_root: H256::default(),
1653 max_pov_size: 4096 * 1024,
1654 });
1655 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1656
1657 let (result_tx, result_rx) = oneshot::channel();
1659 let validation_context = test_validation_context(pvd.clone(), pov.clone());
1660 host.execute_pvf(
1661 PvfPrepData::from_discriminator(1),
1662 validation_context.clone(),
1663 Priority::Critical,
1664 PvfExecKind::Backing(H256::default()),
1665 result_tx,
1666 )
1667 .await
1668 .unwrap();
1669
1670 assert_matches!(
1672 test.poll_and_recv_to_prepare_queue().await,
1673 prepare::ToQueue::Enqueue { .. }
1674 );
1675 test.from_prepare_queue_tx
1677 .send(prepare::FromQueue {
1678 artifact_id: artifact_id(1),
1679 result: Err(PrepareError::TimedOut),
1680 })
1681 .await
1682 .unwrap();
1683
1684 let result = test.poll_and_recv_result(result_rx).await;
1686 assert_matches!(result, Err(ValidationError::Internal(_)));
1687
1688 let (result_tx_2, result_rx_2) = oneshot::channel();
1690 host.execute_pvf(
1691 PvfPrepData::from_discriminator(1),
1692 validation_context.clone(),
1693 Priority::Critical,
1694 PvfExecKind::Backing(H256::default()),
1695 result_tx_2,
1696 )
1697 .await
1698 .unwrap();
1699
1700 test.poll_ensure_to_prepare_queue_is_empty().await;
1702
1703 let result = test.poll_and_recv_result(result_rx_2).await;
1705 assert_matches!(result, Err(ValidationError::Internal(_)));
1706
1707 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1709
1710 let (result_tx_3, result_rx_3) = oneshot::channel();
1712 host.execute_pvf(
1713 PvfPrepData::from_discriminator(1),
1714 validation_context,
1715 Priority::Critical,
1716 PvfExecKind::Backing(H256::default()),
1717 result_tx_3,
1718 )
1719 .await
1720 .unwrap();
1721
1722 assert_matches!(
1724 test.poll_and_recv_to_prepare_queue().await,
1725 prepare::ToQueue::Enqueue { .. }
1726 );
1727
1728 test.from_prepare_queue_tx
1729 .send(prepare::FromQueue {
1730 artifact_id: artifact_id(1),
1731 result: Ok(PrepareSuccess::default()),
1732 })
1733 .await
1734 .unwrap();
1735
1736 let result_tx_3 = assert_matches!(
1738 test.poll_and_recv_to_execute_queue().await,
1739 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1740 );
1741
1742 result_tx_3
1745 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1746 .unwrap();
1747 assert_matches!(
1748 result_rx_3.now_or_never().unwrap().unwrap(),
1749 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1750 );
1751 }
1752
1753 #[tokio::test]
1756 async fn test_execute_prepare_no_retry() {
1757 let mut test = Builder::default().build();
1758 let mut host = test.host_handle();
1759 let pvd = Arc::new(PersistedValidationData {
1760 parent_head: Default::default(),
1761 relay_parent_number: 1u32,
1762 relay_parent_storage_root: H256::default(),
1763 max_pov_size: 4096 * 1024,
1764 });
1765 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1766
1767 let (result_tx, result_rx) = oneshot::channel();
1769 let validation_context = test_validation_context(pvd.clone(), pov.clone());
1770 host.execute_pvf(
1771 PvfPrepData::from_discriminator(1),
1772 validation_context.clone(),
1773 Priority::Critical,
1774 PvfExecKind::Backing(H256::default()),
1775 result_tx,
1776 )
1777 .await
1778 .unwrap();
1779
1780 assert_matches!(
1782 test.poll_and_recv_to_prepare_queue().await,
1783 prepare::ToQueue::Enqueue { .. }
1784 );
1785 test.from_prepare_queue_tx
1787 .send(prepare::FromQueue {
1788 artifact_id: artifact_id(1),
1789 result: Err(PrepareError::Prevalidation("reproducible error".into())),
1790 })
1791 .await
1792 .unwrap();
1793
1794 let result = test.poll_and_recv_result(result_rx).await;
1796 assert_matches!(result, Err(ValidationError::Preparation(_)));
1797
1798 let (result_tx_2, result_rx_2) = oneshot::channel();
1800 host.execute_pvf(
1801 PvfPrepData::from_discriminator(1),
1802 validation_context.clone(),
1803 Priority::Critical,
1804 PvfExecKind::Backing(H256::default()),
1805 result_tx_2,
1806 )
1807 .await
1808 .unwrap();
1809
1810 test.poll_ensure_to_prepare_queue_is_empty().await;
1812
1813 let result = test.poll_and_recv_result(result_rx_2).await;
1815 assert_matches!(result, Err(ValidationError::Preparation(_)));
1816
1817 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1819
1820 let (result_tx_3, result_rx_3) = oneshot::channel();
1822 host.execute_pvf(
1823 PvfPrepData::from_discriminator(1),
1824 validation_context,
1825 Priority::Critical,
1826 PvfExecKind::Backing(H256::default()),
1827 result_tx_3,
1828 )
1829 .await
1830 .unwrap();
1831
1832 test.poll_ensure_to_prepare_queue_is_empty().await;
1834
1835 let result = test.poll_and_recv_result(result_rx_3).await;
1837 assert_matches!(result, Err(ValidationError::Preparation(_)));
1838 }
1839
1840 #[tokio::test]
1842 async fn test_heads_up_prepare_retry() {
1843 let mut test = Builder::default().build();
1844 let mut host = test.host_handle();
1845
1846 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1848
1849 assert_matches!(
1851 test.poll_and_recv_to_prepare_queue().await,
1852 prepare::ToQueue::Enqueue { .. }
1853 );
1854 test.from_prepare_queue_tx
1856 .send(prepare::FromQueue {
1857 artifact_id: artifact_id(1),
1858 result: Err(PrepareError::TimedOut),
1859 })
1860 .await
1861 .unwrap();
1862
1863 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1865
1866 test.poll_ensure_to_prepare_queue_is_empty().await;
1868
1869 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1871
1872 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1874
1875 assert_matches!(
1877 test.poll_and_recv_to_prepare_queue().await,
1878 prepare::ToQueue::Enqueue { .. }
1879 );
1880 }
1881
1882 #[tokio::test]
1883 async fn cancellation() {
1884 let mut test = Builder::default().build();
1885 let mut host = test.host_handle();
1886 let pvd = Arc::new(PersistedValidationData {
1887 parent_head: Default::default(),
1888 relay_parent_number: 1u32,
1889 relay_parent_storage_root: H256::default(),
1890 max_pov_size: 4096 * 1024,
1891 });
1892 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1893
1894 let (result_tx, result_rx) = oneshot::channel();
1895 let validation_context = test_validation_context(pvd, pov);
1896 host.execute_pvf(
1897 PvfPrepData::from_discriminator(1),
1898 validation_context,
1899 Priority::Normal,
1900 PvfExecKind::Backing(H256::default()),
1901 result_tx,
1902 )
1903 .await
1904 .unwrap();
1905
1906 assert_matches!(
1907 test.poll_and_recv_to_prepare_queue().await,
1908 prepare::ToQueue::Enqueue { .. }
1909 );
1910
1911 test.from_prepare_queue_tx
1912 .send(prepare::FromQueue {
1913 artifact_id: artifact_id(1),
1914 result: Ok(PrepareSuccess::default()),
1915 })
1916 .await
1917 .unwrap();
1918
1919 drop(result_rx);
1920
1921 test.poll_ensure_to_execute_queue_is_empty().await;
1922 }
1923}