use crate::{
configuration::{TestAuthorities, TestConfiguration},
mock::AlwaysSupportsParachains,
network::NetworkEmulatorHandle,
usage::{BenchmarkUsage, ResourceUsage},
};
use core::time::Duration;
use futures::{Future, FutureExt};
use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
use polkadot_node_subsystem_types::Hash;
use polkadot_node_subsystem_util::metrics::prometheus::{
self, Gauge, Histogram, PrometheusError, Registry, U64,
};
use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
use sc_service::{SpawnTaskHandle, TaskManager};
use std::net::{Ipv4Addr, SocketAddr};
use tokio::runtime::Handle;
const LOG_TARGET: &str = "subsystem-bench::environment";
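
/// Prometheus metrics exposed by the test environment while a benchmark runs.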
#[derive(Clone)]
pub struct TestEnvironmentMetrics {
/// Number of validators in the test configuration.
n_validators: Gauge<U64>,
/// Number of availability cores per block.
n_cores: Gauge<U64>,
/// Histogram of compressed PoV sizes observed during the benchmark.
pov_size: Histogram,
/// The block currently being processed.
current_block: Gauge<U64>,
/// Time (in ms) the subsystem(s) under test needed to process a block.
block_time: Gauge<U64>,
}
impl TestEnvironmentMetrics {
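/// Registers the benchmark metrics in the provided Prometheus registry.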
pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
.expect("arguments are always valid; qed");
Ok(Self {
n_validators: prometheus::register(
Gauge::new(
"subsystem_benchmark_n_validators",
"Total number of validators in the test",
)?,
registry,
)?,
n_cores: prometheus::register(
Gauge::new(
"subsystem_benchmark_n_cores",
"Number of cores we fetch availability for in each block",
)?,
registry,
)?,
current_block: prometheus::register(
Gauge::new("subsystem_benchmark_current_block", "The current test block")?,
registry,
)?,
block_time: prometheus::register(
Gauge::new(
"subsystem_benchmark_block_time",
"The time it takes for the target subsystem(s) to complete all the requests in a block",
)?,
registry,
)?,
pov_size: prometheus::register(
Histogram::with_opts(
prometheus::HistogramOpts::new(
"subsystem_benchmark_pov_size",
"The compressed size of the proof of validity of a candidate",
)
.buckets(buckets),
)?,
registry,
)?,
})
}
pub fn set_n_validators(&self, n_validators: usize) {
self.n_validators.set(n_validators as u64);
}
pub fn set_n_cores(&self, n_cores: usize) {
self.n_cores.set(n_cores as u64);
}
pub fn set_current_block(&self, current_block: usize) {
self.current_block.set(current_block as u64);
}
pub fn set_block_time(&self, block_time_ms: u64) {
self.block_time.set(block_time_ms);
}
pub fn on_pov_size(&self, pov_size: usize) {
self.pov_size.observe(pov_size as f64);
}
}
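
/// Builds the multi-threaded tokio runtime (4 worker threads, 3 MiB thread stack)
/// that drives the benchmark and all spawned tasks.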
fn new_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.thread_name("subsystem-bench")
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.worker_threads(4)
.build()
.unwrap()
}
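
/// Dependencies shared by the test environment: the Prometheus registry, the
/// substrate task manager and the tokio runtime.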
pub struct TestEnvironmentDependencies {
pub registry: Registry,
pub task_manager: TaskManager,
pub runtime: tokio::runtime::Runtime,
}
impl Default for TestEnvironmentDependencies {
fn default() -> Self {
let runtime = new_runtime();
let registry = Registry::new();
let task_manager: TaskManager =
TaskManager::new(runtime.handle().clone(), Some(&registry)).unwrap();
Self { runtime, registry, task_manager }
}
}
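
/// Genesis hash used by the test chain.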
pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
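/// Maximum time allowed for the overseer to accept a message or a block import
/// notification before the benchmark panics.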
pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
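
/// The test environment. It wires the subsystem(s) under test to an overseer,
/// an emulated network and a Prometheus registry, and provides helpers to
/// drive a benchmark run and collect resource usage.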
pub struct TestEnvironment {
/// Dependencies shared with the spawned tasks (registry, task manager, runtime).
dependencies: TestEnvironmentDependencies,
/// Handle to the tokio runtime driving the benchmark.
runtime_handle: tokio::runtime::Handle,
/// Handle used to send messages and signals to the overseer.
overseer_handle: OverseerHandle,
/// The test configuration.
config: TestConfiguration,
/// Handle to the emulated network.
network: NetworkEmulatorHandle,
/// Benchmark metrics registered in the Prometheus registry.
metrics: TestEnvironmentMetrics,
/// The generated test authorities (validators) used in the benchmark.
authorities: TestAuthorities,
}
impl TestEnvironment {
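/// Creates a new test environment, spawning the overseer on the task manager
/// and, if `with_prometheus_endpoint` is set, a Prometheus endpoint on
/// 127.0.0.1:9999.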
pub fn new(
dependencies: TestEnvironmentDependencies,
config: TestConfiguration,
network: NetworkEmulatorHandle,
overseer: Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>,
overseer_handle: OverseerHandle,
authorities: TestAuthorities,
with_prometheus_endpoint: bool,
) -> Self {
let metrics = TestEnvironmentMetrics::new(&dependencies.registry)
.expect("Metrics need to be registered");
let spawn_handle = dependencies.task_manager.spawn_handle();
spawn_handle.spawn_blocking("overseer", "overseer", overseer.run().boxed());
if with_prometheus_endpoint {
let registry_clone = dependencies.registry.clone();
dependencies.task_manager.spawn_handle().spawn_blocking(
"prometheus",
"test-environment",
async move {
prometheus_endpoint::init_prometheus(
SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 9999),
registry_clone,
)
.await
.unwrap();
},
);
}
TestEnvironment {
runtime_handle: dependencies.runtime.handle().clone(),
dependencies,
overseer_handle,
config,
network,
metrics,
authorities,
}
}
pub fn config(&self) -> &TestConfiguration {
&self.config
}
pub fn network(&self) -> &NetworkEmulatorHandle {
&self.network
}
pub fn overseer_handle(&self) -> &OverseerHandle {
&self.overseer_handle
}
pub fn registry(&self) -> &Registry {
&self.dependencies.registry
}
#[allow(unused)]
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.dependencies
.task_manager
.spawn_handle()
.spawn(name, "test-environment", task);
}
pub fn spawn_blocking(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) {
self.dependencies.task_manager.spawn_handle().spawn_blocking(
name,
"test-environment",
task,
);
}
pub fn metrics(&self) -> &TestEnvironmentMetrics {
&self.metrics
}
pub fn runtime(&self) -> Handle {
self.runtime_handle.clone()
}
pub fn authorities(&self) -> &TestAuthorities {
&self.authorities
}
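/// Sends a message to the overseer; panics if it is not accepted within
/// [`MAX_TIME_OF_FLIGHT`].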
pub async fn send_message(&mut self, msg: AllMessages) {
self.overseer_handle
.send_msg(msg, LOG_TARGET)
.timeout(MAX_TIME_OF_FLIGHT)
.await
.unwrap_or_else(|| {
panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
});
}
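/// Notifies the overseer that a new block was imported; panics if the
/// notification is not accepted within [`MAX_TIME_OF_FLIGHT`].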
pub async fn import_block(&mut self, block: BlockInfo) {
self.overseer_handle
.block_imported(block)
.timeout(MAX_TIME_OF_FLIGHT)
.await
.unwrap_or_else(|| {
panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
});
}
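/// Tells the overseer to stop, shutting down the subsystems under test.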
pub async fn stop(&mut self) {
self.overseer_handle.stop().await;
}
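/// Returns `true` if the summed value of `metric_name` in `registry` is lower
/// than `value`.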
pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool {
let test_metrics = super::display::parse_metrics(registry);
test_metrics.metric_lower_than(metric_name, value)
}
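/// Polls the registry every 50ms until the summed value of `metric_name`
/// (optionally restricted to a `(label_name, label_value)` pair) satisfies
/// `condition`.
///
/// Illustrative usage only; the metric name below is a placeholder, not one
/// this crate is guaranteed to expose:
/// ```ignore
/// env.wait_until_metric("polkadot_parachain_received_chunks_total", None, |v| v >= 10.0)
///     .await;
/// ```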
pub async fn wait_until_metric(
&self,
metric_name: &str,
label: Option<(&str, &str)>,
condition: impl Fn(f64) -> bool,
) {
loop {
let test_metrics = if let Some((label_name, label_value)) = label {
super::display::parse_metrics(self.registry())
.subset_with_label_value(label_name, label_value)
} else {
super::display::parse_metrics(self.registry())
};
let current_value = test_metrics.sum_by(metric_name);
gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric");
if condition(current_value) {
break
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
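/// Collects the network and CPU usage of the subsystems under test into a
/// [`BenchmarkUsage`] report.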
pub fn collect_resource_usage(
&self,
subsystems_under_test: &[&str],
break_down_cpu_usage_per_task: bool,
) -> BenchmarkUsage {
BenchmarkUsage {
network_usage: self.network_usage(),
cpu_usage: self.cpu_usage(subsystems_under_test, break_down_cpu_usage_per_task),
}
}
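/// Computes the total and per-block network usage (in KiB) of the node under
/// test, based on the emulated network statistics for peer index 0.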
fn network_usage(&self) -> Vec<ResourceUsage> {
let stats = self.network().peer_stats(0);
let total_node_received = (stats.received() / 1024) as f64;
let total_node_sent = (stats.sent() / 1024) as f64;
let num_blocks = self.config().num_blocks as f64;
vec![
ResourceUsage {
resource_name: "Received from peers".to_string(),
total: total_node_received,
per_block: total_node_received / num_blocks,
},
ResourceUsage {
resource_name: "Sent to peers".to_string(),
total: total_node_sent,
per_block: total_node_sent / num_blocks,
},
]
}
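/// Computes the total and per-block CPU usage of each subsystem under test,
/// derived from the `substrate_tasks_polling_duration_sum` metric, optionally
/// broken down per task, plus the usage of the `test-environment` task group.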
fn cpu_usage(
&self,
subsystems_under_test: &[&str],
break_down_per_task: bool,
) -> Vec<ResourceUsage> {
let test_metrics = super::display::parse_metrics(self.registry());
let mut usage = vec![];
let num_blocks = self.config().num_blocks as f64;
for subsystem in subsystems_under_test.iter() {
let subsystem_cpu_metrics =
test_metrics.subset_with_label_value("task_group", subsystem);
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
usage.push(ResourceUsage {
resource_name: subsystem.to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
if break_down_per_task {
for metric in subsystem_cpu_metrics.all() {
if metric.name() != "substrate_tasks_polling_duration_sum" {
continue;
}
if let Some(task_name) = metric.label_value("task_name") {
usage.push(ResourceUsage {
resource_name: format!("{}/{}", subsystem, task_name),
total: metric.value(),
per_block: metric.value() / num_blocks,
});
}
}
}
}
let test_env_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "test-environment");
let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
usage.push(ResourceUsage {
resource_name: "test-environment".to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
usage
}
}