1use crate::{
20 artifacts::generate_artifact_path,
21 metrics::Metrics,
22 worker_interface::{
23 clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
24 SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
25 },
26 LOG_TARGET,
27};
28use codec::{Decode, Encode};
29use polkadot_node_core_pvf_common::{
30 error::{PrepareError, PrepareResult, PrepareWorkerResult},
31 prepare::{PrepareStats, PrepareSuccess, PrepareWorkerSuccess},
32 pvf::PvfPrepData,
33 worker_dir, SecurityStatus,
34};
35
36use sp_core::hexdisplay::HexDisplay;
37use std::{
38 path::{Path, PathBuf},
39 time::Duration,
40};
41use tokio::{io, net::UnixStream};
42
43pub async fn spawn(
47 program_path: &Path,
48 cache_path: &Path,
49 spawn_timeout: Duration,
50 node_version: Option<&str>,
51 security_status: SecurityStatus,
52) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
53 let mut extra_args = vec!["prepare-worker"];
54 if let Some(node_version) = node_version {
55 extra_args.extend_from_slice(&["--node-impl-version", node_version]);
56 }
57
58 spawn_with_program_path(
59 "prepare",
60 program_path,
61 cache_path,
62 &extra_args,
63 spawn_timeout,
64 security_status,
65 )
66 .await
67}
68
69pub enum Outcome {
73 Concluded { worker: IdleWorker, result: PrepareResult },
75 Unreachable,
78 CreateTmpFileErr { worker: IdleWorker, err: String },
80 RenameTmpFile {
83 worker: IdleWorker,
84 err: String,
85 src: Option<String>,
88 dest: Option<String>,
89 },
90 ClearWorkerDir { err: String },
92 TimedOut,
96 IoErr(String),
100 OutOfMemory,
102 JobDied { err: String, job_pid: i32 },
106}
107
108pub async fn start_work(
114 metrics: &Metrics,
115 worker: IdleWorker,
116 pvf: PvfPrepData,
117 cache_path: PathBuf,
118) -> Outcome {
119 let IdleWorker { stream, pid, worker_dir } = worker;
120
121 gum::debug!(
122 target: LOG_TARGET,
123 worker_pid = %pid,
124 ?worker_dir,
125 "starting prepare for {:?}",
126 pvf,
127 );
128
129 with_worker_dir_setup(
130 worker_dir,
131 stream,
132 pid,
133 |tmp_artifact_file, mut stream, worker_dir| async move {
134 let preparation_timeout = pvf.prep_timeout();
135
136 if let Err(err) = send_request(&mut stream, &pvf).await {
137 gum::warn!(
138 target: LOG_TARGET,
139 worker_pid = %pid,
140 "failed to send a prepare request: {:?}",
141 err,
142 );
143 return Outcome::Unreachable
144 }
145
146 let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
156 let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
157
158 match result {
159 Ok(Ok(prepare_worker_result)) =>
161 handle_response(
162 metrics,
163 IdleWorker { stream, pid, worker_dir },
164 prepare_worker_result,
165 pid,
166 tmp_artifact_file,
167 &cache_path,
168 preparation_timeout,
169 )
170 .await,
171 Ok(Err(err)) => {
172 gum::warn!(
174 target: LOG_TARGET,
175 worker_pid = %pid,
176 "failed to recv a prepare response: {}",
177 err,
178 );
179 Outcome::IoErr(err.to_string())
180 },
181 Err(_) => {
182 gum::warn!(
184 target: LOG_TARGET,
185 worker_pid = %pid,
186 "did not recv a prepare response within the time limit",
187 );
188 Outcome::TimedOut
189 },
190 }
191 },
192 )
193 .await
194}
195
196async fn handle_response(
201 metrics: &Metrics,
202 worker: IdleWorker,
203 result: PrepareWorkerResult,
204 worker_pid: u32,
205 tmp_file: PathBuf,
206 cache_path: &Path,
207 preparation_timeout: Duration,
208) -> Outcome {
209 let PrepareWorkerSuccess {
212 checksum,
213 stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len },
214 } = match result.clone() {
215 Ok(result) => result,
216 Err(PrepareError::TimedOut) => return Outcome::TimedOut,
218 Err(PrepareError::JobDied { err, job_pid }) => return Outcome::JobDied { err, job_pid },
219 Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory,
220 Err(err) => return Outcome::Concluded { worker, result: Err(err) },
221 };
222
223 metrics.observe_code_size(observed_wasm_code_len as usize);
224
225 if cpu_time_elapsed > preparation_timeout {
226 gum::warn!(
228 target: LOG_TARGET,
229 %worker_pid,
230 "prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
231 cpu_time_elapsed.as_millis(),
232 preparation_timeout.as_millis(),
233 tmp_file.display(),
234 );
235 return Outcome::TimedOut
236 }
237
238 let size = match tokio::fs::metadata(cache_path).await {
239 Ok(metadata) => metadata.len(),
240 Err(err) => {
241 gum::warn!(
242 target: LOG_TARGET,
243 ?cache_path,
244 "failed to read size of the artifact: {}",
245 err,
246 );
247 return Outcome::IoErr(err.to_string())
248 },
249 };
250
251 let artifact_path = generate_artifact_path(cache_path);
256
257 gum::debug!(
258 target: LOG_TARGET,
259 %worker_pid,
260 "promoting WIP artifact {} to {}",
261 tmp_file.display(),
262 artifact_path.display(),
263 );
264
265 let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await {
266 Ok(()) => Outcome::Concluded {
267 worker,
268 result: Ok(PrepareSuccess {
269 checksum,
270 path: artifact_path,
271 size,
272 stats: PrepareStats {
273 cpu_time_elapsed,
274 memory_stats: memory_stats.clone(),
275 observed_wasm_code_len,
276 },
277 }),
278 },
279 Err(err) => {
280 gum::warn!(
281 target: LOG_TARGET,
282 %worker_pid,
283 "failed to rename the artifact from {} to {}: {:?}",
284 tmp_file.display(),
285 artifact_path.display(),
286 err,
287 );
288 Outcome::RenameTmpFile {
289 worker,
290 err: format!("{:?}", err),
291 src: tmp_file.to_str().map(String::from),
292 dest: artifact_path.to_str().map(String::from),
293 }
294 },
295 };
296
297 metrics.observe_preparation_memory_metrics(memory_stats);
300
301 outcome
302}
303
304async fn with_worker_dir_setup<F, Fut>(
310 worker_dir: WorkerDir,
311 stream: UnixStream,
312 pid: u32,
313 f: F,
314) -> Outcome
315where
316 Fut: futures::Future<Output = Outcome>,
317 F: FnOnce(PathBuf, UnixStream, WorkerDir) -> Fut,
318{
319 let tmp_file = worker_dir::prepare_tmp_artifact(worker_dir.path());
322 if let Err(err) = tokio::fs::File::create(&tmp_file).await {
323 gum::warn!(
324 target: LOG_TARGET,
325 worker_pid = %pid,
326 ?worker_dir,
327 "failed to create a temp file for the artifact: {:?}",
328 err,
329 );
330 return Outcome::CreateTmpFileErr {
331 worker: IdleWorker { stream, pid, worker_dir },
332 err: format!("{:?}", err),
333 }
334 };
335
336 let worker_dir_path = worker_dir.path().to_owned();
337 let outcome = f(tmp_file, stream, worker_dir).await;
338
339 if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
341 gum::warn!(
342 target: LOG_TARGET,
343 worker_pid = %pid,
344 ?worker_dir_path,
345 "failed to clear worker cache after the job: {:?}",
346 err,
347 );
348 return Outcome::ClearWorkerDir { err: format!("{:?}", err) }
349 }
350
351 outcome
352}
353
354async fn send_request(stream: &mut UnixStream, pvf: &PvfPrepData) -> io::Result<()> {
355 framed_send(stream, &pvf.encode()).await?;
356 Ok(())
357}
358
359async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareWorkerResult> {
360 let result = framed_recv(stream).await?;
361 let result = PrepareWorkerResult::decode(&mut &result[..]).map_err(|e| {
362 let bound_bytes = &result[..result.len().min(4)];
364 gum::warn!(
365 target: LOG_TARGET,
366 worker_pid = %pid,
367 "received unexpected response from the prepare worker: {}",
368 HexDisplay::from(&bound_bytes),
369 );
370 io::Error::new(
371 io::ErrorKind::Other,
372 format!("prepare pvf recv_response: failed to decode result: {:?}", e),
373 )
374 })?;
375 Ok(result)
376}