1use crate::LOG_TARGET;
20use codec::Encode;
21use futures::FutureExt as _;
22use futures_timer::Delay;
23use pin_project::pin_project;
24use polkadot_node_core_pvf_common::{SecurityStatus, WorkerHandshake};
25use rand::Rng;
26use std::{
27 fmt, mem,
28 path::{Path, PathBuf},
29 pin::Pin,
30 task::{Context, Poll},
31 time::Duration,
32};
33use tokio::{
34 io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf},
35 net::{UnixListener, UnixStream},
36 process,
37};
38
/// Multiplier for job timeouts; judging by the name, it scales a job's timeout into a more
/// lenient wall-clock deadline.
///
/// NOTE(review): no use of this constant is visible in this file — confirm its exact meaning
/// (and what kind of timeout it multiplies) against the callers.
pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
42
43#[doc(hidden)]
61pub async fn spawn_with_program_path(
62 debug_id: &'static str,
63 program_path: impl Into<PathBuf>,
64 cache_path: &Path,
65 extra_args: &[&str],
66 spawn_timeout: Duration,
67 security_status: SecurityStatus,
68) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
69 let program_path = program_path.into();
70 let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
71 let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
72 let program_path_clone = program_path.clone();
74 let worker_dir_clone = worker_dir.path().to_owned();
75 let extra_args_clone = extra_args.clone();
76
77 with_transient_socket_path(debug_id, |socket_path| {
78 let socket_path = socket_path.to_owned();
79
80 async move {
81 let listener = match UnixListener::bind(&socket_path) {
82 Ok(ok) => ok,
83 Err(err) => return Err(SpawnErr::Bind { socket_path, err: err.to_string() }),
84 };
85
86 let handle =
87 WorkerHandle::spawn(&program_path, &extra_args, &socket_path, &worker_dir.path())
88 .map_err(|err| SpawnErr::ProcessSpawn { program_path, err: err.to_string() })?;
89
90 futures::select! {
91 accept_result = listener.accept().fuse() => {
92 let (mut stream, _) = accept_result
93 .map_err(|err| SpawnErr::Accept { socket_path, err: err.to_string() })?;
94 send_worker_handshake(&mut stream, WorkerHandshake { security_status })
95 .await
96 .map_err(|err| SpawnErr::Handshake { err: err.to_string() })?;
97 Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
98 }
99 _ = Delay::new(spawn_timeout).fuse() => Err(SpawnErr::AcceptTimeout{spawn_timeout}),
100 }
101 }
102 })
103 .await
104 .map_err(|err| {
105 gum::warn!(
106 target: LOG_TARGET,
107 %debug_id,
108 program_path = ?program_path_clone,
109 extra_args = ?extra_args_clone,
110 worker_dir = ?worker_dir_clone,
111 "error spawning worker: {}",
112 err,
113 );
114 err
115 })
116}
117
118async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
121where
122 F: FnOnce(&Path) -> Fut,
123 Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
124{
125 pub async fn tmppath(prefix: &str) -> io::Result<PathBuf> {
130 fn make_tmppath(prefix: &str, dir: &Path) -> PathBuf {
131 use rand::distributions::Alphanumeric;
132
133 const DISCRIMINATOR_LEN: usize = 10;
134
135 let mut buf = Vec::with_capacity(prefix.len() + DISCRIMINATOR_LEN);
136 buf.extend(prefix.as_bytes());
137 buf.extend(rand::thread_rng().sample_iter(&Alphanumeric).take(DISCRIMINATOR_LEN));
138
139 let s = std::str::from_utf8(&buf)
140 .expect("the string is collected from a valid utf-8 sequence; qed");
141
142 let mut path = dir.to_owned();
143 path.push(s);
144 path
145 }
146
147 const NUM_RETRIES: usize = 50;
148
149 let dir = std::env::temp_dir();
150 for _ in 0..NUM_RETRIES {
151 let tmp_path = make_tmppath(prefix, &dir);
152 if !tmp_path.exists() {
153 return Ok(tmp_path)
154 }
155 }
156
157 Err(io::Error::new(io::ErrorKind::Other, "failed to create a temporary path"))
158 }
159
160 let socket_path = tmppath(&format!("pvf-host-{}-", debug_id))
161 .await
162 .map_err(|_| SpawnErr::TmpPath)?;
163 let result = f(&socket_path).await;
164
165 let _ = tokio::fs::remove_file(socket_path).await;
168
169 result
170}
171
/// A spawned worker process whose connection to the host has been accepted.
///
/// Produced by `spawn_with_program_path` once the worker has connected to the transient
/// socket and the [`WorkerHandshake`] has been sent to it.
#[derive(Debug)]
pub struct IdleWorker {
    /// The stream accepted from the worker's connection to the host's Unix listener.
    pub stream: UnixStream,

    /// The worker's process ID, as reported by [`WorkerHandle::id`].
    pub pid: u32,

    /// The directory created for this worker under the cache path.
    pub worker_dir: WorkerDir,
}
188
/// Errors that can occur while spawning a worker and establishing its initial connection.
///
/// Underlying errors are stored as `String`s. Presumably this is what allows the type to
/// derive `Clone`, since `std::io::Error` does not implement it.
#[derive(thiserror::Error, Clone, Debug)]
#[doc(hidden)]
pub enum SpawnErr {
    /// No temporary location could be obtained. Either no unused socket path was found in
    /// the system temporary directory, or the worker directory could not be created.
    #[error("cannot obtain a temporary path location")]
    TmpPath,
    /// Binding the Unix listener to `socket_path` failed.
    #[error("cannot bind the socket to the given path {socket_path:?}: {err}")]
    Bind { socket_path: PathBuf, err: String },
    /// Accepting the worker's connection on `socket_path` failed.
    #[error(
        "an error happened during accepting a connection to the socket {socket_path:?}: {err}"
    )]
    Accept { socket_path: PathBuf, err: String },
    /// Starting the worker process at `program_path` failed.
    #[error("an error happened during spawning the process at path {program_path:?}: {err}")]
    ProcessSpawn { program_path: PathBuf, err: String },
    /// The worker did not connect to the socket within `spawn_timeout`.
    #[error("the deadline {}ms allotted for the worker spawning and connecting to the socket has elapsed", .spawn_timeout.as_millis())]
    AcceptTimeout { spawn_timeout: Duration },
    /// The worker connected, but sending it the handshake failed.
    #[error("failed to send handshake after successful spawning was signaled: {err}")]
    Handshake { err: String },
}
210
/// A handle to a spawned worker process.
///
/// Polling it as a future drains (and discards) the worker's stdout. It resolves once stdout
/// reports EOF or a read error.
#[pin_project]
pub struct WorkerHandle {
    /// The child process. It is spawned with `kill_on_drop(true)`, so dropping the handle
    /// kills it.
    child: process::Child,
    /// The child's process ID, captured right after spawning.
    child_id: u32,
    /// The child's piped stdout, drained by `poll`.
    #[pin]
    stdout: process::ChildStdout,
    /// Path of the spawned program, used in log messages.
    program: PathBuf,
    /// Scratch buffer that stdout is read into; its contents are never inspected.
    drop_box: Box<[u8]>,
}
229
230impl WorkerHandle {
231 fn spawn(
232 program: impl AsRef<Path>,
233 extra_args: &[String],
234 socket_path: impl AsRef<Path>,
235 worker_dir_path: impl AsRef<Path>,
236 ) -> io::Result<Self> {
237 let mut command = process::Command::new(program.as_ref());
239 command.env_clear();
240
241 command.env("RUST_LOG", sc_tracing::logging::get_directives().join(","));
242
243 let mut child = command
244 .args(extra_args)
245 .arg("--socket-path")
246 .arg(socket_path.as_ref().as_os_str())
247 .arg("--worker-dir-path")
248 .arg(worker_dir_path.as_ref().as_os_str())
249 .stdout(std::process::Stdio::piped())
250 .kill_on_drop(true)
251 .spawn()?;
252
253 let child_id = child
254 .id()
255 .ok_or(io::Error::new(io::ErrorKind::Other, "could not get id of spawned process"))?;
256 let stdout = child
257 .stdout
258 .take()
259 .expect("the process spawned with piped stdout should have the stdout handle");
260
261 Ok(WorkerHandle {
262 child,
263 child_id,
264 stdout,
265 program: program.as_ref().to_path_buf(),
266 drop_box: vec![0; 8192].into_boxed_slice(),
276 })
277 }
278
279 pub fn id(&self) -> u32 {
281 self.child_id
282 }
283}
284
285impl futures::Future for WorkerHandle {
286 type Output = ();
287
288 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289 let me = self.project();
290 let mut read_buf = ReadBuf::new(&mut *me.drop_box);
293 match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut read_buf)) {
294 Ok(()) => {
295 if read_buf.filled().len() > 0 {
296 cx.waker().wake_by_ref();
299 Poll::Pending
300 } else {
301 Poll::Ready(())
303 }
304 },
305 Err(err) => {
306 gum::debug!(
311 target: LOG_TARGET,
312 worker_pid = %me.child_id,
313 status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()),
314 "pvf worker ({}): {:?}",
315 me.program.display(),
316 err,
317 );
318 Poll::Ready(())
319 },
320 }
321 }
322}
323
324impl fmt::Debug for WorkerHandle {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 write!(f, "WorkerHandle(pid={})", self.id())
327 }
328}
329
330pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
332 let len_buf = buf.len().to_le_bytes();
333 w.write_all(&len_buf).await?;
334 w.write_all(buf).await?;
335 Ok(())
336}
337
338pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> {
340 let mut len_buf = [0u8; mem::size_of::<usize>()];
341 r.read_exact(&mut len_buf).await?;
342 let len = usize::from_le_bytes(len_buf);
343 let mut buf = vec![0; len];
344 r.read_exact(&mut buf).await?;
345 Ok(buf)
346}
347
348async fn send_worker_handshake(
350 stream: &mut UnixStream,
351 handshake: WorkerHandshake,
352) -> io::Result<()> {
353 framed_send(stream, &handshake.encode()).await
354}
355
/// A per-worker directory, backed by a `tempfile::TempDir` created inside the cache
/// directory by [`WorkerDir::new`].
///
/// NOTE(review): `tempfile::TempDir` is documented to delete its directory when dropped, so
/// the directory presumably lives exactly as long as this value. Confirm callers expect that.
#[derive(Debug)]
pub struct WorkerDir {
    /// The owned temporary-directory handle.
    tempdir: tempfile::TempDir,
}
384
/// Common prefix of worker directory names. [`WorkerDir::new`] names each directory
/// `worker-dir-<debug_id>-` followed by a suffix generated by `tempfile`.
pub const WORKER_DIR_PREFIX: &str = "worker-dir";
386
387impl WorkerDir {
388 pub async fn new(debug_id: &'static str, cache_dir: &Path) -> Result<Self, SpawnErr> {
390 let prefix = format!("{WORKER_DIR_PREFIX}-{debug_id}-");
391 let tempdir = tempfile::Builder::new()
392 .prefix(&prefix)
393 .tempdir_in(cache_dir)
394 .map_err(|_| SpawnErr::TmpPath)?;
395 Ok(Self { tempdir })
396 }
397
398 pub fn path(&self) -> &Path {
399 self.tempdir.path()
400 }
401}
402
/// Removes everything inside `worker_dir_path`, leaving the (now empty) directory in place.
///
/// Subdirectories are emptied recursively and then removed. Entries are classified with
/// [`std::fs::DirEntry::file_type`], which does not follow symlinks. A symlink is therefore
/// removed as a link and its target is left untouched.
///
/// `NotFound` errors are treated as "already gone". A missing `worker_dir_path` is not an
/// error. An entry that disappears while the cleanup runs (e.g. deleted concurrently by
/// another process) is skipped rather than silently aborting the rest of the cleanup.
///
/// # Errors
///
/// Returns the first I/O error other than `NotFound` encountered while reading or removing
/// entries.
pub fn clear_worker_dir_path(worker_dir_path: &Path) -> io::Result<()> {
    /// Maps a `NotFound` error to success and passes every other outcome through unchanged.
    fn ignore_not_found(result: io::Result<()>) -> io::Result<()> {
        match result {
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
            other => other,
        }
    }

    /// Removes a single directory entry, first emptying it if it is a directory.
    fn remove_entry(entry: &std::fs::DirEntry) -> io::Result<()> {
        let path = entry.path();
        if entry.file_type()?.is_dir() {
            remove_dir_contents(&path)?;
            std::fs::remove_dir(&path)
        } else {
            std::fs::remove_file(&path)
        }
    }

    /// Removes every entry inside `path`, skipping entries that vanish part-way through.
    fn remove_dir_contents(path: &Path) -> io::Result<()> {
        for entry in std::fs::read_dir(path)? {
            ignore_not_found(remove_entry(&entry?))?;
        }
        Ok(())
    }

    ignore_not_found(remove_dir_contents(worker_dir_path))
}