referrerpolicy=no-referrer-when-downgrade

polkadot_subsystem_bench/
environment.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Test environment implementation
18
19use crate::{
20	configuration::{TestAuthorities, TestConfiguration},
21	mock::AlwaysSupportsParachains,
22	network::NetworkEmulatorHandle,
23	usage::{BenchmarkUsage, ResourceUsage},
24};
25use core::time::Duration;
26use futures::{Future, FutureExt};
27use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
28use polkadot_node_subsystem_types::Hash;
29use polkadot_node_subsystem_util::metrics::prometheus::{
30	self, Gauge, Histogram, PrometheusError, Registry, U64,
31};
32use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
33use sc_service::{SpawnTaskHandle, TaskManager};
34use std::net::{Ipv4Addr, SocketAddr};
35use tokio::runtime::Handle;
36
/// Log target for this module's diagnostic output.
const LOG_TARGET: &str = "subsystem-bench::environment";
38
39/// Test environment/configuration metrics
40#[derive(Clone)]
41pub struct TestEnvironmentMetrics {
42	/// Number of bytes sent per peer.
43	n_validators: Gauge<U64>,
44	/// Number of received sent per peer.
45	n_cores: Gauge<U64>,
46	/// PoV size
47	pov_size: Histogram,
48	/// Current block
49	current_block: Gauge<U64>,
50	/// Current block
51	block_time: Gauge<U64>,
52}
53
54impl TestEnvironmentMetrics {
55	pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
56		let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
57			.expect("arguments are always valid; qed");
58
59		Ok(Self {
60			n_validators: prometheus::register(
61				Gauge::new(
62					"subsystem_benchmark_n_validators",
63					"Total number of validators in the test",
64				)?,
65				registry,
66			)?,
67			n_cores: prometheus::register(
68				Gauge::new(
69					"subsystem_benchmark_n_cores",
70					"Number of cores we fetch availability for each block",
71				)?,
72				registry,
73			)?,
74			current_block: prometheus::register(
75				Gauge::new("subsystem_benchmark_current_block", "The current test block")?,
76				registry,
77			)?,
78			block_time: prometheus::register(
79				Gauge::new("subsystem_benchmark_block_time", "The time it takes for the target subsystems(s) to complete all the requests in a block")?,
80				registry,
81			)?,
82			pov_size: prometheus::register(
83				Histogram::with_opts(
84					prometheus::HistogramOpts::new(
85						"subsystem_benchmark_pov_size",
86						"The compressed size of the proof of validity of a candidate",
87					)
88					.buckets(buckets),
89				)?,
90				registry,
91			)?,
92		})
93	}
94
95	pub fn set_n_validators(&self, n_validators: usize) {
96		self.n_validators.set(n_validators as u64);
97	}
98
99	pub fn set_n_cores(&self, n_cores: usize) {
100		self.n_cores.set(n_cores as u64);
101	}
102
103	pub fn set_current_block(&self, current_block: usize) {
104		self.current_block.set(current_block as u64);
105	}
106
107	pub fn set_block_time(&self, block_time_ms: u64) {
108		self.block_time.set(block_time_ms);
109	}
110
111	pub fn on_pov_size(&self, pov_size: usize) {
112		self.pov_size.observe(pov_size as f64);
113	}
114}
115
116fn new_runtime() -> tokio::runtime::Runtime {
117	tokio::runtime::Builder::new_multi_thread()
118		.thread_name("subsystem-bench")
119		.enable_all()
120		.thread_stack_size(3 * 1024 * 1024)
121		.worker_threads(4)
122		.build()
123		.unwrap()
124}
125
126/// Wrapper for dependencies
127pub struct TestEnvironmentDependencies {
128	pub registry: Registry,
129	pub task_manager: TaskManager,
130	pub runtime: tokio::runtime::Runtime,
131}
132
133impl Default for TestEnvironmentDependencies {
134	fn default() -> Self {
135		let runtime = new_runtime();
136		let registry = Registry::new();
137		let task_manager: TaskManager =
138			TaskManager::new(runtime.handle().clone(), Some(&registry)).unwrap();
139
140		Self { runtime, registry, task_manager }
141	}
142}
143
144// A dummy genesis hash
145pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
146
/// Upper bound on how long handing a message or block import to the overseer may take.
///
/// If the subsystem under test is so overloaded that the time of flight exceeds this limit
/// (5s), the test environment bails out by panicking instead of waiting indefinitely.
/// This should eventually become a test parameter.
pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
151
152/// The test environment is the high level wrapper of all things required to test
153/// a certain subsystem.
154///
155/// ## Mockups
156/// The overseer is passed in during construction and it can host an arbitrary number of
157/// real subsystems instances and the corresponding mocked instances such that the real
158/// subsystems can get their messages answered.
159///
160/// As the subsystem's performance depends on network connectivity, the test environment
161/// emulates validator nodes on the network, see `NetworkEmulator`. The network emulation
162/// is configurable in terms of peer bandwidth, latency and connection error rate using
163/// uniform distribution sampling.
164///
165///
166/// ## Usage
167/// `TestEnvironment` is used in tests to send `Overseer` messages or signals to the subsystem
168/// under test.
169///
170/// ## Collecting test metrics
171///
172/// ### Prometheus
173/// A prometheus endpoint is exposed while the test is running. A local Prometheus instance
174/// can scrape it every 1s and a Grafana dashboard is the preferred way of visualizing
175/// the performance characteristics of the subsystem.
176///
177/// ### CLI
178/// A subset of the Prometheus metrics are printed at the end of the test.
179pub struct TestEnvironment {
180	/// Test dependencies
181	dependencies: TestEnvironmentDependencies,
182	/// A runtime handle
183	runtime_handle: tokio::runtime::Handle,
184	/// A handle to the lovely overseer
185	overseer_handle: OverseerHandle,
186	/// The test configuration.
187	config: TestConfiguration,
188	/// A handle to the network emulator.
189	network: NetworkEmulatorHandle,
190	/// Configuration/env metrics
191	metrics: TestEnvironmentMetrics,
192	/// Test authorities generated from the configuration.
193	authorities: TestAuthorities,
194}
195
196impl TestEnvironment {
197	/// Create a new test environment
198	pub fn new(
199		dependencies: TestEnvironmentDependencies,
200		config: TestConfiguration,
201		network: NetworkEmulatorHandle,
202		overseer: Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>,
203		overseer_handle: OverseerHandle,
204		authorities: TestAuthorities,
205		with_prometheus_endpoint: bool,
206	) -> Self {
207		let metrics = TestEnvironmentMetrics::new(&dependencies.registry)
208			.expect("Metrics need to be registered");
209
210		let spawn_handle = dependencies.task_manager.spawn_handle();
211		spawn_handle.spawn_blocking("overseer", "overseer", overseer.run().boxed());
212
213		if with_prometheus_endpoint {
214			let registry_clone = dependencies.registry.clone();
215			dependencies.task_manager.spawn_handle().spawn_blocking(
216				"prometheus",
217				"test-environment",
218				async move {
219					prometheus_endpoint::init_prometheus(
220						SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 9999),
221						registry_clone,
222					)
223					.await
224					.unwrap();
225				},
226			);
227		}
228
229		TestEnvironment {
230			runtime_handle: dependencies.runtime.handle().clone(),
231			dependencies,
232			overseer_handle,
233			config,
234			network,
235			metrics,
236			authorities,
237		}
238	}
239
240	/// Returns the test configuration.
241	pub fn config(&self) -> &TestConfiguration {
242		&self.config
243	}
244
245	/// Returns a reference to the inner network emulator handle.
246	pub fn network(&self) -> &NetworkEmulatorHandle {
247		&self.network
248	}
249
250	/// Returns a reference to the overseer handle.
251	pub fn overseer_handle(&self) -> &OverseerHandle {
252		&self.overseer_handle
253	}
254
255	/// Returns the Prometheus registry.
256	pub fn registry(&self) -> &Registry {
257		&self.dependencies.registry
258	}
259
260	/// Spawn a named task in the `test-environment` task group.
261	#[allow(unused)]
262	pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
263		self.dependencies
264			.task_manager
265			.spawn_handle()
266			.spawn(name, "test-environment", task);
267	}
268
269	/// Spawn a blocking named task in the `test-environment` task group.
270	pub fn spawn_blocking(
271		&self,
272		name: &'static str,
273		task: impl Future<Output = ()> + Send + 'static,
274	) {
275		self.dependencies.task_manager.spawn_handle().spawn_blocking(
276			name,
277			"test-environment",
278			task,
279		);
280	}
281	/// Returns a reference to the test environment metrics instance
282	pub fn metrics(&self) -> &TestEnvironmentMetrics {
283		&self.metrics
284	}
285
286	/// Returns a handle to the tokio runtime.
287	pub fn runtime(&self) -> Handle {
288		self.runtime_handle.clone()
289	}
290
291	/// Returns a reference to the authority keys used in the test.
292	pub fn authorities(&self) -> &TestAuthorities {
293		&self.authorities
294	}
295
296	/// Send a message to the subsystem under test environment.
297	pub async fn send_message(&mut self, msg: AllMessages) {
298		self.overseer_handle
299			.send_msg(msg, LOG_TARGET)
300			.timeout(MAX_TIME_OF_FLIGHT)
301			.await
302			.unwrap_or_else(|| {
303				panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
304			});
305	}
306
307	/// Send an `ActiveLeavesUpdate` signal to all subsystems under test.
308	pub async fn import_block(&mut self, block: BlockInfo) {
309		self.overseer_handle
310			.block_imported(block)
311			.timeout(MAX_TIME_OF_FLIGHT)
312			.await
313			.unwrap_or_else(|| {
314				panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
315			});
316	}
317
318	/// Stop overseer and subsystems.
319	pub async fn stop(&mut self) {
320		self.overseer_handle.stop().await;
321	}
322
323	/// Tells if entries in bucket metric is lower than `value`
324	pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool {
325		let test_metrics = super::display::parse_metrics(registry);
326		test_metrics.metric_lower_than(metric_name, value)
327	}
328
329	/// Blocks until `metric_name` >= `value`
330	pub async fn wait_until_metric(
331		&self,
332		metric_name: &str,
333		label: Option<(&str, &str)>,
334		condition: impl Fn(f64) -> bool,
335	) {
336		loop {
337			let test_metrics = if let Some((label_name, label_value)) = label {
338				super::display::parse_metrics(self.registry())
339					.subset_with_label_value(label_name, label_value)
340			} else {
341				super::display::parse_metrics(self.registry())
342			};
343			let current_value = test_metrics.sum_by(metric_name);
344
345			gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric");
346			if condition(current_value) {
347				break
348			}
349			// Check value every 50ms.
350			tokio::time::sleep(std::time::Duration::from_millis(50)).await;
351		}
352	}
353
354	pub fn collect_resource_usage(
355		&self,
356		subsystems_under_test: &[&str],
357		break_down_cpu_usage_per_task: bool,
358	) -> BenchmarkUsage {
359		BenchmarkUsage {
360			network_usage: self.network_usage(),
361			cpu_usage: self.cpu_usage(subsystems_under_test, break_down_cpu_usage_per_task),
362		}
363	}
364
365	fn network_usage(&self) -> Vec<ResourceUsage> {
366		let stats = self.network().peer_stats(0);
367		let total_node_received = (stats.received() / 1024) as f64;
368		let total_node_sent = (stats.sent() / 1024) as f64;
369		let num_blocks = self.config().num_blocks as f64;
370
371		vec![
372			ResourceUsage {
373				resource_name: "Received from peers".to_string(),
374				total: total_node_received,
375				per_block: total_node_received / num_blocks,
376			},
377			ResourceUsage {
378				resource_name: "Sent to peers".to_string(),
379				total: total_node_sent,
380				per_block: total_node_sent / num_blocks,
381			},
382		]
383	}
384
385	fn cpu_usage(
386		&self,
387		subsystems_under_test: &[&str],
388		break_down_per_task: bool,
389	) -> Vec<ResourceUsage> {
390		let test_metrics = super::display::parse_metrics(self.registry());
391		let mut usage = vec![];
392		let num_blocks = self.config().num_blocks as f64;
393
394		for subsystem in subsystems_under_test.iter() {
395			let subsystem_cpu_metrics =
396				test_metrics.subset_with_label_value("task_group", subsystem);
397			let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
398			usage.push(ResourceUsage {
399				resource_name: subsystem.to_string(),
400				total: total_cpu,
401				per_block: total_cpu / num_blocks,
402			});
403
404			if break_down_per_task {
405				for metric in subsystem_cpu_metrics.all() {
406					if metric.name() != "substrate_tasks_polling_duration_sum" {
407						continue;
408					}
409
410					if let Some(task_name) = metric.label_value("task_name") {
411						usage.push(ResourceUsage {
412							resource_name: format!("{subsystem}/{task_name}"),
413							total: metric.value(),
414							per_block: metric.value() / num_blocks,
415						});
416					}
417				}
418			}
419		}
420
421		let test_env_cpu_metrics =
422			test_metrics.subset_with_label_value("task_group", "test-environment");
423		let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
424
425		usage.push(ResourceUsage {
426			resource_name: "test-environment".to_string(),
427			total: total_cpu,
428			per_block: total_cpu / num_blocks,
429		});
430
431		usage
432	}
433}