1use crate::{
18 approval::{
19 helpers::{generate_babe_epoch, generate_topology},
20 test_message::{MessagesBundle, TestMessageInfo},
21 ApprovalTestState, ApprovalsOptions, BlockTestData, GeneratedState,
22 BUFFER_FOR_GENERATION_MILLIS, LOG_TARGET, SLOT_DURATION_MILLIS,
23 },
24 configuration::{TestAuthorities, TestConfiguration},
25 mock::runtime_api::session_info_for_peers,
26 NODE_UNDER_TEST,
27};
28use codec::Encode;
29use futures::SinkExt;
30use itertools::Itertools;
31use polkadot_node_core_approval_voting::criteria::{compute_assignments, Config};
32
33use polkadot_node_network_protocol::{
34 grid_topology::{GridNeighbors, RandomRouting, RequiredRouting, SessionGridTopology},
35 v3 as protocol_v3,
36};
37use polkadot_node_primitives::approval::{
38 self,
39 time::tranche_to_tick,
40 v2::{CoreBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2},
41};
42use polkadot_primitives::{
43 ApprovalVoteMultipleCandidates, CandidateEvent, CandidateHash, CandidateIndex, CoreIndex, Hash,
44 SessionInfo, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
45};
46use rand::{seq::SliceRandom, RngCore, SeedableRng};
47use rand_chacha::ChaCha20Rng;
48use rand_distr::{Distribution, Normal};
49use sc_keystore::LocalKeystore;
50use sc_network_types::PeerId;
51use sc_service::SpawnTaskHandle;
52use sha1::Digest;
53use sp_application_crypto::AppCrypto;
54use sp_consensus_babe::SlotDuration;
55use sp_keystore::Keystore;
56use sp_timestamp::Timestamp;
57use std::{
58 cmp::max,
59 collections::{BTreeMap, HashSet},
60 fs,
61 io::Write,
62 path::{Path, PathBuf},
63 time::Duration,
64};
65
66pub struct PeerMessagesGenerator {
68 pub topology_node_under_test: GridNeighbors,
70 pub topology: SessionGridTopology,
72 pub validator_index: ValidatorIndex,
74 pub random_samplings: Vec<Vec<ValidatorIndex>>,
79 pub tx_messages: futures::channel::mpsc::UnboundedSender<(Hash, Vec<MessagesBundle>)>,
81 pub test_authorities: TestAuthorities,
83 pub session_info: SessionInfo,
85 pub blocks: Vec<BlockTestData>,
87 pub options: ApprovalsOptions,
89}
90
91impl PeerMessagesGenerator {
92 pub fn generate_messages(mut self, spawn_task_handle: &SpawnTaskHandle) {
95 spawn_task_handle.spawn("generate-messages", "generate-messages", async move {
96 for block_info in &self.blocks {
97 let assignments = self.generate_assignments(block_info);
98
99 let bytes = self.validator_index.0.to_be_bytes();
100 let seed = [
101 bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
102 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
103 ];
104
105 let mut rand_chacha = ChaCha20Rng::from_seed(seed);
106 let approvals = issue_approvals(
107 assignments,
108 block_info.hash,
109 &self.test_authorities.validator_public,
110 block_info.candidates.clone(),
111 &self.options,
112 &mut rand_chacha,
113 self.test_authorities.keyring.keystore_ref(),
114 );
115
116 self.tx_messages
117 .send((block_info.hash, approvals))
118 .await
119 .expect("Should not fail");
120 }
121 })
122 }
123
124 fn messages_fingerprint(
127 configuration: &TestConfiguration,
128 options: &ApprovalsOptions,
129 ) -> String {
130 let mut fingerprint = options.fingerprint();
131 let configuration_bytes = bincode::serialize(&configuration).unwrap();
132 fingerprint.extend(configuration_bytes);
133 let mut sha1 = sha1::Sha1::new();
134 sha1.update(fingerprint);
135 let result = sha1.finalize();
136 hex::encode(result)
137 }
138
139 pub fn generate_messages_if_needed(
141 configuration: &TestConfiguration,
142 test_authorities: &TestAuthorities,
143 options: &ApprovalsOptions,
144 spawn_task_handle: &SpawnTaskHandle,
145 ) -> PathBuf {
146 let path_name = format!(
147 "{}/{}",
148 options.workdir_prefix,
149 Self::messages_fingerprint(configuration, options)
150 );
151
152 let path = Path::new(&path_name);
153 if path.exists() {
154 return path.to_path_buf();
155 }
156
157 gum::info!("Generate message because file does not exist");
158 let delta_to_first_slot_under_test = Timestamp::new(BUFFER_FOR_GENERATION_MILLIS);
159 let initial_slot = Slot::from_timestamp(
160 (*Timestamp::current() - *delta_to_first_slot_under_test).into(),
161 SlotDuration::from_millis(SLOT_DURATION_MILLIS),
162 );
163
164 let babe_epoch = generate_babe_epoch(initial_slot, test_authorities.clone());
165 let session_info = session_info_for_peers(configuration, test_authorities);
166 let blocks = ApprovalTestState::generate_blocks_information(
167 configuration,
168 &babe_epoch,
169 initial_slot,
170 );
171
172 gum::info!(target: LOG_TARGET, "Generate messages");
173 let topology = generate_topology(test_authorities);
174
175 let random_samplings = random_samplings_to_node(
176 ValidatorIndex(NODE_UNDER_TEST),
177 test_authorities.validator_public.len(),
178 test_authorities.validator_public.len() * 2,
179 );
180
181 let topology_node_under_test =
182 topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
183
184 let (tx, mut rx) = futures::channel::mpsc::unbounded();
185
186 for current_validator_index in 1..test_authorities.validator_public.len() {
189 let peer_message_source = PeerMessagesGenerator {
190 topology_node_under_test: topology_node_under_test.clone(),
191 topology: topology.clone(),
192 validator_index: ValidatorIndex(current_validator_index as u32),
193 test_authorities: test_authorities.clone(),
194 session_info: session_info.clone(),
195 blocks: blocks.clone(),
196 tx_messages: tx.clone(),
197 random_samplings: random_samplings.clone(),
198 options: options.clone(),
199 };
200
201 peer_message_source.generate_messages(spawn_task_handle);
202 }
203
204 std::mem::drop(tx);
205
206 let seed = [0x32; 32];
207 let mut rand_chacha = ChaCha20Rng::from_seed(seed);
208
209 let mut all_messages: BTreeMap<u64, Vec<MessagesBundle>> = BTreeMap::new();
210 loop {
212 match rx.try_next() {
213 Ok(Some((block_hash, messages))) =>
214 for message in messages {
215 let block_info = blocks
216 .iter()
217 .find(|val| val.hash == block_hash)
218 .expect("Should find blocks");
219 let tick_to_send = tranche_to_tick(
220 SLOT_DURATION_MILLIS,
221 block_info.slot,
222 message.tranche_to_send(),
223 );
224 let to_add = all_messages.entry(tick_to_send).or_default();
225 to_add.push(message);
226 },
227 Ok(None) => break,
228 Err(_) => {
229 std::thread::sleep(Duration::from_millis(50));
230 },
231 }
232 }
233 let all_messages = all_messages
234 .into_iter()
235 .flat_map(|(_, mut messages)| {
236 messages.shuffle(&mut rand_chacha);
239 messages
240 })
241 .collect_vec();
242
243 gum::info!("Generated a number of {:} unique messages", all_messages.len());
244
245 let generated_state = GeneratedState { all_messages: Some(all_messages), initial_slot };
246
247 let mut messages_file = fs::OpenOptions::new()
248 .write(true)
249 .create(true)
250 .truncate(true)
251 .open(path)
252 .unwrap();
253
254 messages_file
255 .write_all(&generated_state.encode())
256 .expect("Could not update message file");
257 path.to_path_buf()
258 }
259
260 fn generate_assignments(&self, block_info: &BlockTestData) -> Vec<TestMessageInfo> {
263 let config = Config::from(&self.session_info);
264
265 let leaving_cores = block_info
266 .candidates
267 .clone()
268 .into_iter()
269 .map(|candidate_event| {
270 if let CandidateEvent::CandidateIncluded(candidate, _, core_index, group_index) =
271 candidate_event
272 {
273 (candidate.hash(), core_index, group_index)
274 } else {
275 todo!("Variant is never created in this benchmark")
276 }
277 })
278 .collect_vec();
279
280 let mut assignments_by_tranche = BTreeMap::new();
281
282 let bytes = self.validator_index.0.to_be_bytes();
283 let seed = [
284 bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
285 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
286 ];
287 let mut rand_chacha = ChaCha20Rng::from_seed(seed);
288
289 let to_be_sent_by = neighbours_that_would_sent_message(
290 &self.test_authorities.peer_ids,
291 self.validator_index.0,
292 &self.topology_node_under_test,
293 &self.topology,
294 );
295
296 let leaving_cores = leaving_cores
297 .clone()
298 .into_iter()
299 .filter(|(_, core_index, _group_index)| core_index.0 != self.validator_index.0)
300 .collect_vec();
301
302 let store = LocalKeystore::in_memory();
303 let _public = store
304 .sr25519_generate_new(
305 ASSIGNMENT_KEY_TYPE_ID,
306 Some(self.test_authorities.key_seeds[self.validator_index.0 as usize].as_str()),
307 )
308 .expect("should not fail");
309 let assignments = compute_assignments(
310 &store,
311 block_info.relay_vrf_story.clone(),
312 &config,
313 leaving_cores.clone(),
314 self.options.enable_assignments_v2,
315 );
316
317 let random_sending_nodes = self
318 .random_samplings
319 .get(rand_chacha.next_u32() as usize % self.random_samplings.len())
320 .unwrap();
321 let random_sending_peer_ids = random_sending_nodes
322 .iter()
323 .map(|validator| (*validator, self.test_authorities.peer_ids[validator.0 as usize]))
324 .collect_vec();
325
326 let mut unique_assignments = HashSet::new();
327 for (core_index, assignment) in assignments {
328 let assigned_cores = match &assignment.cert().kind {
329 approval::v2::AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
330 core_bitfield.iter_ones().map(|val| CoreIndex::from(val as u32)).collect_vec(),
331 approval::v2::AssignmentCertKindV2::RelayVRFDelay { core_index } =>
332 vec![*core_index],
333 approval::v2::AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
334 vec![core_index],
335 };
336
337 let bitfiled: CoreBitfield = assigned_cores.clone().try_into().unwrap();
338
339 if unique_assignments.insert(bitfiled) {
342 let this_tranche_assignments =
343 assignments_by_tranche.entry(assignment.tranche()).or_insert_with(Vec::new);
344
345 this_tranche_assignments.push((
346 IndirectAssignmentCertV2 {
347 block_hash: block_info.hash,
348 validator: self.validator_index,
349 cert: assignment.cert().clone(),
350 },
351 block_info
352 .candidates
353 .iter()
354 .enumerate()
355 .filter(|(_index, candidate)| {
356 if let CandidateEvent::CandidateIncluded(_, _, core, _) = candidate {
357 assigned_cores.contains(core)
358 } else {
359 panic!("Should not happen");
360 }
361 })
362 .map(|(index, _)| index as u32)
363 .collect_vec()
364 .try_into()
365 .unwrap(),
366 to_be_sent_by
367 .iter()
368 .chain(random_sending_peer_ids.iter())
369 .copied()
370 .collect::<HashSet<(ValidatorIndex, PeerId)>>(),
371 assignment.tranche(),
372 ));
373 }
374 }
375
376 assignments_by_tranche
377 .into_values()
378 .flat_map(|assignments| assignments.into_iter())
379 .map(|assignment| {
380 let msg = protocol_v3::ApprovalDistributionMessage::Assignments(vec![(
381 assignment.0,
382 assignment.1,
383 )]);
384 TestMessageInfo {
385 msg,
386 sent_by: assignment
387 .2
388 .into_iter()
389 .map(|(validator_index, _)| validator_index)
390 .collect_vec(),
391 tranche: assignment.3,
392 block_hash: block_info.hash,
393 }
394 })
395 .collect_vec()
396 }
397}
398
399fn random_samplings_to_node(
406 node_under_test: ValidatorIndex,
407 num_validators: usize,
408 num_samplings: usize,
409) -> Vec<Vec<ValidatorIndex>> {
410 let seed = [7u8; 32];
411 let mut rand_chacha = ChaCha20Rng::from_seed(seed);
412
413 (0..num_samplings)
414 .map(|_| {
415 (0..num_validators)
416 .filter(|sending_validator_index| {
417 *sending_validator_index != NODE_UNDER_TEST as usize
418 })
419 .flat_map(|sending_validator_index| {
420 let mut validators = (0..num_validators).collect_vec();
421 validators.shuffle(&mut rand_chacha);
422
423 let mut random_routing = RandomRouting::default();
424 validators
425 .into_iter()
426 .flat_map(|validator_to_send| {
427 if random_routing.sample(num_validators, &mut rand_chacha) {
428 random_routing.inc_sent();
429 if validator_to_send == node_under_test.0 as usize {
430 Some(ValidatorIndex(sending_validator_index as u32))
431 } else {
432 None
433 }
434 } else {
435 None
436 }
437 })
438 .collect_vec()
439 })
440 .collect_vec()
441 })
442 .collect_vec()
443}
444
445fn coalesce_approvals_len(
448 coalesce_mean: f32,
449 coalesce_std_dev: f32,
450 rand_chacha: &mut ChaCha20Rng,
451) -> usize {
452 max(
453 1,
454 Normal::new(coalesce_mean, coalesce_std_dev)
455 .expect("normal distribution parameters are good")
456 .sample(rand_chacha)
457 .round() as i32,
458 ) as usize
459}
460
461fn issue_approvals(
464 assignments: Vec<TestMessageInfo>,
465 block_hash: Hash,
466 validator_ids: &[ValidatorId],
467 candidates: Vec<CandidateEvent>,
468 options: &ApprovalsOptions,
469 rand_chacha: &mut ChaCha20Rng,
470 store: &LocalKeystore,
471) -> Vec<MessagesBundle> {
472 let mut queued_to_sign: Vec<TestSignInfo> = Vec::new();
473 let mut num_coalesce =
474 coalesce_approvals_len(options.coalesce_mean, options.coalesce_std_dev, rand_chacha);
475 let result = assignments
476 .iter()
477 .map(|message| match &message.msg {
478 protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
479 let mut approvals_to_create = Vec::new();
480
481 let current_validator_index = queued_to_sign
482 .first()
483 .map(|msg| msg.validator_index)
484 .unwrap_or(ValidatorIndex(99999));
485
486 assert_eq!(assignments.len(), 1);
488
489 let assignment = assignments.first().unwrap();
490
491 let earliest_tranche = queued_to_sign
492 .first()
493 .map(|val| val.assignment.tranche)
494 .unwrap_or(message.tranche);
495
496 if queued_to_sign.len() >= num_coalesce ||
497 (!queued_to_sign.is_empty() &&
498 current_validator_index != assignment.0.validator) ||
499 message.tranche - earliest_tranche >= options.coalesce_tranche_diff
500 {
501 approvals_to_create.push(TestSignInfo::sign_candidates(
502 &mut queued_to_sign,
503 validator_ids,
504 block_hash,
505 num_coalesce,
506 store,
507 ));
508 num_coalesce = coalesce_approvals_len(
509 options.coalesce_mean,
510 options.coalesce_std_dev,
511 rand_chacha,
512 );
513 }
514
515 for candidate_index in assignment.1.iter_ones() {
518 let candidate = candidates.get(candidate_index).unwrap();
519 if let CandidateEvent::CandidateIncluded(candidate, _, _, _) = candidate {
520 queued_to_sign.push(TestSignInfo {
521 candidate_hash: candidate.hash(),
522 candidate_index: candidate_index as CandidateIndex,
523 validator_index: assignment.0.validator,
524 assignment: message.clone(),
525 });
526 } else {
527 todo!("Other enum variants are not used in this benchmark");
528 }
529 }
530 approvals_to_create
531 },
532 _ => {
533 todo!("Other enum variants are not used in this benchmark");
534 },
535 })
536 .collect_vec();
537
538 let mut messages = result.into_iter().flatten().collect_vec();
539
540 if !queued_to_sign.is_empty() {
541 messages.push(TestSignInfo::sign_candidates(
542 &mut queued_to_sign,
543 validator_ids,
544 block_hash,
545 num_coalesce,
546 store,
547 ));
548 }
549 messages
550}
551
552struct TestSignInfo {
555 candidate_hash: CandidateHash,
557 candidate_index: CandidateIndex,
559 validator_index: ValidatorIndex,
561 assignment: TestMessageInfo,
563}
564
565impl TestSignInfo {
566 fn sign_candidates(
569 to_sign: &mut Vec<TestSignInfo>,
570 validator_ids: &[ValidatorId],
571 block_hash: Hash,
572 num_coalesce: usize,
573 store: &LocalKeystore,
574 ) -> MessagesBundle {
575 let current_validator_index = to_sign.first().map(|val| val.validator_index).unwrap();
576 let tranche_approval_can_be_sent =
577 to_sign.iter().map(|val| val.assignment.tranche).max().unwrap();
578 let validator_id = validator_ids.get(current_validator_index.0 as usize).unwrap().clone();
579
580 let unique_assignments: HashSet<TestMessageInfo> =
581 to_sign.iter().map(|info| info.assignment.clone()).collect();
582
583 let mut to_sign = to_sign
584 .drain(..)
585 .sorted_by(|val1, val2| val1.candidate_index.cmp(&val2.candidate_index))
586 .peekable();
587
588 let mut bundle = MessagesBundle {
589 assignments: unique_assignments.into_iter().collect_vec(),
590 approvals: Vec::new(),
591 };
592
593 while to_sign.peek().is_some() {
594 let to_sign = to_sign.by_ref().take(num_coalesce).collect_vec();
595
596 let hashes = to_sign.iter().map(|val| val.candidate_hash).collect_vec();
597 let candidate_indices = to_sign.iter().map(|val| val.candidate_index).collect_vec();
598
599 let sent_by = to_sign
600 .iter()
601 .flat_map(|val| val.assignment.sent_by.iter())
602 .copied()
603 .collect::<HashSet<ValidatorIndex>>();
604
605 let payload = ApprovalVoteMultipleCandidates(&hashes).signing_payload(1);
606
607 let signature = store
608 .sr25519_sign(ValidatorId::ID, &validator_id.clone().into(), &payload[..])
609 .unwrap()
610 .unwrap()
611 .into();
612 let indirect = IndirectSignedApprovalVoteV2 {
613 block_hash,
614 candidate_indices: candidate_indices.try_into().unwrap(),
615 validator: current_validator_index,
616 signature,
617 };
618 let msg = protocol_v3::ApprovalDistributionMessage::Approvals(vec![indirect]);
619
620 bundle.approvals.push(TestMessageInfo {
621 msg,
622 sent_by: sent_by.into_iter().collect_vec(),
623 tranche: tranche_approval_can_be_sent,
624 block_hash,
625 });
626 }
627 bundle
628 }
629}
630
631fn neighbours_that_would_sent_message(
633 peer_ids: &[PeerId],
634 current_validator_index: u32,
635 topology_node_under_test: &GridNeighbors,
636 topology: &SessionGridTopology,
637) -> Vec<(ValidatorIndex, PeerId)> {
638 let topology_originator = topology
639 .compute_grid_neighbors_for(ValidatorIndex(current_validator_index))
640 .unwrap();
641
642 let originator_y = topology_originator.validator_indices_y.iter().find(|validator| {
643 topology_node_under_test.required_routing_by_index(**validator, false) ==
644 RequiredRouting::GridY
645 });
646
647 assert!(originator_y != Some(&ValidatorIndex(NODE_UNDER_TEST)));
648
649 let originator_x = topology_originator.validator_indices_x.iter().find(|validator| {
650 topology_node_under_test.required_routing_by_index(**validator, false) ==
651 RequiredRouting::GridX
652 });
653
654 assert!(originator_x != Some(&ValidatorIndex(NODE_UNDER_TEST)));
655
656 let is_neighbour = topology_originator
657 .validator_indices_x
658 .contains(&ValidatorIndex(NODE_UNDER_TEST)) ||
659 topology_originator
660 .validator_indices_y
661 .contains(&ValidatorIndex(NODE_UNDER_TEST));
662
663 let mut to_be_sent_by = originator_y
664 .into_iter()
665 .chain(originator_x)
666 .map(|val| (*val, peer_ids[val.0 as usize]))
667 .collect_vec();
668
669 if is_neighbour {
670 to_be_sent_by.push((ValidatorIndex(current_validator_index), peer_ids[0]));
671 }
672
673 to_be_sent_by
674}