1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Host interface to the prepare worker.
use crate::{
clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
use codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult, PrepareWorkerResult},
prepare::{PrepareStats, PrepareSuccess, PrepareWorkerSuccess},
worker_dir, SecurityStatus,
use sp_core::hexdisplay::HexDisplay;
use std::{
path::{Path, PathBuf},
use tokio::{io, net::UnixStream};
/// Spawns a new worker, using the program at `program_path` as the worker binary and
/// `spawn_timeout` as the spawn timeout.
///
/// Sends a handshake message to the worker as soon as it is spawned.
pub async fn spawn(
program_path: &Path,
cache_path: &Path,
spawn_timeout: Duration,
node_version: Option<&str>,
security_status: SecurityStatus,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
// The argument list always starts with `prepare-worker`.
// NOTE(review): presumably forwarded to the spawned program - confirm against the spawn call.
let mut extra_args = vec!["prepare-worker"];
// The version flag and its value are appended only when the caller supplied a node version.
if let Some(node_version) = node_version {
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
/// Outcome of PVF preparation.
///
/// If the idle worker token is not returned, it means the worker must be terminated.
pub enum Outcome {
/// The worker has finished the work assigned to it.
Concluded { worker: IdleWorker, result: PrepareResult },
/// The host tried to reach the worker but failed. This is most likely because the worker was
/// killed by the system.
/// The temporary file for the artifact could not be created at the given cache path.
CreateTmpFileErr { worker: IdleWorker, err: String },
/// The response from the worker was received, but the tmp file could not be renamed (moved) to
/// the final destination location.
RenameTmpFile {
worker: IdleWorker,
err: String,
// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
// conversion to `Option<String>`.
src: Option<String>,
dest: Option<String>,
/// The worker cache could not be cleared for the given reason.
ClearWorkerDir { err: String },
/// The worker failed to finish the job before the given deadline.
/// The worker is no longer usable and should be killed.
/// An IO error occurred while receiving the result from the worker process.
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
/// The worker ran out of memory and is aborting. The worker should be ripped.
/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
/// The worker might still be usable, but we kill it just in case.
JobDied { err: String, job_pid: i32 },
/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// The request is sent with `send_request` and the reply is awaited with `recv_response`, under
/// a host-side wall-clock timeout derived from the PVF's preparation timeout.
///
/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process
/// being killed.
pub async fn start_work(
metrics: &Metrics,
worker: IdleWorker,
pvf: PvfPrepData,
cache_path: PathBuf,
) -> Outcome {
// Take the idle-worker token apart so its pieces can be used individually below.
let IdleWorker { stream, pid, worker_dir } = worker;
target: LOG_TARGET,
worker_pid = %pid,
"starting prepare for {:?}",
|tmp_artifact_file, mut stream, worker_dir| async move {
let preparation_timeout = pvf.prep_timeout();
if let Err(err) = send_request(&mut stream, &pvf).await {
target: LOG_TARGET,
worker_pid = %pid,
"failed to send a prepare request: {:?}",
// The request could not be delivered, so the worker is reported as unreachable.
return Outcome::Unreachable
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines. In that case we should
// propagate the error to the pool.
// We use a generous timeout here. This is in addition to the one in the child process,
// in case the child stalls. We have a wall clock timeout here in the host, but a CPU
// timeout in the child. We want to use CPU time because it varies less than wall clock
// time under load, but the CPU resources of the child can only be measured from the
// parent after the child process terminates.
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
// The outer `Result` comes from the timeout wrapper, the inner one from `recv_response`.
let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
match result {
// Received bytes from worker within the time limit.
Ok(Ok(prepare_worker_result)) =>
// Rebuild the idle-worker token from the parts taken apart above.
IdleWorker { stream, pid, worker_dir },
Ok(Err(err)) => {
// Communication error within the time limit.
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {}",
Err(_) => {
// Timed out here on the host.
target: LOG_TARGET,
worker_pid = %pid,
"did not recv a prepare response within the time limit",
/// Handles the case where we successfully received response bytes on the host from the child.
///
/// Here we know the artifact exists, but is still located in a temporary file which will be cleared
/// by [`with_worker_dir_setup`].
///
/// Worker-reported errors are mapped to an [`Outcome`] first. On success the reported CPU time is
/// checked against `preparation_timeout`, and the temporary file is then renamed to its final
/// artifact path.
async fn handle_response(
metrics: &Metrics,
worker: IdleWorker,
result: PrepareWorkerResult,
worker_pid: u32,
tmp_file: PathBuf,
cache_path: &Path,
preparation_timeout: Duration,
) -> Outcome {
// TODO: Add `checksum` to `ArtifactPathId`. See:
// https://github.com/paritytech/polkadot-sdk/issues/2399
let PrepareWorkerSuccess {
checksum: _,
stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len },
} = match result.clone() {
Ok(result) => result,
// Timed out on the child. This should already be logged by the child.
Err(PrepareError::TimedOut) => return Outcome::TimedOut,
// These two outcomes do not carry the worker token back.
Err(PrepareError::JobDied { err, job_pid }) => return Outcome::JobDied { err, job_pid },
Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory,
// Any other preparation error is returned together with the worker token.
Err(err) => return Outcome::Concluded { worker, result: Err(err) },
// Report the code length observed by the worker to the metrics.
metrics.observe_code_size(observed_wasm_code_len as usize);
// The CPU time comes from the worker's own stats; the limit is re-checked here on the host.
if cpu_time_elapsed > preparation_timeout {
// The job didn't complete within the timeout.
target: LOG_TARGET,
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
return Outcome::TimedOut
// NOTE(review): this stats `cache_path`, not `tmp_file`, although the log message below speaks
// of "the artifact" and the artifact is still at `tmp_file` at this point - confirm which
// path is intended.
let size = match tokio::fs::metadata(cache_path).await {
Ok(metadata) => metadata.len(),
Err(err) => {
target: LOG_TARGET,
"failed to read size of the artifact: {}",
return Outcome::IoErr(err.to_string())
// The file name should uniquely identify the artifact even across restarts. In case the cache
// for some reason is not cleared correctly, we cannot
// accidentally execute an artifact compiled under a different wasmtime version, host
// environment, etc.
let artifact_path = generate_artifact_path(cache_path);
target: LOG_TARGET,
"promoting WIP artifact {} to {}",
// Move the temporary file to its final artifact path.
let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await {
Ok(()) => Outcome::Concluded {
result: Ok(PrepareSuccess {
path: artifact_path,
stats: PrepareStats {
memory_stats: memory_stats.clone(),
Err(err) => {
target: LOG_TARGET,
"failed to rename the artifact from {} to {}: {:?}",
Outcome::RenameTmpFile {
err: format!("{:?}", err),
// `to_str` yields `None` for paths that are not valid UTF-8, hence the `Option`.
src: tmp_file.to_str().map(String::from),
dest: artifact_path.to_str().map(String::from),
// If there were no errors up until now, log the memory stats for a successful preparation, if
// available.
/// Create a temporary file for an artifact in the worker cache, execute the given future/closure
/// passing the file path in, and clean up the worker cache.
///
/// `f` receives the temporary file path, the stream and the worker directory, and produces the
/// [`Outcome`] of the job.
///
/// Failure to clean up the worker cache results in an error - leaving any files here could be a
/// security issue, and we should shut down the worker. This should be very rare.
async fn with_worker_dir_setup<F, Fut>(
worker_dir: WorkerDir,
stream: UnixStream,
pid: u32,
f: F,
) -> Outcome
Fut: futures::Future<Output = Outcome>,
F: FnOnce(PathBuf, UnixStream, WorkerDir) -> Fut,
// Create the tmp file here so that the child doesn't need any file creation rights. This will
// be cleared at the end of this function.
let tmp_file = worker_dir::prepare_tmp_artifact(worker_dir.path());
if let Err(err) = tokio::fs::File::create(&tmp_file).await {
target: LOG_TARGET,
worker_pid = %pid,
"failed to create a temp file for the artifact: {:?}",
// `f` has not run yet, so the worker token is handed back alongside the error.
return Outcome::CreateTmpFileErr {
worker: IdleWorker { stream, pid, worker_dir },
err: format!("{:?}", err),
// Keep an owned copy of the path: `worker_dir` itself is moved into `f` on the next line.
let worker_dir_path = worker_dir.path().to_owned();
let outcome = f(tmp_file, stream, worker_dir).await;
// Try to clear the worker dir.
if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
target: LOG_TARGET,
worker_pid = %pid,
"failed to clear worker cache after the job: {:?}",
// A failed cleanup replaces the outcome produced by `f`; no worker token is returned.
return Outcome::ClearWorkerDir { err: format!("{:?}", err) }
/// Encodes `pvf` with `Encode::encode` and writes the bytes to `stream` via `framed_send`.
///
/// An IO error from `framed_send` is propagated to the caller.
async fn send_request(stream: &mut UnixStream, pvf: &PvfPrepData) -> io::Result<()> {
framed_send(stream, &pvf.encode()).await?;
/// Receives one message from `stream` via `framed_recv` and decodes it as a
/// `PrepareWorkerResult`.
///
/// An IO error from `framed_recv` is propagated as-is; a decode failure is mapped to an error
/// inside the `map_err` closure below. `pid` is only used for logging.
async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareWorkerResult> {
let result = framed_recv(stream).await?;
let result = PrepareWorkerResult::decode(&mut &result[..]).map_err(|e| {
// We received invalid bytes from the worker.
// At most the first four bytes; `min` keeps the slice in bounds for shorter payloads.
let bound_bytes = &result[..result.len().min(4)];
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
format!("prepare pvf recv_response: failed to decode result: {:?}", e),