1use crate::{
18 availability::av_store_helpers::new_av_store,
19 dummy_builder,
20 environment::{TestEnvironment, TestEnvironmentDependencies},
21 mock::{
22 av_store::{MockAvailabilityStore, NetworkAvailabilityState},
23 chain_api::{ChainApiState, MockChainApi},
24 network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx},
25 runtime_api::{default_node_features, MockRuntimeApi, MockRuntimeApiCoreState},
26 AlwaysSupportsParachains,
27 },
28 network::new_network,
29 usage::BenchmarkUsage,
30};
31use colored::Colorize;
32use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
33
34use codec::Encode;
35use polkadot_availability_bitfield_distribution::BitfieldDistribution;
36use polkadot_availability_distribution::{
37 AvailabilityDistributionSubsystem, IncomingRequestReceivers,
38};
39use polkadot_availability_recovery::{AvailabilityRecoverySubsystem, RecoveryStrategyKind};
40use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
41use polkadot_node_metrics::metrics::Metrics;
42use polkadot_node_network_protocol::{
43 request_response::{v1, v2, IncomingRequest},
44 OurView,
45};
46use polkadot_node_subsystem::{
47 messages::{AllMessages, AvailabilityRecoveryMessage},
48 Overseer, OverseerConnector, SpawnGlue,
49};
50use polkadot_node_subsystem_types::messages::{AvailabilityStoreMessage, NetworkBridgeEvent};
51use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle};
52use polkadot_primitives::{Block, CoreIndex, GroupIndex, Hash};
53use sc_network::request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig};
54use std::{ops::Sub, sync::Arc, time::Instant};
55use strum::Display;
56
57use sc_service::SpawnTaskHandle;
58use serde::{Deserialize, Serialize};
59pub use test_state::TestState;
60
61mod av_store_helpers;
62mod test_state;
63
/// Log target passed as `target:` to the `gum` logging calls in this module.
const LOG_TARGET: &str = "subsystem-bench::availability";
65
// Recovery strategy selectable for the availability read benchmark. `prepare_test` maps each
// variant to a `RecoveryStrategyKind` when it constructs the availability recovery subsystem.
//
// NOTE: plain `//` comments are used on purpose; doc comments on a `clap::ValueEnum` are picked
// up by the derive as help text and would change the CLI output.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Display)]
#[value(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
pub enum Strategy {
	// Mapped to `RecoveryStrategyKind::ChunksAlways`.
	Chunks,
	// Mapped to `RecoveryStrategyKind::SystematicChunks`. This is the default strategy of
	// `DataAvailabilityReadOptions`.
	Systematic,
	// Mapped to `RecoveryStrategyKind::BackersFirstAlways`.
	FullFromBackers,
}
79
// Command-line options of the availability read benchmark; carried by
// `TestDataAvailability::Read`.
//
// NOTE: plain `//` comments are used on purpose; doc comments on a `clap::Parser` struct are
// picked up by the derive as help text and would change the CLI output.
#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct DataAvailabilityReadOptions {
	// Recovery strategy to benchmark; defaults to `Strategy::Systematic` when not given.
	#[clap(short, long, default_value_t = Strategy::Systematic)]
	pub strategy: Strategy,
}
87
/// Selects which overseer setup `prepare_test` builds.
pub enum TestDataAvailability {
	/// Read setup: an overseer with a real availability recovery subsystem and a mocked
	/// availability store, configured from the given options.
	Read(DataAvailabilityReadOptions),
	/// Write setup: an overseer with real availability distribution, bitfield distribution and
	/// availability store subsystems.
	Write,
}
92
93fn build_overseer_for_availability_read(
94 spawn_task_handle: SpawnTaskHandle,
95 runtime_api: MockRuntimeApi,
96 av_store: MockAvailabilityStore,
97 (network_bridge_tx, network_bridge_rx): (MockNetworkBridgeTx, MockNetworkBridgeRx),
98 availability_recovery: AvailabilityRecoverySubsystem,
99 dependencies: &TestEnvironmentDependencies,
100) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
101 let overseer_connector = OverseerConnector::with_event_capacity(64000);
102 let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
103
104 let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
105 let builder = dummy
106 .replace_runtime_api(|_| runtime_api)
107 .replace_availability_store(|_| av_store)
108 .replace_network_bridge_tx(|_| network_bridge_tx)
109 .replace_network_bridge_rx(|_| network_bridge_rx)
110 .replace_availability_recovery(|_| availability_recovery);
111
112 let (overseer, raw_handle) =
113 builder.build_with_connector(overseer_connector).expect("Should not fail");
114
115 (overseer, OverseerHandle::new(raw_handle))
116}
117
118#[allow(clippy::too_many_arguments)]
119fn build_overseer_for_availability_write(
120 spawn_task_handle: SpawnTaskHandle,
121 runtime_api: MockRuntimeApi,
122 (network_bridge_tx, network_bridge_rx): (MockNetworkBridgeTx, MockNetworkBridgeRx),
123 availability_distribution: AvailabilityDistributionSubsystem,
124 chain_api: MockChainApi,
125 availability_store: AvailabilityStoreSubsystem,
126 bitfield_distribution: BitfieldDistribution,
127 dependencies: &TestEnvironmentDependencies,
128) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
129 let overseer_connector = OverseerConnector::with_event_capacity(64000);
130 let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
131
132 let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
133 let builder = dummy
134 .replace_runtime_api(|_| runtime_api)
135 .replace_availability_store(|_| availability_store)
136 .replace_network_bridge_tx(|_| network_bridge_tx)
137 .replace_network_bridge_rx(|_| network_bridge_rx)
138 .replace_chain_api(|_| chain_api)
139 .replace_bitfield_distribution(|_| bitfield_distribution)
140 .replace_availability_distribution(|_| availability_distribution);
142
143 let (overseer, raw_handle) =
144 builder.build_with_connector(overseer_connector).expect("Should not fail");
145
146 (overseer, OverseerHandle::new(raw_handle))
147}
148
149pub fn prepare_test(
150 state: &TestState,
151 mode: TestDataAvailability,
152 with_prometheus_endpoint: bool,
153) -> (TestEnvironment, Vec<ProtocolConfig>) {
154 let dependencies = TestEnvironmentDependencies::default();
155
156 let availability_state = NetworkAvailabilityState {
157 candidate_hashes: state.candidate_hashes.clone(),
158 candidate_hash_to_core_index: state.candidate_hash_to_core_index.clone(),
159 available_data: state.available_data.clone(),
160 chunks: state.chunks.clone(),
161 chunk_indices: state.chunk_indices.clone(),
162 req_protocol_names: state.req_protocol_names.clone(),
163 };
164
165 let mut req_cfgs = Vec::new();
166
167 let (collation_req_receiver, collation_req_cfg) = IncomingRequest::get_config_receiver::<
168 Block,
169 sc_network::NetworkWorker<Block, Hash>,
170 >(&state.req_protocol_names);
171 req_cfgs.push(collation_req_cfg);
172
173 let (pov_req_receiver, pov_req_cfg) = IncomingRequest::get_config_receiver::<
174 Block,
175 sc_network::NetworkWorker<Block, Hash>,
176 >(&state.req_protocol_names);
177 req_cfgs.push(pov_req_cfg);
178
179 let (chunk_req_v1_receiver, chunk_req_v1_cfg) =
180 IncomingRequest::<v1::ChunkFetchingRequest>::get_config_receiver::<
181 Block,
182 sc_network::NetworkWorker<Block, Hash>,
183 >(&state.req_protocol_names);
184
185 std::mem::forget(chunk_req_v1_cfg);
188
189 let (chunk_req_v2_receiver, chunk_req_v2_cfg) =
190 IncomingRequest::<v2::ChunkFetchingRequest>::get_config_receiver::<
191 Block,
192 sc_network::NetworkWorker<Block, Hash>,
193 >(&state.req_protocol_names);
194
195 let (network, network_interface, network_receiver) = new_network(
196 &state.config,
197 &dependencies,
198 &state.test_authorities,
199 vec![Arc::new(availability_state.clone())],
200 );
201
202 let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
203 network.clone(),
204 network_interface.subsystem_sender(),
205 state.test_authorities.clone(),
206 );
207 let network_bridge_rx =
208 network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_v2_cfg));
209
210 let runtime_api = MockRuntimeApi::new(
211 state.config.clone(),
212 state.test_authorities.clone(),
213 state.candidate_receipts.clone(),
214 Default::default(),
215 Default::default(),
216 0,
217 MockRuntimeApiCoreState::Occupied,
218 );
219
220 let (overseer, overseer_handle) = match &mode {
221 TestDataAvailability::Read(options) => {
222 let subsystem = match options.strategy {
223 Strategy::FullFromBackers =>
224 AvailabilityRecoverySubsystem::with_recovery_strategy_kind(
225 collation_req_receiver,
226 &state.req_protocol_names,
227 Metrics::try_register(&dependencies.registry).unwrap(),
228 RecoveryStrategyKind::BackersFirstAlways,
229 ),
230 Strategy::Chunks => AvailabilityRecoverySubsystem::with_recovery_strategy_kind(
231 collation_req_receiver,
232 &state.req_protocol_names,
233 Metrics::try_register(&dependencies.registry).unwrap(),
234 RecoveryStrategyKind::ChunksAlways,
235 ),
236 Strategy::Systematic => AvailabilityRecoverySubsystem::with_recovery_strategy_kind(
237 collation_req_receiver,
238 &state.req_protocol_names,
239 Metrics::try_register(&dependencies.registry).unwrap(),
240 RecoveryStrategyKind::SystematicChunks,
241 ),
242 };
243
244 let av_store = MockAvailabilityStore::new(
246 state.chunks.clone(),
247 state.chunk_indices.clone(),
248 state.candidate_hashes.clone(),
249 state.candidate_hash_to_core_index.clone(),
250 );
251
252 build_overseer_for_availability_read(
253 dependencies.task_manager.spawn_handle(),
254 runtime_api,
255 av_store,
256 (network_bridge_tx, network_bridge_rx),
257 subsystem,
258 &dependencies,
259 )
260 },
261 TestDataAvailability::Write => {
262 let availability_distribution = AvailabilityDistributionSubsystem::new(
263 state.test_authorities.keyring.keystore(),
264 IncomingRequestReceivers {
265 pov_req_receiver,
266 chunk_req_v1_receiver,
267 chunk_req_v2_receiver,
268 },
269 state.req_protocol_names.clone(),
270 Metrics::try_register(&dependencies.registry).unwrap(),
271 );
272
273 let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
274 let chain_api = MockChainApi::new(chain_api_state);
275 let bitfield_distribution =
276 BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap());
277 build_overseer_for_availability_write(
278 dependencies.task_manager.spawn_handle(),
279 runtime_api,
280 (network_bridge_tx, network_bridge_rx),
281 availability_distribution,
282 chain_api,
283 new_av_store(&dependencies),
284 bitfield_distribution,
285 &dependencies,
286 )
287 },
288 };
289
290 (
291 TestEnvironment::new(
292 dependencies,
293 state.config.clone(),
294 network,
295 overseer,
296 overseer_handle,
297 state.test_authorities.clone(),
298 with_prometheus_endpoint,
299 ),
300 req_cfgs,
301 )
302}
303
304pub async fn benchmark_availability_read(
305 env: &mut TestEnvironment,
306 state: &TestState,
307) -> BenchmarkUsage {
308 let config = env.config().clone();
309
310 env.metrics().set_n_validators(config.n_validators);
311 env.metrics().set_n_cores(config.n_cores);
312
313 let mut batch = FuturesUnordered::new();
314 let mut availability_bytes = 0u128;
315 let mut candidates = state.candidates.clone();
316 let test_start = Instant::now();
317 for block_info in state.block_infos.iter() {
318 let block_num = block_info.number as usize;
319 gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks);
320 env.metrics().set_current_block(block_num);
321
322 let block_start_ts = Instant::now();
323 env.import_block(block_info.clone()).await;
324
325 for candidate_num in 0..config.n_cores as u64 {
326 let candidate =
327 candidates.next().expect("We always send up to n_cores*num_blocks; qed");
328 let (tx, rx) = oneshot::channel();
329 batch.push(rx);
330
331 let message = AllMessages::AvailabilityRecovery(
332 AvailabilityRecoveryMessage::RecoverAvailableData(
333 candidate.clone(),
334 1,
335 Some(GroupIndex(
336 candidate_num as u32 % (std::cmp::max(5, config.n_cores) / 5) as u32,
337 )),
338 Some(*state.candidate_hash_to_core_index.get(&candidate.hash()).unwrap()),
339 tx,
340 ),
341 );
342 env.send_message(message).await;
343 }
344
345 gum::info!(target: LOG_TARGET, "{}", format!("{} recoveries pending", batch.len()).bright_black());
346 while let Some(completed) = batch.next().await {
347 let available_data = completed.unwrap().unwrap();
348 env.metrics().on_pov_size(available_data.encoded_size());
349 availability_bytes += available_data.encoded_size() as u128;
350 }
351
352 let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
353 env.metrics().set_block_time(block_time);
354 gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{block_time:?}ms").cyan());
355 }
356
357 let duration: u128 = test_start.elapsed().as_millis();
358 let availability_bytes = availability_bytes / 1024;
359 gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
360 gum::info!(target: LOG_TARGET,
361 "Throughput: {}",
362 format!("{} KiB/block", availability_bytes / env.config().num_blocks as u128).bright_red()
363 );
364 gum::info!(target: LOG_TARGET,
365 "Avg block time: {}",
366 format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
367 );
368
369 env.stop().await;
370 env.collect_resource_usage(&["availability-recovery"], false)
371}
372
373pub async fn benchmark_availability_write(
374 env: &mut TestEnvironment,
375 state: &TestState,
376) -> BenchmarkUsage {
377 let config = env.config().clone();
378
379 env.metrics().set_n_validators(config.n_validators);
380 env.metrics().set_n_cores(config.n_cores);
381
382 gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ...");
383 for (core_index, backed_candidate) in state.backed_candidates.clone().into_iter().enumerate() {
384 let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap();
385 let available_data = state.available_data[candidate_index].clone();
386 let (tx, rx) = oneshot::channel();
387 env.send_message(AllMessages::AvailabilityStore(
388 AvailabilityStoreMessage::StoreAvailableData {
389 candidate_hash: backed_candidate.hash(),
390 n_validators: config.n_validators as u32,
391 available_data,
392 expected_erasure_root: backed_candidate.descriptor().erasure_root(),
393 tx,
394 core_index: CoreIndex(core_index as u32),
395 node_features: default_node_features(),
396 },
397 ))
398 .await;
399
400 rx.await
401 .unwrap()
402 .expect("Test candidates are stored nicely in availability store");
403 }
404
405 gum::info!(target: LOG_TARGET, "Done");
406
407 let test_start = Instant::now();
408 for block_info in state.block_infos.iter() {
409 let block_num = block_info.number as usize;
410 gum::info!(target: LOG_TARGET, "Current block #{}", block_num);
411 env.metrics().set_current_block(block_num);
412
413 let block_start_ts = Instant::now();
414 let relay_block_hash = block_info.hash;
415 env.import_block(block_info.clone()).await;
416
417 let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate(
419 NetworkBridgeEvent::OurViewChange(OurView::new(vec![relay_block_hash], 0))
420 );
421 env.send_message(AllMessages::BitfieldDistribution(message)).await;
422
423 let chunk_fetch_start_ts = Instant::now();
424
425 let payloads = state.chunk_fetching_requests.get(block_num - 1).expect("pregenerated");
427 let receivers = (1..config.n_validators).filter_map(|index| {
428 let (pending_response, pending_response_receiver) = oneshot::channel();
429
430 let peer_id = *env.authorities().peer_ids.get(index).expect("all validators have ids");
431 let payload = payloads.get(index).expect("pregenerated").clone();
432 let request = RawIncomingRequest { peer: peer_id, payload, pending_response };
433 let peer = env
434 .authorities()
435 .validator_authority_id
436 .get(index)
437 .expect("all validators have keys");
438
439 if env.network().is_peer_connected(peer) &&
440 env.network().send_request_from_peer(peer, request).is_ok()
441 {
442 Some(pending_response_receiver)
443 } else {
444 None
445 }
446 });
447
448 gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ...");
449
450 let responses = futures::future::try_join_all(receivers)
451 .await
452 .expect("Chunk is always served successfully");
453 assert!(responses.iter().all(|v| v.result.is_ok()));
455
456 let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis();
457 gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration);
458
459 let network = env.network().clone();
460 let authorities = env.authorities().clone();
461
462 let messages = state.signed_bitfields.get(&relay_block_hash).expect("pregenerated").clone();
466 for index in 1..config.n_validators {
467 let from_peer = &authorities.validator_authority_id[index];
468 let message = messages.get(index).expect("pregenerated").clone();
469
470 if network.is_peer_connected(from_peer) {
472 let _ = network.send_message_from_peer(from_peer, message);
473 }
474 }
475
476 gum::info!(
477 "Waiting for {} bitfields to be received and processed",
478 config.connected_count()
479 );
480
481 env.wait_until_metric(
483 "polkadot_parachain_received_availability_bitfields_total",
484 None,
485 |value| value == (config.connected_count() * block_num) as f64,
486 )
487 .await;
488
489 gum::info!(target: LOG_TARGET, "All bitfields processed");
490
491 let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
492 env.metrics().set_block_time(block_time);
493 gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{block_time:?}ms").cyan());
494 }
495
496 let duration: u128 = test_start.elapsed().as_millis();
497 gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
498 gum::info!(target: LOG_TARGET,
499 "Avg block time: {}",
500 format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
501 );
502
503 env.stop().await;
504 env.collect_resource_usage(
505 &["availability-distribution", "bitfield-distribution", "availability-store"],
506 false,
507 )
508}