mod memory_stats;
// Log target passed as `target:` to the `gum` logging macros in this file.
const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
#[cfg(target_os = "linux")]
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use nix::{
errno::Errno,
sys::{
resource::{Usage, UsageWho},
wait::WaitStatus,
},
unistd::{ForkResult, Pid},
};
use polkadot_node_core_pvf_common::{
executor_interface::{prepare, prevalidate},
worker::{pipe2_cloexec, PipeFd, WorkerInfo},
};
use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT;
use codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareWorkerResult},
executor_interface::create_runtime_from_artifact_bytes,
framed_recv_blocking, framed_send_blocking,
prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess},
pvf::PvfPrepData,
worker::{
cpu_time_monitor_loop, get_total_cpu_usage, recv_child_response, run_worker, send_result,
stringify_errno, stringify_panic_payload,
thread::{self, spawn_worker_thread, WaitOutcome},
WorkerKind,
},
worker_dir, ProcessTime,
};
use polkadot_primitives::ExecutorParams;
use std::{
fs,
io::{self, Read},
os::{
fd::{AsRawFd, FromRawFd, RawFd},
unix::net::UnixStream,
},
path::{Path, PathBuf},
process,
sync::{mpsc::channel, Arc},
time::Duration,
};
use tracking_allocator::TrackingAllocator;
// Global allocator wrapped in `TrackingAllocator`; `start_memory_tracking` and
// `end_memory_tracking` below drive its tracking.
//
// On Linux, or when the `jemalloc-allocator` feature is enabled, the wrapped allocator is
// jemalloc.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[global_allocator]
static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
TrackingAllocator(tikv_jemallocator::Jemalloc);
// Otherwise the system allocator is wrapped instead.
#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
#[global_allocator]
static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
/// Number of threads of a prepare job process.
///
/// NOTE(review): this constant is not referenced in this file. The value presumably counts the
/// main thread plus the cpu time monitor, memory tracker and prepare threads started in
/// `handle_child_process` — confirm against the users of this constant.
pub const PREPARE_WORKER_THREAD_NUMBER: u32 = 4;
/// Newtype around the raw bytes of a compiled artifact.
///
/// The bytes are only reachable through [`AsRef<[u8]>`], so the inner buffer stays private to
/// this module.
#[derive(Encode, Decode)]
pub struct CompiledArtifact(Vec<u8>);

impl CompiledArtifact {
    /// Wraps already-compiled artifact bytes without copying them.
    pub fn new(code: Vec<u8>) -> Self {
        CompiledArtifact(code)
    }
}
impl AsRef<[u8]> for CompiledArtifact {
fn as_ref(&self) -> &[u8] {
self.0.as_slice()
}
}
/// Result of a successful `prepare_artifact` call.
#[derive(Encode, Decode)]
pub struct PrepareOutcome {
/// The artifact produced by compilation.
pub compiled_artifact: CompiledArtifact,
/// Length in bytes of the validation code after decompression, as measured in
/// `prepare_artifact`.
pub observed_wasm_code_len: u32,
}
/// Receives one framed message from `stream` and decodes it as a [`PvfPrepData`].
///
/// # Errors
///
/// Propagates any I/O error from the framed read. A payload that does not decode is reported
/// as an [`io::ErrorKind::Other`] error carrying the decoder's message.
fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
    let encoded = framed_recv_blocking(stream)?;
    match PvfPrepData::decode(&mut &encoded[..]) {
        Ok(request) => Ok(request),
        Err(e) => Err(io::Error::new(
            io::ErrorKind::Other,
            format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
        )),
    }
}
/// Starts allocation tracking on the global `ALLOC` with an optional `limit` (presumably in
/// bytes — confirm against `TrackingAllocator::start_tracking`).
///
/// The failure handler handed to the allocator writes the pre-encoded `OOM_PAYLOAD` to `fd`,
/// closes `fd` and terminates the process with exit status 1. It never returns.
fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
// NOTE(review): the safety contract of `start_tracking` is declared outside this file —
// confirm it and record it here as a `SAFETY:` comment.
unsafe {
ALLOC.start_tracking(
limit,
Some(Box::new(move || {
#[cfg(target_os = "linux")]
{
// Raw syscalls are used instead of the libc wrappers; presumably so that the
// handler itself can neither allocate nor depend on libc state — TODO confirm.
libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
libc::syscall(libc::SYS_close, fd);
// Terminates all threads of the process.
libc::syscall(libc::SYS_exit_group, 1);
// Fallback in case `exit_group` returns: keep exiting the current thread so that
// control never leaves the handler.
loop {
libc::syscall(libc::SYS_exit, 1);
}
}
#[cfg(not(target_os = "linux"))]
{
// Same sequence through the libc wrappers: report, close the descriptor, exit
// immediately via `_exit`.
libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
libc::close(fd);
libc::_exit(1);
}
})),
);
}
}
/// Stops allocation tracking on the global `ALLOC` and returns the value reported by
/// `end_tracking`, which `handle_child_process` logs and records as the peak tracked allocation.
fn end_memory_tracking() -> isize {
ALLOC.end_tracking()
}
/// Entry point of the prepare worker process.
///
/// Hands control to `run_worker` with a request loop that, for every request:
///
/// 1. receives and decodes a [`PvfPrepData`] from the host socket,
/// 2. samples the resource usage of already-reaped children (`RUSAGE_CHILDREN`),
/// 3. creates a close-on-exec pipe for the job's response,
/// 4. runs the job in a child process (secure `clone` when available on Linux, `fork`
///    otherwise),
/// 5. sends the result back to the host.
///
/// # Parameters
///
/// - `socket_path`, `worker_dir_path`, `node_version`, `worker_version`: forwarded unchanged to
///   `run_worker`.
///
/// # Errors
///
/// I/O errors while talking to the host or creating the pipe are propagated out of the request
/// loop with `?`. Job-level failures are reported to the host and the loop continues.
pub fn worker_entrypoint(
    socket_path: PathBuf,
    worker_dir_path: PathBuf,
    node_version: Option<&str>,
    worker_version: Option<&str>,
) {
    run_worker(
        WorkerKind::Prepare,
        socket_path,
        worker_dir_path,
        node_version,
        worker_version,
        |mut stream, worker_info, security_status| {
            let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_info.worker_dir_path);

            loop {
                let pvf = recv_request(&mut stream)?;
                gum::debug!(
                    target: LOG_TARGET,
                    ?worker_info,
                    ?security_status,
                    "worker: preparing artifact",
                );

                let preparation_timeout = pvf.prep_timeout();
                let prepare_job_kind = pvf.prep_kind();
                let executor_params = pvf.executor_params();

                // Sample the children's resource usage *before* creating the pipe. The pipe ends
                // are raw descriptors with no owner yet, so bailing out of this iteration after
                // creating them would leak both of them.
                let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
                    Ok(usage) => usage,
                    Err(errno) => {
                        let result: PrepareWorkerResult =
                            Err(error_from_errno("getrusage before", errno));
                        send_result(&mut stream, result, worker_info)?;
                        continue
                    },
                };

                // Pipe used by the job process to send its response back to this worker.
                let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?;

                // The child closes its copy of the host socket, so it needs the raw descriptor.
                let stream_fd = stream.as_raw_fd();

                cfg_if::cfg_if! {
                    if #[cfg(target_os = "linux")] {
                        let result = if security_status.can_do_secure_clone {
                            handle_clone(
                                &pvf,
                                pipe_write_fd,
                                pipe_read_fd,
                                stream_fd,
                                preparation_timeout,
                                prepare_job_kind,
                                &executor_params,
                                worker_info,
                                security_status.can_unshare_user_namespace_and_change_root,
                                &temp_artifact_dest,
                                usage_before,
                            )
                        } else {
                            // Fall back to a plain fork when secure clone is not available.
                            handle_fork(
                                &pvf,
                                pipe_write_fd,
                                pipe_read_fd,
                                stream_fd,
                                preparation_timeout,
                                prepare_job_kind,
                                &executor_params,
                                worker_info,
                                &temp_artifact_dest,
                                usage_before,
                            )
                        };
                    } else {
                        let result = handle_fork(
                            &pvf,
                            pipe_write_fd,
                            pipe_read_fd,
                            stream_fd,
                            preparation_timeout,
                            prepare_job_kind,
                            &executor_params,
                            worker_info,
                            &temp_artifact_dest,
                            usage_before,
                        );
                    }
                }

                gum::trace!(
                    target: LOG_TARGET,
                    ?worker_info,
                    "worker: sending result to host: {:?}",
                    result
                );
                send_result(&mut stream, result, worker_info)?;
            }
        },
    );
}
/// Decompresses, prevalidates and compiles the validation code carried by `pvf`.
///
/// # Errors
///
/// - [`PrepareError::CouldNotDecompressCodeBlob`] if decompression fails (the decompressed size
///   is capped at `VALIDATION_CODE_BOMB_LIMIT`),
/// - [`PrepareError::Prevalidation`] if `prevalidate` rejects the code,
/// - [`PrepareError::Preparation`] if `prepare` fails.
fn prepare_artifact(pvf: PvfPrepData) -> Result<PrepareOutcome, PrepareError> {
    let maybe_compressed_code = pvf.maybe_compressed_code();
    let raw_validation_code =
        match sp_maybe_compressed_blob::decompress(&maybe_compressed_code, VALIDATION_CODE_BOMB_LIMIT)
        {
            Ok(code) => code,
            Err(e) => return Err(PrepareError::CouldNotDecompressCodeBlob(e.to_string())),
        };

    // Recorded before compilation so it can be reported alongside the artifact.
    let observed_wasm_code_len = raw_validation_code.len() as u32;

    let blob = prevalidate(&raw_validation_code)
        .map_err(|err| PrepareError::Prevalidation(format!("{:?}", err)))?;

    prepare(blob, &pvf.executor_params())
        .map(|compiled| PrepareOutcome {
            compiled_artifact: CompiledArtifact::new(compiled),
            observed_wasm_code_len,
        })
        .map_err(|err| PrepareError::Preparation(format!("{:?}", err)))
}
/// Checks that a runtime can be constructed from `artifact_bytes` with `executor_params`.
///
/// The constructed runtime is discarded; only success or failure is reported.
///
/// # Errors
///
/// Returns [`PrepareError::RuntimeConstruction`] with the debug rendering of the underlying
/// error if construction fails.
fn runtime_construction_check(
    artifact_bytes: &[u8],
    executor_params: &ExecutorParams,
) -> Result<(), PrepareError> {
    // NOTE(review): the safety contract of `create_runtime_from_artifact_bytes` is declared
    // outside this file. The only caller in this file passes bytes that were just produced by
    // `prepare_artifact` — confirm that this is what the contract requires.
    match unsafe { create_runtime_from_artifact_bytes(artifact_bytes, executor_params) } {
        Ok(_runtime) => Ok(()),
        Err(err) => Err(PrepareError::RuntimeConstruction(format!("{:?}", err))),
    }
}
/// Successful response sent by the job (child) process to the worker over the pipe.
#[derive(Encode, Decode)]
struct JobResponse {
/// The compiled artifact; the worker writes it to the temporary artifact path.
artifact: CompiledArtifact,
/// Memory statistics collected in the job process.
memory_stats: MemoryStats,
/// Length of the decompressed validation code observed during preparation.
observed_wasm_code_len: u32,
}
#[cfg(target_os = "linux")]
fn handle_clone(
pvf: &PvfPrepData,
pipe_write_fd: i32,
pipe_read_fd: i32,
stream_fd: i32,
preparation_timeout: Duration,
prepare_job_kind: PrepareJobKind,
executor_params: &Arc<ExecutorParams>,
worker_info: &WorkerInfo,
have_unshare_newuser: bool,
temp_artifact_dest: &Path,
usage_before: Usage,
) -> Result<PrepareWorkerSuccess, PrepareError> {
use polkadot_node_core_pvf_common::worker::security;
match unsafe {
security::clone::clone_on_worker(
worker_info,
have_unshare_newuser,
Box::new(|| {
handle_child_process(
pvf.clone(),
pipe_write_fd,
pipe_read_fd,
stream_fd,
preparation_timeout,
prepare_job_kind,
Arc::clone(&executor_params),
)
}),
)
} {
Ok(child) => handle_parent_process(
pipe_read_fd,
pipe_write_fd,
worker_info,
child,
temp_artifact_dest,
usage_before,
preparation_timeout,
),
Err(security::clone::Error::Clone(errno)) => Err(error_from_errno("clone", errno)),
}
}
/// Runs the prepare job in a child created with `fork`.
///
/// The child runs `handle_child_process` and never returns. The parent waits for the job in
/// `handle_parent_process` and returns its outcome.
///
/// # Parameters
///
/// - `pipe_write_fd` / `pipe_read_fd`: raw ends of the response pipe created by the caller.
/// - `stream_fd`: raw descriptor of the host socket, closed by the child.
/// - `usage_before`: `RUSAGE_CHILDREN` sample taken before the job was started.
///
/// # Errors
///
/// Returns a kernel error if `fork` fails; otherwise whatever `handle_parent_process` returns.
fn handle_fork(
    pvf: &PvfPrepData,
    pipe_write_fd: i32,
    pipe_read_fd: i32,
    stream_fd: i32,
    preparation_timeout: Duration,
    prepare_job_kind: PrepareJobKind,
    executor_params: &Arc<ExecutorParams>,
    worker_info: &WorkerInfo,
    temp_artifact_dest: &Path,
    usage_before: Usage,
) -> Result<PrepareWorkerSuccess, PrepareError> {
    // NOTE(review): `fork` is unsafe in a multi-threaded process; this assumes the worker is
    // single-threaded at this point — confirm and record it here as a `SAFETY:` comment.
    match unsafe { nix::unistd::fork() } {
        Ok(ForkResult::Child) => handle_child_process(
            pvf.clone(),
            pipe_write_fd,
            pipe_read_fd,
            stream_fd,
            preparation_timeout,
            prepare_job_kind,
            Arc::clone(executor_params),
        ),
        Ok(ForkResult::Parent { child }) => handle_parent_process(
            pipe_read_fd,
            pipe_write_fd,
            worker_info,
            child,
            temp_artifact_dest,
            usage_before,
            preparation_timeout,
        ),
        Err(errno) => {
            // No child was created, so this process is the sole owner of both pipe ends and
            // nothing else will close them. Release them here; otherwise every failed fork
            // leaks two descriptors in the long-lived worker. Close errors are ignored because
            // the fork failure is the error worth reporting.
            let _ = nix::unistd::close(pipe_read_fd);
            let _ = nix::unistd::close(pipe_write_fd);
            Err(error_from_errno("fork", errno))
        },
    }
}
/// Body of the job (child) process: prepares the artifact and reports the result over the pipe.
///
/// Steps:
///
/// 1. Take ownership of the pipe's write end; close the read end and the host socket.
/// 2. Start the memory tracker thread (Linux / jemalloc builds) and allocation tracking with
///    the pre-checking memory limit from `executor_params`.
/// 3. Spawn the cpu time monitor thread and the prepare thread, then wait until one finishes.
/// 4. Build a `JobResult` from whichever finished first and send it with
///    `send_child_response`, which terminates the process.
///
/// # Parameters
///
/// - `pvf`: the request to prepare.
/// - `pipe_write_fd` / `pipe_read_fd`: raw ends of the response pipe inherited from the worker.
/// - `stream_fd`: raw descriptor of the host socket inherited from the worker.
/// - `preparation_timeout`: cpu time budget enforced by the cpu time monitor thread.
/// - `prepare_job_kind`: for `Prechecking`, runtime construction is additionally checked.
/// - `executor_params`: executor parameters for the optional memory limit and the runtime
///   construction check.
///
/// This function never returns; every path ends in `send_child_response`.
fn handle_child_process(
    pvf: PvfPrepData,
    pipe_write_fd: i32,
    pipe_read_fd: i32,
    stream_fd: i32,
    preparation_timeout: Duration,
    prepare_job_kind: PrepareJobKind,
    executor_params: Arc<ExecutorParams>,
) -> ! {
    // NOTE(review): assumes `pipe_write_fd` is an open descriptor that nothing else in this
    // process owns — confirm and record it here as a `SAFETY:` comment.
    let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };

    // The child only writes to the pipe; close the read end.
    if let Err(errno) = nix::unistd::close(pipe_read_fd) {
        send_child_response(
            &mut pipe_write,
            JobResult::Err(error_from_errno("closing pipe", errno)),
        );
    }

    // Close the inherited host socket; the job reports solely over the pipe.
    if let Err(errno) = nix::unistd::close(stream_fd) {
        send_child_response(
            &mut pipe_write,
            JobResult::Err(error_from_errno("error closing stream", errno)),
        );
    }

    let worker_job_pid = process::id();
    gum::debug!(
        target: LOG_TARGET,
        %worker_job_pid,
        ?prepare_job_kind,
        ?preparation_timeout,
        "worker job: preparing artifact",
    );

    // Shared by all threads below; `wait_for_threads` blocks on it until one of them reports.
    let condvar = thread::get_condvar();

    // Run the memory tracker in a regular, non-worker thread.
    #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
    let condvar_memory = Arc::clone(&condvar);
    #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
    let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));

    start_memory_tracking(
        pipe_write.as_raw_fd(),
        executor_params.prechecking_max_memory().map(|v| {
            v.try_into().unwrap_or_else(|_| {
                // The `target:` key is required: without it `LOG_TARGET` is recorded as an
                // ordinary field and the event is emitted under the default target.
                gum::warn!(
                    target: LOG_TARGET,
                    %worker_job_pid,
                    "Illegal pre-checking max memory value {} discarded",
                    v,
                );
                // NOTE(review): presumably a limit of 0 means "no limit" for the tracking
                // allocator — confirm.
                0
            })
        }),
    );

    let cpu_time_start = ProcessTime::now();

    // Spawn a thread that trips when the cpu time budget is exhausted. Sending on (or dropping)
    // `cpu_time_monitor_tx` lets it stop early.
    let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
    let cpu_time_monitor_thread = thread::spawn_worker_thread(
        "cpu time monitor thread",
        move || cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx),
        Arc::clone(&condvar),
        WaitOutcome::TimedOut,
    )
    .unwrap_or_else(|err| {
        send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
    });

    let prepare_thread = spawn_worker_thread(
        "prepare worker",
        move || {
            // The result is wrapped in a tuple so that the Linux build can append the max RSS
            // sample while other targets keep a one-element tuple.
            #[allow(unused_mut)]
            let mut result = prepare_artifact(pvf).map(|o| (o,));

            // Sample max RSS right after preparation, on the preparing thread.
            #[cfg(target_os = "linux")]
            let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread()));

            // For pre-checking, additionally make sure a runtime can be built from the artifact.
            if let PrepareJobKind::Prechecking = prepare_job_kind {
                result = result.and_then(|output| {
                    runtime_construction_check(
                        output.0.compiled_artifact.as_ref(),
                        &executor_params,
                    )?;
                    Ok(output)
                });
            }
            result
        },
        Arc::clone(&condvar),
        WaitOutcome::Finished,
    )
    .unwrap_or_else(|err| {
        send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
    });

    // Block until either the prepare thread or the cpu time monitor reports.
    let outcome = thread::wait_for_threads(condvar);

    let peak_alloc = {
        let peak = end_memory_tracking();
        gum::debug!(
            target: LOG_TARGET,
            %worker_job_pid,
            "prepare job peak allocation is {} bytes",
            peak,
        );
        peak
    };

    let result = match outcome {
        WaitOutcome::Finished => {
            // Tell the cpu time monitor to stop; a send error only means it is already gone.
            let _ = cpu_time_monitor_tx.send(());

            match prepare_thread.join().unwrap_or_else(|err| {
                send_child_response(
                    &mut pipe_write,
                    Err(PrepareError::JobError(stringify_panic_payload(err))),
                )
            }) {
                Err(err) => Err(err),
                Ok(ok) => {
                    cfg_if::cfg_if! {
                        if #[cfg(target_os = "linux")] {
                            let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok;
                        } else {
                            let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok;
                        }
                    }

                    // Stop the memory tracker and collect its statistics.
                    #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
                    let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, process::id());

                    let memory_stats = MemoryStats {
                        #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
                        memory_tracker_stats,
                        #[cfg(target_os = "linux")]
                        max_rss: extract_max_rss_stat(max_rss, process::id()),
                        // A non-positive peak is reported as zero.
                        peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 },
                    };

                    Ok(JobResponse {
                        artifact: compiled_artifact,
                        observed_wasm_code_len,
                        memory_stats,
                    })
                },
            }
        },

        // The cpu time monitor finished first.
        WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
            Ok(Some(_cpu_time_elapsed)) => Err(PrepareError::TimedOut),
            Ok(None) => Err(PrepareError::IoErr("error communicating over closed channel".into())),
            Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
        },

        WaitOutcome::Pending =>
            unreachable!("we run wait_while until the outcome is no longer pending; qed"),
    };

    send_child_response(&mut pipe_write, result);
}
/// Worker-side handling of a started job: reads the job's response from the pipe, reaps the
/// job process, enforces the cpu time limit and, on success, writes the artifact to disk.
///
/// # Parameters
///
/// - `pipe_read_fd` / `pipe_write_fd`: raw ends of the response pipe; the write end is closed
///   here and the read end is taken over.
/// - `job_pid`: pid of the job process to wait for.
/// - `temp_artifact_dest`: path the compiled artifact is written to.
/// - `usage_before`: `RUSAGE_CHILDREN` sample taken before the job was started.
/// - `timeout`: cpu time limit for the job.
///
/// # Returns
///
/// On success, the artifact checksum (hex-encoded blake3) together with the memory statistics,
/// consumed cpu time and observed code length.
///
/// # Errors
///
/// - `PrepareError::Kernel` if closing the write end, `getrusage` or `waitpid` fails,
/// - `PrepareError::IoErr` if reading the pipe or writing the artifact fails,
/// - `PrepareError::TimedOut` if the job's cpu time reached `timeout`,
/// - `PrepareError::JobError` if the response cannot be received/decoded or the job exited
///   with a non-zero status despite a successful response,
/// - `PrepareError::JobDied` if the job was killed by a signal or `waitpid` reported an
///   unexpected status,
/// - any `PrepareError` the job itself reported.
///
/// NOTE(review): the early returns before `waitpid` (failed `close`, failed `read_to_end`)
/// leave the job process unreaped — confirm whether that is acceptable.
fn handle_parent_process(
pipe_read_fd: i32,
pipe_write_fd: i32,
worker_info: &WorkerInfo,
job_pid: Pid,
temp_artifact_dest: &Path,
usage_before: Usage,
timeout: Duration,
) -> Result<PrepareWorkerSuccess, PrepareError> {
// Close this process's copy of the write end; `read_to_end` below only sees EOF once every
// write end of the pipe is closed.
if let Err(errno) = nix::unistd::close(pipe_write_fd) {
return Err(error_from_errno("closing pipe write fd", errno));
};
// NOTE(review): assumes `pipe_read_fd` is an open descriptor that nothing else in this
// process owns — confirm and record it here as a `SAFETY:` comment.
let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
// Read everything the job wrote. The data is only decoded further down, once the job is
// known to have exited normally.
let mut received_data = Vec::new();
pipe_read
.read_to_end(&mut received_data)
.map_err(|err| PrepareError::IoErr(err.to_string()))?;
// Reap the job process.
let status = nix::sys::wait::waitpid(job_pid, None);
gum::trace!(
target: LOG_TARGET,
?worker_info,
%job_pid,
"prepare worker received wait status from job: {:?}",
status,
);
// The job has been waited for, so its usage is now included in `RUSAGE_CHILDREN`; the
// difference to `usage_before` is the cpu time attributed to this job.
let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
.map_err(|errno| error_from_errno("getrusage after", errno))?;
let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
// The timeout is checked before the exit status, so an overrun wins over any other outcome.
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
?worker_info,
%job_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_tv.as_millis(),
timeout.as_millis(),
);
return Err(PrepareError::TimedOut)
}
match status {
Ok(WaitStatus::Exited(_pid, exit_status)) => {
// Normal exit: decode the framed response received earlier.
let mut reader = io::BufReader::new(received_data.as_slice());
let result = recv_child_response(&mut reader, "prepare")
.map_err(|err| PrepareError::JobError(err.to_string()))?;
match result {
// The job reported an error itself; pass it on as is.
Err(err) => Err(err),
Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => {
// A successful response must come with a zero exit status.
if exit_status != 0 {
return Err(PrepareError::JobError(format!(
"unexpected exit status: {}",
exit_status
)))
}
gum::debug!(
target: LOG_TARGET,
?worker_info,
%job_pid,
"worker: writing artifact to {}",
temp_artifact_dest.display(),
);
// Persist the artifact at the temporary destination.
if let Err(err) = fs::write(temp_artifact_dest, &artifact) {
return Err(PrepareError::IoErr(err.to_string()))
};
// Checksum over the artifact bytes, reported as a hex string.
let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string();
Ok(PrepareWorkerSuccess {
checksum,
stats: PrepareStats {
memory_stats,
cpu_time_elapsed: cpu_tv,
observed_wasm_code_len,
},
})
},
}
},
// The job was terminated by a signal.
Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Err(PrepareError::JobDied {
err: format!("received signal: {signal:?}"),
job_pid: job_pid.as_raw(),
}),
Err(errno) => Err(error_from_errno("waitpid", errno)),
// Any other wait status is treated as the job having died.
Ok(unexpected_wait_status) => Err(PrepareError::JobDied {
err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
job_pid: job_pid.as_raw(),
}),
}
}
/// Sends `response` to the worker over `pipe_write` and terminates the job process.
///
/// The exit status is `EXIT_SUCCESS` only if the response was sent and is `Ok`. A failed send
/// or an `Err` response yields `EXIT_FAILURE`. This function never returns.
fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
    let encoded = response.encode();
    if framed_send_blocking(pipe_write, encoded.as_slice()).is_err() {
        // Nothing more can be reported once the pipe itself is broken.
        process::exit(libc::EXIT_FAILURE)
    }

    let exit_code = if response.is_ok() { libc::EXIT_SUCCESS } else { libc::EXIT_FAILURE };
    process::exit(exit_code)
}
/// Builds a `PrepareError::Kernel` from `errno`, using `context` to describe the operation
/// that failed.
fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {
PrepareError::Kernel(stringify_errno(context, errno))
}
/// What the job process sends to the worker over the pipe: a response or a preparation error.
type JobResult = Result<JobResponse, PrepareError>;
/// Pre-encoded, framed `JobResult::Err(PrepareError::OutOfMemory)`: the little-endian length of
/// the encoded value followed by the encoded value itself. It is a constant so that the
/// allocator failure handler in `start_memory_tracking` can write it without encoding anything.
/// The `pre_encoded_payloads` test keeps it in sync with the real encoding.
const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
/// Guards `OOM_PAYLOAD` against drifting from the real framed encoding of an out-of-memory
/// job result.
#[test]
fn pre_encoded_payloads() {
    let encoded_oom = JobResult::Err(PrepareError::OutOfMemory).encode();

    // Frame layout: little-endian length prefix, then the encoded value.
    let mut framed = Vec::new();
    framed.extend(encoded_oom.len().to_le_bytes());
    framed.extend(encoded_oom);

    assert_eq!(framed, OOM_PAYLOAD);
}