1use crate::{
18 configuration::TestAuthorities,
19 dummy_builder,
20 environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH},
21 mock::{
22 candidate_backing::MockCandidateBacking,
23 chain_api::{ChainApiState, MockChainApi},
24 network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
25 prospective_parachains::MockProspectiveParachains,
26 runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
27 AlwaysSupportsParachains,
28 },
29 network::{new_network, NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver},
30 usage::BenchmarkUsage,
31 NODE_UNDER_TEST,
32};
33use bitvec::vec::BitVec;
34use colored::Colorize;
35use itertools::Itertools;
36use polkadot_node_metrics::metrics::Metrics;
37use polkadot_node_network_protocol::{
38 grid_topology::{SessionGridTopology, TopologyPeerInfo},
39 request_response::{IncomingRequest, ReqProtocolNames},
40 v3::{self, BackedCandidateManifest, StatementFilter},
41 view, ValidationProtocols, View,
42};
43use polkadot_node_subsystem::messages::{
44 network_bridge_event::NewGossipTopology, AllMessages, NetworkBridgeEvent,
45 StatementDistributionMessage,
46};
47use polkadot_overseer::{
48 Handle as OverseerHandle, Overseer, OverseerConnector, OverseerMetrics, SpawnGlue,
49};
50use polkadot_primitives::{
51 AuthorityDiscoveryId, Block, GroupIndex, Hash, Id, ValidatorId, ValidatorIndex,
52};
53use polkadot_statement_distribution::StatementDistributionSubsystem;
54use sc_keystore::LocalKeystore;
55use sc_network_types::PeerId;
56use sc_service::SpawnTaskHandle;
57use sp_keystore::{Keystore, KeystorePtr};
58use sp_runtime::RuntimeAppPublic;
59use std::{
60 sync::{atomic::Ordering, Arc},
61 time::{Duration, Instant},
62};
63pub use test_state::TestState;
64
65mod test_state;
66
/// Log target passed to the `gum` logging macros in this module.
const LOG_TARGET: &str = "subsystem-bench::statement";
68
69pub fn make_keystore() -> KeystorePtr {
70 let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory());
71 Keystore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some("//Node0"))
72 .expect("Insert key into keystore");
73 Keystore::sr25519_generate_new(&*keystore, AuthorityDiscoveryId::ID, Some("//Node0"))
74 .expect("Insert key into keystore");
75 keystore
76}
77
78fn build_overseer(
79 state: &TestState,
80 network: NetworkEmulatorHandle,
81 network_interface: NetworkInterface,
82 network_receiver: NetworkInterfaceReceiver,
83 dependencies: &TestEnvironmentDependencies,
84) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
85 let overseer_connector = OverseerConnector::with_event_capacity(64000);
86 let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
87 let spawn_task_handle = dependencies.task_manager.spawn_handle();
88 let mock_runtime_api = MockRuntimeApi::new(
89 state.config.clone(),
90 state.test_authorities.clone(),
91 state.candidate_receipts.clone(),
92 Default::default(),
93 Default::default(),
94 0,
95 MockRuntimeApiCoreState::Scheduled,
96 );
97 let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
98 let mock_chain_api = MockChainApi::new(chain_api_state);
99 let mock_prospective_parachains = MockProspectiveParachains::new();
100 let mock_candidate_backing = MockCandidateBacking::new(
101 state.config.clone(),
102 state
103 .test_authorities
104 .validator_pairs
105 .get(NODE_UNDER_TEST as usize)
106 .unwrap()
107 .clone(),
108 state.pvd.clone(),
109 state.own_backing_group.clone(),
110 );
111 let (candidate_req_receiver, candidate_req_cfg) =
112 IncomingRequest::get_config_receiver::<Block, sc_network::NetworkWorker<Block, Hash>>(
113 &ReqProtocolNames::new(GENESIS_HASH, None),
114 );
115 let keystore = make_keystore();
116 let subsystem = StatementDistributionSubsystem::new(
117 keystore.clone(),
118 candidate_req_receiver,
119 Metrics::try_register(&dependencies.registry).unwrap(),
120 );
121 let network_bridge_tx = MockNetworkBridgeTx::new(
122 network,
123 network_interface.subsystem_sender(),
124 state.test_authorities.clone(),
125 );
126 let network_bridge_rx = MockNetworkBridgeRx::new(network_receiver, Some(candidate_req_cfg));
127
128 let dummy = dummy_builder!(spawn_task_handle, overseer_metrics)
129 .replace_runtime_api(|_| mock_runtime_api)
130 .replace_chain_api(|_| mock_chain_api)
131 .replace_prospective_parachains(|_| mock_prospective_parachains)
132 .replace_candidate_backing(|_| mock_candidate_backing)
133 .replace_statement_distribution(|_| subsystem)
134 .replace_network_bridge_tx(|_| network_bridge_tx)
135 .replace_network_bridge_rx(|_| network_bridge_rx);
136 let (overseer, raw_handle) = dummy.build_with_connector(overseer_connector).unwrap();
137 let overseer_handle = OverseerHandle::new(raw_handle);
138
139 (overseer, overseer_handle)
140}
141
142pub fn prepare_test(state: &TestState, with_prometheus_endpoint: bool) -> TestEnvironment {
143 let dependencies = TestEnvironmentDependencies::default();
144 let (network, network_interface, network_receiver) = new_network(
145 &state.config,
146 &dependencies,
147 &state.test_authorities,
148 vec![Arc::new(state.clone())],
149 );
150 let (overseer, overseer_handle) =
151 build_overseer(state, network.clone(), network_interface, network_receiver, &dependencies);
152
153 TestEnvironment::new(
154 dependencies,
155 state.config.clone(),
156 network,
157 overseer,
158 overseer_handle,
159 state.test_authorities.clone(),
160 with_prometheus_endpoint,
161 )
162}
163
164pub fn generate_peer_view_change(block_hash: Hash, peer_id: PeerId) -> AllMessages {
165 let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0));
166
167 AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(network))
168}
169
170pub fn generate_new_session_topology(
171 topology: &SessionGridTopology,
172 test_node: ValidatorIndex,
173) -> Vec<AllMessages> {
174 let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
175 session: 0,
176 topology: topology.clone(),
177 local_index: Some(test_node),
178 });
179 vec![AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(
180 event,
181 ))]
182}
183
184pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology {
186 let keyrings = test_authorities
187 .validator_authority_id
188 .clone()
189 .into_iter()
190 .zip(test_authorities.peer_ids.clone())
191 .collect_vec();
192
193 let topology = keyrings
194 .clone()
195 .into_iter()
196 .enumerate()
197 .map(|(index, (discovery_id, peer_id))| TopologyPeerInfo {
198 peer_ids: vec![peer_id],
199 validator_index: ValidatorIndex(index as u32),
200 discovery_id,
201 })
202 .collect_vec();
203 let shuffled = (0..keyrings.len()).collect_vec();
204
205 SessionGridTopology::new(shuffled, topology)
206}
207
208pub async fn benchmark_statement_distribution(
209 env: &mut TestEnvironment,
210 state: &TestState,
211) -> BenchmarkUsage {
212 state.reset_trackers();
213
214 let connected_validators = state
215 .test_authorities
216 .validator_authority_id
217 .iter()
218 .enumerate()
219 .filter_map(|(i, id)| if env.network().is_peer_connected(id) { Some(i) } else { None })
220 .collect_vec();
221 let seconding_validator_in_own_backing_group = state
222 .own_backing_group
223 .iter()
224 .find(|v| connected_validators.contains(&(v.0 as usize)))
225 .unwrap()
226 .to_owned();
227
228 let config = env.config().clone();
229 let groups = state.session_info.validator_groups.clone();
230 let own_backing_group_index = groups
231 .iter()
232 .position(|group| group.iter().any(|v| v.0 == NODE_UNDER_TEST))
233 .unwrap();
234
235 env.metrics().set_n_validators(config.n_validators);
236 env.metrics().set_n_cores(config.n_cores);
237
238 let topology = generate_topology(&state.test_authorities);
239 let peer_connected_messages = env.network().generate_peer_connected(|e| {
240 AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(e))
241 });
242 let new_session_topology_messages =
243 generate_new_session_topology(&topology, ValidatorIndex(NODE_UNDER_TEST));
244 for message in peer_connected_messages.into_iter().chain(new_session_topology_messages) {
245 env.send_message(message).await;
246 }
247
248 let test_start = Instant::now();
249 let mut candidates_advertised = 0;
250 for block_info in state.block_infos.iter() {
251 let block_num = block_info.number as usize;
252 gum::info!(target: LOG_TARGET, "Current block {}/{} {:?}", block_num, config.num_blocks, block_info.hash);
253 env.metrics().set_current_block(block_num);
254 env.import_block(block_info.clone()).await;
255
256 for peer_view_change in env
257 .network()
258 .generate_statement_distribution_peer_view_change(view![block_info.hash])
259 {
260 env.send_message(peer_view_change).await;
261 }
262
263 let seconding_peer_id = *state
264 .test_authorities
265 .peer_ids
266 .get(seconding_validator_in_own_backing_group.0 as usize)
267 .unwrap();
268 let candidate = state.candidate_receipts.get(&block_info.hash).unwrap().first().unwrap();
269 let candidate_hash = candidate.hash();
270 let statement = state
271 .statements
272 .get(&candidate_hash)
273 .unwrap()
274 .get(seconding_validator_in_own_backing_group.0 as usize)
275 .unwrap()
276 .clone();
277 let message = AllMessages::StatementDistribution(
278 StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
279 seconding_peer_id,
280 ValidationProtocols::V3(v3::StatementDistributionMessage::Statement(
281 block_info.hash,
282 statement,
283 )),
284 )),
285 );
286 env.send_message(message).await;
287
288 let max_messages_per_candidate = state.config.max_candidate_depth + 1;
289 let mut messages_tracker = (0..groups.len())
291 .map(|i| if i == own_backing_group_index { max_messages_per_candidate } else { 0 })
292 .collect_vec();
293
294 let neighbors =
295 topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
296 let connected_neighbors_x = neighbors
297 .validator_indices_x
298 .iter()
299 .filter(|&v| connected_validators.contains(&(v.0 as usize)))
300 .cloned()
301 .collect_vec();
302 let connected_neighbors_y = neighbors
303 .validator_indices_y
304 .iter()
305 .filter(|&v| connected_validators.contains(&(v.0 as usize)))
306 .cloned()
307 .collect_vec();
308 let one_hop_peers_and_groups = connected_neighbors_x
309 .iter()
310 .chain(connected_neighbors_y.iter())
311 .map(|validator_index| {
312 let peer_id =
313 *state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
314 let group_index =
315 groups.iter().position(|group| group.contains(validator_index)).unwrap();
316 (peer_id, group_index)
317 })
318 .collect_vec();
319 let two_hop_x_peers_and_groups = connected_neighbors_x
320 .iter()
321 .flat_map(|validator_index| {
322 let peer_id =
323 *state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
324 topology
325 .compute_grid_neighbors_for(*validator_index)
326 .unwrap()
327 .validator_indices_y
328 .iter()
329 .map(|validator_neighbor| {
330 let group_index = groups
331 .iter()
332 .position(|group| group.contains(validator_neighbor))
333 .unwrap();
334 (peer_id, group_index)
335 })
336 .collect_vec()
337 })
338 .collect_vec();
339 let two_hop_y_peers_and_groups = connected_neighbors_y
340 .iter()
341 .flat_map(|validator_index| {
342 let peer_id =
343 *state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
344 topology
345 .compute_grid_neighbors_for(*validator_index)
346 .unwrap()
347 .validator_indices_x
348 .iter()
349 .map(|validator_neighbor| {
350 let group_index = groups
351 .iter()
352 .position(|group| group.contains(validator_neighbor))
353 .unwrap();
354 (peer_id, group_index)
355 })
356 .collect_vec()
357 })
358 .collect_vec();
359
360 for (seconding_peer_id, group_index) in one_hop_peers_and_groups
361 .into_iter()
362 .chain(two_hop_x_peers_and_groups)
363 .chain(two_hop_y_peers_and_groups)
364 {
365 let messages_sent_count = messages_tracker.get_mut(group_index).unwrap();
366 if *messages_sent_count == max_messages_per_candidate {
367 continue
368 }
369 *messages_sent_count += 1;
370
371 let candidate_hash = state
372 .candidate_receipts
373 .get(&block_info.hash)
374 .unwrap()
375 .get(group_index)
376 .unwrap()
377 .hash();
378 let manifest = BackedCandidateManifest {
379 relay_parent: block_info.hash,
380 candidate_hash,
381 group_index: GroupIndex(group_index as u32),
382 para_id: Id::new(group_index as u32 + 1),
383 parent_head_data_hash: state.pvd.parent_head.hash(),
384 statement_knowledge: StatementFilter {
385 seconded_in_group: BitVec::from_iter(
386 groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| true),
387 ),
388 validated_in_group: BitVec::from_iter(
389 groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| false),
390 ),
391 },
392 };
393 let message = AllMessages::StatementDistribution(
394 StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
395 seconding_peer_id,
396 ValidationProtocols::V3(
397 v3::StatementDistributionMessage::BackedCandidateManifest(manifest),
398 ),
399 )),
400 );
401 env.send_message(message).await;
402 }
403
404 candidates_advertised += messages_tracker.iter().filter(|&&v| v > 0).collect_vec().len();
405
406 loop {
407 let manifests_count = state
408 .manifests_tracker
409 .values()
410 .filter(|v| v.load(Ordering::SeqCst))
411 .collect::<Vec<_>>()
412 .len();
413 gum::debug!(target: LOG_TARGET, "{}/{} manifest exchanges", manifests_count, candidates_advertised);
414
415 if manifests_count == candidates_advertised {
416 break;
417 }
418 tokio::time::sleep(Duration::from_millis(50)).await;
419 }
420 }
421
422 let duration: u128 = test_start.elapsed().as_millis();
423 gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
424 gum::info!(target: LOG_TARGET,
425 "Avg block time: {}",
426 format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
427 );
428
429 env.stop().await;
430 env.collect_resource_usage(&["statement-distribution"], false)
431}