polkadot_node_core_pvf/prepare/queue.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! A queue that handles requests for PVF preparation.
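//!
//! The queue is driven by two pairs of channels: the host sends `ToQueue::Enqueue` requests and
//! receives `FromQueue` results, while the queue forwards work to the worker pool via
//! `pool::ToPool` messages and reacts to `pool::FromPool` events (spawned, concluded, ripped).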

use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, LOG_TARGET};
use always_assert::{always, never};
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use polkadot_node_core_pvf_common::{error::PrepareResult, pvf::PvfPrepData};
use std::{
	collections::{HashMap, VecDeque},
	path::PathBuf,
};

#[cfg(test)]
use std::time::Duration;

/// A request to the queue.
#[derive(Debug)]
pub enum ToQueue {
	/// This schedules preparation of the given PVF.
	///
	/// Note that it is incorrect to enqueue the same PVF again without first receiving the
	/// [`FromQueue`] response.
	Enqueue { priority: Priority, pvf: PvfPrepData },
}

/// A response from the queue.
#[derive(Debug)]
pub struct FromQueue {
	/// Identifier of an artifact.
	pub(crate) artifact_id: ArtifactId,
	/// Outcome of the PVF processing. [`Ok`] indicates that the compiled artifact
	/// is successfully stored on disk. Otherwise, an
	/// [error](polkadot_node_core_pvf_common::error::PrepareError) is supplied.
	pub(crate) result: PrepareResult,
}

#[derive(Default)]
struct Limits {
	/// The maximum number of workers this pool can ever host. This is expected to be a small
	/// number, e.g. within a dozen.
	hard_capacity: usize,

	/// The number of workers we aim to have. If there is a critical job and we are already at
	/// `soft_capacity`, we are allowed to grow up to `hard_capacity`. Thus this should be equal
	/// to or smaller than `hard_capacity`.
	soft_capacity: usize,
}

impl Limits {
	/// Returns `true` if the queue is allowed to request one more worker.
	fn can_afford_one_more(&self, spawned_num: usize, critical: bool) -> bool {
		let cap = if critical { self.hard_capacity } else { self.soft_capacity };
		spawned_num < cap
	}

	/// Returns `true` if the pool has grown beyond the soft capacity and one of the spawned
	/// workers should be culled.
	fn should_cull(&mut self, spawned_num: usize) -> bool {
		spawned_num > self.soft_capacity
	}
}
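
// A minimal illustrative check of the capacity rules above. This test module is not part of the
// original file; it is only a sketch showing how `soft_capacity` and `hard_capacity` interact:
// non-critical jobs may not grow the pool past the soft cap, critical jobs may use the hard cap,
// and anything above the soft cap is eventually culled.
#[cfg(test)]
mod limits_example {
	use super::Limits;

	#[test]
	fn critical_jobs_may_use_the_hard_cap() {
		let mut limits = Limits { soft_capacity: 2, hard_capacity: 3 };

		// Below the soft cap any job can request a worker.
		assert!(limits.can_afford_one_more(1, false));
		// At the soft cap only critical jobs may keep growing the pool, up to the hard cap.
		assert!(!limits.can_afford_one_more(2, false));
		assert!(limits.can_afford_one_more(2, true));
		assert!(!limits.can_afford_one_more(3, true));

		// Once the pool is above the soft cap, idle workers should be culled.
		assert!(limits.should_cull(3));
		assert!(!limits.should_cull(2));
	}
}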

slotmap::new_key_type! { pub struct Job; }

struct JobData {
	/// The priority of this job. Can be bumped.
	priority: Priority,
	pvf: PvfPrepData,
	worker: Option<Worker>,
}

#[derive(Default)]
struct WorkerData {
	job: Option<Job>,
}

impl WorkerData {
	fn is_idle(&self) -> bool {
		self.job.is_none()
	}
}

/// A queue structured like this is prone to starvation, but we don't care that much since we
/// expect there to be a limited number of critical jobs, and we don't really mind if the
/// background jobs starve.
#[derive(Default)]
struct Unscheduled {
	normal: VecDeque<Job>,
	critical: VecDeque<Job>,
}

impl Unscheduled {
	fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
		match prio {
			Priority::Normal => &mut self.normal,
			Priority::Critical => &mut self.critical,
		}
	}

	fn add(&mut self, prio: Priority, job: Job) {
		self.queue_mut(prio).push_back(job);
	}

	fn readd(&mut self, prio: Priority, job: Job) {
		self.queue_mut(prio).push_front(job);
	}

	fn is_empty(&self) -> bool {
		self.normal.is_empty() && self.critical.is_empty()
	}

	fn next(&mut self) -> Option<Job> {
		let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
		check(Priority::Critical).or_else(|| check(Priority::Normal))
	}
}
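
// A minimal illustrative check of the scheduling order described above. This test module is not
// part of the original file; it is only a sketch showing that critical jobs are drained before
// normal ones and that `readd` returns a job to the front of its queue.
#[cfg(test)]
mod unscheduled_example {
	use super::{Job, Unscheduled};
	use crate::Priority;
	use slotmap::SlotMap;

	#[test]
	fn critical_jobs_are_drained_first() {
		// Allocate a few job keys just to have distinct handles to schedule.
		let mut jobs: SlotMap<Job, ()> = SlotMap::with_key();
		let normal_a = jobs.insert(());
		let normal_b = jobs.insert(());
		let critical = jobs.insert(());

		let mut unscheduled = Unscheduled::default();
		unscheduled.add(Priority::Normal, normal_a);
		unscheduled.add(Priority::Normal, normal_b);
		unscheduled.add(Priority::Critical, critical);

		// The critical job goes first even though it was added last.
		assert_eq!(unscheduled.next(), Some(critical));
		assert_eq!(unscheduled.next(), Some(normal_a));

		// Re-adding puts the job back at the front of its priority queue.
		unscheduled.readd(Priority::Normal, normal_a);
		assert_eq!(unscheduled.next(), Some(normal_a));
		assert_eq!(unscheduled.next(), Some(normal_b));
		assert!(unscheduled.is_empty());
	}
}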

struct Queue {
	metrics: Metrics,

	to_queue_rx: mpsc::Receiver<ToQueue>,
	from_queue_tx: mpsc::UnboundedSender<FromQueue>,

	to_pool_tx: mpsc::Sender<pool::ToPool>,
	from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,

	cache_path: PathBuf,
	limits: Limits,

	jobs: slotmap::SlotMap<Job, JobData>,

	/// A mapping from artifact id to a job.
	artifact_id_to_job: HashMap<ArtifactId, Job>,
	/// The registry of all workers.
	workers: slotmap::SparseSecondaryMap<Worker, WorkerData>,
	/// The number of workers requested to spawn but not yet spawned.
	spawn_inflight: usize,

	/// The jobs that are not yet scheduled. These are waiting until the next `poll` where they are
	/// processed all at once.
	unscheduled: Unscheduled,
}

/// A fatal error that warrants stopping the queue.
struct Fatal;

impl Queue {
	fn new(
		metrics: Metrics,
		soft_capacity: usize,
		hard_capacity: usize,
		cache_path: PathBuf,
		to_queue_rx: mpsc::Receiver<ToQueue>,
		from_queue_tx: mpsc::UnboundedSender<FromQueue>,
		to_pool_tx: mpsc::Sender<pool::ToPool>,
		from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
	) -> Self {
		Self {
			metrics,
			to_queue_rx,
			from_queue_tx,
			to_pool_tx,
			from_pool_rx,
			cache_path,
			spawn_inflight: 0,
			limits: Limits { hard_capacity, soft_capacity },
			jobs: slotmap::SlotMap::with_key(),
			unscheduled: Unscheduled::default(),
			artifact_id_to_job: HashMap::new(),
			workers: slotmap::SparseSecondaryMap::new(),
		}
	}

	async fn run(mut self) {
		macro_rules! break_if_fatal {
			($expr:expr) => {
				if let Err(Fatal) = $expr {
					break
				}
			};
		}

		loop {
			// biased to make it behave deterministically for tests.
			futures::select_biased! {
				to_queue = self.to_queue_rx.select_next_some() =>
					break_if_fatal!(handle_to_queue(&mut self, to_queue).await),
				from_pool = self.from_pool_rx.select_next_some() =>
					break_if_fatal!(handle_from_pool(&mut self, from_pool).await),
			}
		}
	}
}

async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
	match to_queue {
		ToQueue::Enqueue { priority, pvf } => {
			handle_enqueue(queue, priority, pvf).await?;
		},
	}
	Ok(())
}

async fn handle_enqueue(
	queue: &mut Queue,
	priority: Priority,
	pvf: PvfPrepData,
) -> Result<(), Fatal> {
	gum::debug!(
		target: LOG_TARGET,
		validation_code_hash = ?pvf.code_hash(),
		?priority,
		preparation_timeout = ?pvf.prep_timeout(),
		"PVF is enqueued for preparation.",
	);
	queue.metrics.prepare_enqueued();

	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
	if never!(
		queue.artifact_id_to_job.contains_key(&artifact_id),
		"second Enqueue sent for a known artifact"
	) {
		// This function is called in response to an `Enqueue` message;
		// Precondition for `Enqueue` is that it is sent only once for a PVF;
		// Thus this should always be `false`;
		// qed.
		gum::warn!(
			target: LOG_TARGET,
			"duplicate `enqueue` command received for {:?}",
			artifact_id,
		);
		return Ok(())
	}

	let job = queue.jobs.insert(JobData { priority, pvf, worker: None });
	queue.artifact_id_to_job.insert(artifact_id, job);

	if let Some(available) = find_idle_worker(queue) {
		// This may not seem fair (w.r.t. priority) at first glance, but it should be. This is
		// because as soon as a worker finishes with the job it's immediately given the next one.
		assign(queue, available, job).await?;
	} else {
		spawn_extra_worker(queue, priority.is_critical()).await?;
		queue.unscheduled.add(priority, job);
	}

	Ok(())
}

fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
	queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next()
}

async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
	use pool::FromPool;
	match from_pool {
		FromPool::Spawned(worker) => handle_worker_spawned(queue, worker).await?,
		FromPool::Concluded { worker, rip, result } =>
			handle_worker_concluded(queue, worker, rip, result).await?,
		FromPool::Rip(worker) => handle_worker_rip(queue, worker).await?,
	}
	Ok(())
}

async fn handle_worker_spawned(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
	queue.workers.insert(worker, WorkerData::default());
	queue.spawn_inflight -= 1;

	if let Some(job) = queue.unscheduled.next() {
		assign(queue, worker, job).await?;
	}

	Ok(())
}

async fn handle_worker_concluded(
	queue: &mut Queue,
	worker: Worker,
	rip: bool,
	result: PrepareResult,
) -> Result<(), Fatal> {
	queue.metrics.prepare_concluded();

	macro_rules! never_none {
		($expr:expr) => {
			match $expr {
				Some(v) => v,
				None => {
					// Precondition of calling this is that the `$expr` is never none;
					// Assuming the condition holds, this `never!` is not hit;
					// qed.
					never!("never_none, {}", stringify!($expr));
					return Ok(())
				},
			}
		};
	}

	// Find out which artifact the worker was working on.

	// workers are registered upon spawn and removed in one of the following cases:
	//   1. received rip signal
	//   2. received concluded signal with rip=true;
	// concluded signal only comes from a spawned worker and only once;
	// rip signal is not sent after conclusion with rip=true;
	// the worker should be registered;
	// this can't be None;
	// qed.
	let worker_data = never_none!(queue.workers.get_mut(worker));

	// worker_data.job is set only by `assign` and removed only here for a worker;
	// concluded signal only comes for a worker that was previously assigned and only once;
	// the worker should have the job;
	// this can't be None;
	// qed.
	let job = never_none!(worker_data.job.take());

	// job_data is inserted upon enqueue and removed only here;
	// as was established above, this worker was previously `assign`ed to the job;
	// that implies that the job was enqueued;
	// conclude signal only comes once;
	// we are about to remove the job for the first and only time;
	// this can't be None;
	// qed.
	let job_data = never_none!(queue.jobs.remove(job));
	let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf);

	queue.artifact_id_to_job.remove(&artifact_id);

	gum::debug!(
		target: LOG_TARGET,
		validation_code_hash = ?artifact_id.code_hash,
		?worker,
		?rip,
		"prepare worker concluded",
	);

	reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?;

	// Figure out what to do with the worker.
	if rip {
		let worker_data = queue.workers.remove(worker);
		// worker should exist, it's asserted above;
		// qed.
		always!(worker_data.is_some());

		if !queue.unscheduled.is_empty() {
			// The spawn is unconditionally non-critical, just to avoid accidentally filling
			// the pool up to the hard cap.
			spawn_extra_worker(queue, false).await?;
		}
	} else if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
		// We no longer need the services of this worker. Kill it.
		queue.workers.remove(worker);
		send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
	} else {
		// See if there is more work available and schedule it.
		if let Some(job) = queue.unscheduled.next() {
			assign(queue, worker, job).await?;
		}
	}

	Ok(())
}

async fn handle_worker_rip(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
	gum::debug!(target: LOG_TARGET, ?worker, "prepare worker ripped");

	let worker_data = queue.workers.remove(worker);
	if let Some(WorkerData { job: Some(job), .. }) = worker_data {
		// This is an edge case where the worker ripped after we sent the assignment but before it
		// was received by the pool.
		let priority = queue.jobs.get(job).map(|data| data.priority).unwrap_or_else(|| {
			// job is inserted upon enqueue and removed on concluded signal;
			// this is enclosed in the if statement that narrows the situation to before
			// conclusion;
			// that means that the job still exists and is known;
			// this path cannot be hit;
			// qed.
			never!("the job of the ripped worker must be known but it is not");
			Priority::Normal
		});
		queue.unscheduled.readd(priority, job);
	}

	// If there are still jobs left, spawn another worker to replace the ripped one (but only if it
	// was indeed removed). The spawn is unconditionally non-critical, just to avoid accidentally
	// filling the pool up to the hard cap.
	if worker_data.is_some() && !queue.unscheduled.is_empty() {
		spawn_extra_worker(queue, false).await?;
	}
	Ok(())
}

/// Spawns an extra worker if possible.
async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fatal> {
	if queue
		.limits
		.can_afford_one_more(queue.workers.len() + queue.spawn_inflight, critical)
	{
		queue.spawn_inflight += 1;
		send_pool(&mut queue.to_pool_tx, pool::ToPool::Spawn).await?;
	}

	Ok(())
}

/// Attaches the work to the given worker, telling the pool about the job.
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
	let job_data = &mut queue.jobs[job];
	job_data.worker = Some(worker);

	queue.workers[worker].job = Some(job);

	send_pool(
		&mut queue.to_pool_tx,
		pool::ToPool::StartWork {
			worker,
			pvf: job_data.pvf.clone(),
			cache_path: queue.cache_path.clone(),
		},
	)
	.await?;

	Ok(())
}

fn reply(from_queue_tx: &mut mpsc::UnboundedSender<FromQueue>, m: FromQueue) -> Result<(), Fatal> {
	from_queue_tx.unbounded_send(m).map_err(|_| {
		// The host has hung up and thus it's fatal and we should shutdown ourselves.
		Fatal
	})
}

async fn send_pool(
	to_pool_tx: &mut mpsc::Sender<pool::ToPool>,
	m: pool::ToPool,
) -> Result<(), Fatal> {
	to_pool_tx.send(m).await.map_err(|_| {
		// The pool has hung up and thus we are no longer able to fulfill our duties. Shutdown.
		Fatal
	})
}

/// Spins up the queue and returns the future that should be polled to make the queue functional.
pub fn start(
	metrics: Metrics,
	soft_capacity: usize,
	hard_capacity: usize,
	cache_path: PathBuf,
	to_pool_tx: mpsc::Sender<pool::ToPool>,
	from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
	let (to_queue_tx, to_queue_rx) = mpsc::channel(150);
	let (from_queue_tx, from_queue_rx) = mpsc::unbounded();

	let run = Queue::new(
		metrics,
		soft_capacity,
		hard_capacity,
		cache_path,
		to_queue_rx,
		from_queue_tx,
		to_pool_tx,
		from_pool_rx,
	)
	.run();

	(to_queue_tx, from_queue_rx, run)
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::host::tests::TEST_PREPARATION_TIMEOUT;
	use assert_matches::assert_matches;
	use futures::{future::BoxFuture, FutureExt};
	use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareSuccess};
	use slotmap::SlotMap;
	use std::task::Poll;

	/// Creates a new PVF whose artifact id can be uniquely identified by the given number.
	fn pvf(discriminator: u32) -> PvfPrepData {
		PvfPrepData::from_discriminator(discriminator)
	}

	async fn run_until<R>(
		task: &mut (impl Future<Output = ()> + Unpin),
		mut fut: (impl Future<Output = R> + Unpin),
	) -> R {
		let start = std::time::Instant::now();
		let fut = &mut fut;
		loop {
			if start.elapsed() > std::time::Duration::from_secs(1) {
				// We expect that this will take only a couple of iterations and thus take way
				// less than a second.
				panic!("timeout");
			}

			if let Poll::Ready(r) = futures::poll!(&mut *fut) {
				break r
			}

			if futures::poll!(&mut *task).is_ready() {
				panic!()
			}
		}
	}

	struct Test {
		_tempdir: tempfile::TempDir,
		run: BoxFuture<'static, ()>,
		workers: SlotMap<Worker, ()>,
		from_pool_tx: mpsc::UnboundedSender<pool::FromPool>,
		to_pool_rx: mpsc::Receiver<pool::ToPool>,
		to_queue_tx: mpsc::Sender<ToQueue>,
		from_queue_rx: mpsc::UnboundedReceiver<FromQueue>,
	}

	impl Test {
		fn new(soft_capacity: usize, hard_capacity: usize) -> Self {
			let tempdir = tempfile::tempdir().unwrap();

			let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
			let (from_pool_tx, from_pool_rx) = mpsc::unbounded();

			let workers: SlotMap<Worker, ()> = SlotMap::with_key();

			let (to_queue_tx, from_queue_rx, run) = start(
				Metrics::default(),
				soft_capacity,
				hard_capacity,
				tempdir.path().to_owned().into(),
				to_pool_tx,
				from_pool_rx,
			);

			Self {
				_tempdir: tempdir,
				run: run.boxed(),
				workers,
				from_pool_tx,
				to_pool_rx,
				to_queue_tx,
				from_queue_rx,
			}
		}

		fn send_queue(&mut self, to_queue: ToQueue) {
			self.to_queue_tx.send(to_queue).now_or_never().unwrap().unwrap();
		}

		async fn poll_and_recv_from_queue(&mut self) -> FromQueue {
			let from_queue_rx = &mut self.from_queue_rx;
			run_until(&mut self.run, async { from_queue_rx.next().await.unwrap() }.boxed()).await
		}

		fn send_from_pool(&mut self, from_pool: pool::FromPool) {
			self.from_pool_tx.send(from_pool).now_or_never().unwrap().unwrap();
		}

		async fn poll_and_recv_to_pool(&mut self) -> pool::ToPool {
			let to_pool_rx = &mut self.to_pool_rx;
			run_until(&mut self.run, async { to_pool_rx.next().await.unwrap() }.boxed()).await
		}

		async fn poll_ensure_to_pool_is_empty(&mut self) {
			use futures_timer::Delay;

			let to_pool_rx = &mut self.to_pool_rx;
			run_until(
				&mut self.run,
				async {
					futures::select! {
						_ = Delay::new(Duration::from_millis(500)).fuse() => (),
						_ = to_pool_rx.next().fuse() => {
							panic!("to pool supposed to be empty")
						}
					}
				}
				.boxed(),
			)
			.await
		}
	}

	#[tokio::test]
	async fn properly_concludes() {
		let mut test = Test::new(2, 2);

		test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w));
		test.send_from_pool(pool::FromPool::Concluded {
			worker: w,
			rip: false,
			result: Ok(PrepareSuccess::default()),
		});

		assert_eq!(
			test.poll_and_recv_from_queue().await.artifact_id,
			ArtifactId::from_pvf_prep_data(&pvf(1))
		);
	}

	#[tokio::test]
	async fn dont_spawn_over_soft_limit_unless_critical() {
		let mut test = Test::new(2, 3);

		let priority = Priority::Normal;
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
		// Start a non-precheck preparation for this one.
		test.send_queue(ToQueue::Enqueue {
			priority,
			pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
		});

		// Receive only two spawns.
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		let w2 = test.workers.insert(());

		test.send_from_pool(pool::FromPool::Spawned(w1));
		test.send_from_pool(pool::FromPool::Spawned(w2));

		// Get two start works.
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: false,
			result: Ok(PrepareSuccess::default()),
		});

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// Enqueue a critical job.
		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Critical,
			pvf: PvfPrepData::from_discriminator(4),
		});

		// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
		// another worker is warranted.
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
	}

	#[tokio::test]
	async fn cull_unwanted() {
		let mut test = Test::new(1, 2);

		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Normal,
			pvf: PvfPrepData::from_discriminator(1),
		});
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		let w1 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w1));
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// Enqueue a critical job, which warrants spawning over the soft limit.
		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Critical,
			pvf: PvfPrepData::from_discriminator(2),
		});
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		// However, before the new worker has a chance to spawn, the first worker finishes its
		// job. The old worker will be killed, while the new one will be kept alive even though it
		// has not been instantiated yet.
		//
		// That's a bit silly in this context, but in production there will be an entire pool of
		// up to `soft_capacity` workers and it doesn't matter which one to cull. Either way, we
		// just check that this edge case of an edge case works.
		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: false,
			result: Ok(PrepareSuccess::default()),
		});
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
	}

	#[tokio::test]
	async fn worker_mass_die_out_doesnt_stall_queue() {
		let mut test = Test::new(2, 2);

		let priority = Priority::Normal;
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
		test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
		// Start a non-precheck preparation for this one.
		test.send_queue(ToQueue::Enqueue {
			priority,
			pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		let w2 = test.workers.insert(());

		test.send_from_pool(pool::FromPool::Spawned(w1));
		test.send_from_pool(pool::FromPool::Spawned(w2));

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		// Conclude worker 1 and rip it.
		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: true,
			result: Ok(PrepareSuccess::default()),
		});

		// Since there is still work, the queue requested one extra worker to spawn to handle the
		// remaining enqueued work items.
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
		assert_eq!(
			test.poll_and_recv_from_queue().await.artifact_id,
			ArtifactId::from_pvf_prep_data(&pvf(1))
		);
	}

	#[tokio::test]
	async fn doesnt_resurrect_ripped_worker_if_no_work() {
		let mut test = Test::new(2, 2);

		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Normal,
			pvf: PvfPrepData::from_discriminator(1),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w1));

		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

		test.send_from_pool(pool::FromPool::Concluded {
			worker: w1,
			rip: true,
			result: Err(PrepareError::IoErr("test".into())),
		});
		test.poll_ensure_to_pool_is_empty().await;
	}

	#[tokio::test]
	async fn rip_for_start_work() {
		let mut test = Test::new(2, 2);

		test.send_queue(ToQueue::Enqueue {
			priority: Priority::Normal,
			pvf: PvfPrepData::from_discriminator(1),
		});

		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w1 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w1));

		// Now, to the interesting part. After the queue normally issues the `start_work` command
		// to the pool, the pool may report that the worker ripped before it received the command.
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
		test.send_from_pool(pool::FromPool::Rip(w1));

		// In this case, the queue should ask the pool to spawn a new worker and then request it
		// to work on the item.
		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

		let w2 = test.workers.insert(());
		test.send_from_pool(pool::FromPool::Spawned(w2));
		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
	}
}