referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/execute/
worker_interface.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Host interface to the execute worker.
18
19use crate::{
20	artifacts::ArtifactPathId,
21	worker_interface::{
22		clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
23		SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
24	},
25	LOG_TARGET,
26};
27use codec::{Decode, Encode};
28use futures::FutureExt;
29use futures_timer::Delay;
30use polkadot_node_core_pvf_common::{
31	error::InternalValidationError,
32	execute::{Handshake, WorkerError, WorkerResponse},
33	worker_dir, ArtifactChecksum, SecurityStatus,
34};
35use polkadot_node_primitives::PoV;
36use polkadot_primitives::{ExecutorParams, PersistedValidationData};
37use std::{path::Path, sync::Arc, time::Duration};
38use tokio::{io, net::UnixStream};
39
40/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
41///
42/// Sends a handshake message to the worker as soon as it is spawned.
43pub async fn spawn(
44	program_path: &Path,
45	cache_path: &Path,
46	executor_params: ExecutorParams,
47	spawn_timeout: Duration,
48	node_version: Option<&str>,
49	security_status: SecurityStatus,
50) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
51	let mut extra_args = vec!["execute-worker"];
52	if let Some(node_version) = node_version {
53		extra_args.extend_from_slice(&["--node-impl-version", node_version]);
54	}
55
56	let (mut idle_worker, worker_handle) = spawn_with_program_path(
57		"execute",
58		program_path,
59		cache_path,
60		&extra_args,
61		spawn_timeout,
62		security_status,
63	)
64	.await?;
65	send_execute_handshake(&mut idle_worker.stream, Handshake { executor_params })
66		.await
67		.map_err(|error| {
68			let err = SpawnErr::Handshake { err: error.to_string() };
69			gum::warn!(
70				target: LOG_TARGET,
71				worker_pid = %idle_worker.pid,
72				"failed to send a handshake to the spawned worker: {}",
73				error
74			);
75			err
76		})?;
77	Ok((idle_worker, worker_handle))
78}
79
/// Outcome of a PVF execution job that completed its round-trip with the worker.
///
/// The worker answered with a [`WorkerResponse`] and the result is returned. The worker is
/// ready for another job, so its idle token is handed back alongside the response.
pub struct Response {
	/// The response (valid/invalid) from the worker.
	pub worker_response: WorkerResponse,
	/// The worker's idle token. Returning it means the worker can be reused for another job.
	pub idle_worker: IdleWorker,
}
/// Errors that prevent an execution job from producing a reusable [`Response`].
///
/// The idle worker token is not returned for any of these cases, meaning the worker must be
/// terminated.
///
/// NOTE: Errors related to the preparation process are not expected to be encountered by the
/// execution workers.
#[derive(thiserror::Error, Debug)]
pub enum Error {
	/// The host's lenient wall-clock deadline (the job's execution timeout multiplied by
	/// `JOB_TIMEOUT_WALL_CLOCK_FACTOR`) elapsed before the worker replied, so the child likely
	/// stalled. The worker is terminated.
	#[error("The communication with the worker exceeded the hard limit")]
	HardTimeout,
	/// An I/O error happened during communication with the worker. This may mean that the worker
	/// process already died. In this file it is produced when receiving the worker's reply fails,
	/// including when the reply bytes cannot be decoded. The token is not returned in any case.
	#[error("An I/O error happened during communication with the worker: {0}")]
	CommunicationErr(#[from] io::Error),
	/// The worker reported an error (can be from itself or from the job). This also covers a job
	/// whose reported duration exceeded its execution timeout ([`WorkerError::JobTimedOut`]). The
	/// worker should not be reused.
	#[error("The worker reported an error: {0}")]
	WorkerError(#[from] WorkerError),

	/// An internal error happened during the validation. Such an error is most likely related to
	/// some transient glitch, e.g. failing to send the request to the worker, to link the
	/// artifact into the worker dir, or to clear the worker dir afterwards.
	///
	/// Should only ever be used for errors independent of the candidate and PVF. Therefore it may
	/// be a problem with the worker, so we terminate it.
	#[error("An internal error occurred: {0}")]
	InternalError(#[from] InternalValidationError),
}
117
118/// Given the idle token of a worker and parameters of work, communicates with the worker and
119/// returns the outcome.
120///
121/// NOTE: Not returning the idle worker token in `Outcome` will trigger the child process being
122/// killed, if it's still alive.
123pub async fn start_work(
124	worker: IdleWorker,
125	artifact: ArtifactPathId,
126	execution_timeout: Duration,
127	pvd: Arc<PersistedValidationData>,
128	pov: Arc<PoV>,
129) -> Result<Response, Error> {
130	let IdleWorker { mut stream, pid, worker_dir } = worker;
131
132	gum::debug!(
133		target: LOG_TARGET,
134		worker_pid = %pid,
135		?worker_dir,
136		validation_code_hash = ?artifact.id.code_hash,
137		"starting execute for {}",
138		artifact.path.display(),
139	);
140
141	with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
142		send_request(&mut stream, pvd, pov, execution_timeout, artifact.checksum)
143			.await
144			.map_err(|error| {
145				gum::warn!(
146					target: LOG_TARGET,
147					worker_pid = %pid,
148					validation_code_hash = ?artifact.id.code_hash,
149					"failed to send an execute request: {}",
150					error,
151				);
152				Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
153			})?;
154
155		// We use a generous timeout here. This is in addition to the one in the child process, in
156		// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
157		// in the child. We want to use CPU time because it varies less than wall clock time under
158		// load, but the CPU resources of the child can only be measured from the parent after the
159		// child process terminates.
160		let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
161		let worker_result = futures::select! {
162			worker_result = recv_result(&mut stream).fuse() => {
163				match worker_result {
164					Ok(result) =>
165						handle_result(
166							result,
167							pid,
168							execution_timeout,
169						)
170							.await,
171					Err(error) => {
172						gum::warn!(
173							target: LOG_TARGET,
174							worker_pid = %pid,
175							validation_code_hash = ?artifact.id.code_hash,
176							"failed to recv an execute result: {}",
177							error,
178						);
179
180						return Err(Error::CommunicationErr(error))
181					},
182				}
183			},
184			_ = Delay::new(timeout).fuse() => {
185				gum::warn!(
186					target: LOG_TARGET,
187					worker_pid = %pid,
188					validation_code_hash = ?artifact.id.code_hash,
189					"execution worker exceeded lenient timeout for execution, child worker likely stalled",
190				);
191				return Err(Error::HardTimeout)
192			},
193		};
194
195		match worker_result {
196			Ok(worker_response) => Ok(Response {
197				worker_response,
198				idle_worker: IdleWorker { stream, pid, worker_dir },
199			}),
200			Err(worker_error) => Err(worker_error.into()),
201		}
202	})
203	.await
204}
205
206/// Handles the case where we successfully received response bytes on the host from the child.
207///
208/// Here we know the artifact exists, but is still located in a temporary file which will be cleared
209/// by [`with_worker_dir_setup`].
210async fn handle_result(
211	worker_result: Result<WorkerResponse, WorkerError>,
212	worker_pid: u32,
213	execution_timeout: Duration,
214) -> Result<WorkerResponse, WorkerError> {
215	if let Ok(WorkerResponse { duration, .. }) = worker_result {
216		if duration > execution_timeout {
217			// The job didn't complete within the timeout.
218			gum::warn!(
219				target: LOG_TARGET,
220				worker_pid,
221				"execute job took {}ms cpu time, exceeded execution timeout {}ms.",
222				duration.as_millis(),
223				execution_timeout.as_millis(),
224			);
225
226			// Return a timeout error.
227			return Err(WorkerError::JobTimedOut)
228		}
229	}
230
231	worker_result
232}
233
234/// Create a temporary file for an artifact in the worker cache, execute the given future/closure
235/// passing the file path in, and clean up the worker cache.
236///
237/// Failure to clean up the worker cache results in an error - leaving any files here could be a
238/// security issue, and we should shut down the worker. This should be very rare.
239async fn with_worker_dir_setup<F, Fut>(
240	worker_dir: WorkerDir,
241	pid: u32,
242	artifact_path: &Path,
243	f: F,
244) -> Result<Response, Error>
245where
246	Fut: futures::Future<Output = Result<Response, Error>>,
247	F: FnOnce(WorkerDir) -> Fut,
248{
249	// Cheaply create a hard link to the artifact. The artifact is always at a known location in the
250	// worker cache, and the child can't access any other artifacts or gain any information from the
251	// original filename.
252	let link_path = worker_dir::execute_artifact(worker_dir.path());
253	if let Err(err) = tokio::fs::hard_link(artifact_path, link_path).await {
254		gum::warn!(
255			target: LOG_TARGET,
256			worker_pid = %pid,
257			?worker_dir,
258			"failed to clear worker cache after the job: {}",
259			err,
260		);
261		return Err(InternalValidationError::CouldNotCreateLink(format!("{:?}", err)).into());
262	}
263
264	let worker_dir_path = worker_dir.path().to_owned();
265	let result = f(worker_dir).await;
266
267	// Try to clear the worker dir.
268	if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
269		gum::warn!(
270			target: LOG_TARGET,
271			worker_pid = %pid,
272			?worker_dir_path,
273			"failed to clear worker cache after the job: {:?}",
274			err,
275		);
276		return Err(InternalValidationError::CouldNotClearWorkerDir {
277			err: format!("{:?}", err),
278			path: worker_dir_path.to_str().map(String::from),
279		}
280		.into())
281	}
282
283	result
284}
285
286/// Sends a handshake with information specific to the execute worker.
287async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
288	framed_send(stream, &handshake.encode()).await
289}
290
291async fn send_request(
292	stream: &mut UnixStream,
293	pvd: Arc<PersistedValidationData>,
294	pov: Arc<PoV>,
295	execution_timeout: Duration,
296	artifact_checksum: ArtifactChecksum,
297) -> io::Result<()> {
298	let request = polkadot_node_core_pvf_common::execute::ExecuteRequest {
299		pvd: (*pvd).clone(),
300		pov: (*pov).clone(),
301		execution_timeout,
302		artifact_checksum,
303	};
304	framed_send(stream, &request.encode()).await
305}
306
307async fn recv_result(stream: &mut UnixStream) -> io::Result<Result<WorkerResponse, WorkerError>> {
308	let result_bytes = framed_recv(stream).await?;
309	Result::<WorkerResponse, WorkerError>::decode(&mut result_bytes.as_slice()).map_err(|e| {
310		io::Error::new(
311			io::ErrorKind::Other,
312			format!("execute pvf recv_result: decode error: {:?}", e),
313		)
314	})
315}