polkadot_statement_distribution/v2/mod.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implementation of the v2 statement distribution protocol,
//! designed for asynchronous backing.

use bitvec::prelude::{BitVec, Lsb0};
use polkadot_node_network_protocol::{
	self as net_protocol, filter_by_peer_version,
	grid_topology::SessionGridTopology,
	peer_set::{ProtocolVersion, ValidationVersion},
	request_response::{
		incoming::OutgoingResponse,
		v2::{AttestedCandidateRequest, AttestedCandidateResponse},
		IncomingRequest, IncomingRequestReceiver, Requests,
		MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS,
	},
	v3::{self as protocol_v3, StatementFilter},
	IfDisconnected, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
};
use polkadot_node_primitives::{
	SignedFullStatementWithPVD, StatementWithPVD as FullStatementWithPVD,
};
use polkadot_node_subsystem::{
	messages::{
		network_bridge_event::NewGossipTopology, CandidateBackingMessage, HypotheticalCandidate,
		HypotheticalMembershipRequest, NetworkBridgeEvent, NetworkBridgeTxMessage,
		ProspectiveParachainsMessage,
	},
	overseer, ActivatedLeaf,
};
use polkadot_node_subsystem_util::{
	backing_implicit_view::View as ImplicitView, reputation::ReputationAggregator,
	request_min_backing_votes, request_node_features, runtime::ClaimQueueSnapshot,
};
use polkadot_primitives::{
	node_features::FeatureIndex, transpose_claim_queue, AuthorityDiscoveryId,
	CandidateDescriptorVersion, CandidateHash, CompactStatement, CoreIndex, GroupIndex,
	GroupRotationInfo, Hash, Id as ParaId, IndexedVec, SessionIndex, SessionInfo, SignedStatement,
	SigningContext, TransposedClaimQueue, UncheckedSignedStatement, ValidatorId, ValidatorIndex,
};

use sp_keystore::KeystorePtr;

use fatality::Nested;
use futures::{
	channel::{mpsc, oneshot},
	future::FutureExt,
	select,
	stream::FuturesUnordered,
	SinkExt, StreamExt,
};

use std::{
	collections::{
		hash_map::{Entry, HashMap},
		HashSet,
	},
	time::{Duration, Instant},
};

use crate::{
	error::{JfyiError, JfyiErrorResult},
	metrics::Metrics,
	LOG_TARGET,
};
use candidates::{BadAdvertisement, Candidates, PostConfirmation};
use cluster::{Accept as ClusterAccept, ClusterTracker, RejectIncoming as ClusterRejectIncoming};
use grid::GridTracker;
use groups::Groups;
use requests::{CandidateIdentifier, RequestProperties};
use statement_store::{StatementOrigin, StatementStore};

pub use requests::{RequestManager, ResponseManager, UnhandledResponse};

mod candidates;
mod cluster;
mod grid;
mod groups;
mod requests;
mod statement_store;

#[cfg(test)]
mod tests;

const COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR: Rep =
	Rep::CostMinor("Unexpected Statement, not a validator");
const COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND: Rep =
	Rep::CostMinor("Unexpected Statement, validator not found");
const COST_UNEXPECTED_STATEMENT_INVALID_SENDER: Rep =
	Rep::CostMinor("Unexpected Statement, invalid sender");
const COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE: Rep =
	Rep::CostMinor("Unexpected Statement, bad advertise");
const COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED: Rep =
	Rep::CostMinor("Unexpected Statement, cluster rejected");
const COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP: Rep =
	Rep::CostMinor("Unexpected Statement, not in group");

const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
	Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements");
const COST_DISABLED_VALIDATOR: Rep = Rep::CostMinor("Sent a statement from a disabled validator");

const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep =
	Rep::CostMinor("Unexpected Manifest, missing knowledge for relay parent");
const COST_UNEXPECTED_MANIFEST_DISALLOWED: Rep =
	Rep::CostMinor("Unexpected Manifest, Peer Disallowed");
const COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN: Rep =
	Rep::CostMinor("Unexpected Manifest, Peer Unknown");
const COST_CONFLICTING_MANIFEST: Rep = Rep::CostMajor("Manifest conflicts with previous");
const COST_INSUFFICIENT_MANIFEST: Rep =
	Rep::CostMajor("Manifest statements insufficient to back candidate");
const COST_MALFORMED_MANIFEST: Rep = Rep::CostMajor("Manifest is malformed");
const COST_UNEXPECTED_ACKNOWLEDGEMENT_UNKNOWN_CANDIDATE: Rep =
	Rep::CostMinor("Unexpected acknowledgement, unknown candidate");

const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
const COST_IMPROPERLY_DECODED_RESPONSE: Rep =
	Rep::CostMajor("Improperly Encoded Candidate Response");
const COST_INVALID_RESPONSE: Rep = Rep::CostMajor("Invalid Candidate Response");
const COST_UNREQUESTED_RESPONSE_STATEMENT: Rep =
	Rep::CostMajor("Un-requested Statement In Response");
const COST_INACCURATE_ADVERTISEMENT: Rep =
	Rep::CostMajor("Peer advertised a candidate inaccurately");
const COST_UNSUPPORTED_DESCRIPTOR_VERSION: Rep =
	Rep::CostMajor("Candidate Descriptor version is not supported");
const COST_INVALID_UMP_SIGNALS: Rep =
	Rep::CostMajor("Candidate Descriptor contains invalid ump signals");
const COST_INVALID_SESSION_INDEX: Rep =
	Rep::CostMajor("Candidate Descriptor contains an invalid session index");

const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
const COST_INVALID_REQUEST_BITFIELD_SIZE: Rep =
	Rep::CostMajor("Attested candidate request bitfields have wrong size");
const COST_UNEXPECTED_REQUEST: Rep = Rep::CostMajor("Unexpected attested candidate request");

const BENEFIT_VALID_RESPONSE: Rep = Rep::BenefitMajor("Peer Answered Candidate Request");
const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep =
	Rep::BenefitMajorFirst("Peer was the first to provide a given valid statement");

/// The amount of time to wait before retrying when the node sends a request and it is dropped.
pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);

struct PerRelayParentState {
	local_validator: Option<LocalValidatorState>,
	statement_store: StatementStore,
	session: SessionIndex,
	transposed_cq: TransposedClaimQueue,
	groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
	disabled_validators: HashSet<ValidatorIndex>,
	assignments_per_group: HashMap<GroupIndex, Vec<ParaId>>,
}

impl PerRelayParentState {
	fn active_validator_state(&self) -> Option<&ActiveValidatorState> {
		self.local_validator.as_ref().and_then(|local| local.active.as_ref())
	}

	fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
		self.local_validator.as_mut().and_then(|local| local.active.as_mut())
	}

	/// Returns `true` if the given validator is disabled in the context of the relay parent.
	pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
		self.disabled_validators.contains(validator_index)
	}

	/// A convenience function to generate a disabled bitmask for the given backing group.
	/// The output bits are set to `true` for validators that are disabled.
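	///
	/// For example (illustrative only): for a group `[ValidatorIndex(1),
	/// ValidatorIndex(3)]` where only validator 3 is disabled, the output is
	/// the bitmask `[false, true]`.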
	pub fn disabled_bitmask(&self, group: &[ValidatorIndex]) -> BitVec<u8, Lsb0> {
		BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)))
	}
}

// per-relay-parent local validator state.
struct LocalValidatorState {
	// the grid-level communication at this relay-parent.
	grid_tracker: GridTracker,
	// additional fields in case local node is an active validator.
	active: Option<ActiveValidatorState>,
	// a local index does exist when the node is an inactive validator; however,
	// it's not needed outside of `build_session_topology`, where it's already known.
}

struct ActiveValidatorState {
	// The index of the validator.
	index: ValidatorIndex,
	// our validator group
	group: GroupIndex,
	// the assignments of our validator group, if any.
	assignments: Vec<ParaId>,
	// the 'direct-in-group' communication at this relay-parent.
	cluster_tracker: ClusterTracker,
}

#[derive(Debug, Copy, Clone)]
enum LocalValidatorIndex {
	// Local node is an active validator.
	Active(ValidatorIndex),
	// Local node is not in active validator set.
	Inactive,
}

#[derive(Debug)]
struct PerSessionState {
	session_info: SessionInfo,
	groups: Groups,
	authority_lookup: HashMap<AuthorityDiscoveryId, ValidatorIndex>,
	// only `None` in the window between first seeing a session and
	// receiving its topology from the gossip-support subsystem
	grid_view: Option<grid::SessionTopologyView>,
	local_validator: Option<LocalValidatorIndex>,
	// `true` if v2 candidate receipts are allowed by the runtime
	allow_v2_descriptors: bool,
}

impl PerSessionState {
	fn new(
		session_info: SessionInfo,
		keystore: &KeystorePtr,
		backing_threshold: u32,
		allow_v2_descriptors: bool,
	) -> Self {
		let groups = Groups::new(session_info.validator_groups.clone(), backing_threshold);
		let mut authority_lookup = HashMap::new();
		for (i, ad) in session_info.discovery_keys.iter().cloned().enumerate() {
			authority_lookup.insert(ad, ValidatorIndex(i as _));
		}

		let local_validator = polkadot_node_subsystem_util::signing_key_and_index(
			session_info.validators.iter(),
			keystore,
		)
		.map(|(_, index)| LocalValidatorIndex::Active(index));

		PerSessionState {
			session_info,
			groups,
			authority_lookup,
			grid_view: None,
			local_validator,
			allow_v2_descriptors,
		}
	}

	fn supply_topology(
		&mut self,
		topology: &SessionGridTopology,
		local_index: Option<ValidatorIndex>,
	) {
		// Note: we use the local index rather than the `self.local_validator` as the
		// former may be `Some` when the latter is `None`, due to the set of nodes in
		// discovery being a superset of the active validators for consensus.
		let grid_view = grid::build_session_topology(
			self.session_info.validator_groups.iter(),
			topology,
			local_index,
		);

		self.grid_view = Some(grid_view);
		if local_index.is_some() {
			self.local_validator.get_or_insert(LocalValidatorIndex::Inactive);
		}

		gum::info!(
			target: LOG_TARGET,
			index_in_gossip_topology = ?local_index,
			index_in_parachain_authorities = ?self.local_validator,
			"Node uses the following topology indices"
		);
	}

	/// Returns `true` if the local node is neither an active nor an inactive validator.
	///
	/// `false` is also returned if session topology is not known yet.
	fn is_not_validator(&self) -> bool {
		self.grid_view.is_some() && self.local_validator.is_none()
	}

	/// Returns `true` if v2 candidate receipts are enabled
	fn candidate_receipt_v2_enabled(&self) -> bool {
		self.allow_v2_descriptors
	}
}

pub(crate) struct State {
	/// The utility for managing the implicit and explicit views in a consistent way.
	implicit_view: ImplicitView,
	candidates: Candidates,
	per_relay_parent: HashMap<Hash, PerRelayParentState>,
	per_session: HashMap<SessionIndex, PerSessionState>,
	// Topology might be received before the first leaf update that initializes
	// the per-session state, so cache it here until we are able to use it.
	unused_topologies: HashMap<SessionIndex, NewGossipTopology>,
	peers: HashMap<PeerId, PeerState>,
	keystore: KeystorePtr,
	authorities: HashMap<AuthorityDiscoveryId, PeerId>,
	request_manager: RequestManager,
	response_manager: ResponseManager,
}

impl State {
	/// Create a new state.
	pub(crate) fn new(keystore: KeystorePtr) -> Self {
		State {
			implicit_view: Default::default(),
			candidates: Default::default(),
			per_relay_parent: HashMap::new(),
			per_session: HashMap::new(),
			peers: HashMap::new(),
			keystore,
			authorities: HashMap::new(),
			request_manager: RequestManager::new(),
			response_manager: ResponseManager::new(),
			unused_topologies: HashMap::new(),
		}
	}

	pub(crate) fn request_and_response_managers(
		&mut self,
	) -> (&mut RequestManager, &mut ResponseManager) {
		(&mut self.request_manager, &mut self.response_manager)
	}
}

// For the provided validator index, if there is a connected peer controlling the given authority
// ID, then return that peer's `PeerId`.
fn connected_validator_peer(
	authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
	per_session: &PerSessionState,
	validator_index: ValidatorIndex,
) -> Option<PeerId> {
	per_session
		.session_info
		.discovery_keys
		.get(validator_index.0 as usize)
		.and_then(|k| authorities.get(k))
		.map(|p| *p)
}

struct PeerState {
	view: View,
	protocol_version: ValidationVersion,
	implicit_view: HashSet<Hash>,
	discovery_ids: Option<HashSet<AuthorityDiscoveryId>>,
}

impl PeerState {
	// Update the view, returning a vector of implicit relay-parents which weren't previously
	// part of the view.
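	// (The next implicit set is computed from our own `ImplicitView`, so a peer
	// is only ever credited with relay-parents that we ourselves allow under
	// some active leaf.)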
	fn update_view(&mut self, new_view: View, local_implicit: &ImplicitView) -> Vec<Hash> {
		let next_implicit = new_view
			.iter()
			.flat_map(|x| local_implicit.known_allowed_relay_parents_under(x, None))
			.flatten()
			.cloned()
			.collect::<HashSet<_>>();

		let fresh_implicit = next_implicit
			.iter()
			.filter(|x| !self.implicit_view.contains(x))
			.cloned()
			.collect();

		self.view = new_view;
		self.implicit_view = next_implicit;

		fresh_implicit
	}

	// Attempt to reconcile the view with new information about the implicit relay parents
	// under an active leaf.
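	// Only relay-parents not already attributed to the peer are returned, so
	// callers can send catch-up messages for each implicit relay-parent exactly
	// once.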
	fn reconcile_active_leaf(&mut self, leaf_hash: Hash, implicit: &[Hash]) -> Vec<Hash> {
		if !self.view.contains(&leaf_hash) {
			return Vec::new()
		}

		let mut v = Vec::with_capacity(implicit.len());
		for i in implicit {
			if self.implicit_view.insert(*i) {
				v.push(*i);
			}
		}
		v
	}

	// Whether we know that a peer knows a relay-parent.
	// The peer knows the relay-parent if it is either implicit or explicit
	// in their view. However, if it is implicit via an active-leaf we don't
	// recognize, we will not accurately be able to recognize them as 'knowing'
	// the relay-parent.
	fn knows_relay_parent(&self, relay_parent: &Hash) -> bool {
		self.implicit_view.contains(relay_parent) || self.view.contains(relay_parent)
	}

	fn is_authority(&self, authority_id: &AuthorityDiscoveryId) -> bool {
		self.discovery_ids.as_ref().map_or(false, |x| x.contains(authority_id))
	}

	fn iter_known_discovery_ids(&self) -> impl Iterator<Item = &AuthorityDiscoveryId> {
		self.discovery_ids.as_ref().into_iter().flatten()
	}
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_network_update<Context>(
	ctx: &mut Context,
	state: &mut State,
	update: NetworkBridgeEvent<net_protocol::StatementDistributionMessage>,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) {
	match update {
		NetworkBridgeEvent::PeerConnected(peer_id, role, protocol_version, mut authority_ids) => {
			gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?protocol_version, "Peer connected");

			let versioned_protocol = if protocol_version != ValidationVersion::V3.into() {
				return
			} else {
				protocol_version.try_into().expect("Qed, we checked above")
			};

			if let Some(ref mut authority_ids) = authority_ids {
				authority_ids.retain(|a| match state.authorities.entry(a.clone()) {
					Entry::Vacant(e) => {
						e.insert(peer_id);
						true
					},
					Entry::Occupied(e) => {
						gum::debug!(
							target: LOG_TARGET,
							authority_id = ?a,
							existing_peer = ?e.get(),
							new_peer = ?peer_id,
							"Ignoring new peer with duplicate authority ID as a bearer of that identity"
						);

						false
					},
				});
			}

			state.peers.insert(
				peer_id,
				PeerState {
					view: View::default(),
					implicit_view: HashSet::new(),
					protocol_version: versioned_protocol,
					discovery_ids: authority_ids,
				},
			);
		},
		NetworkBridgeEvent::PeerDisconnected(peer_id) => {
			if let Some(p) = state.peers.remove(&peer_id) {
				for discovery_key in p.discovery_ids.into_iter().flatten() {
					state.authorities.remove(&discovery_key);
				}
			}
		},
		NetworkBridgeEvent::NewGossipTopology(topology) => {
			let new_session_index = &topology.session;
			let new_topology = &topology.topology;
			let local_index = &topology.local_index;

			if let Some(per_session) = state.per_session.get_mut(new_session_index) {
				per_session.supply_topology(new_topology, *local_index);
			} else {
				state.unused_topologies.insert(*new_session_index, topology);
			}

			// TODO [https://github.com/paritytech/polkadot/issues/6194]
			// technically, we should account for the fact that the session topology might
			// come late, and for all relay-parents with this session, send all grid peers
			// any `BackedCandidateInv` messages they might need.
			//
			// in practice, this is a small issue & the API of receiving topologies could
			// be altered to fix it altogether.
		},
		NetworkBridgeEvent::PeerMessage(peer_id, message) => match message {
			net_protocol::StatementDistributionMessage::V3(
				protocol_v3::StatementDistributionMessage::Statement(relay_parent, statement),
			) =>
				handle_incoming_statement(
					ctx,
					state,
					peer_id,
					relay_parent,
					statement,
					reputation,
					metrics,
				)
				.await,
			net_protocol::StatementDistributionMessage::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateManifest(inner),
			) => handle_incoming_manifest(ctx, state, peer_id, inner, reputation, metrics).await,
			net_protocol::StatementDistributionMessage::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateKnown(inner),
			) =>
				handle_incoming_acknowledgement(ctx, state, peer_id, inner, reputation, metrics)
					.await,
		},
		NetworkBridgeEvent::PeerViewChange(peer_id, view) =>
			handle_peer_view_update(ctx, state, peer_id, view, metrics).await,
		NetworkBridgeEvent::OurViewChange(_view) => {
			// handled by `handle_activated_leaf`
		},
		NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?authority_ids,
				"Updated `AuthorityDiscoveryId`s"
			);

			// defensive: ensure peers are actually connected
			let peer_state = match state.peers.get_mut(&peer_id) {
				None => return,
				Some(p) => p,
			};

			// Remove the authority IDs which were previously mapped to the peer
			// but aren't part of the new set.
			state.authorities.retain(|a, p| p != &peer_id || authority_ids.contains(a));

			// Map the new authority IDs to the peer.
			for a in authority_ids.iter().cloned() {
				state.authorities.insert(a, peer_id);
			}

			peer_state.discovery_ids = Some(authority_ids);
		},
	}
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_active_leaf_update<Context>(
	ctx: &mut Context,
	state: &mut State,
	new_relay_parent: Hash,
) -> JfyiErrorResult<()> {
	let disabled_validators: HashSet<_> =
		polkadot_node_subsystem_util::request_disabled_validators(new_relay_parent, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchDisabledValidators)?
			.into_iter()
			.collect();

	let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
		new_relay_parent,
		ctx.sender(),
	)
	.await
	.await
	.map_err(JfyiError::RuntimeApiUnavailable)?
	.map_err(JfyiError::FetchSessionIndex)?;

	if !state.per_session.contains_key(&session_index) {
		let session_info = polkadot_node_subsystem_util::request_session_info(
			new_relay_parent,
			session_index,
			ctx.sender(),
		)
		.await
		.await
		.map_err(JfyiError::RuntimeApiUnavailable)?
		.map_err(JfyiError::FetchSessionInfo)?;

		let session_info = match session_info {
			None => {
				gum::warn!(
					target: LOG_TARGET,
					relay_parent = ?new_relay_parent,
					"No session info available for current session"
				);

				return Ok(())
			},
			Some(s) => s,
		};

		let minimum_backing_votes =
			request_min_backing_votes(new_relay_parent, session_index, ctx.sender())
				.await
				.await
				.map_err(JfyiError::RuntimeApiUnavailable)?
				.map_err(JfyiError::FetchMinimumBackingVotes)?;
		let node_features = request_node_features(new_relay_parent, session_index, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchNodeFeatures)?;
		let mut per_session_state = PerSessionState::new(
			session_info,
			&state.keystore,
			minimum_backing_votes,
			node_features
				.get(FeatureIndex::CandidateReceiptV2 as usize)
				.map(|b| *b)
				.unwrap_or(false),
		);
		if let Some(topology) = state.unused_topologies.remove(&session_index) {
			per_session_state.supply_topology(&topology.topology, topology.local_index);
		}
		state.per_session.insert(session_index, per_session_state);
	}

	let per_session = state
		.per_session
		.get_mut(&session_index)
		.expect("either existed or just inserted; qed");

	if !disabled_validators.is_empty() {
		gum::debug!(
			target: LOG_TARGET,
			relay_parent = ?new_relay_parent,
			?session_index,
			?disabled_validators,
			"Disabled validators detected"
		);
	}

	let group_rotation_info =
		polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchValidatorGroups)?
			.1;

	let claim_queue = ClaimQueueSnapshot(
		polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender())
			.await
			.await
			.map_err(JfyiError::RuntimeApiUnavailable)?
			.map_err(JfyiError::FetchClaimQueue)?,
	);

	let (groups_per_para, assignments_per_group) = determine_group_assignments(
		per_session.groups.all().len(),
		group_rotation_info,
		&claim_queue,
	)
	.await;

	let local_validator = per_session.local_validator.and_then(|v| {
		if let LocalValidatorIndex::Active(idx) = v {
			find_active_validator_state(idx, &per_session.groups, &assignments_per_group)
		} else {
			Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
		}
	});

	let transposed_cq = transpose_claim_queue(claim_queue.0);

	state.per_relay_parent.insert(
		new_relay_parent,
		PerRelayParentState {
			local_validator,
			statement_store: StatementStore::new(&per_session.groups),
			session: session_index,
			groups_per_para,
			disabled_validators,
			transposed_cq,
			assignments_per_group,
		},
	);
	Ok(())
}

/// Called on new leaf updates.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_active_leaves_update<Context>(
	ctx: &mut Context,
	state: &mut State,
	activated: &ActivatedLeaf,
	metrics: &Metrics,
) -> JfyiErrorResult<()> {
	state
		.implicit_view
		.activate_leaf(ctx.sender(), activated.hash)
		.await
		.map_err(JfyiError::ActivateLeafFailure)?;

	let new_relay_parents =
		state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();

	for new_relay_parent in new_relay_parents.iter().cloned() {
		if state.per_relay_parent.contains_key(&new_relay_parent) {
			continue
		}

		if let Err(err) = handle_active_leaf_update(ctx, state, new_relay_parent).await {
			gum::warn!(
				target: LOG_TARGET,
				relay_parent = ?new_relay_parent,
				error = ?err,
				"Failed to handle active leaf update"
			);
			continue
		}
	}

	gum::debug!(
		target: LOG_TARGET,
		"Activated leaves. Now tracking {} relay-parents across {} sessions",
		state.per_relay_parent.len(),
		state.per_session.len(),
	);

	// Reconcile all peers' views with the active leaf and any relay parents
	// it implies. If they learned about the block before we did, this reconciliation will give
	// non-empty results and we should send them messages concerning all activated relay-parents.
	{
		let mut update_peers = Vec::new();
		for (peer, peer_state) in state.peers.iter_mut() {
			let fresh = peer_state.reconcile_active_leaf(activated.hash, &new_relay_parents);
			if !fresh.is_empty() {
				update_peers.push((*peer, fresh));
			}
		}

		for (peer, fresh) in update_peers {
			for fresh_relay_parent in fresh {
				send_peer_messages_for_relay_parent(ctx, state, peer, fresh_relay_parent, metrics)
					.await;
			}
		}
	}

	new_leaf_fragment_chain_updates(ctx, state, activated.hash).await;

	Ok(())
}

fn find_active_validator_state(
	validator_index: ValidatorIndex,
	groups: &Groups,
	assignments_per_group: &HashMap<GroupIndex, Vec<ParaId>>,
) -> Option<LocalValidatorState> {
	if groups.all().is_empty() {
		return None
	}

	let our_group = groups.by_validator_index(validator_index)?;

	let group_validators = groups.get(our_group)?.to_owned();
	let paras_assigned_to_core = assignments_per_group.get(&our_group).cloned().unwrap_or_default();
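	// The seconding limit caps how many `Seconded` statements each validator in
	// the group may issue at this relay-parent: one per claim-queue assignment
	// on the group's core (see the check in `share_local_statement`).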
	let seconding_limit = paras_assigned_to_core.len();

	Some(LocalValidatorState {
		active: Some(ActiveValidatorState {
			index: validator_index,
			group: our_group,
			cluster_tracker: ClusterTracker::new(group_validators, seconding_limit)
				.expect("group is non-empty because we are in it; qed"),
			assignments: paras_assigned_to_core.clone(),
		}),
		grid_tracker: GridTracker::default(),
	})
}

pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) {
	// deactivate the leaf in the implicit view.
	for leaf in leaves {
		let pruned = state.implicit_view.deactivate_leaf(*leaf);
		for pruned_rp in pruned {
			// clean up per-relay-parent data based on everything removed.
			state
				.per_relay_parent
				.remove(&pruned_rp)
				.as_ref()
				.and_then(|pruned| pruned.active_validator_state())
				.map(|active_state| {
					active_state.cluster_tracker.warn_if_too_many_pending_statements(pruned_rp)
				});

			// clean up requests related to this relay parent.
			state.request_manager.remove_by_relay_parent(*leaf);
		}
	}

	state
		.candidates
		.on_deactivate_leaves(&leaves, |h| state.per_relay_parent.contains_key(h));

	// clean up sessions based on everything remaining.
	let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect();
	state.per_session.retain(|s, _| sessions.contains(s));

	let last_session_index = state.unused_topologies.keys().max().copied();
	// Do not clean up the last saved topology unless we have moved on to the next
	// session. This is needed because `handle_deactivate_leaves` also gets called
	// when the prospective_parachains APIs are not present, in which case we would
	// remove the topology without ever using it, since `per_relay_parent` remains
	// empty until prospective_parachains gets enabled.
	state
		.unused_topologies
		.retain(|s, _| sessions.contains(s) || last_session_index == Some(*s));
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_peer_view_update<Context>(
	ctx: &mut Context,
	state: &mut State,
	peer: PeerId,
	new_view: View,
	metrics: &Metrics,
) {
	let fresh_implicit = {
		let peer_data = match state.peers.get_mut(&peer) {
			None => return,
			Some(p) => p,
		};

		peer_data.update_view(new_view, &state.implicit_view)
	};

	for new_relay_parent in fresh_implicit {
		send_peer_messages_for_relay_parent(ctx, state, peer, new_relay_parent, metrics).await;
	}
}

// Returns an iterator over known validator indices, given an iterator over discovery IDs
// and a mapping from discovery IDs to validator indices.
fn find_validator_ids<'a>(
	known_discovery_ids: impl IntoIterator<Item = &'a AuthorityDiscoveryId>,
	discovery_mapping: impl Fn(&AuthorityDiscoveryId) -> Option<&'a ValidatorIndex>,
) -> impl Iterator<Item = ValidatorIndex> {
	known_discovery_ids.into_iter().filter_map(discovery_mapping).cloned()
}

/// Send a peer, apparently just becoming aware of a relay-parent, all messages
/// concerning that relay-parent.
///
/// In particular, we send all statements pertaining to our common cluster,
/// as well as all manifests, acknowledgements, or other grid statements.
///
/// Note that due to the way we handle views, our knowledge of peers' relay parents
/// may "oscillate" with relay parents repeatedly leaving and entering the
/// view of a peer based on the implicit view of active leaves.
///
/// This function is designed to be cheap and not to send duplicate messages in repeated
/// cases.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_peer_messages_for_relay_parent<Context>(
	ctx: &mut Context,
	state: &mut State,
	peer: PeerId,
	relay_parent: Hash,
	metrics: &Metrics,
) {
	let peer_data = match state.peers.get_mut(&peer) {
		None => return,
		Some(p) => p,
	};

	let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
		None => return,
		Some(s) => s,
	};

	let per_session_state = match state.per_session.get(&relay_parent_state.session) {
		None => return,
		Some(s) => s,
	};

	for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
		per_session_state.authority_lookup.get(a)
	}) {
		if let Some(active) = relay_parent_state
			.local_validator
			.as_mut()
			.and_then(|local| local.active.as_mut())
		{
			send_pending_cluster_statements(
				ctx,
				relay_parent,
				&(peer, peer_data.protocol_version),
				validator_id,
				&mut active.cluster_tracker,
				&state.candidates,
				&relay_parent_state.statement_store,
				metrics,
			)
			.await;
		}

		send_pending_grid_messages(
			ctx,
			relay_parent,
			&(peer, peer_data.protocol_version),
			validator_id,
			&per_session_state.groups,
			relay_parent_state,
			&state.candidates,
			metrics,
		)
		.await;
	}
}

fn pending_statement_network_message(
	statement_store: &StatementStore,
	relay_parent: Hash,
	peer: &(PeerId, ValidationVersion),
	originator: ValidatorIndex,
	compact: CompactStatement,
) -> Option<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)> {
	match peer.1 {
		ValidationVersion::V3 => statement_store
			.validator_statement(originator, compact)
			.map(|s| s.as_unchecked().clone())
			.map(|signed| {
				protocol_v3::StatementDistributionMessage::Statement(relay_parent, signed)
			})
			.map(|msg| (vec![peer.0], ValidationProtocols::V3(msg).into())),
	}
}

/// Send a peer all pending cluster statements for a relay parent.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_pending_cluster_statements<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	peer_id: &(PeerId, ValidationVersion),
	peer_validator_id: ValidatorIndex,
	cluster_tracker: &mut ClusterTracker,
	candidates: &Candidates,
	statement_store: &StatementStore,
	metrics: &Metrics,
) {
	let pending_statements = cluster_tracker.pending_statements_for(peer_validator_id);
	let network_messages = pending_statements
		.into_iter()
		.filter_map(|(originator, compact)| {
			if !candidates.is_confirmed(compact.candidate_hash()) {
				return None
			}

			let res = pending_statement_network_message(
				&statement_store,
				relay_parent,
				peer_id,
				originator,
				compact.clone(),
			);

			if res.is_some() {
				cluster_tracker.note_sent(peer_validator_id, originator, compact);
			}

			res
		})
		.collect::<Vec<_>>();

	if !network_messages.is_empty() {
		let count = network_messages.len();
		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(network_messages))
			.await;
		metrics.on_statements_distributed(count);
	}
}

/// Send a peer all pending grid messages / acknowledgements / follow up statements
/// upon learning about a new relay parent.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_pending_grid_messages<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	peer_id: &(PeerId, ValidationVersion),
	peer_validator_id: ValidatorIndex,
	groups: &Groups,
	relay_parent_state: &mut PerRelayParentState,
	candidates: &Candidates,
	metrics: &Metrics,
) {
	let pending_manifests = {
		let local_validator = match relay_parent_state.local_validator.as_mut() {
			None => return,
			Some(l) => l,
		};

		let grid_tracker = &mut local_validator.grid_tracker;
		grid_tracker.pending_manifests_for(peer_validator_id)
	};

	let mut messages: Vec<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)> = Vec::new();
	let mut statements_count = 0;
	for (candidate_hash, kind) in pending_manifests {
		let confirmed_candidate = match candidates.get_confirmed(&candidate_hash) {
			None => continue, // sanity
			Some(c) => c,
		};

		let group_index = confirmed_candidate.group_index();

		let local_knowledge = {
			let group_size = match groups.get(group_index) {
				None => return, // sanity
				Some(x) => x.len(),
			};

			local_knowledge_filter(
				group_size,
				group_index,
				candidate_hash,
				&relay_parent_state.statement_store,
			)
		};

		match kind {
			grid::ManifestKind::Full => {
				let manifest = protocol_v3::BackedCandidateManifest {
					relay_parent,
					candidate_hash,
					group_index,
					para_id: confirmed_candidate.para_id(),
					parent_head_data_hash: confirmed_candidate.parent_head_data_hash(),
					statement_knowledge: local_knowledge.clone(),
				};

				let grid = &mut relay_parent_state
					.local_validator
					.as_mut()
					.expect("determined to be some earlier in this function; qed")
					.grid_tracker;

				grid.manifest_sent_to(
					groups,
					peer_validator_id,
					candidate_hash,
					local_knowledge.clone(),
				);
				match peer_id.1 {
					ValidationVersion::V3 => messages.push((
						vec![peer_id.0],
						ValidationProtocols::V3(
							protocol_v3::StatementDistributionMessage::BackedCandidateManifest(
								manifest,
							),
						)
						.into(),
					)),
				};
			},
			grid::ManifestKind::Acknowledgement => {
				let (m, c) = acknowledgement_and_statement_messages(
					peer_id,
					peer_validator_id,
					groups,
					relay_parent_state,
					relay_parent,
					group_index,
					candidate_hash,
					local_knowledge,
				);
				messages.extend(m);
				statements_count += c;
			},
		}
	}

	// Send all remaining pending grid statements for a validator, not just
	// those for the acknowledgements we've sent.
	//
	// otherwise, we might receive statements while the grid peer is "out of view" and then
	// not send them when they get back "in view". problem!
	{
		let grid_tracker = &mut relay_parent_state
			.local_validator
			.as_mut()
			.expect("checked earlier; qed")
			.grid_tracker;

		let pending_statements = grid_tracker.all_pending_statements_for(peer_validator_id);

		let extra_statements = pending_statements
			.into_iter()
			.filter_map(|(originator, compact)| {
				let res = pending_statement_network_message(
					&relay_parent_state.statement_store,
					relay_parent,
					peer_id,
					originator,
					compact.clone(),
				);

				if res.is_some() {
					grid_tracker.sent_or_received_direct_statement(
						groups,
						originator,
						peer_validator_id,
						&compact,
						false,
					);
				}

				res
			})
			.collect::<Vec<_>>();

		statements_count += extra_statements.len();
		messages.extend(extra_statements);
	}

	if !messages.is_empty() {
		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await;
		metrics.on_statements_distributed(statements_count);
	}
}

// Imports a locally originating statement and distributes it to peers.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn share_local_statement<Context>(
	ctx: &mut Context,
	state: &mut State,
	relay_parent: Hash,
	statement: SignedFullStatementWithPVD,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) -> JfyiErrorResult<()> {
	let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
		None => return Err(JfyiError::InvalidShare),
		Some(x) => x,
	};

	gum::debug!(
		target: LOG_TARGET,
		statement = ?statement.payload().to_compact(),
		"Sharing Statement",
	);

	let per_session = match state.per_session.get(&per_relay_parent.session) {
		Some(s) => s,
		None => return Ok(()),
	};

	let (local_index, local_assignments, local_group) =
		match per_relay_parent.active_validator_state() {
			None => return Err(JfyiError::InvalidShare),
			Some(l) => (l.index, &l.assignments, l.group),
		};

	// Two possibilities: either the statement is `Seconded` or we already
	// have the candidate. Sanity: check the para-id is valid.
	let expected = match statement.payload() {
		FullStatementWithPVD::Seconded(ref c, _) =>
			Some((c.descriptor.para_id(), c.descriptor.relay_parent())),
		FullStatementWithPVD::Valid(hash) =>
			state.candidates.get_confirmed(&hash).map(|c| (c.para_id(), c.relay_parent())),
	};

	let is_seconded = match statement.payload() {
		FullStatementWithPVD::Seconded(_, _) => true,
		FullStatementWithPVD::Valid(_) => false,
	};

	let (expected_para, expected_relay_parent) = match expected {
		None => return Err(JfyiError::InvalidShare),
		Some(x) => x,
	};

	if local_index != statement.validator_index() {
		return Err(JfyiError::InvalidShare)
	}

	let seconding_limit = local_assignments.len();

	if is_seconded &&
		per_relay_parent.statement_store.seconded_count(&local_index) >= seconding_limit
	{
		gum::warn!(
			target: LOG_TARGET,
			limit = ?seconding_limit,
			"Local node has issued too many `Seconded` statements",
		);
		return Err(JfyiError::InvalidShare)
	}

	if !local_assignments.contains(&expected_para) || relay_parent != expected_relay_parent {
		return Err(JfyiError::InvalidShare)
	}

	let mut post_confirmation = None;

	// Insert candidate if unknown + more sanity checks.
	let compact_statement = {
		let compact_statement = FullStatementWithPVD::signed_to_compact(statement.clone());
		let candidate_hash = CandidateHash(*statement.payload().candidate_hash());

		if let FullStatementWithPVD::Seconded(ref c, ref pvd) = statement.payload() {
			post_confirmation = state.candidates.confirm_candidate(
				candidate_hash,
				c.clone(),
				pvd.clone(),
				local_group,
			);
		};

		match per_relay_parent.statement_store.insert(
			&per_session.groups,
			compact_statement.clone(),
			StatementOrigin::Local,
		) {
			Ok(false) | Err(_) => {
				gum::warn!(
					target: LOG_TARGET,
					statement = ?compact_statement.payload(),
					"Candidate backing issued redundant statement?",
				);
				return Err(JfyiError::InvalidShare)
			},
			Ok(true) => {},
		}

		{
			let l = per_relay_parent.active_validator_state_mut().expect("checked above; qed");
			l.cluster_tracker.note_issued(local_index, compact_statement.payload().clone());
		}

		if let Some(ref session_topology) = per_session.grid_view {
			let l = per_relay_parent.local_validator.as_mut().expect("checked above; qed");
			l.grid_tracker.learned_fresh_statement(
				&per_session.groups,
				session_topology,
				local_index,
				&compact_statement.payload(),
			);
		}

		compact_statement
	};

	// send the compact version of the statement to any peers which need it.
	circulate_statement(
		ctx,
		relay_parent,
		per_relay_parent,
		per_session,
		&state.candidates,
		&state.authorities,
		&state.peers,
		compact_statement,
		metrics,
	)
	.await;

	if let Some(post_confirmation) = post_confirmation {
		apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await;
	}

	Ok(())
}

// two kinds of targets: those in our 'cluster' (currently just those in the same group),
// and those we are propagating to through the grid.
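// (Grid targets come from the session grid topology: validators are arranged
// in a conceptual two-dimensional grid and messages flow along shared rows and
// columns; see the `grid` module for the precise rules.)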
#[derive(Debug)]
enum DirectTargetKind {
	Cluster,
	Grid,
}

// Circulates a compact statement to all peers who need it: those in the current group of the
// local validator and grid peers which have already indicated that they know the candidate as
// backed.
//
// We only circulate statements for which we have the confirmed candidate, even to the local group.
//
// The group index which is _canonically assigned_ to this parachain must be
// specified already. This function should not be used when the candidate receipt and
// therefore the canonical group for the parachain is unknown.
//
// preconditions: the candidate entry exists in the state under the relay parent
// and the statement has already been imported into the entry. If this is a `Valid`
// statement, then there must be at least one `Seconded` statement.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn circulate_statement<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	relay_parent_state: &mut PerRelayParentState,
	per_session: &PerSessionState,
	candidates: &Candidates,
	authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
	peers: &HashMap<PeerId, PeerState>,
	statement: SignedStatement,
	metrics: &Metrics,
) {
	let session_info = &per_session.session_info;

	let candidate_hash = *statement.payload().candidate_hash();

	let compact_statement = statement.payload().clone();
	let is_confirmed = candidates.is_confirmed(&candidate_hash);

	let originator = statement.validator_index();
	let (local_validator, targets) = {
		let local_validator = match relay_parent_state.local_validator.as_mut() {
			Some(v) => v,
			None => return, // sanity: nothing to propagate if not a validator.
		};

		let statement_group = per_session.groups.by_validator_index(originator);

		// We're not meant to circulate statements in the cluster until we have the confirmed
		// candidate.
		//
		// Cluster is only relevant if local node is an active validator.
		let (cluster_relevant, cluster_targets, all_cluster_targets) = local_validator
			.active
			.as_mut()
			.map(|active| {
				let cluster_relevant = Some(active.group) == statement_group;
				let cluster_targets = if is_confirmed && cluster_relevant {
					Some(
						active
							.cluster_tracker
							.targets()
							.iter()
							.filter(|&&v| {
								active
									.cluster_tracker
									.can_send(v, originator, compact_statement.clone())
									.is_ok()
							})
							.filter(|&v| v != &active.index)
							.map(|v| (*v, DirectTargetKind::Cluster)),
					)
				} else {
					None
				};
				let all_cluster_targets = active.cluster_tracker.targets();
				(cluster_relevant, cluster_targets, all_cluster_targets)
			})
			.unwrap_or((false, None, &[]));

		let grid_targets = local_validator
			.grid_tracker
			.direct_statement_targets(&per_session.groups, originator, &compact_statement)
			.into_iter()
			.filter(|v| !cluster_relevant || !all_cluster_targets.contains(v))
			.map(|v| (v, DirectTargetKind::Grid));

		let targets = cluster_targets
			.into_iter()
			.flatten()
			.chain(grid_targets)
			.filter_map(|(v, k)| {
				session_info.discovery_keys.get(v.0 as usize).map(|a| (v, a.clone(), k))
			})
			.collect::<Vec<_>>();

		(local_validator, targets)
	};

	let mut statement_to_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
	for (target, authority_id, kind) in targets {
		// Find peer ID based on authority ID, and also filter to connected.
		let peer_id: (PeerId, ProtocolVersion) = match authorities.get(&authority_id) {
			Some(p) if peers.get(p).map_or(false, |p| p.knows_relay_parent(&relay_parent)) => (
				*p,
				peers
					.get(p)
					.expect("Qed, can't fail because it was checked above")
					.protocol_version
					.into(),
			),
			None | Some(_) => continue,
		};

		match kind {
			DirectTargetKind::Cluster => {
				let active = local_validator
					.active
					.as_mut()
					.expect("cluster target means local is active validator; qed");

				// At this point, all peers in the cluster should 'know'
				// the candidate, so we don't expect this to fail.
				if let Ok(()) =
					active.cluster_tracker.can_send(target, originator, compact_statement.clone())
				{
					active.cluster_tracker.note_sent(target, originator, compact_statement.clone());
					statement_to_peers.push(peer_id);
				}
			},
			DirectTargetKind::Grid => {
				statement_to_peers.push(peer_id);
				local_validator.grid_tracker.sent_or_received_direct_statement(
					&per_session.groups,
					originator,
					target,
					&compact_statement,
					false,
				);
			},
		}
	}

	let statement_to_v3_peers =
		filter_by_peer_version(&statement_to_peers, ValidationVersion::V3.into());

	// ship off the network messages to the network bridge.
	if !statement_to_v3_peers.is_empty() {
		gum::debug!(
			target: LOG_TARGET,
			?compact_statement,
			n_peers = ?statement_to_peers.len(),
			"Sending statement to v3 peers",
		);

		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
			statement_to_v3_peers,
			ValidationProtocols::V3(protocol_v3::StatementDistributionMessage::Statement(
				relay_parent,
				statement.as_unchecked().clone(),
			))
			.into(),
		))
		.await;
		metrics.on_statement_distributed();
	}
}

/// Check a statement signature under this parent hash.
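///
/// The signing payload commits to the `SigningContext` (session index and
/// relay-parent hash), so a statement signed for one session or relay-parent
/// cannot be replayed under another.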
1441fn check_statement_signature(
1442	session_index: SessionIndex,
1443	validators: &IndexedVec<ValidatorIndex, ValidatorId>,
1444	relay_parent: Hash,
1445	statement: UncheckedSignedStatement,
1446) -> std::result::Result<SignedStatement, UncheckedSignedStatement> {
1447	let signing_context = SigningContext { session_index, parent_hash: relay_parent };
1448
1449	validators
1450		.get(statement.unchecked_validator_index())
1451		.ok_or_else(|| statement.clone())
1452		.and_then(|v| statement.try_into_checked(&signing_context, v))
1453}
1454
1455/// Modify the reputation of a peer based on its behavior.
1456async fn modify_reputation(
1457	reputation: &mut ReputationAggregator,
1458	sender: &mut impl overseer::StatementDistributionSenderTrait,
1459	peer: PeerId,
1460	rep: Rep,
1461) {
1462	reputation.modify(sender, peer, rep).await;
1463}
1464
1465/// Handle an incoming statement.
1466///
1467/// This checks whether the sender is allowed to send the statement,
1468/// either via the cluster or the grid.
1469///
1470/// This also checks the signature of the statement.
1471/// If the statement is fresh, this function guarantees that after completion
1472///   - The statement is re-circulated to all relevant peers in both the cluster and the grid
1473///   - If the candidate is out-of-cluster and is backable and importable, all statements about the
1474///     candidate have been sent to backing
1475///   - If the candidate is in-cluster and is importable, the statement has been sent to backing
1476#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
1477async fn handle_incoming_statement<Context>(
1478	ctx: &mut Context,
1479	state: &mut State,
1480	peer: PeerId,
1481	relay_parent: Hash,
1482	statement: UncheckedSignedStatement,
1483	reputation: &mut ReputationAggregator,
1484	metrics: &Metrics,
1485) {
1486	let peer_state = match state.peers.get(&peer) {
1487		None => {
1488			// sanity: should be impossible.
1489			return
1490		},
1491		Some(p) => p,
1492	};
1493
1494	// Ensure we know the relay parent.
1495	let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
1496		None => {
1497			modify_reputation(
1498				reputation,
1499				ctx.sender(),
1500				peer,
1501				COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE,
1502			)
1503			.await;
1504			return
1505		},
1506		Some(p) => p,
1507	};
1508
1509	let per_session = match state.per_session.get(&per_relay_parent.session) {
1510		None => {
1511			gum::warn!(
1512				target: LOG_TARGET,
1513				session = ?per_relay_parent.session,
1514				"Missing expected session info.",
1515			);
1516
1517			return
1518		},
1519		Some(s) => s,
1520	};
1521	let session_info = &per_session.session_info;
1522
1523	if per_relay_parent.is_disabled(&statement.unchecked_validator_index()) {
1524		gum::debug!(
1525			target: LOG_TARGET,
1526			?relay_parent,
1527			validator_index = ?statement.unchecked_validator_index(),
1528			"Ignoring a statement from disabled validator."
1529		);
1530		modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
1531		return
1532	}
1533
1534	let local_validator = match per_relay_parent.local_validator.as_mut() {
1535		None => {
1536			// we shouldn't be receiving statements unless we're a validator
1537			// this session.
1538			if per_session.is_not_validator() {
1539				modify_reputation(
1540					reputation,
1541					ctx.sender(),
1542					peer,
1543					COST_UNEXPECTED_STATEMENT_NOT_VALIDATOR,
1544				)
1545				.await;
1546			}
1547			return
1548		},
1549		Some(l) => l,
1550	};
1551
1552	let originator_group =
1553		match per_session.groups.by_validator_index(statement.unchecked_validator_index()) {
1554			Some(g) => g,
1555			None => {
1556				modify_reputation(
1557					reputation,
				ctx.sender(),
				peer,
				COST_UNEXPECTED_STATEMENT_VALIDATOR_NOT_FOUND,
			)
			.await;
			return
		},
	};

	let (active, cluster_sender_index) = {
		// `cluster_sender_index` is only `Some` when both the originator and
		// the sending peer are in the cluster.
		let active = local_validator.active.as_mut();

		let allowed_senders = active
			.as_ref()
			.map(|active| {
				active
					.cluster_tracker
					.senders_for_originator(statement.unchecked_validator_index())
			})
			.unwrap_or_default();

		let idx = allowed_senders
			.iter()
			.filter_map(|i| session_info.discovery_keys.get(i.0 as usize).map(|ad| (*i, ad)))
			.filter(|(_, ad)| peer_state.is_authority(ad))
			.map(|(i, _)| i)
			.next();
		(active, idx)
	};

	let checked_statement = if let Some((active, cluster_sender_index)) =
		active.zip(cluster_sender_index)
	{
		match handle_cluster_statement(
			relay_parent,
			&mut active.cluster_tracker,
			per_relay_parent.session,
			&per_session.session_info,
			statement,
			cluster_sender_index,
		) {
			Ok(Some(s)) => s,
			Ok(None) => return,
			Err(rep) => {
				modify_reputation(reputation, ctx.sender(), peer, rep).await;
				return
			},
		}
	} else {
		let grid_sender_index = local_validator
			.grid_tracker
			.direct_statement_providers(
				&per_session.groups,
				statement.unchecked_validator_index(),
				statement.unchecked_payload(),
			)
			.into_iter()
			.filter_map(|(i, validator_knows_statement)| {
				session_info
					.discovery_keys
					.get(i.0 as usize)
					.map(|ad| (i, ad, validator_knows_statement))
			})
			.filter(|(_, ad, _)| peer_state.is_authority(ad))
			.map(|(i, _, validator_knows_statement)| (i, validator_knows_statement))
			.next();

		if let Some((grid_sender_index, validator_knows_statement)) = grid_sender_index {
			if !validator_knows_statement {
				match handle_grid_statement(
					relay_parent,
					&mut local_validator.grid_tracker,
					per_relay_parent.session,
					&per_session,
					statement,
					grid_sender_index,
				) {
					Ok(s) => s,
					Err(rep) => {
						modify_reputation(reputation, ctx.sender(), peer, rep).await;
						return
					},
				}
			} else {
				// Reward the peer for sending us the statement.
				modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
				return;
			}
		} else {
			// Not a cluster or grid peer.
			modify_reputation(
				reputation,
				ctx.sender(),
				peer,
				COST_UNEXPECTED_STATEMENT_INVALID_SENDER,
			)
			.await;
			return
		}
	};

	let statement = checked_statement.payload().clone();
	let originator_index = checked_statement.validator_index();
	let candidate_hash = *checked_statement.payload().candidate_hash();

	// Insert an unconfirmed candidate entry if needed. Note that if the candidate is already
	// confirmed, this ensures that the assigned group of the originator matches the expected group
	// of the parachain.
	{
		let res = state.candidates.insert_unconfirmed(
			peer,
			candidate_hash,
			relay_parent,
			originator_group,
			None,
		);

		if let Err(BadAdvertisement) = res {
			modify_reputation(
				reputation,
				ctx.sender(),
				peer,
				COST_UNEXPECTED_STATEMENT_BAD_ADVERTISE,
			)
			.await;
			return
		}
	}

	let confirmed = state.candidates.get_confirmed(&candidate_hash);
	let is_confirmed = state.candidates.is_confirmed(&candidate_hash);
	if !is_confirmed {
		// If the candidate is not confirmed, note that we should attempt
		// to request it from the given peer.
		let mut request_entry =
			state
				.request_manager
				.get_or_insert(relay_parent, candidate_hash, originator_group);

		request_entry.add_peer(peer);

		// We only successfully accept statements from the grid on confirmed
		// candidates, so a statement reaching this point must have come from the cluster.
		request_entry.set_cluster_priority();
	}

	let was_fresh = match per_relay_parent.statement_store.insert(
		&per_session.groups,
		checked_statement.clone(),
		StatementOrigin::Remote,
	) {
		Err(statement_store::Error::ValidatorUnknown) => {
			// sanity: should never happen.
			gum::warn!(
				target: LOG_TARGET,
				?relay_parent,
				validator_index = ?originator_index,
				"Error - accepted message from unknown validator."
			);

			return
		},
		Ok(known) => known,
	};

	if was_fresh {
		modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;
		let is_importable = state.candidates.is_importable(&candidate_hash);

		if let Some(ref session_topology) = per_session.grid_view {
			local_validator.grid_tracker.learned_fresh_statement(
				&per_session.groups,
				session_topology,
				originator_index,
				&statement,
			);
		}

		if let (true, &Some(confirmed)) = (is_importable, &confirmed) {
			send_backing_fresh_statements(
				ctx,
				candidate_hash,
				originator_group,
				&relay_parent,
				&mut *per_relay_parent,
				confirmed,
				per_session,
			)
			.await;
		}

		// We always circulate statements at this point.
		circulate_statement(
			ctx,
			relay_parent,
			per_relay_parent,
			per_session,
			&state.candidates,
			&state.authorities,
			&state.peers,
			checked_statement,
			metrics,
		)
		.await;
	} else {
		modify_reputation(reputation, ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await;
	}
}
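
// An illustrative sketch (not part of the subsystem logic): the cluster and
// grid sender lookups above share one pattern - candidate sender indices are
// mapped through the session's discovery keys and kept only if the connected
// peer actually holds that authority key. Discovery keys are modelled as
// plain strings here; the module and names are hypothetical.
#[cfg(test)]
mod sender_resolution_sketch {
	use super::*;

	#[test]
	fn first_allowed_sender_owned_by_peer_wins() {
		let discovery_keys = vec!["alice", "bob", "charlie"];
		let allowed_senders = vec![ValidatorIndex(2), ValidatorIndex(1)];
		// Assume the peer has proven ownership of "bob"'s authority key only.
		let peer_is_authority = |ad: &str| ad == "bob";

		let idx = allowed_senders
			.iter()
			.filter_map(|i| discovery_keys.get(i.0 as usize).map(|ad| (*i, ad)))
			.filter(|(_, ad)| peer_is_authority(ad))
			.map(|(i, _)| i)
			.next();

		assert_eq!(idx, Some(ValidatorIndex(1)));
	}
}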

/// Checks whether a statement is allowed and whether its signature is valid,
/// importing it into the cluster tracker if successful.
///
/// If successful, this returns the checked signed statement if it should be imported,
/// or otherwise an error indicating a reputational fault.
fn handle_cluster_statement(
	relay_parent: Hash,
	cluster_tracker: &mut ClusterTracker,
	session: SessionIndex,
	session_info: &SessionInfo,
	statement: UncheckedSignedStatement,
	cluster_sender_index: ValidatorIndex,
) -> Result<Option<SignedStatement>, Rep> {
	// Additional cluster checks.
	let should_import = {
		match cluster_tracker.can_receive(
			cluster_sender_index,
			statement.unchecked_validator_index(),
			statement.unchecked_payload().clone(),
		) {
			Ok(ClusterAccept::Ok) => true,
			Ok(ClusterAccept::WithPrejudice) => false,
			Err(ClusterRejectIncoming::ExcessiveSeconded) => return Err(COST_EXCESSIVE_SECONDED),
			Err(ClusterRejectIncoming::CandidateUnknown | ClusterRejectIncoming::Duplicate) =>
				return Err(COST_UNEXPECTED_STATEMENT_CLUSTER_REJECTED),
			Err(ClusterRejectIncoming::NotInGroup) => {
				// sanity: shouldn't be possible; we already filtered this
				// out above.
				return Err(COST_UNEXPECTED_STATEMENT_NOT_IN_GROUP)
			},
		}
	};

	// Ensure the statement is correctly signed.
	let checked_statement =
		match check_statement_signature(session, &session_info.validators, relay_parent, statement)
		{
			Ok(s) => s,
			Err(_) => return Err(COST_INVALID_SIGNATURE),
		};

	cluster_tracker.note_received(
		cluster_sender_index,
		checked_statement.validator_index(),
		checked_statement.payload().clone(),
	);

	Ok(if should_import { Some(checked_statement) } else { None })
}
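
// An illustrative sketch (not part of the subsystem logic): `can_receive`
// answers in three ways, and only a plain `Ok` leads to an import -
// `WithPrejudice` means "take note of the statement, but don't import it",
// while rejections map to the reputation costs above. The helper below is a
// hypothetical restatement of the `should_import` logic.
#[cfg(test)]
mod cluster_accept_sketch {
	use super::*;

	fn should_import(accept: ClusterAccept) -> bool {
		// Mirrors `handle_cluster_statement`: only a clean accept imports.
		matches!(accept, ClusterAccept::Ok)
	}

	#[test]
	fn with_prejudice_is_noted_but_not_imported() {
		assert!(should_import(ClusterAccept::Ok));
		assert!(!should_import(ClusterAccept::WithPrejudice));
	}
}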

/// Checks whether the signature is valid, importing the statement
/// into the grid tracker if successful.
///
/// If successful, this returns the checked signed statement;
/// otherwise an error indicating a reputational fault.
fn handle_grid_statement(
	relay_parent: Hash,
	grid_tracker: &mut GridTracker,
	session: SessionIndex,
	per_session: &PerSessionState,
	statement: UncheckedSignedStatement,
	grid_sender_index: ValidatorIndex,
) -> Result<SignedStatement, Rep> {
	// Ensure the statement is correctly signed.
	let checked_statement = match check_statement_signature(
		session,
		&per_session.session_info.validators,
		relay_parent,
		statement,
	) {
		Ok(s) => s,
		Err(_) => return Err(COST_INVALID_SIGNATURE),
	};

	grid_tracker.sent_or_received_direct_statement(
		&per_session.groups,
		checked_statement.validator_index(),
		grid_sender_index,
		checked_statement.payload(),
		true,
	);

	Ok(checked_statement)
}
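
// An illustrative sketch (not part of the subsystem logic): the point of
// recording `sent_or_received_direct_statement` with `received = true` is
// that a statement received from a grid neighbour must never be echoed back
// to them. A minimal model of that bookkeeping with a plain set of
// (counterparty, originator) pairs; all names are hypothetical.
#[cfg(test)]
mod grid_direction_sketch {
	use super::*;

	#[test]
	fn received_statements_are_not_echoed_back() {
		let neighbour = ValidatorIndex(7);
		let originator = ValidatorIndex(3);

		let mut counterparty_knows: HashSet<(ValidatorIndex, ValidatorIndex)> = HashSet::new();
		// Receiving the statement from the neighbour proves they know it.
		counterparty_knows.insert((neighbour, originator));

		// So it must be filtered out of anything we later send them.
		let should_send = !counterparty_knows.contains(&(neighbour, originator));
		assert!(!should_send);
	}
}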

/// Send backing fresh statements. This should only be performed on importable & confirmed
/// candidates.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_backing_fresh_statements<Context>(
	ctx: &mut Context,
	candidate_hash: CandidateHash,
	group_index: GroupIndex,
	relay_parent: &Hash,
	relay_parent_state: &mut PerRelayParentState,
	confirmed: &candidates::ConfirmedCandidate,
	per_session: &PerSessionState,
) {
	let group_validators = per_session.groups.get(group_index).unwrap_or(&[]);
	let mut imported = Vec::new();

	for statement in relay_parent_state
		.statement_store
		.fresh_statements_for_backing(group_validators, candidate_hash)
	{
		let v = statement.validator_index();
		let compact = statement.payload().clone();
		imported.push((v, compact));
		let carrying_pvd = statement
			.clone()
			.convert_to_superpayload_with(|statement| match statement {
				CompactStatement::Seconded(_) => FullStatementWithPVD::Seconded(
					(&**confirmed.candidate_receipt()).clone(),
					confirmed.persisted_validation_data().clone(),
				),
				CompactStatement::Valid(c_hash) => FullStatementWithPVD::Valid(c_hash),
			})
			.expect("statements refer to same candidate; qed");

		ctx.send_message(CandidateBackingMessage::Statement(*relay_parent, carrying_pvd))
			.await;
	}

	for (v, s) in imported {
		relay_parent_state.statement_store.note_known_by_backing(v, s);
	}
}
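
// An illustrative sketch (not part of the subsystem logic): the
// "fresh statements" contract above means each (validator, statement) pair
// reaches the backing subsystem at most once; `note_known_by_backing` is the
// write side of that bookkeeping. A minimal model with a plain `Vec`:
#[cfg(test)]
mod backing_dedup_sketch {
	use super::*;

	#[test]
	fn statements_are_forwarded_to_backing_once() {
		let hash = CandidateHash(Hash::repeat_byte(7));
		let mut known_by_backing: Vec<(ValidatorIndex, CompactStatement)> = Vec::new();
		let incoming = vec![
			(ValidatorIndex(0), CompactStatement::Seconded(hash)),
			(ValidatorIndex(1), CompactStatement::Valid(hash)),
			(ValidatorIndex(0), CompactStatement::Seconded(hash)), // duplicate
		];

		let mut forwarded = 0;
		for entry in incoming {
			if !known_by_backing.contains(&entry) {
				known_by_backing.push(entry);
				forwarded += 1;
			}
		}
		assert_eq!(forwarded, 2);
	}
}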

fn local_knowledge_filter(
	group_size: usize,
	group_index: GroupIndex,
	candidate_hash: CandidateHash,
	statement_store: &StatementStore,
) -> StatementFilter {
	let mut f = StatementFilter::blank(group_size);
	statement_store.fill_statement_filter(group_index, candidate_hash, &mut f);
	f
}
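
// An illustrative sketch (not part of the subsystem logic): a blank
// `StatementFilter` is the "knows nothing" starting point that
// `local_knowledge_filter` hands to the statement store to fill in.
#[cfg(test)]
mod knowledge_filter_sketch {
	use super::*;

	#[test]
	fn blank_filter_is_sized_to_the_group_and_knows_nothing() {
		let f = StatementFilter::blank(5);
		assert_eq!(f.seconded_in_group.len(), 5);
		assert_eq!(f.validated_in_group.len(), 5);
		assert!(!f.has_seconded());
		assert_eq!(f.backing_validators(), 0);
	}
}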

// This provides a backable candidate to the grid and dispatches backable candidate announcements
// and acknowledgements via the grid topology. If the session topology is not yet
// available, this will be a no-op.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn provide_candidate_to_grid<Context>(
	ctx: &mut Context,
	candidate_hash: CandidateHash,
	relay_parent_state: &mut PerRelayParentState,
	confirmed_candidate: &candidates::ConfirmedCandidate,
	per_session: &PerSessionState,
	authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
	peers: &HashMap<PeerId, PeerState>,
	metrics: &Metrics,
) {
	let local_validator = match relay_parent_state.local_validator {
		Some(ref mut v) => v,
		None => return,
	};

	let relay_parent = confirmed_candidate.relay_parent();
	let group_index = confirmed_candidate.group_index();

	let grid_view = match per_session.grid_view {
		Some(ref t) => t,
		None => {
			gum::debug!(
				target: LOG_TARGET,
				session = relay_parent_state.session,
				"Cannot handle backable candidate due to lack of topology",
			);

			return
		},
	};

	let group_size = match per_session.groups.get(group_index) {
		None => {
			gum::warn!(
				target: LOG_TARGET,
				?candidate_hash,
				?relay_parent,
				?group_index,
				session = relay_parent_state.session,
				"Handled backed candidate with unknown group?",
			);

			return
		},
		Some(g) => g.len(),
	};

	let filter = local_knowledge_filter(
		group_size,
		group_index,
		candidate_hash,
		&relay_parent_state.statement_store,
	);

	let actions = local_validator.grid_tracker.add_backed_candidate(
		grid_view,
		candidate_hash,
		group_index,
		filter.clone(),
	);

	let manifest = protocol_v3::BackedCandidateManifest {
		relay_parent,
		candidate_hash,
		group_index,
		para_id: confirmed_candidate.para_id(),
		parent_head_data_hash: confirmed_candidate.parent_head_data_hash(),
		statement_knowledge: filter.clone(),
	};
	let acknowledgement = protocol_v3::BackedCandidateAcknowledgement {
		candidate_hash,
		statement_knowledge: filter.clone(),
	};

	let mut manifest_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();
	let mut ack_peers: Vec<(PeerId, ProtocolVersion)> = Vec::new();

	let mut post_statements = Vec::new();
	for (v, action) in actions {
		let p = match connected_validator_peer(authorities, per_session, v) {
			None => continue,
			Some(p) =>
				if peers.get(&p).map_or(false, |d| d.knows_relay_parent(&relay_parent)) {
					(p, peers.get(&p).expect("checked above; qed").protocol_version.into())
				} else {
					continue
				},
		};

		match action {
			grid::ManifestKind::Full => manifest_peers.push(p),
			grid::ManifestKind::Acknowledgement => ack_peers.push(p),
		}

		local_validator.grid_tracker.manifest_sent_to(
			&per_session.groups,
			v,
			candidate_hash,
			filter.clone(),
		);
		post_statements.extend(
			post_acknowledgement_statement_messages(
				v,
				relay_parent,
				&mut local_validator.grid_tracker,
				&relay_parent_state.statement_store,
				&per_session.groups,
				group_index,
				candidate_hash,
				&(p.0, p.1.try_into().expect("cannot fail; checked above; qed")),
			)
			.into_iter()
			.map(|m| (vec![p.0], m)),
		);
	}

	let manifest_peers_v3 = filter_by_peer_version(&manifest_peers, ValidationVersion::V3.into());
	if !manifest_peers_v3.is_empty() {
		gum::debug!(
			target: LOG_TARGET,
			?candidate_hash,
			local_validator = ?per_session.local_validator,
			n_peers = manifest_peers_v3.len(),
			"Sending manifest to v3 peers"
		);

		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
			manifest_peers_v3,
			ValidationProtocols::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateManifest(manifest),
			)
			.into(),
		))
		.await;
	}

	let ack_peers_v3 = filter_by_peer_version(&ack_peers, ValidationVersion::V3.into());
	if !ack_peers_v3.is_empty() {
		gum::debug!(
			target: LOG_TARGET,
			?candidate_hash,
			local_validator = ?per_session.local_validator,
			n_peers = ack_peers_v3.len(),
			"Sending acknowledgement to v3 peers"
		);

		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
			ack_peers_v3,
			ValidationProtocols::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateKnown(acknowledgement),
			)
			.into(),
		))
		.await;
	}
	if !post_statements.is_empty() {
		let count = post_statements.len();
		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(post_statements))
			.await;
		metrics.on_statements_distributed(count);
	}
}

// Utility function to populate:
// - per relay parent `ParaId` to `GroupIndex` mappings.
// - per `GroupIndex` claim queue assignments.
async fn determine_group_assignments(
	n_cores: usize,
	group_rotation_info: GroupRotationInfo,
	claim_queue: &ClaimQueueSnapshot,
) -> (HashMap<ParaId, Vec<GroupIndex>>, HashMap<GroupIndex, Vec<ParaId>>) {
	// Determine the core indices occupied by each para at the current relay parent. To support
	// on-demand parachains we also consider the core indices at the next blocks.
	let schedule: HashMap<CoreIndex, Vec<ParaId>> = claim_queue
		.iter_all_claims()
		.map(|(core_index, paras)| (*core_index, paras.iter().copied().collect()))
		.collect();

	let mut groups_per_para = HashMap::new();
	let mut assignments_per_group = HashMap::with_capacity(schedule.len());

	// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
	for (core_index, paras) in schedule {
		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
		assignments_per_group.insert(group_index, paras.clone());

		for para in paras {
			groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index);
		}
	}

	(groups_per_para, assignments_per_group)
}
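
// An illustrative sketch (not part of the subsystem logic): the shape of the
// mapping built by `determine_group_assignments`, with the claim-queue
// snapshot replaced by a plain core -> paras map and the rotation replaced
// by a hypothetical identity rotation (core index == group index).
#[cfg(test)]
mod group_assignment_sketch {
	use super::*;

	#[test]
	fn paras_claiming_several_cores_map_to_several_groups() {
		let schedule: HashMap<CoreIndex, Vec<ParaId>> = [
			(CoreIndex(0), vec![ParaId::from(100u32), ParaId::from(200u32)]),
			(CoreIndex(1), vec![ParaId::from(100u32)]),
		]
		.into_iter()
		.collect();

		let mut groups_per_para: HashMap<ParaId, Vec<GroupIndex>> = HashMap::new();
		for (core_index, paras) in schedule {
			let group_index = GroupIndex(core_index.0); // hypothetical rotation
			for para in paras {
				groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index);
			}
		}

		// Para 100 is claimed on both cores, so it is assigned to both groups.
		assert_eq!(groups_per_para.get(&ParaId::from(100u32)).map(|g| g.len()), Some(2));
		assert_eq!(groups_per_para.get(&ParaId::from(200u32)).map(|g| g.len()), Some(1));
	}
}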

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn fragment_chain_update_inner<Context>(
	ctx: &mut Context,
	state: &mut State,
	active_leaf_hash: Option<Hash>,
	required_parent_info: Option<(Hash, ParaId)>,
	known_hypotheticals: Option<Vec<HypotheticalCandidate>>,
) {
	// 1. get hypothetical candidates
	let hypotheticals = match known_hypotheticals {
		None => state.candidates.frontier_hypotheticals(required_parent_info),
		Some(h) => h,
	};

	// 2. find out which are in the frontier
	gum::debug!(
		target: LOG_TARGET,
		active_leaf_hash = ?active_leaf_hash,
		"Calling getHypotheticalMembership from statement distribution for candidates: {:?}",
		&hypotheticals.iter().map(|hypo| hypo.candidate_hash()).collect::<Vec<_>>()
	);
	let candidate_memberships = {
		let (tx, rx) = oneshot::channel();
		ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
			HypotheticalMembershipRequest {
				candidates: hypotheticals,
				fragment_chain_relay_parent: active_leaf_hash,
			},
			tx,
		))
		.await;

		match rx.await {
			Ok(candidate_memberships) => candidate_memberships,
			Err(oneshot::Canceled) => return,
		}
	};
	// 3. note that they are importable under a given leaf hash.
	for (hypo, membership) in candidate_memberships {
		// skip parablocks which aren't potential candidates
		if membership.is_empty() {
			continue
		}

		for leaf_hash in membership {
			state.candidates.note_importable_under(&hypo, leaf_hash);
		}

		// 4. for confirmed candidates, send all statements which are new to backing.
		if let HypotheticalCandidate::Complete {
			candidate_hash,
			receipt,
			persisted_validation_data: _,
		} = hypo
		{
			let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
			let prs = state.per_relay_parent.get_mut(&receipt.descriptor.relay_parent());
			if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
				let per_session = state.per_session.get(&prs.session);
				let group_index = confirmed.group_index();

				// Sanity check that group_index is valid for this para at the relay parent.
				let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor.para_id())
				else {
					continue
				};
				if !expected_groups.iter().any(|g| *g == group_index) {
					continue
				}

				if let Some(per_session) = per_session {
					send_backing_fresh_statements(
						ctx,
						candidate_hash,
						confirmed.group_index(),
						&receipt.descriptor.relay_parent(),
						prs,
						confirmed,
						per_session,
					)
					.await;
				}
			}
		}
	}
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn new_leaf_fragment_chain_updates<Context>(
	ctx: &mut Context,
	state: &mut State,
	leaf_hash: Hash,
) {
	fragment_chain_update_inner(ctx, state, Some(leaf_hash), None, None).await
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn prospective_backed_notification_fragment_chain_updates<Context>(
	ctx: &mut Context,
	state: &mut State,
	para_id: ParaId,
	para_head: Hash,
) {
	fragment_chain_update_inner(ctx, state, None, Some((para_head, para_id)), None).await
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn new_confirmed_candidate_fragment_chain_updates<Context>(
	ctx: &mut Context,
	state: &mut State,
	candidate: HypotheticalCandidate,
) {
	fragment_chain_update_inner(ctx, state, None, None, Some(vec![candidate])).await
}

struct ManifestImportSuccess<'a> {
	relay_parent_state: &'a mut PerRelayParentState,
	per_session: &'a PerSessionState,
	acknowledge: bool,
	sender_index: ValidatorIndex,
}

/// Handles the common part of incoming manifests of both types (full & acknowledgement).
///
/// Performs basic sanity checks on the data, imports the manifest into the grid tracker,
/// finds the sending peer's validator index, and reports the peer for any misbehavior.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_manifest_common<'a, Context>(
	ctx: &mut Context,
	peer: PeerId,
	peers: &HashMap<PeerId, PeerState>,
	per_relay_parent: &'a mut HashMap<Hash, PerRelayParentState>,
	per_session: &'a HashMap<SessionIndex, PerSessionState>,
	candidates: &mut Candidates,
	candidate_hash: CandidateHash,
	relay_parent: Hash,
	para_id: ParaId,
	mut manifest_summary: grid::ManifestSummary,
	manifest_kind: grid::ManifestKind,
	reputation: &mut ReputationAggregator,
) -> Option<ManifestImportSuccess<'a>> {
	// 1. sanity checks: peer is connected, relay-parent in state, para ID matches group index.
	let peer_state = peers.get(&peer)?;

	let relay_parent_state = match per_relay_parent.get_mut(&relay_parent) {
		None => {
			modify_reputation(
				reputation,
				ctx.sender(),
				peer,
				COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
			)
			.await;
			return None
		},
		Some(s) => s,
	};

	let per_session = per_session.get(&relay_parent_state.session)?;

	if relay_parent_state.local_validator.is_none() {
		if per_session.is_not_validator() {
			modify_reputation(
				reputation,
				ctx.sender(),
				peer,
				COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
			)
			.await;
		}
		return None
	}

	let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
		modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
		return None
	};

	if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
		modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
		return None
	}

	let grid_topology = per_session.grid_view.as_ref()?;

	let sender_index = grid_topology
		.iter_sending_for_group(manifest_summary.claimed_group_index, manifest_kind)
		.filter_map(|i| per_session.session_info.discovery_keys.get(i.0 as usize).map(|ad| (i, ad)))
		.filter(|(_, ad)| peer_state.is_authority(ad))
		.map(|(i, _)| i)
		.next();

	let sender_index = match sender_index {
		None => {
			modify_reputation(
				reputation,
				ctx.sender(),
				peer,
				COST_UNEXPECTED_MANIFEST_PEER_UNKNOWN,
			)
			.await;
			return None
		},
		Some(s) => s,
	};

	// 2. sanity checks: peer is validator, bitvec size, import into grid tracker
	let group_index = manifest_summary.claimed_group_index;
	let claimed_parent_hash = manifest_summary.claimed_parent_hash;

	// Ignore votes from disabled validators when counting towards the threshold.
	let group = per_session.groups.get(group_index).unwrap_or(&[]);
	let disabled_mask = relay_parent_state.disabled_bitmask(group);
	manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
	manifest_summary.statement_knowledge.mask_valid(&disabled_mask);

	let local_validator = relay_parent_state.local_validator.as_mut().expect("checked above; qed");

	let seconding_limit = relay_parent_state
		.assignments_per_group
		.get(&group_index)?
		.iter()
		.filter(|para| para == &&para_id)
		.count();

	let acknowledge = match local_validator.grid_tracker.import_manifest(
		grid_topology,
		&per_session.groups,
		candidate_hash,
		seconding_limit,
		manifest_summary,
		manifest_kind,
		sender_index,
	) {
		Ok(x) => x,
		Err(grid::ManifestImportError::Conflicting) => {
			modify_reputation(reputation, ctx.sender(), peer, COST_CONFLICTING_MANIFEST).await;
			return None
		},
		Err(grid::ManifestImportError::Overflow) => {
			modify_reputation(reputation, ctx.sender(), peer, COST_EXCESSIVE_SECONDED).await;
			return None
		},
		Err(grid::ManifestImportError::Insufficient) => {
			modify_reputation(reputation, ctx.sender(), peer, COST_INSUFFICIENT_MANIFEST).await;
			return None
		},
		Err(grid::ManifestImportError::Malformed) => {
			modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
			return None
		},
		Err(grid::ManifestImportError::Disallowed) => {
			modify_reputation(reputation, ctx.sender(), peer, COST_UNEXPECTED_MANIFEST_DISALLOWED)
				.await;
			return None
		},
	};

	// 3. if accepted by grid, insert as unconfirmed.
	if let Err(BadAdvertisement) = candidates.insert_unconfirmed(
		peer,
		candidate_hash,
		relay_parent,
		group_index,
		Some((claimed_parent_hash, para_id)),
	) {
		modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await;
		return None
	}

	if acknowledge {
		gum::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			from = ?sender_index,
			local_index = ?per_session.local_validator,
			?manifest_kind,
			"immediate ack, known candidate"
		);
	}

	Some(ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index })
}
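
// An illustrative sketch (not part of the subsystem logic): the seconding
// limit computed above is simply how many of the group's claim-queue
// assignments name the manifest's para, i.e. how many `Seconded` statements
// per validator are legitimate for it.
#[cfg(test)]
mod seconding_limit_sketch {
	use super::*;

	#[test]
	fn limit_counts_the_assignments_naming_the_para() {
		let para_a = ParaId::from(1u32);
		let para_b = ParaId::from(2u32);
		// The group is assigned para A twice and para B once.
		let assignments = vec![para_a, para_a, para_b];

		let seconding_limit = assignments.iter().filter(|para| para == &&para_a).count();
		assert_eq!(seconding_limit, 2);
	}
}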

/// Produce a list of network messages to send to a peer, following acknowledgement of a manifest.
/// This notes the messages as sent within the grid state.
fn post_acknowledgement_statement_messages(
	recipient: ValidatorIndex,
	relay_parent: Hash,
	grid_tracker: &mut GridTracker,
	statement_store: &StatementStore,
	groups: &Groups,
	group_index: GroupIndex,
	candidate_hash: CandidateHash,
	peer: &(PeerId, ValidationVersion),
) -> Vec<net_protocol::VersionedValidationProtocol> {
	let sending_filter = match grid_tracker.pending_statements_for(recipient, candidate_hash) {
		None => return Vec::new(),
		Some(f) => f,
	};

	let mut messages = Vec::new();
	for statement in
		statement_store.group_statements(groups, group_index, candidate_hash, &sending_filter)
	{
		grid_tracker.sent_or_received_direct_statement(
			groups,
			statement.validator_index(),
			recipient,
			statement.payload(),
			false,
		);
		match peer.1 {
			ValidationVersion::V3 => messages.push(ValidationProtocols::V3(
				protocol_v3::StatementDistributionMessage::Statement(
					relay_parent,
					statement.as_unchecked().clone(),
				)
				.into(),
			)),
		};
	}

	messages
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_manifest<Context>(
	ctx: &mut Context,
	state: &mut State,
	peer: PeerId,
	manifest: net_protocol::v3::BackedCandidateManifest,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) {
	gum::debug!(
		target: LOG_TARGET,
		candidate_hash = ?manifest.candidate_hash,
		?peer,
		"Received incoming manifest",
	);

	let x = match handle_incoming_manifest_common(
		ctx,
		peer,
		&state.peers,
		&mut state.per_relay_parent,
		&state.per_session,
		&mut state.candidates,
		manifest.candidate_hash,
		manifest.relay_parent,
		manifest.para_id,
		grid::ManifestSummary {
			claimed_parent_hash: manifest.parent_head_data_hash,
			claimed_group_index: manifest.group_index,
			statement_knowledge: manifest.statement_knowledge,
		},
		grid::ManifestKind::Full,
		reputation,
	)
	.await
	{
		Some(x) => x,
		None => return,
	};

	let ManifestImportSuccess { relay_parent_state, per_session, acknowledge, sender_index } = x;

	if acknowledge {
		// 4. if already known within grid (confirmed & backed), acknowledge candidate
		gum::trace!(
			target: LOG_TARGET,
			candidate_hash = ?manifest.candidate_hash,
			"Known candidate - acknowledging manifest",
		);

		let local_knowledge = {
			let group_size = match per_session.groups.get(manifest.group_index) {
				None => return, // sanity
				Some(x) => x.len(),
			};

			local_knowledge_filter(
				group_size,
				manifest.group_index,
				manifest.candidate_hash,
				&relay_parent_state.statement_store,
			)
		};

		let (messages, statements_count) = acknowledgement_and_statement_messages(
			&(
				peer,
				state
					.peers
					.get(&peer)
					.map(|val| val.protocol_version)
					// Assume the latest stable version if we have no info about the peer's version.
					.unwrap_or(ValidationVersion::V3),
			),
			sender_index,
			&per_session.groups,
			relay_parent_state,
			manifest.relay_parent,
			manifest.group_index,
			manifest.candidate_hash,
			local_knowledge,
		);

		if !messages.is_empty() {
			ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await;
			metrics.on_statements_distributed(statements_count);
		}
	} else if !state.candidates.is_confirmed(&manifest.candidate_hash) {
		// 5. if unconfirmed, add request entry
		gum::trace!(
			target: LOG_TARGET,
			candidate_hash = ?manifest.candidate_hash,
			"Unknown candidate - requesting",
		);

		state
			.request_manager
			.get_or_insert(manifest.relay_parent, manifest.candidate_hash, manifest.group_index)
			.add_peer(peer);
	}
}

/// Produces acknowledgement and statement messages to be sent over the network,
/// noting that they have been sent within the grid topology tracker as well.
fn acknowledgement_and_statement_messages(
	peer: &(PeerId, ValidationVersion),
	validator_index: ValidatorIndex,
	groups: &Groups,
	relay_parent_state: &mut PerRelayParentState,
	relay_parent: Hash,
	group_index: GroupIndex,
	candidate_hash: CandidateHash,
	local_knowledge: StatementFilter,
) -> (Vec<(Vec<PeerId>, net_protocol::VersionedValidationProtocol)>, usize) {
	let local_validator = match relay_parent_state.local_validator.as_mut() {
		None => return (Vec::new(), 0),
		Some(l) => l,
	};

	let acknowledgement = protocol_v3::BackedCandidateAcknowledgement {
		candidate_hash,
		statement_knowledge: local_knowledge.clone(),
	};

	let mut messages = match peer.1 {
		ValidationVersion::V3 => vec![(
			vec![peer.0],
			ValidationProtocols::V3(
				protocol_v3::StatementDistributionMessage::BackedCandidateKnown(acknowledgement),
			)
			.into(),
		)],
	};

	local_validator.grid_tracker.manifest_sent_to(
		groups,
		validator_index,
		candidate_hash,
		local_knowledge.clone(),
	);

	let statement_messages = post_acknowledgement_statement_messages(
		validator_index,
		relay_parent,
		&mut local_validator.grid_tracker,
		&relay_parent_state.statement_store,
		groups,
		group_index,
		candidate_hash,
		peer,
	);
	let statements_count = statement_messages.len();

	messages.extend(statement_messages.into_iter().map(|m| (vec![peer.0], m)));

	(messages, statements_count)
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn handle_incoming_acknowledgement<Context>(
	ctx: &mut Context,
	state: &mut State,
	peer: PeerId,
	acknowledgement: net_protocol::v3::BackedCandidateAcknowledgement,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) {
	// The key difference between acknowledgements and full manifests is that only
	// the candidate hash is included alongside the bitfields, so the candidate
	// must be confirmed for us to even process it.

	gum::debug!(
		target: LOG_TARGET,
		candidate_hash = ?acknowledgement.candidate_hash,
		?peer,
		"Received incoming acknowledgement",
	);

	let candidate_hash = acknowledgement.candidate_hash;
	let (relay_parent, parent_head_data_hash, group_index, para_id) = {
		match state.candidates.get_confirmed(&candidate_hash) {
			Some(c) => (c.relay_parent(), c.parent_head_data_hash(), c.group_index(), c.para_id()),
			None => {
				modify_reputation(
					reputation,
					ctx.sender(),
					peer,
					COST_UNEXPECTED_ACKNOWLEDGEMENT_UNKNOWN_CANDIDATE,
				)
				.await;
				return
			},
		}
	};

	let x = match handle_incoming_manifest_common(
		ctx,
		peer,
		&state.peers,
		&mut state.per_relay_parent,
		&state.per_session,
		&mut state.candidates,
		candidate_hash,
		relay_parent,
		para_id,
		grid::ManifestSummary {
			claimed_parent_hash: parent_head_data_hash,
			claimed_group_index: group_index,
			statement_knowledge: acknowledgement.statement_knowledge,
		},
		grid::ManifestKind::Acknowledgement,
		reputation,
	)
	.await
	{
		Some(x) => x,
		None => return,
	};

	let ManifestImportSuccess { relay_parent_state, per_session, sender_index, .. } = x;

	let local_validator = match relay_parent_state.local_validator.as_mut() {
		None => return,
		Some(l) => l,
	};

	let messages = post_acknowledgement_statement_messages(
		sender_index,
		relay_parent,
		&mut local_validator.grid_tracker,
		&relay_parent_state.statement_store,
		&per_session.groups,
		group_index,
		candidate_hash,
		&(
			peer,
			state
				.peers
				.get(&peer)
				.map(|val| val.protocol_version)
				// Assume the latest stable version if we have no info about the peer's version.
				.unwrap_or(ValidationVersion::V3),
		),
	);

	if !messages.is_empty() {
		let count = messages.len();
		ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(
			messages.into_iter().map(|m| (vec![peer], m)).collect(),
		))
		.await;
		metrics.on_statements_distributed(count);
	}
}
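
// An illustrative sketch (not part of the subsystem logic): an
// acknowledgement names only the candidate hash plus the statement
// bitfields - no relay parent, group, or para - which is why the handler
// above has to recover all of those from an already-confirmed candidate.
#[cfg(test)]
mod acknowledgement_shape_sketch {
	use super::*;

	#[test]
	fn acknowledgement_carries_only_hash_and_knowledge() {
		let ack = protocol_v3::BackedCandidateAcknowledgement {
			candidate_hash: CandidateHash(Hash::repeat_byte(1)),
			statement_knowledge: StatementFilter::blank(3),
		};

		// Everything else (relay parent, group index, para) must come from
		// the confirmed-candidates store.
		assert_eq!(ack.statement_knowledge.seconded_in_group.len(), 3);
	}
}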

/// Handle a notification of a candidate being backed.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_backed_candidate_message<Context>(
	ctx: &mut Context,
	state: &mut State,
	candidate_hash: CandidateHash,
	metrics: &Metrics,
) {
	// If the candidate is unknown or unconfirmed, it's a race (pruned before receiving the
	// message) or a bug. Ignore it if so.
	let confirmed = match state.candidates.get_confirmed(&candidate_hash) {
		None => {
			gum::debug!(
				target: LOG_TARGET,
				?candidate_hash,
				"Received backed candidate notification for an unknown or unconfirmed candidate",
			);

			return
		},
		Some(c) => c,
	};

	let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
		None => return,
		Some(s) => s,
	};

	let per_session = match state.per_session.get(&relay_parent_state.session) {
		None => return,
		Some(s) => s,
	};

	gum::debug!(
		target: LOG_TARGET,
		?candidate_hash,
		group_index = ?confirmed.group_index(),
		"Candidate Backed - initiating grid distribution & child fetches"
	);

	provide_candidate_to_grid(
		ctx,
		candidate_hash,
		relay_parent_state,
		confirmed,
		per_session,
		&state.authorities,
		&state.peers,
		metrics,
	)
	.await;

	// Search for children of the backed candidate to request.
	prospective_backed_notification_fragment_chain_updates(
		ctx,
		state,
		confirmed.para_id(),
		confirmed.candidate_receipt().descriptor.para_head(),
	)
	.await;
}

/// Sends all messages about a candidate to all peers in the cluster,
/// with `Seconded` statements first.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn send_cluster_candidate_statements<Context>(
	ctx: &mut Context,
	state: &mut State,
	candidate_hash: CandidateHash,
	relay_parent: Hash,
	metrics: &Metrics,
) {
	let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
		None => return,
		Some(s) => s,
	};

	let per_session = match state.per_session.get(&relay_parent_state.session) {
		None => return,
		Some(s) => s,
	};

	let local_group = match relay_parent_state.active_validator_state_mut() {
		None => return,
		Some(v) => v.group,
	};

	let group_size = match per_session.groups.get(local_group) {
		None => return,
		Some(g) => g.len(),
	};

	let statements: Vec<_> = relay_parent_state
		.statement_store
		.group_statements(
			&per_session.groups,
			local_group,
			candidate_hash,
			&StatementFilter::full(group_size),
		)
		.cloned()
		.collect();

	for statement in statements {
		circulate_statement(
			ctx,
			relay_parent,
			relay_parent_state,
			per_session,
			&state.candidates,
			&state.authorities,
			&state.peers,
			statement,
			metrics,
		)
		.await;
	}
}

/// Applies state & p2p updates as a result of a newly confirmed candidate.
///
/// This punishes peers which advertised the candidate incorrectly, as well as
/// doing an importability analysis of the confirmed candidate and providing
/// statements to the backing subsystem if importable. It also cleans up
/// any pending requests for the candidate.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
async fn apply_post_confirmation<Context>(
	ctx: &mut Context,
	state: &mut State,
	post_confirmation: PostConfirmation,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) {
	for peer in post_confirmation.reckoning.incorrect {
		modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await;
	}

	let candidate_hash = post_confirmation.hypothetical.candidate_hash();
	state.request_manager.remove_for(candidate_hash);

	send_cluster_candidate_statements(
		ctx,
		state,
		candidate_hash,
		post_confirmation.hypothetical.relay_parent(),
		metrics,
	)
	.await;
	new_confirmed_candidate_fragment_chain_updates(ctx, state, post_confirmation.hypothetical)
		.await;
}

/// Dispatch pending requests for candidate data & statements.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut State) {
	if !state.request_manager.has_pending_requests() {
		return
	}

	let peers = &state.peers;
	let peer_advertised = |identifier: &CandidateIdentifier, peer: &_| {
		let peer_data = peers.get(peer)?;

		let relay_parent_state = state.per_relay_parent.get(&identifier.relay_parent)?;
		let per_session = state.per_session.get(&relay_parent_state.session)?;

		let local_validator = relay_parent_state.local_validator.as_ref()?;

		for validator_id in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
			per_session.authority_lookup.get(a)
		}) {
			// Cluster members don't advertise individual statements,
			// but have surely sent us some.
			if let Some(active) = local_validator.active.as_ref() {
				if active.cluster_tracker.knows_candidate(validator_id, identifier.candidate_hash) {
					return Some(StatementFilter::blank(active.cluster_tracker.targets().len()))
				}
			}

			let filter = local_validator
				.grid_tracker
				.advertised_statements(validator_id, &identifier.candidate_hash);

			if let Some(f) = filter {
				return Some(f)
			}
		}

		None
	};
	let request_props = |identifier: &CandidateIdentifier| {
		let &CandidateIdentifier { relay_parent, group_index, .. } = identifier;

		let relay_parent_state = state.per_relay_parent.get(&relay_parent)?;
		let per_session = state.per_session.get(&relay_parent_state.session)?;
		let group = per_session.groups.get(group_index)?;
		let seconding_limit = relay_parent_state.assignments_per_group.get(&group_index)?.len();

		// Request nothing which would be an 'over-seconded' statement.
		let mut unwanted_mask = StatementFilter::blank(group.len());
		for (i, v) in group.iter().enumerate() {
			if relay_parent_state.statement_store.seconded_count(v) >= seconding_limit {
				unwanted_mask.seconded_in_group.set(i, true);
			}
		}

		// Add disabled validators to the unwanted mask.
		let disabled_mask = relay_parent_state.disabled_bitmask(group);
		unwanted_mask.seconded_in_group |= &disabled_mask;
		unwanted_mask.validated_in_group |= &disabled_mask;

		// Don't require a backing threshold for cluster candidates.
		let local_validator = relay_parent_state.local_validator.as_ref()?;
		let require_backing = local_validator
			.active
			.as_ref()
			.map_or(true, |active| active.group != group_index);

		let backing_threshold = if require_backing {
			let threshold = per_session.groups.get_size_and_backing_threshold(group_index)?.1;
			Some(threshold)
		} else {
			None
		};

		Some(RequestProperties { unwanted_mask, backing_threshold })
	};

	while let Some(request) = state.request_manager.next_request(
		&mut state.response_manager,
		request_props,
		peer_advertised,
	) {
		// Peer is supposedly connected.
		ctx.send_message(NetworkBridgeTxMessage::SendRequests(
			vec![Requests::AttestedCandidateV2(request)],
			IfDisconnected::ImmediateError,
		))
		.await;
	}
}
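
// An illustrative sketch (not part of the subsystem logic): how the request
// mask in `dispatch_requests` is built - start blank, mark over-seconded
// validators, then OR in the disabled-validator bitmask so that statements
// from either set are never requested.
#[cfg(test)]
mod unwanted_mask_sketch {
	use super::*;

	#[test]
	fn over_seconded_and_disabled_validators_are_masked() {
		let group_size = 3;
		let mut unwanted_mask = StatementFilter::blank(group_size);
		// Validator 0 already hit the seconding limit.
		unwanted_mask.seconded_in_group.set(0, true);

		// Validator 2 is disabled.
		let mut disabled_mask: BitVec<u8, Lsb0> = BitVec::repeat(false, group_size);
		disabled_mask.set(2, true);

		unwanted_mask.seconded_in_group |= &disabled_mask;
		unwanted_mask.validated_in_group |= &disabled_mask;

		assert_eq!(unwanted_mask.seconded_in_group.count_ones(), 2);
		assert_eq!(unwanted_mask.validated_in_group.count_ones(), 1);
	}
}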

/// Wait on the next incoming response. If there are no requests pending, this
/// future never resolves. It is the responsibility of the user of this API
/// to interrupt the future.
pub(crate) async fn receive_response(response_manager: &mut ResponseManager) -> UnhandledResponse {
	match response_manager.incoming().await {
		Some(r) => r,
		None => futures::future::pending().await,
	}
}

/// Wait on the next soonest retry on a pending request. If there are no retries pending, this
/// future never resolves. Note that this only signals that a request is ready to retry; the user of
/// this API must call `dispatch_requests`.
pub(crate) async fn next_retry(request_manager: &mut RequestManager) {
	match request_manager.next_retry_time() {
		Some(instant) =>
			futures_timer::Delay::new(instant.saturating_duration_since(Instant::now())).await,
		None => futures::future::pending().await,
	}
}
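
// An illustrative sketch (not part of the subsystem logic): with no retry
// deadline, `next_retry` falls back to a never-resolving future, so callers
// park on it inside a `select!` instead of busy-looping. `now_or_never`
// (from the already-imported `FutureExt`) shows the pending branch yields
// nothing on a single poll.
#[cfg(test)]
mod retry_timer_sketch {
	use super::*;

	#[test]
	fn no_deadline_means_pending_forever() {
		let timer = futures::future::pending::<()>();
		assert!(timer.now_or_never().is_none());
	}
}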

/// Handles an incoming response. This does the actual work of validating the response,
/// importing statements, sending acknowledgements, etc.
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_response<Context>(
	ctx: &mut Context,
	state: &mut State,
	response: UnhandledResponse,
	reputation: &mut ReputationAggregator,
	metrics: &Metrics,
) {
	let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } =
		response.candidate_identifier();
	let peer = *response.requested_peer();

	gum::trace!(
		target: LOG_TARGET,
		?candidate_hash,
		?peer,
		"Received response",
	);

	let post_confirmation = {
		let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
			None => return,
			Some(s) => s,
		};

		let per_session = match state.per_session.get(&relay_parent_state.session) {
			None => return,
			Some(s) => s,
		};

		let group = match per_session.groups.get(group_index) {
			None => return,
			Some(g) => g,
		};

		let disabled_mask = relay_parent_state.disabled_bitmask(group);

		let res = response.validate_response(
			&mut state.request_manager,
			group,
			relay_parent_state.session,
			|v| per_session.session_info.validators.get(v).cloned(),
			|para, g_index| {
				let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else {
					return false
				};

				expected_groups.iter().any(|g| g == &g_index)
			},
			disabled_mask,
			&relay_parent_state.transposed_cq,
			per_session.candidate_receipt_v2_enabled(),
		);

		for (peer, rep) in res.reputation_changes {
			modify_reputation(reputation, ctx.sender(), peer, rep).await;
		}

		let (candidate, pvd, statements) = match res.request_status {
			requests::CandidateRequestStatus::Outdated => return,
			requests::CandidateRequestStatus::Incomplete => {
				gum::trace!(
					target: LOG_TARGET,
					?candidate_hash,
					"Response incomplete. Retrying"
				);

				return
			},
			requests::CandidateRequestStatus::Complete {
				candidate,
				persisted_validation_data,
				statements,
			} => {
				gum::trace!(
					target: LOG_TARGET,
					?candidate_hash,
					n_statements = statements.len(),
					"Successfully received candidate"
				);

				(candidate, persisted_validation_data, statements)
			},
		};

		for statement in statements {
			let _ = relay_parent_state.statement_store.insert(
				&per_session.groups,
				statement,
				StatementOrigin::Remote,
			);
		}

		if let Some(post_confirmation) =
			state.candidates.confirm_candidate(candidate_hash, candidate, pvd, group_index)
		{
			post_confirmation
		} else {
			gum::warn!(
				target: LOG_TARGET,
				?candidate_hash,
				"Candidate re-confirmed by request/response: logic error",
			);

			return
		}
	};

	// Note that this implicitly circulates all statements via the cluster.
	apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await;

	let confirmed = state.candidates.get_confirmed(&candidate_hash).expect("just confirmed; qed");

	// Although the candidate is confirmed, it isn't yet a
	// hypothetical member of the fragment chain. Later, when it is,
	// we will import statements.
	if !confirmed.is_importable(None) {
		return
	}

	let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) {
		None => return,
		Some(s) => s,
	};

	let per_session = match state.per_session.get(&relay_parent_state.session) {
		None => return,
		Some(s) => s,
	};

	send_backing_fresh_statements(
		ctx,
		candidate_hash,
		group_index,
		&relay_parent,
		relay_parent_state,
		confirmed,
		per_session,
	)
	.await;

	// We don't need to send an acknowledgement yet because:
	// 1. The candidate is not known yet, so it cannot be backed. Any previous confirmation is a
	//    bug, because `apply_post_confirmation` is meant to clear requests.
	// 2. Providing the statements to backing will lead to a 'Backed' message.
	// 3. On 'Backed' we will send acknowledgements/follow-up statements when this becomes
	//    includable.
}

/// Returns true if the statement filter meets the backing threshold for grid requests.
pub(crate) fn seconded_and_sufficient(
	filter: &StatementFilter,
	backing_threshold: Option<usize>,
) -> bool {
	backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t)
}
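
// A worked example for `seconded_and_sufficient` (illustrative test, not
// part of the subsystem logic): a grid request passes a threshold of 2 only
// once the filter shows at least one `Seconded` bit and two backing
// validators in total; cluster requests (no threshold) always pass.
#[cfg(test)]
mod backing_threshold_sketch {
	use super::*;

	#[test]
	fn threshold_requires_a_second_plus_quorum() {
		let mut filter = StatementFilter::blank(3);
		assert!(seconded_and_sufficient(&filter, None));
		assert!(!seconded_and_sufficient(&filter, Some(2)));

		filter.validated_in_group.set(0, true);
		filter.validated_in_group.set(1, true);
		// Two backing validators, but nobody seconded: still insufficient.
		assert!(!seconded_and_sufficient(&filter, Some(2)));

		filter.seconded_in_group.set(0, true);
		assert!(seconded_and_sufficient(&filter, Some(2)));
	}
}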

/// Answer an incoming request for a candidate.
pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
	let ResponderMessage { request, sent_feedback } = message;
	let AttestedCandidateRequest { candidate_hash, ref mask } = &request.payload;

	gum::trace!(
		target: LOG_TARGET,
		?candidate_hash,
		peer = ?request.peer,
		"Received request"
	);

	// Signal to the responder that we started processing this request.
	let _ = sent_feedback.send(());

	let confirmed = match state.candidates.get_confirmed(&candidate_hash) {
		None => return, // drop request, candidate not known.
		Some(c) => c,
	};

	let relay_parent_state = match state.per_relay_parent.get_mut(&confirmed.relay_parent()) {
		None => return,
		Some(s) => s,
	};

	let local_validator = match relay_parent_state.local_validator.as_ref() {
		None => return,
		Some(s) => s,
	};

	let per_session = match state.per_session.get(&relay_parent_state.session) {
		None => return,
		Some(s) => s,
	};

	let peer_data = match state.peers.get(&request.peer) {
		None => return,
		Some(d) => d,
	};

	let group_index = confirmed.group_index();
	let group = per_session
		.groups
		.get(group_index)
		.expect("group from session's candidate always known; qed");

	let group_size = group.len();

	// Check that the request bitfields are the right size.
	if mask.seconded_in_group.len() != group_size || mask.validated_in_group.len() != group_size {
		let _ = request.send_outgoing_response(OutgoingResponse {
			result: Err(()),
			reputation_changes: vec![COST_INVALID_REQUEST_BITFIELD_SIZE],
			sent_feedback: None,
		});

		return
	}

	// Check that the peer is allowed to request the candidate (i.e. they're in the cluster or
	// we've sent them a manifest).
	let (validator_id, is_cluster) = {
		let mut validator_id = None;
		let mut is_cluster = false;
		for v in find_validator_ids(peer_data.iter_known_discovery_ids(), |a| {
			per_session.authority_lookup.get(a)
		}) {
			if local_validator
				.active
				.as_ref()
				.map_or(false, |active| active.cluster_tracker.can_request(v, *candidate_hash))
			{
				validator_id = Some(v);
				is_cluster = true;
				break
			}

			if local_validator.grid_tracker.can_request(v, *candidate_hash) {
				validator_id = Some(v);
				break
			}
		}

		match validator_id {
			Some(v) => (v, is_cluster),
			None => {
				let _ = request.send_outgoing_response(OutgoingResponse {
					result: Err(()),
					reputation_changes: vec![COST_UNEXPECTED_REQUEST],
					sent_feedback: None,
				});

				return
			},
		}
	};

	// Transform the mask with 'OR' semantics into one with 'AND' semantics for the API used
	// below.
	let and_mask = StatementFilter {
		seconded_in_group: !mask.seconded_in_group.clone(),
		validated_in_group: !mask.validated_in_group.clone(),
	};

	let local_validator = match relay_parent_state.local_validator.as_mut() {
		None => return,
		Some(s) => s,
	};

	let mut sent_filter = StatementFilter::blank(group_size);
	let statements: Vec<_> = relay_parent_state
		.statement_store
		.group_statements(&per_session.groups, group_index, *candidate_hash, &and_mask)
		.map(|s| {
			let s = s.as_unchecked().clone();
			let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
			let Some(i) = index_in_group(s.unchecked_validator_index()) else { return s };

			match s.unchecked_payload() {
				CompactStatement::Seconded(_) => {
					sent_filter.seconded_in_group.set(i, true);
				},
				CompactStatement::Valid(_) => {
					sent_filter.validated_in_group.set(i, true);
				},
			}
			s
		})
		.collect();

	// There should be no response at all for grid requests when the
	// backing threshold is no longer met as a result of disabled validators.
	if !is_cluster {
		let threshold = per_session
			.groups
			.get_size_and_backing_threshold(group_index)
			.expect("group existence checked above; qed")
			.1;

		if !seconded_and_sufficient(&sent_filter, Some(threshold)) {
			gum::info!(
				target: LOG_TARGET,
				?candidate_hash,
				relay_parent = ?confirmed.relay_parent(),
				?group_index,
				"Dropping a request from a grid peer because the backing threshold is no longer met."
			);
			return
		}
	}

	// Update bookkeeping about which statements peers have received.
	for statement in &statements {
		if is_cluster {
			local_validator
				.active
				.as_mut()
				.expect("cluster peer means local is active validator; qed")
				.cluster_tracker
				.note_sent(
					validator_id,
					statement.unchecked_validator_index(),
					statement.unchecked_payload().clone(),
				);
		} else {
			local_validator.grid_tracker.sent_or_received_direct_statement(
				&per_session.groups,
				statement.unchecked_validator_index(),
				validator_id,
				statement.unchecked_payload(),
				false,
			);
		}
	}

	let response = AttestedCandidateResponse {
		candidate_receipt: (&**confirmed.candidate_receipt()).clone(),
		persisted_validation_data: confirmed.persisted_validation_data().clone(),
		statements,
	};

	let _ = request.send_response(response);
}
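
// An illustrative sketch (not part of the subsystem logic): the mask
// inversion in `answer_request` - a request mask arrives with 'OR' semantics
// ("skip what I already have") and negating each bitfield turns it into the
// 'AND' semantics of the statement-store query ("send only these").
#[cfg(test)]
mod request_mask_sketch {
	use super::*;

	#[test]
	fn negation_flips_mask_semantics() {
		let mut mask = StatementFilter::blank(3);
		// The requester already has validator 1's `Seconded` statement.
		mask.seconded_in_group.set(1, true);

		let and_mask = StatementFilter {
			seconded_in_group: !mask.seconded_in_group.clone(),
			validated_in_group: !mask.validated_in_group.clone(),
		};

		// The 'AND' mask now selects exactly the complement.
		assert!(!and_mask.seconded_in_group[1]);
		assert!(and_mask.seconded_in_group[0] && and_mask.seconded_in_group[2]);
	}
}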

/// Messages coming from the background respond task.
pub(crate) struct ResponderMessage {
	request: IncomingRequest<AttestedCandidateRequest>,
	sent_feedback: oneshot::Sender<()>,
}

/// A fetching task, taking care of fetching candidates via request/response.
///
/// Runs in a background task and feeds requests to [`answer_request`] through [`MuxedMessage`].
pub(crate) async fn respond_task(
	mut receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
	mut sender: mpsc::Sender<ResponderMessage>,
	metrics: Metrics,
) {
	let mut pending_out = FuturesUnordered::new();
	let mut active_peers = HashSet::new();

	loop {
		select! {
			// New request
			request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
				let request = match request_result.into_nested() {
					Ok(Ok(v)) => v,
					Err(fatal) => {
						gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
						return
					},
					Ok(Err(jfyi)) => {
						gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
						continue
					},
				};

				// If the peer is currently being served, drop the request.
				if active_peers.contains(&request.peer) {
					gum::trace!(target: LOG_TARGET, "Peer already being served, dropping request");
					metrics.on_request_dropped_peer_rate_limit();
					continue
				}

				// If we are over the parallel limit, wait for one request to finish.
				if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
					gum::trace!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish");
					metrics.on_max_parallel_requests_reached();
					let (_, peer) = pending_out.select_next_some().await;
					active_peers.remove(&peer);
				}

				// Start serving the request.
				let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
				let peer = request.peer;
				if let Err(err) = sender
					.feed(ResponderMessage { request, sent_feedback: pending_sent_tx })
					.await
				{
					gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
					return
				}
				let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
				pending_out.push(future_with_peer);
				active_peers.insert(peer);
			},
			// Request served/finished
			result = pending_out.select_next_some() => {
				let (_, peer) = result;
				active_peers.remove(&peer);
			},
		}
	}
}
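
// An illustrative sketch (not part of the subsystem logic): the responder's
// per-peer rate limit above is just set membership - one in-flight request
// per peer, admitted again once served. Modelled with a bare `HashSet`.
#[cfg(test)]
mod responder_rate_limit_sketch {
	use super::*;

	#[test]
	fn one_in_flight_request_per_peer() {
		let mut active_peers: HashSet<PeerId> = HashSet::new();
		let peer = PeerId::random();

		assert!(active_peers.insert(peer)); // first request admitted
		assert!(active_peers.contains(&peer)); // a second one would be dropped
		active_peers.remove(&peer); // request served
		assert!(active_peers.insert(peer)); // admitted again
	}
}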