referrerpolicy=no-referrer-when-downgrade

polkadot_subsystem_bench/approval/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	approval::{
19		helpers::{
20			generate_babe_epoch, generate_new_session_topology, generate_peer_view_change_for,
21			make_header, PastSystemClock,
22		},
23		message_generator::PeerMessagesGenerator,
24		mock_chain_selection::MockChainSelection,
25		test_message::{MessagesBundle, TestMessageInfo},
26	},
27	configuration::{TestAuthorities, TestConfiguration},
28	dummy_builder,
29	environment::{TestEnvironment, TestEnvironmentDependencies, MAX_TIME_OF_FLIGHT},
30	mock::{
31		availability_recovery::MockAvailabilityRecovery,
32		candidate_validation::MockCandidateValidation,
33		chain_api::{ChainApiState, MockChainApi},
34		network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
35		runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
36		AlwaysSupportsParachains, TestSyncOracle,
37	},
38	network::{
39		new_network, HandleNetworkMessage, NetworkEmulatorHandle, NetworkInterface,
40		NetworkInterfaceReceiver,
41	},
42	usage::BenchmarkUsage,
43	NODE_UNDER_TEST,
44};
45use codec::{Decode, Encode};
46use colored::Colorize;
47use futures::channel::oneshot;
48use itertools::Itertools;
49use orchestra::TimeoutExt;
50use overseer::{metrics::Metrics as OverseerMetrics, MetricsTrait};
51use polkadot_approval_distribution::ApprovalDistribution;
52use polkadot_node_core_approval_voting_parallel::ApprovalVotingParallelSubsystem;
53use polkadot_node_primitives::approval::time::{
54	slot_number_to_tick, tick_to_slot_number, Clock, ClockExt, SystemClock,
55};
56
57use polkadot_node_core_approval_voting::{
58	ApprovalVotingSubsystem, Config as ApprovalVotingConfig, RealAssignmentCriteria,
59};
60use polkadot_node_network_protocol::v3 as protocol_v3;
61use polkadot_node_primitives::approval::{self, v1::RelayVRFStory};
62use polkadot_node_subsystem::{
63	messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
64	overseer, AllMessages, Overseer, OverseerConnector, SpawnGlue,
65};
66use polkadot_node_subsystem_test_helpers::mock::new_block_import_info;
67use polkadot_overseer::Handle as OverseerHandleReal;
68use polkadot_primitives::{
69	BlockNumber, CandidateEvent, CandidateIndex, CandidateReceiptV2 as CandidateReceipt, Hash,
70	Header, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
71};
72use prometheus::Registry;
73use sc_keystore::LocalKeystore;
74use sc_service::SpawnTaskHandle;
75use serde::{Deserialize, Serialize};
76use sp_application_crypto::AppCrypto;
77use sp_consensus_babe::Epoch as BabeEpoch;
78use sp_core::H256;
79use sp_keystore::Keystore;
80use std::{
81	cmp::max,
82	collections::{HashMap, HashSet},
83	fs,
84	io::Read,
85	ops::Sub,
86	sync::{
87		atomic::{AtomicBool, AtomicU32, AtomicU64},
88		Arc,
89	},
90	time::{Duration, Instant},
91};
92use tokio::time::sleep;
93
94mod helpers;
95mod message_generator;
96mod mock_chain_selection;
97mod test_message;
98
/// Log target used by the log statements of the approval benchmark.
pub(crate) const LOG_TARGET: &str = "subsystem-bench::approval";
/// Number of columns the in-memory database is created with.
pub(crate) const NUM_COLUMNS: u32 = 1;
/// Slot duration, in milliseconds, used for the slot/tick conversions in this benchmark.
pub(crate) const SLOT_DURATION_MILLIS: u64 = 6000;
/// Approval-voting configuration handed to the subsystems under test.
pub(crate) const TEST_CONFIG: ApprovalVotingConfig = ApprovalVotingConfig {
	col_approval_data: DATA_COL,
	slot_duration_millis: SLOT_DURATION_MILLIS,
};

/// Database column used for the approval data (see `TEST_CONFIG`).
const DATA_COL: u32 = 0;

/// Start generating messages for a slot into the future, so that the
/// generation never falls behind the current slot.
const BUFFER_FOR_GENERATION_MILLIS: u64 = 30_000;
112
113/// Parameters specific to the approvals benchmark
114#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
115#[clap(rename_all = "kebab-case")]
116#[allow(missing_docs)]
117pub struct ApprovalsOptions {
118	#[clap(short, long, default_value_t = 89)]
119	/// The last considered tranche for which we send the message.
120	pub last_considered_tranche: u32,
121	#[clap(short, long, default_value_t = 1.0)]
122	/// Min candidates to be signed in a single approval.
123	pub coalesce_mean: f32,
124	#[clap(short, long, default_value_t = 1.0)]
125	/// Max candidate to be signed in a single approval.
126	pub coalesce_std_dev: f32,
127	/// The maximum tranche diff between approvals coalesced together.
128	pub coalesce_tranche_diff: u32,
129	#[clap(short, long, default_value_t = false)]
130	/// Enable assignments v2.
131	pub enable_assignments_v2: bool,
132	#[clap(short, long, default_value_t = true)]
133	/// Sends messages only till block is approved.
134	pub stop_when_approved: bool,
135	#[clap(short, long)]
136	/// Work directory.
137	#[clap(short, long, default_value_t = format!("/tmp"))]
138	pub workdir_prefix: String,
139	/// The number of no shows per candidate
140	#[clap(short, long, default_value_t = 0)]
141	pub num_no_shows_per_candidate: u32,
142	/// Enable approval voting parallel.
143	#[clap(short, long, default_value_t = true)]
144	pub approval_voting_parallel_enabled: bool,
145}
146
147impl ApprovalsOptions {
148	// Generates a fingerprint use to determine if messages need to be re-generated.
149	fn fingerprint(&self) -> Vec<u8> {
150		let mut bytes = Vec::new();
151		bytes.extend(self.coalesce_mean.to_be_bytes());
152		bytes.extend(self.coalesce_std_dev.to_be_bytes());
153		bytes.extend(self.coalesce_tranche_diff.to_be_bytes());
154		bytes.extend((self.enable_assignments_v2 as i32).to_be_bytes());
155		bytes
156	}
157}
158
/// Information about a block. It is part of test state and it is used by the mock
/// subsystems to be able to answer the calls approval-voting and approval-distribution
/// do into the outside world.
///
/// The struct is `Clone`; the `Arc`-wrapped fields are shared between all clones.
#[derive(Clone, Debug)]
struct BlockTestData {
	/// The slot this block occupies, see implementer's guide to understand what a slot
	/// is in the context of polkadot.
	slot: Slot,
	/// The hash of the block.
	hash: Hash,
	/// The block number.
	block_number: BlockNumber,
	/// The list of candidates included in this block.
	candidates: Vec<CandidateEvent>,
	/// The block header.
	header: Header,
	/// The vrf story for the given block.
	relay_vrf_story: RelayVRFStory,
	/// If the block has been approved by the approval-voting subsystem.
	/// This is set to `true` when ChainSelectionMessage::Approved is received inside the chain
	/// selection mock subsystem.
	approved: Arc<AtomicBool>,
	/// The total number of candidates in all the blocks before this one.
	total_candidates_before: u64,
	/// The votes we sent.
	/// votes[validator_index][candidate_index] tells if validator sent vote for candidate.
	/// We use this to mark the test as successful if GetApprovalSignatures returns all the votes
	/// from here.
	votes: Arc<Vec<Vec<AtomicBool>>>,
}
189
/// Per-candidate bookkeeping consulted during the test to decide if more messages are needed.
#[derive(Debug)]
struct CandidateTestData {
	/// The configured maximum number of no-shows for this candidate.
	max_no_shows: u32,
	/// The highest tranche in which a no-show was recorded.
	last_tranche_with_no_show: u32,
	/// How many assignments were sent so far.
	sent_assignment: u32,
	/// How many no-shows were recorded so far.
	num_no_shows: u32,
	/// The highest tranche reached while the needed approvals were still being collected.
	max_tranche: u32,
	/// Minimum needed votes to approve candidate.
	needed_approvals: u32,
}

impl CandidateTestData {
	/// Tells whether a message belonging to `tranche` has to be sent.
	///
	/// That is the case while no more than `needed_approvals` assignments were sent, and
	/// afterwards for every tranche up to `max_tranche + num_no_shows`.
	fn should_send_tranche(&self, tranche: u32) -> bool {
		if self.sent_assignment <= self.needed_approvals {
			return true;
		}
		tranche <= self.max_tranche + self.num_no_shows
	}

	/// Raises `max_tranche` to `tranche`; it is never lowered.
	fn set_max_tranche(&mut self, tranche: u32) {
		self.max_tranche = self.max_tranche.max(tranche);
	}

	/// Counts one more no-show and remembers the highest tranche a no-show happened in.
	fn record_no_show(&mut self, tranche: u32) {
		self.num_no_shows += 1;
		self.last_tranche_with_no_show = self.last_tranche_with_no_show.max(tranche);
	}

	/// Counts one more sent assignment. The tranche only contributes to `max_tranche` while
	/// fewer than `needed_approvals` assignments were sent before this one.
	fn mark_sent_assignment(&mut self, tranche: u32) {
		let still_collecting = self.sent_assignment < self.needed_approvals;
		if still_collecting {
			self.set_max_tranche(tranche);
		}

		self.sent_assignment += 1;
	}

	/// Tells whether a message in `tranche` should be turned into a no-show.
	fn should_no_show(&self, tranche: u32) -> bool {
		// There is still no-show budget left and this tranche lies past the last no-show.
		let budget_left = self.num_no_shows < self.max_no_shows;
		if budget_left && self.last_tranche_with_no_show < tranche {
			return true;
		}

		// Tranche zero can never be "past" the initial `last_tranche_with_no_show` of zero, so
		// the very first no-show is allowed there explicitly.
		tranche == 0 && self.num_no_shows == 0 && self.max_no_shows > 0
	}
}
240
/// Test state that is pre-generated and loaded from a file that matches the fingerprint
/// of the TestConfiguration.
///
/// It is serialized and deserialized through the derived `Encode`/`Decode` implementations.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
struct GeneratedState {
	/// All assignments and approvals.
	///
	/// Kept in an `Option` so the messages can be moved out (`take`) once message production
	/// starts.
	all_messages: Option<Vec<test_message::MessagesBundle>>,
	/// The first slot in the test.
	initial_slot: Slot,
}
250
/// Approval test state used by all mock subsystems to be able to answer messages emitted
/// by the approval-voting and approval-distribution-subsystems.
///
/// This gets cloned across all mock subsystems, so if there is any information that gets
/// updated between subsystems, they would have to be wrapped in Arc's.
#[derive(Clone)]
pub struct ApprovalTestState {
	/// The main test configuration
	configuration: TestConfiguration,
	/// The specific test configurations passed when starting the benchmark.
	options: ApprovalsOptions,
	/// The list of blocks used for testing.
	blocks: Vec<BlockTestData>,
	/// The babe epoch used during testing.
	babe_epoch: BabeEpoch,
	/// The pre-generated state.
	generated_state: GeneratedState,
	/// The test authorities
	test_authorities: TestAuthorities,
	/// Last approved block number.
	last_approved_block: Arc<AtomicU32>,
	/// Total sent messages from peers to node
	total_sent_messages_to_node: Arc<AtomicU64>,
	/// Total sent messages from test node to other peers
	total_sent_messages_from_node: Arc<AtomicU64>,
	/// Total unique sent messages.
	total_unique_messages: Arc<AtomicU64>,
	/// Approval voting metrics.
	approval_voting_parallel_metrics: polkadot_node_core_approval_voting_parallel::Metrics,
	/// The delta ticks from the tick the messages were generated to the time we start this
	/// message.
	delta_tick_from_generated: Arc<AtomicU64>,
}
284
285impl ApprovalTestState {
286	/// Build a new `ApprovalTestState` object out of the configurations passed when the benchmark
287	/// was tested.
288	fn new(
289		configuration: &TestConfiguration,
290		options: ApprovalsOptions,
291		dependencies: &TestEnvironmentDependencies,
292	) -> Self {
293		let test_authorities = configuration.generate_authorities();
294		let start = Instant::now();
295
296		let messages_path = PeerMessagesGenerator::generate_messages_if_needed(
297			configuration,
298			&test_authorities,
299			&options,
300			&dependencies.task_manager.spawn_handle(),
301		);
302
303		let mut messages_file =
304			fs::OpenOptions::new().read(true).open(messages_path.as_path()).unwrap();
305		let mut messages_bytes = Vec::<u8>::with_capacity(2000000);
306
307		messages_file
308			.read_to_end(&mut messages_bytes)
309			.expect("Could not initialize list of messages");
310		let generated_state: GeneratedState =
311			Decode::decode(&mut messages_bytes.as_slice()).expect("Could not decode messages");
312
313		gum::info!(
314			"It took {:?} ms to load {:?} unique messages",
315			start.elapsed().as_millis(),
316			generated_state.all_messages.as_ref().map(|val| val.len()).unwrap_or_default()
317		);
318
319		let babe_epoch =
320			generate_babe_epoch(generated_state.initial_slot, test_authorities.clone());
321		let blocks = Self::generate_blocks_information(
322			configuration,
323			&babe_epoch,
324			generated_state.initial_slot,
325		);
326
327		let state = ApprovalTestState {
328			blocks,
329			babe_epoch: babe_epoch.clone(),
330			generated_state,
331			test_authorities,
332			last_approved_block: Arc::new(AtomicU32::new(0)),
333			total_sent_messages_to_node: Arc::new(AtomicU64::new(0)),
334			total_sent_messages_from_node: Arc::new(AtomicU64::new(0)),
335			total_unique_messages: Arc::new(AtomicU64::new(0)),
336			options,
337			approval_voting_parallel_metrics:
338				polkadot_node_core_approval_voting_parallel::Metrics::try_register(
339					&dependencies.registry,
340				)
341				.unwrap(),
342			delta_tick_from_generated: Arc::new(AtomicU64::new(630720000)),
343			configuration: configuration.clone(),
344		};
345
346		gum::info!("Built testing state");
347
348		state
349	}
350
351	/// Generates the blocks and the information about the blocks that will be used
352	/// to drive this test.
353	fn generate_blocks_information(
354		configuration: &TestConfiguration,
355		babe_epoch: &BabeEpoch,
356		initial_slot: Slot,
357	) -> Vec<BlockTestData> {
358		let mut per_block_heads: Vec<BlockTestData> = Vec::new();
359		let mut prev_candidates = 0;
360		for block_number in 1..=configuration.num_blocks {
361			let block_hash = Hash::repeat_byte(block_number as u8);
362			let parent_hash =
363				per_block_heads.last().map(|val| val.hash).unwrap_or(Hash::repeat_byte(0xde));
364			let slot_for_block = initial_slot + (block_number as u64 - 1);
365
366			let header = make_header(parent_hash, slot_for_block, block_number as u32);
367
368			let unsafe_vrf = approval::v1::babe_unsafe_vrf_info(&header)
369				.expect("Can not continue without vrf generator");
370			let relay_vrf_story = unsafe_vrf
371				.compute_randomness(
372					&babe_epoch.authorities,
373					&babe_epoch.randomness,
374					babe_epoch.epoch_index,
375				)
376				.expect("Can not continue without vrf story");
377			let block_info = BlockTestData {
378				slot: slot_for_block,
379				block_number: block_number as BlockNumber,
380				hash: block_hash,
381				header,
382				candidates: helpers::make_candidates(
383					block_hash,
384					block_number as BlockNumber,
385					configuration.n_cores as u32,
386					configuration.n_cores as u32,
387				),
388				relay_vrf_story,
389				approved: Arc::new(AtomicBool::new(false)),
390				total_candidates_before: prev_candidates,
391				votes: Arc::new(
392					(0..configuration.n_validators)
393						.map(|_| {
394							(0..configuration.n_cores).map(|_| AtomicBool::new(false)).collect_vec()
395						})
396						.collect_vec(),
397				),
398			};
399			prev_candidates += block_info.candidates.len() as u64;
400			per_block_heads.push(block_info)
401		}
402		per_block_heads
403	}
404
405	/// Starts the generation of messages(Assignments & Approvals) needed for approving blocks.
406	async fn start_message_production(
407		&mut self,
408		network_emulator: &NetworkEmulatorHandle,
409		overseer_handle: OverseerHandleReal,
410		env: &TestEnvironment,
411		registry: Registry,
412	) -> oneshot::Receiver<()> {
413		gum::info!(target: LOG_TARGET, "Start assignments/approvals production");
414
415		let (producer_tx, producer_rx) = oneshot::channel();
416		let peer_message_source = PeerMessageProducer {
417			network: network_emulator.clone(),
418			overseer_handle: overseer_handle.clone(),
419			state: self.clone(),
420			options: self.options.clone(),
421			notify_done: producer_tx,
422			registry,
423		};
424
425		peer_message_source
426			.produce_messages(env, self.generated_state.all_messages.take().unwrap());
427		producer_rx
428	}
429
430	// Generates a ChainApiState used for driving MockChainApi
431	fn build_chain_api_state(&self) -> ChainApiState {
432		ChainApiState {
433			block_headers: self
434				.blocks
435				.iter()
436				.map(|block| (block.hash, block.header.clone()))
437				.collect(),
438		}
439	}
440
441	// Builds a map  with the list of candidate events per-block.
442	fn candidate_events_by_block(&self) -> HashMap<H256, Vec<CandidateEvent>> {
443		self.blocks.iter().map(|block| (block.hash, block.candidates.clone())).collect()
444	}
445
446	// Builds a map  with the list of candidate hashes per-block.
447	fn candidate_hashes_by_block(&self) -> HashMap<H256, Vec<CandidateReceipt>> {
448		self.blocks
449			.iter()
450			.map(|block| {
451				(
452					block.hash,
453					block
454						.candidates
455						.iter()
456						.map(|candidate_event| match candidate_event {
457							CandidateEvent::CandidateBacked(_, _, _, _) => todo!(),
458							CandidateEvent::CandidateIncluded(receipt, _, _, _) => receipt.clone(),
459							CandidateEvent::CandidateTimedOut(_, _, _) => todo!(),
460						})
461						.collect_vec(),
462				)
463			})
464			.collect()
465	}
466
467	fn subsystem_name(&self) -> &'static str {
468		if self.options.approval_voting_parallel_enabled {
469			"approval-voting-parallel-subsystem"
470		} else {
471			"approval-distribution-subsystem"
472		}
473	}
474}
475
476impl ApprovalTestState {
477	/// Returns test data for the given hash
478	fn get_info_by_hash(&self, requested_hash: Hash) -> &BlockTestData {
479		self.blocks
480			.iter()
481			.find(|block| block.hash == requested_hash)
482			.expect("Mocks should not use unknown hashes")
483	}
484
485	/// Returns test data for the given slot
486	fn get_info_by_slot(&self, slot: Slot) -> Option<&BlockTestData> {
487		self.blocks.iter().find(|block| block.slot == slot)
488	}
489}
490
491#[async_trait::async_trait]
492impl HandleNetworkMessage for ApprovalTestState {
493	async fn handle(
494		&self,
495		_message: crate::network::NetworkMessage,
496		_node_sender: &mut futures::channel::mpsc::UnboundedSender<crate::network::NetworkMessage>,
497	) -> Option<crate::network::NetworkMessage> {
498		self.total_sent_messages_from_node
499			.as_ref()
500			.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
501		None
502	}
503}
504
/// A generator of messages coming from a given Peer/Validator
struct PeerMessageProducer {
	/// The test state used to know what messages to generate.
	state: ApprovalTestState,
	/// Configuration options, passed at the beginning of the test.
	options: ApprovalsOptions,
	/// A handle to the network emulator, used to send messages on behalf of the peers.
	network: NetworkEmulatorHandle,
	/// A handle to the overseer, used for sending messages to the node
	/// under test.
	overseer_handle: OverseerHandleReal,
	/// Channel for producer to notify main loop it finished sending
	/// all messages and they have been processed.
	notify_done: oneshot::Sender<()>,
	/// The metrics registry.
	registry: Registry,
}
522
523impl PeerMessageProducer {
	/// Generates messages by spawning a blocking task in the background which begins creating
	/// the assignments/approvals and peer view changes at the beginning of each block.
	///
	/// The task walks `all_messages` in order. On every pass it initializes the blocks whose
	/// slot has arrived, processes every bundle at the front of the queue that is due, and then
	/// sleeps for 50 ms. Once all bundles are consumed it sends a final, empty
	/// `GetApprovalSignatures` request and signals `notify_done` after that got answered.
	///
	/// NOTE(review): the outer loop only looks at `all_messages`; bundles still waiting in
	/// `skipped_messages` when that iterator runs dry are not retried — confirm this is intended.
	fn produce_messages(
		mut self,
		env: &TestEnvironment,
		all_messages: Vec<test_message::MessagesBundle>,
	) {
		env.spawn_blocking("produce-messages", async move {
			// Hashes of the blocks `initialize_block` has already been called for.
			let mut initialized_blocks = HashSet::new();
			let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> =
				self.initialize_candidates_test_data();
			// Bundles `process_message` decided not to send yet, kept for a later retry.
			let mut skipped_messages: Vec<test_message::MessagesBundle> = Vec::new();
			// Set when a no-show happened; the next pass then goes over the skipped bundles
			// instead of `all_messages`.
			let mut re_process_skipped = false;

			let system_clock =
				PastSystemClock::new(SystemClock {}, self.state.delta_tick_from_generated.clone());
			let mut all_messages = all_messages.into_iter().peekable();

			while all_messages.peek().is_some() {
				let current_slot =
					tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
				// Blocks whose slot has been reached but which were not initialized yet.
				let block_to_initialize = self
					.state
					.blocks
					.iter()
					.filter(|block_info| {
						block_info.slot <= current_slot &&
							!initialized_blocks.contains(&block_info.hash)
					})
					.cloned()
					.collect_vec();
				for block_info in block_to_initialize {
					// NOTE(review): presumably holds the block back until the node has
					// imported enough candidates — confirm against
					// `TestEnvironment::metric_lower_than`.
					if !TestEnvironment::metric_lower_than(
						&self.registry,
						"polkadot_parachain_imported_candidates_total",
						(block_info.total_candidates_before + block_info.candidates.len() as u64 -
							1) as f64,
					) {
						initialized_blocks.insert(block_info.hash);
						self.initialize_block(&block_info).await;
					}
				}

				// A snapshot of the skipped bundles, only populated on a re-processing pass.
				let mut maybe_need_skip = if re_process_skipped {
					skipped_messages.clone().into_iter().peekable()
				} else {
					vec![].into_iter().peekable()
				};

				// Pick the source for this pass. On a re-processing pass the flag is reset and
				// `skipped_messages` is emptied, so `process_message` can refill it.
				let progressing_iterator = if !re_process_skipped {
					&mut all_messages
				} else {
					re_process_skipped = false;
					skipped_messages.clear();
					&mut maybe_need_skip
				};

				// Consume bundles from the front for as long as the next one is due. An empty
				// iterator yields `None`, which `unwrap_or_default` turns into `false`.
				while progressing_iterator
					.peek()
					.map(|bundle| {
						self.time_to_process_message(
							bundle,
							current_slot,
							&initialized_blocks,
							&system_clock,
							&per_candidate_data,
						)
					})
					.unwrap_or_default()
				{
					let bundle = progressing_iterator.next().unwrap();
					// The flag is sticky within a pass: once set it stays set.
					re_process_skipped = self.process_message(
						bundle,
						&mut per_candidate_data,
						&mut skipped_messages,
					) || re_process_skipped;
				}
				// Sleep, so that we don't busy wait in this loop when don't have anything to send.
				sleep(Duration::from_millis(50)).await;
			}

			gum::info!(
				"All messages sent max_tranche {:?} last_tranche_with_no_show {:?}",
				per_candidate_data.values().map(|data| data.max_tranche).max(),
				per_candidate_data.values().map(|data| data.last_tranche_with_no_show).max()
			);
			// NOTE(review): fixed pause before the final request; presumably gives in-flight
			// messages time to be processed — confirm.
			sleep(Duration::from_secs(6)).await;
			// Send an empty GetApprovalSignatures as the last message
			// so when the approval-distribution answered to it, we know it doesn't have anything
			// else to process.
			let (tx, rx) = oneshot::channel();
			let msg = if self.options.approval_voting_parallel_enabled {
				AllMessages::ApprovalVotingParallel(
					ApprovalVotingParallelMessage::GetApprovalSignatures(HashSet::new(), tx),
				)
			} else {
				AllMessages::ApprovalDistribution(
					ApprovalDistributionMessage::GetApprovalSignatures(HashSet::new(), tx),
				)
			};
			self.send_overseer_message(msg, ValidatorIndex(0), None).await;
			rx.await.expect("Failed to get signatures");
			self.notify_done.send(()).expect("Failed to notify main loop");
			gum::info!("All messages processed ");
		});
	}
630
631	// Processes a single message bundle and queue the messages to be sent by the peers that would
632	// send the message in our simulation.
633	pub fn process_message(
634		&mut self,
635		bundle: test_message::MessagesBundle,
636		per_candidate_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
637		skipped_messages: &mut Vec<test_message::MessagesBundle>,
638	) -> bool {
639		let mut reprocess_skipped = false;
640		let block_info = self
641			.state
642			.get_info_by_hash(bundle.assignments.first().unwrap().block_hash)
643			.clone();
644
645		if bundle.should_send(per_candidate_data, &self.options) {
646			bundle.record_sent_assignment(per_candidate_data);
647
648			let assignments = bundle.assignments.clone();
649
650			for message in bundle.assignments.into_iter().chain(bundle.approvals.into_iter()) {
651				if message.no_show_if_required(&assignments, per_candidate_data) {
652					reprocess_skipped = true;
653					continue;
654				} else {
655					message.record_vote(&block_info);
656				}
657				self.state
658					.total_unique_messages
659					.as_ref()
660					.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
661				for (peer, messages) in
662					message.clone().split_by_peer_id(&self.state.test_authorities)
663				{
664					for message in messages {
665						self.state
666							.total_sent_messages_to_node
667							.as_ref()
668							.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
669						self.queue_message_from_peer(message, peer.0)
670					}
671				}
672			}
673		} else if !block_info.approved.load(std::sync::atomic::Ordering::SeqCst) &&
674			self.options.num_no_shows_per_candidate > 0
675		{
676			skipped_messages.push(bundle);
677		}
678		reprocess_skipped
679	}
680
681	// Tells if it is the time to process a message.
682	pub fn time_to_process_message(
683		&self,
684		bundle: &MessagesBundle,
685		current_slot: Slot,
686		initialized_blocks: &HashSet<Hash>,
687		system_clock: &PastSystemClock,
688		per_candidate_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
689	) -> bool {
690		let block_info =
691			self.state.get_info_by_hash(bundle.assignments.first().unwrap().block_hash);
692		let tranche_now = system_clock.tranche_now(SLOT_DURATION_MILLIS, block_info.slot);
693
694		Self::is_past_tranche(
695			bundle,
696			tranche_now,
697			current_slot,
698			block_info,
699			initialized_blocks.contains(&block_info.hash),
700		) || !bundle.should_send(per_candidate_data, &self.options)
701	}
702
703	// Tells if the tranche where the bundle should be sent has passed.
704	pub fn is_past_tranche(
705		bundle: &MessagesBundle,
706		tranche_now: u32,
707		current_slot: Slot,
708		block_info: &BlockTestData,
709		block_initialized: bool,
710	) -> bool {
711		bundle.tranche_to_send() <= tranche_now &&
712			current_slot >= block_info.slot &&
713			block_initialized
714	}
715
716	// Queue message to be sent by validator `sent_by`
717	fn queue_message_from_peer(&mut self, message: TestMessageInfo, sent_by: ValidatorIndex) {
718		let peer_authority_id = self
719			.state
720			.test_authorities
721			.validator_authority_id
722			.get(sent_by.0 as usize)
723			.expect("We can't handle unknown peers")
724			.clone();
725
726		if let Err(err) = self.network.send_message_from_peer(
727			&peer_authority_id,
728			protocol_v3::ValidationProtocol::ApprovalDistribution(message.msg).into(),
729		) {
730			gum::warn!(target: LOG_TARGET, ?sent_by, ?err, "Validator can not send message");
731		}
732	}
733
734	// Queues a message to be sent by the peer identified by the `sent_by` value.
735	async fn send_overseer_message(
736		&mut self,
737		message: AllMessages,
738		_sent_by: ValidatorIndex,
739		_latency: Option<Duration>,
740	) {
741		self.overseer_handle
742			.send_msg(message, LOG_TARGET)
743			.timeout(MAX_TIME_OF_FLIGHT)
744			.await
745			.unwrap_or_else(|| {
746				panic!("{} ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
747			});
748	}
749
750	// Sends the messages needed by approval-distribution and approval-voting for processing a
751	// message. E.g: PeerViewChange.
752	async fn initialize_block(&mut self, block_info: &BlockTestData) {
753		gum::info!("Initialize block {:?}", block_info.hash);
754		let (tx, rx) = oneshot::channel();
755		self.overseer_handle.wait_for_activation(block_info.hash, tx).await;
756
757		rx.await
758			.expect("We should not fail waiting for block to be activated")
759			.expect("We should not fail waiting for block to be activated");
760
761		for validator in 1..self.state.test_authorities.validator_authority_id.len() as u32 {
762			let peer_id = self.state.test_authorities.peer_ids.get(validator as usize).unwrap();
763			let validator = ValidatorIndex(validator);
764			let view_update = generate_peer_view_change_for(
765				block_info.hash,
766				*peer_id,
767				self.state.options.approval_voting_parallel_enabled,
768			);
769
770			self.send_overseer_message(view_update, validator, None).await;
771		}
772	}
773
774	// Initializes the candidates test data. This is used for bookkeeping if more assignments and
775	// approvals would be needed.
776	fn initialize_candidates_test_data(
777		&self,
778	) -> HashMap<(Hash, CandidateIndex), CandidateTestData> {
779		let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> =
780			HashMap::new();
781		for block_info in self.state.blocks.iter() {
782			for (candidate_index, _) in block_info.candidates.iter().enumerate() {
783				per_candidate_data.insert(
784					(block_info.hash, candidate_index as CandidateIndex),
785					CandidateTestData {
786						max_no_shows: self.options.num_no_shows_per_candidate,
787						last_tranche_with_no_show: 0,
788						sent_assignment: 0,
789						num_no_shows: 0,
790						max_tranche: 0,
791						needed_approvals: self.state.configuration.needed_approvals as u32,
792					},
793				);
794			}
795		}
796		per_candidate_data
797	}
798}
799
800/// Helper function to build an overseer with the real implementation for `ApprovalDistribution` and
801/// `ApprovalVoting` subsystems and mock subsystems for all others.
802fn build_overseer(
803	state: &ApprovalTestState,
804	network: &NetworkEmulatorHandle,
805	config: &TestConfiguration,
806	dependencies: &TestEnvironmentDependencies,
807	network_interface: &NetworkInterface,
808	network_receiver: NetworkInterfaceReceiver,
809) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandleReal) {
810	let overseer_connector = OverseerConnector::with_event_capacity(6400000);
811
812	let spawn_task_handle = dependencies.task_manager.spawn_handle();
813
814	let db = kvdb_memorydb::create(NUM_COLUMNS);
815	let db: polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter<kvdb_memorydb::InMemory> =
816		polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
817	let keystore = LocalKeystore::in_memory();
818	keystore
819		.sr25519_generate_new(
820			ASSIGNMENT_KEY_TYPE_ID,
821			Some(state.test_authorities.key_seeds.get(NODE_UNDER_TEST as usize).unwrap().as_str()),
822		)
823		.unwrap();
824	keystore
825		.sr25519_generate_new(
826			ValidatorId::ID,
827			Some(state.test_authorities.key_seeds.get(NODE_UNDER_TEST as usize).unwrap().as_str()),
828		)
829		.unwrap();
830
831	let system_clock =
832		PastSystemClock::new(SystemClock {}, state.delta_tick_from_generated.clone());
833	let keystore = Arc::new(keystore);
834	let db = Arc::new(db);
835
836	let mock_chain_api = MockChainApi::new(state.build_chain_api_state());
837	let mock_chain_selection =
838		MockChainSelection { state: state.clone(), clock: system_clock.clone() };
839	let mock_runtime_api = MockRuntimeApi::new(
840		config.clone(),
841		state.test_authorities.clone(),
842		state.candidate_hashes_by_block(),
843		state.candidate_events_by_block(),
844		Some(state.babe_epoch.clone()),
845		1,
846		MockRuntimeApiCoreState::Occupied,
847	);
848	let mock_tx_bridge = MockNetworkBridgeTx::new(
849		network.clone(),
850		network_interface.subsystem_sender(),
851		state.test_authorities.clone(),
852	);
853	let mock_rx_bridge = MockNetworkBridgeRx::new(network_receiver, None);
854	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
855	let task_handle = spawn_task_handle.clone();
856	let dummy = dummy_builder!(task_handle, overseer_metrics)
857		.replace_chain_api(|_| mock_chain_api)
858		.replace_chain_selection(|_| mock_chain_selection)
859		.replace_runtime_api(|_| mock_runtime_api)
860		.replace_network_bridge_tx(|_| mock_tx_bridge)
861		.replace_network_bridge_rx(|_| mock_rx_bridge)
862		.replace_availability_recovery(|_| MockAvailabilityRecovery::new())
863		.replace_candidate_validation(|_| MockCandidateValidation::new());
864
865	let (overseer, raw_handle) = if state.options.approval_voting_parallel_enabled {
866		let approval_voting_parallel = ApprovalVotingParallelSubsystem::with_config_and_clock(
867			TEST_CONFIG,
868			db.clone(),
869			keystore.clone(),
870			Box::new(TestSyncOracle {}),
871			state.approval_voting_parallel_metrics.clone(),
872			Arc::new(system_clock.clone()),
873			SpawnGlue(spawn_task_handle.clone()),
874			None,
875		);
876		dummy
877			.replace_approval_voting_parallel(|_| approval_voting_parallel)
878			.build_with_connector(overseer_connector)
879			.expect("Should not fail")
880	} else {
881		let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
882			TEST_CONFIG,
883			db.clone(),
884			keystore.clone(),
885			Box::new(TestSyncOracle {}),
886			state.approval_voting_parallel_metrics.approval_voting_metrics(),
887			Arc::new(system_clock.clone()),
888			Arc::new(SpawnGlue(spawn_task_handle.clone())),
889			1,
890			Duration::from_secs(1),
891		);
892
893		let approval_distribution = ApprovalDistribution::new_with_clock(
894			state.approval_voting_parallel_metrics.approval_distribution_metrics(),
895			TEST_CONFIG.slot_duration_millis,
896			Arc::new(system_clock.clone()),
897			Arc::new(RealAssignmentCriteria {}),
898		);
899
900		dummy
901			.replace_approval_voting(|_| approval_voting)
902			.replace_approval_distribution(|_| approval_distribution)
903			.build_with_connector(overseer_connector)
904			.expect("Should not fail")
905	};
906
907	let overseer_handle = OverseerHandleReal::new(raw_handle);
908	(overseer, overseer_handle)
909}
910
911/// Takes a test configuration and uses it to creates the `TestEnvironment`.
912pub fn prepare_test(
913	config: TestConfiguration,
914	options: ApprovalsOptions,
915	with_prometheus_endpoint: bool,
916) -> (TestEnvironment, ApprovalTestState) {
917	prepare_test_inner(
918		config,
919		TestEnvironmentDependencies::default(),
920		options,
921		with_prometheus_endpoint,
922	)
923}
924
925/// Build the test environment for an Approval benchmark.
926fn prepare_test_inner(
927	config: TestConfiguration,
928	dependencies: TestEnvironmentDependencies,
929	options: ApprovalsOptions,
930	with_prometheus_endpoint: bool,
931) -> (TestEnvironment, ApprovalTestState) {
932	gum::info!("Prepare test state");
933	let state = ApprovalTestState::new(&config, options, &dependencies);
934
935	gum::info!("Build network emulator");
936
937	let (network, network_interface, network_receiver) =
938		new_network(&config, &dependencies, &state.test_authorities, vec![Arc::new(state.clone())]);
939
940	gum::info!("Build overseer");
941
942	let (overseer, overseer_handle) = build_overseer(
943		&state,
944		&network,
945		&config,
946		&dependencies,
947		&network_interface,
948		network_receiver,
949	);
950
951	(
952		TestEnvironment::new(
953			dependencies,
954			config,
955			network,
956			overseer,
957			overseer_handle,
958			state.test_authorities.clone(),
959			with_prometheus_endpoint,
960		),
961		state,
962	)
963}
964
965pub async fn bench_approvals(
966	env: &mut TestEnvironment,
967	mut state: ApprovalTestState,
968) -> BenchmarkUsage {
969	let producer_rx = state
970		.start_message_production(
971			env.network(),
972			env.overseer_handle().clone(),
973			env,
974			env.registry().clone(),
975		)
976		.await;
977	bench_approvals_run(env, state, producer_rx).await
978}
979
980/// Runs the approval benchmark.
981pub async fn bench_approvals_run(
982	env: &mut TestEnvironment,
983	state: ApprovalTestState,
984	producer_rx: oneshot::Receiver<()>,
985) -> BenchmarkUsage {
986	let config = env.config().clone();
987
988	env.metrics().set_n_validators(config.n_validators);
989	env.metrics().set_n_cores(config.n_cores);
990
991	// First create the initialization messages that make sure that then node under
992	// tests receives notifications about the topology used and the connected peers.
993	let mut initialization_messages = env.network().generate_peer_connected(|e| {
994		if state.options.approval_voting_parallel_enabled {
995			AllMessages::ApprovalVotingParallel(ApprovalVotingParallelMessage::NetworkBridgeUpdate(
996				e,
997			))
998		} else {
999			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(e))
1000		}
1001	});
1002	initialization_messages.extend(generate_new_session_topology(
1003		&state.test_authorities,
1004		ValidatorIndex(NODE_UNDER_TEST),
1005		state.options.approval_voting_parallel_enabled,
1006	));
1007	for message in initialization_messages {
1008		env.send_message(message).await;
1009	}
1010
1011	let start_marker = Instant::now();
1012	let real_clock = SystemClock {};
1013	state.delta_tick_from_generated.store(
1014		real_clock.tick_now() -
1015			slot_number_to_tick(SLOT_DURATION_MILLIS, state.generated_state.initial_slot),
1016		std::sync::atomic::Ordering::SeqCst,
1017	);
1018	let system_clock = PastSystemClock::new(real_clock, state.delta_tick_from_generated.clone());
1019
1020	for block_num in 0..env.config().num_blocks {
1021		let mut current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
1022
1023		// Wait until the time arrives at the first slot under test.
1024		while current_slot < state.generated_state.initial_slot {
1025			sleep(Duration::from_millis(5)).await;
1026			current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
1027		}
1028
1029		gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks);
1030		env.metrics().set_current_block(block_num);
1031		let block_start_ts = Instant::now();
1032
1033		if let Some(block_info) = state.get_info_by_slot(current_slot) {
1034			env.import_block(new_block_import_info(block_info.hash, block_info.block_number))
1035				.await;
1036		}
1037
1038		let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
1039		env.metrics().set_block_time(block_time);
1040		gum::info!("Block time {}", format!("{block_time:?}ms").cyan());
1041
1042		system_clock
1043			.wait(slot_number_to_tick(SLOT_DURATION_MILLIS, current_slot + 1))
1044			.await;
1045	}
1046
1047	// Wait for all blocks to be approved before exiting.
1048	// This is an invariant of the benchmark, if this does not happen something went terribly wrong.
1049	while state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst) <
1050		env.config().num_blocks as u32
1051	{
1052		gum::info!(
1053			"Waiting for all blocks to be approved current approved {:} num_sent {:} num_unique {:}",
1054			state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst),
1055			state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst),
1056			state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
1057		);
1058		tokio::time::sleep(Duration::from_secs(6)).await;
1059	}
1060
1061	gum::info!("Awaiting producer to signal done");
1062
1063	producer_rx.await.expect("Failed to receive done from message producer");
1064
1065	gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed");
1066	let at_least_messages =
1067		state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize;
1068	env.wait_until_metric(
1069		"polkadot_parachain_subsystem_bounded_received",
1070		Some((
1071			"subsystem_name",
1072			if state.options.approval_voting_parallel_enabled {
1073				"approval-voting-parallel-subsystem"
1074			} else {
1075				"approval-distribution-subsystem"
1076			},
1077		)),
1078		|value| {
1079			gum::debug!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric");
1080			value >= at_least_messages as f64
1081		},
1082	)
1083	.await;
1084
1085	gum::info!("Requesting approval votes ms");
1086
1087	for info in &state.blocks {
1088		for (index, candidates) in info.candidates.iter().enumerate() {
1089			match candidates {
1090				CandidateEvent::CandidateBacked(_, _, _, _) => todo!(),
1091				CandidateEvent::CandidateIncluded(receipt_fetch, _head, _, _) => {
1092					let (tx, rx) = oneshot::channel();
1093
1094					let msg = if state.options.approval_voting_parallel_enabled {
1095						AllMessages::ApprovalVotingParallel(
1096							ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(
1097								receipt_fetch.hash(),
1098								tx,
1099							),
1100						)
1101					} else {
1102						AllMessages::ApprovalVoting(
1103							ApprovalVotingMessage::GetApprovalSignaturesForCandidate(
1104								receipt_fetch.hash(),
1105								tx,
1106							),
1107						)
1108					};
1109					env.send_message(msg).await;
1110
1111					let result = rx.await.unwrap();
1112
1113					for (validator, _) in result.iter() {
1114						info.votes
1115							.get(validator.0 as usize)
1116							.unwrap()
1117							.get(index)
1118							.unwrap()
1119							.store(false, std::sync::atomic::Ordering::SeqCst);
1120					}
1121				},
1122
1123				CandidateEvent::CandidateTimedOut(_, _, _) => todo!(),
1124			};
1125		}
1126	}
1127
1128	gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed");
1129	let at_least_messages =
1130		state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize;
1131	env.wait_until_metric(
1132		"polkadot_parachain_subsystem_bounded_received",
1133		Some(("subsystem_name", state.subsystem_name())),
1134		|value| {
1135			gum::debug!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric");
1136			value >= at_least_messages as f64
1137		},
1138	)
1139	.await;
1140
1141	for state in &state.blocks {
1142		for (validator, votes) in state
1143			.votes
1144			.as_ref()
1145			.iter()
1146			.enumerate()
1147			.filter(|(validator, _)| *validator != NODE_UNDER_TEST as usize)
1148		{
1149			for (index, candidate) in votes.iter().enumerate() {
1150				assert_eq!(
1151					(
1152						validator,
1153						index,
1154						candidate.load(std::sync::atomic::Ordering::SeqCst),
1155						state.hash
1156					),
1157					(validator, index, false, state.hash)
1158				);
1159			}
1160		}
1161	}
1162
1163	env.stop().await;
1164
1165	let duration: u128 = start_marker.elapsed().as_millis();
1166	gum::info!(
1167		"All blocks processed in {} total_sent_messages_to_node {} total_sent_messages_from_node {} num_unique_messages {}",
1168		format!("{duration:?}ms").cyan(),
1169		state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst),
1170		state.total_sent_messages_from_node.load(std::sync::atomic::Ordering::SeqCst),
1171		state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
1172	);
1173
1174	env.collect_resource_usage(
1175		&["approval-distribution", "approval-voting", "approval-voting-parallel"],
1176		true,
1177	)
1178}