use bitvec::prelude::{BitVec, Lsb0};
use polkadot_node_network_protocol::{
self as net_protocol, filter_by_peer_version,
grid_topology::SessionGridTopology,
peer_set::{ProtocolVersion, ValidationVersion},
request_response::{
incoming::OutgoingResponse,
v2::{AttestedCandidateRequest, AttestedCandidateResponse},
IncomingRequest, IncomingRequestReceiver, Requests,
MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS,
},
v2::{self as protocol_v2, StatementFilter},
v3 as protocol_v3, IfDisconnected, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::{
SignedFullStatementWithPVD, StatementWithPVD as FullStatementWithPVD,
};
use polkadot_node_subsystem::{
messages::{
network_bridge_event::NewGossipTopology, CandidateBackingMessage, HypotheticalCandidate,
HypotheticalMembershipRequest, NetworkBridgeEvent, NetworkBridgeTxMessage,
ProspectiveParachainsMessage,
},
overseer, ActivatedLeaf,
};
use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{
request_min_backing_votes, request_node_features, ClaimQueueSnapshot,
ProspectiveParachainsMode,
},
};
use polkadot_primitives::{
node_features::FeatureIndex,
vstaging::{transpose_claim_queue, CandidateDescriptorVersion, TransposedClaimQueue},
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, GroupIndex,
GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, SessionIndex, SessionInfo,
SignedStatement, SigningContext, UncheckedSignedStatement, ValidatorId, ValidatorIndex,
};
use sp_keystore::KeystorePtr;
use fatality::Nested;
use futures::{
channel::{mpsc, oneshot},
future::FutureExt,
select,
stream::FuturesUnordered,
SinkExt, StreamExt,
};
use std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
},
time::{Duration, Instant},
};
use crate::{
error::{JfyiError, JfyiErrorResult},
metrics::Metrics,
LOG_TARGET,
};
use candidates::{BadAdvertisement, Candidates, PostConfirmation};
use cluster::{Accept as ClusterAccept, ClusterTracker, RejectIncoming as ClusterRejectIncoming};
use grid::GridTracker;
use groups::Groups;
use requests::{CandidateIdentifier, RequestProperties};
use statement_store::{StatementOrigin, StatementStore};
pub use requests::{RequestManager, ResponseManager, UnhandledResponse};
mod candidates;
mod cluster;
mod grid;
mod groups;
mod requests;
mod statement_store;
#[cfg(test)]
mod tests;
const COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR: Rep =
Rep::CostMinor("Unexpected Statement, not a validator");
const COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND: Rep =
Rep::CostMinor("Unexpected Statement, validator not found");
const COST_UNEXPECTED_STATEMENT_INVALID_SENDER: Rep =
Rep::CostMinor("Unexpected Statement, invalid sender");
const COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE: Rep =
Rep::CostMinor("Unexpected Statement, bad advertise");
const COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED: Rep =
Rep::CostMinor("Unexpected Statement, cluster rejected");
const COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP: Rep =
Rep::CostMinor("Unexpected Statement, not in group");
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements");
const COST_DISABLED_VALIDATOR: Rep = Rep::CostMinor("Sent a statement from a disabled validator");
const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Manifest, missing knowledge for relay parent");
const COST_UNEXPECTED_MANIFEST_DISALLOWED: Rep =
Rep::CostMinor("Unexpected Manifest, Peer Disallowed");
const COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN: Rep =
Rep::CostMinor("Unexpected Manifest, Peer Unknown");
const COST_CONFLICTING_MANIFEST: Rep = Rep::CostMajor("Manifest conflicts with previous");
const COST_INSUFFICIENT_MANIFEST: Rep =
Rep::CostMajor("Manifest statements insufficient to back candidate");
const COST_MALFORMED_MANIFEST: Rep = Rep::CostMajor("Manifest is malformed");
const COST_UNEXPECTED_ACKNOWLEDGEMENT_UNKNOWN_CANDIDATE: Rep =
Rep::CostMinor("Unexpected acknowledgement, unknown candidate");
const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
const COST_IMPROPERLY_DECODED_RESPONSE: Rep =
Rep::CostMajor("Improperly Encoded Candidate Response");
const COST_INVALID_RESPONSE: Rep = Rep::CostMajor("Invalid Candidate Response");
const COST_UNREQUESTED_RESPONSE_STATEMENT: Rep =
Rep::CostMajor("Un-requested Statement In Response");
const COST_INACCURATE_ADVERTISEMENT: Rep =
Rep::CostMajor("Peer advertised a candidate inaccurately");
const COST_UNSUPPORTED_DESCRIPTOR_VERSION: Rep =
Rep::CostMajor("Candidate Descriptor version is not supported");
const COST_INVALID_CORE_INDEX: Rep =
Rep::CostMajor("Candidate Descriptor contains an invalid core index");
const COST_INVALID_SESSION_INDEX: Rep =
Rep::CostMajor("Candidate Descriptor contains an invalid session index");
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
const COST_INVALID_REQUEST_BITFIELD_SIZE: Rep =
Rep::CostMajor("Attested candidate request bitfields have wrong size");
const COST_UNEXPECTED_REQUEST: Rep = Rep::CostMajor("Unexpected attested candidate request");
const BENEFIT_VALID_RESPONSE: Rep = Rep::BenefitMajor("Peer Answered Candidate Request");
const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep =
Rep::BenefitMajorFirst("Peer was the first to provide a given valid statement");
pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
struct PerRelayParentState {
local_validator: Option<LocalValidatorState>,
statement_store: StatementStore,
seconding_limit: usize,
session: SessionIndex,
transposed_cq: TransposedClaimQueue,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
disabled_validators: HashSet<ValidatorIndex>,
}
impl PerRelayParentState {
fn active_validator_state(&self) -> Option<&ActiveValidatorState> {
self.local_validator.as_ref().and_then(|local| local.active.as_ref())
}
fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
self.local_validator.as_mut().and_then(|local| local.active.as_mut())
}
pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
self.disabled_validators.contains(validator_index)
}
pub fn disabled_bitmask(&self, group: &[ValidatorIndex]) -> BitVec<u8, Lsb0> {
BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)))
}
}
struct LocalValidatorState {
grid_tracker: GridTracker,
active: Option<ActiveValidatorState>,
}
struct ActiveValidatorState {
index: ValidatorIndex,
group: GroupIndex,
assignments: Vec<ParaId>,
cluster_tracker: ClusterTracker,
}
#[derive(Debug, Copy, Clone)]
enum LocalValidatorIndex {
Active(ValidatorIndex),
Inactive,
}
#[derive(Debug)]
struct PerSessionState {
session_info: SessionInfo,
groups: Groups,
authority_lookup: HashMap<AuthorityDiscoveryId, ValidatorIndex>,
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<LocalValidatorIndex>,
allow_v2_descriptors: bool,
}
impl PerSessionState {
fn new(
session_info: SessionInfo,
keystore: &KeystorePtr,
backing_threshold: u32,
allow_v2_descriptors: bool,
) -> Self {
let groups = Groups::new(session_info.validator_groups.clone(), backing_threshold);
let mut authority_lookup = HashMap::new();
for (i, ad) in session_info.discovery_keys.iter().cloned().enumerate() {
authority_lookup.insert(ad, ValidatorIndex(i as _));
}
let local_validator = polkadot_node_subsystem_util::signing_key_and_index(
session_info.validators.iter(),
keystore,
)
.map(|(_, index)| LocalValidatorIndex::Active(index));
PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator,
allow_v2_descriptors,
}
}
fn supply_topology(
&mut self,
topology: &SessionGridTopology,
local_index: Option<ValidatorIndex>,
) {
let grid_view = grid::build_session_topology(
self.session_info.validator_groups.iter(),
topology,
local_index,
);
self.grid_view = Some(grid_view);
if local_index.is_some() {
self.local_validator.get_or_insert(LocalValidatorIndex::Inactive);
}
gum::info!(
target: LOG_TARGET,
index_in_gossip_topology = ?local_index,
index_in_parachain_authorities = ?self.local_validator,
"Node uses the following topology indices"
);
}
fn is_not_validator(&self) -> bool {
self.grid_view.is_some() && self.local_validator.is_none()
}
fn candidate_receipt_v2_enabled(&self) -> bool {
self.allow_v2_descriptors
}
}
pub(crate) struct State {
implicit_view: ImplicitView,
candidates: Candidates,
per_relay_parent: HashMap<Hash, PerRelayParentState>,
per_session: HashMap<SessionIndex, PerSessionState>,
unused_topologies: HashMap<SessionIndex, NewGossipTopology>,
peers: HashMap<PeerId, PeerState>,
keystore: KeystorePtr,
authorities: HashMap<AuthorityDiscoveryId, PeerId>,
request_manager: RequestManager,
response_manager: ResponseManager,
}
impl State {
pub(crate) fn new(keystore: KeystorePtr) -> Self {
State {
implicit_view: Default::default(),
candidates: Default::default(),
per_relay_parent: HashMap::new(),
per_session: HashMap::new(),
peers: HashMap::new(),
keystore,
authorities: HashMap::new(),
request_manager: RequestManager::new(),
response_manager: ResponseManager::new(),
unused_topologies: HashMap::new(),
}
}
pub(crate) fn request_and_response_managers(
&mut self,
) -> (&mut RequestManager, &mut ResponseManager) {
(&mut self.request_manager, &mut self.response_manager)
}
}
fn connected_validator_peer(
authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
per_session: &PerSessionState,
validator_index: ValidatorIndex,
) -> Option<PeerId> {
per_session
.session_info
.discovery_keys
.get(validator_index.0 as usize)
.and_then(|k| authorities.get(k))
.map(|p| *p)
}
struct PeerState {
view: View,
protocol_version: ValidationVersion,
implicit_view: HashSet<Hash>,
discovery_ids: Option<HashSet<AuthorityDiscoveryId>>,
}
impl PeerState {
fn update_view(&mut self, new_view: View, local_implicit: &ImplicitView) -> Vec<Hash> {
let next_implicit = new_view
.iter()
.flat_map(|x| local_implicit.known_allowed_relay_parents_under(x, None))
.flatten()
.cloned()
.collect::<HashSet<_>>();
let fresh_implicit = next_implicit
.iter()
.filter(|x| !self.implicit_view.contains(x))
.cloned()
.collect();
self.view = new_view;
self.implicit_view = next_implicit;
fresh_implicit
}
fn reconcile_active_leaf(&mut self, leaf_hash: Hash, implicit: &[Hash]) -> Vec<Hash> {
if !self.view.contains(&leaf_hash) {
return Vec::new()
}
let mut v = Vec::with_capacity(implicit.len());
for i in implicit {
if self.implicit_view.insert(*i) {
v.push(*i);
}
}
v
}
fn knows_relay_parent(&self, relay_parent: &Hash) -> bool {
self.implicit_view.contains(relay_parent) || self.view.contains(relay_parent)
}
fn is_authority(&self, authority_id: &AuthorityDiscoveryId) -> bool {
self.discovery_ids.as_ref().map_or(false, |x| x.contains(authority_id))
}
fn iter_known_discovery_ids(&self) -> impl Iterator<Item = &AuthorityDiscoveryId> {
self.discovery_ids.as_ref().into_iter().flatten()
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_network_update<Context>(
ctx: &mut Context,
state: &mut State,
update: NetworkBridgeEvent<net_protocol::StatementDistributionMessage>,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer_id, role, protocol_version, mut authority_ids) => {
gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?protocol_version, "Peer connected");
let versioned_protocol = if protocol_version != ValidationVersion::V2.into() &&
protocol_version != ValidationVersion::V3.into()
{
return
} else {
protocol_version.try_into().expect("Qed, we checked above")
};
if let Some(ref mut authority_ids) = authority_ids {
authority_ids.retain(|a| match state.authorities.entry(a.clone()) {
Entry::Vacant(e) => {
e.insert(peer_id);
true
},
Entry::Occupied(e) => {
gum::debug!(
target: LOG_TARGET,
authority_id = ?a,
existing_peer = ?e.get(),
new_peer = ?peer_id,
"Ignoring new peer with duplicate authority ID as a bearer of that identity"
);
false
},
});
}
state.peers.insert(
peer_id,
PeerState {
view: View::default(),
implicit_view: HashSet::new(),
protocol_version: versioned_protocol,
discovery_ids: authority_ids,
},
);
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
if let Some(p) = state.peers.remove(&peer_id) {
for discovery_key in p.discovery_ids.into_iter().flatten() {
state.authorities.remove(&discovery_key);
}
}
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
let new_session_index = &topology.session;
let new_topology = &topology.topology;
let local_index = &topology.local_index;
if let Some(per_session) = state.per_session.get_mut(new_session_index) {
per_session.supply_topology(new_topology, *local_index);
} else {
state.unused_topologies.insert(*new_session_index, topology);
}
},
NetworkBridgeEvent::PeerMessage(peer_id, message) => match message {
net_protocol::StatementDistributionMessage::V1(_) => return,
net_protocol::StatementDistributionMessage::V2(
protocol_v2::StatementDistributionMessage::V1Compatibility(_),
) |
net_protocol::StatementDistributionMessage::V3(
protocol_v3::StatementDistributionMessage::V1Compatibility(_),
) => return,
net_protocol::StatementDistributionMessage::V2(
protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
) |
net_protocol::StatementDistributionMessage::V3(
protocol_v3::StatementDistributionMessage::Statement(relay_parent, statement),
) =>
handle_incoming_statement(
ctx,
state,
peer_id,
relay_parent,
statement,
reputation,
metrics,
)
.await,
net_protocol::StatementDistributionMessage::V2(
protocol_v2::StatementDistributionMessage::BackedCandidateManifest(inner),
) |
net_protocol::StatementDistributionMessage::V3(
protocol_v3::StatementDistributionMessage::BackedCandidateManifest(inner),
) => handle_incoming_manifest(ctx, state, peer_id, inner, reputation, metrics).await,
net_protocol::StatementDistributionMessage::V2(
protocol_v2::StatementDistributionMessage::BackedCandidateKnown(inner),
) |
net_protocol::StatementDistributionMessage::V3(
protocol_v3::StatementDistributionMessage::BackedCandidateKnown(inner),
) =>
handle_incoming_acknowledgement(ctx, state, peer_id, inner, reputation, metrics)
.await,
},
NetworkBridgeEvent::PeerViewChange(peer_id, view) =>
handle_peer_view_update(ctx, state, peer_id, view, metrics).await,
NetworkBridgeEvent::OurViewChange(_view) => {
},
NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
gum::trace!(
target: LOG_TARGET,
?peer_id,
?authority_ids,
"Updated `AuthorityDiscoveryId`s"
);
let peer_state = match state.peers.get_mut(&peer_id) {
None => return,
Some(p) => p,
};
state.authorities.retain(|a, p| p != &peer_id || authority_ids.contains(a));
for a in authority_ids.iter().cloned() {
state.authorities.insert(a, peer_id);
}
peer_state.discovery_ids = Some(authority_ids);
},
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_active_leaves_update<Context>(
ctx: &mut Context,
state: &mut State,
activated: &ActivatedLeaf,
leaf_mode: ProspectiveParachainsMode,
metrics: &Metrics,
) -> JfyiErrorResult<()> {
let max_candidate_depth = match leaf_mode {
ProspectiveParachainsMode::Disabled => return Ok(()),
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth,
};
let seconding_limit = max_candidate_depth + 1;
state
.implicit_view
.activate_leaf(ctx.sender(), activated.hash)
.await
.map_err(JfyiError::ActivateLeafFailure)?;
let new_relay_parents =
state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();
for new_relay_parent in new_relay_parents.iter().cloned() {
let disabled_validators: HashSet<_> =
polkadot_node_subsystem_util::runtime::get_disabled_validators_with_fallback(
ctx.sender(),
new_relay_parent,
)
.await
.map_err(JfyiError::FetchDisabledValidators)?
.into_iter()
.collect();
let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
new_relay_parent,
ctx.sender(),
)
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchSessionIndex)?;
if !state.per_session.contains_key(&session_index) {
let session_info = polkadot_node_subsystem_util::request_session_info(
new_relay_parent,
session_index,
ctx.sender(),
)
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchSessionInfo)?;
let session_info = match session_info {
None => {
gum::warn!(
target: LOG_TARGET,
relay_parent = ?new_relay_parent,
"No session info available for current session"
);
continue
},
Some(s) => s,
};
let minimum_backing_votes =
request_min_backing_votes(new_relay_parent, session_index, ctx.sender()).await?;
let node_features =
request_node_features(new_relay_parent, session_index, ctx.sender()).await?;
let mut per_session_state = PerSessionState::new(
session_info,
&state.keystore,
minimum_backing_votes,
node_features
.unwrap_or(NodeFeatures::EMPTY)
.get(FeatureIndex::CandidateReceiptV2 as usize)
.map(|b| *b)
.unwrap_or(false),
);
if let Some(topology) = state.unused_topologies.remove(&session_index) {
per_session_state.supply_topology(&topology.topology, topology.local_index);
}
state.per_session.insert(session_index, per_session_state);
}
let per_session = state
.per_session
.get_mut(&session_index)
.expect("either existed or just inserted; qed");
if !disabled_validators.is_empty() {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?new_relay_parent,
?session_index,
?disabled_validators,
"Disabled validators detected"
);
}
if state.per_relay_parent.contains_key(&new_relay_parent) {
continue
}
let group_rotation_info =
polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchValidatorGroups)?
.1;
let claim_queue = ClaimQueueSnapshot(
polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender())
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchClaimQueue)?,
);
let local_validator = per_session.local_validator.and_then(|v| {
if let LocalValidatorIndex::Active(idx) = v {
find_active_validator_state(
idx,
&per_session.groups,
&group_rotation_info,
&claim_queue,
seconding_limit,
)
} else {
Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
}
});
let groups_per_para = determine_groups_per_para(
per_session.groups.all().len(),
group_rotation_info,
&claim_queue,
)
.await;
let transposed_cq = transpose_claim_queue(claim_queue.0);
state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
local_validator,
statement_store: StatementStore::new(&per_session.groups),
seconding_limit,
session: session_index,
groups_per_para,
disabled_validators,
transposed_cq,
},
);
}
gum::debug!(
target: LOG_TARGET,
"Activated leaves. Now tracking {} relay-parents across {} sessions",
state.per_relay_parent.len(),
state.per_session.len(),
);
{
let mut update_peers = Vec::new();
for (peer, peer_state) in state.peers.iter_mut() {
let fresh = peer_state.reconcile_active_leaf(activated.hash, &new_relay_parents);
if !fresh.is_empty() {
update_peers.push((*peer, fresh));
}
}
for (peer, fresh) in update_peers {
for fresh_relay_parent in fresh {
send_peer_messages_for_relay_parent(ctx, state, peer, fresh_relay_parent, metrics)
.await;
}
}
}
new_leaf_fragment_chain_updates(ctx, state, activated.hash).await;
Ok(())
}
fn find_active_validator_state(
validator_index: ValidatorIndex,
groups: &Groups,
group_rotation_info: &GroupRotationInfo,
claim_queue: &ClaimQueueSnapshot,
seconding_limit: usize,
) -> Option<LocalValidatorState> {
if groups.all().is_empty() {
return None
}
let our_group = groups.by_validator_index(validator_index)?;
let core_index = group_rotation_info.core_for_group(our_group, groups.all().len());
let paras_assigned_to_core = claim_queue.iter_claims_for_core(&core_index).copied().collect();
let group_validators = groups.get(our_group)?.to_owned();
Some(LocalValidatorState {
active: Some(ActiveValidatorState {
index: validator_index,
group: our_group,
assignments: paras_assigned_to_core,
cluster_tracker: ClusterTracker::new(group_validators, seconding_limit)
.expect("group is non-empty because we are in it; qed"),
}),
grid_tracker: GridTracker::default(),
})
}
pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) {
for leaf in leaves {
let pruned = state.implicit_view.deactivate_leaf(*leaf);
for pruned_rp in pruned {
state
.per_relay_parent
.remove(&pruned_rp)
.as_ref()
.and_then(|pruned| pruned.active_validator_state())
.map(|active_state| {
active_state.cluster_tracker.warn_if_too_many_pending_statements(pruned_rp)
});
state.request_manager.remove_by_relay_parent(*leaf);
}
}
state
.candidates
.on_deactivate_leaves(&leaves, |h| state.per_relay_parent.contains_key(h));
let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect();
state.per_session.retain(|s, _| sessions.contains(s));
let last_session_index = state.unused_topologies.keys().max().copied();
state
.unused_topologies
.retain(|s, _| sessions.contains(s) || last_session_index == Some(*s));
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_peer_view_update<Context>(
ctx: &mut Context,
state: &mut State,
peer: PeerId,
new_view: View,
metrics: &Metrics,
) {
let fresh_implicit = {
let peer_data = match state.peers.get_mut(&peer) {
None => return,
Some(p) => p,
};
peer_data.update_view(new_view, &state.implicit_view)
};
for new_relay_parent in fresh_implicit {
send_peer_messages_for_relay_parent(ctx, state, peer, new_relay_parent, metrics).await;
}
}
fn find_validator_ids<'a>(
known_discovery_ids: impl IntoIterator<Item = &'a AuthorityDiscoveryId>,
discovery_mapping: impl Fn(&AuthorityDiscoveryId) -> Option<&'a ValidatorIndex>,
) -> impl Iterator<Item = ValidatorIndex> {
known_discovery_ids.into_iter().filter_map(discovery_mapping).cloned()
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_peer_messages_for_relay_parent<Context>(
ctx: &mut Context,
state: &mut State,
peer: PeerId,
relay_parent: Hash,
metrics: &Metrics,
) {
let peer_data = match state.peers.get_mut(&peer) {
None => return,
Some(p) => p,
};
let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
None => return,
Some(s) => s,
};
let per_session_state = match state.per_session.get(&relay_parent_state.session) {
None => return,
Some(s) => s,
};
for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
per_session_state.authority_lookup.get(a)
}) {
if let Some(active) = relay_parent_state
.local_validator
.as_mut()
.and_then(|local| local.active.as_mut())
{
send_pending_cluster_statements(
ctx,
relay_parent,
&(peer, peer_data.protocol_version),
validator_id,
&mut active.cluster_tracker,
&state.candidates,
&relay_parent_state.statement_store,
metrics,
)
.await;
}
send_pending_grid_messages(
ctx,
relay_parent,
&(peer, peer_data.protocol_version),
validator_id,
&per_session_state.groups,
relay_parent_state,
&state.candidates,
metrics,
)
.await;
}
}
fn pending_statement_network_message(
statement_store: &StatementStore,
relay_parent: Hash,
peer: &(PeerId, ValidationVersion),
originator: ValidatorIndex,
compact: CompactStatement,
) -> Option<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)> {
match peer.1 {
ValidationVersion::V2 => statement_store
.validator_statement(originator, compact)
.map(|s| s.as_unchecked().clone())
.map(|signed| {
protocol_v2::StatementDistributionMessage::Statement(relay_parent, signed)
})
.map(|msg| (vec![peer.0], Versioned::V2(msg).into())),
ValidationVersion::V3 => statement_store
.validator_statement(originator, compact)
.map(|s| s.as_unchecked().clone())
.map(|signed| {
protocol_v3::StatementDistributionMessage::Statement(relay_parent, signed)
})
.map(|msg| (vec![peer.0], Versioned::V3(msg).into())),
ValidationVersion::V1 => {
gum::error!(
target: LOG_TARGET,
"Bug ValidationVersion::V1 should not be used in statement-distribution v2,
legacy should have handled this"
);
None
},
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_pending_cluster_statements<Context>(
ctx: &mut Context,
relay_parent: Hash,
peer_id: &(PeerId, ValidationVersion),
peer_validator_id: ValidatorIndex,
cluster_tracker: &mut ClusterTracker,
candidates: &Candidates,
statement_store: &StatementStore,
metrics: &Metrics,
) {
let pending_statements = cluster_tracker.pending_statements_for(peer_validator_id);
let network_messages = pending_statements
.into_iter()
.filter_map(|(originator, compact)| {
if !candidates.is_confirmed(compact.candidate_hash()) {
return None
}
let res = pending_statement_network_message(
&statement_store,
relay_parent,
peer_id,
originator,
compact.clone(),
);
if res.is_some() {
cluster_tracker.note_sent(peer_validator_id, originator, compact);
}
res
})
.collect::<Vec<_>>();
if !network_messages.is_empty() {
let count = network_messages.len();
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(network_messages))
.await;
metrics.on_statements_distributed(count);
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_pending_grid_messages<Context>(
ctx: &mut Context,
relay_parent: Hash,
peer_id: &(PeerId, ValidationVersion),
peer_validator_id: ValidatorIndex,
groups: &Groups,
relay_parent_state: &mut PerRelayParentState,
candidates: &Candidates,
metrics: &Metrics,
) {
let pending_manifests = {
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return,
Some(l) => l,
};
let grid_tracker = &mut local_validator.grid_tracker;
grid_tracker.pending_manifests_for(peer_validator_id)
};
let mut messages: Vec<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)> = Vec::new();
let mut statements_count = 0;
for (candidate_hash, kind) in pending_manifests {
let confirmed_candidate = match candidates.get_confirmed(&candidate_hash) {
None => continue, Some(c) => c,
};
let group_index = confirmed_candidate.group_index();
let local_knowledge = {
let group_size = match groups.get(group_index) {
None => return, Some(x) => x.len(),
};
local_knowledge_filter(
group_size,
group_index,
candidate_hash,
&relay_parent_state.statement_store,
)
};
match kind {
grid::ManifestKind::Full => {
let manifest = protocol_v2::BackedCandidateManifest {
relay_parent,
candidate_hash,
group_index,
para_id: confirmed_candidate.para_id(),
parent_head_data_hash: confirmed_candidate.parent_head_data_hash(),
statement_knowledge: local_knowledge.clone(),
};
let grid = &mut relay_parent_state
.local_validator
.as_mut()
.expect("determined to be some earlier in this function; qed")
.grid_tracker;
grid.manifest_sent_to(
groups,
peer_validator_id,
candidate_hash,
local_knowledge.clone(),
);
match peer_id.1 {
ValidationVersion::V2 => messages.push((
vec![peer_id.0],
Versioned::V2(
protocol_v2::StatementDistributionMessage::BackedCandidateManifest(
manifest,
),
)
.into(),
)),
ValidationVersion::V3 => messages.push((
vec![peer_id.0],
Versioned::V3(
protocol_v3::StatementDistributionMessage::BackedCandidateManifest(
manifest,
),
)
.into(),
)),
ValidationVersion::V1 => {
gum::error!(
target: LOG_TARGET,
"Bug ValidationVersion::V1 should not be used in statement-distribution v2,
legacy should have handled this"
);
},
};
},
grid::ManifestKind::Acknowledgement => {
let (m, c) = acknowledgement_and_statement_messages(
peer_id,
peer_validator_id,
groups,
relay_parent_state,
relay_parent,
group_index,
candidate_hash,
local_knowledge,
);
messages.extend(m);
statements_count += c;
},
}
}
{
let grid_tracker = &mut relay_parent_state
.local_validator
.as_mut()
.expect("checked earlier; qed")
.grid_tracker;
let pending_statements = grid_tracker.all_pending_statements_for(peer_validator_id);
let extra_statements = pending_statements
.into_iter()
.filter_map(|(originator, compact)| {
let res = pending_statement_network_message(
&relay_parent_state.statement_store,
relay_parent,
peer_id,
originator,
compact.clone(),
);
if res.is_some() {
grid_tracker.sent_or_received_direct_statement(
groups,
originator,
peer_validator_id,
&compact,
false,
);
}
res
})
.collect::<Vec<_>>();
statements_count += extra_statements.len();
messages.extend(extra_statements);
}
if !messages.is_empty() {
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await;
metrics.on_statements_distributed(statements_count);
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn share_local_statement<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
statement: SignedFullStatementWithPVD,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) -> JfyiErrorResult<()> {
let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
None => return Err(JfyiError::InvalidShare),
Some(x) => x,
};
gum::debug!(
target: LOG_TARGET,
statement = ?statement.payload().to_compact(),
"Sharing Statement",
);
let per_session = match state.per_session.get(&per_relay_parent.session) {
Some(s) => s,
None => return Ok(()),
};
let (local_index, local_assignments, local_group) =
match per_relay_parent.active_validator_state() {
None => return Err(JfyiError::InvalidShare),
Some(l) => (l.index, &l.assignments, l.group),
};
let expected = match statement.payload() {
FullStatementWithPVD::Seconded(ref c, _) =>
Some((c.descriptor.para_id(), c.descriptor.relay_parent())),
FullStatementWithPVD::Valid(hash) =>
state.candidates.get_confirmed(&hash).map(|c| (c.para_id(), c.relay_parent())),
};
let is_seconded = match statement.payload() {
FullStatementWithPVD::Seconded(_, _) => true,
FullStatementWithPVD::Valid(_) => false,
};
let (expected_para, expected_relay_parent) = match expected {
None => return Err(JfyiError::InvalidShare),
Some(x) => x,
};
if local_index != statement.validator_index() {
return Err(JfyiError::InvalidShare)
}
if is_seconded &&
per_relay_parent.statement_store.seconded_count(&local_index) ==
per_relay_parent.seconding_limit
{
gum::warn!(
target: LOG_TARGET,
limit = ?per_relay_parent.seconding_limit,
"Local node has issued too many `Seconded` statements",
);
return Err(JfyiError::InvalidShare)
}
if !local_assignments.contains(&expected_para) || relay_parent != expected_relay_parent {
return Err(JfyiError::InvalidShare)
}
let mut post_confirmation = None;
let compact_statement = {
let compact_statement = FullStatementWithPVD::signed_to_compact(statement.clone());
let candidate_hash = CandidateHash(*statement.payload().candidate_hash());
if let FullStatementWithPVD::Seconded(ref c, ref pvd) = statement.payload() {
post_confirmation = state.candidates.confirm_candidate(
candidate_hash,
c.clone(),
pvd.clone(),
local_group,
);
};
match per_relay_parent.statement_store.insert(
&per_session.groups,
compact_statement.clone(),
StatementOrigin::Local,
) {
Ok(false) | Err(_) => {
gum::warn!(
target: LOG_TARGET,
statement = ?compact_statement.payload(),
"Candidate backing issued redundant statement?",
);
return Err(JfyiError::InvalidShare)
},
Ok(true) => {},
}
{
let l = per_relay_parent.active_validator_state_mut().expect("checked above; qed");
l.cluster_tracker.note_issued(local_index, compact_statement.payload().clone());
}
if let Some(ref session_topology) = per_session.grid_view {
let l = per_relay_parent.local_validator.as_mut().expect("checked above; qed");
l.grid_tracker.learned_fresh_statement(
&per_session.groups,
session_topology,
local_index,
&compact_statement.payload(),
);
}
compact_statement
};
circulate_statement(
ctx,
relay_parent,
per_relay_parent,
per_session,
&state.candidates,
&state.authorities,
&state.peers,
compact_statement,
metrics,
)
.await;
if let Some(post_confirmation) = post_confirmation {
apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await;
}
Ok(())
}
#[derive(Debug)]
enum DirectTargetKind {
Cluster,
Grid,
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement<Context>(
ctx: &mut Context,
relay_parent: Hash,
relay_parent_state: &mut PerRelayParentState,
per_session: &PerSessionState,
candidates: &Candidates,
authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
peers: &HashMap<PeerId, PeerState>,
statement: SignedStatement,
metrics: &Metrics,
) {
let session_info = &per_session.session_info;
let candidate_hash = *statement.payload().candidate_hash();
let compact_statement = statement.payload().clone();
let is_confirmed = candidates.is_confirmed(&candidate_hash);
let originator = statement.validator_index();
let (local_validator, targets) = {
let local_validator = match relay_parent_state.local_validator.as_mut() {
Some(v) => v,
None => return, };
let statement_group = per_session.groups.by_validator_index(originator);
let (cluster_relevant, cluster_targets, all_cluster_targets) = local_validator
.active
.as_mut()
.map(|active| {
let cluster_relevant = Some(active.group) == statement_group;
let cluster_targets = if is_confirmed && cluster_relevant {
Some(
active
.cluster_tracker
.targets()
.iter()
.filter(|&&v| {
active
.cluster_tracker
.can_send(v, originator, compact_statement.clone())
.is_ok()
})
.filter(|&v| v != &active.index)
.map(|v| (*v, DirectTargetKind::Cluster)),
)
} else {
None
};
let all_cluster_targets = active.cluster_tracker.targets();
(cluster_relevant, cluster_targets, all_cluster_targets)
})
.unwrap_or((false, None, &[]));
let grid_targets = local_validator
.grid_tracker
.direct_statement_targets(&per_session.groups, originator, &compact_statement)
.into_iter()
.filter(|v| !cluster_relevant || !all_cluster_targets.contains(v))
.map(|v| (v, DirectTargetKind::Grid));
let targets = cluster_targets
.into_iter()
.flatten()
.chain(grid_targets)
.filter_map(|(v, k)| {
session_info.discovery_keys.get(v.0 as usize).map(|a| (v, a.clone(), k))
})
.collect::<Vec<_>>();
(local_validator, targets)
};
let mut statement_to_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
for (target, authority_id, kind) in targets {
let peer_id: (PeerId, ProtocolVersion) = match authorities.get(&authority_id) {
Some(p) if peers.get(p).map_or(false, |p| p.knows_relay_parent(&relay_parent)) => (
*p,
peers
.get(p)
.expect("Qed, can't fail because it was checked above")
.protocol_version
.into(),
),
None | Some(_) => continue,
};
match kind {
DirectTargetKind::Cluster => {
let active = local_validator
.active
.as_mut()
.expect("cluster target means local is active validator; qed");
if let Ok(()) =
active.cluster_tracker.can_send(target, originator, compact_statement.clone())
{
active.cluster_tracker.note_sent(target, originator, compact_statement.clone());
statement_to_peers.push(peer_id);
}
},
DirectTargetKind::Grid => {
statement_to_peers.push(peer_id);
local_validator.grid_tracker.sent_or_received_direct_statement(
&per_session.groups,
originator,
target,
&compact_statement,
false,
);
},
}
}
let statement_to_v2_peers =
filter_by_peer_version(&statement_to_peers, ValidationVersion::V2.into());
let statement_to_v3_peers =
filter_by_peer_version(&statement_to_peers, ValidationVersion::V3.into());
if !statement_to_v2_peers.is_empty() {
gum::debug!(
target: LOG_TARGET,
?compact_statement,
n_peers = ?statement_to_v2_peers.len(),
"Sending statement to v2 peers",
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
statement_to_v2_peers,
Versioned::V2(protocol_v2::StatementDistributionMessage::Statement(
relay_parent,
statement.as_unchecked().clone(),
))
.into(),
))
.await;
metrics.on_statement_distributed();
}
if !statement_to_v3_peers.is_empty() {
gum::debug!(
target: LOG_TARGET,
?compact_statement,
n_peers = ?statement_to_peers.len(),
"Sending statement to v3 peers",
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
statement_to_v3_peers,
Versioned::V3(protocol_v3::StatementDistributionMessage::Statement(
relay_parent,
statement.as_unchecked().clone(),
))
.into(),
))
.await;
metrics.on_statement_distributed();
}
}
fn check_statement_signature(
session_index: SessionIndex,
validators: &IndexedVec<ValidatorIndex, ValidatorId>,
relay_parent: Hash,
statement: UncheckedSignedStatement,
) -> std::result::Result<SignedStatement, UncheckedSignedStatement> {
let signing_context = SigningContext { session_index, parent_hash: relay_parent };
validators
.get(statement.unchecked_validator_index())
.ok_or_else(|| statement.clone())
.and_then(|v| statement.try_into_checked(&signing_context, v))
}
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::StatementDistributionSenderTrait,
peer: PeerId,
rep: Rep,
) {
reputation.modify(sender, peer, rep).await;
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_statement<Context>(
ctx: &mut Context,
state: &mut State,
peer: PeerId,
relay_parent: Hash,
statement: UncheckedSignedStatement,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) {
let peer_state = match state.peers.get(&peer) {
None => {
return
},
Some(p) => p,
};
let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
None => {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE,
)
.await;
return
},
Some(p) => p,
};
let per_session = match state.per_session.get(&per_relay_parent.session) {
None => {
gum::warn!(
target: LOG_TARGET,
session = ?per_relay_parent.session,
"Missing expected session info.",
);
return
},
Some(s) => s,
};
let session_info = &per_session.session_info;
if per_relay_parent.is_disabled(&statement.unchecked_validator_index()) {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?statement.unchecked_validator_index(),
"Ignoring a statement from disabled validator."
);
modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
return
}
let local_validator = match per_relay_parent.local_validator.as_mut() {
None => {
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR,
)
.await;
}
return
},
Some(l) => l,
};
let originator_group =
match per_session.groups.by_validator_index(statement.unchecked_validator_index()) {
Some(g) => g,
None => {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND,
)
.await;
return
},
};
let (active, cluster_sender_index) = {
let active = local_validator.active.as_mut();
let allowed_senders = active
.as_ref()
.map(|active| {
active
.cluster_tracker
.senders_for_originator(statement.unchecked_validator_index())
})
.unwrap_or_default();
let idx = allowed_senders
.iter()
.filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (*i, ad)))
.filter(|(_, ad)| peer_state.is_authority(ad))
.map(|(i, _)| i)
.next();
(active, idx)
};
let checked_statement = if let Some((active, cluster_sender_index)) =
active.zip(cluster_sender_index)
{
match handle_cluster_statement(
relay_parent,
&mut active.cluster_tracker,
per_relay_parent.session,
&per_session.session_info,
statement,
cluster_sender_index,
) {
Ok(Some(s)) => s,
Ok(None) => return,
Err(rep) => {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return
},
}
} else {
let grid_sender_index = local_validator
.grid_tracker
.direct_statement_providers(
&per_session.groups,
statement.unchecked_validator_index(),
statement.unchecked_payload(),
)
.into_iter()
.filter_map(|(i, validator_knows_statement)| {
session_info
.discovery_keys
.get(i.0 as usize)
.map(|ad| (i, ad, validator_knows_statement))
})
.filter(|(_, ad, _)| peer_state.is_authority(ad))
.map(|(i, _, validator_knows_statement)| (i, validator_knows_statement))
.next();
if let Some((grid_sender_index, validator_knows_statement)) = grid_sender_index {
if !validator_knows_statement {
match handle_grid_statement(
relay_parent,
&mut local_validator.grid_tracker,
per_relay_parent.session,
&per_session,
statement,
grid_sender_index,
) {
Ok(s) => s,
Err(rep) => {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
return
},
}
} else {
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
return;
}
} else {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_STATEMENT_INVALID_SENDER,
)
.await;
return
}
};
let statement = checked_statement.payload().clone();
let originator_index = checked_statement.validator_index();
let candidate_hash = *checked_statement.payload().candidate_hash();
{
let res = state.candidates.insert_unconfirmed(
peer,
candidate_hash,
relay_parent,
originator_group,
None,
);
if let Err(BadAdvertisement) = res {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE,
)
.await;
return
}
}
let confirmed = state.candidates.get_confirmed(&candidate_hash);
let is_confirmed = state.candidates.is_confirmed(&candidate_hash);
if !is_confirmed {
let mut request_entry =
state
.request_manager
.get_or_insert(relay_parent, candidate_hash, originator_group);
request_entry.add_peer(peer);
request_entry.set_cluster_priority();
}
let was_fresh = match per_relay_parent.statement_store.insert(
&per_session.groups,
checked_statement.clone(),
StatementOrigin::Remote,
) {
Err(statement_store::Error::ValidatorUnknown) => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?originator_index,
"Error - accepted message from unknown validator."
);
return
},
Ok(known) => known,
};
if was_fresh {
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;
let is_importable = state.candidates.is_importable(&candidate_hash);
if let Some(ref session_topology) = per_session.grid_view {
local_validator.grid_tracker.learned_fresh_statement(
&per_session.groups,
session_topology,
originator_index,
&statement,
);
}
if let (true, &Some(confirmed)) = (is_importable, &confirmed) {
send_backing_fresh_statements(
ctx,
candidate_hash,
originator_group,
&relay_parent,
&mut *per_relay_parent,
confirmed,
per_session,
)
.await;
}
circulate_statement(
ctx,
relay_parent,
per_relay_parent,
per_session,
&state.candidates,
&state.authorities,
&state.peers,
checked_statement,
metrics,
)
.await;
} else {
modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
}
}
fn handle_cluster_statement(
relay_parent: Hash,
cluster_tracker: &mut ClusterTracker,
session: SessionIndex,
session_info: &SessionInfo,
statement: UncheckedSignedStatement,
cluster_sender_index: ValidatorIndex,
) -> Result<Option<SignedStatement>, Rep> {
let should_import = {
match cluster_tracker.can_receive(
cluster_sender_index,
statement.unchecked_validator_index(),
statement.unchecked_payload().clone(),
) {
Ok(ClusterAccept::Ok) => true,
Ok(ClusterAccept::WithPrejudice) => false,
Err(ClusterRejectIncoming::ExcessiveSeconded) => return Err(COST_EXCESSIVE_SECONDED),
Err(ClusterRejectIncoming::CandidateUnknown | ClusterRejectIncoming::Duplicate) =>
return Err(COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED),
Err(ClusterRejectIncoming::NotInGroup) => {
return Err(COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP)
},
}
};
let checked_statement =
match check_statement_signature(session, &session_info.validators, relay_parent, statement)
{
Ok(s) => s,
Err(_) => return Err(COST_INVALID_SIGNATURE),
};
cluster_tracker.note_received(
cluster_sender_index,
checked_statement.validator_index(),
checked_statement.payload().clone(),
);
Ok(if should_import { Some(checked_statement) } else { None })
}
fn handle_grid_statement(
relay_parent: Hash,
grid_tracker: &mut GridTracker,
session: SessionIndex,
per_session: &PerSessionState,
statement: UncheckedSignedStatement,
grid_sender_index: ValidatorIndex,
) -> Result<SignedStatement, Rep> {
let checked_statement = match check_statement_signature(
session,
&per_session.session_info.validators,
relay_parent,
statement,
) {
Ok(s) => s,
Err(_) => return Err(COST_INVALID_SIGNATURE),
};
grid_tracker.sent_or_received_direct_statement(
&per_session.groups,
checked_statement.validator_index(),
grid_sender_index,
&checked_statement.payload(),
true,
);
Ok(checked_statement)
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_backing_fresh_statements<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
group_index: GroupIndex,
relay_parent: &Hash,
relay_parent_state: &mut PerRelayParentState,
confirmed: &candidates::ConfirmedCandidate,
per_session: &PerSessionState,
) {
let group_validators = per_session.groups.get(group_index).unwrap_or(&[]);
let mut imported = Vec::new();
for statement in relay_parent_state
.statement_store
.fresh_statements_for_backing(group_validators, candidate_hash)
{
let v = statement.validator_index();
let compact = statement.payload().clone();
imported.push((v, compact));
let carrying_pvd = statement
.clone()
.convert_to_superpayload_with(|statement| match statement {
CompactStatement::Seconded(_) => FullStatementWithPVD::Seconded(
(&**confirmed.candidate_receipt()).clone(),
confirmed.persisted_validation_data().clone(),
),
CompactStatement::Valid(c_hash) => FullStatementWithPVD::Valid(c_hash),
})
.expect("statements refer to same candidate; qed");
ctx.send_message(CandidateBackingMessage::Statement(*relay_parent, carrying_pvd))
.await;
}
for (v, s) in imported {
relay_parent_state.statement_store.note_known_by_backing(v, s);
}
}
fn local_knowledge_filter(
group_size: usize,
group_index: GroupIndex,
candidate_hash: CandidateHash,
statement_store: &StatementStore,
) -> StatementFilter {
let mut f = StatementFilter::blank(group_size);
statement_store.fill_statement_filter(group_index, candidate_hash, &mut f);
f
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn provide_candidate_to_grid<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
relay_parent_state: &mut PerRelayParentState,
confirmed_candidate: &candidates::ConfirmedCandidate,
per_session: &PerSessionState,
authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
peers: &HashMap<PeerId, PeerState>,
metrics: &Metrics,
) {
let local_validator = match relay_parent_state.local_validator {
Some(ref mut v) => v,
None => return,
};
let relay_parent = confirmed_candidate.relay_parent();
let group_index = confirmed_candidate.group_index();
let grid_view = match per_session.grid_view {
Some(ref t) => t,
None => {
gum::debug!(
target: LOG_TARGET,
session = relay_parent_state.session,
"Cannot handle backable candidate due to lack of topology",
);
return
},
};
let group_size = match per_session.groups.get(group_index) {
None => {
gum::warn!(
target: LOG_TARGET,
?candidate_hash,
?relay_parent,
?group_index,
session = relay_parent_state.session,
"Handled backed candidate with unknown group?",
);
return
},
Some(g) => g.len(),
};
let filter = local_knowledge_filter(
group_size,
group_index,
candidate_hash,
&relay_parent_state.statement_store,
);
let actions = local_validator.grid_tracker.add_backed_candidate(
grid_view,
candidate_hash,
group_index,
filter.clone(),
);
let manifest = protocol_v2::BackedCandidateManifest {
relay_parent,
candidate_hash,
group_index,
para_id: confirmed_candidate.para_id(),
parent_head_data_hash: confirmed_candidate.parent_head_data_hash(),
statement_knowledge: filter.clone(),
};
let acknowledgement = protocol_v2::BackedCandidateAcknowledgement {
candidate_hash,
statement_knowledge: filter.clone(),
};
let mut manifest_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
let mut ack_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
let mut post_statements = Vec::new();
for (v, action) in actions {
let p = match connected_validator_peer(authorities, per_session, v) {
None => continue,
Some(p) =>
if peers.get(&p).map_or(false, |d| d.knows_relay_parent(&relay_parent)) {
(p, peers.get(&p).expect("Qed, was checked above").protocol_version.into())
} else {
continue
},
};
match action {
grid::ManifestKind::Full => manifest_peers.push(p),
grid::ManifestKind::Acknowledgement => ack_peers.push(p),
}
local_validator.grid_tracker.manifest_sent_to(
&per_session.groups,
v,
candidate_hash,
filter.clone(),
);
post_statements.extend(
post_acknowledgement_statement_messages(
v,
relay_parent,
&mut local_validator.grid_tracker,
&relay_parent_state.statement_store,
&per_session.groups,
group_index,
candidate_hash,
&(p.0, p.1.try_into().expect("Qed, can not fail was checked above")),
)
.into_iter()
.map(|m| (vec![p.0], m)),
);
}
let manifest_peers_v2 = filter_by_peer_version(&manifest_peers, ValidationVersion::V2.into());
let manifest_peers_v3 = filter_by_peer_version(&manifest_peers, ValidationVersion::V3.into());
if !manifest_peers_v2.is_empty() {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?per_session.local_validator,
n_peers = manifest_peers_v2.len(),
"Sending manifest to v2 peers"
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
manifest_peers_v2,
Versioned::V2(protocol_v2::StatementDistributionMessage::BackedCandidateManifest(
manifest.clone(),
))
.into(),
))
.await;
}
if !manifest_peers_v3.is_empty() {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?per_session.local_validator,
n_peers = manifest_peers_v3.len(),
"Sending manifest to v3 peers"
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
manifest_peers_v3,
Versioned::V3(protocol_v3::StatementDistributionMessage::BackedCandidateManifest(
manifest,
))
.into(),
))
.await;
}
let ack_peers_v2 = filter_by_peer_version(&ack_peers, ValidationVersion::V2.into());
let ack_peers_v3 = filter_by_peer_version(&ack_peers, ValidationVersion::V3.into());
if !ack_peers_v2.is_empty() {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?per_session.local_validator,
n_peers = ack_peers_v2.len(),
"Sending acknowledgement to v2 peers"
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
ack_peers_v2,
Versioned::V2(protocol_v2::StatementDistributionMessage::BackedCandidateKnown(
acknowledgement.clone(),
))
.into(),
))
.await;
}
if !ack_peers_v3.is_empty() {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
local_validator = ?per_session.local_validator,
n_peers = ack_peers_v3.len(),
"Sending acknowledgement to v3 peers"
);
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
ack_peers_v3,
Versioned::V3(protocol_v3::StatementDistributionMessage::BackedCandidateKnown(
acknowledgement,
))
.into(),
))
.await;
}
if !post_statements.is_empty() {
let count = post_statements.len();
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(post_statements))
.await;
metrics.on_statements_distributed(count);
}
}
async fn determine_groups_per_para(
n_cores: usize,
group_rotation_info: GroupRotationInfo,
claim_queue: &ClaimQueueSnapshot,
) -> HashMap<ParaId, Vec<GroupIndex>> {
let schedule: HashMap<CoreIndex, Vec<ParaId>> = claim_queue
.iter_all_claims()
.map(|(core_index, paras)| (*core_index, paras.iter().copied().collect()))
.collect();
let mut groups_per_para = HashMap::new();
for (core_index, paras) in schedule {
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
for para in paras {
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index);
}
}
groups_per_para
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn fragment_chain_update_inner<Context>(
ctx: &mut Context,
state: &mut State,
active_leaf_hash: Option<Hash>,
required_parent_info: Option<(Hash, ParaId)>,
known_hypotheticals: Option<Vec<HypotheticalCandidate>>,
) {
let hypotheticals = match known_hypotheticals {
None => state.candidates.frontier_hypotheticals(required_parent_info),
Some(h) => h,
};
gum::debug!(
target: LOG_TARGET,
active_leaf_hash = ?active_leaf_hash,
"Calling getHypotheticalMembership from statement distribution for candidates: {:?}",
&hypotheticals.iter().map(|hypo| hypo.candidate_hash()).collect::<Vec<_>>()
);
let candidate_memberships = {
let (tx, rx) = oneshot::channel();
ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
HypotheticalMembershipRequest {
candidates: hypotheticals,
fragment_chain_relay_parent: active_leaf_hash,
},
tx,
))
.await;
match rx.await {
Ok(candidate_memberships) => candidate_memberships,
Err(oneshot::Canceled) => return,
}
};
for (hypo, membership) in candidate_memberships {
if membership.is_empty() {
continue
}
for leaf_hash in membership {
state.candidates.note_importable_under(&hypo, leaf_hash);
}
if let HypotheticalCandidate::Complete {
candidate_hash,
receipt,
persisted_validation_data: _,
} = hypo
{
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
let prs = state.per_relay_parent.get_mut(&receipt.descriptor.relay_parent());
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
let per_session = state.per_session.get(&prs.session);
let group_index = confirmed.group_index();
let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor.para_id())
else {
continue
};
if !expected_groups.iter().any(|g| *g == group_index) {
continue
}
if let Some(per_session) = per_session {
send_backing_fresh_statements(
ctx,
candidate_hash,
confirmed.group_index(),
&receipt.descriptor.relay_parent(),
prs,
confirmed,
per_session,
)
.await;
}
}
}
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn new_leaf_fragment_chain_updates<Context>(
ctx: &mut Context,
state: &mut State,
leaf_hash: Hash,
) {
fragment_chain_update_inner(ctx, state, Some(leaf_hash), None, None).await
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn prospective_backed_notification_fragment_chain_updates<Context>(
ctx: &mut Context,
state: &mut State,
para_id: ParaId,
para_head: Hash,
) {
fragment_chain_update_inner(ctx, state, None, Some((para_head, para_id)), None).await
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn new_confirmed_candidate_fragment_chain_updates<Context>(
ctx: &mut Context,
state: &mut State,
candidate: HypotheticalCandidate,
) {
fragment_chain_update_inner(ctx, state, None, None, Some(vec![candidate])).await
}
struct ManifestImportSuccess<'a> {
relay_parent_state: &'a mut PerRelayParentState,
per_session: &'a PerSessionState,
acknowledge: bool,
sender_index: ValidatorIndex,
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_manifest_common<'a, Context>(
ctx: &mut Context,
peer: PeerId,
peers: &HashMap<PeerId, PeerState>,
per_relay_parent: &'a mut HashMap<Hash, PerRelayParentState>,
per_session: &'a HashMap<SessionIndex, PerSessionState>,
candidates: &mut Candidates,
candidate_hash: CandidateHash,
relay_parent: Hash,
para_id: ParaId,
mut manifest_summary: grid::ManifestSummary,
manifest_kind: grid::ManifestKind,
reputation: &mut ReputationAggregator,
) -> Option<ManifestImportSuccess<'a>> {
let peer_state = match peers.get(&peer) {
None => return None,
Some(p) => p,
};
let relay_parent_state = match per_relay_parent.get_mut(&relay_parent) {
None => {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
return None
},
Some(s) => s,
};
let per_session = match per_session.get(&relay_parent_state.session) {
None => return None,
Some(s) => s,
};
if relay_parent_state.local_validator.is_none() {
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
}
return None
}
let Some(expected_groups) = relay_parent_state.groups_per_para.get(¶_id) else {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
};
if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
}
let grid_topology = match per_session.grid_view.as_ref() {
None => return None,
Some(x) => x,
};
let sender_index = grid_topology
.iter_sending_for_group(manifest_summary.claimed_group_index, manifest_kind)
.filter_map(|i| per_session.session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad)))
.filter(|(_, ad)| peer_state.is_authority(ad))
.map(|(i, _)| i)
.next();
let sender_index = match sender_index {
None => {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN,
)
.await;
return None
},
Some(s) => s,
};
let group_index = manifest_summary.claimed_group_index;
let claimed_parent_hash = manifest_summary.claimed_parent_hash;
let group = per_session.groups.get(group_index).unwrap_or(&[]);
let disabled_mask = relay_parent_state.disabled_bitmask(group);
manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
manifest_summary.statement_knowledge.mask_valid(&disabled_mask);
let local_validator = relay_parent_state.local_validator.as_mut().expect("checked above; qed");
let acknowledge = match local_validator.grid_tracker.import_manifest(
grid_topology,
&per_session.groups,
candidate_hash,
relay_parent_state.seconding_limit,
manifest_summary,
manifest_kind,
sender_index,
) {
Ok(x) => x,
Err(grid::ManifestImportError::Conflicting) => {
modify_reputation(reputation, ctx.sender(), peer, COST_CONFLICTING_MANIFEST).await;
return None
},
Err(grid::ManifestImportError::Overflow) => {
modify_reputation(reputation, ctx.sender(), peer, COST_EXCESSIVE_SECONDED).await;
return None
},
Err(grid::ManifestImportError::Insufficient) => {
modify_reputation(reputation, ctx.sender(), peer, COST_INSUFFICIENT_MANIFEST).await;
return None
},
Err(grid::ManifestImportError::Malformed) => {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
},
Err(grid::ManifestImportError::Disallowed) => {
modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_MANIFEST_DISALLOWED)
.await;
return None
},
};
if let Err(BadAdvertisement) = candidates.insert_unconfirmed(
peer,
candidate_hash,
relay_parent,
group_index,
Some((claimed_parent_hash, para_id)),
) {
modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await;
return None
}
if acknowledge {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
from = ?sender_index,
local_index = ?per_session.local_validator,
?manifest_kind,
"immediate ack, known candidate"
);
}
Some(ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index })
}
fn post_acknowledgement_statement_messages(
recipient: ValidatorIndex,
relay_parent: Hash,
grid_tracker: &mut GridTracker,
statement_store: &StatementStore,
groups: &Groups,
group_index: GroupIndex,
candidate_hash: CandidateHash,
peer: &(PeerId, ValidationVersion),
) -> Vec<net_protocol::VersionedValidationProtocol> {
let sending_filter = match grid_tracker.pending_statements_for(recipient, candidate_hash) {
None => return Vec::new(),
Some(f) => f,
};
let mut messages = Vec::new();
for statement in
statement_store.group_statements(groups, group_index, candidate_hash, &sending_filter)
{
grid_tracker.sent_or_received_direct_statement(
groups,
statement.validator_index(),
recipient,
statement.payload(),
false,
);
match peer.1.into() {
ValidationVersion::V2 => messages.push(Versioned::V2(
protocol_v2::StatementDistributionMessage::Statement(
relay_parent,
statement.as_unchecked().clone(),
)
.into(),
)),
ValidationVersion::V3 => messages.push(Versioned::V3(
protocol_v3::StatementDistributionMessage::Statement(
relay_parent,
statement.as_unchecked().clone(),
)
.into(),
)),
ValidationVersion::V1 => {
gum::error!(
target: LOG_TARGET,
"Bug ValidationVersion::V1 should not be used in statement-distribution v2,
legacy should have handled this"
);
},
};
}
messages
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_manifest<Context>(
ctx: &mut Context,
state: &mut State,
peer: PeerId,
manifest: net_protocol::v2::BackedCandidateManifest,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?manifest.candidate_hash,
?peer,
"Received incoming manifest",
);
let x = match handle_incoming_manifest_common(
ctx,
peer,
&state.peers,
&mut state.per_relay_parent,
&state.per_session,
&mut state.candidates,
manifest.candidate_hash,
manifest.relay_parent,
manifest.para_id,
grid::ManifestSummary {
claimed_parent_hash: manifest.parent_head_data_hash,
claimed_group_index: manifest.group_index,
statement_knowledge: manifest.statement_knowledge,
},
grid::ManifestKind::Full,
reputation,
)
.await
{
Some(x) => x,
None => return,
};
let ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index } = x;
if acknowledge {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?manifest.candidate_hash,
"Known candidate - acknowledging manifest",
);
let local_knowledge = {
let group_size = match per_session.groups.get(manifest.group_index) {
None => return, Some(x) => x.len(),
};
local_knowledge_filter(
group_size,
manifest.group_index,
manifest.candidate_hash,
&relay_parent_state.statement_store,
)
};
let (messages, statements_count) = acknowledgement_and_statement_messages(
&(
peer,
state
.peers
.get(&peer)
.map(|val| val.protocol_version)
.unwrap_or(ValidationVersion::V2),
),
sender_index,
&per_session.groups,
relay_parent_state,
manifest.relay_parent,
manifest.group_index,
manifest.candidate_hash,
local_knowledge,
);
if !messages.is_empty() {
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await;
metrics.on_statements_distributed(statements_count);
}
} else if !state.candidates.is_confirmed(&manifest.candidate_hash) {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?manifest.candidate_hash,
"Unknown candidate - requesting",
);
state
.request_manager
.get_or_insert(manifest.relay_parent, manifest.candidate_hash, manifest.group_index)
.add_peer(peer);
}
}
fn acknowledgement_and_statement_messages(
peer: &(PeerId, ValidationVersion),
validator_index: ValidatorIndex,
groups: &Groups,
relay_parent_state: &mut PerRelayParentState,
relay_parent: Hash,
group_index: GroupIndex,
candidate_hash: CandidateHash,
local_knowledge: StatementFilter,
) -> (Vec<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)>, usize) {
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return (Vec::new(), 0),
Some(l) => l,
};
let acknowledgement = protocol_v2::BackedCandidateAcknowledgement {
candidate_hash,
statement_knowledge: local_knowledge.clone(),
};
let msg_v2 = Versioned::V2(protocol_v2::StatementDistributionMessage::BackedCandidateKnown(
acknowledgement.clone(),
));
let mut messages = match peer.1 {
ValidationVersion::V2 => vec![(vec![peer.0], msg_v2.into())],
ValidationVersion::V3 => vec![(
vec![peer.0],
Versioned::V3(protocol_v2::StatementDistributionMessage::BackedCandidateKnown(
acknowledgement,
))
.into(),
)],
ValidationVersion::V1 => {
gum::error!(
target: LOG_TARGET,
"Bug ValidationVersion::V1 should not be used in statement-distribution v2,
legacy should have handled this"
);
return (Vec::new(), 0)
},
};
local_validator.grid_tracker.manifest_sent_to(
groups,
validator_index,
candidate_hash,
local_knowledge.clone(),
);
let statement_messages = post_acknowledgement_statement_messages(
validator_index,
relay_parent,
&mut local_validator.grid_tracker,
&relay_parent_state.statement_store,
&groups,
group_index,
candidate_hash,
peer,
);
let statements_count = statement_messages.len();
messages.extend(statement_messages.into_iter().map(|m| (vec![peer.0], m)));
(messages, statements_count)
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_acknowledgement<Context>(
ctx: &mut Context,
state: &mut State,
peer: PeerId,
acknowledgement: net_protocol::v2::BackedCandidateAcknowledgement,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?acknowledgement.candidate_hash,
?peer,
"Received incoming acknowledgement",
);
let candidate_hash = acknowledgement.candidate_hash;
let (relay_parent, parent_head_data_hash, group_index, para_id) = {
match state.candidates.get_confirmed(&candidate_hash) {
Some(c) => (c.relay_parent(), c.parent_head_data_hash(), c.group_index(), c.para_id()),
None => {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_ACKNOWLEDGEMENT_UNKNOWN_CANDIDATE,
)
.await;
return
},
}
};
let x = match handle_incoming_manifest_common(
ctx,
peer,
&state.peers,
&mut state.per_relay_parent,
&state.per_session,
&mut state.candidates,
candidate_hash,
relay_parent,
para_id,
grid::ManifestSummary {
claimed_parent_hash: parent_head_data_hash,
claimed_group_index: group_index,
statement_knowledge: acknowledgement.statement_knowledge,
},
grid::ManifestKind::Acknowledgement,
reputation,
)
.await
{
Some(x) => x,
None => return,
};
let ManifestImportSuccess { relay_parent_state, per_session, sender_index, .. } = x;
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return,
Some(l) => l,
};
let messages = post_acknowledgement_statement_messages(
sender_index,
relay_parent,
&mut local_validator.grid_tracker,
&relay_parent_state.statement_store,
&per_session.groups,
group_index,
candidate_hash,
&(
peer,
state
.peers
.get(&peer)
.map(|val| val.protocol_version)
.unwrap_or(ValidationVersion::V2),
),
);
if !messages.is_empty() {
let count = messages.len();
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(
messages.into_iter().map(|m| (vec![peer], m)).collect(),
))
.await;
metrics.on_statements_distributed(count);
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_backed_candidate_message<Context>(
ctx: &mut Context,
state: &mut State,
candidate_hash: CandidateHash,
metrics: &Metrics,
) {
let confirmed = match state.candidates.get_confirmed(&candidate_hash) {
None => {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
"Received backed candidate notification for unknown or unconfirmed",
);
return
},
Some(c) => c,
};
let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
None => return,
Some(s) => s,
};
let per_session = match state.per_session.get(&relay_parent_state.session) {
None => return,
Some(s) => s,
};
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
group_index = ?confirmed.group_index(),
"Candidate Backed - initiating grid distribution & child fetches"
);
provide_candidate_to_grid(
ctx,
candidate_hash,
relay_parent_state,
confirmed,
per_session,
&state.authorities,
&state.peers,
metrics,
)
.await;
prospective_backed_notification_fragment_chain_updates(
ctx,
state,
confirmed.para_id(),
confirmed.candidate_receipt().descriptor.para_head(),
)
.await;
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_cluster_candidate_statements<Context>(
ctx: &mut Context,
state: &mut State,
candidate_hash: CandidateHash,
relay_parent: Hash,
metrics: &Metrics,
) {
let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
None => return,
Some(s) => s,
};
let per_session = match state.per_session.get(&relay_parent_state.session) {
None => return,
Some(s) => s,
};
let local_group = match relay_parent_state.active_validator_state_mut() {
None => return,
Some(v) => v.group,
};
let group_size = match per_session.groups.get(local_group) {
None => return,
Some(g) => g.len(),
};
let statements: Vec<_> = relay_parent_state
.statement_store
.group_statements(
&per_session.groups,
local_group,
candidate_hash,
&StatementFilter::full(group_size),
)
.map(|x| x.clone())
.collect();
for statement in statements {
circulate_statement(
ctx,
relay_parent,
relay_parent_state,
per_session,
&state.candidates,
&state.authorities,
&state.peers,
statement,
metrics,
)
.await;
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn apply_post_confirmation<Context>(
ctx: &mut Context,
state: &mut State,
post_confirmation: PostConfirmation,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) {
for peer in post_confirmation.reckoning.incorrect {
modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await;
}
let candidate_hash = post_confirmation.hypothetical.candidate_hash();
state.request_manager.remove_for(candidate_hash);
send_cluster_candidate_statements(
ctx,
state,
candidate_hash,
post_confirmation.hypothetical.relay_parent(),
metrics,
)
.await;
new_confirmed_candidate_fragment_chain_updates(ctx, state, post_confirmation.hypothetical)
.await;
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut State) {
if !state.request_manager.has_pending_requests() {
return
}
let peers = &state.peers;
let peer_advertised = |identifier: &CandidateIdentifier, peer: &_| {
let peer_data = peers.get(peer)?;
let relay_parent_state = state.per_relay_parent.get(&identifier.relay_parent)?;
let per_session = state.per_session.get(&relay_parent_state.session)?;
let local_validator = relay_parent_state.local_validator.as_ref()?;
for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
per_session.authority_lookup.get(a)
}) {
if let Some(active) = local_validator.active.as_ref() {
if active.cluster_tracker.knows_candidate(validator_id, identifier.candidate_hash) {
return Some(StatementFilter::blank(active.cluster_tracker.targets().len()))
}
}
let filter = local_validator
.grid_tracker
.advertised_statements(validator_id, &identifier.candidate_hash);
if let Some(f) = filter {
return Some(f)
}
}
None
};
let request_props = |identifier: &CandidateIdentifier| {
let &CandidateIdentifier { relay_parent, group_index, .. } = identifier;
let relay_parent_state = state.per_relay_parent.get(&relay_parent)?;
let per_session = state.per_session.get(&relay_parent_state.session)?;
let group = per_session.groups.get(group_index)?;
let seconding_limit = relay_parent_state.seconding_limit;
let mut unwanted_mask = StatementFilter::blank(group.len());
for (i, v) in group.iter().enumerate() {
if relay_parent_state.statement_store.seconded_count(v) >= seconding_limit {
unwanted_mask.seconded_in_group.set(i, true);
}
}
let disabled_mask = relay_parent_state.disabled_bitmask(group);
unwanted_mask.seconded_in_group |= &disabled_mask;
unwanted_mask.validated_in_group |= &disabled_mask;
let local_validator = relay_parent_state.local_validator.as_ref()?;
let require_backing = local_validator
.active
.as_ref()
.map_or(true, |active| active.group != group_index);
let backing_threshold = if require_backing {
let threshold = per_session.groups.get_size_and_backing_threshold(group_index)?.1;
Some(threshold)
} else {
None
};
Some(RequestProperties { unwanted_mask, backing_threshold })
};
while let Some(request) = state.request_manager.next_request(
&mut state.response_manager,
request_props,
peer_advertised,
) {
ctx.send_message(NetworkBridgeTxMessage::SendRequests(
vec![Requests::AttestedCandidateV2(request)],
IfDisconnected::ImmediateError,
))
.await;
}
}
pub(crate) async fn receive_response(response_manager: &mut ResponseManager) -> UnhandledResponse {
match response_manager.incoming().await {
Some(r) => r,
None => futures::future::pending().await,
}
}
pub(crate) async fn next_retry(request_manager: &mut RequestManager) {
match request_manager.next_retry_time() {
Some(instant) =>
futures_timer::Delay::new(instant.saturating_duration_since(Instant::now())).await,
None => futures::future::pending().await,
}
}
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_response<Context>(
ctx: &mut Context,
state: &mut State,
response: UnhandledResponse,
reputation: &mut ReputationAggregator,
metrics: &Metrics,
) {
let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } =
response.candidate_identifier();
let peer = *response.requested_peer();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?peer,
"Received response",
);
let post_confirmation = {
let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
None => return,
Some(s) => s,
};
let per_session = match state.per_session.get(&relay_parent_state.session) {
None => return,
Some(s) => s,
};
let group = match per_session.groups.get(group_index) {
None => return,
Some(g) => g,
};
let disabled_mask = relay_parent_state.disabled_bitmask(group);
let res = response.validate_response(
&mut state.request_manager,
group,
relay_parent_state.session,
|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
|para, g_index| {
let Some(expected_groups) = relay_parent_state.groups_per_para.get(¶) else {
return false
};
expected_groups.iter().any(|g| g == &g_index)
},
disabled_mask,
&relay_parent_state.transposed_cq,
per_session.candidate_receipt_v2_enabled(),
);
for (peer, rep) in res.reputation_changes {
modify_reputation(reputation, ctx.sender(), peer, rep).await;
}
let (candidate, pvd, statements) = match res.request_status {
requests::CandidateRequestStatus::Outdated => return,
requests::CandidateRequestStatus::Incomplete => {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
"Response incomplete. Retrying"
);
return
},
requests::CandidateRequestStatus::Complete {
candidate,
persisted_validation_data,
statements,
} => {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
n_statements = statements.len(),
"Successfully received candidate"
);
(candidate, persisted_validation_data, statements)
},
};
for statement in statements {
let _ = relay_parent_state.statement_store.insert(
&per_session.groups,
statement,
StatementOrigin::Remote,
);
}
if let Some(post_confirmation) =
state.candidates.confirm_candidate(candidate_hash, candidate, pvd, group_index)
{
post_confirmation
} else {
gum::warn!(
target: LOG_TARGET,
?candidate_hash,
"Candidate re-confirmed by request/response: logic error",
);
return
}
};
apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await;
let confirmed = state.candidates.get_confirmed(&candidate_hash).expect("just confirmed; qed");
if !confirmed.is_importable(None) {
return
}
let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
None => return,
Some(s) => s,
};
let per_session = match state.per_session.get(&relay_parent_state.session) {
None => return,
Some(s) => s,
};
send_backing_fresh_statements(
ctx,
candidate_hash,
group_index,
&relay_parent,
relay_parent_state,
confirmed,
per_session,
)
.await;
}
pub(crate) fn seconded_and_sufficient(
filter: &StatementFilter,
backing_threshold: Option<usize>,
) -> bool {
backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t)
}
pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
let ResponderMessage { request, sent_feedback } = message;
let AttestedCandidateRequest { candidate_hash, ref mask } = &request.payload;
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
peer = ?request.peer,
"Received request"
);
let _ = sent_feedback.send(());
let confirmed = match state.candidates.get_confirmed(&candidate_hash) {
None => return, Some(c) => c,
};
let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
None => return,
Some(s) => s,
};
let local_validator = match relay_parent_state.local_validator.as_ref() {
None => return,
Some(s) => s,
};
let per_session = match state.per_session.get(&relay_parent_state.session) {
None => return,
Some(s) => s,
};
let peer_data = match state.peers.get(&request.peer) {
None => return,
Some(d) => d,
};
let group_index = confirmed.group_index();
let group = per_session
.groups
.get(group_index)
.expect("group from session's candidate always known; qed");
let group_size = group.len();
if mask.seconded_in_group.len() != group_size || mask.validated_in_group.len() != group_size {
let _ = request.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_INVALID_REQUEST_BITFIELD_SIZE],
sent_feedback: None,
});
return
}
let (validator_id, is_cluster) = {
let mut validator_id = None;
let mut is_cluster = false;
for v in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
per_session.authority_lookup.get(a)
}) {
if local_validator
.active
.as_ref()
.map_or(false, |active| active.cluster_tracker.can_request(v, *candidate_hash))
{
validator_id = Some(v);
is_cluster = true;
break
}
if local_validator.grid_tracker.can_request(v, *candidate_hash) {
validator_id = Some(v);
break
}
}
match validator_id {
Some(v) => (v, is_cluster),
None => {
let _ = request.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_UNEXPECTED_REQUEST],
sent_feedback: None,
});
return
},
}
};
let and_mask = StatementFilter {
seconded_in_group: !mask.seconded_in_group.clone(),
validated_in_group: !mask.validated_in_group.clone(),
};
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return,
Some(s) => s,
};
let mut sent_filter = StatementFilter::blank(group_size);
let statements: Vec<_> = relay_parent_state
.statement_store
.group_statements(&per_session.groups, group_index, *candidate_hash, &and_mask)
.map(|s| {
let s = s.as_unchecked().clone();
let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
let Some(i) = index_in_group(s.unchecked_validator_index()) else { return s };
match s.unchecked_payload() {
CompactStatement::Seconded(_) => {
sent_filter.seconded_in_group.set(i, true);
},
CompactStatement::Valid(_) => {
sent_filter.validated_in_group.set(i, true);
},
}
s
})
.collect();
if !is_cluster {
let threshold = per_session
.groups
.get_size_and_backing_threshold(group_index)
.expect("group existence checked above; qed")
.1;
if !seconded_and_sufficient(&sent_filter, Some(threshold)) {
gum::info!(
target: LOG_TARGET,
?candidate_hash,
relay_parent = ?confirmed.relay_parent(),
?group_index,
"Dropping a request from a grid peer because the backing threshold is no longer met."
);
return
}
}
for statement in &statements {
if is_cluster {
local_validator
.active
.as_mut()
.expect("cluster peer means local is active validator; qed")
.cluster_tracker
.note_sent(
validator_id,
statement.unchecked_validator_index(),
statement.unchecked_payload().clone(),
);
} else {
local_validator.grid_tracker.sent_or_received_direct_statement(
&per_session.groups,
statement.unchecked_validator_index(),
validator_id,
statement.unchecked_payload(),
false,
);
}
}
let response = AttestedCandidateResponse {
candidate_receipt: (&**confirmed.candidate_receipt()).clone(),
persisted_validation_data: confirmed.persisted_validation_data().clone(),
statements,
};
let _ = request.send_response(response);
}
pub(crate) struct ResponderMessage {
request: IncomingRequest<AttestedCandidateRequest>,
sent_feedback: oneshot::Sender<()>,
}
pub(crate) async fn respond_task(
mut receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
mut sender: mpsc::Sender<ResponderMessage>,
metrics: Metrics,
) {
let mut pending_out = FuturesUnordered::new();
let mut active_peers = HashSet::new();
loop {
select! {
request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
let request = match request_result.into_nested() {
Ok(Ok(v)) => v,
Err(fatal) => {
gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
return
},
Ok(Err(jfyi)) => {
gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
continue
},
};
if active_peers.contains(&request.peer) {
gum::trace!(target: LOG_TARGET, "Peer already being served, dropping request");
metrics.on_request_dropped_peer_rate_limit();
continue
}
if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
gum::trace!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish");
metrics.on_max_parallel_requests_reached();
let (_, peer) = pending_out.select_next_some().await;
active_peers.remove(&peer);
}
let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
let peer = request.peer;
if let Err(err) = sender
.feed(ResponderMessage { request, sent_feedback: pending_sent_tx })
.await
{
gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
return
}
let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
pending_out.push(future_with_peer);
active_peers.insert(peer);
},
result = pending_out.select_next_some() => {
let (_, peer) = result;
active_peers.remove(&peer);
},
}
}
}