use codec::Encode;
use net_protocol::{filter_by_peer_version, peer_set::ProtocolVersion};
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{GridNeighbors, RequiredRouting, SessionBoundGridTopologyStorage},
peer_set::{IsAuthority, PeerSet, ValidationVersion},
v1::{self as protocol_v1, StatementMetadata},
v2 as protocol_v2, v3 as protocol_v3, IfDisconnected, PeerId, UnifiedReputationChange as Rep,
Versioned, View,
};
use polkadot_node_primitives::{
SignedFullStatement, Statement, StatementWithPVD, UncheckedSignedFullStatement,
};
use polkadot_node_subsystem_util::{
self as util, rand, reputation::ReputationAggregator, MIN_GOSSIP_PEERS,
};
use polkadot_node_subsystem::{
messages::{CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeTxMessage},
overseer, ActivatedLeaf, StatementDistributionSenderTrait,
};
use polkadot_primitives::{
vstaging::CommittedCandidateReceiptV2 as CommittedCandidateReceipt, AuthorityDiscoveryId,
CandidateHash, CompactStatement, Hash, Id as ParaId, IndexedVec, OccupiedCoreAssumption,
PersistedValidationData, SignedStatement, SigningContext, UncheckedSignedStatement,
ValidatorId, ValidatorIndex, ValidatorSignature,
};
use futures::{
channel::{mpsc, oneshot},
future::RemoteHandle,
prelude::*,
};
use indexmap::{map::Entry as IEntry, IndexMap};
use rand::Rng;
use sp_keystore::KeystorePtr;
use util::runtime::RuntimeInfo;
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use crate::error::{Error, JfyiError, JfyiErrorResult, Result};
mod requester;
use requester::fetch;
mod responder;
use crate::{metrics::Metrics, LOG_TARGET};
pub use requester::RequesterMessage;
pub use responder::{respond, ResponderMessage};
#[cfg(test)]
mod tests;
const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent");
const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep =
Rep::CostMinor("Unexpected Statement, unknown candidate");
const COST_UNEXPECTED_STATEMENT_REMOTE: Rep =
Rep::CostMinor("Unexpected Statement, remote not allowed");
const COST_FETCH_FAIL: Rep =
Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed");
const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
const COST_WRONG_HASH: Rep = Rep::CostMajor("Received candidate had wrong hash");
const COST_DUPLICATE_STATEMENT: Rep =
Rep::CostMajorRepeated("Statement sent more than once by peer");
const COST_APPARENT_FLOOD: Rep = Rep::Malicious("Peer appears to be flooding us with statements");
const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep =
Rep::BenefitMajorFirst("Peer was the first to provide a valid statement");
const BENEFIT_VALID_RESPONSE: Rep =
Rep::BenefitMajor("Peer provided a valid large statement response");
const VC_THRESHOLD: usize = 2;
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;
pub(crate) struct State {
peers: HashMap<PeerId, PeerData>,
topology_storage: SessionBoundGridTopologyStorage,
authorities: HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: RecentOutdatedHeads,
runtime: RuntimeInfo,
}
impl State {
pub(crate) fn new(keystore: KeystorePtr) -> Self {
State {
peers: HashMap::new(),
topology_storage: Default::default(),
authorities: HashMap::new(),
active_heads: HashMap::new(),
recent_outdated_heads: RecentOutdatedHeads::default(),
runtime: RuntimeInfo::new(Some(keystore)),
}
}
pub(crate) fn contains_relay_parent(&self, relay_parent: &Hash) -> bool {
self.active_heads.contains_key(relay_parent)
}
}
#[derive(Default)]
struct RecentOutdatedHeads {
buf: VecDeque<Hash>,
}
impl RecentOutdatedHeads {
fn note_outdated(&mut self, hash: Hash) {
const MAX_BUF_LEN: usize = 10;
self.buf.push_back(hash);
while self.buf.len() > MAX_BUF_LEN {
let _ = self.buf.pop_front();
}
}
fn is_recent_outdated(&self, hash: &Hash) -> bool {
self.buf.contains(hash)
}
}
#[derive(Default)]
struct VcPerPeerTracker {
local_observed: arrayvec::ArrayVec<CandidateHash, VC_THRESHOLD>,
remote_observed: arrayvec::ArrayVec<CandidateHash, VC_THRESHOLD>,
}
impl VcPerPeerTracker {
fn note_local(&mut self, h: CandidateHash) {
if !note_hash(&mut self.local_observed, h) {
gum::warn!(
target: LOG_TARGET,
"Statement distribution is erroneously attempting to distribute more \
than {} candidate(s) per validator index. Ignoring",
VC_THRESHOLD,
);
}
}
fn note_remote(&mut self, h: CandidateHash) -> bool {
note_hash(&mut self.remote_observed, h)
}
fn is_wanted_candidate(&self, h: &CandidateHash) -> bool {
!self.remote_observed.contains(h) && !self.remote_observed.is_full()
}
}
fn note_hash(
observed: &mut arrayvec::ArrayVec<CandidateHash, VC_THRESHOLD>,
h: CandidateHash,
) -> bool {
if observed.contains(&h) {
return true
}
observed.try_push(h).is_ok()
}
#[derive(Default)]
struct PeerRelayParentKnowledge {
sent_candidates: HashSet<CandidateHash>,
received_candidates: HashSet<CandidateHash>,
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
received_statements: HashSet<(CompactStatement, ValidatorIndex)>,
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
received_message_count: HashMap<CandidateHash, usize>,
large_statement_count: usize,
unexpected_count: usize,
}
impl PeerRelayParentKnowledge {
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
debug_assert!(
self.can_send(fingerprint),
"send is only called after `can_send` returns true; qed",
);
let new_known = match fingerprint.0 {
CompactStatement::Seconded(ref h) => {
self.seconded_counts.entry(fingerprint.1).or_default().note_local(*h);
let was_known = self.is_known_candidate(h);
self.sent_candidates.insert(*h);
!was_known
},
CompactStatement::Valid(_) => false,
};
self.sent_statements.insert(fingerprint.clone());
new_known
}
fn can_send(&self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
let already_known = self.sent_statements.contains(fingerprint) ||
self.received_statements.contains(fingerprint);
if already_known {
return false
}
match fingerprint.0 {
CompactStatement::Valid(ref h) => {
self.is_known_candidate(h)
},
CompactStatement::Seconded(_) => true,
}
}
fn receive(
&mut self,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> std::result::Result<bool, Rep> {
if self.received_statements.contains(fingerprint) {
return Err(COST_DUPLICATE_STATEMENT)
}
let (candidate_hash, fresh) = match fingerprint.0 {
CompactStatement::Seconded(ref h) => {
let allowed_remote = self
.seconded_counts
.entry(fingerprint.1)
.or_insert_with(Default::default)
.note_remote(*h);
if !allowed_remote {
return Err(COST_UNEXPECTED_STATEMENT_REMOTE)
}
(h, !self.is_known_candidate(h))
},
CompactStatement::Valid(ref h) => {
if !self.is_known_candidate(h) {
return Err(COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE)
}
(h, false)
},
};
{
let received_per_candidate =
self.received_message_count.entry(*candidate_hash).or_insert(0);
if *received_per_candidate >= max_message_count {
return Err(COST_APPARENT_FLOOD)
}
*received_per_candidate += 1;
}
self.received_statements.insert(fingerprint.clone());
self.received_candidates.insert(*candidate_hash);
Ok(fresh)
}
fn receive_large_statement(&mut self) -> std::result::Result<(), Rep> {
if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER {
return Err(COST_APPARENT_FLOOD)
}
self.large_statement_count += 1;
Ok(())
}
fn check_can_receive(
&self,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> std::result::Result<(), Rep> {
if self.received_statements.contains(fingerprint) {
return Err(COST_DUPLICATE_STATEMENT)
}
let candidate_hash = match fingerprint.0 {
CompactStatement::Seconded(ref h) => {
let allowed_remote = self
.seconded_counts
.get(&fingerprint.1)
.map_or(true, |r| r.is_wanted_candidate(h));
if !allowed_remote {
return Err(COST_UNEXPECTED_STATEMENT_REMOTE)
}
h
},
CompactStatement::Valid(ref h) => {
if !self.is_known_candidate(&h) {
return Err(COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE)
}
h
},
};
let received_per_candidate = self.received_message_count.get(candidate_hash).unwrap_or(&0);
if *received_per_candidate >= max_message_count {
Err(COST_APPARENT_FLOOD)
} else {
Ok(())
}
}
fn is_known_candidate(&self, candidate: &CandidateHash) -> bool {
self.sent_candidates.contains(candidate) || self.received_candidates.contains(candidate)
}
}
pub struct PeerData {
view: View,
protocol_version: ValidationVersion,
view_knowledge: HashMap<Hash, PeerRelayParentKnowledge>,
maybe_authority: Option<HashSet<AuthorityDiscoveryId>>,
}
impl PeerData {
fn send(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> bool {
debug_assert!(
self.can_send(relay_parent, fingerprint),
"send is only called after `can_send` returns true; qed",
);
self.view_knowledge
.get_mut(relay_parent)
.expect("send is only called after `can_send` returns true; qed")
.send(fingerprint)
}
fn can_send(
&self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> bool {
self.view_knowledge.get(relay_parent).map_or(false, |k| k.can_send(fingerprint))
}
fn receive(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> std::result::Result<bool, Rep> {
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)?
.receive(fingerprint, max_message_count)
}
fn check_can_receive(
&self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> std::result::Result<(), Rep> {
self.view_knowledge
.get(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)?
.check_can_receive(fingerprint, max_message_count)
}
fn receive_unexpected(&mut self, relay_parent: &Hash) -> usize {
self.view_knowledge
.get_mut(relay_parent)
.map_or(0_usize, |relay_parent_peer_knowledge| {
let old = relay_parent_peer_knowledge.unexpected_count;
relay_parent_peer_knowledge.unexpected_count += 1_usize;
old
})
}
fn receive_large_statement(&mut self, relay_parent: &Hash) -> std::result::Result<(), Rep> {
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)?
.receive_large_statement()
}
}
#[derive(Debug, Copy, Clone)]
struct StoredStatement<'a> {
comparator: &'a StoredStatementComparator,
statement: &'a SignedFullStatement,
}
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct StoredStatementComparator {
compact: CompactStatement,
validator_index: ValidatorIndex,
signature: ValidatorSignature,
}
impl<'a> From<(&'a StoredStatementComparator, &'a SignedFullStatement)> for StoredStatement<'a> {
fn from(
(comparator, statement): (&'a StoredStatementComparator, &'a SignedFullStatement),
) -> Self {
Self { comparator, statement }
}
}
impl<'a> StoredStatement<'a> {
fn compact(&self) -> &'a CompactStatement {
&self.comparator.compact
}
fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
(self.comparator.compact.clone(), self.statement.validator_index())
}
}
#[derive(Debug)]
enum NotedStatement<'a> {
NotUseful,
Fresh(StoredStatement<'a>),
UsefulButKnown,
}
enum LargeStatementStatus {
Fetching(FetchingInfo),
FetchedOrShared(CommittedCandidateReceipt),
}
struct FetchingInfo {
available_peers: IndexMap<PeerId, Vec<net_protocol::StatementDistributionMessage>>,
peers_to_try: Vec<PeerId>,
peer_sender: Option<oneshot::Sender<Vec<PeerId>>>,
#[allow(dead_code)]
fetching_task: RemoteHandle<()>,
}
#[derive(Debug, PartialEq, Eq)]
enum DeniedStatement {
NotUseful,
UsefulButKnown,
}
pub(crate) struct ActiveHeadData {
candidates: HashSet<CandidateHash>,
cached_validation_data: HashMap<ParaId, PersistedValidationData>,
statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
validators: IndexedVec<ValidatorIndex, ValidatorId>,
session_index: sp_staking::SessionIndex,
seconded_counts: HashMap<ValidatorIndex, usize>,
}
impl ActiveHeadData {
fn new(
validators: IndexedVec<ValidatorIndex, ValidatorId>,
session_index: sp_staking::SessionIndex,
) -> Self {
ActiveHeadData {
candidates: Default::default(),
cached_validation_data: Default::default(),
statements: Default::default(),
waiting_large_statements: Default::default(),
validators,
session_index,
seconded_counts: Default::default(),
}
}
async fn fetch_persisted_validation_data<Sender>(
&mut self,
sender: &mut Sender,
relay_parent: Hash,
para_id: ParaId,
) -> Result<Option<&PersistedValidationData>>
where
Sender: StatementDistributionSenderTrait,
{
if let Entry::Vacant(entry) = self.cached_validation_data.entry(para_id) {
let persisted_validation_data =
polkadot_node_subsystem_util::request_persisted_validation_data(
relay_parent,
para_id,
OccupiedCoreAssumption::Free,
sender,
)
.await
.await
.map_err(Error::RuntimeApiUnavailable)?
.map_err(|err| Error::FetchPersistedValidationData(para_id, err))?;
match persisted_validation_data {
Some(pvd) => entry.insert(pvd),
None => return Ok(None),
};
}
Ok(self.cached_validation_data.get(¶_id))
}
fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement {
let validator_index = statement.validator_index();
let comparator = StoredStatementComparator {
compact: statement.payload().to_compact(),
validator_index,
signature: statement.signature().clone(),
};
match comparator.compact {
CompactStatement::Seconded(h) => {
let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0);
if *seconded_so_far >= VC_THRESHOLD {
gum::trace!(
target: LOG_TARGET,
?validator_index,
?statement,
"Extra statement is ignored"
);
return NotedStatement::NotUseful
}
self.candidates.insert(h);
if let Some(old) = self.statements.insert(comparator.clone(), statement) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?old,
"Known statement"
);
NotedStatement::UsefulButKnown
} else {
*seconded_so_far += 1;
gum::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?self.statements.last().expect("Just inserted").1,
"Noted new statement"
);
let key_value = self
.statements
.get_key_value(&comparator)
.expect("Statement was just inserted; qed");
NotedStatement::Fresh(key_value.into())
}
},
CompactStatement::Valid(h) => {
if !self.candidates.contains(&h) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
?statement,
"Statement for unknown candidate"
);
return NotedStatement::NotUseful
}
if let Some(old) = self.statements.insert(comparator.clone(), statement) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?old,
"Known statement"
);
NotedStatement::UsefulButKnown
} else {
gum::trace!(
target: LOG_TARGET,
?validator_index,
statement = ?self.statements.last().expect("Just inserted").1,
"Noted new statement"
);
NotedStatement::Fresh(
self.statements
.get_key_value(&comparator)
.expect("Statement was just inserted; qed")
.into(),
)
}
},
}
}
fn check_useful_or_unknown(
&self,
statement: &UncheckedSignedStatement,
) -> std::result::Result<(), DeniedStatement> {
let validator_index = statement.unchecked_validator_index();
let compact = statement.unchecked_payload();
let comparator = StoredStatementComparator {
compact: compact.clone(),
validator_index,
signature: statement.unchecked_signature().clone(),
};
match compact {
CompactStatement::Seconded(_) => {
let seconded_so_far = self.seconded_counts.get(&validator_index).unwrap_or(&0);
if *seconded_so_far >= VC_THRESHOLD {
gum::trace!(
target: LOG_TARGET,
?validator_index,
?statement,
"Extra statement is ignored",
);
return Err(DeniedStatement::NotUseful)
}
if self.statements.contains_key(&comparator) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
?statement,
"Known statement",
);
return Err(DeniedStatement::UsefulButKnown)
}
},
CompactStatement::Valid(h) => {
if !self.candidates.contains(&h) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
?statement,
"Statement for unknown candidate",
);
return Err(DeniedStatement::NotUseful)
}
if self.statements.contains_key(&comparator) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
?statement,
"Known statement",
);
return Err(DeniedStatement::UsefulButKnown)
}
},
}
Ok(())
}
fn statements(&self) -> impl Iterator<Item = StoredStatement<'_>> + '_ {
self.statements.iter().map(Into::into)
}
fn statements_about(
&self,
candidate_hash: CandidateHash,
) -> impl Iterator<Item = StoredStatement<'_>> + '_ {
self.statements()
.filter(move |s| s.compact().candidate_hash() == &candidate_hash)
}
}
fn check_statement_signature(
head: &ActiveHeadData,
relay_parent: Hash,
statement: UncheckedSignedStatement,
) -> std::result::Result<SignedStatement, UncheckedSignedStatement> {
let signing_context =
SigningContext { session_index: head.session_index, parent_hash: relay_parent };
head.validators
.get(statement.unchecked_validator_index())
.ok_or_else(|| statement.clone())
.and_then(|v| statement.try_into_checked(&signing_context, v))
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement_and_dependents<Context>(
topology_store: &SessionBoundGridTopologyStorage,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut Context,
relay_parent: Hash,
statement: SignedFullStatement,
priority_peers: Vec<PeerId>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let active_head = match active_heads.get_mut(&relay_parent) {
Some(res) => res,
None => return,
};
let topology = topology_store
.get_topology_or_fallback(active_head.session_index)
.local_grid_neighbors();
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(
RequiredRouting::GridXY,
topology,
peers,
ctx,
relay_parent,
stored,
priority_peers,
metrics,
rng,
)
.await,
)),
_ => None,
}
};
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
)
.await;
}
}
}
}
fn v1_statement_message(
relay_parent: Hash,
statement: SignedFullStatement,
metrics: &Metrics,
) -> protocol_v1::StatementDistributionMessage {
let (is_large, size) = is_statement_large(&statement);
if let Some(size) = size {
metrics.on_created_message(size);
}
if is_large {
protocol_v1::StatementDistributionMessage::LargeStatement(StatementMetadata {
relay_parent,
candidate_hash: statement.payload().candidate_hash(),
signed_by: statement.validator_index(),
signature: statement.signature().clone(),
})
} else {
protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement.into())
}
}
fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option<usize>) {
match &statement.payload() {
Statement::Seconded(committed) => {
let size = statement.as_unchecked().encoded_size();
if committed.commitments.new_validation_code.is_some() {
return (true, Some(size))
}
let threshold =
PeerSet::Validation.get_max_notification_size(IsAuthority::Yes) as usize / 2;
(size >= threshold, Some(size))
},
Statement::Valid(_) => (false, None),
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement<'a, Context>(
required_routing: RequiredRouting,
topology: &GridNeighbors,
peers: &mut HashMap<PeerId, PeerData>,
ctx: &mut Context,
relay_parent: Hash,
stored: StoredStatement<'a>,
mut priority_peers: Vec<PeerId>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();
let mut peers_to_send: Vec<PeerId> = peers
.iter()
.filter_map(
|(peer, data)| {
if data.can_send(&relay_parent, &fingerprint) {
Some(*peer)
} else {
None
}
},
)
.collect();
let good_peers: HashSet<&PeerId> = peers_to_send.iter().collect();
priority_peers.retain(|p| good_peers.contains(p));
let priority_set: HashSet<&PeerId> = priority_peers.iter().collect();
peers_to_send.retain(|p| !priority_set.contains(p));
util::choose_random_subset_with_rng(
|e| topology.route_to_peer(required_routing, e),
&mut peers_to_send,
rng,
MIN_GOSSIP_PEERS,
);
let min_size = std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS);
let needed_peers = min_size as i64 - priority_peers.len() as i64;
if needed_peers > 0 {
peers_to_send.truncate(needed_peers as usize);
priority_peers.append(&mut peers_to_send);
}
peers_to_send = priority_peers;
debug_assert!(
peers_to_send.len() == peers_to_send.clone().into_iter().collect::<HashSet<_>>().len(),
"We filter out duplicates above. qed.",
);
let (v1_peers_to_send, non_v1_peers_to_send) = peers_to_send
.into_iter()
.map(|peer_id| {
let peer_data =
peers.get_mut(&peer_id).expect("a subset is taken above, so it exists; qed");
let new = peer_data.send(&relay_parent, &fingerprint);
(peer_id, new, peer_data.protocol_version)
})
.partition::<Vec<_>, _>(|(_, _, version)| match version {
ValidationVersion::V1 => true,
ValidationVersion::V2 | ValidationVersion::V3 => false,
}); let payload = v1_statement_message(relay_parent, stored.statement.clone(), metrics);
if !v1_peers_to_send.is_empty() {
gum::trace!(
target: LOG_TARGET,
?v1_peers_to_send,
?relay_parent,
statement = ?stored.statement,
"Sending statement to v1 peers",
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
v1_peers_to_send.iter().map(|(p, _, _)| *p).collect(),
compatible_v1_message(ValidationVersion::V1, payload.clone()).into(),
))
.await;
}
let peers_to_send: Vec<(PeerId, ProtocolVersion)> = non_v1_peers_to_send
.iter()
.map(|(p, _, version)| (*p, (*version).into()))
.collect();
let peer_needs_dependent_statement = v1_peers_to_send
.into_iter()
.chain(non_v1_peers_to_send)
.filter_map(|(peer, needs_dependent, _)| if needs_dependent { Some(peer) } else { None })
.collect();
let v2_peers_to_send = filter_by_peer_version(&peers_to_send, ValidationVersion::V2.into());
let v3_to_send = filter_by_peer_version(&peers_to_send, ValidationVersion::V3.into());
if !v2_peers_to_send.is_empty() {
gum::trace!(
target: LOG_TARGET,
?v2_peers_to_send,
?relay_parent,
statement = ?stored.statement,
"Sending statement to v2 peers",
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
v2_peers_to_send,
compatible_v1_message(ValidationVersion::V2, payload.clone()).into(),
))
.await;
}
if !v3_to_send.is_empty() {
gum::trace!(
target: LOG_TARGET,
?v3_to_send,
?relay_parent,
statement = ?stored.statement,
"Sending statement to v3 peers",
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
v3_to_send,
compatible_v1_message(ValidationVersion::V3, payload.clone()).into(),
))
.await;
}
peer_needs_dependent_statement
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_statements_about<Context>(
peer: PeerId,
peer_data: &mut PeerData,
ctx: &mut Context,
relay_parent: Hash,
candidate_hash: CandidateHash,
active_head: &ActiveHeadData,
metrics: &Metrics,
) {
for statement in active_head.statements_about(candidate_hash) {
let fingerprint = statement.fingerprint();
if !peer_data.can_send(&relay_parent, &fingerprint) {
continue
}
peer_data.send(&relay_parent, &fingerprint);
let payload = v1_statement_message(relay_parent, statement.statement.clone(), metrics);
gum::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?candidate_hash,
statement = ?statement.statement,
"Sending statement",
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
compatible_v1_message(peer_data.protocol_version, payload).into(),
))
.await;
metrics.on_statement_distributed();
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_statements<Context>(
peer: PeerId,
peer_data: &mut PeerData,
ctx: &mut Context,
relay_parent: Hash,
active_head: &ActiveHeadData,
metrics: &Metrics,
) {
for statement in active_head.statements() {
let fingerprint = statement.fingerprint();
if !peer_data.can_send(&relay_parent, &fingerprint) {
continue
}
peer_data.send(&relay_parent, &fingerprint);
let payload = v1_statement_message(relay_parent, statement.statement.clone(), metrics);
gum::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
compatible_v1_message(peer_data.protocol_version, payload).into(),
))
.await;
metrics.on_statement_distributed();
}
}
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::StatementDistributionSenderTrait,
peer: PeerId,
rep: Rep,
) {
reputation.modify(sender, peer, rep).await;
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn retrieve_statement_from_message<'a, Context>(
peer: PeerId,
peer_version: ValidationVersion,
message: protocol_v1::StatementDistributionMessage,
active_head: &'a mut ActiveHeadData,
ctx: &mut Context,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
) -> Option<UncheckedSignedFullStatement> {
let fingerprint = message.get_fingerprint();
let candidate_hash = *fingerprint.0.candidate_hash();
let message = if let protocol_v1::StatementDistributionMessage::Statement(h, s) = message {
if let Statement::Seconded(_) = s.unchecked_payload() {
return Some(s)
}
protocol_v1::StatementDistributionMessage::Statement(h, s)
} else {
message
};
match active_head.waiting_large_statements.entry(candidate_hash) {
Entry::Occupied(mut occupied) => {
match occupied.get_mut() {
LargeStatementStatus::Fetching(info) => {
let is_large_statement = message.is_large_statement();
let is_new_peer = match info.available_peers.entry(peer) {
IEntry::Occupied(mut occupied) => {
occupied.get_mut().push(compatible_v1_message(peer_version, message));
false
},
IEntry::Vacant(vacant) => {
vacant.insert(vec![compatible_v1_message(peer_version, message)]);
true
},
};
if is_new_peer & is_large_statement {
info.peers_to_try.push(peer);
if let Some(sender) = info.peer_sender.take() {
let to_send = std::mem::take(&mut info.peers_to_try);
if let Err(peers) = sender.send(to_send) {
info.peers_to_try = peers;
}
}
}
},
LargeStatementStatus::FetchedOrShared(committed) => {
match message {
protocol_v1::StatementDistributionMessage::Statement(_, s) => {
return Some(s)
},
protocol_v1::StatementDistributionMessage::LargeStatement(metadata) =>
return Some(UncheckedSignedFullStatement::new(
Statement::Seconded(committed.clone()),
metadata.signed_by,
metadata.signature.clone(),
)),
}
},
}
},
Entry::Vacant(vacant) => {
match message {
protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => {
if let Some(new_status) = launch_request(
metadata,
peer,
peer_version,
req_sender.clone(),
ctx,
metrics,
)
.await
{
vacant.insert(new_status);
}
},
protocol_v1::StatementDistributionMessage::Statement(_, s) => {
return Some(s)
},
}
},
}
None
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn launch_request<Context>(
meta: StatementMetadata,
peer: PeerId,
peer_version: ValidationVersion,
req_sender: mpsc::Sender<RequesterMessage>,
ctx: &mut Context,
metrics: &Metrics,
) -> Option<LargeStatementStatus> {
let (task, handle) =
fetch(meta.relay_parent, meta.candidate_hash, vec![peer], req_sender, metrics.clone())
.remote_handle();
let result = ctx.spawn("large-statement-fetcher", task.boxed());
if let Err(err) = result {
gum::error!(target: LOG_TARGET, ?err, "Spawning task failed.");
return None
}
let available_peers = {
let mut m = IndexMap::new();
m.insert(
peer,
vec![compatible_v1_message(
peer_version,
protocol_v1::StatementDistributionMessage::LargeStatement(meta),
)],
);
m
};
Some(LargeStatementStatus::Fetching(FetchingInfo {
available_peers,
peers_to_try: Vec::new(),
peer_sender: None,
fetching_task: handle,
}))
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_message_and_circulate<'a, Context, R>(
peer: PeerId,
topology_storage: &SessionBoundGridTopologyStorage,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut Context,
message: net_protocol::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
runtime: &mut RuntimeInfo,
rng: &mut R,
reputation: &mut ReputationAggregator,
) where
R: rand::Rng,
{
let handled_incoming = match peers.get_mut(&peer) {
Some(data) =>
handle_incoming_message(
peer,
data,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
metrics,
reputation,
)
.await,
None => None,
};
if let Some((relay_parent, statement)) = handled_incoming {
let _ = metrics.time_network_bridge_update("circulate_statement");
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await;
let topology = match session_index {
Ok(session_index) =>
topology_storage.get_topology_or_fallback(session_index).local_grid_neighbors(),
Err(e) => {
gum::debug!(
target: LOG_TARGET,
%relay_parent,
"cannot get session index for the specific relay parent: {:?}",
e
);
topology_storage.get_current_topology().local_grid_neighbors()
},
};
let required_routing =
topology.required_routing_by_index(statement.statement.validator_index(), false);
let _ = circulate_statement(
required_routing,
topology,
peers,
ctx,
relay_parent,
statement,
Vec::new(),
metrics,
rng,
)
.await;
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_message<'a, Context>(
peer: PeerId,
peer_data: &mut PeerData,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut Context,
message: net_protocol::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
reputation: &mut ReputationAggregator,
) -> Option<(Hash, StoredStatement<'a>)> {
let _ = metrics.time_network_bridge_update("handle_incoming_message");
let message = match message {
Versioned::V1(m) => m,
Versioned::V2(protocol_v2::StatementDistributionMessage::V1Compatibility(m)) |
Versioned::V3(protocol_v3::StatementDistributionMessage::V1Compatibility(m)) => m,
Versioned::V2(_) | Versioned::V3(_) => {
gum::debug!(
target: LOG_TARGET,
"Legacy statement-distribution code received unintended v2 message"
);
return None
},
};
let relay_parent = message.get_relay_parent();
let active_head = match active_heads.get_mut(&relay_parent) {
Some(h) => h,
None => {
gum::debug!(
target: LOG_TARGET,
%relay_parent,
"our view out-of-sync with active heads; head not found",
);
if !recent_outdated_heads.is_recent_outdated(&relay_parent) {
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await;
}
return None
},
};
if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message {
if let Err(rep) = peer_data.receive_large_statement(&relay_parent) {
gum::debug!(target: LOG_TARGET, ?peer, ?message, ?rep, "Unexpected large statement.",);
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return None
}
}
let fingerprint = message.get_fingerprint();
let candidate_hash = *fingerprint.0.candidate_hash();
let max_message_count = active_head.validators.len() * 2;
if let Err(rep) = peer_data.check_can_receive(&relay_parent, &fingerprint, max_message_count) {
let unexpected_count = peer_data.receive_unexpected(&relay_parent);
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?peer,
?message,
?rep,
?unexpected_count,
"Error inserting received statement"
);
match rep {
COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE => {
metrics.on_unexpected_statement_valid();
if unexpected_count == 0_usize {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
}
},
COST_UNEXPECTED_STATEMENT_REMOTE => {
metrics.on_unexpected_statement_seconded();
modify_reputation(reputation, ctx.sender(), peer, rep).await;
},
_ => {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
},
}
return None
}
let checked_compact = {
let (compact, validator_index) = message.get_fingerprint();
let signature = message.get_signature();
let unchecked_compact = UncheckedSignedStatement::new(compact, validator_index, signature);
match active_head.check_useful_or_unknown(&unchecked_compact) {
Ok(()) => {},
Err(DeniedStatement::NotUseful) => return None,
Err(DeniedStatement::UsefulButKnown) => {
peer_data
.receive(&relay_parent, &fingerprint, max_message_count)
.expect("checked in `check_can_receive` above; qed");
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
return None
},
}
match check_statement_signature(&active_head, relay_parent, unchecked_compact) {
Err(statement) => {
gum::debug!(target: LOG_TARGET, ?peer, ?statement, "Invalid statement signature");
modify_reputation(reputation, ctx.sender(), peer, COST_INVALID_SIGNATURE).await;
return None
},
Ok(statement) => statement,
}
};
let is_large_statement = message.is_large_statement();
let statement = retrieve_statement_from_message(
peer,
peer_data.protocol_version,
message,
active_head,
ctx,
req_sender,
metrics,
)
.await?;
let payload = statement.unchecked_into_payload();
let statement: SignedFullStatement = match checked_compact.convert_to_superpayload(payload) {
Err((compact, _)) => {
gum::debug!(
target: LOG_TARGET,
?peer,
?compact,
is_large_statement,
"Full statement had bad payload."
);
modify_reputation(reputation, ctx.sender(), peer, COST_WRONG_HASH).await;
return None
},
Ok(statement) => statement,
};
match peer_data.receive(&relay_parent, &fingerprint, max_message_count) {
Err(_) => {
unreachable!("checked in `check_can_receive` above; qed");
},
Ok(true) => {
gum::trace!(target: LOG_TARGET, ?peer, ?statement, "Statement accepted");
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
)
.await;
},
Ok(false) => {},
}
let pvd = if let Statement::Seconded(receipt) = statement.payload() {
let para_id = receipt.descriptor.para_id();
let result = active_head
.fetch_persisted_validation_data(ctx.sender(), relay_parent, para_id)
.await;
match result {
Ok(Some(pvd)) => Some(pvd.clone()),
Ok(None) | Err(_) => return None,
}
} else {
None
};
let statement_with_pvd = statement
.clone()
.convert_to_superpayload_with(move |statement| match statement {
Statement::Seconded(receipt) => {
let persisted_validation_data = pvd
.expect("PVD is ensured to be `Some` for all `Seconded` messages above; qed");
StatementWithPVD::Seconded(receipt, persisted_validation_data)
},
Statement::Valid(candidate_hash) => StatementWithPVD::Valid(candidate_hash),
})
.expect("payload was checked with conversion from compact; qed");
match active_head.note_statement(statement) {
NotedStatement::NotUseful | NotedStatement::UsefulButKnown => {
unreachable!("checked in `is_useful_or_unknown` above; qed");
},
NotedStatement::Fresh(statement) => {
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;
ctx.send_message(CandidateBackingMessage::Statement(relay_parent, statement_with_pvd))
.await;
Some((relay_parent, statement))
},
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn update_peer_view_and_maybe_send_unlocked<Context, R>(
peer: PeerId,
topology: &GridNeighbors,
peer_data: &mut PeerData,
ctx: &mut Context,
active_heads: &HashMap<Hash, ActiveHeadData>,
new_view: View,
metrics: &Metrics,
rng: &mut R,
) where
R: rand::Rng,
{
let old_view = std::mem::replace(&mut peer_data.view, new_view);
for removed in old_view.difference(&peer_data.view) {
let _ = peer_data.view_knowledge.remove(removed);
}
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &peer);
let lucky = is_gossip_peer ||
util::gen_ratio_rng(
util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()),
util::MIN_GOSSIP_PEERS,
rng,
);
let new_view = peer_data.view.difference(&old_view).copied().collect::<Vec<_>>();
for new in new_view.iter().copied() {
peer_data.view_knowledge.insert(new, Default::default());
if !lucky {
continue
}
if let Some(active_head) = active_heads.get(&new) {
send_statements(peer, peer_data, ctx, new, active_head, metrics).await;
}
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_network_update<Context, R>(
ctx: &mut Context,
state: &mut State,
req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<net_protocol::StatementDistributionMessage>,
rng: &mut R,
metrics: &Metrics,
reputation: &mut ReputationAggregator,
) where
R: rand::Rng,
{
let peers = &mut state.peers;
let topology_storage = &mut state.topology_storage;
let authorities = &mut state.authorities;
let active_heads = &mut state.active_heads;
let recent_outdated_heads = &state.recent_outdated_heads;
let runtime = &mut state.runtime;
match update {
NetworkBridgeEvent::PeerConnected(peer, role, protocol_version, maybe_authority) => {
gum::trace!(target: LOG_TARGET, ?peer, ?role, ?protocol_version, "Peer connected");
let protocol_version = match ValidationVersion::try_from(protocol_version).ok() {
Some(v) => v,
None => {
gum::trace!(
target: LOG_TARGET,
?peer,
?protocol_version,
"unknown protocol version, ignoring"
);
return
},
};
peers.insert(
peer,
PeerData {
view: Default::default(),
protocol_version,
view_knowledge: Default::default(),
maybe_authority: maybe_authority.clone(),
},
);
if let Some(authority_ids) = maybe_authority {
authority_ids.into_iter().for_each(|a| {
authorities.insert(a, peer);
});
}
},
NetworkBridgeEvent::PeerDisconnected(peer) => {
gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected");
if let Some(auth_ids) = peers.remove(&peer).and_then(|p| p.maybe_authority) {
auth_ids.into_iter().for_each(|a| {
authorities.remove(&a);
});
}
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
let _ = metrics.time_network_bridge_update("new_gossip_topology");
let new_session_index = topology.session;
let new_topology = topology.topology;
let old_topology =
topology_storage.get_current_topology().local_grid_neighbors().clone();
topology_storage.update_topology(new_session_index, new_topology, topology.local_index);
let newly_added = topology_storage
.get_current_topology()
.local_grid_neighbors()
.peers_diff(&old_topology);
for peer in newly_added {
if let Some(data) = peers.get_mut(&peer) {
let view = std::mem::take(&mut data.view);
update_peer_view_and_maybe_send_unlocked(
peer,
topology_storage.get_current_topology().local_grid_neighbors(),
data,
ctx,
&*active_heads,
view,
metrics,
rng,
)
.await
}
}
},
NetworkBridgeEvent::PeerMessage(peer, message) => {
handle_incoming_message_and_circulate(
peer,
topology_storage,
peers,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
metrics,
runtime,
rng,
reputation,
)
.await;
},
NetworkBridgeEvent::PeerViewChange(peer, view) => {
let _ = metrics.time_network_bridge_update("peer_view_change");
gum::trace!(target: LOG_TARGET, ?peer, ?view, "Peer view change");
match peers.get_mut(&peer) {
Some(data) =>
update_peer_view_and_maybe_send_unlocked(
peer,
topology_storage.get_current_topology().local_grid_neighbors(),
data,
ctx,
&*active_heads,
view,
metrics,
rng,
)
.await,
None => (),
}
},
NetworkBridgeEvent::OurViewChange(_view) => {
},
NetworkBridgeEvent::UpdatedAuthorityIds(peer, authority_ids) => {
gum::trace!(
target: LOG_TARGET,
?peer,
?authority_ids,
"Updated `AuthorityDiscoveryId`s"
);
topology_storage
.get_current_topology_mut()
.update_authority_ids(peer, &authority_ids);
authorities.retain(|a, p| p != &peer || authority_ids.contains(a));
for a in authority_ids.iter().cloned() {
authorities.insert(a, peer);
}
if let Some(data) = peers.get_mut(&peer) {
data.maybe_authority = Some(authority_ids);
}
},
}
}
pub(crate) async fn handle_responder_message(
state: &mut State,
message: ResponderMessage,
) -> JfyiErrorResult<()> {
let peers = &state.peers;
let active_heads = &mut state.active_heads;
match message {
ResponderMessage::GetData { requesting_peer, relay_parent, candidate_hash, tx } => {
if !requesting_peer_knows_about_candidate(
peers,
&requesting_peer,
&relay_parent,
&candidate_hash,
)? {
return Err(JfyiError::RequestedUnannouncedCandidate(
requesting_peer,
candidate_hash,
))
}
let active_head =
active_heads.get(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?;
let committed = match active_head.waiting_large_statements.get(&candidate_hash) {
Some(LargeStatementStatus::FetchedOrShared(committed)) => committed.clone(),
_ =>
return Err(JfyiError::NoSuchFetchedLargeStatement(relay_parent, candidate_hash)),
};
tx.send(committed).map_err(|_| JfyiError::ResponderGetDataCanceled)?;
},
}
Ok(())
}
#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
pub(crate) async fn handle_requester_message<Context, R: rand::Rng>(
ctx: &mut Context,
state: &mut State,
req_sender: &mpsc::Sender<RequesterMessage>,
rng: &mut R,
message: RequesterMessage,
metrics: &Metrics,
reputation: &mut ReputationAggregator,
) -> JfyiErrorResult<()> {
let topology_storage = &state.topology_storage;
let peers = &mut state.peers;
let active_heads = &mut state.active_heads;
let recent_outdated_heads = &state.recent_outdated_heads;
let runtime = &mut state.runtime;
match message {
RequesterMessage::Finished {
relay_parent,
candidate_hash,
from_peer,
response,
bad_peers,
} => {
for bad in bad_peers {
modify_reputation(reputation, ctx.sender(), bad, COST_FETCH_FAIL).await;
}
modify_reputation(reputation, ctx.sender(), from_peer, BENEFIT_VALID_RESPONSE).await;
let active_head =
active_heads.get_mut(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?;
let status = active_head.waiting_large_statements.remove(&candidate_hash);
let info = match status {
Some(LargeStatementStatus::Fetching(info)) => info,
Some(LargeStatementStatus::FetchedOrShared(_)) => {
return Ok(())
},
None =>
return Err(JfyiError::NoSuchLargeStatementStatus(relay_parent, candidate_hash)),
};
active_head
.waiting_large_statements
.insert(candidate_hash, LargeStatementStatus::FetchedOrShared(response));
for (peer, messages) in info.available_peers {
for message in messages {
handle_incoming_message_and_circulate(
peer,
topology_storage,
peers,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
&metrics,
runtime,
rng,
reputation,
)
.await;
}
}
},
RequesterMessage::SendRequest(req) => {
ctx.send_message(NetworkBridgeTxMessage::SendRequests(
vec![req],
IfDisconnected::ImmediateError,
))
.await;
},
RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx } => {
let active_head =
active_heads.get_mut(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?;
let status = active_head.waiting_large_statements.get_mut(&candidate_hash);
let info = match status {
Some(LargeStatementStatus::Fetching(info)) => info,
Some(LargeStatementStatus::FetchedOrShared(_)) => {
gum::debug!(target: LOG_TARGET, "Zombie task wanted more peers.");
return Ok(())
},
None =>
return Err(JfyiError::NoSuchLargeStatementStatus(relay_parent, candidate_hash)),
};
if info.peers_to_try.is_empty() {
info.peer_sender = Some(tx);
} else {
let peers_to_try = std::mem::take(&mut info.peers_to_try);
if let Err(peers) = tx.send(peers_to_try) {
info.peers_to_try = peers;
}
}
},
RequesterMessage::ReportPeer(peer, rep) =>
modify_reputation(reputation, ctx.sender(), peer, rep).await,
}
Ok(())
}
pub(crate) fn handle_deactivate_leaf(state: &mut State, deactivated: Hash) {
if state.active_heads.remove(&deactivated).is_some() {
gum::trace!(
target: LOG_TARGET,
hash = ?deactivated,
"Deactivating leaf",
);
state.recent_outdated_heads.note_outdated(deactivated);
}
}
#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
pub(crate) async fn handle_activated_leaf<Context>(
ctx: &mut Context,
state: &mut State,
activated: ActivatedLeaf,
) -> Result<()> {
let relay_parent = activated.hash;
gum::trace!(
target: LOG_TARGET,
hash = ?relay_parent,
"New active leaf",
);
let session_index =
state.runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = state
.runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;
let session_info = &info.session_info;
state
.active_heads
.entry(relay_parent)
.or_insert(ActiveHeadData::new(session_info.validators.clone(), session_index));
Ok(())
}
#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
pub(crate) async fn share_local_statement<Context, R: Rng>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
statement: SignedFullStatement,
rng: &mut R,
metrics: &Metrics,
) -> Result<()> {
if is_statement_large(&statement).0 {
if let Statement::Seconded(committed) = &statement.payload() {
let active_head = state
.active_heads
.get_mut(&relay_parent)
.ok_or(JfyiError::NoSuchHead(relay_parent))?;
active_head.waiting_large_statements.insert(
statement.payload().candidate_hash(),
LargeStatementStatus::FetchedOrShared(committed.clone()),
);
}
}
let info = state.runtime.get_session_info(ctx.sender(), relay_parent).await?;
let session_info = &info.session_info;
let validator_info = &info.validator_info;
let group_peers = {
if let Some(our_group) = validator_info.our_group {
let our_group = &session_info
.validator_groups
.get(our_group)
.expect("`our_group` is derived from `validator_groups`; qed");
our_group
.into_iter()
.filter_map(|i| {
if Some(*i) == validator_info.our_index {
return None
}
let authority_id = &session_info.discovery_keys[i.0 as usize];
state.authorities.get(authority_id).map(|p| *p)
})
.collect()
} else {
Vec::new()
}
};
circulate_statement_and_dependents(
&mut state.topology_storage,
&mut state.peers,
&mut state.active_heads,
ctx,
relay_parent,
statement,
group_peers,
metrics,
rng,
)
.await;
Ok(())
}
fn requesting_peer_knows_about_candidate(
peers: &HashMap<PeerId, PeerData>,
requesting_peer: &PeerId,
relay_parent: &Hash,
candidate_hash: &CandidateHash,
) -> JfyiErrorResult<bool> {
let peer_data = peers
.get(requesting_peer)
.ok_or_else(|| JfyiError::NoSuchPeer(*requesting_peer))?;
let knowledge = peer_data
.view_knowledge
.get(relay_parent)
.ok_or_else(|| JfyiError::NoSuchHead(*relay_parent))?;
Ok(knowledge.sent_candidates.get(&candidate_hash).is_some())
}
fn compatible_v1_message(
version: ValidationVersion,
message: protocol_v1::StatementDistributionMessage,
) -> net_protocol::StatementDistributionMessage {
match version {
ValidationVersion::V1 => Versioned::V1(message),
ValidationVersion::V2 =>
Versioned::V2(protocol_v2::StatementDistributionMessage::V1Compatibility(message)),
ValidationVersion::V3 =>
Versioned::V3(protocol_v3::StatementDistributionMessage::V1Compatibility(message)),
}
}