//! A queue that handles requests for PVF execution.
use super::worker_interface::{Error as WorkerInterfaceError, Response as WorkerInterfaceResponse};
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_interface::{IdleWorker, WorkerHandle},
InvalidCandidate, PossiblyInvalidError, ValidationError, LOG_TARGET,
};
use futures::{
channel::{mpsc, oneshot},
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use polkadot_node_core_pvf_common::{
execute::{JobResponse, WorkerError, WorkerResponse},
SecurityStatus,
};
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem::{messages::PvfExecKind, ActiveLeavesUpdate};
use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, Hash, PersistedValidationData};
use slotmap::HopSlotMap;
use std::{
collections::{HashMap, VecDeque},
fmt,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use strum::{EnumIter, IntoEnumIterator};
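/// The longest the eldest job may wait at the head of its queue before the scheduler stops trying
/// to pair a just-freed worker with a compatible job and instead serves the eldest job, even if
/// that requires killing and re-spawning a worker.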
const MAX_KEEP_WAITING: Duration = Duration::from_secs(4);
slotmap::new_key_type! { struct Worker; }
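/// A request to the execution queue: either an update of the set of active leaves or a new
/// execution job for an already prepared artifact.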
#[derive(Debug)]
pub enum ToQueue {
UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec<Hash> },
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}
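/// A message sent by the execution queue back to the host; currently used to request removal of
/// an artifact after a runtime construction error.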
#[derive(Debug)]
pub enum FromQueue {
RemoveArtifact { artifact: ArtifactId, reply_to: oneshot::Sender<()> },
}
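/// The data accompanying an execution request: the inputs, the executor parameters, the execution
/// kind and the channel through which the result is reported.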
#[derive(Debug)]
pub struct PendingExecutionRequest {
pub exec_timeout: Duration,
pub pvd: Arc<PersistedValidationData>,
pub pov: Arc<PoV>,
pub executor_params: ExecutorParams,
pub result_tx: ResultSender,
pub exec_kind: PvfExecKind,
}
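/// A job queued for execution, together with the moment it entered the queue.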
struct ExecuteJob {
artifact: ArtifactPathId,
exec_timeout: Duration,
exec_kind: PvfExecKind,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
executor_params: ExecutorParams,
result_tx: ResultSender,
waiting_since: Instant,
}
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
executor_params_hash: ExecutorParamsHash,
}
impl fmt::Debug for WorkerData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerData(pid={})", self.handle.id())
}
}
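/// Bookkeeping of the spawned workers: the running ones, the number of spawns currently in
/// flight, and the maximum number of workers the queue is allowed to have.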
struct Workers {
running: HopSlotMap<Worker, WorkerData>,
spawn_inflight: usize,
capacity: usize,
}
impl Workers {
fn can_afford_one_more(&self) -> bool {
self.spawn_inflight + self.running.len() < self.capacity
}
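/// Finds an idle worker that was spawned with the given executor params hash.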
fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option<Worker> {
self.running.iter().find_map(|d| {
if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash {
Some(d.0)
} else {
None
}
})
}
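/// Finds any idle worker, regardless of which executor params it was spawned with.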
fn find_idle(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
}
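/// Takes the idle token from the given worker, marking it busy; returns `None` if the worker is
/// unknown or already busy.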
fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
self.running.get_mut(worker)?.idle.take()
}
}
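/// An event produced by one of the background futures multiplexed in `Mux`: either a freshly
/// spawned worker together with the job it should run, or the outcome of a finished job.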
enum QueueEvent {
Spawn(IdleWorker, WorkerHandle, ExecuteJob),
FinishWork(
Worker,
Result<WorkerInterfaceResponse, WorkerInterfaceError>,
ArtifactId,
ResultSender,
),
}
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
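/// The mutable state of the execution queue: configuration for spawning workers, the set of
/// running workers, the unscheduled jobs and the futures driving spawns and executions.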
struct Queue {
metrics: Metrics,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
unscheduled: Unscheduled,
workers: Workers,
mux: Mux,
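/// Active leaves and their ancestors, used to judge whether a backing job still belongs to an
/// active fork.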
active_leaves: HashMap<Hash, Vec<Hash>>,
}
impl Queue {
fn new(
metrics: Metrics,
program_path: PathBuf,
cache_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
) -> Self {
Self {
metrics,
program_path,
cache_path,
spawn_timeout,
node_version,
security_status,
to_queue_rx,
from_queue_tx,
unscheduled: Unscheduled::new(),
mux: Mux::new(),
workers: Workers {
running: HopSlotMap::with_capacity_and_key(10),
spawn_inflight: 0,
capacity: worker_capacity,
},
active_leaves: Default::default(),
}
}
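/// The main loop: processes incoming messages and multiplexed worker events until the channel
/// from the host is closed, purging dead workers after every iteration.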
async fn run(mut self) {
loop {
futures::select! {
to_queue = self.to_queue_rx.next() => {
if let Some(to_queue) = to_queue {
handle_to_queue(&mut self, to_queue);
} else {
break;
}
}
ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
}
purge_dead(&self.metrics, &mut self.workers).await;
}
}
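/// Tries to assign the next unscheduled job to a worker. If a just-finished worker is provided
/// and the eldest job has not waited longer than `MAX_KEEP_WAITING`, it prefers a job with a
/// matching execution environment; otherwise it serves the eldest job, killing an idle worker of
/// a different kind and/or spawning an extra one if the capacity allows. Does nothing if all
/// worker slots are busy or the selected queue is empty.
/// Should be called every time a new job arrives or a job finishes.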
fn try_assign_next_job(&mut self, finished_worker: Option<Worker>) {
let priority = self.unscheduled.select_next_priority();
let Some(queue) = self.unscheduled.get_mut(priority) else { return };
let eldest = if let Some(eldest) = queue.get(0) { eldest } else { return };
let mut worker = None;
let mut job_index = 0;
if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING {
if let Some(finished_worker) = finished_worker {
if let Some(worker_data) = self.workers.running.get(finished_worker) {
for (i, job) in queue.iter().enumerate() {
if worker_data.executor_params_hash == job.executor_params.hash() {
(worker, job_index) = (Some(finished_worker), i);
break
}
}
}
}
}
if worker.is_none() {
worker = self.workers.find_available(queue[job_index].executor_params.hash());
}
if worker.is_none() {
if let Some(idle) = self.workers.find_idle() {
if self.workers.running.remove(idle).is_some() {
self.metrics.execute_worker().on_retired();
}
}
}
if worker.is_none() && !self.workers.can_afford_one_more() {
return
}
let job = queue.remove(job_index).expect("Job is just checked to be in the queue; qed");
let exec_kind = job.exec_kind;
if let Some(worker) = worker {
assign(self, worker, job);
} else {
spawn_extra_worker(self, job);
}
self.metrics.on_execute_kind(exec_kind);
self.unscheduled.mark_scheduled(priority);
}
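/// Records the activated leaf with its ancestors, forgets deactivated leaves and prunes backing
/// jobs whose relay parent no longer belongs to any active fork.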
fn update_active_leaves(&mut self, update: ActiveLeavesUpdate, ancestors: Vec<Hash>) {
self.prune_deactivated_leaves(&update);
self.insert_active_leaf(update, ancestors);
self.prune_old_jobs();
}
fn prune_deactivated_leaves(&mut self, update: &ActiveLeavesUpdate) {
for hash in &update.deactivated {
let _ = self.active_leaves.remove(&hash);
}
}
fn insert_active_leaf(&mut self, update: ActiveLeavesUpdate, ancestors: Vec<Hash>) {
let Some(leaf) = update.activated else { return };
let _ = self.active_leaves.insert(leaf.hash, ancestors);
}
fn prune_old_jobs(&mut self) {
for &priority in &[Priority::Backing, Priority::BackingSystemParas] {
let Some(queue) = self.unscheduled.get_mut(priority) else { continue };
let to_remove: Vec<usize> = queue
.iter()
.enumerate()
.filter_map(|(index, job)| {
let relay_parent = match job.exec_kind {
PvfExecKind::Backing(x) | PvfExecKind::BackingSystemParas(x) => x,
_ => return None,
};
let in_active_fork = self.active_leaves.iter().any(|(hash, ancestors)| {
*hash == relay_parent || ancestors.contains(&relay_parent)
});
if in_active_fork {
None
} else {
Some(index)
}
})
.collect();
for &index in to_remove.iter().rev() {
if index >= queue.len() {
continue
}
let Some(job) = queue.remove(index) else { continue };
let _ = job.result_tx.send(Err(ValidationError::ExecutionDeadline));
gum::warn!(
target: LOG_TARGET,
?priority,
exec_kind = ?job.exec_kind,
"Job exceeded its deadline and was dropped without execution",
);
}
}
}
}
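/// Removes workers whose handles have already resolved, i.e. whose processes have died, and
/// updates the metrics accordingly.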
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
let mut to_remove = vec![];
for (worker, data) in workers.running.iter_mut() {
if futures::poll!(&mut data.handle).is_ready() {
to_remove.push(worker);
}
}
for w in to_remove {
if workers.running.remove(w).is_some() {
metrics.execute_worker().on_retired();
}
}
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
match to_queue {
ToQueue::UpdateActiveLeaves { update, ancestors } => {
queue.update_active_leaves(update, ancestors);
},
ToQueue::Enqueue { artifact, pending_execution_request } => {
let PendingExecutionRequest {
exec_timeout,
pvd,
pov,
executor_params,
result_tx,
exec_kind,
} = pending_execution_request;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.observe_pov_size(pov.block_data.0.len(), true);
queue.metrics.execute_enqueued();
let job = ExecuteJob {
artifact,
exec_timeout,
exec_kind,
pvd,
pov,
executor_params,
result_tx,
waiting_since: Instant::now(),
};
queue.unscheduled.add(job, exec_kind.into());
queue.try_assign_next_job(None);
},
}
}
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
QueueEvent::Spawn(idle, handle, job) => {
handle_worker_spawned(queue, idle, handle, job);
},
QueueEvent::FinishWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx).await;
},
}
}
fn handle_worker_spawned(
queue: &mut Queue,
idle: IdleWorker,
handle: WorkerHandle,
job: ExecuteJob,
) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData {
idle: Some(idle),
handle,
executor_params_hash: job.executor_params.hash(),
});
gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
assign(queue, worker, job);
}
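/// Maps the worker's outcome onto a validation result, reports it through `result_tx`, and either
/// hands the worker back to the idle pool (trying to schedule the next compatible job on it) or
/// retires it if it is gone. On a runtime construction error it also asks the host to remove the
/// artifact before reporting the result.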
async fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
worker_result: Result<WorkerInterfaceResponse, WorkerInterfaceError>,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result {
Ok(WorkerInterfaceResponse {
worker_response:
WorkerResponse {
job_response: JobResponse::Ok { result_descriptor },
duration,
pov_size,
},
idle_worker,
}) => {
(Some(idle_worker), Ok(result_descriptor), Some(duration), None, Some(pov_size))
},
Ok(WorkerInterfaceResponse {
worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. },
idle_worker,
}) => (
Some(idle_worker),
Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
None,
None,
None,
),
Ok(WorkerInterfaceResponse {
worker_response:
WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. },
idle_worker,
}) => (
Some(idle_worker),
Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)),
None,
None,
None,
),
Ok(WorkerInterfaceResponse {
worker_response:
WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. },
idle_worker,
}) => {
let (result_tx, result_rx) = oneshot::channel();
queue
.from_queue_tx
.unbounded_send(FromQueue::RemoveArtifact {
artifact: artifact_id.clone(),
reply_to: result_tx,
})
.expect("from execute queue receiver is listened by the host; qed");
(
Some(idle_worker),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(
err,
))),
None,
Some(result_rx),
None,
)
},
Err(WorkerInterfaceError::InternalError(err)) |
Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) =>
(None, Err(ValidationError::Internal(err)), None, None, None),
Err(WorkerInterfaceError::HardTimeout) |
Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) =>
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None),
Err(WorkerInterfaceError::CommunicationErr(_err)) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
None,
None,
None,
),
Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
None,
None,
None,
),
Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))),
None,
None,
None,
),
};
queue.metrics.execute_finished();
if let Some(pov_size) = pov_size {
queue.metrics.observe_pov_size(pov_size as usize, false)
}
if let Err(ref err) = result {
gum::warn!(
target: LOG_TARGET,
?artifact_id,
?worker,
worker_rip = idle_worker.is_none(),
"execution worker concluded, error occurred: {}",
err
);
} else {
gum::trace!(
target: LOG_TARGET,
?artifact_id,
?worker,
worker_rip = idle_worker.is_none(),
?duration,
"execute worker concluded successfully",
);
}
if let Some(sync_channel) = sync_channel {
let _ = sync_channel.await;
}
let _ = result_tx.send(result);
if let Some(idle_worker) = idle_worker {
if let Some(data) = queue.workers.running.get_mut(worker) {
data.idle = Some(idle_worker);
return queue.try_assign_next_job(Some(worker))
}
} else {
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
}
queue.try_assign_next_job(None);
}
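/// Starts spawning an additional worker; once it is up, the given job will be assigned to it.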
fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
queue.metrics.execute_worker().on_begin_spawn();
gum::debug!(target: LOG_TARGET, "spawning an extra worker");
queue.mux.push(
spawn_worker_task(
queue.program_path.clone(),
queue.cache_path.clone(),
job,
queue.spawn_timeout,
queue.node_version.clone(),
queue.security_status.clone(),
)
.boxed(),
);
queue.workers.spawn_inflight += 1;
}
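/// Keeps trying to spawn a worker, backing off for a few seconds after every failure, and
/// resolves to `QueueEvent::Spawn` with the job once a worker is up.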
async fn spawn_worker_task(
program_path: PathBuf,
cache_path: PathBuf,
job: ExecuteJob,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> QueueEvent {
use futures_timer::Delay;
loop {
match super::worker_interface::spawn(
&program_path,
&cache_path,
job.executor_params.clone(),
spawn_timeout,
node_version.as_deref(),
security_status.clone(),
)
.await
{
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
Delay::new(Duration::from_secs(3)).await;
},
}
}
}
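/// Asks the given worker to perform the job. The worker must be running and idle, and must have
/// been spawned with executor params matching the job's.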
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?job.artifact.id,
?worker,
"assigning the execute worker",
);
debug_assert_eq!(
queue
.workers
.running
.get(worker)
.expect("caller must provide existing worker; qed")
.executor_params_hash,
job.executor_params.hash()
);
let idle = queue.workers.claim_idle(worker).expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
qed.",
);
queue
.metrics
.observe_execution_queued_time(job.waiting_since.elapsed().as_millis() as u32);
let execution_timer = queue.metrics.time_execution();
queue.mux.push(
async move {
let _timer = execution_timer;
let result = super::worker_interface::start_work(
idle,
job.artifact.clone(),
job.exec_timeout,
job.pvd,
job.pov,
)
.await;
QueueEvent::FinishWork(worker, result, job.artifact.id, job.result_tx)
}
.boxed(),
);
}
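/// Spins up the execute queue and returns the channel for sending requests to it, the channel for
/// receiving requests from it, and the future that must be polled to drive the queue.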
pub fn start(
metrics: Metrics,
program_path: PathBuf,
cache_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let (from_queue_tx, from_queue_rx) = mpsc::unbounded();
let run = Queue::new(
metrics,
program_path,
cache_path,
worker_capacity,
spawn_timeout,
node_version,
security_status,
to_queue_rx,
from_queue_tx,
)
.run();
(to_queue_tx, from_queue_rx, run)
}
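/// Execution priorities, listed from the most to the least urgent. The derived `Ord` follows the
/// declaration order, so "smaller" means "more urgent".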
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, EnumIter)]
enum Priority {
Dispute,
Approval,
BackingSystemParas,
Backing,
}
impl From<PvfExecKind> for Priority {
fn from(kind: PvfExecKind) -> Self {
match kind {
PvfExecKind::Dispute => Priority::Dispute,
PvfExecKind::Approval => Priority::Approval,
PvfExecKind::BackingSystemParas(_) => Priority::BackingSystemParas,
PvfExecKind::Backing(_) => Priority::Backing,
}
}
}
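/// Jobs that have not been assigned to a worker yet, bucketed by priority, together with a
/// counter of how many jobs of each priority have been scheduled within the current window.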
struct Unscheduled {
unscheduled: HashMap<Priority, VecDeque<ExecuteJob>>,
counter: HashMap<Priority, usize>,
}
impl Unscheduled {
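/// The number of scheduled jobs after which the per-priority counters are reset (see
/// `mark_scheduled`).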
const SCHEDULING_WINDOW_SIZE: usize = 12;
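/// For each priority, the maximum share (in percent) it may take of the jobs scheduled at its own
/// or lower urgency within the current window before it is considered saturated and lower
/// priorities get a chance (see `has_reached_threshold`).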
const PRIORITY_ALLOCATION_THRESHOLDS: &'static [(Priority, usize)] = &[
(Priority::Dispute, 70),
(Priority::Approval, 80),
(Priority::BackingSystemParas, 100),
(Priority::Backing, 100),
];
fn new() -> Self {
Self {
unscheduled: Priority::iter().map(|priority| (priority, VecDeque::new())).collect(),
counter: Priority::iter().map(|priority| (priority, 0)).collect(),
}
}
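/// Selects the most urgent priority that has pending jobs and has not reached its allocation
/// threshold; if every priority with pending work is saturated, falls back to the most urgent one
/// that has pending jobs (or `Backing` if nothing is pending at all).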
fn select_next_priority(&self) -> Priority {
gum::debug!(
target: LOG_TARGET,
unscheduled = ?self.unscheduled.iter().map(|(p, q)| (*p, q.len())).collect::<HashMap<Priority, usize>>(),
counter = ?self.counter,
"Selecting next execution priority...",
);
let priority = Priority::iter()
.find(|priority| self.has_pending(priority) && !self.has_reached_threshold(priority))
.unwrap_or_else(|| {
Priority::iter()
.find(|priority| self.has_pending(priority))
.unwrap_or(Priority::Backing)
});
gum::debug!(
target: LOG_TARGET,
?priority,
"Selected next execution priority",
);
priority
}
fn get_mut(&mut self, priority: Priority) -> Option<&mut VecDeque<ExecuteJob>> {
self.unscheduled.get_mut(&priority)
}
fn add(&mut self, job: ExecuteJob, priority: Priority) {
self.unscheduled.entry(priority).or_default().push_back(job);
}
fn has_pending(&self, priority: &Priority) -> bool {
!self.unscheduled.get(priority).unwrap_or(&VecDeque::new()).is_empty()
}
fn priority_allocation_threshold(priority: &Priority) -> Option<usize> {
Self::PRIORITY_ALLOCATION_THRESHOLDS.iter().find_map(|&(p, value)| {
if p == *priority {
Some(value)
} else {
None
}
})
}
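/// Returns `true` if the share of jobs scheduled at this priority, relative to everything
/// scheduled at this priority or lower urgency within the current window, has reached the
/// configured threshold.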
fn has_reached_threshold(&self, priority: &Priority) -> bool {
let Some(threshold) = Self::priority_allocation_threshold(priority) else { return false };
let Some(count) = self.counter.get(&priority) else { return false };
let total_scheduled_at_priority_or_lower: usize = self
.counter
.iter()
.filter_map(|(p, c)| if *p >= *priority { Some(c) } else { None })
.sum();
if total_scheduled_at_priority_or_lower == 0 {
return false
}
let has_reached_threshold = count * 100 / total_scheduled_at_priority_or_lower >= threshold;
gum::debug!(
target: LOG_TARGET,
?priority,
?count,
?total_scheduled_at_priority_or_lower,
"Execution priority has {}reached threshold: {}/{}%",
if has_reached_threshold {""} else {"not "},
count * 100 / total_scheduled_at_priority_or_lower,
threshold
);
has_reached_threshold
}
fn mark_scheduled(&mut self, priority: Priority) {
*self.counter.entry(priority).or_default() += 1;
if self.counter.values().sum::<usize>() >= Self::SCHEDULING_WINDOW_SIZE {
self.reset_counter();
}
gum::debug!(
target: LOG_TARGET,
?priority,
"Job marked as scheduled",
);
}
fn reset_counter(&mut self) {
self.counter = Priority::iter().map(|kind| (kind, 0)).collect();
}
}
#[cfg(test)]
mod tests {
use polkadot_node_primitives::BlockData;
use polkadot_node_subsystem_test_helpers::mock::new_leaf;
use sp_core::H256;
use super::*;
use crate::testing::artifact_id;
use std::time::Duration;
fn create_execution_job() -> ExecuteJob {
let (result_tx, _result_rx) = oneshot::channel();
let pvd = Arc::new(PersistedValidationData {
parent_head: Default::default(),
relay_parent_number: 1u32,
relay_parent_storage_root: H256::default(),
max_pov_size: 4096 * 1024,
});
let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
ExecuteJob {
artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() },
exec_timeout: Duration::from_secs(10),
exec_kind: PvfExecKind::Approval,
pvd,
pov,
executor_params: ExecutorParams::default(),
result_tx,
waiting_since: Instant::now(),
}
}
#[test]
fn test_unscheduled_add() {
let mut unscheduled = Unscheduled::new();
Priority::iter().for_each(|priority| {
unscheduled.add(create_execution_job(), priority);
});
Priority::iter().for_each(|priority| {
let queue = unscheduled.unscheduled.get(&priority).unwrap();
assert_eq!(queue.len(), 1);
});
}
#[test]
fn test_unscheduled_priority_distribution() {
use Priority::*;
let mut priorities = vec![];
let mut unscheduled = Unscheduled::new();
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
unscheduled.add(create_execution_job(), Dispute);
unscheduled.add(create_execution_job(), Approval);
unscheduled.add(create_execution_job(), BackingSystemParas);
unscheduled.add(create_execution_job(), Backing);
}
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
let priority = unscheduled.select_next_priority();
priorities.push(priority);
unscheduled.mark_scheduled(priority);
}
assert_eq!(priorities.iter().filter(|v| **v == Dispute).count(), 8);
assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 3);
assert_eq!(priorities.iter().filter(|v| **v == BackingSystemParas).count(), 1);
}
#[test]
fn test_unscheduled_priority_distribution_without_backing_system_paras() {
use Priority::*;
let mut priorities = vec![];
let mut unscheduled = Unscheduled::new();
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
unscheduled.add(create_execution_job(), Dispute);
unscheduled.add(create_execution_job(), Approval);
unscheduled.add(create_execution_job(), Backing);
}
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
let priority = unscheduled.select_next_priority();
priorities.push(priority);
unscheduled.mark_scheduled(priority);
}
assert_eq!(priorities.iter().filter(|v| **v == Dispute).count(), 8);
assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 3);
assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1);
}
#[test]
fn test_unscheduled_priority_distribution_without_disputes() {
use Priority::*;
let mut priorities = vec![];
let mut unscheduled = Unscheduled::new();
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
unscheduled.add(create_execution_job(), Approval);
unscheduled.add(create_execution_job(), BackingSystemParas);
unscheduled.add(create_execution_job(), Backing);
}
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
let priority = unscheduled.select_next_priority();
priorities.push(priority);
unscheduled.mark_scheduled(priority);
}
assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 9);
assert_eq!(priorities.iter().filter(|v| **v == BackingSystemParas).count(), 2);
assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1);
}
#[test]
fn test_unscheduled_priority_distribution_without_disputes_and_only_one_backing() {
use Priority::*;
let mut priorities = vec![];
let mut unscheduled = Unscheduled::new();
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
unscheduled.add(create_execution_job(), Approval);
}
unscheduled.add(create_execution_job(), Backing);
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
let priority = unscheduled.select_next_priority();
priorities.push(priority);
unscheduled.mark_scheduled(priority);
}
assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 11);
assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1);
}
#[test]
fn test_unscheduled_does_not_postpone_backing() {
use Priority::*;
let mut priorities = vec![];
let mut unscheduled = Unscheduled::new();
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
unscheduled.add(create_execution_job(), Approval);
}
unscheduled.add(create_execution_job(), Backing);
for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
let priority = unscheduled.select_next_priority();
priorities.push(priority);
unscheduled.mark_scheduled(priority);
}
assert_eq!(&priorities[..4], &[Approval, Backing, Approval, Approval]);
}
#[tokio::test]
async fn test_prunes_old_jobs_on_active_leaves_update() {
let (_, to_queue_rx) = mpsc::channel(1);
let (from_queue_tx, _) = mpsc::unbounded();
let mut queue = Queue::new(
Metrics::default(),
PathBuf::new(),
PathBuf::new(),
1,
Duration::from_secs(1),
None,
SecurityStatus::default(),
to_queue_rx,
from_queue_tx,
);
let old_relay_parent = Hash::random();
let relevant_relay_parent = Hash::random();
assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::<usize>(), 0);
let mut result_rxs = vec![];
let (result_tx, _result_rx) = oneshot::channel();
let relevant_job = ExecuteJob {
artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() },
exec_timeout: Duration::from_secs(1),
exec_kind: PvfExecKind::Backing(relevant_relay_parent),
pvd: Arc::new(PersistedValidationData::default()),
pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }),
executor_params: ExecutorParams::default(),
result_tx,
waiting_since: Instant::now(),
};
queue.unscheduled.add(relevant_job, Priority::Backing);
for _ in 0..10 {
let (result_tx, result_rx) = oneshot::channel();
let expired_job = ExecuteJob {
artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() },
exec_timeout: Duration::from_secs(1),
exec_kind: PvfExecKind::Backing(old_relay_parent),
pvd: Arc::new(PersistedValidationData::default()),
pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }),
executor_params: ExecutorParams::default(),
result_tx,
waiting_since: Instant::now(),
};
queue.unscheduled.add(expired_job, Priority::Backing);
result_rxs.push(result_rx);
}
assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::<usize>(), 11);
queue.update_active_leaves(
ActiveLeavesUpdate::start_work(new_leaf(Hash::random(), 1)),
vec![relevant_relay_parent],
);
for rx in result_rxs {
assert!(matches!(rx.await, Ok(Err(ValidationError::ExecutionDeadline))));
}
assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::<usize>(), 1);
}
}