// polkadot_approval_distribution/lib.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! [`ApprovalDistribution`] implementation.
18//!
19//! See the documentation on [approval distribution][approval-distribution-page] in the
20//! implementers' guide.
21//!
22//! [approval-distribution-page]: https://paritytech.github.io/polkadot-sdk/book/node/approval/approval-distribution.html
23
24#![warn(missing_docs)]
25
26use self::metrics::Metrics;
27use futures::{select, FutureExt as _};
28use itertools::Itertools;
29use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
30use polkadot_node_network_protocol::{
31	self as net_protocol, filter_by_peer_version,
32	grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
33	peer_set::MAX_NOTIFICATION_SIZE,
34	v3 as protocol_v3, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
35};
36use polkadot_node_primitives::{
37	approval::{
38		criteria::{AssignmentCriteria, InvalidAssignment},
39		time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
40		v1::{BlockApprovalMeta, DelayTranche, RelayVRFStory},
41		v2::{
42			AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
43			IndirectSignedApprovalVoteV2,
44		},
45	},
46	DISPUTE_WINDOW,
47};
48use polkadot_node_subsystem::{
49	messages::{
50		ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
51		CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
52		RuntimeApiMessage,
53	},
54	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
55};
56use polkadot_node_subsystem_util::{
57	reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
58	runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
59};
60use polkadot_primitives::{
61	BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
62	SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
63};
64use rand::{CryptoRng, Rng, SeedableRng};
65use std::{
66	collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
67	sync::Arc,
68	time::Duration,
69};
70
71/// Approval distribution metrics.
72pub mod metrics;
73
74#[cfg(test)]
75mod tests;
76
77const LOG_TARGET: &str = "parachain::approval-distribution";
78
79const COST_UNEXPECTED_MESSAGE: Rep =
80	Rep::CostMinor("Peer sent an out-of-view assignment or approval");
81const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
82const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
83	Rep::CostMinor("The vote was valid but too far in the future");
84const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
85const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");
86
87const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
88const BENEFIT_VALID_MESSAGE_FIRST: Rep =
89	Rep::BenefitMinorFirst("Valid message with new information");
90
91// Maximum valid size for the `CandidateBitfield` in the assignment messages.
92const MAX_BITFIELD_SIZE: usize = 500;
93
94/// The Approval Distribution subsystem.
95pub struct ApprovalDistribution {
96	metrics: Metrics,
97	slot_duration_millis: u64,
98	clock: Arc<dyn Clock + Send + Sync>,
99	assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
100}
101
102/// Contains recently finalized
103/// or those pruned due to finalization.
104#[derive(Default)]
105struct RecentlyOutdated {
106	buf: VecDeque<Hash>,
107}
108
109impl RecentlyOutdated {
110	fn note_outdated(&mut self, hash: Hash) {
111		const MAX_BUF_LEN: usize = 20;
112
113		self.buf.push_back(hash);
114
115		while self.buf.len() > MAX_BUF_LEN {
116			let _ = self.buf.pop_front();
117		}
118	}
119
120	fn is_recent_outdated(&self, hash: &Hash) -> bool {
121		self.buf.contains(hash)
122	}
123}
124
// Contains topology routing information for assignments and approvals.
struct ApprovalRouting {
	// The grid-topology routing requirement for this message.
	required_routing: RequiredRouting,
	// Whether the message originated locally.
	local: bool,
	// State for routing to a few extra random peers in addition to the grid.
	random_routing: RandomRouting,
	// The peers this message has already been randomly routed to.
	peers_randomly_routed: Vec<PeerId>,
}

impl ApprovalRouting {
	// Record that the message was randomly routed to `peer`: bump the
	// sent counter and remember the peer.
	fn mark_randomly_sent(&mut self, peer: PeerId) {
		self.random_routing.inc_sent();
		self.peers_randomly_routed.push(peer);
	}
}
139
// This struct is responsible for tracking the full state of an assignment and grid routing
// information.
//
// Invariant: `validator_index` mirrors `assignment.validator` (set at construction).
struct ApprovalEntry {
	// The assignment certificate.
	assignment: IndirectAssignmentCertV2,
	// The candidates claimed by the certificate. A mapping between bit index and candidate index.
	assignment_claimed_candidates: CandidateBitfield,
	// The approval signatures for each `CandidateIndex` claimed by the assignment certificate,
	// keyed by the candidate bitfield each approval covers.
	approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
	// The validator index of the assignment signer.
	validator_index: ValidatorIndex,
	// Information required for gossiping to other peers using the grid topology.
	routing_info: ApprovalRouting,
}
154
// Errors that can occur when recording assignments/approvals in an `ApprovalEntry`.
#[derive(Debug)]
enum ApprovalEntryError {
	// The approval's validator index does not match the assignment's.
	InvalidValidatorIndex,
	// The approval claims more candidates than the block contains.
	CandidateIndexOutOfBounds,
	// The approval does not cover any candidate claimed by the assignment.
	InvalidCandidateIndex,
	// An approval with the same claimed candidate set was already recorded.
	DuplicateApproval,
	// No assignment entry was found covering the approval.
	UnknownAssignment,
}
163
164impl ApprovalEntry {
165	pub fn new(
166		assignment: IndirectAssignmentCertV2,
167		candidates: CandidateBitfield,
168		routing_info: ApprovalRouting,
169	) -> ApprovalEntry {
170		Self {
171			validator_index: assignment.validator,
172			assignment,
173			approvals: HashMap::new(),
174			assignment_claimed_candidates: candidates,
175			routing_info,
176		}
177	}
178
179	// Create a `MessageSubject` to reference the assignment.
180	pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
181		(
182			MessageSubject(
183				block_hash,
184				self.assignment_claimed_candidates.clone(),
185				self.validator_index,
186			),
187			MessageKind::Assignment,
188		)
189	}
190
191	// Updates routing information and returns the previous information if any.
192	pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
193		&mut self.routing_info
194	}
195
196	// Get the routing information.
197	pub fn routing_info(&self) -> &ApprovalRouting {
198		&self.routing_info
199	}
200
201	// Update routing information.
202	pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
203		self.routing_info.required_routing = required_routing;
204	}
205
206	// Tells if this entry assignment covers at least one candidate in the approval
207	pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
208		for candidate_index in approval.candidate_indices.iter_ones() {
209			if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
210				return true
211			}
212		}
213		return false
214	}
215
216	// Records a new approval. Returns error if the claimed candidate is not found or we already
217	// have received the approval.
218	pub fn note_approval(
219		&mut self,
220		approval: IndirectSignedApprovalVoteV2,
221	) -> Result<(), ApprovalEntryError> {
222		// First do some sanity checks:
223		// - check validator index matches
224		// - check claimed candidate
225		// - check for duplicate approval
226		if self.validator_index != approval.validator {
227			return Err(ApprovalEntryError::InvalidValidatorIndex)
228		}
229
230		// We need at least one of the candidates in the approval to be in this assignment
231		if !self.includes_approval_candidates(&approval) {
232			return Err(ApprovalEntryError::InvalidCandidateIndex)
233		}
234
235		if self.approvals.contains_key(&approval.candidate_indices) {
236			return Err(ApprovalEntryError::DuplicateApproval)
237		}
238
239		self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
240		Ok(())
241	}
242
243	// Get the assignment certificate and claimed candidates.
244	pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
245		(self.assignment.clone(), self.assignment_claimed_candidates.clone())
246	}
247
248	// Get all approvals for all candidates claimed by the assignment.
249	pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
250		self.approvals.values().cloned().collect::<Vec<_>>()
251	}
252
253	// Get validator index.
254	pub fn validator_index(&self) -> ValidatorIndex {
255		self.validator_index
256	}
257}
258
// We keep track of each peer's view and protocol version using this struct.
struct PeerEntry {
	// The peer's current view of the chain.
	pub view: View,
	// The validation protocol version this peer connected with.
	pub version: ProtocolVersion,
}
264
265// In case the original grid topology mechanisms don't work on their own, we need to trade bandwidth
266// for protocol liveliness by introducing aggression.
267//
268// Aggression has 3 levels:
269//
270//  * Aggression Level 0: The basic behaviors described above.
271//  * Aggression Level 1: The originator of a message sends to all peers. Other peers follow the
272//    rules above.
273//  * Aggression Level 2: All peers send all messages to all their row and column neighbors. This
274//    means that each validator will, on average, receive each message approximately `2*sqrt(n)`
275//    times.
276// The aggression level of messages pertaining to a block increases when that block is unfinalized
277// and is a child of the finalized block.
278// This means that only one block at a time has its messages propagated with aggression > 0.
279//
280// A note on aggression thresholds: changes in propagation apply only to blocks which are the
281// _direct descendants_ of the finalized block which are older than the given threshold,
282// not to all blocks older than the threshold. Most likely, a few assignments struggle to
283// be propagated in a single block and this holds up all of its descendants blocks.
284// Accordingly, we only step on the gas for the block which is most obviously holding up finality.
/// Aggression configuration representation.
///
/// Thresholds are measured in blocks of "age" relative to the finalized block.
#[derive(Clone)]
struct AggressionConfig {
	/// Aggression level 1: all validators send all their own messages to all peers.
	l1_threshold: Option<BlockNumber>,
	/// Aggression level 2: level 1 + all validators send all messages to all peers in the X and Y
	/// dimensions.
	l2_threshold: Option<BlockNumber>,
	/// How often to re-send messages to all targeted recipients.
	/// This applies to all unfinalized blocks.
	resend_unfinalized_period: Option<BlockNumber>,
}
297
298impl AggressionConfig {
299	/// Returns `true` if age is past threshold depending on the aggression level
300	fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
301		if let Some(t) = self.l1_threshold {
302			age >= t
303		} else if let Some(t) = self.resend_unfinalized_period {
304			age > 0 && age % t == 0
305		} else {
306			false
307		}
308	}
309}
310
311impl Default for AggressionConfig {
312	fn default() -> Self {
313		AggressionConfig {
314			l1_threshold: Some(16),
315			l2_threshold: Some(64),
316			resend_unfinalized_period: Some(8),
317		}
318	}
319}
320
// Signal indicating whether messages should be re-sent.
#[derive(PartialEq)]
enum Resend {
	Yes,
	No,
}
326
/// The [`State`] struct is responsible for tracking the overall state of the subsystem.
///
/// It tracks metadata about our view of the unfinalized chain,
/// which assignments and approvals we have seen, and our peers' views.
#[derive(Default)]
pub struct State {
	/// These two fields are used in conjunction to construct a view over the unfinalized chain.
	blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
	/// Per-block message/peer knowledge, keyed by block hash.
	blocks: HashMap<Hash, BlockEntry>,

	/// Our view updates to our peers can race with `NewBlocks` updates. We store messages received
	/// against the directly mentioned blocks in our view in this map until `NewBlocks` is
	/// received.
	///
	/// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't
	/// delayed by more than a block length, this strategy will work well for mitigating the race.
	/// This is also a race that occurs typically on local networks.
	pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,

	/// Peer data is partially stored here, and partially inline within the [`BlockEntry`]s
	peer_views: HashMap<PeerId, PeerEntry>,

	/// Keeps a topology for various different sessions.
	topologies: SessionGridTopologies,

	/// Tracks recently finalized blocks.
	recent_outdated_blocks: RecentlyOutdated,

	/// Aggression configuration.
	aggression_config: AggressionConfig,

	/// Current approval checking finality lag.
	approval_checking_lag: BlockNumber,

	/// Aggregated reputation change
	reputation: ReputationAggregator,

	/// Slot duration in millis
	slot_duration_millis: u64,
}
367
// The kind of a gossiped message: an assignment certificate or an approval vote.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
	// An assignment certificate.
	Assignment,
	// An approval vote.
	Approval,
}
373
// Utility structure to identify assignments and approvals for specific candidates.
// Assignments can span multiple candidates, while approvals refer to only one candidate.
//
// Fields: block hash, claimed candidate bitfield, signing validator index.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
379
// Tracks which assignment/approval messages are known, keyed by `MessageSubject`.
#[derive(Debug, Clone, Default)]
struct Knowledge {
	// When there is no entry, this means the message is unknown.
	// When there is an entry with `MessageKind::Assignment`, the assignment is known.
	// When there is an entry with `MessageKind::Approval`, the assignment and approval are known.
	known_messages: HashMap<MessageSubject, MessageKind>,
}
387
impl Knowledge {
	// Whether a message is known at the given `kind` level. Knowing an
	// approval implies knowing the underlying assignment, not vice versa.
	fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
		match (kind, self.known_messages.get(message)) {
			// Unknown subject.
			(_, None) => false,
			// Any entry implies the assignment is known.
			(MessageKind::Assignment, Some(_)) => true,
			// Assignment-only knowledge does not imply approval knowledge.
			(MessageKind::Approval, Some(MessageKind::Assignment)) => false,
			(MessageKind::Approval, Some(MessageKind::Approval)) => true,
		}
	}

	// Record knowledge of a message. Returns `true` if this added new
	// information (i.e. the message was not already known at `kind` level).
	fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
		let mut success = match self.known_messages.entry(message.clone()) {
			hash_map::Entry::Vacant(vacant) => {
				vacant.insert(kind);
				// If there are multiple candidates assigned in the message, create
				// separate entries for each one.
				true
			},
			hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
				(MessageKind::Assignment, MessageKind::Assignment) => false,
				(MessageKind::Approval, MessageKind::Approval) => false,
				// Never downgrade approval knowledge to assignment-only.
				(MessageKind::Approval, MessageKind::Assignment) => false,
				// Upgrade assignment knowledge to approval knowledge.
				(MessageKind::Assignment, MessageKind::Approval) => {
					*occupied.get_mut() = MessageKind::Approval;
					true
				},
			},
		};

		// In case of successful insertion of multiple candidate assignments create additional
		// entries for each assigned candidate. This fakes knowledge of individual assignments, but
		// we need to share the same `MessageSubject` with the followup approval candidate index.
		// The recursion terminates because the per-candidate subjects have exactly one bit set.
		if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
			for candidate_index in message.1.iter_ones() {
				success = success &&
					self.insert(
						MessageSubject(
							message.0,
							vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
							message.2,
						),
						kind,
					);
			}
		}
		success
	}
}
436
/// Information that has been circulated to and from a peer, in both directions.
#[derive(Debug, Clone, Default)]
struct PeerKnowledge {
	/// The knowledge we've sent to the peer.
	sent: Knowledge,
	/// The knowledge we've received from the peer.
	received: Knowledge,
}
445
446impl PeerKnowledge {
447	fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
448		self.sent.contains(message, kind) || self.received.contains(message, kind)
449	}
450
451	// Generate the knowledge keys for querying if all assignments of an approval are known
452	// by this peer.
453	fn generate_assignments_keys(
454		approval: &IndirectSignedApprovalVoteV2,
455	) -> Vec<(MessageSubject, MessageKind)> {
456		approval
457			.candidate_indices
458			.iter_ones()
459			.map(|candidate_index| {
460				(
461					MessageSubject(
462						approval.block_hash,
463						(candidate_index as CandidateIndex).into(),
464						approval.validator,
465					),
466					MessageKind::Assignment,
467				)
468			})
469			.collect_vec()
470	}
471
472	// Generate the knowledge keys for querying if an approval is known by peer.
473	fn generate_approval_key(
474		approval: &IndirectSignedApprovalVoteV2,
475	) -> (MessageSubject, MessageKind) {
476		(
477			MessageSubject(
478				approval.block_hash,
479				approval.candidate_indices.clone(),
480				approval.validator,
481			),
482			MessageKind::Approval,
483		)
484	}
485}
486
/// Information about blocks in our current view as well as whether peers know of them.
struct BlockEntry {
	/// Peers who we know are aware of this block and thus, the candidates within it.
	/// This maps to their knowledge of messages.
	known_by: HashMap<PeerId, PeerKnowledge>,
	/// The number of the block.
	number: BlockNumber,
	/// The parent hash of the block.
	parent_hash: Hash,
	/// Our knowledge of messages.
	knowledge: Knowledge,
	/// A votes entry for each candidate indexed by [`CandidateIndex`].
	candidates: Vec<CandidateEntry>,
	/// Information about candidate metadata.
	candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
	/// The session index of this block.
	session: SessionIndex,
	/// Approval entries for whole block. These also contain all approvals in the case of multiple
	/// candidates being claimed by assignments.
	approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
	/// The block vrf story.
	vrf_story: RelayVRFStory,
	/// The block slot.
	slot: Slot,
	/// Backing off from re-sending messages to peers.
	/// NOTE(review): presumably the block number at which a re-send last
	/// happened — confirm against the aggression/resend logic.
	last_resent_at_block_number: Option<u32>,
}
514
impl BlockEntry {
	// Returns the peers which currently know this block.
	pub fn known_by(&self) -> Vec<PeerId> {
		self.known_by.keys().cloned().collect::<Vec<_>>()
	}

	// Insert an `ApprovalEntry`, linking it from every candidate it claims,
	// and return a mutable reference to the stored entry.
	pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
		// First map one entry per candidate to the same key we will use in `approval_entries`.
		// Key is (Validator_index, CandidateBitfield) that links the `ApprovalEntry` to the (K,V)
		// entry in `candidate_entry.messages`.
		for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
			match self.candidates.get_mut(claimed_candidate_index) {
				Some(candidate_entry) => {
					candidate_entry
						.assignments
						.entry(entry.validator_index())
						.or_insert(entry.assignment_claimed_candidates.clone());
				},
				None => {
					// This should never happen, but if it happens, it means the subsystem is
					// broken.
					gum::warn!(
						target: LOG_TARGET,
						hash = ?entry.assignment.block_hash,
						?claimed_candidate_index,
						"Missing candidate entry on `import_and_circulate_assignment`",
					);
				},
			};
		}

		self.approval_entries
			.entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
			.or_insert(entry)
	}

	// Tells if all `candidate_indices` are valid candidates of this block.
	pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
		candidate_indices
			.iter_ones()
			.all(|candidate_index| self.candidates.get(candidate_index as usize).is_some())
	}

	// Saves the given approval in all ApprovalEntries that contain an assignment for any of the
	// candidates in the approval.
	//
	// Returns the required routing needed for this approval and the list of random peers the
	// covering assignments were sent to.
	pub fn note_approval(
		&mut self,
		approval: IndirectSignedApprovalVoteV2,
	) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
		let mut required_routing: Option<RequiredRouting> = None;
		let mut peers_randomly_routed_to = HashSet::new();

		if self.candidates.len() < approval.candidate_indices.len() as usize {
			return Err(ApprovalEntryError::CandidateIndexOutOfBounds)
		}

		// First determine all assignments bitfields that might be covered by this approval
		let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
			.candidate_indices
			.iter_ones()
			.filter_map(|candidate_index| {
				self.candidates.get_mut(candidate_index).map_or(None, |candidate_entry| {
					candidate_entry.assignments.get(&approval.validator).cloned()
				})
			})
			.collect();

		// Mark the vote in all approval entries
		for assignment_bitfield in covered_assignments_bitfields {
			if let Some(approval_entry) =
				self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
			{
				approval_entry.note_approval(approval.clone())?;
				peers_randomly_routed_to
					.extend(approval_entry.routing_info().peers_randomly_routed.iter());

				// Combine the routing requirements of every covering assignment.
				if let Some(current_required_routing) = required_routing {
					required_routing = Some(
						current_required_routing
							.combine(approval_entry.routing_info().required_routing),
					);
				} else {
					required_routing = Some(approval_entry.routing_info().required_routing)
				}
			}
		}

		if let Some(required_routing) = required_routing {
			Ok((required_routing, peers_randomly_routed_to))
		} else {
			// No covering approval entry means we never saw an assignment for this vote.
			Err(ApprovalEntryError::UnknownAssignment)
		}
	}

	/// Returns the list of approval votes covering this candidate.
	pub fn approval_votes(
		&self,
		candidate_index: CandidateIndex,
	) -> Vec<IndirectSignedApprovalVoteV2> {
		// Collect into a map keyed by (validator, claimed bitfield) to de-duplicate
		// votes reachable through multiple assignments.
		let result: Option<
			HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
		> = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
			candidate_entry
				.assignments
				.iter()
				.filter_map(|(validator, assignment_bitfield)| {
					self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
				})
				.flat_map(|approval_entry| {
					approval_entry
						.approvals
						.clone()
						.into_iter()
						.filter(|(approved_candidates, _)| {
							approved_candidates.bit_at(candidate_index.as_bit_index())
						})
						.map(|(approved_candidates, vote)| {
							((approval_entry.validator_index, approved_candidates), vote)
						})
				})
				.collect()
		});

		result.map(|result| result.into_values().collect_vec()).unwrap_or_default()
	}
}
644
// Information about candidates in the context of a particular block they are included in.
// In other words, multiple `CandidateEntry`s may exist for the same candidate,
// if it is included by multiple blocks - this is likely the case when there are forks.
#[derive(Debug, Default)]
struct CandidateEntry {
	// The value represents part of the lookup key in `approval_entries` to fetch the assignment
	// and existing votes.
	assignments: HashMap<ValidatorIndex, CandidateBitfield>,
}
654
// Where a message came from: a network peer or our own node.
#[derive(Debug, Clone, PartialEq)]
enum MessageSource {
	// Received from the given network peer.
	Peer(PeerId),
	// Originated locally.
	Local,
}
660
// Encountered error while validating an assignment.
#[derive(Debug)]
enum InvalidAssignmentError {
	// The vrf check for the assignment failed.
	#[allow(dead_code)]
	CryptoCheckFailed(InvalidAssignment),
	// The assignment did not claim any valid candidate.
	NoClaimedCandidates,
	// Claimed invalid candidate.
	#[allow(dead_code)]
	ClaimedInvalidCandidateIndex {
		claimed_index: usize,
		max_index: usize,
	},
	// The assignment claims more candidates than the maximum allowed.
	OversizedClaimedBitfield,
	// `SessionInfo` was not found for the block hash in the assignment.
	#[allow(dead_code)]
	SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
681
// Encountered error while validating an approval vote.
#[derive(Debug)]
enum InvalidVoteError {
	// The candidate index was out of bounds.
	CandidateIndexOutOfBounds,
	// The validator index was out of bounds.
	ValidatorIndexOutOfBounds,
	// The signature of the vote was invalid.
	InvalidSignature,
	// `SessionInfo` was not found for the block hash in the approval.
	#[allow(dead_code)]
	SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
695
696impl MessageSource {
697	fn peer_id(&self) -> Option<PeerId> {
698		match self {
699			Self::Peer(id) => Some(*id),
700			Self::Local => None,
701		}
702	}
703}
704
// A message received for a block we haven't processed `NewBlocks` for yet;
// buffered in `State::pending_known` until the block is known.
enum PendingMessage {
	Assignment(IndirectAssignmentCertV2, CandidateBitfield),
	Approval(IndirectSignedApprovalVoteV2),
}
709
710#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
711impl State {
712	/// Build State with specified slot duration.
713	pub fn with_config(slot_duration_millis: u64) -> Self {
714		Self { slot_duration_millis, ..Default::default() }
715	}
716
	/// Handle a single event from the network bridge.
	///
	/// Dispatches on the event kind: peer connect/disconnect, gossip topology
	/// updates, view changes (ours and our peers'), incoming protocol messages
	/// and authority-id discovery updates.
	async fn handle_network_msg<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		match event {
			NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
				gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
				if let Some(authority_ids) = authority_ids {
					self.topologies.update_authority_ids(peer_id, &authority_ids);
				}
				// insert a blank view if none already present
				self.peer_views
					.entry(peer_id)
					.or_insert(PeerEntry { view: Default::default(), version });
			},
			NetworkBridgeEvent::PeerDisconnected(peer_id) => {
				gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
				// Drop all per-peer state, including per-block knowledge.
				self.peer_views.remove(&peer_id);
				self.blocks.iter_mut().for_each(|(_hash, entry)| {
					entry.known_by.remove(&peer_id);
				})
			},
			NetworkBridgeEvent::NewGossipTopology(topology) => {
				self.handle_new_session_topology(
					network_sender,
					topology.session,
					topology.topology,
					topology.local_index,
				)
				.await;
			},
			NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
				self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
			},
			NetworkBridgeEvent::OurViewChange(view) => {
				gum::trace!(target: LOG_TARGET, ?view, "Own view change");
				// Start buffering messages for any new heads we don't have a
				// `BlockEntry` for yet — their `NewBlocks` may still be in flight.
				for head in view.iter() {
					if !self.blocks.contains_key(head) {
						self.pending_known.entry(*head).or_default();
					}
				}

				// Drop buffered messages for heads that left our view.
				self.pending_known.retain(|h, _| {
					let live = view.contains(h);
					if !live {
						gum::trace!(
							target: LOG_TARGET,
							block_hash = ?h,
							"Cleaning up stale pending messages",
						);
					}
					live
				});
			},
			NetworkBridgeEvent::PeerMessage(peer_id, message) => {
				self.process_incoming_peer_message(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					message,
					rng,
					assignment_criteria,
					clock,
					session_info_provider,
				)
				.await;
			},
			NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
				gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
				// If we learn about a new PeerId for an authority ids we need to try to route the
				// messages that should have sent to that validator according to the topology.
				if self.topologies.update_authority_ids(peer_id, &authority_ids) {
					if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
						// Intersect the peer's view with the unfinalized blocks they
						// already know, then re-unify knowledge under the new topology.
						let intersection = self
							.blocks_by_number
							.iter()
							.filter(|(block_number, _)| *block_number > &view.finalized_number)
							.flat_map(|(_, hashes)| {
								hashes.iter().filter(|hash| {
									self.blocks
										.get(&hash)
										.map(|block| block.known_by.get(&peer_id).is_some())
										.unwrap_or_default()
								})
							});
						let view_intersection =
							View::new(intersection.cloned(), view.finalized_number);
						Self::unify_with_peer(
							network_sender,
							metrics,
							&mut self.blocks,
							&self.topologies,
							self.peer_views.len(),
							peer_id,
							*version,
							view_intersection,
							rng,
							true,
						)
						.await;
					}
				}
			},
		}
	}
836
	/// Handles a batch of newly-imported relay-chain block metadata.
	///
	/// For every block not yet tracked this creates a `BlockEntry`, sends each
	/// connected peer the messages for the new blocks already in its view, replays
	/// any assignments/approvals that were parked while these blocks were unknown,
	/// and finally re-evaluates aggression.
	async fn handle_new_blocks<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		metas: Vec<BlockApprovalMeta>,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		// Hashes of blocks actually inserted in this call (duplicates are skipped).
		let mut new_hashes = HashSet::new();

		gum::debug!(
			target: LOG_TARGET,
			"Got new blocks {:?}",
			metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
		);

		for meta in metas {
			match self.blocks.entry(meta.hash) {
				hash_map::Entry::Vacant(entry) => {
					// One default-initialized candidate slot per candidate included in the block.
					let candidates_count = meta.candidates.len();
					let mut candidates = Vec::with_capacity(candidates_count);
					candidates.resize_with(candidates_count, Default::default);

					entry.insert(BlockEntry {
						known_by: HashMap::new(),
						number: meta.number,
						parent_hash: meta.parent_hash,
						knowledge: Knowledge::default(),
						candidates,
						session: meta.session,
						approval_entries: HashMap::new(),
						candidates_metadata: meta.candidates,
						vrf_story: meta.vrf_story,
						slot: meta.slot,
						last_resent_at_block_number: None,
					});

					// Keep the session topology alive while blocks reference it (see
					// the matching `dec_session_refs` on pruning).
					self.topologies.inc_session_refs(meta.session);

					new_hashes.insert(meta.hash);

					// In case there are duplicates, we should only set this if the entry
					// was vacant.
					self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
				},
				_ => continue,
			}
		}

		{
			// Bring each peer up to date for the subset of the new blocks that is
			// already in its view.
			for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
				let intersection = view.iter().filter(|h| new_hashes.contains(h));
				let view_intersection = View::new(intersection.cloned(), view.finalized_number);
				Self::unify_with_peer(
					network_sender,
					metrics,
					&mut self.blocks,
					&self.topologies,
					self.peer_views.len(),
					*peer_id,
					*version,
					view_intersection,
					rng,
					false,
				)
				.await;
			}

			// Blocks whose parked (pending) messages can now be processed.
			let pending_now_known = self
				.pending_known
				.keys()
				.filter(|k| self.blocks.contains_key(k))
				.copied()
				.collect::<Vec<_>>();

			let to_import = pending_now_known
				.into_iter()
				.inspect(|h| {
					gum::trace!(
						target: LOG_TARGET,
						block_hash = ?h,
						"Extracting pending messages for new block"
					)
				})
				.filter_map(|k| self.pending_known.remove(&k))
				.flatten()
				.collect::<Vec<_>>();

			if !to_import.is_empty() {
				gum::debug!(
					target: LOG_TARGET,
					num = to_import.len(),
					"Processing pending assignment/approvals",
				);

				let _timer = metrics.time_import_pending_now_known();

				// Replay the parked messages through the regular import paths,
				// attributed to the peers they originally came from.
				for (peer_id, message) in to_import {
					match message {
						PendingMessage::Assignment(assignment, claimed_indices) => {
							self.import_and_circulate_assignment(
								approval_voting_sender,
								network_sender,
								runtime_api_sender,
								metrics,
								MessageSource::Peer(peer_id),
								assignment,
								claimed_indices,
								rng,
								assignment_criteria,
								clock,
								session_info_provider,
							)
							.await;
						},
						PendingMessage::Approval(approval_vote) => {
							self.import_and_circulate_approval(
								approval_voting_sender,
								network_sender,
								runtime_api_sender,
								metrics,
								MessageSource::Peer(peer_id),
								approval_vote,
								session_info_provider,
							)
							.await;
						},
					}
				}
			}
		}

		// The set of tracked blocks changed, so re-evaluate the aggression level.
		self.enable_aggression(network_sender, Resend::Yes, metrics).await;
	}
979
980	async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
981		&mut self,
982		network_sender: &mut N,
983		session: SessionIndex,
984		topology: SessionGridTopology,
985		local_index: Option<ValidatorIndex>,
986	) {
987		if local_index.is_none() {
988			// this subsystem only matters to validators.
989			return
990		}
991
992		self.topologies.insert_topology(session, topology, local_index);
993		let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
994
995		adjust_required_routing_and_propagate(
996			network_sender,
997			&mut self.blocks,
998			&self.topologies,
999			|block_entry| block_entry.session == session,
1000			|required_routing, local, validator_index| {
1001				if required_routing == &RequiredRouting::PendingTopology {
1002					topology
1003						.local_grid_neighbors()
1004						.required_routing_by_index(*validator_index, local)
1005				} else {
1006					*required_routing
1007				}
1008			},
1009			&self.peer_views,
1010		)
1011		.await;
1012	}
1013
1014	async fn process_incoming_assignments<A, N, R, RA>(
1015		&mut self,
1016		approval_voting_sender: &mut A,
1017		network_sender: &mut N,
1018		runtime_api_sender: &mut RA,
1019		metrics: &Metrics,
1020		peer_id: PeerId,
1021		assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
1022		rng: &mut R,
1023		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1024		clock: &(impl Clock + ?Sized),
1025		session_info_provider: &mut RuntimeInfo,
1026	) where
1027		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1028		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1029		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1030		R: CryptoRng + Rng,
1031	{
1032		for (assignment, claimed_indices) in assignments {
1033			if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
1034				let block_hash = &assignment.block_hash;
1035				let validator_index = assignment.validator;
1036
1037				gum::trace!(
1038					target: LOG_TARGET,
1039					%peer_id,
1040					?block_hash,
1041					?claimed_indices,
1042					?validator_index,
1043					"Pending assignment",
1044				);
1045
1046				pending.push((peer_id, PendingMessage::Assignment(assignment, claimed_indices)));
1047
1048				continue
1049			}
1050
1051			self.import_and_circulate_assignment(
1052				approval_voting_sender,
1053				network_sender,
1054				runtime_api_sender,
1055				metrics,
1056				MessageSource::Peer(peer_id),
1057				assignment,
1058				claimed_indices,
1059				rng,
1060				assignment_criteria,
1061				clock,
1062				session_info_provider,
1063			)
1064			.await;
1065		}
1066	}
1067
1068	// Entry point for processing an approval coming from a peer.
1069	async fn process_incoming_approvals<
1070		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1071		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1072		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1073	>(
1074		&mut self,
1075		approval_voting_sender: &mut A,
1076		network_sender: &mut N,
1077		runtime_api_sender: &mut RA,
1078		metrics: &Metrics,
1079		peer_id: PeerId,
1080		approvals: Vec<IndirectSignedApprovalVoteV2>,
1081		session_info_provider: &mut RuntimeInfo,
1082	) {
1083		gum::trace!(
1084			target: LOG_TARGET,
1085			peer_id = %peer_id,
1086			num = approvals.len(),
1087			"Processing approvals from a peer",
1088		);
1089		for approval_vote in approvals.into_iter() {
1090			if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
1091				let block_hash = approval_vote.block_hash;
1092				let validator_index = approval_vote.validator;
1093
1094				gum::trace!(
1095					target: LOG_TARGET,
1096					%peer_id,
1097					?block_hash,
1098					?validator_index,
1099					"Pending assignment candidates {:?}",
1100					approval_vote.candidate_indices,
1101				);
1102
1103				pending.push((peer_id, PendingMessage::Approval(approval_vote)));
1104
1105				continue
1106			}
1107
1108			self.import_and_circulate_approval(
1109				approval_voting_sender,
1110				network_sender,
1111				runtime_api_sender,
1112				metrics,
1113				MessageSource::Peer(peer_id),
1114				approval_vote,
1115				session_info_provider,
1116			)
1117			.await;
1118		}
1119	}
1120
1121	async fn process_incoming_peer_message<A, N, RA, R>(
1122		&mut self,
1123		approval_voting_sender: &mut A,
1124		network_sender: &mut N,
1125		runtime_api_sender: &mut RA,
1126		metrics: &Metrics,
1127		peer_id: PeerId,
1128		msg: ValidationProtocols<protocol_v3::ApprovalDistributionMessage>,
1129		rng: &mut R,
1130		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1131		clock: &(impl Clock + ?Sized),
1132		session_info_provider: &mut RuntimeInfo,
1133	) where
1134		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1135		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1136		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1137		R: CryptoRng + Rng,
1138	{
1139		match msg {
1140			ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Assignments(
1141				assignments,
1142			)) => {
1143				gum::trace!(
1144					target: LOG_TARGET,
1145					peer_id = %peer_id,
1146					num = assignments.len(),
1147					"Processing assignments from a peer",
1148				);
1149				let sanitized_assignments =
1150					self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;
1151
1152				self.process_incoming_assignments(
1153					approval_voting_sender,
1154					network_sender,
1155					runtime_api_sender,
1156					metrics,
1157					peer_id,
1158					sanitized_assignments,
1159					rng,
1160					assignment_criteria,
1161					clock,
1162					session_info_provider,
1163				)
1164				.await;
1165			},
1166			ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Approvals(
1167				approvals,
1168			)) => {
1169				let sanitized_approvals =
1170					self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
1171				self.process_incoming_approvals(
1172					approval_voting_sender,
1173					network_sender,
1174					runtime_api_sender,
1175					metrics,
1176					peer_id,
1177					sanitized_approvals,
1178					session_info_provider,
1179				)
1180				.await;
1181			},
1182		}
1183	}
1184
	// handle a peer view change: requires that the peer is already connected
	// and has an entry in the `PeerData` struct.
	async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
		&mut self,
		network_sender: &mut N,
		metrics: &Metrics,
		peer_id: PeerId,
		view: View,
		rng: &mut R,
	) where
		R: CryptoRng + Rng,
	{
		gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
		let finalized_number = view.finalized_number;

		// Swap in the new view, keeping the old one so we know which finalized range
		// to prune from the peer's knowledge below.
		let (old_view, protocol_version) =
			if let Some(peer_entry) = self.peer_views.get_mut(&peer_id) {
				(Some(std::mem::replace(&mut peer_entry.view, view.clone())), peer_entry.version)
			} else {
				// This shouldn't happen, but if it does we assume protocol version 3.
				gum::warn!(
					target: LOG_TARGET,
					?peer_id,
					?view,
					"Peer view change for missing `peer_entry`"
				);

				(None, ValidationVersion::V3.into())
			};

		let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);

		// we want to prune every block known_by peer up to (including) view.finalized_number
		let blocks = &mut self.blocks;
		// the `BTreeMap::range` is constrained by stored keys
		// so the loop won't take ages if the new finalized_number skyrockets
		// but we need to make sure the range is not empty, otherwise it will panic
		// it shouldn't be, we make sure of this in the network bridge
		let range = old_finalized_number..=finalized_number;
		if !range.is_empty() && !blocks.is_empty() {
			self.blocks_by_number
				.range(range)
				.flat_map(|(_number, hashes)| hashes)
				.for_each(|hash| {
					if let Some(entry) = blocks.get_mut(hash) {
						entry.known_by.remove(&peer_id);
					}
				});
		}

		// Send the peer messages for blocks in its new view that it is not yet
		// known to have.
		Self::unify_with_peer(
			network_sender,
			metrics,
			&mut self.blocks,
			&self.topologies,
			self.peer_views.len(),
			peer_id,
			protocol_version,
			view,
			rng,
			false,
		)
		.await;
	}
1249
1250	async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
1251		&mut self,
1252		network_sender: &mut N,
1253		metrics: &Metrics,
1254		finalized_number: BlockNumber,
1255	) {
1256		// we want to prune every block up to (including) finalized_number
1257		// why +1 here?
1258		// split_off returns everything after the given key, including the key
1259		let split_point = finalized_number.saturating_add(1);
1260		let mut old_blocks = self.blocks_by_number.split_off(&split_point);
1261
1262		// after split_off old_blocks actually contains new blocks, we need to swap
1263		std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
1264
1265		// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
1266		old_blocks.values().flatten().for_each(|relay_block| {
1267			self.recent_outdated_blocks.note_outdated(*relay_block);
1268			if let Some(block_entry) = self.blocks.remove(relay_block) {
1269				self.topologies.dec_session_refs(block_entry.session);
1270			}
1271		});
1272
1273		// If a block was finalized, this means we may need to move our aggression
1274		// forward to the now oldest block(s).
1275		self.enable_aggression(network_sender, Resend::No, metrics).await;
1276	}
1277
1278	// When finality is lagging as a last resort nodes start sending the messages they have
1279	// multiples times. This means it is safe to accept duplicate messages without punishing the
1280	// peer and reduce the reputation and can end up banning the Peer, which in turn will create
1281	// more no-shows.
1282	fn accept_duplicates_from_validators(
1283		blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1284		topologies: &SessionGridTopologies,
1285		aggression_config: &AggressionConfig,
1286		entry: &BlockEntry,
1287		peer: PeerId,
1288	) -> bool {
1289		let topology = topologies.get_topology(entry.session);
1290		let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
1291		let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);
1292
1293		// Return if we don't have at least 1 block.
1294		let (min_age, max_age) = match (min_age, max_age) {
1295			(Some(min), Some(max)) => (*min, *max),
1296			_ => return false,
1297		};
1298
1299		let age = max_age.saturating_sub(min_age);
1300
1301		aggression_config.should_trigger_aggression(age) &&
1302			topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
1303	}
1304
	/// Imports an assignment (from a peer or the local node) and circulates it to
	/// the peers that should receive it: the grid-topology routing set plus a
	/// random sample of other validator peers.
	///
	/// Updates block/peer knowledge and peer reputation along the way.
	async fn import_and_circulate_assignment<A, N, RA, R>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		source: MessageSource,
		assignment: IndirectAssignmentCertV2,
		claimed_candidate_indices: CandidateBitfield,
		rng: &mut R,
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) where
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
		R: CryptoRng + Rng,
	{
		let block_hash = assignment.block_hash;
		let validator_index = assignment.validator;

		// Assignments for blocks we don't track are rejected; the sending peer costs
		// reputation unless the block is a recently-outdated one it could
		// legitimately lag on.
		let entry = match self.blocks.get_mut(&block_hash) {
			Some(entry) => entry,
			None => {
				if let Some(peer_id) = source.peer_id() {
					gum::trace!(
						target: LOG_TARGET,
						?peer_id,
						hash = ?block_hash,
						?validator_index,
						"Unexpected assignment",
					);
					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
						modify_reputation(
							&mut self.reputation,
							network_sender,
							peer_id,
							COST_UNEXPECTED_MESSAGE,
						)
						.await;
						gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
						// NOTE(review): this metric fires when the block is *not* recently
						// outdated, which is the opposite of the approval path
						// (`on_approval_recent_outdated` fires in the `else`); confirm intended.
						metrics.on_assignment_recent_outdated();
					}
				}
				metrics.on_assignment_invalid_block();
				return
			},
		};

		// Compute metadata on the assignment.
		let (message_subject, message_kind) = (
			MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
			MessageKind::Assignment,
		);

		if let Some(peer_id) = source.peer_id() {
			// check if our knowledge of the peer already contains this assignment
			match entry.known_by.entry(peer_id) {
				hash_map::Entry::Occupied(mut peer_knowledge) => {
					let peer_knowledge = peer_knowledge.get_mut();
					if peer_knowledge.contains(&message_subject, message_kind) {
						// wasn't included before
						// (`insert` returning `false` means the peer already sent this to us
						// => duplicate; costs reputation unless duplicates are tolerated due
						// to aggression, see `accept_duplicates_from_validators`.)
						if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
							if !Self::accept_duplicates_from_validators(
								&self.blocks_by_number,
								&self.topologies,
								&self.aggression_config,
								entry,
								peer_id,
							) {
								gum::debug!(
									target: LOG_TARGET,
									?peer_id,
									?message_subject,
									"Duplicate assignment",
								);

								modify_reputation(
									&mut self.reputation,
									network_sender,
									peer_id,
									COST_DUPLICATE_MESSAGE,
								)
								.await;
							}

							metrics.on_assignment_duplicate();
						} else {
							gum::trace!(
								target: LOG_TARGET,
								?peer_id,
								hash = ?block_hash,
								?validator_index,
								?message_subject,
								"We sent the message to the peer while peer was sending it to us. Known race condition.",
							);
						}
						return
					}
				},
				hash_map::Entry::Vacant(_) => {
					// The peer is not known to have this block in its view at all.
					gum::debug!(
						target: LOG_TARGET,
						?peer_id,
						?message_subject,
						"Assignment from a peer is out of view",
					);
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						COST_UNEXPECTED_MESSAGE,
					)
					.await;
					metrics.on_assignment_out_of_view();
				},
			}

			// if the assignment is known to be valid, reward the peer
			if entry.knowledge.contains(&message_subject, message_kind) {
				modify_reputation(
					&mut self.reputation,
					network_sender,
					peer_id,
					BENEFIT_VALID_MESSAGE,
				)
				.await;
				if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
					gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
					peer_knowledge.received.insert(message_subject, message_kind);
				}
				metrics.on_assignment_good_known();
				return
			}

			// First time we see this assignment: run the full validity check.
			let result = Self::check_assignment_valid(
				assignment_criteria,
				&entry,
				&assignment,
				&claimed_candidate_indices,
				session_info_provider,
				runtime_api_sender,
			)
			.await;

			match result {
				Ok(checked_assignment) => {
					// Valid but premature assignments (tranche too far ahead of the
					// current one) are dropped and the sender is punished.
					let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
					let too_far_in_future =
						current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;

					if checked_assignment.tranche() >= too_far_in_future {
						gum::debug!(
							target: LOG_TARGET,
							hash = ?block_hash,
							?peer_id,
							"Got an assignment too far in the future",
						);
						modify_reputation(
							&mut self.reputation,
							network_sender,
							peer_id,
							COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
						)
						.await;
						metrics.on_assignment_far();

						return
					}

					// Forward to approval-voting, reward the first sender and record
					// the assignment in our own and the peer's knowledge.
					approval_voting_sender
						.send_message(ApprovalVotingMessage::ImportAssignment(
							checked_assignment,
							None,
						))
						.await;
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						BENEFIT_VALID_MESSAGE_FIRST,
					)
					.await;
					entry.knowledge.insert(message_subject.clone(), message_kind);
					if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
						peer_knowledge.received.insert(message_subject.clone(), message_kind);
					}
				},
				Err(error) => {
					gum::info!(
						target: LOG_TARGET,
						hash = ?block_hash,
						?peer_id,
						?error,
						"Got a bad assignment from peer",
					);
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						COST_INVALID_MESSAGE,
					)
					.await;
					metrics.on_assignment_bad();
					return
				},
			}
		} else {
			// Local import: no peer to reward/punish, only knowledge bookkeeping.
			if !entry.knowledge.insert(message_subject.clone(), message_kind) {
				// if we already imported an assignment, there is no need to distribute it again
				gum::warn!(
					target: LOG_TARGET,
					?message_subject,
					"Importing locally an already known assignment",
				);
				return
			} else {
				gum::debug!(
					target: LOG_TARGET,
					?message_subject,
					"Importing locally a new assignment",
				);
			}
		}

		// Invariant: to our knowledge, none of the peers except for the `source` know about the
		// assignment.
		metrics.on_assignment_imported(&assignment.cert.kind);

		let topology = self.topologies.get_topology(entry.session);
		let local = source == MessageSource::Local;

		// Routing required by the grid topology; deferred if the topology is unknown.
		let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
			t.local_grid_neighbors().required_routing_by_index(validator_index, local)
		});
		// Peers that we will send the assignment to.
		let mut peers = HashSet::new();

		let peers_to_route_to = topology
			.as_ref()
			.map(|t| t.peers_to_route(required_routing))
			.unwrap_or_default();

		// Only target routed peers that already know the relay-chain block.
		for peer in peers_to_route_to {
			if !entry.known_by.contains_key(&peer) {
				continue
			}

			peers.insert(peer);
		}

		// All the peers that know the relay chain block.
		let peers_to_filter = entry.known_by();

		let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
			assignment.clone(),
			claimed_candidate_indices.clone(),
			ApprovalRouting {
				required_routing,
				local,
				random_routing: Default::default(),
				peers_randomly_routed: Default::default(),
			},
		));

		// Dispatch the message to all peers in the routing set which
		// know the block.
		//
		// If the topology isn't known yet (race with networking subsystems)
		// then messages will be sent when we get it.

		let assignments = vec![(assignment, claimed_candidate_indices.clone())];
		let n_peers_total = self.peer_views.len();
		let source_peer = source.peer_id();

		// Filter destination peers
		// (skip the source, peers already selected by the grid, and non-validators;
		// the remainder is sampled for random routing until the sample is complete).
		for peer in peers_to_filter.into_iter() {
			if Some(peer) == source_peer {
				continue
			}

			if peers.contains(&peer) {
				continue
			}

			if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
				continue
			}

			// Note: at this point, we haven't received the message from any peers
			// other than the source peer, and we just got it, so we haven't sent it
			// to any peers either.
			let route_random =
				approval_entry.routing_info().random_routing.sample(n_peers_total, rng);

			if route_random {
				approval_entry.routing_info_mut().mark_randomly_sent(peer);
				peers.insert(peer);
			}

			if approval_entry.routing_info().random_routing.is_complete() {
				break
			}
		}

		// Add the metadata of the assignment to the knowledge of each peer.
		for peer in peers.iter() {
			// we already filtered peers above, so this should always be Some
			if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
				peer_knowledge.sent.insert(message_subject.clone(), message_kind);
			}
		}

		if !peers.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?block_hash,
				?claimed_candidate_indices,
				local = source.peer_id().is_none(),
				num_peers = peers.len(),
				"Sending an assignment to peers",
			);

			let peers = peers
				.iter()
				.filter_map(|peer_id| {
					self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
				})
				.collect::<Vec<_>>();

			send_assignments_batched(network_sender, assignments, &peers).await;
		}
	}
1639
1640	async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
1641		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1642		entry: &BlockEntry,
1643		assignment: &IndirectAssignmentCertV2,
1644		claimed_candidate_indices: &CandidateBitfield,
1645		runtime_info: &mut RuntimeInfo,
1646		runtime_api_sender: &mut RA,
1647	) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
1648		let ExtendedSessionInfo { ref session_info, .. } = runtime_info
1649			.get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
1650			.await
1651			.map_err(|err| InvalidAssignmentError::SessionInfoNotFound(err))?;
1652
1653		if claimed_candidate_indices.len() > session_info.n_cores as usize {
1654			return Err(InvalidAssignmentError::OversizedClaimedBitfield)
1655		}
1656
1657		let claimed_cores: Vec<CoreIndex> = claimed_candidate_indices
1658			.iter_ones()
1659			.map(|candidate_index| {
1660				entry.candidates_metadata.get(candidate_index).map(|(_, core, _)| *core).ok_or(
1661					InvalidAssignmentError::ClaimedInvalidCandidateIndex {
1662						claimed_index: candidate_index,
1663						max_index: entry.candidates_metadata.len(),
1664					},
1665				)
1666			})
1667			.collect::<Result<Vec<_>, InvalidAssignmentError>>()?;
1668
1669		let Ok(claimed_cores) = claimed_cores.try_into() else {
1670			return Err(InvalidAssignmentError::NoClaimedCandidates)
1671		};
1672
1673		let backing_groups = claimed_candidate_indices
1674			.iter_ones()
1675			.flat_map(|candidate_index| {
1676				entry.candidates_metadata.get(candidate_index).map(|(_, _, group)| *group)
1677			})
1678			.collect::<Vec<_>>();
1679
1680		assignment_criteria
1681			.check_assignment_cert(
1682				claimed_cores,
1683				assignment.validator,
1684				&polkadot_node_primitives::approval::criteria::Config::from(session_info),
1685				entry.vrf_story.clone(),
1686				&assignment.cert,
1687				backing_groups,
1688			)
1689			.map_err(|err| InvalidAssignmentError::CryptoCheckFailed(err))
1690			.map(|tranche| {
1691				CheckedIndirectAssignment::from_checked(
1692					assignment.clone(),
1693					claimed_candidate_indices.clone(),
1694					tranche,
1695				)
1696			})
1697	}
	// Checks if an approval can be processed.
	// Returns true if we can continue with processing the approval and false otherwise.
	//
	// As a side effect this updates peer knowledge and reputation:
	// - approval covers an assignment we don't know => costs reputation, `false`
	// - peer not known to have the block in view => costs reputation, `false`
	// - duplicate => costs reputation (unless duplicates are tolerated under
	//   aggression), `false`
	// - approval already processed => rewards the peer, `false`
	async fn check_approval_can_be_processed<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
	>(
		network_sender: &mut N,
		assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
		approval_knowledge_key: &(MessageSubject, MessageKind),
		entry: &mut BlockEntry,
		blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
		topologies: &SessionGridTopologies,
		aggression_config: &AggressionConfig,
		reputation: &mut ReputationAggregator,
		peer_id: PeerId,
		metrics: &Metrics,
	) -> bool {
		// An approval is only acceptable if we already know every assignment it covers.
		for message_subject in assignments_knowledge_key {
			if !entry.knowledge.contains(&message_subject.0, message_subject.1) {
				gum::trace!(
					target: LOG_TARGET,
					?peer_id,
					?message_subject,
					"Unknown approval assignment",
				);
				modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
					.await;
				metrics.on_approval_unknown_assignment();
				return false
			}
		}

		// check if our knowledge of the peer already contains this approval
		match entry.known_by.entry(peer_id) {
			hash_map::Entry::Occupied(mut knowledge) => {
				let peer_knowledge = knowledge.get_mut();
				if peer_knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
					// `insert` returning `false` means the peer already sent us this exact
					// approval before => duplicate.
					if !peer_knowledge
						.received
						.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
					{
						if !Self::accept_duplicates_from_validators(
							blocks_by_number,
							topologies,
							aggression_config,
							entry,
							peer_id,
						) {
							gum::trace!(
								target: LOG_TARGET,
								?peer_id,
								?approval_knowledge_key,
								"Duplicate approval",
							);
							modify_reputation(
								reputation,
								network_sender,
								peer_id,
								COST_DUPLICATE_MESSAGE,
							)
							.await;
						}
						metrics.on_approval_duplicate();
					}
					return false
				}
			},
			hash_map::Entry::Vacant(_) => {
				// The peer is not known to have this block in its view at all.
				gum::debug!(
					target: LOG_TARGET,
					?peer_id,
					?approval_knowledge_key,
					"Approval from a peer is out of view",
				);
				modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
					.await;
				metrics.on_approval_out_of_view();
			},
		}

		if entry.knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
			// Remember that the peer knows this approval so we don't send it back.
			if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
				peer_knowledge
					.received
					.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1);
			}

			// We already processed this approval no need to continue.
			gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
			metrics.on_approval_good_known();
			modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
			false
		} else {
			true
		}
	}
1793
1794	async fn import_and_circulate_approval<
1795		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1796		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1797		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1798	>(
1799		&mut self,
1800		approval_voting_sender: &mut A,
1801		network_sender: &mut N,
1802		runtime_api_sender: &mut RA,
1803		metrics: &Metrics,
1804		source: MessageSource,
1805		vote: IndirectSignedApprovalVoteV2,
1806		session_info_provider: &mut RuntimeInfo,
1807	) {
1808		let block_hash = vote.block_hash;
1809		let validator_index = vote.validator;
1810		let candidate_indices = &vote.candidate_indices;
1811		let entry = match self.blocks.get_mut(&block_hash) {
1812			Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
1813			_ => {
1814				if let Some(peer_id) = source.peer_id() {
1815					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1816						gum::debug!(
1817							target: LOG_TARGET,
1818							?peer_id,
1819							?block_hash,
1820							?validator_index,
1821							?candidate_indices,
1822							"Approval from a peer is out of view",
1823						);
1824						modify_reputation(
1825							&mut self.reputation,
1826							network_sender,
1827							peer_id,
1828							COST_UNEXPECTED_MESSAGE,
1829						)
1830						.await;
1831						metrics.on_approval_invalid_block();
1832					} else {
1833						metrics.on_approval_recent_outdated();
1834					}
1835				}
1836				return
1837			},
1838		};
1839
1840		// compute metadata on the assignment.
1841		let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
1842		let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);
1843
1844		if let Some(peer_id) = source.peer_id() {
1845			if !Self::check_approval_can_be_processed(
1846				network_sender,
1847				&assignments_knowledge_keys,
1848				&approval_knwowledge_key,
1849				entry,
1850				&self.blocks_by_number,
1851				&self.topologies,
1852				&self.aggression_config,
1853				&mut self.reputation,
1854				peer_id,
1855				metrics,
1856			)
1857			.await
1858			{
1859				return
1860			}
1861
1862			let result =
1863				Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
1864					.await;
1865
1866			match result {
1867				Ok(vote) => {
1868					approval_voting_sender
1869						.send_message(ApprovalVotingMessage::ImportApproval(vote, None))
1870						.await;
1871
1872					modify_reputation(
1873						&mut self.reputation,
1874						network_sender,
1875						peer_id,
1876						BENEFIT_VALID_MESSAGE_FIRST,
1877					)
1878					.await;
1879
1880					entry
1881						.knowledge
1882						.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1883					if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1884						peer_knowledge
1885							.received
1886							.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1887					}
1888				},
1889				Err(err) => {
1890					modify_reputation(
1891						&mut self.reputation,
1892						network_sender,
1893						peer_id,
1894						COST_INVALID_MESSAGE,
1895					)
1896					.await;
1897
1898					gum::info!(
1899						target: LOG_TARGET,
1900						?peer_id,
1901						?err,
1902						"Got a bad approval from peer",
1903					);
1904					metrics.on_approval_bad();
1905					return
1906				},
1907			}
1908		} else {
1909			if !entry
1910				.knowledge
1911				.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
1912			{
1913				// if we already imported all approvals, there is no need to distribute it again
1914				gum::warn!(
1915					target: LOG_TARGET,
1916					"Importing locally an already known approval",
1917				);
1918				return
1919			} else {
1920				gum::debug!(
1921					target: LOG_TARGET,
1922					"Importing locally a new approval",
1923				);
1924			}
1925		}
1926
1927		let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
1928			Ok(required_routing) => required_routing,
1929			Err(err) => {
1930				gum::warn!(
1931					target: LOG_TARGET,
1932					hash = ?block_hash,
1933					validator_index = ?vote.validator,
1934					candidate_bitfield = ?vote.candidate_indices,
1935					?err,
1936					"Possible bug: Vote import failed",
1937				);
1938				metrics.on_approval_bug();
1939				return
1940			},
1941		};
1942
1943		// Invariant: to our knowledge, none of the peers except for the `source` know about the
1944		// approval.
1945		metrics.on_approval_imported();
1946
1947		// Dispatch a ApprovalDistributionV3Message::Approval(vote)
1948		// to all peers required by the topology, with the exception of the source peer.
1949		let topology = self.topologies.get_topology(entry.session);
1950		let source_peer = source.peer_id();
1951
1952		let peer_filter = move |peer| {
1953			if Some(peer) == source_peer.as_ref() {
1954				return false
1955			}
1956
1957			// Here we're leaning on a few behaviors of assignment propagation:
1958			//   1. At this point, the only peer we're aware of which has the approval message is
1959			//      the source peer.
1960			//   2. We have sent the assignment message to every peer in the required routing which
1961			//      is aware of this block _unless_ the peer we originally received the assignment
1962			//      from was part of the required routing. In that case, we've sent the assignment
1963			//      to all aware peers in the required routing _except_ the original source of the
1964			//      assignment. Hence the `in_topology_check`.
1965			//   3. Any randomly selected peers have been sent the assignment already.
1966			let in_topology = topology
1967				.map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
1968			in_topology || peers_randomly_routed_to.contains(peer)
1969		};
1970
1971		let peers = entry
1972			.known_by
1973			.iter()
1974			.filter(|(p, _)| peer_filter(p))
1975			.filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
1976			.collect::<Vec<_>>();
1977
1978		// Add the metadata of the assignment to the knowledge of each peer.
1979		for peer in peers.iter() {
1980			// we already filtered peers above, so this should always be Some
1981			if let Some(entry) = entry.known_by.get_mut(&peer.0) {
1982				entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1983			}
1984		}
1985
1986		if !peers.is_empty() {
1987			let approvals = vec![vote];
1988			gum::trace!(
1989				target: LOG_TARGET,
1990				?block_hash,
1991				local = source.peer_id().is_none(),
1992				num_peers = peers.len(),
1993				"Sending an approval to peers",
1994			);
1995			send_approvals_batched(network_sender, approvals, &peers).await;
1996		}
1997	}
1998
1999	// Checks if the approval vote is valid.
2000	async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
2001		vote: &IndirectSignedApprovalVoteV2,
2002		entry: &BlockEntry,
2003		runtime_info: &mut RuntimeInfo,
2004		runtime_api_sender: &mut RA,
2005	) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
2006		if vote.candidate_indices.len() > entry.candidates_metadata.len() {
2007			return Err(InvalidVoteError::CandidateIndexOutOfBounds)
2008		}
2009
2010		let candidate_hashes = vote
2011			.candidate_indices
2012			.iter_ones()
2013			.flat_map(|candidate_index| {
2014				entry
2015					.candidates_metadata
2016					.get(candidate_index)
2017					.map(|(candidate_hash, _, _)| *candidate_hash)
2018			})
2019			.collect::<Vec<_>>();
2020
2021		let ExtendedSessionInfo { ref session_info, .. } = runtime_info
2022			.get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
2023			.await
2024			.map_err(|err| InvalidVoteError::SessionInfoNotFound(err))?;
2025
2026		let pubkey = session_info
2027			.validators
2028			.get(vote.validator)
2029			.ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;
2030		DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
2031			candidate_hashes.clone(),
2032		))
2033		.check_signature(
2034			&pubkey,
2035			*candidate_hashes.first().unwrap(),
2036			entry.session,
2037			&vote.signature,
2038		)
2039		.map_err(|_| InvalidVoteError::InvalidSignature)
2040		.map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
2041	}
2042
2043	/// Retrieve approval signatures from state for the given relay block/indices:
2044	fn get_approval_signatures(
2045		&mut self,
2046		indices: HashSet<(Hash, CandidateIndex)>,
2047	) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
2048		let mut all_sigs = HashMap::new();
2049		for (hash, index) in indices {
2050			let block_entry = match self.blocks.get(&hash) {
2051				None => {
2052					gum::debug!(
2053						target: LOG_TARGET,
2054						?hash,
2055						"`get_approval_signatures`: could not find block entry for given hash!"
2056					);
2057					continue
2058				},
2059				Some(e) => e,
2060			};
2061
2062			let sigs = block_entry.approval_votes(index).into_iter().map(|approval| {
2063				(
2064					approval.validator,
2065					(
2066						hash,
2067						approval
2068							.candidate_indices
2069							.iter_ones()
2070							.map(|val| val as CandidateIndex)
2071							.collect_vec(),
2072						approval.signature,
2073					),
2074				)
2075			});
2076			all_sigs.extend(sigs);
2077		}
2078		all_sigs
2079	}
2080
	/// Bring `peer_id` up to date for every chain head in its `view`: walk each head
	/// back towards the peer's finalized number and queue all assignments/approvals
	/// the peer should receive (via grid-topology routing or random sampling) but has
	/// not been sent yet, then dispatch them in batches.
	async fn unify_with_peer(
		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
		metrics: &Metrics,
		entries: &mut HashMap<Hash, BlockEntry>,
		topologies: &SessionGridTopologies,
		total_peers: usize,
		peer_id: PeerId,
		protocol_version: ProtocolVersion,
		view: View,
		rng: &mut (impl CryptoRng + Rng),
		retry_known_blocks: bool,
	) {
		metrics.on_unify_with_peer();
		let _timer = metrics.time_unify_with_peer();

		// Messages are accumulated across all blocks and sent in batches at the end.
		let mut assignments_to_send = Vec::new();
		let mut approvals_to_send = Vec::new();

		let view_finalized_number = view.finalized_number;
		for head in view.into_iter() {
			let mut block = head;

			// Walk the chain back to last finalized block of the peer view.
			loop {
				let entry = match entries.get_mut(&block) {
					Some(entry) if entry.number > view_finalized_number => entry,
					_ => break,
				};

				// Any peer which is in the `known_by` set and we know its peer_id authority id
				// mapping has already been sent all messages it's meant to get for that block and
				// all in-scope prior blocks. In case we just learnt about its peer_id
				// authority-id mapping, we have to retry sending the messages that should be sent
				// to it for all un-finalized blocks.
				if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
					break
				}

				let peer_knowledge = entry.known_by.entry(peer_id).or_default();
				let topology = topologies.get_topology(entry.session);

				// We want to iterate the `approval_entries` of the block entry as these contain
				// all assignments that also link all approval votes.
				for approval_entry in entry.approval_entries.values_mut() {
					// Propagate the message to all peers in the required routing set OR
					// randomly sample peers.
					{
						let required_routing = approval_entry.routing_info().required_routing;
						let routing_info = &mut approval_entry.routing_info_mut();
						let rng = &mut *rng;
						let mut peer_filter = move |peer_id| {
							let in_topology = topology.as_ref().map_or(false, |t| {
								t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
							});
							in_topology || {
								// Only validator peers are eligible for random sampling.
								if !topology
									.map(|topology| topology.is_validator(peer_id))
									.unwrap_or(false)
								{
									return false
								}

								// Record a positive sample so follow-up messages keep
								// flowing to the same randomly-chosen peer.
								let route_random =
									routing_info.random_routing.sample(total_peers, rng);
								if route_random {
									routing_info.mark_randomly_sent(*peer_id);
								}

								route_random
							}
						};

						if !peer_filter(&peer_id) {
							continue
						}
					}

					let assignment_message = approval_entry.assignment();
					let approval_messages = approval_entry.approvals();
					let (assignment_knowledge, message_kind) =
						approval_entry.create_assignment_knowledge(block);

					// Only send stuff a peer doesn't know in the context of a relay chain
					// block.
					if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
						peer_knowledge.sent.insert(assignment_knowledge, message_kind);
						assignments_to_send.push(assignment_message);
					}

					// Filter approval votes.
					for approval_message in approval_messages {
						let approval_knowledge =
							PeerKnowledge::generate_approval_key(&approval_message);

						if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
							approvals_to_send.push(approval_message);
							peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
						}
					}
				}

				// Continue walking towards the peer's finalized block.
				block = entry.parent_hash;
			}
		}

		if !assignments_to_send.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?protocol_version,
				num = assignments_to_send.len(),
				"Sending assignments to unified peer",
			);

			send_assignments_batched(
				sender,
				assignments_to_send,
				&vec![(peer_id, protocol_version)],
			)
			.await;
		}

		if !approvals_to_send.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?protocol_version,
				num = approvals_to_send.len(),
				"Sending approvals to unified peer",
			);

			send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
				.await;
		}
	}
2216
	// It is very important that aggression starts with oldest unfinalized block, rather than oldest
	// unapproved block. Using the gossip approach to distribute potentially
	// missing votes to validators requires that we always trigger on finality lag, even if
	// we have the approval lag value. The reason for this is to avoid finality stall
	// when more than 1/3 nodes go offline for a period of time. When they come back
	// they wouldn't get any of the approvals since the on-line nodes would never trigger
	// aggression as they have approved all the candidates and don't detect any approval lag.
	//
	// In order to switch to using approval lag as a trigger we need a request/response protocol
	// to fetch votes from validators rather than use gossip.
	async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
		&mut self,
		network_sender: &mut N,
		resend: Resend,
		metrics: &Metrics,
	) {
		let config = self.aggression_config.clone();
		// `blocks_by_number` is ordered by block number, so these are the oldest and
		// newest tracked (unfinalized) blocks.
		let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
		let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);

		// Return if we don't have at least 1 block.
		let (min_age, max_age) = match (min_age, max_age) {
			(Some(min), Some(max)) => (*min, *max),
			_ => return, // empty.
		};

		// Finality lag in blocks between the newest and oldest unfinalized block.
		let age = max_age.saturating_sub(min_age);

		// Trigger on approval checking lag.
		if !self.aggression_config.should_trigger_aggression(age) {
			gum::trace!(
				target: LOG_TARGET,
				approval_checking_lag = self.approval_checking_lag,
				age,
				"Aggression not enabled",
			);
			return
		}
		gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);

		// First pass: optionally resend everything for the oldest block by wiping the
		// per-peer "sent" knowledge; required routing is left unchanged here.
		adjust_required_routing_and_propagate(
			network_sender,
			&mut self.blocks,
			&self.topologies,
			|block_entry| {
				let block_age = max_age - block_entry.number;
				// We want to resend only for blocks of min_age, there is no point in
				// resending for blocks newer than that, because we are just going to create load
				// and not gain anything.
				let diff_from_min_age = block_entry.number - min_age;

				// We want to back-off on resending for blocks that have been resent recently, to
				// give time for nodes to process all the extra messages, if we still have not
				// finalized we are going to resend again after unfinalized_period * 2 since the
				// last resend.
				let blocks_since_last_sent = block_entry
					.last_resent_at_block_number
					.map(|last_resent_at_block_number| max_age - last_resent_at_block_number);

				let can_resend_at_this_age = blocks_since_last_sent
					.zip(config.resend_unfinalized_period)
					.map(|(blocks_since_last_sent, unfinalized_period)| {
						blocks_since_last_sent >= unfinalized_period * 2
					})
					.unwrap_or(true);

				if resend == Resend::Yes &&
					config.resend_unfinalized_period.as_ref().map_or(false, |p| {
						block_age > 0 &&
							block_age % p == 0 && diff_from_min_age == 0 &&
							can_resend_at_this_age
					}) {
					// Retry sending to all peers.
					for (_, knowledge) in block_entry.known_by.iter_mut() {
						knowledge.sent = Knowledge::default();
					}
					block_entry.last_resent_at_block_number = Some(max_age);
					gum::debug!(
						target: LOG_TARGET,
						block_number = ?block_entry.number,
						?max_age,
						"Aggression enabled with resend for block",
					);
					true
				} else {
					false
				}
			},
			|required_routing, _, _| *required_routing,
			&self.peer_views,
		)
		.await;

		// Second pass: ramp up the required routing for the very oldest block only
		// (L1: originator sends to everyone; L2: everyone else sends to grid X+Y).
		adjust_required_routing_and_propagate(
			network_sender,
			&mut self.blocks,
			&self.topologies,
			|block_entry| {
				// Ramp up aggression only for the very oldest block(s).
				// Approval voting can get stuck on a single block preventing
				// its descendants from being finalized. Waste minimal bandwidth
				// this way. Also, disputes might prevent finality - again, nothing
				// to waste bandwidth on newer blocks for.
				block_entry.number == min_age
			},
			|required_routing, local, _| {
				// It's a bit surprising not to have a topology at this age.
				if *required_routing == RequiredRouting::PendingTopology {
					gum::debug!(
						target: LOG_TARGET,
						lag = ?self.approval_checking_lag,
						"Encountered old block pending gossip topology",
					);
					return *required_routing
				}

				let mut new_required_routing = *required_routing;

				if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
					// Message originator sends to everyone.
					if local && new_required_routing != RequiredRouting::All {
						metrics.on_aggression_l1();
						new_required_routing = RequiredRouting::All;
					}
				}

				if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
					// Message originator sends to everyone. Everyone else sends to XY.
					if !local && new_required_routing != RequiredRouting::GridXY {
						metrics.on_aggression_l2();
						new_required_routing = RequiredRouting::GridXY;
					}
				}
				new_required_routing
			},
			&self.peer_views,
		)
		.await;
	}
2356
2357	// Filter out oversized candidate and certificate core bitfields.
2358	// For each invalid assignment we also punish the peer.
2359	async fn sanitize_v2_assignments(
2360		&mut self,
2361		peer_id: PeerId,
2362		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2363		assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
2364	) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2365		let mut sanitized_assignments = Vec::new();
2366		for (cert, candidate_bitfield) in assignments.into_iter() {
2367			let cert_bitfield_bits = match &cert.cert.kind {
2368				AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2369				// We don't want to run the VRF yet, but the output is always bounded by `n_cores`.
2370				// We assume `candidate_bitfield` length for the core bitfield and we just check
2371				// against `MAX_BITFIELD_SIZE` later.
2372				AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
2373				AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
2374					core_bitfield.len(),
2375			};
2376
2377			let candidate_bitfield_bits = candidate_bitfield.len();
2378
2379			// Our bitfield has `Lsb0`.
2380			let msb = candidate_bitfield_bits - 1;
2381
2382			// Ensure bitfields length under hard limit.
2383			if cert_bitfield_bits > MAX_BITFIELD_SIZE
2384				|| candidate_bitfield_bits > MAX_BITFIELD_SIZE
2385				// Ensure minimum bitfield size - MSB needs to be one.
2386				|| !candidate_bitfield.bit_at(msb.as_bit_index())
2387			{
2388				// Punish the peer for the invalid message.
2389				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2390					.await;
2391				for candidate_index in candidate_bitfield.iter_ones() {
2392					gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
2393				}
2394			} else {
2395				sanitized_assignments.push((cert, candidate_bitfield))
2396			}
2397		}
2398
2399		sanitized_assignments
2400	}
2401
2402	// Filter out obviously invalid candidate indices.
2403	async fn sanitize_v2_approvals(
2404		&mut self,
2405		peer_id: PeerId,
2406		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2407		approval: Vec<IndirectSignedApprovalVoteV2>,
2408	) -> Vec<IndirectSignedApprovalVoteV2> {
2409		let mut sanitized_approvals = Vec::new();
2410		for approval in approval.into_iter() {
2411			if approval.candidate_indices.len() as usize > MAX_BITFIELD_SIZE {
2412				// Punish the peer for the invalid message.
2413				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2414					.await;
2415				gum::debug!(
2416					target: LOG_TARGET,
2417					block_hash = ?approval.block_hash,
2418					candidate_indices_len = ?approval.candidate_indices.len(),
2419					"Bad approval v2, invalid candidate indices size"
2420				);
2421			} else {
2422				sanitized_approvals.push(approval)
2423			}
2424		}
2425
2426		sanitized_approvals
2427	}
2428}
2429
// This adjusts the required routing of messages in blocks that pass the block filter
// according to the modifier function given.
//
// The modifier accepts as inputs the current required-routing state, whether
// the message is locally originating, and the validator index of the message issuer.
//
// Then, if the topology is known, this propagates messages to all peers in the required
// routing set which are aware of the block. Peers which are unaware of the block
// will have the message sent when it enters their view in `unify_with_peer`.
//
// Note that the required routing of a message can be modified even if the
// topology is unknown yet.
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
async fn adjust_required_routing_and_propagate<
	N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
	BlockFilter,
	RoutingModifier,
>(
	network_sender: &mut N,
	blocks: &mut HashMap<Hash, BlockEntry>,
	topologies: &SessionGridTopologies,
	block_filter: BlockFilter,
	routing_modifier: RoutingModifier,
	peer_views: &HashMap<PeerId, PeerEntry>,
) where
	BlockFilter: Fn(&mut BlockEntry) -> bool,
	RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
{
	// Per-peer accumulators, so each peer gets one batched send per message type.
	let mut peer_assignments = HashMap::new();
	let mut peer_approvals = HashMap::new();

	// Iterate all blocks in the session, producing payloads
	// for each connected peer.
	for (block_hash, block_entry) in blocks {
		if !block_filter(block_entry) {
			continue
		}

		// Without a topology we cannot compute routing for this block; skip it.
		let topology = match topologies.get_topology(block_entry.session) {
			Some(t) => t,
			None => continue,
		};

		// We just need to iterate the `approval_entries` of the block entry as these contain all
		// assignments that also link all approval votes.
		for approval_entry in block_entry.approval_entries.values_mut() {
			let new_required_routing = routing_modifier(
				&approval_entry.routing_info().required_routing,
				approval_entry.routing_info().local,
				&approval_entry.validator_index(),
			);

			approval_entry.update_required_routing(new_required_routing);

			if approval_entry.routing_info().required_routing.is_empty() {
				continue
			}

			let assignment_message = approval_entry.assignment();
			let approval_messages = approval_entry.approvals();
			let (assignment_knowledge, message_kind) =
				approval_entry.create_assignment_knowledge(*block_hash);

			// Only peers already aware of this block are handled here; the rest get the
			// messages from `unify_with_peer` when the block enters their view.
			for (peer, peer_knowledge) in &mut block_entry.known_by {
				if !topology
					.local_grid_neighbors()
					.route_to_peer(approval_entry.routing_info().required_routing, peer)
				{
					continue
				}

				// Only send stuff a peer doesn't know in the context of a relay chain block.
				if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
					peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
					peer_assignments
						.entry(*peer)
						.or_insert_with(Vec::new)
						.push(assignment_message.clone());
				}

				// Filter approval votes.
				for approval_message in &approval_messages {
					let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);

					if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
						peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
						peer_approvals
							.entry(*peer)
							.or_insert_with(Vec::new)
							.push(approval_message.clone());
					}
				}
			}
		}
	}

	// Send messages in accumulated packets, assignments preceding approvals.
	for (peer, assignments_packet) in peer_assignments {
		if let Some(peer_view) = peer_views.get(&peer) {
			send_assignments_batched(
				network_sender,
				assignments_packet,
				&vec![(peer, peer_view.version)],
			)
			.await;
		} else {
			// This should never happen.
			gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
		}
	}

	for (peer, approvals_packet) in peer_approvals {
		if let Some(peer_view) = peer_views.get(&peer) {
			send_approvals_batched(
				network_sender,
				approvals_packet,
				&vec![(peer, peer_view.version)],
			)
			.await;
		} else {
			// This should never happen.
			gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
		}
	}
}
2555
/// Modify the reputation of a peer based on its behavior.
///
/// The change is recorded in the [`ReputationAggregator`]; the aggregator batches
/// changes and flushes them to the network bridge on the reputation interval
/// (see `run_inner`) rather than sending each one immediately.
async fn modify_reputation(
	reputation: &mut ReputationAggregator,
	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
	peer_id: PeerId,
	rep: Rep,
) {
	gum::trace!(
		target: LOG_TARGET,
		reputation = ?rep,
		?peer_id,
		"Reputation change for peer",
	);
	reputation.modify(sender, peer_id, rep).await;
}
2571
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
impl ApprovalDistribution {
	/// Create a new instance of the [`ApprovalDistribution`] subsystem.
	pub fn new(
		metrics: Metrics,
		slot_duration_millis: u64,
		assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
	) -> Self {
		// Production entry point: always uses the system clock.
		Self::new_with_clock(
			metrics,
			slot_duration_millis,
			Arc::new(SystemClock),
			assignment_criteria,
		)
	}

	/// Create a new instance of the [`ApprovalDistribution`] subsystem, with a custom clock.
	pub fn new_with_clock(
		metrics: Metrics,
		slot_duration_millis: u64,
		clock: Arc<dyn Clock + Send + Sync>,
		assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
	) -> Self {
		Self { metrics, slot_duration_millis, clock, assignment_criteria }
	}

	/// Main entry point: build the subsystem state, RNG and session-info cache,
	/// then run the message loop until the overseer concludes or the channel closes.
	async fn run<Context>(self, ctx: Context) {
		let mut state =
			State { slot_duration_millis: self.slot_duration_millis, ..Default::default() };
		// According to the docs of `rand`, this is a ChaCha12 RNG in practice
		// and will always be chosen for strong performance and security properties.
		let mut rng = rand::rngs::StdRng::from_entropy();
		let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
			// No keystore needed: this subsystem only reads session information.
			keystore: None,
			session_cache_lru_size: DISPUTE_WINDOW.get(),
		});

		self.run_inner(
			ctx,
			&mut state,
			REPUTATION_CHANGE_INTERVAL,
			&mut rng,
			&mut session_info_provider,
		)
		.await
	}

	/// Used for testing.
	///
	/// Runs the event loop with externally supplied state, reputation interval,
	/// RNG and session-info cache, so tests can control all inputs.
	async fn run_inner<Context>(
		self,
		mut ctx: Context,
		state: &mut State,
		reputation_interval: Duration,
		rng: &mut (impl CryptoRng + Rng),
		session_info_provider: &mut RuntimeInfo,
	) {
		let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
		let mut reputation_delay = new_reputation_delay();
		// Dedicated sender clones so each target subsystem can be messaged independently.
		let mut approval_voting_sender = ctx.sender().clone();
		let mut network_sender = ctx.sender().clone();
		let mut runtime_api_sender = ctx.sender().clone();

		loop {
			select! {
				// Periodically flush accumulated reputation changes to the network bridge.
				_ = reputation_delay => {
					state.reputation.send(ctx.sender()).await;
					reputation_delay = new_reputation_delay();
				},
				message = ctx.recv().fuse() => {
					let message = match message {
						Ok(message) => message,
						Err(e) => {
							gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
							return
						},
					};

					if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
						return;
					}

				},
			}
		}
	}

	/// Handles a from orchestra message received by approval distribution subsystem.
	///
	/// Returns `true` if the subsystem should be stopped.
	pub async fn handle_from_orchestra<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&self,
		message: FromOrchestra<ApprovalDistributionMessage>,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		state: &mut State,
		rng: &mut (impl CryptoRng + Rng),
		session_info_provider: &mut RuntimeInfo,
	) -> bool {
		match message {
			FromOrchestra::Communication { msg } =>
				Self::handle_incoming(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					state,
					msg,
					&self.metrics,
					rng,
					self.assignment_criteria.as_ref(),
					self.clock.as_ref(),
					session_info_provider,
				)
				.await,
			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
				gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
				// the relay chain blocks relevant to the approval subsystems
				// are those that are available, but not finalized yet
				// activated and deactivated heads hence are irrelevant to this subsystem, other
				// than for tracing purposes.
			},
			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
				gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
				state.handle_block_finalized(network_sender, &self.metrics, number).await;
			},
			FromOrchestra::Signal(OverseerSignal::Conclude) => return true,
		}
		false
	}

	/// Dispatch a single `ApprovalDistributionMessage` to the matching state handler.
	async fn handle_incoming<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		state: &mut State,
		msg: ApprovalDistributionMessage,
		metrics: &Metrics,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		match msg {
			ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
				state
					.handle_network_msg(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						metrics,
						event,
						rng,
						assignment_criteria,
						clock,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::NewBlocks(metas) => {
				state
					.handle_new_blocks(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						metrics,
						metas,
						rng,
						assignment_criteria,
						clock,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
				gum::debug!(
					target: LOG_TARGET,
					?candidate_indices,
					block_hash = ?cert.block_hash,
					assignment_kind = ?cert.cert.kind,
					"Distributing our assignment on candidates",
				);

				// Locally-produced assignment: import it and gossip to peers.
				state
					.import_and_circulate_assignment(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						&metrics,
						MessageSource::Local,
						cert,
						candidate_indices,
						rng,
						assignment_criteria,
						clock,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::DistributeApproval(vote) => {
				gum::debug!(
					target: LOG_TARGET,
					"Distributing our approval vote on candidate (block={}, index={:?})",
					vote.block_hash,
					vote.candidate_indices,
				);

				// Locally-produced approval: import it and gossip to peers.
				state
					.import_and_circulate_approval(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						metrics,
						MessageSource::Local,
						vote,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
				let sigs = state.get_approval_signatures(indices);
				if let Err(_) = tx.send(sigs) {
					gum::debug!(
						target: LOG_TARGET,
						"Sending back approval signatures failed, oneshot got closed"
					);
				}
			},
			ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
				gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
				state.approval_checking_lag = lag;
			},
		}
	}
}
2814
2815#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
2816impl<Context> ApprovalDistribution {
2817	fn start(self, ctx: Context) -> SpawnedSubsystem {
2818		let future = self.run(ctx).map(|_| Ok(())).boxed();
2819
2820		SpawnedSubsystem { name: "approval-distribution-subsystem", future }
2821	}
2822}
2823
/// Ensures the batch size is always at least 1 element.
///
/// Because this is a `const fn` used to compute the batch-size constants below,
/// a zero input causes the panic to surface at compile time rather than at
/// runtime.
const fn ensure_size_not_zero(size: usize) -> usize {
	match size {
		0 => panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)",),
		non_zero => non_zero,
	}
}
2832
/// The maximum amount of assignments per batch is 33% of maximum allowed by protocol.
/// This is an arbitrary value. Bumping this up increases the maximum amount of approvals or
/// assignments we send in a single message to peers. Exceeding `MAX_NOTIFICATION_SIZE` will violate
/// the protocol configuration.
pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
	// NOTE(review): the in-memory size of an assignment entry is used here as a
	// proxy for its encoded size — presumably the encoding is never larger;
	// confirm against the codec. `ensure_size_not_zero` makes a zero result a
	// compile-time failure instead of a silent empty batch.
	MAX_NOTIFICATION_SIZE as usize /
		std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
		3,
);
2842
/// The maximum amount of approvals per batch is 33% of maximum allowed by protocol.
///
/// See [`MAX_ASSIGNMENT_BATCH_SIZE`] for the rationale behind the 33% factor.
pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
	// NOTE(review): in-memory size of a vote is used as a proxy for its encoded
	// size — confirm against the codec.
	MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
);
2847
2848// Low level helper for sending assignments.
2849async fn send_assignments_batched_inner(
2850	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2851	batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
2852	peers: Vec<PeerId>,
2853	_peer_version: ValidationVersion,
2854) {
2855	sender
2856		.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2857			peers,
2858			ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2859				protocol_v3::ApprovalDistributionMessage::Assignments(batch.into_iter().collect()),
2860			)),
2861		))
2862		.await;
2863}
2864
2865/// Send assignments while honoring the `max_notification_size` of the protocol.
2866///
2867/// Splitting the messages into multiple notifications allows more granular processing at the
2868/// destination, such that the subsystem doesn't get stuck for long processing a batch
2869/// of assignments and can `select!` other tasks.
2870pub(crate) async fn send_assignments_batched(
2871	network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2872	v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
2873	peers: &[(PeerId, ProtocolVersion)],
2874) {
2875	let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2876
2877	if !v3_peers.is_empty() {
2878		let mut v3 = v2_assignments.into_iter().peekable();
2879
2880		while v3.peek().is_some() {
2881			let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
2882			send_assignments_batched_inner(
2883				network_sender,
2884				batch,
2885				v3_peers.clone(),
2886				ValidationVersion::V3,
2887			)
2888			.await;
2889		}
2890	}
2891}
2892
2893/// Send approvals while honoring the `max_notification_size` of the protocol and peer version.
2894pub(crate) async fn send_approvals_batched(
2895	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2896	approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
2897	peers: &[(PeerId, ProtocolVersion)],
2898) {
2899	let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2900
2901	if !v3_peers.is_empty() {
2902		let mut batches = approvals.into_iter().peekable();
2903
2904		while batches.peek().is_some() {
2905			let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
2906
2907			sender
2908				.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2909					v3_peers.clone(),
2910					ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2911						protocol_v3::ApprovalDistributionMessage::Approvals(batch),
2912					)),
2913				))
2914				.await;
2915		}
2916	}
2917}