1use crate::{
18 approval::{
19 helpers::{
20 generate_babe_epoch, generate_new_session_topology, generate_peer_view_change_for,
21 make_header, PastSystemClock,
22 },
23 message_generator::PeerMessagesGenerator,
24 mock_chain_selection::MockChainSelection,
25 test_message::{MessagesBundle, TestMessageInfo},
26 },
27 configuration::{TestAuthorities, TestConfiguration},
28 dummy_builder,
29 environment::{TestEnvironment, TestEnvironmentDependencies, MAX_TIME_OF_FLIGHT},
30 mock::{
31 availability_recovery::MockAvailabilityRecovery,
32 candidate_validation::MockCandidateValidation,
33 chain_api::{ChainApiState, MockChainApi},
34 network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
35 runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
36 AlwaysSupportsParachains, TestSyncOracle,
37 },
38 network::{
39 new_network, HandleNetworkMessage, NetworkEmulatorHandle, NetworkInterface,
40 NetworkInterfaceReceiver,
41 },
42 usage::BenchmarkUsage,
43 NODE_UNDER_TEST,
44};
45use codec::{Decode, Encode};
46use colored::Colorize;
47use futures::channel::oneshot;
48use itertools::Itertools;
49use orchestra::TimeoutExt;
50use overseer::{metrics::Metrics as OverseerMetrics, MetricsTrait};
51use polkadot_approval_distribution::ApprovalDistribution;
52use polkadot_node_core_approval_voting_parallel::ApprovalVotingParallelSubsystem;
53use polkadot_node_primitives::approval::time::{
54 slot_number_to_tick, tick_to_slot_number, Clock, ClockExt, SystemClock,
55};
56
57use polkadot_node_core_approval_voting::{
58 ApprovalVotingSubsystem, Config as ApprovalVotingConfig, RealAssignmentCriteria,
59};
60use polkadot_node_network_protocol::v3 as protocol_v3;
61use polkadot_node_primitives::approval::{self, v1::RelayVRFStory};
62use polkadot_node_subsystem::{
63 messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
64 overseer, AllMessages, Overseer, OverseerConnector, SpawnGlue,
65};
66use polkadot_node_subsystem_test_helpers::mock::new_block_import_info;
67use polkadot_overseer::Handle as OverseerHandleReal;
68use polkadot_primitives::{
69 BlockNumber, CandidateEvent, CandidateIndex, CandidateReceiptV2 as CandidateReceipt, Hash,
70 Header, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
71};
72use prometheus::Registry;
73use sc_keystore::LocalKeystore;
74use sc_service::SpawnTaskHandle;
75use serde::{Deserialize, Serialize};
76use sp_application_crypto::AppCrypto;
77use sp_consensus_babe::Epoch as BabeEpoch;
78use sp_core::H256;
79use sp_keystore::Keystore;
80use std::{
81 cmp::max,
82 collections::{HashMap, HashSet},
83 fs,
84 io::Read,
85 ops::Sub,
86 sync::{
87 atomic::{AtomicBool, AtomicU32, AtomicU64},
88 Arc,
89 },
90 time::{Duration, Instant},
91};
92use tokio::time::sleep;
93
94mod helpers;
95mod message_generator;
96mod mock_chain_selection;
97mod test_message;
98
/// Log target used by the `gum` logging calls of the approval benchmark.
pub(crate) const LOG_TARGET: &str = "subsystem-bench::approval";
/// Number of columns the in-memory key-value database is created with.
pub(crate) const NUM_COLUMNS: u32 = 1;
/// Slot duration in milliseconds, used for the slot/tick/tranche conversions in this module.
pub(crate) const SLOT_DURATION_MILLIS: u64 = 6000;
/// Configuration handed to the approval-voting subsystems built for the benchmark.
pub(crate) const TEST_CONFIG: ApprovalVotingConfig = ApprovalVotingConfig {
    col_approval_data: DATA_COL,
    slot_duration_millis: SLOT_DURATION_MILLIS,
};

/// Database column used for the approval data (see `TEST_CONFIG`).
const DATA_COL: u32 = 0;

// NOTE(review): not referenced in this part of the file; presumably slack time (in
// milliseconds) consumed by the message generator — confirm against its usage.
const BUFFER_FOR_GENERATION_MILLIS: u64 = 30_000;
112
113#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
115#[clap(rename_all = "kebab-case")]
116#[allow(missing_docs)]
117pub struct ApprovalsOptions {
118 #[clap(short, long, default_value_t = 89)]
119 pub last_considered_tranche: u32,
121 #[clap(short, long, default_value_t = 1.0)]
122 pub coalesce_mean: f32,
124 #[clap(short, long, default_value_t = 1.0)]
125 pub coalesce_std_dev: f32,
127 pub coalesce_tranche_diff: u32,
129 #[clap(short, long, default_value_t = false)]
130 pub enable_assignments_v2: bool,
132 #[clap(short, long, default_value_t = true)]
133 pub stop_when_approved: bool,
135 #[clap(short, long)]
136 #[clap(short, long, default_value_t = format!("/tmp"))]
138 pub workdir_prefix: String,
139 #[clap(short, long, default_value_t = 0)]
141 pub num_no_shows_per_candidate: u32,
142 #[clap(short, long, default_value_t = true)]
144 pub approval_voting_parallel_enabled: bool,
145}
146
147impl ApprovalsOptions {
148 fn fingerprint(&self) -> Vec<u8> {
150 let mut bytes = Vec::new();
151 bytes.extend(self.coalesce_mean.to_be_bytes());
152 bytes.extend(self.coalesce_std_dev.to_be_bytes());
153 bytes.extend(self.coalesce_tranche_diff.to_be_bytes());
154 bytes.extend((self.enable_assignments_v2 as i32).to_be_bytes());
155 bytes
156 }
157}
158
/// Data describing one relay-chain block used by the benchmark.
#[derive(Clone, Debug)]
struct BlockTestData {
    /// Slot of the block: the initial slot plus `block_number - 1`.
    slot: Slot,
    /// Hash of the block, built by repeating the (truncated) block number byte.
    hash: Hash,
    /// Number of the block; numbering starts at 1.
    block_number: BlockNumber,
    /// Candidate events of this block, as produced by `helpers::make_candidates`.
    candidates: Vec<CandidateEvent>,
    /// Header built for this block by `make_header`.
    header: Header,
    /// Relay VRF story computed from the header's VRF info and the BABE epoch.
    relay_vrf_story: RelayVRFStory,
    /// Approval flag, initialised to `false`; the message producer reads it to decide whether
    /// a skipped bundle is still worth keeping.
    // NOTE(review): the flag is never set in this file — presumably the mocked chain selection
    // flips it; confirm.
    approved: Arc<AtomicBool>,
    /// Total number of candidates contained in all the blocks preceding this one.
    total_candidates_before: u64,
    /// One flag per (validator, core) pair, initialised to `false`. At the end of the run the
    /// flag of every vote returned by the node is stored as `false`, and the benchmark asserts
    /// that all flags (except those of the node under test) are `false`.
    // NOTE(review): presumably `TestMessageInfo::record_vote` sets the flags to `true` — confirm.
    votes: Arc<Vec<Vec<AtomicBool>>>,
}
189
/// Per-candidate counters used to decide which messages get sent and which become no-shows.
#[derive(Debug)]
struct CandidateTestData {
    /// Upper bound for `num_no_shows`.
    max_no_shows: u32,
    /// Highest tranche for which a no-show has been recorded.
    last_tranche_with_no_show: u32,
    /// Number of assignments marked as sent.
    sent_assignment: u32,
    /// Number of no-shows recorded so far.
    num_no_shows: u32,
    /// Highest tranche seen among the assignments sent before `needed_approvals` was reached.
    max_tranche: u32,
    /// Number of assignments after which `max_tranche` stops growing on new assignments.
    needed_approvals: u32,
}

impl CandidateTestData {
    /// Tells whether a message belonging to `tranche` should be sent.
    ///
    /// Always `true` while no more than `needed_approvals` assignments were sent; afterwards
    /// only tranches up to `max_tranche + num_no_shows` qualify.
    fn should_send_tranche(&self, tranche: u32) -> bool {
        if self.sent_assignment <= self.needed_approvals {
            return true;
        }
        let highest_allowed_tranche = self.max_tranche + self.num_no_shows;
        tranche <= highest_allowed_tranche
    }

    /// Raises `max_tranche` to `tranche` when `tranche` is larger.
    fn set_max_tranche(&mut self, tranche: u32) {
        if tranche > self.max_tranche {
            self.max_tranche = tranche;
        }
    }

    /// Counts one more no-show and remembers the highest tranche that had one.
    fn record_no_show(&mut self, tranche: u32) {
        self.num_no_shows += 1;
        self.last_tranche_with_no_show = max(self.last_tranche_with_no_show, tranche);
    }

    /// Counts one more sent assignment; `max_tranche` is updated only while fewer than
    /// `needed_approvals` assignments were sent before this one.
    fn mark_sent_assignment(&mut self, tranche: u32) {
        let below_needed = self.sent_assignment < self.needed_approvals;
        if below_needed {
            self.set_max_tranche(tranche);
        }

        self.sent_assignment += 1;
    }

    /// Tells whether a message belonging to `tranche` should be turned into a no-show.
    fn should_no_show(&self, tranche: u32) -> bool {
        // General case: there is budget left and this tranche is past the last no-show.
        if self.num_no_shows < self.max_no_shows && tranche > self.last_tranche_with_no_show {
            return true;
        }
        // Tranche zero can never be "past" anything, so it is allowed to take the first no-show.
        tranche == 0 && self.num_no_shows == 0 && self.max_no_shows > 0
    }
}
240
/// State decoded from the messages file when the benchmark state is built.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
struct GeneratedState {
    /// All the message bundles of the run; taken out (leaving `None`) when message production
    /// starts.
    all_messages: Option<Vec<test_message::MessagesBundle>>,
    /// Slot of the first block; block `n` (1-based) uses slot `initial_slot + n - 1`.
    initial_slot: Slot,
}
250
/// State of the approval benchmark, shared (by cloning) between the benchmark driver, the
/// network emulator, the mocked chain selection and the message producer.
#[derive(Clone)]
pub struct ApprovalTestState {
    /// Generic benchmark configuration.
    configuration: TestConfiguration,
    /// Approval-specific benchmark options.
    options: ApprovalsOptions,
    /// Per-block data, ordered by block number.
    blocks: Vec<BlockTestData>,
    /// BABE epoch generated from the initial slot and the test authorities.
    babe_epoch: BabeEpoch,
    /// State decoded from the messages file.
    generated_state: GeneratedState,
    /// Authorities generated from the configuration.
    test_authorities: TestAuthorities,
    /// Starts at 0; the benchmark waits until it reaches the configured number of blocks.
    // NOTE(review): never written in this file — presumably updated by the mocked chain
    // selection when a block gets approved; confirm.
    last_approved_block: Arc<AtomicU32>,
    /// Number of per-peer messages queued towards the node under test.
    total_sent_messages_to_node: Arc<AtomicU64>,
    /// Number of network messages handled by this state's `HandleNetworkMessage` impl.
    total_sent_messages_from_node: Arc<AtomicU64>,
    /// Number of messages processed before being split per sending peer.
    total_unique_messages: Arc<AtomicU64>,
    /// Metrics registered for the approval-voting-parallel subsystem; also the source of the
    /// approval-voting and approval-distribution metrics.
    approval_voting_parallel_metrics: polkadot_node_core_approval_voting_parallel::Metrics,
    /// Tick offset shared with every `PastSystemClock`; overwritten at the start of the run
    /// with the difference between the current tick and the tick of the initial slot.
    delta_tick_from_generated: Arc<AtomicU64>,
}
284
285impl ApprovalTestState {
286 fn new(
289 configuration: &TestConfiguration,
290 options: ApprovalsOptions,
291 dependencies: &TestEnvironmentDependencies,
292 ) -> Self {
293 let test_authorities = configuration.generate_authorities();
294 let start = Instant::now();
295
296 let messages_path = PeerMessagesGenerator::generate_messages_if_needed(
297 configuration,
298 &test_authorities,
299 &options,
300 &dependencies.task_manager.spawn_handle(),
301 );
302
303 let mut messages_file =
304 fs::OpenOptions::new().read(true).open(messages_path.as_path()).unwrap();
305 let mut messages_bytes = Vec::<u8>::with_capacity(2000000);
306
307 messages_file
308 .read_to_end(&mut messages_bytes)
309 .expect("Could not initialize list of messages");
310 let generated_state: GeneratedState =
311 Decode::decode(&mut messages_bytes.as_slice()).expect("Could not decode messages");
312
313 gum::info!(
314 "It took {:?} ms to load {:?} unique messages",
315 start.elapsed().as_millis(),
316 generated_state.all_messages.as_ref().map(|val| val.len()).unwrap_or_default()
317 );
318
319 let babe_epoch =
320 generate_babe_epoch(generated_state.initial_slot, test_authorities.clone());
321 let blocks = Self::generate_blocks_information(
322 configuration,
323 &babe_epoch,
324 generated_state.initial_slot,
325 );
326
327 let state = ApprovalTestState {
328 blocks,
329 babe_epoch: babe_epoch.clone(),
330 generated_state,
331 test_authorities,
332 last_approved_block: Arc::new(AtomicU32::new(0)),
333 total_sent_messages_to_node: Arc::new(AtomicU64::new(0)),
334 total_sent_messages_from_node: Arc::new(AtomicU64::new(0)),
335 total_unique_messages: Arc::new(AtomicU64::new(0)),
336 options,
337 approval_voting_parallel_metrics:
338 polkadot_node_core_approval_voting_parallel::Metrics::try_register(
339 &dependencies.registry,
340 )
341 .unwrap(),
342 delta_tick_from_generated: Arc::new(AtomicU64::new(630720000)),
343 configuration: configuration.clone(),
344 };
345
346 gum::info!("Built testing state");
347
348 state
349 }
350
351 fn generate_blocks_information(
354 configuration: &TestConfiguration,
355 babe_epoch: &BabeEpoch,
356 initial_slot: Slot,
357 ) -> Vec<BlockTestData> {
358 let mut per_block_heads: Vec<BlockTestData> = Vec::new();
359 let mut prev_candidates = 0;
360 for block_number in 1..=configuration.num_blocks {
361 let block_hash = Hash::repeat_byte(block_number as u8);
362 let parent_hash =
363 per_block_heads.last().map(|val| val.hash).unwrap_or(Hash::repeat_byte(0xde));
364 let slot_for_block = initial_slot + (block_number as u64 - 1);
365
366 let header = make_header(parent_hash, slot_for_block, block_number as u32);
367
368 let unsafe_vrf = approval::v1::babe_unsafe_vrf_info(&header)
369 .expect("Can not continue without vrf generator");
370 let relay_vrf_story = unsafe_vrf
371 .compute_randomness(
372 &babe_epoch.authorities,
373 &babe_epoch.randomness,
374 babe_epoch.epoch_index,
375 )
376 .expect("Can not continue without vrf story");
377 let block_info = BlockTestData {
378 slot: slot_for_block,
379 block_number: block_number as BlockNumber,
380 hash: block_hash,
381 header,
382 candidates: helpers::make_candidates(
383 block_hash,
384 block_number as BlockNumber,
385 configuration.n_cores as u32,
386 configuration.n_cores as u32,
387 ),
388 relay_vrf_story,
389 approved: Arc::new(AtomicBool::new(false)),
390 total_candidates_before: prev_candidates,
391 votes: Arc::new(
392 (0..configuration.n_validators)
393 .map(|_| {
394 (0..configuration.n_cores).map(|_| AtomicBool::new(false)).collect_vec()
395 })
396 .collect_vec(),
397 ),
398 };
399 prev_candidates += block_info.candidates.len() as u64;
400 per_block_heads.push(block_info)
401 }
402 per_block_heads
403 }
404
405 async fn start_message_production(
407 &mut self,
408 network_emulator: &NetworkEmulatorHandle,
409 overseer_handle: OverseerHandleReal,
410 env: &TestEnvironment,
411 registry: Registry,
412 ) -> oneshot::Receiver<()> {
413 gum::info!(target: LOG_TARGET, "Start assignments/approvals production");
414
415 let (producer_tx, producer_rx) = oneshot::channel();
416 let peer_message_source = PeerMessageProducer {
417 network: network_emulator.clone(),
418 overseer_handle: overseer_handle.clone(),
419 state: self.clone(),
420 options: self.options.clone(),
421 notify_done: producer_tx,
422 registry,
423 };
424
425 peer_message_source
426 .produce_messages(env, self.generated_state.all_messages.take().unwrap());
427 producer_rx
428 }
429
430 fn build_chain_api_state(&self) -> ChainApiState {
432 ChainApiState {
433 block_headers: self
434 .blocks
435 .iter()
436 .map(|block| (block.hash, block.header.clone()))
437 .collect(),
438 }
439 }
440
441 fn candidate_events_by_block(&self) -> HashMap<H256, Vec<CandidateEvent>> {
443 self.blocks.iter().map(|block| (block.hash, block.candidates.clone())).collect()
444 }
445
446 fn candidate_hashes_by_block(&self) -> HashMap<H256, Vec<CandidateReceipt>> {
448 self.blocks
449 .iter()
450 .map(|block| {
451 (
452 block.hash,
453 block
454 .candidates
455 .iter()
456 .map(|candidate_event| match candidate_event {
457 CandidateEvent::CandidateBacked(_, _, _, _) => todo!(),
458 CandidateEvent::CandidateIncluded(receipt, _, _, _) => receipt.clone(),
459 CandidateEvent::CandidateTimedOut(_, _, _) => todo!(),
460 })
461 .collect_vec(),
462 )
463 })
464 .collect()
465 }
466
467 fn subsystem_name(&self) -> &'static str {
468 if self.options.approval_voting_parallel_enabled {
469 "approval-voting-parallel-subsystem"
470 } else {
471 "approval-distribution-subsystem"
472 }
473 }
474}
475
476impl ApprovalTestState {
477 fn get_info_by_hash(&self, requested_hash: Hash) -> &BlockTestData {
479 self.blocks
480 .iter()
481 .find(|block| block.hash == requested_hash)
482 .expect("Mocks should not use unknown hashes")
483 }
484
485 fn get_info_by_slot(&self, slot: Slot) -> Option<&BlockTestData> {
487 self.blocks.iter().find(|block| block.slot == slot)
488 }
489}
490
491#[async_trait::async_trait]
492impl HandleNetworkMessage for ApprovalTestState {
493 async fn handle(
494 &self,
495 _message: crate::network::NetworkMessage,
496 _node_sender: &mut futures::channel::mpsc::UnboundedSender<crate::network::NetworkMessage>,
497 ) -> Option<crate::network::NetworkMessage> {
498 self.total_sent_messages_from_node
499 .as_ref()
500 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
501 None
502 }
503}
504
/// Produces the messages of the emulated peers and feeds them to the node under test.
struct PeerMessageProducer {
    /// Clone of the benchmark state (blocks, authorities, shared counters, clock offset).
    state: ApprovalTestState,
    /// Clone of the approval benchmark options.
    options: ApprovalsOptions,
    /// Handle of the network emulator through which the peers' messages are sent.
    network: NetworkEmulatorHandle,
    /// Handle used to send messages to the overseer and to wait for block activation.
    overseer_handle: OverseerHandleReal,
    /// Signalled once all the messages have been produced and the final signatures request
    /// has been answered.
    notify_done: oneshot::Sender<()>,
    /// Registry queried for the `polkadot_parachain_imported_candidates_total` metric.
    registry: Registry,
}
522
523impl PeerMessageProducer {
524 fn produce_messages(
527 mut self,
528 env: &TestEnvironment,
529 all_messages: Vec<test_message::MessagesBundle>,
530 ) {
531 env.spawn_blocking("produce-messages", async move {
532 let mut initialized_blocks = HashSet::new();
533 let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> =
534 self.initialize_candidates_test_data();
535 let mut skipped_messages: Vec<test_message::MessagesBundle> = Vec::new();
536 let mut re_process_skipped = false;
537
538 let system_clock =
539 PastSystemClock::new(SystemClock {}, self.state.delta_tick_from_generated.clone());
540 let mut all_messages = all_messages.into_iter().peekable();
541
542 while all_messages.peek().is_some() {
543 let current_slot =
544 tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
545 let block_to_initialize = self
546 .state
547 .blocks
548 .iter()
549 .filter(|block_info| {
550 block_info.slot <= current_slot &&
551 !initialized_blocks.contains(&block_info.hash)
552 })
553 .cloned()
554 .collect_vec();
555 for block_info in block_to_initialize {
556 if !TestEnvironment::metric_lower_than(
557 &self.registry,
558 "polkadot_parachain_imported_candidates_total",
559 (block_info.total_candidates_before + block_info.candidates.len() as u64 -
560 1) as f64,
561 ) {
562 initialized_blocks.insert(block_info.hash);
563 self.initialize_block(&block_info).await;
564 }
565 }
566
567 let mut maybe_need_skip = if re_process_skipped {
568 skipped_messages.clone().into_iter().peekable()
569 } else {
570 vec![].into_iter().peekable()
571 };
572
573 let progressing_iterator = if !re_process_skipped {
574 &mut all_messages
575 } else {
576 re_process_skipped = false;
577 skipped_messages.clear();
578 &mut maybe_need_skip
579 };
580
581 while progressing_iterator
582 .peek()
583 .map(|bundle| {
584 self.time_to_process_message(
585 bundle,
586 current_slot,
587 &initialized_blocks,
588 &system_clock,
589 &per_candidate_data,
590 )
591 })
592 .unwrap_or_default()
593 {
594 let bundle = progressing_iterator.next().unwrap();
595 re_process_skipped = self.process_message(
596 bundle,
597 &mut per_candidate_data,
598 &mut skipped_messages,
599 ) || re_process_skipped;
600 }
601 sleep(Duration::from_millis(50)).await;
603 }
604
605 gum::info!(
606 "All messages sent max_tranche {:?} last_tranche_with_no_show {:?}",
607 per_candidate_data.values().map(|data| data.max_tranche).max(),
608 per_candidate_data.values().map(|data| data.last_tranche_with_no_show).max()
609 );
610 sleep(Duration::from_secs(6)).await;
611 let (tx, rx) = oneshot::channel();
615 let msg = if self.options.approval_voting_parallel_enabled {
616 AllMessages::ApprovalVotingParallel(
617 ApprovalVotingParallelMessage::GetApprovalSignatures(HashSet::new(), tx),
618 )
619 } else {
620 AllMessages::ApprovalDistribution(
621 ApprovalDistributionMessage::GetApprovalSignatures(HashSet::new(), tx),
622 )
623 };
624 self.send_overseer_message(msg, ValidatorIndex(0), None).await;
625 rx.await.expect("Failed to get signatures");
626 self.notify_done.send(()).expect("Failed to notify main loop");
627 gum::info!("All messages processed ");
628 });
629 }
630
631 pub fn process_message(
634 &mut self,
635 bundle: test_message::MessagesBundle,
636 per_candidate_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
637 skipped_messages: &mut Vec<test_message::MessagesBundle>,
638 ) -> bool {
639 let mut reprocess_skipped = false;
640 let block_info = self
641 .state
642 .get_info_by_hash(bundle.assignments.first().unwrap().block_hash)
643 .clone();
644
645 if bundle.should_send(per_candidate_data, &self.options) {
646 bundle.record_sent_assignment(per_candidate_data);
647
648 let assignments = bundle.assignments.clone();
649
650 for message in bundle.assignments.into_iter().chain(bundle.approvals.into_iter()) {
651 if message.no_show_if_required(&assignments, per_candidate_data) {
652 reprocess_skipped = true;
653 continue;
654 } else {
655 message.record_vote(&block_info);
656 }
657 self.state
658 .total_unique_messages
659 .as_ref()
660 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
661 for (peer, messages) in
662 message.clone().split_by_peer_id(&self.state.test_authorities)
663 {
664 for message in messages {
665 self.state
666 .total_sent_messages_to_node
667 .as_ref()
668 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
669 self.queue_message_from_peer(message, peer.0)
670 }
671 }
672 }
673 } else if !block_info.approved.load(std::sync::atomic::Ordering::SeqCst) &&
674 self.options.num_no_shows_per_candidate > 0
675 {
676 skipped_messages.push(bundle);
677 }
678 reprocess_skipped
679 }
680
681 pub fn time_to_process_message(
683 &self,
684 bundle: &MessagesBundle,
685 current_slot: Slot,
686 initialized_blocks: &HashSet<Hash>,
687 system_clock: &PastSystemClock,
688 per_candidate_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
689 ) -> bool {
690 let block_info =
691 self.state.get_info_by_hash(bundle.assignments.first().unwrap().block_hash);
692 let tranche_now = system_clock.tranche_now(SLOT_DURATION_MILLIS, block_info.slot);
693
694 Self::is_past_tranche(
695 bundle,
696 tranche_now,
697 current_slot,
698 block_info,
699 initialized_blocks.contains(&block_info.hash),
700 ) || !bundle.should_send(per_candidate_data, &self.options)
701 }
702
703 pub fn is_past_tranche(
705 bundle: &MessagesBundle,
706 tranche_now: u32,
707 current_slot: Slot,
708 block_info: &BlockTestData,
709 block_initialized: bool,
710 ) -> bool {
711 bundle.tranche_to_send() <= tranche_now &&
712 current_slot >= block_info.slot &&
713 block_initialized
714 }
715
716 fn queue_message_from_peer(&mut self, message: TestMessageInfo, sent_by: ValidatorIndex) {
718 let peer_authority_id = self
719 .state
720 .test_authorities
721 .validator_authority_id
722 .get(sent_by.0 as usize)
723 .expect("We can't handle unknown peers")
724 .clone();
725
726 if let Err(err) = self.network.send_message_from_peer(
727 &peer_authority_id,
728 protocol_v3::ValidationProtocol::ApprovalDistribution(message.msg).into(),
729 ) {
730 gum::warn!(target: LOG_TARGET, ?sent_by, ?err, "Validator can not send message");
731 }
732 }
733
734 async fn send_overseer_message(
736 &mut self,
737 message: AllMessages,
738 _sent_by: ValidatorIndex,
739 _latency: Option<Duration>,
740 ) {
741 self.overseer_handle
742 .send_msg(message, LOG_TARGET)
743 .timeout(MAX_TIME_OF_FLIGHT)
744 .await
745 .unwrap_or_else(|| {
746 panic!("{} ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
747 });
748 }
749
750 async fn initialize_block(&mut self, block_info: &BlockTestData) {
753 gum::info!("Initialize block {:?}", block_info.hash);
754 let (tx, rx) = oneshot::channel();
755 self.overseer_handle.wait_for_activation(block_info.hash, tx).await;
756
757 rx.await
758 .expect("We should not fail waiting for block to be activated")
759 .expect("We should not fail waiting for block to be activated");
760
761 for validator in 1..self.state.test_authorities.validator_authority_id.len() as u32 {
762 let peer_id = self.state.test_authorities.peer_ids.get(validator as usize).unwrap();
763 let validator = ValidatorIndex(validator);
764 let view_update = generate_peer_view_change_for(
765 block_info.hash,
766 *peer_id,
767 self.state.options.approval_voting_parallel_enabled,
768 );
769
770 self.send_overseer_message(view_update, validator, None).await;
771 }
772 }
773
774 fn initialize_candidates_test_data(
777 &self,
778 ) -> HashMap<(Hash, CandidateIndex), CandidateTestData> {
779 let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> =
780 HashMap::new();
781 for block_info in self.state.blocks.iter() {
782 for (candidate_index, _) in block_info.candidates.iter().enumerate() {
783 per_candidate_data.insert(
784 (block_info.hash, candidate_index as CandidateIndex),
785 CandidateTestData {
786 max_no_shows: self.options.num_no_shows_per_candidate,
787 last_tranche_with_no_show: 0,
788 sent_assignment: 0,
789 num_no_shows: 0,
790 max_tranche: 0,
791 needed_approvals: self.state.configuration.needed_approvals as u32,
792 },
793 );
794 }
795 }
796 per_candidate_data
797 }
798}
799
800fn build_overseer(
803 state: &ApprovalTestState,
804 network: &NetworkEmulatorHandle,
805 config: &TestConfiguration,
806 dependencies: &TestEnvironmentDependencies,
807 network_interface: &NetworkInterface,
808 network_receiver: NetworkInterfaceReceiver,
809) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandleReal) {
810 let overseer_connector = OverseerConnector::with_event_capacity(6400000);
811
812 let spawn_task_handle = dependencies.task_manager.spawn_handle();
813
814 let db = kvdb_memorydb::create(NUM_COLUMNS);
815 let db: polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter<kvdb_memorydb::InMemory> =
816 polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
817 let keystore = LocalKeystore::in_memory();
818 keystore
819 .sr25519_generate_new(
820 ASSIGNMENT_KEY_TYPE_ID,
821 Some(state.test_authorities.key_seeds.get(NODE_UNDER_TEST as usize).unwrap().as_str()),
822 )
823 .unwrap();
824 keystore
825 .sr25519_generate_new(
826 ValidatorId::ID,
827 Some(state.test_authorities.key_seeds.get(NODE_UNDER_TEST as usize).unwrap().as_str()),
828 )
829 .unwrap();
830
831 let system_clock =
832 PastSystemClock::new(SystemClock {}, state.delta_tick_from_generated.clone());
833 let keystore = Arc::new(keystore);
834 let db = Arc::new(db);
835
836 let mock_chain_api = MockChainApi::new(state.build_chain_api_state());
837 let mock_chain_selection =
838 MockChainSelection { state: state.clone(), clock: system_clock.clone() };
839 let mock_runtime_api = MockRuntimeApi::new(
840 config.clone(),
841 state.test_authorities.clone(),
842 state.candidate_hashes_by_block(),
843 state.candidate_events_by_block(),
844 Some(state.babe_epoch.clone()),
845 1,
846 MockRuntimeApiCoreState::Occupied,
847 );
848 let mock_tx_bridge = MockNetworkBridgeTx::new(
849 network.clone(),
850 network_interface.subsystem_sender(),
851 state.test_authorities.clone(),
852 );
853 let mock_rx_bridge = MockNetworkBridgeRx::new(network_receiver, None);
854 let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
855 let task_handle = spawn_task_handle.clone();
856 let dummy = dummy_builder!(task_handle, overseer_metrics)
857 .replace_chain_api(|_| mock_chain_api)
858 .replace_chain_selection(|_| mock_chain_selection)
859 .replace_runtime_api(|_| mock_runtime_api)
860 .replace_network_bridge_tx(|_| mock_tx_bridge)
861 .replace_network_bridge_rx(|_| mock_rx_bridge)
862 .replace_availability_recovery(|_| MockAvailabilityRecovery::new())
863 .replace_candidate_validation(|_| MockCandidateValidation::new());
864
865 let (overseer, raw_handle) = if state.options.approval_voting_parallel_enabled {
866 let approval_voting_parallel = ApprovalVotingParallelSubsystem::with_config_and_clock(
867 TEST_CONFIG,
868 db.clone(),
869 keystore.clone(),
870 Box::new(TestSyncOracle {}),
871 state.approval_voting_parallel_metrics.clone(),
872 Arc::new(system_clock.clone()),
873 SpawnGlue(spawn_task_handle.clone()),
874 None,
875 );
876 dummy
877 .replace_approval_voting_parallel(|_| approval_voting_parallel)
878 .build_with_connector(overseer_connector)
879 .expect("Should not fail")
880 } else {
881 let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
882 TEST_CONFIG,
883 db.clone(),
884 keystore.clone(),
885 Box::new(TestSyncOracle {}),
886 state.approval_voting_parallel_metrics.approval_voting_metrics(),
887 Arc::new(system_clock.clone()),
888 Arc::new(SpawnGlue(spawn_task_handle.clone())),
889 1,
890 Duration::from_secs(1),
891 );
892
893 let approval_distribution = ApprovalDistribution::new_with_clock(
894 state.approval_voting_parallel_metrics.approval_distribution_metrics(),
895 TEST_CONFIG.slot_duration_millis,
896 Arc::new(system_clock.clone()),
897 Arc::new(RealAssignmentCriteria {}),
898 );
899
900 dummy
901 .replace_approval_voting(|_| approval_voting)
902 .replace_approval_distribution(|_| approval_distribution)
903 .build_with_connector(overseer_connector)
904 .expect("Should not fail")
905 };
906
907 let overseer_handle = OverseerHandleReal::new(raw_handle);
908 (overseer, overseer_handle)
909}
910
911pub fn prepare_test(
913 config: TestConfiguration,
914 options: ApprovalsOptions,
915 with_prometheus_endpoint: bool,
916) -> (TestEnvironment, ApprovalTestState) {
917 prepare_test_inner(
918 config,
919 TestEnvironmentDependencies::default(),
920 options,
921 with_prometheus_endpoint,
922 )
923}
924
925fn prepare_test_inner(
927 config: TestConfiguration,
928 dependencies: TestEnvironmentDependencies,
929 options: ApprovalsOptions,
930 with_prometheus_endpoint: bool,
931) -> (TestEnvironment, ApprovalTestState) {
932 gum::info!("Prepare test state");
933 let state = ApprovalTestState::new(&config, options, &dependencies);
934
935 gum::info!("Build network emulator");
936
937 let (network, network_interface, network_receiver) =
938 new_network(&config, &dependencies, &state.test_authorities, vec![Arc::new(state.clone())]);
939
940 gum::info!("Build overseer");
941
942 let (overseer, overseer_handle) = build_overseer(
943 &state,
944 &network,
945 &config,
946 &dependencies,
947 &network_interface,
948 network_receiver,
949 );
950
951 (
952 TestEnvironment::new(
953 dependencies,
954 config,
955 network,
956 overseer,
957 overseer_handle,
958 state.test_authorities.clone(),
959 with_prometheus_endpoint,
960 ),
961 state,
962 )
963}
964
965pub async fn bench_approvals(
966 env: &mut TestEnvironment,
967 mut state: ApprovalTestState,
968) -> BenchmarkUsage {
969 let producer_rx = state
970 .start_message_production(
971 env.network(),
972 env.overseer_handle().clone(),
973 env,
974 env.registry().clone(),
975 )
976 .await;
977 bench_approvals_run(env, state, producer_rx).await
978}
979
980pub async fn bench_approvals_run(
982 env: &mut TestEnvironment,
983 state: ApprovalTestState,
984 producer_rx: oneshot::Receiver<()>,
985) -> BenchmarkUsage {
986 let config = env.config().clone();
987
988 env.metrics().set_n_validators(config.n_validators);
989 env.metrics().set_n_cores(config.n_cores);
990
991 let mut initialization_messages = env.network().generate_peer_connected(|e| {
994 if state.options.approval_voting_parallel_enabled {
995 AllMessages::ApprovalVotingParallel(ApprovalVotingParallelMessage::NetworkBridgeUpdate(
996 e,
997 ))
998 } else {
999 AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(e))
1000 }
1001 });
1002 initialization_messages.extend(generate_new_session_topology(
1003 &state.test_authorities,
1004 ValidatorIndex(NODE_UNDER_TEST),
1005 state.options.approval_voting_parallel_enabled,
1006 ));
1007 for message in initialization_messages {
1008 env.send_message(message).await;
1009 }
1010
1011 let start_marker = Instant::now();
1012 let real_clock = SystemClock {};
1013 state.delta_tick_from_generated.store(
1014 real_clock.tick_now() -
1015 slot_number_to_tick(SLOT_DURATION_MILLIS, state.generated_state.initial_slot),
1016 std::sync::atomic::Ordering::SeqCst,
1017 );
1018 let system_clock = PastSystemClock::new(real_clock, state.delta_tick_from_generated.clone());
1019
1020 for block_num in 0..env.config().num_blocks {
1021 let mut current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
1022
1023 while current_slot < state.generated_state.initial_slot {
1025 sleep(Duration::from_millis(5)).await;
1026 current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
1027 }
1028
1029 gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks);
1030 env.metrics().set_current_block(block_num);
1031 let block_start_ts = Instant::now();
1032
1033 if let Some(block_info) = state.get_info_by_slot(current_slot) {
1034 env.import_block(new_block_import_info(block_info.hash, block_info.block_number))
1035 .await;
1036 }
1037
1038 let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
1039 env.metrics().set_block_time(block_time);
1040 gum::info!("Block time {}", format!("{block_time:?}ms").cyan());
1041
1042 system_clock
1043 .wait(slot_number_to_tick(SLOT_DURATION_MILLIS, current_slot + 1))
1044 .await;
1045 }
1046
1047 while state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst) <
1050 env.config().num_blocks as u32
1051 {
1052 gum::info!(
1053 "Waiting for all blocks to be approved current approved {:} num_sent {:} num_unique {:}",
1054 state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst),
1055 state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst),
1056 state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
1057 );
1058 tokio::time::sleep(Duration::from_secs(6)).await;
1059 }
1060
1061 gum::info!("Awaiting producer to signal done");
1062
1063 producer_rx.await.expect("Failed to receive done from message producer");
1064
1065 gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed");
1066 let at_least_messages =
1067 state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize;
1068 env.wait_until_metric(
1069 "polkadot_parachain_subsystem_bounded_received",
1070 Some((
1071 "subsystem_name",
1072 if state.options.approval_voting_parallel_enabled {
1073 "approval-voting-parallel-subsystem"
1074 } else {
1075 "approval-distribution-subsystem"
1076 },
1077 )),
1078 |value| {
1079 gum::debug!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric");
1080 value >= at_least_messages as f64
1081 },
1082 )
1083 .await;
1084
1085 gum::info!("Requesting approval votes ms");
1086
1087 for info in &state.blocks {
1088 for (index, candidates) in info.candidates.iter().enumerate() {
1089 match candidates {
1090 CandidateEvent::CandidateBacked(_, _, _, _) => todo!(),
1091 CandidateEvent::CandidateIncluded(receipt_fetch, _head, _, _) => {
1092 let (tx, rx) = oneshot::channel();
1093
1094 let msg = if state.options.approval_voting_parallel_enabled {
1095 AllMessages::ApprovalVotingParallel(
1096 ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(
1097 receipt_fetch.hash(),
1098 tx,
1099 ),
1100 )
1101 } else {
1102 AllMessages::ApprovalVoting(
1103 ApprovalVotingMessage::GetApprovalSignaturesForCandidate(
1104 receipt_fetch.hash(),
1105 tx,
1106 ),
1107 )
1108 };
1109 env.send_message(msg).await;
1110
1111 let result = rx.await.unwrap();
1112
1113 for (validator, _) in result.iter() {
1114 info.votes
1115 .get(validator.0 as usize)
1116 .unwrap()
1117 .get(index)
1118 .unwrap()
1119 .store(false, std::sync::atomic::Ordering::SeqCst);
1120 }
1121 },
1122
1123 CandidateEvent::CandidateTimedOut(_, _, _) => todo!(),
1124 };
1125 }
1126 }
1127
1128 gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed");
1129 let at_least_messages =
1130 state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize;
1131 env.wait_until_metric(
1132 "polkadot_parachain_subsystem_bounded_received",
1133 Some(("subsystem_name", state.subsystem_name())),
1134 |value| {
1135 gum::debug!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric");
1136 value >= at_least_messages as f64
1137 },
1138 )
1139 .await;
1140
1141 for state in &state.blocks {
1142 for (validator, votes) in state
1143 .votes
1144 .as_ref()
1145 .iter()
1146 .enumerate()
1147 .filter(|(validator, _)| *validator != NODE_UNDER_TEST as usize)
1148 {
1149 for (index, candidate) in votes.iter().enumerate() {
1150 assert_eq!(
1151 (
1152 validator,
1153 index,
1154 candidate.load(std::sync::atomic::Ordering::SeqCst),
1155 state.hash
1156 ),
1157 (validator, index, false, state.hash)
1158 );
1159 }
1160 }
1161 }
1162
1163 env.stop().await;
1164
1165 let duration: u128 = start_marker.elapsed().as_millis();
1166 gum::info!(
1167 "All blocks processed in {} total_sent_messages_to_node {} total_sent_messages_from_node {} num_unique_messages {}",
1168 format!("{duration:?}ms").cyan(),
1169 state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst),
1170 state.total_sent_messages_from_node.load(std::sync::atomic::Ordering::SeqCst),
1171 state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
1172 );
1173
1174 env.collect_resource_usage(
1175 &["approval-distribution", "approval-voting", "approval-voting-parallel"],
1176 true,
1177 )
1178}