referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_backing/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Implements the `CandidateBackingSubsystem`.
18//!
19//! This subsystem maintains the entire responsibility of tracking parachain
20//! candidates which can be backed, as well as the issuance of statements
21//! about candidates when run on a validator node.
22//!
23//! There are two types of statements: `Seconded` and `Valid`.
24//! `Seconded` implies `Valid`, and nothing should be stated as
//! `Valid` unless it's already been `Seconded`.
26//!
27//! Validators may only second candidates which fall under their own group
28//! assignment, and they may only second one candidate per depth per active leaf.
//! Candidates which are stated as either `Seconded` or `Valid` by a majority of the
30//! assigned group of validators may be backed on-chain and proceed to the availability
31//! stage.
32//!
33//! Depth is a concept relating to asynchronous backing, by which
34//! short sub-chains of candidates are backed and extended off-chain, and then placed
35//! asynchronously into blocks of the relay chain as those are authored and as the
36//! relay-chain state becomes ready for them. Asynchronous backing allows parachains to
37//! grow mostly independently from the state of the relay chain, which gives more time for
38//! parachains to be validated and thereby increases performance.
39//!
40//! Most of the work of asynchronous backing is handled by the Prospective Parachains
41//! subsystem. The 'depth' of a parachain block with respect to a relay chain block is
42//! a measure of how many parachain blocks are between the most recent included parachain block
43//! in the post-state of the relay-chain block and the candidate. For instance,
44//! a candidate that descends directly from the most recent parachain block in the relay-chain
45//! state has depth 0. The child of that candidate would have depth 1. And so on.
46//!
47//! The candidate backing subsystem keeps track of a set of 'active leaves' which are the
48//! most recent blocks in the relay-chain (which is in fact a tree) which could be built
49//! upon. Depth is always measured against active leaves, and the valid relay-parent that
50//! each candidate can have is determined by the active leaves. The Prospective Parachains
51//! subsystem enforces that the relay-parent increases monotonically, so that logic
52//! is not handled here. By communicating with the Prospective Parachains subsystem,
53//! this subsystem extrapolates an "implicit view" from the set of currently active leaves,
54//! which determines the set of all recent relay-chain block hashes which could be relay-parents
55//! for candidates backed in children of the active leaves.
56//!
57//! In fact, this subsystem relies on the Statement Distribution subsystem to prevent spam
58//! by enforcing the rule that each validator may second at most one candidate per depth per
59//! active leaf. This bounds the number of candidates that the system needs to consider and
60//! is not handled within this subsystem, except for candidates seconded locally.
61//!
62//! This subsystem also handles relay-chain heads which don't support asynchronous backing.
63//! For such active leaves, the only valid relay-parent is the leaf hash itself and the only
64//! allowed depth is 0.
65
66#![deny(unused_crate_dependencies)]
67
68use std::{
69	collections::{HashMap, HashSet},
70	sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75	channel::{mpsc, oneshot},
76	future::BoxFuture,
77	stream::FuturesOrdered,
78	FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84	AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85	ValidationResult, DISPUTE_WINDOW,
86};
87use polkadot_node_subsystem::{
88	messages::{
89		AvailabilityDistributionMessage, AvailabilityStoreMessage, BackableCandidateRef,
90		CanSecondRequest, CandidateBackingMessage, CandidateValidationMessage,
91		CollatorProtocolMessage, HypotheticalCandidate, HypotheticalMembershipRequest,
92		IntroduceSecondedCandidateRequest, ProspectiveParachainsMessage, ProvisionableData,
93		ProvisionerMessage, PvfExecKind, RuntimeApiMessage, RuntimeApiRequest,
94		StatementDistributionMessage, StoreAvailableDataError,
95	},
96	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97	SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100	self as util,
101	backing_implicit_view::View as ImplicitView,
102	request_claim_queue, request_disabled_validators, request_min_backing_votes,
103	request_node_features, request_session_index_for_child, request_validator_groups,
104	request_validators,
105	runtime::{self, ClaimQueueSnapshot},
106	Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110	node_features::FeatureIndex, BackedCandidate, CandidateCommitments, CandidateHash,
111	CandidateReceiptV2 as CandidateReceipt,
112	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, GroupIndex,
113	GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData,
114	SessionIndex, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
115	ValidityAttestation,
116};
117use polkadot_statement_table::{
118	generic::AttestedCandidate as TableAttestedCandidate,
119	v2::{
120		SignedStatement as TableSignedStatement, Statement as TableStatement,
121		Summary as TableSummary,
122	},
123	Context as TableContextTrait, Table,
124};
125use sp_keystore::KeystorePtr;
126
127mod error;
128
129mod metrics;
130use self::metrics::Metrics;
131
132#[cfg(test)]
133mod tests;
134
/// Log target passed to the `gum` logging macros throughout this subsystem.
const LOG_TARGET: &str = "parachain::candidate-backing";
136
/// PoV data to validate.
///
/// Tells the background validation task whether the PoV is already at hand or
/// still has to be requested from another validator.
enum PoVData {
	/// Already available (from candidate selection).
	Ready(Arc<PoV>),
	/// Needs to be fetched from validator (we are checking a signed statement).
	FetchFromValidator {
		/// The validator the PoV is requested from.
		from_validator: ValidatorIndex,
		/// Hash of the candidate the PoV belongs to.
		candidate_hash: CandidateHash,
		/// Hash of the PoV being requested.
		pov_hash: Hash,
	},
}
148
/// Outcome of a background validation job.
///
/// Background tasks send it over the `background_validation_tx` channel, paired
/// with the scheduling parent the job was run for.
enum ValidatedCandidateCommand {
	/// We were instructed to second the candidate that has been already validated.
	Second(BackgroundValidationResult),
	/// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
	/// We were not able to `Attest` because backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
}
157
158impl std::fmt::Debug for ValidatedCandidateCommand {
159	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
160		let candidate_hash = self.candidate_hash();
161		match *self {
162			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
163			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
164			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
165		}
166	}
167}
168
169impl ValidatedCandidateCommand {
170	fn candidate_hash(&self) -> CandidateHash {
171		match *self {
172			ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
173			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
174			ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
175			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
176			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
177		}
178	}
179}
180
/// The candidate backing subsystem.
///
/// Holds what is handed to the subsystem's main loop when it is started.
pub struct CandidateBackingSubsystem {
	/// Keystore handle passed on to the main loop.
	keystore: KeystorePtr,
	/// Metrics handle passed on to the main loop.
	metrics: Metrics,
}
186
187impl CandidateBackingSubsystem {
188	/// Create a new instance of the `CandidateBackingSubsystem`.
189	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
190		Self { keystore, metrics }
191	}
192}
193
194#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
195impl<Context> CandidateBackingSubsystem
196where
197	Context: Send + Sync,
198{
199	fn start(self, ctx: Context) -> SpawnedSubsystem {
200		let future = async move {
201			run(ctx, self.keystore, self.metrics)
202				.await
203				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
204		}
205		.boxed();
206
207		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
208	}
209}
210
/// State tracked for a single scheduling parent.
///
/// Entries live in `State::per_scheduling_parent`, keyed by the scheduling parent hash.
struct PerSchedulingParentState {
	/// The hash of the scheduling parent on top of which this job is doing its work.
	parent: Hash,
	/// The node features.
	node_features: NodeFeatures,
	/// The `CoreIndex` assigned to the local validator at this scheduling parent.
	assigned_core: Option<CoreIndex>,
	/// The candidates that are backed by enough validators in their group, by hash.
	backed: HashSet<CandidateHash>,
	/// The table of candidates and statements under this relay-parent.
	table: Table<TableContext>,
	/// The table context, including groups.
	table_context: TableContext,
	/// We issued `Seconded` or `Valid` statements about these candidates.
	issued_statements: HashSet<CandidateHash>,
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
	fallbacks: HashMap<CandidateHash, AttestingData>,
	/// The minimum backing votes threshold.
	minimum_backing_votes: u32,
	/// The number of cores.
	n_cores: u32,
	/// Claim queue state. If the runtime API is not available, it'll be populated with info from
	/// availability cores.
	claim_queue: ClaimQueueSnapshot,
	/// The validator index -> group mapping at this scheduling parent.
	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
	/// Session index for this scheduling parent. Passed to candidate-validation so it
	/// can fetch session-scoped params without a runtime call for V1 descriptors.
	session_index: SessionIndex,
	/// The associated group rotation information.
	group_rotation_info: GroupRotationInfo,
}
245
/// State tracked for a single candidate.
///
/// Entries live in `State::per_candidate`, keyed by candidate hash.
struct PerCandidateState {
	/// The persisted validation data recorded for the candidate.
	persisted_validation_data: PersistedValidationData,
	/// NOTE(review): presumably `true` once this node has issued its own `Seconded`
	/// statement for the candidate; confirm at the sites that set it.
	seconded_locally: bool,
	/// Hash of the candidate's scheduling parent.
	/// NOTE(review): presumably the key into `State::per_scheduling_parent`; verify against users.
	scheduling_parent: Hash,
}
251
/// A cache for storing data per-session to reduce repeated
/// runtime API calls and avoid redundant computations.
///
/// Every field is an independent LRU map keyed by `SessionIndex`; all of them are
/// created with the same capacity.
struct PerSessionCache {
	/// Cache for storing validators list, retrieved from the runtime.
	validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
	/// Cache for storing node features, retrieved from the runtime.
	node_features_cache: LruMap<SessionIndex, NodeFeatures>,
	/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
	minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
	/// Cache for storing validator-to-group mappings, computed from validator groups.
	validator_to_group_cache:
		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}
265
266impl Default for PerSessionCache {
267	/// Creates a new `PerSessionCache` with a default capacity.
268	// TODO: Use the actual session window for allowed relay parents once
269	// https://github.com/paritytech/polkadot-sdk/issues/11207 is available,
270	// instead of DISPUTE_WINDOW.
271	fn default() -> Self {
272		Self::new(DISPUTE_WINDOW.get())
273	}
274}
275
276impl PerSessionCache {
277	/// Creates a new `PerSessionCache` with a given capacity.
278	fn new(capacity: u32) -> Self {
279		PerSessionCache {
280			validators_cache: LruMap::new(ByLength::new(capacity)),
281			node_features_cache: LruMap::new(ByLength::new(capacity)),
282			minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
283			validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
284		}
285	}
286
287	/// Gets validators from the cache or fetches them from the runtime if not present.
288	async fn validators(
289		&mut self,
290		session_index: SessionIndex,
291		parent: Hash,
292		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
293	) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
294		// Try to get the validators list from the cache.
295		if let Some(validators) = self.validators_cache.get(&session_index) {
296			return Ok(Arc::clone(validators));
297		}
298
299		// Fetch the validators list from the runtime since it was not in the cache.
300		let validators: Vec<ValidatorId> =
301			request_validators(parent, sender).await.await.map_err(|err| {
302				RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
303			})??;
304
305		// Wrap the validators list in an Arc to avoid a deep copy when storing it in the cache.
306		let validators = Arc::new(validators);
307
308		// Cache the fetched validators list for future use.
309		self.validators_cache.insert(session_index, Arc::clone(&validators));
310
311		Ok(validators)
312	}
313
314	/// Gets the node features from the cache or fetches it from the runtime if not present.
315	async fn node_features(
316		&mut self,
317		session_index: SessionIndex,
318		parent: Hash,
319		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
320	) -> Result<NodeFeatures, RuntimeApiError> {
321		// Try to get the node features from the cache.
322		if let Some(node_features) = self.node_features_cache.get(&session_index) {
323			return Ok(node_features.clone());
324		}
325
326		// Fetch the node features from the runtime since it was not in the cache.
327		let node_features = request_node_features(parent, session_index, sender)
328			.await
329			.await
330			.map_err(|err| RuntimeApiError::Execution {
331				runtime_api_name: "NodeFeatures",
332				source: Arc::new(err),
333			})??;
334
335		// Cache the fetched node features for future use.
336		self.node_features_cache.insert(session_index, node_features.clone());
337
338		Ok(node_features)
339	}
340
341	/// Gets the minimum backing votes threshold from the
342	/// cache or fetches it from the runtime if not present.
343	async fn minimum_backing_votes(
344		&mut self,
345		session_index: SessionIndex,
346		parent: Hash,
347		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
348	) -> Result<u32, RuntimeApiError> {
349		// Try to get the value from the cache.
350		if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
351			return Ok(*minimum_backing_votes);
352		}
353
354		// Fetch the value from the runtime since it was not in the cache.
355		let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
356			.await
357			.await
358			.map_err(|err| RuntimeApiError::Execution {
359				runtime_api_name: "MinimumBackingVotes",
360				source: Arc::new(err),
361			})??;
362
363		// Cache the fetched value for future use.
364		self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
365
366		Ok(minimum_backing_votes)
367	}
368
369	/// Gets or computes the validator-to-group mapping for a session.
370	fn validator_to_group(
371		&mut self,
372		session_index: SessionIndex,
373		validators: &[ValidatorId],
374		validator_groups: &[Vec<ValidatorIndex>],
375	) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
376		let validator_to_group = self
377			.validator_to_group_cache
378			.get_or_insert(session_index, || {
379				let mut vector = vec![None; validators.len()];
380
381				for (group_idx, validator_group) in validator_groups.iter().enumerate() {
382					for validator in validator_group {
383						vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
384					}
385				}
386
387				Arc::new(IndexedVec::<_, _>::from(vector))
388			})
389			.expect("Just inserted");
390
391		Arc::clone(validator_to_group)
392	}
393}
394
/// The state of the subsystem.
///
/// A single instance is created when the subsystem starts and is threaded
/// through every iteration of the main loop.
struct State {
	/// The utility for managing the implicit and explicit views in a consistent way.
	implicit_view: ImplicitView,
	/// State tracked for all scheduling-parents backing work is ongoing for. This includes
	/// all active leaves.
	per_scheduling_parent: HashMap<Hash, PerSchedulingParentState>,
	/// State tracked for all candidates relevant to the implicit view.
	///
	/// This is guaranteed to have an entry for each candidate with a scheduling parent in the
	/// implicit or explicit view for which a `Seconded` statement has been successfully imported.
	per_candidate: HashMap<CandidateHash, PerCandidateState>,
	/// A local cache for storing per-session data. This cache helps to
	/// reduce repeated calls to the runtime and avoid redundant computations.
	per_session_cache: PerSessionCache,
	/// A cloneable sender which is dispatched to background candidate validation tasks to inform
	/// the main task of the result.
	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The handle to the keystore used for signing.
	keystore: KeystorePtr,
	/// Monotonic flag: set to `true` once a **finalized** block is observed whose
	/// session has the `CandidateReceiptV3` node feature enabled. Once set, never
	/// unset. Used for V3 gating checks in backing — if V3 was never seen, reject
	/// V3 candidates and candidates where old/new version detection disagrees.
	///
	/// Backing uses finalized blocks (rather than any active leaf) to ensure that
	/// new backers do not start producing V3 candidates before a supermajority of
	/// validators (including approval checkers) are aware of the feature. Approval
	/// and dispute validation use active leaves instead, so they always transition
	/// *before* backing does — the safe direction.
	v3_ever_seen: bool,
}
427
428impl State {
429	fn new(
430		background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
431		keystore: KeystorePtr,
432	) -> Self {
433		State {
434			implicit_view: ImplicitView::default(),
435			per_scheduling_parent: HashMap::default(),
436			per_candidate: HashMap::new(),
437			per_session_cache: PerSessionCache::default(),
438			background_validation_tx,
439			keystore,
440			v3_ever_seen: false,
441		}
442	}
443}
444
445#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
446async fn run<Context>(
447	mut ctx: Context,
448	keystore: KeystorePtr,
449	metrics: Metrics,
450) -> FatalResult<()> {
451	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
452	let mut state = State::new(background_validation_tx, keystore);
453
454	loop {
455		let res =
456			run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
457
458		match res {
459			Ok(()) => break,
460			Err(e) => crate::error::log_error(Err(e))?,
461		}
462	}
463
464	Ok(())
465}
466
467#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
468async fn run_iteration<Context>(
469	ctx: &mut Context,
470	state: &mut State,
471	metrics: &Metrics,
472	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
473) -> Result<(), Error> {
474	loop {
475		futures::select!(
476			validated_command = background_validation_rx.next().fuse() => {
477				if let Some((scheduling_parent, command)) = validated_command {
478					handle_validated_candidate_command(
479						&mut *ctx,
480						state,
481						scheduling_parent,
482						command,
483						metrics,
484					).await?;
485				} else {
486					panic!("background_validation_tx always alive at this point; qed");
487				}
488			}
489			from_overseer = ctx.recv().fuse() => {
490				match from_overseer.map_err(Error::OverseerExited)? {
491					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
492						handle_active_leaves_update(
493							&mut *ctx,
494							update,
495							state,
496						).await?;
497					}
498					FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _number)) => {
499						if !state.v3_ever_seen {
500							check_v3_on_finalized(&mut *ctx, state, hash).await?;
501						}
502					}
503					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
504					FromOrchestra::Communication { msg } => {
505						handle_communication(&mut *ctx, state, msg, metrics).await?;
506					}
507				}
508			}
509		)
510	}
511}
512
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this: everything required to spawn a
/// validation job, plus the list of backing validators we can still try.
#[derive(Clone)]
struct AttestingData {
	/// The candidate to attest.
	candidate: CandidateReceipt,
	/// Hash of the PoV we need to fetch.
	pov_hash: Hash,
	/// Validator we are currently trying to get the PoV from.
	from_validator: ValidatorIndex,
	/// Other backing validators we can try in case `from_validator` failed.
	backing: Vec<ValidatorIndex>,
}
529
/// Context for the statement `Table`: the local validator, the validator groups
/// and the disabled validators.
#[derive(Default, Debug)]
struct TableContext {
	/// The local validator, if there is one.
	validator: Option<Validator>,
	/// Members of each validator group, keyed by the core the group is looked up under.
	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
	/// The validator set.
	/// NOTE(review): presumably the set for the scheduling parent's session; confirm where populated.
	validators: Vec<ValidatorId>,
	/// Indices of the validators that are disabled.
	disabled_validators: Vec<ValidatorIndex>,
}
537
538impl TableContext {
539	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
540	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
541		self.disabled_validators
542			.iter()
543			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
544	}
545
546	// Returns `true` if the local validator is in the disabled validators list
547	pub fn local_validator_is_disabled(&self) -> Option<bool> {
548		self.validator.as_ref().map(|v| v.disabled())
549	}
550}
551
552impl TableContextTrait for TableContext {
553	type AuthorityId = ValidatorIndex;
554	type Digest = CandidateHash;
555	type GroupId = CoreIndex;
556	type Signature = ValidatorSignature;
557	type Candidate = CommittedCandidateReceipt;
558
559	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
560		candidate.hash()
561	}
562
563	fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
564		self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
565	}
566
567	fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
568		self.groups.get(group).map(|g| g.len())
569	}
570}
571
572// It looks like it's not possible to do an `impl From` given the current state of
573// the code. So this does the necessary conversion.
574fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
575	let statement = match s.payload() {
576		StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
577		StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
578	};
579
580	TableSignedStatement {
581		statement,
582		signature: s.signature().clone(),
583		sender: s.validator_index(),
584	}
585}
586
587fn table_attested_to_backed(
588	attested: TableAttestedCandidate<
589		CoreIndex,
590		CommittedCandidateReceipt,
591		ValidatorIndex,
592		ValidatorSignature,
593	>,
594	table_context: &TableContext,
595) -> Option<BackedCandidate> {
596	let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
597
598	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
599		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
600
601	let group = table_context.groups.get(&core_index)?;
602
603	let mut validator_indices = BitVec::with_capacity(group.len());
604
605	validator_indices.resize(group.len(), false);
606
607	// The order of the validity votes in the backed candidate must match
608	// the order of bits set in the bitfield, which is not necessarily
609	// the order of the `validity_votes` we got from the table.
610	let mut vote_positions = Vec::with_capacity(validity_votes.len());
611	for (orig_idx, id) in ids.iter().enumerate() {
612		if let Some(position) = group.iter().position(|x| x == id) {
613			validator_indices.set(position, true);
614			vote_positions.push((orig_idx, position));
615		} else {
616			gum::warn!(
617				target: LOG_TARGET,
618				"Logic error: Validity vote from table does not correspond to group",
619			);
620
621			return None;
622		}
623	}
624	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
625
626	Some(BackedCandidate::new(
627		candidate,
628		vote_positions
629			.into_iter()
630			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
631			.collect(),
632		validator_indices,
633		core_index,
634	))
635}
636
637async fn store_available_data(
638	sender: &mut impl overseer::CandidateBackingSenderTrait,
639	n_validators: u32,
640	candidate_hash: CandidateHash,
641	available_data: AvailableData,
642	expected_erasure_root: Hash,
643	core_index: CoreIndex,
644	node_features: NodeFeatures,
645) -> Result<(), Error> {
646	let (tx, rx) = oneshot::channel();
647	// Important: the `av-store` subsystem will check if the erasure root of the `available_data`
648	// matches `expected_erasure_root` which was provided by the collator in the `CandidateReceipt`.
649	// This check is consensus critical and the `backing` subsystem relies on it for ensuring
650	// candidate validity.
651	sender
652		.send_message(AvailabilityStoreMessage::StoreAvailableData {
653			candidate_hash,
654			n_validators,
655			available_data,
656			expected_erasure_root,
657			core_index,
658			node_features,
659			tx,
660		})
661		.await;
662
663	rx.await
664		.map_err(Error::StoreAvailableDataChannel)?
665		.map_err(Error::StoreAvailableData)
666}
667
668// Make a `PoV` available.
669//
670// This calls the AV store to write the available data to storage. The AV store also checks the
671// erasure root matches the `expected_erasure_root`.
672// This returns `Err()` on erasure root mismatch or due to any AV store subsystem error.
673//
674// Otherwise, it returns `Ok(())`.
675async fn make_pov_available(
676	sender: &mut impl overseer::CandidateBackingSenderTrait,
677	n_validators: usize,
678	pov: Arc<PoV>,
679	candidate_hash: CandidateHash,
680	validation_data: PersistedValidationData,
681	expected_erasure_root: Hash,
682	core_index: CoreIndex,
683	node_features: NodeFeatures,
684) -> Result<(), Error> {
685	store_available_data(
686		sender,
687		n_validators as u32,
688		candidate_hash,
689		AvailableData { pov, validation_data },
690		expected_erasure_root,
691		core_index,
692		node_features,
693	)
694	.await
695}
696
697async fn request_pov(
698	sender: &mut impl overseer::CandidateBackingSenderTrait,
699	relay_parent: Hash,
700	from_validator: ValidatorIndex,
701	para_id: ParaId,
702	candidate_hash: CandidateHash,
703	pov_hash: Hash,
704) -> Result<Arc<PoV>, Error> {
705	let (tx, rx) = oneshot::channel();
706	sender
707		.send_message(AvailabilityDistributionMessage::FetchPoV {
708			relay_parent,
709			from_validator,
710			para_id,
711			candidate_hash,
712			pov_hash,
713			tx,
714		})
715		.await;
716
717	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
718	Ok(Arc::new(pov))
719}
720
721async fn request_candidate_validation(
722	sender: &mut impl overseer::CandidateBackingSenderTrait,
723	validation_data: PersistedValidationData,
724	validation_code: ValidationCode,
725	candidate_receipt: CandidateReceipt,
726	pov: Arc<PoV>,
727	session_index: SessionIndex,
728) -> Result<ValidationResult, Error> {
729	let (tx, rx) = oneshot::channel();
730	let is_system = candidate_receipt.descriptor.para_id().is_system();
731	// PVF execution uses this for pruning obsolete jobs - we need the scheduling parent here:
732	let scheduling_parent = candidate_receipt.descriptor.scheduling_parent();
733
734	sender
735		.send_message(CandidateValidationMessage::ValidateFromExhaustive {
736			validation_data,
737			validation_code,
738			candidate_receipt,
739			pov,
740			scheduling_session_index: session_index,
741			exec_kind: if is_system {
742				PvfExecKind::BackingSystemParas(scheduling_parent)
743			} else {
744				PvfExecKind::Backing(scheduling_parent)
745			},
746			response_sender: tx,
747		})
748		.await;
749
750	match rx.await {
751		Ok(Ok(validation_result)) => Ok(validation_result),
752		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
753		Err(err) => Err(Error::ValidateFromExhaustive(err)),
754	}
755}
756
/// What a successful background validation produces.
struct BackgroundValidationOutputs {
	/// The receipt of the candidate that was validated.
	candidate: CandidateReceipt,
	/// The commitments returned by candidate validation.
	commitments: CandidateCommitments,
	/// The persisted validation data returned by candidate validation.
	persisted_validation_data: PersistedValidationData,
}
762
/// Result of a background validation: the outputs if the candidate is valid,
/// or the candidate's receipt if it was found to be invalid.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
764
/// Everything a background validation job needs, consumed by `validate_and_make_available`.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
	/// Sender used to talk to other subsystems from the background task.
	sender: S,
	/// Channel over which the resulting command is reported back.
	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The candidate to validate.
	candidate: CandidateReceipt,
	/// The scheduling parent hash. Used as context for runtime API queries and as
	/// the key for `per_scheduling_parent` lookup when sending results back.
	/// For V1/V2, this equals the candidate's relay_parent.
	scheduling_parent: Hash,
	/// Session index forwarded to candidate validation.
	session_index: SessionIndex,
	/// Node features forwarded to the AV store when storing the available data.
	node_features: NodeFeatures,
	/// Persisted validation data the candidate is validated against.
	persisted_validation_data: PersistedValidationData,
	/// The PoV, or the information needed to fetch it.
	pov: PoVData,
	/// Number of validators, forwarded to the AV store.
	n_validators: usize,
	/// Turns the validation result into the command sent over `tx_command`.
	make_command: F,
}
780
/// Validates a candidate in the background and, if it is valid, makes its PoV available.
///
/// Steps, in order:
/// 1. Fetch the validation code by hash from the runtime at `scheduling_parent`.
/// 2. Obtain the PoV: either it is already at hand, or it is fetched from a validator.
/// 3. Run candidate validation.
/// 4. On a valid result, hand the PoV and validation data to the AV store, which
///    also checks the erasure root.
///
/// The outcome is turned into a command with `make_command` and sent over
/// `tx_command` together with `scheduling_parent`. If the PoV could not be fetched,
/// `ValidatedCandidateCommand::AttestNoPoV` is sent instead and `make_command` is
/// not called.
///
/// # Errors
/// Returns `Err` if the runtime API or candidate validation request fails, if no
/// validation code is found, if the AV store fails for a reason other than an
/// erasure-root mismatch, or if the command cannot be sent. An invalid candidate
/// is not an error: it is reported as `Err(candidate)` inside the command.
async fn validate_and_make_available(
	params: BackgroundValidationParams<
		impl overseer::CandidateBackingSenderTrait,
		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
	>,
	core_index: CoreIndex,
) -> Result<(), Error> {
	let BackgroundValidationParams {
		mut sender,
		mut tx_command,
		candidate,
		scheduling_parent,
		session_index,
		node_features,
		persisted_validation_data,
		pov,
		n_validators,
		make_command,
	} = params;

	// Step 1: look up the validation code referenced by the candidate descriptor.
	let validation_code = {
		let validation_code_hash = candidate.descriptor().validation_code_hash();
		let (tx, rx) = oneshot::channel();
		sender
			.send_message(RuntimeApiMessage::Request(
				scheduling_parent,
				RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
			))
			.await;

		let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
		match code {
			Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
			Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
			Ok(Some(c)) => c,
		}
	};

	// Step 2: get hold of the PoV.
	let pov = match pov {
		PoVData::Ready(pov) => pov,
		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
			match request_pov(
				&mut sender,
				scheduling_parent,
				from_validator,
				candidate.descriptor.para_id(),
				candidate_hash,
				pov_hash,
			)
			.await
			{
				// A failed fetch is not fatal for this task: report it to the main loop
				// as `AttestNoPoV` and finish successfully.
				Err(Error::FetchPoV) => {
					tx_command
						.send((
							scheduling_parent,
							ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
						))
						.await
						.map_err(Error::BackgroundValidationMpsc)?;
					return Ok(());
				},
				Err(err) => return Err(err),
				Ok(pov) => pov,
			}
		},
	};

	// Step 3: run the actual candidate validation.
	let v = {
		request_candidate_validation(
			&mut sender,
			persisted_validation_data,
			validation_code,
			candidate.clone(),
			pov.clone(),
			session_index,
		)
		.await?
	};

	// Step 4: map the validation result onto `BackgroundValidationResult`. `Ok` carries
	// the validation outputs, `Err` carries the receipt of a candidate found invalid.
	let res = match v {
		ValidationResult::Valid(commitments, validation_data) => {
			gum::debug!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation successful",
			);

			// Storing the available data doubles as the erasure-root check.
			let erasure_valid = make_pov_available(
				&mut sender,
				n_validators,
				pov.clone(),
				candidate.hash(),
				validation_data.clone(),
				candidate.descriptor.erasure_root(),
				core_index,
				node_features,
			)
			.await;

			match erasure_valid {
				Ok(()) => Ok(BackgroundValidationOutputs {
					candidate,
					commitments,
					persisted_validation_data: validation_data,
				}),
				// An erasure-root mismatch makes the candidate invalid; it is not an error.
				Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
					gum::debug!(
						target: LOG_TARGET,
						candidate_hash = ?candidate.hash(),
						actual_commitments = ?commitments,
						"Erasure root doesn't match the announced by the candidate receipt",
					);
					Err(candidate)
				},
				// Bubble up any other error.
				Err(e) => return Err(e),
			}
		},
		ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
			// If validation produces a new set of commitments, we vote the candidate as invalid.
			gum::warn!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation yielded different commitments",
			);
			Err(candidate)
		},
		ValidationResult::Invalid(reason) => {
			gum::warn!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				reason = ?reason,
				"Validation yielded an invalid candidate",
			);
			Err(candidate)
		},
	};

	// Report the outcome to the main loop, keyed by the scheduling parent.
	tx_command
		.send((scheduling_parent, make_command(res)))
		.await
		.map_err(Into::into)
}
924
925#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
926async fn handle_communication<Context>(
927	ctx: &mut Context,
928	state: &mut State,
929	message: CandidateBackingMessage,
930	metrics: &Metrics,
931) -> Result<(), Error> {
932	match message {
933		CandidateBackingMessage::Second { scheduling_parent: _, candidate, pvd, pov } => {
934			handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
935		},
936		CandidateBackingMessage::Statement { scheduling_parent, statement } => {
937			handle_statement_message(ctx, state, scheduling_parent, statement, metrics).await?;
938		},
939		CandidateBackingMessage::GetBackableCandidates { candidates, sender } => {
940			handle_get_backable_candidates_message(state, candidates, sender, metrics)?
941		},
942		CandidateBackingMessage::CanSecond(request, tx) => {
943			handle_can_second_request(ctx, state, request, tx).await
944		},
945	}
946
947	Ok(())
948}
949
950#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
951async fn handle_active_leaves_update<Context>(
952	ctx: &mut Context,
953	update: ActiveLeavesUpdate,
954	state: &mut State,
955) -> Result<(), Error> {
956	// Activate in implicit view before deactivate, per the docs
957	// on ImplicitView, this is more efficient.
958	let res = if let Some(leaf) = update.activated {
959		let leaf_hash = leaf.hash;
960		Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
961	} else {
962		None
963	};
964
965	for deactivated in update.deactivated {
966		state.implicit_view.deactivate_leaf(deactivated);
967	}
968
969	// clean up `per_scheduling_parent` according to ancestry
970	// of leaves. we do this so we can clean up candidates right after
971	// as a result.
972	{
973		let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
974
975		state.per_scheduling_parent.retain(|r, _| remaining.contains(&r));
976	}
977
978	// clean up `per_candidate` according to which relay-parents
979	// are known.
980	state
981		.per_candidate
982		.retain(|_, pc| state.per_scheduling_parent.contains_key(&pc.scheduling_parent));
983
984	// Get relay parents which might be fresh but might be known already
985	// that are explicit or implicit from the new active leaf.
986	let fresh_relay_parents = match res {
987		None => return Ok(()),
988		Some((leaf, Ok(_))) => {
989			let fresh_relay_parents =
990				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash);
991
992			let fresh_relay_parent = match fresh_relay_parents {
993				Some(f) => f.to_vec(),
994				None => {
995					gum::warn!(
996						target: LOG_TARGET,
997						leaf_hash = ?leaf.hash,
998						"Implicit view gave no relay-parents"
999					);
1000
1001					vec![leaf.hash]
1002				},
1003			};
1004			fresh_relay_parent
1005		},
1006		Some((leaf, Err(e))) => {
1007			gum::debug!(
1008				target: LOG_TARGET,
1009				leaf_hash = ?leaf.hash,
1010				err = ?e,
1011				"Failed to load implicit view for leaf."
1012			);
1013
1014			return Ok(());
1015		},
1016	};
1017
1018	// add entries in `per_scheduling_parent`. for all new relay-parents.
1019	for maybe_new in fresh_relay_parents {
1020		if state.per_scheduling_parent.contains_key(&maybe_new) {
1021			continue;
1022		}
1023
1024		// construct a `PerSchedulingParent` from the runtime API
1025		// and insert it.
1026		let per = construct_per_scheduling_parent_state(
1027			ctx,
1028			maybe_new,
1029			&state.keystore,
1030			&mut state.per_session_cache,
1031		)
1032		.await?;
1033
1034		if let Some(per) = per {
1035			state.per_scheduling_parent.insert(maybe_new, per);
1036		}
1037	}
1038
1039	Ok(())
1040}
1041
1042/// Check whether the `CandidateReceiptV3` node feature is enabled at the session
1043/// of the given finalized block. Sets `state.v3_ever_seen` if so.
1044#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1045async fn check_v3_on_finalized<Context>(
1046	ctx: &mut Context,
1047	state: &mut State,
1048	finalized_hash: Hash,
1049) -> Result<(), Error> {
1050	let session_index = request_session_index_for_child(finalized_hash, ctx.sender())
1051		.await
1052		.await
1053		.map_err(runtime::Error::from)?
1054		.map_err(runtime::Error::from)?;
1055
1056	let node_features = state
1057		.per_session_cache
1058		.node_features(session_index, finalized_hash, ctx.sender())
1059		.await
1060		.map_err(runtime::Error::from)?;
1061
1062	if FeatureIndex::CandidateReceiptV3.is_set(&node_features) {
1063		gum::info!(
1064			target: LOG_TARGET,
1065			?session_index,
1066			"CandidateReceiptV3 node feature detected in finalized block, \
1067		 enabling V3 candidate support",
1068		);
1069		state.v3_ever_seen = true;
1070	}
1071
1072	Ok(())
1073}
1074
/// Unwraps the `Ok` value of a runtime API result, bailing out of the enclosing function
/// on `Err`.
///
/// On `Err`, the error is converted into a `runtime::Error` (and from there into the
/// error type expected by `error::log_error`) and handed to `error::log_error`. If
/// `log_error` itself returns an `Err`, that error is propagated with `?`; otherwise the
/// enclosing function returns `Ok(None)`.
///
/// Must be used inside a function whose return type accepts both `?` on the result of
/// `error::log_error` and `Ok(None)`.
macro_rules! try_runtime_api {
	($x: expr) => {
		match $x {
			Ok(x) => x,
			Err(err) => {
				// Only bubble up fatal errors.
				error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

				// We can't do candidate validation work if we don't have the
				// requisite runtime API data. But these errors should not take
				// down the node.
				return Ok(None);
			},
		}
	};
}
1091
1092fn core_index_from_statement(
1093	sp_state: &PerSchedulingParentState,
1094	statement: &SignedFullStatementWithPVD,
1095) -> Option<CoreIndex> {
1096	let compact_statement = statement.as_unchecked();
1097	let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1098
1099	gum::trace!(
1100		target: LOG_TARGET,
1101		group_rotation_info = ?sp_state.group_rotation_info,
1102		?statement,
1103		validator_to_group = ?sp_state.validator_to_group,
1104		n_cores = sp_state.n_cores,
1105		?candidate_hash,
1106		"Extracting core index from statement"
1107	);
1108
1109	let statement_validator_index = statement.validator_index();
1110	let Some(Some(group_index)) = sp_state.validator_to_group.get(statement_validator_index) else {
1111		gum::debug!(
1112			target: LOG_TARGET,
1113			group_rotation_info = ?sp_state.group_rotation_info,
1114			?statement,
1115			validator_to_group = ?sp_state.validator_to_group,
1116			n_cores = sp_state.n_cores,
1117			?candidate_hash,
1118			"Invalid validator index: {:?}",
1119			statement_validator_index
1120		);
1121		return None;
1122	};
1123
1124	// First check if the statement para id matches the core assignment.
1125	let core_index =
1126		sp_state.group_rotation_info.core_for_group(*group_index, sp_state.n_cores as _);
1127
1128	if core_index.0 > sp_state.n_cores {
1129		gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores = sp_state.n_cores, "Invalid CoreIndex");
1130		return None;
1131	}
1132
1133	if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1134		let candidate_para_id = candidate.descriptor.para_id();
1135		let mut assigned_paras = sp_state.claim_queue.iter_claims_for_core(&core_index);
1136
1137		if !assigned_paras.any(|id| id == &candidate_para_id) {
1138			gum::debug!(
1139				target: LOG_TARGET,
1140				?candidate_hash,
1141				?core_index,
1142				assigned_paras = ?sp_state.claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1143				?candidate_para_id,
1144				"Invalid CoreIndex, core is not assigned to this para_id"
1145			);
1146			return None;
1147		}
1148		Some(core_index)
1149	} else {
1150		Some(core_index)
1151	}
1152}
1153
1154/// Load the data necessary to do backing work on top of a relay-parent.
1155#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1156async fn construct_per_scheduling_parent_state<Context>(
1157	ctx: &mut Context,
1158	relay_parent: Hash,
1159	keystore: &KeystorePtr,
1160	per_session_cache: &mut PerSessionCache,
1161) -> Result<Option<PerSchedulingParentState>, Error> {
1162	let parent = relay_parent;
1163
1164	let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1165		request_session_index_for_child(parent, ctx.sender()).await,
1166		request_validator_groups(parent, ctx.sender()).await,
1167		request_claim_queue(parent, ctx.sender()).await,
1168		request_disabled_validators(parent, ctx.sender()).await,
1169	)
1170	.map_err(Error::JoinMultiple)?;
1171
1172	let session_index = try_runtime_api!(session_index);
1173
1174	let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1175	let validators = try_runtime_api!(validators);
1176
1177	let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1178	let node_features = try_runtime_api!(node_features);
1179
1180	gum::debug!(target: LOG_TARGET, ?parent, "New state");
1181
1182	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1183
1184	let minimum_backing_votes = per_session_cache
1185		.minimum_backing_votes(session_index, parent, ctx.sender())
1186		.await;
1187	let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1188	let claim_queue = try_runtime_api!(claim_queue);
1189	let disabled_validators = try_runtime_api!(disabled_validators);
1190
1191	let signing_context = SigningContext { parent_hash: parent, session_index };
1192	let validator = match Validator::construct(
1193		&validators,
1194		&disabled_validators,
1195		signing_context.clone(),
1196		keystore.clone(),
1197	) {
1198		Ok(v) => Some(v),
1199		Err(util::Error::NotAValidator) => None,
1200		Err(e) => {
1201			gum::warn!(
1202				target: LOG_TARGET,
1203				err = ?e,
1204				"Cannot participate in candidate backing",
1205			);
1206
1207			return Ok(None);
1208		},
1209	};
1210
1211	let n_cores = validator_groups.len();
1212
1213	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1214	let mut assigned_core = None;
1215
1216	for idx in 0..n_cores {
1217		let core_index = CoreIndex(idx as _);
1218
1219		if !claim_queue.contains_key(&core_index) {
1220			continue;
1221		}
1222
1223		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1224		if let Some(g) = validator_groups.get(group_index.0 as usize) {
1225			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1226				assigned_core = Some(core_index);
1227			}
1228			groups.insert(core_index, g.clone());
1229		}
1230	}
1231	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1232
1233	let validator_to_group =
1234		per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1235
1236	let table_context =
1237		TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1238
1239	Ok(Some(PerSchedulingParentState {
1240		parent,
1241		node_features,
1242		assigned_core,
1243		backed: HashSet::new(),
1244		table: Table::new(),
1245		table_context,
1246		issued_statements: HashSet::new(),
1247		awaiting_validation: HashSet::new(),
1248		fallbacks: HashMap::new(),
1249		minimum_backing_votes,
1250		n_cores: validator_groups.len() as u32,
1251		claim_queue: ClaimQueueSnapshot::from(claim_queue),
1252		validator_to_group,
1253		session_index,
1254		group_rotation_info,
1255	}))
1256}
1257
/// Outcome of the seconding sanity check for a candidate.
enum SecondingAllowed {
	/// Seconding the candidate is not allowed.
	No,
	// On which leaves is seconding allowed.
	/// Seconding is allowed; carries the hashes of the leaves it is allowed on.
	Yes(Vec<Hash>),
}
1263
1264/// Checks whether a candidate can be seconded based on its hypothetical membership in the fragment
1265/// chain.
1266#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1267async fn seconding_sanity_check<Context>(
1268	ctx: &mut Context,
1269	implicit_view: &ImplicitView,
1270	hypothetical_candidate: HypotheticalCandidate,
1271) -> SecondingAllowed {
1272	let mut leaves_for_seconding = Vec::new();
1273	let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1274
1275	// Scheduling context: the scheduling parent determines which leaves this candidate
1276	// can be seconded under (it must be in the allowed ancestry).
1277	let candidate_scheduling_parent = hypothetical_candidate.scheduling_parent();
1278	let candidate_hash = hypothetical_candidate.candidate_hash();
1279
1280	for head in implicit_view.leaves() {
1281		// Check that the candidate scheduling parent is allowed under this leaf.
1282		let allowed_parents_for_para = implicit_view.known_allowed_relay_parents_under(head);
1283		if !allowed_parents_for_para
1284			.unwrap_or_default()
1285			.contains(&candidate_scheduling_parent)
1286		{
1287			continue;
1288		}
1289
1290		let (tx, rx) = oneshot::channel();
1291		ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1292			HypotheticalMembershipRequest {
1293				candidates: vec![hypothetical_candidate.clone()],
1294				fragment_chain_relay_parent: Some(*head),
1295			},
1296			tx,
1297		))
1298		.await;
1299		let response = rx.map_ok(move |candidate_memberships| {
1300			let is_member_or_potential = candidate_memberships
1301				.into_iter()
1302				.find_map(|(candidate, leaves)| {
1303					(candidate.candidate_hash() == candidate_hash).then_some(leaves)
1304				})
1305				.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1306				.is_some();
1307
1308			(is_member_or_potential, head)
1309		});
1310		responses.push_back(response.boxed());
1311	}
1312
1313	if responses.is_empty() {
1314		return SecondingAllowed::No;
1315	}
1316
1317	while let Some(response) = responses.next().await {
1318		match response {
1319			Err(oneshot::Canceled) => {
1320				gum::warn!(
1321					target: LOG_TARGET,
1322					"Failed to reach prospective parachains subsystem for hypothetical membership",
1323				);
1324
1325				return SecondingAllowed::No;
1326			},
1327			Ok((is_member_or_potential, head)) => match is_member_or_potential {
1328				false => {
1329					gum::debug!(
1330						target: LOG_TARGET,
1331						?candidate_hash,
1332						leaf_hash = ?head,
1333						"Refusing to second candidate at leaf. Is not a potential member.",
1334					);
1335				},
1336				true => {
1337					leaves_for_seconding.push(*head);
1338				},
1339			},
1340		}
1341	}
1342
1343	if leaves_for_seconding.is_empty() {
1344		SecondingAllowed::No
1345	} else {
1346		SecondingAllowed::Yes(leaves_for_seconding)
1347	}
1348}
1349
1350/// Performs seconding sanity check for an advertisement.
1351#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1352async fn handle_can_second_request<Context>(
1353	ctx: &mut Context,
1354	state: &State,
1355	request: CanSecondRequest,
1356	tx: oneshot::Sender<bool>,
1357) {
1358	let scheduling_parent = request.candidate_scheduling_parent;
1359	let response = if state.per_scheduling_parent.get(&scheduling_parent).is_some() {
1360		let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1361			candidate_hash: request.candidate_hash,
1362			candidate_para: request.candidate_para_id,
1363			parent_head_data_hash: request.parent_head_data_hash,
1364			candidate_scheduling_parent: scheduling_parent,
1365		};
1366
1367		let result =
1368			seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1369
1370		match result {
1371			SecondingAllowed::No => false,
1372			SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1373		}
1374	} else {
1375		// Relay parent is unknown or async backing is disabled.
1376		false
1377	};
1378
1379	let _ = tx.send(response);
1380}
1381
/// Handles a command produced by background validation for `scheduling_parent`.
///
/// If no state is tracked for `scheduling_parent`, the command is dropped. Otherwise the
/// candidate is removed from `awaiting_validation` and the command is processed:
///
/// - `Second(Ok(..))`: unless a statement was already issued for the candidate or the
///   seconding sanity check refuses it, a `Seconded` statement is signed, imported and
///   distributed. If prospective parachains rejects the candidate, the collator protocol
///   is told the candidate is invalid. On success the candidate is marked as seconded
///   locally and the collator protocol is notified.
/// - `Second(Err(..))`: the collator protocol is told the candidate is invalid.
/// - `Attest(..)`: the fallback entry is removed; if no statement was issued yet, a
///   `Valid` statement is signed, imported and distributed when validation succeeded,
///   and the candidate is recorded as having an issued statement either way.
/// - `AttestNoPoV(..)`: validation is retried with the next validator taken from the
///   fallback's `backing` list, if any is left.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_validated_candidate_command<Context>(
	ctx: &mut Context,
	state: &mut State,
	scheduling_parent: Hash,
	command: ValidatedCandidateCommand,
	metrics: &Metrics,
) -> Result<(), Error> {
	match state.per_scheduling_parent.get_mut(&scheduling_parent) {
		Some(sp_state) => {
			let candidate_hash = command.candidate_hash();
			// Validation for this candidate has finished, whatever the outcome.
			sp_state.awaiting_validation.remove(&candidate_hash);

			match command {
				ValidatedCandidateCommand::Second(res) => match res {
					Ok(outputs) => {
						let BackgroundValidationOutputs {
							candidate,
							commitments,
							persisted_validation_data,
						} = outputs;

						// Never issue a second statement for the same candidate.
						if sp_state.issued_statements.contains(&candidate_hash) {
							return Ok(());
						}

						let receipt = CommittedCandidateReceipt {
							descriptor: candidate.descriptor.clone(),
							commitments,
						};

						let hypothetical_candidate = HypotheticalCandidate::Complete {
							candidate_hash,
							receipt: Arc::new(receipt.clone()),
							persisted_validation_data: persisted_validation_data.clone(),
						};
						// sanity check that we're allowed to second the candidate
						// and that it doesn't conflict with other candidates we've
						// seconded.
						if let SecondingAllowed::No = seconding_sanity_check(
							ctx,
							&state.implicit_view,
							hypothetical_candidate,
						)
						.await
						{
							return Ok(());
						};

						let statement =
							StatementWithPVD::Seconded(receipt, persisted_validation_data);

						// If we get an Error::RejectedByProspectiveParachains,
						// then the statement has not been distributed or imported into
						// the table.
						let res = sign_import_and_distribute_statement(
							ctx,
							sp_state,
							&mut state.per_candidate,
							statement,
							state.keystore.clone(),
							metrics,
						)
						.await;

						if let Err(Error::RejectedByProspectiveParachains) = res {
							let candidate_hash = candidate.hash();
							gum::debug!(
								target: LOG_TARGET,
								scheduling_parent = ?candidate.descriptor().scheduling_parent(),
								?candidate_hash,
								"Attempted to second candidate but was rejected by prospective parachains",
							);

							// Let the collator protocol know the candidate was not accepted.
							ctx.send_message(CollatorProtocolMessage::Invalid(
								candidate.descriptor().scheduling_parent(),
								candidate,
							))
							.await;

							return Ok(());
						}

						// Any other error is propagated; `None` means no statement was signed.
						if let Some(stmt) = res? {
							match state.per_candidate.get_mut(&candidate_hash) {
								None => {
									gum::warn!(
										target: LOG_TARGET,
										?candidate_hash,
										"Missing `per_candidate` for seconded candidate.",
									);
								},
								Some(p) => p.seconded_locally = true,
							}

							sp_state.issued_statements.insert(candidate_hash);

							metrics.on_candidate_seconded();
							ctx.send_message(CollatorProtocolMessage::Seconded(
								sp_state.parent,
								StatementWithPVD::drop_pvd_from_signed(stmt),
							))
							.await;
						}
					},
					Err(candidate) => {
						// Validation deemed the candidate invalid: report it.
						ctx.send_message(CollatorProtocolMessage::Invalid(
							sp_state.parent,
							candidate,
						))
						.await;
					},
				},
				ValidatedCandidateCommand::Attest(res) => {
					// We are done - avoid new validation spawns:
					sp_state.fallbacks.remove(&candidate_hash);
					// sanity check.
					if !sp_state.issued_statements.contains(&candidate_hash) {
						if res.is_ok() {
							let statement = StatementWithPVD::Valid(candidate_hash);

							sign_import_and_distribute_statement(
								ctx,
								sp_state,
								&mut state.per_candidate,
								statement,
								state.keystore.clone(),
								metrics,
							)
							.await?;
						}
						// Recorded even when validation failed, so that no further
						// statement is issued for this candidate.
						sp_state.issued_statements.insert(candidate_hash);
					}
				},
				ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
					if let Some(attesting) = sp_state.fallbacks.get_mut(&candidate_hash) {
						// Retry with the next validator that backed the candidate, if any.
						if let Some(index) = attesting.backing.pop() {
							attesting.from_validator = index;
							let attesting = attesting.clone();

							// The candidate state should be available because we've
							// validated it before, the relay-parent is still around,
							// and candidates are pruned on the basis of relay-parents.
							//
							// If it is not, then no point in validating it anyway.
							if let Some(pvd) = state
								.per_candidate
								.get(&candidate_hash)
								.map(|pc| pc.persisted_validation_data.clone())
							{
								kick_off_validation_work(
									ctx,
									sp_state,
									pvd,
									&state.background_validation_tx,
									attesting,
								)
								.await?;
							}
						}
					} else {
						gum::warn!(
							target: LOG_TARGET,
							"AttestNoPoV was triggered without fallback being available."
						);
						debug_assert!(false);
					}
				},
			}
		},
		None => {
			// simple race condition; can be ignored = this relay-parent
			// is no longer relevant.
		},
	}

	Ok(())
}
1560
1561fn sign_statement(
1562	sp_state: &PerSchedulingParentState,
1563	statement: StatementWithPVD,
1564	keystore: KeystorePtr,
1565	metrics: &Metrics,
1566) -> Option<SignedFullStatementWithPVD> {
1567	let signed = sp_state
1568		.table_context
1569		.validator
1570		.as_ref()?
1571		.sign(keystore, statement)
1572		.ok()
1573		.flatten()?;
1574	metrics.on_statement_signed();
1575	Some(signed)
1576}
1577
1578/// Import a statement into the statement table and return the summary of the import.
1579///
1580/// This will fail with `Error::RejectedByProspectiveParachains` if the message type is seconded,
1581/// the candidate is fresh, and any of the following are true:
1582/// 1. There is no `PersistedValidationData` attached.
1583/// 2. Prospective parachains subsystem returned an empty `HypotheticalMembership` i.e. did not
1584///    recognize the candidate as being applicable to any of the active leaves.
1585#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1586async fn import_statement<Context>(
1587	ctx: &mut Context,
1588	sp_state: &mut PerSchedulingParentState,
1589	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1590	statement: &SignedFullStatementWithPVD,
1591) -> Result<Option<TableSummary>, Error> {
1592	let candidate_hash = statement.payload().candidate_hash();
1593
1594	gum::debug!(
1595		target: LOG_TARGET,
1596		statement = ?statement.payload().to_compact(),
1597		validator_index = statement.validator_index().0,
1598		?candidate_hash,
1599		"Importing statement",
1600	);
1601
1602	// If this is a new candidate (statement is 'seconded' and candidate is unknown),
1603	// we need to create an entry in the `PerCandidateState` map.
1604	//
1605	// We also need to inform the prospective parachains subsystem of the seconded candidate.
1606	// If `ProspectiveParachainsMessage::Second` fails, then we return
1607	// Error::RejectedByProspectiveParachains.
1608	//
1609	// Persisted Validation Data should be available - it may already be available
1610	// if this is a candidate we are seconding.
1611	//
1612	// We should also not accept any candidates which have no valid depths under any of
1613	// our active leaves.
1614	if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1615		if !per_candidate.contains_key(&candidate_hash) {
1616			let (tx, rx) = oneshot::channel();
1617			ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1618				IntroduceSecondedCandidateRequest {
1619					candidate_para: candidate.descriptor.para_id(),
1620					candidate_receipt: candidate.clone(),
1621					persisted_validation_data: pvd.clone(),
1622				},
1623				tx,
1624			))
1625			.await;
1626
1627			match rx.await {
1628				Err(oneshot::Canceled) => {
1629					gum::warn!(
1630						target: LOG_TARGET,
1631						"Could not reach the Prospective Parachains subsystem."
1632					);
1633
1634					return Err(Error::RejectedByProspectiveParachains);
1635				},
1636				Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1637				Ok(true) => {},
1638			}
1639
1640			// Only save the candidate if it was approved by prospective parachains.
1641			per_candidate.insert(
1642				candidate_hash,
1643				PerCandidateState {
1644					persisted_validation_data: pvd.clone(),
1645					// This is set after importing when seconding locally.
1646					seconded_locally: false,
1647					// Scheduling context: used for cleanup against per_scheduling_parent.
1648					scheduling_parent: candidate.descriptor.scheduling_parent(),
1649				},
1650			);
1651		}
1652	}
1653
1654	let stmt = primitive_statement_to_table(statement);
1655
1656	let core = core_index_from_statement(sp_state, statement).ok_or(Error::CoreIndexUnavailable)?;
1657
1658	Ok(sp_state.table.import_statement(&sp_state.table_context, core, stmt))
1659}
1660
1661/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
1662/// misbehaviors as a result of importing a statement.
1663#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1664async fn post_import_statement_actions<Context>(
1665	ctx: &mut Context,
1666	sp_state: &mut PerSchedulingParentState,
1667	summary: Option<&TableSummary>,
1668) {
1669	if let Some(attested) = summary.as_ref().and_then(|s| {
1670		sp_state.table.attested_candidate(
1671			&s.candidate,
1672			&sp_state.table_context,
1673			sp_state.minimum_backing_votes,
1674		)
1675	}) {
1676		let candidate_hash = attested.candidate.hash();
1677
1678		// `HashSet::insert` returns true if the thing wasn't in there already.
1679		if sp_state.backed.insert(candidate_hash) {
1680			if let Some(backed) = table_attested_to_backed(attested, &sp_state.table_context) {
1681				let para_id = backed.candidate().descriptor.para_id();
1682				gum::debug!(
1683					target: LOG_TARGET,
1684					candidate_hash = ?candidate_hash,
1685					scheduling_parent = ?sp_state.parent,
1686					%para_id,
1687					"Candidate backed",
1688				);
1689
1690				// Inform the prospective parachains subsystem
1691				// that the candidate is now backed.
1692				ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1693					para_id,
1694					candidate_hash,
1695				))
1696				.await;
1697				// Notify statement distribution of backed candidate.
1698				ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1699			} else {
1700				gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1701			}
1702		} else {
1703			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1704		}
1705	} else {
1706		gum::debug!(target: LOG_TARGET, "No attested candidate");
1707	}
1708
1709	issue_new_misbehaviors(ctx, sp_state.parent, &mut sp_state.table);
1710}
1711
1712/// Check if there have happened any new misbehaviors and issue necessary messages.
1713#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1714fn issue_new_misbehaviors<Context>(
1715	ctx: &mut Context,
1716	relay_parent: Hash,
1717	table: &mut Table<TableContext>,
1718) {
1719	// collect the misbehaviors to avoid double mutable self borrow issues
1720	let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1721	for (validator_id, report) in misbehaviors {
1722		// The provisioner waits on candidate-backing, which means
1723		// that we need to send unbounded messages to avoid cycles.
1724		//
1725		// Misbehaviors are bounded by the number of validators and
1726		// the block production protocol.
1727		ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1728			relay_parent,
1729			ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1730		));
1731	}
1732}
1733
1734/// Sign, import, and distribute a statement.
1735#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1736async fn sign_import_and_distribute_statement<Context>(
1737	ctx: &mut Context,
1738	sp_state: &mut PerSchedulingParentState,
1739	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1740	statement: StatementWithPVD,
1741	keystore: KeystorePtr,
1742	metrics: &Metrics,
1743) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1744	if let Some(signed_statement) = sign_statement(&*sp_state, statement, keystore, metrics) {
1745		let summary = import_statement(ctx, sp_state, per_candidate, &signed_statement).await?;
1746
1747		// `Share` must always be sent before `Backed`. We send the latter in
1748		// `post_import_statement_action` below.
1749		let smsg = StatementDistributionMessage::Share(sp_state.parent, signed_statement.clone());
1750		ctx.send_unbounded_message(smsg);
1751
1752		post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1753
1754		Ok(Some(signed_statement))
1755	} else {
1756		Ok(None)
1757	}
1758}
1759
1760#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1761async fn background_validate_and_make_available<Context>(
1762	ctx: &mut Context,
1763	sp_state: &mut PerSchedulingParentState,
1764	params: BackgroundValidationParams<
1765		impl overseer::CandidateBackingSenderTrait,
1766		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1767	>,
1768) -> Result<(), Error> {
1769	let candidate_hash = params.candidate.hash();
1770	let Some(core_index) = sp_state.assigned_core else { return Ok(()) };
1771	if sp_state.awaiting_validation.insert(candidate_hash) {
1772		// spawn background task.
1773		let bg = async move {
1774			if let Err(error) = validate_and_make_available(params, core_index).await {
1775				if let Error::BackgroundValidationMpsc(error) = error {
1776					gum::debug!(
1777						target: LOG_TARGET,
1778						?candidate_hash,
1779						?error,
1780						"Mpsc background validation mpsc died during validation- leaf no longer active?"
1781					);
1782				} else {
1783					gum::error!(
1784						target: LOG_TARGET,
1785						?candidate_hash,
1786						?error,
1787						"Failed to validate and make available",
1788					);
1789				}
1790			}
1791		};
1792
1793		ctx.spawn("backing-validation", bg.boxed())
1794			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1795	}
1796
1797	Ok(())
1798}
1799
1800/// Kick off validation work and distribute the result as a signed statement.
1801#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1802async fn kick_off_validation_work<Context>(
1803	ctx: &mut Context,
1804	sp_state: &mut PerSchedulingParentState,
1805	persisted_validation_data: PersistedValidationData,
1806	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1807	attesting: AttestingData,
1808) -> Result<(), Error> {
1809	// Do nothing if the local validator is disabled or not a validator at all
1810	match sp_state.table_context.local_validator_is_disabled() {
1811		Some(true) => {
1812			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1813			return Ok(());
1814		},
1815		Some(false) => {}, // we are not disabled - move on
1816		None => {
1817			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1818			return Ok(());
1819		},
1820	}
1821
1822	let candidate_hash = attesting.candidate.hash();
1823	if sp_state.issued_statements.contains(&candidate_hash) {
1824		return Ok(());
1825	}
1826
1827	gum::debug!(
1828		target: LOG_TARGET,
1829		candidate_hash = ?candidate_hash,
1830		candidate_receipt = ?attesting.candidate,
1831		"Kicking off validation",
1832	);
1833
1834	let bg_sender = ctx.sender().clone();
1835	let pov = PoVData::FetchFromValidator {
1836		from_validator: attesting.from_validator,
1837		candidate_hash,
1838		pov_hash: attesting.pov_hash,
1839	};
1840	let scheduling_parent = attesting.candidate.descriptor().scheduling_parent();
1841
1842	background_validate_and_make_available(
1843		ctx,
1844		sp_state,
1845		BackgroundValidationParams {
1846			sender: bg_sender,
1847			tx_command: background_validation_tx.clone(),
1848			candidate: attesting.candidate,
1849			scheduling_parent,
1850			session_index: sp_state.session_index,
1851			node_features: sp_state.node_features.clone(),
1852			persisted_validation_data,
1853			pov,
1854			n_validators: sp_state.table_context.validators.len(),
1855			make_command: ValidatedCandidateCommand::Attest,
1856		},
1857	)
1858	.await
1859}
1860
1861/// Import the statement and kick off validation work if it is a part of our assignment.
1862#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1863async fn maybe_validate_and_import<Context>(
1864	ctx: &mut Context,
1865	state: &mut State,
1866	scheduling_parent: Hash,
1867	statement: SignedFullStatementWithPVD,
1868) -> Result<(), Error> {
1869	let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1870		Some(r) => r,
1871		None => {
1872			gum::trace!(
1873				target: LOG_TARGET,
1874				?scheduling_parent,
1875				"Received statement for unknown scheduling parent"
1876			);
1877
1878			return Ok(());
1879		},
1880	};
1881
1882	// Don't import statement if the sender is disabled
1883	if sp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1884		gum::debug!(
1885			target: LOG_TARGET,
1886			sender_validator_idx = ?statement.validator_index(),
1887			"Not importing statement because the sender is disabled"
1888		);
1889		return Ok(());
1890	}
1891
1892	// Version consistency + V3 gating for Seconded statements (shared logic).
1893	if let StatementWithPVD::Seconded(receipt, _) = statement.payload() {
1894		if let Err(reason) = receipt.descriptor.check_version_acceptance(state.v3_ever_seen) {
1895			gum::debug!(
1896				target: LOG_TARGET,
1897				?scheduling_parent,
1898				"Not importing Seconded statement: {}",
1899				reason,
1900			);
1901			return Ok(());
1902		}
1903	}
1904
1905	let res = import_statement(ctx, sp_state, &mut state.per_candidate, &statement).await;
1906
1907	// if we get an Error::RejectedByProspectiveParachains,
1908	// we will do nothing.
1909	if let Err(Error::RejectedByProspectiveParachains) = res {
1910		gum::debug!(
1911			target: LOG_TARGET,
1912			?scheduling_parent,
1913			"Statement rejected by prospective parachains."
1914		);
1915
1916		return Ok(());
1917	}
1918
1919	let summary = res?;
1920	post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1921
1922	if let Some(summary) = summary {
1923		// import_statement already takes care of communicating with the
1924		// prospective parachains subsystem. At this point, the candidate
1925		// has already been accepted by the subsystem.
1926
1927		let candidate_hash = summary.candidate;
1928
1929		if Some(summary.group_id) != sp_state.assigned_core {
1930			return Ok(());
1931		}
1932
1933		let attesting = match statement.payload() {
1934			StatementWithPVD::Seconded(receipt, _) => {
1935				let attesting = AttestingData {
1936					candidate: sp_state
1937						.table
1938						.get_candidate(&candidate_hash)
1939						.ok_or(Error::CandidateNotFound)?
1940						.to_plain(),
1941					pov_hash: receipt.descriptor.pov_hash(),
1942					from_validator: statement.validator_index(),
1943					backing: Vec::new(),
1944				};
1945				sp_state.fallbacks.insert(summary.candidate, attesting.clone());
1946				attesting
1947			},
1948			StatementWithPVD::Valid(candidate_hash) => {
1949				if let Some(attesting) = sp_state.fallbacks.get_mut(candidate_hash) {
1950					let our_index = sp_state.table_context.validator.as_ref().map(|v| v.index());
1951					if our_index == Some(statement.validator_index()) {
1952						return Ok(());
1953					}
1954
1955					if sp_state.awaiting_validation.contains(candidate_hash) {
1956						// Job already running:
1957						attesting.backing.push(statement.validator_index());
1958						return Ok(());
1959					} else {
1960						// No job, so start another with current validator:
1961						attesting.from_validator = statement.validator_index();
1962						attesting.clone()
1963					}
1964				} else {
1965					return Ok(());
1966				}
1967			},
1968		};
1969
1970		// Skip validation if local validator is disabled or not a validator.
1971		match sp_state.table_context.local_validator_is_disabled() {
1972			Some(true) => return Ok(()),
1973			None => return Ok(()),
1974			Some(false) => {},
1975		}
1976
1977		// After `import_statement` succeeds, the candidate entry is guaranteed
1978		// to exist.
1979		if let Some(pvd) = state
1980			.per_candidate
1981			.get(&candidate_hash)
1982			.map(|pc| pc.persisted_validation_data.clone())
1983		{
1984			kick_off_validation_work(
1985				ctx,
1986				sp_state,
1987				pvd,
1988				&state.background_validation_tx,
1989				attesting,
1990			)
1991			.await?;
1992		}
1993	}
1994	Ok(())
1995}
1996
1997/// Kick off background validation with intent to second.
1998#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1999async fn validate_and_second<Context>(
2000	ctx: &mut Context,
2001	sp_state: &mut PerSchedulingParentState,
2002	persisted_validation_data: PersistedValidationData,
2003	candidate: &CandidateReceipt,
2004	pov: Arc<PoV>,
2005	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
2006) -> Result<(), Error> {
2007	let candidate_hash = candidate.hash();
2008
2009	gum::debug!(
2010		target: LOG_TARGET,
2011		candidate_hash = ?candidate_hash,
2012		candidate_receipt = ?candidate,
2013		"Validate and second candidate",
2014	);
2015
2016	let bg_sender = ctx.sender().clone();
2017	let scheduling_parent = candidate.descriptor.scheduling_parent();
2018	background_validate_and_make_available(
2019		ctx,
2020		sp_state,
2021		BackgroundValidationParams {
2022			sender: bg_sender,
2023			tx_command: background_validation_tx.clone(),
2024			candidate: candidate.clone(),
2025			scheduling_parent,
2026			session_index: sp_state.session_index,
2027			node_features: sp_state.node_features.clone(),
2028			persisted_validation_data,
2029			pov: PoVData::Ready(pov),
2030			n_validators: sp_state.table_context.validators.len(),
2031			make_command: ValidatedCandidateCommand::Second,
2032		},
2033	)
2034	.await?;
2035
2036	Ok(())
2037}
2038
2039#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2040async fn handle_second_message<Context>(
2041	ctx: &mut Context,
2042	state: &mut State,
2043	candidate: CandidateReceipt,
2044	persisted_validation_data: PersistedValidationData,
2045	pov: PoV,
2046	metrics: &Metrics,
2047) -> Result<(), Error> {
2048	let _timer = metrics.time_process_second();
2049
2050	let candidate_hash = candidate.hash();
2051
2052	if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2053		gum::warn!(
2054			target: LOG_TARGET,
2055			?candidate_hash,
2056			"Candidate backing was asked to second candidate with wrong PVD",
2057		);
2058
2059		return Ok(());
2060	}
2061
2062	// Version consistency + V3 gating (shared logic from primitives).
2063	if let Err(reason) = candidate.descriptor().check_version_acceptance(state.v3_ever_seen) {
2064		gum::debug!(
2065			target: LOG_TARGET,
2066			?candidate_hash,
2067			"Not seconding candidate: {}",
2068			reason,
2069		);
2070		ctx.send_message(CollatorProtocolMessage::Invalid(
2071			candidate.descriptor().scheduling_parent(),
2072			candidate,
2073		))
2074		.await;
2075		return Ok(());
2076	}
2077
2078	// The signing context should use scheduling_parent (for V1/V2, this equals relay_parent)
2079	let scheduling_parent = candidate.descriptor().scheduling_parent();
2080
2081	// Look up the PerSchedulingParentState using scheduling_parent - this is where we'll sign
2082	let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
2083		None => {
2084			gum::trace!(
2085				target: LOG_TARGET,
2086				?scheduling_parent,
2087				?candidate_hash,
2088				"Candidate has scheduling_parent outside of our view."
2089			);
2090
2091			return Ok(());
2092		},
2093		Some(r) => r,
2094	};
2095
2096	// Get the scheduling info (assigned_core and claim_queue) from the scheduling_parent state
2097	let assigned_core = sp_state.assigned_core;
2098	let claim_queue = &sp_state.claim_queue;
2099
2100	// Just return if the local validator is disabled. If we are here the local node should be a
2101	// validator but defensively use `unwrap_or(false)` to continue processing in this case.
2102	if sp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2103		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2104		return Ok(());
2105	}
2106
2107	// For V3, use scheduling info from scheduling_parent (claim queue determines assignments)
2108	let assigned_paras = assigned_core.and_then(|core| claim_queue.0.get(&core));
2109
2110	// Sanity check that candidate is from our assignment.
2111	if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2112		gum::debug!(
2113			target: LOG_TARGET,
2114			our_assignment_core = ?assigned_core,
2115			our_assignment_paras = ?assigned_paras,
2116			collation = ?candidate.descriptor().para_id(),
2117			"Subsystem asked to second for para outside of our assignment",
2118		);
2119		return Ok(());
2120	}
2121
2122	gum::debug!(
2123		target: LOG_TARGET,
2124		our_assignment_core = ?assigned_core,
2125		our_assignment_paras = ?assigned_paras,
2126		collation = ?candidate.descriptor().para_id(),
2127		"Current assignments vs collation",
2128	);
2129
2130	// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
2131	// Seconded statement only if we have not signed a Valid statement for the requested candidate.
2132	//
2133	// The actual logic of issuing the signed statement checks that this isn't
2134	// conflicting with other seconded candidates. Not doing that check here
2135	// gives other subsystems the ability to get us to execute arbitrary candidates,
2136	// but no more.
2137	if !sp_state.issued_statements.contains(&candidate_hash) {
2138		let pov = Arc::new(pov);
2139
2140		validate_and_second(
2141			ctx,
2142			sp_state,
2143			persisted_validation_data,
2144			&candidate,
2145			pov,
2146			&state.background_validation_tx,
2147		)
2148		.await?;
2149	}
2150
2151	Ok(())
2152}
2153
2154#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2155async fn handle_statement_message<Context>(
2156	ctx: &mut Context,
2157	state: &mut State,
2158	scheduling_parent: Hash,
2159	statement: SignedFullStatementWithPVD,
2160	metrics: &Metrics,
2161) -> Result<(), Error> {
2162	let _timer = metrics.time_process_statement();
2163
2164	// Validator disabling is handled in `maybe_validate_and_import`
2165	match maybe_validate_and_import(ctx, state, scheduling_parent, statement).await {
2166		Err(Error::ValidationFailed(_)) => Ok(()),
2167		Err(e) => Err(e),
2168		Ok(()) => Ok(()),
2169	}
2170}
2171
2172fn handle_get_backable_candidates_message(
2173	state: &State,
2174	requested_candidates: HashMap<ParaId, Vec<BackableCandidateRef>>,
2175	tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2176	metrics: &Metrics,
2177) -> Result<(), Error> {
2178	let _timer = metrics.time_get_backed_candidates();
2179
2180	let mut backed = HashMap::with_capacity(requested_candidates.len());
2181
2182	for (para_id, para_candidates) in requested_candidates {
2183		for candidate_ref in para_candidates.iter() {
2184			let candidate_hash = candidate_ref.candidate_hash;
2185			let scheduling_parent = candidate_ref.scheduling_parent;
2186
2187			let sp_state = match state.per_scheduling_parent.get(&scheduling_parent) {
2188				Some(sp_state) => sp_state,
2189				None => {
2190					gum::debug!(
2191						target: LOG_TARGET,
2192						?scheduling_parent,
2193						?candidate_hash,
2194						"Requested candidate's scheduling parent is out of view",
2195					);
2196					break;
2197				},
2198			};
2199			let maybe_backed_candidate = sp_state
2200				.table
2201				.attested_candidate(
2202					&candidate_hash,
2203					&sp_state.table_context,
2204					sp_state.minimum_backing_votes,
2205				)
2206				.and_then(|attested| table_attested_to_backed(attested, &sp_state.table_context));
2207
2208			if let Some(backed_candidate) = maybe_backed_candidate {
2209				backed
2210					.entry(para_id)
2211					.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2212					.push(backed_candidate);
2213			} else {
2214				break;
2215			}
2216		}
2217	}
2218
2219	tx.send(backed).map_err(|data| Error::Send(data))?;
2220	Ok(())
2221}