1use super::worker_interface::{self, Outcome};
18use crate::{
19 metrics::Metrics,
20 worker_interface::{IdleWorker, WorkerHandle},
21 LOG_TARGET,
22};
23use always_assert::never;
24use futures::{
25 channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
26};
27use polkadot_node_core_pvf_common::{
28 error::{PrepareError, PrepareResult},
29 pvf::PvfPrepData,
30 SecurityStatus,
31};
32use slotmap::HopSlotMap;
33use std::{
34 fmt,
35 path::{Path, PathBuf},
36 task::Poll,
37 time::Duration,
38};
39
// Opaque key identifying one worker tracked by the pool. It is generated by the
// `slotmap` key-type macro and used as the key of the pool's `HopSlotMap` of workers.
slotmap::new_key_type! { pub struct Worker; }
41
/// Requests accepted by the prepare-worker pool.
#[derive(Debug, PartialEq, Eq)]
pub enum ToPool {
    /// Ask the pool to spawn a new worker.
    ///
    /// Spawning is retried until it succeeds; the pool then announces the new worker with
    /// [`FromPool::Spawned`].
    Spawn,

    /// Retire the given worker, i.e. remove it from the set of workers tracked by the pool.
    ///
    /// No reply is sent for this request, and asking to kill a worker the pool does not track
    /// is a no-op.
    Kill(Worker),

    /// Start preparing `pvf` on the given worker, passing `cache_path` through to the worker
    /// interface.
    ///
    /// The request is silently ignored when the pool does not track `worker`. The worker is
    /// expected to be idle. The outcome is reported through [`FromPool::Concluded`] or
    /// [`FromPool::Rip`].
    StartWork { worker: Worker, pvf: PvfPrepData, cache_path: PathBuf },
}
73
/// Notifications emitted by the prepare-worker pool.
#[derive(Debug)]
pub enum FromPool {
    /// A worker was spawned successfully and is now tracked under the given key.
    Spawned(Worker),

    /// A job started with [`ToPool::StartWork`] has finished.
    Concluded {
        /// The worker the job was running on.
        worker: Worker,
        /// `true` when the worker was retired together with this notification, in which case
        /// the key is no longer tracked by the pool; `false` when the worker went back to idle.
        rip: bool,
        /// The result of the preparation job.
        result: PrepareResult,
    },

    /// The given worker was retired without an accompanying job result.
    Rip(Worker),
}
94
/// Bookkeeping the pool keeps for every tracked worker.
struct WorkerData {
    /// The idle token of the worker. It is `Some` while the worker has no job; it is taken out
    /// when a job starts and put back once the job concludes without retiring the worker.
    idle: Option<IdleWorker>,
    /// Handle of the worker. It is polled for workers that currently have no job in order to
    /// detect workers that are no longer running.
    handle: WorkerHandle,
}
99
100impl fmt::Debug for WorkerData {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 write!(f, "WorkerData(pid={})", self.handle.id())
103 }
104}
105
/// Internal events produced by the tasks the pool multiplexes over.
enum PoolEvent {
    /// A spawn task finished: carries the idle token and the handle of the new worker.
    Spawn(IdleWorker, WorkerHandle),
    /// A work task finished: carries the worker the job ran on and the job's outcome.
    StartWork(Worker, Outcome),
}
110
/// The set of in-flight pool tasks (spawn and work tasks); each resolves to a [`PoolEvent`].
type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
112
/// State of the prepare-worker pool; consumed by the pool's event loop.
struct Pool {
    /// Path handed to the worker interface when spawning a worker.
    program_path: PathBuf,
    /// Cache path handed to the worker interface when spawning a worker.
    cache_path: PathBuf,
    /// Timeout handed to the worker interface when spawning a worker.
    spawn_timeout: Duration,
    /// Optional node version handed to the worker interface when spawning a worker.
    node_version: Option<String>,
    /// Security status handed to the worker interface when spawning a worker.
    security_status: SecurityStatus,

    /// Incoming requests. The event loop stops once this channel is closed.
    to_pool: mpsc::Receiver<ToPool>,
    /// Outgoing notifications. The event loop stops once sending on this channel fails.
    from_pool: mpsc::UnboundedSender<FromPool>,
    /// All workers currently tracked by the pool.
    spawned: HopSlotMap<Worker, WorkerData>,
    /// In-flight spawn and work tasks.
    mux: Mux,

    /// Metrics sink used to record spawning, retiring and preparation timings.
    metrics: Metrics,
}
128
/// Marker error signalling that the pool's event loop must stop: either the request channel
/// was closed or a notification could not be sent.
struct Fatal;
131
132async fn run(
133 Pool {
134 program_path,
135 cache_path,
136 spawn_timeout,
137 node_version,
138 security_status,
139 to_pool,
140 mut from_pool,
141 mut spawned,
142 mut mux,
143 metrics,
144 }: Pool,
145) {
146 macro_rules! break_if_fatal {
147 ($expr:expr) => {
148 match $expr {
149 Err(Fatal) => break,
150 Ok(v) => v,
151 }
152 };
153 }
154
155 let mut to_pool = to_pool.fuse();
156
157 loop {
158 futures::select! {
159 to_pool = to_pool.next() => {
160 let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
161 handle_to_pool(
162 &metrics,
163 &program_path,
164 &cache_path,
165 spawn_timeout,
166 node_version.clone(),
167 security_status.clone(),
168 &mut spawned,
169 &mut mux,
170 to_pool,
171 )
172 }
173 ev = mux.select_next_some() => {
174 break_if_fatal!(handle_mux(&metrics, &mut from_pool, &mut spawned, ev))
175 }
176 }
177
178 break_if_fatal!(purge_dead(&metrics, &mut from_pool, &mut spawned).await);
179 }
180}
181
182async fn purge_dead(
183 metrics: &Metrics,
184 from_pool: &mut mpsc::UnboundedSender<FromPool>,
185 spawned: &mut HopSlotMap<Worker, WorkerData>,
186) -> Result<(), Fatal> {
187 let mut to_remove = vec![];
188 for (worker, data) in spawned.iter_mut() {
189 if data.idle.is_none() {
190 continue
194 }
195
196 if let Poll::Ready(()) = futures::poll!(&mut data.handle) {
197 to_remove.push(worker);
199 }
200 }
201 for w in to_remove {
202 if attempt_retire(metrics, spawned, w) {
203 reply(from_pool, FromPool::Rip(w))?;
204 }
205 }
206 Ok(())
207}
208
209fn handle_to_pool(
210 metrics: &Metrics,
211 program_path: &Path,
212 cache_path: &Path,
213 spawn_timeout: Duration,
214 node_version: Option<String>,
215 security_status: SecurityStatus,
216 spawned: &mut HopSlotMap<Worker, WorkerData>,
217 mux: &mut Mux,
218 to_pool: ToPool,
219) {
220 match to_pool {
221 ToPool::Spawn => {
222 gum::debug!(target: LOG_TARGET, "spawning a new prepare worker");
223 metrics.prepare_worker().on_begin_spawn();
224 mux.push(
225 spawn_worker_task(
226 program_path.to_owned(),
227 cache_path.to_owned(),
228 spawn_timeout,
229 node_version,
230 security_status,
231 )
232 .boxed(),
233 );
234 },
235 ToPool::StartWork { worker, pvf, cache_path } => {
236 if let Some(data) = spawned.get_mut(worker) {
237 if let Some(idle) = data.idle.take() {
238 let preparation_timer = metrics.time_preparation();
239 mux.push(
240 start_work_task(
241 metrics.clone(),
242 worker,
243 idle,
244 pvf,
245 cache_path,
246 preparation_timer,
247 )
248 .boxed(),
249 );
250 } else {
251 never!("unexpected absence of the idle token in prepare pool");
257 }
258 } else {
259 }
262 },
263 ToPool::Kill(worker) => {
264 gum::debug!(target: LOG_TARGET, ?worker, "killing prepare worker");
265 let _ = attempt_retire(metrics, spawned, worker);
267 },
268 }
269}
270
271async fn spawn_worker_task(
272 program_path: PathBuf,
273 cache_path: PathBuf,
274 spawn_timeout: Duration,
275 node_version: Option<String>,
276 security_status: SecurityStatus,
277) -> PoolEvent {
278 use futures_timer::Delay;
279
280 loop {
281 match worker_interface::spawn(
282 &program_path,
283 &cache_path,
284 spawn_timeout,
285 node_version.as_deref(),
286 security_status.clone(),
287 )
288 .await
289 {
290 Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
291 Err(err) => {
292 gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
293
294 Delay::new(Duration::from_secs(3)).await;
296 },
297 }
298 }
299}
300
301async fn start_work_task<Timer>(
302 metrics: Metrics,
303 worker: Worker,
304 idle: IdleWorker,
305 pvf: PvfPrepData,
306 cache_path: PathBuf,
307 _preparation_timer: Option<Timer>,
308) -> PoolEvent {
309 let outcome = worker_interface::start_work(&metrics, idle, pvf, cache_path).await;
310 PoolEvent::StartWork(worker, outcome)
311}
312
313fn handle_mux(
314 metrics: &Metrics,
315 from_pool: &mut mpsc::UnboundedSender<FromPool>,
316 spawned: &mut HopSlotMap<Worker, WorkerData>,
317 event: PoolEvent,
318) -> Result<(), Fatal> {
319 match event {
320 PoolEvent::Spawn(idle, handle) => {
321 metrics.prepare_worker().on_spawned();
322
323 let worker = spawned.insert(WorkerData { idle: Some(idle), handle });
324
325 reply(from_pool, FromPool::Spawned(worker))?;
326
327 Ok(())
328 },
329 PoolEvent::StartWork(worker, outcome) => {
330 match outcome {
333 Outcome::Concluded { worker: idle, result } =>
334 handle_concluded_no_rip(from_pool, spawned, worker, idle, result),
335 Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip(
338 from_pool,
339 spawned,
340 worker,
341 idle,
342 Err(PrepareError::CreateTmpFile(err)),
343 ),
344 Outcome::RenameTmpFile { worker: idle, err, src, dest } => handle_concluded_no_rip(
347 from_pool,
348 spawned,
349 worker,
350 idle,
351 Err(PrepareError::RenameTmpFile { err, src, dest }),
352 ),
353 Outcome::ClearWorkerDir { err } => {
355 if attempt_retire(metrics, spawned, worker) {
356 reply(
357 from_pool,
358 FromPool::Concluded {
359 worker,
360 rip: true,
361 result: Err(PrepareError::ClearWorkerDir(err)),
362 },
363 )?;
364 }
365
366 Ok(())
367 },
368 Outcome::Unreachable => {
369 if attempt_retire(metrics, spawned, worker) {
370 reply(from_pool, FromPool::Rip(worker))?;
371 }
372
373 Ok(())
374 },
375 Outcome::IoErr(err) => {
376 if attempt_retire(metrics, spawned, worker) {
377 reply(
378 from_pool,
379 FromPool::Concluded {
380 worker,
381 rip: true,
382 result: Err(PrepareError::IoErr(err)),
383 },
384 )?;
385 }
386
387 Ok(())
388 },
389 Outcome::JobDied { err, job_pid } => {
391 if attempt_retire(metrics, spawned, worker) {
392 reply(
393 from_pool,
394 FromPool::Concluded {
395 worker,
396 rip: true,
397 result: Err(PrepareError::JobDied { err, job_pid }),
398 },
399 )?;
400 }
401
402 Ok(())
403 },
404 Outcome::TimedOut => {
405 if attempt_retire(metrics, spawned, worker) {
406 reply(
407 from_pool,
408 FromPool::Concluded {
409 worker,
410 rip: true,
411 result: Err(PrepareError::TimedOut),
412 },
413 )?;
414 }
415
416 Ok(())
417 },
418 Outcome::OutOfMemory => {
419 if attempt_retire(metrics, spawned, worker) {
420 reply(
421 from_pool,
422 FromPool::Concluded {
423 worker,
424 rip: true,
425 result: Err(PrepareError::OutOfMemory),
426 },
427 )?;
428 }
429
430 Ok(())
431 },
432 }
433 },
434 }
435}
436
437fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result<(), Fatal> {
438 from_pool.unbounded_send(m).map_err(|_| Fatal)
439}
440
441fn attempt_retire(
448 metrics: &Metrics,
449 spawned: &mut HopSlotMap<Worker, WorkerData>,
450 worker: Worker,
451) -> bool {
452 if spawned.remove(worker).is_some() {
453 metrics.prepare_worker().on_retired();
454 true
455 } else {
456 false
457 }
458}
459
460fn handle_concluded_no_rip(
466 from_pool: &mut mpsc::UnboundedSender<FromPool>,
467 spawned: &mut HopSlotMap<Worker, WorkerData>,
468 worker: Worker,
469 idle: IdleWorker,
470 result: PrepareResult,
471) -> Result<(), Fatal> {
472 let data = match spawned.get_mut(worker) {
473 None => {
474 return Ok(())
477 },
478 Some(data) => data,
479 };
480
481 let old = data.idle.replace(idle);
484 never!(
485 old.is_some(),
486 "old idle worker was taken out when starting work; we only replace it here; qed"
487 );
488
489 reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
490
491 Ok(())
492}
493
494pub fn start(
496 metrics: Metrics,
497 program_path: PathBuf,
498 cache_path: PathBuf,
499 spawn_timeout: Duration,
500 node_version: Option<String>,
501 security_status: SecurityStatus,
502) -> (mpsc::Sender<ToPool>, mpsc::UnboundedReceiver<FromPool>, impl Future<Output = ()>) {
503 let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
504 let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
505
506 let run = run(Pool {
507 metrics,
508 program_path,
509 cache_path,
510 spawn_timeout,
511 node_version,
512 security_status,
513 to_pool: to_pool_rx,
514 from_pool: from_pool_tx,
515 spawned: HopSlotMap::with_capacity_and_key(20),
516 mux: Mux::new(),
517 });
518
519 (to_pool_tx, from_pool_rx, run)
520}