referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf_prepare_worker/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Contains the logic for preparing PVFs. Used by the polkadot-prepare-worker binary.
18
19mod memory_stats;
20
// Log target handed to the `gum` logging macros in this file.
//
// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
//       separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`.
const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
24
25#[cfg(target_os = "linux")]
26use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
27#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
28use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
29use codec::{Decode, Encode};
30use nix::{
31	errno::Errno,
32	sys::{
33		resource::{Usage, UsageWho},
34		wait::WaitStatus,
35	},
36	unistd::{ForkResult, Pid},
37};
38use polkadot_node_core_pvf_common::{
39	compute_checksum,
40	error::{PrepareError, PrepareWorkerResult},
41	executor_interface::{create_runtime_from_artifact_bytes, prepare, prevalidate},
42	framed_recv_blocking, framed_send_blocking,
43	prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess},
44	pvf::PvfPrepData,
45	worker::{
46		cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
47		send_result, stringify_errno, stringify_panic_payload,
48		thread::{self, spawn_worker_thread, WaitOutcome},
49		PipeFd, WorkerInfo, WorkerKind,
50	},
51	worker_dir, ProcessTime,
52};
53use polkadot_primitives::ExecutorParams;
54use std::{
55	fs,
56	io::{self, Read},
57	os::{
58		fd::{AsRawFd, FromRawFd, RawFd},
59		unix::net::UnixStream,
60	},
61	path::{Path, PathBuf},
62	process,
63	sync::{mpsc::channel, Arc},
64	time::Duration,
65};
66use tracking_allocator::TrackingAllocator;
67
// Global allocator used by the worker: a `TrackingAllocator` wrapping jemalloc on Linux or
// whenever the `jemalloc-allocator` feature is enabled. The tracking wrapper is what
// `start_memory_tracking` / `end_memory_tracking` below talk to.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[global_allocator]
static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
	TrackingAllocator(tikv_jemallocator::Jemalloc);

// Fallback for every other configuration: the same tracking wrapper around the system
// allocator.
#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
#[global_allocator]
static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
76
/// The number of threads running inside the child (job) process:
/// 1 - Main thread
/// 2 - CPU time monitor thread
/// 3 - Memory tracker thread
/// 4 - Prepare thread
///
/// NOTE: The correctness of this value is enforced by a test. If the number of threads inside
/// the child process changes in the future, this value must be changed as well.
pub const PREPARE_WORKER_THREAD_NUMBER: u32 = 4;
86
/// Contains the bytes for a successfully compiled artifact.
///
/// This is a thin newtype over the raw byte vector; the bytes are exposed read-only through
/// the `AsRef<[u8]>` implementation below.
#[derive(Encode, Decode)]
pub struct CompiledArtifact(Vec<u8>);

impl CompiledArtifact {
	/// Creates a `CompiledArtifact`.
	///
	/// Takes ownership of `code`, the already compiled artifact bytes; no validation is done.
	pub fn new(code: Vec<u8>) -> Self {
		Self(code)
	}
}

// Borrow the artifact bytes without copying them.
impl AsRef<[u8]> for CompiledArtifact {
	fn as_ref(&self) -> &[u8] {
		self.0.as_slice()
	}
}
103
/// The result of a successful artifact preparation, as produced by `prepare_artifact`.
#[derive(Encode, Decode)]
pub struct PrepareOutcome {
	/// The compiled artifact bytes.
	pub compiled_artifact: CompiledArtifact,
	/// Length in bytes of the validation code after decompression.
	pub observed_wasm_code_len: u32,
}
109
110/// Get a worker request.
111fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
112	let pvf = framed_recv_blocking(stream)?;
113	let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
114		io::Error::new(
115			io::ErrorKind::Other,
116			format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
117		)
118	})?;
119	Ok(pvf)
120}
121
/// Starts allocation tracking on the global allocator and installs an out-of-memory handler.
///
/// # Parameters
///
/// - `fd`: raw file descriptor the handler writes `OOM_PAYLOAD` to (and then closes) when it
///   fires. The caller in this file passes the write end of the job's response pipe.
///
/// - `limit`: forwarded unchanged to `ALLOC.start_tracking`. NOTE(review): presumably the
///   allocation limit in bytes whose violation triggers the handler — confirm against
///   `TrackingAllocator`.
///
/// The handler never returns: after writing the payload it terminates the whole process with
/// exit status 1.
fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
	unsafe {
		// SAFETY: Inside the failure handler, the allocator is locked and no allocations or
		// deallocations are possible. For Linux, that always holds for the code below, so it's
		// safe. For MacOS, that technically holds at the time of writing, but there are no future
		// guarantees.
		// The arguments of unsafe `libc` calls are valid, the payload validity is covered with
		// a test.
		ALLOC.start_tracking(
			limit,
			Some(Box::new(move || {
				#[cfg(target_os = "linux")]
				{
					// Syscalls never allocate or deallocate, so this is safe.
					libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
					libc::syscall(libc::SYS_close, fd);
					// Make sure we exit from all threads. Copied from glibc.
					libc::syscall(libc::SYS_exit_group, 1);
					// Fallback in case `exit_group` returned: keep exiting the current thread.
					loop {
						libc::syscall(libc::SYS_exit, 1);
					}
				}
				#[cfg(not(target_os = "linux"))]
				{
					// Syscalls are not available on MacOS, so we have to use `libc` wrappers.
					// Technically, there may be allocations inside, although they shouldn't be
					// there. In that case, we'll see deadlocks on MacOS after the OOM condition
					// triggered. As we consider running a validator on MacOS unsafe, and this
					// code is only run by a validator, it's a lesser evil.
					libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
					libc::close(fd);
					libc::_exit(1);
				}
			})),
		);
	}
}
159
/// Stops allocation tracking on the global allocator and returns what `ALLOC.end_tracking`
/// reports.
///
/// The only caller in this file treats the value as the peak tracked allocation in bytes and
/// tolerates it being negative.
fn end_memory_tracking() -> isize {
	ALLOC.end_tracking()
}
163
164/// The entrypoint that the spawned prepare worker should start with.
165///
166/// # Parameters
167///
168/// - `socket_path`: specifies the path to the socket used to communicate with the host.
169///
170/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
171///
172/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
173///   immediate worker termination. `None` is used for tests and in other situations when version
174///   check is not necessary.
175///
176/// - `worker_version`: see above
177///
178/// # Flow
179///
180/// This runs the following in a loop:
181///
182/// 1. Get the code and parameters for preparation from the host.
183///
184/// 2. Start a new child process
185///
186/// 3. Start the memory tracker and the actual preparation in two separate threads.
187///
188/// 4. Wait on the two threads created in step 3.
189///
190/// 5. Stop the memory tracker and get the stats.
191///
192/// 6. Pipe the result back to the parent process and exit from child process.
193///
194/// 7. If compilation succeeded, write the compiled artifact into a temporary file.
195///
196/// 8. Send the result of preparation back to the host, including the checksum of the artifact. If
197///    any error occurred in the above steps, we send that in the `PrepareWorkerResult`.
198pub fn worker_entrypoint(
199	socket_path: PathBuf,
200	worker_dir_path: PathBuf,
201	node_version: Option<&str>,
202	worker_version: Option<&str>,
203) {
204	run_worker(
205		WorkerKind::Prepare,
206		socket_path,
207		worker_dir_path,
208		node_version,
209		worker_version,
210		|mut stream, worker_info, security_status| {
211			let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_info.worker_dir_path);
212
213			loop {
214				let pvf = recv_request(&mut stream)?;
215				gum::debug!(
216					target: LOG_TARGET,
217					?worker_info,
218					?security_status,
219					"worker: preparing artifact",
220				);
221
222				let preparation_timeout = pvf.prep_timeout();
223				let prepare_job_kind = pvf.prep_kind();
224				let executor_params = pvf.executor_params();
225
226				let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?;
227
228				let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
229					Ok(usage) => usage,
230					Err(errno) => {
231						let result: PrepareWorkerResult =
232							Err(error_from_errno("getrusage before", errno));
233						send_result(&mut stream, result, worker_info)?;
234						continue
235					},
236				};
237
238				let stream_fd = stream.as_raw_fd();
239
240				cfg_if::cfg_if! {
241					if #[cfg(target_os = "linux")] {
242						let result = if security_status.can_do_secure_clone {
243							handle_clone(
244								&pvf,
245								pipe_write_fd,
246								pipe_read_fd,
247								stream_fd,
248								preparation_timeout,
249								prepare_job_kind,
250								&executor_params,
251								worker_info,
252								security_status.can_unshare_user_namespace_and_change_root,
253								&temp_artifact_dest,
254								usage_before,
255							)
256						} else {
257							// Fall back to using fork.
258							handle_fork(
259								&pvf,
260								pipe_write_fd,
261								pipe_read_fd,
262								stream_fd,
263								preparation_timeout,
264								prepare_job_kind,
265								&executor_params,
266								worker_info,
267								&temp_artifact_dest,
268								usage_before,
269							)
270						};
271					} else {
272						let result = handle_fork(
273							&pvf,
274							pipe_write_fd,
275							pipe_read_fd,
276							stream_fd,
277							preparation_timeout,
278							prepare_job_kind,
279							&executor_params,
280							worker_info,
281							&temp_artifact_dest,
282							usage_before,
283						);
284					}
285				}
286
287				gum::trace!(
288					target: LOG_TARGET,
289					?worker_info,
290					"worker: sending result to host: {:?}",
291					result
292				);
293				send_result(&mut stream, result, worker_info)?;
294			}
295		},
296	);
297}
298
299fn prepare_artifact(pvf: PvfPrepData) -> Result<PrepareOutcome, PrepareError> {
300	let maybe_compressed_code = pvf.maybe_compressed_code();
301	let raw_validation_code = sp_maybe_compressed_blob::decompress(
302		&maybe_compressed_code,
303		pvf.validation_code_bomb_limit() as usize,
304	)
305	.map_err(|e| PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?;
306	let observed_wasm_code_len = raw_validation_code.len() as u32;
307
308	let blob = match prevalidate(&raw_validation_code) {
309		Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
310		Ok(b) => b,
311	};
312
313	match prepare(blob, &pvf.executor_params()) {
314		Ok(compiled_artifact) => Ok(PrepareOutcome {
315			compiled_artifact: CompiledArtifact::new(compiled_artifact),
316			observed_wasm_code_len,
317		}),
318		Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
319	}
320}
321
322/// Try constructing the runtime to catch any instantiation errors during pre-checking.
323fn runtime_construction_check(
324	artifact_bytes: &[u8],
325	executor_params: &ExecutorParams,
326) -> Result<(), PrepareError> {
327	// SAFETY: We just compiled this artifact.
328	let result = unsafe { create_runtime_from_artifact_bytes(artifact_bytes, executor_params) };
329	result
330		.map(|_runtime| ())
331		.map_err(|err| PrepareError::RuntimeConstruction(format!("{:?}", err)))
332}
333
/// Success payload the job (child) process pipes back to the worker (parent) process.
#[derive(Encode, Decode)]
struct JobResponse {
	/// The compiled artifact bytes.
	artifact: CompiledArtifact,
	/// Memory statistics gathered in the job process while preparing.
	memory_stats: MemoryStats,
	/// Length in bytes of the validation code after decompression.
	observed_wasm_code_len: u32,
}
340
341#[cfg(target_os = "linux")]
342fn handle_clone(
343	pvf: &PvfPrepData,
344	pipe_write_fd: i32,
345	pipe_read_fd: i32,
346	stream_fd: i32,
347	preparation_timeout: Duration,
348	prepare_job_kind: PrepareJobKind,
349	executor_params: &Arc<ExecutorParams>,
350	worker_info: &WorkerInfo,
351	have_unshare_newuser: bool,
352	temp_artifact_dest: &Path,
353	usage_before: Usage,
354) -> Result<PrepareWorkerSuccess, PrepareError> {
355	use polkadot_node_core_pvf_common::worker::security;
356
357	// SAFETY: new process is spawned within a single threaded process. This invariant
358	// is enforced by tests. Stack size being specified to ensure child doesn't overflow
359	match unsafe {
360		security::clone::clone_on_worker(
361			worker_info,
362			have_unshare_newuser,
363			Box::new(|| {
364				handle_child_process(
365					pvf.clone(),
366					pipe_write_fd,
367					pipe_read_fd,
368					stream_fd,
369					preparation_timeout,
370					prepare_job_kind,
371					Arc::clone(&executor_params),
372				)
373			}),
374		)
375	} {
376		Ok(child) => handle_parent_process(
377			pipe_read_fd,
378			pipe_write_fd,
379			worker_info,
380			child,
381			temp_artifact_dest,
382			usage_before,
383			preparation_timeout,
384		),
385		Err(security::clone::Error::Clone(errno)) => Err(error_from_errno("clone", errno)),
386	}
387}
388
389fn handle_fork(
390	pvf: &PvfPrepData,
391	pipe_write_fd: i32,
392	pipe_read_fd: i32,
393	stream_fd: i32,
394	preparation_timeout: Duration,
395	prepare_job_kind: PrepareJobKind,
396	executor_params: &Arc<ExecutorParams>,
397	worker_info: &WorkerInfo,
398	temp_artifact_dest: &Path,
399	usage_before: Usage,
400) -> Result<PrepareWorkerSuccess, PrepareError> {
401	// SAFETY: new process is spawned within a single threaded process. This invariant
402	// is enforced by tests.
403	match unsafe { nix::unistd::fork() } {
404		Ok(ForkResult::Child) => handle_child_process(
405			pvf.clone(),
406			pipe_write_fd,
407			pipe_read_fd,
408			stream_fd,
409			preparation_timeout,
410			prepare_job_kind,
411			Arc::clone(executor_params),
412		),
413		Ok(ForkResult::Parent { child }) => handle_parent_process(
414			pipe_read_fd,
415			pipe_write_fd,
416			worker_info,
417			child,
418			temp_artifact_dest,
419			usage_before,
420			preparation_timeout,
421		),
422		Err(errno) => Err(error_from_errno("fork", errno)),
423	}
424}
425
426/// This is used to handle child process during pvf prepare worker.
427/// It prepares the artifact and tracks memory stats during preparation
428/// and pipes back the response to the parent process.
429///
430/// # Returns
431///
432/// - If any error occur, pipe response back with `PrepareError`.
433///
434/// - If success, pipe back `JobResponse`.
435fn handle_child_process(
436	pvf: PvfPrepData,
437	pipe_write_fd: i32,
438	pipe_read_fd: i32,
439	stream_fd: i32,
440	preparation_timeout: Duration,
441	prepare_job_kind: PrepareJobKind,
442	executor_params: Arc<ExecutorParams>,
443) -> ! {
444	// SAFETY: pipe_writer is an open and owned file descriptor at this point.
445	let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };
446
447	// Drop the read end so we don't have too many FDs open.
448	if let Err(errno) = nix::unistd::close(pipe_read_fd) {
449		send_child_response(
450			&mut pipe_write,
451			JobResult::Err(error_from_errno("closing pipe", errno)),
452		);
453	}
454
455	// Dropping the stream closes the underlying socket. We want to make sure
456	// that the sandboxed child can't get any kind of information from the
457	// outside world. The only IPC it should be able to do is sending its
458	// response over the pipe.
459	if let Err(errno) = nix::unistd::close(stream_fd) {
460		send_child_response(
461			&mut pipe_write,
462			JobResult::Err(error_from_errno("error closing stream", errno)),
463		);
464	}
465
466	let worker_job_pid = process::id();
467	gum::debug!(
468		target: LOG_TARGET,
469		%worker_job_pid,
470		?prepare_job_kind,
471		?preparation_timeout,
472		"worker job: preparing artifact",
473	);
474
475	// Conditional variable to notify us when a thread is done.
476	let condvar = thread::get_condvar();
477
478	// Run the memory tracker in a regular, non-worker thread.
479	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
480	let condvar_memory = Arc::clone(&condvar);
481	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
482	let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
483
484	start_memory_tracking(
485		pipe_write.as_raw_fd(),
486		executor_params.prechecking_max_memory().map(|v| {
487			v.try_into().unwrap_or_else(|_| {
488				gum::warn!(
489					LOG_TARGET,
490					%worker_job_pid,
491					"Illegal pre-checking max memory value {} discarded",
492					v,
493				);
494				0
495			})
496		}),
497	);
498
499	let cpu_time_start = ProcessTime::now();
500
501	// Spawn a new thread that runs the CPU time monitor.
502	let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
503	let cpu_time_monitor_thread = thread::spawn_worker_thread(
504		"cpu time monitor thread",
505		move || cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx),
506		Arc::clone(&condvar),
507		WaitOutcome::TimedOut,
508	)
509	.unwrap_or_else(|err| {
510		send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
511	});
512
513	let prepare_thread = spawn_worker_thread(
514		"prepare worker",
515		move || {
516			#[allow(unused_mut)]
517			let mut result = prepare_artifact(pvf).map(|o| (o,));
518
519			// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
520			#[cfg(target_os = "linux")]
521			let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread()));
522
523			// If we are pre-checking, check for runtime construction errors.
524			//
525			// As pre-checking is more strict than just preparation in terms of memory
526			// and time, it is okay to do extra checks here. This takes negligible time
527			// anyway.
528			if let PrepareJobKind::Prechecking = prepare_job_kind {
529				result = result.and_then(|output| {
530					runtime_construction_check(
531						output.0.compiled_artifact.as_ref(),
532						&executor_params,
533					)?;
534					Ok(output)
535				});
536			}
537			result
538		},
539		Arc::clone(&condvar),
540		WaitOutcome::Finished,
541	)
542	.unwrap_or_else(|err| {
543		send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
544	});
545
546	let outcome = thread::wait_for_threads(condvar);
547
548	let peak_alloc = {
549		let peak = end_memory_tracking();
550		gum::debug!(
551			target: LOG_TARGET,
552			%worker_job_pid,
553			"prepare job peak allocation is {} bytes",
554			peak,
555		);
556		peak
557	};
558
559	let result = match outcome {
560		WaitOutcome::Finished => {
561			let _ = cpu_time_monitor_tx.send(());
562
563			match prepare_thread.join().unwrap_or_else(|err| {
564				send_child_response(
565					&mut pipe_write,
566					Err(PrepareError::JobError(stringify_panic_payload(err))),
567				)
568			}) {
569				Err(err) => Err(err),
570				Ok(ok) => {
571					cfg_if::cfg_if! {
572						if #[cfg(target_os = "linux")] {
573							let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok;
574						} else {
575							let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok;
576						}
577					}
578
579					// Stop the memory stats worker and get its observed memory stats.
580					#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
581					let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, process::id());
582
583					let memory_stats = MemoryStats {
584						#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
585						memory_tracker_stats,
586						#[cfg(target_os = "linux")]
587						max_rss: extract_max_rss_stat(max_rss, process::id()),
588						// Negative peak allocation values are legit; they are narrow
589						// corner cases and shouldn't affect overall statistics
590						// significantly
591						peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 },
592					};
593
594					Ok(JobResponse {
595						artifact: compiled_artifact,
596						observed_wasm_code_len,
597						memory_stats,
598					})
599				},
600			}
601		},
602
603		// If the CPU thread is not selected, we signal it to end, the join handle is
604		// dropped and the thread will finish in the background.
605		WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
606			Ok(Some(_cpu_time_elapsed)) => Err(PrepareError::TimedOut),
607			Ok(None) => Err(PrepareError::IoErr("error communicating over closed channel".into())),
608			Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
609		},
610		WaitOutcome::Pending =>
611			unreachable!("we run wait_while until the outcome is no longer pending; qed"),
612	};
613
614	send_child_response(&mut pipe_write, result);
615}
616
617/// Waits for child process to finish and handle child response from pipe.
618///
619/// # Returns
620///
621/// - If the child send response without an error, this function returns `Ok(PrepareStats)`
622///   containing memory and CPU usage statistics.
623///
624/// - If the child send response with an error, it returns a `PrepareError` with that error.
625///
626/// - If the child process timeout, it returns `PrepareError::TimedOut`.
627fn handle_parent_process(
628	pipe_read_fd: i32,
629	pipe_write_fd: i32,
630	worker_info: &WorkerInfo,
631	job_pid: Pid,
632	temp_artifact_dest: &Path,
633	usage_before: Usage,
634	timeout: Duration,
635) -> Result<PrepareWorkerSuccess, PrepareError> {
636	// the read end will wait until all write ends have been closed,
637	// this drop is necessary to avoid deadlock
638	if let Err(errno) = nix::unistd::close(pipe_write_fd) {
639		return Err(error_from_errno("closing pipe write fd", errno));
640	};
641
642	// SAFETY: this is an open and owned file descriptor at this point.
643	let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
644
645	// Read from the child. Don't decode unless the process exited normally, which we check later.
646	let mut received_data = Vec::new();
647	pipe_read
648		.read_to_end(&mut received_data)
649		.map_err(|err| PrepareError::IoErr(err.to_string()))?;
650
651	let status = nix::sys::wait::waitpid(job_pid, None);
652	gum::trace!(
653		target: LOG_TARGET,
654		?worker_info,
655		%job_pid,
656		"prepare worker received wait status from job: {:?}",
657		status,
658	);
659
660	let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
661		.map_err(|errno| error_from_errno("getrusage after", errno))?;
662
663	// Using `getrusage` is needed to check whether child has timedout since we cannot rely on
664	// child to report its own time.
665	// As `getrusage` returns resource usage from all terminated child processes,
666	// it is necessary to subtract the usage before the current child process to isolate its cpu
667	// time
668	let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
669	if cpu_tv >= timeout {
670		gum::warn!(
671			target: LOG_TARGET,
672			?worker_info,
673			%job_pid,
674			"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
675			cpu_tv.as_millis(),
676			timeout.as_millis(),
677		);
678		return Err(PrepareError::TimedOut)
679	}
680
681	match status {
682		Ok(WaitStatus::Exited(_pid, exit_status)) => {
683			let mut reader = io::BufReader::new(received_data.as_slice());
684			let result = recv_child_response(&mut reader, "prepare")
685				.map_err(|err| PrepareError::JobError(err.to_string()))?;
686
687			match result {
688				Err(err) => Err(err),
689				Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => {
690					// The exit status should have been zero if no error occurred.
691					if exit_status != 0 {
692						return Err(PrepareError::JobError(format!(
693							"unexpected exit status: {}",
694							exit_status
695						)))
696					}
697
698					// Write the serialized artifact into a temp file.
699					//
700					// PVF host only keeps artifacts statuses in its memory,
701					// successfully compiled code gets stored on the disk (and
702					// consequently deserialized by execute-workers). The prepare worker
703					// is only required to send `Ok` to the pool to indicate the
704					// success.
705					gum::debug!(
706						target: LOG_TARGET,
707						?worker_info,
708						%job_pid,
709						"worker: writing artifact to {}",
710						temp_artifact_dest.display(),
711					);
712					// Write to the temp file created by the host.
713					if let Err(err) = fs::write(temp_artifact_dest, &artifact) {
714						return Err(PrepareError::IoErr(err.to_string()))
715					};
716
717					let checksum = compute_checksum(&artifact.as_ref());
718					Ok(PrepareWorkerSuccess {
719						checksum,
720						stats: PrepareStats {
721							memory_stats,
722							cpu_time_elapsed: cpu_tv,
723							observed_wasm_code_len,
724						},
725					})
726				},
727			}
728		},
729		// The job was killed by the given signal.
730		//
731		// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
732		// other reason, so we still need to check for seccomp violations elsewhere.
733		Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Err(PrepareError::JobDied {
734			err: format!("received signal: {signal:?}"),
735			job_pid: job_pid.as_raw(),
736		}),
737		Err(errno) => Err(error_from_errno("waitpid", errno)),
738
739		// An attacker can make the child process return any exit status it wants. So we can treat
740		// all unexpected cases the same way.
741		Ok(unexpected_wait_status) => Err(PrepareError::JobDied {
742			err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
743			job_pid: job_pid.as_raw(),
744		}),
745	}
746}
747
748/// Write a job response to the pipe and exit process after.
749///
750/// # Arguments
751///
752/// - `pipe_write`: A `PipeFd` structure, the writing end of a pipe.
753///
754/// - `response`: Child process response
755fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
756	framed_send_blocking(pipe_write, response.encode().as_slice())
757		.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
758
759	if response.is_ok() {
760		process::exit(libc::EXIT_SUCCESS)
761	} else {
762		process::exit(libc::EXIT_FAILURE)
763	}
764}
765
766fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {
767	PrepareError::Kernel(stringify_errno(context, errno))
768}
769
/// What the job (child) process sends to the worker (parent) process over the pipe.
type JobResult = Result<JobResponse, PrepareError>;
771
/// Pre-encoded length-prefixed `JobResult::Err(PrepareError::OutOfMemory)`
///
/// Layout: an 8-byte little-endian length (`2`) followed by the two bytes of the encoded
/// value. It is kept as a constant because the out-of-memory handler that writes it cannot
/// allocate, so it cannot encode the value on the spot. The `pre_encoded_payloads` test
/// checks that the bytes stay in sync with the real encoding.
const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
774
/// Guards `OOM_PAYLOAD` against drifting from the real encoding of the OOM job result.
#[test]
fn pre_encoded_payloads() {
	// NOTE: This must match the type of `response` in `send_child_response`.
	let oom_response: JobResult = Err(PrepareError::OutOfMemory);
	let encoded_body = oom_response.encode();

	// `framed_send` prefixes the payload with its length.
	let length_prefix = encoded_body.len().to_le_bytes();
	let framed = [length_prefix.as_slice(), encoded_body.as_slice()].concat();

	assert_eq!(framed, OOM_PAYLOAD);
}