1#![deny(unused_crate_dependencies)]
20#![warn(missing_docs)]
21
22pub use polkadot_node_core_pvf_common::{
23 error::ExecuteError, executor_interface::execute_artifact,
24};
25use polkadot_parachain_primitives::primitives::ValidationParams;
26
/// Log target used by every `gum` log statement in this execute worker.
const LOG_TARGET: &str = "parachain::pvf-execute-worker";
30
31use codec::{Decode, Encode};
32use cpu_time::ProcessTime;
33use nix::{
34 errno::Errno,
35 sys::{
36 resource::{Usage, UsageWho},
37 wait::WaitStatus,
38 },
39 unistd::{ForkResult, Pid},
40};
41use polkadot_node_core_pvf_common::{
42 compute_checksum,
43 error::InternalValidationError,
44 execute::{
45 ExecuteRequest, Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse,
46 },
47 executor_interface::params_to_wasmtime_semantics,
48 framed_recv_blocking, framed_send_blocking,
49 worker::{
50 cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
51 send_result, stringify_errno, stringify_panic_payload,
52 thread::{self, WaitOutcome},
53 PipeFd, WorkerInfo, WorkerKind,
54 },
55 worker_dir, ArtifactChecksum,
56};
57use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};
58use polkadot_parachain_primitives::primitives::ValidationResult;
59use polkadot_primitives::{ExecutorParams, PersistedValidationData};
60use std::{
61 io::{self, Read},
62 os::{
63 fd::{AsRawFd, FromRawFd},
64 unix::net::UnixStream,
65 },
66 path::PathBuf,
67 process,
68 sync::{mpsc::channel, Arc},
69 time::Duration,
70};
71
/// Number of threads running inside an execute job (child) process.
///
/// `handle_child_process` runs on the main thread, which waits on a condvar while it spawns
/// two more threads: the CPU-time monitor thread and the execute thread.
///
/// NOTE(review): keep this value in sync if the child process starts spawning more threads.
pub const EXECUTE_WORKER_THREAD_NUMBER: u32 = 3;
80
81fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
83 let handshake_enc = framed_recv_blocking(stream)?;
84 let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
85 io::Error::new(
86 io::ErrorKind::Other,
87 "execute pvf recv_execute_handshake: failed to decode Handshake".to_owned(),
88 )
89 })?;
90 Ok(handshake)
91}
92
93fn recv_request(
94 stream: &mut UnixStream,
95) -> io::Result<(PersistedValidationData, PoV, Duration, ArtifactChecksum)> {
96 let request_bytes = framed_recv_blocking(stream)?;
97 let request = ExecuteRequest::decode(&mut &request_bytes[..]).map_err(|_| {
98 io::Error::new(
99 io::ErrorKind::Other,
100 "execute pvf recv_request: failed to decode ExecuteRequest".to_string(),
101 )
102 })?;
103
104 Ok((request.pvd, request.pov, request.execution_timeout, request.artifact_checksum))
105}
106
/// Converts `$error` into a [`WorkerError`] via `$err_constructor`, reports it to the host
/// over `$stream`, and evaluates to an `io::Error` carrying the same message.
///
/// Sending is best effort: a failure to deliver the error to the host is ignored, because
/// the returned `io::Error` is what the caller propagates.
macro_rules! map_and_send_err {
    ($error:expr, $err_constructor:expr, $stream:expr, $worker_info:expr) => {{
        let worker_err: WorkerError = $err_constructor($error.to_string()).into();
        let propagated = io::Error::new(io::ErrorKind::Other, worker_err.to_string());
        let _ =
            send_result::<WorkerResponse, WorkerError>($stream, Err(worker_err), $worker_info);
        propagated
    }};
}
116
117pub fn worker_entrypoint(
131 socket_path: PathBuf,
132 worker_dir_path: PathBuf,
133 node_version: Option<&str>,
134 worker_version: Option<&str>,
135) {
136 run_worker(
137 WorkerKind::Execute,
138 socket_path,
139 worker_dir_path,
140 node_version,
141 worker_version,
142 |mut stream, worker_info, security_status| {
143 let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path);
144
145 let Handshake { executor_params } =
146 recv_execute_handshake(&mut stream).map_err(|e| {
147 map_and_send_err!(
148 e,
149 InternalValidationError::HostCommunication,
150 &mut stream,
151 worker_info
152 )
153 })?;
154
155 let executor_params: Arc<ExecutorParams> = Arc::new(executor_params);
156 let execute_thread_stack_size = max_stack_size(&executor_params);
157
158 loop {
159 let (pvd, pov, execution_timeout, artifact_checksum) = recv_request(&mut stream)
160 .map_err(|e| {
161 map_and_send_err!(
162 e,
163 InternalValidationError::HostCommunication,
164 &mut stream,
165 worker_info
166 )
167 })?;
168 gum::debug!(
169 target: LOG_TARGET,
170 ?worker_info,
171 ?security_status,
172 "worker: validating artifact {}",
173 artifact_path.display(),
174 );
175
176 let compiled_artifact_blob = std::fs::read(&artifact_path).map_err(|e| {
178 map_and_send_err!(
179 e,
180 InternalValidationError::CouldNotOpenFile,
181 &mut stream,
182 worker_info
183 )
184 })?;
185
186 if artifact_checksum != compute_checksum(&compiled_artifact_blob) {
187 send_result::<WorkerResponse, WorkerError>(
188 &mut stream,
189 Ok(WorkerResponse {
190 job_response: JobResponse::CorruptedArtifact,
191 duration: Duration::ZERO,
192 pov_size: 0,
193 }),
194 worker_info,
195 )?;
196 continue;
197 }
198
199 let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| {
200 map_and_send_err!(
201 e,
202 InternalValidationError::CouldNotCreatePipe,
203 &mut stream,
204 worker_info
205 )
206 })?;
207
208 let usage_before = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
209 .map_err(|errno| {
210 let e = stringify_errno("getrusage before", errno);
211 map_and_send_err!(
212 e,
213 InternalValidationError::Kernel,
214 &mut stream,
215 worker_info
216 )
217 })?;
218 let stream_fd = stream.as_raw_fd();
219
220 let compiled_artifact_blob = Arc::new(compiled_artifact_blob);
221
222 let raw_block_data =
223 match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
224 Ok(data) => data,
225 Err(_) => {
226 send_result::<WorkerResponse, WorkerError>(
227 &mut stream,
228 Ok(WorkerResponse {
229 job_response: JobResponse::PoVDecompressionFailure,
230 duration: Duration::ZERO,
231 pov_size: 0,
232 }),
233 worker_info,
234 )?;
235 continue;
236 },
237 };
238
239 let pov_size = raw_block_data.len() as u32;
240
241 let params = ValidationParams {
242 parent_head: pvd.parent_head.clone(),
243 block_data: BlockData(raw_block_data.to_vec()),
244 relay_parent_number: pvd.relay_parent_number,
245 relay_parent_storage_root: pvd.relay_parent_storage_root,
246 };
247 let params = Arc::new(params.encode());
248
249 cfg_if::cfg_if! {
250 if #[cfg(target_os = "linux")] {
251 let result = if security_status.can_do_secure_clone {
252 handle_clone(
253 pipe_write_fd,
254 pipe_read_fd,
255 stream_fd,
256 &compiled_artifact_blob,
257 &executor_params,
258 ¶ms,
259 execution_timeout,
260 execute_thread_stack_size,
261 worker_info,
262 security_status.can_unshare_user_namespace_and_change_root,
263 usage_before,
264 pov_size,
265 )?
266 } else {
267 handle_fork(
269 pipe_write_fd,
270 pipe_read_fd,
271 stream_fd,
272 &compiled_artifact_blob,
273 &executor_params,
274 ¶ms,
275 execution_timeout,
276 execute_thread_stack_size,
277 worker_info,
278 usage_before,
279 pov_size,
280 )?
281 };
282 } else {
283 let result = handle_fork(
284 pipe_write_fd,
285 pipe_read_fd,
286 stream_fd,
287 &compiled_artifact_blob,
288 &executor_params,
289 ¶ms,
290 execution_timeout,
291 execute_thread_stack_size,
292 worker_info,
293 usage_before,
294 pov_size,
295 )?;
296 }
297 }
298
299 gum::trace!(
300 target: LOG_TARGET,
301 ?worker_info,
302 "worker: sending result to host: {:?}",
303 result
304 );
305 send_result(&mut stream, result, worker_info)?;
306 }
307 },
308 );
309}
310
311fn validate_using_artifact(
312 compiled_artifact_blob: &[u8],
313 executor_params: &ExecutorParams,
314 params: &[u8],
315) -> JobResponse {
316 let descriptor_bytes = match unsafe {
317 execute_artifact(compiled_artifact_blob, executor_params, params)
321 } {
322 Err(ExecuteError::RuntimeConstruction(wasmerr)) =>
323 return JobResponse::runtime_construction("execute", &wasmerr.to_string()),
324 Err(err) => return JobResponse::format_invalid("execute", &err.to_string()),
325 Ok(d) => d,
326 };
327
328 let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
329 Err(err) =>
330 return JobResponse::format_invalid(
331 "validation result decoding failed",
332 &err.to_string(),
333 ),
334 Ok(r) => r,
335 };
336
337 JobResponse::Ok { result_descriptor }
338}
339
340#[cfg(target_os = "linux")]
341fn handle_clone(
342 pipe_write_fd: i32,
343 pipe_read_fd: i32,
344 stream_fd: i32,
345 compiled_artifact_blob: &Arc<Vec<u8>>,
346 executor_params: &Arc<ExecutorParams>,
347 params: &Arc<Vec<u8>>,
348 execution_timeout: Duration,
349 execute_stack_size: usize,
350 worker_info: &WorkerInfo,
351 have_unshare_newuser: bool,
352 usage_before: Usage,
353 pov_size: u32,
354) -> io::Result<Result<WorkerResponse, WorkerError>> {
355 use polkadot_node_core_pvf_common::worker::security;
356
357 match unsafe {
360 security::clone::clone_on_worker(
361 worker_info,
362 have_unshare_newuser,
363 Box::new(|| {
364 handle_child_process(
365 pipe_write_fd,
366 pipe_read_fd,
367 stream_fd,
368 Arc::clone(compiled_artifact_blob),
369 Arc::clone(executor_params),
370 Arc::clone(params),
371 execution_timeout,
372 execute_stack_size,
373 )
374 }),
375 )
376 } {
377 Ok(child) => handle_parent_process(
378 pipe_read_fd,
379 pipe_write_fd,
380 worker_info,
381 child,
382 usage_before,
383 pov_size,
384 execution_timeout,
385 ),
386 Err(security::clone::Error::Clone(errno)) =>
387 Ok(Err(internal_error_from_errno("clone", errno))),
388 }
389}
390
391fn handle_fork(
392 pipe_write_fd: i32,
393 pipe_read_fd: i32,
394 stream_fd: i32,
395 compiled_artifact_blob: &Arc<Vec<u8>>,
396 executor_params: &Arc<ExecutorParams>,
397 params: &Arc<Vec<u8>>,
398 execution_timeout: Duration,
399 execute_worker_stack_size: usize,
400 worker_info: &WorkerInfo,
401 usage_before: Usage,
402 pov_size: u32,
403) -> io::Result<Result<WorkerResponse, WorkerError>> {
404 match unsafe { nix::unistd::fork() } {
407 Ok(ForkResult::Child) => handle_child_process(
408 pipe_write_fd,
409 pipe_read_fd,
410 stream_fd,
411 Arc::clone(compiled_artifact_blob),
412 Arc::clone(executor_params),
413 Arc::clone(params),
414 execution_timeout,
415 execute_worker_stack_size,
416 ),
417 Ok(ForkResult::Parent { child }) => handle_parent_process(
418 pipe_read_fd,
419 pipe_write_fd,
420 worker_info,
421 child,
422 usage_before,
423 pov_size,
424 execution_timeout,
425 ),
426 Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))),
427 }
428}
429
430fn handle_child_process(
437 pipe_write_fd: i32,
438 pipe_read_fd: i32,
439 stream_fd: i32,
440 compiled_artifact_blob: Arc<Vec<u8>>,
441 executor_params: Arc<ExecutorParams>,
442 params: Arc<Vec<u8>>,
443 execution_timeout: Duration,
444 execute_thread_stack_size: usize,
445) -> ! {
446 let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };
448
449 if let Err(errno) = nix::unistd::close(pipe_read_fd) {
451 send_child_response(&mut pipe_write, job_error_from_errno("closing pipe", errno));
452 }
453
454 if let Err(errno) = nix::unistd::close(stream_fd) {
459 send_child_response(&mut pipe_write, job_error_from_errno("closing stream", errno));
460 }
461
462 gum::debug!(
463 target: LOG_TARGET,
464 worker_job_pid = %process::id(),
465 "worker job: executing artifact",
466 );
467
468 let condvar = thread::get_condvar();
470 let cpu_time_start = ProcessTime::now();
471
472 let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
474 let cpu_time_monitor_thread = thread::spawn_worker_thread(
475 "cpu time monitor thread",
476 move || cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx),
477 Arc::clone(&condvar),
478 WaitOutcome::TimedOut,
479 )
480 .unwrap_or_else(|err| {
481 send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
482 });
483
484 let execute_thread = thread::spawn_worker_thread_with_stack_size(
485 "execute thread",
486 move || validate_using_artifact(&compiled_artifact_blob, &executor_params, ¶ms),
487 Arc::clone(&condvar),
488 WaitOutcome::Finished,
489 execute_thread_stack_size,
490 )
491 .unwrap_or_else(|err| {
492 send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
493 });
494
495 let outcome = thread::wait_for_threads(condvar);
496
497 let response = match outcome {
498 WaitOutcome::Finished => {
499 let _ = cpu_time_monitor_tx.send(());
500 execute_thread.join().map_err(|e| JobError::Panic(stringify_panic_payload(e)))
501 },
502 WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
505 Ok(Some(_cpu_time_elapsed)) => Err(JobError::TimedOut),
506 Ok(None) => Err(JobError::CpuTimeMonitorThread(
507 "error communicating over finished channel".into(),
508 )),
509 Err(e) => Err(JobError::CpuTimeMonitorThread(stringify_panic_payload(e))),
510 },
511 WaitOutcome::Pending =>
512 unreachable!("we run wait_while until the outcome is no longer pending; qed"),
513 };
514
515 send_child_response(&mut pipe_write, response);
516}
517
518fn max_stack_size(executor_params: &ExecutorParams) -> usize {
555 let (_sem, deterministic_stack_limit) = params_to_wasmtime_semantics(executor_params);
556 return (2 * 1024 * 1024 + deterministic_stack_limit.native_stack_max) as usize;
557}
558
559fn handle_parent_process(
565 pipe_read_fd: i32,
566 pipe_write_fd: i32,
567 worker_info: &WorkerInfo,
568 job_pid: Pid,
569 usage_before: Usage,
570 pov_size: u32,
571 timeout: Duration,
572) -> io::Result<Result<WorkerResponse, WorkerError>> {
573 if let Err(errno) = nix::unistd::close(pipe_write_fd) {
576 return Ok(Err(internal_error_from_errno("closing pipe write fd", errno)));
577 };
578
579 let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
581
582 let mut received_data = Vec::new();
584 pipe_read
585 .read_to_end(&mut received_data)
586 .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
589
590 let status = nix::sys::wait::waitpid(job_pid, None);
591 gum::trace!(
592 target: LOG_TARGET,
593 ?worker_info,
594 %job_pid,
595 "execute worker received wait status from job: {:?}",
596 status,
597 );
598
599 let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
600 Ok(usage) => usage,
601 Err(errno) => return Ok(Err(internal_error_from_errno("getrusage after", errno))),
602 };
603
604 let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
610 if cpu_tv >= timeout {
611 gum::warn!(
612 target: LOG_TARGET,
613 ?worker_info,
614 %job_pid,
615 "execute job took {}ms cpu time, exceeded execute timeout {}ms",
616 cpu_tv.as_millis(),
617 timeout.as_millis(),
618 );
619 return Ok(Err(WorkerError::JobTimedOut))
620 }
621
622 match status {
623 Ok(WaitStatus::Exited(_, exit_status)) => {
624 let mut reader = io::BufReader::new(received_data.as_slice());
625 let result = recv_child_response(&mut reader, "execute")?;
626
627 match result {
628 Ok(job_response) => {
629 if exit_status != 0 {
631 return Ok(Err(WorkerError::JobError(JobError::UnexpectedExitStatus(
632 exit_status,
633 ))));
634 }
635
636 Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv }))
637 },
638 Err(job_error) => {
639 gum::warn!(
640 target: LOG_TARGET,
641 ?worker_info,
642 %job_pid,
643 "execute job error: {}",
644 job_error,
645 );
646 if matches!(job_error, JobError::TimedOut) {
647 Ok(Err(WorkerError::JobTimedOut))
648 } else {
649 Ok(Err(WorkerError::JobError(job_error.into())))
650 }
651 },
652 }
653 },
654 Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(Err(WorkerError::JobDied {
659 err: format!("received signal: {signal:?}"),
660 job_pid: job_pid.as_raw(),
661 })),
662 Err(errno) => Ok(Err(internal_error_from_errno("waitpid", errno))),
663
664 Ok(unexpected_wait_status) => Ok(Err(WorkerError::JobDied {
667 err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
668 job_pid: job_pid.as_raw(),
669 })),
670 }
671}
672
673fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
681 framed_send_blocking(pipe_write, response.encode().as_slice())
682 .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
683
684 if response.is_ok() {
685 process::exit(libc::EXIT_SUCCESS)
686 } else {
687 process::exit(libc::EXIT_FAILURE)
688 }
689}
690
691fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerError {
692 WorkerError::InternalError(InternalValidationError::Kernel(stringify_errno(context, errno)))
693}
694
695fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult {
696 Err(JobError::Kernel(stringify_errno(context, errno)))
697}