use super::worker_interface::{self, Outcome};
use crate::{
metrics::Metrics,
worker_interface::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
use always_assert::never;
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
pvf::PvfPrepData,
SecurityStatus,
};
use slotmap::HopSlotMap;
use std::{
fmt,
path::{Path, PathBuf},
task::Poll,
time::Duration,
};
// `Worker` is the slot-map key that identifies one spawned worker. It is the key type of the
// pool's `spawned` map and is the identifier carried by `ToPool` and `FromPool` messages.
slotmap::new_key_type! { pub struct Worker; }
/// Commands accepted by the pool over its `to_pool` channel.
#[derive(Debug, PartialEq, Eq)]
pub enum ToPool {
/// Start spawning a new worker. Spawning is retried until it succeeds, after which the pool
/// answers with [`FromPool::Spawned`].
Spawn,
/// Remove the given worker from the pool. No reply is sent; a worker that is not (or no
/// longer) tracked is ignored.
Kill(Worker),
/// Run a preparation job for `pvf` on `worker`, using `cache_path` for the job.
///
/// The request is dropped without a reply if the worker is not tracked by the pool. The
/// worker is expected to be idle when this message arrives.
StartWork { worker: Worker, pvf: PvfPrepData, cache_path: PathBuf },
}
/// Notifications emitted by the pool over its `from_pool` channel.
#[derive(Debug)]
pub enum FromPool {
/// A worker was spawned and registered under the given key; it is idle.
Spawned(Worker),
/// A job started with [`ToPool::StartWork`] finished.
Concluded {
/// The worker the job ran on.
worker: Worker,
/// `true` when the worker was removed from the pool while concluding the job, `false`
/// when it was returned to the idle state.
rip: bool,
/// Result of the preparation job.
result: PrepareResult,
},
/// The worker was removed from the pool without a job result to report: either an idle
/// worker's handle resolved, or a job ended with an unreachable worker.
Rip(Worker),
}
/// Per-worker bookkeeping stored in the pool's `spawned` map.
struct WorkerData {
// `Some` while the worker is idle. The token is taken out when a job is started and put back
// when the job concludes without retiring the worker.
idle: Option<IdleWorker>,
// Handle of the worker. For idle workers it is polled after every pool event; a handle that
// resolves causes the worker to be retired.
handle: WorkerHandle,
}
impl fmt::Debug for WorkerData {
    /// Formats the entry as `WorkerData(pid=<id>)`, where `<id>` is whatever the worker handle
    /// reports as its id. The idle token is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pid = self.handle.id();
        f.write_fmt(format_args!("WorkerData(pid={})", pid))
    }
}
/// Results of the background tasks the pool runs in its `mux`.
enum PoolEvent {
// A spawn task finished: carries the idle token and the handle of the new worker.
Spawn(IdleWorker, WorkerHandle),
// A job finished on the given worker with the given outcome.
StartWork(Worker, Outcome),
}
/// The set of in-flight spawn and job tasks; each one resolves to a [`PoolEvent`].
type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
/// Complete state of the pool; consumed by `run`.
struct Pool {
// The following five values are passed to `worker_interface::spawn` for every new worker.
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
// Incoming commands. The pool stops when this stream ends.
to_pool: mpsc::Receiver<ToPool>,
// Outgoing notifications. The pool stops when sending fails.
from_pool: mpsc::UnboundedSender<FromPool>,
// All workers currently tracked by the pool, idle or busy.
spawned: HopSlotMap<Worker, WorkerData>,
// In-flight spawn and job tasks.
mux: Mux,
metrics: Metrics,
}
/// Marker error telling the pool's event loop to stop. It is produced when the command stream
/// has ended or when a notification could not be sent.
struct Fatal;
/// The pool's event loop.
///
/// Each iteration waits for either the next command on `to_pool` or the completion of one of the
/// tasks in `mux`, handles it, and then checks the idle workers for ones whose handle has
/// resolved. The loop, and with it the future, ends when the command stream is exhausted or a
/// notification can no longer be sent.
async fn run(
Pool {
program_path,
cache_path,
spawn_timeout,
node_version,
security_status,
to_pool,
mut from_pool,
mut spawned,
mut mux,
metrics,
}: Pool,
) {
// Unwraps an `Ok` value, or leaves the enclosing loop on `Err(Fatal)`.
macro_rules! break_if_fatal {
($expr:expr) => {
match $expr {
Err(Fatal) => break,
Ok(v) => v,
}
};
}
// `select!` is given a fused stream here.
let mut to_pool = to_pool.fuse();
loop {
futures::select! {
to_pool = to_pool.next() => {
// `None` means the command stream has ended; that stops the pool.
let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
handle_to_pool(
&metrics,
&program_path,
&cache_path,
spawn_timeout,
node_version.clone(),
security_status.clone(),
&mut spawned,
&mut mux,
to_pool,
)
}
// A spawn or job task has finished.
ev = mux.select_next_some() => {
break_if_fatal!(handle_mux(&metrics, &mut from_pool, &mut spawned, ev))
}
}
// After every event, retire idle workers whose handle has resolved.
break_if_fatal!(purge_dead(&metrics, &mut from_pool, &mut spawned).await);
}
}
async fn purge_dead(
metrics: &Metrics,
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
) -> Result<(), Fatal> {
let mut to_remove = vec![];
for (worker, data) in spawned.iter_mut() {
if data.idle.is_none() {
continue
}
if let Poll::Ready(()) = futures::poll!(&mut data.handle) {
to_remove.push(worker);
}
}
for w in to_remove {
if attempt_retire(metrics, spawned, w) {
reply(from_pool, FromPool::Rip(w))?;
}
}
Ok(())
}
/// Executes one command received from the queue.
///
/// * `Spawn` records the start of a spawn in the metrics and pushes a spawn task to `mux`.
/// * `StartWork` takes the worker's idle token and pushes a job task to `mux`. A worker that is
///   not tracked is ignored; a tracked worker without an idle token is an invariant violation.
/// * `Kill` retires the worker if it is tracked.
///
/// Nothing is sent back from here; results arrive later as mux events.
fn handle_to_pool(
    metrics: &Metrics,
    program_path: &Path,
    cache_path: &Path,
    spawn_timeout: Duration,
    node_version: Option<String>,
    security_status: SecurityStatus,
    spawned: &mut HopSlotMap<Worker, WorkerData>,
    mux: &mut Mux,
    to_pool: ToPool,
) {
    match to_pool {
        ToPool::Kill(worker) => {
            gum::debug!(target: LOG_TARGET, ?worker, "killing prepare worker");
            // Whether the worker was still tracked does not matter here.
            let _ = attempt_retire(metrics, spawned, worker);
        },
        ToPool::Spawn => {
            gum::debug!(target: LOG_TARGET, "spawning a new prepare worker");
            metrics.prepare_worker().on_begin_spawn();
            let task = spawn_worker_task(
                program_path.to_path_buf(),
                cache_path.to_path_buf(),
                spawn_timeout,
                node_version,
                security_status,
            );
            mux.push(task.boxed());
        },
        // Note: this `cache_path` comes from the message and shadows the parameter.
        ToPool::StartWork { worker, pvf, cache_path } => {
            // Outer `Option`: is the worker tracked? Inner `Option`: did it hold an idle token?
            let idle_token = spawned.get_mut(worker).map(|data| data.idle.take());
            match idle_token {
                Some(Some(idle)) => {
                    let preparation_timer = metrics.time_preparation();
                    let task = start_work_task(
                        metrics.clone(),
                        worker,
                        idle,
                        pvf,
                        cache_path,
                        preparation_timer,
                    );
                    mux.push(task.boxed());
                },
                Some(None) => {
                    never!("unexpected absence of the idle token in prepare pool");
                },
                // Unknown worker (presumably already retired): nothing to do.
                None => {},
            }
        },
    }
}
/// Spawns a worker, retrying until it succeeds.
///
/// Every failed attempt is logged as a warning and followed by a 3 second pause. The future only
/// resolves on success, yielding `PoolEvent::Spawn` with the new worker's idle token and handle.
async fn spawn_worker_task(
    program_path: PathBuf,
    cache_path: PathBuf,
    spawn_timeout: Duration,
    node_version: Option<String>,
    security_status: SecurityStatus,
) -> PoolEvent {
    use futures_timer::Delay;

    loop {
        let attempt = worker_interface::spawn(
            &program_path,
            &cache_path,
            spawn_timeout,
            node_version.as_deref(),
            security_status.clone(),
        )
        .await;

        let err = match attempt {
            Ok((idle, handle)) => return PoolEvent::Spawn(idle, handle),
            Err(err) => err,
        };

        gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
        // Back off before the next attempt.
        Delay::new(Duration::from_secs(3)).await;
    }
}
/// Runs one preparation job on `worker` and wraps its outcome in `PoolEvent::StartWork`.
///
/// `_preparation_timer` is never read. It is owned by this future so that it is dropped only
/// once the job has finished (or the future itself is dropped).
async fn start_work_task<Timer>(
    metrics: Metrics,
    worker: Worker,
    idle: IdleWorker,
    pvf: PvfPrepData,
    cache_path: PathBuf,
    _preparation_timer: Option<Timer>,
) -> PoolEvent {
    PoolEvent::StartWork(
        worker,
        worker_interface::start_work(&metrics, idle, pvf, cache_path).await,
    )
}
fn handle_mux(
metrics: &Metrics,
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
event: PoolEvent,
) -> Result<(), Fatal> {
match event {
PoolEvent::Spawn(idle, handle) => {
metrics.prepare_worker().on_spawned();
let worker = spawned.insert(WorkerData { idle: Some(idle), handle });
reply(from_pool, FromPool::Spawned(worker))?;
Ok(())
},
PoolEvent::StartWork(worker, outcome) => {
match outcome {
Outcome::Concluded { worker: idle, result } =>
handle_concluded_no_rip(from_pool, spawned, worker, idle, result),
Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::CreateTmpFile(err)),
),
Outcome::RenameTmpFile { worker: idle, err, src, dest } => handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::RenameTmpFile { err, src, dest }),
),
Outcome::ClearWorkerDir { err } => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::ClearWorkerDir(err)),
},
)?;
}
Ok(())
},
Outcome::Unreachable => {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Rip(worker))?;
}
Ok(())
},
Outcome::IoErr(err) => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::IoErr(err)),
},
)?;
}
Ok(())
},
Outcome::JobDied { err, job_pid } => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::JobDied { err, job_pid }),
},
)?;
}
Ok(())
},
Outcome::TimedOut => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::TimedOut),
},
)?;
}
Ok(())
},
Outcome::OutOfMemory => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::OutOfMemory),
},
)?;
}
Ok(())
},
}
},
}
}
/// Sends `m` to the queue. Any send failure is reported as `Fatal`; the underlying send error
/// is discarded.
fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result<(), Fatal> {
    match from_pool.unbounded_send(m) {
        Ok(()) => Ok(()),
        Err(_send_error) => Err(Fatal),
    }
}
/// Removes `worker` from the pool's map and drops its data.
///
/// Returns `true` if the worker was tracked, in which case the retirement is also recorded in
/// the metrics, and `false` if there was nothing to remove.
fn attempt_retire(
    metrics: &Metrics,
    spawned: &mut HopSlotMap<Worker, WorkerData>,
    worker: Worker,
) -> bool {
    let was_tracked = spawned.remove(worker).is_some();
    if was_tracked {
        metrics.prepare_worker().on_retired();
    }
    was_tracked
}
/// Concludes a job whose worker stays in the pool.
///
/// The idle token is stored back into the worker's entry and the queue is told that the job
/// concluded with `result` and `rip: false`. If the worker is no longer tracked, the token and
/// the result are dropped and nothing is sent.
///
/// Returns `Err(Fatal)` only when the notification cannot be sent.
fn handle_concluded_no_rip(
    from_pool: &mut mpsc::UnboundedSender<FromPool>,
    spawned: &mut HopSlotMap<Worker, WorkerData>,
    worker: Worker,
    idle: IdleWorker,
    result: PrepareResult,
) -> Result<(), Fatal> {
    match spawned.get_mut(worker) {
        Some(data) => {
            // The slot was emptied when the job was started, so it must be empty now.
            let previous = data.idle.replace(idle);
            never!(
                previous.is_some(),
                "old idle worker was taken out when starting work; we only replace it here; qed"
            );
            reply(from_pool, FromPool::Concluded { worker, rip: false, result })
        },
        None => Ok(()),
    }
}
/// Creates a pool and the channels used to talk to it.
///
/// Returns, in order:
/// * the sender for [`ToPool`] commands (a bounded channel created with capacity 10),
/// * the receiver for [`FromPool`] notifications (unbounded),
/// * the future driving the pool; nothing happens until it is polled.
///
/// The pool starts with no workers and no tasks in flight.
pub fn start(
    metrics: Metrics,
    program_path: PathBuf,
    cache_path: PathBuf,
    spawn_timeout: Duration,
    node_version: Option<String>,
    security_status: SecurityStatus,
) -> (mpsc::Sender<ToPool>, mpsc::UnboundedReceiver<FromPool>, impl Future<Output = ()>) {
    let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
    let (from_pool_tx, from_pool_rx) = mpsc::unbounded();

    let pool = Pool {
        program_path,
        cache_path,
        spawn_timeout,
        node_version,
        security_status,
        to_pool: to_pool_rx,
        from_pool: from_pool_tx,
        spawned: HopSlotMap::with_capacity_and_key(20),
        mux: Mux::new(),
        metrics,
    };

    (to_pool_tx, from_pool_rx, run(pool))
}