referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf_execute_worker/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.
18
19#![deny(unused_crate_dependencies)]
20#![warn(missing_docs)]
21
22pub use polkadot_node_core_pvf_common::{
23	error::ExecuteError, executor_interface::execute_artifact,
24};
25use polkadot_parachain_primitives::primitives::ValidationParams;
26
// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
//       separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
/// Log target passed to the `gum` logging macros throughout this file.
const LOG_TARGET: &str = "parachain::pvf-execute-worker";
30
31use codec::{Decode, Encode};
32use cpu_time::ProcessTime;
33use nix::{
34	errno::Errno,
35	sys::{
36		resource::{Usage, UsageWho},
37		wait::WaitStatus,
38	},
39	unistd::{ForkResult, Pid},
40};
41use polkadot_node_core_pvf_common::{
42	compute_checksum,
43	error::InternalValidationError,
44	execute::{
45		ExecuteRequest, Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse,
46	},
47	executor_interface::params_to_wasmtime_semantics,
48	framed_recv_blocking, framed_send_blocking,
49	worker::{
50		cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
51		send_result, stringify_errno, stringify_panic_payload,
52		thread::{self, WaitOutcome},
53		PipeFd, WorkerInfo, WorkerKind,
54	},
55	worker_dir, ArtifactChecksum,
56};
57use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};
58use polkadot_parachain_primitives::primitives::ValidationResult;
59use polkadot_primitives::{ExecutorParams, PersistedValidationData};
60use std::{
61	io::{self, Read},
62	os::{
63		fd::{AsRawFd, FromRawFd},
64		unix::net::UnixStream,
65	},
66	path::PathBuf,
67	process,
68	sync::{mpsc::channel, Arc},
69	time::Duration,
70};
71
/// The number of threads running inside the child (job) process:
/// 1 - Main thread
/// 2 - Cpu monitor thread
/// 3 - Execute thread
///
/// NOTE: The correctness of this value is enforced by a test. If the number of threads inside
/// the child process changes in the future, this value must be changed as well.
pub const EXECUTE_WORKER_THREAD_NUMBER: u32 = 3;
80
81/// Receives a handshake with information specific to the execute worker.
82fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
83	let handshake_enc = framed_recv_blocking(stream)?;
84	let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
85		io::Error::new(
86			io::ErrorKind::Other,
87			"execute pvf recv_execute_handshake: failed to decode Handshake".to_owned(),
88		)
89	})?;
90	Ok(handshake)
91}
92
93fn recv_request(
94	stream: &mut UnixStream,
95) -> io::Result<(PersistedValidationData, PoV, Duration, ArtifactChecksum)> {
96	let request_bytes = framed_recv_blocking(stream)?;
97	let request = ExecuteRequest::decode(&mut &request_bytes[..]).map_err(|_| {
98		io::Error::new(
99			io::ErrorKind::Other,
100			"execute pvf recv_request: failed to decode ExecuteRequest".to_string(),
101		)
102	})?;
103
104	Ok((request.pvd, request.pov, request.execution_timeout, request.artifact_checksum))
105}
106
/// Reports an error to the host and evaluates to an `io::Error` describing it.
///
/// `$err_constructor` is applied to the stringified `$error`, and the result is converted into
/// a `WorkerError`. That worker error is sent over `$stream` (best effort: a failure to send
/// is ignored), and its textual form becomes the message of the resulting `io::Error`.
macro_rules! map_and_send_err {
	($error:expr, $err_constructor:expr, $stream:expr, $worker_info:expr) => {{
		let worker_err: WorkerError = $err_constructor($error.to_string()).into();
		// Render the message first: the worker error is moved into `send_result` below.
		let wrapped = io::Error::new(io::ErrorKind::Other, worker_err.to_string());
		let _ = send_result::<WorkerResponse, WorkerError>(
			$stream,
			Err(worker_err),
			$worker_info,
		);
		wrapped
	}};
}
116
117/// The entrypoint that the spawned execute worker should start with.
118///
119/// # Parameters
120///
121/// - `socket_path`: specifies the path to the socket used to communicate with the host.
122///
123/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
124///
125/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
126///   immediate worker termination. `None` is used for tests and in other situations when version
127///   check is not necessary.
128///
129/// - `worker_version`: see above
130pub fn worker_entrypoint(
131	socket_path: PathBuf,
132	worker_dir_path: PathBuf,
133	node_version: Option<&str>,
134	worker_version: Option<&str>,
135) {
136	run_worker(
137		WorkerKind::Execute,
138		socket_path,
139		worker_dir_path,
140		node_version,
141		worker_version,
142		|mut stream, worker_info, security_status| {
143			let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path);
144
145			let Handshake { executor_params } =
146				recv_execute_handshake(&mut stream).map_err(|e| {
147					map_and_send_err!(
148						e,
149						InternalValidationError::HostCommunication,
150						&mut stream,
151						worker_info
152					)
153				})?;
154
155			let executor_params: Arc<ExecutorParams> = Arc::new(executor_params);
156			let execute_thread_stack_size = max_stack_size(&executor_params);
157
158			loop {
159				let (pvd, pov, execution_timeout, artifact_checksum) = recv_request(&mut stream)
160					.map_err(|e| {
161						map_and_send_err!(
162							e,
163							InternalValidationError::HostCommunication,
164							&mut stream,
165							worker_info
166						)
167					})?;
168				gum::debug!(
169					target: LOG_TARGET,
170					?worker_info,
171					?security_status,
172					"worker: validating artifact {}",
173					artifact_path.display(),
174				);
175
176				// Get the artifact bytes.
177				let compiled_artifact_blob = std::fs::read(&artifact_path).map_err(|e| {
178					map_and_send_err!(
179						e,
180						InternalValidationError::CouldNotOpenFile,
181						&mut stream,
182						worker_info
183					)
184				})?;
185
186				if artifact_checksum != compute_checksum(&compiled_artifact_blob) {
187					send_result::<WorkerResponse, WorkerError>(
188						&mut stream,
189						Ok(WorkerResponse {
190							job_response: JobResponse::CorruptedArtifact,
191							duration: Duration::ZERO,
192							pov_size: 0,
193						}),
194						worker_info,
195					)?;
196					continue;
197				}
198
199				let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| {
200					map_and_send_err!(
201						e,
202						InternalValidationError::CouldNotCreatePipe,
203						&mut stream,
204						worker_info
205					)
206				})?;
207
208				let usage_before = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
209					.map_err(|errno| {
210						let e = stringify_errno("getrusage before", errno);
211						map_and_send_err!(
212							e,
213							InternalValidationError::Kernel,
214							&mut stream,
215							worker_info
216						)
217					})?;
218				let stream_fd = stream.as_raw_fd();
219
220				let compiled_artifact_blob = Arc::new(compiled_artifact_blob);
221
222				let raw_block_data =
223					match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
224						Ok(data) => data,
225						Err(_) => {
226							send_result::<WorkerResponse, WorkerError>(
227								&mut stream,
228								Ok(WorkerResponse {
229									job_response: JobResponse::PoVDecompressionFailure,
230									duration: Duration::ZERO,
231									pov_size: 0,
232								}),
233								worker_info,
234							)?;
235							continue;
236						},
237					};
238
239				let pov_size = raw_block_data.len() as u32;
240
241				let params = ValidationParams {
242					parent_head: pvd.parent_head.clone(),
243					block_data: BlockData(raw_block_data.to_vec()),
244					relay_parent_number: pvd.relay_parent_number,
245					relay_parent_storage_root: pvd.relay_parent_storage_root,
246				};
247				let params = Arc::new(params.encode());
248
249				cfg_if::cfg_if! {
250					if #[cfg(target_os = "linux")] {
251						let result = if security_status.can_do_secure_clone {
252							handle_clone(
253								pipe_write_fd,
254								pipe_read_fd,
255								stream_fd,
256								&compiled_artifact_blob,
257								&executor_params,
258								&params,
259								execution_timeout,
260								execute_thread_stack_size,
261								worker_info,
262								security_status.can_unshare_user_namespace_and_change_root,
263								usage_before,
264								pov_size,
265							)?
266						} else {
267							// Fall back to using fork.
268							handle_fork(
269								pipe_write_fd,
270								pipe_read_fd,
271								stream_fd,
272								&compiled_artifact_blob,
273								&executor_params,
274								&params,
275								execution_timeout,
276								execute_thread_stack_size,
277								worker_info,
278								usage_before,
279								pov_size,
280							)?
281						};
282					} else {
283						let result = handle_fork(
284							pipe_write_fd,
285							pipe_read_fd,
286							stream_fd,
287							&compiled_artifact_blob,
288							&executor_params,
289							&params,
290							execution_timeout,
291							execute_thread_stack_size,
292							worker_info,
293							usage_before,
294							pov_size,
295						)?;
296					}
297				}
298
299				gum::trace!(
300					target: LOG_TARGET,
301					?worker_info,
302					"worker: sending result to host: {:?}",
303					result
304				);
305				send_result(&mut stream, result, worker_info)?;
306			}
307		},
308	);
309}
310
311fn validate_using_artifact(
312	compiled_artifact_blob: &[u8],
313	executor_params: &ExecutorParams,
314	params: &[u8],
315) -> JobResponse {
316	let descriptor_bytes = match unsafe {
317		// SAFETY: this should be safe since the compiled artifact passed here comes from the
318		//         file created by the prepare workers. These files are obtained by calling
319		//         [`executor_interface::prepare`].
320		execute_artifact(compiled_artifact_blob, executor_params, params)
321	} {
322		Err(ExecuteError::RuntimeConstruction(wasmerr)) =>
323			return JobResponse::runtime_construction("execute", &wasmerr.to_string()),
324		Err(err) => return JobResponse::format_invalid("execute", &err.to_string()),
325		Ok(d) => d,
326	};
327
328	let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
329		Err(err) =>
330			return JobResponse::format_invalid(
331				"validation result decoding failed",
332				&err.to_string(),
333			),
334		Ok(r) => r,
335	};
336
337	JobResponse::Ok { result_descriptor }
338}
339
340#[cfg(target_os = "linux")]
341fn handle_clone(
342	pipe_write_fd: i32,
343	pipe_read_fd: i32,
344	stream_fd: i32,
345	compiled_artifact_blob: &Arc<Vec<u8>>,
346	executor_params: &Arc<ExecutorParams>,
347	params: &Arc<Vec<u8>>,
348	execution_timeout: Duration,
349	execute_stack_size: usize,
350	worker_info: &WorkerInfo,
351	have_unshare_newuser: bool,
352	usage_before: Usage,
353	pov_size: u32,
354) -> io::Result<Result<WorkerResponse, WorkerError>> {
355	use polkadot_node_core_pvf_common::worker::security;
356
357	// SAFETY: new process is spawned within a single threaded process. This invariant
358	// is enforced by tests. Stack size being specified to ensure child doesn't overflow
359	match unsafe {
360		security::clone::clone_on_worker(
361			worker_info,
362			have_unshare_newuser,
363			Box::new(|| {
364				handle_child_process(
365					pipe_write_fd,
366					pipe_read_fd,
367					stream_fd,
368					Arc::clone(compiled_artifact_blob),
369					Arc::clone(executor_params),
370					Arc::clone(params),
371					execution_timeout,
372					execute_stack_size,
373				)
374			}),
375		)
376	} {
377		Ok(child) => handle_parent_process(
378			pipe_read_fd,
379			pipe_write_fd,
380			worker_info,
381			child,
382			usage_before,
383			pov_size,
384			execution_timeout,
385		),
386		Err(security::clone::Error::Clone(errno)) =>
387			Ok(Err(internal_error_from_errno("clone", errno))),
388	}
389}
390
391fn handle_fork(
392	pipe_write_fd: i32,
393	pipe_read_fd: i32,
394	stream_fd: i32,
395	compiled_artifact_blob: &Arc<Vec<u8>>,
396	executor_params: &Arc<ExecutorParams>,
397	params: &Arc<Vec<u8>>,
398	execution_timeout: Duration,
399	execute_worker_stack_size: usize,
400	worker_info: &WorkerInfo,
401	usage_before: Usage,
402	pov_size: u32,
403) -> io::Result<Result<WorkerResponse, WorkerError>> {
404	// SAFETY: new process is spawned within a single threaded process. This invariant
405	// is enforced by tests.
406	match unsafe { nix::unistd::fork() } {
407		Ok(ForkResult::Child) => handle_child_process(
408			pipe_write_fd,
409			pipe_read_fd,
410			stream_fd,
411			Arc::clone(compiled_artifact_blob),
412			Arc::clone(executor_params),
413			Arc::clone(params),
414			execution_timeout,
415			execute_worker_stack_size,
416		),
417		Ok(ForkResult::Parent { child }) => handle_parent_process(
418			pipe_read_fd,
419			pipe_write_fd,
420			worker_info,
421			child,
422			usage_before,
423			pov_size,
424			execution_timeout,
425		),
426		Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))),
427	}
428}
429
430/// This is used to handle child process during pvf execute worker.
431/// It executes the artifact and pipes back the response to the parent process.
432///
433/// # Returns
434///
435/// - pipe back `JobResponse` to the parent process.
436fn handle_child_process(
437	pipe_write_fd: i32,
438	pipe_read_fd: i32,
439	stream_fd: i32,
440	compiled_artifact_blob: Arc<Vec<u8>>,
441	executor_params: Arc<ExecutorParams>,
442	params: Arc<Vec<u8>>,
443	execution_timeout: Duration,
444	execute_thread_stack_size: usize,
445) -> ! {
446	// SAFETY: this is an open and owned file descriptor at this point.
447	let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };
448
449	// Drop the read end so we don't have too many FDs open.
450	if let Err(errno) = nix::unistd::close(pipe_read_fd) {
451		send_child_response(&mut pipe_write, job_error_from_errno("closing pipe", errno));
452	}
453
454	// Dropping the stream closes the underlying socket. We want to make sure
455	// that the sandboxed child can't get any kind of information from the
456	// outside world. The only IPC it should be able to do is sending its
457	// response over the pipe.
458	if let Err(errno) = nix::unistd::close(stream_fd) {
459		send_child_response(&mut pipe_write, job_error_from_errno("closing stream", errno));
460	}
461
462	gum::debug!(
463		target: LOG_TARGET,
464		worker_job_pid = %process::id(),
465		"worker job: executing artifact",
466	);
467
468	// Conditional variable to notify us when a thread is done.
469	let condvar = thread::get_condvar();
470	let cpu_time_start = ProcessTime::now();
471
472	// Spawn a new thread that runs the CPU time monitor.
473	let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
474	let cpu_time_monitor_thread = thread::spawn_worker_thread(
475		"cpu time monitor thread",
476		move || cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx),
477		Arc::clone(&condvar),
478		WaitOutcome::TimedOut,
479	)
480	.unwrap_or_else(|err| {
481		send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
482	});
483
484	let execute_thread = thread::spawn_worker_thread_with_stack_size(
485		"execute thread",
486		move || validate_using_artifact(&compiled_artifact_blob, &executor_params, &params),
487		Arc::clone(&condvar),
488		WaitOutcome::Finished,
489		execute_thread_stack_size,
490	)
491	.unwrap_or_else(|err| {
492		send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
493	});
494
495	let outcome = thread::wait_for_threads(condvar);
496
497	let response = match outcome {
498		WaitOutcome::Finished => {
499			let _ = cpu_time_monitor_tx.send(());
500			execute_thread.join().map_err(|e| JobError::Panic(stringify_panic_payload(e)))
501		},
502		// If the CPU thread is not selected, we signal it to end, the join handle is
503		// dropped and the thread will finish in the background.
504		WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
505			Ok(Some(_cpu_time_elapsed)) => Err(JobError::TimedOut),
506			Ok(None) => Err(JobError::CpuTimeMonitorThread(
507				"error communicating over finished channel".into(),
508			)),
509			Err(e) => Err(JobError::CpuTimeMonitorThread(stringify_panic_payload(e))),
510		},
511		WaitOutcome::Pending =>
512			unreachable!("we run wait_while until the outcome is no longer pending; qed"),
513	};
514
515	send_child_response(&mut pipe_write, response);
516}
517
518/// Returns stack size based on the number of threads.
519/// The stack size is represented by 2MiB * number_of_threads + native stack;
520///
521/// # Background
522///
523/// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
524/// That native code does not create any stacks and just reuses the stack of the thread that
525/// wasmtime was invoked from.
526///
527/// Also, we configure the executor to provide the deterministic stack and that requires
528/// supplying the amount of the native stack space that wasm is allowed to use. This is
529/// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
530///
531/// There are quirks to that configuration knob:
532///
533/// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check that
534///    the stack space is actually available.
535///
536///    That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
537///    more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
538///    guard page and the Rust stack overflow handler will be triggered. That leads to an
539///    **abort**.
540///
541/// 2. It cannot and does not limit the stack space consumed by Rust code.
542///
543///    Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
544///    will abort and that will abort the process as well.
545///
546/// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
547/// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
548/// DEFAULT_NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
549///
550/// Hence we need to increase it. The simplest way to fix that is to spawn an execute thread with
551/// the desired stack limit. We must also make sure the job process has enough stack for *all* its
552/// threads. This function can be used to get the stack size of either the execute thread or execute
553/// job process.
554fn max_stack_size(executor_params: &ExecutorParams) -> usize {
555	let (_sem, deterministic_stack_limit) = params_to_wasmtime_semantics(executor_params);
556	return (2 * 1024 * 1024 + deterministic_stack_limit.native_stack_max) as usize;
557}
558
559/// Waits for child process to finish and handle child response from pipe.
560///
561/// # Returns
562///
563/// - The response, either `Ok` or some error state.
564fn handle_parent_process(
565	pipe_read_fd: i32,
566	pipe_write_fd: i32,
567	worker_info: &WorkerInfo,
568	job_pid: Pid,
569	usage_before: Usage,
570	pov_size: u32,
571	timeout: Duration,
572) -> io::Result<Result<WorkerResponse, WorkerError>> {
573	// the read end will wait until all write ends have been closed,
574	// this drop is necessary to avoid deadlock
575	if let Err(errno) = nix::unistd::close(pipe_write_fd) {
576		return Ok(Err(internal_error_from_errno("closing pipe write fd", errno)));
577	};
578
579	// SAFETY: pipe_read_fd is an open and owned file descriptor at this point.
580	let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
581
582	// Read from the child. Don't decode unless the process exited normally, which we check later.
583	let mut received_data = Vec::new();
584	pipe_read
585		.read_to_end(&mut received_data)
586		// Could not decode job response. There is either a bug or the job was hijacked.
587		// Should retry at any rate.
588		.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
589
590	let status = nix::sys::wait::waitpid(job_pid, None);
591	gum::trace!(
592		target: LOG_TARGET,
593		?worker_info,
594		%job_pid,
595		"execute worker received wait status from job: {:?}",
596		status,
597	);
598
599	let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
600		Ok(usage) => usage,
601		Err(errno) => return Ok(Err(internal_error_from_errno("getrusage after", errno))),
602	};
603
604	// Using `getrusage` is needed to check whether child has timedout since we cannot rely on
605	// child to report its own time.
606	// As `getrusage` returns resource usage from all terminated child processes,
607	// it is necessary to subtract the usage before the current child process to isolate its cpu
608	// time
609	let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
610	if cpu_tv >= timeout {
611		gum::warn!(
612			target: LOG_TARGET,
613			?worker_info,
614			%job_pid,
615			"execute job took {}ms cpu time, exceeded execute timeout {}ms",
616			cpu_tv.as_millis(),
617			timeout.as_millis(),
618		);
619		return Ok(Err(WorkerError::JobTimedOut))
620	}
621
622	match status {
623		Ok(WaitStatus::Exited(_, exit_status)) => {
624			let mut reader = io::BufReader::new(received_data.as_slice());
625			let result = recv_child_response(&mut reader, "execute")?;
626
627			match result {
628				Ok(job_response) => {
629					// The exit status should have been zero if no error occurred.
630					if exit_status != 0 {
631						return Ok(Err(WorkerError::JobError(JobError::UnexpectedExitStatus(
632							exit_status,
633						))));
634					}
635
636					Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv }))
637				},
638				Err(job_error) => {
639					gum::warn!(
640						target: LOG_TARGET,
641						?worker_info,
642						%job_pid,
643						"execute job error: {}",
644						job_error,
645					);
646					if matches!(job_error, JobError::TimedOut) {
647						Ok(Err(WorkerError::JobTimedOut))
648					} else {
649						Ok(Err(WorkerError::JobError(job_error.into())))
650					}
651				},
652			}
653		},
654		// The job was killed by the given signal.
655		//
656		// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
657		// other reason, so we still need to check for seccomp violations elsewhere.
658		Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(Err(WorkerError::JobDied {
659			err: format!("received signal: {signal:?}"),
660			job_pid: job_pid.as_raw(),
661		})),
662		Err(errno) => Ok(Err(internal_error_from_errno("waitpid", errno))),
663
664		// It is within an attacker's power to send an unexpected exit status. So we cannot treat
665		// this as an internal error (which would make us abstain), but must vote against.
666		Ok(unexpected_wait_status) => Ok(Err(WorkerError::JobDied {
667			err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
668			job_pid: job_pid.as_raw(),
669		})),
670	}
671}
672
673/// Write a job response to the pipe and exit process after.
674///
675/// # Arguments
676///
677/// - `pipe_write`: A `PipeFd` structure, the writing end of a pipe.
678///
679/// - `response`: Child process response
680fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
681	framed_send_blocking(pipe_write, response.encode().as_slice())
682		.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
683
684	if response.is_ok() {
685		process::exit(libc::EXIT_SUCCESS)
686	} else {
687		process::exit(libc::EXIT_FAILURE)
688	}
689}
690
691fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerError {
692	WorkerError::InternalError(InternalValidationError::Kernel(stringify_errno(context, errno)))
693}
694
695fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult {
696	Err(JobError::Kernel(stringify_errno(context, errno)))
697}