use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, LOG_TARGET};
use always_assert::{always, never};
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use polkadot_node_core_pvf_common::{error::PrepareResult, pvf::PvfPrepData};
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
};
#[cfg(test)]
use std::time::Duration;
/// Requests that can be sent to the preparation queue.
#[derive(Debug)]
pub enum ToQueue {
/// Asks the queue to prepare `pvf`, scheduled according to `priority`.
///
/// Only one `Enqueue` is expected per artifact: a request for an artifact the queue is
/// already tracking is reported with a warning and otherwise ignored.
Enqueue { priority: Priority, pvf: PvfPrepData },
}
/// Notification emitted by the queue once the preparation of an artifact has concluded.
#[derive(Debug)]
pub struct FromQueue {
/// Identifies the artifact whose preparation concluded.
pub(crate) artifact_id: ArtifactId,
/// The outcome of the preparation, as reported by the worker pool.
pub(crate) result: PrepareResult,
}
/// Worker-count limits that decide when the queue may request another worker and when a
/// worker that just finished its job should be disposed of.
#[derive(Default)]
struct Limits {
    /// Maximum number of workers that may exist when a critical job needs one.
    hard_capacity: usize,
    /// Maximum number of workers that may exist on behalf of normal jobs. Also the threshold
    /// above which workers are culled.
    soft_capacity: usize,
}

impl Limits {
    /// Returns `true` if one more worker may be spawned given that `spawned_num` workers
    /// already exist (or are being spawned).
    ///
    /// Critical requests are checked against the hard capacity, all others against the soft
    /// capacity.
    fn can_afford_one_more(&self, spawned_num: usize, critical: bool) -> bool {
        let cap = if critical { self.hard_capacity } else { self.soft_capacity };
        spawned_num < cap
    }

    /// Returns `true` if `spawned_num` exceeds the soft capacity, i.e. a worker should be
    /// removed.
    ///
    /// This is a pure query, hence it only needs shared access to the limits.
    fn should_cull(&self, spawned_num: usize) -> bool {
        spawned_num > self.soft_capacity
    }
}
// Key type identifying a job stored in the queue's `jobs` slot map.
slotmap::new_key_type! { pub struct Job; }
/// Everything the queue stores about a single preparation job.
struct JobData {
/// The priority the job was enqueued with.
priority: Priority,
/// The PVF that has to be prepared.
pvf: PvfPrepData,
/// The worker the job was handed to; `None` until the job is assigned.
worker: Option<Worker>,
}
/// Per-worker bookkeeping kept by the queue.
#[derive(Default)]
struct WorkerData {
/// The job the worker is currently busy with, or `None` if it has none.
job: Option<Job>,
}
impl WorkerData {
    /// Tells whether the worker has no job assigned and can therefore take one.
    fn is_idle(&self) -> bool {
        matches!(self.job, None)
    }
}
/// Jobs that are waiting for a worker, kept in one queue per priority.
#[derive(Default)]
struct Unscheduled {
/// Waiting jobs of `Priority::Normal`.
normal: VecDeque<Job>,
/// Waiting jobs of `Priority::Critical`.
critical: VecDeque<Job>,
}
impl Unscheduled {
    /// Selects the queue that holds the jobs of the given priority.
    fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
        match prio {
            Priority::Critical => &mut self.critical,
            Priority::Normal => &mut self.normal,
        }
    }

    /// Appends a new job to the back of the queue of its priority.
    fn add(&mut self, prio: Priority, job: Job) {
        let target = self.queue_mut(prio);
        target.push_back(job);
    }

    /// Puts a job back at the front of the queue of its priority, so that it is the next one
    /// of that priority to be picked.
    fn readd(&mut self, prio: Priority, job: Job) {
        let target = self.queue_mut(prio);
        target.push_front(job);
    }

    /// Returns `true` when no job of any priority is waiting.
    fn is_empty(&self) -> bool {
        self.critical.is_empty() && self.normal.is_empty()
    }

    /// Takes the next job to schedule: critical jobs first, normal jobs only when no critical
    /// job is waiting.
    fn next(&mut self) -> Option<Job> {
        match self.critical.pop_front() {
            Some(job) => Some(job),
            None => self.normal.pop_front(),
        }
    }
}
/// State of the preparation queue: the channels it talks over plus the bookkeeping of jobs and
/// workers.
struct Queue {
/// Used to report enqueued and concluded preparations.
metrics: Metrics,
/// Incoming requests to the queue.
to_queue_rx: mpsc::Receiver<ToQueue>,
/// Outgoing notifications about concluded preparations.
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
/// Outgoing commands to the worker pool (spawn, start work, kill).
to_pool_tx: mpsc::Sender<pool::ToPool>,
/// Incoming events from the worker pool.
from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
/// Path handed to the pool together with every job that is started.
cache_path: PathBuf,
/// Limits on the number of workers.
limits: Limits,
/// All jobs that were enqueued and have not concluded yet.
jobs: slotmap::SlotMap<Job, JobData>,
/// Maps the artifact of every tracked job to the job; used to detect duplicate requests.
artifact_id_to_job: HashMap<ArtifactId, Job>,
/// Workers the pool reported as spawned and that were not removed since.
workers: slotmap::SparseSecondaryMap<Worker, WorkerData>,
/// Number of spawn requests sent to the pool that were not yet answered with a spawned worker.
spawn_inflight: usize,
/// Jobs that are waiting for a worker.
unscheduled: Unscheduled,
}
/// Marker error returned when sending on one of the queue's outgoing channels fails; the main
/// loop in `Queue::run` stops when a handler returns it.
struct Fatal;
impl Queue {
fn new(
metrics: Metrics,
soft_capacity: usize,
hard_capacity: usize,
cache_path: PathBuf,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
to_pool_tx: mpsc::Sender<pool::ToPool>,
from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
) -> Self {
Self {
metrics,
to_queue_rx,
from_queue_tx,
to_pool_tx,
from_pool_rx,
cache_path,
spawn_inflight: 0,
limits: Limits { hard_capacity, soft_capacity },
jobs: slotmap::SlotMap::with_key(),
unscheduled: Unscheduled::default(),
artifact_id_to_job: HashMap::new(),
workers: slotmap::SparseSecondaryMap::new(),
}
}
async fn run(mut self) {
macro_rules! break_if_fatal {
($expr:expr) => {
if let Err(Fatal) = $expr {
break
}
};
}
loop {
futures::select_biased! {
to_queue = self.to_queue_rx.select_next_some() =>
break_if_fatal!(handle_to_queue(&mut self, to_queue).await),
from_pool = self.from_pool_rx.select_next_some() =>
break_if_fatal!(handle_from_pool(&mut self, from_pool).await),
}
}
}
}
/// Dispatches a request sent to the queue to its handler.
async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
    match to_queue {
        ToQueue::Enqueue { priority, pvf } => handle_enqueue(queue, priority, pvf).await,
    }
}
/// Registers a new preparation job and either hands it to an idle worker right away or parks it
/// until a worker becomes available.
///
/// A request for an artifact that is already tracked is considered a caller bug: it trips the
/// `never!` assertion, is logged and otherwise ignored.
async fn handle_enqueue(
    queue: &mut Queue,
    priority: Priority,
    pvf: PvfPrepData,
) -> Result<(), Fatal> {
    gum::debug!(
        target: LOG_TARGET,
        validation_code_hash = ?pvf.code_hash(),
        ?priority,
        preparation_timeout = ?pvf.prep_timeout(),
        "PVF is enqueued for preparation.",
    );
    queue.metrics.prepare_enqueued();

    let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
    let already_tracked = never!(
        queue.artifact_id_to_job.contains_key(&artifact_id),
        "second Enqueue sent for a known artifact"
    );
    if already_tracked {
        gum::warn!(
            target: LOG_TARGET,
            "duplicate `enqueue` command received for {:?}",
            artifact_id,
        );
        return Ok(())
    }

    let job = queue.jobs.insert(JobData { priority, pvf, worker: None });
    queue.artifact_id_to_job.insert(artifact_id, job);

    match find_idle_worker(queue) {
        Some(available) => assign(queue, available, job).await,
        None => {
            // Nobody is free: try to get one more worker (subject to the limits) and let the
            // job wait in the meantime.
            spawn_extra_worker(queue, priority.is_critical()).await?;
            queue.unscheduled.add(priority, job);
            Ok(())
        },
    }
}
/// Returns the first worker, in the iteration order of the worker map, that has no job assigned.
fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
    queue
        .workers
        .iter()
        .find(|(_, data)| data.is_idle())
        .map(|(idle_worker, _)| idle_worker)
}
/// Dispatches an event received from the worker pool to its handler.
async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
    match from_pool {
        pool::FromPool::Spawned(worker) => handle_worker_spawned(queue, worker).await,
        pool::FromPool::Concluded { worker, rip, result } =>
            handle_worker_concluded(queue, worker, rip, result).await,
        pool::FromPool::Rip(worker) => handle_worker_rip(queue, worker).await,
    }
}
/// Registers a freshly spawned worker and, if a job is waiting, puts the worker to work
/// immediately.
async fn handle_worker_spawned(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
    queue.workers.insert(worker, WorkerData::default());
    // The spawn request that produced this worker is no longer pending.
    queue.spawn_inflight -= 1;

    match queue.unscheduled.next() {
        Some(job) => assign(queue, worker, job).await,
        None => Ok(()),
    }
}
/// Handles a worker reporting that it has concluded its job.
///
/// The job is forgotten and its result is forwarded to the receiver of `FromQueue`. Afterwards
/// the worker is dealt with:
/// - if it is gone (`rip`), it is removed and a replacement is requested when jobs are waiting;
/// - otherwise, if there are more workers than the soft capacity allows, it is killed;
/// - otherwise it receives the next waiting job, if any.
async fn handle_worker_concluded(
    queue: &mut Queue,
    worker: Worker,
    rip: bool,
    result: PrepareResult,
) -> Result<(), Fatal> {
    queue.metrics.prepare_concluded();

    // Unwraps an `Option` that is expected to always be `Some`. If it is not, the `never!`
    // assertion fires and the event is dropped.
    macro_rules! never_none {
        ($expr:expr) => {
            match $expr {
                Some(v) => v,
                None => {
                    never!("never_none, {}", stringify!($expr));
                    return Ok(())
                },
            }
        };
    }

    let worker_data = never_none!(queue.workers.get_mut(worker));
    let job = never_none!(worker_data.job.take());
    let job_data = never_none!(queue.jobs.remove(job));
    let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf);
    queue.artifact_id_to_job.remove(&artifact_id);

    gum::debug!(
        target: LOG_TARGET,
        validation_code_hash = ?artifact_id.code_hash,
        ?worker,
        ?rip,
        "prepare worker concluded",
    );

    reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?;

    if rip {
        // The worker is gone; replace it only if there is work left to do.
        let worker_data = queue.workers.remove(worker);
        always!(worker_data.is_some());
        if !queue.unscheduled.is_empty() {
            spawn_extra_worker(queue, false).await?;
        }
        return Ok(())
    }

    if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
        // Too many workers around: get rid of the one that has just become free.
        queue.workers.remove(worker);
        return send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await
    }

    match queue.unscheduled.next() {
        Some(job) => assign(queue, worker, job).await,
        None => Ok(()),
    }
}
/// Handles the pool reporting that a worker is gone without having concluded a job.
///
/// The worker is forgotten. If it was busy, its job is put back at the front of the queue of
/// its priority so that it is retried first. If the worker was known and jobs are waiting, a
/// replacement worker is requested (subject to the soft capacity).
async fn handle_worker_rip(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
    gum::debug!(target: LOG_TARGET, ?worker, "prepare worker ripped");

    let worker_data = queue.workers.remove(worker);
    if let Some(WorkerData { job: Some(job), .. }) = worker_data {
        match queue.jobs.get_mut(job) {
            Some(job_data) => {
                // The job is no longer running anywhere; drop the stale assignment.
                job_data.worker = None;
                let priority = job_data.priority;
                queue.unscheduled.readd(priority, job);
            },
            // Do not re-queue a job the queue does not know: scheduling it later would index
            // `jobs` with a dead key and panic.
            None => never!("the job of the ripped worker must be known but it is not"),
        }
    }

    if worker_data.is_some() && !queue.unscheduled.is_empty() {
        spawn_extra_worker(queue, false).await?;
    }
    Ok(())
}
/// Asks the pool for one more worker, unless the limits forbid it.
///
/// Both the existing workers and the spawn requests that are still pending count towards the
/// limit. `critical` selects the hard capacity instead of the soft one.
async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fatal> {
    let spawned_num = queue.workers.len() + queue.spawn_inflight;
    if !queue.limits.can_afford_one_more(spawned_num, critical) {
        return Ok(())
    }

    queue.spawn_inflight += 1;
    send_pool(&mut queue.to_pool_tx, pool::ToPool::Spawn).await
}
/// Hands `job` to `worker`: records the assignment on both sides and tells the pool to start
/// the work.
///
/// Both `job` and `worker` must be known to the queue; indexing with an unknown key panics.
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
    let pvf = {
        let job_data = &mut queue.jobs[job];
        job_data.worker = Some(worker);
        job_data.pvf.clone()
    };
    queue.workers[worker].job = Some(job);

    let start_work =
        pool::ToPool::StartWork { worker, pvf, cache_path: queue.cache_path.clone() };
    send_pool(&mut queue.to_pool_tx, start_work).await
}
/// Sends a notification to the receiver of `FromQueue`.
///
/// Any send error is reported as [`Fatal`].
fn reply(from_queue_tx: &mut mpsc::UnboundedSender<FromQueue>, m: FromQueue) -> Result<(), Fatal> {
    from_queue_tx.unbounded_send(m).map_err(|_send_error| Fatal)
}
/// Sends a command to the worker pool, waiting for channel capacity if necessary.
///
/// Any send error is reported as [`Fatal`].
async fn send_pool(
    to_pool_tx: &mut mpsc::Sender<pool::ToPool>,
    m: pool::ToPool,
) -> Result<(), Fatal> {
    let sent = to_pool_tx.send(m).await;
    sent.map_err(|_send_error| Fatal)
}
/// Creates the preparation queue.
///
/// Returns the sender for requests to the queue, the receiver for its notifications and the
/// future that drives the queue. The queue makes no progress unless that future is polled.
pub fn start(
    metrics: Metrics,
    soft_capacity: usize,
    hard_capacity: usize,
    cache_path: PathBuf,
    to_pool_tx: mpsc::Sender<pool::ToPool>,
    from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
    let (to_queue_tx, to_queue_rx) = mpsc::channel(150);
    let (from_queue_tx, from_queue_rx) = mpsc::unbounded();

    let queue = Queue::new(
        metrics,
        soft_capacity,
        hard_capacity,
        cache_path,
        to_queue_rx,
        from_queue_tx,
        to_pool_tx,
        from_pool_rx,
    );

    (to_queue_tx, from_queue_rx, queue.run())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::host::tests::TEST_PREPARATION_TIMEOUT;
use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt};
use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareSuccess};
use slotmap::SlotMap;
use std::task::Poll;
/// Shorthand for `PvfPrepData::from_discriminator(discriminator)`.
fn pvf(discriminator: u32) -> PvfPrepData {
PvfPrepData::from_discriminator(discriminator)
}
/// Polls `fut` and `task` in turns until `fut` resolves, and returns its output.
///
/// Panics if `fut` is still pending after one second, or if `task` finishes first.
async fn run_until<R>(
    task: &mut (impl Future<Output = ()> + Unpin),
    mut fut: (impl Future<Output = R> + Unpin),
) -> R {
    let started_at = std::time::Instant::now();
    loop {
        if started_at.elapsed() > std::time::Duration::from_secs(1) {
            panic!("timeout");
        }

        // `fut` gets polled first, so a result that is already available is returned even
        // if the task would finish in the same round.
        match futures::poll!(&mut fut) {
            Poll::Ready(output) => return output,
            Poll::Pending => {},
        }

        if futures::poll!(&mut *task).is_ready() {
            panic!()
        }
    }
}
/// Test harness: owns the queue's future and the far ends of all its channels, playing the
/// role of both the user of the queue and the worker pool.
struct Test {
/// Temporary directory whose path is passed to the queue as the cache path.
_tempdir: tempfile::TempDir,
/// The future driving the queue under test.
run: BoxFuture<'static, ()>,
/// Source of worker keys for the tests.
workers: SlotMap<Worker, ()>,
/// Used to send pool events to the queue.
from_pool_tx: mpsc::UnboundedSender<pool::FromPool>,
/// Receives the commands the queue sends to the pool.
to_pool_rx: mpsc::Receiver<pool::ToPool>,
/// Used to send requests to the queue.
to_queue_tx: mpsc::Sender<ToQueue>,
/// Receives the notifications emitted by the queue.
from_queue_rx: mpsc::UnboundedReceiver<FromQueue>,
}
impl Test {
    /// Starts a queue with the given capacities, wired to channels owned by the harness.
    fn new(soft_capacity: usize, hard_capacity: usize) -> Self {
        let tempdir = tempfile::tempdir().unwrap();
        let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
        let (from_pool_tx, from_pool_rx) = mpsc::unbounded();

        let (to_queue_tx, from_queue_rx, run) = start(
            Metrics::default(),
            soft_capacity,
            hard_capacity,
            tempdir.path().to_path_buf(),
            to_pool_tx,
            from_pool_rx,
        );

        Self {
            _tempdir: tempdir,
            run: run.boxed(),
            workers: SlotMap::with_key(),
            from_pool_tx,
            to_pool_rx,
            to_queue_tx,
            from_queue_rx,
        }
    }

    /// Sends a request to the queue; panics if it cannot be sent immediately.
    fn send_queue(&mut self, to_queue: ToQueue) {
        let sent = self.to_queue_tx.send(to_queue).now_or_never();
        sent.unwrap().unwrap();
    }

    /// Drives the queue until it emits a notification and returns it.
    async fn poll_and_recv_from_queue(&mut self) -> FromQueue {
        let Self { run, from_queue_rx, .. } = self;
        let recv = async { from_queue_rx.next().await.unwrap() }.boxed();
        run_until(run, recv).await
    }

    /// Sends a pool event to the queue; panics if it cannot be sent immediately.
    fn send_from_pool(&mut self, from_pool: pool::FromPool) {
        let sent = self.from_pool_tx.send(from_pool).now_or_never();
        sent.unwrap().unwrap();
    }

    /// Drives the queue until it sends a command to the pool and returns it.
    async fn poll_and_recv_to_pool(&mut self) -> pool::ToPool {
        let Self { run, to_pool_rx, .. } = self;
        let recv = async { to_pool_rx.next().await.unwrap() }.boxed();
        run_until(run, recv).await
    }

    /// Drives the queue for 500 milliseconds and panics if it sends anything to the pool in
    /// that time.
    async fn poll_ensure_to_pool_is_empty(&mut self) {
        use futures_timer::Delay;

        let Self { run, to_pool_rx, .. } = self;
        let stays_silent = async {
            futures::select! {
                _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                _ = to_pool_rx.next().fuse() => {
                    panic!("to pool supposed to be empty")
                }
            }
        }
        .boxed();
        run_until(run, stays_silent).await
    }
}
#[tokio::test]
async fn properly_concludes() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
test.send_from_pool(pool::FromPool::Concluded {
worker: w,
rip: false,
result: Ok(PrepareSuccess::default()),
});
assert_eq!(
test.poll_and_recv_from_queue().await.artifact_id,
ArtifactId::from_pvf_prep_data(&pvf(1))
);
}
#[tokio::test]
async fn dont_spawn_over_soft_limit_unless_critical() {
let mut test = Test::new(2, 3);
let priority = Priority::Normal;
test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
test.send_queue(ToQueue::Enqueue {
priority,
pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
let w2 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
test.send_from_pool(pool::FromPool::Spawned(w2));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: false,
result: Ok(PrepareSuccess::default()),
});
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: PvfPrepData::from_discriminator(4),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
}
#[tokio::test]
async fn cull_unwanted() {
let mut test = Test::new(1, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: PvfPrepData::from_discriminator(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: PvfPrepData::from_discriminator(2),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: false,
result: Ok(PrepareSuccess::default()),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}
#[tokio::test]
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
let priority = Priority::Normal;
test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
test.send_queue(ToQueue::Enqueue {
priority,
pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
let w2 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
test.send_from_pool(pool::FromPool::Spawned(w2));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Ok(PrepareSuccess::default()),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(
test.poll_and_recv_from_queue().await.artifact_id,
ArtifactId::from_pvf_prep_data(&pvf(1))
);
}
#[tokio::test]
async fn doesnt_resurrect_ripped_worker_if_no_work() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: PvfPrepData::from_discriminator(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Err(PrepareError::IoErr("test".into())),
});
test.poll_ensure_to_pool_is_empty().await;
}
#[tokio::test]
async fn rip_for_start_work() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: PvfPrepData::from_discriminator(1),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Rip(w1));
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w2 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w2));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
}
}