referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/
worker_interface.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Common logic for implementation of worker processes.
18
19use crate::LOG_TARGET;
20use codec::Encode;
21use futures::FutureExt as _;
22use futures_timer::Delay;
23use pin_project::pin_project;
24use polkadot_node_core_pvf_common::{SecurityStatus, WorkerHandshake};
25use rand::Rng;
26use std::{
27	fmt, mem,
28	path::{Path, PathBuf},
29	pin::Pin,
30	task::{Context, Poll},
31	time::Duration,
32};
33use tokio::{
34	io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf},
35	net::{UnixListener, UnixStream},
36	process,
37};
38
39/// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
40/// wall clock time). This is lenient because CPU time may go slower than wall clock time.
41pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
42
43/// This is publicly exposed only for integration tests.
44///
45/// # Parameters
46///
47/// - `debug_id`: An identifier for the process (e.g. "execute" or "prepare").
48///
49/// - `program_path`: The path to the program.
50///
51/// - `cache_path`: The path to the artifact cache.
52///
53/// - `extra_args`: Optional extra CLI arguments to the program. NOTE: Should only contain data
54///   required before the handshake, like node/worker versions for the version check. Other data
55///   should go through the handshake.
56///
57/// - `spawn_timeout`: The amount of time to wait for the child process to spawn.
58///
59/// - `security_status`: contains the detected status of security features.
60#[doc(hidden)]
61pub async fn spawn_with_program_path(
62	debug_id: &'static str,
63	program_path: impl Into<PathBuf>,
64	cache_path: &Path,
65	extra_args: &[&str],
66	spawn_timeout: Duration,
67	security_status: SecurityStatus,
68) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
69	let program_path = program_path.into();
70	let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
71	let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
72	// Hack the borrow-checker.
73	let program_path_clone = program_path.clone();
74	let worker_dir_clone = worker_dir.path().to_owned();
75	let extra_args_clone = extra_args.clone();
76
77	with_transient_socket_path(debug_id, |socket_path| {
78		let socket_path = socket_path.to_owned();
79
80		async move {
81			let listener = match UnixListener::bind(&socket_path) {
82				Ok(ok) => ok,
83				Err(err) => return Err(SpawnErr::Bind { socket_path, err: err.to_string() }),
84			};
85
86			let handle =
87				WorkerHandle::spawn(&program_path, &extra_args, &socket_path, &worker_dir.path())
88					.map_err(|err| SpawnErr::ProcessSpawn { program_path, err: err.to_string() })?;
89
90			futures::select! {
91				accept_result = listener.accept().fuse() => {
92					let (mut stream, _) = accept_result
93						.map_err(|err| SpawnErr::Accept { socket_path, err: err.to_string() })?;
94					send_worker_handshake(&mut stream, WorkerHandshake { security_status })
95						.await
96						.map_err(|err| SpawnErr::Handshake { err: err.to_string() })?;
97					Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
98				}
99				_ = Delay::new(spawn_timeout).fuse() => Err(SpawnErr::AcceptTimeout{spawn_timeout}),
100			}
101		}
102	})
103	.await
104	.map_err(|err| {
105		gum::warn!(
106			target: LOG_TARGET,
107			%debug_id,
108			program_path = ?program_path_clone,
109			extra_args = ?extra_args_clone,
110			worker_dir = ?worker_dir_clone,
111			"error spawning worker: {}",
112			err,
113		);
114		err
115	})
116}
117
118/// A temporary, random, free path that is necessary only to establish socket communications. If a
119/// directory exists at the path at the end of this function, it is removed then.
120async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
121where
122	F: FnOnce(&Path) -> Fut,
123	Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
124{
125	/// Returns a path under [`std::env::temp_dir`]. The path name will start with the given prefix.
126	///
127	/// There is only a certain number of retries. If exceeded this function will give up and return
128	/// an error.
129	pub async fn tmppath(prefix: &str) -> io::Result<PathBuf> {
130		fn make_tmppath(prefix: &str, dir: &Path) -> PathBuf {
131			use rand::distributions::Alphanumeric;
132
133			const DISCRIMINATOR_LEN: usize = 10;
134
135			let mut buf = Vec::with_capacity(prefix.len() + DISCRIMINATOR_LEN);
136			buf.extend(prefix.as_bytes());
137			buf.extend(rand::thread_rng().sample_iter(&Alphanumeric).take(DISCRIMINATOR_LEN));
138
139			let s = std::str::from_utf8(&buf)
140				.expect("the string is collected from a valid utf-8 sequence; qed");
141
142			let mut path = dir.to_owned();
143			path.push(s);
144			path
145		}
146
147		const NUM_RETRIES: usize = 50;
148
149		let dir = std::env::temp_dir();
150		for _ in 0..NUM_RETRIES {
151			let tmp_path = make_tmppath(prefix, &dir);
152			if !tmp_path.exists() {
153				return Ok(tmp_path)
154			}
155		}
156
157		Err(io::Error::new(io::ErrorKind::Other, "failed to create a temporary path"))
158	}
159
160	let socket_path = tmppath(&format!("pvf-host-{}-", debug_id))
161		.await
162		.map_err(|_| SpawnErr::TmpPath)?;
163	let result = f(&socket_path).await;
164
165	// Best effort to remove the socket file. Under normal circumstances the socket will be removed
166	// by the worker. We make sure that it is removed here, just in case a failed rendezvous.
167	let _ = tokio::fs::remove_file(socket_path).await;
168
169	result
170}
171
172/// A struct that represents an idle worker.
173///
174/// This struct is supposed to be used as a token that is passed by move into a subroutine that
175/// initiates a job. If the worker dies on the duty, then the token is not returned.
176#[derive(Debug)]
177pub struct IdleWorker {
178	/// The stream to which the child process is connected.
179	pub stream: UnixStream,
180
181	/// The identifier of this process. Used to reset the niceness.
182	pub pid: u32,
183
184	/// The temporary per-worker path. We clean up the worker dir between jobs and delete it when
185	/// the worker dies.
186	pub worker_dir: WorkerDir,
187}
188
189/// This is publicly exposed only for integration tests.
190///
191/// An error happened during spawning a worker process.
192#[derive(thiserror::Error, Clone, Debug)]
193#[doc(hidden)]
194pub enum SpawnErr {
195	#[error("cannot obtain a temporary path location")]
196	TmpPath,
197	#[error("cannot bind the socket to the given path {socket_path:?}: {err}")]
198	Bind { socket_path: PathBuf, err: String },
199	#[error(
200		"an error happened during accepting a connection to the socket {socket_path:?}: {err}"
201	)]
202	Accept { socket_path: PathBuf, err: String },
203	#[error("an error happened during spawning the process at path {program_path:?}: {err}")]
204	ProcessSpawn { program_path: PathBuf, err: String },
205	#[error("the deadline {}ms allotted for the worker spawning and connecting to the socket has elapsed", .spawn_timeout.as_millis())]
206	AcceptTimeout { spawn_timeout: Duration },
207	#[error("failed to send handshake after successful spawning was signaled: {err}")]
208	Handshake { err: String },
209}
210
211/// This is a representation of a potentially running worker. Drop it and the process will be
212/// killed.
213///
214/// A worker's handle is also a future that resolves when it's detected that the worker's process
215/// has been terminated. Since the worker is running in another process it is obviously not
216/// necessary to poll this future to make the worker run, it's only for termination detection.
217///
218/// This future relies on the fact that a child process's stdout `fd` is closed upon its
219/// termination.
220#[pin_project]
221pub struct WorkerHandle {
222	child: process::Child,
223	child_id: u32,
224	#[pin]
225	stdout: process::ChildStdout,
226	program: PathBuf,
227	drop_box: Box<[u8]>,
228}
229
230impl WorkerHandle {
231	fn spawn(
232		program: impl AsRef<Path>,
233		extra_args: &[String],
234		socket_path: impl AsRef<Path>,
235		worker_dir_path: impl AsRef<Path>,
236	) -> io::Result<Self> {
237		// Clear all env vars from the spawned process.
238		let mut command = process::Command::new(program.as_ref());
239		command.env_clear();
240
241		command.env("RUST_LOG", sc_tracing::logging::get_directives().join(","));
242
243		let mut child = command
244			.args(extra_args)
245			.arg("--socket-path")
246			.arg(socket_path.as_ref().as_os_str())
247			.arg("--worker-dir-path")
248			.arg(worker_dir_path.as_ref().as_os_str())
249			.stdout(std::process::Stdio::piped())
250			.kill_on_drop(true)
251			.spawn()?;
252
253		let child_id = child
254			.id()
255			.ok_or(io::Error::new(io::ErrorKind::Other, "could not get id of spawned process"))?;
256		let stdout = child
257			.stdout
258			.take()
259			.expect("the process spawned with piped stdout should have the stdout handle");
260
261		Ok(WorkerHandle {
262			child,
263			child_id,
264			stdout,
265			program: program.as_ref().to_path_buf(),
266			// We don't expect the bytes to be ever read. But in case we do, we should not use a
267			// buffer of a small size, because otherwise if the child process does return any data
268			// we will end up issuing a syscall for each byte. We also prefer not to do allocate
269			// that on the stack, since each poll the buffer will be allocated and initialized (and
270			// that's due `poll_read` takes &mut [u8] and there are no guarantees that a `poll_read`
271			// won't ever read from there even though that's unlikely).
272			//
273			// OTOH, we also don't want to be super smart here and we could just afford to allocate
274			// a buffer for that here.
275			drop_box: vec![0; 8192].into_boxed_slice(),
276		})
277	}
278
279	/// Returns the process id of this worker.
280	pub fn id(&self) -> u32 {
281		self.child_id
282	}
283}
284
285impl futures::Future for WorkerHandle {
286	type Output = ();
287
288	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289		let me = self.project();
290		// Create a `ReadBuf` here instead of storing it in `WorkerHandle` to avoid a lifetime
291		// parameter on `WorkerHandle`. Creating the `ReadBuf` is fairly cheap.
292		let mut read_buf = ReadBuf::new(&mut *me.drop_box);
293		match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut read_buf)) {
294			Ok(()) => {
295				if read_buf.filled().len() > 0 {
296					// weird, we've read something. Pretend that never happened and reschedule
297					// ourselves.
298					cx.waker().wake_by_ref();
299					Poll::Pending
300				} else {
301					// Nothing read means `EOF` means the child was terminated. Resolve.
302					Poll::Ready(())
303				}
304			},
305			Err(err) => {
306				// The implementation is guaranteed to not to return `WouldBlock` and Interrupted.
307				// This leaves us with legit errors which we suppose were due to termination.
308
309				// Log the status code.
310				gum::debug!(
311					target: LOG_TARGET,
312					worker_pid = %me.child_id,
313					status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()),
314					"pvf worker ({}): {:?}",
315					me.program.display(),
316					err,
317				);
318				Poll::Ready(())
319			},
320		}
321	}
322}
323
324impl fmt::Debug for WorkerHandle {
325	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326		write!(f, "WorkerHandle(pid={})", self.id())
327	}
328}
329
330/// Write some data prefixed by its length into `w`.
331pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
332	let len_buf = buf.len().to_le_bytes();
333	w.write_all(&len_buf).await?;
334	w.write_all(buf).await?;
335	Ok(())
336}
337
338/// Read some data prefixed by its length from `r`.
339pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> {
340	let mut len_buf = [0u8; mem::size_of::<usize>()];
341	r.read_exact(&mut len_buf).await?;
342	let len = usize::from_le_bytes(len_buf);
343	let mut buf = vec![0; len];
344	r.read_exact(&mut buf).await?;
345	Ok(buf)
346}
347
348/// Sends a handshake with information for the worker.
349async fn send_worker_handshake(
350	stream: &mut UnixStream,
351	handshake: WorkerHandshake,
352) -> io::Result<()> {
353	framed_send(stream, &handshake.encode()).await
354}
355
356/// A temporary worker dir that contains only files needed by the worker. The worker will change its
357/// root (the `/` directory) to this directory; it should have access to no other paths on its
358/// filesystem.
359///
360/// NOTE: This struct cleans up its associated directory when it is dropped. Therefore it should not
361/// implement `Clone`.
362///
363/// # File structure
364///
365/// The overall file structure for the PVF system is as follows. The `worker-dir-X`s are managed by
366/// this struct.
367///
368/// ```nocompile
369/// + /<cache_path>/
370///   - artifact-1
371///   - artifact-2
372///   - [...]
373///   - worker-dir-1/  (new `/` for worker-1)
374///     + socket                            (created by host)
375///     + tmp-artifact                      (created by host) (prepare-only)
376///     + artifact     (link -> artifact-1) (created by host) (execute-only)
377///   - worker-dir-2/  (new `/` for worker-2)
378///     + [...]
379/// ```
380#[derive(Debug)]
381pub struct WorkerDir {
382	tempdir: tempfile::TempDir,
383}
384
385pub const WORKER_DIR_PREFIX: &str = "worker-dir";
386
387impl WorkerDir {
388	/// Creates a new, empty worker dir with a random name in the given cache dir.
389	pub async fn new(debug_id: &'static str, cache_dir: &Path) -> Result<Self, SpawnErr> {
390		let prefix = format!("{WORKER_DIR_PREFIX}-{debug_id}-");
391		let tempdir = tempfile::Builder::new()
392			.prefix(&prefix)
393			.tempdir_in(cache_dir)
394			.map_err(|_| SpawnErr::TmpPath)?;
395		Ok(Self { tempdir })
396	}
397
398	pub fn path(&self) -> &Path {
399		self.tempdir.path()
400	}
401}
402
403// Not async since Rust has trouble with async recursion. There should be few files here anyway.
404//
405/// Clear the temporary worker dir without deleting it. Not deleting is important because the worker
406/// has mounted its own separate filesystem here.
407///
408/// Should be called right after a job has finished. We don't want jobs to have access to
409/// artifacts from previous jobs.
410pub fn clear_worker_dir_path(worker_dir_path: &Path) -> io::Result<()> {
411	fn remove_dir_contents(path: &Path) -> io::Result<()> {
412		for entry in std::fs::read_dir(path)? {
413			let entry = entry?;
414			let path = entry.path();
415
416			if entry.file_type()?.is_dir() {
417				remove_dir_contents(&path)?;
418				std::fs::remove_dir(path)?;
419			} else {
420				std::fs::remove_file(path)?;
421			}
422		}
423		Ok(())
424	}
425
426	// Note the worker dir may not exist anymore because of the worker dying and being cleaned up.
427	match remove_dir_contents(worker_dir_path) {
428		Err(err) if matches!(err.kind(), io::ErrorKind::NotFound) => Ok(()),
429		result => result,
430	}
431}