polkadot_subsystem_bench/disputes/
mod.rs1use crate::{
33 dummy_builder,
34 environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH},
35 mock::{
36 approval_voting_parallel::MockApprovalVotingParallel,
37 availability_recovery::MockAvailabilityRecovery,
38 candidate_validation::MockCandidateValidation,
39 chain_api::{ChainApiState, MockChainApi},
40 network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
41 runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
42 AlwaysSupportsParachains,
43 },
44 network::{new_network, NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver},
45 usage::BenchmarkUsage,
46};
47use codec::Encode;
48use colored::Colorize;
49use polkadot_dispute_distribution::DisputeDistributionSubsystem;
50use polkadot_node_core_dispute_coordinator::{
51 Config as DisputeCoordinatorConfig, DisputeCoordinatorSubsystem,
52};
53use polkadot_node_metrics::metrics::Metrics;
54use polkadot_node_network_protocol::request_response::{IncomingRequest, ReqProtocolNames};
55use polkadot_overseer::{
56 Handle as OverseerHandle, Overseer, OverseerConnector, OverseerMetrics, SpawnGlue,
57};
58use polkadot_primitives::{AuthorityDiscoveryId, Block, Hash, ValidatorId};
59use sc_keystore::LocalKeystore;
60use sc_network::request_responses::IncomingRequest as RawIncomingRequest;
61use sc_service::SpawnTaskHandle;
62use serde::{Deserialize, Serialize};
63use sp_keystore::Keystore;
64use sp_runtime::RuntimeAppPublic;
65use std::{sync::Arc, time::Instant};
66pub use test_state::TestState;
67
68mod test_state;
69
70const LOG_TARGET: &str = "subsystem-bench::disputes";
71
72#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
74#[clap(rename_all = "kebab-case")]
75#[allow(missing_docs)]
76pub struct DisputesOptions {
77 #[clap(short, long, default_value_t = 10)]
78 pub n_disputes: u32,
80}
81
82pub fn make_keystore() -> Arc<LocalKeystore> {
83 let keystore = Arc::new(LocalKeystore::in_memory());
84 Keystore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some("//Node0"))
85 .expect("Insert key into keystore");
86 Keystore::sr25519_generate_new(&*keystore, AuthorityDiscoveryId::ID, Some("//Node0"))
87 .expect("Insert key into keystore");
88 keystore
89}
90
91fn build_overseer(
92 state: &TestState,
93 network: NetworkEmulatorHandle,
94 network_interface: NetworkInterface,
95 network_receiver: NetworkInterfaceReceiver,
96 dependencies: &TestEnvironmentDependencies,
97) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
98 let overseer_connector = OverseerConnector::with_event_capacity(64000);
99 let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
100 let spawn_task_handle = dependencies.task_manager.spawn_handle();
101
102 let db = kvdb_memorydb::create(1);
103 let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
104 let store = Arc::new(db);
105 let config = DisputeCoordinatorConfig { col_dispute_data: 0 };
106 let keystore = make_keystore();
107 let (dispute_req_receiver, dispute_req_cfg) = IncomingRequest::get_config_receiver::<
108 Block,
109 sc_network::NetworkWorker<Block, Hash>,
110 >(&ReqProtocolNames::new(GENESIS_HASH, None));
111 let mock_runtime_api = MockRuntimeApi::new(
112 state.config.clone(),
113 state.test_authorities.clone(),
114 state.candidate_receipts.clone(),
115 state.candidate_events.clone(),
116 Default::default(),
117 0,
118 MockRuntimeApiCoreState::Scheduled,
119 );
120 let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
121 let mock_chain_api = MockChainApi::new(chain_api_state);
122 let mock_availability_recovery = MockAvailabilityRecovery::new();
123 let mock_approval_voting = MockApprovalVotingParallel::new();
124 let mock_candidate_validation = MockCandidateValidation::new();
125 let network_bridge_tx = MockNetworkBridgeTx::new(
126 network,
127 network_interface.subsystem_sender(),
128 state.test_authorities.clone(),
129 );
130 let network_bridge_rx = MockNetworkBridgeRx::new(network_receiver, Some(dispute_req_cfg));
131 let dispute_distribution = DisputeDistributionSubsystem::new(
132 keystore.clone(),
133 dispute_req_receiver,
134 state.test_authorities.clone(),
135 Metrics::try_register(&dependencies.registry).unwrap(),
136 );
137 let dispute_coordinator = DisputeCoordinatorSubsystem::new(
138 store,
139 config,
140 keystore,
141 Metrics::try_register(&dependencies.registry).unwrap(),
142 );
143
144 let dummy = dummy_builder!(spawn_task_handle, overseer_metrics)
145 .replace_runtime_api(|_| mock_runtime_api)
146 .replace_chain_api(|_| mock_chain_api)
147 .replace_availability_recovery(|_| mock_availability_recovery)
148 .replace_approval_voting_parallel(|_| mock_approval_voting)
149 .replace_candidate_validation(|_| mock_candidate_validation)
150 .replace_network_bridge_tx(|_| network_bridge_tx)
151 .replace_network_bridge_rx(|_| network_bridge_rx)
152 .replace_dispute_distribution(|_| dispute_distribution)
153 .replace_dispute_coordinator(|_| dispute_coordinator);
154
155 let (overseer, raw_handle) = dummy.build_with_connector(overseer_connector).unwrap();
156 let overseer_handle = OverseerHandle::new(raw_handle);
157
158 (overseer, overseer_handle)
159}
160
161pub fn prepare_test(state: &TestState, with_prometheus_endpoint: bool) -> TestEnvironment {
162 let dependencies = TestEnvironmentDependencies::default();
163 let (network, network_interface, network_receiver) = new_network(
164 &state.config,
165 &dependencies,
166 &state.test_authorities,
167 vec![Arc::new(state.clone())],
168 );
169 let (overseer, overseer_handle) =
170 build_overseer(state, network.clone(), network_interface, network_receiver, &dependencies);
171
172 TestEnvironment::new(
173 dependencies,
174 state.config.clone(),
175 network,
176 overseer,
177 overseer_handle,
178 state.test_authorities.clone(),
179 with_prometheus_endpoint,
180 )
181}
182
183pub async fn benchmark_dispute_coordinator(
184 env: &mut TestEnvironment,
185 state: &TestState,
186) -> BenchmarkUsage {
187 let config = env.config().clone();
188
189 let test_start = Instant::now();
190
191 for block_info in state.block_infos.iter() {
192 let block_num = block_info.number as usize;
193 gum::info!(target: LOG_TARGET, "Current block {}/{} {:?}", block_num, config.num_blocks, block_info.hash);
194 env.metrics().set_current_block(block_num);
195 env.import_block(block_info.clone()).await;
196
197 let candidate_receipts =
198 state.candidate_receipts.get(&block_info.hash).expect("pregenerated");
199 for candidate_receipt in candidate_receipts.iter() {
200 let peer_id = *env.authorities().peer_ids.get(1).expect("all validators have ids");
201 let payload =
202 state.dispute_requests.get(&candidate_receipt.hash()).expect("pregenerated");
203 let (pending_response, pending_response_receiver) =
204 futures::channel::oneshot::channel();
205 let request =
206 RawIncomingRequest { peer: peer_id, payload: payload.encode(), pending_response };
207 let peer = env
208 .authorities()
209 .validator_authority_id
210 .get(1)
211 .expect("all validators have keys");
212
213 assert!(env.network().is_peer_connected(peer), "Peer {peer:?} is not connected");
214 env.network().send_request_from_peer(peer, request).unwrap();
215 let res = pending_response_receiver.await.expect("dispute request sent");
216 gum::debug!(target: LOG_TARGET, "Dispute request sent to node from peer {res:?}");
217 }
218
219 let candidate_hashes =
220 candidate_receipts.iter().map(|receipt| receipt.hash()).collect::<Vec<_>>();
221 let requests_expected = candidate_hashes.len() *
222 (state.config.n_validators * state.config.connectivity / 100 - 1);
223
224 loop {
225 let requests_sent = candidate_hashes
226 .iter()
227 .map(|candidate_hash| {
228 state
229 .requests_tracker
230 .lock()
231 .unwrap()
232 .get(candidate_hash)
233 .unwrap_or(&Default::default())
234 .len()
235 })
236 .sum::<usize>();
237
238 gum::info!(target: LOG_TARGET, "Waiting for dispute requests to be sent: {requests_sent}/{requests_expected}");
239 if requests_sent == requests_expected {
240 break;
241 }
242
243 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
244 }
245 }
246
247 let duration: u128 = test_start.elapsed().as_millis();
248 gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{duration:?}ms").cyan());
249 gum::info!(target: LOG_TARGET,
250 "Avg block time: {}",
251 format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
252 );
253
254 env.stop().await;
255 env.collect_resource_usage(&["dispute-coordinator", "dispute-distribution"], false)
256}