1use bitvec::prelude::{BitVec, Lsb0};
21use polkadot_node_network_protocol::{
22 self as net_protocol, filter_by_peer_version,
23 grid_topology::SessionGridTopology,
24 peer_set::{ProtocolVersion, ValidationVersion},
25 request_response::{
26 incoming::OutgoingResponse,
27 v2::{AttestedCandidateRequest, AttestedCandidateResponse},
28 IncomingRequest, IncomingRequestReceiver, Requests,
29 MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS,
30 },
31 v3::{self as protocol_v3, StatementFilter},
32 IfDisconnected, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
33};
34use polkadot_node_primitives::{
35 SignedFullStatementWithPVD, StatementWithPVD as FullStatementWithPVD,
36};
37use polkadot_node_subsystem::{
38 messages::{
39 network_bridge_event::NewGossipTopology, CandidateBackingMessage, HypotheticalCandidate,
40 HypotheticalMembershipRequest, NetworkBridgeEvent, NetworkBridgeTxMessage,
41 ProspectiveParachainsMessage,
42 },
43 overseer, ActivatedLeaf,
44};
45use polkadot_node_subsystem_util::{
46 backing_implicit_view::View as ImplicitView, reputation::ReputationAggregator,
47 request_min_backing_votes, request_node_features, runtime::ClaimQueueSnapshot,
48};
49use polkadot_primitives::{
50 node_features::FeatureIndex, transpose_claim_queue, AuthorityDiscoveryId,
51 CandidateDescriptorVersion, CandidateHash, CompactStatement, CoreIndex, GroupIndex,
52 GroupRotationInfo, Hash, Id as ParaId, IndexedVec, SessionIndex, SessionInfo, SignedStatement,
53 SigningContext, TransposedClaimQueue, UncheckedSignedStatement, ValidatorId, ValidatorIndex,
54};
55
56use sp_keystore::KeystorePtr;
57
58use fatality::Nested;
59use futures::{
60 channel::{mpsc, oneshot},
61 future::FutureExt,
62 select,
63 stream::FuturesUnordered,
64 SinkExt, StreamExt,
65};
66
67use std::{
68 collections::{
69 hash_map::{Entry, HashMap},
70 HashSet,
71 },
72 time::{Duration, Instant},
73};
74
75use crate::{
76 error::{JfyiError, JfyiErrorResult},
77 metrics::Metrics,
78 LOG_TARGET,
79};
80use candidates::{BadAdvertisement, Candidates, PostConfirmation};
81use cluster::{Accept as ClusterAccept, ClusterTracker, RejectIncoming as ClusterRejectIncoming};
82use grid::GridTracker;
83use groups::Groups;
84use requests::{CandidateIdentifier, RequestProperties};
85use statement_store::{StatementOrigin, StatementStore};
86
87pub use requests::{RequestManager, ResponseManager, UnhandledResponse};
88
89mod candidates;
90mod cluster;
91mod grid;
92mod groups;
93mod requests;
94mod statement_store;
95
96#[cfg(test)]
97mod tests;
98
// Minor reputation costs for statements that arrive unexpectedly; the string in each
// constant is the reason attached to the reputation change.
const COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR: Rep =
	Rep::CostMinor("Unexpected Statement, not a validator");
const COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND: Rep =
	Rep::CostMinor("Unexpected Statement, validator not found");
const COST_UNEXPECTED_STATEMENT_INVALID_SENDER: Rep =
	Rep::CostMinor("Unexpected Statement, invalid sender");
const COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE: Rep =
	Rep::CostMinor("Unexpected Statement, bad advertise");
const COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED: Rep =
	Rep::CostMinor("Unexpected Statement, cluster rejected");
const COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP: Rep =
	Rep::CostMinor("Unexpected Statement, not in group");

// Further minor costs related to statements.
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
	Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements");
const COST_DISABLED_VALIDATOR: Rep = Rep::CostMinor("Sent a statement from a disabled validator");

// Costs related to manifests and acknowledgements. Unexpected messages are minor costs;
// conflicting, insufficient or malformed manifests are major costs.
const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep =
	Rep::CostMinor("Unexpected Manifest, missing knowledge for relay parent");
const COST_UNEXPECTED_MANIFEST_DISALLOWED: Rep =
	Rep::CostMinor("Unexpected Manifest, Peer Disallowed");
const COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN: Rep =
	Rep::CostMinor("Unexpected Manifest, Peer Unknown");
const COST_CONFLICTING_MANIFEST: Rep = Rep::CostMajor("Manifest conflicts with previous");
const COST_INSUFFICIENT_MANIFEST: Rep =
	Rep::CostMajor("Manifest statements insufficient to back candidate");
const COST_MALFORMED_MANIFEST: Rep = Rep::CostMajor("Manifest is malformed");
const COST_UNEXPECTED_ACKNOWLEDGEMENT_UNKNOWN_CANDIDATE: Rep =
	Rep::CostMinor("Unexpected acknowledgement, unknown candidate");

// Major costs for invalid signatures, invalid candidate responses and invalid candidate
// descriptors.
const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
const COST_IMPROPERLY_DECODED_RESPONSE: Rep =
	Rep::CostMajor("Improperly Encoded Candidate Response");
const COST_INVALID_RESPONSE: Rep = Rep::CostMajor("Invalid Candidate Response");
const COST_UNREQUESTED_RESPONSE_STATEMENT: Rep =
	Rep::CostMajor("Un-requested Statement In Response");
const COST_INACCURATE_ADVERTISEMENT: Rep =
	Rep::CostMajor("Peer advertised a candidate inaccurately");
const COST_UNSUPPORTED_DESCRIPTOR_VERSION: Rep =
	Rep::CostMajor("Candidate Descriptor version is not supported");
const COST_INVALID_UMP_SIGNALS: Rep =
	Rep::CostMajor("Candidate Descriptor contains invalid ump signals");
const COST_INVALID_SESSION_INDEX: Rep =
	Rep::CostMajor("Candidate Descriptor contains an invalid session index");

// Major costs for invalid or unexpected incoming attested-candidate requests.
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
const COST_INVALID_REQUEST_BITFIELD_SIZE: Rep =
	Rep::CostMajor("Attested candidate request bitfields have wrong size");
const COST_UNEXPECTED_REQUEST: Rep = Rep::CostMajor("Unexpected attested candidate request");

// Reputation benefits for well-behaved peers.
const BENEFIT_VALID_RESPONSE: Rep = Rep::BenefitMajor("Peer Answered Candidate Request");
const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep =
	Rep::BenefitMajorFirst("Peer was the first to provide a given valid statement");

/// A one-second delay. Judging by the name, this is the wait before a request is retried;
/// the use site is not in this part of the file — confirm there.
pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
157
/// State tracked for a single relay parent. Instances are created in
/// `handle_active_leaf_update` and stored in `State::per_relay_parent`.
struct PerRelayParentState {
	// Local validator state at this relay parent; `None` when the session has no local
	// validator index, or when the active-validator lookup fails.
	local_validator: Option<LocalValidatorState>,
	// Statement store, created from the session's validator groups.
	statement_store: StatementStore,
	// Session index for children of this relay parent; key into `State::per_session`.
	session: SessionIndex,
	// Result of `transpose_claim_queue` applied to the claim queue fetched at this relay parent.
	transposed_cq: TransposedClaimQueue,
	// First output of `determine_group_assignments`: groups per para.
	groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
	// Validators reported as disabled by the runtime at this relay parent.
	disabled_validators: HashSet<ValidatorIndex>,
	// Second output of `determine_group_assignments`: paras per group.
	assignments_per_group: HashMap<GroupIndex, Vec<ParaId>>,
}
167
168impl PerRelayParentState {
169 fn active_validator_state(&self) -> Option<&ActiveValidatorState> {
170 self.local_validator.as_ref().and_then(|local| local.active.as_ref())
171 }
172
173 fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
174 self.local_validator.as_mut().and_then(|local| local.active.as_mut())
175 }
176
177 pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
179 self.disabled_validators.contains(validator_index)
180 }
181
182 pub fn disabled_bitmask(&self, group: &[ValidatorIndex]) -> BitVec<u8, Lsb0> {
185 BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)))
186 }
187}
188
/// Per-relay-parent state of the local validator.
struct LocalValidatorState {
	// Grid tracker; initialised with `GridTracker::default()` wherever this struct is built
	// in this part of the file.
	grid_tracker: GridTracker,
	// Present only when the local validator index is `LocalValidatorIndex::Active` and its
	// group could be resolved; `None` for an inactive validator.
	active: Option<ActiveValidatorState>,
}
198
/// State kept when the local node is an active validator at a relay parent.
/// Built by `find_active_validator_state`.
struct ActiveValidatorState {
	// Our validator index.
	index: ValidatorIndex,
	// The group our validator index belongs to.
	group: GroupIndex,
	// Paras assigned to our group (empty when the group has no assignment entry).
	assignments: Vec<ParaId>,
	// Cluster tracker built from our group's validators, with a seconding limit equal to
	// the number of assigned paras.
	cluster_tracker: ClusterTracker,
}
209
/// How the local node participates in a session.
#[derive(Debug, Copy, Clone)]
enum LocalValidatorIndex {
	// A signing key from the keystore was found among the session's validators, at this index.
	Active(ValidatorIndex),
	// No signing key was found among the session's validators, but the supplied gossip
	// topology carried a local index (set in `PerSessionState::supply_topology`).
	Inactive,
}
217
/// State tracked per session, stored in `State::per_session`.
#[derive(Debug)]
struct PerSessionState {
	// Session info as fetched from the runtime.
	session_info: SessionInfo,
	// Validator groups of the session, built together with the backing threshold.
	groups: Groups,
	// Maps each discovery key to the validator index at the same position in
	// `session_info.discovery_keys`.
	authority_lookup: HashMap<AuthorityDiscoveryId, ValidatorIndex>,
	// Grid topology view; `None` until `supply_topology` has been called.
	grid_view: Option<grid::SessionTopologyView>,
	// Local validator index. `None` when no signing key was found among the session's
	// validators and no topology with a local index has been supplied.
	local_validator: Option<LocalValidatorIndex>,
	// Value of the `CandidateReceiptV2` node feature bit for this session.
	allow_v2_descriptors: bool,
}
230
231impl PerSessionState {
232 fn new(
233 session_info: SessionInfo,
234 keystore: &KeystorePtr,
235 backing_threshold: u32,
236 allow_v2_descriptors: bool,
237 ) -> Self {
238 let groups = Groups::new(session_info.validator_groups.clone(), backing_threshold);
239 let mut authority_lookup = HashMap::new();
240 for (i, ad) in session_info.discovery_keys.iter().cloned().enumerate() {
241 authority_lookup.insert(ad, ValidatorIndex(i as _));
242 }
243
244 let local_validator = polkadot_node_subsystem_util::signing_key_and_index(
245 session_info.validators.iter(),
246 keystore,
247 )
248 .map(|(_, index)| LocalValidatorIndex::Active(index));
249
250 PerSessionState {
251 session_info,
252 groups,
253 authority_lookup,
254 grid_view: None,
255 local_validator,
256 allow_v2_descriptors,
257 }
258 }
259
260 fn supply_topology(
261 &mut self,
262 topology: &SessionGridTopology,
263 local_index: Option<ValidatorIndex>,
264 ) {
265 let grid_view = grid::build_session_topology(
269 self.session_info.validator_groups.iter(),
270 topology,
271 local_index,
272 );
273
274 self.grid_view = Some(grid_view);
275 if local_index.is_some() {
276 self.local_validator.get_or_insert(LocalValidatorIndex::Inactive);
277 }
278
279 gum::info!(
280 target: LOG_TARGET,
281 index_in_gossip_topology = ?local_index,
282 index_in_parachain_authorities = ?self.local_validator,
283 "Node uses the following topology indices"
284 );
285 }
286
287 fn is_not_validator(&self) -> bool {
291 self.grid_view.is_some() && self.local_validator.is_none()
292 }
293
294 fn candidate_receipt_v2_enabled(&self) -> bool {
296 self.allow_v2_descriptors
297 }
298}
299
/// Top-level state of this module.
pub(crate) struct State {
	// Implicit view: active leaves and the relay parents allowed under them.
	implicit_view: ImplicitView,
	// Candidate tracker.
	candidates: Candidates,
	// State per tracked relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParentState>,
	// State per session; pruned to the sessions still referenced by `per_relay_parent`.
	per_session: HashMap<SessionIndex, PerSessionState>,
	// Topologies received before their session's state existed; consumed when that
	// session state is created.
	unused_topologies: HashMap<SessionIndex, NewGossipTopology>,
	// Connected peers.
	peers: HashMap<PeerId, PeerState>,
	// Keystore used to find the local signing key.
	keystore: KeystorePtr,
	// Which peer currently bears each authority discovery ID.
	authorities: HashMap<AuthorityDiscoveryId, PeerId>,
	// Request and response managers.
	request_manager: RequestManager,
	response_manager: ResponseManager,
}
316
317impl State {
318 pub(crate) fn new(keystore: KeystorePtr) -> Self {
320 State {
321 implicit_view: Default::default(),
322 candidates: Default::default(),
323 per_relay_parent: HashMap::new(),
324 per_session: HashMap::new(),
325 peers: HashMap::new(),
326 keystore,
327 authorities: HashMap::new(),
328 request_manager: RequestManager::new(),
329 response_manager: ResponseManager::new(),
330 unused_topologies: HashMap::new(),
331 }
332 }
333
334 pub(crate) fn request_and_response_managers(
335 &mut self,
336 ) -> (&mut RequestManager, &mut ResponseManager) {
337 (&mut self.request_manager, &mut self.response_manager)
338 }
339}
340
341fn connected_validator_peer(
344 authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
345 per_session: &PerSessionState,
346 validator_index: ValidatorIndex,
347) -> Option<PeerId> {
348 per_session
349 .session_info
350 .discovery_keys
351 .get(validator_index.0 as usize)
352 .and_then(|k| authorities.get(k))
353 .map(|p| *p)
354}
355
/// What we track about a connected peer.
struct PeerState {
	// The peer's last reported view.
	view: View,
	// Validation protocol version negotiated with the peer.
	protocol_version: ValidationVersion,
	// Relay parents allowed under the leaves of the peer's view, according to our own
	// implicit view.
	implicit_view: HashSet<Hash>,
	// Authority discovery IDs attributed to this peer, if any were reported.
	discovery_ids: Option<HashSet<AuthorityDiscoveryId>>,
}
362
363impl PeerState {
364 fn update_view(&mut self, new_view: View, local_implicit: &ImplicitView) -> Vec<Hash> {
367 let next_implicit = new_view
368 .iter()
369 .flat_map(|x| local_implicit.known_allowed_relay_parents_under(x, None))
370 .flatten()
371 .cloned()
372 .collect::<HashSet<_>>();
373
374 let fresh_implicit = next_implicit
375 .iter()
376 .filter(|x| !self.implicit_view.contains(x))
377 .cloned()
378 .collect();
379
380 self.view = new_view;
381 self.implicit_view = next_implicit;
382
383 fresh_implicit
384 }
385
386 fn reconcile_active_leaf(&mut self, leaf_hash: Hash, implicit: &[Hash]) -> Vec<Hash> {
389 if !self.view.contains(&leaf_hash) {
390 return Vec::new()
391 }
392
393 let mut v = Vec::with_capacity(implicit.len());
394 for i in implicit {
395 if self.implicit_view.insert(*i) {
396 v.push(*i);
397 }
398 }
399 v
400 }
401
402 fn knows_relay_parent(&self, relay_parent: &Hash) -> bool {
408 self.implicit_view.contains(relay_parent) || self.view.contains(relay_parent)
409 }
410
411 fn is_authority(&self, authority_id: &AuthorityDiscoveryId) -> bool {
412 self.discovery_ids.as_ref().map_or(false, |x| x.contains(authority_id))
413 }
414
415 fn iter_known_discovery_ids(&self) -> impl Iterator<Item = &AuthorityDiscoveryId> {
416 self.discovery_ids.as_ref().into_iter().flatten()
417 }
418}
419
/// Handles a single event from the network bridge.
///
/// Peer connection, disconnection and authority-ID updates are applied to `state.peers`
/// and `state.authorities`; gossip topologies are supplied to the matching session or
/// stashed; peer messages and peer view changes are forwarded to their handlers.
/// `OurViewChange` is ignored here.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_network_update<Context>(
	ctx: &mut Context,
	state: &mut State,
	update: NetworkBridgeEvent<net_protocol::StatementDistributionMessage>,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) {
	match update {
		NetworkBridgeEvent::PeerConnected(peer_id, role, protocol_version, mut authority_ids) => {
			gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?protocol_version, "Peer connected");

			// Only peers speaking validation protocol V3 are tracked.
			let versioned_protocol = if protocol_version != ValidationVersion::V3.into() {
				return
			} else {
				protocol_version.try_into().expect("Qed, we checked above")
			};

			// An authority ID stays with the first peer that claimed it; IDs that are
			// already taken are dropped from the new peer's set.
			if let Some(ref mut authority_ids) = authority_ids {
				authority_ids.retain(|a| match state.authorities.entry(a.clone()) {
					Entry::Vacant(e) => {
						e.insert(peer_id);
						true
					},
					Entry::Occupied(e) => {
						gum::debug!(
							target: LOG_TARGET,
							authority_id = ?a,
							existing_peer = ?e.get(),
							new_peer = ?peer_id,
							"Ignoring new peer with duplicate authority ID as a bearer of that identity"
						);

						false
					},
				});
			}

			state.peers.insert(
				peer_id,
				PeerState {
					view: View::default(),
					implicit_view: HashSet::new(),
					protocol_version: versioned_protocol,
					discovery_ids: authority_ids,
				},
			);
		},
		NetworkBridgeEvent::PeerDisconnected(peer_id) => {
			// Forget the peer and every authority ID attributed to it.
			if let Some(p) = state.peers.remove(&peer_id) {
				for discovery_key in p.discovery_ids.into_iter().flatten() {
					state.authorities.remove(&discovery_key);
				}
			}
		},
		NetworkBridgeEvent::NewGossipTopology(topology) => {
			let new_session_index = &topology.session;
			let new_topology = &topology.topology;
			let local_index = &topology.local_index;

			// If the session is not known yet, keep the topology until its state is created.
			if let Some(per_session) = state.per_session.get_mut(new_session_index) {
				per_session.supply_topology(new_topology, *local_index);
			} else {
				state.unused_topologies.insert(*new_session_index, topology);
			}

		},
		NetworkBridgeEvent::PeerMessage(peer_id, message) => match message {
			net_protocol::StatementDistributionMessage::V3(
				protocol_v3::StatementDistributionMessage::Statement(relay_parent, statement),
			) =>
				handle_incoming_statement(
					ctx,
					state,
					peer_id,
					relay_parent,
					statement,
					reputation,
					metrics,
				)
				.await,
			net_protocol::StatementDistributionMessage::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateManifest(inner),
			) => handle_incoming_manifest(ctx, state, peer_id, inner, reputation, metrics).await,
			net_protocol::StatementDistributionMessage::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateKnown(inner),
			) =>
				handle_incoming_acknowledgement(ctx, state, peer_id, inner, reputation, metrics)
					.await,
		},
		NetworkBridgeEvent::PeerViewChange(peer_id, view) =>
			handle_peer_view_update(ctx, state, peer_id, view, metrics).await,
		NetworkBridgeEvent::OurViewChange(_view) => {
			// Nothing to do for our own view changes in this handler.
		},
		NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?authority_ids,
				"Updated `AuthorityDiscoveryId`s"
			);

			// Updates for peers we do not track are ignored.
			let peer_state = match state.peers.get_mut(&peer_id) {
				None => return,
				Some(p) => p,
			};

			// Drop mappings to this peer for IDs that are not part of the new set.
			state.authorities.retain(|a, p| p != &peer_id || authority_ids.contains(a));

			// Map every ID of the new set to this peer, replacing any existing mapping.
			for a in authority_ids.iter().cloned() {
				state.authorities.insert(a, peer_id);
			}

			peer_state.discovery_ids = Some(authority_ids);
		},
	}
}
549
/// Sets up the per-relay-parent state, and the per-session state if missing, for one new
/// relay parent.
///
/// Fetches disabled validators, session index, validator groups and claim queue from the
/// runtime. If the session is new, session info, minimum backing votes and node features
/// are fetched as well. Any failed request is returned as an error.
///
/// Returns `Ok(())` without recording the relay parent when the runtime has no session
/// info for the session.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_active_leaf_update<Context>(
	ctx: &mut Context,
	state: &mut State,
	new_relay_parent: Hash,
) -> JfyiErrorResult<()> {
	let disabled_validators: HashSet<_> =
		polkadot_node_subsystem_util::request_disabled_validators(new_relay_parent, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchDisabledValidators)?
			.into_iter()
			.collect();

	let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
		new_relay_parent,
		ctx.sender(),
	)
	.await
	.await
	.map_err(JfyiError::RuntimeApiUnavailable)?
	.map_err(JfyiError::FetchSessionIndex)?;

	// First relay parent seen for this session: build the per-session state.
	if !state.per_session.contains_key(&session_index) {
		let session_info = polkadot_node_subsystem_util::request_session_info(
			new_relay_parent,
			session_index,
			ctx.sender(),
		)
		.await
		.await
		.map_err(JfyiError::RuntimeApiUnavailable)?
		.map_err(JfyiError::FetchSessionInfo)?;

		let session_info = match session_info {
			None => {
				gum::warn!(
					target: LOG_TARGET,
					relay_parent = ?new_relay_parent,
					"No session info available for current session"
				);

				return Ok(())
			},
			Some(s) => s,
		};

		let minimum_backing_votes =
			request_min_backing_votes(new_relay_parent, session_index, ctx.sender())
				.await
				.await
				.map_err(JfyiError::RuntimeApiUnavailable)?
				.map_err(JfyiError::FetchMinimumBackingVotes)?;
		let node_features = request_node_features(new_relay_parent, session_index, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchNodeFeatures)?;
		// A missing `CandidateReceiptV2` feature bit is treated as disabled.
		let mut per_session_state = PerSessionState::new(
			session_info,
			&state.keystore,
			minimum_backing_votes,
			node_features
				.get(FeatureIndex::CandidateReceiptV2 as usize)
				.map(|b| *b)
				.unwrap_or(false),
		);
		// Apply a topology that arrived before this session's state existed.
		if let Some(topology) = state.unused_topologies.remove(&session_index) {
			per_session_state.supply_topology(&topology.topology, topology.local_index);
		}
		state.per_session.insert(session_index, per_session_state);
	}

	let per_session = state
		.per_session
		.get_mut(&session_index)
		.expect("either existed or just inserted; qed");

	if !disabled_validators.is_empty() {
		gum::debug!(
			target: LOG_TARGET,
			relay_parent = ?new_relay_parent,
			?session_index,
			?disabled_validators,
			"Disabled validators detected"
		);
	}

	// Only the second element of the response, the group rotation info, is used.
	let group_rotation_info =
		polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchValidatorGroups)?
			.1;

	let claim_queue = ClaimQueueSnapshot(
		polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchClaimQueue)?,
	);

	let (groups_per_para, assignments_per_group) = determine_group_assignments(
		per_session.groups.all().len(),
		group_rotation_info,
		&claim_queue,
	)
	.await;

	// An active local validator gets full cluster and grid state; an inactive one only
	// gets a grid tracker. No local validator index means no local state at all.
	let local_validator = per_session.local_validator.and_then(|v| {
		if let LocalValidatorIndex::Active(idx) = v {
			find_active_validator_state(idx, &per_session.groups, &assignments_per_group)
		} else {
			Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
		}
	});

	let transposed_cq = transpose_claim_queue(claim_queue.0);

	state.per_relay_parent.insert(
		new_relay_parent,
		PerRelayParentState {
			local_validator,
			statement_store: StatementStore::new(&per_session.groups),
			session: session_index,
			groups_per_para,
			disabled_validators,
			transposed_cq,
			assignments_per_group,
		},
	);
	Ok(())
}
686
687#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
689pub(crate) async fn handle_active_leaves_update<Context>(
690 ctx: &mut Context,
691 state: &mut State,
692 activated: &ActivatedLeaf,
693 metrics: &Metrics,
694) -> JfyiErrorResult<()> {
695 state
696 .implicit_view
697 .activate_leaf(ctx.sender(), activated.hash)
698 .await
699 .map_err(JfyiError::ActivateLeafFailure)?;
700
701 let new_relay_parents =
702 state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();
703
704 for new_relay_parent in new_relay_parents.iter().cloned() {
705 if state.per_relay_parent.contains_key(&new_relay_parent) {
706 continue
707 }
708
709 if let Err(err) = handle_active_leaf_update(ctx, state, new_relay_parent).await {
710 gum::warn!(
711 target: LOG_TARGET,
712 relay_parent = ?new_relay_parent,
713 error = ?err,
714 "Failed to handle active leaf update"
715 );
716 continue
717 }
718 }
719
720 gum::debug!(
721 target: LOG_TARGET,
722 "Activated leaves. Now tracking {} relay-parents across {} sessions",
723 state.per_relay_parent.len(),
724 state.per_session.len(),
725 );
726
727 {
731 let mut update_peers = Vec::new();
732 for (peer, peer_state) in state.peers.iter_mut() {
733 let fresh = peer_state.reconcile_active_leaf(activated.hash, &new_relay_parents);
734 if !fresh.is_empty() {
735 update_peers.push((*peer, fresh));
736 }
737 }
738
739 for (peer, fresh) in update_peers {
740 for fresh_relay_parent in fresh {
741 send_peer_messages_for_relay_parent(ctx, state, peer, fresh_relay_parent, metrics)
742 .await;
743 }
744 }
745 }
746
747 new_leaf_fragment_chain_updates(ctx, state, activated.hash).await;
748
749 Ok(())
750}
751
752fn find_active_validator_state(
753 validator_index: ValidatorIndex,
754 groups: &Groups,
755 assignments_per_group: &HashMap<GroupIndex, Vec<ParaId>>,
756) -> Option<LocalValidatorState> {
757 if groups.all().is_empty() {
758 return None
759 }
760
761 let our_group = groups.by_validator_index(validator_index)?;
762
763 let group_validators = groups.get(our_group)?.to_owned();
764 let paras_assigned_to_core = assignments_per_group.get(&our_group).cloned().unwrap_or_default();
765 let seconding_limit = paras_assigned_to_core.len();
766
767 Some(LocalValidatorState {
768 active: Some(ActiveValidatorState {
769 index: validator_index,
770 group: our_group,
771 cluster_tracker: ClusterTracker::new(group_validators, seconding_limit)
772 .expect("group is non-empty because we are in it; qed"),
773 assignments: paras_assigned_to_core.clone(),
774 }),
775 grid_tracker: GridTracker::default(),
776 })
777}
778
779pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) {
780 for leaf in leaves {
782 let pruned = state.implicit_view.deactivate_leaf(*leaf);
783 for pruned_rp in pruned {
784 state
786 .per_relay_parent
787 .remove(&pruned_rp)
788 .as_ref()
789 .and_then(|pruned| pruned.active_validator_state())
790 .map(|active_state| {
791 active_state.cluster_tracker.warn_if_too_many_pending_statements(pruned_rp)
792 });
793
794 state.request_manager.remove_by_relay_parent(*leaf);
796 }
797 }
798
799 state
800 .candidates
801 .on_deactivate_leaves(&leaves, |h| state.per_relay_parent.contains_key(h));
802
803 let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect();
805 state.per_session.retain(|s, _| sessions.contains(s));
806
807 let last_session_index = state.unused_topologies.keys().max().copied();
808 state
814 .unused_topologies
815 .retain(|s, _| sessions.contains(s) || last_session_index == Some(*s));
816}
817
818#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
819async fn handle_peer_view_update<Context>(
820 ctx: &mut Context,
821 state: &mut State,
822 peer: PeerId,
823 new_view: View,
824 metrics: &Metrics,
825) {
826 let fresh_implicit = {
827 let peer_data = match state.peers.get_mut(&peer) {
828 None => return,
829 Some(p) => p,
830 };
831
832 peer_data.update_view(new_view, &state.implicit_view)
833 };
834
835 for new_relay_parent in fresh_implicit {
836 send_peer_messages_for_relay_parent(ctx, state, peer, new_relay_parent, metrics).await;
837 }
838}
839
840fn find_validator_ids<'a>(
843 known_discovery_ids: impl IntoIterator<Item = &'a AuthorityDiscoveryId>,
844 discovery_mapping: impl Fn(&AuthorityDiscoveryId) -> Option<&'a ValidatorIndex>,
845) -> impl Iterator<Item = ValidatorIndex> {
846 known_discovery_ids.into_iter().filter_map(discovery_mapping).cloned()
847}
848
849#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
862async fn send_peer_messages_for_relay_parent<Context>(
863 ctx: &mut Context,
864 state: &mut State,
865 peer: PeerId,
866 relay_parent: Hash,
867 metrics: &Metrics,
868) {
869 let peer_data = match state.peers.get_mut(&peer) {
870 None => return,
871 Some(p) => p,
872 };
873
874 let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
875 None => return,
876 Some(s) => s,
877 };
878
879 let per_session_state = match state.per_session.get(&relay_parent_state.session) {
880 None => return,
881 Some(s) => s,
882 };
883
884 for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
885 per_session_state.authority_lookup.get(a)
886 }) {
887 if let Some(active) = relay_parent_state
888 .local_validator
889 .as_mut()
890 .and_then(|local| local.active.as_mut())
891 {
892 send_pending_cluster_statements(
893 ctx,
894 relay_parent,
895 &(peer, peer_data.protocol_version),
896 validator_id,
897 &mut active.cluster_tracker,
898 &state.candidates,
899 &relay_parent_state.statement_store,
900 metrics,
901 )
902 .await;
903 }
904
905 send_pending_grid_messages(
906 ctx,
907 relay_parent,
908 &(peer, peer_data.protocol_version),
909 validator_id,
910 &per_session_state.groups,
911 relay_parent_state,
912 &state.candidates,
913 metrics,
914 )
915 .await;
916 }
917}
918
919fn pending_statement_network_message(
920 statement_store: &StatementStore,
921 relay_parent: Hash,
922 peer: &(PeerId, ValidationVersion),
923 originator: ValidatorIndex,
924 compact: CompactStatement,
925) -> Option<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)> {
926 match peer.1 {
927 ValidationVersion::V3 => statement_store
928 .validator_statement(originator, compact)
929 .map(|s| s.as_unchecked().clone())
930 .map(|signed| {
931 protocol_v3::StatementDistributionMessage::Statement(relay_parent, signed)
932 })
933 .map(|msg| (vec![peer.0], ValidationProtocols::V3(msg).into())),
934 }
935}
936
937#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
939async fn send_pending_cluster_statements<Context>(
940 ctx: &mut Context,
941 relay_parent: Hash,
942 peer_id: &(PeerId, ValidationVersion),
943 peer_validator_id: ValidatorIndex,
944 cluster_tracker: &mut ClusterTracker,
945 candidates: &Candidates,
946 statement_store: &StatementStore,
947 metrics: &Metrics,
948) {
949 let pending_statements = cluster_tracker.pending_statements_for(peer_validator_id);
950 let network_messages = pending_statements
951 .into_iter()
952 .filter_map(|(originator, compact)| {
953 if !candidates.is_confirmed(compact.candidate_hash()) {
954 return None
955 }
956
957 let res = pending_statement_network_message(
958 &statement_store,
959 relay_parent,
960 peer_id,
961 originator,
962 compact.clone(),
963 );
964
965 if res.is_some() {
966 cluster_tracker.note_sent(peer_validator_id, originator, compact);
967 }
968
969 res
970 })
971 .collect::<Vec<_>>();
972
973 if !network_messages.is_empty() {
974 let count = network_messages.len();
975 ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(network_messages))
976 .await;
977 metrics.on_statements_distributed(count);
978 }
979}
980
/// Sends a grid peer its pending manifests and acknowledgements, followed by all
/// remaining pending grid statements.
///
/// Does nothing when there is no local validator state at the relay parent. Only
/// statements are counted for the metrics; full manifests are not.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_pending_grid_messages<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	peer_id: &(PeerId, ValidationVersion),
	peer_validator_id: ValidatorIndex,
	groups: &Groups,
	relay_parent_state: &mut PerRelayParentState,
	candidates: &Candidates,
	metrics: &Metrics,
) {
	let pending_manifests = {
		let local_validator = match relay_parent_state.local_validator.as_mut() {
			None => return,
			Some(l) => l,
		};

		let grid_tracker = &mut local_validator.grid_tracker;
		grid_tracker.pending_manifests_for(peer_validator_id)
	};

	let mut messages: Vec<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)> = Vec::new();
	let mut statements_count = 0;
	for (candidate_hash, kind) in pending_manifests {
		// Manifests for candidates that are not confirmed are skipped.
		let confirmed_candidate = match candidates.get_confirmed(&candidate_hash) {
			None => continue,
			Some(c) => c,
		};

		let group_index = confirmed_candidate.group_index();

		let local_knowledge = {
			// NOTE(review): an unknown group aborts the whole function, so messages
			// accumulated so far are not sent, although the grid tracker may already
			// have been updated for them.
			let group_size = match groups.get(group_index) {
				None => return,
				Some(x) => x.len(),
			};

			local_knowledge_filter(
				group_size,
				group_index,
				candidate_hash,
				&relay_parent_state.statement_store,
			)
		};

		match kind {
			grid::ManifestKind::Full => {
				let manifest = protocol_v3::BackedCandidateManifest {
					relay_parent,
					candidate_hash,
					group_index,
					para_id: confirmed_candidate.para_id(),
					parent_head_data_hash: confirmed_candidate.parent_head_data_hash(),
					statement_knowledge: local_knowledge.clone(),
				};

				let grid = &mut relay_parent_state
					.local_validator
					.as_mut()
					.expect("determined to be some earlier in this function; qed")
					.grid_tracker;

				// The manifest is recorded as sent before it is queued for sending.
				grid.manifest_sent_to(
					groups,
					peer_validator_id,
					candidate_hash,
					local_knowledge.clone(),
				);
				match peer_id.1 {
					ValidationVersion::V3 => messages.push((
						vec![peer_id.0],
						ValidationProtocols::V3(
							protocol_v3::StatementDistributionMessage::BackedCandidateManifest(
								manifest,
							),
						)
						.into(),
					)),
				};
			},
			grid::ManifestKind::Acknowledgement => {
				// Returns the messages to send and the number of statements among them.
				let (m, c) = acknowledgement_and_statement_messages(
					peer_id,
					peer_validator_id,
					groups,
					relay_parent_state,
					relay_parent,
					group_index,
					candidate_hash,
					local_knowledge,
				);
				messages.extend(m);
				statements_count += c;
			},
		}
	}

	{
		// Send every remaining pending grid statement for this validator.
		let grid_tracker = &mut relay_parent_state
			.local_validator
			.as_mut()
			.expect("checked earlier; qed")
			.grid_tracker;

		let pending_statements = grid_tracker.all_pending_statements_for(peer_validator_id);

		let extra_statements = pending_statements
			.into_iter()
			.filter_map(|(originator, compact)| {
				let res = pending_statement_network_message(
					&relay_parent_state.statement_store,
					relay_parent,
					peer_id,
					originator,
					compact.clone(),
				);

				// Only statements that will actually be sent are recorded in the grid tracker.
				if res.is_some() {
					grid_tracker.sent_or_received_direct_statement(
						groups,
						originator,
						peer_validator_id,
						&compact,
						false,
					);
				}

				res
			})
			.collect::<Vec<_>>();

		statements_count += extra_statements.len();
		messages.extend(extra_statements);
	}

	if !messages.is_empty() {
		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await;
		metrics.on_statements_distributed(statements_count);
	}
}
1128
1129#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
1131pub(crate) async fn share_local_statement<Context>(
1132 ctx: &mut Context,
1133 state: &mut State,
1134 relay_parent: Hash,
1135 statement: SignedFullStatementWithPVD,
1136 reputation: &mut ReputationAggregator,
1137 metrics: &Metrics,
1138) -> JfyiErrorResult<()> {
1139 let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
1140 None => return Err(JfyiError::InvalidShare),
1141 Some(x) => x,
1142 };
1143
1144 gum::debug!(
1145 target: LOG_TARGET,
1146 statement = ?statement.payload().to_compact(),
1147 "Sharing Statement",
1148 );
1149
1150 let per_session = match state.per_session.get(&per_relay_parent.session) {
1151 Some(s) => s,
1152 None => return Ok(()),
1153 };
1154
1155 let (local_index, local_assignments, local_group) =
1156 match per_relay_parent.active_validator_state() {
1157 None => return Err(JfyiError::InvalidShare),
1158 Some(l) => (l.index, &l.assignments, l.group),
1159 };
1160
1161 let expected = match statement.payload() {
1164 FullStatementWithPVD::Seconded(ref c, _) =>
1165 Some((c.descriptor.para_id(), c.descriptor.relay_parent())),
1166 FullStatementWithPVD::Valid(hash) =>
1167 state.candidates.get_confirmed(&hash).map(|c| (c.para_id(), c.relay_parent())),
1168 };
1169
1170 let is_seconded = match statement.payload() {
1171 FullStatementWithPVD::Seconded(_, _) => true,
1172 FullStatementWithPVD::Valid(_) => false,
1173 };
1174
1175 let (expected_para, expected_relay_parent) = match expected {
1176 None => return Err(JfyiError::InvalidShare),
1177 Some(x) => x,
1178 };
1179
1180 if local_index != statement.validator_index() {
1181 return Err(JfyiError::InvalidShare)
1182 }
1183
1184 let seconding_limit = local_assignments.len();
1185
1186 if is_seconded &&
1187 per_relay_parent.statement_store.seconded_count(&local_index) >= seconding_limit
1188 {
1189 gum::warn!(
1190 target: LOG_TARGET,
1191 limit = ?seconding_limit,
1192 "Local node has issued too many `Seconded` statements",
1193 );
1194 return Err(JfyiError::InvalidShare)
1195 }
1196
1197 if !local_assignments.contains(&expected_para) || relay_parent != expected_relay_parent {
1198 return Err(JfyiError::InvalidShare)
1199 }
1200
1201 let mut post_confirmation = None;
1202
1203 let compact_statement = {
1205 let compact_statement = FullStatementWithPVD::signed_to_compact(statement.clone());
1206 let candidate_hash = CandidateHash(*statement.payload().candidate_hash());
1207
1208 if let FullStatementWithPVD::Seconded(ref c, ref pvd) = statement.payload() {
1209 post_confirmation = state.candidates.confirm_candidate(
1210 candidate_hash,
1211 c.clone(),
1212 pvd.clone(),
1213 local_group,
1214 );
1215 };
1216
1217 match per_relay_parent.statement_store.insert(
1218 &per_session.groups,
1219 compact_statement.clone(),
1220 StatementOrigin::Local,
1221 ) {
1222 Ok(false) | Err(_) => {
1223 gum::warn!(
1224 target: LOG_TARGET,
1225 statement = ?compact_statement.payload(),
1226 "Candidate backing issued redundant statement?",
1227 );
1228 return Err(JfyiError::InvalidShare)
1229 },
1230 Ok(true) => {},
1231 }
1232
1233 {
1234 let l = per_relay_parent.active_validator_state_mut().expect("checked above; qed");
1235 l.cluster_tracker.note_issued(local_index, compact_statement.payload().clone());
1236 }
1237
1238 if let Some(ref session_topology) = per_session.grid_view {
1239 let l = per_relay_parent.local_validator.as_mut().expect("checked above; qed");
1240 l.grid_tracker.learned_fresh_statement(
1241 &per_session.groups,
1242 session_topology,
1243 local_index,
1244 &compact_statement.payload(),
1245 );
1246 }
1247
1248 compact_statement
1249 };
1250
1251 circulate_statement(
1253 ctx,
1254 relay_parent,
1255 per_relay_parent,
1256 per_session,
1257 &state.candidates,
1258 &state.authorities,
1259 &state.peers,
1260 compact_statement,
1261 metrics,
1262 )
1263 .await;
1264
1265 if let Some(post_confirmation) = post_confirmation {
1266 apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await;
1267 }
1268
1269 Ok(())
1270}
1271
/// Tags a direct statement target in [`circulate_statement`] with the tracker it came from,
/// which decides which tracker is updated once the statement is queued for that target.
#[derive(Debug)]
enum DirectTargetKind {
    /// The target was selected from the local validator's cluster tracker.
    Cluster,
    /// The target was selected from the local validator's grid tracker.
    Grid,
}
1279
/// Sends a signed compact statement to the cluster and grid peers that should receive it.
///
/// Returns immediately when `relay_parent_state` has no local validator state.
///
/// Target selection:
/// - Cluster targets are considered only when the local validator is active, the originator's
///   group equals the local group, and the candidate is confirmed in `candidates`.
/// - Grid targets come from the grid tracker; when the statement is cluster-relevant, any
///   validator listed by the cluster tracker is removed from the grid targets.
///
/// A target is only used if its discovery key resolves to a peer in `authorities` whose
/// `PeerState` knows `relay_parent`. The trackers are updated for every such target; the
/// network message itself goes only to peers passing the `ValidationVersion::V3` filter.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement<Context>(
    ctx: &mut Context,
    relay_parent: Hash,
    relay_parent_state: &mut PerRelayParentState,
    per_session: &PerSessionState,
    candidates: &Candidates,
    authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
    peers: &HashMap<PeerId, PeerState>,
    statement: SignedStatement,
    metrics: &Metrics,
) {
    let session_info = &per_session.session_info;

    let candidate_hash = *statement.payload().candidate_hash();

    let compact_statement = statement.payload().clone();
    let is_confirmed = candidates.is_confirmed(&candidate_hash);

    let originator = statement.validator_index();
    // Compute the full target list up front so the mutable borrow of the local validator can be
    // reused afterwards for tracker updates.
    let (local_validator, targets) = {
        let local_validator = match relay_parent_state.local_validator.as_mut() {
            Some(v) => v,
            // Without local validator state there is nobody to circulate to.
            None => return,
        };

        let statement_group = per_session.groups.by_validator_index(originator);

        // The cluster only matters when the local validator is active; otherwise fall back to
        // "not relevant, no targets".
        let (cluster_relevant, cluster_targets, all_cluster_targets) = local_validator
            .active
            .as_mut()
            .map(|active| {
                let cluster_relevant = Some(active.group) == statement_group;
                // Cluster circulation is gated on the candidate being confirmed.
                let cluster_targets = if is_confirmed && cluster_relevant {
                    Some(
                        active
                            .cluster_tracker
                            .targets()
                            .iter()
                            .filter(|&&v| {
                                active
                                    .cluster_tracker
                                    .can_send(v, originator, compact_statement.clone())
                                    .is_ok()
                            })
                            // Never target ourselves.
                            .filter(|&v| v != &active.index)
                            .map(|v| (*v, DirectTargetKind::Cluster)),
                    )
                } else {
                    None
                };
                let all_cluster_targets = active.cluster_tracker.targets();
                (cluster_relevant, cluster_targets, all_cluster_targets)
            })
            .unwrap_or((false, None, &[]));

        // Validators already covered by the cluster are excluded from the grid targets when
        // the statement is cluster-relevant.
        let grid_targets = local_validator
            .grid_tracker
            .direct_statement_targets(&per_session.groups, originator, &compact_statement)
            .into_iter()
            .filter(|v| !cluster_relevant || !all_cluster_targets.contains(v))
            .map(|v| (v, DirectTargetKind::Grid));

        // Attach each target's discovery key; targets without one are dropped.
        let targets = cluster_targets
            .into_iter()
            .flatten()
            .chain(grid_targets)
            .filter_map(|(v, k)| {
                session_info.discovery_keys.get(v.0 as usize).map(|a| (v, a.clone(), k))
            })
            .collect::<Vec<_>>();

        (local_validator, targets)
    };

    let mut statement_to_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
    for (target, authority_id, kind) in targets {
        // Resolve the authority to a peer that is tracked in `peers` and knows the relay
        // parent; skip the target otherwise.
        let peer_id: (PeerId, ProtocolVersion) = match authorities.get(&authority_id) {
            Some(p) if peers.get(p).map_or(false, |p| p.knows_relay_parent(&relay_parent)) => (
                *p,
                peers
                    .get(p)
                    .expect("Qed, can't fail because it was checked above")
                    .protocol_version
                    .into(),
            ),
            None | Some(_) => continue,
        };

        match kind {
            DirectTargetKind::Cluster => {
                let active = local_validator
                    .active
                    .as_mut()
                    .expect("cluster target means local is active validator; qed");

                // Re-check sendability right before recording the send in the cluster tracker.
                if let Ok(()) =
                    active.cluster_tracker.can_send(target, originator, compact_statement.clone())
                {
                    active.cluster_tracker.note_sent(target, originator, compact_statement.clone());
                    statement_to_peers.push(peer_id);
                }
            },
            DirectTargetKind::Grid => {
                statement_to_peers.push(peer_id);
                // The trailing `false` is passed as-is to the grid tracker.
                // NOTE(review): presumably marks the statement as sent rather than received,
                // since `handle_grid_statement` passes `true` on receipt; confirm against
                // `GridTracker::sent_or_received_direct_statement`.
                local_validator.grid_tracker.sent_or_received_direct_statement(
                    &per_session.groups,
                    originator,
                    target,
                    &compact_statement,
                    false,
                );
            },
        }
    }

    let statement_to_v3_peers =
        filter_by_peer_version(&statement_to_peers, ValidationVersion::V3.into());

    // Hand the message to the network bridge only if at least one v3 peer remains.
    if !statement_to_v3_peers.is_empty() {
        // NOTE(review): `n_peers` logs the length of the unfiltered `statement_to_peers`, not
        // of `statement_to_v3_peers`; confirm whether the v3-only count was intended.
        gum::debug!(
            target: LOG_TARGET,
            ?compact_statement,
            n_peers = ?statement_to_peers.len(),
            "Sending statement to v3 peers",
        );

        ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
            statement_to_v3_peers,
            ValidationProtocols::V3(protocol_v3::StatementDistributionMessage::Statement(
                relay_parent,
                statement.as_unchecked().clone(),
            ))
            .into(),
        ))
        .await;
        metrics.on_statement_distributed();
    }
}
1440fn check_statement_signature(
1442 session_index: SessionIndex,
1443 validators: &IndexedVec<ValidatorIndex, ValidatorId>,
1444 relay_parent: Hash,
1445 statement: UncheckedSignedStatement,
1446) -> std::result::Result<SignedStatement, UncheckedSignedStatement> {
1447 let signing_context = SigningContext { session_index, parent_hash: relay_parent };
1448
1449 validators
1450 .get(statement.unchecked_validator_index())
1451 .ok_or_else(|| statement.clone())
1452 .and_then(|v| statement.try_into_checked(&signing_context, v))
1453}
1454
/// Applies the reputation change `rep` to `peer` by forwarding to
/// `ReputationAggregator::modify` with the given `sender`.
///
/// Thin convenience wrapper so call sites in this module share one entry point.
async fn modify_reputation(
    reputation: &mut ReputationAggregator,
    sender: &mut impl overseer::StatementDistributionSenderTrait,
    peer: PeerId,
    rep: Rep,
) {
    reputation.modify(sender, peer, rep).await;
}
1464
/// Handles a statement received directly from `peer` for `relay_parent`.
///
/// Processing order, with the reputation change applied on each early exit:
/// 1. Unknown peer: return silently.
/// 2. Unknown relay parent: `COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE`.
/// 3. Missing session entry: warn and return.
/// 4. Statement from a disabled validator: `COST_DISABLED_VALIDATOR`.
/// 5. No local validator state: `COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR`, but only when
///    `per_session.is_not_validator()` holds.
/// 6. Originator not in any group: `COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND`.
/// 7. The statement is validated via [`handle_cluster_statement`] when the local validator is
///    active and the peer is an allowed cluster sender for the originator; otherwise via
///    [`handle_grid_statement`] when the peer is a grid provider that is not already recorded
///    as knowing the statement. A peer that is neither gets
///    `COST_UNEXPECTED_STATEMENT_INVALID_SENDER`.
/// 8. The candidate is registered as unconfirmed; a `BadAdvertisement` costs
///    `COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE`.
/// 9. For unconfirmed candidates the peer is added to the request entry.
/// 10. The statement is inserted into the statement store. A fresh statement is rewarded,
///     reported to the grid tracker, possibly forwarded to backing, and circulated; a known
///     one only earns `BENEFIT_VALID_STATEMENT`.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_statement<Context>(
    ctx: &mut Context,
    state: &mut State,
    peer: PeerId,
    relay_parent: Hash,
    statement: UncheckedSignedStatement,
    reputation: &mut ReputationAggregator,
    metrics: &Metrics,
) {
    let peer_state = match state.peers.get(&peer) {
        None => {
            // No state for this peer; drop the message without a reputation change.
            return
        },
        Some(p) => p,
    };

    // The relay parent must be one we track.
    let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
        None => {
            modify_reputation(
                reputation,
                ctx.sender(),
                peer,
                COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE,
            )
            .await;
            return
        },
        Some(p) => p,
    };

    let per_session = match state.per_session.get(&per_relay_parent.session) {
        None => {
            gum::warn!(
                target: LOG_TARGET,
                session = ?per_relay_parent.session,
                "Missing expected session info.",
            );

            return
        },
        Some(s) => s,
    };
    let session_info = &per_session.session_info;

    // Statements from validators disabled at this relay parent are rejected outright.
    if per_relay_parent.is_disabled(&statement.unchecked_validator_index()) {
        gum::debug!(
            target: LOG_TARGET,
            ?relay_parent,
            validator_index = ?statement.unchecked_validator_index(),
            "Ignoring a statement from disabled validator."
        );
        modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
        return
    }

    let local_validator = match per_relay_parent.local_validator.as_mut() {
        None => {
            // Only punish when the session state itself says we are not a validator.
            if per_session.is_not_validator() {
                modify_reputation(
                    reputation,
                    ctx.sender(),
                    peer,
                    COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR,
                )
                .await;
            }
            return
        },
        Some(l) => l,
    };

    let originator_group =
        match per_session.groups.by_validator_index(statement.unchecked_validator_index()) {
            Some(g) => g,
            None => {
                modify_reputation(
                    reputation,
                    ctx.sender(),
                    peer,
                    COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND,
                )
                .await;
                return
            },
        };

    // Determine whether the sending peer is an allowed cluster sender for this originator.
    // Both values are `Some` only when the local validator is active and the peer matches one
    // of the allowed senders' discovery keys.
    let (active, cluster_sender_index) = {
        let active = local_validator.active.as_mut();

        let allowed_senders = active
            .as_ref()
            .map(|active| {
                active
                    .cluster_tracker
                    .senders_for_originator(statement.unchecked_validator_index())
            })
            .unwrap_or_default();

        let idx = allowed_senders
            .iter()
            .filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (*i, ad)))
            .filter(|(_, ad)| peer_state.is_authority(ad))
            .map(|(i, _)| i)
            .next();
        (active, idx)
    };

    let checked_statement = if let Some((active, cluster_sender_index)) =
        active.zip(cluster_sender_index)
    {
        // Cluster path.
        match handle_cluster_statement(
            relay_parent,
            &mut active.cluster_tracker,
            per_relay_parent.session,
            &per_session.session_info,
            statement,
            cluster_sender_index,
        ) {
            Ok(Some(s)) => s,
            // Accepted by the cluster tracker but not to be imported.
            Ok(None) => return,
            Err(rep) => {
                modify_reputation(reputation, ctx.sender(), peer, rep).await;
                return
            },
        }
    } else {
        // Grid path: find a grid provider for this statement that corresponds to the peer.
        let grid_sender_index = local_validator
            .grid_tracker
            .direct_statement_providers(
                &per_session.groups,
                statement.unchecked_validator_index(),
                statement.unchecked_payload(),
            )
            .into_iter()
            .filter_map(|(i, validator_knows_statement)| {
                session_info
                    .discovery_keys
                    .get(i.0 as usize)
                    .map(|ad| (i, ad, validator_knows_statement))
            })
            .filter(|(_, ad, _)| peer_state.is_authority(ad))
            .map(|(i, _, validator_knows_statement)| (i, validator_knows_statement))
            .next();

        if let Some((grid_sender_index, validator_knows_statement)) = grid_sender_index {
            if !validator_knows_statement {
                match handle_grid_statement(
                    relay_parent,
                    &mut local_validator.grid_tracker,
                    per_relay_parent.session,
                    &per_session,
                    statement,
                    grid_sender_index,
                ) {
                    Ok(s) => s,
                    Err(rep) => {
                        modify_reputation(reputation, ctx.sender(), peer, rep).await;
                        return
                    },
                }
            } else {
                // The grid tracker already records this provider as knowing the statement;
                // reward the peer and stop without re-processing.
                modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
                return;
            }
        } else {
            // The peer is neither an allowed cluster sender nor a grid provider.
            modify_reputation(
                reputation,
                ctx.sender(),
                peer,
                COST_UNEXPECTED_STATEMENT_INVALID_SENDER,
            )
            .await;
            return
        }
    };

    let statement = checked_statement.payload().clone();
    let originator_index = checked_statement.validator_index();
    let candidate_hash = *checked_statement.payload().candidate_hash();

    {
        // Register the candidate as unconfirmed under the originator's group; the candidates
        // store may reject this as a bad advertisement.
        let res = state.candidates.insert_unconfirmed(
            peer,
            candidate_hash,
            relay_parent,
            originator_group,
            None,
        );

        if let Err(BadAdvertisement) = res {
            modify_reputation(
                reputation,
                ctx.sender(),
                peer,
                COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE,
            )
            .await;
            return
        }
    }

    let confirmed = state.candidates.get_confirmed(&candidate_hash);
    let is_confirmed = state.candidates.is_confirmed(&candidate_hash);
    if !is_confirmed {
        // Candidate not yet confirmed: remember this peer as a source to request it from.
        let mut request_entry =
            state
                .request_manager
                .get_or_insert(relay_parent, candidate_hash, originator_group);

        request_entry.add_peer(peer);

        request_entry.set_cluster_priority();
    }

    let was_fresh = match per_relay_parent.statement_store.insert(
        &per_session.groups,
        checked_statement.clone(),
        StatementOrigin::Remote,
    ) {
        Err(statement_store::Error::ValidatorUnknown) => {
            // The originator's group was resolved above, so this is not expected to trigger.
            gum::warn!(
                target: LOG_TARGET,
                ?relay_parent,
                validator_index = ?originator_index,
                "Error - accepted message from unknown validator."
            );

            return
        },
        Ok(known) => known,
    };

    if was_fresh {
        modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;
        let is_importable = state.candidates.is_importable(&candidate_hash);

        if let Some(ref session_topology) = per_session.grid_view {
            local_validator.grid_tracker.learned_fresh_statement(
                &per_session.groups,
                session_topology,
                originator_index,
                &statement,
            );
        }

        // Forward to backing only for candidates that are both importable and confirmed.
        if let (true, &Some(confirmed)) = (is_importable, &confirmed) {
            send_backing_fresh_statements(
                ctx,
                candidate_hash,
                originator_group,
                &relay_parent,
                &mut *per_relay_parent,
                confirmed,
                per_session,
            )
            .await;
        }

        // Fresh statements are always passed on for circulation.
        circulate_statement(
            ctx,
            relay_parent,
            per_relay_parent,
            per_session,
            &state.candidates,
            &state.authorities,
            &state.peers,
            checked_statement,
            metrics,
        )
        .await;
    } else {
        // Already known: still a valid statement, so grant the smaller reward.
        modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
    }
}
1768
1769fn handle_cluster_statement(
1775 relay_parent: Hash,
1776 cluster_tracker: &mut ClusterTracker,
1777 session: SessionIndex,
1778 session_info: &SessionInfo,
1779 statement: UncheckedSignedStatement,
1780 cluster_sender_index: ValidatorIndex,
1781) -> Result<Option<SignedStatement>, Rep> {
1782 let should_import = {
1784 match cluster_tracker.can_receive(
1785 cluster_sender_index,
1786 statement.unchecked_validator_index(),
1787 statement.unchecked_payload().clone(),
1788 ) {
1789 Ok(ClusterAccept::Ok) => true,
1790 Ok(ClusterAccept::WithPrejudice) => false,
1791 Err(ClusterRejectIncoming::ExcessiveSeconded) => return Err(COST_EXCESSIVE_SECONDED),
1792 Err(ClusterRejectIncoming::CandidateUnknown | ClusterRejectIncoming::Duplicate) =>
1793 return Err(COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED),
1794 Err(ClusterRejectIncoming::NotInGroup) => {
1795 return Err(COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP)
1798 },
1799 }
1800 };
1801
1802 let checked_statement =
1804 match check_statement_signature(session, &session_info.validators, relay_parent, statement)
1805 {
1806 Ok(s) => s,
1807 Err(_) => return Err(COST_INVALID_SIGNATURE),
1808 };
1809
1810 cluster_tracker.note_received(
1811 cluster_sender_index,
1812 checked_statement.validator_index(),
1813 checked_statement.payload().clone(),
1814 );
1815
1816 Ok(if should_import { Some(checked_statement) } else { None })
1817}
1818
1819fn handle_grid_statement(
1825 relay_parent: Hash,
1826 grid_tracker: &mut GridTracker,
1827 session: SessionIndex,
1828 per_session: &PerSessionState,
1829 statement: UncheckedSignedStatement,
1830 grid_sender_index: ValidatorIndex,
1831) -> Result<SignedStatement, Rep> {
1832 let checked_statement = match check_statement_signature(
1834 session,
1835 &per_session.session_info.validators,
1836 relay_parent,
1837 statement,
1838 ) {
1839 Ok(s) => s,
1840 Err(_) => return Err(COST_INVALID_SIGNATURE),
1841 };
1842
1843 grid_tracker.sent_or_received_direct_statement(
1844 &per_session.groups,
1845 checked_statement.validator_index(),
1846 grid_sender_index,
1847 &checked_statement.payload(),
1848 true,
1849 );
1850
1851 Ok(checked_statement)
1852}
1853
1854#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
1857async fn send_backing_fresh_statements<Context>(
1858 ctx: &mut Context,
1859 candidate_hash: CandidateHash,
1860 group_index: GroupIndex,
1861 relay_parent: &Hash,
1862 relay_parent_state: &mut PerRelayParentState,
1863 confirmed: &candidates::ConfirmedCandidate,
1864 per_session: &PerSessionState,
1865) {
1866 let group_validators = per_session.groups.get(group_index).unwrap_or(&[]);
1867 let mut imported = Vec::new();
1868
1869 for statement in relay_parent_state
1870 .statement_store
1871 .fresh_statements_for_backing(group_validators, candidate_hash)
1872 {
1873 let v = statement.validator_index();
1874 let compact = statement.payload().clone();
1875 imported.push((v, compact));
1876 let carrying_pvd = statement
1877 .clone()
1878 .convert_to_superpayload_with(|statement| match statement {
1879 CompactStatement::Seconded(_) => FullStatementWithPVD::Seconded(
1880 (&**confirmed.candidate_receipt()).clone(),
1881 confirmed.persisted_validation_data().clone(),
1882 ),
1883 CompactStatement::Valid(c_hash) => FullStatementWithPVD::Valid(c_hash),
1884 })
1885 .expect("statements refer to same candidate; qed");
1886
1887 ctx.send_message(CandidateBackingMessage::Statement(*relay_parent, carrying_pvd))
1888 .await;
1889 }
1890
1891 for (v, s) in imported {
1892 relay_parent_state.statement_store.note_known_by_backing(v, s);
1893 }
1894}
1895
1896fn local_knowledge_filter(
1897 group_size: usize,
1898 group_index: GroupIndex,
1899 candidate_hash: CandidateHash,
1900 statement_store: &StatementStore,
1901) -> StatementFilter {
1902 let mut f = StatementFilter::blank(group_size);
1903 statement_store.fill_statement_filter(group_index, candidate_hash, &mut f);
1904 f
1905}
1906
1907#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
1911async fn provide_candidate_to_grid<Context>(
1912 ctx: &mut Context,
1913 candidate_hash: CandidateHash,
1914 relay_parent_state: &mut PerRelayParentState,
1915 confirmed_candidate: &candidates::ConfirmedCandidate,
1916 per_session: &PerSessionState,
1917 authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
1918 peers: &HashMap<PeerId, PeerState>,
1919 metrics: &Metrics,
1920) {
1921 let local_validator = match relay_parent_state.local_validator {
1922 Some(ref mut v) => v,
1923 None => return,
1924 };
1925
1926 let relay_parent = confirmed_candidate.relay_parent();
1927 let group_index = confirmed_candidate.group_index();
1928
1929 let grid_view = match per_session.grid_view {
1930 Some(ref t) => t,
1931 None => {
1932 gum::debug!(
1933 target: LOG_TARGET,
1934 session = relay_parent_state.session,
1935 "Cannot handle backable candidate due to lack of topology",
1936 );
1937
1938 return
1939 },
1940 };
1941
1942 let group_size = match per_session.groups.get(group_index) {
1943 None => {
1944 gum::warn!(
1945 target: LOG_TARGET,
1946 ?candidate_hash,
1947 ?relay_parent,
1948 ?group_index,
1949 session = relay_parent_state.session,
1950 "Handled backed candidate with unknown group?",
1951 );
1952
1953 return
1954 },
1955 Some(g) => g.len(),
1956 };
1957
1958 let filter = local_knowledge_filter(
1959 group_size,
1960 group_index,
1961 candidate_hash,
1962 &relay_parent_state.statement_store,
1963 );
1964
1965 let actions = local_validator.grid_tracker.add_backed_candidate(
1966 grid_view,
1967 candidate_hash,
1968 group_index,
1969 filter.clone(),
1970 );
1971
1972 let manifest = protocol_v3::BackedCandidateManifest {
1973 relay_parent,
1974 candidate_hash,
1975 group_index,
1976 para_id: confirmed_candidate.para_id(),
1977 parent_head_data_hash: confirmed_candidate.parent_head_data_hash(),
1978 statement_knowledge: filter.clone(),
1979 };
1980 let acknowledgement = protocol_v3::BackedCandidateAcknowledgement {
1981 candidate_hash,
1982 statement_knowledge: filter.clone(),
1983 };
1984
1985 let mut manifest_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
1986 let mut ack_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
1987
1988 let mut post_statements = Vec::new();
1989 for (v, action) in actions {
1990 let p = match connected_validator_peer(authorities, per_session, v) {
1991 None => continue,
1992 Some(p) =>
1993 if peers.get(&p).map_or(false, |d| d.knows_relay_parent(&relay_parent)) {
1994 (p, peers.get(&p).expect("Qed, was checked above").protocol_version.into())
1995 } else {
1996 continue
1997 },
1998 };
1999
2000 match action {
2001 grid::ManifestKind::Full => manifest_peers.push(p),
2002 grid::ManifestKind::Acknowledgement => ack_peers.push(p),
2003 }
2004
2005 local_validator.grid_tracker.manifest_sent_to(
2006 &per_session.groups,
2007 v,
2008 candidate_hash,
2009 filter.clone(),
2010 );
2011 post_statements.extend(
2012 post_acknowledgement_statement_messages(
2013 v,
2014 relay_parent,
2015 &mut local_validator.grid_tracker,
2016 &relay_parent_state.statement_store,
2017 &per_session.groups,
2018 group_index,
2019 candidate_hash,
2020 &(p.0, p.1.try_into().expect("Qed, can not fail was checked above")),
2021 )
2022 .into_iter()
2023 .map(|m| (vec![p.0], m)),
2024 );
2025 }
2026
2027 let manifest_peers_v3 = filter_by_peer_version(&manifest_peers, ValidationVersion::V3.into());
2028 if !manifest_peers_v3.is_empty() {
2029 gum::debug!(
2030 target: LOG_TARGET,
2031 ?candidate_hash,
2032 local_validator = ?per_session.local_validator,
2033 n_peers = manifest_peers_v3.len(),
2034 "Sending manifest to v3 peers"
2035 );
2036
2037 ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2038 manifest_peers_v3,
2039 ValidationProtocols::V3(
2040 protocol_v3::StatementDistributionMessage::BackedCandidateManifest(manifest),
2041 )
2042 .into(),
2043 ))
2044 .await;
2045 }
2046
2047 let ack_peers_v3 = filter_by_peer_version(&ack_peers, ValidationVersion::V3.into());
2048 if !ack_peers_v3.is_empty() {
2049 gum::debug!(
2050 target: LOG_TARGET,
2051 ?candidate_hash,
2052 local_validator = ?per_session.local_validator,
2053 n_peers = ack_peers_v3.len(),
2054 "Sending acknowledgement to v3 peers"
2055 );
2056
2057 ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2058 ack_peers_v3,
2059 ValidationProtocols::V3(
2060 protocol_v3::StatementDistributionMessage::BackedCandidateKnown(acknowledgement),
2061 )
2062 .into(),
2063 ))
2064 .await;
2065 }
2066 if !post_statements.is_empty() {
2067 let count = post_statements.len();
2068 ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(post_statements))
2069 .await;
2070 metrics.on_statements_distributed(count);
2071 }
2072}
2073
2074async fn determine_group_assignments(
2078 n_cores: usize,
2079 group_rotation_info: GroupRotationInfo,
2080 claim_queue: &ClaimQueueSnapshot,
2081) -> (HashMap<ParaId, Vec<GroupIndex>>, HashMap<GroupIndex, Vec<ParaId>>) {
2082 let schedule: HashMap<CoreIndex, Vec<ParaId>> = claim_queue
2085 .iter_all_claims()
2086 .map(|(core_index, paras)| (*core_index, paras.iter().copied().collect()))
2087 .collect();
2088
2089 let mut groups_per_para = HashMap::new();
2090 let mut assignments_per_group = HashMap::with_capacity(schedule.len());
2091
2092 for (core_index, paras) in schedule {
2094 let group_index = group_rotation_info.group_for_core(core_index, n_cores);
2095 assignments_per_group.insert(group_index, paras.clone());
2096
2097 for para in paras {
2098 groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index);
2099 }
2100 }
2101
2102 (groups_per_para, assignments_per_group)
2103}
2104
2105#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2106async fn fragment_chain_update_inner<Context>(
2107 ctx: &mut Context,
2108 state: &mut State,
2109 active_leaf_hash: Option<Hash>,
2110 required_parent_info: Option<(Hash, ParaId)>,
2111 known_hypotheticals: Option<Vec<HypotheticalCandidate>>,
2112) {
2113 let hypotheticals = match known_hypotheticals {
2115 None => state.candidates.frontier_hypotheticals(required_parent_info),
2116 Some(h) => h,
2117 };
2118
2119 gum::debug!(
2121 target: LOG_TARGET,
2122 active_leaf_hash = ?active_leaf_hash,
2123 "Calling getHypotheticalMembership from statement distribution for candidates: {:?}",
2124 &hypotheticals.iter().map(|hypo| hypo.candidate_hash()).collect::<Vec<_>>()
2125 );
2126 let candidate_memberships = {
2127 let (tx, rx) = oneshot::channel();
2128 ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
2129 HypotheticalMembershipRequest {
2130 candidates: hypotheticals,
2131 fragment_chain_relay_parent: active_leaf_hash,
2132 },
2133 tx,
2134 ))
2135 .await;
2136
2137 match rx.await {
2138 Ok(candidate_memberships) => candidate_memberships,
2139 Err(oneshot::Canceled) => return,
2140 }
2141 };
2142 for (hypo, membership) in candidate_memberships {
2144 if membership.is_empty() {
2146 continue
2147 }
2148
2149 for leaf_hash in membership {
2150 state.candidates.note_importable_under(&hypo, leaf_hash);
2151 }
2152
2153 if let HypotheticalCandidate::Complete {
2155 candidate_hash,
2156 receipt,
2157 persisted_validation_data: _,
2158 } = hypo
2159 {
2160 let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
2161 let prs = state.per_relay_parent.get_mut(&receipt.descriptor.relay_parent());
2162 if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
2163 let per_session = state.per_session.get(&prs.session);
2164 let group_index = confirmed.group_index();
2165
2166 let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor.para_id())
2168 else {
2169 continue
2170 };
2171 if !expected_groups.iter().any(|g| *g == group_index) {
2172 continue
2173 }
2174
2175 if let Some(per_session) = per_session {
2176 send_backing_fresh_statements(
2177 ctx,
2178 candidate_hash,
2179 confirmed.group_index(),
2180 &receipt.descriptor.relay_parent(),
2181 prs,
2182 confirmed,
2183 per_session,
2184 )
2185 .await;
2186 }
2187 }
2188 }
2189 }
2190}
2191
/// Runs the fragment chain update for the leaf `leaf_hash`.
///
/// Delegates to [`fragment_chain_update_inner`] with `leaf_hash` as the active leaf, no
/// required-parent filter and no pre-selected hypothetical candidates.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn new_leaf_fragment_chain_updates<Context>(
    ctx: &mut Context,
    state: &mut State,
    leaf_hash: Hash,
) {
    fragment_chain_update_inner(ctx, state, Some(leaf_hash), None, None).await
}
2200
/// Runs the fragment chain update restricted to the parent `(para_head, para_id)`.
///
/// Delegates to [`fragment_chain_update_inner`] with no active leaf, the pair
/// `(para_head, para_id)` as the required parent info, and no pre-selected hypothetical
/// candidates.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn prospective_backed_notification_fragment_chain_updates<Context>(
    ctx: &mut Context,
    state: &mut State,
    para_id: ParaId,
    para_head: Hash,
) {
    fragment_chain_update_inner(ctx, state, None, Some((para_head, para_id)), None).await
}
2210
/// Runs the fragment chain update for a single, already known hypothetical `candidate`.
///
/// Delegates to [`fragment_chain_update_inner`] with no active leaf, no required-parent filter,
/// and `candidate` as the only hypothetical to query.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn new_confirmed_candidate_fragment_chain_updates<Context>(
    ctx: &mut Context,
    state: &mut State,
    candidate: HypotheticalCandidate,
) {
    fragment_chain_update_inner(ctx, state, None, None, Some(vec![candidate])).await
}
2219
/// Data handed back by [`handle_incoming_manifest_common`] when a manifest was imported.
struct ManifestImportSuccess<'a> {
    /// State of the relay parent the manifest was imported under.
    relay_parent_state: &'a mut PerRelayParentState,
    /// Session state looked up through `relay_parent_state.session`.
    per_session: &'a PerSessionState,
    /// The value returned by the grid tracker's `import_manifest`; when `true` the import is
    /// logged as "immediate ack, known candidate".
    acknowledge: bool,
    /// Validator index of the sending peer, as resolved from the session's grid topology.
    sender_index: ValidatorIndex,
}
2226
2227#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2232async fn handle_incoming_manifest_common<'a, Context>(
2233 ctx: &mut Context,
2234 peer: PeerId,
2235 peers: &HashMap<PeerId, PeerState>,
2236 per_relay_parent: &'a mut HashMap<Hash, PerRelayParentState>,
2237 per_session: &'a HashMap<SessionIndex, PerSessionState>,
2238 candidates: &mut Candidates,
2239 candidate_hash: CandidateHash,
2240 relay_parent: Hash,
2241 para_id: ParaId,
2242 mut manifest_summary: grid::ManifestSummary,
2243 manifest_kind: grid::ManifestKind,
2244 reputation: &mut ReputationAggregator,
2245) -> Option<ManifestImportSuccess<'a>> {
2246 let peer_state = peers.get(&peer)?;
2248
2249 let relay_parent_state = match per_relay_parent.get_mut(&relay_parent) {
2250 None => {
2251 modify_reputation(
2252 reputation,
2253 ctx.sender(),
2254 peer,
2255 COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
2256 )
2257 .await;
2258 return None
2259 },
2260 Some(s) => s,
2261 };
2262
2263 let per_session = per_session.get(&relay_parent_state.session)?;
2264
2265 if relay_parent_state.local_validator.is_none() {
2266 if per_session.is_not_validator() {
2267 modify_reputation(
2268 reputation,
2269 ctx.sender(),
2270 peer,
2271 COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
2272 )
2273 .await;
2274 }
2275 return None
2276 }
2277
2278 let Some(expected_groups) = relay_parent_state.groups_per_para.get(¶_id) else {
2279 modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
2280 return None
2281 };
2282
2283 if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
2284 modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
2285 return None
2286 }
2287
2288 let grid_topology = per_session.grid_view.as_ref()?;
2289
2290 let sender_index = grid_topology
2291 .iter_sending_for_group(manifest_summary.claimed_group_index, manifest_kind)
2292 .filter_map(|i| per_session.session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad)))
2293 .filter(|(_, ad)| peer_state.is_authority(ad))
2294 .map(|(i, _)| i)
2295 .next();
2296
2297 let sender_index = match sender_index {
2298 None => {
2299 modify_reputation(
2300 reputation,
2301 ctx.sender(),
2302 peer,
2303 COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN,
2304 )
2305 .await;
2306 return None
2307 },
2308 Some(s) => s,
2309 };
2310
2311 let group_index = manifest_summary.claimed_group_index;
2313 let claimed_parent_hash = manifest_summary.claimed_parent_hash;
2314
2315 let group = per_session.groups.get(group_index).unwrap_or(&[]);
2317 let disabled_mask = relay_parent_state.disabled_bitmask(group);
2318 manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
2319 manifest_summary.statement_knowledge.mask_valid(&disabled_mask);
2320
2321 let local_validator = relay_parent_state.local_validator.as_mut().expect("checked above; qed");
2322
2323 let seconding_limit = relay_parent_state
2324 .assignments_per_group
2325 .get(&group_index)?
2326 .iter()
2327 .filter(|para| para == &¶_id)
2328 .count();
2329
2330 let acknowledge = match local_validator.grid_tracker.import_manifest(
2331 grid_topology,
2332 &per_session.groups,
2333 candidate_hash,
2334 seconding_limit,
2335 manifest_summary,
2336 manifest_kind,
2337 sender_index,
2338 ) {
2339 Ok(x) => x,
2340 Err(grid::ManifestImportError::Conflicting) => {
2341 modify_reputation(reputation, ctx.sender(), peer, COST_CONFLICTING_MANIFEST).await;
2342 return None
2343 },
2344 Err(grid::ManifestImportError::Overflow) => {
2345 modify_reputation(reputation, ctx.sender(), peer, COST_EXCESSIVE_SECONDED).await;
2346 return None
2347 },
2348 Err(grid::ManifestImportError::Insufficient) => {
2349 modify_reputation(reputation, ctx.sender(), peer, COST_INSUFFICIENT_MANIFEST).await;
2350 return None
2351 },
2352 Err(grid::ManifestImportError::Malformed) => {
2353 modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
2354 return None
2355 },
2356 Err(grid::ManifestImportError::Disallowed) => {
2357 modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_MANIFEST_DISALLOWED)
2358 .await;
2359 return None
2360 },
2361 };
2362
2363 if let Err(BadAdvertisement) = candidates.insert_unconfirmed(
2365 peer,
2366 candidate_hash,
2367 relay_parent,
2368 group_index,
2369 Some((claimed_parent_hash, para_id)),
2370 ) {
2371 modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await;
2372 return None
2373 }
2374
2375 if acknowledge {
2376 gum::trace!(
2377 target: LOG_TARGET,
2378 ?candidate_hash,
2379 from = ?sender_index,
2380 local_index = ?per_session.local_validator,
2381 ?manifest_kind,
2382 "immediate ack, known candidate"
2383 );
2384 }
2385
2386 Some(ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index })
2387}
2388
2389fn post_acknowledgement_statement_messages(
2392 recipient: ValidatorIndex,
2393 relay_parent: Hash,
2394 grid_tracker: &mut GridTracker,
2395 statement_store: &StatementStore,
2396 groups: &Groups,
2397 group_index: GroupIndex,
2398 candidate_hash: CandidateHash,
2399 peer: &(PeerId, ValidationVersion),
2400) -> Vec<net_protocol::VersionedValidationProtocol> {
2401 let sending_filter = match grid_tracker.pending_statements_for(recipient, candidate_hash) {
2402 None => return Vec::new(),
2403 Some(f) => f,
2404 };
2405
2406 let mut messages = Vec::new();
2407 for statement in
2408 statement_store.group_statements(groups, group_index, candidate_hash, &sending_filter)
2409 {
2410 grid_tracker.sent_or_received_direct_statement(
2411 groups,
2412 statement.validator_index(),
2413 recipient,
2414 statement.payload(),
2415 false,
2416 );
2417 match peer.1.into() {
2418 ValidationVersion::V3 => messages.push(ValidationProtocols::V3(
2419 protocol_v3::StatementDistributionMessage::Statement(
2420 relay_parent,
2421 statement.as_unchecked().clone(),
2422 )
2423 .into(),
2424 )),
2425 };
2426 }
2427
2428 messages
2429}
2430
2431#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2432async fn handle_incoming_manifest<Context>(
2433 ctx: &mut Context,
2434 state: &mut State,
2435 peer: PeerId,
2436 manifest: net_protocol::v3::BackedCandidateManifest,
2437 reputation: &mut ReputationAggregator,
2438 metrics: &Metrics,
2439) {
2440 gum::debug!(
2441 target: LOG_TARGET,
2442 candidate_hash = ?manifest.candidate_hash,
2443 ?peer,
2444 "Received incoming manifest",
2445 );
2446
2447 let x = match handle_incoming_manifest_common(
2448 ctx,
2449 peer,
2450 &state.peers,
2451 &mut state.per_relay_parent,
2452 &state.per_session,
2453 &mut state.candidates,
2454 manifest.candidate_hash,
2455 manifest.relay_parent,
2456 manifest.para_id,
2457 grid::ManifestSummary {
2458 claimed_parent_hash: manifest.parent_head_data_hash,
2459 claimed_group_index: manifest.group_index,
2460 statement_knowledge: manifest.statement_knowledge,
2461 },
2462 grid::ManifestKind::Full,
2463 reputation,
2464 )
2465 .await
2466 {
2467 Some(x) => x,
2468 None => return,
2469 };
2470
2471 let ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index } = x;
2472
2473 if acknowledge {
2474 gum::trace!(
2476 target: LOG_TARGET,
2477 candidate_hash = ?manifest.candidate_hash,
2478 "Known candidate - acknowledging manifest",
2479 );
2480
2481 let local_knowledge = {
2482 let group_size = match per_session.groups.get(manifest.group_index) {
2483 None => return, Some(x) => x.len(),
2485 };
2486
2487 local_knowledge_filter(
2488 group_size,
2489 manifest.group_index,
2490 manifest.candidate_hash,
2491 &relay_parent_state.statement_store,
2492 )
2493 };
2494
2495 let (messages, statements_count) = acknowledgement_and_statement_messages(
2496 &(
2497 peer,
2498 state
2499 .peers
2500 .get(&peer)
2501 .map(|val| val.protocol_version)
2502 .unwrap_or(ValidationVersion::V3),
2504 ),
2505 sender_index,
2506 &per_session.groups,
2507 relay_parent_state,
2508 manifest.relay_parent,
2509 manifest.group_index,
2510 manifest.candidate_hash,
2511 local_knowledge,
2512 );
2513
2514 if !messages.is_empty() {
2515 ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await;
2516 metrics.on_statements_distributed(statements_count);
2517 }
2518 } else if !state.candidates.is_confirmed(&manifest.candidate_hash) {
2519 gum::trace!(
2521 target: LOG_TARGET,
2522 candidate_hash = ?manifest.candidate_hash,
2523 "Unknown candidate - requesting",
2524 );
2525
2526 state
2527 .request_manager
2528 .get_or_insert(manifest.relay_parent, manifest.candidate_hash, manifest.group_index)
2529 .add_peer(peer);
2530 }
2531}
2532
2533fn acknowledgement_and_statement_messages(
2536 peer: &(PeerId, ValidationVersion),
2537 validator_index: ValidatorIndex,
2538 groups: &Groups,
2539 relay_parent_state: &mut PerRelayParentState,
2540 relay_parent: Hash,
2541 group_index: GroupIndex,
2542 candidate_hash: CandidateHash,
2543 local_knowledge: StatementFilter,
2544) -> (Vec<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)>, usize) {
2545 let local_validator = match relay_parent_state.local_validator.as_mut() {
2546 None => return (Vec::new(), 0),
2547 Some(l) => l,
2548 };
2549
2550 let acknowledgement = protocol_v3::BackedCandidateAcknowledgement {
2551 candidate_hash,
2552 statement_knowledge: local_knowledge.clone(),
2553 };
2554
2555 let mut messages = match peer.1 {
2556 ValidationVersion::V3 => vec![(
2557 vec![peer.0],
2558 ValidationProtocols::V3(
2559 protocol_v3::StatementDistributionMessage::BackedCandidateKnown(acknowledgement),
2560 )
2561 .into(),
2562 )],
2563 };
2564
2565 local_validator.grid_tracker.manifest_sent_to(
2566 groups,
2567 validator_index,
2568 candidate_hash,
2569 local_knowledge.clone(),
2570 );
2571
2572 let statement_messages = post_acknowledgement_statement_messages(
2573 validator_index,
2574 relay_parent,
2575 &mut local_validator.grid_tracker,
2576 &relay_parent_state.statement_store,
2577 &groups,
2578 group_index,
2579 candidate_hash,
2580 peer,
2581 );
2582 let statements_count = statement_messages.len();
2583
2584 messages.extend(statement_messages.into_iter().map(|m| (vec![peer.0], m)));
2585
2586 (messages, statements_count)
2587}
2588
2589#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2590async fn handle_incoming_acknowledgement<Context>(
2591 ctx: &mut Context,
2592 state: &mut State,
2593 peer: PeerId,
2594 acknowledgement: net_protocol::v3::BackedCandidateAcknowledgement,
2595 reputation: &mut ReputationAggregator,
2596 metrics: &Metrics,
2597) {
2598 gum::debug!(
2603 target: LOG_TARGET,
2604 candidate_hash = ?acknowledgement.candidate_hash,
2605 ?peer,
2606 "Received incoming acknowledgement",
2607 );
2608
2609 let candidate_hash = acknowledgement.candidate_hash;
2610 let (relay_parent, parent_head_data_hash, group_index, para_id) = {
2611 match state.candidates.get_confirmed(&candidate_hash) {
2612 Some(c) => (c.relay_parent(), c.parent_head_data_hash(), c.group_index(), c.para_id()),
2613 None => {
2614 modify_reputation(
2615 reputation,
2616 ctx.sender(),
2617 peer,
2618 COST_UNEXPECTED_ACKNOWLEDGEMENT_UNKNOWN_CANDIDATE,
2619 )
2620 .await;
2621 return
2622 },
2623 }
2624 };
2625
2626 let x = match handle_incoming_manifest_common(
2627 ctx,
2628 peer,
2629 &state.peers,
2630 &mut state.per_relay_parent,
2631 &state.per_session,
2632 &mut state.candidates,
2633 candidate_hash,
2634 relay_parent,
2635 para_id,
2636 grid::ManifestSummary {
2637 claimed_parent_hash: parent_head_data_hash,
2638 claimed_group_index: group_index,
2639 statement_knowledge: acknowledgement.statement_knowledge,
2640 },
2641 grid::ManifestKind::Acknowledgement,
2642 reputation,
2643 )
2644 .await
2645 {
2646 Some(x) => x,
2647 None => return,
2648 };
2649
2650 let ManifestImportSuccess { relay_parent_state, per_session, sender_index, .. } = x;
2651
2652 let local_validator = match relay_parent_state.local_validator.as_mut() {
2653 None => return,
2654 Some(l) => l,
2655 };
2656
2657 let messages = post_acknowledgement_statement_messages(
2658 sender_index,
2659 relay_parent,
2660 &mut local_validator.grid_tracker,
2661 &relay_parent_state.statement_store,
2662 &per_session.groups,
2663 group_index,
2664 candidate_hash,
2665 &(
2666 peer,
2667 state
2668 .peers
2669 .get(&peer)
2670 .map(|val| val.protocol_version)
2671 .unwrap_or(ValidationVersion::V3),
2673 ),
2674 );
2675
2676 if !messages.is_empty() {
2677 let count = messages.len();
2678 ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(
2679 messages.into_iter().map(|m| (vec![peer], m)).collect(),
2680 ))
2681 .await;
2682 metrics.on_statements_distributed(count);
2683 }
2684}
2685
2686#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2688pub(crate) async fn handle_backed_candidate_message<Context>(
2689 ctx: &mut Context,
2690 state: &mut State,
2691 candidate_hash: CandidateHash,
2692 metrics: &Metrics,
2693) {
2694 let confirmed = match state.candidates.get_confirmed(&candidate_hash) {
2697 None => {
2698 gum::debug!(
2699 target: LOG_TARGET,
2700 ?candidate_hash,
2701 "Received backed candidate notification for unknown or unconfirmed",
2702 );
2703
2704 return
2705 },
2706 Some(c) => c,
2707 };
2708
2709 let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
2710 None => return,
2711 Some(s) => s,
2712 };
2713
2714 let per_session = match state.per_session.get(&relay_parent_state.session) {
2715 None => return,
2716 Some(s) => s,
2717 };
2718
2719 gum::debug!(
2720 target: LOG_TARGET,
2721 ?candidate_hash,
2722 group_index = ?confirmed.group_index(),
2723 "Candidate Backed - initiating grid distribution & child fetches"
2724 );
2725
2726 provide_candidate_to_grid(
2727 ctx,
2728 candidate_hash,
2729 relay_parent_state,
2730 confirmed,
2731 per_session,
2732 &state.authorities,
2733 &state.peers,
2734 metrics,
2735 )
2736 .await;
2737
2738 prospective_backed_notification_fragment_chain_updates(
2740 ctx,
2741 state,
2742 confirmed.para_id(),
2743 confirmed.candidate_receipt().descriptor.para_head(),
2744 )
2745 .await;
2746}
2747
2748#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2751async fn send_cluster_candidate_statements<Context>(
2752 ctx: &mut Context,
2753 state: &mut State,
2754 candidate_hash: CandidateHash,
2755 relay_parent: Hash,
2756 metrics: &Metrics,
2757) {
2758 let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
2759 None => return,
2760 Some(s) => s,
2761 };
2762
2763 let per_session = match state.per_session.get(&relay_parent_state.session) {
2764 None => return,
2765 Some(s) => s,
2766 };
2767
2768 let local_group = match relay_parent_state.active_validator_state_mut() {
2769 None => return,
2770 Some(v) => v.group,
2771 };
2772
2773 let group_size = match per_session.groups.get(local_group) {
2774 None => return,
2775 Some(g) => g.len(),
2776 };
2777
2778 let statements: Vec<_> = relay_parent_state
2779 .statement_store
2780 .group_statements(
2781 &per_session.groups,
2782 local_group,
2783 candidate_hash,
2784 &StatementFilter::full(group_size),
2785 )
2786 .map(|x| x.clone())
2787 .collect();
2788
2789 for statement in statements {
2790 circulate_statement(
2791 ctx,
2792 relay_parent,
2793 relay_parent_state,
2794 per_session,
2795 &state.candidates,
2796 &state.authorities,
2797 &state.peers,
2798 statement,
2799 metrics,
2800 )
2801 .await;
2802 }
2803}
2804
2805#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2812async fn apply_post_confirmation<Context>(
2813 ctx: &mut Context,
2814 state: &mut State,
2815 post_confirmation: PostConfirmation,
2816 reputation: &mut ReputationAggregator,
2817 metrics: &Metrics,
2818) {
2819 for peer in post_confirmation.reckoning.incorrect {
2820 modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await;
2821 }
2822
2823 let candidate_hash = post_confirmation.hypothetical.candidate_hash();
2824 state.request_manager.remove_for(candidate_hash);
2825
2826 send_cluster_candidate_statements(
2827 ctx,
2828 state,
2829 candidate_hash,
2830 post_confirmation.hypothetical.relay_parent(),
2831 metrics,
2832 )
2833 .await;
2834 new_confirmed_candidate_fragment_chain_updates(ctx, state, post_confirmation.hypothetical)
2835 .await;
2836}
2837
2838#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2840pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut State) {
2841 if !state.request_manager.has_pending_requests() {
2842 return
2843 }
2844
2845 let peers = &state.peers;
2846 let peer_advertised = |identifier: &CandidateIdentifier, peer: &_| {
2847 let peer_data = peers.get(peer)?;
2848
2849 let relay_parent_state = state.per_relay_parent.get(&identifier.relay_parent)?;
2850 let per_session = state.per_session.get(&relay_parent_state.session)?;
2851
2852 let local_validator = relay_parent_state.local_validator.as_ref()?;
2853
2854 for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
2855 per_session.authority_lookup.get(a)
2856 }) {
2857 if let Some(active) = local_validator.active.as_ref() {
2860 if active.cluster_tracker.knows_candidate(validator_id, identifier.candidate_hash) {
2861 return Some(StatementFilter::blank(active.cluster_tracker.targets().len()))
2862 }
2863 }
2864
2865 let filter = local_validator
2866 .grid_tracker
2867 .advertised_statements(validator_id, &identifier.candidate_hash);
2868
2869 if let Some(f) = filter {
2870 return Some(f)
2871 }
2872 }
2873
2874 None
2875 };
2876 let request_props = |identifier: &CandidateIdentifier| {
2877 let &CandidateIdentifier { relay_parent, group_index, .. } = identifier;
2878
2879 let relay_parent_state = state.per_relay_parent.get(&relay_parent)?;
2880 let per_session = state.per_session.get(&relay_parent_state.session)?;
2881 let group = per_session.groups.get(group_index)?;
2882 let seconding_limit = relay_parent_state.assignments_per_group.get(&group_index)?.len();
2883
2884 let mut unwanted_mask = StatementFilter::blank(group.len());
2886 for (i, v) in group.iter().enumerate() {
2887 if relay_parent_state.statement_store.seconded_count(v) >= seconding_limit {
2888 unwanted_mask.seconded_in_group.set(i, true);
2889 }
2890 }
2891
2892 let disabled_mask = relay_parent_state.disabled_bitmask(group);
2894 unwanted_mask.seconded_in_group |= &disabled_mask;
2895 unwanted_mask.validated_in_group |= &disabled_mask;
2896
2897 let local_validator = relay_parent_state.local_validator.as_ref()?;
2899 let require_backing = local_validator
2900 .active
2901 .as_ref()
2902 .map_or(true, |active| active.group != group_index);
2903
2904 let backing_threshold = if require_backing {
2905 let threshold = per_session.groups.get_size_and_backing_threshold(group_index)?.1;
2906 Some(threshold)
2907 } else {
2908 None
2909 };
2910
2911 Some(RequestProperties { unwanted_mask, backing_threshold })
2912 };
2913
2914 while let Some(request) = state.request_manager.next_request(
2915 &mut state.response_manager,
2916 request_props,
2917 peer_advertised,
2918 ) {
2919 ctx.send_message(NetworkBridgeTxMessage::SendRequests(
2921 vec![Requests::AttestedCandidateV2(request)],
2922 IfDisconnected::ImmediateError,
2923 ))
2924 .await;
2925 }
2926}
2927
2928pub(crate) async fn receive_response(response_manager: &mut ResponseManager) -> UnhandledResponse {
2932 match response_manager.incoming().await {
2933 Some(r) => r,
2934 None => futures::future::pending().await,
2935 }
2936}
2937
2938pub(crate) async fn next_retry(request_manager: &mut RequestManager) {
2942 match request_manager.next_retry_time() {
2943 Some(instant) =>
2944 futures_timer::Delay::new(instant.saturating_duration_since(Instant::now())).await,
2945 None => futures::future::pending().await,
2946 }
2947}
2948
2949#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
2952pub(crate) async fn handle_response<Context>(
2953 ctx: &mut Context,
2954 state: &mut State,
2955 response: UnhandledResponse,
2956 reputation: &mut ReputationAggregator,
2957 metrics: &Metrics,
2958) {
2959 let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } =
2960 response.candidate_identifier();
2961 let peer = *response.requested_peer();
2962
2963 gum::trace!(
2964 target: LOG_TARGET,
2965 ?candidate_hash,
2966 ?peer,
2967 "Received response",
2968 );
2969
2970 let post_confirmation = {
2971 let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
2972 None => return,
2973 Some(s) => s,
2974 };
2975
2976 let per_session = match state.per_session.get(&relay_parent_state.session) {
2977 None => return,
2978 Some(s) => s,
2979 };
2980
2981 let group = match per_session.groups.get(group_index) {
2982 None => return,
2983 Some(g) => g,
2984 };
2985
2986 let disabled_mask = relay_parent_state.disabled_bitmask(group);
2987
2988 let res = response.validate_response(
2989 &mut state.request_manager,
2990 group,
2991 relay_parent_state.session,
2992 |v| per_session.session_info.validators.get(v).map(|x| x.clone()),
2993 |para, g_index| {
2994 let Some(expected_groups) = relay_parent_state.groups_per_para.get(¶) else {
2995 return false
2996 };
2997
2998 expected_groups.iter().any(|g| g == &g_index)
2999 },
3000 disabled_mask,
3001 &relay_parent_state.transposed_cq,
3002 per_session.candidate_receipt_v2_enabled(),
3003 );
3004
3005 for (peer, rep) in res.reputation_changes {
3006 modify_reputation(reputation, ctx.sender(), peer, rep).await;
3007 }
3008
3009 let (candidate, pvd, statements) = match res.request_status {
3010 requests::CandidateRequestStatus::Outdated => return,
3011 requests::CandidateRequestStatus::Incomplete => {
3012 gum::trace!(
3013 target: LOG_TARGET,
3014 ?candidate_hash,
3015 "Response incomplete. Retrying"
3016 );
3017
3018 return
3019 },
3020 requests::CandidateRequestStatus::Complete {
3021 candidate,
3022 persisted_validation_data,
3023 statements,
3024 } => {
3025 gum::trace!(
3026 target: LOG_TARGET,
3027 ?candidate_hash,
3028 n_statements = statements.len(),
3029 "Successfully received candidate"
3030 );
3031
3032 (candidate, persisted_validation_data, statements)
3033 },
3034 };
3035
3036 for statement in statements {
3037 let _ = relay_parent_state.statement_store.insert(
3038 &per_session.groups,
3039 statement,
3040 StatementOrigin::Remote,
3041 );
3042 }
3043
3044 if let Some(post_confirmation) =
3045 state.candidates.confirm_candidate(candidate_hash, candidate, pvd, group_index)
3046 {
3047 post_confirmation
3048 } else {
3049 gum::warn!(
3050 target: LOG_TARGET,
3051 ?candidate_hash,
3052 "Candidate re-confirmed by request/response: logic error",
3053 );
3054
3055 return
3056 }
3057 };
3058
3059 apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await;
3061
3062 let confirmed = state.candidates.get_confirmed(&candidate_hash).expect("just confirmed; qed");
3063
3064 if !confirmed.is_importable(None) {
3068 return
3069 }
3070
3071 let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
3072 None => return,
3073 Some(s) => s,
3074 };
3075
3076 let per_session = match state.per_session.get(&relay_parent_state.session) {
3077 None => return,
3078 Some(s) => s,
3079 };
3080
3081 send_backing_fresh_statements(
3082 ctx,
3083 candidate_hash,
3084 group_index,
3085 &relay_parent,
3086 relay_parent_state,
3087 confirmed,
3088 per_session,
3089 )
3090 .await;
3091
3092 }
3099
3100pub(crate) fn seconded_and_sufficient(
3102 filter: &StatementFilter,
3103 backing_threshold: Option<usize>,
3104) -> bool {
3105 backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t)
3106}
3107
3108pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
3110 let ResponderMessage { request, sent_feedback } = message;
3111 let AttestedCandidateRequest { candidate_hash, ref mask } = &request.payload;
3112
3113 gum::trace!(
3114 target: LOG_TARGET,
3115 ?candidate_hash,
3116 peer = ?request.peer,
3117 "Received request"
3118 );
3119
3120 let _ = sent_feedback.send(());
3122
3123 let confirmed = match state.candidates.get_confirmed(&candidate_hash) {
3124 None => return, Some(c) => c,
3126 };
3127
3128 let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
3129 None => return,
3130 Some(s) => s,
3131 };
3132
3133 let local_validator = match relay_parent_state.local_validator.as_ref() {
3134 None => return,
3135 Some(s) => s,
3136 };
3137
3138 let per_session = match state.per_session.get(&relay_parent_state.session) {
3139 None => return,
3140 Some(s) => s,
3141 };
3142
3143 let peer_data = match state.peers.get(&request.peer) {
3144 None => return,
3145 Some(d) => d,
3146 };
3147
3148 let group_index = confirmed.group_index();
3149 let group = per_session
3150 .groups
3151 .get(group_index)
3152 .expect("group from session's candidate always known; qed");
3153
3154 let group_size = group.len();
3155
3156 if mask.seconded_in_group.len() != group_size || mask.validated_in_group.len() != group_size {
3158 let _ = request.send_outgoing_response(OutgoingResponse {
3159 result: Err(()),
3160 reputation_changes: vec![COST_INVALID_REQUEST_BITFIELD_SIZE],
3161 sent_feedback: None,
3162 });
3163
3164 return
3165 }
3166
3167 let (validator_id, is_cluster) = {
3170 let mut validator_id = None;
3171 let mut is_cluster = false;
3172 for v in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
3173 per_session.authority_lookup.get(a)
3174 }) {
3175 if local_validator
3176 .active
3177 .as_ref()
3178 .map_or(false, |active| active.cluster_tracker.can_request(v, *candidate_hash))
3179 {
3180 validator_id = Some(v);
3181 is_cluster = true;
3182 break
3183 }
3184
3185 if local_validator.grid_tracker.can_request(v, *candidate_hash) {
3186 validator_id = Some(v);
3187 break
3188 }
3189 }
3190
3191 match validator_id {
3192 Some(v) => (v, is_cluster),
3193 None => {
3194 let _ = request.send_outgoing_response(OutgoingResponse {
3195 result: Err(()),
3196 reputation_changes: vec![COST_UNEXPECTED_REQUEST],
3197 sent_feedback: None,
3198 });
3199
3200 return
3201 },
3202 }
3203 };
3204
3205 let and_mask = StatementFilter {
3208 seconded_in_group: !mask.seconded_in_group.clone(),
3209 validated_in_group: !mask.validated_in_group.clone(),
3210 };
3211
3212 let local_validator = match relay_parent_state.local_validator.as_mut() {
3213 None => return,
3214 Some(s) => s,
3215 };
3216
3217 let mut sent_filter = StatementFilter::blank(group_size);
3218 let statements: Vec<_> = relay_parent_state
3219 .statement_store
3220 .group_statements(&per_session.groups, group_index, *candidate_hash, &and_mask)
3221 .map(|s| {
3222 let s = s.as_unchecked().clone();
3223 let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
3224 let Some(i) = index_in_group(s.unchecked_validator_index()) else { return s };
3225
3226 match s.unchecked_payload() {
3227 CompactStatement::Seconded(_) => {
3228 sent_filter.seconded_in_group.set(i, true);
3229 },
3230 CompactStatement::Valid(_) => {
3231 sent_filter.validated_in_group.set(i, true);
3232 },
3233 }
3234 s
3235 })
3236 .collect();
3237
3238 if !is_cluster {
3241 let threshold = per_session
3242 .groups
3243 .get_size_and_backing_threshold(group_index)
3244 .expect("group existence checked above; qed")
3245 .1;
3246
3247 if !seconded_and_sufficient(&sent_filter, Some(threshold)) {
3248 gum::info!(
3249 target: LOG_TARGET,
3250 ?candidate_hash,
3251 relay_parent = ?confirmed.relay_parent(),
3252 ?group_index,
3253 "Dropping a request from a grid peer because the backing threshold is no longer met."
3254 );
3255 return
3256 }
3257 }
3258
3259 for statement in &statements {
3261 if is_cluster {
3262 local_validator
3263 .active
3264 .as_mut()
3265 .expect("cluster peer means local is active validator; qed")
3266 .cluster_tracker
3267 .note_sent(
3268 validator_id,
3269 statement.unchecked_validator_index(),
3270 statement.unchecked_payload().clone(),
3271 );
3272 } else {
3273 local_validator.grid_tracker.sent_or_received_direct_statement(
3274 &per_session.groups,
3275 statement.unchecked_validator_index(),
3276 validator_id,
3277 statement.unchecked_payload(),
3278 false,
3279 );
3280 }
3281 }
3282
3283 let response = AttestedCandidateResponse {
3284 candidate_receipt: (&**confirmed.candidate_receipt()).clone(),
3285 persisted_validation_data: confirmed.persisted_validation_data().clone(),
3286 statements,
3287 };
3288
3289 let _ = request.send_response(response);
3290}
3291
3292pub(crate) struct ResponderMessage {
3294 request: IncomingRequest<AttestedCandidateRequest>,
3295 sent_feedback: oneshot::Sender<()>,
3296}
3297
3298pub(crate) async fn respond_task(
3302 mut receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
3303 mut sender: mpsc::Sender<ResponderMessage>,
3304 metrics: Metrics,
3305) {
3306 let mut pending_out = FuturesUnordered::new();
3307 let mut active_peers = HashSet::new();
3308
3309 loop {
3310 select! {
3311 request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
3313 let request = match request_result.into_nested() {
3314 Ok(Ok(v)) => v,
3315 Err(fatal) => {
3316 gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
3317 return
3318 },
3319 Ok(Err(jfyi)) => {
3320 gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
3321 continue
3322 },
3323 };
3324
3325 if active_peers.contains(&request.peer) {
3327 gum::trace!(target: LOG_TARGET, "Peer already being served, dropping request");
3328 metrics.on_request_dropped_peer_rate_limit();
3329 continue
3330 }
3331
3332 if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
3334 gum::trace!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish");
3335 metrics.on_max_parallel_requests_reached();
3336 let (_, peer) = pending_out.select_next_some().await;
3337 active_peers.remove(&peer);
3338 }
3339
3340 let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
3342 let peer = request.peer;
3343 if let Err(err) = sender
3344 .feed(ResponderMessage { request, sent_feedback: pending_sent_tx })
3345 .await
3346 {
3347 gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
3348 return
3349 }
3350 let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
3351 pending_out.push(future_with_peer);
3352 active_peers.insert(peer);
3353 },
3354 result = pending_out.select_next_some() => {
3356 let (_, peer) = result;
3357 active_peers.remove(&peer);
3358 },
3359 }
3360 }
3361}