1mod memory_stats;
20
/// Log target passed as `target:` to the `gum` logging macros in this file.
const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
24
25#[cfg(target_os = "linux")]
26use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
27#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
28use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
29use codec::{Decode, Encode};
30use nix::{
31 errno::Errno,
32 sys::{
33 resource::{Usage, UsageWho},
34 wait::WaitStatus,
35 },
36 unistd::{ForkResult, Pid},
37};
38use polkadot_node_core_pvf_common::{
39 compute_checksum,
40 error::{PrepareError, PrepareWorkerResult},
41 executor_interface::{create_runtime_from_artifact_bytes, prepare, prevalidate},
42 framed_recv_blocking, framed_send_blocking,
43 prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess},
44 pvf::PvfPrepData,
45 worker::{
46 cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
47 send_result, stringify_errno, stringify_panic_payload,
48 thread::{self, spawn_worker_thread, WaitOutcome},
49 PipeFd, WorkerInfo, WorkerKind,
50 },
51 worker_dir, ProcessTime,
52};
53use polkadot_primitives::ExecutorParams;
54use std::{
55 fs,
56 io::{self, Read},
57 os::{
58 fd::{AsRawFd, FromRawFd, RawFd},
59 unix::net::UnixStream,
60 },
61 path::{Path, PathBuf},
62 process,
63 sync::{mpsc::channel, Arc},
64 time::Duration,
65};
66use tracking_allocator::TrackingAllocator;
67
// Global allocator: a `TrackingAllocator` wrapper around the real allocator. Memory tracking is
// switched on and off through this static (see `start_memory_tracking`/`end_memory_tracking`).
//
// On Linux, or whenever the `jemalloc-allocator` feature is enabled, the wrapped allocator is
// jemalloc.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[global_allocator]
static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
    TrackingAllocator(tikv_jemallocator::Jemalloc);

// Everywhere else the wrapped allocator is the system allocator.
#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
#[global_allocator]
static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
76
/// Number of threads a prepare job process is expected to run.
///
/// `handle_child_process` runs on the main thread and spawns up to three more: the memory
/// tracker thread (only on Linux or with `jemalloc-allocator`), the CPU time monitor thread and
/// the prepare thread.
///
/// NOTE(review): presumably this constant must be kept in sync with that thread count; the code
/// consuming it is not part of this file section — confirm before changing either.
pub const PREPARE_WORKER_THREAD_NUMBER: u32 = 4;
86
87#[derive(Encode, Decode)]
89pub struct CompiledArtifact(Vec<u8>);
90
91impl CompiledArtifact {
92 pub fn new(code: Vec<u8>) -> Self {
94 Self(code)
95 }
96}
97
98impl AsRef<[u8]> for CompiledArtifact {
99 fn as_ref(&self) -> &[u8] {
100 self.0.as_slice()
101 }
102}
103
/// Result of a successful `prepare_artifact` run.
#[derive(Encode, Decode)]
pub struct PrepareOutcome {
    /// The artifact produced by compiling the validation code.
    pub compiled_artifact: CompiledArtifact,
    /// Length in bytes of the validation code after decompression, cast to `u32`.
    pub observed_wasm_code_len: u32,
}
109
110fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
112 let pvf = framed_recv_blocking(stream)?;
113 let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
114 io::Error::new(
115 io::ErrorKind::Other,
116 format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
117 )
118 })?;
119 Ok(pvf)
120}
121
/// Starts allocation tracking on the global allocator with an optional limit and installs the
/// failure handler that reports an out-of-memory condition.
///
/// * `fd` - raw descriptor the handler writes `OOM_PAYLOAD` to (the caller passes the write end
///   of the job's response pipe).
/// * `limit` - forwarded unchanged to `ALLOC.start_tracking`.
///
/// When invoked, the handler writes the pre-encoded `OOM_PAYLOAD` to `fd`, closes `fd` and
/// terminates the process with exit status 1; it never returns.
fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
    // SAFETY: NOTE(review): the contract of `start_tracking` is not visible in this file.
    // Presumably the handler runs in allocator context and therefore must not allocate, which
    // would explain the pre-encoded payload and the raw system calls below — confirm against
    // the `tracking_allocator` documentation. The pointer/length pair passed to the write calls
    // comes from the `'static` `OOM_PAYLOAD` slice and is therefore valid.
    unsafe {
        ALLOC.start_tracking(
            limit,
            Some(Box::new(move || {
                #[cfg(target_os = "linux")]
                {
                    // Raw system calls: report the OOM condition, then release the descriptor.
                    libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
                    libc::syscall(libc::SYS_close, fd);
                    // Terminate the whole process (all threads) with status 1.
                    libc::syscall(libc::SYS_exit_group, 1);
                    // Fallback should `exit_group` ever return: keep exiting this thread.
                    loop {
                        libc::syscall(libc::SYS_exit, 1);
                    }
                }
                #[cfg(not(target_os = "linux"))]
                {
                    // Same sequence through the `libc` wrappers on other platforms.
                    libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
                    libc::close(fd);
                    libc::_exit(1);
                }
            })),
        );
    }
}
159
/// Stops allocation tracking on the global allocator and returns the value reported by
/// `ALLOC.end_tracking()`, which the caller logs as the job's peak allocation in bytes.
fn end_memory_tracking() -> isize {
    ALLOC.end_tracking()
}
163
164pub fn worker_entrypoint(
199 socket_path: PathBuf,
200 worker_dir_path: PathBuf,
201 node_version: Option<&str>,
202 worker_version: Option<&str>,
203) {
204 run_worker(
205 WorkerKind::Prepare,
206 socket_path,
207 worker_dir_path,
208 node_version,
209 worker_version,
210 |mut stream, worker_info, security_status| {
211 let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_info.worker_dir_path);
212
213 loop {
214 let pvf = recv_request(&mut stream)?;
215 gum::debug!(
216 target: LOG_TARGET,
217 ?worker_info,
218 ?security_status,
219 "worker: preparing artifact",
220 );
221
222 let preparation_timeout = pvf.prep_timeout();
223 let prepare_job_kind = pvf.prep_kind();
224 let executor_params = pvf.executor_params();
225
226 let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?;
227
228 let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
229 Ok(usage) => usage,
230 Err(errno) => {
231 let result: PrepareWorkerResult =
232 Err(error_from_errno("getrusage before", errno));
233 send_result(&mut stream, result, worker_info)?;
234 continue
235 },
236 };
237
238 let stream_fd = stream.as_raw_fd();
239
240 cfg_if::cfg_if! {
241 if #[cfg(target_os = "linux")] {
242 let result = if security_status.can_do_secure_clone {
243 handle_clone(
244 &pvf,
245 pipe_write_fd,
246 pipe_read_fd,
247 stream_fd,
248 preparation_timeout,
249 prepare_job_kind,
250 &executor_params,
251 worker_info,
252 security_status.can_unshare_user_namespace_and_change_root,
253 &temp_artifact_dest,
254 usage_before,
255 )
256 } else {
257 handle_fork(
259 &pvf,
260 pipe_write_fd,
261 pipe_read_fd,
262 stream_fd,
263 preparation_timeout,
264 prepare_job_kind,
265 &executor_params,
266 worker_info,
267 &temp_artifact_dest,
268 usage_before,
269 )
270 };
271 } else {
272 let result = handle_fork(
273 &pvf,
274 pipe_write_fd,
275 pipe_read_fd,
276 stream_fd,
277 preparation_timeout,
278 prepare_job_kind,
279 &executor_params,
280 worker_info,
281 &temp_artifact_dest,
282 usage_before,
283 );
284 }
285 }
286
287 gum::trace!(
288 target: LOG_TARGET,
289 ?worker_info,
290 "worker: sending result to host: {:?}",
291 result
292 );
293 send_result(&mut stream, result, worker_info)?;
294 }
295 },
296 );
297}
298
299fn prepare_artifact(pvf: PvfPrepData) -> Result<PrepareOutcome, PrepareError> {
300 let maybe_compressed_code = pvf.maybe_compressed_code();
301 let raw_validation_code = sp_maybe_compressed_blob::decompress(
302 &maybe_compressed_code,
303 pvf.validation_code_bomb_limit() as usize,
304 )
305 .map_err(|e| PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?;
306 let observed_wasm_code_len = raw_validation_code.len() as u32;
307
308 let blob = match prevalidate(&raw_validation_code) {
309 Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
310 Ok(b) => b,
311 };
312
313 match prepare(blob, &pvf.executor_params()) {
314 Ok(compiled_artifact) => Ok(PrepareOutcome {
315 compiled_artifact: CompiledArtifact::new(compiled_artifact),
316 observed_wasm_code_len,
317 }),
318 Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
319 }
320}
321
322fn runtime_construction_check(
324 artifact_bytes: &[u8],
325 executor_params: &ExecutorParams,
326) -> Result<(), PrepareError> {
327 let result = unsafe { create_runtime_from_artifact_bytes(artifact_bytes, executor_params) };
329 result
330 .map(|_runtime| ())
331 .map_err(|err| PrepareError::RuntimeConstruction(format!("{:?}", err)))
332}
333
/// Success payload that a job process sends to the worker over the response pipe.
#[derive(Encode, Decode)]
struct JobResponse {
    /// The compiled artifact; the worker writes it to the temporary artifact path.
    artifact: CompiledArtifact,
    /// Memory statistics gathered inside the job process.
    memory_stats: MemoryStats,
    /// Length of the decompressed validation code, as observed during preparation.
    observed_wasm_code_len: u32,
}
340
341#[cfg(target_os = "linux")]
342fn handle_clone(
343 pvf: &PvfPrepData,
344 pipe_write_fd: i32,
345 pipe_read_fd: i32,
346 stream_fd: i32,
347 preparation_timeout: Duration,
348 prepare_job_kind: PrepareJobKind,
349 executor_params: &Arc<ExecutorParams>,
350 worker_info: &WorkerInfo,
351 have_unshare_newuser: bool,
352 temp_artifact_dest: &Path,
353 usage_before: Usage,
354) -> Result<PrepareWorkerSuccess, PrepareError> {
355 use polkadot_node_core_pvf_common::worker::security;
356
357 match unsafe {
360 security::clone::clone_on_worker(
361 worker_info,
362 have_unshare_newuser,
363 Box::new(|| {
364 handle_child_process(
365 pvf.clone(),
366 pipe_write_fd,
367 pipe_read_fd,
368 stream_fd,
369 preparation_timeout,
370 prepare_job_kind,
371 Arc::clone(&executor_params),
372 )
373 }),
374 )
375 } {
376 Ok(child) => handle_parent_process(
377 pipe_read_fd,
378 pipe_write_fd,
379 worker_info,
380 child,
381 temp_artifact_dest,
382 usage_before,
383 preparation_timeout,
384 ),
385 Err(security::clone::Error::Clone(errno)) => Err(error_from_errno("clone", errno)),
386 }
387}
388
389fn handle_fork(
390 pvf: &PvfPrepData,
391 pipe_write_fd: i32,
392 pipe_read_fd: i32,
393 stream_fd: i32,
394 preparation_timeout: Duration,
395 prepare_job_kind: PrepareJobKind,
396 executor_params: &Arc<ExecutorParams>,
397 worker_info: &WorkerInfo,
398 temp_artifact_dest: &Path,
399 usage_before: Usage,
400) -> Result<PrepareWorkerSuccess, PrepareError> {
401 match unsafe { nix::unistd::fork() } {
404 Ok(ForkResult::Child) => handle_child_process(
405 pvf.clone(),
406 pipe_write_fd,
407 pipe_read_fd,
408 stream_fd,
409 preparation_timeout,
410 prepare_job_kind,
411 Arc::clone(executor_params),
412 ),
413 Ok(ForkResult::Parent { child }) => handle_parent_process(
414 pipe_read_fd,
415 pipe_write_fd,
416 worker_info,
417 child,
418 temp_artifact_dest,
419 usage_before,
420 preparation_timeout,
421 ),
422 Err(errno) => Err(error_from_errno("fork", errno)),
423 }
424}
425
426fn handle_child_process(
436 pvf: PvfPrepData,
437 pipe_write_fd: i32,
438 pipe_read_fd: i32,
439 stream_fd: i32,
440 preparation_timeout: Duration,
441 prepare_job_kind: PrepareJobKind,
442 executor_params: Arc<ExecutorParams>,
443) -> ! {
444 let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };
446
447 if let Err(errno) = nix::unistd::close(pipe_read_fd) {
449 send_child_response(
450 &mut pipe_write,
451 JobResult::Err(error_from_errno("closing pipe", errno)),
452 );
453 }
454
455 if let Err(errno) = nix::unistd::close(stream_fd) {
460 send_child_response(
461 &mut pipe_write,
462 JobResult::Err(error_from_errno("error closing stream", errno)),
463 );
464 }
465
466 let worker_job_pid = process::id();
467 gum::debug!(
468 target: LOG_TARGET,
469 %worker_job_pid,
470 ?prepare_job_kind,
471 ?preparation_timeout,
472 "worker job: preparing artifact",
473 );
474
475 let condvar = thread::get_condvar();
477
478 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
480 let condvar_memory = Arc::clone(&condvar);
481 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
482 let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
483
484 start_memory_tracking(
485 pipe_write.as_raw_fd(),
486 executor_params.prechecking_max_memory().map(|v| {
487 v.try_into().unwrap_or_else(|_| {
488 gum::warn!(
489 LOG_TARGET,
490 %worker_job_pid,
491 "Illegal pre-checking max memory value {} discarded",
492 v,
493 );
494 0
495 })
496 }),
497 );
498
499 let cpu_time_start = ProcessTime::now();
500
501 let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
503 let cpu_time_monitor_thread = thread::spawn_worker_thread(
504 "cpu time monitor thread",
505 move || cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx),
506 Arc::clone(&condvar),
507 WaitOutcome::TimedOut,
508 )
509 .unwrap_or_else(|err| {
510 send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
511 });
512
513 let prepare_thread = spawn_worker_thread(
514 "prepare worker",
515 move || {
516 #[allow(unused_mut)]
517 let mut result = prepare_artifact(pvf).map(|o| (o,));
518
519 #[cfg(target_os = "linux")]
521 let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread()));
522
523 if let PrepareJobKind::Prechecking = prepare_job_kind {
529 result = result.and_then(|output| {
530 runtime_construction_check(
531 output.0.compiled_artifact.as_ref(),
532 &executor_params,
533 )?;
534 Ok(output)
535 });
536 }
537 result
538 },
539 Arc::clone(&condvar),
540 WaitOutcome::Finished,
541 )
542 .unwrap_or_else(|err| {
543 send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
544 });
545
546 let outcome = thread::wait_for_threads(condvar);
547
548 let peak_alloc = {
549 let peak = end_memory_tracking();
550 gum::debug!(
551 target: LOG_TARGET,
552 %worker_job_pid,
553 "prepare job peak allocation is {} bytes",
554 peak,
555 );
556 peak
557 };
558
559 let result = match outcome {
560 WaitOutcome::Finished => {
561 let _ = cpu_time_monitor_tx.send(());
562
563 match prepare_thread.join().unwrap_or_else(|err| {
564 send_child_response(
565 &mut pipe_write,
566 Err(PrepareError::JobError(stringify_panic_payload(err))),
567 )
568 }) {
569 Err(err) => Err(err),
570 Ok(ok) => {
571 cfg_if::cfg_if! {
572 if #[cfg(target_os = "linux")] {
573 let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok;
574 } else {
575 let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok;
576 }
577 }
578
579 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
581 let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, process::id());
582
583 let memory_stats = MemoryStats {
584 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
585 memory_tracker_stats,
586 #[cfg(target_os = "linux")]
587 max_rss: extract_max_rss_stat(max_rss, process::id()),
588 peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 },
592 };
593
594 Ok(JobResponse {
595 artifact: compiled_artifact,
596 observed_wasm_code_len,
597 memory_stats,
598 })
599 },
600 }
601 },
602
603 WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
606 Ok(Some(_cpu_time_elapsed)) => Err(PrepareError::TimedOut),
607 Ok(None) => Err(PrepareError::IoErr("error communicating over closed channel".into())),
608 Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
609 },
610 WaitOutcome::Pending =>
611 unreachable!("we run wait_while until the outcome is no longer pending; qed"),
612 };
613
614 send_child_response(&mut pipe_write, result);
615}
616
617fn handle_parent_process(
628 pipe_read_fd: i32,
629 pipe_write_fd: i32,
630 worker_info: &WorkerInfo,
631 job_pid: Pid,
632 temp_artifact_dest: &Path,
633 usage_before: Usage,
634 timeout: Duration,
635) -> Result<PrepareWorkerSuccess, PrepareError> {
636 if let Err(errno) = nix::unistd::close(pipe_write_fd) {
639 return Err(error_from_errno("closing pipe write fd", errno));
640 };
641
642 let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
644
645 let mut received_data = Vec::new();
647 pipe_read
648 .read_to_end(&mut received_data)
649 .map_err(|err| PrepareError::IoErr(err.to_string()))?;
650
651 let status = nix::sys::wait::waitpid(job_pid, None);
652 gum::trace!(
653 target: LOG_TARGET,
654 ?worker_info,
655 %job_pid,
656 "prepare worker received wait status from job: {:?}",
657 status,
658 );
659
660 let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
661 .map_err(|errno| error_from_errno("getrusage after", errno))?;
662
663 let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
669 if cpu_tv >= timeout {
670 gum::warn!(
671 target: LOG_TARGET,
672 ?worker_info,
673 %job_pid,
674 "prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
675 cpu_tv.as_millis(),
676 timeout.as_millis(),
677 );
678 return Err(PrepareError::TimedOut)
679 }
680
681 match status {
682 Ok(WaitStatus::Exited(_pid, exit_status)) => {
683 let mut reader = io::BufReader::new(received_data.as_slice());
684 let result = recv_child_response(&mut reader, "prepare")
685 .map_err(|err| PrepareError::JobError(err.to_string()))?;
686
687 match result {
688 Err(err) => Err(err),
689 Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => {
690 if exit_status != 0 {
692 return Err(PrepareError::JobError(format!(
693 "unexpected exit status: {}",
694 exit_status
695 )))
696 }
697
698 gum::debug!(
706 target: LOG_TARGET,
707 ?worker_info,
708 %job_pid,
709 "worker: writing artifact to {}",
710 temp_artifact_dest.display(),
711 );
712 if let Err(err) = fs::write(temp_artifact_dest, &artifact) {
714 return Err(PrepareError::IoErr(err.to_string()))
715 };
716
717 let checksum = compute_checksum(&artifact.as_ref());
718 Ok(PrepareWorkerSuccess {
719 checksum,
720 stats: PrepareStats {
721 memory_stats,
722 cpu_time_elapsed: cpu_tv,
723 observed_wasm_code_len,
724 },
725 })
726 },
727 }
728 },
729 Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Err(PrepareError::JobDied {
734 err: format!("received signal: {signal:?}"),
735 job_pid: job_pid.as_raw(),
736 }),
737 Err(errno) => Err(error_from_errno("waitpid", errno)),
738
739 Ok(unexpected_wait_status) => Err(PrepareError::JobDied {
742 err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
743 job_pid: job_pid.as_raw(),
744 }),
745 }
746}
747
748fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
756 framed_send_blocking(pipe_write, response.encode().as_slice())
757 .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
758
759 if response.is_ok() {
760 process::exit(libc::EXIT_SUCCESS)
761 } else {
762 process::exit(libc::EXIT_FAILURE)
763 }
764}
765
766fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {
767 PrepareError::Kernel(stringify_errno(context, errno))
768}
769
/// What a job process reports to the worker over the response pipe: the prepared artifact with
/// its statistics, or the error that made preparation fail.
type JobResult = Result<JobResponse, PrepareError>;
771
/// Pre-encoded bytes for `JobResult::Err(PrepareError::OutOfMemory)`, written verbatim to the
/// response pipe by the allocation failure handler in `start_memory_tracking`.
///
/// Layout: the length of the encoded value as 8 little-endian bytes (`2`), followed by the two
/// encoded bytes themselves. The `pre_encoded_payloads` test checks these bytes against the
/// real encoding, so any change to the encoding of `JobResult` must be mirrored here.
const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
774
/// Guards the hand-written `OOM_PAYLOAD` bytes: they must equal the encoded out-of-memory
/// result prefixed with its length in little-endian bytes.
#[test]
fn pre_encoded_payloads() {
    let out_of_memory: JobResult = Err(PrepareError::OutOfMemory);
    let body = out_of_memory.encode();

    // Length prefix first, then the encoded value.
    let mut expected = Vec::from(body.len().to_le_bytes());
    expected.extend(body);

    assert_eq!(expected, OOM_PAYLOAD);
}