use crate::{
artifacts::ArtifactPathId,
worker_interface::{
clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
use codec::{Decode, Encode};
use futures::FutureExt;
use futures_timer::Delay;
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, WorkerError, WorkerResponse},
worker_dir, SecurityStatus,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::{ExecutorParams, PersistedValidationData};
use std::{path::Path, sync::Arc, time::Duration};
use tokio::{io, net::UnixStream};
pub async fn spawn(
program_path: &Path,
cache_path: &Path,
executor_params: ExecutorParams,
spawn_timeout: Duration,
node_version: Option<&str>,
security_status: SecurityStatus,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let mut extra_args = vec!["execute-worker"];
if let Some(node_version) = node_version {
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
}
let (mut idle_worker, worker_handle) = spawn_with_program_path(
"execute",
program_path,
cache_path,
&extra_args,
spawn_timeout,
security_status,
)
.await?;
send_execute_handshake(&mut idle_worker.stream, Handshake { executor_params })
.await
.map_err(|error| {
let err = SpawnErr::Handshake { err: error.to_string() };
gum::warn!(
target: LOG_TARGET,
worker_pid = %idle_worker.pid,
"failed to send a handshake to the spawned worker: {}",
error
);
err
})?;
Ok((idle_worker, worker_handle))
}
/// Successful outcome of [`start_work`]: what the worker reported, plus the worker itself.
pub struct Response {
	/// The response the worker sent back for this job.
	pub worker_response: WorkerResponse,
	/// The worker that ran the job, handed back to the caller as an idle worker
	/// (presumably so it can be reused for another job).
	pub idle_worker: IdleWorker,
}
/// Errors that can occur while running an execute job on a worker.
#[derive(thiserror::Error, Debug)]
pub enum Error {
	/// The worker did not send back a result before the host's wall-clock deadline
	/// (`execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR` in `start_work`).
	#[error("The communication with the worker exceeded the hard limit")]
	HardTimeout,
	/// An I/O error on the stream to the worker. In `start_work` this is produced when receiving
	/// or decoding the result fails.
	#[error("An I/O error happened during communication with the worker: {0}")]
	CommunicationErr(#[from] io::Error),
	/// The worker reported an error, or the host rejected its response (for example because the
	/// reported duration exceeded the execution timeout).
	#[error("The worker reported an error: {0}")]
	WorkerError(#[from] WorkerError),
	/// A host-side internal error, e.g. failing to send the request, link the artifact, or clear
	/// the worker directory.
	#[error("An internal error occurred: {0}")]
	InternalError(#[from] InternalValidationError),
}
/// Runs a single execute job on `worker` and waits for its result.
///
/// The artifact is first linked into the worker's directory (via `with_worker_dir_setup`, which
/// also clears the directory afterwards). Then the persisted validation data, the PoV and
/// `execution_timeout` are sent to the worker. The host waits at most
/// `execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR` of wall-clock time for the reply.
///
/// On success, the worker's response is returned together with the worker itself, reassembled as
/// an [`IdleWorker`]. On any error the worker is not handed back to the caller.
///
/// # Errors
///
/// - [`Error::InternalError`] if sending the request fails (`HostCommunication`), or if the
///   worker-directory setup/cleanup fails.
/// - [`Error::CommunicationErr`] if receiving or decoding the result fails.
/// - [`Error::HardTimeout`] if the wall-clock deadline elapses before a result arrives.
/// - [`Error::WorkerError`] if the worker reported an error, or the reported job `duration`
///   exceeded `execution_timeout`.
pub async fn start_work(
	worker: IdleWorker,
	artifact: ArtifactPathId,
	execution_timeout: Duration,
	pvd: Arc<PersistedValidationData>,
	pov: Arc<PoV>,
) -> Result<Response, Error> {
	// Take the worker apart; it is only reassembled on the success path below.
	let IdleWorker { mut stream, pid, worker_dir } = worker;
	gum::debug!(
		target: LOG_TARGET,
		worker_pid = %pid,
		?worker_dir,
		validation_code_hash = ?artifact.id.code_hash,
		"starting execute for {}",
		artifact.path.display(),
	);
	with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
		// A failure to send the request is reported as a host-side internal error, not as a
		// worker communication error.
		send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| {
			gum::warn!(
				target: LOG_TARGET,
				worker_pid = %pid,
				validation_code_hash = ?artifact.id.code_hash,
				"failed to send an execute request: {}",
				error,
			);
			Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
		})?;
		// Lenient wall-clock deadline for the whole job, derived from the execution timeout.
		let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
		// Race the worker's reply against the wall-clock timer; whichever finishes first wins.
		// The `return`s below exit this async block, not `start_work` directly.
		let worker_result = futures::select! {
			worker_result = recv_result(&mut stream).fuse() => {
				match worker_result {
					// Also rejects responses whose reported duration exceeded the timeout.
					Ok(result) =>
						handle_result(
							result,
							pid,
							execution_timeout,
						)
						.await,
					Err(error) => {
						gum::warn!(
							target: LOG_TARGET,
							worker_pid = %pid,
							validation_code_hash = ?artifact.id.code_hash,
							"failed to recv an execute result: {}",
							error,
						);
						return Err(Error::CommunicationErr(error))
					},
				}
			},
			_ = Delay::new(timeout).fuse() => {
				gum::warn!(
					target: LOG_TARGET,
					worker_pid = %pid,
					validation_code_hash = ?artifact.id.code_hash,
					"execution worker exceeded lenient timeout for execution, child worker likely stalled",
				);
				return Err(Error::HardTimeout)
			},
		};
		match worker_result {
			// Hand the worker back only when the job succeeded.
			Ok(worker_response) => Ok(Response {
				worker_response,
				idle_worker: IdleWorker { stream, pid, worker_dir },
			}),
			Err(worker_error) => Err(worker_error.into()),
		}
	})
	.await
}
/// Post-processes the outcome reported by the worker.
///
/// A successful response whose reported `duration` is greater than `execution_timeout` is
/// turned into [`WorkerError::JobTimedOut`] (and a warning is logged). Every other outcome is
/// passed through unchanged.
async fn handle_result(
	worker_result: Result<WorkerResponse, WorkerError>,
	worker_pid: u32,
	execution_timeout: Duration,
) -> Result<WorkerResponse, WorkerError> {
	match worker_result {
		Ok(WorkerResponse { duration, .. }) if duration > execution_timeout => {
			gum::warn!(
				target: LOG_TARGET,
				worker_pid,
				"execute job took {}ms cpu time, exceeded execution timeout {}ms.",
				duration.as_millis(),
				execution_timeout.as_millis(),
			);
			Err(WorkerError::JobTimedOut)
		},
		unchanged => unchanged,
	}
}
/// Prepares the worker directory for one execute job, runs `f`, then clears the directory.
///
/// The artifact at `artifact_path` is hard-linked into `worker_dir` at the path returned by
/// `worker_dir::execute_artifact`. `f` is then awaited with ownership of `worker_dir`.
/// Regardless of `f`'s outcome, the directory is cleared afterwards with
/// `clear_worker_dir_path`.
///
/// # Errors
///
/// - `InternalValidationError::CouldNotCreateLink` if the hard link cannot be created. `f` is
///   not run in that case.
/// - `InternalValidationError::CouldNotClearWorkerDir` if clearing the directory fails after `f`
///   ran. This error takes precedence over whatever `f` returned.
/// - Otherwise, the result of `f`.
async fn with_worker_dir_setup<F, Fut>(
	worker_dir: WorkerDir,
	pid: u32,
	artifact_path: &Path,
	f: F,
) -> Result<Response, Error>
where
	Fut: futures::Future<Output = Result<Response, Error>>,
	F: FnOnce(WorkerDir) -> Fut,
{
	let link_path = worker_dir::execute_artifact(worker_dir.path());
	if let Err(err) = tokio::fs::hard_link(artifact_path, link_path).await {
		// This previously logged "failed to clear worker cache after the job", which misreported
		// a link-creation failure as a cleanup failure.
		gum::warn!(
			target: LOG_TARGET,
			worker_pid = %pid,
			?worker_dir,
			"failed to create a hard link to the artifact in the worker dir: {:?}",
			err,
		);
		return Err(InternalValidationError::CouldNotCreateLink(format!("{:?}", err)).into())
	}

	// `f` takes ownership of `worker_dir`, so keep the path for the cleanup step.
	let worker_dir_path = worker_dir.path().to_owned();
	let result = f(worker_dir).await;

	// Clear the directory whether or not `f` succeeded.
	if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
		gum::warn!(
			target: LOG_TARGET,
			worker_pid = %pid,
			?worker_dir_path,
			"failed to clear worker cache after the job: {:?}",
			err,
		);
		return Err(InternalValidationError::CouldNotClearWorkerDir {
			err: format!("{:?}", err),
			path: worker_dir_path.to_str().map(String::from),
		}
		.into())
	}

	result
}
/// Encodes `handshake` with `Encode::encode` and writes it to the worker as a single frame.
async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
	let payload = handshake.encode();
	framed_send(stream, &payload).await
}
async fn send_request(
stream: &mut UnixStream,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
execution_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &pvd.encode()).await?;
framed_send(stream, &pov.encode()).await?;
framed_send(stream, &execution_timeout.encode()).await
}
async fn recv_result(stream: &mut UnixStream) -> io::Result<Result<WorkerResponse, WorkerError>> {
let result_bytes = framed_recv(stream).await?;
Result::<WorkerResponse, WorkerError>::decode(&mut result_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("execute pvf recv_result: decode error: {:?}", e),
)
})
}