//! Host interface to the execute worker.
use crate::{
clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
use codec::{Decode, Encode};
use futures::FutureExt;
use futures_timer::Delay;
use polkadot_node_core_pvf_common::{
execute::{Handshake, WorkerError, WorkerResponse},
worker_dir, SecurityStatus,
use polkadot_node_primitives::PoV;
use polkadot_primitives::{ExecutorParams, PersistedValidationData};
use std::{path::Path, sync::Arc, time::Duration};
use tokio::{io, net::UnixStream};
/// Spawns a new execute worker and sends it the handshake.
///
/// The worker is started through `spawn_with_program_path`. `"execute-worker"` is always the
/// first extra argument; if `node_version` is `Some`, `--node-impl-version <node_version>` is
/// appended. After the spawn, a [`Handshake`] carrying `executor_params` is sent over the
/// worker's stream.
///
/// On success the idle worker token and the worker handle are returned together.
///
/// # Errors
///
/// If sending the handshake fails, the I/O error is stringified and wrapped in
/// [`SpawnErr::Handshake`].
pub async fn spawn(
program_path: &Path,
cache_path: &Path,
executor_params: ExecutorParams,
spawn_timeout: Duration,
node_version: Option<&str>,
security_status: SecurityStatus,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
// The sub-command selecting the execute worker always comes first.
let mut extra_args = vec!["execute-worker"];
if let Some(node_version) = node_version {
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
// NOTE(review): the arguments of this call are not visible here; presumably `program_path`,
// `cache_path`, `extra_args`, `spawn_timeout` and `security_status` are forwarded - confirm.
let (mut idle_worker, worker_handle) = spawn_with_program_path(
send_execute_handshake(&mut idle_worker.stream, Handshake { executor_params })
.map_err(|error| {
// Only the string form of the I/O error is kept in the spawn error.
let err = SpawnErr::Handshake { err: error.to_string() };
// Presumably the fields and message of a log record for the failed handshake.
target: LOG_TARGET,
worker_pid = %idle_worker.pid,
"failed to send a handshake to the spawned worker: {}",
Ok((idle_worker, worker_handle))
/// Successful outcome of PVF execution.
///
/// PVF execution completed and the result is returned. The worker is ready for
/// another job.
pub struct Response {
/// The response (valid/invalid) from the worker.
pub worker_response: WorkerResponse,
/// Returning the idle worker token means the worker can be reused.
pub idle_worker: IdleWorker,
/// Errors returned when a job could not be completed on an execute worker.
///
/// The idle worker token is not returned for any of these cases, meaning the worker must be
/// terminated.
///
/// NOTE: Errors related to the preparation process are not expected to be encountered by the
/// execution workers.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The execution time exceeded the hard limit. The worker is terminated.
// NOTE(review): the variant annotated by the attribute below is not visible here; presumably
// it is the `HardTimeout` variant that `start_work` returns - confirm.
#[error("The communication with the worker exceeded the hard limit")]
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
#[error("An I/O error happened during communication with the worker: {0}")]
CommunicationErr(#[from] io::Error),
/// The worker reported an error (can be from itself or from the job). The worker should not be
/// reused.
#[error("The worker reported an error: {0}")]
WorkerError(#[from] WorkerError),
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
///
/// Should only ever be used for errors independent of the candidate and PVF. Therefore it may
/// be a problem with the worker, so we terminate it.
#[error("An internal error occurred: {0}")]
InternalError(#[from] InternalValidationError),
/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// The request (`pvd`, `pov`, `execution_timeout`) is sent over the worker's stream, then the
/// result is awaited with a wall-clock limit of
/// `execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR`.
///
/// # Errors
///
/// - [`Error::CommunicationErr`] if receiving the result fails.
/// - [`Error::HardTimeout`] if the wall-clock limit elapses before a result arrives.
/// - An error converted from the [`WorkerError`] the worker reported.
///
/// NOTE: The idle worker token is only handed back inside [`Response`]. Not returning it (i.e.
/// returning an [`Error`]) will trigger the child process being killed, if it's still alive.
pub async fn start_work(
worker: IdleWorker,
artifact: ArtifactPathId,
execution_timeout: Duration,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
) -> Result<Response, Error> {
// Take the token apart; the pieces are put back into an `IdleWorker` only on success.
let IdleWorker { mut stream, pid, worker_dir } = worker;
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"starting execute for {}",
// The whole exchange runs inside `with_worker_dir_setup`, which receives the artifact path
// and the worker dir.
with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| {
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"failed to send an execute request: {}",
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
// Race receiving the worker's result against the wall-clock timer.
let worker_result = futures::select! {
worker_result = recv_result(&mut stream).fuse() => {
match worker_result {
// NOTE(review): the expression of this arm is not visible here; presumably the decoded
// result is passed to `handle_result` - confirm.
Ok(result) =>
// The stream itself failed (or the payload could not be decoded).
Err(error) => {
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"failed to recv an execute result: {}",
return Err(Error::CommunicationErr(error))
// The timer fired first: no result arrived within the wall-clock limit.
_ = Delay::new(timeout).fuse() => {
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded lenient timeout for execution, child worker likely stalled",
return Err(Error::HardTimeout)
match worker_result {
// Success: rebuild the idle worker token so the worker can be reused.
Ok(worker_response) => Ok(Response {
idle_worker: IdleWorker { stream, pid, worker_dir },
// Worker-reported error: converted into `Error`, and the token is not returned.
Err(worker_error) => Err(worker_error.into()),
/// Handles the case where we successfully received response bytes on the host from the child.
///
/// If the worker reported success but the `duration` in its response is greater than
/// `execution_timeout`, the job is treated as timed out and `Err(WorkerError::JobTimedOut)` is
/// returned instead of the response.
///
/// NOTE(review): the tail of this function is not visible here; presumably `worker_result` is
/// returned unchanged in every other case - confirm.
async fn handle_result(
worker_result: Result<WorkerResponse, WorkerError>,
worker_pid: u32,
execution_timeout: Duration,
) -> Result<WorkerResponse, WorkerError> {
// Only a successful response carries a duration to check.
if let Ok(WorkerResponse { duration, .. }) = worker_result {
if duration > execution_timeout {
// The job didn't complete within the timeout.
target: LOG_TARGET,
"execute job took {}ms cpu time, exceeded execution timeout {}ms.",
// Return a timeout error.
return Err(WorkerError::JobTimedOut)
/// Hard-link the artifact into the worker dir, execute the given future/closure passing the
/// worker dir in, and clean up the worker dir afterwards.
///
/// The link is created at the path given by `worker_dir::execute_artifact`. `f` is only run if
/// the link could be created, and the worker dir is cleared after `f` has completed.
///
/// # Errors
///
/// - [`InternalValidationError::CouldNotCreateLink`] (converted into [`Error`]) if the hard link
///   cannot be created; `f` is not run in that case.
/// - [`InternalValidationError::CouldNotClearWorkerDir`] if clearing the worker dir fails. This
///   is returned regardless of what `f` produced.
///
/// Failure to clean up the worker cache results in an error - leaving any files here could be a
/// security issue, and we should shut down the worker. This should be very rare.
async fn with_worker_dir_setup<F, Fut>(
worker_dir: WorkerDir,
pid: u32,
artifact_path: &Path,
f: F,
) -> Result<Response, Error>
Fut: futures::Future<Output = Result<Response, Error>>,
F: FnOnce(WorkerDir) -> Fut,
// Cheaply create a hard link to the artifact. The artifact is always at a known location in the
// worker cache, and the child can't access any other artifacts or gain any information from the
// original filename.
let link_path = worker_dir::execute_artifact(worker_dir.path());
if let Err(err) = tokio::fs::hard_link(artifact_path, link_path).await {
target: LOG_TARGET,
worker_pid = %pid,
// NOTE(review): this message is identical to the one used for the clean-up failure below, but
// this branch handles a failed hard-link creation - looks like a copy-paste; consider rewording.
"failed to clear worker cache after the job: {}",
return Err(InternalValidationError::CouldNotCreateLink(format!("{:?}", err)).into());
// Keep an owned copy of the path: `worker_dir` is moved into `f` below.
let worker_dir_path = worker_dir.path().to_owned();
let result = f(worker_dir).await;
// Try to clear the worker dir.
if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
target: LOG_TARGET,
worker_pid = %pid,
"failed to clear worker cache after the job: {:?}",
// A clean-up failure takes precedence over `result`.
return Err(InternalValidationError::CouldNotClearWorkerDir {
err: format!("{:?}", err),
// `None` when the path is not valid UTF-8.
path: worker_dir_path.to_str().map(String::from),
/// Sends a handshake with information specific to the execute worker.
///
/// The handshake is encoded with `Encode::encode` and written to `stream` as a single frame via
/// `framed_send`; any I/O error from the send is returned as-is.
async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
framed_send(stream, &handshake.encode()).await
/// Sends an execute request to the worker over `stream`.
///
/// The request consists of three separately framed, encoded values, sent in this order: the
/// persisted validation data, the PoV, and the execution timeout. Sending stops at the first
/// I/O error, which is returned to the caller.
async fn send_request(
stream: &mut UnixStream,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
execution_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &pvd.encode()).await?;
framed_send(stream, &pov.encode()).await?;
framed_send(stream, &execution_timeout.encode()).await
/// Receives one frame from `stream` and decodes it as the worker's result.
///
/// The outer `io::Result` reports failures on the host side: the frame could not be received,
/// or its bytes could not be decoded. The inner `Result` is what the worker itself reported.
async fn recv_result(stream: &mut UnixStream) -> io::Result<Result<WorkerResponse, WorkerError>> {
let result_bytes = framed_recv(stream).await?;
// A decode failure is turned into an I/O error carrying the message below.
// NOTE(review): the construction of that error is not visible here - confirm the error kind.
Result::<WorkerResponse, WorkerError>::decode(&mut result_bytes.as_slice()).map_err(|e| {
format!("execute pvf recv_result: decode error: {:?}", e),