referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_pvf/prepare/
pool.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use super::worker_interface::{self, Outcome};
18use crate::{
19	metrics::Metrics,
20	worker_interface::{IdleWorker, WorkerHandle},
21	LOG_TARGET,
22};
23use always_assert::never;
24use futures::{
25	channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
26};
27use polkadot_node_core_pvf_common::{
28	error::{PrepareError, PrepareResult},
29	pvf::PvfPrepData,
30	SecurityStatus,
31};
32use slotmap::HopSlotMap;
33use std::{
34	fmt,
35	path::{Path, PathBuf},
36	task::Poll,
37	time::Duration,
38};
39
40slotmap::new_key_type! { pub struct Worker; }
41
42/// Messages that the pool handles.
43#[derive(Debug, PartialEq, Eq)]
44pub enum ToPool {
45	/// Request a new worker to spawn.
46	///
47	/// This request won't fail in case if the worker cannot be created. Instead, we consider
48	/// the failures transient and we try to spawn a worker after a delay.
49	///
50	/// [`FromPool::Spawned`] will be returned as soon as the worker is spawned.
51	///
52	/// The client should anticipate a [`FromPool::Rip`] message, in case the spawned worker was
53	/// stopped for some reason.
54	Spawn,
55
56	/// Kill the given worker. No-op if the given worker is not running.
57	///
58	/// [`FromPool::Rip`] won't be sent in this case. However, the client should be prepared to
59	/// receive [`FromPool::Rip`] nonetheless, since the worker may be have been ripped before
60	/// this message is processed.
61	Kill(Worker),
62
63	/// Request the given worker to start working on the given code.
64	///
65	/// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent
66	/// back. It's also possible that the worker dies before handling the message in which case
67	/// [`FromPool::Rip`] will be sent back.
68	///
69	/// In either case, the worker is considered busy and no further `StartWork` messages should be
70	/// sent until either `Concluded` or `Rip` message is received.
71	StartWork { worker: Worker, pvf: PvfPrepData, cache_path: PathBuf },
72}
73
74/// A message sent from pool to its client.
75#[derive(Debug)]
76pub enum FromPool {
77	/// The given worker was just spawned and is ready to be used.
78	Spawned(Worker),
79
80	/// The given worker either succeeded or failed the given job.
81	Concluded {
82		/// A key for retrieving the worker data from the pool.
83		worker: Worker,
84		/// Indicates whether the worker process was killed.
85		rip: bool,
86		/// [`Ok`] indicates that compiled artifact is successfully stored on disk.
87		/// Otherwise, an [error](PrepareError) is supplied.
88		result: PrepareResult,
89	},
90
91	/// The given worker ceased to exist.
92	Rip(Worker),
93}
94
95struct WorkerData {
96	idle: Option<IdleWorker>,
97	handle: WorkerHandle,
98}
99
100impl fmt::Debug for WorkerData {
101	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102		write!(f, "WorkerData(pid={})", self.handle.id())
103	}
104}
105
106enum PoolEvent {
107	Spawn(IdleWorker, WorkerHandle),
108	StartWork(Worker, Outcome),
109}
110
111type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
112
113struct Pool {
114	// Some variables related to the current session.
115	program_path: PathBuf,
116	cache_path: PathBuf,
117	spawn_timeout: Duration,
118	node_version: Option<String>,
119	security_status: SecurityStatus,
120
121	to_pool: mpsc::Receiver<ToPool>,
122	from_pool: mpsc::UnboundedSender<FromPool>,
123	spawned: HopSlotMap<Worker, WorkerData>,
124	mux: Mux,
125
126	metrics: Metrics,
127}
128
/// Marker for an unrecoverable error: when it is raised, the event loop of the pool stops.
struct Fatal;
131
132async fn run(
133	Pool {
134		program_path,
135		cache_path,
136		spawn_timeout,
137		node_version,
138		security_status,
139		to_pool,
140		mut from_pool,
141		mut spawned,
142		mut mux,
143		metrics,
144	}: Pool,
145) {
146	macro_rules! break_if_fatal {
147		($expr:expr) => {
148			match $expr {
149				Err(Fatal) => break,
150				Ok(v) => v,
151			}
152		};
153	}
154
155	let mut to_pool = to_pool.fuse();
156
157	loop {
158		futures::select! {
159			to_pool = to_pool.next() => {
160				let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
161				handle_to_pool(
162					&metrics,
163					&program_path,
164					&cache_path,
165					spawn_timeout,
166					node_version.clone(),
167					security_status.clone(),
168					&mut spawned,
169					&mut mux,
170					to_pool,
171				)
172			}
173			ev = mux.select_next_some() => {
174				break_if_fatal!(handle_mux(&metrics, &mut from_pool, &mut spawned, ev))
175			}
176		}
177
178		break_if_fatal!(purge_dead(&metrics, &mut from_pool, &mut spawned).await);
179	}
180}
181
182async fn purge_dead(
183	metrics: &Metrics,
184	from_pool: &mut mpsc::UnboundedSender<FromPool>,
185	spawned: &mut HopSlotMap<Worker, WorkerData>,
186) -> Result<(), Fatal> {
187	let mut to_remove = vec![];
188	for (worker, data) in spawned.iter_mut() {
189		if data.idle.is_none() {
190			// The idle token is missing, meaning this worker is now occupied: skip it. This is
191			// because the worker process is observed by the work task and should it reach the
192			// deadline or be terminated it will be handled by the corresponding mux event.
193			continue
194		}
195
196		if let Poll::Ready(()) = futures::poll!(&mut data.handle) {
197			// a resolved future means that the worker has terminated. Weed it out.
198			to_remove.push(worker);
199		}
200	}
201	for w in to_remove {
202		if attempt_retire(metrics, spawned, w) {
203			reply(from_pool, FromPool::Rip(w))?;
204		}
205	}
206	Ok(())
207}
208
209fn handle_to_pool(
210	metrics: &Metrics,
211	program_path: &Path,
212	cache_path: &Path,
213	spawn_timeout: Duration,
214	node_version: Option<String>,
215	security_status: SecurityStatus,
216	spawned: &mut HopSlotMap<Worker, WorkerData>,
217	mux: &mut Mux,
218	to_pool: ToPool,
219) {
220	match to_pool {
221		ToPool::Spawn => {
222			gum::debug!(target: LOG_TARGET, "spawning a new prepare worker");
223			metrics.prepare_worker().on_begin_spawn();
224			mux.push(
225				spawn_worker_task(
226					program_path.to_owned(),
227					cache_path.to_owned(),
228					spawn_timeout,
229					node_version,
230					security_status,
231				)
232				.boxed(),
233			);
234		},
235		ToPool::StartWork { worker, pvf, cache_path } => {
236			if let Some(data) = spawned.get_mut(worker) {
237				if let Some(idle) = data.idle.take() {
238					let preparation_timer = metrics.time_preparation();
239					mux.push(
240						start_work_task(
241							metrics.clone(),
242							worker,
243							idle,
244							pvf,
245							cache_path,
246							preparation_timer,
247						)
248						.boxed(),
249					);
250				} else {
251					// idle token is present after spawn and after a job is concluded;
252					// the precondition for `StartWork` is it should be sent only if all previous
253					// work items concluded;
254					// thus idle token is Some;
255					// qed.
256					never!("unexpected absence of the idle token in prepare pool");
257				}
258			} else {
259				// That's a relatively normal situation since the queue may send `start_work` and
260				// before receiving it the pool would report that the worker died.
261			}
262		},
263		ToPool::Kill(worker) => {
264			gum::debug!(target: LOG_TARGET, ?worker, "killing prepare worker");
265			// It may be absent if it were previously already removed by `purge_dead`.
266			let _ = attempt_retire(metrics, spawned, worker);
267		},
268	}
269}
270
271async fn spawn_worker_task(
272	program_path: PathBuf,
273	cache_path: PathBuf,
274	spawn_timeout: Duration,
275	node_version: Option<String>,
276	security_status: SecurityStatus,
277) -> PoolEvent {
278	use futures_timer::Delay;
279
280	loop {
281		match worker_interface::spawn(
282			&program_path,
283			&cache_path,
284			spawn_timeout,
285			node_version.as_deref(),
286			security_status.clone(),
287		)
288		.await
289		{
290			Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
291			Err(err) => {
292				gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
293
294				// Assume that the failure intermittent and retry after a delay.
295				Delay::new(Duration::from_secs(3)).await;
296			},
297		}
298	}
299}
300
301async fn start_work_task<Timer>(
302	metrics: Metrics,
303	worker: Worker,
304	idle: IdleWorker,
305	pvf: PvfPrepData,
306	cache_path: PathBuf,
307	_preparation_timer: Option<Timer>,
308) -> PoolEvent {
309	let outcome = worker_interface::start_work(&metrics, idle, pvf, cache_path).await;
310	PoolEvent::StartWork(worker, outcome)
311}
312
313fn handle_mux(
314	metrics: &Metrics,
315	from_pool: &mut mpsc::UnboundedSender<FromPool>,
316	spawned: &mut HopSlotMap<Worker, WorkerData>,
317	event: PoolEvent,
318) -> Result<(), Fatal> {
319	match event {
320		PoolEvent::Spawn(idle, handle) => {
321			metrics.prepare_worker().on_spawned();
322
323			let worker = spawned.insert(WorkerData { idle: Some(idle), handle });
324
325			reply(from_pool, FromPool::Spawned(worker))?;
326
327			Ok(())
328		},
329		PoolEvent::StartWork(worker, outcome) => {
330			// If we receive an outcome that the worker is unreachable or that an error occurred on
331			// the worker, we attempt to kill the worker process.
332			match outcome {
333				Outcome::Concluded { worker: idle, result } =>
334					handle_concluded_no_rip(from_pool, spawned, worker, idle, result),
335				// Return `Concluded`, but do not kill the worker since the error was on the host
336				// side.
337				Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip(
338					from_pool,
339					spawned,
340					worker,
341					idle,
342					Err(PrepareError::CreateTmpFile(err)),
343				),
344				// Return `Concluded`, but do not kill the worker since the error was on the host
345				// side.
346				Outcome::RenameTmpFile { worker: idle, err, src, dest } => handle_concluded_no_rip(
347					from_pool,
348					spawned,
349					worker,
350					idle,
351					Err(PrepareError::RenameTmpFile { err, src, dest }),
352				),
353				// Could not clear worker cache. Kill the worker so other jobs can't see the data.
354				Outcome::ClearWorkerDir { err } => {
355					if attempt_retire(metrics, spawned, worker) {
356						reply(
357							from_pool,
358							FromPool::Concluded {
359								worker,
360								rip: true,
361								result: Err(PrepareError::ClearWorkerDir(err)),
362							},
363						)?;
364					}
365
366					Ok(())
367				},
368				Outcome::Unreachable => {
369					if attempt_retire(metrics, spawned, worker) {
370						reply(from_pool, FromPool::Rip(worker))?;
371					}
372
373					Ok(())
374				},
375				Outcome::IoErr(err) => {
376					if attempt_retire(metrics, spawned, worker) {
377						reply(
378							from_pool,
379							FromPool::Concluded {
380								worker,
381								rip: true,
382								result: Err(PrepareError::IoErr(err)),
383							},
384						)?;
385					}
386
387					Ok(())
388				},
389				// The worker might still be usable, but we kill it just in case.
390				Outcome::JobDied { err, job_pid } => {
391					if attempt_retire(metrics, spawned, worker) {
392						reply(
393							from_pool,
394							FromPool::Concluded {
395								worker,
396								rip: true,
397								result: Err(PrepareError::JobDied { err, job_pid }),
398							},
399						)?;
400					}
401
402					Ok(())
403				},
404				Outcome::TimedOut => {
405					if attempt_retire(metrics, spawned, worker) {
406						reply(
407							from_pool,
408							FromPool::Concluded {
409								worker,
410								rip: true,
411								result: Err(PrepareError::TimedOut),
412							},
413						)?;
414					}
415
416					Ok(())
417				},
418				Outcome::OutOfMemory => {
419					if attempt_retire(metrics, spawned, worker) {
420						reply(
421							from_pool,
422							FromPool::Concluded {
423								worker,
424								rip: true,
425								result: Err(PrepareError::OutOfMemory),
426							},
427						)?;
428					}
429
430					Ok(())
431				},
432			}
433		},
434	}
435}
436
437fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result<(), Fatal> {
438	from_pool.unbounded_send(m).map_err(|_| Fatal)
439}
440
441/// Removes the given worker from the registry if it there. This will lead to dropping and hence
442/// to killing the worker process.
443///
444/// Returns `true` if the worker exists and was removed and the process was killed.
445///
446/// This function takes care about counting the retired workers metric.
447fn attempt_retire(
448	metrics: &Metrics,
449	spawned: &mut HopSlotMap<Worker, WorkerData>,
450	worker: Worker,
451) -> bool {
452	if spawned.remove(worker).is_some() {
453		metrics.prepare_worker().on_retired();
454		true
455	} else {
456		false
457	}
458}
459
460/// Handles the case where we received a response. There potentially was an error, but not the fault
461/// of the worker as far as we know, so the worker should not be killed.
462///
463/// This function tries to put the idle worker back into the pool and then replies with
464/// `FromPool::Concluded` with `rip: false`.
465fn handle_concluded_no_rip(
466	from_pool: &mut mpsc::UnboundedSender<FromPool>,
467	spawned: &mut HopSlotMap<Worker, WorkerData>,
468	worker: Worker,
469	idle: IdleWorker,
470	result: PrepareResult,
471) -> Result<(), Fatal> {
472	let data = match spawned.get_mut(worker) {
473		None => {
474			// Perhaps the worker was killed meanwhile and the result is no longer relevant. We
475			// already send `Rip` when purging if we detect that the worker is dead.
476			return Ok(())
477		},
478		Some(data) => data,
479	};
480
481	// We just replace the idle worker that was loaned from this option during
482	// the work starting.
483	let old = data.idle.replace(idle);
484	never!(
485		old.is_some(),
486		"old idle worker was taken out when starting work; we only replace it here; qed"
487	);
488
489	reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
490
491	Ok(())
492}
493
494/// Spins up the pool and returns the future that should be polled to make the pool functional.
495pub fn start(
496	metrics: Metrics,
497	program_path: PathBuf,
498	cache_path: PathBuf,
499	spawn_timeout: Duration,
500	node_version: Option<String>,
501	security_status: SecurityStatus,
502) -> (mpsc::Sender<ToPool>, mpsc::UnboundedReceiver<FromPool>, impl Future<Output = ()>) {
503	let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
504	let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
505
506	let run = run(Pool {
507		metrics,
508		program_path,
509		cache_path,
510		spawn_timeout,
511		node_version,
512		security_status,
513		to_pool: to_pool_rx,
514		from_pool: from_pool_tx,
515		spawned: HopSlotMap::with_capacity_and_key(20),
516		mux: Mux::new(),
517	});
518
519	(to_pool_tx, from_pool_rx, run)
520}