use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
execute::{self, PendingExecutionRequest},
metrics::Metrics,
prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
};
use always_assert::never;
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_node_core_pvf_common::{
error::{PrecheckResult, PrepareError},
prepare::PrepareSuccess,
pvf::PvfPrepData,
};
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem::{
messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::{Hash, PersistedValidationData};
use std::{
collections::HashMap,
path::PathBuf,
sync::Arc,
time::{Duration, SystemTime},
};
/// Minimum time that has to pass after a failed preparation before
/// `can_retry_prepare_after_failure` allows another attempt (15 minutes in non-test builds).
#[cfg(not(test))]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
/// Shortened cooldown (200 ms) that replaces the value above when compiling tests.
#[cfg(test)]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
/// Retry threshold: a failed preparation is only retried while its recorded number of failures
/// is less than or equal to this value.
pub const NUM_PREPARE_RETRIES: u32 = 5;
/// Name of the prepare worker binary.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
/// Name of the execute worker binary.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";
/// Capacity of the bounded channel that carries `ToHost` messages from `ValidationHost` handles
/// to the host loop.
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;
/// Sending half of the one-shot channel on which the outcome of an execution request is reported.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
/// Sending half of the one-shot channel on which the outcome of a pre-check request is reported.
pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
/// A cloneable handle to the validation host.
///
/// Every request made through the handle is turned into a `ToHost` message and pushed onto a
/// bounded channel that is drained by the host loop.
#[derive(Clone)]
pub struct ValidationHost {
// Sending half of the channel whose receiving half is owned by the host loop.
to_host_tx: mpsc::Sender<ToHost>,
/// Security status that was determined when the host was started.
pub security_status: SecurityStatus,
}
impl ValidationHost {
pub async fn precheck_pvf(
&mut self,
pvf: PvfPrepData,
result_tx: PrecheckResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::PrecheckPvf { pvf, result_tx })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
pub async fn execute_pvf(
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
priority: Priority,
exec_kind: PvfExecKind,
result_tx: ResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::ExecutePvf(ExecutePvfInputs {
pvf,
exec_timeout,
pvd,
pov,
priority,
exec_kind,
result_tx,
}))
.await
.map_err(|_| "the inner loop hung up".to_string())
}
pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
self.to_host_tx
.send(ToHost::HeadsUp { active_pvfs })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
pub async fn update_active_leaves(
&mut self,
update: ActiveLeavesUpdate,
ancestors: Vec<Hash>,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::UpdateActiveLeaves { update, ancestors })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
}
/// Messages sent from `ValidationHost` handles to the host loop; dispatched by `handle_to_host`.
enum ToHost {
/// Pre-check the given PVF and report the outcome on `result_tx`.
PrecheckPvf { pvf: PvfPrepData, result_tx: PrecheckResultSender },
/// Execute a PVF using the bundled inputs.
ExecutePvf(ExecutePvfInputs),
/// Announce PVFs that are currently active, so that their artifacts are kept fresh or get
/// enqueued for preparation.
HeadsUp { active_pvfs: Vec<PvfPrepData> },
/// Active-leaves update that the host forwards to the execute queue.
UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec<Hash> },
}
/// Everything that accompanies a `ToHost::ExecutePvf` request.
struct ExecutePvfInputs {
// The PVF to execute; the artifact id and the executor parameters are derived from it.
pvf: PvfPrepData,
// Execution timeout, passed along unchanged in the pending execution request.
exec_timeout: Duration,
// Persisted validation data, passed along unchanged in the pending execution request.
pvd: Arc<PersistedValidationData>,
// Proof of validity, passed along unchanged in the pending execution request.
pov: Arc<PoV>,
// Priority used when the PVF has to be enqueued for preparation before it can be executed.
priority: Priority,
// Kind of execution, passed along unchanged in the pending execution request.
exec_kind: PvfExecKind,
// Channel on which the outcome of the request is reported.
result_tx: ResultSender,
}
/// Configuration of the validation host, consumed by `start`.
#[derive(Debug)]
pub struct Config {
    /// Cache directory handed to the artifact store and to the prepare/execute machinery.
    pub cache_path: PathBuf,
    /// Optional node version, forwarded to the prepare pool and the execute queue.
    pub node_version: Option<String>,
    /// Whether Secure Validator Mode is requested. On non-Linux targets `start` fails when
    /// this is set.
    pub secure_validator_mode: bool,

    /// Path of the program used for prepare workers.
    pub prepare_worker_program_path: PathBuf,
    /// Spawn timeout forwarded to the prepare pool.
    pub prepare_worker_spawn_timeout: Duration,
    /// Soft limit on the number of prepare workers, forwarded to the prepare queue.
    pub prepare_workers_soft_max_num: usize,
    /// Hard limit on the number of prepare workers, forwarded to the prepare queue.
    pub prepare_workers_hard_max_num: usize,

    /// Path of the program used for execute workers.
    pub execute_worker_program_path: PathBuf,
    /// Spawn timeout forwarded to the execute queue.
    pub execute_worker_spawn_timeout: Duration,
    /// Maximum number of execute workers, forwarded to the execute queue.
    pub execute_workers_max_num: usize,
}

impl Config {
    /// Creates a configuration from the given values.
    ///
    /// The two spawn timeouts are not parameters; both are set to three seconds.
    pub fn new(
        cache_path: PathBuf,
        node_version: Option<String>,
        secure_validator_mode: bool,
        prepare_worker_program_path: PathBuf,
        execute_worker_program_path: PathBuf,
        execute_workers_max_num: usize,
        prepare_workers_soft_max_num: usize,
        prepare_workers_hard_max_num: usize,
    ) -> Self {
        // The same fixed timeout applies to spawning either kind of worker.
        let worker_spawn_timeout = Duration::from_secs(3);

        Config {
            cache_path,
            node_version,
            secure_validator_mode,

            prepare_worker_program_path,
            prepare_worker_spawn_timeout: worker_spawn_timeout,
            prepare_workers_soft_max_num,
            prepare_workers_hard_max_num,

            execute_worker_program_path,
            execute_worker_spawn_timeout: worker_spawn_timeout,
            execute_workers_max_num,
        }
    }
}
/// Starts the validation host and all of its supporting tasks.
///
/// Returns a `ValidationHost` handle together with a future that drives the host loop, the
/// prepare queue, the prepare pool, the execute queue and the sweeper. Nothing runs until that
/// future is polled, and it completes as soon as any one of those tasks completes.
///
/// # Errors
///
/// On Linux, returns an error if the security status check fails. On other targets, returns an
/// error if `config.secure_validator_mode` is set.
pub async fn start(
config: Config,
metrics: Metrics,
) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
// Set up the artifact store on top of the cache directory.
let artifacts = Artifacts::new(&config.cache_path).await;
// Linux: run the security checks and refuse to start if they report an error.
#[cfg(target_os = "linux")]
let security_status = match crate::security::check_security_status(&config).await {
Ok(ok) => ok,
Err(err) => return Err(SubsystemError::Context(err)),
};
// Other targets: no checks are run here. Requesting secure mode is an error; otherwise a
// warning is logged and the default security status is used.
#[cfg(not(target_os = "linux"))]
let security_status = if config.secure_validator_mode {
gum::error!(
target: LOG_TARGET,
"{}{}{}",
crate::SECURE_MODE_ERROR,
crate::SECURE_LINUX_NOTE,
crate::IGNORE_SECURE_MODE_TIP
);
return Err(SubsystemError::Context(
"could not enable Secure Validator Mode for non-Linux; check logs".into(),
));
} else {
gum::warn!(
target: LOG_TARGET,
"{}{}",
crate::SECURE_MODE_WARNING,
crate::SECURE_LINUX_NOTE,
);
SecurityStatus::default()
};
// Channel between the public handle and the host loop.
let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);
let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };
// Prepare pool: receives the worker program path, cache path, spawn timeout and node version.
let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
metrics.clone(),
config.prepare_worker_program_path.clone(),
config.cache_path.clone(),
config.prepare_worker_spawn_timeout,
config.node_version.clone(),
security_status.clone(),
);
// Prepare queue: sits between the host loop and the prepare pool.
let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
metrics.clone(),
config.prepare_workers_soft_max_num,
config.prepare_workers_hard_max_num,
config.cache_path.clone(),
to_prepare_pool,
from_prepare_pool,
);
// Execute queue: this is the last user of `metrics`, `config.node_version` and
// `security_status`, so they are moved rather than cloned.
let (to_execute_queue_tx, from_execute_queue_rx, run_execute_queue) = execute::start(
metrics,
config.execute_worker_program_path.to_owned(),
config.cache_path.clone(),
config.execute_workers_max_num,
config.execute_worker_spawn_timeout,
config.node_version,
security_status,
);
// The sweeper removes artifact files whose paths are sent over this channel.
let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
let run_sweeper = sweeper_task(to_sweeper_rx);
// The host loop owns the artifact store and the ends of all channels created above.
let run_host = async move {
run(Inner {
cleanup_pulse_interval: Duration::from_secs(3600),
cleanup_config: ArtifactsCleanupConfig::default(),
artifacts,
to_host_rx,
to_prepare_queue_tx,
from_prepare_queue_rx,
to_execute_queue_tx,
from_execute_queue_rx,
to_sweeper_tx,
awaiting_prepare: AwaitingPrepare::default(),
})
.await
};
// Drive every task concurrently; the combined future finishes once any of them finishes.
let task = async move {
futures::select! {
_ = run_host.fuse() => {},
_ = run_prepare_queue.fuse() => {},
_ = run_prepare_pool.fuse() => {},
_ = run_execute_queue.fuse() => {},
_ = run_sweeper.fuse() => {},
};
};
Ok((validation_host, task))
}
/// Execution requests that are parked until the artifact they depend on has been prepared,
/// grouped by artifact id.
#[derive(Default)]
struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);

impl AwaitingPrepare {
    /// Parks `pending_execution_request` under `artifact_id`, behind any request already
    /// waiting for the same artifact.
    fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
        let parked = self.0.entry(artifact_id).or_insert_with(Vec::new);
        parked.push(pending_execution_request);
    }

    /// Removes and returns every request parked under `artifact_id`, in insertion order.
    ///
    /// Returns an empty vector when nothing is waiting for that artifact.
    fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
        match self.0.remove(artifact_id) {
            Some(parked) => parked,
            None => Vec::new(),
        }
    }
}
/// State and channel ends owned by the host loop; consumed by `run`.
struct Inner {
// Interval between two artifact cleanup pulses.
cleanup_pulse_interval: Duration,
// Configuration handed to `Artifacts::prune` on every cleanup pulse.
cleanup_config: ArtifactsCleanupConfig,
// The artifact store tracking the state of every known artifact.
artifacts: Artifacts,
// Requests coming from `ValidationHost` handles.
to_host_rx: mpsc::Receiver<ToHost>,
// Channel for enqueueing preparation jobs.
to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
// Results of preparation jobs.
from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,
// Channel for enqueueing execution requests and forwarding active-leaves updates.
to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
// Messages from the execute queue (artifact removal requests).
from_execute_queue_rx: mpsc::UnboundedReceiver<execute::FromQueue>,
// Paths of artifact files that the sweeper should delete.
to_sweeper_tx: mpsc::Sender<PathBuf>,
// Execution requests waiting for their artifact to be prepared.
awaiting_prepare: AwaitingPrepare,
}
/// Marker error telling the host loop to terminate.
///
/// It is produced when sending to one of the queues or to the sweeper fails, or when one of the
/// queue receivers yields `None`.
#[derive(Debug)]
struct Fatal;
/// The main loop of the validation host.
///
/// Repeatedly waits for the next event (a message from the execute queue, a cleanup pulse, a
/// request from a `ValidationHost` handle, or a result from the prepare queue) and dispatches it
/// to the matching handler.
///
/// The loop ends when the request channel yields `None` or when any handler reports `Fatal`.
async fn run(
Inner {
cleanup_pulse_interval,
cleanup_config,
mut artifacts,
to_host_rx,
from_prepare_queue_rx,
mut to_prepare_queue_tx,
from_execute_queue_rx,
mut to_execute_queue_tx,
mut to_sweeper_tx,
mut awaiting_prepare,
}: Inner,
) {
// Unwraps an `Ok` value; on `Err(Fatal)` logs the line of the invocation and breaks out of the
// enclosing loop. It is a macro rather than a function precisely so that it can `break`.
macro_rules! break_if_fatal {
($expr:expr) => {
match $expr {
Err(Fatal) => {
gum::error!(
target: LOG_TARGET,
"Fatal error occurred, terminating the host. Line: {}",
line!(),
);
break
},
Ok(v) => v,
}
};
}
// Stream that ticks once per cleanup interval; pinned so it can be polled in the select below.
let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
futures::pin_mut!(cleanup_pulse);
// The select macro below requires fused streams.
let mut to_host_rx = to_host_rx.fuse();
let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
let mut from_execute_queue_rx = from_execute_queue_rx.fuse();
loop {
// `select_biased!` polls the branches in the order they are written, so when several events
// are ready at once the earlier branch is taken.
futures::select_biased! {
// Inside this arm the binding shadows the stream and holds the received item.
from_execute_queue_rx = from_execute_queue_rx.next() => {
// A closed execute-queue channel is treated as fatal.
let from_queue = break_if_fatal!(from_execute_queue_rx.ok_or(Fatal));
let execute::FromQueue::RemoveArtifact { artifact, reply_to } = from_queue;
break_if_fatal!(handle_artifact_removal(
&mut to_sweeper_tx,
&mut artifacts,
artifact,
reply_to,
).await);
},
() = cleanup_pulse.select_next_some() => {
break_if_fatal!(handle_cleanup_pulse(
&mut to_sweeper_tx,
&mut artifacts,
&cleanup_config,
).await);
},
to_host = to_host_rx.next() => {
let to_host = match to_host {
None => {
// The request channel is closed: stop the loop without logging an error.
break;
},
Some(to_host) => to_host,
};
break_if_fatal!(handle_to_host(
&mut artifacts,
&mut to_prepare_queue_tx,
&mut to_execute_queue_tx,
&mut awaiting_prepare,
to_host,
)
.await);
},
from_prepare_queue = from_prepare_queue_rx.next() => {
// A closed prepare-queue channel is treated as fatal.
let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
break_if_fatal!(handle_prepare_done(
&mut artifacts,
&mut to_execute_queue_tx,
&mut awaiting_prepare,
from_queue,
).await);
},
}
}
}
/// Dispatches a request received from a `ValidationHost` handle to its dedicated handler and
/// returns that handler's result.
async fn handle_to_host(
    artifacts: &mut Artifacts,
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    awaiting_prepare: &mut AwaitingPrepare,
    to_host: ToHost,
) -> Result<(), Fatal> {
    match to_host {
        ToHost::PrecheckPvf { pvf, result_tx } =>
            handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await,
        ToHost::ExecutePvf(inputs) =>
            handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
                .await,
        ToHost::HeadsUp { active_pvfs } =>
            handle_heads_up(artifacts, prepare_queue, active_pvfs).await,
        ToHost::UpdateActiveLeaves { update, ancestors } =>
            handle_update_active_leaves(execute_queue, update, ancestors).await,
    }
}
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
pvf: PvfPrepData,
result_sender: PrecheckResultSender,
) -> Result<(), Fatal> {
let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(()));
},
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
ArtifactState::FailedToProcess { error, .. } => {
let _ = result_sender.send(PrecheckResult::Err(error.clone()));
},
}
} else {
artifacts.insert_preparing(artifact_id, vec![result_sender]);
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
.await?;
}
Ok(())
}
async fn handle_execute_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
inputs: ExecutePvfInputs,
) -> Result<(), Fatal> {
let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, exec_kind, result_tx } = inputs;
let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
let executor_params = (*pvf.executor_params()).clone();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { ref path, last_time_needed, .. } => {
let file_metadata = std::fs::metadata(path);
if file_metadata.is_ok() {
*last_time_needed = SystemTime::now();
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
exec_kind,
result_tx,
},
},
)
.await?;
} else {
gum::warn!(
target: LOG_TARGET,
?pvf,
?artifact_id,
"handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
);
*state = ArtifactState::Preparing {
waiting_for_response: Vec::new(),
num_failures: 0,
};
enqueue_prepare_for_execute(
prepare_queue,
awaiting_prepare,
pvf,
priority,
artifact_id,
PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
exec_kind,
result_tx,
},
)
.await?;
}
},
ArtifactState::Preparing { .. } => {
awaiting_prepare.add(
artifact_id,
PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
result_tx,
exec_kind,
},
);
},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
gum::warn!(
target: LOG_TARGET,
?pvf,
?artifact_id,
?last_time_failed,
%num_failures,
%error,
"handle_execute_pvf: Re-trying failed PVF preparation."
);
*state = ArtifactState::Preparing {
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
enqueue_prepare_for_execute(
prepare_queue,
awaiting_prepare,
pvf,
priority,
artifact_id,
PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
exec_kind,
result_tx,
},
)
.await?;
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
},
}
} else {
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
enqueue_prepare_for_execute(
prepare_queue,
awaiting_prepare,
pvf,
priority,
artifact_id,
PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
result_tx,
exec_kind,
},
)
.await?;
}
Ok(())
}
async fn handle_heads_up(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
active_pvfs: Vec<PvfPrepData>,
) -> Result<(), Fatal> {
let now = SystemTime::now();
for active_pvf in active_pvfs {
let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = now;
},
ArtifactState::Preparing { .. } => {
},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
gum::warn!(
target: LOG_TARGET,
?active_pvf,
?artifact_id,
?last_time_failed,
%num_failures,
%error,
"handle_heads_up: Re-trying failed PVF preparation."
);
*state = ArtifactState::Preparing {
waiting_for_response: vec![],
num_failures: *num_failures,
};
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf: active_pvf,
},
)
.await?;
}
},
}
} else {
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
)
.await?;
}
}
Ok(())
}
/// Handles the result of a preparation job reported by the prepare queue.
///
/// Everyone waiting for a pre-check answer is notified, parked execution requests are either
/// forwarded to the execute queue (on success) or answered with the error (on failure), and the
/// artifact state becomes `Prepared` or `FailedToProcess` accordingly.
///
/// A result for an artifact that is not in the `Preparing` state is reported through `never!`
/// and otherwise ignored. Returns `Fatal` only if the execute queue cannot be reached.
async fn handle_prepare_done(
artifacts: &mut Artifacts,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
from_queue: prepare::FromQueue,
) -> Result<(), Fatal> {
let prepare::FromQueue { artifact_id, result } = from_queue;
// Only an artifact that is currently being prepared can legitimately finish preparation.
let state = match artifacts.artifact_state_mut(&artifact_id) {
None => {
never!("an unknown artifact was prepared: {:?}", artifact_id);
return Ok(())
},
Some(ArtifactState::Prepared { .. }) => {
never!("the artifact is already prepared: {:?}", artifact_id);
return Ok(())
},
Some(ArtifactState::FailedToProcess { .. }) => {
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
return Ok(())
},
Some(state @ ArtifactState::Preparing { .. }) => state,
};
// Answer the pre-check requesters and keep a reference to the failure counter for later.
let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
state
{
for result_sender in waiting_for_response.drain(..) {
// Pre-check requesters only learn whether preparation succeeded, not its payload.
let result = result.clone().map(|_| ());
let _ = result_sender.send(result);
}
num_failures
} else {
never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
return Ok(())
};
// Dispatch the execution requests that were parked while the artifact was being prepared.
let pending_requests = awaiting_prepare.take(&artifact_id);
for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } in
pending_requests
{
// Skip requests whose result channel has been cancelled.
if result_tx.is_canceled() {
continue
}
// On failure the requester receives the preparation error instead of an execution.
let path = match &result {
Ok(success) => success.path.clone(),
Err(error) => {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
continue
},
};
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), &path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
exec_kind,
result_tx,
},
},
)
.await?;
}
// Finally record the outcome in the artifact state.
*state = match result {
Ok(PrepareSuccess { path, size, .. }) =>
ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), size },
Err(error) => {
let last_time_failed = SystemTime::now();
// Count this failure on top of the ones carried over from earlier attempts.
let num_failures = *num_failures + 1;
gum::error!(
target: LOG_TARGET,
?artifact_id,
time_failed = ?last_time_failed,
%num_failures,
"artifact preparation failed: {}",
error
);
ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
},
};
Ok(())
}
/// Forwards an active-leaves update and the accompanying ancestors to the execute queue.
///
/// Returns `Fatal` if the execute queue cannot be reached.
async fn handle_update_active_leaves(
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    update: ActiveLeavesUpdate,
    ancestors: Vec<Hash>,
) -> Result<(), Fatal> {
    let message = execute::ToQueue::UpdateActiveLeaves { update, ancestors };
    send_execute(execute_queue, message).await
}
/// Sends `to_queue` to the prepare queue, reporting any send failure as `Fatal`.
async fn send_prepare(
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    to_queue: prepare::ToQueue,
) -> Result<(), Fatal> {
    match prepare_queue.send(to_queue).await {
        Ok(()) => Ok(()),
        Err(_) => Err(Fatal),
    }
}
/// Sends `to_queue` to the execute queue, reporting any send failure as `Fatal`.
async fn send_execute(
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    to_queue: execute::ToQueue,
) -> Result<(), Fatal> {
    match execute_queue.send(to_queue).await {
        Ok(()) => Ok(()),
        Err(_) => Err(Fatal),
    }
}
/// Enqueues a preparation job for `pvf` and, once that succeeded, parks
/// `pending_execution_request` under `artifact_id` until the artifact is ready.
///
/// If the prepare queue cannot be reached, `Fatal` is returned and the request is not parked.
async fn enqueue_prepare_for_execute(
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    awaiting_prepare: &mut AwaitingPrepare,
    pvf: PvfPrepData,
    priority: Priority,
    artifact_id: ArtifactId,
    pending_execution_request: PendingExecutionRequest,
) -> Result<(), Fatal> {
    let job = prepare::ToQueue::Enqueue { priority, pvf };
    match send_prepare(prepare_queue, job).await {
        Ok(()) => {
            awaiting_prepare.add(artifact_id, pending_execution_request);
            Ok(())
        },
        Err(fatal) => Err(fatal),
    }
}
/// Reacts to a cleanup pulse: prunes the artifact store according to `cleanup_config` and hands
/// the path of every pruned artifact to the sweeper.
///
/// Returns `Fatal` if the sweeper cannot be reached.
async fn handle_cleanup_pulse(
    sweeper_tx: &mut mpsc::Sender<PathBuf>,
    artifacts: &mut Artifacts,
    cleanup_config: &ArtifactsCleanupConfig,
) -> Result<(), Fatal> {
    let condemned = artifacts.prune(cleanup_config);
    gum::debug!(
        target: LOG_TARGET,
        "PVF pruning: {} artifacts reached their end of life",
        condemned.len(),
    );

    for (artifact_id, path) in condemned {
        gum::debug!(
            target: LOG_TARGET,
            validation_code_hash = ?artifact_id.code_hash,
            "pruning artifact",
        );
        if sweeper_tx.send(path).await.is_err() {
            return Err(Fatal)
        }
    }

    Ok(())
}
/// Handles a request from the execute queue to remove an artifact.
///
/// If the artifact is known, it is removed from the store, the removal is confirmed on
/// `reply_to` and the artifact's path is handed to the sweeper. If the artifact is not known,
/// nothing is done and `reply_to` is dropped without a confirmation being sent.
///
/// Returns `Fatal` if the sweeper cannot be reached.
///
/// # Panics
///
/// Panics if the confirmation cannot be sent on `reply_to`.
async fn handle_artifact_removal(
    sweeper_tx: &mut mpsc::Sender<PathBuf>,
    artifacts: &mut Artifacts,
    artifact_id: ArtifactId,
    reply_to: oneshot::Sender<()>,
) -> Result<(), Fatal> {
    let (artifact_id, path) = match artifacts.remove(artifact_id) {
        Some(removed) => removed,
        None => return Ok(()),
    };

    reply_to
        .send(())
        .expect("the execute queue waits for the artifact remove confirmation; qed");

    gum::debug!(
        target: LOG_TARGET,
        validation_code_hash = ?artifact_id.code_hash,
        "PVF pruning: pruning artifact by request from the execute queue",
    );

    sweeper_tx.send(path).await.map_err(|_| Fatal)
}
/// Background task that deletes the file behind every path received on `sweeper_rx`.
///
/// The result of each deletion is only logged at trace level; a failed deletion does not stop
/// the task. The task ends once the channel is closed.
async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
    while let Some(condemned) = sweeper_rx.next().await {
        let result = tokio::fs::remove_file(&condemned).await;
        gum::trace!(
            target: LOG_TARGET,
            ?result,
            "Swept the artifact file {}",
            condemned.display(),
        );
    }
}
/// Decides whether a previously failed preparation may be attempted again.
///
/// A retry is allowed only when all of the following hold:
///
/// - `error` is not deterministic;
/// - at least `PREPARE_FAILURE_COOLDOWN` has passed since `last_time_failed`;
/// - `num_failures` does not exceed `NUM_PREPARE_RETRIES`.
fn can_retry_prepare_after_failure(
    last_time_failed: SystemTime,
    num_failures: u32,
    error: &PrepareError,
) -> bool {
    if error.is_deterministic() {
        return false
    }

    let earliest_retry = last_time_failed + PREPARE_FAILURE_COOLDOWN;
    if SystemTime::now() < earliest_retry {
        return false
    }

    num_failures <= NUM_PREPARE_RETRIES
}
/// Returns a never-ending stream that yields `()` once per `interval`.
///
/// Each item is produced by waiting for a fresh delay of `interval`, so the first item arrives
/// one interval after the stream is first polled.
fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
    // No state has to be threaded between ticks, hence the unit seed.
    futures::stream::unfold((), move |()| async move {
        futures_timer::Delay::new(interval).await;
        Some(((), ()))
    })
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;
use polkadot_node_primitives::BlockData;
use sp_core::H256;
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
#[tokio::test]
async fn pulse_test() {
let pulse = pulse_every(Duration::from_millis(100));
futures::pin_mut!(pulse);
for _ in 0..5 {
let start = std::time::Instant::now();
let _ = pulse.next().await.unwrap();
let el = start.elapsed().as_millis();
assert!(el > 50 && el < 150, "pulse duration: {}", el);
}
}
struct Builder {
cleanup_pulse_interval: Duration,
cleanup_config: ArtifactsCleanupConfig,
artifacts: Artifacts,
}
impl Builder {
fn default() -> Self {
Self {
cleanup_pulse_interval: Duration::from_secs(3600),
cleanup_config: ArtifactsCleanupConfig::default(),
artifacts: Artifacts::empty(),
}
}
fn build(self) -> Test {
Test::new(self)
}
}
struct Test {
to_host_tx: Option<mpsc::Sender<ToHost>>,
to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
#[allow(unused)]
from_execute_queue_tx: mpsc::UnboundedSender<execute::FromQueue>,
to_sweeper_rx: mpsc::Receiver<PathBuf>,
run: BoxFuture<'static, ()>,
}
impl Test {
fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
let (from_execute_queue_tx, from_execute_queue_rx) = mpsc::unbounded();
let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
let run = run(Inner {
cleanup_pulse_interval,
cleanup_config,
artifacts,
to_host_rx,
to_prepare_queue_tx,
from_prepare_queue_rx,
to_execute_queue_tx,
from_execute_queue_rx,
to_sweeper_tx,
awaiting_prepare: AwaitingPrepare::default(),
})
.boxed();
Self {
to_host_tx: Some(to_host_tx),
to_prepare_queue_rx,
from_prepare_queue_tx,
to_execute_queue_rx,
from_execute_queue_tx,
to_sweeper_rx,
run,
}
}
fn host_handle(&mut self) -> ValidationHost {
let to_host_tx = self.to_host_tx.take().unwrap();
let security_status = Default::default();
ValidationHost { to_host_tx, security_status }
}
async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
where
T: Send,
{
run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
}
async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
.await
}
async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
let to_execute_queue_rx = &mut self.to_execute_queue_rx;
run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
.await
}
async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
use futures_timer::Delay;
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(
&mut self.run,
async {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
_ = to_prepare_queue_rx.next().fuse() => {
panic!("the prepare queue is supposed to be empty")
}
}
}
.boxed(),
)
.await
}
async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
use futures_timer::Delay;
let to_execute_queue_rx = &mut self.to_execute_queue_rx;
run_until(
&mut self.run,
async {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
_ = to_execute_queue_rx.next().fuse() => {
panic!("the execute queue is supposed to be empty")
}
}
}
.boxed(),
)
.await
}
async fn poll_ensure_to_sweeper_is_empty(&mut self) {
use futures_timer::Delay;
let to_sweeper_rx = &mut self.to_sweeper_rx;
run_until(
&mut self.run,
async {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
msg = to_sweeper_rx.next().fuse() => {
panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
}
}
}
.boxed(),
)
.await
}
}
async fn run_until<R>(
task: &mut (impl Future<Output = ()> + Unpin),
mut fut: (impl Future<Output = R> + Unpin),
) -> R {
use std::task::Poll;
let start = std::time::Instant::now();
let fut = &mut fut;
loop {
if start.elapsed() > std::time::Duration::from_secs(2) {
panic!("timeout");
}
if let Poll::Ready(r) = futures::poll!(&mut *fut) {
break r
}
if futures::poll!(&mut *task).is_ready() {
panic!()
}
}
}
#[tokio::test]
async fn shutdown_on_handle_drop() {
let test = Builder::default().build();
let join_handle = tokio::task::spawn(test.run);
drop(test.to_host_tx);
join_handle.await.unwrap();
}
#[tokio::test]
async fn pruning() {
let mock_now = SystemTime::now() - Duration::from_millis(1000);
let tempdir = tempfile::tempdir().unwrap();
let cache_path = tempdir.path();
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
let path1 = generate_artifact_path(cache_path);
let path2 = generate_artifact_path(cache_path);
builder.artifacts.insert_prepared(artifact_id(1), path1.clone(), mock_now, 1024);
builder.artifacts.insert_prepared(artifact_id(2), path2.clone(), mock_now, 1024);
let mut test = builder.build();
let mut host = test.host_handle();
host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
let to_sweeper_rx = &mut test.to_sweeper_rx;
run_until(
&mut test.run,
async {
assert_eq!(to_sweeper_rx.next().await.unwrap(), path2);
}
.boxed(),
)
.await;
host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
test.poll_ensure_to_sweeper_is_empty().await;
}
#[tokio::test]
async fn execute_pvf_requests() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
let pvd = Arc::new(PersistedValidationData {
parent_head: Default::default(),
relay_parent_number: 1u32,
relay_parent_storage_root: H256::default(),
max_pov_size: 4096 * 1024,
});
let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd.clone(),
pov1.clone(),
Priority::Normal,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd.clone(),
pov1,
Priority::Critical,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
let (result_tx, result_rx_pvf_2) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(2),
TEST_EXECUTION_TIMEOUT,
pvd,
pov2,
Priority::Normal,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
let result_tx_pvf_1_1 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
let result_tx_pvf_1_2 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
let result_tx_pvf_2 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
result_tx_pvf_1_1
.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
);
result_tx_pvf_1_2
.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
);
result_tx_pvf_2
.send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_pvf_2.now_or_never().unwrap().unwrap(),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
);
}
#[tokio::test]
async fn precheck_pvf() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
test.poll_ensure_to_execute_queue_is_empty().await;
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
.await
.unwrap();
precheck_receivers.push(result_rx);
}
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
test.poll_ensure_to_execute_queue_is_empty().await;
for result_rx in precheck_receivers {
assert_matches!(
result_rx.now_or_never().unwrap().unwrap(),
Err(PrepareError::TimedOut)
);
}
}
#[tokio::test]
async fn test_prepare_done() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
let pvd = Arc::new(PersistedValidationData {
parent_head: Default::default(),
relay_parent_number: 1u32,
relay_parent_storage_root: H256::default(),
max_pov_size: 4096 * 1024,
});
let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
let (result_tx, result_rx_execute) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd.clone(),
pov.clone(),
Priority::Critical,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
.await
.unwrap();
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
test.poll_ensure_to_execute_queue_is_empty().await;
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
assert_matches!(
result_rx_execute.now_or_never().unwrap().unwrap(),
Err(ValidationError::Internal(_))
);
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(2), result_tx)
.await
.unwrap();
precheck_receivers.push(result_rx);
}
let (result_tx, _result_rx_execute) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(2),
TEST_EXECUTION_TIMEOUT,
pvd,
pov,
Priority::Critical,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { .. }
);
for result_rx in precheck_receivers {
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
}
}
/// Checks that a failed preparation is never retried on behalf of precheck requests.
///
/// After the first precheck for discriminator 1 fails with `PrepareError::TimedOut`, two further
/// precheck requests for the same discriminator (one before and one after waiting
/// `PREPARE_FAILURE_COOLDOWN`) must both resolve to `PrepareError::TimedOut`.
///
/// NOTE(review): the harness helpers (`poll_and_recv_*`, `poll_ensure_*_is_empty`) are defined
/// outside this view; their behavior is assumed from their names.
#[tokio::test]
async fn test_precheck_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// First precheck request; its preparation will be reported as failed below.
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx)
.await
.unwrap();
// The host must hand the job to the prepare queue.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Play the role of the prepare queue and report a timeout for artifact 1.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
// The first requester receives the preparation error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(PrepareError::TimedOut));
// Second precheck request, issued right away (no cooldown wait).
let (result_tx_2, result_rx_2) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_2)
.await
.unwrap();
// Presumably verifies that no new prepare job was enqueued.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The second requester gets the same error as the first.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(PrepareError::TimedOut));
// Wait out the failure cooldown (200 ms under `cfg(test)`).
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Third precheck request, issued after the cooldown wait.
let (result_tx_3, result_rx_3) = oneshot::channel();
host.precheck_pvf(PvfPrepData::from_discriminator_precheck(1), result_tx_3)
.await
.unwrap();
// Still no prepare job expected: precheck requests do not trigger a retry.
test.poll_ensure_to_prepare_queue_is_empty().await;
// The third requester still gets the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(result, Err(PrepareError::TimedOut));
}
/// Checks that execute requests retry a failed preparation once the cooldown has passed.
///
/// Sequence exercised:
/// 1. An execute request whose preparation is reported as `PrepareError::TimedOut` resolves to
///    `ValidationError::Internal`.
/// 2. A second execute request sent immediately resolves to `ValidationError::Internal` as well.
/// 3. After waiting `PREPARE_FAILURE_COOLDOWN`, a third execute request reaches the prepare
///    queue again; once that preparation is reported successful, the request is forwarded to
///    the execute queue.
///
/// NOTE(review): the harness helpers (`poll_and_recv_*`, `poll_ensure_*_is_empty`) are defined
/// outside this view; their behavior is assumed from their names.
#[tokio::test]
async fn test_execute_prepare_retry() {
    let mut test = Builder::default().build();
    let mut host = test.host_handle();
    let pvd = Arc::new(PersistedValidationData {
        parent_head: Default::default(),
        relay_parent_number: 1u32,
        relay_parent_storage_root: H256::default(),
        max_pov_size: 4096 * 1024,
    });
    let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });

    // First execute request; its preparation will be reported as failed below.
    let (result_tx, result_rx) = oneshot::channel();
    host.execute_pvf(
        PvfPrepData::from_discriminator(1),
        TEST_EXECUTION_TIMEOUT,
        pvd.clone(),
        pov.clone(),
        Priority::Critical,
        PvfExecKind::Backing(H256::default()),
        result_tx,
    )
    .await
    .unwrap();

    // The host must hand the job to the prepare queue.
    assert_matches!(
        test.poll_and_recv_to_prepare_queue().await,
        prepare::ToQueue::Enqueue { .. }
    );

    // Play the role of the prepare queue and report a timeout for artifact 1.
    test.from_prepare_queue_tx
        .send(prepare::FromQueue {
            artifact_id: artifact_id(1),
            result: Err(PrepareError::TimedOut),
        })
        .await
        .unwrap();

    // The timeout surfaces to the execute requester as an internal error.
    let result = test.poll_and_recv_result(result_rx).await;
    assert_matches!(result, Err(ValidationError::Internal(_)));

    // Second execute request, issued right away (no cooldown wait).
    let (result_tx_2, result_rx_2) = oneshot::channel();
    host.execute_pvf(
        PvfPrepData::from_discriminator(1),
        TEST_EXECUTION_TIMEOUT,
        pvd.clone(),
        pov.clone(),
        Priority::Critical,
        PvfExecKind::Backing(H256::default()),
        result_tx_2,
    )
    .await
    .unwrap();

    // Presumably verifies that no new prepare job was enqueued yet.
    test.poll_ensure_to_prepare_queue_is_empty().await;

    // The second requester gets an internal error as well.
    let result = test.poll_and_recv_result(result_rx_2).await;
    assert_matches!(result, Err(ValidationError::Internal(_)));

    // Wait out the failure cooldown (200 ms under `cfg(test)`).
    futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

    // Third execute request, issued after the cooldown wait. This is the last use of `pvd`
    // and `pov`, so the `Arc`s are moved rather than cloned.
    let (result_tx_3, result_rx_3) = oneshot::channel();
    host.execute_pvf(
        PvfPrepData::from_discriminator(1),
        TEST_EXECUTION_TIMEOUT,
        pvd,
        pov,
        Priority::Critical,
        PvfExecKind::Backing(H256::default()),
        result_tx_3,
    )
    .await
    .unwrap();

    // This time the preparation is attempted again.
    assert_matches!(
        test.poll_and_recv_to_prepare_queue().await,
        prepare::ToQueue::Enqueue { .. }
    );

    // Report the retried preparation as successful.
    test.from_prepare_queue_tx
        .send(prepare::FromQueue {
            artifact_id: artifact_id(1),
            result: Ok(PrepareSuccess::default()),
        })
        .await
        .unwrap();

    // The request must now reach the execute queue; take the result sender out of it.
    let result_tx_3 = assert_matches!(
        test.poll_and_recv_to_execute_queue().await,
        execute::ToQueue::Enqueue {
            pending_execution_request: PendingExecutionRequest { result_tx, .. },
            ..
        } => result_tx
    );

    // Push an arbitrary execution error through the forwarded sender to confirm that it is
    // still connected to the receiver handed out for the third request.
    result_tx_3
        .send(Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)))
        .unwrap();
    assert_matches!(
        result_rx_3.now_or_never().unwrap().unwrap(),
        Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath))
    );
}
#[tokio::test]
async fn test_execute_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
let pvd = Arc::new(PersistedValidationData {
parent_head: Default::default(),
relay_parent_number: 1u32,
relay_parent_storage_root: H256::default(),
max_pov_size: 4096 * 1024,
});
let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd.clone(),
pov.clone(),
Priority::Critical,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::Prevalidation("reproducible error".into())),
})
.await
.unwrap();
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(ValidationError::Preparation(_)));
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd.clone(),
pov.clone(),
Priority::Critical,
PvfExecKind::Backing(H256::default()),
result_tx_2,
)
.await
.unwrap();
test.poll_ensure_to_prepare_queue_is_empty().await;
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(ValidationError::Preparation(_)));
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd.clone(),
pov.clone(),
Priority::Critical,
PvfExecKind::Backing(H256::default()),
result_tx_3,
)
.await
.unwrap();
test.poll_ensure_to_prepare_queue_is_empty().await;
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(result, Err(ValidationError::Preparation(_)));
}
/// Checks that heads-up requests retry a failed preparation once the cooldown has passed.
///
/// A heads-up for discriminator 1 is enqueued for preparation and reported as
/// `PrepareError::TimedOut`. A second heads-up sent immediately must not enqueue anything, while
/// a third one sent after waiting `PREPARE_FAILURE_COOLDOWN` must enqueue a prepare job again.
///
/// NOTE(review): the harness helpers (`poll_and_recv_*`, `poll_ensure_*_is_empty`) are defined
/// outside this view; their behavior is assumed from their names.
#[tokio::test]
async fn test_heads_up_prepare_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
// First heads-up; its preparation will be reported as failed below.
host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
// The host must hand the job to the prepare queue.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Play the role of the prepare queue and report a timeout for artifact 1.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
// Second heads-up, issued right away (no cooldown wait).
host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
// Presumably verifies that no new prepare job was enqueued yet.
test.poll_ensure_to_prepare_queue_is_empty().await;
// Wait out the failure cooldown (200 ms under `cfg(test)`).
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Third heads-up, issued after the cooldown wait.
host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
// This time the preparation is attempted again.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
}
/// Checks that an execute request whose result receiver was dropped is not forwarded to the
/// execute queue.
///
/// The request is submitted, its preparation is reported as successful, and then the receiving
/// half of the result channel is dropped before the harness checks the execute queue.
///
/// NOTE(review): the harness helpers (`poll_and_recv_*`, `poll_ensure_*_is_empty`) are defined
/// outside this view; their behavior is assumed from their names.
#[tokio::test]
async fn cancellation() {
let mut test = Builder::default().build();
let mut host = test.host_handle();
let pvd = Arc::new(PersistedValidationData {
parent_head: Default::default(),
relay_parent_number: 1u32,
relay_parent_storage_root: H256::default(),
max_pov_size: 4096 * 1024,
});
let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
// Submit the execute request; `result_rx` is the caller's side of the result channel.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
pvd,
pov,
Priority::Normal,
PvfExecKind::Backing(H256::default()),
result_tx,
)
.await
.unwrap();
// The host must hand the job to the prepare queue.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Play the role of the prepare queue and report success for artifact 1.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
// Cancel the request by dropping the receiver.
// NOTE(review): this assumes the host only processes the prepare result on the poll
// below, i.e. after the drop; confirm against the harness.
drop(result_rx);
// Nothing should be enqueued for execution on behalf of the cancelled request.
test.poll_ensure_to_execute_queue_is_empty().await;
}
}