referrerpolicy=no-referrer-when-downgrade

polkadot_subsystem_bench/statement/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	configuration::TestAuthorities,
19	dummy_builder,
20	environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH},
21	mock::{
22		candidate_backing::MockCandidateBacking,
23		chain_api::{ChainApiState, MockChainApi},
24		network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
25		prospective_parachains::MockProspectiveParachains,
26		runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
27		AlwaysSupportsParachains,
28	},
29	network::{new_network, NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver},
30	usage::BenchmarkUsage,
31	NODE_UNDER_TEST,
32};
33use bitvec::vec::BitVec;
34use colored::Colorize;
35use itertools::Itertools;
36use polkadot_node_metrics::metrics::Metrics;
37use polkadot_node_network_protocol::{
38	grid_topology::{SessionGridTopology, TopologyPeerInfo},
39	request_response::{IncomingRequest, ReqProtocolNames},
40	v3::{self, BackedCandidateManifest, StatementFilter},
41	view, ValidationProtocols, View,
42};
43use polkadot_node_subsystem::messages::{
44	network_bridge_event::NewGossipTopology, AllMessages, NetworkBridgeEvent,
45	StatementDistributionMessage,
46};
47use polkadot_overseer::{
48	Handle as OverseerHandle, Overseer, OverseerConnector, OverseerMetrics, SpawnGlue,
49};
50use polkadot_primitives::{
51	AuthorityDiscoveryId, Block, GroupIndex, Hash, Id, ValidatorId, ValidatorIndex,
52};
53use polkadot_statement_distribution::StatementDistributionSubsystem;
54use sc_keystore::LocalKeystore;
55use sc_network_types::PeerId;
56use sc_service::SpawnTaskHandle;
57use sp_keystore::{Keystore, KeystorePtr};
58use sp_runtime::RuntimeAppPublic;
59use std::{
60	sync::{atomic::Ordering, Arc},
61	time::{Duration, Instant},
62};
63pub use test_state::TestState;
64
65mod test_state;
66
/// Log target attached to every `gum` log line emitted by this benchmark module.
const LOG_TARGET: &str = "subsystem-bench::statement";
68
69pub fn make_keystore() -> KeystorePtr {
70	let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory());
71	Keystore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some("//Node0"))
72		.expect("Insert key into keystore");
73	Keystore::sr25519_generate_new(&*keystore, AuthorityDiscoveryId::ID, Some("//Node0"))
74		.expect("Insert key into keystore");
75	keystore
76}
77
78fn build_overseer(
79	state: &TestState,
80	network: NetworkEmulatorHandle,
81	network_interface: NetworkInterface,
82	network_receiver: NetworkInterfaceReceiver,
83	dependencies: &TestEnvironmentDependencies,
84) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
85	let overseer_connector = OverseerConnector::with_event_capacity(64000);
86	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
87	let spawn_task_handle = dependencies.task_manager.spawn_handle();
88	let mock_runtime_api = MockRuntimeApi::new(
89		state.config.clone(),
90		state.test_authorities.clone(),
91		state.candidate_receipts.clone(),
92		Default::default(),
93		Default::default(),
94		0,
95		MockRuntimeApiCoreState::Scheduled,
96	);
97	let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
98	let mock_chain_api = MockChainApi::new(chain_api_state);
99	let mock_prospective_parachains = MockProspectiveParachains::new();
100	let mock_candidate_backing = MockCandidateBacking::new(
101		state.config.clone(),
102		state
103			.test_authorities
104			.validator_pairs
105			.get(NODE_UNDER_TEST as usize)
106			.unwrap()
107			.clone(),
108		state.pvd.clone(),
109		state.own_backing_group.clone(),
110	);
111	let (candidate_req_receiver, candidate_req_cfg) =
112		IncomingRequest::get_config_receiver::<Block, sc_network::NetworkWorker<Block, Hash>>(
113			&ReqProtocolNames::new(GENESIS_HASH, None),
114		);
115	let keystore = make_keystore();
116	let subsystem = StatementDistributionSubsystem::new(
117		keystore.clone(),
118		candidate_req_receiver,
119		Metrics::try_register(&dependencies.registry).unwrap(),
120	);
121	let network_bridge_tx = MockNetworkBridgeTx::new(
122		network,
123		network_interface.subsystem_sender(),
124		state.test_authorities.clone(),
125	);
126	let network_bridge_rx = MockNetworkBridgeRx::new(network_receiver, Some(candidate_req_cfg));
127
128	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics)
129		.replace_runtime_api(|_| mock_runtime_api)
130		.replace_chain_api(|_| mock_chain_api)
131		.replace_prospective_parachains(|_| mock_prospective_parachains)
132		.replace_candidate_backing(|_| mock_candidate_backing)
133		.replace_statement_distribution(|_| subsystem)
134		.replace_network_bridge_tx(|_| network_bridge_tx)
135		.replace_network_bridge_rx(|_| network_bridge_rx);
136	let (overseer, raw_handle) = dummy.build_with_connector(overseer_connector).unwrap();
137	let overseer_handle = OverseerHandle::new(raw_handle);
138
139	(overseer, overseer_handle)
140}
141
142pub fn prepare_test(state: &TestState, with_prometheus_endpoint: bool) -> TestEnvironment {
143	let dependencies = TestEnvironmentDependencies::default();
144	let (network, network_interface, network_receiver) = new_network(
145		&state.config,
146		&dependencies,
147		&state.test_authorities,
148		vec![Arc::new(state.clone())],
149	);
150	let (overseer, overseer_handle) =
151		build_overseer(state, network.clone(), network_interface, network_receiver, &dependencies);
152
153	TestEnvironment::new(
154		dependencies,
155		state.config.clone(),
156		network,
157		overseer,
158		overseer_handle,
159		state.test_authorities.clone(),
160		with_prometheus_endpoint,
161	)
162}
163
164pub fn generate_peer_view_change(block_hash: Hash, peer_id: PeerId) -> AllMessages {
165	let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0));
166
167	AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(network))
168}
169
170pub fn generate_new_session_topology(
171	topology: &SessionGridTopology,
172	test_node: ValidatorIndex,
173) -> Vec<AllMessages> {
174	let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
175		session: 0,
176		topology: topology.clone(),
177		local_index: Some(test_node),
178	});
179	vec![AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(
180		event,
181	))]
182}
183
184/// Generates a topology to be used for this benchmark.
185pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology {
186	let keyrings = test_authorities
187		.validator_authority_id
188		.clone()
189		.into_iter()
190		.zip(test_authorities.peer_ids.clone())
191		.collect_vec();
192
193	let topology = keyrings
194		.clone()
195		.into_iter()
196		.enumerate()
197		.map(|(index, (discovery_id, peer_id))| TopologyPeerInfo {
198			peer_ids: vec![peer_id],
199			validator_index: ValidatorIndex(index as u32),
200			discovery_id,
201		})
202		.collect_vec();
203	let shuffled = (0..keyrings.len()).collect_vec();
204
205	SessionGridTopology::new(shuffled, topology)
206}
207
208pub async fn benchmark_statement_distribution(
209	env: &mut TestEnvironment,
210	state: &TestState,
211) -> BenchmarkUsage {
212	state.reset_trackers();
213
214	let connected_validators = state
215		.test_authorities
216		.validator_authority_id
217		.iter()
218		.enumerate()
219		.filter_map(|(i, id)| if env.network().is_peer_connected(id) { Some(i) } else { None })
220		.collect_vec();
221	let seconding_validator_in_own_backing_group = state
222		.own_backing_group
223		.iter()
224		.find(|v| connected_validators.contains(&(v.0 as usize)))
225		.unwrap()
226		.to_owned();
227
228	let config = env.config().clone();
229	let groups = state.session_info.validator_groups.clone();
230	let own_backing_group_index = groups
231		.iter()
232		.position(|group| group.iter().any(|v| v.0 == NODE_UNDER_TEST))
233		.unwrap();
234
235	env.metrics().set_n_validators(config.n_validators);
236	env.metrics().set_n_cores(config.n_cores);
237
238	let topology = generate_topology(&state.test_authorities);
239	let peer_connected_messages = env.network().generate_peer_connected(|e| {
240		AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(e))
241	});
242	let new_session_topology_messages =
243		generate_new_session_topology(&topology, ValidatorIndex(NODE_UNDER_TEST));
244	for message in peer_connected_messages.into_iter().chain(new_session_topology_messages) {
245		env.send_message(message).await;
246	}
247
248	let test_start = Instant::now();
249	let mut candidates_advertised = 0;
250	for block_info in state.block_infos.iter() {
251		let block_num = block_info.number as usize;
252		gum::info!(target: LOG_TARGET, "Current block {}/{} {:?}", block_num, config.num_blocks, block_info.hash);
253		env.metrics().set_current_block(block_num);
254		env.import_block(block_info.clone()).await;
255
256		for peer_view_change in env
257			.network()
258			.generate_statement_distribution_peer_view_change(view![block_info.hash])
259		{
260			env.send_message(peer_view_change).await;
261		}
262
263		let seconding_peer_id = *state
264			.test_authorities
265			.peer_ids
266			.get(seconding_validator_in_own_backing_group.0 as usize)
267			.unwrap();
268		let candidate = state.candidate_receipts.get(&block_info.hash).unwrap().first().unwrap();
269		let candidate_hash = candidate.hash();
270		let statement = state
271			.statements
272			.get(&candidate_hash)
273			.unwrap()
274			.get(seconding_validator_in_own_backing_group.0 as usize)
275			.unwrap()
276			.clone();
277		let message = AllMessages::StatementDistribution(
278			StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
279				seconding_peer_id,
280				ValidationProtocols::V3(v3::StatementDistributionMessage::Statement(
281					block_info.hash,
282					statement,
283				)),
284			)),
285		);
286		env.send_message(message).await;
287
288		let max_messages_per_candidate = state.config.max_candidate_depth + 1;
289		// One was just sent for the own backing group
290		let mut messages_tracker = (0..groups.len())
291			.map(|i| if i == own_backing_group_index { max_messages_per_candidate } else { 0 })
292			.collect_vec();
293
294		let neighbors =
295			topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
296		let connected_neighbors_x = neighbors
297			.validator_indices_x
298			.iter()
299			.filter(|&v| connected_validators.contains(&(v.0 as usize)))
300			.cloned()
301			.collect_vec();
302		let connected_neighbors_y = neighbors
303			.validator_indices_y
304			.iter()
305			.filter(|&v| connected_validators.contains(&(v.0 as usize)))
306			.cloned()
307			.collect_vec();
308		let one_hop_peers_and_groups = connected_neighbors_x
309			.iter()
310			.chain(connected_neighbors_y.iter())
311			.map(|validator_index| {
312				let peer_id =
313					*state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
314				let group_index =
315					groups.iter().position(|group| group.contains(validator_index)).unwrap();
316				(peer_id, group_index)
317			})
318			.collect_vec();
319		let two_hop_x_peers_and_groups = connected_neighbors_x
320			.iter()
321			.flat_map(|validator_index| {
322				let peer_id =
323					*state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
324				topology
325					.compute_grid_neighbors_for(*validator_index)
326					.unwrap()
327					.validator_indices_y
328					.iter()
329					.map(|validator_neighbor| {
330						let group_index = groups
331							.iter()
332							.position(|group| group.contains(validator_neighbor))
333							.unwrap();
334						(peer_id, group_index)
335					})
336					.collect_vec()
337			})
338			.collect_vec();
339		let two_hop_y_peers_and_groups = connected_neighbors_y
340			.iter()
341			.flat_map(|validator_index| {
342				let peer_id =
343					*state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
344				topology
345					.compute_grid_neighbors_for(*validator_index)
346					.unwrap()
347					.validator_indices_x
348					.iter()
349					.map(|validator_neighbor| {
350						let group_index = groups
351							.iter()
352							.position(|group| group.contains(validator_neighbor))
353							.unwrap();
354						(peer_id, group_index)
355					})
356					.collect_vec()
357			})
358			.collect_vec();
359
360		for (seconding_peer_id, group_index) in one_hop_peers_and_groups
361			.into_iter()
362			.chain(two_hop_x_peers_and_groups)
363			.chain(two_hop_y_peers_and_groups)
364		{
365			let messages_sent_count = messages_tracker.get_mut(group_index).unwrap();
366			if *messages_sent_count == max_messages_per_candidate {
367				continue
368			}
369			*messages_sent_count += 1;
370
371			let candidate_hash = state
372				.candidate_receipts
373				.get(&block_info.hash)
374				.unwrap()
375				.get(group_index)
376				.unwrap()
377				.hash();
378			let manifest = BackedCandidateManifest {
379				relay_parent: block_info.hash,
380				candidate_hash,
381				group_index: GroupIndex(group_index as u32),
382				para_id: Id::new(group_index as u32 + 1),
383				parent_head_data_hash: state.pvd.parent_head.hash(),
384				statement_knowledge: StatementFilter {
385					seconded_in_group: BitVec::from_iter(
386						groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| true),
387					),
388					validated_in_group: BitVec::from_iter(
389						groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| false),
390					),
391				},
392			};
393			let message = AllMessages::StatementDistribution(
394				StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
395					seconding_peer_id,
396					ValidationProtocols::V3(
397						v3::StatementDistributionMessage::BackedCandidateManifest(manifest),
398					),
399				)),
400			);
401			env.send_message(message).await;
402		}
403
404		candidates_advertised += messages_tracker.iter().filter(|&&v| v > 0).collect_vec().len();
405
406		loop {
407			let manifests_count = state
408				.manifests_tracker
409				.values()
410				.filter(|v| v.load(Ordering::SeqCst))
411				.collect::<Vec<_>>()
412				.len();
413			gum::debug!(target: LOG_TARGET, "{}/{} manifest exchanges", manifests_count, candidates_advertised);
414
415			if manifests_count == candidates_advertised {
416				break;
417			}
418			tokio::time::sleep(Duration::from_millis(50)).await;
419		}
420	}
421
422	let duration: u128 = test_start.elapsed().as_millis();
423	gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
424	gum::info!(target: LOG_TARGET,
425		"Avg block time: {}",
426		format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
427	);
428
429	env.stop().await;
430	env.collect_resource_usage(&["statement-distribution"], false)
431}