polkadot_node_core_pvf/execute/
worker_interface.rs1use crate::{
20 artifacts::ArtifactPathId,
21 worker_interface::{
22 clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
23 SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
24 },
25 LOG_TARGET,
26};
27use codec::{Decode, Encode};
28use futures::FutureExt;
29use futures_timer::Delay;
30use polkadot_node_core_pvf_common::{
31 error::InternalValidationError,
32 execute::{Handshake, WorkerError, WorkerResponse},
33 worker_dir, ArtifactChecksum, SecurityStatus,
34};
35use polkadot_node_primitives::PoV;
36use polkadot_primitives::{ExecutorParams, PersistedValidationData};
37use std::{path::Path, sync::Arc, time::Duration};
38use tokio::{io, net::UnixStream};
39
40pub async fn spawn(
44 program_path: &Path,
45 cache_path: &Path,
46 executor_params: ExecutorParams,
47 spawn_timeout: Duration,
48 node_version: Option<&str>,
49 security_status: SecurityStatus,
50) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
51 let mut extra_args = vec!["execute-worker"];
52 if let Some(node_version) = node_version {
53 extra_args.extend_from_slice(&["--node-impl-version", node_version]);
54 }
55
56 let (mut idle_worker, worker_handle) = spawn_with_program_path(
57 "execute",
58 program_path,
59 cache_path,
60 &extra_args,
61 spawn_timeout,
62 security_status,
63 )
64 .await?;
65 send_execute_handshake(&mut idle_worker.stream, Handshake { executor_params })
66 .await
67 .map_err(|error| {
68 let err = SpawnErr::Handshake { err: error.to_string() };
69 gum::warn!(
70 target: LOG_TARGET,
71 worker_pid = %idle_worker.pid,
72 "failed to send a handshake to the spawned worker: {}",
73 error
74 );
75 err
76 })?;
77 Ok((idle_worker, worker_handle))
78}
79
80pub struct Response {
85 pub worker_response: WorkerResponse,
87 pub idle_worker: IdleWorker,
89}
90#[derive(thiserror::Error, Debug)]
96pub enum Error {
97 #[error("The communication with the worker exceeded the hard limit")]
99 HardTimeout,
100 #[error("An I/O error happened during communication with the worker: {0}")]
103 CommunicationErr(#[from] io::Error),
104 #[error("The worker reported an error: {0}")]
107 WorkerError(#[from] WorkerError),
108
109 #[error("An internal error occurred: {0}")]
115 InternalError(#[from] InternalValidationError),
116}
117
118pub async fn start_work(
124 worker: IdleWorker,
125 artifact: ArtifactPathId,
126 execution_timeout: Duration,
127 pvd: Arc<PersistedValidationData>,
128 pov: Arc<PoV>,
129) -> Result<Response, Error> {
130 let IdleWorker { mut stream, pid, worker_dir } = worker;
131
132 gum::debug!(
133 target: LOG_TARGET,
134 worker_pid = %pid,
135 ?worker_dir,
136 validation_code_hash = ?artifact.id.code_hash,
137 "starting execute for {}",
138 artifact.path.display(),
139 );
140
141 with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
142 send_request(&mut stream, pvd, pov, execution_timeout, artifact.checksum)
143 .await
144 .map_err(|error| {
145 gum::warn!(
146 target: LOG_TARGET,
147 worker_pid = %pid,
148 validation_code_hash = ?artifact.id.code_hash,
149 "failed to send an execute request: {}",
150 error,
151 );
152 Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
153 })?;
154
155 let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
161 let worker_result = futures::select! {
162 worker_result = recv_result(&mut stream).fuse() => {
163 match worker_result {
164 Ok(result) =>
165 handle_result(
166 result,
167 pid,
168 execution_timeout,
169 )
170 .await,
171 Err(error) => {
172 gum::warn!(
173 target: LOG_TARGET,
174 worker_pid = %pid,
175 validation_code_hash = ?artifact.id.code_hash,
176 "failed to recv an execute result: {}",
177 error,
178 );
179
180 return Err(Error::CommunicationErr(error))
181 },
182 }
183 },
184 _ = Delay::new(timeout).fuse() => {
185 gum::warn!(
186 target: LOG_TARGET,
187 worker_pid = %pid,
188 validation_code_hash = ?artifact.id.code_hash,
189 "execution worker exceeded lenient timeout for execution, child worker likely stalled",
190 );
191 return Err(Error::HardTimeout)
192 },
193 };
194
195 match worker_result {
196 Ok(worker_response) => Ok(Response {
197 worker_response,
198 idle_worker: IdleWorker { stream, pid, worker_dir },
199 }),
200 Err(worker_error) => Err(worker_error.into()),
201 }
202 })
203 .await
204}
205
206async fn handle_result(
211 worker_result: Result<WorkerResponse, WorkerError>,
212 worker_pid: u32,
213 execution_timeout: Duration,
214) -> Result<WorkerResponse, WorkerError> {
215 if let Ok(WorkerResponse { duration, .. }) = worker_result {
216 if duration > execution_timeout {
217 gum::warn!(
219 target: LOG_TARGET,
220 worker_pid,
221 "execute job took {}ms cpu time, exceeded execution timeout {}ms.",
222 duration.as_millis(),
223 execution_timeout.as_millis(),
224 );
225
226 return Err(WorkerError::JobTimedOut)
228 }
229 }
230
231 worker_result
232}
233
234async fn with_worker_dir_setup<F, Fut>(
240 worker_dir: WorkerDir,
241 pid: u32,
242 artifact_path: &Path,
243 f: F,
244) -> Result<Response, Error>
245where
246 Fut: futures::Future<Output = Result<Response, Error>>,
247 F: FnOnce(WorkerDir) -> Fut,
248{
249 let link_path = worker_dir::execute_artifact(worker_dir.path());
253 if let Err(err) = tokio::fs::hard_link(artifact_path, link_path).await {
254 gum::warn!(
255 target: LOG_TARGET,
256 worker_pid = %pid,
257 ?worker_dir,
258 "failed to clear worker cache after the job: {}",
259 err,
260 );
261 return Err(InternalValidationError::CouldNotCreateLink(format!("{:?}", err)).into());
262 }
263
264 let worker_dir_path = worker_dir.path().to_owned();
265 let result = f(worker_dir).await;
266
267 if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
269 gum::warn!(
270 target: LOG_TARGET,
271 worker_pid = %pid,
272 ?worker_dir_path,
273 "failed to clear worker cache after the job: {:?}",
274 err,
275 );
276 return Err(InternalValidationError::CouldNotClearWorkerDir {
277 err: format!("{:?}", err),
278 path: worker_dir_path.to_str().map(String::from),
279 }
280 .into())
281 }
282
283 result
284}
285
286async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
288 framed_send(stream, &handshake.encode()).await
289}
290
291async fn send_request(
292 stream: &mut UnixStream,
293 pvd: Arc<PersistedValidationData>,
294 pov: Arc<PoV>,
295 execution_timeout: Duration,
296 artifact_checksum: ArtifactChecksum,
297) -> io::Result<()> {
298 let request = polkadot_node_core_pvf_common::execute::ExecuteRequest {
299 pvd: (*pvd).clone(),
300 pov: (*pov).clone(),
301 execution_timeout,
302 artifact_checksum,
303 };
304 framed_send(stream, &request.encode()).await
305}
306
307async fn recv_result(stream: &mut UnixStream) -> io::Result<Result<WorkerResponse, WorkerError>> {
308 let result_bytes = framed_recv(stream).await?;
309 Result::<WorkerResponse, WorkerError>::decode(&mut result_bytes.as_slice()).map_err(|e| {
310 io::Error::new(
311 io::ErrorKind::Other,
312 format!("execute pvf recv_result: decode error: {:?}", e),
313 )
314 })
315}