//! Execute worker logic: receives execution requests from the host over a Unix socket, runs the
//! compiled artifact in a separate job process (forked, or cloned on Linux when available) and
//! sends the outcome back to the host.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
pub use polkadot_node_core_pvf_common::{
error::ExecuteError, executor_interface::execute_artifact,
};
use polkadot_parachain_primitives::primitives::ValidationParams;
// Log target attached to every `gum` log line emitted from this file.
const LOG_TARGET: &str = "parachain::pvf-execute-worker";
use codec::{Decode, Encode};
use cpu_time::ProcessTime;
use nix::{
errno::Errno,
sys::{
resource::{Usage, UsageWho},
wait::WaitStatus,
},
unistd::{ForkResult, Pid},
};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse},
executor_interface::params_to_wasmtime_semantics,
framed_recv_blocking, framed_send_blocking,
worker::{
cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
send_result, stringify_errno, stringify_panic_payload,
thread::{self, WaitOutcome},
PipeFd, WorkerInfo, WorkerKind,
},
worker_dir,
};
use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::{ExecutorParams, PersistedValidationData};
use std::{
io::{self, Read},
os::{
fd::{AsRawFd, FromRawFd},
unix::net::UnixStream,
},
path::PathBuf,
process,
sync::{mpsc::channel, Arc},
time::Duration,
};
/// Number of threads expected in the execute job process.
///
/// NOTE(review): presumably the job's main thread plus the "cpu time monitor thread" and the
/// "execute thread" spawned in `handle_child_process` — confirm against the users of this
/// constant, which are outside this file.
pub const EXECUTE_WORKER_THREAD_NUMBER: u32 = 3;
/// Receives a single frame from `stream` and decodes it into a [`Handshake`].
///
/// # Errors
///
/// Propagates any I/O error from reading the frame. A frame that cannot be decoded is reported
/// as an [`io::ErrorKind::Other`] error; the underlying decode error is discarded.
fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
    let encoded = framed_recv_blocking(stream)?;
    let mut input = &encoded[..];
    match Handshake::decode(&mut input) {
        Ok(handshake) => Ok(handshake),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::Other,
            "execute pvf recv_execute_handshake: failed to decode Handshake".to_owned(),
        )),
    }
}
/// Receives one execution request from `stream`.
///
/// A request consists of three consecutive frames, read and decoded in this order: the persisted
/// validation data, the PoV, and the execution timeout. Each frame is decoded before the next
/// one is read, so a decode failure returns without consuming the remaining frames.
///
/// # Errors
///
/// Propagates I/O errors from reading a frame; a frame that fails to decode yields an
/// [`io::ErrorKind::Other`] error naming the part that could not be decoded.
fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> {
    // Builds the error returned when one of the frames cannot be decoded.
    fn decode_failure(message: &str) -> io::Error {
        io::Error::new(io::ErrorKind::Other, message.to_string())
    }

    let pvd_bytes = framed_recv_blocking(stream)?;
    let pvd = match PersistedValidationData::decode(&mut &pvd_bytes[..]) {
        Ok(decoded) => decoded,
        Err(_) =>
            return Err(decode_failure(
                "execute pvf recv_request: failed to decode persisted validation data",
            )),
    };

    let pov_bytes = framed_recv_blocking(stream)?;
    let pov = match PoV::decode(&mut &pov_bytes[..]) {
        Ok(decoded) => decoded,
        Err(_) => return Err(decode_failure("execute pvf recv_request: failed to decode PoV")),
    };

    let timeout_bytes = framed_recv_blocking(stream)?;
    let execution_timeout = match Duration::decode(&mut &timeout_bytes[..]) {
        Ok(decoded) => decoded,
        Err(_) =>
            return Err(decode_failure("execute pvf recv_request: failed to decode duration")),
    };

    Ok((pvd, pov, execution_timeout))
}
/// Turns `$error` into a [`WorkerError`] via `$err_constructor`, tries to report it to the host
/// over `$stream`, and evaluates to an [`io::Error`] carrying the same message.
///
/// Sending is best effort: a failure to deliver the error to the host is ignored, since the
/// caller is about to bail out with the returned `io::Error` anyway.
macro_rules! map_and_send_err {
    ($error:expr, $err_constructor:expr, $stream:expr, $worker_info:expr) => {{
        let worker_err: WorkerError = $err_constructor($error.to_string()).into();
        // Capture the message first: `worker_err` is moved into `send_result` below.
        let local_err = io::Error::new(io::ErrorKind::Other, worker_err.to_string());
        let _ = send_result::<WorkerResponse, WorkerError>(
            $stream,
            Err(worker_err),
            $worker_info,
        );
        local_err
    }};
}
/// Entrypoint of the execute worker.
///
/// Hands control to `run_worker` with [`WorkerKind::Execute`] and a handler that:
///
/// 1. receives the [`Handshake`] carrying the executor parameters (once),
/// 2. then loops: receives a request, reads the compiled artifact from the worker directory,
///    decompresses the PoV, runs the execution in a separate job process (via `handle_clone`
///    on Linux when secure clone is available, `handle_fork` otherwise) and sends the result
///    back to the host.
///
/// Any failure inside the handler that is propagated with `?` ends the loop; where possible the
/// error is first reported to the host through `map_and_send_err!`.
///
/// # Parameters
///
/// - `socket_path`, `worker_dir_path`, `node_version`, `worker_version`: forwarded unchanged to
///   `run_worker`; their exact semantics are defined there.
pub fn worker_entrypoint(
    socket_path: PathBuf,
    worker_dir_path: PathBuf,
    node_version: Option<&str>,
    worker_version: Option<&str>,
) {
    run_worker(
        WorkerKind::Execute,
        socket_path,
        worker_dir_path,
        node_version,
        worker_version,
        |mut stream, worker_info, security_status| {
            let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path);

            let Handshake { executor_params } =
                recv_execute_handshake(&mut stream).map_err(|e| {
                    map_and_send_err!(
                        e,
                        InternalValidationError::HostCommunication,
                        &mut stream,
                        worker_info
                    )
                })?;

            let executor_params: Arc<ExecutorParams> = Arc::new(executor_params);
            let execute_thread_stack_size = max_stack_size(&executor_params);

            loop {
                let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| {
                    map_and_send_err!(
                        e,
                        InternalValidationError::HostCommunication,
                        &mut stream,
                        worker_info
                    )
                })?;
                gum::debug!(
                    target: LOG_TARGET,
                    ?worker_info,
                    ?security_status,
                    "worker: validating artifact {}",
                    artifact_path.display(),
                );

                // Get the artifact bytes.
                let compiled_artifact_blob = std::fs::read(&artifact_path).map_err(|e| {
                    map_and_send_err!(
                        e,
                        InternalValidationError::CouldNotOpenFile,
                        &mut stream,
                        worker_info
                    )
                })?;

                // Decompress the PoV *before* creating the pipe. The pipe ends are raw file
                // descriptors that are only closed by the job/parent handlers, so bailing out
                // with `continue` after they were created would leak both of them on every
                // PoV that fails to decompress.
                let raw_block_data =
                    match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
                        Ok(data) => data,
                        Err(_) => {
                            send_result::<WorkerResponse, WorkerError>(
                                &mut stream,
                                Ok(WorkerResponse {
                                    job_response: JobResponse::PoVDecompressionFailure,
                                    duration: Duration::ZERO,
                                    pov_size: 0,
                                }),
                                worker_info,
                            )?;
                            continue;
                        },
                    };

                let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| {
                    map_and_send_err!(
                        e,
                        InternalValidationError::CouldNotCreatePipe,
                        &mut stream,
                        worker_info
                    )
                })?;

                // Resource usage of already-terminated children, taken before the job starts so
                // that the parent can later isolate the CPU time of this particular job.
                let usage_before = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
                    .map_err(|errno| {
                        let e = stringify_errno("getrusage before", errno);
                        map_and_send_err!(
                            e,
                            InternalValidationError::Kernel,
                            &mut stream,
                            worker_info
                        )
                    })?;
                let stream_fd = stream.as_raw_fd();

                let compiled_artifact_blob = Arc::new(compiled_artifact_blob);

                let pov_size = raw_block_data.len() as u32;

                let params = ValidationParams {
                    parent_head: pvd.parent_head.clone(),
                    block_data: BlockData(raw_block_data.to_vec()),
                    relay_parent_number: pvd.relay_parent_number,
                    relay_parent_storage_root: pvd.relay_parent_storage_root,
                };
                let params = Arc::new(params.encode());

                cfg_if::cfg_if! {
                    if #[cfg(target_os = "linux")] {
                        let result = if security_status.can_do_secure_clone {
                            handle_clone(
                                pipe_write_fd,
                                pipe_read_fd,
                                stream_fd,
                                &compiled_artifact_blob,
                                &executor_params,
                                &params,
                                execution_timeout,
                                execute_thread_stack_size,
                                worker_info,
                                security_status.can_unshare_user_namespace_and_change_root,
                                usage_before,
                                pov_size,
                            )?
                        } else {
                            // Fall back to using fork.
                            handle_fork(
                                pipe_write_fd,
                                pipe_read_fd,
                                stream_fd,
                                &compiled_artifact_blob,
                                &executor_params,
                                &params,
                                execution_timeout,
                                execute_thread_stack_size,
                                worker_info,
                                usage_before,
                                pov_size,
                            )?
                        };
                    } else {
                        let result = handle_fork(
                            pipe_write_fd,
                            pipe_read_fd,
                            stream_fd,
                            &compiled_artifact_blob,
                            &executor_params,
                            &params,
                            execution_timeout,
                            execute_thread_stack_size,
                            worker_info,
                            usage_before,
                            pov_size,
                        )?;
                    }
                }

                gum::trace!(
                    target: LOG_TARGET,
                    ?worker_info,
                    "worker: sending result to host: {:?}",
                    result
                );
                send_result(&mut stream, result, worker_info)?;
            }
        },
    );
}
/// Executes the compiled artifact with the given encoded `params` and turns the outcome into a
/// [`JobResponse`].
///
/// - A runtime construction failure maps to `JobResponse::runtime_construction`.
/// - Any other execution error, as well as an output that does not decode as a
///   [`ValidationResult`], maps to `JobResponse::format_invalid`.
/// - Otherwise the decoded result is returned in `JobResponse::Ok`.
fn validate_using_artifact(
    compiled_artifact_blob: &[u8],
    executor_params: &ExecutorParams,
    params: &[u8],
) -> JobResponse {
    // SAFETY: NOTE(review): `execute_artifact` is an unsafe fn whose contract is defined outside
    // this file; presumably it requires the blob to be a trusted, previously prepared artifact.
    // Confirm against its documentation.
    let execution = unsafe { execute_artifact(compiled_artifact_blob, executor_params, params) };

    let descriptor_bytes = match execution {
        Ok(bytes) => bytes,
        Err(ExecuteError::RuntimeConstruction(wasmerr)) =>
            return JobResponse::runtime_construction("execute", &wasmerr.to_string()),
        Err(other) => return JobResponse::format_invalid("execute", &other.to_string()),
    };

    match ValidationResult::decode(&mut &descriptor_bytes[..]) {
        Ok(result_descriptor) => JobResponse::Ok { result_descriptor },
        Err(decode_err) => JobResponse::format_invalid(
            "validation result decoding failed",
            &decode_err.to_string(),
        ),
    }
}
/// Runs one execution job in a child process created through `security::clone::clone_on_worker`
/// (Linux only).
///
/// The child runs `handle_child_process`, which never returns; the calling process continues in
/// `handle_parent_process` and returns whatever that produces.
///
/// # Returns
///
/// - The result of `handle_parent_process` when the clone succeeded.
/// - `Ok(Err(..))` with a kernel internal error (context `"clone"`) when the clone itself failed.
#[cfg(target_os = "linux")]
fn handle_clone(
pipe_write_fd: i32,
pipe_read_fd: i32,
stream_fd: i32,
compiled_artifact_blob: &Arc<Vec<u8>>,
executor_params: &Arc<ExecutorParams>,
params: &Arc<Vec<u8>>,
execution_timeout: Duration,
execute_stack_size: usize,
worker_info: &WorkerInfo,
have_unshare_newuser: bool,
usage_before: Usage,
pov_size: u32,
) -> io::Result<Result<WorkerResponse, WorkerError>> {
use polkadot_node_core_pvf_common::worker::security;
// SAFETY: NOTE(review): the safety contract of `clone_on_worker` is defined outside this file;
// presumably it requires the calling process to be single-threaded at this point — confirm.
match unsafe {
security::clone::clone_on_worker(
worker_info,
have_unshare_newuser,
// Entry point of the cloned child; the `Arc`s are cloned so the child owns its handles.
Box::new(|| {
handle_child_process(
pipe_write_fd,
pipe_read_fd,
stream_fd,
Arc::clone(compiled_artifact_blob),
Arc::clone(executor_params),
Arc::clone(params),
execution_timeout,
execute_stack_size,
)
}),
)
} {
// Parent side: collect the child's response and exit status.
Ok(child) => handle_parent_process(
pipe_read_fd,
pipe_write_fd,
worker_info,
child,
usage_before,
pov_size,
execution_timeout,
),
Err(security::clone::Error::Clone(errno)) =>
Ok(Err(internal_error_from_errno("clone", errno))),
}
}
/// Runs one execution job in a child process created with `fork`.
///
/// In the child, `handle_child_process` is called and never returns. In the parent,
/// `handle_parent_process` collects the child's response and its result is returned.
///
/// # Returns
///
/// - The result of `handle_parent_process` when the fork succeeded.
/// - `Ok(Err(..))` with a kernel internal error (context `"fork"`) when the fork itself failed.
fn handle_fork(
pipe_write_fd: i32,
pipe_read_fd: i32,
stream_fd: i32,
compiled_artifact_blob: &Arc<Vec<u8>>,
executor_params: &Arc<ExecutorParams>,
params: &Arc<Vec<u8>>,
execution_timeout: Duration,
execute_worker_stack_size: usize,
worker_info: &WorkerInfo,
usage_before: Usage,
pov_size: u32,
) -> io::Result<Result<WorkerResponse, WorkerError>> {
// SAFETY: NOTE(review): `fork` is only sound to use freely when the calling process is
// single-threaded; presumably that invariant is upheld by the worker — confirm.
match unsafe { nix::unistd::fork() } {
// Child side: execute the artifact and report through the pipe; never returns.
Ok(ForkResult::Child) => handle_child_process(
pipe_write_fd,
pipe_read_fd,
stream_fd,
Arc::clone(compiled_artifact_blob),
Arc::clone(executor_params),
Arc::clone(params),
execution_timeout,
execute_worker_stack_size,
),
// Parent side: collect the child's response and exit status.
Ok(ForkResult::Parent { child }) => handle_parent_process(
pipe_read_fd,
pipe_write_fd,
worker_info,
child,
usage_before,
pov_size,
execution_timeout,
),
Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))),
}
}
/// Body of the job (child) process: executes the artifact and pipes the response back to the
/// parent process, then exits.
///
/// # Parameters
///
/// - `pipe_write_fd`: write end of the pipe used to send the response to the parent.
/// - `pipe_read_fd`: read end of the same pipe; closed here because the child never reads.
/// - `stream_fd`: the host socket; closed here because the child never talks to the host.
/// - `compiled_artifact_blob`: the compiled artifact to execute.
/// - `executor_params`: executor parameters forwarded to the execution.
/// - `params`: encoded validation parameters passed to the artifact.
/// - `execution_timeout`: CPU time budget enforced by the CPU time monitor thread.
/// - `execute_thread_stack_size`: stack size of the thread that runs the execution.
///
/// # Returns
///
/// Never returns: every path ends in `send_child_response`, which writes a `JobResult` to the
/// pipe and exits the process.
fn handle_child_process(
    pipe_write_fd: i32,
    pipe_read_fd: i32,
    stream_fd: i32,
    compiled_artifact_blob: Arc<Vec<u8>>,
    executor_params: Arc<ExecutorParams>,
    params: Arc<Vec<u8>>,
    execution_timeout: Duration,
    execute_thread_stack_size: usize,
) -> ! {
    // SAFETY: NOTE(review): assumes `pipe_write_fd` is an open descriptor owned exclusively by
    // this process from here on (it comes from the pipe created by the caller) — confirm.
    let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };

    // Drop the file descriptors that the child has no use for.
    if let Err(errno) = nix::unistd::close(pipe_read_fd) {
        send_child_response(&mut pipe_write, job_error_from_errno("closing pipe", errno));
    }

    if let Err(errno) = nix::unistd::close(stream_fd) {
        send_child_response(&mut pipe_write, job_error_from_errno("closing stream", errno));
    }

    gum::debug!(
        target: LOG_TARGET,
        worker_job_pid = %process::id(),
        "worker job: executing artifact",
    );

    // Conditional variable to notify us when a thread is done.
    let condvar = thread::get_condvar();

    let cpu_time_start = ProcessTime::now();

    // Spawn a new thread that runs the CPU time monitor. The channel lets us tell it to stop
    // once the execution finished first.
    let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
    let cpu_time_monitor_thread = thread::spawn_worker_thread(
        "cpu time monitor thread",
        move || cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx),
        Arc::clone(&condvar),
        WaitOutcome::TimedOut,
    )
    .unwrap_or_else(|err| {
        send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
    });

    // Spawn the thread that performs the actual execution.
    let execute_thread = thread::spawn_worker_thread_with_stack_size(
        "execute thread",
        move || validate_using_artifact(&compiled_artifact_blob, &executor_params, &params),
        Arc::clone(&condvar),
        WaitOutcome::Finished,
        execute_thread_stack_size,
    )
    .unwrap_or_else(|err| {
        send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
    });

    // Block until one of the two threads reports through the condvar.
    let outcome = thread::wait_for_threads(condvar);

    let response = match outcome {
        WaitOutcome::Finished => {
            // Execution won the race: tell the monitor to stop and collect the result.
            let _ = cpu_time_monitor_tx.send(());
            execute_thread.join().map_err(|e| JobError::Panic(stringify_panic_payload(e)))
        },
        // If the CPU thread is not selected, we signal it to end, the join handle is
        // dropped and the thread will finish in the background.
        WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
            Ok(Some(_cpu_time_elapsed)) => Err(JobError::TimedOut),
            Ok(None) => Err(JobError::CpuTimeMonitorThread(
                "error communicating over finished channel".into(),
            )),
            Err(e) => Err(JobError::CpuTimeMonitorThread(stringify_panic_payload(e))),
        },
        WaitOutcome::Pending =>
            unreachable!("we run wait_while until the outcome is no longer pending; qed"),
    };

    send_child_response(&mut pipe_write, response);
}
/// Returns the stack size, in bytes, to use for the execute thread.
///
/// This is the native stack maximum derived from `executor_params` (via
/// `params_to_wasmtime_semantics`) plus 2 MiB of headroom.
///
/// The native stack maximum is widened to `usize` *before* the headroom is added, and the
/// addition saturates, so an unusually large configured limit cannot overflow the narrower
/// integer type of the field and wrap around to a tiny stack size.
fn max_stack_size(executor_params: &ExecutorParams) -> usize {
    /// Extra stack space granted on top of the configured native stack maximum.
    const STACK_HEADROOM: usize = 2 * 1024 * 1024;

    let (_sem, deterministic_stack_limit) = params_to_wasmtime_semantics(executor_params);
    (deterministic_stack_limit.native_stack_max as usize).saturating_add(STACK_HEADROOM)
}
/// Parent side of a job: waits for the child process to finish and converts its output and exit
/// status into a worker response.
///
/// # Parameters
///
/// - `pipe_read_fd`: read end of the pipe on which the child sends its response.
/// - `pipe_write_fd`: write end of the same pipe; closed here because the parent never writes.
/// - `worker_info`: used for logging only.
/// - `job_pid`: pid of the child process to wait for.
/// - `usage_before`: resource usage of terminated children sampled before the job was started.
/// - `pov_size`: reported back unchanged in a successful [`WorkerResponse`].
/// - `timeout`: CPU time budget of the job.
///
/// # Returns
///
/// - `Ok(Ok(..))` when the job exited with status 0 and sent a successful response.
/// - `Ok(Err(..))` for job-level failures: timeout, job error, unexpected exit status, death by
///   signal, or a failing syscall on the parent side.
/// - `Err(..)` when reading from the pipe or decoding the child's response fails.
fn handle_parent_process(
    pipe_read_fd: i32,
    pipe_write_fd: i32,
    worker_info: &WorkerInfo,
    job_pid: Pid,
    usage_before: Usage,
    pov_size: u32,
    timeout: Duration,
) -> io::Result<Result<WorkerResponse, WorkerError>> {
    // The read below only sees EOF once every write end is closed, so the parent's copy of the
    // write end must be closed first; otherwise `read_to_end` would block forever.
    if let Err(errno) = nix::unistd::close(pipe_write_fd) {
        return Ok(Err(internal_error_from_errno("closing pipe write fd", errno)));
    };

    // SAFETY: NOTE(review): assumes `pipe_read_fd` is an open descriptor owned exclusively by
    // this function from here on (it comes from the pipe created by the caller) — confirm.
    let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };

    // Read from the child. The data is only decoded further down, once we know the process
    // exited normally.
    let mut received_data = Vec::new();
    pipe_read
        .read_to_end(&mut received_data)
        // Could not decode job response. There is either a bug or the job was hijacked.
        // Should retry at any rate.
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;

    let status = nix::sys::wait::waitpid(job_pid, None);
    gum::trace!(
        target: LOG_TARGET,
        ?worker_info,
        %job_pid,
        "execute worker received wait status from job: {:?}",
        status,
    );

    // `RUSAGE_CHILDREN` covers all terminated children, so it is sampled after `waitpid` and
    // the pre-job sample is subtracted to isolate the CPU time of this job.
    let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
        Ok(usage) => usage,
        Err(errno) => return Ok(Err(internal_error_from_errno("getrusage after", errno))),
    };

    // Check the CPU time from the parent's point of view rather than trusting the child to
    // report its own timeout.
    let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
    if cpu_tv >= timeout {
        gum::warn!(
            target: LOG_TARGET,
            ?worker_info,
            %job_pid,
            "execute job took {}ms cpu time, exceeded execute timeout {}ms",
            cpu_tv.as_millis(),
            timeout.as_millis(),
        );
        return Ok(Err(WorkerError::JobTimedOut))
    }

    match status {
        Ok(WaitStatus::Exited(_, exit_status)) => {
            let mut reader = io::BufReader::new(received_data.as_slice());
            let result = recv_child_response(&mut reader, "execute")?;

            match result {
                Ok(job_response) => {
                    // The exit status should have been zero if no error occurred.
                    if exit_status != 0 {
                        return Ok(Err(WorkerError::JobError(JobError::UnexpectedExitStatus(
                            exit_status,
                        ))));
                    }

                    Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv }))
                },
                Err(job_error) => {
                    // The job encountered an error while executing the given task.
                    gum::warn!(
                        target: LOG_TARGET,
                        ?worker_info,
                        %job_pid,
                        "execute job error: {}",
                        job_error,
                    );
                    if matches!(job_error, JobError::TimedOut) {
                        Ok(Err(WorkerError::JobTimedOut))
                    } else {
                        Ok(Err(WorkerError::JobError(job_error)))
                    }
                },
            }
        },
        // The job was killed by the given signal.
        Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(Err(WorkerError::JobDied {
            err: format!("received signal: {signal:?}"),
            job_pid: job_pid.as_raw(),
        })),
        Err(errno) => Ok(Err(internal_error_from_errno("waitpid", errno))),

        // `waitpid` was called without options, so any other status is not expected here.
        Ok(unexpected_wait_status) => Ok(Err(WorkerError::JobDied {
            err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
            job_pid: job_pid.as_raw(),
        })),
    }
}
/// Writes the encoded job `response` to the pipe and then terminates the current process.
///
/// The process exits with `EXIT_SUCCESS` only when the response was written successfully and is
/// `Ok`; a failed write or an `Err` response both end in `EXIT_FAILURE`.
fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
    let encoded = response.encode();
    if framed_send_blocking(pipe_write, encoded.as_slice()).is_err() {
        process::exit(libc::EXIT_FAILURE);
    }

    let exit_code = match response {
        Ok(_) => libc::EXIT_SUCCESS,
        Err(_) => libc::EXIT_FAILURE,
    };
    process::exit(exit_code)
}
/// Builds a [`WorkerError::InternalError`] wrapping a kernel error whose message is produced by
/// `stringify_errno` from `context` and `errno`.
fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerError {
    let message = stringify_errno(context, errno);
    let kernel_error = InternalValidationError::Kernel(message);
    WorkerError::InternalError(kernel_error)
}
/// Builds a failed [`JobResult`] carrying a [`JobError::Kernel`] whose message is produced by
/// `stringify_errno` from `context` and `errno`.
fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult {
    let message = stringify_errno(context, errno);
    Err(JobError::Kernel(message))
}