// polkadot_subsystem_bench/availability/mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	availability::av_store_helpers::new_av_store,
19	dummy_builder,
20	environment::{TestEnvironment, TestEnvironmentDependencies},
21	mock::{
22		av_store::{MockAvailabilityStore, NetworkAvailabilityState},
23		chain_api::{ChainApiState, MockChainApi},
24		network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx},
25		runtime_api::{default_node_features, MockRuntimeApi, MockRuntimeApiCoreState},
26		AlwaysSupportsParachains,
27	},
28	network::new_network,
29	usage::BenchmarkUsage,
30};
31use colored::Colorize;
32use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
33
34use codec::Encode;
35use polkadot_availability_bitfield_distribution::BitfieldDistribution;
36use polkadot_availability_distribution::{
37	AvailabilityDistributionSubsystem, IncomingRequestReceivers,
38};
39use polkadot_availability_recovery::{AvailabilityRecoverySubsystem, RecoveryStrategyKind};
40use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
41use polkadot_node_metrics::metrics::Metrics;
42use polkadot_node_network_protocol::{
43	request_response::{v1, v2, IncomingRequest},
44	OurView,
45};
46use polkadot_node_subsystem::{
47	messages::{AllMessages, AvailabilityRecoveryMessage},
48	Overseer, OverseerConnector, SpawnGlue,
49};
50use polkadot_node_subsystem_types::messages::{AvailabilityStoreMessage, NetworkBridgeEvent};
51use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle};
52use polkadot_primitives::{Block, CoreIndex, GroupIndex, Hash};
53use sc_network::request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig};
54use std::{ops::Sub, sync::Arc, time::Instant};
55use strum::Display;
56
57use sc_service::SpawnTaskHandle;
58use serde::{Deserialize, Serialize};
59pub use test_state::TestState;
60
61mod av_store_helpers;
62mod test_state;
63
// Target passed to the `gum` logging macros in this module.
64const LOG_TARGET: &str = "subsystem-bench::availability";
65
66#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Display)]
67#[value(rename_all = "kebab-case")]
68#[strum(serialize_all = "kebab-case")]
69pub enum Strategy {
70	/// Regular random chunk recovery. This is also the fallback for the next strategies.
71	Chunks,
72	/// Recovery from systematic chunks. Much faster than regular chunk recovery becasue it avoid
73	/// doing the reed-solomon reconstruction.
74	Systematic,
75	/// Fetch the full availability datafrom backers first. Saves CPU as we don't need to
76	/// re-construct from chunks. Typically this is only faster if nodes have enough bandwidth.
77	FullFromBackers,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
81#[clap(rename_all = "kebab-case")]
82#[allow(missing_docs)]
83pub struct DataAvailabilityReadOptions {
84	#[clap(short, long, default_value_t = Strategy::Systematic)]
85	pub strategy: Strategy,
86}
87
88pub enum TestDataAvailability {
89	Read(DataAvailabilityReadOptions),
90	Write,
91}
92
93fn build_overseer_for_availability_read(
94	spawn_task_handle: SpawnTaskHandle,
95	runtime_api: MockRuntimeApi,
96	av_store: MockAvailabilityStore,
97	(network_bridge_tx, network_bridge_rx): (MockNetworkBridgeTx, MockNetworkBridgeRx),
98	availability_recovery: AvailabilityRecoverySubsystem,
99	dependencies: &TestEnvironmentDependencies,
100) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
101	let overseer_connector = OverseerConnector::with_event_capacity(64000);
102	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
103
104	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
105	let builder = dummy
106		.replace_runtime_api(|_| runtime_api)
107		.replace_availability_store(|_| av_store)
108		.replace_network_bridge_tx(|_| network_bridge_tx)
109		.replace_network_bridge_rx(|_| network_bridge_rx)
110		.replace_availability_recovery(|_| availability_recovery);
111
112	let (overseer, raw_handle) =
113		builder.build_with_connector(overseer_connector).expect("Should not fail");
114
115	(overseer, OverseerHandle::new(raw_handle))
116}
117
118#[allow(clippy::too_many_arguments)]
119fn build_overseer_for_availability_write(
120	spawn_task_handle: SpawnTaskHandle,
121	runtime_api: MockRuntimeApi,
122	(network_bridge_tx, network_bridge_rx): (MockNetworkBridgeTx, MockNetworkBridgeRx),
123	availability_distribution: AvailabilityDistributionSubsystem,
124	chain_api: MockChainApi,
125	availability_store: AvailabilityStoreSubsystem,
126	bitfield_distribution: BitfieldDistribution,
127	dependencies: &TestEnvironmentDependencies,
128) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
129	let overseer_connector = OverseerConnector::with_event_capacity(64000);
130	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
131
132	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
133	let builder = dummy
134		.replace_runtime_api(|_| runtime_api)
135		.replace_availability_store(|_| availability_store)
136		.replace_network_bridge_tx(|_| network_bridge_tx)
137		.replace_network_bridge_rx(|_| network_bridge_rx)
138		.replace_chain_api(|_| chain_api)
139		.replace_bitfield_distribution(|_| bitfield_distribution)
140		// This is needed to test own chunk recovery for `n_cores`.
141		.replace_availability_distribution(|_| availability_distribution);
142
143	let (overseer, raw_handle) =
144		builder.build_with_connector(overseer_connector).expect("Should not fail");
145
146	(overseer, OverseerHandle::new(raw_handle))
147}
148
149pub fn prepare_test(
150	state: &TestState,
151	mode: TestDataAvailability,
152	with_prometheus_endpoint: bool,
153) -> (TestEnvironment, Vec<ProtocolConfig>) {
154	let dependencies = TestEnvironmentDependencies::default();
155
156	let availability_state = NetworkAvailabilityState {
157		candidate_hashes: state.candidate_hashes.clone(),
158		candidate_hash_to_core_index: state.candidate_hash_to_core_index.clone(),
159		available_data: state.available_data.clone(),
160		chunks: state.chunks.clone(),
161		chunk_indices: state.chunk_indices.clone(),
162		req_protocol_names: state.req_protocol_names.clone(),
163	};
164
165	let mut req_cfgs = Vec::new();
166
167	let (collation_req_receiver, collation_req_cfg) = IncomingRequest::get_config_receiver::<
168		Block,
169		sc_network::NetworkWorker<Block, Hash>,
170	>(&state.req_protocol_names);
171	req_cfgs.push(collation_req_cfg);
172
173	let (pov_req_receiver, pov_req_cfg) = IncomingRequest::get_config_receiver::<
174		Block,
175		sc_network::NetworkWorker<Block, Hash>,
176	>(&state.req_protocol_names);
177	req_cfgs.push(pov_req_cfg);
178
179	let (chunk_req_v1_receiver, chunk_req_v1_cfg) =
180		IncomingRequest::<v1::ChunkFetchingRequest>::get_config_receiver::<
181			Block,
182			sc_network::NetworkWorker<Block, Hash>,
183		>(&state.req_protocol_names);
184
185	// We won't use v1 chunk fetching requests, but we need to keep the inbound queue alive.
186	// Otherwise, av-distribution subsystem will terminate.
187	std::mem::forget(chunk_req_v1_cfg);
188
189	let (chunk_req_v2_receiver, chunk_req_v2_cfg) =
190		IncomingRequest::<v2::ChunkFetchingRequest>::get_config_receiver::<
191			Block,
192			sc_network::NetworkWorker<Block, Hash>,
193		>(&state.req_protocol_names);
194
195	let (network, network_interface, network_receiver) = new_network(
196		&state.config,
197		&dependencies,
198		&state.test_authorities,
199		vec![Arc::new(availability_state.clone())],
200	);
201
202	let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
203		network.clone(),
204		network_interface.subsystem_sender(),
205		state.test_authorities.clone(),
206	);
207	let network_bridge_rx =
208		network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_v2_cfg));
209
210	let runtime_api = MockRuntimeApi::new(
211		state.config.clone(),
212		state.test_authorities.clone(),
213		state.candidate_receipts.clone(),
214		Default::default(),
215		Default::default(),
216		0,
217		MockRuntimeApiCoreState::Occupied,
218	);
219
220	let (overseer, overseer_handle) = match &mode {
221		TestDataAvailability::Read(options) => {
222			let subsystem = match options.strategy {
223				Strategy::FullFromBackers =>
224					AvailabilityRecoverySubsystem::with_recovery_strategy_kind(
225						collation_req_receiver,
226						&state.req_protocol_names,
227						Metrics::try_register(&dependencies.registry).unwrap(),
228						RecoveryStrategyKind::BackersFirstAlways,
229					),
230				Strategy::Chunks => AvailabilityRecoverySubsystem::with_recovery_strategy_kind(
231					collation_req_receiver,
232					&state.req_protocol_names,
233					Metrics::try_register(&dependencies.registry).unwrap(),
234					RecoveryStrategyKind::ChunksAlways,
235				),
236				Strategy::Systematic => AvailabilityRecoverySubsystem::with_recovery_strategy_kind(
237					collation_req_receiver,
238					&state.req_protocol_names,
239					Metrics::try_register(&dependencies.registry).unwrap(),
240					RecoveryStrategyKind::SystematicChunks,
241				),
242			};
243
244			// Use a mocked av-store.
245			let av_store = MockAvailabilityStore::new(
246				state.chunks.clone(),
247				state.chunk_indices.clone(),
248				state.candidate_hashes.clone(),
249				state.candidate_hash_to_core_index.clone(),
250			);
251
252			build_overseer_for_availability_read(
253				dependencies.task_manager.spawn_handle(),
254				runtime_api,
255				av_store,
256				(network_bridge_tx, network_bridge_rx),
257				subsystem,
258				&dependencies,
259			)
260		},
261		TestDataAvailability::Write => {
262			let availability_distribution = AvailabilityDistributionSubsystem::new(
263				state.test_authorities.keyring.keystore(),
264				IncomingRequestReceivers {
265					pov_req_receiver,
266					chunk_req_v1_receiver,
267					chunk_req_v2_receiver,
268				},
269				state.req_protocol_names.clone(),
270				Metrics::try_register(&dependencies.registry).unwrap(),
271			);
272
273			let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
274			let chain_api = MockChainApi::new(chain_api_state);
275			let bitfield_distribution =
276				BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap());
277			build_overseer_for_availability_write(
278				dependencies.task_manager.spawn_handle(),
279				runtime_api,
280				(network_bridge_tx, network_bridge_rx),
281				availability_distribution,
282				chain_api,
283				new_av_store(&dependencies),
284				bitfield_distribution,
285				&dependencies,
286			)
287		},
288	};
289
290	(
291		TestEnvironment::new(
292			dependencies,
293			state.config.clone(),
294			network,
295			overseer,
296			overseer_handle,
297			state.test_authorities.clone(),
298			with_prometheus_endpoint,
299		),
300		req_cfgs,
301	)
302}
303
304pub async fn benchmark_availability_read(
305	env: &mut TestEnvironment,
306	state: &TestState,
307) -> BenchmarkUsage {
308	let config = env.config().clone();
309
310	env.metrics().set_n_validators(config.n_validators);
311	env.metrics().set_n_cores(config.n_cores);
312
313	let mut batch = FuturesUnordered::new();
314	let mut availability_bytes = 0u128;
315	let mut candidates = state.candidates.clone();
316	let test_start = Instant::now();
317	for block_info in state.block_infos.iter() {
318		let block_num = block_info.number as usize;
319		gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks);
320		env.metrics().set_current_block(block_num);
321
322		let block_start_ts = Instant::now();
323		env.import_block(block_info.clone()).await;
324
325		for candidate_num in 0..config.n_cores as u64 {
326			let candidate =
327				candidates.next().expect("We always send up to n_cores*num_blocks; qed");
328			let (tx, rx) = oneshot::channel();
329			batch.push(rx);
330
331			let message = AllMessages::AvailabilityRecovery(
332				AvailabilityRecoveryMessage::RecoverAvailableData(
333					candidate.clone(),
334					1,
335					Some(GroupIndex(
336						candidate_num as u32 % (std::cmp::max(5, config.n_cores) / 5) as u32,
337					)),
338					Some(*state.candidate_hash_to_core_index.get(&candidate.hash()).unwrap()),
339					tx,
340				),
341			);
342			env.send_message(message).await;
343		}
344
345		gum::info!(target: LOG_TARGET, "{}", format!("{} recoveries pending", batch.len()).bright_black());
346		while let Some(completed) = batch.next().await {
347			let available_data = completed.unwrap().unwrap();
348			env.metrics().on_pov_size(available_data.encoded_size());
349			availability_bytes += available_data.encoded_size() as u128;
350		}
351
352		let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
353		env.metrics().set_block_time(block_time);
354		gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{block_time:?}ms").cyan());
355	}
356
357	let duration: u128 = test_start.elapsed().as_millis();
358	let availability_bytes = availability_bytes / 1024;
359	gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
360	gum::info!(target: LOG_TARGET,
361		"Throughput: {}",
362		format!("{} KiB/block", availability_bytes / env.config().num_blocks as u128).bright_red()
363	);
364	gum::info!(target: LOG_TARGET,
365		"Avg block time: {}",
366		format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
367	);
368
369	env.stop().await;
370	env.collect_resource_usage(&["availability-recovery"], false)
371}
372
373pub async fn benchmark_availability_write(
374	env: &mut TestEnvironment,
375	state: &TestState,
376) -> BenchmarkUsage {
377	let config = env.config().clone();
378
379	env.metrics().set_n_validators(config.n_validators);
380	env.metrics().set_n_cores(config.n_cores);
381
382	gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ...");
383	for (core_index, backed_candidate) in state.backed_candidates.clone().into_iter().enumerate() {
384		let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap();
385		let available_data = state.available_data[candidate_index].clone();
386		let (tx, rx) = oneshot::channel();
387		env.send_message(AllMessages::AvailabilityStore(
388			AvailabilityStoreMessage::StoreAvailableData {
389				candidate_hash: backed_candidate.hash(),
390				n_validators: config.n_validators as u32,
391				available_data,
392				expected_erasure_root: backed_candidate.descriptor().erasure_root(),
393				tx,
394				core_index: CoreIndex(core_index as u32),
395				node_features: default_node_features(),
396			},
397		))
398		.await;
399
400		rx.await
401			.unwrap()
402			.expect("Test candidates are stored nicely in availability store");
403	}
404
405	gum::info!(target: LOG_TARGET, "Done");
406
407	let test_start = Instant::now();
408	for block_info in state.block_infos.iter() {
409		let block_num = block_info.number as usize;
410		gum::info!(target: LOG_TARGET, "Current block #{}", block_num);
411		env.metrics().set_current_block(block_num);
412
413		let block_start_ts = Instant::now();
414		let relay_block_hash = block_info.hash;
415		env.import_block(block_info.clone()).await;
416
417		// Inform bitfield distribution about our view of current test block
418		let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate(
419			NetworkBridgeEvent::OurViewChange(OurView::new(vec![relay_block_hash], 0))
420		);
421		env.send_message(AllMessages::BitfieldDistribution(message)).await;
422
423		let chunk_fetch_start_ts = Instant::now();
424
425		// Request chunks of our own backed candidate from all other validators.
426		let payloads = state.chunk_fetching_requests.get(block_num - 1).expect("pregenerated");
427		let receivers = (1..config.n_validators).filter_map(|index| {
428			let (pending_response, pending_response_receiver) = oneshot::channel();
429
430			let peer_id = *env.authorities().peer_ids.get(index).expect("all validators have ids");
431			let payload = payloads.get(index).expect("pregenerated").clone();
432			let request = RawIncomingRequest { peer: peer_id, payload, pending_response };
433			let peer = env
434				.authorities()
435				.validator_authority_id
436				.get(index)
437				.expect("all validators have keys");
438
439			if env.network().is_peer_connected(peer) &&
440				env.network().send_request_from_peer(peer, request).is_ok()
441			{
442				Some(pending_response_receiver)
443			} else {
444				None
445			}
446		});
447
448		gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ...");
449
450		let responses = futures::future::try_join_all(receivers)
451			.await
452			.expect("Chunk is always served successfully");
453		// TODO: check if chunk is the one the peer expects to receive.
454		assert!(responses.iter().all(|v| v.result.is_ok()));
455
456		let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis();
457		gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration);
458
459		let network = env.network().clone();
460		let authorities = env.authorities().clone();
461
462		// Spawn a task that will generate `n_validator` - 1 signed bitfields and
463		// send them from the emulated peers to the subsystem.
464		// TODO: Implement topology.
465		let messages = state.signed_bitfields.get(&relay_block_hash).expect("pregenerated").clone();
466		for index in 1..config.n_validators {
467			let from_peer = &authorities.validator_authority_id[index];
468			let message = messages.get(index).expect("pregenerated").clone();
469
470			// Send the action from peer only if it is connected to our node.
471			if network.is_peer_connected(from_peer) {
472				let _ = network.send_message_from_peer(from_peer, message);
473			}
474		}
475
476		gum::info!(
477			"Waiting for {} bitfields to be received and processed",
478			config.connected_count()
479		);
480
481		// Wait for all bitfields to be processed.
482		env.wait_until_metric(
483			"polkadot_parachain_received_availability_bitfields_total",
484			None,
485			|value| value == (config.connected_count() * block_num) as f64,
486		)
487		.await;
488
489		gum::info!(target: LOG_TARGET, "All bitfields processed");
490
491		let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
492		env.metrics().set_block_time(block_time);
493		gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{block_time:?}ms").cyan());
494	}
495
496	let duration: u128 = test_start.elapsed().as_millis();
497	gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
498	gum::info!(target: LOG_TARGET,
499		"Avg block time: {}",
500		format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
501	);
502
503	env.stop().await;
504	env.collect_resource_usage(
505		&["availability-distribution", "bitfield-distribution", "availability-store"],
506		false,
507	)
508}