polkadot_subsystem_bench/
environment.rs1use crate::{
20 configuration::{TestAuthorities, TestConfiguration},
21 mock::AlwaysSupportsParachains,
22 network::NetworkEmulatorHandle,
23 usage::{BenchmarkUsage, ResourceUsage},
24};
25use core::time::Duration;
26use futures::{Future, FutureExt};
27use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
28use polkadot_node_subsystem_types::Hash;
29use polkadot_node_subsystem_util::metrics::prometheus::{
30 self, Gauge, Histogram, PrometheusError, Registry, U64,
31};
32use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
33use sc_service::{SpawnTaskHandle, TaskManager};
34use std::net::{Ipv4Addr, SocketAddr};
35use tokio::runtime::Handle;
36
/// Log target used for this module's `gum` logging and passed to the overseer handle's
/// `send_msg`.
const LOG_TARGET: &str = "subsystem-bench::environment";
38
39#[derive(Clone)]
41pub struct TestEnvironmentMetrics {
42 n_validators: Gauge<U64>,
44 n_cores: Gauge<U64>,
46 pov_size: Histogram,
48 current_block: Gauge<U64>,
50 block_time: Gauge<U64>,
52}
53
54impl TestEnvironmentMetrics {
55 pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
56 let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
57 .expect("arguments are always valid; qed");
58
59 Ok(Self {
60 n_validators: prometheus::register(
61 Gauge::new(
62 "subsystem_benchmark_n_validators",
63 "Total number of validators in the test",
64 )?,
65 registry,
66 )?,
67 n_cores: prometheus::register(
68 Gauge::new(
69 "subsystem_benchmark_n_cores",
70 "Number of cores we fetch availability for each block",
71 )?,
72 registry,
73 )?,
74 current_block: prometheus::register(
75 Gauge::new("subsystem_benchmark_current_block", "The current test block")?,
76 registry,
77 )?,
78 block_time: prometheus::register(
79 Gauge::new("subsystem_benchmark_block_time", "The time it takes for the target subsystems(s) to complete all the requests in a block")?,
80 registry,
81 )?,
82 pov_size: prometheus::register(
83 Histogram::with_opts(
84 prometheus::HistogramOpts::new(
85 "subsystem_benchmark_pov_size",
86 "The compressed size of the proof of validity of a candidate",
87 )
88 .buckets(buckets),
89 )?,
90 registry,
91 )?,
92 })
93 }
94
95 pub fn set_n_validators(&self, n_validators: usize) {
96 self.n_validators.set(n_validators as u64);
97 }
98
99 pub fn set_n_cores(&self, n_cores: usize) {
100 self.n_cores.set(n_cores as u64);
101 }
102
103 pub fn set_current_block(&self, current_block: usize) {
104 self.current_block.set(current_block as u64);
105 }
106
107 pub fn set_block_time(&self, block_time_ms: u64) {
108 self.block_time.set(block_time_ms);
109 }
110
111 pub fn on_pov_size(&self, pov_size: usize) {
112 self.pov_size.observe(pov_size as f64);
113 }
114}
115
116fn new_runtime() -> tokio::runtime::Runtime {
117 tokio::runtime::Builder::new_multi_thread()
118 .thread_name("subsystem-bench")
119 .enable_all()
120 .thread_stack_size(3 * 1024 * 1024)
121 .worker_threads(4)
122 .build()
123 .unwrap()
124}
125
/// Long-lived resources backing a test environment: a Prometheus registry, a task manager and
/// the tokio runtime the task manager spawns onto.
// NOTE(review): struct fields are dropped in declaration order, so `task_manager` is dropped
// before `runtime`. This looks intentional (tasks are torn down before their runtime) — keep
// the order when editing.
pub struct TestEnvironmentDependencies {
    /// Registry holding the environment metrics and the task manager's metrics.
    pub registry: Registry,
    /// Task manager used to spawn the overseer and other environment tasks.
    pub task_manager: TaskManager,
    /// Runtime whose handle the task manager was created with.
    pub runtime: tokio::runtime::Runtime,
}
132
133impl Default for TestEnvironmentDependencies {
134 fn default() -> Self {
135 let runtime = new_runtime();
136 let registry = Registry::new();
137 let task_manager: TaskManager =
138 TaskManager::new(runtime.handle().clone(), Some(®istry)).unwrap();
139
140 Self { runtime, registry, task_manager }
141 }
142}
143
/// Fixed placeholder hash with every byte set to `0xff`.
// NOTE(review): presumably used as the genesis block hash by benchmark setup code — confirm
// at the call sites.
pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);

/// Maximum time `TestEnvironment::send_message` / `TestEnvironment::import_block` wait for the
/// overseer handle to accept a message or block before panicking.
pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
151
/// A benchmark environment.
///
/// It owns:
/// - the runtime and task manager;
/// - a handle to the overseer running the subsystems under test;
/// - the emulated network;
/// - the test configuration.
// NOTE(review): fields drop in declaration order (`dependencies`, holding the runtime, first);
// keep the order unless the teardown sequence has been checked.
pub struct TestEnvironment {
    /// Runtime, task manager and Prometheus registry.
    dependencies: TestEnvironmentDependencies,
    /// Clone of `dependencies.runtime`'s handle, returned by `runtime()`.
    runtime_handle: tokio::runtime::Handle,
    /// Handle used to send messages to, import blocks into, and stop the overseer.
    overseer_handle: OverseerHandle,
    /// The benchmark configuration.
    config: TestConfiguration,
    /// Handle to the emulated network; its peer stats feed the network usage report.
    network: NetworkEmulatorHandle,
    /// Environment-level Prometheus metrics.
    metrics: TestEnvironmentMetrics,
    /// Authorities (validators) of the test.
    authorities: TestAuthorities,
}
195
196impl TestEnvironment {
197 pub fn new(
199 dependencies: TestEnvironmentDependencies,
200 config: TestConfiguration,
201 network: NetworkEmulatorHandle,
202 overseer: Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>,
203 overseer_handle: OverseerHandle,
204 authorities: TestAuthorities,
205 with_prometheus_endpoint: bool,
206 ) -> Self {
207 let metrics = TestEnvironmentMetrics::new(&dependencies.registry)
208 .expect("Metrics need to be registered");
209
210 let spawn_handle = dependencies.task_manager.spawn_handle();
211 spawn_handle.spawn_blocking("overseer", "overseer", overseer.run().boxed());
212
213 if with_prometheus_endpoint {
214 let registry_clone = dependencies.registry.clone();
215 dependencies.task_manager.spawn_handle().spawn_blocking(
216 "prometheus",
217 "test-environment",
218 async move {
219 prometheus_endpoint::init_prometheus(
220 SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 9999),
221 registry_clone,
222 )
223 .await
224 .unwrap();
225 },
226 );
227 }
228
229 TestEnvironment {
230 runtime_handle: dependencies.runtime.handle().clone(),
231 dependencies,
232 overseer_handle,
233 config,
234 network,
235 metrics,
236 authorities,
237 }
238 }
239
240 pub fn config(&self) -> &TestConfiguration {
242 &self.config
243 }
244
245 pub fn network(&self) -> &NetworkEmulatorHandle {
247 &self.network
248 }
249
250 pub fn overseer_handle(&self) -> &OverseerHandle {
252 &self.overseer_handle
253 }
254
255 pub fn registry(&self) -> &Registry {
257 &self.dependencies.registry
258 }
259
260 #[allow(unused)]
262 pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
263 self.dependencies
264 .task_manager
265 .spawn_handle()
266 .spawn(name, "test-environment", task);
267 }
268
269 pub fn spawn_blocking(
271 &self,
272 name: &'static str,
273 task: impl Future<Output = ()> + Send + 'static,
274 ) {
275 self.dependencies.task_manager.spawn_handle().spawn_blocking(
276 name,
277 "test-environment",
278 task,
279 );
280 }
281 pub fn metrics(&self) -> &TestEnvironmentMetrics {
283 &self.metrics
284 }
285
286 pub fn runtime(&self) -> Handle {
288 self.runtime_handle.clone()
289 }
290
291 pub fn authorities(&self) -> &TestAuthorities {
293 &self.authorities
294 }
295
296 pub async fn send_message(&mut self, msg: AllMessages) {
298 self.overseer_handle
299 .send_msg(msg, LOG_TARGET)
300 .timeout(MAX_TIME_OF_FLIGHT)
301 .await
302 .unwrap_or_else(|| {
303 panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
304 });
305 }
306
307 pub async fn import_block(&mut self, block: BlockInfo) {
309 self.overseer_handle
310 .block_imported(block)
311 .timeout(MAX_TIME_OF_FLIGHT)
312 .await
313 .unwrap_or_else(|| {
314 panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
315 });
316 }
317
318 pub async fn stop(&mut self) {
320 self.overseer_handle.stop().await;
321 }
322
323 pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool {
325 let test_metrics = super::display::parse_metrics(registry);
326 test_metrics.metric_lower_than(metric_name, value)
327 }
328
329 pub async fn wait_until_metric(
331 &self,
332 metric_name: &str,
333 label: Option<(&str, &str)>,
334 condition: impl Fn(f64) -> bool,
335 ) {
336 loop {
337 let test_metrics = if let Some((label_name, label_value)) = label {
338 super::display::parse_metrics(self.registry())
339 .subset_with_label_value(label_name, label_value)
340 } else {
341 super::display::parse_metrics(self.registry())
342 };
343 let current_value = test_metrics.sum_by(metric_name);
344
345 gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric");
346 if condition(current_value) {
347 break
348 }
349 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
351 }
352 }
353
354 pub fn collect_resource_usage(
355 &self,
356 subsystems_under_test: &[&str],
357 break_down_cpu_usage_per_task: bool,
358 ) -> BenchmarkUsage {
359 BenchmarkUsage {
360 network_usage: self.network_usage(),
361 cpu_usage: self.cpu_usage(subsystems_under_test, break_down_cpu_usage_per_task),
362 }
363 }
364
365 fn network_usage(&self) -> Vec<ResourceUsage> {
366 let stats = self.network().peer_stats(0);
367 let total_node_received = (stats.received() / 1024) as f64;
368 let total_node_sent = (stats.sent() / 1024) as f64;
369 let num_blocks = self.config().num_blocks as f64;
370
371 vec![
372 ResourceUsage {
373 resource_name: "Received from peers".to_string(),
374 total: total_node_received,
375 per_block: total_node_received / num_blocks,
376 },
377 ResourceUsage {
378 resource_name: "Sent to peers".to_string(),
379 total: total_node_sent,
380 per_block: total_node_sent / num_blocks,
381 },
382 ]
383 }
384
385 fn cpu_usage(
386 &self,
387 subsystems_under_test: &[&str],
388 break_down_per_task: bool,
389 ) -> Vec<ResourceUsage> {
390 let test_metrics = super::display::parse_metrics(self.registry());
391 let mut usage = vec![];
392 let num_blocks = self.config().num_blocks as f64;
393
394 for subsystem in subsystems_under_test.iter() {
395 let subsystem_cpu_metrics =
396 test_metrics.subset_with_label_value("task_group", subsystem);
397 let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
398 usage.push(ResourceUsage {
399 resource_name: subsystem.to_string(),
400 total: total_cpu,
401 per_block: total_cpu / num_blocks,
402 });
403
404 if break_down_per_task {
405 for metric in subsystem_cpu_metrics.all() {
406 if metric.name() != "substrate_tasks_polling_duration_sum" {
407 continue;
408 }
409
410 if let Some(task_name) = metric.label_value("task_name") {
411 usage.push(ResourceUsage {
412 resource_name: format!("{subsystem}/{task_name}"),
413 total: metric.value(),
414 per_block: metric.value() / num_blocks,
415 });
416 }
417 }
418 }
419 }
420
421 let test_env_cpu_metrics =
422 test_metrics.subset_with_label_value("task_group", "test-environment");
423 let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
424
425 usage.push(ResourceUsage {
426 resource_name: "test-environment".to_string(),
427 total: total_cpu,
428 per_block: total_cpu / num_blocks,
429 });
430
431 usage
432 }
433}