referrerpolicy=no-referrer-when-downgrade

polkadot_subsystem_bench/approval/
message_generator.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	approval::{
19		helpers::{generate_babe_epoch, generate_topology},
20		test_message::{MessagesBundle, TestMessageInfo},
21		ApprovalTestState, ApprovalsOptions, BlockTestData, GeneratedState,
22		BUFFER_FOR_GENERATION_MILLIS, LOG_TARGET, SLOT_DURATION_MILLIS,
23	},
24	configuration::{TestAuthorities, TestConfiguration},
25	mock::runtime_api::session_info_for_peers,
26	NODE_UNDER_TEST,
27};
28use codec::Encode;
29use futures::SinkExt;
30use itertools::Itertools;
31use polkadot_node_core_approval_voting::criteria::{compute_assignments, Config};
32
33use polkadot_node_network_protocol::{
34	grid_topology::{GridNeighbors, RandomRouting, RequiredRouting, SessionGridTopology},
35	v3 as protocol_v3,
36};
37use polkadot_node_primitives::approval::{
38	self,
39	time::tranche_to_tick,
40	v2::{CoreBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2},
41};
42use polkadot_primitives::{
43	ApprovalVoteMultipleCandidates, CandidateEvent, CandidateHash, CandidateIndex, CoreIndex, Hash,
44	SessionInfo, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
45};
46use rand::{seq::SliceRandom, RngCore, SeedableRng};
47use rand_chacha::ChaCha20Rng;
48use rand_distr::{Distribution, Normal};
49use sc_keystore::LocalKeystore;
50use sc_network_types::PeerId;
51use sc_service::SpawnTaskHandle;
52use sha1::Digest;
53use sp_application_crypto::AppCrypto;
54use sp_consensus_babe::SlotDuration;
55use sp_keystore::Keystore;
56use sp_timestamp::Timestamp;
57use std::{
58	cmp::max,
59	collections::{BTreeMap, HashSet},
60	fs,
61	io::Write,
62	path::{Path, PathBuf},
63	time::Duration,
64};
65
66/// A generator of messages coming from a given Peer/Validator
67pub struct PeerMessagesGenerator {
68	/// The grid neighbors of the node under test.
69	pub topology_node_under_test: GridNeighbors,
70	/// The topology of the network for the epoch under test.
71	pub topology: SessionGridTopology,
72	/// The validator index for this object generates the messages.
73	pub validator_index: ValidatorIndex,
74	/// An array of pre-generated random samplings, that is used to determine, which nodes would
75	/// send a given assignment, to the node under test because of the random samplings.
76	/// As an optimization we generate this sampling at the beginning of the test and just pick
77	/// one randomly, because always taking the samples would be too expensive for benchmark.
78	pub random_samplings: Vec<Vec<ValidatorIndex>>,
79	/// Channel for sending the generated messages to the aggregator
80	pub tx_messages: futures::channel::mpsc::UnboundedSender<(Hash, Vec<MessagesBundle>)>,
81	/// The list of test authorities
82	pub test_authorities: TestAuthorities,
83	//// The session info used for the test.
84	pub session_info: SessionInfo,
85	/// The blocks used for testing
86	pub blocks: Vec<BlockTestData>,
87	/// Approval options params.
88	pub options: ApprovalsOptions,
89}
90
91impl PeerMessagesGenerator {
92	/// Generates messages by spawning a blocking task in the background which begins creating
93	/// the assignments/approvals and peer view changes at the beginning of each block.
94	pub fn generate_messages(mut self, spawn_task_handle: &SpawnTaskHandle) {
95		spawn_task_handle.spawn("generate-messages", "generate-messages", async move {
96			for block_info in &self.blocks {
97				let assignments = self.generate_assignments(block_info);
98
99				let bytes = self.validator_index.0.to_be_bytes();
100				let seed = [
101					bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
102					0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
103				];
104
105				let mut rand_chacha = ChaCha20Rng::from_seed(seed);
106				let approvals = issue_approvals(
107					assignments,
108					block_info.hash,
109					&self.test_authorities.validator_public,
110					block_info.candidates.clone(),
111					&self.options,
112					&mut rand_chacha,
113					self.test_authorities.keyring.keystore_ref(),
114				);
115
116				self.tx_messages
117					.send((block_info.hash, approvals))
118					.await
119					.expect("Should not fail");
120			}
121		})
122	}
123
124	// Builds the messages finger print corresponding to this configuration.
125	// When the finger print exists already on disk the messages are not re-generated.
126	fn messages_fingerprint(
127		configuration: &TestConfiguration,
128		options: &ApprovalsOptions,
129	) -> String {
130		let mut fingerprint = options.fingerprint();
131		let configuration_bytes = bincode::serialize(&configuration).unwrap();
132		fingerprint.extend(configuration_bytes);
133		let mut sha1 = sha1::Sha1::new();
134		sha1.update(fingerprint);
135		let result = sha1.finalize();
136		hex::encode(result)
137	}
138
139	/// Generate all messages(Assignments & Approvals) needed for approving `blocks``.
140	pub fn generate_messages_if_needed(
141		configuration: &TestConfiguration,
142		test_authorities: &TestAuthorities,
143		options: &ApprovalsOptions,
144		spawn_task_handle: &SpawnTaskHandle,
145	) -> PathBuf {
146		let path_name = format!(
147			"{}/{}",
148			options.workdir_prefix,
149			Self::messages_fingerprint(configuration, options)
150		);
151
152		let path = Path::new(&path_name);
153		if path.exists() {
154			return path.to_path_buf();
155		}
156
157		gum::info!("Generate message because file does not exist");
158		let delta_to_first_slot_under_test = Timestamp::new(BUFFER_FOR_GENERATION_MILLIS);
159		let initial_slot = Slot::from_timestamp(
160			(*Timestamp::current() - *delta_to_first_slot_under_test).into(),
161			SlotDuration::from_millis(SLOT_DURATION_MILLIS),
162		);
163
164		let babe_epoch = generate_babe_epoch(initial_slot, test_authorities.clone());
165		let session_info = session_info_for_peers(configuration, test_authorities);
166		let blocks = ApprovalTestState::generate_blocks_information(
167			configuration,
168			&babe_epoch,
169			initial_slot,
170		);
171
172		gum::info!(target: LOG_TARGET, "Generate messages");
173		let topology = generate_topology(test_authorities);
174
175		let random_samplings = random_samplings_to_node(
176			ValidatorIndex(NODE_UNDER_TEST),
177			test_authorities.validator_public.len(),
178			test_authorities.validator_public.len() * 2,
179		);
180
181		let topology_node_under_test =
182			topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
183
184		let (tx, mut rx) = futures::channel::mpsc::unbounded();
185
186		// Spawn a thread to generate the messages for each validator, so that we speed up the
187		// generation.
188		for current_validator_index in 1..test_authorities.validator_public.len() {
189			let peer_message_source = PeerMessagesGenerator {
190				topology_node_under_test: topology_node_under_test.clone(),
191				topology: topology.clone(),
192				validator_index: ValidatorIndex(current_validator_index as u32),
193				test_authorities: test_authorities.clone(),
194				session_info: session_info.clone(),
195				blocks: blocks.clone(),
196				tx_messages: tx.clone(),
197				random_samplings: random_samplings.clone(),
198				options: options.clone(),
199			};
200
201			peer_message_source.generate_messages(spawn_task_handle);
202		}
203
204		std::mem::drop(tx);
205
206		let seed = [0x32; 32];
207		let mut rand_chacha = ChaCha20Rng::from_seed(seed);
208
209		let mut all_messages: BTreeMap<u64, Vec<MessagesBundle>> = BTreeMap::new();
210		// Receive all messages and sort them by Tick they have to be sent.
211		loop {
212			match rx.try_next() {
213				Ok(Some((block_hash, messages))) =>
214					for message in messages {
215						let block_info = blocks
216							.iter()
217							.find(|val| val.hash == block_hash)
218							.expect("Should find blocks");
219						let tick_to_send = tranche_to_tick(
220							SLOT_DURATION_MILLIS,
221							block_info.slot,
222							message.tranche_to_send(),
223						);
224						let to_add = all_messages.entry(tick_to_send).or_default();
225						to_add.push(message);
226					},
227				Ok(None) => break,
228				Err(_) => {
229					std::thread::sleep(Duration::from_millis(50));
230				},
231			}
232		}
233		let all_messages = all_messages
234			.into_iter()
235			.flat_map(|(_, mut messages)| {
236				// Shuffle the messages inside the same tick, so that we don't priorities messages
237				// for older nodes. we try to simulate the same behaviour as in real world.
238				messages.shuffle(&mut rand_chacha);
239				messages
240			})
241			.collect_vec();
242
243		gum::info!("Generated a number of {:} unique messages", all_messages.len());
244
245		let generated_state = GeneratedState { all_messages: Some(all_messages), initial_slot };
246
247		let mut messages_file = fs::OpenOptions::new()
248			.write(true)
249			.create(true)
250			.truncate(true)
251			.open(path)
252			.unwrap();
253
254		messages_file
255			.write_all(&generated_state.encode())
256			.expect("Could not update message file");
257		path.to_path_buf()
258	}
259
260	/// Generates assignments for the given `current_validator_index`
261	/// Returns a list of assignments to be sent sorted by tranche.
262	fn generate_assignments(&self, block_info: &BlockTestData) -> Vec<TestMessageInfo> {
263		let config = Config::from(&self.session_info);
264
265		let leaving_cores = block_info
266			.candidates
267			.clone()
268			.into_iter()
269			.map(|candidate_event| {
270				if let CandidateEvent::CandidateIncluded(candidate, _, core_index, group_index) =
271					candidate_event
272				{
273					(candidate.hash(), core_index, group_index)
274				} else {
275					todo!("Variant is never created in this benchmark")
276				}
277			})
278			.collect_vec();
279
280		let mut assignments_by_tranche = BTreeMap::new();
281
282		let bytes = self.validator_index.0.to_be_bytes();
283		let seed = [
284			bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
285			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
286		];
287		let mut rand_chacha = ChaCha20Rng::from_seed(seed);
288
289		let to_be_sent_by = neighbours_that_would_sent_message(
290			&self.test_authorities.peer_ids,
291			self.validator_index.0,
292			&self.topology_node_under_test,
293			&self.topology,
294		);
295
296		let leaving_cores = leaving_cores
297			.clone()
298			.into_iter()
299			.filter(|(_, core_index, _group_index)| core_index.0 != self.validator_index.0)
300			.collect_vec();
301
302		let store = LocalKeystore::in_memory();
303		let _public = store
304			.sr25519_generate_new(
305				ASSIGNMENT_KEY_TYPE_ID,
306				Some(self.test_authorities.key_seeds[self.validator_index.0 as usize].as_str()),
307			)
308			.expect("should not fail");
309		let assignments = compute_assignments(
310			&store,
311			block_info.relay_vrf_story.clone(),
312			&config,
313			leaving_cores.clone(),
314			self.options.enable_assignments_v2,
315		);
316
317		let random_sending_nodes = self
318			.random_samplings
319			.get(rand_chacha.next_u32() as usize % self.random_samplings.len())
320			.unwrap();
321		let random_sending_peer_ids = random_sending_nodes
322			.iter()
323			.map(|validator| (*validator, self.test_authorities.peer_ids[validator.0 as usize]))
324			.collect_vec();
325
326		let mut unique_assignments = HashSet::new();
327		for (core_index, assignment) in assignments {
328			let assigned_cores = match &assignment.cert().kind {
329				approval::v2::AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
330					core_bitfield.iter_ones().map(|val| CoreIndex::from(val as u32)).collect_vec(),
331				approval::v2::AssignmentCertKindV2::RelayVRFDelay { core_index } =>
332					vec![*core_index],
333				approval::v2::AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
334					vec![core_index],
335			};
336
337			let bitfiled: CoreBitfield = assigned_cores.clone().try_into().unwrap();
338
339			// For the cases where tranch0 assignments are in a single certificate we need to make
340			// sure we create a single message.
341			if unique_assignments.insert(bitfiled) {
342				let this_tranche_assignments =
343					assignments_by_tranche.entry(assignment.tranche()).or_insert_with(Vec::new);
344
345				this_tranche_assignments.push((
346					IndirectAssignmentCertV2 {
347						block_hash: block_info.hash,
348						validator: self.validator_index,
349						cert: assignment.cert().clone(),
350					},
351					block_info
352						.candidates
353						.iter()
354						.enumerate()
355						.filter(|(_index, candidate)| {
356							if let CandidateEvent::CandidateIncluded(_, _, core, _) = candidate {
357								assigned_cores.contains(core)
358							} else {
359								panic!("Should not happen");
360							}
361						})
362						.map(|(index, _)| index as u32)
363						.collect_vec()
364						.try_into()
365						.unwrap(),
366					to_be_sent_by
367						.iter()
368						.chain(random_sending_peer_ids.iter())
369						.copied()
370						.collect::<HashSet<(ValidatorIndex, PeerId)>>(),
371					assignment.tranche(),
372				));
373			}
374		}
375
376		assignments_by_tranche
377			.into_values()
378			.flat_map(|assignments| assignments.into_iter())
379			.map(|assignment| {
380				let msg = protocol_v3::ApprovalDistributionMessage::Assignments(vec![(
381					assignment.0,
382					assignment.1,
383				)]);
384				TestMessageInfo {
385					msg,
386					sent_by: assignment
387						.2
388						.into_iter()
389						.map(|(validator_index, _)| validator_index)
390						.collect_vec(),
391					tranche: assignment.3,
392					block_hash: block_info.hash,
393				}
394			})
395			.collect_vec()
396	}
397}
398
399/// A list of random samplings that we use to determine which nodes should send a given message to
400/// the node under test.
401/// We can not sample every time for all the messages because that would be too expensive to
402/// perform, so pre-generate a list of samples for a given network size.
403/// - result[i] give us as a list of random nodes that would send a given message to the node under
404///   test.
405fn random_samplings_to_node(
406	node_under_test: ValidatorIndex,
407	num_validators: usize,
408	num_samplings: usize,
409) -> Vec<Vec<ValidatorIndex>> {
410	let seed = [7u8; 32];
411	let mut rand_chacha = ChaCha20Rng::from_seed(seed);
412
413	(0..num_samplings)
414		.map(|_| {
415			(0..num_validators)
416				.filter(|sending_validator_index| {
417					*sending_validator_index != NODE_UNDER_TEST as usize
418				})
419				.flat_map(|sending_validator_index| {
420					let mut validators = (0..num_validators).collect_vec();
421					validators.shuffle(&mut rand_chacha);
422
423					let mut random_routing = RandomRouting::default();
424					validators
425						.into_iter()
426						.flat_map(|validator_to_send| {
427							if random_routing.sample(num_validators, &mut rand_chacha) {
428								random_routing.inc_sent();
429								if validator_to_send == node_under_test.0 as usize {
430									Some(ValidatorIndex(sending_validator_index as u32))
431								} else {
432									None
433								}
434							} else {
435								None
436							}
437						})
438						.collect_vec()
439				})
440				.collect_vec()
441		})
442		.collect_vec()
443}
444
445/// Helper function to randomly determine how many approvals we coalesce together in a single
446/// message.
447fn coalesce_approvals_len(
448	coalesce_mean: f32,
449	coalesce_std_dev: f32,
450	rand_chacha: &mut ChaCha20Rng,
451) -> usize {
452	max(
453		1,
454		Normal::new(coalesce_mean, coalesce_std_dev)
455			.expect("normal distribution parameters are good")
456			.sample(rand_chacha)
457			.round() as i32,
458	) as usize
459}
460
461/// Helper function to create approvals signatures for all assignments passed as arguments.
462/// Returns a list of Approvals messages that need to be sent.
463fn issue_approvals(
464	assignments: Vec<TestMessageInfo>,
465	block_hash: Hash,
466	validator_ids: &[ValidatorId],
467	candidates: Vec<CandidateEvent>,
468	options: &ApprovalsOptions,
469	rand_chacha: &mut ChaCha20Rng,
470	store: &LocalKeystore,
471) -> Vec<MessagesBundle> {
472	let mut queued_to_sign: Vec<TestSignInfo> = Vec::new();
473	let mut num_coalesce =
474		coalesce_approvals_len(options.coalesce_mean, options.coalesce_std_dev, rand_chacha);
475	let result = assignments
476		.iter()
477		.map(|message| match &message.msg {
478			protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
479				let mut approvals_to_create = Vec::new();
480
481				let current_validator_index = queued_to_sign
482					.first()
483					.map(|msg| msg.validator_index)
484					.unwrap_or(ValidatorIndex(99999));
485
486				// Invariant for this benchmark.
487				assert_eq!(assignments.len(), 1);
488
489				let assignment = assignments.first().unwrap();
490
491				let earliest_tranche = queued_to_sign
492					.first()
493					.map(|val| val.assignment.tranche)
494					.unwrap_or(message.tranche);
495
496				if queued_to_sign.len() >= num_coalesce ||
497					(!queued_to_sign.is_empty() &&
498						current_validator_index != assignment.0.validator) ||
499					message.tranche - earliest_tranche >= options.coalesce_tranche_diff
500				{
501					approvals_to_create.push(TestSignInfo::sign_candidates(
502						&mut queued_to_sign,
503						validator_ids,
504						block_hash,
505						num_coalesce,
506						store,
507					));
508					num_coalesce = coalesce_approvals_len(
509						options.coalesce_mean,
510						options.coalesce_std_dev,
511						rand_chacha,
512					);
513				}
514
515				// If more that one candidate was in the assignment queue all of them for issuing
516				// approvals
517				for candidate_index in assignment.1.iter_ones() {
518					let candidate = candidates.get(candidate_index).unwrap();
519					if let CandidateEvent::CandidateIncluded(candidate, _, _, _) = candidate {
520						queued_to_sign.push(TestSignInfo {
521							candidate_hash: candidate.hash(),
522							candidate_index: candidate_index as CandidateIndex,
523							validator_index: assignment.0.validator,
524							assignment: message.clone(),
525						});
526					} else {
527						todo!("Other enum variants are not used in this benchmark");
528					}
529				}
530				approvals_to_create
531			},
532			_ => {
533				todo!("Other enum variants are not used in this benchmark");
534			},
535		})
536		.collect_vec();
537
538	let mut messages = result.into_iter().flatten().collect_vec();
539
540	if !queued_to_sign.is_empty() {
541		messages.push(TestSignInfo::sign_candidates(
542			&mut queued_to_sign,
543			validator_ids,
544			block_hash,
545			num_coalesce,
546			store,
547		));
548	}
549	messages
550}
551
552/// Helper struct to gather information about more than one candidate an sign it in a single
553/// approval message.
554struct TestSignInfo {
555	/// The candidate hash
556	candidate_hash: CandidateHash,
557	/// The candidate index
558	candidate_index: CandidateIndex,
559	/// The validator sending the assignments
560	validator_index: ValidatorIndex,
561	/// The assignments covering this candidate
562	assignment: TestMessageInfo,
563}
564
565impl TestSignInfo {
566	/// Helper function to create a signature for all candidates in `to_sign` parameter.
567	/// Returns a TestMessage
568	fn sign_candidates(
569		to_sign: &mut Vec<TestSignInfo>,
570		validator_ids: &[ValidatorId],
571		block_hash: Hash,
572		num_coalesce: usize,
573		store: &LocalKeystore,
574	) -> MessagesBundle {
575		let current_validator_index = to_sign.first().map(|val| val.validator_index).unwrap();
576		let tranche_approval_can_be_sent =
577			to_sign.iter().map(|val| val.assignment.tranche).max().unwrap();
578		let validator_id = validator_ids.get(current_validator_index.0 as usize).unwrap().clone();
579
580		let unique_assignments: HashSet<TestMessageInfo> =
581			to_sign.iter().map(|info| info.assignment.clone()).collect();
582
583		let mut to_sign = to_sign
584			.drain(..)
585			.sorted_by(|val1, val2| val1.candidate_index.cmp(&val2.candidate_index))
586			.peekable();
587
588		let mut bundle = MessagesBundle {
589			assignments: unique_assignments.into_iter().collect_vec(),
590			approvals: Vec::new(),
591		};
592
593		while to_sign.peek().is_some() {
594			let to_sign = to_sign.by_ref().take(num_coalesce).collect_vec();
595
596			let hashes = to_sign.iter().map(|val| val.candidate_hash).collect_vec();
597			let candidate_indices = to_sign.iter().map(|val| val.candidate_index).collect_vec();
598
599			let sent_by = to_sign
600				.iter()
601				.flat_map(|val| val.assignment.sent_by.iter())
602				.copied()
603				.collect::<HashSet<ValidatorIndex>>();
604
605			let payload = ApprovalVoteMultipleCandidates(&hashes).signing_payload(1);
606
607			let signature = store
608				.sr25519_sign(ValidatorId::ID, &validator_id.clone().into(), &payload[..])
609				.unwrap()
610				.unwrap()
611				.into();
612			let indirect = IndirectSignedApprovalVoteV2 {
613				block_hash,
614				candidate_indices: candidate_indices.try_into().unwrap(),
615				validator: current_validator_index,
616				signature,
617			};
618			let msg = protocol_v3::ApprovalDistributionMessage::Approvals(vec![indirect]);
619
620			bundle.approvals.push(TestMessageInfo {
621				msg,
622				sent_by: sent_by.into_iter().collect_vec(),
623				tranche: tranche_approval_can_be_sent,
624				block_hash,
625			});
626		}
627		bundle
628	}
629}
630
631/// Determine what neighbours would send a given message to the node under test.
632fn neighbours_that_would_sent_message(
633	peer_ids: &[PeerId],
634	current_validator_index: u32,
635	topology_node_under_test: &GridNeighbors,
636	topology: &SessionGridTopology,
637) -> Vec<(ValidatorIndex, PeerId)> {
638	let topology_originator = topology
639		.compute_grid_neighbors_for(ValidatorIndex(current_validator_index))
640		.unwrap();
641
642	let originator_y = topology_originator.validator_indices_y.iter().find(|validator| {
643		topology_node_under_test.required_routing_by_index(**validator, false) ==
644			RequiredRouting::GridY
645	});
646
647	assert!(originator_y != Some(&ValidatorIndex(NODE_UNDER_TEST)));
648
649	let originator_x = topology_originator.validator_indices_x.iter().find(|validator| {
650		topology_node_under_test.required_routing_by_index(**validator, false) ==
651			RequiredRouting::GridX
652	});
653
654	assert!(originator_x != Some(&ValidatorIndex(NODE_UNDER_TEST)));
655
656	let is_neighbour = topology_originator
657		.validator_indices_x
658		.contains(&ValidatorIndex(NODE_UNDER_TEST)) ||
659		topology_originator
660			.validator_indices_y
661			.contains(&ValidatorIndex(NODE_UNDER_TEST));
662
663	let mut to_be_sent_by = originator_y
664		.into_iter()
665		.chain(originator_x)
666		.map(|val| (*val, peer_ids[val.0 as usize]))
667		.collect_vec();
668
669	if is_neighbour {
670		to_be_sent_by.push((ValidatorIndex(current_validator_index), peer_ids[0]));
671	}
672
673	to_be_sent_by
674}