1use crate::{
24 artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
25 execute::{self, PendingExecutionRequest},
26 metrics::Metrics,
27 prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
28};
29use always_assert::never;
30use futures::{
31 channel::{mpsc, oneshot},
32 Future, FutureExt, SinkExt, StreamExt,
33};
34#[cfg(feature = "test-utils")]
35use polkadot_node_core_pvf_common::ArtifactChecksum;
36use polkadot_node_core_pvf_common::{
37 error::{PrecheckResult, PrepareError},
38 prepare::PrepareSuccess,
39 pvf::PvfPrepData,
40};
41use polkadot_node_primitives::PoV;
42use polkadot_node_subsystem::{
43 messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult,
44};
45use polkadot_parachain_primitives::primitives::ValidationResult;
46use polkadot_primitives::{Hash, PersistedValidationData};
47use std::{
48 collections::HashMap,
49 path::PathBuf,
50 sync::Arc,
51 time::{Duration, SystemTime},
52};
53
/// Cooldown applied after a failed PVF preparation.
///
/// `can_retry_prepare_after_failure` refuses a retry until at least this much time has
/// passed since the recorded failure. Non-test builds use 15 minutes.
#[cfg(not(test))]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
/// Test-build variant of the preparation failure cooldown, shortened to 200 ms.
#[cfg(test)]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);

/// Retry budget for failed preparations: `can_retry_prepare_after_failure` only allows a
/// retry while the recorded failure count does not exceed this value.
pub const NUM_PREPARE_RETRIES: u32 = 5;

/// Name of the prepare worker binary.
// NOTE(review): not referenced in this file; presumably used by callers to locate the binary.
pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";

/// Name of the execute worker binary.
// NOTE(review): not referenced in this file; presumably used by callers to locate the binary.
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";

/// Capacity of the bounded channel created in `start` that carries `ToHost` messages from
/// `ValidationHost` handles to the host loop.
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;
72
/// One-shot sender on which the outcome of an execution request is delivered: either a
/// `ValidationResult` or a `ValidationError`.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

/// One-shot sender on which the `PrecheckResult` of a pre-check request is delivered.
pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
78
/// A cloneable handle used to submit requests to the validation host loop.
#[derive(Clone)]
pub struct ValidationHost {
    /// Sending half of the bounded channel that carries `ToHost` messages to the host loop.
    to_host_tx: mpsc::Sender<ToHost>,
    /// The security status the host was started with (a copy of the value computed in `start`).
    pub security_status: SecurityStatus,
}
86
87impl ValidationHost {
88 pub async fn precheck_pvf(
97 &mut self,
98 pvf: PvfPrepData,
99 result_tx: PrecheckResultSender,
100 ) -> Result<(), String> {
101 self.to_host_tx
102 .send(ToHost::PrecheckPvf { pvf, result_tx })
103 .await
104 .map_err(|_| "the inner loop hung up".to_string())
105 }
106
107 pub async fn execute_pvf(
115 &mut self,
116 pvf: PvfPrepData,
117 exec_timeout: Duration,
118 pvd: Arc<PersistedValidationData>,
119 pov: Arc<PoV>,
120 priority: Priority,
121 exec_kind: PvfExecKind,
122 result_tx: ResultSender,
123 ) -> Result<(), String> {
124 self.to_host_tx
125 .send(ToHost::ExecutePvf(ExecutePvfInputs {
126 pvf,
127 exec_timeout,
128 pvd,
129 pov,
130 priority,
131 exec_kind,
132 result_tx,
133 }))
134 .await
135 .map_err(|_| "the inner loop hung up".to_string())
136 }
137
138 pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
145 self.to_host_tx
146 .send(ToHost::HeadsUp { active_pvfs })
147 .await
148 .map_err(|_| "the inner loop hung up".to_string())
149 }
150
151 pub async fn update_active_leaves(
155 &mut self,
156 update: ActiveLeavesUpdate,
157 ancestors: Vec<Hash>,
158 ) -> Result<(), String> {
159 self.to_host_tx
160 .send(ToHost::UpdateActiveLeaves { update, ancestors })
161 .await
162 .map_err(|_| "the inner loop hung up".to_string())
163 }
164
165 #[cfg(feature = "test-utils")]
169 pub async fn replace_artifact_checksum(
170 &mut self,
171 checksum: ArtifactChecksum,
172 new_checksum: ArtifactChecksum,
173 ) -> Result<(), String> {
174 self.to_host_tx
175 .send(ToHost::ReplaceArtifactChecksum { checksum, new_checksum })
176 .await
177 .map_err(|_| "the inner loop hung up".to_string())
178 }
179}
180
/// Messages sent from `ValidationHost` handles to the host loop.
enum ToHost {
    /// Pre-check `pvf` and report the outcome on `result_tx`.
    PrecheckPvf {
        pvf: PvfPrepData,
        result_tx: PrecheckResultSender,
    },
    /// Execute a PVF; all request parameters are bundled in `ExecutePvfInputs`.
    ExecutePvf(ExecutePvfInputs),
    /// Notification about the set of PVFs that are currently active.
    HeadsUp {
        active_pvfs: Vec<PvfPrepData>,
    },
    /// Active-leaves update that the host forwards to the execute queue.
    UpdateActiveLeaves {
        update: ActiveLeavesUpdate,
        ancestors: Vec<Hash>,
    },
    /// Test-only request to swap one stored artifact checksum for another.
    #[cfg(feature = "test-utils")]
    ReplaceArtifactChecksum {
        checksum: ArtifactChecksum,
        new_checksum: ArtifactChecksum,
    },
}
200
/// Parameters of a single execution request, as passed to `ValidationHost::execute_pvf`.
struct ExecutePvfInputs {
    /// The PVF to execute; also identifies the artifact to use.
    pvf: PvfPrepData,
    /// Timeout forwarded to the execute queue as part of the pending request.
    exec_timeout: Duration,
    /// Persisted validation data forwarded with the request.
    pvd: Arc<PersistedValidationData>,
    /// Proof-of-validity forwarded with the request.
    pov: Arc<PoV>,
    /// Priority used if preparation of the PVF has to be enqueued first.
    priority: Priority,
    /// Execution kind forwarded to the execute queue.
    exec_kind: PvfExecKind,
    /// Channel on which the validation outcome is reported.
    result_tx: ResultSender,
}
210
/// Configuration consumed by `start` to set up the validation host.
#[derive(Debug)]
pub struct Config {
    /// Path handed to `Artifacts::new` and to the prepare and execute machinery.
    pub cache_path: PathBuf,
    /// Optional node version string, forwarded to the prepare pool and the execute queue.
    // NOTE(review): presumably used to check worker/node version compatibility — confirm.
    pub node_version: Option<String>,
    /// Whether Secure Validator Mode is requested. On non-Linux targets `start` fails when
    /// this is `true`.
    pub secure_validator_mode: bool,

    /// Path to the prepare worker program.
    pub prepare_worker_program_path: PathBuf,
    /// Spawn timeout forwarded to the prepare pool.
    pub prepare_worker_spawn_timeout: Duration,
    /// Soft limit on the number of prepare workers, forwarded to the prepare queue.
    pub prepare_workers_soft_max_num: usize,
    /// Hard limit on the number of prepare workers, forwarded to the prepare queue.
    pub prepare_workers_hard_max_num: usize,

    /// Path to the execute worker program.
    pub execute_worker_program_path: PathBuf,
    /// Spawn timeout forwarded to the execute queue.
    pub execute_worker_spawn_timeout: Duration,
    /// Maximum number of execute workers, forwarded to the execute queue.
    pub execute_workers_max_num: usize,
}
238
239impl Config {
240 pub fn new(
242 cache_path: PathBuf,
243 node_version: Option<String>,
244 secure_validator_mode: bool,
245 prepare_worker_program_path: PathBuf,
246 execute_worker_program_path: PathBuf,
247 execute_workers_max_num: usize,
248 prepare_workers_soft_max_num: usize,
249 prepare_workers_hard_max_num: usize,
250 ) -> Self {
251 Self {
252 cache_path,
253 node_version,
254 secure_validator_mode,
255
256 prepare_worker_program_path,
257 prepare_worker_spawn_timeout: Duration::from_secs(3),
258 prepare_workers_soft_max_num,
259 prepare_workers_hard_max_num,
260
261 execute_worker_program_path,
262 execute_worker_spawn_timeout: Duration::from_secs(3),
263 execute_workers_max_num,
264 }
265 }
266}
267
/// Sets up the validation host and returns a handle to it together with the future that
/// drives it.
///
/// The returned future combines the host loop, the prepare queue, the prepare pool, the
/// execute queue and the sweeper; it resolves as soon as any one of them finishes. Nothing
/// runs until the caller polls that future.
///
/// # Errors
///
/// * On Linux, returns `SubsystemError::Context` when the security status check fails.
/// * On other targets, returns `SubsystemError::Context` when
///   `config.secure_validator_mode` is set.
pub async fn start(
    config: Config,
    metrics: Metrics,
) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
    gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");

    // Artifact bookkeeping rooted at the configured cache path.
    let artifacts = Artifacts::new(&config.cache_path).await;

    // Linux: the security status comes from the dedicated check.
    #[cfg(target_os = "linux")]
    let security_status = match crate::security::check_security_status(&config).await {
        Ok(ok) => ok,
        Err(err) => return Err(SubsystemError::Context(err)),
    };
    // Non-Linux: secure mode cannot be enabled, so requesting it is an error; otherwise
    // warn and fall back to the default status.
    #[cfg(not(target_os = "linux"))]
    let security_status = if config.secure_validator_mode {
        gum::error!(
            target: LOG_TARGET,
            "{}{}{}",
            crate::SECURE_MODE_ERROR,
            crate::SECURE_LINUX_NOTE,
            crate::IGNORE_SECURE_MODE_TIP
        );
        return Err(SubsystemError::Context(
            "could not enable Secure Validator Mode for non-Linux; check logs".into(),
        ));
    } else {
        gum::warn!(
            target: LOG_TARGET,
            "{}{}",
            crate::SECURE_MODE_WARNING,
            crate::SECURE_LINUX_NOTE,
        );
        SecurityStatus::default()
    };

    // Channel between `ValidationHost` handles and the host loop.
    let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);

    let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };

    let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
        metrics.clone(),
        config.prepare_worker_program_path.clone(),
        config.cache_path.clone(),
        config.prepare_worker_spawn_timeout,
        config.node_version.clone(),
        security_status.clone(),
    );

    // The prepare queue talks to the pool created above.
    let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
        metrics.clone(),
        config.prepare_workers_soft_max_num,
        config.prepare_workers_hard_max_num,
        config.cache_path.clone(),
        to_prepare_pool,
        from_prepare_pool,
    );

    // Last consumer of `metrics`, `config.node_version` and `security_status`, so they are
    // moved rather than cloned.
    let (to_execute_queue_tx, from_execute_queue_rx, run_execute_queue) = execute::start(
        metrics,
        config.execute_worker_program_path.to_owned(),
        config.cache_path.clone(),
        config.execute_workers_max_num,
        config.execute_worker_spawn_timeout,
        config.node_version,
        security_status,
    );

    // The sweeper receives paths of artifact files to delete.
    let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
    let run_sweeper = sweeper_task(to_sweeper_rx);

    let run_host = async move {
        run(Inner {
            cleanup_pulse_interval: Duration::from_secs(3600),
            cleanup_config: ArtifactsCleanupConfig::default(),
            artifacts,
            to_host_rx,
            to_prepare_queue_tx,
            from_prepare_queue_rx,
            to_execute_queue_tx,
            from_execute_queue_rx,
            to_sweeper_tx,
            awaiting_prepare: AwaitingPrepare::default(),
        })
        .await
    };

    let task = async move {
        // Whichever sub-task finishes first ends the combined task.
        futures::select! {
            _ = run_host.fuse() => {},
            _ = run_prepare_queue.fuse() => {},
            _ = run_prepare_pool.fuse() => {},
            _ = run_execute_queue.fuse() => {},
            _ = run_sweeper.fuse() => {},
        };
    };

    Ok((validation_host, task))
}
378
/// Execution requests that are parked until the artifact they need has been prepared,
/// grouped by artifact ID.
#[derive(Default)]
struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
383
384impl AwaitingPrepare {
385 fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
386 self.0.entry(artifact_id).or_default().push(pending_execution_request);
387 }
388
389 fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
390 self.0.remove(artifact_id).unwrap_or_default()
391 }
392}
393
/// Everything the host loop (`run`) needs: its state plus the channels that connect it to
/// the handles, the prepare queue, the execute queue and the sweeper.
struct Inner {
    /// Period of the artifact cleanup pulse.
    cleanup_pulse_interval: Duration,
    /// Configuration handed to `Artifacts::prune` on every cleanup pulse.
    cleanup_config: ArtifactsCleanupConfig,
    /// Bookkeeping of known artifacts and their states.
    artifacts: Artifacts,

    /// Requests coming from `ValidationHost` handles.
    to_host_rx: mpsc::Receiver<ToHost>,

    /// Commands to the prepare queue.
    to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
    /// Preparation outcomes coming back from the prepare queue.
    from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,

    /// Commands to the execute queue.
    to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
    /// Messages coming back from the execute queue (artifact removal requests).
    from_execute_queue_rx: mpsc::UnboundedReceiver<execute::FromQueue>,

    /// Paths of artifact files that the sweeper should delete.
    to_sweeper_tx: mpsc::Sender<PathBuf>,

    /// Execution requests waiting for their artifact to be prepared.
    awaiting_prepare: AwaitingPrepare,
}
411
/// Marker error for unrecoverable conditions (for example a closed channel); the host loop
/// terminates when a handler returns it.
#[derive(Debug)]
struct Fatal;
414
/// The host loop: waits on all incoming channels and the cleanup pulse and dispatches each
/// event to its handler.
///
/// The loop ends when the `ToHost` channel is closed, when either queue's outgoing channel
/// is closed, or when any handler reports `Fatal`.
async fn run(
    Inner {
        cleanup_pulse_interval,
        cleanup_config,
        mut artifacts,
        to_host_rx,
        from_prepare_queue_rx,
        mut to_prepare_queue_tx,
        from_execute_queue_rx,
        mut to_execute_queue_tx,
        mut to_sweeper_tx,
        mut awaiting_prepare,
    }: Inner,
) {
    // Unwraps `Ok(v)` to `v`; on `Err(Fatal)` logs the call site line and breaks out of the
    // enclosing loop. It expands to a `break`, so it is only usable inside the loop below.
    macro_rules! break_if_fatal {
        ($expr:expr) => {
            match $expr {
                Err(Fatal) => {
                    gum::error!(
                        target: LOG_TARGET,
                        "Fatal error occurred, terminating the host. Line: {}",
                        line!(),
                    );
                    break
                },
                Ok(v) => v,
            }
        };
    }

    let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
    futures::pin_mut!(cleanup_pulse);

    let mut to_host_rx = to_host_rx.fuse();
    let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
    let mut from_execute_queue_rx = from_execute_queue_rx.fuse();

    loop {
        // `select_biased!` polls the arms in the order written, so when several sources are
        // ready the earlier arm wins.
        futures::select_biased! {
            from_execute_queue_rx = from_execute_queue_rx.next() => {
                // A closed channel from the execute queue is fatal.
                let from_queue = break_if_fatal!(from_execute_queue_rx.ok_or(Fatal));
                let execute::FromQueue::RemoveArtifact { artifact, reply_to } = from_queue;
                break_if_fatal!(handle_artifact_removal(
                    &mut to_sweeper_tx,
                    &mut artifacts,
                    artifact,
                    reply_to,
                ).await);
            },
            () = cleanup_pulse.select_next_some() => {
                break_if_fatal!(handle_cleanup_pulse(
                    &mut to_sweeper_tx,
                    &mut artifacts,
                    &cleanup_config,
                ).await);
            },
            to_host = to_host_rx.next() => {
                let to_host = match to_host {
                    None => {
                        // All handles are gone: stop without logging an error.
                        break;
                    },
                    Some(to_host) => to_host,
                };

                break_if_fatal!(handle_to_host(
                    &mut artifacts,
                    &mut to_prepare_queue_tx,
                    &mut to_execute_queue_tx,
                    &mut awaiting_prepare,
                    to_host,
                )
                .await);
            },
            from_prepare_queue = from_prepare_queue_rx.next() => {
                // A closed channel from the prepare queue is fatal.
                let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));

                break_if_fatal!(handle_prepare_done(
                    &mut artifacts,
                    &mut to_execute_queue_tx,
                    &mut awaiting_prepare,
                    from_queue,
                ).await);
            },
        }
    }
}
520
521async fn handle_to_host(
522 artifacts: &mut Artifacts,
523 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
524 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
525 awaiting_prepare: &mut AwaitingPrepare,
526 to_host: ToHost,
527) -> Result<(), Fatal> {
528 match to_host {
529 ToHost::PrecheckPvf { pvf, result_tx } => {
530 handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
531 },
532 ToHost::ExecutePvf(inputs) => {
533 handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
534 .await?;
535 },
536 ToHost::HeadsUp { active_pvfs } =>
537 handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
538 ToHost::UpdateActiveLeaves { update, ancestors } =>
539 handle_update_active_leaves(execute_queue, update, ancestors).await?,
540 #[cfg(feature = "test-utils")]
541 ToHost::ReplaceArtifactChecksum { checksum, new_checksum } => {
542 artifacts.replace_artifact_checksum(checksum, new_checksum);
543 },
544 }
545
546 Ok(())
547}
548
549async fn handle_precheck_pvf(
557 artifacts: &mut Artifacts,
558 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
559 pvf: PvfPrepData,
560 result_sender: PrecheckResultSender,
561) -> Result<(), Fatal> {
562 let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
563
564 if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
565 match state {
566 ArtifactState::Prepared { last_time_needed, .. } => {
567 *last_time_needed = SystemTime::now();
568 let _ = result_sender.send(Ok(()));
569 },
570 ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
571 waiting_for_response.push(result_sender),
572 ArtifactState::FailedToProcess { error, .. } => {
573 let _ = result_sender.send(PrecheckResult::Err(error.clone()));
575 },
576 }
577 } else {
578 artifacts.insert_preparing(artifact_id, vec![result_sender]);
579 send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
580 .await?;
581 }
582 Ok(())
583}
584
585async fn handle_execute_pvf(
598 artifacts: &mut Artifacts,
599 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
600 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
601 awaiting_prepare: &mut AwaitingPrepare,
602 inputs: ExecutePvfInputs,
603) -> Result<(), Fatal> {
604 let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, exec_kind, result_tx } = inputs;
605 let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
606 let executor_params = (*pvf.executor_params()).clone();
607
608 if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
609 match state {
610 ArtifactState::Prepared { ref path, checksum, last_time_needed, .. } => {
611 let file_metadata = std::fs::metadata(path);
612
613 if file_metadata.is_ok() {
614 *last_time_needed = SystemTime::now();
615
616 send_execute(
618 execute_queue,
619 execute::ToQueue::Enqueue {
620 artifact: ArtifactPathId::new(artifact_id, path, *checksum),
621 pending_execution_request: PendingExecutionRequest {
622 exec_timeout,
623 pvd,
624 pov,
625 executor_params,
626 exec_kind,
627 result_tx,
628 },
629 },
630 )
631 .await?;
632 } else {
633 gum::warn!(
634 target: LOG_TARGET,
635 ?pvf,
636 ?artifact_id,
637 "handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
638 );
639
640 *state = ArtifactState::Preparing {
643 waiting_for_response: Vec::new(),
644 num_failures: 0,
645 };
646 enqueue_prepare_for_execute(
647 prepare_queue,
648 awaiting_prepare,
649 pvf,
650 priority,
651 artifact_id,
652 PendingExecutionRequest {
653 exec_timeout,
654 pvd,
655 pov,
656 executor_params,
657 exec_kind,
658 result_tx,
659 },
660 )
661 .await?;
662 }
663 },
664 ArtifactState::Preparing { .. } => {
665 awaiting_prepare.add(
666 artifact_id,
667 PendingExecutionRequest {
668 exec_timeout,
669 pvd,
670 pov,
671 executor_params,
672 result_tx,
673 exec_kind,
674 },
675 );
676 },
677 ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
678 if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
679 gum::warn!(
680 target: LOG_TARGET,
681 ?pvf,
682 ?artifact_id,
683 ?last_time_failed,
684 %num_failures,
685 %error,
686 "handle_execute_pvf: Re-trying failed PVF preparation."
687 );
688
689 *state = ArtifactState::Preparing {
692 waiting_for_response: Vec::new(),
693 num_failures: *num_failures,
694 };
695 enqueue_prepare_for_execute(
696 prepare_queue,
697 awaiting_prepare,
698 pvf,
699 priority,
700 artifact_id,
701 PendingExecutionRequest {
702 exec_timeout,
703 pvd,
704 pov,
705 executor_params,
706 exec_kind,
707 result_tx,
708 },
709 )
710 .await?;
711 } else {
712 let _ = result_tx.send(Err(ValidationError::from(error.clone())));
713 }
714 },
715 }
716 } else {
717 artifacts.insert_preparing(artifact_id.clone(), Vec::new());
720 enqueue_prepare_for_execute(
721 prepare_queue,
722 awaiting_prepare,
723 pvf,
724 priority,
725 artifact_id,
726 PendingExecutionRequest {
727 exec_timeout,
728 pvd,
729 pov,
730 executor_params,
731 result_tx,
732 exec_kind,
733 },
734 )
735 .await?;
736 }
737
738 Ok(())
739}
740
741async fn handle_heads_up(
742 artifacts: &mut Artifacts,
743 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
744 active_pvfs: Vec<PvfPrepData>,
745) -> Result<(), Fatal> {
746 let now = SystemTime::now();
747
748 for active_pvf in active_pvfs {
749 let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
750 if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
751 match state {
752 ArtifactState::Prepared { last_time_needed, .. } => {
753 *last_time_needed = now;
754 },
755 ArtifactState::Preparing { .. } => {
756 },
758 ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
759 if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
760 gum::warn!(
761 target: LOG_TARGET,
762 ?active_pvf,
763 ?artifact_id,
764 ?last_time_failed,
765 %num_failures,
766 %error,
767 "handle_heads_up: Re-trying failed PVF preparation."
768 );
769
770 *state = ArtifactState::Preparing {
773 waiting_for_response: vec![],
774 num_failures: *num_failures,
775 };
776 send_prepare(
777 prepare_queue,
778 prepare::ToQueue::Enqueue {
779 priority: Priority::Normal,
780 pvf: active_pvf,
781 },
782 )
783 .await?;
784 }
785 },
786 }
787 } else {
788 artifacts.insert_preparing(artifact_id.clone(), Vec::new());
790
791 send_prepare(
792 prepare_queue,
793 prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
794 )
795 .await?;
796 }
797 }
798
799 Ok(())
800}
801
/// Processes a preparation outcome reported by the prepare queue.
///
/// Steps, in order:
/// 1. Every pre-check sender waiting on the artifact receives the outcome.
/// 2. Every parked execution request is either forwarded to the execute queue (on success)
///    or answered with the preparation error (on failure); requests whose receiver is
///    already gone are skipped.
/// 3. The artifact state becomes `Prepared` or `FailedToProcess`, the latter with the
///    failure count increased by one.
///
/// An outcome for an artifact that is not in the `Preparing` state is treated as a broken
/// invariant (`never!`) and otherwise ignored.
///
/// Returns `Err(Fatal)` if the execute queue cannot be reached.
async fn handle_prepare_done(
    artifacts: &mut Artifacts,
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    awaiting_prepare: &mut AwaitingPrepare,
    from_queue: prepare::FromQueue,
) -> Result<(), Fatal> {
    let prepare::FromQueue { artifact_id, result } = from_queue;

    // Only an artifact that is currently `Preparing` may legitimately show up here.
    let state = match artifacts.artifact_state_mut(&artifact_id) {
        None => {
            never!("an unknown artifact was prepared: {:?}", artifact_id);
            return Ok(())
        },
        Some(ArtifactState::Prepared { .. }) => {
            never!("the artifact is already prepared: {:?}", artifact_id);
            return Ok(())
        },
        Some(ArtifactState::FailedToProcess { .. }) => {
            never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
            return Ok(())
        },
        Some(state @ ArtifactState::Preparing { .. }) => state,
    };

    // Answer the pre-check waiters and keep a handle on the failure counter for later.
    let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
        state
    {
        for result_sender in waiting_for_response.drain(..) {
            // Pre-check callers only learn success or the error, not the success payload.
            let result = result.clone().map(|_| ());
            let _ = result_sender.send(result);
        }
        num_failures
    } else {
        never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
        return Ok(())
    };

    // Dispatch the execution requests that were parked on this artifact.
    let pending_requests = awaiting_prepare.take(&artifact_id);
    for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } in
        pending_requests
    {
        if result_tx.is_canceled() {
            // The requester dropped its receiver; skip the request.
            continue
        }

        let (path, checksum) = match &result {
            Ok(success) => (success.path.clone(), success.checksum),
            Err(error) => {
                // Preparation failed: report the error instead of executing.
                let _ = result_tx.send(Err(ValidationError::from(error.clone())));
                continue
            },
        };

        send_execute(
            execute_queue,
            execute::ToQueue::Enqueue {
                artifact: ArtifactPathId::new(artifact_id.clone(), &path, checksum),
                pending_execution_request: PendingExecutionRequest {
                    exec_timeout,
                    pvd,
                    pov,
                    executor_params,
                    exec_kind,
                    result_tx,
                },
            },
        )
        .await?;
    }

    // Finally record the new artifact state.
    *state = match result {
        Ok(PrepareSuccess { checksum, path, size, .. }) =>
            ArtifactState::Prepared { checksum, path, last_time_needed: SystemTime::now(), size },
        Err(error) => {
            let last_time_failed = SystemTime::now();
            let num_failures = *num_failures + 1;

            gum::error!(
                target: LOG_TARGET,
                ?artifact_id,
                time_failed = ?last_time_failed,
                %num_failures,
                "artifact preparation failed: {}",
                error
            );
            ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
        },
    };

    Ok(())
}
910
911async fn handle_update_active_leaves(
912 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
913 update: ActiveLeavesUpdate,
914 ancestors: Vec<Hash>,
915) -> Result<(), Fatal> {
916 send_execute(execute_queue, execute::ToQueue::UpdateActiveLeaves { update, ancestors }).await
917}
918
919async fn send_prepare(
920 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
921 to_queue: prepare::ToQueue,
922) -> Result<(), Fatal> {
923 prepare_queue.send(to_queue).await.map_err(|_| Fatal)
924}
925
926async fn send_execute(
927 execute_queue: &mut mpsc::Sender<execute::ToQueue>,
928 to_queue: execute::ToQueue,
929) -> Result<(), Fatal> {
930 execute_queue.send(to_queue).await.map_err(|_| Fatal)
931}
932
933async fn enqueue_prepare_for_execute(
936 prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
937 awaiting_prepare: &mut AwaitingPrepare,
938 pvf: PvfPrepData,
939 priority: Priority,
940 artifact_id: ArtifactId,
941 pending_execution_request: PendingExecutionRequest,
942) -> Result<(), Fatal> {
943 send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
944
945 awaiting_prepare.add(artifact_id, pending_execution_request);
947
948 Ok(())
949}
950
951async fn handle_cleanup_pulse(
952 sweeper_tx: &mut mpsc::Sender<PathBuf>,
953 artifacts: &mut Artifacts,
954 cleanup_config: &ArtifactsCleanupConfig,
955) -> Result<(), Fatal> {
956 let to_remove = artifacts.prune(cleanup_config);
957 gum::debug!(
958 target: LOG_TARGET,
959 "PVF pruning: {} artifacts reached their end of life",
960 to_remove.len(),
961 );
962 for (artifact_id, path) in to_remove {
963 gum::debug!(
964 target: LOG_TARGET,
965 validation_code_hash = ?artifact_id.code_hash,
966 "pruning artifact",
967 );
968 sweeper_tx.send(path).await.map_err(|_| Fatal)?;
969 }
970
971 Ok(())
972}
973
974async fn handle_artifact_removal(
975 sweeper_tx: &mut mpsc::Sender<PathBuf>,
976 artifacts: &mut Artifacts,
977 artifact_id: ArtifactId,
978 reply_to: oneshot::Sender<()>,
979) -> Result<(), Fatal> {
980 let (artifact_id, path) = if let Some(artifact) = artifacts.remove(artifact_id) {
981 artifact
982 } else {
983 return Ok(());
988 };
989 reply_to
990 .send(())
991 .expect("the execute queue waits for the artifact remove confirmation; qed");
992 gum::debug!(
997 target: LOG_TARGET,
998 validation_code_hash = ?artifact_id.code_hash,
999 "PVF pruning: pruning artifact by request from the execute queue",
1000 );
1001 sweeper_tx.send(path).await.map_err(|_| Fatal)?;
1002 Ok(())
1003}
1004
1005async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
1007 loop {
1008 match sweeper_rx.next().await {
1009 None => break,
1010 Some(condemned) => {
1011 let result = tokio::fs::remove_file(&condemned).await;
1012 gum::trace!(
1013 target: LOG_TARGET,
1014 ?result,
1015 "Swept the artifact file {}",
1016 condemned.display(),
1017 );
1018 },
1019 }
1020 }
1021}
1022
1023fn can_retry_prepare_after_failure(
1025 last_time_failed: SystemTime,
1026 num_failures: u32,
1027 error: &PrepareError,
1028) -> bool {
1029 if error.is_deterministic() {
1030 return false
1032 }
1033
1034 SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
1037 num_failures <= NUM_PREPARE_RETRIES
1038}
1039
1040fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
1042 futures::stream::unfold(interval, {
1043 |interval| async move {
1044 futures_timer::Delay::new(interval).await;
1045 Some(((), interval))
1046 }
1047 })
1048 .map(|_| ())
1049}
1050
1051#[cfg(test)]
1052pub(crate) mod tests {
1053 use super::*;
1054 use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
1055 use assert_matches::assert_matches;
1056 use futures::future::BoxFuture;
1057 use polkadot_node_primitives::BlockData;
1058 use sp_core::H256;
1059
1060 const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
1061 pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
1062
    /// Checks that `pulse_every` with a 100 ms interval yields five consecutive items, each
    /// arriving between 50 ms and 150 ms (exclusive) after it was requested.
    #[tokio::test]
    async fn pulse_test() {
        let pulse = pulse_every(Duration::from_millis(100));
        futures::pin_mut!(pulse);

        for _ in 0..5 {
            let start = std::time::Instant::now();
            let _ = pulse.next().await.unwrap();

            // Generous tolerance around the nominal 100 ms.
            let el = start.elapsed().as_millis();
            assert!(el > 50 && el < 150, "pulse duration: {}", el);
        }
    }
1076
    /// Adjustable inputs for constructing a `Test` harness.
    struct Builder {
        /// Period of the cleanup pulse used by the host loop under test.
        cleanup_pulse_interval: Duration,
        /// Cleanup configuration handed to the host loop under test.
        cleanup_config: ArtifactsCleanupConfig,
        /// Initial artifact table of the host loop under test.
        artifacts: Artifacts,
    }
1082
1083 impl Builder {
1084 fn default() -> Self {
1085 Self {
1086 cleanup_pulse_interval: Duration::from_secs(3600),
1088 cleanup_config: ArtifactsCleanupConfig::default(),
1089 artifacts: Artifacts::empty(),
1090 }
1091 }
1092
1093 fn build(self) -> Test {
1094 Test::new(self)
1095 }
1096 }
1097
    /// Test harness: the host loop future plus the far ends of all its channels, so a test
    /// can play the roles of the prepare queue, the execute queue and the sweeper.
    struct Test {
        /// Sender for `ToHost` messages; taken by `host_handle`.
        to_host_tx: Option<mpsc::Sender<ToHost>>,

        /// What the host sends to the prepare queue.
        to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
        /// Used by tests to report preparation outcomes to the host.
        from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
        /// What the host sends to the execute queue.
        to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
        /// Held so the host's receiving end stays open; a closed channel would terminate the
        /// host loop.
        #[allow(unused)]
        from_execute_queue_tx: mpsc::UnboundedSender<execute::FromQueue>,
        /// Paths the host hands to the sweeper.
        to_sweeper_rx: mpsc::Receiver<PathBuf>,

        /// The host loop future; it makes progress only while a test polls it.
        run: BoxFuture<'static, ()>,
    }
1110
    impl Test {
        /// Creates all channels, builds the host loop future from `Builder`'s settings and
        /// keeps the far ends of the channels for the test to use.
        fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
            let (to_host_tx, to_host_rx) = mpsc::channel(10);
            let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
            let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
            let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
            let (from_execute_queue_tx, from_execute_queue_rx) = mpsc::unbounded();
            let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);

            let run = run(Inner {
                cleanup_pulse_interval,
                cleanup_config,
                artifacts,
                to_host_rx,
                to_prepare_queue_tx,
                from_prepare_queue_rx,
                to_execute_queue_tx,
                from_execute_queue_rx,
                to_sweeper_tx,
                awaiting_prepare: AwaitingPrepare::default(),
            })
            .boxed();

            Self {
                to_host_tx: Some(to_host_tx),
                to_prepare_queue_rx,
                from_prepare_queue_tx,
                to_execute_queue_rx,
                from_execute_queue_tx,
                to_sweeper_rx,
                run,
            }
        }

        /// Returns a `ValidationHost` handle connected to the host loop under test.
        ///
        /// Panics when called a second time because the sender has already been taken.
        fn host_handle(&mut self) -> ValidationHost {
            let to_host_tx = self.to_host_tx.take().unwrap();
            let security_status = Default::default();
            ValidationHost { to_host_tx, security_status }
        }

        /// Drives the host loop until `result_rx` yields a value and returns that value.
        async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
        where
            T: Send,
        {
            run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
        }

        /// Drives the host loop until it sends a message to the prepare queue and returns it.
        async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
            let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
            run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
                .await
        }

        /// Drives the host loop until it sends a message to the execute queue and returns it.
        async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
            let to_execute_queue_rx = &mut self.to_execute_queue_rx;
            run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
                .await
        }

        /// Drives the host loop for 500 ms and panics if anything arrives for the prepare
        /// queue in that time.
        async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
            use futures_timer::Delay;

            let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
            run_until(
                &mut self.run,
                async {
                    futures::select! {
                        _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                        _ = to_prepare_queue_rx.next().fuse() => {
                            panic!("the prepare queue is supposed to be empty")
                        }
                    }
                }
                .boxed(),
            )
            .await
        }

        /// Drives the host loop for 500 ms and panics if anything arrives for the execute
        /// queue in that time.
        async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
            use futures_timer::Delay;

            let to_execute_queue_rx = &mut self.to_execute_queue_rx;
            run_until(
                &mut self.run,
                async {
                    futures::select! {
                        _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                        _ = to_execute_queue_rx.next().fuse() => {
                            panic!("the execute queue is supposed to be empty")
                        }
                    }
                }
                .boxed(),
            )
            .await
        }

        /// Drives the host loop for 500 ms and panics if anything arrives for the sweeper in
        /// that time.
        async fn poll_ensure_to_sweeper_is_empty(&mut self) {
            use futures_timer::Delay;

            let to_sweeper_rx = &mut self.to_sweeper_rx;
            run_until(
                &mut self.run,
                async {
                    futures::select! {
                        _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                        msg = to_sweeper_rx.next().fuse() => {
                            panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
                        }
                    }
                }
                .boxed(),
            )
            .await
        }
    }
1227
    /// Polls `fut` and `task` alternately until `fut` completes, and returns its output.
    ///
    /// # Panics
    ///
    /// * with `"timeout"` once more than two seconds have elapsed without `fut` completing;
    /// * if `task` completes before `fut` does.
    async fn run_until<R>(
        task: &mut (impl Future<Output = ()> + Unpin),
        mut fut: (impl Future<Output = R> + Unpin),
    ) -> R {
        use std::task::Poll;

        let start = std::time::Instant::now();
        let fut = &mut fut;
        loop {
            if start.elapsed() > std::time::Duration::from_secs(2) {
                panic!("timeout");
            }

            // Check the awaited future first so a result that is already available wins.
            if let Poll::Ready(r) = futures::poll!(&mut *fut) {
                break r
            }

            // Give the background task a chance to make progress; it must not finish.
            if futures::poll!(&mut *task).is_ready() {
                panic!()
            }
        }
    }
1252
    /// The host loop must terminate once the only `ToHost` sender has been dropped.
    #[tokio::test]
    async fn shutdown_on_handle_drop() {
        let test = Builder::default().build();

        let join_handle = tokio::task::spawn(test.run);

        // Dropping the sender closes the `ToHost` channel, which ends the loop; the join
        // below would hang otherwise.
        drop(test.to_host_tx);
        join_handle.await.unwrap();
    }
1264
    /// Two prepared artifacts start out with a last-needed time one second in the past. A
    /// heads-up for artifact 1 refreshes it, so the cleanup pulse is expected to send only
    /// artifact 2's path to the sweeper; after another heads-up for artifact 1 nothing
    /// further may reach the sweeper.
    #[tokio::test]
    async fn pruning() {
        let mock_now = SystemTime::now() - Duration::from_millis(1000);
        let tempdir = tempfile::tempdir().unwrap();
        let cache_path = tempdir.path();

        let mut builder = Builder::default();
        // Pulse quickly so the test does not have to wait for the default interval.
        builder.cleanup_pulse_interval = Duration::from_millis(100);
        // NOTE(review): argument meaning of `ArtifactsCleanupConfig::new` is not visible
        // here — presumably (size limit, minimum stale time); confirm against `artifacts`.
        builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
        let path1 = generate_artifact_path(cache_path);
        let path2 = generate_artifact_path(cache_path);
        builder.artifacts.insert_prepared(
            artifact_id(1),
            path1.clone(),
            Default::default(),
            mock_now,
            1024,
        );
        builder.artifacts.insert_prepared(
            artifact_id(2),
            path2.clone(),
            Default::default(),
            mock_now,
            1024,
        );
        let mut test = builder.build();
        let mut host = test.host_handle();

        // Mark artifact 1 as needed right now.
        host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();

        let to_sweeper_rx = &mut test.to_sweeper_rx;
        run_until(
            &mut test.run,
            async {
                assert_eq!(to_sweeper_rx.next().await.unwrap(), path2);
            }
            .boxed(),
        )
        .await;

        // Refresh artifact 1 again; no further artifact may be swept afterwards.
        host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
        test.poll_ensure_to_sweeper_is_empty().await;
    }
1310
1311 #[tokio::test]
1312 async fn execute_pvf_requests() {
1313 let mut test = Builder::default().build();
1314 let mut host = test.host_handle();
1315 let pvd = Arc::new(PersistedValidationData {
1316 parent_head: Default::default(),
1317 relay_parent_number: 1u32,
1318 relay_parent_storage_root: H256::default(),
1319 max_pov_size: 4096 * 1024,
1320 });
1321 let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
1322 let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
1323
1324 let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
1325 host.execute_pvf(
1326 PvfPrepData::from_discriminator(1),
1327 TEST_EXECUTION_TIMEOUT,
1328 pvd.clone(),
1329 pov1.clone(),
1330 Priority::Normal,
1331 PvfExecKind::Backing(H256::default()),
1332 result_tx,
1333 )
1334 .await
1335 .unwrap();
1336
1337 let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
1338 host.execute_pvf(
1339 PvfPrepData::from_discriminator(1),
1340 TEST_EXECUTION_TIMEOUT,
1341 pvd.clone(),
1342 pov1,
1343 Priority::Critical,
1344 PvfExecKind::Backing(H256::default()),
1345 result_tx,
1346 )
1347 .await
1348 .unwrap();
1349
1350 let (result_tx, result_rx_pvf_2) = oneshot::channel();
1351 host.execute_pvf(
1352 PvfPrepData::from_discriminator(2),
1353 TEST_EXECUTION_TIMEOUT,
1354 pvd,
1355 pov2,
1356 Priority::Normal,
1357 PvfExecKind::Backing(H256::default()),
1358 result_tx,
1359 )
1360 .await
1361 .unwrap();
1362
1363 assert_matches!(
1364 test.poll_and_recv_to_prepare_queue().await,
1365 prepare::ToQueue::Enqueue { .. }
1366 );
1367 assert_matches!(
1368 test.poll_and_recv_to_prepare_queue().await,
1369 prepare::ToQueue::Enqueue { .. }
1370 );
1371
1372 test.from_prepare_queue_tx
1373 .send(prepare::FromQueue {
1374 artifact_id: artifact_id(1),
1375 result: Ok(PrepareSuccess::default()),
1376 })
1377 .await
1378 .unwrap();
1379 let result_tx_pvf_1_1 = assert_matches!(
1380 test.poll_and_recv_to_execute_queue().await,
1381 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1382 );
1383 let result_tx_pvf_1_2 = assert_matches!(
1384 test.poll_and_recv_to_execute_queue().await,
1385 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1386 );
1387
1388 test.from_prepare_queue_tx
1389 .send(prepare::FromQueue {
1390 artifact_id: artifact_id(2),
1391 result: Ok(PrepareSuccess::default()),
1392 })
1393 .await
1394 .unwrap();
1395 let result_tx_pvf_2 = assert_matches!(
1396 test.poll_and_recv_to_execute_queue().await,
1397 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1398 );
1399
1400 result_tx_pvf_1_1
1401 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1402 .unwrap();
1403 assert_matches!(
1404 result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
1405 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1406 );
1407
1408 result_tx_pvf_1_2
1409 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1410 .unwrap();
1411 assert_matches!(
1412 result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
1413 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1414 );
1415
1416 result_tx_pvf_2
1417 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1418 .unwrap();
1419 assert_matches!(
1420 result_rx_pvf_2.now_or_never().unwrap().unwrap(),
1421 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1422 );
1423 }
1424
1425 #[tokio::test]
1426 async fn precheck_pvf() {
1427 let mut test = Builder::default().build();
1428 let mut host = test.host_handle();
1429
1430 let (result_tx, result_rx) = oneshot::channel();
1432 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1433 .await
1434 .unwrap();
1435
1436 assert_matches!(
1438 test.poll_and_recv_to_prepare_queue().await,
1439 prepare::ToQueue::Enqueue { .. }
1440 );
1441 test.from_prepare_queue_tx
1443 .send(prepare::FromQueue {
1444 artifact_id: artifact_id(1),
1445 result: Ok(PrepareSuccess::default()),
1446 })
1447 .await
1448 .unwrap();
1449 test.poll_ensure_to_execute_queue_is_empty().await;
1451 assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1453
1454 let mut precheck_receivers = Vec::new();
1456 for _ in 0..3 {
1457 let (result_tx, result_rx) = oneshot::channel();
1458 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1459 .await
1460 .unwrap();
1461 precheck_receivers.push(result_rx);
1462 }
1463 assert_matches!(
1465 test.poll_and_recv_to_prepare_queue().await,
1466 prepare::ToQueue::Enqueue { .. }
1467 );
1468 test.from_prepare_queue_tx
1469 .send(prepare::FromQueue {
1470 artifact_id: artifact_id(2),
1471 result: Err(PrepareError::TimedOut),
1472 })
1473 .await
1474 .unwrap();
1475 test.poll_ensure_to_execute_queue_is_empty().await;
1476 for result_rx in precheck_receivers {
1477 assert_matches!(
1478 result_rx.now_or_never().unwrap().unwrap(),
1479 Err(PrepareError::TimedOut)
1480 );
1481 }
1482 }
1483
1484 #[tokio::test]
1485 async fn test_prepare_done() {
1486 let mut test = Builder::default().build();
1487 let mut host = test.host_handle();
1488 let pvd = Arc::new(PersistedValidationData {
1489 parent_head: Default::default(),
1490 relay_parent_number: 1u32,
1491 relay_parent_storage_root: H256::default(),
1492 max_pov_size: 4096 * 1024,
1493 });
1494 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1495
1496 let (result_tx, result_rx_execute) = oneshot::channel();
1501 host.execute_pvf(
1502 PvfPrepData::from_discriminator(1),
1503 TEST_EXECUTION_TIMEOUT,
1504 pvd.clone(),
1505 pov.clone(),
1506 Priority::Critical,
1507 PvfExecKind::Backing(H256::default()),
1508 result_tx,
1509 )
1510 .await
1511 .unwrap();
1512
1513 assert_matches!(
1514 test.poll_and_recv_to_prepare_queue().await,
1515 prepare::ToQueue::Enqueue { .. }
1516 );
1517
1518 let (result_tx, result_rx) = oneshot::channel();
1519 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1520 .await
1521 .unwrap();
1522
1523 test.from_prepare_queue_tx
1526 .send(prepare::FromQueue {
1527 artifact_id: artifact_id(1),
1528 result: Err(PrepareError::TimedOut),
1529 })
1530 .await
1531 .unwrap();
1532 test.poll_ensure_to_execute_queue_is_empty().await;
1533 assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
1534 assert_matches!(
1535 result_rx_execute.now_or_never().unwrap().unwrap(),
1536 Err(ValidationError::Internal(_))
1537 );
1538
1539 let mut precheck_receivers = Vec::new();
1541 for _ in 0..3 {
1542 let (result_tx, result_rx) = oneshot::channel();
1543 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
1544 .await
1545 .unwrap();
1546 precheck_receivers.push(result_rx);
1547 }
1548
1549 let (result_tx, _result_rx_execute) = oneshot::channel();
1550 host.execute_pvf(
1551 PvfPrepData::from_discriminator(2),
1552 TEST_EXECUTION_TIMEOUT,
1553 pvd,
1554 pov,
1555 Priority::Critical,
1556 PvfExecKind::Backing(H256::default()),
1557 result_tx,
1558 )
1559 .await
1560 .unwrap();
1561 assert_matches!(
1563 test.poll_and_recv_to_prepare_queue().await,
1564 prepare::ToQueue::Enqueue { .. }
1565 );
1566 test.from_prepare_queue_tx
1567 .send(prepare::FromQueue {
1568 artifact_id: artifact_id(2),
1569 result: Ok(PrepareSuccess::default()),
1570 })
1571 .await
1572 .unwrap();
1573 assert_matches!(
1576 test.poll_and_recv_to_execute_queue().await,
1577 execute::ToQueue::Enqueue { .. }
1578 );
1579 for result_rx in precheck_receivers {
1580 assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
1581 }
1582 }
1583
1584 #[tokio::test]
1587 async fn test_precheck_prepare_no_retry() {
1588 let mut test = Builder::default().build();
1589 let mut host = test.host_handle();
1590
1591 let (result_tx, result_rx) = oneshot::channel();
1593 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
1594 .await
1595 .unwrap();
1596
1597 assert_matches!(
1599 test.poll_and_recv_to_prepare_queue().await,
1600 prepare::ToQueue::Enqueue { .. }
1601 );
1602 test.from_prepare_queue_tx
1604 .send(prepare::FromQueue {
1605 artifact_id: artifact_id(1),
1606 result: Err(PrepareError::TimedOut),
1607 })
1608 .await
1609 .unwrap();
1610
1611 let result = test.poll_and_recv_result(result_rx).await;
1613 assert_matches!(result, Err(PrepareError::TimedOut));
1614
1615 let (result_tx_2, result_rx_2) = oneshot::channel();
1617 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_2)
1618 .await
1619 .unwrap();
1620
1621 test.poll_ensure_to_prepare_queue_is_empty().await;
1623
1624 let result = test.poll_and_recv_result(result_rx_2).await;
1626 assert_matches!(result, Err(PrepareError::TimedOut));
1627
1628 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1630
1631 let (result_tx_3, result_rx_3) = oneshot::channel();
1633 host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_3)
1634 .await
1635 .unwrap();
1636
1637 test.poll_ensure_to_prepare_queue_is_empty().await;
1639
1640 let result = test.poll_and_recv_result(result_rx_3).await;
1642 assert_matches!(result, Err(PrepareError::TimedOut));
1643 }
1644
1645 #[tokio::test]
1648 async fn test_execute_prepare_retry() {
1649 let mut test = Builder::default().build();
1650 let mut host = test.host_handle();
1651 let pvd = Arc::new(PersistedValidationData {
1652 parent_head: Default::default(),
1653 relay_parent_number: 1u32,
1654 relay_parent_storage_root: H256::default(),
1655 max_pov_size: 4096 * 1024,
1656 });
1657 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1658
1659 let (result_tx, result_rx) = oneshot::channel();
1661 host.execute_pvf(
1662 PvfPrepData::from_discriminator(1),
1663 TEST_EXECUTION_TIMEOUT,
1664 pvd.clone(),
1665 pov.clone(),
1666 Priority::Critical,
1667 PvfExecKind::Backing(H256::default()),
1668 result_tx,
1669 )
1670 .await
1671 .unwrap();
1672
1673 assert_matches!(
1675 test.poll_and_recv_to_prepare_queue().await,
1676 prepare::ToQueue::Enqueue { .. }
1677 );
1678 test.from_prepare_queue_tx
1680 .send(prepare::FromQueue {
1681 artifact_id: artifact_id(1),
1682 result: Err(PrepareError::TimedOut),
1683 })
1684 .await
1685 .unwrap();
1686
1687 let result = test.poll_and_recv_result(result_rx).await;
1689 assert_matches!(result, Err(ValidationError::Internal(_)));
1690
1691 let (result_tx_2, result_rx_2) = oneshot::channel();
1693 host.execute_pvf(
1694 PvfPrepData::from_discriminator(1),
1695 TEST_EXECUTION_TIMEOUT,
1696 pvd.clone(),
1697 pov.clone(),
1698 Priority::Critical,
1699 PvfExecKind::Backing(H256::default()),
1700 result_tx_2,
1701 )
1702 .await
1703 .unwrap();
1704
1705 test.poll_ensure_to_prepare_queue_is_empty().await;
1707
1708 let result = test.poll_and_recv_result(result_rx_2).await;
1710 assert_matches!(result, Err(ValidationError::Internal(_)));
1711
1712 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1714
1715 let (result_tx_3, result_rx_3) = oneshot::channel();
1717 host.execute_pvf(
1718 PvfPrepData::from_discriminator(1),
1719 TEST_EXECUTION_TIMEOUT,
1720 pvd.clone(),
1721 pov.clone(),
1722 Priority::Critical,
1723 PvfExecKind::Backing(H256::default()),
1724 result_tx_3,
1725 )
1726 .await
1727 .unwrap();
1728
1729 assert_matches!(
1731 test.poll_and_recv_to_prepare_queue().await,
1732 prepare::ToQueue::Enqueue { .. }
1733 );
1734
1735 test.from_prepare_queue_tx
1736 .send(prepare::FromQueue {
1737 artifact_id: artifact_id(1),
1738 result: Ok(PrepareSuccess::default()),
1739 })
1740 .await
1741 .unwrap();
1742
1743 let result_tx_3 = assert_matches!(
1745 test.poll_and_recv_to_execute_queue().await,
1746 execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
1747 );
1748
1749 result_tx_3
1752 .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
1753 .unwrap();
1754 assert_matches!(
1755 result_rx_3.now_or_never().unwrap().unwrap(),
1756 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
1757 );
1758 }
1759
1760 #[tokio::test]
1763 async fn test_execute_prepare_no_retry() {
1764 let mut test = Builder::default().build();
1765 let mut host = test.host_handle();
1766 let pvd = Arc::new(PersistedValidationData {
1767 parent_head: Default::default(),
1768 relay_parent_number: 1u32,
1769 relay_parent_storage_root: H256::default(),
1770 max_pov_size: 4096 * 1024,
1771 });
1772 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1773
1774 let (result_tx, result_rx) = oneshot::channel();
1776 host.execute_pvf(
1777 PvfPrepData::from_discriminator(1),
1778 TEST_EXECUTION_TIMEOUT,
1779 pvd.clone(),
1780 pov.clone(),
1781 Priority::Critical,
1782 PvfExecKind::Backing(H256::default()),
1783 result_tx,
1784 )
1785 .await
1786 .unwrap();
1787
1788 assert_matches!(
1790 test.poll_and_recv_to_prepare_queue().await,
1791 prepare::ToQueue::Enqueue { .. }
1792 );
1793 test.from_prepare_queue_tx
1795 .send(prepare::FromQueue {
1796 artifact_id: artifact_id(1),
1797 result: Err(PrepareError::Prevalidation("reproducible error".into())),
1798 })
1799 .await
1800 .unwrap();
1801
1802 let result = test.poll_and_recv_result(result_rx).await;
1804 assert_matches!(result, Err(ValidationError::Preparation(_)));
1805
1806 let (result_tx_2, result_rx_2) = oneshot::channel();
1808 host.execute_pvf(
1809 PvfPrepData::from_discriminator(1),
1810 TEST_EXECUTION_TIMEOUT,
1811 pvd.clone(),
1812 pov.clone(),
1813 Priority::Critical,
1814 PvfExecKind::Backing(H256::default()),
1815 result_tx_2,
1816 )
1817 .await
1818 .unwrap();
1819
1820 test.poll_ensure_to_prepare_queue_is_empty().await;
1822
1823 let result = test.poll_and_recv_result(result_rx_2).await;
1825 assert_matches!(result, Err(ValidationError::Preparation(_)));
1826
1827 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1829
1830 let (result_tx_3, result_rx_3) = oneshot::channel();
1832 host.execute_pvf(
1833 PvfPrepData::from_discriminator(1),
1834 TEST_EXECUTION_TIMEOUT,
1835 pvd.clone(),
1836 pov.clone(),
1837 Priority::Critical,
1838 PvfExecKind::Backing(H256::default()),
1839 result_tx_3,
1840 )
1841 .await
1842 .unwrap();
1843
1844 test.poll_ensure_to_prepare_queue_is_empty().await;
1846
1847 let result = test.poll_and_recv_result(result_rx_3).await;
1849 assert_matches!(result, Err(ValidationError::Preparation(_)));
1850 }
1851
1852 #[tokio::test]
1854 async fn test_heads_up_prepare_retry() {
1855 let mut test = Builder::default().build();
1856 let mut host = test.host_handle();
1857
1858 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1860
1861 assert_matches!(
1863 test.poll_and_recv_to_prepare_queue().await,
1864 prepare::ToQueue::Enqueue { .. }
1865 );
1866 test.from_prepare_queue_tx
1868 .send(prepare::FromQueue {
1869 artifact_id: artifact_id(1),
1870 result: Err(PrepareError::TimedOut),
1871 })
1872 .await
1873 .unwrap();
1874
1875 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1877
1878 test.poll_ensure_to_prepare_queue_is_empty().await;
1880
1881 futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
1883
1884 host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
1886
1887 assert_matches!(
1889 test.poll_and_recv_to_prepare_queue().await,
1890 prepare::ToQueue::Enqueue { .. }
1891 );
1892 }
1893
1894 #[tokio::test]
1895 async fn cancellation() {
1896 let mut test = Builder::default().build();
1897 let mut host = test.host_handle();
1898 let pvd = Arc::new(PersistedValidationData {
1899 parent_head: Default::default(),
1900 relay_parent_number: 1u32,
1901 relay_parent_storage_root: H256::default(),
1902 max_pov_size: 4096 * 1024,
1903 });
1904 let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
1905
1906 let (result_tx, result_rx) = oneshot::channel();
1907 host.execute_pvf(
1908 PvfPrepData::from_discriminator(1),
1909 TEST_EXECUTION_TIMEOUT,
1910 pvd,
1911 pov,
1912 Priority::Normal,
1913 PvfExecKind::Backing(H256::default()),
1914 result_tx,
1915 )
1916 .await
1917 .unwrap();
1918
1919 assert_matches!(
1920 test.poll_and_recv_to_prepare_queue().await,
1921 prepare::ToQueue::Enqueue { .. }
1922 );
1923
1924 test.from_prepare_queue_tx
1925 .send(prepare::FromQueue {
1926 artifact_id: artifact_id(1),
1927 result: Ok(PrepareSuccess::default()),
1928 })
1929 .await
1930 .unwrap();
1931
1932 drop(result_rx);
1933
1934 test.poll_ensure_to_execute_queue_is_empty().await;
1935 }
1936}