1#![warn(missing_docs)]
25
26use self::metrics::Metrics;
27use futures::{select, FutureExt as _};
28use itertools::Itertools;
29use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
30use polkadot_node_network_protocol::{
31 self as net_protocol, filter_by_peer_version,
32 grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
33 peer_set::MAX_NOTIFICATION_SIZE,
34 v3 as protocol_v3, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
35};
36use polkadot_node_primitives::{
37 approval::{
38 criteria::{AssignmentCriteria, InvalidAssignment},
39 time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
40 v1::{BlockApprovalMeta, DelayTranche, RelayVRFStory},
41 v2::{
42 AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
43 IndirectSignedApprovalVoteV2,
44 },
45 },
46 DISPUTE_WINDOW,
47};
48use polkadot_node_subsystem::{
49 messages::{
50 ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
51 CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
52 RuntimeApiMessage,
53 },
54 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
55};
56use polkadot_node_subsystem_util::{
57 reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
58 runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
59};
60use polkadot_primitives::{
61 BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
62 SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
63};
64use rand::{CryptoRng, Rng, SeedableRng};
65use std::{
66 collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
67 sync::Arc,
68 time::Duration,
69};
70
71pub mod metrics;
73
74#[cfg(test)]
75mod tests;
76
77const LOG_TARGET: &str = "parachain::approval-distribution";
78
79const COST_UNEXPECTED_MESSAGE: Rep =
80 Rep::CostMinor("Peer sent an out-of-view assignment or approval");
81const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
82const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
83 Rep::CostMinor("The vote was valid but too far in the future");
84const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
85const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");
86
87const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
88const BENEFIT_VALID_MESSAGE_FIRST: Rep =
89 Rep::BenefitMinorFirst("Valid message with new information");
90
91const MAX_BITFIELD_SIZE: usize = 500;
93
94pub struct ApprovalDistribution {
96 metrics: Metrics,
97 slot_duration_millis: u64,
98 clock: Arc<dyn Clock + Send + Sync>,
99 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
100}
101
102#[derive(Default)]
105struct RecentlyOutdated {
106 buf: VecDeque<Hash>,
107}
108
109impl RecentlyOutdated {
110 fn note_outdated(&mut self, hash: Hash) {
111 const MAX_BUF_LEN: usize = 20;
112
113 self.buf.push_back(hash);
114
115 while self.buf.len() > MAX_BUF_LEN {
116 let _ = self.buf.pop_front();
117 }
118 }
119
120 fn is_recent_outdated(&self, hash: &Hash) -> bool {
121 self.buf.contains(hash)
122 }
123}
124
125struct ApprovalRouting {
127 required_routing: RequiredRouting,
128 local: bool,
129 random_routing: RandomRouting,
130 peers_randomly_routed: Vec<PeerId>,
131}
132
133impl ApprovalRouting {
134 fn mark_randomly_sent(&mut self, peer: PeerId) {
135 self.random_routing.inc_sent();
136 self.peers_randomly_routed.push(peer);
137 }
138}
139
140struct ApprovalEntry {
143 assignment: IndirectAssignmentCertV2,
145 assignment_claimed_candidates: CandidateBitfield,
147 approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
149 validator_index: ValidatorIndex,
151 routing_info: ApprovalRouting,
153}
154
155#[derive(Debug)]
156enum ApprovalEntryError {
157 InvalidValidatorIndex,
158 CandidateIndexOutOfBounds,
159 InvalidCandidateIndex,
160 DuplicateApproval,
161 UnknownAssignment,
162}
163
164impl ApprovalEntry {
165 pub fn new(
166 assignment: IndirectAssignmentCertV2,
167 candidates: CandidateBitfield,
168 routing_info: ApprovalRouting,
169 ) -> ApprovalEntry {
170 Self {
171 validator_index: assignment.validator,
172 assignment,
173 approvals: HashMap::new(),
174 assignment_claimed_candidates: candidates,
175 routing_info,
176 }
177 }
178
179 pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
181 (
182 MessageSubject(
183 block_hash,
184 self.assignment_claimed_candidates.clone(),
185 self.validator_index,
186 ),
187 MessageKind::Assignment,
188 )
189 }
190
191 pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
193 &mut self.routing_info
194 }
195
196 pub fn routing_info(&self) -> &ApprovalRouting {
198 &self.routing_info
199 }
200
201 pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
203 self.routing_info.required_routing = required_routing;
204 }
205
206 pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
208 for candidate_index in approval.candidate_indices.iter_ones() {
209 if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
210 return true
211 }
212 }
213 return false
214 }
215
216 pub fn note_approval(
219 &mut self,
220 approval: IndirectSignedApprovalVoteV2,
221 ) -> Result<(), ApprovalEntryError> {
222 if self.validator_index != approval.validator {
227 return Err(ApprovalEntryError::InvalidValidatorIndex)
228 }
229
230 if !self.includes_approval_candidates(&approval) {
232 return Err(ApprovalEntryError::InvalidCandidateIndex)
233 }
234
235 if self.approvals.contains_key(&approval.candidate_indices) {
236 return Err(ApprovalEntryError::DuplicateApproval)
237 }
238
239 self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
240 Ok(())
241 }
242
243 pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
245 (self.assignment.clone(), self.assignment_claimed_candidates.clone())
246 }
247
248 pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
250 self.approvals.values().cloned().collect::<Vec<_>>()
251 }
252
253 pub fn validator_index(&self) -> ValidatorIndex {
255 self.validator_index
256 }
257}
258
259struct PeerEntry {
261 pub view: View,
262 pub version: ProtocolVersion,
263}
264
265#[derive(Clone)]
287struct AggressionConfig {
288 l1_threshold: Option<BlockNumber>,
290 l2_threshold: Option<BlockNumber>,
293 resend_unfinalized_period: Option<BlockNumber>,
296}
297
298impl AggressionConfig {
299 fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
301 if let Some(t) = self.l1_threshold {
302 age >= t
303 } else if let Some(t) = self.resend_unfinalized_period {
304 age > 0 && age % t == 0
305 } else {
306 false
307 }
308 }
309}
310
311impl Default for AggressionConfig {
312 fn default() -> Self {
313 AggressionConfig {
314 l1_threshold: Some(16),
315 l2_threshold: Some(64),
316 resend_unfinalized_period: Some(8),
317 }
318 }
319}
320
321#[derive(PartialEq)]
322enum Resend {
323 Yes,
324 No,
325}
326
327#[derive(Default)]
332pub struct State {
333 blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
335 blocks: HashMap<Hash, BlockEntry>,
336
337 pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,
345
346 peer_views: HashMap<PeerId, PeerEntry>,
348
349 topologies: SessionGridTopologies,
351
352 recent_outdated_blocks: RecentlyOutdated,
354
355 aggression_config: AggressionConfig,
357
358 approval_checking_lag: BlockNumber,
360
361 reputation: ReputationAggregator,
363
364 slot_duration_millis: u64,
366}
367
368#[derive(Debug, Clone, Copy, PartialEq, Eq)]
369enum MessageKind {
370 Assignment,
371 Approval,
372}
373
374#[derive(Debug, Clone, Hash, PartialEq, Eq)]
378struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
379
380#[derive(Debug, Clone, Default)]
381struct Knowledge {
382 known_messages: HashMap<MessageSubject, MessageKind>,
386}
387
388impl Knowledge {
389 fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
390 match (kind, self.known_messages.get(message)) {
391 (_, None) => false,
392 (MessageKind::Assignment, Some(_)) => true,
393 (MessageKind::Approval, Some(MessageKind::Assignment)) => false,
394 (MessageKind::Approval, Some(MessageKind::Approval)) => true,
395 }
396 }
397
398 fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
399 let mut success = match self.known_messages.entry(message.clone()) {
400 hash_map::Entry::Vacant(vacant) => {
401 vacant.insert(kind);
402 true
405 },
406 hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
407 (MessageKind::Assignment, MessageKind::Assignment) => false,
408 (MessageKind::Approval, MessageKind::Approval) => false,
409 (MessageKind::Approval, MessageKind::Assignment) => false,
410 (MessageKind::Assignment, MessageKind::Approval) => {
411 *occupied.get_mut() = MessageKind::Approval;
412 true
413 },
414 },
415 };
416
417 if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
421 for candidate_index in message.1.iter_ones() {
422 success = success &&
423 self.insert(
424 MessageSubject(
425 message.0,
426 vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
427 message.2,
428 ),
429 kind,
430 );
431 }
432 }
433 success
434 }
435}
436
437#[derive(Debug, Clone, Default)]
439struct PeerKnowledge {
440 sent: Knowledge,
442 received: Knowledge,
444}
445
446impl PeerKnowledge {
447 fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
448 self.sent.contains(message, kind) || self.received.contains(message, kind)
449 }
450
451 fn generate_assignments_keys(
454 approval: &IndirectSignedApprovalVoteV2,
455 ) -> Vec<(MessageSubject, MessageKind)> {
456 approval
457 .candidate_indices
458 .iter_ones()
459 .map(|candidate_index| {
460 (
461 MessageSubject(
462 approval.block_hash,
463 (candidate_index as CandidateIndex).into(),
464 approval.validator,
465 ),
466 MessageKind::Assignment,
467 )
468 })
469 .collect_vec()
470 }
471
472 fn generate_approval_key(
474 approval: &IndirectSignedApprovalVoteV2,
475 ) -> (MessageSubject, MessageKind) {
476 (
477 MessageSubject(
478 approval.block_hash,
479 approval.candidate_indices.clone(),
480 approval.validator,
481 ),
482 MessageKind::Approval,
483 )
484 }
485}
486
487struct BlockEntry {
489 known_by: HashMap<PeerId, PeerKnowledge>,
492 number: BlockNumber,
494 parent_hash: Hash,
496 knowledge: Knowledge,
498 candidates: Vec<CandidateEntry>,
500 candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
502 session: SessionIndex,
504 approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
507 vrf_story: RelayVRFStory,
509 slot: Slot,
511 last_resent_at_block_number: Option<u32>,
513}
514
515impl BlockEntry {
516 pub fn known_by(&self) -> Vec<PeerId> {
518 self.known_by.keys().cloned().collect::<Vec<_>>()
519 }
520
521 pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
522 for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
526 match self.candidates.get_mut(claimed_candidate_index) {
527 Some(candidate_entry) => {
528 candidate_entry
529 .assignments
530 .entry(entry.validator_index())
531 .or_insert(entry.assignment_claimed_candidates.clone());
532 },
533 None => {
534 gum::warn!(
537 target: LOG_TARGET,
538 hash = ?entry.assignment.block_hash,
539 ?claimed_candidate_index,
540 "Missing candidate entry on `import_and_circulate_assignment`",
541 );
542 },
543 };
544 }
545
546 self.approval_entries
547 .entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
548 .or_insert(entry)
549 }
550
551 pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
553 candidate_indices
554 .iter_ones()
555 .all(|candidate_index| self.candidates.get(candidate_index as usize).is_some())
556 }
557
558 pub fn note_approval(
564 &mut self,
565 approval: IndirectSignedApprovalVoteV2,
566 ) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
567 let mut required_routing: Option<RequiredRouting> = None;
568 let mut peers_randomly_routed_to = HashSet::new();
569
570 if self.candidates.len() < approval.candidate_indices.len() as usize {
571 return Err(ApprovalEntryError::CandidateIndexOutOfBounds)
572 }
573
574 let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
576 .candidate_indices
577 .iter_ones()
578 .filter_map(|candidate_index| {
579 self.candidates.get_mut(candidate_index).map_or(None, |candidate_entry| {
580 candidate_entry.assignments.get(&approval.validator).cloned()
581 })
582 })
583 .collect();
584
585 for assignment_bitfield in covered_assignments_bitfields {
587 if let Some(approval_entry) =
588 self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
589 {
590 approval_entry.note_approval(approval.clone())?;
591 peers_randomly_routed_to
592 .extend(approval_entry.routing_info().peers_randomly_routed.iter());
593
594 if let Some(current_required_routing) = required_routing {
595 required_routing = Some(
596 current_required_routing
597 .combine(approval_entry.routing_info().required_routing),
598 );
599 } else {
600 required_routing = Some(approval_entry.routing_info().required_routing)
601 }
602 }
603 }
604
605 if let Some(required_routing) = required_routing {
606 Ok((required_routing, peers_randomly_routed_to))
607 } else {
608 Err(ApprovalEntryError::UnknownAssignment)
609 }
610 }
611
612 pub fn approval_votes(
614 &self,
615 candidate_index: CandidateIndex,
616 ) -> Vec<IndirectSignedApprovalVoteV2> {
617 let result: Option<
618 HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
619 > = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
620 candidate_entry
621 .assignments
622 .iter()
623 .filter_map(|(validator, assignment_bitfield)| {
624 self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
625 })
626 .flat_map(|approval_entry| {
627 approval_entry
628 .approvals
629 .clone()
630 .into_iter()
631 .filter(|(approved_candidates, _)| {
632 approved_candidates.bit_at(candidate_index.as_bit_index())
633 })
634 .map(|(approved_candidates, vote)| {
635 ((approval_entry.validator_index, approved_candidates), vote)
636 })
637 })
638 .collect()
639 });
640
641 result.map(|result| result.into_values().collect_vec()).unwrap_or_default()
642 }
643}
644
645#[derive(Debug, Default)]
649struct CandidateEntry {
650 assignments: HashMap<ValidatorIndex, CandidateBitfield>,
653}
654
655#[derive(Debug, Clone, PartialEq)]
656enum MessageSource {
657 Peer(PeerId),
658 Local,
659}
660
661#[derive(Debug)]
663enum InvalidAssignmentError {
664 #[allow(dead_code)]
666 CryptoCheckFailed(InvalidAssignment),
667 NoClaimedCandidates,
669 #[allow(dead_code)]
671 ClaimedInvalidCandidateIndex {
672 claimed_index: usize,
673 max_index: usize,
674 },
675 OversizedClaimedBitfield,
677 #[allow(dead_code)]
679 SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
680}
681
682#[derive(Debug)]
684enum InvalidVoteError {
685 CandidateIndexOutOfBounds,
687 ValidatorIndexOutOfBounds,
689 InvalidSignature,
691 #[allow(dead_code)]
693 SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
694}
695
696impl MessageSource {
697 fn peer_id(&self) -> Option<PeerId> {
698 match self {
699 Self::Peer(id) => Some(*id),
700 Self::Local => None,
701 }
702 }
703}
704
705enum PendingMessage {
706 Assignment(IndirectAssignmentCertV2, CandidateBitfield),
707 Approval(IndirectSignedApprovalVoteV2),
708}
709
710#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
711impl State {
712 pub fn with_config(slot_duration_millis: u64) -> Self {
714 Self { slot_duration_millis, ..Default::default() }
715 }
716
717 async fn handle_network_msg<
718 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
719 A: overseer::SubsystemSender<ApprovalVotingMessage>,
720 RA: overseer::SubsystemSender<RuntimeApiMessage>,
721 >(
722 &mut self,
723 approval_voting_sender: &mut A,
724 network_sender: &mut N,
725 runtime_api_sender: &mut RA,
726 metrics: &Metrics,
727 event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
728 rng: &mut (impl CryptoRng + Rng),
729 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
730 clock: &(impl Clock + ?Sized),
731 session_info_provider: &mut RuntimeInfo,
732 ) {
733 match event {
734 NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
735 gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
736 if let Some(authority_ids) = authority_ids {
737 self.topologies.update_authority_ids(peer_id, &authority_ids);
738 }
739 self.peer_views
741 .entry(peer_id)
742 .or_insert(PeerEntry { view: Default::default(), version });
743 },
744 NetworkBridgeEvent::PeerDisconnected(peer_id) => {
745 gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
746 self.peer_views.remove(&peer_id);
747 self.blocks.iter_mut().for_each(|(_hash, entry)| {
748 entry.known_by.remove(&peer_id);
749 })
750 },
751 NetworkBridgeEvent::NewGossipTopology(topology) => {
752 self.handle_new_session_topology(
753 network_sender,
754 topology.session,
755 topology.topology,
756 topology.local_index,
757 )
758 .await;
759 },
760 NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
761 self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
762 },
763 NetworkBridgeEvent::OurViewChange(view) => {
764 gum::trace!(target: LOG_TARGET, ?view, "Own view change");
765 for head in view.iter() {
766 if !self.blocks.contains_key(head) {
767 self.pending_known.entry(*head).or_default();
768 }
769 }
770
771 self.pending_known.retain(|h, _| {
772 let live = view.contains(h);
773 if !live {
774 gum::trace!(
775 target: LOG_TARGET,
776 block_hash = ?h,
777 "Cleaning up stale pending messages",
778 );
779 }
780 live
781 });
782 },
783 NetworkBridgeEvent::PeerMessage(peer_id, message) => {
784 self.process_incoming_peer_message(
785 approval_voting_sender,
786 network_sender,
787 runtime_api_sender,
788 metrics,
789 peer_id,
790 message,
791 rng,
792 assignment_criteria,
793 clock,
794 session_info_provider,
795 )
796 .await;
797 },
798 NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
799 gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
800 if self.topologies.update_authority_ids(peer_id, &authority_ids) {
803 if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
804 let intersection = self
805 .blocks_by_number
806 .iter()
807 .filter(|(block_number, _)| *block_number > &view.finalized_number)
808 .flat_map(|(_, hashes)| {
809 hashes.iter().filter(|hash| {
810 self.blocks
811 .get(&hash)
812 .map(|block| block.known_by.get(&peer_id).is_some())
813 .unwrap_or_default()
814 })
815 });
816 let view_intersection =
817 View::new(intersection.cloned(), view.finalized_number);
818 Self::unify_with_peer(
819 network_sender,
820 metrics,
821 &mut self.blocks,
822 &self.topologies,
823 self.peer_views.len(),
824 peer_id,
825 *version,
826 view_intersection,
827 rng,
828 true,
829 )
830 .await;
831 }
832 }
833 },
834 }
835 }
836
837 async fn handle_new_blocks<
838 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
839 A: overseer::SubsystemSender<ApprovalVotingMessage>,
840 RA: overseer::SubsystemSender<RuntimeApiMessage>,
841 >(
842 &mut self,
843 approval_voting_sender: &mut A,
844 network_sender: &mut N,
845 runtime_api_sender: &mut RA,
846 metrics: &Metrics,
847 metas: Vec<BlockApprovalMeta>,
848 rng: &mut (impl CryptoRng + Rng),
849 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
850 clock: &(impl Clock + ?Sized),
851 session_info_provider: &mut RuntimeInfo,
852 ) {
853 let mut new_hashes = HashSet::new();
854
855 gum::debug!(
856 target: LOG_TARGET,
857 "Got new blocks {:?}",
858 metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
859 );
860
861 for meta in metas {
862 match self.blocks.entry(meta.hash) {
863 hash_map::Entry::Vacant(entry) => {
864 let candidates_count = meta.candidates.len();
865 let mut candidates = Vec::with_capacity(candidates_count);
866 candidates.resize_with(candidates_count, Default::default);
867
868 entry.insert(BlockEntry {
869 known_by: HashMap::new(),
870 number: meta.number,
871 parent_hash: meta.parent_hash,
872 knowledge: Knowledge::default(),
873 candidates,
874 session: meta.session,
875 approval_entries: HashMap::new(),
876 candidates_metadata: meta.candidates,
877 vrf_story: meta.vrf_story,
878 slot: meta.slot,
879 last_resent_at_block_number: None,
880 });
881
882 self.topologies.inc_session_refs(meta.session);
883
884 new_hashes.insert(meta.hash);
885
886 self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
889 },
890 _ => continue,
891 }
892 }
893
894 {
895 for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
896 let intersection = view.iter().filter(|h| new_hashes.contains(h));
897 let view_intersection = View::new(intersection.cloned(), view.finalized_number);
898 Self::unify_with_peer(
899 network_sender,
900 metrics,
901 &mut self.blocks,
902 &self.topologies,
903 self.peer_views.len(),
904 *peer_id,
905 *version,
906 view_intersection,
907 rng,
908 false,
909 )
910 .await;
911 }
912
913 let pending_now_known = self
914 .pending_known
915 .keys()
916 .filter(|k| self.blocks.contains_key(k))
917 .copied()
918 .collect::<Vec<_>>();
919
920 let to_import = pending_now_known
921 .into_iter()
922 .inspect(|h| {
923 gum::trace!(
924 target: LOG_TARGET,
925 block_hash = ?h,
926 "Extracting pending messages for new block"
927 )
928 })
929 .filter_map(|k| self.pending_known.remove(&k))
930 .flatten()
931 .collect::<Vec<_>>();
932
933 if !to_import.is_empty() {
934 gum::debug!(
935 target: LOG_TARGET,
936 num = to_import.len(),
937 "Processing pending assignment/approvals",
938 );
939
940 let _timer = metrics.time_import_pending_now_known();
941
942 for (peer_id, message) in to_import {
943 match message {
944 PendingMessage::Assignment(assignment, claimed_indices) => {
945 self.import_and_circulate_assignment(
946 approval_voting_sender,
947 network_sender,
948 runtime_api_sender,
949 metrics,
950 MessageSource::Peer(peer_id),
951 assignment,
952 claimed_indices,
953 rng,
954 assignment_criteria,
955 clock,
956 session_info_provider,
957 )
958 .await;
959 },
960 PendingMessage::Approval(approval_vote) => {
961 self.import_and_circulate_approval(
962 approval_voting_sender,
963 network_sender,
964 runtime_api_sender,
965 metrics,
966 MessageSource::Peer(peer_id),
967 approval_vote,
968 session_info_provider,
969 )
970 .await;
971 },
972 }
973 }
974 }
975 }
976
977 self.enable_aggression(network_sender, Resend::Yes, metrics).await;
978 }
979
980 async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
981 &mut self,
982 network_sender: &mut N,
983 session: SessionIndex,
984 topology: SessionGridTopology,
985 local_index: Option<ValidatorIndex>,
986 ) {
987 if local_index.is_none() {
988 return
990 }
991
992 self.topologies.insert_topology(session, topology, local_index);
993 let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
994
995 adjust_required_routing_and_propagate(
996 network_sender,
997 &mut self.blocks,
998 &self.topologies,
999 |block_entry| block_entry.session == session,
1000 |required_routing, local, validator_index| {
1001 if required_routing == &RequiredRouting::PendingTopology {
1002 topology
1003 .local_grid_neighbors()
1004 .required_routing_by_index(*validator_index, local)
1005 } else {
1006 *required_routing
1007 }
1008 },
1009 &self.peer_views,
1010 )
1011 .await;
1012 }
1013
1014 async fn process_incoming_assignments<A, N, R, RA>(
1015 &mut self,
1016 approval_voting_sender: &mut A,
1017 network_sender: &mut N,
1018 runtime_api_sender: &mut RA,
1019 metrics: &Metrics,
1020 peer_id: PeerId,
1021 assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
1022 rng: &mut R,
1023 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1024 clock: &(impl Clock + ?Sized),
1025 session_info_provider: &mut RuntimeInfo,
1026 ) where
1027 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1028 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1029 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1030 R: CryptoRng + Rng,
1031 {
1032 for (assignment, claimed_indices) in assignments {
1033 if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
1034 let block_hash = &assignment.block_hash;
1035 let validator_index = assignment.validator;
1036
1037 gum::trace!(
1038 target: LOG_TARGET,
1039 %peer_id,
1040 ?block_hash,
1041 ?claimed_indices,
1042 ?validator_index,
1043 "Pending assignment",
1044 );
1045
1046 pending.push((peer_id, PendingMessage::Assignment(assignment, claimed_indices)));
1047
1048 continue
1049 }
1050
1051 self.import_and_circulate_assignment(
1052 approval_voting_sender,
1053 network_sender,
1054 runtime_api_sender,
1055 metrics,
1056 MessageSource::Peer(peer_id),
1057 assignment,
1058 claimed_indices,
1059 rng,
1060 assignment_criteria,
1061 clock,
1062 session_info_provider,
1063 )
1064 .await;
1065 }
1066 }
1067
1068 async fn process_incoming_approvals<
1070 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1071 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1072 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1073 >(
1074 &mut self,
1075 approval_voting_sender: &mut A,
1076 network_sender: &mut N,
1077 runtime_api_sender: &mut RA,
1078 metrics: &Metrics,
1079 peer_id: PeerId,
1080 approvals: Vec<IndirectSignedApprovalVoteV2>,
1081 session_info_provider: &mut RuntimeInfo,
1082 ) {
1083 gum::trace!(
1084 target: LOG_TARGET,
1085 peer_id = %peer_id,
1086 num = approvals.len(),
1087 "Processing approvals from a peer",
1088 );
1089 for approval_vote in approvals.into_iter() {
1090 if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
1091 let block_hash = approval_vote.block_hash;
1092 let validator_index = approval_vote.validator;
1093
1094 gum::trace!(
1095 target: LOG_TARGET,
1096 %peer_id,
1097 ?block_hash,
1098 ?validator_index,
1099 "Pending assignment candidates {:?}",
1100 approval_vote.candidate_indices,
1101 );
1102
1103 pending.push((peer_id, PendingMessage::Approval(approval_vote)));
1104
1105 continue
1106 }
1107
1108 self.import_and_circulate_approval(
1109 approval_voting_sender,
1110 network_sender,
1111 runtime_api_sender,
1112 metrics,
1113 MessageSource::Peer(peer_id),
1114 approval_vote,
1115 session_info_provider,
1116 )
1117 .await;
1118 }
1119 }
1120
1121 async fn process_incoming_peer_message<A, N, RA, R>(
1122 &mut self,
1123 approval_voting_sender: &mut A,
1124 network_sender: &mut N,
1125 runtime_api_sender: &mut RA,
1126 metrics: &Metrics,
1127 peer_id: PeerId,
1128 msg: ValidationProtocols<protocol_v3::ApprovalDistributionMessage>,
1129 rng: &mut R,
1130 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1131 clock: &(impl Clock + ?Sized),
1132 session_info_provider: &mut RuntimeInfo,
1133 ) where
1134 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1135 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1136 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1137 R: CryptoRng + Rng,
1138 {
1139 match msg {
1140 ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Assignments(
1141 assignments,
1142 )) => {
1143 gum::trace!(
1144 target: LOG_TARGET,
1145 peer_id = %peer_id,
1146 num = assignments.len(),
1147 "Processing assignments from a peer",
1148 );
1149 let sanitized_assignments =
1150 self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;
1151
1152 self.process_incoming_assignments(
1153 approval_voting_sender,
1154 network_sender,
1155 runtime_api_sender,
1156 metrics,
1157 peer_id,
1158 sanitized_assignments,
1159 rng,
1160 assignment_criteria,
1161 clock,
1162 session_info_provider,
1163 )
1164 .await;
1165 },
1166 ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Approvals(
1167 approvals,
1168 )) => {
1169 let sanitized_approvals =
1170 self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
1171 self.process_incoming_approvals(
1172 approval_voting_sender,
1173 network_sender,
1174 runtime_api_sender,
1175 metrics,
1176 peer_id,
1177 sanitized_approvals,
1178 session_info_provider,
1179 )
1180 .await;
1181 },
1182 }
1183 }
1184
1185 async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
1188 &mut self,
1189 network_sender: &mut N,
1190 metrics: &Metrics,
1191 peer_id: PeerId,
1192 view: View,
1193 rng: &mut R,
1194 ) where
1195 R: CryptoRng + Rng,
1196 {
1197 gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
1198 let finalized_number = view.finalized_number;
1199
1200 let (old_view, protocol_version) =
1201 if let Some(peer_entry) = self.peer_views.get_mut(&peer_id) {
1202 (Some(std::mem::replace(&mut peer_entry.view, view.clone())), peer_entry.version)
1203 } else {
1204 gum::warn!(
1206 target: LOG_TARGET,
1207 ?peer_id,
1208 ?view,
1209 "Peer view change for missing `peer_entry`"
1210 );
1211
1212 (None, ValidationVersion::V3.into())
1213 };
1214
1215 let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
1216
1217 let blocks = &mut self.blocks;
1219 let range = old_finalized_number..=finalized_number;
1224 if !range.is_empty() && !blocks.is_empty() {
1225 self.blocks_by_number
1226 .range(range)
1227 .flat_map(|(_number, hashes)| hashes)
1228 .for_each(|hash| {
1229 if let Some(entry) = blocks.get_mut(hash) {
1230 entry.known_by.remove(&peer_id);
1231 }
1232 });
1233 }
1234
1235 Self::unify_with_peer(
1236 network_sender,
1237 metrics,
1238 &mut self.blocks,
1239 &self.topologies,
1240 self.peer_views.len(),
1241 peer_id,
1242 protocol_version,
1243 view,
1244 rng,
1245 false,
1246 )
1247 .await;
1248 }
1249
1250 async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
1251 &mut self,
1252 network_sender: &mut N,
1253 metrics: &Metrics,
1254 finalized_number: BlockNumber,
1255 ) {
1256 let split_point = finalized_number.saturating_add(1);
1260 let mut old_blocks = self.blocks_by_number.split_off(&split_point);
1261
1262 std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
1264
1265 old_blocks.values().flatten().for_each(|relay_block| {
1267 self.recent_outdated_blocks.note_outdated(*relay_block);
1268 if let Some(block_entry) = self.blocks.remove(relay_block) {
1269 self.topologies.dec_session_refs(block_entry.session);
1270 }
1271 });
1272
1273 self.enable_aggression(network_sender, Resend::No, metrics).await;
1276 }
1277
1278 fn accept_duplicates_from_validators(
1283 blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1284 topologies: &SessionGridTopologies,
1285 aggression_config: &AggressionConfig,
1286 entry: &BlockEntry,
1287 peer: PeerId,
1288 ) -> bool {
1289 let topology = topologies.get_topology(entry.session);
1290 let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
1291 let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);
1292
1293 let (min_age, max_age) = match (min_age, max_age) {
1295 (Some(min), Some(max)) => (*min, *max),
1296 _ => return false,
1297 };
1298
1299 let age = max_age.saturating_sub(min_age);
1300
1301 aggression_config.should_trigger_aggression(age) &&
1302 topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
1303 }
1304
1305 async fn import_and_circulate_assignment<A, N, RA, R>(
1306 &mut self,
1307 approval_voting_sender: &mut A,
1308 network_sender: &mut N,
1309 runtime_api_sender: &mut RA,
1310 metrics: &Metrics,
1311 source: MessageSource,
1312 assignment: IndirectAssignmentCertV2,
1313 claimed_candidate_indices: CandidateBitfield,
1314 rng: &mut R,
1315 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1316 clock: &(impl Clock + ?Sized),
1317 session_info_provider: &mut RuntimeInfo,
1318 ) where
1319 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1320 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1321 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1322 R: CryptoRng + Rng,
1323 {
1324 let block_hash = assignment.block_hash;
1325 let validator_index = assignment.validator;
1326
1327 let entry = match self.blocks.get_mut(&block_hash) {
1328 Some(entry) => entry,
1329 None => {
1330 if let Some(peer_id) = source.peer_id() {
1331 gum::trace!(
1332 target: LOG_TARGET,
1333 ?peer_id,
1334 hash = ?block_hash,
1335 ?validator_index,
1336 "Unexpected assignment",
1337 );
1338 if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1339 modify_reputation(
1340 &mut self.reputation,
1341 network_sender,
1342 peer_id,
1343 COST_UNEXPECTED_MESSAGE,
1344 )
1345 .await;
1346 gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
1347 metrics.on_assignment_recent_outdated();
1348 }
1349 }
1350 metrics.on_assignment_invalid_block();
1351 return
1352 },
1353 };
1354
1355 let (message_subject, message_kind) = (
1357 MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
1358 MessageKind::Assignment,
1359 );
1360
1361 if let Some(peer_id) = source.peer_id() {
1362 match entry.known_by.entry(peer_id) {
1364 hash_map::Entry::Occupied(mut peer_knowledge) => {
1365 let peer_knowledge = peer_knowledge.get_mut();
1366 if peer_knowledge.contains(&message_subject, message_kind) {
1367 if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
1369 if !Self::accept_duplicates_from_validators(
1370 &self.blocks_by_number,
1371 &self.topologies,
1372 &self.aggression_config,
1373 entry,
1374 peer_id,
1375 ) {
1376 gum::debug!(
1377 target: LOG_TARGET,
1378 ?peer_id,
1379 ?message_subject,
1380 "Duplicate assignment",
1381 );
1382
1383 modify_reputation(
1384 &mut self.reputation,
1385 network_sender,
1386 peer_id,
1387 COST_DUPLICATE_MESSAGE,
1388 )
1389 .await;
1390 }
1391
1392 metrics.on_assignment_duplicate();
1393 } else {
1394 gum::trace!(
1395 target: LOG_TARGET,
1396 ?peer_id,
1397 hash = ?block_hash,
1398 ?validator_index,
1399 ?message_subject,
1400 "We sent the message to the peer while peer was sending it to us. Known race condition.",
1401 );
1402 }
1403 return
1404 }
1405 },
1406 hash_map::Entry::Vacant(_) => {
1407 gum::debug!(
1408 target: LOG_TARGET,
1409 ?peer_id,
1410 ?message_subject,
1411 "Assignment from a peer is out of view",
1412 );
1413 modify_reputation(
1414 &mut self.reputation,
1415 network_sender,
1416 peer_id,
1417 COST_UNEXPECTED_MESSAGE,
1418 )
1419 .await;
1420 metrics.on_assignment_out_of_view();
1421 },
1422 }
1423
1424 if entry.knowledge.contains(&message_subject, message_kind) {
1426 modify_reputation(
1427 &mut self.reputation,
1428 network_sender,
1429 peer_id,
1430 BENEFIT_VALID_MESSAGE,
1431 )
1432 .await;
1433 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1434 gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
1435 peer_knowledge.received.insert(message_subject, message_kind);
1436 }
1437 metrics.on_assignment_good_known();
1438 return
1439 }
1440
1441 let result = Self::check_assignment_valid(
1442 assignment_criteria,
1443 &entry,
1444 &assignment,
1445 &claimed_candidate_indices,
1446 session_info_provider,
1447 runtime_api_sender,
1448 )
1449 .await;
1450
1451 match result {
1452 Ok(checked_assignment) => {
1453 let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
1454 let too_far_in_future =
1455 current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;
1456
1457 if checked_assignment.tranche() >= too_far_in_future {
1458 gum::debug!(
1459 target: LOG_TARGET,
1460 hash = ?block_hash,
1461 ?peer_id,
1462 "Got an assignment too far in the future",
1463 );
1464 modify_reputation(
1465 &mut self.reputation,
1466 network_sender,
1467 peer_id,
1468 COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
1469 )
1470 .await;
1471 metrics.on_assignment_far();
1472
1473 return
1474 }
1475
1476 approval_voting_sender
1477 .send_message(ApprovalVotingMessage::ImportAssignment(
1478 checked_assignment,
1479 None,
1480 ))
1481 .await;
1482 modify_reputation(
1483 &mut self.reputation,
1484 network_sender,
1485 peer_id,
1486 BENEFIT_VALID_MESSAGE_FIRST,
1487 )
1488 .await;
1489 entry.knowledge.insert(message_subject.clone(), message_kind);
1490 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1491 peer_knowledge.received.insert(message_subject.clone(), message_kind);
1492 }
1493 },
1494 Err(error) => {
1495 gum::info!(
1496 target: LOG_TARGET,
1497 hash = ?block_hash,
1498 ?peer_id,
1499 ?error,
1500 "Got a bad assignment from peer",
1501 );
1502 modify_reputation(
1503 &mut self.reputation,
1504 network_sender,
1505 peer_id,
1506 COST_INVALID_MESSAGE,
1507 )
1508 .await;
1509 metrics.on_assignment_bad();
1510 return
1511 },
1512 }
1513 } else {
1514 if !entry.knowledge.insert(message_subject.clone(), message_kind) {
1515 gum::warn!(
1517 target: LOG_TARGET,
1518 ?message_subject,
1519 "Importing locally an already known assignment",
1520 );
1521 return
1522 } else {
1523 gum::debug!(
1524 target: LOG_TARGET,
1525 ?message_subject,
1526 "Importing locally a new assignment",
1527 );
1528 }
1529 }
1530
1531 metrics.on_assignment_imported(&assignment.cert.kind);
1534
1535 let topology = self.topologies.get_topology(entry.session);
1536 let local = source == MessageSource::Local;
1537
1538 let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
1539 t.local_grid_neighbors().required_routing_by_index(validator_index, local)
1540 });
1541 let mut peers = HashSet::new();
1543
1544 let peers_to_route_to = topology
1545 .as_ref()
1546 .map(|t| t.peers_to_route(required_routing))
1547 .unwrap_or_default();
1548
1549 for peer in peers_to_route_to {
1550 if !entry.known_by.contains_key(&peer) {
1551 continue
1552 }
1553
1554 peers.insert(peer);
1555 }
1556
1557 let peers_to_filter = entry.known_by();
1559
1560 let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
1561 assignment.clone(),
1562 claimed_candidate_indices.clone(),
1563 ApprovalRouting {
1564 required_routing,
1565 local,
1566 random_routing: Default::default(),
1567 peers_randomly_routed: Default::default(),
1568 },
1569 ));
1570
1571 let assignments = vec![(assignment, claimed_candidate_indices.clone())];
1578 let n_peers_total = self.peer_views.len();
1579 let source_peer = source.peer_id();
1580
1581 for peer in peers_to_filter.into_iter() {
1583 if Some(peer) == source_peer {
1584 continue
1585 }
1586
1587 if peers.contains(&peer) {
1588 continue
1589 }
1590
1591 if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
1592 continue
1593 }
1594
1595 let route_random =
1599 approval_entry.routing_info().random_routing.sample(n_peers_total, rng);
1600
1601 if route_random {
1602 approval_entry.routing_info_mut().mark_randomly_sent(peer);
1603 peers.insert(peer);
1604 }
1605
1606 if approval_entry.routing_info().random_routing.is_complete() {
1607 break
1608 }
1609 }
1610
1611 for peer in peers.iter() {
1613 if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
1615 peer_knowledge.sent.insert(message_subject.clone(), message_kind);
1616 }
1617 }
1618
1619 if !peers.is_empty() {
1620 gum::trace!(
1621 target: LOG_TARGET,
1622 ?block_hash,
1623 ?claimed_candidate_indices,
1624 local = source.peer_id().is_none(),
1625 num_peers = peers.len(),
1626 "Sending an assignment to peers",
1627 );
1628
1629 let peers = peers
1630 .iter()
1631 .filter_map(|peer_id| {
1632 self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
1633 })
1634 .collect::<Vec<_>>();
1635
1636 send_assignments_batched(network_sender, assignments, &peers).await;
1637 }
1638 }
1639
1640 async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
1641 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1642 entry: &BlockEntry,
1643 assignment: &IndirectAssignmentCertV2,
1644 claimed_candidate_indices: &CandidateBitfield,
1645 runtime_info: &mut RuntimeInfo,
1646 runtime_api_sender: &mut RA,
1647 ) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
1648 let ExtendedSessionInfo { ref session_info, .. } = runtime_info
1649 .get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
1650 .await
1651 .map_err(|err| InvalidAssignmentError::SessionInfoNotFound(err))?;
1652
1653 if claimed_candidate_indices.len() > session_info.n_cores as usize {
1654 return Err(InvalidAssignmentError::OversizedClaimedBitfield)
1655 }
1656
1657 let claimed_cores: Vec<CoreIndex> = claimed_candidate_indices
1658 .iter_ones()
1659 .map(|candidate_index| {
1660 entry.candidates_metadata.get(candidate_index).map(|(_, core, _)| *core).ok_or(
1661 InvalidAssignmentError::ClaimedInvalidCandidateIndex {
1662 claimed_index: candidate_index,
1663 max_index: entry.candidates_metadata.len(),
1664 },
1665 )
1666 })
1667 .collect::<Result<Vec<_>, InvalidAssignmentError>>()?;
1668
1669 let Ok(claimed_cores) = claimed_cores.try_into() else {
1670 return Err(InvalidAssignmentError::NoClaimedCandidates)
1671 };
1672
1673 let backing_groups = claimed_candidate_indices
1674 .iter_ones()
1675 .flat_map(|candidate_index| {
1676 entry.candidates_metadata.get(candidate_index).map(|(_, _, group)| *group)
1677 })
1678 .collect::<Vec<_>>();
1679
1680 assignment_criteria
1681 .check_assignment_cert(
1682 claimed_cores,
1683 assignment.validator,
1684 &polkadot_node_primitives::approval::criteria::Config::from(session_info),
1685 entry.vrf_story.clone(),
1686 &assignment.cert,
1687 backing_groups,
1688 )
1689 .map_err(|err| InvalidAssignmentError::CryptoCheckFailed(err))
1690 .map(|tranche| {
1691 CheckedIndirectAssignment::from_checked(
1692 assignment.clone(),
1693 claimed_candidate_indices.clone(),
1694 tranche,
1695 )
1696 })
1697 }
1698 async fn check_approval_can_be_processed<
1701 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1702 >(
1703 network_sender: &mut N,
1704 assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
1705 approval_knowledge_key: &(MessageSubject, MessageKind),
1706 entry: &mut BlockEntry,
1707 blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1708 topologies: &SessionGridTopologies,
1709 aggression_config: &AggressionConfig,
1710 reputation: &mut ReputationAggregator,
1711 peer_id: PeerId,
1712 metrics: &Metrics,
1713 ) -> bool {
1714 for message_subject in assignments_knowledge_key {
1715 if !entry.knowledge.contains(&message_subject.0, message_subject.1) {
1716 gum::trace!(
1717 target: LOG_TARGET,
1718 ?peer_id,
1719 ?message_subject,
1720 "Unknown approval assignment",
1721 );
1722 modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
1723 .await;
1724 metrics.on_approval_unknown_assignment();
1725 return false
1726 }
1727 }
1728
1729 match entry.known_by.entry(peer_id) {
1731 hash_map::Entry::Occupied(mut knowledge) => {
1732 let peer_knowledge = knowledge.get_mut();
1733 if peer_knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
1734 if !peer_knowledge
1735 .received
1736 .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
1737 {
1738 if !Self::accept_duplicates_from_validators(
1739 blocks_by_number,
1740 topologies,
1741 aggression_config,
1742 entry,
1743 peer_id,
1744 ) {
1745 gum::trace!(
1746 target: LOG_TARGET,
1747 ?peer_id,
1748 ?approval_knowledge_key,
1749 "Duplicate approval",
1750 );
1751 modify_reputation(
1752 reputation,
1753 network_sender,
1754 peer_id,
1755 COST_DUPLICATE_MESSAGE,
1756 )
1757 .await;
1758 }
1759 metrics.on_approval_duplicate();
1760 }
1761 return false
1762 }
1763 },
1764 hash_map::Entry::Vacant(_) => {
1765 gum::debug!(
1766 target: LOG_TARGET,
1767 ?peer_id,
1768 ?approval_knowledge_key,
1769 "Approval from a peer is out of view",
1770 );
1771 modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
1772 .await;
1773 metrics.on_approval_out_of_view();
1774 },
1775 }
1776
1777 if entry.knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
1778 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1779 peer_knowledge
1780 .received
1781 .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1);
1782 }
1783
1784 gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
1786 metrics.on_approval_good_known();
1787 modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
1788 false
1789 } else {
1790 true
1791 }
1792 }
1793
1794 async fn import_and_circulate_approval<
1795 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1796 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1797 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1798 >(
1799 &mut self,
1800 approval_voting_sender: &mut A,
1801 network_sender: &mut N,
1802 runtime_api_sender: &mut RA,
1803 metrics: &Metrics,
1804 source: MessageSource,
1805 vote: IndirectSignedApprovalVoteV2,
1806 session_info_provider: &mut RuntimeInfo,
1807 ) {
1808 let block_hash = vote.block_hash;
1809 let validator_index = vote.validator;
1810 let candidate_indices = &vote.candidate_indices;
1811 let entry = match self.blocks.get_mut(&block_hash) {
1812 Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
1813 _ => {
1814 if let Some(peer_id) = source.peer_id() {
1815 if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1816 gum::debug!(
1817 target: LOG_TARGET,
1818 ?peer_id,
1819 ?block_hash,
1820 ?validator_index,
1821 ?candidate_indices,
1822 "Approval from a peer is out of view",
1823 );
1824 modify_reputation(
1825 &mut self.reputation,
1826 network_sender,
1827 peer_id,
1828 COST_UNEXPECTED_MESSAGE,
1829 )
1830 .await;
1831 metrics.on_approval_invalid_block();
1832 } else {
1833 metrics.on_approval_recent_outdated();
1834 }
1835 }
1836 return
1837 },
1838 };
1839
1840 let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
1842 let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);
1843
1844 if let Some(peer_id) = source.peer_id() {
1845 if !Self::check_approval_can_be_processed(
1846 network_sender,
1847 &assignments_knowledge_keys,
1848 &approval_knwowledge_key,
1849 entry,
1850 &self.blocks_by_number,
1851 &self.topologies,
1852 &self.aggression_config,
1853 &mut self.reputation,
1854 peer_id,
1855 metrics,
1856 )
1857 .await
1858 {
1859 return
1860 }
1861
1862 let result =
1863 Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
1864 .await;
1865
1866 match result {
1867 Ok(vote) => {
1868 approval_voting_sender
1869 .send_message(ApprovalVotingMessage::ImportApproval(vote, None))
1870 .await;
1871
1872 modify_reputation(
1873 &mut self.reputation,
1874 network_sender,
1875 peer_id,
1876 BENEFIT_VALID_MESSAGE_FIRST,
1877 )
1878 .await;
1879
1880 entry
1881 .knowledge
1882 .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1883 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1884 peer_knowledge
1885 .received
1886 .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1887 }
1888 },
1889 Err(err) => {
1890 modify_reputation(
1891 &mut self.reputation,
1892 network_sender,
1893 peer_id,
1894 COST_INVALID_MESSAGE,
1895 )
1896 .await;
1897
1898 gum::info!(
1899 target: LOG_TARGET,
1900 ?peer_id,
1901 ?err,
1902 "Got a bad approval from peer",
1903 );
1904 metrics.on_approval_bad();
1905 return
1906 },
1907 }
1908 } else {
1909 if !entry
1910 .knowledge
1911 .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
1912 {
1913 gum::warn!(
1915 target: LOG_TARGET,
1916 "Importing locally an already known approval",
1917 );
1918 return
1919 } else {
1920 gum::debug!(
1921 target: LOG_TARGET,
1922 "Importing locally a new approval",
1923 );
1924 }
1925 }
1926
1927 let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
1928 Ok(required_routing) => required_routing,
1929 Err(err) => {
1930 gum::warn!(
1931 target: LOG_TARGET,
1932 hash = ?block_hash,
1933 validator_index = ?vote.validator,
1934 candidate_bitfield = ?vote.candidate_indices,
1935 ?err,
1936 "Possible bug: Vote import failed",
1937 );
1938 metrics.on_approval_bug();
1939 return
1940 },
1941 };
1942
1943 metrics.on_approval_imported();
1946
1947 let topology = self.topologies.get_topology(entry.session);
1950 let source_peer = source.peer_id();
1951
1952 let peer_filter = move |peer| {
1953 if Some(peer) == source_peer.as_ref() {
1954 return false
1955 }
1956
1957 let in_topology = topology
1967 .map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
1968 in_topology || peers_randomly_routed_to.contains(peer)
1969 };
1970
1971 let peers = entry
1972 .known_by
1973 .iter()
1974 .filter(|(p, _)| peer_filter(p))
1975 .filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
1976 .collect::<Vec<_>>();
1977
1978 for peer in peers.iter() {
1980 if let Some(entry) = entry.known_by.get_mut(&peer.0) {
1982 entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1983 }
1984 }
1985
1986 if !peers.is_empty() {
1987 let approvals = vec![vote];
1988 gum::trace!(
1989 target: LOG_TARGET,
1990 ?block_hash,
1991 local = source.peer_id().is_none(),
1992 num_peers = peers.len(),
1993 "Sending an approval to peers",
1994 );
1995 send_approvals_batched(network_sender, approvals, &peers).await;
1996 }
1997 }
1998
1999 async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
2001 vote: &IndirectSignedApprovalVoteV2,
2002 entry: &BlockEntry,
2003 runtime_info: &mut RuntimeInfo,
2004 runtime_api_sender: &mut RA,
2005 ) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
2006 if vote.candidate_indices.len() > entry.candidates_metadata.len() {
2007 return Err(InvalidVoteError::CandidateIndexOutOfBounds)
2008 }
2009
2010 let candidate_hashes = vote
2011 .candidate_indices
2012 .iter_ones()
2013 .flat_map(|candidate_index| {
2014 entry
2015 .candidates_metadata
2016 .get(candidate_index)
2017 .map(|(candidate_hash, _, _)| *candidate_hash)
2018 })
2019 .collect::<Vec<_>>();
2020
2021 let ExtendedSessionInfo { ref session_info, .. } = runtime_info
2022 .get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
2023 .await
2024 .map_err(|err| InvalidVoteError::SessionInfoNotFound(err))?;
2025
2026 let pubkey = session_info
2027 .validators
2028 .get(vote.validator)
2029 .ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;
2030 DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
2031 candidate_hashes.clone(),
2032 ))
2033 .check_signature(
2034 &pubkey,
2035 *candidate_hashes.first().unwrap(),
2036 entry.session,
2037 &vote.signature,
2038 )
2039 .map_err(|_| InvalidVoteError::InvalidSignature)
2040 .map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
2041 }
2042
2043 fn get_approval_signatures(
2045 &mut self,
2046 indices: HashSet<(Hash, CandidateIndex)>,
2047 ) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
2048 let mut all_sigs = HashMap::new();
2049 for (hash, index) in indices {
2050 let block_entry = match self.blocks.get(&hash) {
2051 None => {
2052 gum::debug!(
2053 target: LOG_TARGET,
2054 ?hash,
2055 "`get_approval_signatures`: could not find block entry for given hash!"
2056 );
2057 continue
2058 },
2059 Some(e) => e,
2060 };
2061
2062 let sigs = block_entry.approval_votes(index).into_iter().map(|approval| {
2063 (
2064 approval.validator,
2065 (
2066 hash,
2067 approval
2068 .candidate_indices
2069 .iter_ones()
2070 .map(|val| val as CandidateIndex)
2071 .collect_vec(),
2072 approval.signature,
2073 ),
2074 )
2075 });
2076 all_sigs.extend(sigs);
2077 }
2078 all_sigs
2079 }
2080
2081 async fn unify_with_peer(
2082 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2083 metrics: &Metrics,
2084 entries: &mut HashMap<Hash, BlockEntry>,
2085 topologies: &SessionGridTopologies,
2086 total_peers: usize,
2087 peer_id: PeerId,
2088 protocol_version: ProtocolVersion,
2089 view: View,
2090 rng: &mut (impl CryptoRng + Rng),
2091 retry_known_blocks: bool,
2092 ) {
2093 metrics.on_unify_with_peer();
2094 let _timer = metrics.time_unify_with_peer();
2095
2096 let mut assignments_to_send = Vec::new();
2097 let mut approvals_to_send = Vec::new();
2098
2099 let view_finalized_number = view.finalized_number;
2100 for head in view.into_iter() {
2101 let mut block = head;
2102
2103 loop {
2105 let entry = match entries.get_mut(&block) {
2106 Some(entry) if entry.number > view_finalized_number => entry,
2107 _ => break,
2108 };
2109
2110 if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
2116 break
2117 }
2118
2119 let peer_knowledge = entry.known_by.entry(peer_id).or_default();
2120 let topology = topologies.get_topology(entry.session);
2121
2122 for approval_entry in entry.approval_entries.values_mut() {
2125 {
2128 let required_routing = approval_entry.routing_info().required_routing;
2129 let routing_info = &mut approval_entry.routing_info_mut();
2130 let rng = &mut *rng;
2131 let mut peer_filter = move |peer_id| {
2132 let in_topology = topology.as_ref().map_or(false, |t| {
2133 t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
2134 });
2135 in_topology || {
2136 if !topology
2137 .map(|topology| topology.is_validator(peer_id))
2138 .unwrap_or(false)
2139 {
2140 return false
2141 }
2142
2143 let route_random =
2144 routing_info.random_routing.sample(total_peers, rng);
2145 if route_random {
2146 routing_info.mark_randomly_sent(*peer_id);
2147 }
2148
2149 route_random
2150 }
2151 };
2152
2153 if !peer_filter(&peer_id) {
2154 continue
2155 }
2156 }
2157
2158 let assignment_message = approval_entry.assignment();
2159 let approval_messages = approval_entry.approvals();
2160 let (assignment_knowledge, message_kind) =
2161 approval_entry.create_assignment_knowledge(block);
2162
2163 if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
2166 peer_knowledge.sent.insert(assignment_knowledge, message_kind);
2167 assignments_to_send.push(assignment_message);
2168 }
2169
2170 for approval_message in approval_messages {
2172 let approval_knowledge =
2173 PeerKnowledge::generate_approval_key(&approval_message);
2174
2175 if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
2176 approvals_to_send.push(approval_message);
2177 peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
2178 }
2179 }
2180 }
2181
2182 block = entry.parent_hash;
2183 }
2184 }
2185
2186 if !assignments_to_send.is_empty() {
2187 gum::trace!(
2188 target: LOG_TARGET,
2189 ?peer_id,
2190 ?protocol_version,
2191 num = assignments_to_send.len(),
2192 "Sending assignments to unified peer",
2193 );
2194
2195 send_assignments_batched(
2196 sender,
2197 assignments_to_send,
2198 &vec![(peer_id, protocol_version)],
2199 )
2200 .await;
2201 }
2202
2203 if !approvals_to_send.is_empty() {
2204 gum::trace!(
2205 target: LOG_TARGET,
2206 ?peer_id,
2207 ?protocol_version,
2208 num = approvals_to_send.len(),
2209 "Sending approvals to unified peer",
2210 );
2211
2212 send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
2213 .await;
2214 }
2215 }
2216
2217 async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
2228 &mut self,
2229 network_sender: &mut N,
2230 resend: Resend,
2231 metrics: &Metrics,
2232 ) {
2233 let config = self.aggression_config.clone();
2234 let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
2235 let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);
2236
2237 let (min_age, max_age) = match (min_age, max_age) {
2239 (Some(min), Some(max)) => (*min, *max),
2240 _ => return, };
2242
2243 let age = max_age.saturating_sub(min_age);
2244
2245 if !self.aggression_config.should_trigger_aggression(age) {
2247 gum::trace!(
2248 target: LOG_TARGET,
2249 approval_checking_lag = self.approval_checking_lag,
2250 age,
2251 "Aggression not enabled",
2252 );
2253 return
2254 }
2255 gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);
2256
2257 adjust_required_routing_and_propagate(
2258 network_sender,
2259 &mut self.blocks,
2260 &self.topologies,
2261 |block_entry| {
2262 let block_age = max_age - block_entry.number;
2263 let diff_from_min_age = block_entry.number - min_age;
2267
2268 let blocks_since_last_sent = block_entry
2273 .last_resent_at_block_number
2274 .map(|last_resent_at_block_number| max_age - last_resent_at_block_number);
2275
2276 let can_resend_at_this_age = blocks_since_last_sent
2277 .zip(config.resend_unfinalized_period)
2278 .map(|(blocks_since_last_sent, unfinalized_period)| {
2279 blocks_since_last_sent >= unfinalized_period * 2
2280 })
2281 .unwrap_or(true);
2282
2283 if resend == Resend::Yes &&
2284 config.resend_unfinalized_period.as_ref().map_or(false, |p| {
2285 block_age > 0 &&
2286 block_age % p == 0 && diff_from_min_age == 0 &&
2287 can_resend_at_this_age
2288 }) {
2289 for (_, knowledge) in block_entry.known_by.iter_mut() {
2291 knowledge.sent = Knowledge::default();
2292 }
2293 block_entry.last_resent_at_block_number = Some(max_age);
2294 gum::debug!(
2295 target: LOG_TARGET,
2296 block_number = ?block_entry.number,
2297 ?max_age,
2298 "Aggression enabled with resend for block",
2299 );
2300 true
2301 } else {
2302 false
2303 }
2304 },
2305 |required_routing, _, _| *required_routing,
2306 &self.peer_views,
2307 )
2308 .await;
2309
2310 adjust_required_routing_and_propagate(
2311 network_sender,
2312 &mut self.blocks,
2313 &self.topologies,
2314 |block_entry| {
2315 block_entry.number == min_age
2321 },
2322 |required_routing, local, _| {
2323 if *required_routing == RequiredRouting::PendingTopology {
2325 gum::debug!(
2326 target: LOG_TARGET,
2327 lag = ?self.approval_checking_lag,
2328 "Encountered old block pending gossip topology",
2329 );
2330 return *required_routing
2331 }
2332
2333 let mut new_required_routing = *required_routing;
2334
2335 if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
2336 if local && new_required_routing != RequiredRouting::All {
2338 metrics.on_aggression_l1();
2339 new_required_routing = RequiredRouting::All;
2340 }
2341 }
2342
2343 if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
2344 if !local && new_required_routing != RequiredRouting::GridXY {
2346 metrics.on_aggression_l2();
2347 new_required_routing = RequiredRouting::GridXY;
2348 }
2349 }
2350 new_required_routing
2351 },
2352 &self.peer_views,
2353 )
2354 .await;
2355 }
2356
2357 async fn sanitize_v2_assignments(
2360 &mut self,
2361 peer_id: PeerId,
2362 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2363 assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
2364 ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2365 let mut sanitized_assignments = Vec::new();
2366 for (cert, candidate_bitfield) in assignments.into_iter() {
2367 let cert_bitfield_bits = match &cert.cert.kind {
2368 AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2369 AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
2373 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
2374 core_bitfield.len(),
2375 };
2376
2377 let candidate_bitfield_bits = candidate_bitfield.len();
2378
2379 let msb = candidate_bitfield_bits - 1;
2381
2382 if cert_bitfield_bits > MAX_BITFIELD_SIZE
2384 || candidate_bitfield_bits > MAX_BITFIELD_SIZE
2385 || !candidate_bitfield.bit_at(msb.as_bit_index())
2387 {
2388 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2390 .await;
2391 for candidate_index in candidate_bitfield.iter_ones() {
2392 gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
2393 }
2394 } else {
2395 sanitized_assignments.push((cert, candidate_bitfield))
2396 }
2397 }
2398
2399 sanitized_assignments
2400 }
2401
2402 async fn sanitize_v2_approvals(
2404 &mut self,
2405 peer_id: PeerId,
2406 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2407 approval: Vec<IndirectSignedApprovalVoteV2>,
2408 ) -> Vec<IndirectSignedApprovalVoteV2> {
2409 let mut sanitized_approvals = Vec::new();
2410 for approval in approval.into_iter() {
2411 if approval.candidate_indices.len() as usize > MAX_BITFIELD_SIZE {
2412 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2414 .await;
2415 gum::debug!(
2416 target: LOG_TARGET,
2417 block_hash = ?approval.block_hash,
2418 candidate_indices_len = ?approval.candidate_indices.len(),
2419 "Bad approval v2, invalid candidate indices size"
2420 );
2421 } else {
2422 sanitized_approvals.push(approval)
2423 }
2424 }
2425
2426 sanitized_approvals
2427 }
2428}
2429
2430#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
2443async fn adjust_required_routing_and_propagate<
2444 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2445 BlockFilter,
2446 RoutingModifier,
2447>(
2448 network_sender: &mut N,
2449 blocks: &mut HashMap<Hash, BlockEntry>,
2450 topologies: &SessionGridTopologies,
2451 block_filter: BlockFilter,
2452 routing_modifier: RoutingModifier,
2453 peer_views: &HashMap<PeerId, PeerEntry>,
2454) where
2455 BlockFilter: Fn(&mut BlockEntry) -> bool,
2456 RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
2457{
2458 let mut peer_assignments = HashMap::new();
2459 let mut peer_approvals = HashMap::new();
2460
2461 for (block_hash, block_entry) in blocks {
2464 if !block_filter(block_entry) {
2465 continue
2466 }
2467
2468 let topology = match topologies.get_topology(block_entry.session) {
2469 Some(t) => t,
2470 None => continue,
2471 };
2472
2473 for approval_entry in block_entry.approval_entries.values_mut() {
2476 let new_required_routing = routing_modifier(
2477 &approval_entry.routing_info().required_routing,
2478 approval_entry.routing_info().local,
2479 &approval_entry.validator_index(),
2480 );
2481
2482 approval_entry.update_required_routing(new_required_routing);
2483
2484 if approval_entry.routing_info().required_routing.is_empty() {
2485 continue
2486 }
2487
2488 let assignment_message = approval_entry.assignment();
2489 let approval_messages = approval_entry.approvals();
2490 let (assignment_knowledge, message_kind) =
2491 approval_entry.create_assignment_knowledge(*block_hash);
2492
2493 for (peer, peer_knowledge) in &mut block_entry.known_by {
2494 if !topology
2495 .local_grid_neighbors()
2496 .route_to_peer(approval_entry.routing_info().required_routing, peer)
2497 {
2498 continue
2499 }
2500
2501 if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
2503 peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
2504 peer_assignments
2505 .entry(*peer)
2506 .or_insert_with(Vec::new)
2507 .push(assignment_message.clone());
2508 }
2509
2510 for approval_message in &approval_messages {
2512 let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);
2513
2514 if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
2515 peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
2516 peer_approvals
2517 .entry(*peer)
2518 .or_insert_with(Vec::new)
2519 .push(approval_message.clone());
2520 }
2521 }
2522 }
2523 }
2524 }
2525
2526 for (peer, assignments_packet) in peer_assignments {
2528 if let Some(peer_view) = peer_views.get(&peer) {
2529 send_assignments_batched(
2530 network_sender,
2531 assignments_packet,
2532 &vec![(peer, peer_view.version)],
2533 )
2534 .await;
2535 } else {
2536 gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
2538 }
2539 }
2540
2541 for (peer, approvals_packet) in peer_approvals {
2542 if let Some(peer_view) = peer_views.get(&peer) {
2543 send_approvals_batched(
2544 network_sender,
2545 approvals_packet,
2546 &vec![(peer, peer_view.version)],
2547 )
2548 .await;
2549 } else {
2550 gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
2552 }
2553 }
2554}
2555
2556async fn modify_reputation(
2558 reputation: &mut ReputationAggregator,
2559 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2560 peer_id: PeerId,
2561 rep: Rep,
2562) {
2563 gum::trace!(
2564 target: LOG_TARGET,
2565 reputation = ?rep,
2566 ?peer_id,
2567 "Reputation change for peer",
2568 );
2569 reputation.modify(sender, peer_id, rep).await;
2570}
2571
2572#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
2573impl ApprovalDistribution {
2574 pub fn new(
2576 metrics: Metrics,
2577 slot_duration_millis: u64,
2578 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2579 ) -> Self {
2580 Self::new_with_clock(
2581 metrics,
2582 slot_duration_millis,
2583 Arc::new(SystemClock),
2584 assignment_criteria,
2585 )
2586 }
2587
2588 pub fn new_with_clock(
2590 metrics: Metrics,
2591 slot_duration_millis: u64,
2592 clock: Arc<dyn Clock + Send + Sync>,
2593 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2594 ) -> Self {
2595 Self { metrics, slot_duration_millis, clock, assignment_criteria }
2596 }
2597
2598 async fn run<Context>(self, ctx: Context) {
2599 let mut state =
2600 State { slot_duration_millis: self.slot_duration_millis, ..Default::default() };
2601 let mut rng = rand::rngs::StdRng::from_entropy();
2604 let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
2605 keystore: None,
2606 session_cache_lru_size: DISPUTE_WINDOW.get(),
2607 });
2608
2609 self.run_inner(
2610 ctx,
2611 &mut state,
2612 REPUTATION_CHANGE_INTERVAL,
2613 &mut rng,
2614 &mut session_info_provider,
2615 )
2616 .await
2617 }
2618
2619 async fn run_inner<Context>(
2621 self,
2622 mut ctx: Context,
2623 state: &mut State,
2624 reputation_interval: Duration,
2625 rng: &mut (impl CryptoRng + Rng),
2626 session_info_provider: &mut RuntimeInfo,
2627 ) {
2628 let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
2629 let mut reputation_delay = new_reputation_delay();
2630 let mut approval_voting_sender = ctx.sender().clone();
2631 let mut network_sender = ctx.sender().clone();
2632 let mut runtime_api_sender = ctx.sender().clone();
2633
2634 loop {
2635 select! {
2636 _ = reputation_delay => {
2637 state.reputation.send(ctx.sender()).await;
2638 reputation_delay = new_reputation_delay();
2639 },
2640 message = ctx.recv().fuse() => {
2641 let message = match message {
2642 Ok(message) => message,
2643 Err(e) => {
2644 gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
2645 return
2646 },
2647 };
2648
2649 if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
2650 return;
2651 }
2652
2653 },
2654 }
2655 }
2656 }
2657
2658 pub async fn handle_from_orchestra<
2662 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2663 A: overseer::SubsystemSender<ApprovalVotingMessage>,
2664 RA: overseer::SubsystemSender<RuntimeApiMessage>,
2665 >(
2666 &self,
2667 message: FromOrchestra<ApprovalDistributionMessage>,
2668 approval_voting_sender: &mut A,
2669 network_sender: &mut N,
2670 runtime_api_sender: &mut RA,
2671 state: &mut State,
2672 rng: &mut (impl CryptoRng + Rng),
2673 session_info_provider: &mut RuntimeInfo,
2674 ) -> bool {
2675 match message {
2676 FromOrchestra::Communication { msg } =>
2677 Self::handle_incoming(
2678 approval_voting_sender,
2679 network_sender,
2680 runtime_api_sender,
2681 state,
2682 msg,
2683 &self.metrics,
2684 rng,
2685 self.assignment_criteria.as_ref(),
2686 self.clock.as_ref(),
2687 session_info_provider,
2688 )
2689 .await,
2690 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
2691 gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
2692 },
2697 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
2698 gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
2699 state.handle_block_finalized(network_sender, &self.metrics, number).await;
2700 },
2701 FromOrchestra::Signal(OverseerSignal::Conclude) => return true,
2702 }
2703 false
2704 }
2705
2706 async fn handle_incoming<
2707 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2708 A: overseer::SubsystemSender<ApprovalVotingMessage>,
2709 RA: overseer::SubsystemSender<RuntimeApiMessage>,
2710 >(
2711 approval_voting_sender: &mut A,
2712 network_sender: &mut N,
2713 runtime_api_sender: &mut RA,
2714 state: &mut State,
2715 msg: ApprovalDistributionMessage,
2716 metrics: &Metrics,
2717 rng: &mut (impl CryptoRng + Rng),
2718 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
2719 clock: &(impl Clock + ?Sized),
2720 session_info_provider: &mut RuntimeInfo,
2721 ) {
2722 match msg {
2723 ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
2724 state
2725 .handle_network_msg(
2726 approval_voting_sender,
2727 network_sender,
2728 runtime_api_sender,
2729 metrics,
2730 event,
2731 rng,
2732 assignment_criteria,
2733 clock,
2734 session_info_provider,
2735 )
2736 .await;
2737 },
2738 ApprovalDistributionMessage::NewBlocks(metas) => {
2739 state
2740 .handle_new_blocks(
2741 approval_voting_sender,
2742 network_sender,
2743 runtime_api_sender,
2744 metrics,
2745 metas,
2746 rng,
2747 assignment_criteria,
2748 clock,
2749 session_info_provider,
2750 )
2751 .await;
2752 },
2753 ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
2754 gum::debug!(
2755 target: LOG_TARGET,
2756 ?candidate_indices,
2757 block_hash = ?cert.block_hash,
2758 assignment_kind = ?cert.cert.kind,
2759 "Distributing our assignment on candidates",
2760 );
2761
2762 state
2763 .import_and_circulate_assignment(
2764 approval_voting_sender,
2765 network_sender,
2766 runtime_api_sender,
2767 &metrics,
2768 MessageSource::Local,
2769 cert,
2770 candidate_indices,
2771 rng,
2772 assignment_criteria,
2773 clock,
2774 session_info_provider,
2775 )
2776 .await;
2777 },
2778 ApprovalDistributionMessage::DistributeApproval(vote) => {
2779 gum::debug!(
2780 target: LOG_TARGET,
2781 "Distributing our approval vote on candidate (block={}, index={:?})",
2782 vote.block_hash,
2783 vote.candidate_indices,
2784 );
2785
2786 state
2787 .import_and_circulate_approval(
2788 approval_voting_sender,
2789 network_sender,
2790 runtime_api_sender,
2791 metrics,
2792 MessageSource::Local,
2793 vote,
2794 session_info_provider,
2795 )
2796 .await;
2797 },
2798 ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
2799 let sigs = state.get_approval_signatures(indices);
2800 if let Err(_) = tx.send(sigs) {
2801 gum::debug!(
2802 target: LOG_TARGET,
2803 "Sending back approval signatures failed, oneshot got closed"
2804 );
2805 }
2806 },
2807 ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
2808 gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
2809 state.approval_checking_lag = lag;
2810 },
2811 }
2812 }
2813}
2814
2815#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
2816impl<Context> ApprovalDistribution {
2817 fn start(self, ctx: Context) -> SpawnedSubsystem {
2818 let future = self.run(ctx).map(|_| Ok(())).boxed();
2819
2820 SpawnedSubsystem { name: "approval-distribution-subsystem", future }
2821 }
2822}
2823
2824const fn ensure_size_not_zero(size: usize) -> usize {
2826 if 0 == size {
2827 panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)",);
2828 }
2829
2830 size
2831}
2832
2833pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
2838 MAX_NOTIFICATION_SIZE as usize /
2839 std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
2840 3,
2841);
2842
2843pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
2845 MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
2846);
2847
2848async fn send_assignments_batched_inner(
2850 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2851 batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
2852 peers: Vec<PeerId>,
2853 _peer_version: ValidationVersion,
2854) {
2855 sender
2856 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
2857 peers,
2858 ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2859 protocol_v3::ApprovalDistributionMessage::Assignments(batch.into_iter().collect()),
2860 )),
2861 ))
2862 .await;
2863}
2864
2865pub(crate) async fn send_assignments_batched(
2871 network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2872 v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
2873 peers: &[(PeerId, ProtocolVersion)],
2874) {
2875 let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2876
2877 if !v3_peers.is_empty() {
2878 let mut v3 = v2_assignments.into_iter().peekable();
2879
2880 while v3.peek().is_some() {
2881 let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
2882 send_assignments_batched_inner(
2883 network_sender,
2884 batch,
2885 v3_peers.clone(),
2886 ValidationVersion::V3,
2887 )
2888 .await;
2889 }
2890 }
2891}
2892
2893pub(crate) async fn send_approvals_batched(
2895 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2896 approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
2897 peers: &[(PeerId, ProtocolVersion)],
2898) {
2899 let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2900
2901 if !v3_peers.is_empty() {
2902 let mut batches = approvals.into_iter().peekable();
2903
2904 while batches.peek().is_some() {
2905 let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
2906
2907 sender
2908 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
2909 v3_peers.clone(),
2910 ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2911 protocol_v3::ApprovalDistributionMessage::Approvals(batch),
2912 )),
2913 ))
2914 .await;
2915 }
2916 }
2917}