#![warn(missing_docs)]
use self::metrics::Metrics;
use futures::{select, FutureExt as _};
use itertools::Itertools;
use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
use polkadot_node_network_protocol::{
self as net_protocol, filter_by_peer_version,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
peer_set::MAX_NOTIFICATION_SIZE,
v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, PeerId,
UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::{
approval::{
criteria::{AssignmentCriteria, InvalidAssignment},
time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
v1::{
AssignmentCertKind, BlockApprovalMeta, DelayTranche, IndirectAssignmentCert,
IndirectSignedApprovalVote, RelayVRFStory,
},
v2::{
AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
IndirectSignedApprovalVoteV2,
},
},
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
messages::{
ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
RuntimeApiMessage,
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
};
use polkadot_primitives::{
BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
};
use rand::{CryptoRng, Rng, SeedableRng};
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
sync::Arc,
time::Duration,
};
pub mod metrics;
#[cfg(test)]
mod tests;
/// Log target used for the tracing output of this subsystem.
const LOG_TARGET: &str = "parachain::approval-distribution";
/// Minor reputation cost: the peer sent an assignment or approval that was out of view.
const COST_UNEXPECTED_MESSAGE: Rep =
Rep::CostMinor("Peer sent an out-of-view assignment or approval");
/// Repeated minor reputation cost: the peer sent a message it had already sent.
const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
/// Minor reputation cost: the assignment was valid but its tranche lies too far in the future.
const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
Rep::CostMinor("The vote was valid but too far in the future");
/// Major reputation cost: the message failed validation.
const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
/// Major reputation cost: the message carried an oversized certificate or candidate bitfield.
const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");
/// Minor reputation benefit: the peer sent a valid message that was already known.
const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
/// Minor reputation benefit: the peer was first to send a valid message with new information.
const BENEFIT_VALID_MESSAGE_FIRST: Rep =
Rep::BenefitMinorFirst("Valid message with new information");
/// Size limit applied to bitfields.
///
/// NOTE(review): the use site is not visible in this part of the file; presumably this is the
/// bound whose violation is punished with `COST_OVERSIZED_BITFIELD` -- confirm.
const MAX_BITFIELD_SIZE: usize = 500;
/// Handle of the approval distribution subsystem.
///
/// NOTE(review): the `impl` blocks of this type are outside the visible part of the file; the
/// field descriptions below are derived from the field types only.
pub struct ApprovalDistribution {
/// Metrics handle of the subsystem.
metrics: Metrics,
/// Slot duration in milliseconds.
slot_duration_millis: u64,
/// Shared clock implementation.
clock: Arc<dyn Clock + Send + Sync>,
/// Shared implementation of the assignment criteria.
assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
}
/// Bounded record of block hashes that were recently pruned as outdated.
#[derive(Default)]
struct RecentlyOutdated {
/// Retained hashes, oldest at the front and newest at the back.
buf: VecDeque<Hash>,
}
impl RecentlyOutdated {
    /// Remembers `hash` as the most recently outdated block.
    ///
    /// At most `MAX_BUF_LEN` hashes are retained; the oldest ones are evicted first.
    fn note_outdated(&mut self, hash: Hash) {
        const MAX_BUF_LEN: usize = 20;

        self.buf.push_back(hash);

        // Everything beyond the capacity is dropped from the front (the oldest entries).
        let excess = self.buf.len().saturating_sub(MAX_BUF_LEN);
        drop(self.buf.drain(..excess));
    }

    /// Returns `true` if `hash` is among the retained outdated blocks.
    fn is_recent_outdated(&self, hash: &Hash) -> bool {
        self.buf.iter().any(|outdated| outdated == hash)
    }
}
/// Routing information kept alongside an assignment and its approvals.
struct ApprovalRouting {
/// The routing required for the message.
required_routing: RequiredRouting,
/// Whether the message originated locally rather than from a peer.
local: bool,
/// State of the random routing of the message.
random_routing: RandomRouting,
/// Peers the message was sent to as a result of random routing.
peers_randomly_routed: Vec<PeerId>,
}
impl ApprovalRouting {
    /// Records that the message was randomly routed to `peer`: the peer is appended to
    /// `peers_randomly_routed` and the random routing sent counter is incremented.
    fn mark_randomly_sent(&mut self, peer: PeerId) {
        let Self { random_routing, peers_randomly_routed, .. } = self;
        peers_randomly_routed.push(peer);
        random_routing.inc_sent();
    }
}
/// An assignment of one validator together with the approvals received for it.
struct ApprovalEntry {
/// The assignment certificate.
assignment: IndirectAssignmentCertV2,
/// The candidates claimed by the assignment.
assignment_claimed_candidates: CandidateBitfield,
/// The approval votes recorded for this assignment, keyed by the candidates each vote covers.
approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
/// Index of the validator that issued the assignment.
validator_index: ValidatorIndex,
/// Routing information of the assignment.
routing_info: ApprovalRouting,
}
/// Errors returned when noting an approval on an [`ApprovalEntry`] or a [`BlockEntry`].
#[derive(Debug)]
enum ApprovalEntryError {
/// The validator of the approval differs from the validator of the assignment.
InvalidValidatorIndex,
/// The approval's candidate bitfield is longer than the number of candidates in the block.
CandidateIndexOutOfBounds,
/// None of the approved candidates is claimed by the assignment.
InvalidCandidateIndex,
/// An approval for the same set of candidates was already recorded.
DuplicateApproval,
/// No assignment covering the approval is known.
UnknownAssignment,
/// The assignments covered by one approval have different required routings.
#[allow(dead_code)]
AssignmentsFollowedDifferentPaths(RequiredRouting, RequiredRouting),
}
impl ApprovalEntry {
pub fn new(
assignment: IndirectAssignmentCertV2,
candidates: CandidateBitfield,
routing_info: ApprovalRouting,
) -> ApprovalEntry {
Self {
validator_index: assignment.validator,
assignment,
approvals: HashMap::new(),
assignment_claimed_candidates: candidates,
routing_info,
}
}
pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
(
MessageSubject(
block_hash,
self.assignment_claimed_candidates.clone(),
self.validator_index,
),
MessageKind::Assignment,
)
}
pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
&mut self.routing_info
}
pub fn routing_info(&self) -> &ApprovalRouting {
&self.routing_info
}
pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
self.routing_info.required_routing = required_routing;
}
pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
for candidate_index in approval.candidate_indices.iter_ones() {
if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
return true
}
}
return false
}
pub fn note_approval(
&mut self,
approval: IndirectSignedApprovalVoteV2,
) -> Result<(), ApprovalEntryError> {
if self.validator_index != approval.validator {
return Err(ApprovalEntryError::InvalidValidatorIndex)
}
if !self.includes_approval_candidates(&approval) {
return Err(ApprovalEntryError::InvalidCandidateIndex)
}
if self.approvals.contains_key(&approval.candidate_indices) {
return Err(ApprovalEntryError::DuplicateApproval)
}
self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
Ok(())
}
pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
(self.assignment.clone(), self.assignment_claimed_candidates.clone())
}
pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
self.approvals.values().cloned().collect::<Vec<_>>()
}
pub fn validator_index(&self) -> ValidatorIndex {
self.validator_index
}
}
/// What is tracked about a connected peer.
struct PeerEntry {
/// The latest view reported by the peer.
pub view: View,
/// The protocol version the peer connected with.
pub version: ProtocolVersion,
}
/// Thresholds, expressed in blocks, that control when aggression is enabled.
#[derive(Clone)]
struct AggressionConfig {
/// Block age at or above which `should_trigger_aggression` returns `true`.
l1_threshold: Option<BlockNumber>,
/// Second threshold.
///
/// NOTE(review): its use is not visible in this part of the file -- presumably the age at
/// which a higher aggression level starts; confirm.
l2_threshold: Option<BlockNumber>,
/// Period in blocks; only consulted by `should_trigger_aggression` when `l1_threshold` is
/// `None`, in which case aggression triggers at every non-zero multiple of the period.
resend_unfinalized_period: Option<BlockNumber>,
}
impl AggressionConfig {
    /// Returns `true` if a block of the given `age` should trigger aggression.
    ///
    /// - With `l1_threshold` set, the result is `age >= l1_threshold`; the resend period is not
    ///   consulted.
    /// - Otherwise, with `resend_unfinalized_period` set, the result is `true` for every
    ///   non-zero `age` that is a multiple of the period. A period of zero never triggers.
    /// - With neither set, the result is `false`.
    fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
        if let Some(t) = self.l1_threshold {
            age >= t
        } else if let Some(t) = self.resend_unfinalized_period {
            // `checked_rem` returns `None` for a zero period, where `age % t` would panic with
            // a remainder-by-zero.
            age > 0 && age.checked_rem(t) == Some(0)
        } else {
            false
        }
    }
}
impl Default for AggressionConfig {
fn default() -> Self {
AggressionConfig {
l1_threshold: Some(16),
l2_threshold: Some(28),
resend_unfinalized_period: Some(8),
}
}
}
/// Flag passed to `enable_aggression`.
///
/// NOTE(review): `enable_aggression` is outside the visible part of the file; presumably this
/// selects whether already known messages are sent again -- confirm.
#[derive(PartialEq)]
enum Resend {
/// Passed when new blocks are handled.
Yes,
/// Passed when a block is finalized.
No,
}
/// The mutable state of the approval distribution subsystem.
#[derive(Default)]
pub struct State {
/// Hashes of the tracked blocks grouped by block number; pruned on finalization.
blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
/// Per-block state of every tracked block.
blocks: HashMap<Hash, BlockEntry>,
/// Peer messages received for heads of our view that are not yet in `blocks`; they are
/// imported once the block becomes known.
pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,
/// View and protocol version of each connected peer.
peer_views: HashMap<PeerId, PeerEntry>,
/// Grid topologies of the tracked sessions.
topologies: SessionGridTopologies,
/// Blocks that were recently pruned on finalization.
recent_outdated_blocks: RecentlyOutdated,
/// Configuration of the aggression thresholds.
aggression_config: AggressionConfig,
/// NOTE(review): not used in the visible part of the file -- presumably the current lag of
/// approval checking, in blocks; confirm.
approval_checking_lag: BlockNumber,
/// Aggregator through which reputation changes of peers are reported.
reputation: ReputationAggregator,
/// Slot duration in milliseconds; used to compute the current tranche of a block.
slot_duration_millis: u64,
}
/// The kind of a tracked message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
/// An assignment.
Assignment,
/// An approval vote.
Approval,
}
/// Identifies what a message is about: the block hash, the candidates concerned and the index
/// of the validator that issued the message.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
/// A set of known messages.
#[derive(Debug, Clone, Default)]
struct Knowledge {
/// For every known subject, the most advanced kind of message known about it
/// (`Approval` supersedes `Assignment`).
known_messages: HashMap<MessageSubject, MessageKind>,
}
impl Knowledge {
/// Returns `true` if a message of the given `kind` about `message` is known.
///
/// A subject recorded as `Approval` also counts as a known `Assignment`; a subject recorded
/// only as `Assignment` does not count as a known `Approval`.
fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
match (kind, self.known_messages.get(message)) {
(_, None) => false,
(MessageKind::Assignment, Some(_)) => true,
(MessageKind::Approval, Some(MessageKind::Assignment)) => false,
(MessageKind::Approval, Some(MessageKind::Approval)) => true,
}
}
/// Records a message of the given `kind` about `message`.
///
/// Returns `true` if the subject was unknown or was upgraded from `Assignment` to
/// `Approval`, and `false` if nothing changed for it.
///
/// For a newly recorded assignment that covers more than one candidate, an additional
/// single-candidate subject is recorded for each covered candidate. In that case the result
/// is `true` only if all of these were new as well; recording stops at the first
/// single-candidate subject that was already known.
fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
let mut success = match self.known_messages.entry(message.clone()) {
hash_map::Entry::Vacant(vacant) => {
vacant.insert(kind);
true
},
hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
(MessageKind::Assignment, MessageKind::Assignment) => false,
(MessageKind::Approval, MessageKind::Approval) => false,
// An approval is never downgraded to an assignment.
(MessageKind::Approval, MessageKind::Assignment) => false,
(MessageKind::Assignment, MessageKind::Approval) => {
*occupied.get_mut() = MessageKind::Approval;
true
},
},
};
if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
for candidate_index in message.1.iter_ones() {
// `&&` short-circuits: once one insertion fails, the remaining ones are skipped.
success = success &&
self.insert(
MessageSubject(
message.0,
vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
message.2,
),
kind,
);
}
}
success
}
}
/// What a peer knows about the messages of one block.
#[derive(Debug, Clone, Default)]
struct PeerKnowledge {
/// Messages we sent to the peer.
sent: Knowledge,
/// Messages we received from the peer.
received: Knowledge,
}
impl PeerKnowledge {
    /// Returns `true` if the message is known to the peer, either because we sent it or
    /// because we received it from the peer.
    fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
        [&self.sent, &self.received]
            .iter()
            .any(|knowledge| knowledge.contains(message, kind))
    }

    /// Builds one assignment knowledge key per candidate covered by `approval`.
    fn generate_assignments_keys(
        approval: &IndirectSignedApprovalVoteV2,
    ) -> Vec<(MessageSubject, MessageKind)> {
        let mut keys = Vec::new();
        for candidate_index in approval.candidate_indices.iter_ones() {
            let subject = MessageSubject(
                approval.block_hash,
                (candidate_index as CandidateIndex).into(),
                approval.validator,
            );
            keys.push((subject, MessageKind::Assignment));
        }
        keys
    }

    /// Builds the approval knowledge key covering all candidates of `approval`.
    fn generate_approval_key(
        approval: &IndirectSignedApprovalVoteV2,
    ) -> (MessageSubject, MessageKind) {
        let subject = MessageSubject(
            approval.block_hash,
            approval.candidate_indices.clone(),
            approval.validator,
        );
        (subject, MessageKind::Approval)
    }
}
/// Everything tracked about one block.
struct BlockEntry {
/// Peers for which knowledge about this block is tracked, with what each of them knows.
known_by: HashMap<PeerId, PeerKnowledge>,
/// The number of the block.
number: BlockNumber,
/// The hash of the parent block.
parent_hash: Hash,
/// Our own knowledge of the messages of this block.
knowledge: Knowledge,
/// One entry per candidate of the block, indexed by candidate index.
candidates: Vec<CandidateEntry>,
/// Hash, core index and group index of every candidate of the block.
candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
/// The session the block belongs to.
session: SessionIndex,
/// Assignments and their approvals, keyed by validator and claimed candidates.
approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
/// The relay VRF story of the block.
vrf_story: RelayVRFStory,
/// The slot of the block.
slot: Slot,
}
impl BlockEntry {
    /// Returns the peers for which knowledge about this block is tracked.
    pub fn known_by(&self) -> Vec<PeerId> {
        self.known_by.keys().cloned().collect()
    }

    /// Stores `entry` and links it to every candidate it claims.
    ///
    /// If an entry for the same validator and claimed candidates already exists, the existing
    /// one is kept. Returns a mutable reference to the stored entry.
    pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
        // Link each claimed candidate to the key under which the entry is stored in
        // `approval_entries`. An existing link of the validator is left untouched.
        for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
            match self.candidates.get_mut(claimed_candidate_index) {
                Some(candidate_entry) => {
                    candidate_entry
                        .assignments
                        .entry(entry.validator_index())
                        // Clone the bitfield only when a link actually has to be created.
                        .or_insert_with(|| entry.assignment_claimed_candidates.clone());
                },
                None => {
                    gum::warn!(
                        target: LOG_TARGET,
                        hash = ?entry.assignment.block_hash,
                        ?claimed_candidate_index,
                        "Missing candidate entry on `import_and_circulate_assignment`",
                    );
                },
            };
        }

        self.approval_entries
            .entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
            .or_insert(entry)
    }

    /// Returns `true` if every candidate index set in `candidate_indices` refers to a candidate
    /// of this block.
    pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
        candidate_indices
            .iter_ones()
            .all(|candidate_index| candidate_index < self.candidates.len())
    }

    /// Records `approval` on every assignment of the approving validator that covers at least
    /// one of the approved candidates.
    ///
    /// On success returns the required routing shared by the covered assignments and the
    /// union of the peers those assignments were randomly routed to.
    ///
    /// # Errors
    ///
    /// - [`ApprovalEntryError::CandidateIndexOutOfBounds`] if the approval's bitfield is longer
    ///   than the number of candidates of the block.
    /// - Any error of [`ApprovalEntry::note_approval`].
    /// - [`ApprovalEntryError::AssignmentsFollowedDifferentPaths`] if the covered assignments
    ///   do not all have the same required routing.
    /// - [`ApprovalEntryError::UnknownAssignment`] if no covering assignment is known.
    ///
    /// An error may be returned after the approval was already recorded on some of the
    /// covered assignments.
    pub fn note_approval(
        &mut self,
        approval: IndirectSignedApprovalVoteV2,
    ) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
        let mut required_routing = None;
        let mut peers_randomly_routed_to = HashSet::new();

        if self.candidates.len() < approval.candidate_indices.len() as usize {
            return Err(ApprovalEntryError::CandidateIndexOutOfBounds)
        }

        // The distinct assignments of the validator that claim any of the approved candidates.
        let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
            .candidate_indices
            .iter_ones()
            .filter_map(|candidate_index| {
                self.candidates.get(candidate_index).and_then(|candidate_entry| {
                    candidate_entry.assignments.get(&approval.validator).cloned()
                })
            })
            .collect();

        for assignment_bitfield in covered_assignments_bitfields {
            if let Some(approval_entry) =
                self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
            {
                approval_entry.note_approval(approval.clone())?;
                peers_randomly_routed_to
                    .extend(approval_entry.routing_info().peers_randomly_routed.iter());

                let entry_routing = approval_entry.routing_info().required_routing;
                match required_routing {
                    // All covered assignments must agree on the required routing.
                    Some(expected) if expected != entry_routing =>
                        return Err(ApprovalEntryError::AssignmentsFollowedDifferentPaths(
                            expected,
                            entry_routing,
                        )),
                    Some(_) => {},
                    None => required_routing = Some(entry_routing),
                }
            }
        }

        required_routing
            .map(|required_routing| (required_routing, peers_randomly_routed_to))
            .ok_or(ApprovalEntryError::UnknownAssignment)
    }

    /// Returns the approval votes that cover the candidate at `candidate_index`.
    ///
    /// Each vote is returned once, even if it is recorded on several assignments. The result
    /// is empty if the index does not refer to a candidate of this block.
    pub fn approval_votes(
        &self,
        candidate_index: CandidateIndex,
    ) -> Vec<IndirectSignedApprovalVoteV2> {
        let result: Option<
            HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
        > = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
            candidate_entry
                .assignments
                .iter()
                .filter_map(|(validator, assignment_bitfield)| {
                    self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
                })
                .flat_map(|approval_entry| {
                    // Filter on borrowed data first so that only matching votes are cloned.
                    approval_entry
                        .approvals
                        .iter()
                        .filter(|(approved_candidates, _)| {
                            approved_candidates.bit_at(candidate_index.as_bit_index())
                        })
                        .map(move |(approved_candidates, vote)| {
                            (
                                (approval_entry.validator_index, approved_candidates.clone()),
                                vote.clone(),
                            )
                        })
                })
                // Keying by validator and approved candidates deduplicates the votes.
                .collect()
        });

        result.map(|result| result.into_values().collect()).unwrap_or_default()
    }
}
/// Per-candidate bookkeeping of a block.
#[derive(Debug, Default)]
struct CandidateEntry {
/// For every validator assigned to this candidate, the claimed candidates of the assignment;
/// together they form the key of the corresponding entry in `BlockEntry::approval_entries`.
assignments: HashMap<ValidatorIndex, CandidateBitfield>,
}
/// Where a message came from.
#[derive(Debug, Clone, PartialEq)]
enum MessageSource {
/// The message was received from the given peer.
Peer(PeerId),
/// The message originated locally.
Local,
}
/// Reasons for rejecting an assignment.
///
/// NOTE(review): the code producing these errors is outside the visible part of the file; the
/// variant descriptions are derived from the variant names and payload types.
#[derive(Debug)]
enum InvalidAssignmentError {
/// The check of the assignment against the assignment criteria failed.
#[allow(dead_code)]
CryptoCheckFailed(InvalidAssignment),
/// The assignment claims no candidates.
NoClaimedCandidates,
/// The assignment claims a candidate index beyond the maximum index.
#[allow(dead_code)]
ClaimedInvalidCandidateIndex {
claimed_index: usize,
max_index: usize,
},
/// The bitfield of claimed candidates is too large.
OversizedClaimedBitfield,
/// The session info required for the check could not be obtained.
#[allow(dead_code)]
SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
/// Reasons for rejecting an approval vote.
///
/// NOTE(review): the code producing these errors is outside the visible part of the file; the
/// variant descriptions are derived from the variant names and payload types.
#[derive(Debug)]
enum InvalidVoteError {
/// The vote refers to a candidate index that is out of bounds.
CandidateIndexOutOfBounds,
/// The vote refers to a validator index that is out of bounds.
ValidatorIndexOutOfBounds,
/// The signature of the vote is invalid.
InvalidSignature,
/// The session info required for the check could not be obtained.
#[allow(dead_code)]
SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
impl MessageSource {
    /// Returns the peer the message came from, or `None` for a local message.
    fn peer_id(&self) -> Option<PeerId> {
        if let Self::Peer(id) = self {
            Some(*id)
        } else {
            None
        }
    }
}
/// A peer message that is held back until its block becomes known.
enum PendingMessage {
/// An assignment together with the candidates it claims.
Assignment(IndirectAssignmentCertV2, CandidateBitfield),
/// An approval vote.
Approval(IndirectSignedApprovalVoteV2),
}
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
impl State {
pub fn with_config(slot_duration_millis: u64) -> Self {
Self { slot_duration_millis, ..Default::default() }
}
/// Dispatches a single event of the network bridge.
///
/// - `PeerConnected`: records the authority ids of the peer (if any) and starts tracking the
///   peer with a default view, unless it is tracked already.
/// - `PeerDisconnected`: stops tracking the peer and removes it from every block entry.
/// - `NewGossipTopology`, `PeerViewChange`, `PeerMessage`: forwarded to the dedicated handlers.
/// - `OurViewChange`: starts collecting pending messages for unknown heads of the new view and
///   drops pending messages of blocks that left the view.
/// - `UpdatedAuthorityIds`: if the topologies report a change, unifies with the peer over the
///   unfinalized blocks the peer is already known for.
async fn handle_network_msg<
N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
A: overseer::SubsystemSender<ApprovalVotingMessage>,
RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
&mut self,
approval_voting_sender: &mut A,
network_sender: &mut N,
runtime_api_sender: &mut RA,
metrics: &Metrics,
event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
rng: &mut (impl CryptoRng + Rng),
assignment_criteria: &(impl AssignmentCriteria + ?Sized),
clock: &(impl Clock + ?Sized),
session_info_provider: &mut RuntimeInfo,
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
if let Some(authority_ids) = authority_ids {
self.topologies.update_authority_ids(peer_id, &authority_ids);
}
// Insert a blank view only if the peer is not tracked yet.
self.peer_views
.entry(peer_id)
.or_insert(PeerEntry { view: Default::default(), version });
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
self.peer_views.remove(&peer_id);
// Forget what the peer knew about every tracked block.
self.blocks.iter_mut().for_each(|(_hash, entry)| {
entry.known_by.remove(&peer_id);
})
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
self.handle_new_session_topology(
network_sender,
topology.session,
topology.topology,
topology.local_index,
)
.await;
},
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
},
NetworkBridgeEvent::OurViewChange(view) => {
gum::trace!(target: LOG_TARGET, ?view, "Own view change");
// Heads we do not know yet get an (initially empty) list of pending messages.
for head in view.iter() {
if !self.blocks.contains_key(head) {
self.pending_known.entry(*head).or_default();
}
}
// Pending messages are only kept for blocks that are still in our view.
self.pending_known.retain(|h, _| {
let live = view.contains(h);
if !live {
gum::trace!(
target: LOG_TARGET,
block_hash = ?h,
"Cleaning up stale pending messages",
);
}
live
});
},
NetworkBridgeEvent::PeerMessage(peer_id, message) => {
self.process_incoming_peer_message(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
peer_id,
message,
rng,
assignment_criteria,
clock,
session_info_provider,
)
.await;
},
NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
// Only act if the topologies report that the update changed something.
if self.topologies.update_authority_ids(peer_id, &authority_ids) {
if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
// Blocks above the peer's finalized number for which the peer is already tracked.
let intersection = self
.blocks_by_number
.iter()
.filter(|(block_number, _)| *block_number > &view.finalized_number)
.flat_map(|(_, hashes)| {
hashes.iter().filter(|hash| {
self.blocks
.get(&hash)
.map(|block| block.known_by.get(&peer_id).is_some())
.unwrap_or_default()
})
});
let view_intersection =
View::new(intersection.cloned(), view.finalized_number);
// NOTE(review): the meaning of the trailing `true` flag is defined by
// `unify_with_peer`, which is outside the visible part of the file.
Self::unify_with_peer(
network_sender,
metrics,
&mut self.blocks,
&self.topologies,
self.peer_views.len(),
peer_id,
*version,
view_intersection,
rng,
true,
)
.await;
}
}
},
}
}
/// Starts tracking the blocks described by `metas`.
///
/// For every block that is not tracked yet a fresh [`BlockEntry`] is created. Afterwards each
/// peer is unified over the new blocks that are part of its view, the messages that were
/// pending for the now known blocks are imported, and finally `enable_aggression` is called
/// with `Resend::Yes`.
async fn handle_new_blocks<
N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
A: overseer::SubsystemSender<ApprovalVotingMessage>,
RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
&mut self,
approval_voting_sender: &mut A,
network_sender: &mut N,
runtime_api_sender: &mut RA,
metrics: &Metrics,
metas: Vec<BlockApprovalMeta>,
rng: &mut (impl CryptoRng + Rng),
assignment_criteria: &(impl AssignmentCriteria + ?Sized),
clock: &(impl Clock + ?Sized),
session_info_provider: &mut RuntimeInfo,
) {
let mut new_hashes = HashSet::new();
gum::debug!(
target: LOG_TARGET,
"Got new blocks {:?}",
metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
);
for meta in metas {
match self.blocks.entry(meta.hash) {
hash_map::Entry::Vacant(entry) => {
// One default candidate entry per candidate of the block.
let candidates_count = meta.candidates.len();
let mut candidates = Vec::with_capacity(candidates_count);
candidates.resize_with(candidates_count, Default::default);
entry.insert(BlockEntry {
known_by: HashMap::new(),
number: meta.number,
parent_hash: meta.parent_hash,
knowledge: Knowledge::default(),
candidates,
session: meta.session,
approval_entries: HashMap::new(),
candidates_metadata: meta.candidates,
vrf_story: meta.vrf_story,
slot: meta.slot,
});
self.topologies.inc_session_refs(meta.session);
new_hashes.insert(meta.hash);
// Only reached for vacant entries, so a duplicate meta does not register the hash twice.
self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
},
// The block is tracked already.
_ => continue,
}
}
{
// Unify with every peer over the new blocks that are part of the peer's view.
for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
let intersection = view.iter().filter(|h| new_hashes.contains(h));
let view_intersection = View::new(intersection.cloned(), view.finalized_number);
Self::unify_with_peer(
network_sender,
metrics,
&mut self.blocks,
&self.topologies,
self.peer_views.len(),
*peer_id,
*version,
view_intersection,
rng,
false,
)
.await;
}
// Hashes with pending messages whose block is known by now.
let pending_now_known = self
.pending_known
.keys()
.filter(|k| self.blocks.contains_key(k))
.copied()
.collect::<Vec<_>>();
// Take the pending messages of these blocks out of `pending_known`.
let to_import = pending_now_known
.into_iter()
.inspect(|h| {
gum::trace!(
target: LOG_TARGET,
block_hash = ?h,
"Extracting pending messages for new block"
)
})
.filter_map(|k| self.pending_known.remove(&k))
.flatten()
.collect::<Vec<_>>();
if !to_import.is_empty() {
gum::debug!(
target: LOG_TARGET,
num = to_import.len(),
"Processing pending assignment/approvals",
);
let _timer = metrics.time_import_pending_now_known();
// Import every pending message as if it had just been received from its peer.
for (peer_id, message) in to_import {
match message {
PendingMessage::Assignment(assignment, claimed_indices) => {
self.import_and_circulate_assignment(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
MessageSource::Peer(peer_id),
assignment,
claimed_indices,
rng,
assignment_criteria,
clock,
session_info_provider,
)
.await;
},
PendingMessage::Approval(approval_vote) => {
self.import_and_circulate_approval(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
MessageSource::Peer(peer_id),
approval_vote,
session_info_provider,
)
.await;
},
}
}
}
}
self.enable_aggression(network_sender, Resend::Yes, metrics).await;
}
/// Stores the grid topology of `session` and resolves the routing of the messages of that
/// session that were waiting for it.
///
/// Nothing happens when `local_index` is `None`. Otherwise every approval entry of a block of
/// `session` whose required routing is `PendingTopology` gets the routing dictated by the new
/// topology; all other routings are kept.
async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
    &mut self,
    network_sender: &mut N,
    session: SessionIndex,
    topology: SessionGridTopology,
    local_index: Option<ValidatorIndex>,
) {
    if local_index.is_none() {
        return
    }

    self.topologies.insert_topology(session, topology, local_index);
    let topology = self.topologies.get_topology(session).expect("just inserted above; qed");

    adjust_required_routing_and_propagate(
        network_sender,
        &mut self.blocks,
        &self.topologies,
        |block_entry| block_entry.session == session,
        |required_routing, local, validator_index| {
            // Routings that were already resolved stay as they are.
            if required_routing != &RequiredRouting::PendingTopology {
                return *required_routing
            }

            topology.local_grid_neighbors().required_routing_by_index(*validator_index, local)
        },
        &self.peer_views,
    )
    .await;
}
/// Handles a batch of assignments received from `peer_id`.
///
/// An assignment whose block hash has a pending list is appended to that list; every other
/// assignment is imported and circulated right away.
async fn process_incoming_assignments<A, N, R, RA>(
    &mut self,
    approval_voting_sender: &mut A,
    network_sender: &mut N,
    runtime_api_sender: &mut RA,
    metrics: &Metrics,
    peer_id: PeerId,
    assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
    rng: &mut R,
    assignment_criteria: &(impl AssignmentCriteria + ?Sized),
    clock: &(impl Clock + ?Sized),
    session_info_provider: &mut RuntimeInfo,
) where
    A: overseer::SubsystemSender<ApprovalVotingMessage>,
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    RA: overseer::SubsystemSender<RuntimeApiMessage>,
    R: CryptoRng + Rng,
{
    for (assignment, claimed_indices) in assignments {
        if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
            // The block is not known yet: park the assignment until it is.
            let block_hash = &assignment.block_hash;
            let validator_index = assignment.validator;

            gum::trace!(
                target: LOG_TARGET,
                %peer_id,
                ?block_hash,
                ?claimed_indices,
                ?validator_index,
                "Pending assignment",
            );

            let message = PendingMessage::Assignment(assignment, claimed_indices);
            pending.push((peer_id, message));
        } else {
            self.import_and_circulate_assignment(
                approval_voting_sender,
                network_sender,
                runtime_api_sender,
                metrics,
                MessageSource::Peer(peer_id),
                assignment,
                claimed_indices,
                rng,
                assignment_criteria,
                clock,
                session_info_provider,
            )
            .await;
        }
    }
}
/// Handles a batch of approval votes received from `peer_id`.
///
/// A vote whose block hash has a pending list is appended to that list; every other vote is
/// imported and circulated right away.
async fn process_incoming_approvals<
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    A: overseer::SubsystemSender<ApprovalVotingMessage>,
    RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
    &mut self,
    approval_voting_sender: &mut A,
    network_sender: &mut N,
    runtime_api_sender: &mut RA,
    metrics: &Metrics,
    peer_id: PeerId,
    approvals: Vec<IndirectSignedApprovalVoteV2>,
    session_info_provider: &mut RuntimeInfo,
) {
    gum::trace!(
        target: LOG_TARGET,
        peer_id = %peer_id,
        num = approvals.len(),
        "Processing approvals from a peer",
    );

    for approval_vote in approvals {
        if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
            // The block is not known yet: park the vote until it is.
            let block_hash = approval_vote.block_hash;
            let validator_index = approval_vote.validator;

            gum::trace!(
                target: LOG_TARGET,
                %peer_id,
                ?block_hash,
                ?validator_index,
                "Pending assignment candidates {:?}",
                approval_vote.candidate_indices,
            );

            let message = PendingMessage::Approval(approval_vote);
            pending.push((peer_id, message));
        } else {
            self.import_and_circulate_approval(
                approval_voting_sender,
                network_sender,
                runtime_api_sender,
                metrics,
                MessageSource::Peer(peer_id),
                approval_vote,
                session_info_provider,
            )
            .await;
        }
    }
}
/// Processes a versioned approval distribution message received from `peer_id`.
///
/// The payload is first passed through the sanitizer matching its protocol version (the `v2`
/// sanitizers for V3 messages, the `v1` sanitizers for V1 and V2 messages) and the result is
/// then handed to `process_incoming_assignments` or `process_incoming_approvals`.
async fn process_incoming_peer_message<A, N, RA, R>(
&mut self,
approval_voting_sender: &mut A,
network_sender: &mut N,
runtime_api_sender: &mut RA,
metrics: &Metrics,
peer_id: PeerId,
msg: Versioned<
protocol_v1::ApprovalDistributionMessage,
protocol_v2::ApprovalDistributionMessage,
protocol_v3::ApprovalDistributionMessage,
>,
rng: &mut R,
assignment_criteria: &(impl AssignmentCriteria + ?Sized),
clock: &(impl Clock + ?Sized),
session_info_provider: &mut RuntimeInfo,
) where
A: overseer::SubsystemSender<ApprovalVotingMessage>,
N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
RA: overseer::SubsystemSender<RuntimeApiMessage>,
R: CryptoRng + Rng,
{
match msg {
// Assignments in the V3 wire format.
Versioned::V3(protocol_v3::ApprovalDistributionMessage::Assignments(assignments)) => {
gum::trace!(
target: LOG_TARGET,
peer_id = %peer_id,
num = assignments.len(),
"Processing assignments from a peer",
);
let sanitized_assignments =
self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;
self.process_incoming_assignments(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
peer_id,
sanitized_assignments,
rng,
assignment_criteria,
clock,
session_info_provider,
)
.await;
},
// Assignments of V1 and V2 are handled by the same code path.
Versioned::V1(protocol_v1::ApprovalDistributionMessage::Assignments(assignments)) |
Versioned::V2(protocol_v2::ApprovalDistributionMessage::Assignments(assignments)) => {
gum::trace!(
target: LOG_TARGET,
peer_id = %peer_id,
num = assignments.len(),
"Processing assignments from a peer",
);
let sanitized_assignments =
self.sanitize_v1_assignments(peer_id, network_sender, assignments).await;
self.process_incoming_assignments(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
peer_id,
sanitized_assignments,
rng,
assignment_criteria,
clock,
session_info_provider,
)
.await;
},
// Approvals in the V3 wire format.
Versioned::V3(protocol_v3::ApprovalDistributionMessage::Approvals(approvals)) => {
let sanitized_approvals =
self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
self.process_incoming_approvals(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
peer_id,
sanitized_approvals,
session_info_provider,
)
.await;
},
// Approvals of V1 and V2 are handled by the same code path.
Versioned::V1(protocol_v1::ApprovalDistributionMessage::Approvals(approvals)) |
Versioned::V2(protocol_v2::ApprovalDistributionMessage::Approvals(approvals)) => {
let sanitized_approvals =
self.sanitize_v1_approvals(peer_id, network_sender, approvals).await;
self.process_incoming_approvals(
approval_voting_sender,
network_sender,
runtime_api_sender,
metrics,
peer_id,
sanitized_approvals,
session_info_provider,
)
.await;
},
}
}
/// Applies a view update of `peer_id`.
///
/// The stored view of the peer is replaced, the peer is removed from the `known_by` set of
/// every tracked block whose number lies between the previously finalized number and the newly
/// finalized number (both inclusive), and finally the peer is unified over the new view.
///
/// For a peer that is not tracked, a warning is logged, the previously finalized number is
/// taken to be 0 and protocol version 1 is assumed.
async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
    &mut self,
    network_sender: &mut N,
    metrics: &Metrics,
    peer_id: PeerId,
    view: View,
    rng: &mut R,
) where
    R: CryptoRng + Rng,
{
    gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
    let finalized_number = view.finalized_number;

    let (old_view, protocol_version) = match self.peer_views.get_mut(&peer_id) {
        Some(peer_entry) => {
            let previous = std::mem::replace(&mut peer_entry.view, view.clone());
            (Some(previous), peer_entry.version)
        },
        None => {
            gum::warn!(
                target: LOG_TARGET,
                ?peer_id,
                ?view,
                "Peer view change for missing `peer_entry`"
            );
            (None, ValidationVersion::V1.into())
        },
    };
    let old_finalized_number = old_view.map_or(0, |previous| previous.finalized_number);

    // `BTreeMap::range` must not be called with an empty range, hence the guard.
    let range = old_finalized_number..=finalized_number;
    let blocks = &mut self.blocks;
    if !(range.is_empty() || blocks.is_empty()) {
        let finalized_hashes =
            self.blocks_by_number.range(range).flat_map(|(_number, hashes)| hashes);
        for hash in finalized_hashes {
            if let Some(entry) = blocks.get_mut(hash) {
                entry.known_by.remove(&peer_id);
            }
        }
    }

    Self::unify_with_peer(
        network_sender,
        metrics,
        &mut self.blocks,
        &self.topologies,
        self.peer_views.len(),
        peer_id,
        protocol_version,
        view,
        rng,
        false,
    )
    .await;
}
/// Prunes every tracked block with a number up to and including `finalized_number`.
///
/// Each pruned block is noted as recently outdated and the reference of its session in the
/// topologies is released. Afterwards `enable_aggression` is called with `Resend::No`.
async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
    &mut self,
    network_sender: &mut N,
    metrics: &Metrics,
    finalized_number: BlockNumber,
) {
    // `split_off` returns the entries at and after the key, i.e. the blocks that are kept.
    let split_point = finalized_number.saturating_add(1);
    let retained = self.blocks_by_number.split_off(&split_point);
    let pruned = std::mem::replace(&mut self.blocks_by_number, retained);

    for relay_block in pruned.values().flatten() {
        self.recent_outdated_blocks.note_outdated(*relay_block);
        if let Some(block_entry) = self.blocks.remove(relay_block) {
            self.topologies.dec_session_refs(block_entry.session);
        }
    }

    self.enable_aggression(network_sender, Resend::No, metrics).await;
}
async fn import_and_circulate_assignment<A, N, RA, R>(
&mut self,
approval_voting_sender: &mut A,
network_sender: &mut N,
runtime_api_sender: &mut RA,
metrics: &Metrics,
source: MessageSource,
assignment: IndirectAssignmentCertV2,
claimed_candidate_indices: CandidateBitfield,
rng: &mut R,
assignment_criteria: &(impl AssignmentCriteria + ?Sized),
clock: &(impl Clock + ?Sized),
session_info_provider: &mut RuntimeInfo,
) where
A: overseer::SubsystemSender<ApprovalVotingMessage>,
N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
RA: overseer::SubsystemSender<RuntimeApiMessage>,
R: CryptoRng + Rng,
{
let block_hash = assignment.block_hash;
let validator_index = assignment.validator;
let entry = match self.blocks.get_mut(&block_hash) {
Some(entry) => entry,
None => {
if let Some(peer_id) = source.peer_id() {
gum::trace!(
target: LOG_TARGET,
?peer_id,
hash = ?block_hash,
?validator_index,
"Unexpected assignment",
);
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
metrics.on_assignment_recent_outdated();
}
}
metrics.on_assignment_invalid_block();
return
},
};
let (message_subject, message_kind) = (
MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
MessageKind::Assignment,
);
if let Some(peer_id) = source.peer_id() {
match entry.known_by.entry(peer_id) {
hash_map::Entry::Occupied(mut peer_knowledge) => {
let peer_knowledge = peer_knowledge.get_mut();
if peer_knowledge.contains(&message_subject, message_kind) {
if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?message_subject,
"Duplicate assignment",
);
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
metrics.on_assignment_duplicate();
} else {
gum::trace!(
target: LOG_TARGET,
?peer_id,
hash = ?block_hash,
?validator_index,
?message_subject,
"We sent the message to the peer while peer was sending it to us. Known race condition.",
);
}
return
}
},
hash_map::Entry::Vacant(_) => {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?message_subject,
"Assignment from a peer is out of view",
);
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
metrics.on_assignment_out_of_view();
},
}
if entry.knowledge.contains(&message_subject, message_kind) {
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
peer_knowledge.received.insert(message_subject, message_kind);
}
metrics.on_assignment_good_known();
return
}
let result = Self::check_assignment_valid(
assignment_criteria,
&entry,
&assignment,
&claimed_candidate_indices,
session_info_provider,
runtime_api_sender,
)
.await;
match result {
Ok(checked_assignment) => {
let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
let too_far_in_future =
current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;
if checked_assignment.tranche() >= too_far_in_future {
gum::debug!(
target: LOG_TARGET,
hash = ?block_hash,
?peer_id,
"Got an assignment too far in the future",
);
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
)
.await;
metrics.on_assignment_far();
return
}
approval_voting_sender
.send_message(ApprovalVotingMessage::ImportAssignment(
checked_assignment,
None,
))
.await;
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;
entry.knowledge.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
}
},
Err(error) => {
gum::info!(
target: LOG_TARGET,
hash = ?block_hash,
?peer_id,
?error,
"Got a bad assignment from peer",
);
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_INVALID_MESSAGE,
)
.await;
metrics.on_assignment_bad();
return
},
}
} else {
if !entry.knowledge.insert(message_subject.clone(), message_kind) {
gum::warn!(
target: LOG_TARGET,
?message_subject,
"Importing locally an already known assignment",
);
return
} else {
gum::debug!(
target: LOG_TARGET,
?message_subject,
"Importing locally a new assignment",
);
}
}
metrics.on_assignment_imported(&assignment.cert.kind);
let topology = self.topologies.get_topology(entry.session);
let local = source == MessageSource::Local;
let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
t.local_grid_neighbors().required_routing_by_index(validator_index, local)
});
let mut peers = HashSet::new();
let peers_to_route_to = topology
.as_ref()
.map(|t| t.peers_to_route(required_routing))
.unwrap_or_default();
for peer in peers_to_route_to {
if !entry.known_by.contains_key(&peer) {
continue
}
peers.insert(peer);
}
let peers_to_filter = entry.known_by();
let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
assignment.clone(),
claimed_candidate_indices.clone(),
ApprovalRouting {
required_routing,
local,
random_routing: Default::default(),
peers_randomly_routed: Default::default(),
},
));
let assignments = vec![(assignment, claimed_candidate_indices.clone())];
let n_peers_total = self.peer_views.len();
let source_peer = source.peer_id();
for peer in peers_to_filter.into_iter() {
if Some(peer) == source_peer {
continue
}
if peers.contains(&peer) {
continue
}
if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
continue
}
let route_random =
approval_entry.routing_info().random_routing.sample(n_peers_total, rng);
if route_random {
approval_entry.routing_info_mut().mark_randomly_sent(peer);
peers.insert(peer);
}
if approval_entry.routing_info().random_routing.is_complete() {
break
}
}
for peer in peers.iter() {
if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
peer_knowledge.sent.insert(message_subject.clone(), message_kind);
}
}
if !peers.is_empty() {
gum::trace!(
target: LOG_TARGET,
?block_hash,
?claimed_candidate_indices,
local = source.peer_id().is_none(),
num_peers = peers.len(),
"Sending an assignment to peers",
);
let peers = peers
.iter()
.filter_map(|peer_id| {
self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
})
.collect::<Vec<_>>();
send_assignments_batched(network_sender, assignments, &peers).await;
}
}
/// Validates a received assignment certificate for the block described by `entry`.
///
/// Steps performed, in order:
/// 1. fetch the session info for `entry.session` through `runtime_info`;
/// 2. reject claim bitfields that are longer than the session's `n_cores`;
/// 3. translate every claimed candidate index into its core, failing on the first index that
///    has no entry in `entry.candidates_metadata`;
/// 4. run `assignment_criteria.check_assignment_cert` with the claimed cores, the block's
///    relay VRF story and the backing groups of the claimed candidates.
///
/// Returns the assignment marked as checked together with the tranche produced by the
/// criteria check, or the first `InvalidAssignmentError` encountered.
async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
    assignment_criteria: &(impl AssignmentCriteria + ?Sized),
    entry: &BlockEntry,
    assignment: &IndirectAssignmentCertV2,
    claimed_candidate_indices: &CandidateBitfield,
    runtime_info: &mut RuntimeInfo,
    runtime_api_sender: &mut RA,
) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
    let session_lookup = runtime_info
        .get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
        .await;
    let ExtendedSessionInfo { ref session_info, .. } =
        session_lookup.map_err(InvalidAssignmentError::SessionInfoNotFound)?;

    if claimed_candidate_indices.len() > session_info.n_cores as usize {
        return Err(InvalidAssignmentError::OversizedClaimedBitfield)
    }

    // Resolve each claimed candidate to the core it occupies; bail out on the first miss.
    let metadata = &entry.candidates_metadata;
    let mut claimed_cores: Vec<CoreIndex> = Vec::new();
    for candidate_index in claimed_candidate_indices.iter_ones() {
        match metadata.get(candidate_index) {
            Some((_, core, _)) => claimed_cores.push(*core),
            None =>
                return Err(InvalidAssignmentError::ClaimedInvalidCandidateIndex {
                    claimed_index: candidate_index,
                    max_index: metadata.len(),
                }),
        }
    }

    // The conversion fails when the list cannot be turned into the type expected by the
    // criteria check; that case is reported as "no claimed candidates".
    let Ok(claimed_cores) = claimed_cores.try_into() else {
        return Err(InvalidAssignmentError::NoClaimedCandidates)
    };

    let backing_groups = claimed_candidate_indices
        .iter_ones()
        .filter_map(|candidate_index| metadata.get(candidate_index).map(|(_, _, group)| *group))
        .collect::<Vec<_>>();

    let tranche = assignment_criteria
        .check_assignment_cert(
            claimed_cores,
            assignment.validator,
            &polkadot_node_primitives::approval::criteria::Config::from(session_info),
            entry.vrf_story.clone(),
            &assignment.cert,
            backing_groups,
        )
        .map_err(InvalidAssignmentError::CryptoCheckFailed)?;

    Ok(CheckedIndirectAssignment::from_checked(
        assignment.clone(),
        claimed_candidate_indices.clone(),
        tranche,
    ))
}
/// Decides whether an approval received from `peer_id` still has to be validated and imported.
///
/// Returns `true` only when every assignment key in `assignments_knowledge_key` is already in
/// `entry.knowledge`, the peer's own knowledge does not yet contain the approval, and the
/// approval is not in `entry.knowledge` either. In every other case `false` is returned.
///
/// Side effects on the way:
/// * unknown assignment: `COST_UNEXPECTED_MESSAGE`;
/// * approval already recorded as received from this peer: `COST_DUPLICATE_MESSAGE`;
/// * peer has no `known_by` entry for the block: `COST_UNEXPECTED_MESSAGE` (processing
///   continues afterwards);
/// * approval already known locally: recorded as received for the peer (if tracked) and
///   rewarded with `BENEFIT_VALID_MESSAGE`.
async fn check_approval_can_be_processed<
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
>(
    network_sender: &mut N,
    assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
    approval_knowledge_key: &(MessageSubject, MessageKind),
    entry: &mut BlockEntry,
    reputation: &mut ReputationAggregator,
    peer_id: PeerId,
    metrics: &Metrics,
) -> bool {
    let (approval_subject, approval_kind) = (&approval_knowledge_key.0, approval_knowledge_key.1);

    // An approval is only acceptable once all assignments it covers are known.
    let unknown_assignment = assignments_knowledge_key
        .iter()
        .find(|key| !entry.knowledge.contains(&key.0, key.1));
    if let Some(message_subject) = unknown_assignment {
        gum::trace!(
            target: LOG_TARGET,
            ?peer_id,
            ?message_subject,
            "Unknown approval assignment",
        );
        modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE).await;
        metrics.on_approval_unknown_assignment();
        return false
    }

    match entry.known_by.entry(peer_id) {
        hash_map::Entry::Vacant(_) => {
            gum::debug!(
                target: LOG_TARGET,
                ?peer_id,
                ?approval_knowledge_key,
                "Approval from a peer is out of view",
            );
            modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE).await;
            metrics.on_approval_out_of_view();
        },
        hash_map::Entry::Occupied(mut knowledge) => {
            let peer_knowledge = knowledge.get_mut();
            if peer_knowledge.contains(approval_subject, approval_kind) {
                // `insert` reports whether the key was new in the `received` set.
                let first_time_received =
                    peer_knowledge.received.insert(approval_subject.clone(), approval_kind);
                if !first_time_received {
                    gum::trace!(
                        target: LOG_TARGET,
                        ?peer_id,
                        ?approval_knowledge_key,
                        "Duplicate approval",
                    );
                    modify_reputation(
                        reputation,
                        network_sender,
                        peer_id,
                        COST_DUPLICATE_MESSAGE,
                    )
                    .await;
                    metrics.on_approval_duplicate();
                }
                return false
            }
        },
    }

    if !entry.knowledge.contains(approval_subject, approval_kind) {
        return true
    }

    // The approval was already processed locally: just note that the peer has it too.
    if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
        peer_knowledge.received.insert(approval_subject.clone(), approval_kind);
    }
    gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
    metrics.on_approval_good_known();
    modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
    false
}
/// Imports an approval vote and forwards it to the peers that should receive it.
///
/// Votes that come from a peer (`source.peer_id()` is `Some`) go through
/// `check_approval_can_be_processed` and `check_vote_valid`, adjusting the peer's reputation
/// along the way; valid ones are sent to approval-voting as `ImportApproval`. Votes without a
/// source peer are only recorded in the block entry's `knowledge`.
///
/// Returns early, without distributing anything, when the block or the referenced candidates
/// are unknown, when the vote is already known or invalid, or when `note_approval` fails.
async fn import_and_circulate_approval<
N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
A: overseer::SubsystemSender<ApprovalVotingMessage>,
RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
&mut self,
approval_voting_sender: &mut A,
network_sender: &mut N,
runtime_api_sender: &mut RA,
metrics: &Metrics,
source: MessageSource,
vote: IndirectSignedApprovalVoteV2,
session_info_provider: &mut RuntimeInfo,
) {
let block_hash = vote.block_hash;
let validator_index = vote.validator;
let candidate_indices = &vote.candidate_indices;
// The block must be tracked and must contain every candidate the vote refers to.
let entry = match self.blocks.get_mut(&block_hash) {
Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
_ => {
// Only peers are penalized, and only if the block is not a recently outdated one.
if let Some(peer_id) = source.peer_id() {
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?block_hash,
?validator_index,
?candidate_indices,
"Approval from a peer is out of view",
);
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
metrics.on_approval_invalid_block();
} else {
metrics.on_approval_recent_outdated();
}
}
return
},
};
// Knowledge keys for the assignments covered by the vote and for the vote itself.
let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);
if let Some(peer_id) = source.peer_id() {
// Peer-originated vote: first the cheap knowledge checks, then the signature check.
if !Self::check_approval_can_be_processed(
network_sender,
&assignments_knowledge_keys,
&approval_knwowledge_key,
entry,
&mut self.reputation,
peer_id,
metrics,
)
.await
{
return
}
let result =
Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
.await;
match result {
Ok(vote) => {
// Hand the checked vote to approval-voting, reward the peer and remember the
// vote both locally and as received from this peer.
approval_voting_sender
.send_message(ApprovalVotingMessage::ImportApproval(vote, None))
.await;
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;
entry
.knowledge
.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge
.received
.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
}
},
Err(err) => {
modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_INVALID_MESSAGE,
)
.await;
gum::info!(
target: LOG_TARGET,
?peer_id,
?err,
"Got a bad approval from peer",
);
metrics.on_approval_bad();
return
},
}
} else {
// Vote without a source peer: `insert` returning `false` means it was already known.
if !entry
.knowledge
.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
{
gum::warn!(
target: LOG_TARGET,
"Importing locally an already known approval",
);
return
} else {
gum::debug!(
target: LOG_TARGET,
"Importing locally a new approval",
);
}
}
// Attach the vote to the block entry; on success this yields the routing to use and the
// peers that were picked by random routing.
let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
Ok(required_routing) => required_routing,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
hash = ?block_hash,
validator_index = ?vote.validator,
candidate_bitfield = ?vote.candidate_indices,
?err,
"Possible bug: Vote import failed",
);
metrics.on_approval_bug();
return
},
};
metrics.on_approval_imported();
let topology = self.topologies.get_topology(entry.session);
let source_peer = source.peer_id();
// A peer receives the vote if it is not the sender and is either selected by the grid
// topology for `required_routing` or was randomly routed to.
let peer_filter = move |peer| {
if Some(peer) == source_peer.as_ref() {
return false
}
let in_topology = topology
.map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
in_topology || peers_randomly_routed_to.contains(peer)
};
// Only peers that know the block and have an entry in `peer_views` are considered; the
// protocol version is taken from that entry.
let peers = entry
.known_by
.iter()
.filter(|(p, _)| peer_filter(p))
.filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
.collect::<Vec<_>>();
// Record the vote as sent for every selected peer before actually sending it.
for peer in peers.iter() {
if let Some(entry) = entry.known_by.get_mut(&peer.0) {
entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
}
}
if !peers.is_empty() {
let approvals = vec![vote];
gum::trace!(
target: LOG_TARGET,
?block_hash,
local = source.peer_id().is_none(),
num_peers = peers.len(),
"Sending an approval to peers",
);
send_approvals_batched(network_sender, approvals, &peers).await;
}
}
/// Checks the signature of an approval vote against the candidates it references.
///
/// The candidate indices set in `vote.candidate_indices` are resolved to candidate hashes via
/// `entry.candidates_metadata`, the validator's public key is looked up in the session info of
/// `entry.session`, and the signature is verified as an
/// `ApprovalCheckingMultipleCandidates` statement over those hashes.
///
/// # Errors
///
/// * `CandidateIndexOutOfBounds` if the bitfield is longer than the block's candidate list, or
///   if it does not resolve to a single candidate of the block (for example when no bit is
///   set). The latter used to be an `unwrap()` and would have panicked instead.
/// * `SessionInfoNotFound` if the session info cannot be fetched.
/// * `ValidatorIndexOutOfBounds` if `vote.validator` is not part of the session.
/// * `InvalidSignature` if the signature check fails.
async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
    vote: &IndirectSignedApprovalVoteV2,
    entry: &BlockEntry,
    runtime_info: &mut RuntimeInfo,
    runtime_api_sender: &mut RA,
) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
    if vote.candidate_indices.len() > entry.candidates_metadata.len() {
        return Err(InvalidVoteError::CandidateIndexOutOfBounds)
    }

    let candidate_hashes = vote
        .candidate_indices
        .iter_ones()
        .flat_map(|candidate_index| {
            entry
                .candidates_metadata
                .get(candidate_index)
                .map(|(candidate_hash, _, _)| *candidate_hash)
        })
        .collect::<Vec<_>>();

    // The signature check needs one concrete candidate hash. The vote's content is not
    // guaranteed to resolve to one, so reject it with an error rather than panicking.
    let Some(first_candidate_hash) = candidate_hashes.first().copied() else {
        return Err(InvalidVoteError::CandidateIndexOutOfBounds)
    };

    let ExtendedSessionInfo { ref session_info, .. } = runtime_info
        .get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
        .await
        .map_err(InvalidVoteError::SessionInfoNotFound)?;

    let pubkey = session_info
        .validators
        .get(vote.validator)
        .ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;

    // `first_candidate_hash` was copied out above, so the hash list can be moved into the
    // statement without cloning it.
    DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
        candidate_hashes,
    ))
    .check_signature(&pubkey, first_candidate_hash, entry.session, &vote.signature)
    .map_err(|_| InvalidVoteError::InvalidSignature)
    .map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
}
/// Collects the approval signatures known for the given `(block hash, candidate index)` pairs.
///
/// For every pair whose block is tracked in `self.blocks`, each approval vote returned by
/// `approval_votes(index)` contributes one map entry keyed by the voting validator. The value
/// holds the block hash, all candidate indices set in that vote and the vote's signature.
/// A later vote of the same validator replaces an earlier entry. Pairs with an unknown block
/// hash are logged and skipped.
fn get_approval_signatures(
    &mut self,
    indices: HashSet<(Hash, CandidateIndex)>,
) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
    let mut all_sigs = HashMap::new();

    for (hash, index) in indices {
        let Some(block_entry) = self.blocks.get(&hash) else {
            gum::debug!(
                target: LOG_TARGET,
                ?hash,
                "`get_approval_signatures`: could not find block entry for given hash!"
            );
            continue
        };

        for approval in block_entry.approval_votes(index) {
            let covered_candidates = approval
                .candidate_indices
                .iter_ones()
                .map(|val| val as CandidateIndex)
                .collect_vec();
            all_sigs.insert(approval.validator, (hash, covered_candidates, approval.signature));
        }
    }

    all_sigs
}
/// Sends a peer the assignments and approvals it is missing for the blocks in its `view`.
///
/// For every head in `view` the chain is followed backwards through `parent_hash` for as long
/// as the block is present in `entries` and its number is above the view's finalized number.
/// For each visited block, every approval entry that passes the routing filter contributes
/// its assignment and approvals, unless the peer's knowledge already contains them. Everything
/// queued is recorded as `sent` in the peer's knowledge and dispatched in batches at the end.
///
/// When `retry_known_blocks` is `false`, the walk of a chain stops at the first block whose
/// `known_by` already contains the peer.
async fn unify_with_peer(
sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
metrics: &Metrics,
entries: &mut HashMap<Hash, BlockEntry>,
topologies: &SessionGridTopologies,
total_peers: usize,
peer_id: PeerId,
protocol_version: ProtocolVersion,
view: View,
rng: &mut (impl CryptoRng + Rng),
retry_known_blocks: bool,
) {
metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
let mut assignments_to_send = Vec::new();
let mut approvals_to_send = Vec::new();
let view_finalized_number = view.finalized_number;
for head in view.into_iter() {
let mut block = head;
// Follow the chain from the head towards the peer's finalized block.
loop {
let entry = match entries.get_mut(&block) {
Some(entry) if entry.number > view_finalized_number => entry,
_ => break,
};
// Peer already registered for this block: nothing more to do on this chain unless a
// retry was requested.
if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
break
}
let peer_knowledge = entry.known_by.entry(peer_id).or_default();
let topology = topologies.get_topology(entry.session);
for approval_entry in entry.approval_entries.values_mut() {
{
let required_routing = approval_entry.routing_info().required_routing;
let routing_info = &mut approval_entry.routing_info_mut();
let rng = &mut *rng;
// The peer qualifies if the grid topology routes to it for `required_routing`, or
// if it is a validator according to the topology and the random routing sample
// picks it (in which case it is marked as randomly sent).
let mut peer_filter = move |peer_id| {
let in_topology = topology.as_ref().map_or(false, |t| {
t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
});
in_topology || {
if !topology
.map(|topology| topology.is_validator(peer_id))
.unwrap_or(false)
{
return false
}
let route_random =
routing_info.random_routing.sample(total_peers, rng);
if route_random {
routing_info.mark_randomly_sent(*peer_id);
}
route_random
}
};
if !peer_filter(&peer_id) {
continue
}
}
let assignment_message = approval_entry.assignment();
let approval_messages = approval_entry.approvals();
let (assignment_knowledge, message_kind) =
approval_entry.create_assignment_knowledge(block);
// Queue the assignment only if the peer's knowledge does not contain it yet.
if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
peer_knowledge.sent.insert(assignment_knowledge, message_kind);
assignments_to_send.push(assignment_message);
}
// Same for every approval attached to this approval entry.
for approval_message in approval_messages {
let approval_knowledge =
PeerKnowledge::generate_approval_key(&approval_message);
if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
approvals_to_send.push(approval_message);
peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
}
}
}
block = entry.parent_hash;
}
}
// Dispatch everything that was queued, assignments first.
if !assignments_to_send.is_empty() {
gum::trace!(
target: LOG_TARGET,
?peer_id,
?protocol_version,
num = assignments_to_send.len(),
"Sending assignments to unified peer",
);
send_assignments_batched(
sender,
assignments_to_send,
&vec![(peer_id, protocol_version)],
)
.await;
}
if !approvals_to_send.is_empty() {
gum::trace!(
target: LOG_TARGET,
?peer_id,
?protocol_version,
num = approvals_to_send.len(),
"Sending approvals to unified peer",
);
send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
.await;
}
}
/// Re-propagates messages more aggressively when the span of tracked blocks is large enough.
///
/// `age` is the difference between the last and the first key of `blocks_by_number`
/// (presumably the highest and lowest tracked block number -- the map type is not visible
/// here, TODO confirm). If `aggression_config.should_trigger_aggression(age)` is false,
/// nothing happens.
///
/// Otherwise two propagation passes run via `adjust_required_routing_and_propagate`:
/// 1. a resend pass that, for selected blocks, clears every peer's `sent` knowledge and
///    leaves the required routing untouched;
/// 2. an escalation pass restricted to blocks whose number equals `min_age`, which may raise
///    the required routing according to the configured L1/L2 thresholds.
async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
&mut self,
network_sender: &mut N,
resend: Resend,
metrics: &Metrics,
) {
let config = self.aggression_config.clone();
let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);
// Nothing to do when no block is tracked at all.
let (min_age, max_age) = match (min_age, max_age) {
(Some(min), Some(max)) => (*min, *max),
_ => return, };
let age = max_age.saturating_sub(min_age);
if !self.aggression_config.should_trigger_aggression(age) {
gum::trace!(
target: LOG_TARGET,
approval_checking_lag = self.approval_checking_lag,
age,
"Aggression not enabled",
);
return
}
gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);
// Pass 1: resend. A block is selected when resending was requested, a resend period is
// configured, and the block's distance from `max_age` is a non-zero multiple of that
// period. Selected blocks get the `sent` knowledge of all their peers reset.
adjust_required_routing_and_propagate(
network_sender,
&mut self.blocks,
&self.topologies,
|block_entry| {
let block_age = max_age - block_entry.number;
if resend == Resend::Yes &&
config
.resend_unfinalized_period
.as_ref()
.map_or(false, |p| block_age > 0 && block_age % p == 0)
{
for (_, knowledge) in block_entry.known_by.iter_mut() {
knowledge.sent = Knowledge::default();
}
true
} else {
false
}
},
|required_routing, _, _| *required_routing,
&self.peer_views,
)
.await;
// Pass 2: escalate the routing, but only for blocks numbered `min_age`.
adjust_required_routing_and_propagate(
network_sender,
&mut self.blocks,
&self.topologies,
|block_entry| {
block_entry.number == min_age
},
|required_routing, local, _| {
// Entries still waiting for a topology are left as they are.
if *required_routing == RequiredRouting::PendingTopology {
gum::debug!(
target: LOG_TARGET,
lag = ?self.approval_checking_lag,
"Encountered old block pending gossip topology",
);
return *required_routing
}
let mut new_required_routing = *required_routing;
// L1: once `age` reaches the threshold, locally originated entries go to `All`.
if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
if local && new_required_routing != RequiredRouting::All {
metrics.on_aggression_l1();
new_required_routing = RequiredRouting::All;
}
}
// L2: once `age` reaches the threshold, non-local entries go to `GridXY`.
if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
if !local && new_required_routing != RequiredRouting::GridXY {
metrics.on_aggression_l2();
new_required_routing = RequiredRouting::GridXY;
}
}
new_required_routing
},
&self.peer_views,
)
.await;
}
/// Converts v1 assignments received from `peer_id` into their v2 form, dropping oversized ones.
///
/// An assignment is dropped, and the peer charged `COST_OVERSIZED_BITFIELD`, when the bitfield
/// implied by its candidate index (`candidate_index + 1` bits) or, for `RelayVRFDelay`
/// certificates, by its core index (`core_index + 1` bits) would exceed `MAX_BITFIELD_SIZE`.
/// All remaining assignments are converted with `into()` and returned in their original order.
async fn sanitize_v1_assignments(
    &mut self,
    peer_id: PeerId,
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>,
) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
    let mut sanitized_assignments = Vec::new();

    for (cert, candidate_index) in assignments {
        let candidate_bitfield_bits = candidate_index as usize + 1;
        let cert_bitfield_bits = match cert.cert.kind {
            AssignmentCertKind::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
            // For modulo certificates the candidate bitfield length stands in for the
            // certificate's bitfield length.
            AssignmentCertKind::RelayVRFModulo { .. } => candidate_bitfield_bits,
        };

        let within_limit = cert_bitfield_bits <= MAX_BITFIELD_SIZE &&
            candidate_bitfield_bits <= MAX_BITFIELD_SIZE;
        if within_limit {
            sanitized_assignments.push((cert.into(), candidate_index.into()));
            continue
        }

        modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD).await;
        gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, kind = ?cert.cert.kind, "Bad assignment v1, invalid candidate index");
    }

    sanitized_assignments
}
/// Filters v2 assignments received from `peer_id`, dropping those with malformed bitfields.
///
/// An assignment is dropped, and the peer charged `COST_OVERSIZED_BITFIELD`, when
/// * the bitfield length implied by the certificate exceeds `MAX_BITFIELD_SIZE`,
/// * the candidate bitfield is longer than `MAX_BITFIELD_SIZE`, or
/// * the candidate bitfield's highest bit is not set. This includes a zero-length bitfield,
///   which has no highest bit.
///
/// The zero-length case is checked explicitly: computing `len - 1` on an empty bitfield would
/// underflow, which panics when overflow checks are enabled.
///
/// Accepted assignments are returned unchanged and in their original order.
async fn sanitize_v2_assignments(
    &mut self,
    peer_id: PeerId,
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
    let mut sanitized_assignments = Vec::new();

    for (cert, candidate_bitfield) in assignments {
        let cert_bitfield_bits = match &cert.cert.kind {
            AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
            AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
            AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
                core_bitfield.len(),
        };
        let candidate_bitfield_bits = candidate_bitfield.len();

        // `checked_sub` yields `None` for an empty bitfield, which is then treated like a
        // bitfield whose most significant bit is unset.
        let msb_is_set = candidate_bitfield_bits
            .checked_sub(1)
            .map_or(false, |msb| candidate_bitfield.bit_at(msb.as_bit_index()));

        if cert_bitfield_bits > MAX_BITFIELD_SIZE ||
            candidate_bitfield_bits > MAX_BITFIELD_SIZE ||
            !msb_is_set
        {
            modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
                .await;
            for candidate_index in candidate_bitfield.iter_ones() {
                gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
            }
        } else {
            sanitized_assignments.push((cert, candidate_bitfield))
        }
    }

    sanitized_assignments
}
/// Converts v1 approvals received from `peer_id` into their v2 form, dropping oversized ones.
///
/// A v1 approval names a single candidate index; covering index `i` takes `i + 1` bits. An
/// approval is therefore dropped, and the peer charged `COST_OVERSIZED_BITFIELD`, when
/// `candidate_index + 1` would exceed `MAX_BITFIELD_SIZE`. This is the same bound that
/// `sanitize_v1_assignments` applies to assignment candidate indices; the previous
/// `candidate_index > MAX_BITFIELD_SIZE` check let an index equal to the limit slip through.
///
/// Accepted approvals are converted with `into()` and returned in their original order.
async fn sanitize_v1_approvals(
    &mut self,
    peer_id: PeerId,
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    approval: Vec<IndirectSignedApprovalVote>,
) -> Vec<IndirectSignedApprovalVoteV2> {
    let mut sanitized_approvals = Vec::new();

    for approval in approval {
        // `index >= MAX` is `index + 1 > MAX` without the addition.
        if approval.candidate_index as usize >= MAX_BITFIELD_SIZE {
            modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
                .await;
            gum::debug!(
                target: LOG_TARGET,
                block_hash = ?approval.block_hash,
                candidate_index = ?approval.candidate_index,
                "Bad approval v1, invalid candidate index"
            );
        } else {
            sanitized_approvals.push(approval.into())
        }
    }

    sanitized_approvals
}
/// Filters v2 approvals received from `peer_id`, dropping those with an oversized bitfield.
///
/// An approval whose `candidate_indices` bitfield is longer than `MAX_BITFIELD_SIZE` is
/// dropped and the peer is charged `COST_OVERSIZED_BITFIELD`. All other approvals are returned
/// unchanged and in their original order.
async fn sanitize_v2_approvals(
    &mut self,
    peer_id: PeerId,
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    approval: Vec<IndirectSignedApprovalVoteV2>,
) -> Vec<IndirectSignedApprovalVoteV2> {
    let mut sanitized_approvals = Vec::new();

    for approval in approval {
        let fits_limit = approval.candidate_indices.len() as usize <= MAX_BITFIELD_SIZE;
        if fits_limit {
            sanitized_approvals.push(approval);
            continue
        }

        modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD).await;
        gum::debug!(
            target: LOG_TARGET,
            block_hash = ?approval.block_hash,
            candidate_indices_len = ?approval.candidate_indices.len(),
            "Bad approval v2, invalid candidate indices size"
        );
    }

    sanitized_approvals
}
}
/// Updates the required routing of approval entries and sends out whatever peers now lack.
///
/// For every block accepted by `block_filter` (which may mutate the block entry) and whose
/// session has a known topology, `routing_modifier` is applied to each approval entry. It
/// receives the current required routing, the entry's `local` flag and its validator index,
/// and returns the new required routing. Entries whose resulting routing is empty are skipped.
///
/// For each remaining entry, every peer in the block's `known_by` that the topology routes to
/// under the new routing gets the assignment and the approvals its knowledge does not contain
/// yet; those are recorded as `sent`. Messages are accumulated per peer and sent in batches
/// once all blocks are processed. Peers missing from `peer_views` only produce a warning.
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
async fn adjust_required_routing_and_propagate<
N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
BlockFilter,
RoutingModifier,
>(
network_sender: &mut N,
blocks: &mut HashMap<Hash, BlockEntry>,
topologies: &SessionGridTopologies,
block_filter: BlockFilter,
routing_modifier: RoutingModifier,
peer_views: &HashMap<PeerId, PeerEntry>,
) where
BlockFilter: Fn(&mut BlockEntry) -> bool,
RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
{
// Messages to send, grouped by receiving peer.
let mut peer_assignments = HashMap::new();
let mut peer_approvals = HashMap::new();
for (block_hash, block_entry) in blocks {
if !block_filter(block_entry) {
continue
}
// Without a topology for the block's session there is nothing to route by.
let topology = match topologies.get_topology(block_entry.session) {
Some(t) => t,
None => continue,
};
for approval_entry in block_entry.approval_entries.values_mut() {
let new_required_routing = routing_modifier(
&approval_entry.routing_info().required_routing,
approval_entry.routing_info().local,
&approval_entry.validator_index(),
);
approval_entry.update_required_routing(new_required_routing);
if approval_entry.routing_info().required_routing.is_empty() {
continue
}
let assignment_message = approval_entry.assignment();
let approval_messages = approval_entry.approvals();
let (assignment_knowledge, message_kind) =
approval_entry.create_assignment_knowledge(*block_hash);
for (peer, peer_knowledge) in &mut block_entry.known_by {
// Skip peers the topology does not route to under the (possibly updated) routing.
if !topology
.local_grid_neighbors()
.route_to_peer(approval_entry.routing_info().required_routing, peer)
{
continue
}
// Queue only what the peer's knowledge does not contain, marking it as sent.
if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
peer_assignments
.entry(*peer)
.or_insert_with(Vec::new)
.push(assignment_message.clone());
}
for approval_message in &approval_messages {
let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);
if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
peer_approvals
.entry(*peer)
.or_insert_with(Vec::new)
.push(approval_message.clone());
}
}
}
}
}
// Send the accumulated assignments; the protocol version comes from `peer_views`.
for (peer, assignments_packet) in peer_assignments {
if let Some(peer_view) = peer_views.get(&peer) {
send_assignments_batched(
network_sender,
assignments_packet,
&vec![(peer, peer_view.version)],
)
.await;
} else {
gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
}
}
// Then the accumulated approvals.
for (peer, approvals_packet) in peer_approvals {
if let Some(peer_view) = peer_views.get(&peer) {
send_approvals_batched(
network_sender,
approvals_packet,
&vec![(peer, peer_view.version)],
)
.await;
} else {
gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
}
}
}
/// Logs a reputation change for `peer_id` at trace level and forwards it to the aggregator.
///
/// The change itself is applied by `ReputationAggregator::modify`, using `sender` for
/// whatever that method needs to send.
async fn modify_reputation(
    reputation: &mut ReputationAggregator,
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    peer_id: PeerId,
    rep: Rep,
) {
    gum::trace!(
        target: LOG_TARGET,
        reputation = ?rep,
        ?peer_id,
        "Reputation change for peer",
    );

    let apply_change = reputation.modify(sender, peer_id, rep);
    apply_change.await;
}
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
impl ApprovalDistribution {
pub fn new(
metrics: Metrics,
slot_duration_millis: u64,
assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
) -> Self {
Self::new_with_clock(
metrics,
slot_duration_millis,
Arc::new(SystemClock),
assignment_criteria,
)
}
/// Creates a new approval-distribution subsystem with an explicitly provided clock.
///
/// All arguments are stored as-is in the returned instance.
pub fn new_with_clock(
    metrics: Metrics,
    slot_duration_millis: u64,
    clock: Arc<dyn Clock + Send + Sync>,
    assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
) -> Self {
    Self { assignment_criteria, clock, slot_duration_millis, metrics }
}
/// Builds the subsystem's runtime state and drives [`Self::run_inner`] until it returns.
///
/// The state starts from `State::default()` with the configured slot duration. The RNG is a
/// `StdRng` seeded from entropy, the session-info cache holds `DISPUTE_WINDOW` sessions and
/// has no keystore, and reputation changes are flushed every `REPUTATION_CHANGE_INTERVAL`.
async fn run<Context>(self, ctx: Context) {
    let slot_duration_millis = self.slot_duration_millis;
    let mut state = State { slot_duration_millis, ..Default::default() };

    let mut rng = rand::rngs::StdRng::from_entropy();

    let runtime_info_config = RuntimeInfoConfig {
        keystore: None,
        session_cache_lru_size: DISPUTE_WINDOW.get(),
    };
    let mut session_info_provider = RuntimeInfo::new_with_config(runtime_info_config);

    self.run_inner(
        ctx,
        &mut state,
        REPUTATION_CHANGE_INTERVAL,
        &mut rng,
        &mut session_info_provider,
    )
    .await
}
/// Main loop of the subsystem.
///
/// Repeatedly waits for whichever happens first:
/// * the reputation timer fires: the aggregated reputation changes in `state.reputation` are
///   sent out and the timer is re-armed with `reputation_interval`;
/// * a message arrives from the overseer: it is passed to `handle_from_orchestra`.
///
/// The loop ends when receiving from the overseer fails or when `handle_from_orchestra`
/// returns `true`.
async fn run_inner<Context>(
self,
mut ctx: Context,
state: &mut State,
reputation_interval: Duration,
rng: &mut (impl CryptoRng + Rng),
session_info_provider: &mut RuntimeInfo,
) {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
// One sender clone per message type handled downstream.
let mut approval_voting_sender = ctx.sender().clone();
let mut network_sender = ctx.sender().clone();
let mut runtime_api_sender = ctx.sender().clone();
loop {
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
message = ctx.recv().fuse() => {
let message = match message {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
},
};
// `true` means the subsystem was asked to stop.
if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
return;
}
},
}
}
}
/// Handles a single message or signal received from the overseer.
///
/// * `Communication` messages are dispatched to `handle_incoming`.
/// * `ActiveLeaves` signals are only logged.
/// * `BlockFinalized` signals are forwarded to `State::handle_block_finalized`.
/// * `Conclude` requests shutdown.
///
/// Returns `true` if the subsystem should stop, `false` otherwise.
pub async fn handle_from_orchestra<
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    A: overseer::SubsystemSender<ApprovalVotingMessage>,
    RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
    &self,
    message: FromOrchestra<ApprovalDistributionMessage>,
    approval_voting_sender: &mut A,
    network_sender: &mut N,
    runtime_api_sender: &mut RA,
    state: &mut State,
    rng: &mut (impl CryptoRng + Rng),
    session_info_provider: &mut RuntimeInfo,
) -> bool {
    // Each arm evaluates to the "should stop" flag.
    match message {
        FromOrchestra::Signal(OverseerSignal::Conclude) => true,
        FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
            gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
            state.handle_block_finalized(network_sender, &self.metrics, number).await;
            false
        },
        FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
            gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
            false
        },
        FromOrchestra::Communication { msg } => {
            Self::handle_incoming(
                approval_voting_sender,
                network_sender,
                runtime_api_sender,
                state,
                msg,
                &self.metrics,
                rng,
                self.assignment_criteria.as_ref(),
                self.clock.as_ref(),
                session_info_provider,
            )
            .await;
            false
        },
    }
}
/// Dispatches an `ApprovalDistributionMessage` to the matching `State` operation.
///
/// * `ApprovalCheckingLagUpdate` stores the lag in the state.
/// * `GetApprovalSignatures` answers on the provided oneshot channel.
/// * `DistributeApproval` / `DistributeAssignment` import and circulate a locally produced
///   message (`MessageSource::Local`).
/// * `NewBlocks` and `NetworkBridgeUpdate` are forwarded to `handle_new_blocks` and
///   `handle_network_msg` respectively.
async fn handle_incoming<
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    A: overseer::SubsystemSender<ApprovalVotingMessage>,
    RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
    approval_voting_sender: &mut A,
    network_sender: &mut N,
    runtime_api_sender: &mut RA,
    state: &mut State,
    msg: ApprovalDistributionMessage,
    metrics: &Metrics,
    rng: &mut (impl CryptoRng + Rng),
    assignment_criteria: &(impl AssignmentCriteria + ?Sized),
    clock: &(impl Clock + ?Sized),
    session_info_provider: &mut RuntimeInfo,
) {
    match msg {
        ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
            gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
            state.approval_checking_lag = lag;
        },
        ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
            let sigs = state.get_approval_signatures(indices);
            // A failed send only means the requester dropped its receiving end.
            if tx.send(sigs).is_err() {
                gum::debug!(
                    target: LOG_TARGET,
                    "Sending back approval signatures failed, oneshot got closed"
                );
            }
        },
        ApprovalDistributionMessage::DistributeApproval(vote) => {
            gum::debug!(
                target: LOG_TARGET,
                "Distributing our approval vote on candidate (block={}, index={:?})",
                vote.block_hash,
                vote.candidate_indices,
            );
            state
                .import_and_circulate_approval(
                    approval_voting_sender,
                    network_sender,
                    runtime_api_sender,
                    metrics,
                    MessageSource::Local,
                    vote,
                    session_info_provider,
                )
                .await;
        },
        ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
            gum::debug!(
                target: LOG_TARGET,
                ?candidate_indices,
                block_hash = ?cert.block_hash,
                assignment_kind = ?cert.cert.kind,
                "Distributing our assignment on candidates",
            );
            state
                .import_and_circulate_assignment(
                    approval_voting_sender,
                    network_sender,
                    runtime_api_sender,
                    &metrics,
                    MessageSource::Local,
                    cert,
                    candidate_indices,
                    rng,
                    assignment_criteria,
                    clock,
                    session_info_provider,
                )
                .await;
        },
        ApprovalDistributionMessage::NewBlocks(metas) => {
            state
                .handle_new_blocks(
                    approval_voting_sender,
                    network_sender,
                    runtime_api_sender,
                    metrics,
                    metas,
                    rng,
                    assignment_criteria,
                    clock,
                    session_info_provider,
                )
                .await;
        },
        ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
            state
                .handle_network_msg(
                    approval_voting_sender,
                    network_sender,
                    runtime_api_sender,
                    metrics,
                    event,
                    rng,
                    assignment_criteria,
                    clock,
                    session_info_provider,
                )
                .await;
        },
    }
}
}
// Subsystem entry point used by the overseer machinery.
#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
impl<Context> ApprovalDistribution {
// Wraps `run` into a boxed future that resolves to `Ok(())` once `run` returns and labels
// the spawned subsystem `approval-distribution-subsystem`.
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "approval-distribution-subsystem", future }
}
}
/// Returns `size` unchanged, panicking if it is zero.
///
/// Being a `const fn`, a zero argument in a constant initializer turns into a compile-time
/// error rather than a runtime panic.
const fn ensure_size_not_zero(size: usize) -> usize {
    match size {
        0 => panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)"),
        non_zero => non_zero,
    }
}
/// Maximum number of assignments sent in a single batch.
///
/// Computed as `MAX_NOTIFICATION_SIZE` divided by the in-memory size of one
/// `(IndirectAssignmentCertV2, CandidateIndex)` pair, divided by 3. A result of zero fails
/// constant evaluation through `ensure_size_not_zero`.
pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
MAX_NOTIFICATION_SIZE as usize /
std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
3,
);
/// Maximum number of approvals sent in a single batch.
///
/// Computed as `MAX_NOTIFICATION_SIZE` divided by the in-memory size of one
/// `IndirectSignedApprovalVoteV2`, divided by 3. A result of zero fails constant evaluation
/// through `ensure_size_not_zero`.
pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
);
/// Sends one batch of assignments to `peers`, encoded for the given protocol version.
///
/// For `ValidationVersion::V3` the batch is sent as-is. For any other version every
/// certificate is converted with `try_into()`; assignments whose certificate cannot be
/// converted are silently dropped, and the candidate is the index of the first set bit of the
/// bitfield. The converted batch is wrapped as a V1 message for `ValidationVersion::V1` and as
/// a V2 message otherwise.
///
/// # Panics
///
/// Panics if a convertible assignment for a non-V3 peer has no bit set in its bitfield.
async fn send_assignments_batched_inner(
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
    peers: Vec<PeerId>,
    peer_version: ValidationVersion,
) {
    if peer_version == ValidationVersion::V3 {
        let assignments = batch.into_iter().collect();
        let message = Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
            protocol_v3::ApprovalDistributionMessage::Assignments(assignments),
        ));
        sender
            .send_message(NetworkBridgeTxMessage::SendValidationMessage(peers, message))
            .await;
        return
    }

    // Older protocol versions carry a single candidate index instead of a bitfield.
    let legacy_batch = batch
        .into_iter()
        .filter_map(|(cert, candidates)| {
            let legacy_cert = cert.try_into().ok()?;
            let candidate_index = candidates
                .first_one()
                .map(|index| index as CandidateIndex)
                .expect("Assignment was checked for not being empty; qed");
            Some((legacy_cert, candidate_index))
        })
        .collect();

    let message = if peer_version == ValidationVersion::V1 {
        Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
            protocol_v1::ApprovalDistributionMessage::Assignments(legacy_batch),
        ))
    } else {
        Versioned::V2(protocol_v2::ValidationProtocol::ApprovalDistribution(
            protocol_v2::ApprovalDistributionMessage::Assignments(legacy_batch),
        ))
    };
    sender
        .send_message(NetworkBridgeTxMessage::SendValidationMessage(peers, message))
        .await;
}
/// Sends assignments to `peers` in batches of at most `MAX_ASSIGNMENT_BATCH_SIZE`.
///
/// Peers are split by protocol version. V1 and V2 peers only receive assignments that claim
/// exactly one candidate; each such batch goes to the V1 peers first and then to the V2 peers.
/// V3 peers receive all assignments. The V1/V2 batches are sent before the V3 ones.
pub(crate) async fn send_assignments_batched(
    network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
    peers: &[(PeerId, ProtocolVersion)],
) {
    let v1_peers = filter_by_peer_version(peers, ValidationVersion::V1.into());
    let v2_peers = filter_by_peer_version(peers, ValidationVersion::V2.into());
    let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());

    let has_legacy_peers = !(v1_peers.is_empty() && v2_peers.is_empty());
    if has_legacy_peers {
        let mut single_candidate_assignments = v2_assignments
            .clone()
            .into_iter()
            .filter(|(_, candidates)| candidates.count_ones() == 1);

        loop {
            let batch: Vec<_> =
                single_candidate_assignments.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect();
            // An empty batch means the iterator is exhausted.
            if batch.is_empty() {
                break
            }

            if !v1_peers.is_empty() {
                send_assignments_batched_inner(
                    network_sender,
                    batch.clone(),
                    v1_peers.clone(),
                    ValidationVersion::V1,
                )
                .await;
            }
            if !v2_peers.is_empty() {
                send_assignments_batched_inner(
                    network_sender,
                    batch,
                    v2_peers.clone(),
                    ValidationVersion::V2,
                )
                .await;
            }
        }
    }

    if !v3_peers.is_empty() {
        let mut remaining = v2_assignments.into_iter();

        loop {
            let batch = remaining.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
            if batch.is_empty() {
                break
            }

            send_assignments_batched_inner(
                network_sender,
                batch,
                v3_peers.clone(),
                ValidationVersion::V3,
            )
            .await;
        }
    }
}
/// Sends `approvals` to `peers`, split into batches of at most `MAX_APPROVAL_BATCH_SIZE`
/// entries.
///
/// Peers are grouped by protocol version with `filter_by_peer_version`:
///
/// * V1 and V2 peers are only sent the approvals for which
///   `candidate_indices.count_ones() == 1` and whose `try_into` conversion succeeds; all
///   other approvals are dropped for these peers. Every batch goes to the V1 group first,
///   then to the V2 group; an empty group is skipped.
/// * V3 peers are sent every approval, unconverted.
///
/// The input is cloned once for the V1/V2 pass (only when such peers exist) and consumed by
/// the V3 pass.
pub(crate) async fn send_approvals_batched(
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
    peers: &[(PeerId, ProtocolVersion)],
) {
    let [v1_peers, v2_peers, v3_peers] =
        [ValidationVersion::V1, ValidationVersion::V2, ValidationVersion::V3]
            .map(|version| filter_by_peer_version(peers, version.into()));

    let legacy_peers_present = !(v1_peers.is_empty() && v2_peers.is_empty());
    if legacy_peers_present {
        let mut legacy_votes = approvals
            .clone()
            .into_iter()
            .filter_map(|vote| {
                // Only single-candidate votes are offered to the older protocol versions.
                if vote.candidate_indices.count_ones() == 1 {
                    vote.try_into().ok()
                } else {
                    None
                }
            })
            .peekable();

        while legacy_votes.peek().is_some() {
            let chunk = legacy_votes.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect::<Vec<_>>();

            if !v1_peers.is_empty() {
                // The same chunk may still be needed for the V2 group below, hence the clone.
                let message =
                    Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
                        protocol_v1::ApprovalDistributionMessage::Approvals(chunk.clone()),
                    ));
                sender
                    .send_message(NetworkBridgeTxMessage::SendValidationMessage(
                        v1_peers.clone(),
                        message,
                    ))
                    .await;
            }

            if !v2_peers.is_empty() {
                let message =
                    Versioned::V2(protocol_v2::ValidationProtocol::ApprovalDistribution(
                        protocol_v2::ApprovalDistributionMessage::Approvals(chunk),
                    ));
                sender
                    .send_message(NetworkBridgeTxMessage::SendValidationMessage(
                        v2_peers.clone(),
                        message,
                    ))
                    .await;
            }
        }
    }

    if v3_peers.is_empty() {
        return;
    }

    let mut remaining = approvals.into_iter().peekable();
    while remaining.peek().is_some() {
        let chunk = remaining.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect::<Vec<_>>();
        let message = Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
            protocol_v3::ApprovalDistributionMessage::Approvals(chunk),
        ));
        sender
            .send_message(NetworkBridgeTxMessage::SendValidationMessage(
                v3_peers.clone(),
                message,
            ))
            .await;
    }
}