referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/prepare/
worker_interface.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Host interface to the prepare worker.
18
19use crate::{
20	artifacts::generate_artifact_path,
21	metrics::Metrics,
22	worker_interface::{
23		clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
24		SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
25	},
26	LOG_TARGET,
27};
28use codec::{Decode, Encode};
29use polkadot_node_core_pvf_common::{
30	error::{PrepareError, PrepareResult, PrepareWorkerResult},
31	prepare::{PrepareStats, PrepareSuccess, PrepareWorkerSuccess},
32	pvf::PvfPrepData,
33	worker_dir, SecurityStatus,
34};
35
36use sp_core::hexdisplay::HexDisplay;
37use std::{
38	path::{Path, PathBuf},
39	time::Duration,
40};
41use tokio::{io, net::UnixStream};
42
43/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
44///
45/// Sends a handshake message to the worker as soon as it is spawned.
46pub async fn spawn(
47	program_path: &Path,
48	cache_path: &Path,
49	spawn_timeout: Duration,
50	node_version: Option<&str>,
51	security_status: SecurityStatus,
52) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
53	let mut extra_args = vec!["prepare-worker"];
54	if let Some(node_version) = node_version {
55		extra_args.extend_from_slice(&["--node-impl-version", node_version]);
56	}
57
58	spawn_with_program_path(
59		"prepare",
60		program_path,
61		cache_path,
62		&extra_args,
63		spawn_timeout,
64		security_status,
65	)
66	.await
67}
68
/// Outcome of PVF preparation.
///
/// Variants that carry a `worker: IdleWorker` hand the idle worker token back to the caller.
/// If the idle worker token is not returned, it means the worker must be terminated.
pub enum Outcome {
	/// The worker has finished the work assigned to it. `result` holds either the prepared
	/// artifact description or the preparation error reported by the worker.
	Concluded { worker: IdleWorker, result: PrepareResult },
	/// The host tried to reach the worker but failed. This is most likely because the worker was
	/// killed by the system.
	Unreachable,
	/// The temporary file for the artifact could not be created at the given cache path.
	CreateTmpFileErr { worker: IdleWorker, err: String },
	/// The response from the worker is received, but the tmp file cannot be renamed (moved) to the
	/// final destination location.
	RenameTmpFile {
		worker: IdleWorker,
		err: String,
		// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
		// conversion to `Option<String>`. `None` means the path was not valid UTF-8.
		src: Option<String>,
		dest: Option<String>,
	},
	/// The worker cache could not be cleared for the given reason.
	///
	/// No idle worker token is returned, so the worker must be terminated.
	ClearWorkerDir { err: String },
	/// The worker failed to finish the job until the given deadline.
	///
	/// The worker is no longer usable and should be killed.
	TimedOut,
	/// An IO error occurred while receiving the result from the worker process.
	///
	/// This doesn't return an idle worker instance, thus this worker is no longer usable.
	IoErr(String),
	/// The worker ran out of memory and is aborting. The worker should be ripped.
	OutOfMemory,
	/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
	///
	/// The worker might still be usable, but we kill it just in case.
	JobDied { err: String, job_pid: i32 },
}
107
108/// Given the idle token of a worker and parameters of work, communicates with the worker and
109/// returns the outcome.
110///
111/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process
112/// being killed.
113pub async fn start_work(
114	metrics: &Metrics,
115	worker: IdleWorker,
116	pvf: PvfPrepData,
117	cache_path: PathBuf,
118) -> Outcome {
119	let IdleWorker { stream, pid, worker_dir } = worker;
120
121	gum::debug!(
122		target: LOG_TARGET,
123		worker_pid = %pid,
124		?worker_dir,
125		"starting prepare for {:?}",
126		pvf,
127	);
128
129	with_worker_dir_setup(
130		worker_dir,
131		stream,
132		pid,
133		|tmp_artifact_file, mut stream, worker_dir| async move {
134			let preparation_timeout = pvf.prep_timeout();
135
136			if let Err(err) = send_request(&mut stream, &pvf).await {
137				gum::warn!(
138					target: LOG_TARGET,
139					worker_pid = %pid,
140					"failed to send a prepare request: {:?}",
141					err,
142				);
143				return Outcome::Unreachable
144			}
145
146			// Wait for the result from the worker, keeping in mind that there may be a timeout, the
147			// worker may get killed, or something along these lines. In that case we should
148			// propagate the error to the pool.
149			//
150			// We use a generous timeout here. This is in addition to the one in the child process,
151			// in case the child stalls. We have a wall clock timeout here in the host, but a CPU
152			// timeout in the child. We want to use CPU time because it varies less than wall clock
153			// time under load, but the CPU resources of the child can only be measured from the
154			// parent after the child process terminates.
155			let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
156			let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
157
158			match result {
159				// Received bytes from worker within the time limit.
160				Ok(Ok(prepare_worker_result)) =>
161					handle_response(
162						metrics,
163						IdleWorker { stream, pid, worker_dir },
164						prepare_worker_result,
165						pid,
166						tmp_artifact_file,
167						&cache_path,
168						preparation_timeout,
169					)
170					.await,
171				Ok(Err(err)) => {
172					// Communication error within the time limit.
173					gum::warn!(
174						target: LOG_TARGET,
175						worker_pid = %pid,
176						"failed to recv a prepare response: {}",
177						err,
178					);
179					Outcome::IoErr(err.to_string())
180				},
181				Err(_) => {
182					// Timed out here on the host.
183					gum::warn!(
184						target: LOG_TARGET,
185						worker_pid = %pid,
186						"did not recv a prepare response within the time limit",
187					);
188					Outcome::TimedOut
189				},
190			}
191		},
192	)
193	.await
194}
195
196/// Handles the case where we successfully received response bytes on the host from the child.
197///
198/// Here we know the artifact exists, but is still located in a temporary file which will be cleared
199/// by [`with_worker_dir_setup`].
200async fn handle_response(
201	metrics: &Metrics,
202	worker: IdleWorker,
203	result: PrepareWorkerResult,
204	worker_pid: u32,
205	tmp_file: PathBuf,
206	cache_path: &Path,
207	preparation_timeout: Duration,
208) -> Outcome {
209	// TODO: Add `checksum` to `ArtifactPathId`. See:
210	//       https://github.com/paritytech/polkadot-sdk/issues/2399
211	let PrepareWorkerSuccess {
212		checksum,
213		stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len },
214	} = match result.clone() {
215		Ok(result) => result,
216		// Timed out on the child. This should already be logged by the child.
217		Err(PrepareError::TimedOut) => return Outcome::TimedOut,
218		Err(PrepareError::JobDied { err, job_pid }) => return Outcome::JobDied { err, job_pid },
219		Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory,
220		Err(err) => return Outcome::Concluded { worker, result: Err(err) },
221	};
222
223	metrics.observe_code_size(observed_wasm_code_len as usize);
224
225	if cpu_time_elapsed > preparation_timeout {
226		// The job didn't complete within the timeout.
227		gum::warn!(
228			target: LOG_TARGET,
229			%worker_pid,
230			"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
231			cpu_time_elapsed.as_millis(),
232			preparation_timeout.as_millis(),
233			tmp_file.display(),
234		);
235		return Outcome::TimedOut
236	}
237
238	let size = match tokio::fs::metadata(cache_path).await {
239		Ok(metadata) => metadata.len(),
240		Err(err) => {
241			gum::warn!(
242				target: LOG_TARGET,
243				?cache_path,
244				"failed to read size of the artifact: {}",
245				err,
246			);
247			return Outcome::IoErr(err.to_string())
248		},
249	};
250
251	// The file name should uniquely identify the artifact even across restarts. In case the cache
252	// for some reason is not cleared correctly, we cannot
253	// accidentally execute an artifact compiled under a different wasmtime version, host
254	// environment, etc.
255	let artifact_path = generate_artifact_path(cache_path);
256
257	gum::debug!(
258		target: LOG_TARGET,
259		%worker_pid,
260		"promoting WIP artifact {} to {}",
261		tmp_file.display(),
262		artifact_path.display(),
263	);
264
265	let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await {
266		Ok(()) => Outcome::Concluded {
267			worker,
268			result: Ok(PrepareSuccess {
269				checksum,
270				path: artifact_path,
271				size,
272				stats: PrepareStats {
273					cpu_time_elapsed,
274					memory_stats: memory_stats.clone(),
275					observed_wasm_code_len,
276				},
277			}),
278		},
279		Err(err) => {
280			gum::warn!(
281				target: LOG_TARGET,
282				%worker_pid,
283				"failed to rename the artifact from {} to {}: {:?}",
284				tmp_file.display(),
285				artifact_path.display(),
286				err,
287			);
288			Outcome::RenameTmpFile {
289				worker,
290				err: format!("{:?}", err),
291				src: tmp_file.to_str().map(String::from),
292				dest: artifact_path.to_str().map(String::from),
293			}
294		},
295	};
296
297	// If there were no errors up until now, log the memory stats for a successful preparation, if
298	// available.
299	metrics.observe_preparation_memory_metrics(memory_stats);
300
301	outcome
302}
303
304/// Create a temporary file for an artifact in the worker cache, execute the given future/closure
305/// passing the file path in, and clean up the worker cache.
306///
307/// Failure to clean up the worker cache results in an error - leaving any files here could be a
308/// security issue, and we should shut down the worker. This should be very rare.
309async fn with_worker_dir_setup<F, Fut>(
310	worker_dir: WorkerDir,
311	stream: UnixStream,
312	pid: u32,
313	f: F,
314) -> Outcome
315where
316	Fut: futures::Future<Output = Outcome>,
317	F: FnOnce(PathBuf, UnixStream, WorkerDir) -> Fut,
318{
319	// Create the tmp file here so that the child doesn't need any file creation rights. This will
320	// be cleared at the end of this function.
321	let tmp_file = worker_dir::prepare_tmp_artifact(worker_dir.path());
322	if let Err(err) = tokio::fs::File::create(&tmp_file).await {
323		gum::warn!(
324			target: LOG_TARGET,
325			worker_pid = %pid,
326			?worker_dir,
327			"failed to create a temp file for the artifact: {:?}",
328			err,
329		);
330		return Outcome::CreateTmpFileErr {
331			worker: IdleWorker { stream, pid, worker_dir },
332			err: format!("{:?}", err),
333		}
334	};
335
336	let worker_dir_path = worker_dir.path().to_owned();
337	let outcome = f(tmp_file, stream, worker_dir).await;
338
339	// Try to clear the worker dir.
340	if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
341		gum::warn!(
342			target: LOG_TARGET,
343			worker_pid = %pid,
344			?worker_dir_path,
345			"failed to clear worker cache after the job: {:?}",
346			err,
347		);
348		return Outcome::ClearWorkerDir { err: format!("{:?}", err) }
349	}
350
351	outcome
352}
353
354async fn send_request(stream: &mut UnixStream, pvf: &PvfPrepData) -> io::Result<()> {
355	framed_send(stream, &pvf.encode()).await?;
356	Ok(())
357}
358
359async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareWorkerResult> {
360	let result = framed_recv(stream).await?;
361	let result = PrepareWorkerResult::decode(&mut &result[..]).map_err(|e| {
362		// We received invalid bytes from the worker.
363		let bound_bytes = &result[..result.len().min(4)];
364		gum::warn!(
365			target: LOG_TARGET,
366			worker_pid = %pid,
367			"received unexpected response from the prepare worker: {}",
368			HexDisplay::from(&bound_bytes),
369		);
370		io::Error::new(
371			io::ErrorKind::Other,
372			format!("prepare pvf recv_response: failed to decode result: {:?}", e),
373		)
374	})?;
375	Ok(result)
376}