pub mod security;
use crate::{
framed_recv_blocking, framed_send_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET,
};
use codec::{Decode, Encode};
use cpu_time::ProcessTime;
use futures::never::Never;
use nix::{errno::Errno, sys::resource::Usage};
use std::{
any::Any,
fmt::{self},
fs::File,
io::{self, Read, Write},
os::{
fd::{AsRawFd, FromRawFd, RawFd},
unix::net::UnixStream,
},
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
time::Duration,
};
/// Declares the `main` function of a PVF worker binary, together with the helper functions
/// `get_full_version` and `print_help` that it uses.
///
/// Macro arguments:
/// - `$expected_command`: the subcommand this binary must be invoked with.
/// - `$entrypoint`: called as `$entrypoint(socket_path, worker_dir_path, node_version,
///   Some($worker_version))` once the command line has been parsed.
/// - `$worker_version` / `$worker_version_hash`: printed by `--version`, `--full-version` and
///   `--help`.
///
/// The generated `main` dispatches on the first argument:
/// - none, `--help` / `-h`: print help and return;
/// - `--version` / `-v`, `--full-version`: print the version and return;
/// - `--check-can-*`: run the corresponding security capability probe and exit with status 0
///   on success or -1 otherwise (always -1 on platforms where the probe is not compiled in);
/// - `test-sleep`: sleep for 5 seconds and return;
/// - anything else must equal `$expected_command`. It is followed by `--socket-path <path>`,
///   `--worker-dir-path <path>` and optionally `--node-impl-version <version>`.
///
/// # Panics
///
/// The generated `main` panics in these cases:
/// - an unexpected subcommand or argument;
/// - an option given without its value;
/// - `--socket-path` or `--worker-dir-path` missing.
#[macro_export]
macro_rules! decl_worker_main {
    ($expected_command:expr, $entrypoint:expr, $worker_version:expr, $worker_version_hash:expr $(,)*) => {
        fn get_full_version() -> String {
            format!("{}-{}", $worker_version, $worker_version_hash)
        }

        fn print_help(expected_command: &str) {
            println!("{} {}", expected_command, $worker_version);
            println!("commit: {}", $worker_version_hash);
            println!();
            println!("PVF worker that is called by polkadot.");
        }

        fn main() {
            #[cfg(target_os = "linux")]
            use $crate::worker::security;

            $crate::sp_tracing::try_init_simple();

            let args = std::env::args().collect::<Vec<_>>();
            if args.len() == 1 {
                print_help($expected_command);
                return
            }

            match args[1].as_ref() {
                "--help" | "-h" => {
                    print_help($expected_command);
                    return
                },
                "--version" | "-v" => {
                    println!("{}", $worker_version);
                    return
                },
                "--full-version" => {
                    println!("{}", get_full_version());
                    return
                },

                "--check-can-enable-landlock" => {
                    #[cfg(target_os = "linux")]
                    let status = if let Err(err) = security::landlock::check_can_fully_enable() {
                        eprintln!("{}", err);
                        -1
                    } else {
                        0
                    };
                    #[cfg(not(target_os = "linux"))]
                    let status = -1;
                    std::process::exit(status)
                },
                "--check-can-enable-seccomp" => {
                    #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
                    let status = if let Err(err) = security::seccomp::check_can_fully_enable() {
                        eprintln!("{}", err);
                        -1
                    } else {
                        0
                    };
                    #[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
                    let status = -1;
                    std::process::exit(status)
                },
                "--check-can-unshare-user-namespace-and-change-root" => {
                    // The cache path is the next argument. Report a descriptive message
                    // instead of an index-out-of-bounds panic when it is missing.
                    #[cfg(target_os = "linux")]
                    let cache_path_tempdir = std::path::Path::new(args.get(2).expect(
                        "--check-can-unshare-user-namespace-and-change-root requires a cache path argument",
                    ));
                    #[cfg(target_os = "linux")]
                    let status = if let Err(err) =
                        security::change_root::check_can_fully_enable(&cache_path_tempdir)
                    {
                        eprintln!("{}", err);
                        -1
                    } else {
                        0
                    };
                    #[cfg(not(target_os = "linux"))]
                    let status = -1;
                    std::process::exit(status)
                },
                "--check-can-do-secure-clone" => {
                    #[cfg(target_os = "linux")]
                    let status = if let Err(err) = unsafe { security::clone::check_can_fully_clone() } {
                        eprintln!("{}", err);
                        -1
                    } else {
                        0
                    };
                    #[cfg(not(target_os = "linux"))]
                    let status = -1;
                    std::process::exit(status)
                },

                "test-sleep" => {
                    std::thread::sleep(std::time::Duration::from_secs(5));
                    return
                },

                subcommand => {
                    if subcommand != $expected_command {
                        panic!(
                            "trying to run {} binary with the {} subcommand",
                            $expected_command, subcommand
                        )
                    }
                },
            }

            let mut socket_path = None;
            let mut worker_dir_path = None;
            let mut node_version = None;

            // Every recognised option consumes the following argument as its value, so `i`
            // advances by two per option. A missing value is reported explicitly rather than
            // through an index-out-of-bounds panic.
            let mut i = 2;
            while i < args.len() {
                match args[i].as_ref() {
                    "--socket-path" => {
                        socket_path = Some(
                            args.get(i + 1)
                                .expect("the --socket-path argument requires a value")
                                .as_str(),
                        );
                        i += 1
                    },
                    "--worker-dir-path" => {
                        worker_dir_path = Some(
                            args.get(i + 1)
                                .expect("the --worker-dir-path argument requires a value")
                                .as_str(),
                        );
                        i += 1
                    },
                    "--node-impl-version" => {
                        node_version = Some(
                            args.get(i + 1)
                                .expect("the --node-impl-version argument requires a value")
                                .as_str(),
                        );
                        i += 1
                    },
                    arg => panic!("Unexpected argument found: {}", arg),
                }
                i += 1;
            }

            let socket_path = socket_path.expect("the --socket-path argument is required");
            let worker_dir_path =
                worker_dir_path.expect("the --worker-dir-path argument is required");

            let socket_path = std::path::Path::new(socket_path).to_owned();
            let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();

            $entrypoint(socket_path, worker_dir_path, node_version, Some($worker_version));
        }
    };
}
/// Creates a pipe whose two ends both have `FD_CLOEXEC` set, returned as
/// `(read_end, write_end)`.
///
/// On non-macOS targets this is a single `pipe2(O_CLOEXEC)` call. The flag is applied
/// atomically with the creation of the descriptors.
///
/// # Errors
///
/// Returns the OS error reported by `pipe2` on failure.
#[cfg(not(target_os = "macos"))]
pub fn pipe2_cloexec() -> io::Result<(libc::c_int, libc::c_int)> {
    let mut ends: [libc::c_int; 2] = [0, 0];
    // SAFETY: `ends` is a writable array of exactly two `c_int`s, which is what pipe2(2) fills in.
    let rc = unsafe { libc::pipe2(ends.as_mut_ptr(), libc::O_CLOEXEC) };
    match rc {
        0 => {
            let [read_end, write_end] = ends;
            Ok((read_end, write_end))
        },
        _ => Err(io::Error::last_os_error()),
    }
}
/// Creates a pipe whose two ends both have `FD_CLOEXEC` set, returned as
/// `(read_end, write_end)`.
///
/// macOS has no `pipe2`, so the pipe is created with `pipe` and `FD_CLOEXEC` is then set on
/// each end with `fcntl(F_SETFD)`.
///
/// # Errors
///
/// Returns the OS error from `pipe` or `fcntl`. If `fcntl` fails, both descriptors are closed
/// before returning, so nothing is leaked.
#[cfg(target_os = "macos")]
pub fn pipe2_cloexec() -> io::Result<(libc::c_int, libc::c_int)> {
    let mut fds: [libc::c_int; 2] = [0; 2];
    // SAFETY: `fds` is a writable array of exactly two `c_int`s, as required by pipe(2).
    let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
    if res != 0 {
        return Err(io::Error::last_os_error())
    }

    for &fd in &fds {
        // SAFETY: `fd` was just returned by pipe(2) and is still open and owned by us.
        let res = unsafe { libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) };
        // For F_SETFD, fcntl only guarantees "a value other than -1" on success.
        if res == -1 {
            // Capture errno before close(2) gets a chance to overwrite it.
            let err = io::Error::last_os_error();
            // SAFETY: both descriptors came from pipe(2) above, are still open, and are never
            // used again after this point.
            unsafe {
                libc::close(fds[0]);
                libc::close(fds[1]);
            }
            return Err(err)
        }
    }

    Ok((fds[0], fds[1]))
}
/// One end of a pipe, wrapped in a [`File`] so it gets `Read`/`Write` and raw-fd conversions.
///
/// The descriptor is owned: it is closed when the `PipeFd` is dropped (through `File`'s drop).
pub struct PipeFd {
    file: File,
}

impl AsRawFd for PipeFd {
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.file)
    }
}

impl FromRawFd for PipeFd {
    /// # Safety
    ///
    /// Same contract as [`File::from_raw_fd`]: `fd` must be an open file descriptor that is
    /// exclusively owned by the returned `PipeFd`.
    unsafe fn from_raw_fd(fd: RawFd) -> Self {
        let file = File::from_raw_fd(fd);
        Self { file }
    }
}

impl Read for PipeFd {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.file, buf)
    }

    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
        Read::read_to_end(&mut self.file, buf)
    }
}

impl Write for PipeFd {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Write::write(&mut self.file, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut self.file)
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        Write::write_all(&mut self.file, buf)
    }
}
/// Extra wall-clock time that [`cpu_time_monitor_loop`] adds to every sleep, on top of the
/// remaining CPU-time budget.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
/// The kind of PVF worker a process is running as.
///
/// `Display` renders a short lowercase label, used in this file's log messages.
#[derive(Debug, Clone, Copy)]
pub enum WorkerKind {
    Prepare,
    Execute,
    CheckPivotRoot,
}

impl fmt::Display for WorkerKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            WorkerKind::Prepare => "prepare",
            WorkerKind::Execute => "execute",
            WorkerKind::CheckPivotRoot => "check pivot root",
        };
        f.write_str(label)
    }
}
/// Identity and context of the running worker process. This file attaches it (via `Debug`) to
/// its log lines.
#[derive(Debug)]
pub struct WorkerInfo {
/// Process id; `run_worker` fills it from `std::process::id()`.
pub pid: u32,
/// Which kind of worker this process is.
pub kind: WorkerKind,
/// Version of the worker binary, if one was provided.
pub version: Option<String>,
/// Worker directory. On Linux, `run_worker` rewrites this to `/` after it successfully
/// changes root.
pub worker_dir_path: PathBuf,
}
/// Runs a PVF worker process of kind `worker_kind`.
///
/// Steps, in order:
/// 1. If both `node_version` and `worker_version` are given and they differ, the parent node
///    is signalled via `kill_parent_node_in_emergency` and the worker shuts down.
/// 2. `worker_dir_path` is listed (trace-logged only). Failure to read it is fatal.
/// 3. The worker connects to the host over the Unix socket at `socket_path`, then removes the
///    socket file. Removal errors are ignored.
/// 4. The `WorkerHandshake` is received. Its `security_status` decides which sandboxing
///    features are enabled.
/// 5. The connected stream is handed to `event_loop`.
///
/// Barring a panic, this function does not return normally. Every path ends in
/// `worker_shutdown` or `worker_shutdown_error`, which terminate the process. `event_loop` can
/// only return an error (its `Ok` type is `Never`), and that error becomes the shutdown reason.
pub fn run_worker<F>(
worker_kind: WorkerKind,
socket_path: PathBuf,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(UnixStream, &WorkerInfo, SecurityStatus) -> io::Result<Never>,
{
// `mut` is only needed on Linux, where a successful change of root rewrites `worker_dir_path`.
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
let mut worker_info = WorkerInfo {
pid: std::process::id(),
kind: worker_kind,
version: worker_version.map(|v| v.to_string()),
worker_dir_path,
};
gum::debug!(
target: LOG_TARGET,
?worker_info,
?socket_path,
"starting pvf worker ({})",
worker_info.kind
);
// A node/worker version mismatch is unrecoverable. Ask the parent node to terminate, then quit.
if let (Some(node_version), Some(worker_version)) = (node_version, &worker_info.version) {
if node_version != worker_version {
gum::error!(
target: LOG_TARGET,
?worker_info,
%node_version,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
kill_parent_node_in_emergency();
worker_shutdown(worker_info, "Version mismatch");
}
}
// List the worker dir. The contents are only trace-logged, but failure to read it is fatal.
let entries: io::Result<Vec<_>> = std::fs::read_dir(&worker_info.worker_dir_path)
.and_then(|d| d.map(|res| res.map(|e| e.file_name())).collect());
match entries {
Ok(entries) =>
gum::trace!(target: LOG_TARGET, ?worker_info, "content of worker dir: {:?}", entries),
Err(err) => {
let err = format!("Could not read worker dir: {}", err.to_string());
worker_shutdown_error(worker_info, &err);
},
}
// Connect to the host, then unlink the socket path (best effort, error ignored).
let stream = || -> io::Result<UnixStream> {
let stream = UnixStream::connect(&socket_path)?;
let _ = std::fs::remove_file(&socket_path);
Ok(stream)
}();
let mut stream = match stream {
Ok(ok) => ok,
Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
};
let WorkerHandshake { security_status } = match recv_worker_handshake(&mut stream) {
Ok(ok) => ok,
Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
};
// Enable sandboxing as requested by the handshake's `security_status`.
// Failures of the env-var check, landlock and seccomp are fatal only in secure validator
// mode. A failed change of root is always fatal.
// NOTE(review): the order (change root, then landlock, then seccomp) presumably matters,
// e.g. later restrictions could block syscalls the earlier steps need. Confirm before
// reordering.
{
gum::trace!(target: LOG_TARGET, ?security_status, "Enabling security features");
if !security::check_env_vars_were_cleared(&worker_info) {
let err = "not all env vars were cleared when spawning the process";
gum::error!(
target: LOG_TARGET,
?worker_info,
"{}",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, err);
}
}
// After a successful change of root, the worker dir is the filesystem root, so from here
// on it is referred to as "/".
#[cfg(target_os = "linux")]
if security_status.can_unshare_user_namespace_and_change_root {
if let Err(err) = security::change_root::enable_for_worker(&worker_info) {
let err = format!("Could not change root to be the worker cache path: {}", err);
worker_shutdown_error(worker_info, &err);
}
worker_info.worker_dir_path = std::path::Path::new("/").to_owned();
}
#[cfg(target_os = "linux")]
if security_status.can_enable_landlock {
if let Err(err) = security::landlock::enable_for_worker(&worker_info) {
let err = format!("could not fully enable landlock: {:?}", err);
gum::error!(
target: LOG_TARGET,
?worker_info,
"{}. This should not happen, please report an issue",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, &err);
}
}
}
// Seccomp is only compiled in for Linux on x86_64.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
if security_status.can_enable_seccomp {
if let Err(err) = security::seccomp::enable_for_worker(&worker_info) {
let err = format!("could not fully enable seccomp: {:?}", err);
gum::error!(
target: LOG_TARGET,
?worker_info,
"{}. This should not happen, please report an issue",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, &err);
}
}
}
}
// `event_loop` only returns on error (its success type is `Never`), so `unwrap_err` cannot panic.
let err = event_loop(stream, &worker_info, security_status)
.unwrap_err();
worker_shutdown(worker_info, &err.to_string());
}
/// Logs `err` at warn level as the reason for quitting, then terminates the process with exit
/// status 1.
fn worker_shutdown(worker_info: WorkerInfo, err: &str) -> ! {
    let kind = worker_info.kind;
    gum::warn!(target: LOG_TARGET, ?worker_info, "quitting pvf worker ({}): {}", kind, err);
    std::process::exit(1)
}
fn worker_shutdown_error(worker_info: WorkerInfo, err: &str) -> ! {
gum::error!(target: LOG_TARGET, ?worker_info, "quitting pvf worker ({}): {}", worker_info.kind, err);
std::process::exit(1);
}
/// Watches the process CPU time consumed since `cpu_time_start` against the `timeout` budget.
///
/// Between checks it waits on `finished_rx` for the remaining budget plus
/// [`JOB_TIMEOUT_OVERHEAD`]. That wait is measured in wall-clock time.
///
/// Returns:
/// - `None` once a finish signal arrives, or once the sender side is dropped;
/// - `Some(elapsed)` as soon as the consumed CPU time exceeds `timeout`.
pub fn cpu_time_monitor_loop(
    cpu_time_start: ProcessTime,
    timeout: Duration,
    finished_rx: Receiver<()>,
) -> Option<Duration> {
    loop {
        let elapsed = cpu_time_start.elapsed();
        if elapsed > timeout {
            break Some(elapsed)
        }

        let remaining = timeout.saturating_sub(elapsed);
        match finished_rx.recv_timeout(remaining + JOB_TIMEOUT_OVERHEAD) {
            // The job finished, or the channel was closed: stop monitoring.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break None,
            // Woke up without a signal: re-measure CPU time and check again.
            Err(RecvTimeoutError::Timeout) => {},
        }
    }
}
/// Extracts a human-readable message from a panic payload.
///
/// `panic!` with a literal produces a `&'static str` payload, and with format arguments a
/// `String`. Any other payload type yields `"unknown panic payload"`.
pub fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
    if let Some(msg) = payload.downcast_ref::<&'static str>() {
        return (*msg).to_owned()
    }
    payload
        .downcast::<String>()
        .map(|msg| *msg)
        .unwrap_or_else(|_| "unknown panic payload".to_string())
}
/// Best-effort `SIGTERM` to the parent process. It is used when the node and worker versions
/// disagree.
///
/// Parent pids 0 and 1 are skipped, so pid 1 is never signalled. The result of `kill` is
/// ignored.
///
/// NOTE(review): if the node has already exited and this process was reparented to a
/// subreaper with pid > 1, that process receives the signal instead. Confirm this is
/// acceptable.
fn kill_parent_node_in_emergency() {
    // SAFETY: getppid(2) takes no arguments and always succeeds.
    let parent_pid = unsafe { libc::getppid() };
    if parent_pid <= 1 {
        return
    }
    // SAFETY: kill(2) only takes integer arguments, so there are no memory-safety preconditions.
    unsafe {
        libc::kill(parent_pid, libc::SIGTERM);
    }
}
/// Receives one framed message from `stream` and decodes it as a [`WorkerHandshake`].
///
/// # Errors
///
/// I/O errors from `framed_recv_blocking` are propagated unchanged. A decode failure is
/// reported as an `io::ErrorKind::Other` error.
fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake> {
    let raw = framed_recv_blocking(stream)?;
    let mut input = &raw[..];
    WorkerHandshake::decode(&mut input).map_err(|decode_err| {
        io::Error::new(
            io::ErrorKind::Other,
            format!("recv_worker_handshake: failed to decode WorkerHandshake: {}", decode_err),
        )
    })
}
/// Sums the user and system CPU time reported in `rusage` into a single [`Duration`], with
/// microsecond precision.
pub fn get_total_cpu_usage(rusage: Usage) -> Duration {
    let (user, system) = (rusage.user_time(), rusage.system_time());
    let whole_secs = user.tv_sec() + system.tv_sec();
    let extra_micros = (system.tv_usec() + user.tv_usec()) as i64;
    let total_micros = whole_secs * 1_000_000 + extra_micros;
    Duration::from_micros(total_micros as u64)
}
/// Reads one framed message from `received_data` and decodes it as `T`.
///
/// `context` is prefixed to the decode error message to identify the caller.
///
/// # Errors
///
/// I/O errors from `framed_recv_blocking` are propagated. A decode failure is reported as an
/// `io::ErrorKind::Other` error.
pub fn recv_child_response<T>(
    received_data: &mut io::BufReader<&[u8]>,
    context: &'static str,
) -> io::Result<T>
where
    T: Decode,
{
    let frame = framed_recv_blocking(received_data)?;
    let mut input = frame.as_slice();
    T::decode(&mut input).map_err(|decode_err| {
        let msg = format!("{} pvf recv_child_response: decode error: {}", context, decode_err);
        io::Error::new(io::ErrorKind::Other, msg)
    })
}
pub fn send_result<T, E>(
stream: &mut UnixStream,
result: Result<T, E>,
worker_info: &WorkerInfo,
) -> io::Result<()>
where
T: std::fmt::Debug,
E: std::fmt::Debug + std::fmt::Display,
Result<T, E>: Encode,
{
if let Err(ref err) = result {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred: {}",
err
);
}
gum::trace!(
target: LOG_TARGET,
?worker_info,
"worker: sending result to host: {:?}",
result
);
framed_send_blocking(stream, &result.encode()).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred sending result to host: {}",
err
);
err
})
}
/// Formats `errno` with a context prefix as `"<context>: <errno>: <OS error description>"`.
///
/// The OS description is derived from the `errno` argument itself, not from the thread's
/// current `errno` value. This keeps the message self-consistent even if other system calls
/// changed the global `errno` between the failure and this call.
pub fn stringify_errno(context: &'static str, errno: Errno) -> String {
    format!("{}: {}: {}", context, errno, io::Error::from_raw_os_error(errno as i32))
}
/// Helpers for running jobs on dedicated threads and waiting for the first one to publish an
/// outcome through a shared condition variable.
pub mod thread {
    use std::{
        io, panic,
        sync::{Arc, Condvar, Mutex},
        thread,
        time::Duration,
    };

    /// Outcome published through a [`Cond`].
    ///
    /// It stays `Pending` until the first worker thread finishes.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum WaitOutcome {
        Finished,
        TimedOut,
        Pending,
    }

    impl WaitOutcome {
        /// Returns `true` while no outcome has been published yet.
        pub fn is_pending(&self) -> bool {
            matches!(self, Self::Pending)
        }
    }

    /// The published outcome paired with the condition variable used to signal changes to it.
    pub type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>;

    /// Creates a fresh [`Cond`] whose outcome is `Pending`.
    pub fn get_condvar() -> Cond {
        Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()))
    }

    /// Spawns a thread named `name` that runs `f`.
    ///
    /// When `f` returns or panics, `outcome` is published to `cond`, unless another thread
    /// already published one. A panic in `f` is re-raised on the spawned thread, so it surfaces
    /// through [`thread::JoinHandle::join`].
    pub fn spawn_worker_thread<F, R>(
        name: &str,
        f: F,
        cond: Cond,
        outcome: WaitOutcome,
    ) -> io::Result<thread::JoinHandle<R>>
    where
        F: FnOnce() -> R,
        F: Send + 'static + panic::UnwindSafe,
        R: Send + 'static,
    {
        spawn_with_builder(thread::Builder::new().name(name.into()), f, cond, outcome)
    }

    /// Same as [`spawn_worker_thread`], but the thread gets a stack of `stack_size` bytes.
    pub fn spawn_worker_thread_with_stack_size<F, R>(
        name: &str,
        f: F,
        cond: Cond,
        outcome: WaitOutcome,
        stack_size: usize,
    ) -> io::Result<thread::JoinHandle<R>>
    where
        F: FnOnce() -> R,
        F: Send + 'static + panic::UnwindSafe,
        R: Send + 'static,
    {
        let builder = thread::Builder::new().name(name.into()).stack_size(stack_size);
        spawn_with_builder(builder, f, cond, outcome)
    }

    /// Spawns `f` on `builder`, publishing `outcome` to `cond` once `f` returns or panics.
    fn spawn_with_builder<F, R>(
        builder: thread::Builder,
        f: F,
        cond: Cond,
        outcome: WaitOutcome,
    ) -> io::Result<thread::JoinHandle<R>>
    where
        F: FnOnce() -> R + Send + 'static + panic::UnwindSafe,
        R: Send + 'static,
    {
        builder.spawn(move || cond_notify_on_done(f, cond, outcome))
    }

    /// Runs `f` and publishes `outcome` whether `f` returned or panicked.
    ///
    /// A panic is caught only long enough to notify waiters, then it is resumed.
    fn cond_notify_on_done<F, R>(f: F, cond: Cond, outcome: WaitOutcome) -> R
    where
        F: FnOnce() -> R,
        F: panic::UnwindSafe,
    {
        let result = panic::catch_unwind(f);
        cond_notify_all(cond, outcome);
        match result {
            Ok(value) => value,
            Err(payload) => panic::resume_unwind(payload),
        }
    }

    /// Publishes `outcome` and wakes all waiters, unless an outcome is already published.
    /// The first writer wins.
    fn cond_notify_all(cond: Cond, outcome: WaitOutcome) {
        let (lock, cvar) = &*cond;
        let mut flag = lock.lock().unwrap();
        if !flag.is_pending() {
            return
        }
        *flag = outcome;
        cvar.notify_all();
    }

    /// Blocks until some thread has published a non-pending outcome, then returns it.
    pub fn wait_for_threads(cond: Cond) -> WaitOutcome {
        let (lock, cvar) = &*cond;
        let guard = cvar.wait_while(lock.lock().unwrap(), |flag| flag.is_pending()).unwrap();
        *guard
    }

    /// Like [`wait_for_threads`], but gives up after `dur`.
    ///
    /// Returns `None` if the outcome is still pending at that point.
    #[cfg_attr(not(any(target_os = "linux", feature = "jemalloc-allocator")), allow(dead_code))]
    pub fn wait_for_threads_with_timeout(cond: &Cond, dur: Duration) -> Option<WaitOutcome> {
        let (lock, cvar) = &**cond;
        let (guard, wait_result) = cvar
            .wait_timeout_while(lock.lock().unwrap(), dur, |flag| flag.is_pending())
            .unwrap();
        if wait_result.timed_out() {
            None
        } else {
            Some(*guard)
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn get_condvar_should_be_pending() {
            let condvar = get_condvar();
            let outcome = *condvar.0.lock().unwrap();
            assert!(outcome.is_pending());
        }

        #[test]
        fn wait_for_threads_with_timeout_return_none_on_time_out() {
            let condvar = Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()));
            let outcome = wait_for_threads_with_timeout(&condvar, Duration::from_millis(100));
            assert!(outcome.is_none());
        }

        #[test]
        fn wait_for_threads_with_timeout_returns_outcome() {
            let condvar = Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()));
            let condvar2 = condvar.clone();
            cond_notify_all(condvar2, WaitOutcome::Finished);
            let outcome = wait_for_threads_with_timeout(&condvar, Duration::from_secs(2));
            assert_eq!(outcome, Some(WaitOutcome::Finished));
        }

        #[test]
        fn spawn_worker_thread_should_notify_on_done() {
            let condvar = Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()));
            let response =
                spawn_worker_thread("thread", || 2, condvar.clone(), WaitOutcome::TimedOut);
            let (lock, _) = &*condvar;
            let r = response.unwrap().join().unwrap();
            assert_eq!(r, 2);
            assert_eq!(*lock.lock().unwrap(), WaitOutcome::TimedOut);
        }

        #[test]
        fn spawn_worker_should_not_change_finished_outcome() {
            let condvar = Arc::new((Mutex::new(WaitOutcome::Finished), Condvar::new()));
            let response =
                spawn_worker_thread("thread", move || 2, condvar.clone(), WaitOutcome::TimedOut);
            let r = response.unwrap().join().unwrap();
            assert_eq!(r, 2);
            assert_eq!(*condvar.0.lock().unwrap(), WaitOutcome::Finished);
        }

        #[test]
        fn cond_notify_on_done_should_update_wait_outcome_when_panic() {
            let condvar = Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()));
            let err = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                cond_notify_on_done(|| panic!("test"), condvar.clone(), WaitOutcome::Finished)
            }));
            assert_eq!(*condvar.0.lock().unwrap(), WaitOutcome::Finished);
            assert!(err.is_err());
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::channel;

    /// With a zero CPU budget the monitor must report a timeout together with the consumed time.
    #[test]
    fn cpu_time_monitor_loop_should_return_time_elapsed() {
        let started = ProcessTime::now();
        let budget = Duration::ZERO;
        // Keep the sender alive so the receiver never reports `Disconnected`.
        let (_finished_tx, finished_rx) = channel();
        let outcome = cpu_time_monitor_loop(started, budget, finished_rx);
        assert!(outcome.is_some());
    }

    /// A finish signal that is already queued before the budget runs out yields `None`.
    #[test]
    fn cpu_time_monitor_loop_should_return_none() {
        let started = ProcessTime::now();
        let budget = Duration::from_secs(10);
        let (finished_tx, finished_rx) = channel();
        finished_tx.send(()).unwrap();
        let outcome = cpu_time_monitor_loop(started, budget, finished_rx);
        assert!(outcome.is_none());
    }
}