
polkadot_node_core_backing/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the `CandidateBackingSubsystem`.
//!
//! This subsystem maintains the entire responsibility of tracking parachain
//! candidates which can be backed, as well as the issuance of statements
//! about candidates when run on a validator node.
//!
//! There are two types of statements: `Seconded` and `Valid`.
//! `Seconded` implies `Valid`, and nothing should be stated as
//! `Valid` unless it has already been `Seconded`.
//!
//! Validators may only second candidates which fall under their own group
//! assignment, and they may only second one candidate per depth per active leaf.
//! Candidates which are stated as either `Seconded` or `Valid` by a majority of the
//! assigned group of validators may be backed on-chain and proceed to the availability
//! stage.
//!
//! Depth is a concept relating to asynchronous backing, by which
//! short sub-chains of candidates are backed and extended off-chain, and then placed
//! asynchronously into blocks of the relay chain as those are authored and as the
//! relay-chain state becomes ready for them. Asynchronous backing allows parachains to
//! grow mostly independently from the state of the relay chain, which gives more time for
//! parachains to be validated and thereby increases performance.
//!
//! Most of the work of asynchronous backing is handled by the Prospective Parachains
//! subsystem. The 'depth' of a parachain block with respect to a relay chain block is
//! a measure of how many parachain blocks are between the most recent included parachain block
//! in the post-state of the relay-chain block and the candidate. For instance,
//! a candidate that descends directly from the most recent parachain block in the relay-chain
//! state has depth 0. The child of that candidate would have depth 1. And so on.
//!
//! The candidate backing subsystem keeps track of a set of 'active leaves' which are the
//! most recent blocks in the relay-chain (which is in fact a tree) which could be built
//! upon. Depth is always measured against active leaves, and the valid relay-parent that
//! each candidate can have is determined by the active leaves. The Prospective Parachains
//! subsystem enforces that the relay-parent increases monotonically, so that logic
//! is not handled here. By communicating with the Prospective Parachains subsystem,
//! this subsystem extrapolates an "implicit view" from the set of currently active leaves,
//! which determines the set of all recent relay-chain block hashes which could be relay-parents
//! for candidates backed in children of the active leaves.
//!
//! In fact, this subsystem relies on the Statement Distribution subsystem to prevent spam
//! by enforcing the rule that each validator may second at most one candidate per depth per
//! active leaf. This bounds the number of candidates that the system needs to consider and
//! is not handled within this subsystem, except for candidates seconded locally.
//!
//! This subsystem also handles relay-chain heads which don't support asynchronous backing.
//! For such active leaves, the only valid relay-parent is the leaf hash itself and the only
//! allowed depth is 0.
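//!
//! As a small worked example of the depth measure described above: suppose the most recently
//! included parachain block in the relay-chain state at some active leaf is `P0` (hypothetical
//! names, for illustration only). For an unincluded chain of candidates built on top of it:
//!
//! ```text
//! P0 (included) <- C1 (depth 0) <- C2 (depth 1) <- C3 (depth 2)
//! ```
//!
//! A validator assigned to this parachain may second at most one candidate at each of these
//! depths per active leaf; for leaves that don't support asynchronous backing, only `C1`
//! (depth 0, with the leaf itself as relay-parent) would be eligible.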

#![deny(unused_crate_dependencies)]

68use std::{
69	collections::{HashMap, HashSet},
70	sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75	channel::{mpsc, oneshot},
76	future::BoxFuture,
77	stream::FuturesOrdered,
78	FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84	AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85	ValidationResult,
86};
87use polkadot_node_subsystem::{
88	messages::{
89		AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
90		CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
91		HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
92		ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
93		RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
94		StoreAvailableDataError,
95	},
96	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97	SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100	self as util,
101	backing_implicit_view::View as ImplicitView,
102	request_claim_queue, request_disabled_validators, request_min_backing_votes,
103	request_node_features, request_session_executor_params, request_session_index_for_child,
104	request_validator_groups, request_validators,
105	runtime::{self, ClaimQueueSnapshot},
106	Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110	BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceiptV2 as CandidateReceipt,
111	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, ExecutorParams,
112	GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures,
113	PersistedValidationData, SessionIndex, SigningContext, ValidationCode, ValidatorId,
114	ValidatorIndex, ValidatorSignature, ValidityAttestation,
115};
116use polkadot_statement_table::{
117	generic::AttestedCandidate as TableAttestedCandidate,
118	v2::{
119		SignedStatement as TableSignedStatement, Statement as TableStatement,
120		Summary as TableSummary,
121	},
122	Context as TableContextTrait, Table,
123};
124use sp_keystore::KeystorePtr;
125
126mod error;
127
128mod metrics;
129use self::metrics::Metrics;
130
131#[cfg(test)]
132mod tests;
133
134const LOG_TARGET: &str = "parachain::candidate-backing";
135
136/// PoV data to validate.
137enum PoVData {
138	/// Already available (from candidate selection).
139	Ready(Arc<PoV>),
140	/// Needs to be fetched from validator (we are checking a signed statement).
141	FetchFromValidator {
142		from_validator: ValidatorIndex,
143		candidate_hash: CandidateHash,
144		pov_hash: Hash,
145	},
146}
147
enum ValidatedCandidateCommand {
	// We were instructed to second the candidate that has already been validated.
	Second(BackgroundValidationResult),
	// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
	// We were not able to `Attest` because the backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
}

impl std::fmt::Debug for ValidatedCandidateCommand {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		let candidate_hash = self.candidate_hash();
		match *self {
			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "AttestNoPoV({})", candidate_hash),
		}
	}
}
167
168impl ValidatedCandidateCommand {
169	fn candidate_hash(&self) -> CandidateHash {
170		match *self {
171			ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
172			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
173			ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
174			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
175			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
176		}
177	}
178}
179
180/// The candidate backing subsystem.
181pub struct CandidateBackingSubsystem {
182	keystore: KeystorePtr,
183	metrics: Metrics,
184}
185
186impl CandidateBackingSubsystem {
187	/// Create a new instance of the `CandidateBackingSubsystem`.
188	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
189		Self { keystore, metrics }
190	}
191}
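
// A minimal construction sketch (not taken from any particular node setup): `keystore` is assumed
// to be a `KeystorePtr` handed down from the node's keystore container and `metrics` a `Metrics`
// value registered elsewhere. The overseer then owns the subsystem and drives it via `start`.
//
//     let subsystem = CandidateBackingSubsystem::new(keystore, metrics);
//     // ... pass `subsystem` to the overseer builder, which calls `start(ctx)` on it.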
192
193#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
194impl<Context> CandidateBackingSubsystem
195where
196	Context: Send + Sync,
197{
198	fn start(self, ctx: Context) -> SpawnedSubsystem {
199		let future = async move {
200			run(ctx, self.keystore, self.metrics)
201				.await
202				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
203		}
204		.boxed();
205
206		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
207	}
208}
209
210struct PerRelayParentState {
	/// The hash of the relay parent on top of which this job is doing its work.
212	parent: Hash,
213	/// The node features.
214	node_features: NodeFeatures,
215	/// The executor parameters.
216	executor_params: Arc<ExecutorParams>,
217	/// The `CoreIndex` assigned to the local validator at this relay parent.
218	assigned_core: Option<CoreIndex>,
219	/// The candidates that are backed by enough validators in their group, by hash.
220	backed: HashSet<CandidateHash>,
221	/// The table of candidates and statements under this relay-parent.
222	table: Table<TableContext>,
223	/// The table context, including groups.
224	table_context: TableContext,
	/// We issued `Seconded` or `Valid` statements about these candidates.
226	issued_statements: HashSet<CandidateHash>,
227	/// These candidates are undergoing validation in the background.
228	awaiting_validation: HashSet<CandidateHash>,
229	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
230	fallbacks: HashMap<CandidateHash, AttestingData>,
231	/// The minimum backing votes threshold.
232	minimum_backing_votes: u32,
233	/// The number of cores.
234	n_cores: u32,
235	/// Claim queue state. If the runtime API is not available, it'll be populated with info from
236	/// availability cores.
237	claim_queue: ClaimQueueSnapshot,
238	/// The validator index -> group mapping at this relay parent.
239	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
240	/// The associated group rotation information.
241	group_rotation_info: GroupRotationInfo,
242}
243
244struct PerCandidateState {
245	persisted_validation_data: PersistedValidationData,
246	seconded_locally: bool,
247	relay_parent: Hash,
248}
249
250/// A cache for storing data per-session to reduce repeated
251/// runtime API calls and avoid redundant computations.
252struct PerSessionCache {
253	/// Cache for storing validators list, retrieved from the runtime.
254	validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
255	/// Cache for storing node features, retrieved from the runtime.
256	node_features_cache: LruMap<SessionIndex, NodeFeatures>,
257	/// Cache for storing executor parameters, retrieved from the runtime.
258	executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
259	/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
260	minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
261	/// Cache for storing validator-to-group mappings, computed from validator groups.
262	validator_to_group_cache:
263		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
264}
265
266impl Default for PerSessionCache {
267	/// Creates a new `PerSessionCache` with a default capacity.
268	fn default() -> Self {
269		Self::new(2)
270	}
271}
272
273impl PerSessionCache {
274	/// Creates a new `PerSessionCache` with a given capacity.
275	fn new(capacity: u32) -> Self {
276		PerSessionCache {
277			validators_cache: LruMap::new(ByLength::new(capacity)),
278			node_features_cache: LruMap::new(ByLength::new(capacity)),
279			executor_params_cache: LruMap::new(ByLength::new(capacity)),
280			minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
281			validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
282		}
283	}
284
285	/// Gets validators from the cache or fetches them from the runtime if not present.
286	async fn validators(
287		&mut self,
288		session_index: SessionIndex,
289		parent: Hash,
290		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
291	) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
292		// Try to get the validators list from the cache.
293		if let Some(validators) = self.validators_cache.get(&session_index) {
294			return Ok(Arc::clone(validators));
295		}
296
297		// Fetch the validators list from the runtime since it was not in the cache.
298		let validators: Vec<ValidatorId> =
299			request_validators(parent, sender).await.await.map_err(|err| {
300				RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
301			})??;
302
303		// Wrap the validators list in an Arc to avoid a deep copy when storing it in the cache.
304		let validators = Arc::new(validators);
305
306		// Cache the fetched validators list for future use.
307		self.validators_cache.insert(session_index, Arc::clone(&validators));
308
309		Ok(validators)
310	}
311
312	/// Gets the node features from the cache or fetches it from the runtime if not present.
313	async fn node_features(
314		&mut self,
315		session_index: SessionIndex,
316		parent: Hash,
317		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
318	) -> Result<NodeFeatures, RuntimeApiError> {
319		// Try to get the node features from the cache.
320		if let Some(node_features) = self.node_features_cache.get(&session_index) {
321			return Ok(node_features.clone());
322		}
323
324		// Fetch the node features from the runtime since it was not in the cache.
325		let node_features = request_node_features(parent, session_index, sender)
326			.await
327			.await
328			.map_err(|err| RuntimeApiError::Execution {
329				runtime_api_name: "NodeFeatures",
330				source: Arc::new(err),
331			})??;
332
333		// Cache the fetched node features for future use.
334		self.node_features_cache.insert(session_index, node_features.clone());
335
336		Ok(node_features)
337	}
338
339	/// Gets the executor parameters from the cache or
340	/// fetches them from the runtime if not present.
341	async fn executor_params(
342		&mut self,
343		session_index: SessionIndex,
344		parent: Hash,
345		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
346	) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
347		// Try to get the executor parameters from the cache.
348		if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
349			return Ok(Arc::clone(executor_params));
350		}
351
352		// Fetch the executor parameters from the runtime since it was not in the cache.
353		let executor_params = request_session_executor_params(parent, session_index, sender)
354			.await
355			.await
356			.map_err(|err| RuntimeApiError::Execution {
357				runtime_api_name: "SessionExecutorParams",
358				source: Arc::new(err),
359			})??
360			.ok_or_else(|| RuntimeApiError::Execution {
361				runtime_api_name: "SessionExecutorParams",
362				source: Arc::new(Error::MissingExecutorParams),
363			})?;
364
365		// Wrap the executor parameters in an Arc to avoid a deep copy when storing it in the cache.
366		let executor_params = Arc::new(executor_params);
367
368		// Cache the fetched executor parameters for future use.
369		self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
370
371		Ok(executor_params)
372	}
373
374	/// Gets the minimum backing votes threshold from the
375	/// cache or fetches it from the runtime if not present.
376	async fn minimum_backing_votes(
377		&mut self,
378		session_index: SessionIndex,
379		parent: Hash,
380		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
381	) -> Result<u32, RuntimeApiError> {
382		// Try to get the value from the cache.
383		if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
384			return Ok(*minimum_backing_votes);
385		}
386
387		// Fetch the value from the runtime since it was not in the cache.
388		let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
389			.await
390			.await
391			.map_err(|err| RuntimeApiError::Execution {
392				runtime_api_name: "MinimumBackingVotes",
393				source: Arc::new(err),
394			})??;
395
396		// Cache the fetched value for future use.
397		self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
398
399		Ok(minimum_backing_votes)
400	}
401
402	/// Gets or computes the validator-to-group mapping for a session.
403	fn validator_to_group(
404		&mut self,
405		session_index: SessionIndex,
406		validators: &[ValidatorId],
407		validator_groups: &[Vec<ValidatorIndex>],
408	) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
409		let validator_to_group = self
410			.validator_to_group_cache
411			.get_or_insert(session_index, || {
412				let mut vector = vec![None; validators.len()];
413
414				for (group_idx, validator_group) in validator_groups.iter().enumerate() {
415					for validator in validator_group {
416						vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
417					}
418				}
419
420				Arc::new(IndexedVec::<_, _>::from(vector))
421			})
422			.expect("Just inserted");
423
424		Arc::clone(validator_to_group)
425	}
426}
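
// A minimal sketch of the get-or-fetch pattern used by `PerSessionCache` above, with a plain
// closure standing in for the async runtime API request (types and names below are illustrative
// only, not part of this crate):
//
//     use schnellru::{ByLength, LruMap};
//     use std::sync::Arc;
//
//     let mut cache: LruMap<u32, Arc<Vec<String>>> = LruMap::new(ByLength::new(2));
//     let fetch = |session: u32| Arc::new(vec![format!("validator-for-session-{session}")]);
//
//     let session = 7;
//     let validators = match cache.get(&session) {
//         Some(v) => Arc::clone(v),                  // hit: cheap Arc clone, no refetch
//         None => {
//             let v = fetch(session);                // miss: fetch once from the runtime...
//             cache.insert(session, Arc::clone(&v)); // ...and keep it for later relay parents
//             v
//         },
//     };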
427
428/// The state of the subsystem.
429struct State {
430	/// The utility for managing the implicit and explicit views in a consistent way.
431	implicit_view: ImplicitView,
	/// State tracked for all relay-parents for which backing work is ongoing. This includes
	/// all active leaves.
434	per_relay_parent: HashMap<Hash, PerRelayParentState>,
435	/// State tracked for all candidates relevant to the implicit view.
436	///
437	/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
438	/// or explicit view for which a `Seconded` statement has been successfully imported.
439	per_candidate: HashMap<CandidateHash, PerCandidateState>,
440	/// A local cache for storing per-session data. This cache helps to
441	/// reduce repeated calls to the runtime and avoid redundant computations.
442	per_session_cache: PerSessionCache,
443	/// A clonable sender which is dispatched to background candidate validation tasks to inform
444	/// the main task of the result.
445	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
446	/// The handle to the keystore used for signing.
447	keystore: KeystorePtr,
448}
449
450impl State {
451	fn new(
452		background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
453		keystore: KeystorePtr,
454	) -> Self {
455		State {
456			implicit_view: ImplicitView::default(),
457			per_relay_parent: HashMap::default(),
458			per_candidate: HashMap::new(),
459			per_session_cache: PerSessionCache::default(),
460			background_validation_tx,
461			keystore,
462		}
463	}
464}
465
466#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
467async fn run<Context>(
468	mut ctx: Context,
469	keystore: KeystorePtr,
470	metrics: Metrics,
471) -> FatalResult<()> {
472	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
473	let mut state = State::new(background_validation_tx, keystore);
474
475	loop {
476		let res =
477			run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
478
479		match res {
480			Ok(()) => break,
481			Err(e) => crate::error::log_error(Err(e))?,
482		}
483	}
484
485	Ok(())
486}
487
488#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
489async fn run_iteration<Context>(
490	ctx: &mut Context,
491	state: &mut State,
492	metrics: &Metrics,
493	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
494) -> Result<(), Error> {
495	loop {
496		futures::select!(
497			validated_command = background_validation_rx.next().fuse() => {
498				if let Some((relay_parent, command)) = validated_command {
499					handle_validated_candidate_command(
500						&mut *ctx,
501						state,
502						relay_parent,
503						command,
504						metrics,
505					).await?;
506				} else {
507					panic!("background_validation_tx always alive at this point; qed");
508				}
509			}
510			from_overseer = ctx.recv().fuse() => {
511				match from_overseer.map_err(Error::OverseerExited)? {
512					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
513						handle_active_leaves_update(
514							&mut *ctx,
515							update,
516							state,
517						).await?;
518					}
519					FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
520					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
521					FromOrchestra::Communication { msg } => {
522						handle_communication(&mut *ctx, state, msg, metrics).await?;
523					}
524				}
525			}
526		)
527	}
528}
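
// A stripped-down sketch of the event-loop shape used by `run_iteration` above: two sources are
// multiplexed with `futures::select!`, one for completed background work and one for incoming
// commands (the channel types and messages here are illustrative, not the subsystem's own):
//
//     use futures::{channel::mpsc, FutureExt, StreamExt};
//
//     async fn tiny_loop(
//         mut work_done: mpsc::Receiver<u32>,
//         mut commands: mpsc::Receiver<&'static str>,
//     ) {
//         loop {
//             futures::select! {
//                 done = work_done.next().fuse() => match done {
//                     Some(n) => println!("background task {n} finished"),
//                     None => break, // all senders are gone
//                 },
//                 cmd = commands.next().fuse() => match cmd {
//                     Some("conclude") | None => break,
//                     Some(other) => println!("handling {other}"),
//                 },
//             }
//         }
//     }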
529
530/// In case a backing validator does not provide a PoV, we need to retry with other backing
531/// validators.
532///
/// This is the data needed to accomplish this: basically all the data needed for spawning a
/// validation job, plus a list of backing validators we can try.
535#[derive(Clone)]
536struct AttestingData {
537	/// The candidate to attest.
538	candidate: CandidateReceipt,
539	/// Hash of the PoV we need to fetch.
540	pov_hash: Hash,
541	/// Validator we are currently trying to get the PoV from.
542	from_validator: ValidatorIndex,
543	/// Other backing validators we can try in case `from_validator` failed.
544	backing: Vec<ValidatorIndex>,
545}
546
547#[derive(Default, Debug)]
548struct TableContext {
549	validator: Option<Validator>,
550	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
551	validators: Vec<ValidatorId>,
552	disabled_validators: Vec<ValidatorIndex>,
553}
554
555impl TableContext {
556	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
557	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
558		self.disabled_validators
559			.iter()
560			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
561	}
562
	// Returns `Some(true)` if the local validator is in the disabled validators list,
	// `Some(false)` if it is not, and `None` if we are not a validator at this relay parent.
564	pub fn local_validator_is_disabled(&self) -> Option<bool> {
565		self.validator.as_ref().map(|v| v.disabled())
566	}
567}
568
569impl TableContextTrait for TableContext {
570	type AuthorityId = ValidatorIndex;
571	type Digest = CandidateHash;
572	type GroupId = CoreIndex;
573	type Signature = ValidatorSignature;
574	type Candidate = CommittedCandidateReceipt;
575
576	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
577		candidate.hash()
578	}
579
580	fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
581		self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
582	}
583
584	fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
585		self.groups.get(group).map(|g| g.len())
586	}
587}
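
// With the impl above, the generic statement table identifies groups by `CoreIndex` and members
// by `ValidatorIndex`. The membership hook it relies on boils down to a lookup of this shape
// (plain types for illustration, not the crate's own):
//
//     use std::collections::HashMap;
//
//     let groups: HashMap<u32, Vec<u32>> = HashMap::from([(0, vec![10, 11]), (1, vec![12])]);
//     let is_member_of = |validator: u32, core: u32| -> bool {
//         groups.get(&core).map_or(false, |g| g.contains(&validator))
//     };
//     assert!(is_member_of(11, 0));
//     assert!(!is_member_of(11, 1));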
588
589// It looks like it's not possible to do an `impl From` given the current state of
590// the code. So this does the necessary conversion.
591fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
592	let statement = match s.payload() {
593		StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
594		StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
595	};
596
597	TableSignedStatement {
598		statement,
599		signature: s.signature().clone(),
600		sender: s.validator_index(),
601	}
602}
603
604fn table_attested_to_backed(
605	attested: TableAttestedCandidate<
606		CoreIndex,
607		CommittedCandidateReceipt,
608		ValidatorIndex,
609		ValidatorSignature,
610	>,
611	table_context: &TableContext,
612) -> Option<BackedCandidate> {
613	let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
614
615	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
616		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
617
618	let group = table_context.groups.get(&core_index)?;
619
620	let mut validator_indices = BitVec::with_capacity(group.len());
621
622	validator_indices.resize(group.len(), false);
623
624	// The order of the validity votes in the backed candidate must match
625	// the order of bits set in the bitfield, which is not necessarily
626	// the order of the `validity_votes` we got from the table.
627	let mut vote_positions = Vec::with_capacity(validity_votes.len());
628	for (orig_idx, id) in ids.iter().enumerate() {
629		if let Some(position) = group.iter().position(|x| x == id) {
630			validator_indices.set(position, true);
631			vote_positions.push((orig_idx, position));
632		} else {
633			gum::warn!(
634				target: LOG_TARGET,
635				"Logic error: Validity vote from table does not correspond to group",
636			);
637
638			return None
639		}
640	}
641	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
642
643	Some(BackedCandidate::new(
644		candidate,
645		vote_positions
646			.into_iter()
647			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
648			.collect(),
649		validator_indices,
650		core_index,
651	))
652}
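
// A small self-contained sketch of the reordering invariant enforced above (the values are
// hypothetical, not crate APIs): the bitfield is indexed by position within the backing group,
// and the attestations must be emitted in that same positional order, even if the table yielded
// them in a different order.
//
//     let group = vec![10u32, 11, 12, 13];                    // validator indices of the group
//     let votes = vec![(12u32, "vote-c"), (10u32, "vote-a")]; // (validator, vote) from the table
//
//     let mut bits = vec![false; group.len()];
//     let mut positioned: Vec<(usize, &str)> = votes
//         .iter()
//         .map(|(v, vote)| (group.iter().position(|g| g == v).expect("vote from group member"), *vote))
//         .collect();
//     positioned.sort_by_key(|(pos, _)| *pos);                // votes now follow bitfield order
//     for (pos, _) in &positioned {
//         bits[*pos] = true;
//     }
//     assert_eq!(bits, vec![true, false, true, false]);
//     assert_eq!(positioned, vec![(0, "vote-a"), (2, "vote-c")]);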
653
654async fn store_available_data(
655	sender: &mut impl overseer::CandidateBackingSenderTrait,
656	n_validators: u32,
657	candidate_hash: CandidateHash,
658	available_data: AvailableData,
659	expected_erasure_root: Hash,
660	core_index: CoreIndex,
661	node_features: NodeFeatures,
662) -> Result<(), Error> {
663	let (tx, rx) = oneshot::channel();
664	// Important: the `av-store` subsystem will check if the erasure root of the `available_data`
665	// matches `expected_erasure_root` which was provided by the collator in the `CandidateReceipt`.
666	// This check is consensus critical and the `backing` subsystem relies on it for ensuring
667	// candidate validity.
668	sender
669		.send_message(AvailabilityStoreMessage::StoreAvailableData {
670			candidate_hash,
671			n_validators,
672			available_data,
673			expected_erasure_root,
674			core_index,
675			node_features,
676			tx,
677		})
678		.await;
679
680	rx.await
681		.map_err(Error::StoreAvailableDataChannel)?
682		.map_err(Error::StoreAvailableData)
683}
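
// The function above follows the subsystem-wide request pattern: create a oneshot channel, embed
// the sender in the outgoing message, and await the receiver for the answer. In isolation, and
// with a plain closure standing in for the subsystem on the other end, the pattern looks like:
//
//     use futures::channel::oneshot;
//
//     async fn request_answer(
//         send_request: impl FnOnce(oneshot::Sender<u32>),
//     ) -> Result<u32, oneshot::Canceled> {
//         let (tx, rx) = oneshot::channel();
//         send_request(tx); // hand the sender to whoever will answer (here: the AV store)
//         rx.await          // resolves with the reply, or errors if the sender was dropped
//     }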
684
685// Make a `PoV` available.
686//
687// This calls the AV store to write the available data to storage. The AV store also checks the
688// erasure root matches the `expected_erasure_root`.
689// This returns `Err()` on erasure root mismatch or due to any AV store subsystem error.
690//
691// Otherwise, it returns `Ok(())`.
692async fn make_pov_available(
693	sender: &mut impl overseer::CandidateBackingSenderTrait,
694	n_validators: usize,
695	pov: Arc<PoV>,
696	candidate_hash: CandidateHash,
697	validation_data: PersistedValidationData,
698	expected_erasure_root: Hash,
699	core_index: CoreIndex,
700	node_features: NodeFeatures,
701) -> Result<(), Error> {
702	store_available_data(
703		sender,
704		n_validators as u32,
705		candidate_hash,
706		AvailableData { pov, validation_data },
707		expected_erasure_root,
708		core_index,
709		node_features,
710	)
711	.await
712}
713
714async fn request_pov(
715	sender: &mut impl overseer::CandidateBackingSenderTrait,
716	relay_parent: Hash,
717	from_validator: ValidatorIndex,
718	para_id: ParaId,
719	candidate_hash: CandidateHash,
720	pov_hash: Hash,
721) -> Result<Arc<PoV>, Error> {
722	let (tx, rx) = oneshot::channel();
723	sender
724		.send_message(AvailabilityDistributionMessage::FetchPoV {
725			relay_parent,
726			from_validator,
727			para_id,
728			candidate_hash,
729			pov_hash,
730			tx,
731		})
732		.await;
733
734	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
735	Ok(Arc::new(pov))
736}
737
738async fn request_candidate_validation(
739	sender: &mut impl overseer::CandidateBackingSenderTrait,
740	validation_data: PersistedValidationData,
741	validation_code: ValidationCode,
742	candidate_receipt: CandidateReceipt,
743	pov: Arc<PoV>,
744	executor_params: ExecutorParams,
745) -> Result<ValidationResult, Error> {
746	let (tx, rx) = oneshot::channel();
747	let is_system = candidate_receipt.descriptor.para_id().is_system();
748	let relay_parent = candidate_receipt.descriptor.relay_parent();
749
750	sender
751		.send_message(CandidateValidationMessage::ValidateFromExhaustive {
752			validation_data,
753			validation_code,
754			candidate_receipt,
755			pov,
756			executor_params,
757			exec_kind: if is_system {
758				PvfExecKind::BackingSystemParas(relay_parent)
759			} else {
760				PvfExecKind::Backing(relay_parent)
761			},
762			response_sender: tx,
763		})
764		.await;
765
766	match rx.await {
767		Ok(Ok(validation_result)) => Ok(validation_result),
768		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
769		Err(err) => Err(Error::ValidateFromExhaustive(err)),
770	}
771}
772
773struct BackgroundValidationOutputs {
774	candidate: CandidateReceipt,
775	commitments: CandidateCommitments,
776	persisted_validation_data: PersistedValidationData,
777}
778
779type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
780
781struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
782	sender: S,
783	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
784	candidate: CandidateReceipt,
785	relay_parent: Hash,
786	node_features: NodeFeatures,
787	executor_params: Arc<ExecutorParams>,
788	persisted_validation_data: PersistedValidationData,
789	pov: PoVData,
790	n_validators: usize,
791	make_command: F,
792}
793
794async fn validate_and_make_available(
795	params: BackgroundValidationParams<
796		impl overseer::CandidateBackingSenderTrait,
797		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
798	>,
799	core_index: CoreIndex,
800) -> Result<(), Error> {
801	let BackgroundValidationParams {
802		mut sender,
803		mut tx_command,
804		candidate,
805		relay_parent,
806		node_features,
807		executor_params,
808		persisted_validation_data,
809		pov,
810		n_validators,
811		make_command,
812	} = params;
813
814	let validation_code = {
815		let validation_code_hash = candidate.descriptor().validation_code_hash();
816		let (tx, rx) = oneshot::channel();
817		sender
818			.send_message(RuntimeApiMessage::Request(
819				relay_parent,
820				RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
821			))
822			.await;
823
824		let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
825		match code {
826			Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
827			Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
828			Ok(Some(c)) => c,
829		}
830	};
831
832	let pov = match pov {
833		PoVData::Ready(pov) => pov,
834		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
835			match request_pov(
836				&mut sender,
837				relay_parent,
838				from_validator,
839				candidate.descriptor.para_id(),
840				candidate_hash,
841				pov_hash,
842			)
843			.await
844			{
845				Err(Error::FetchPoV) => {
846					tx_command
847						.send((
848							relay_parent,
849							ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
850						))
851						.await
852						.map_err(Error::BackgroundValidationMpsc)?;
853					return Ok(())
854				},
855				Err(err) => return Err(err),
856				Ok(pov) => pov,
857			},
858	};
859
860	let v = {
861		request_candidate_validation(
862			&mut sender,
863			persisted_validation_data,
864			validation_code,
865			candidate.clone(),
866			pov.clone(),
867			executor_params.as_ref().clone(),
868		)
869		.await?
870	};
871
872	let res = match v {
873		ValidationResult::Valid(commitments, validation_data) => {
874			gum::debug!(
875				target: LOG_TARGET,
876				candidate_hash = ?candidate.hash(),
877				"Validation successful",
878			);
879
880			let erasure_valid = make_pov_available(
881				&mut sender,
882				n_validators,
883				pov.clone(),
884				candidate.hash(),
885				validation_data.clone(),
886				candidate.descriptor.erasure_root(),
887				core_index,
888				node_features,
889			)
890			.await;
891
892			match erasure_valid {
893				Ok(()) => Ok(BackgroundValidationOutputs {
894					candidate,
895					commitments,
896					persisted_validation_data: validation_data,
897				}),
898				Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
899					gum::debug!(
900						target: LOG_TARGET,
901						candidate_hash = ?candidate.hash(),
902						actual_commitments = ?commitments,
						"Erasure root doesn't match the one announced by the candidate receipt",
904					);
905					Err(candidate)
906				},
907				// Bubble up any other error.
908				Err(e) => return Err(e),
909			}
910		},
911		ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
912			// If validation produces a new set of commitments, we vote the candidate as invalid.
913			gum::warn!(
914				target: LOG_TARGET,
915				candidate_hash = ?candidate.hash(),
916				"Validation yielded different commitments",
917			);
918			Err(candidate)
919		},
920		ValidationResult::Invalid(reason) => {
921			gum::warn!(
922				target: LOG_TARGET,
923				candidate_hash = ?candidate.hash(),
924				reason = ?reason,
925				"Validation yielded an invalid candidate",
926			);
927			Err(candidate)
928		},
929	};
930
931	tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
932}
933
934#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
935async fn handle_communication<Context>(
936	ctx: &mut Context,
937	state: &mut State,
938	message: CandidateBackingMessage,
939	metrics: &Metrics,
940) -> Result<(), Error> {
941	match message {
942		CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
943			handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
944		},
945		CandidateBackingMessage::Statement(relay_parent, statement) => {
946			handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
947		},
948		CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
949			handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
950		CandidateBackingMessage::CanSecond(request, tx) =>
951			handle_can_second_request(ctx, state, request, tx).await,
952	}
953
954	Ok(())
955}
956
957#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
958async fn handle_active_leaves_update<Context>(
959	ctx: &mut Context,
960	update: ActiveLeavesUpdate,
961	state: &mut State,
962) -> Result<(), Error> {
	// Activate in the implicit view before deactivating; per the docs
	// on `ImplicitView`, this is more efficient.
965	let res = if let Some(leaf) = update.activated {
966		let leaf_hash = leaf.hash;
967		Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
968	} else {
969		None
970	};
971
972	for deactivated in update.deactivated {
973		state.implicit_view.deactivate_leaf(deactivated);
974	}
975
	// Clean up `per_relay_parent` according to the ancestry of the leaves. We do this so
	// that we can clean up candidates right afterwards as a result.
979	{
980		let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
981
982		state.per_relay_parent.retain(|r, _| remaining.contains(&r));
983	}
984
985	// clean up `per_candidate` according to which relay-parents
986	// are known.
987	state
988		.per_candidate
989		.retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
990
	// Get the relay parents, explicit or implicit, of the new active leaf. These
	// might be fresh, but might also be known already.
993	let fresh_relay_parents = match res {
994		None => return Ok(()),
995		Some((leaf, Ok(_))) => {
996			let fresh_relay_parents =
997				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
998
999			let fresh_relay_parent = match fresh_relay_parents {
1000				Some(f) => f.to_vec(),
1001				None => {
1002					gum::warn!(
1003						target: LOG_TARGET,
1004						leaf_hash = ?leaf.hash,
1005						"Implicit view gave no relay-parents"
1006					);
1007
1008					vec![leaf.hash]
1009				},
1010			};
1011			fresh_relay_parent
1012		},
1013		Some((leaf, Err(e))) => {
1014			gum::debug!(
1015				target: LOG_TARGET,
1016				leaf_hash = ?leaf.hash,
1017				err = ?e,
1018				"Failed to load implicit view for leaf."
1019			);
1020
1021			return Ok(())
1022		},
1023	};
1024
	// Add entries in `per_relay_parent` for all new relay-parents.
1026	for maybe_new in fresh_relay_parents {
1027		if state.per_relay_parent.contains_key(&maybe_new) {
1028			continue
1029		}
1030
		// Construct a `PerRelayParentState` from the runtime API
		// and insert it.
1033		let per = construct_per_relay_parent_state(
1034			ctx,
1035			maybe_new,
1036			&state.keystore,
1037			&mut state.per_session_cache,
1038		)
1039		.await?;
1040
1041		if let Some(per) = per {
1042			state.per_relay_parent.insert(maybe_new, per);
1043		}
1044	}
1045
1046	Ok(())
1047}
1048
1049macro_rules! try_runtime_api {
1050	($x: expr) => {
1051		match $x {
1052			Ok(x) => x,
1053			Err(err) => {
1054				// Only bubble up fatal errors.
1055				error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
1056
1057				// We can't do candidate validation work if we don't have the
1058				// requisite runtime API data. But these errors should not take
1059				// down the node.
1060				return Ok(None)
1061			},
1062		}
1063	};
1064}
1065
1066fn core_index_from_statement(
1067	validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1068	group_rotation_info: &GroupRotationInfo,
1069	n_cores: u32,
1070	claim_queue: &ClaimQueueSnapshot,
1071	statement: &SignedFullStatementWithPVD,
1072) -> Option<CoreIndex> {
1073	let compact_statement = statement.as_unchecked();
1074	let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1075
1076	gum::trace!(
1077		target:LOG_TARGET,
1078		?group_rotation_info,
1079		?statement,
1080		?validator_to_group,
1081		n_cores,
1082		?candidate_hash,
1083		"Extracting core index from statement"
1084	);
1085
1086	let statement_validator_index = statement.validator_index();
1087	let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1088		gum::debug!(
1089			target: LOG_TARGET,
1090			?group_rotation_info,
1091			?statement,
1092			?validator_to_group,
1093			n_cores,
1094			?candidate_hash,
1095			"Invalid validator index: {:?}",
1096			statement_validator_index
1097		);
1098		return None
1099	};
1100
1101	// First check if the statement para id matches the core assignment.
1102	let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1103
1104	if core_index.0 > n_cores {
1105		gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1106		return None
1107	}
1108
	if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
		let candidate_para_id = candidate.descriptor.para_id();
		let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);

		if !assigned_paras.any(|id| id == &candidate_para_id) {
			gum::debug!(
				target: LOG_TARGET,
				?candidate_hash,
				?core_index,
				assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
				?candidate_para_id,
				"Invalid CoreIndex, core is not assigned to this para_id"
			);
			return None
		}
	}

	Some(core_index)
}
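
// The lookup chain implemented above, with the rotation arithmetic owned by `GroupRotationInfo`
// left out, is: statement -> validator index -> group index -> core index -> paras claimed on
// that core. A simplified sketch ignoring rotation (group `i` validates core `i`), using plain
// types rather than the crate's own:
//
//     let validator_to_group = vec![Some(0u32), Some(0), Some(1), None];  // indexed by validator
//     let claim_queue: Vec<Vec<u32>> = vec![vec![2000], vec![2001, 2002]]; // indexed by core
//
//     let statement_validator = 2usize;
//     let seconded_para_id = 2001u32;
//
//     let core = validator_to_group[statement_validator].expect("validator is in a group") as usize;
//     assert!(claim_queue[core].contains(&seconded_para_id)); // otherwise: "Invalid CoreIndex ..."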
1129
1130/// Load the data necessary to do backing work on top of a relay-parent.
1131#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1132async fn construct_per_relay_parent_state<Context>(
1133	ctx: &mut Context,
1134	relay_parent: Hash,
1135	keystore: &KeystorePtr,
1136	per_session_cache: &mut PerSessionCache,
1137) -> Result<Option<PerRelayParentState>, Error> {
1138	let parent = relay_parent;
1139
1140	let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1141		request_session_index_for_child(parent, ctx.sender()).await,
1142		request_validator_groups(parent, ctx.sender()).await,
1143		request_claim_queue(parent, ctx.sender()).await,
1144		request_disabled_validators(parent, ctx.sender()).await,
1145	)
1146	.map_err(Error::JoinMultiple)?;
1147
1148	let session_index = try_runtime_api!(session_index);
1149
1150	let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1151	let validators = try_runtime_api!(validators);
1152
1153	let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1154	let node_features = try_runtime_api!(node_features);
1155
1156	let executor_params =
1157		per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
1158	let executor_params = try_runtime_api!(executor_params);
1159
1160	gum::debug!(target: LOG_TARGET, ?parent, "New state");
1161
1162	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1163
1164	let minimum_backing_votes = per_session_cache
1165		.minimum_backing_votes(session_index, parent, ctx.sender())
1166		.await;
1167	let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1168	let claim_queue = try_runtime_api!(claim_queue);
1169	let disabled_validators = try_runtime_api!(disabled_validators);
1170
1171	let signing_context = SigningContext { parent_hash: parent, session_index };
1172	let validator = match Validator::construct(
1173		&validators,
1174		&disabled_validators,
1175		signing_context.clone(),
1176		keystore.clone(),
1177	) {
1178		Ok(v) => Some(v),
1179		Err(util::Error::NotAValidator) => None,
1180		Err(e) => {
1181			gum::warn!(
1182				target: LOG_TARGET,
1183				err = ?e,
1184				"Cannot participate in candidate backing",
1185			);
1186
1187			return Ok(None)
1188		},
1189	};
1190
1191	let n_cores = validator_groups.len();
1192
1193	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1194	let mut assigned_core = None;
1195
1196	for idx in 0..n_cores {
1197		let core_index = CoreIndex(idx as _);
1198
1199		if !claim_queue.contains_key(&core_index) {
1200			continue
1201		}
1202
1203		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1204		if let Some(g) = validator_groups.get(group_index.0 as usize) {
1205			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1206				assigned_core = Some(core_index);
1207			}
1208			groups.insert(core_index, g.clone());
1209		}
1210	}
1211	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1212
1213	let validator_to_group =
1214		per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1215
1216	let table_context =
1217		TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1218
1219	Ok(Some(PerRelayParentState {
1220		parent,
1221		node_features,
1222		executor_params,
1223		assigned_core,
1224		backed: HashSet::new(),
1225		table: Table::new(),
1226		table_context,
1227		issued_statements: HashSet::new(),
1228		awaiting_validation: HashSet::new(),
1229		fallbacks: HashMap::new(),
1230		minimum_backing_votes,
1231		n_cores: validator_groups.len() as u32,
1232		claim_queue: ClaimQueueSnapshot::from(claim_queue),
1233		validator_to_group,
1234		group_rotation_info,
1235	}))
1236}
1237
1238enum SecondingAllowed {
1239	No,
1240	// On which leaves is seconding allowed.
1241	Yes(Vec<Hash>),
1242}
1243
1244/// Checks whether a candidate can be seconded based on its hypothetical membership in the fragment
1245/// chain.
1246#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1247async fn seconding_sanity_check<Context>(
1248	ctx: &mut Context,
1249	implicit_view: &ImplicitView,
1250	hypothetical_candidate: HypotheticalCandidate,
1251) -> SecondingAllowed {
1252	let mut leaves_for_seconding = Vec::new();
1253	let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1254
1255	let candidate_para = hypothetical_candidate.candidate_para();
1256	let candidate_relay_parent = hypothetical_candidate.relay_parent();
1257	let candidate_hash = hypothetical_candidate.candidate_hash();
1258
1259	for head in implicit_view.leaves() {
1260		// Check that the candidate relay parent is allowed for para, skip the
1261		// leaf otherwise.
1262		let allowed_parents_for_para =
1263			implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1264		if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1265			continue
1266		}
1267
1268		let (tx, rx) = oneshot::channel();
1269		ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1270			HypotheticalMembershipRequest {
1271				candidates: vec![hypothetical_candidate.clone()],
1272				fragment_chain_relay_parent: Some(*head),
1273			},
1274			tx,
1275		))
1276		.await;
1277		let response = rx.map_ok(move |candidate_memberships| {
1278			let is_member_or_potential = candidate_memberships
1279				.into_iter()
1280				.find_map(|(candidate, leaves)| {
1281					(candidate.candidate_hash() == candidate_hash).then_some(leaves)
1282				})
1283				.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1284				.is_some();
1285
1286			(is_member_or_potential, head)
1287		});
1288		responses.push_back(response.boxed());
1289	}
1290
1291	if responses.is_empty() {
1292		return SecondingAllowed::No
1293	}
1294
1295	while let Some(response) = responses.next().await {
1296		match response {
1297			Err(oneshot::Canceled) => {
1298				gum::warn!(
1299					target: LOG_TARGET,
1300					"Failed to reach prospective parachains subsystem for hypothetical membership",
1301				);
1302
1303				return SecondingAllowed::No
1304			},
1305			Ok((is_member_or_potential, head)) => match is_member_or_potential {
1306				false => {
1307					gum::debug!(
1308						target: LOG_TARGET,
1309						?candidate_hash,
1310						leaf_hash = ?head,
1311						"Refusing to second candidate at leaf. Is not a potential member.",
1312					);
1313				},
1314				true => {
1315					leaves_for_seconding.push(*head);
1316				},
1317			},
1318		}
1319	}
1320
1321	if leaves_for_seconding.is_empty() {
1322		SecondingAllowed::No
1323	} else {
1324		SecondingAllowed::Yes(leaves_for_seconding)
1325	}
1326}
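
// The fan-out/fan-in shape used above, in isolation: one future is queued per active leaf, and
// the answers are drained in order, keeping only the leaves that answered positively. Ready
// futures stand in here for the prospective-parachains queries:
//
//     use futures::{stream::FuturesOrdered, StreamExt};
//
//     async fn allowed_leaves(leaves: Vec<(&'static str, bool)>) -> Vec<&'static str> {
//         let mut responses = FuturesOrdered::new();
//         for (leaf, is_member) in leaves {
//             responses.push_back(async move { (is_member, leaf) });
//         }
//
//         let mut allowed = Vec::new();
//         while let Some((is_member, leaf)) = responses.next().await {
//             if is_member {
//                 allowed.push(leaf);
//             }
//         }
//         allowed
//     }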
1327
1328/// Performs seconding sanity check for an advertisement.
1329#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1330async fn handle_can_second_request<Context>(
1331	ctx: &mut Context,
1332	state: &State,
1333	request: CanSecondRequest,
1334	tx: oneshot::Sender<bool>,
1335) {
1336	let relay_parent = request.candidate_relay_parent;
1337	let response = if state.per_relay_parent.get(&relay_parent).is_some() {
1338		let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1339			candidate_hash: request.candidate_hash,
1340			candidate_para: request.candidate_para_id,
1341			parent_head_data_hash: request.parent_head_data_hash,
1342			candidate_relay_parent: relay_parent,
1343		};
1344
1345		let result =
1346			seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1347
1348		match result {
1349			SecondingAllowed::No => false,
1350			SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1351		}
1352	} else {
1353		// Relay parent is unknown or async backing is disabled.
1354		false
1355	};
1356
1357	let _ = tx.send(response);
1358}
1359
1360#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1361async fn handle_validated_candidate_command<Context>(
1362	ctx: &mut Context,
1363	state: &mut State,
1364	relay_parent: Hash,
1365	command: ValidatedCandidateCommand,
1366	metrics: &Metrics,
1367) -> Result<(), Error> {
1368	match state.per_relay_parent.get_mut(&relay_parent) {
1369		Some(rp_state) => {
1370			let candidate_hash = command.candidate_hash();
1371			rp_state.awaiting_validation.remove(&candidate_hash);
1372
1373			match command {
1374				ValidatedCandidateCommand::Second(res) => match res {
1375					Ok(outputs) => {
1376						let BackgroundValidationOutputs {
1377							candidate,
1378							commitments,
1379							persisted_validation_data,
1380						} = outputs;
1381
1382						if rp_state.issued_statements.contains(&candidate_hash) {
1383							return Ok(())
1384						}
1385
1386						let receipt = CommittedCandidateReceipt {
1387							descriptor: candidate.descriptor.clone(),
1388							commitments,
1389						};
1390
1391						let hypothetical_candidate = HypotheticalCandidate::Complete {
1392							candidate_hash,
1393							receipt: Arc::new(receipt.clone()),
1394							persisted_validation_data: persisted_validation_data.clone(),
1395						};
1396						// sanity check that we're allowed to second the candidate
1397						// and that it doesn't conflict with other candidates we've
1398						// seconded.
1399						if let SecondingAllowed::No = seconding_sanity_check(
1400							ctx,
1401							&state.implicit_view,
1402							hypothetical_candidate,
1403						)
1404						.await
1405						{
1406							return Ok(())
1407						};
1408
1409						let statement =
1410							StatementWithPVD::Seconded(receipt, persisted_validation_data);
1411
1412						// If we get an Error::RejectedByProspectiveParachains,
1413						// then the statement has not been distributed or imported into
1414						// the table.
1415						let res = sign_import_and_distribute_statement(
1416							ctx,
1417							rp_state,
1418							&mut state.per_candidate,
1419							statement,
1420							state.keystore.clone(),
1421							metrics,
1422						)
1423						.await;
1424
1425						if let Err(Error::RejectedByProspectiveParachains) = res {
1426							let candidate_hash = candidate.hash();
1427							gum::debug!(
1428								target: LOG_TARGET,
1429								relay_parent = ?candidate.descriptor().relay_parent(),
1430								?candidate_hash,
1431								"Attempted to second candidate but was rejected by prospective parachains",
1432							);
1433
1434							// Ensure the collator is reported.
1435							ctx.send_message(CollatorProtocolMessage::Invalid(
1436								candidate.descriptor().relay_parent(),
1437								candidate,
1438							))
1439							.await;
1440
1441							return Ok(())
1442						}
1443
1444						if let Some(stmt) = res? {
1445							match state.per_candidate.get_mut(&candidate_hash) {
1446								None => {
1447									gum::warn!(
1448										target: LOG_TARGET,
1449										?candidate_hash,
1450										"Missing `per_candidate` for seconded candidate.",
1451									);
1452								},
1453								Some(p) => p.seconded_locally = true,
1454							}
1455
1456							rp_state.issued_statements.insert(candidate_hash);
1457
1458							metrics.on_candidate_seconded();
1459							ctx.send_message(CollatorProtocolMessage::Seconded(
1460								rp_state.parent,
1461								StatementWithPVD::drop_pvd_from_signed(stmt),
1462							))
1463							.await;
1464						}
1465					},
1466					Err(candidate) => {
1467						ctx.send_message(CollatorProtocolMessage::Invalid(
1468							rp_state.parent,
1469							candidate,
1470						))
1471						.await;
1472					},
1473				},
1474				ValidatedCandidateCommand::Attest(res) => {
1475					// We are done - avoid new validation spawns:
1476					rp_state.fallbacks.remove(&candidate_hash);
1477					// sanity check.
1478					if !rp_state.issued_statements.contains(&candidate_hash) {
1479						if res.is_ok() {
1480							let statement = StatementWithPVD::Valid(candidate_hash);
1481
1482							sign_import_and_distribute_statement(
1483								ctx,
1484								rp_state,
1485								&mut state.per_candidate,
1486								statement,
1487								state.keystore.clone(),
1488								metrics,
1489							)
1490							.await?;
1491						}
1492						rp_state.issued_statements.insert(candidate_hash);
1493					}
1494				},
1495				ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1496					if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1497						if let Some(index) = attesting.backing.pop() {
1498							attesting.from_validator = index;
1499							let attesting = attesting.clone();
1500
1501							// The candidate state should be available because we've
1502							// validated it before, the relay-parent is still around,
1503							// and candidates are pruned on the basis of relay-parents.
1504							//
1505							// If it's not, then no point in validating it anyway.
1506							if let Some(pvd) = state
1507								.per_candidate
1508								.get(&candidate_hash)
1509								.map(|pc| pc.persisted_validation_data.clone())
1510							{
1511								kick_off_validation_work(
1512									ctx,
1513									rp_state,
1514									pvd,
1515									&state.background_validation_tx,
1516									attesting,
1517								)
1518								.await?;
1519							}
1520						}
1521					} else {
1522						gum::warn!(
1523							target: LOG_TARGET,
1524							"AttestNoPoV was triggered without fallback being available."
1525						);
1526						debug_assert!(false);
1527					}
1528				},
1529			}
1530		},
1531		None => {
			// Simple race condition; can be ignored: this relay-parent
			// is no longer relevant.
1534		},
1535	}
1536
1537	Ok(())
1538}
1539
1540fn sign_statement(
1541	rp_state: &PerRelayParentState,
1542	statement: StatementWithPVD,
1543	keystore: KeystorePtr,
1544	metrics: &Metrics,
1545) -> Option<SignedFullStatementWithPVD> {
1546	let signed = rp_state
1547		.table_context
1548		.validator
1549		.as_ref()?
1550		.sign(keystore, statement)
1551		.ok()
1552		.flatten()?;
1553	metrics.on_statement_signed();
1554	Some(signed)
1555}
1556
1557/// Import a statement into the statement table and return the summary of the import.
1558///
1559/// This will fail with `Error::RejectedByProspectiveParachains` if the message type is seconded,
1560/// the candidate is fresh, and any of the following are true:
1561/// 1. There is no `PersistedValidationData` attached.
1562/// 2. Prospective parachains subsystem returned an empty `HypotheticalMembership` i.e. did not
1563///    recognize the candidate as being applicable to any of the active leaves.
1564#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1565async fn import_statement<Context>(
1566	ctx: &mut Context,
1567	rp_state: &mut PerRelayParentState,
1568	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1569	statement: &SignedFullStatementWithPVD,
1570) -> Result<Option<TableSummary>, Error> {
1571	let candidate_hash = statement.payload().candidate_hash();
1572
1573	gum::debug!(
1574		target: LOG_TARGET,
1575		statement = ?statement.payload().to_compact(),
1576		validator_index = statement.validator_index().0,
1577		?candidate_hash,
1578		"Importing statement",
1579	);
1580
1581	// If this is a new candidate (statement is 'seconded' and candidate is unknown),
1582	// we need to create an entry in the `PerCandidateState` map.
1583	//
1584	// We also need to inform the prospective parachains subsystem of the seconded candidate.
1585	// If `ProspectiveParachainsMessage::Second` fails, then we return
1586	// Error::RejectedByProspectiveParachains.
1587	//
1588	// Persisted Validation Data should be available - it may already be available
1589	// if this is a candidate we are seconding.
1590	//
1591	// We should also not accept any candidates which have no valid depths under any of
1592	// our active leaves.
1593	if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1594		if !per_candidate.contains_key(&candidate_hash) {
1595			let (tx, rx) = oneshot::channel();
1596			ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1597				IntroduceSecondedCandidateRequest {
1598					candidate_para: candidate.descriptor.para_id(),
1599					candidate_receipt: candidate.clone(),
1600					persisted_validation_data: pvd.clone(),
1601				},
1602				tx,
1603			))
1604			.await;
1605
1606			match rx.await {
1607				Err(oneshot::Canceled) => {
1608					gum::warn!(
1609						target: LOG_TARGET,
1610						"Could not reach the Prospective Parachains subsystem."
1611					);
1612
1613					return Err(Error::RejectedByProspectiveParachains)
1614				},
1615				Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1616				Ok(true) => {},
1617			}
1618
1619			// Only save the candidate if it was approved by prospective parachains.
1620			per_candidate.insert(
1621				candidate_hash,
1622				PerCandidateState {
1623					persisted_validation_data: pvd.clone(),
1624					// This is set after importing when seconding locally.
1625					seconded_locally: false,
1626					relay_parent: candidate.descriptor.relay_parent(),
1627				},
1628			);
1629		}
1630	}
1631
1632	let stmt = primitive_statement_to_table(statement);
1633
1634	let core = core_index_from_statement(
1635		&rp_state.validator_to_group,
1636		&rp_state.group_rotation_info,
1637		rp_state.n_cores,
1638		&rp_state.claim_queue,
1639		statement,
1640	)
1641	.ok_or(Error::CoreIndexUnavailable)?;
1642
1643	Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1644}
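
// Illustrative sketch (simplified types; assumes the `futures` crate with its default executor):
// the `IntroduceSecondedCandidate` round trip in `import_statement` above in miniature - the
// request carries a `oneshot` reply channel, and the three possible outcomes (subsystem gone,
// rejected, accepted) are mapped onto a single rejection error.
#[cfg(test)]
mod prospective_roundtrip_sketch {
	use futures::channel::oneshot;

	#[derive(Debug, PartialEq)]
	struct Rejected;

	fn interpret(reply: Result<bool, oneshot::Canceled>) -> Result<(), Rejected> {
		match reply {
			// A dropped sender or an explicit `false` are both treated as a rejection.
			Err(oneshot::Canceled) | Ok(false) => Err(Rejected),
			Ok(true) => Ok(()),
		}
	}

	#[test]
	fn maps_all_three_outcomes() {
		let (tx, rx) = oneshot::channel();
		tx.send(true).unwrap();
		assert_eq!(interpret(futures::executor::block_on(rx)), Ok(()));

		let (tx, rx) = oneshot::channel::<bool>();
		tx.send(false).unwrap();
		assert_eq!(interpret(futures::executor::block_on(rx)), Err(Rejected));

		let (tx, rx) = oneshot::channel::<bool>();
		drop(tx);
		assert_eq!(interpret(futures::executor::block_on(rx)), Err(Rejected));
	}
}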
1645
1646/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
1647/// misbehaviors as a result of importing a statement.
1648#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1649async fn post_import_statement_actions<Context>(
1650	ctx: &mut Context,
1651	rp_state: &mut PerRelayParentState,
1652	summary: Option<&TableSummary>,
1653) {
1654	if let Some(attested) = summary.as_ref().and_then(|s| {
1655		rp_state.table.attested_candidate(
1656			&s.candidate,
1657			&rp_state.table_context,
1658			rp_state.minimum_backing_votes,
1659		)
1660	}) {
1661		let candidate_hash = attested.candidate.hash();
1662
1663		// `HashSet::insert` returns true if the value was not already present.
1664		if rp_state.backed.insert(candidate_hash) {
1665			if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
1666				let para_id = backed.candidate().descriptor.para_id();
1667				gum::debug!(
1668					target: LOG_TARGET,
1669					candidate_hash = ?candidate_hash,
1670					relay_parent = ?rp_state.parent,
1671					%para_id,
1672					"Candidate backed",
1673				);
1674
1675				// Inform the prospective parachains subsystem
1676				// that the candidate is now backed.
1677				ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1678					para_id,
1679					candidate_hash,
1680				))
1681				.await;
1682				// Notify statement distribution of backed candidate.
1683				ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1684			} else {
1685				gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1686			}
1687		} else {
1688			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1689		}
1690	} else {
1691		gum::debug!(target: LOG_TARGET, "No attested candidate");
1692	}
1693
1694	issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1695}
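
// Illustrative sketch (hypothetical helper, std-only): `attested_candidate` above reduces to
// "enough distinct validity votes from the assigned group", with the threshold supplied by
// `minimum_backing_votes`, while the `backed` set makes the `CandidateBacked`/`Backed`
// notifications fire only once per candidate.
#[cfg(test)]
mod backing_threshold_sketch {
	use std::collections::HashSet;

	fn is_attested(validity_votes: &HashSet<u32>, minimum_backing_votes: usize) -> bool {
		validity_votes.len() >= minimum_backing_votes
	}

	#[test]
	fn notifies_once_at_threshold() {
		let minimum_backing_votes = 2;
		let mut votes = HashSet::new();
		let mut backed = HashSet::new();

		votes.insert(0u32);
		assert!(!is_attested(&votes, minimum_backing_votes));

		votes.insert(1u32);
		assert!(is_attested(&votes, minimum_backing_votes));
		// First time over the threshold: send the notifications.
		assert!(backed.insert("candidate-hash"));

		votes.insert(2u32);
		// Still attested on later imports, but `insert` now returns `false`, so nothing is re-sent.
		assert!(!backed.insert("candidate-hash"));
	}
}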
1696
1697/// Check if there have happened any new misbehaviors and issue necessary messages.
1698#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1699fn issue_new_misbehaviors<Context>(
1700	ctx: &mut Context,
1701	relay_parent: Hash,
1702	table: &mut Table<TableContext>,
1703) {
1704	// collect the misbehaviors to avoid double mutable self borrow issues
1705	let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1706	for (validator_id, report) in misbehaviors {
1707		// The provisioner waits on candidate-backing, which means
1708		// that we need to send unbounded messages to avoid cycles.
1709		//
1710		// Misbehaviors are bounded by the number of validators and
1711		// the block production protocol.
1712		ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1713			relay_parent,
1714			ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1715		));
1716	}
1717}
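
// Illustrative sketch (simplified types, std-only): the borrow-checker issue the comment in
// `issue_new_misbehaviors` refers to. When the drain and the send both go through `&mut self`,
// holding the drain iterator across the sends is rejected; collecting into a `Vec` first ends
// the borrow of the table before anything is dispatched.
#[cfg(test)]
mod drain_misbehaviors_sketch {
	struct Table {
		misbehaviors: Vec<(u32, &'static str)>,
	}

	impl Table {
		fn drain_misbehaviors(&mut self) -> std::vec::Drain<'_, (u32, &'static str)> {
			self.misbehaviors.drain(..)
		}
	}

	struct Job {
		table: Table,
		outbox: Vec<String>,
	}

	impl Job {
		fn send(&mut self, line: String) {
			self.outbox.push(line);
		}

		fn issue_new_misbehaviors(&mut self) {
			// Without `collect`, the drain iterator would keep `self.table` borrowed across the
			// `self.send(..)` calls below (a second `&mut`-through-`self` use), which does not
			// compile. Collecting first ends that borrow.
			let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect();
			for (validator, report) in misbehaviors {
				self.send(format!("validator {validator}: {report}"));
			}
		}
	}

	#[test]
	fn drain_then_report() {
		let mut job = Job {
			table: Table { misbehaviors: vec![(1, "double vote"), (7, "seconded twice")] },
			outbox: Vec::new(),
		};
		job.issue_new_misbehaviors();
		assert_eq!(job.outbox.len(), 2);
		assert!(job.table.misbehaviors.is_empty());
	}
}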
1718
1719/// Sign, import, and distribute a statement.
1720#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1721async fn sign_import_and_distribute_statement<Context>(
1722	ctx: &mut Context,
1723	rp_state: &mut PerRelayParentState,
1724	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1725	statement: StatementWithPVD,
1726	keystore: KeystorePtr,
1727	metrics: &Metrics,
1728) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1729	if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1730		let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1731
1732		// `Share` must always be sent before `Backed`. We send the latter in
1733		// `post_import_statement_actions` below.
1734		let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1735		ctx.send_unbounded_message(smsg);
1736
1737		post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1738
1739		Ok(Some(signed_statement))
1740	} else {
1741		Ok(None)
1742	}
1743}
1744
1745#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1746async fn background_validate_and_make_available<Context>(
1747	ctx: &mut Context,
1748	rp_state: &mut PerRelayParentState,
1749	params: BackgroundValidationParams<
1750		impl overseer::CandidateBackingSenderTrait,
1751		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1752	>,
1753) -> Result<(), Error> {
1754	let candidate_hash = params.candidate.hash();
1755	let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1756	if rp_state.awaiting_validation.insert(candidate_hash) {
1757		// spawn background task.
1758		let bg = async move {
1759			if let Err(error) = validate_and_make_available(params, core_index).await {
1760				if let Error::BackgroundValidationMpsc(error) = error {
1761					gum::debug!(
1762						target: LOG_TARGET,
1763						?candidate_hash,
1764						?error,
1765						"Background validation mpsc died during validation - leaf no longer active?"
1766					);
1767				} else {
1768					gum::error!(
1769						target: LOG_TARGET,
1770						?candidate_hash,
1771						?error,
1772						"Failed to validate and make available",
1773					);
1774				}
1775			}
1776		};
1777
1778		ctx.spawn("backing-validation", bg.boxed())
1779			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1780	}
1781
1782	Ok(())
1783}
1784
1785/// Kick off validation work and distribute the result as a signed statement.
1786#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1787async fn kick_off_validation_work<Context>(
1788	ctx: &mut Context,
1789	rp_state: &mut PerRelayParentState,
1790	persisted_validation_data: PersistedValidationData,
1791	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1792	attesting: AttestingData,
1793) -> Result<(), Error> {
1794	// Do nothing if the local validator is disabled or not a validator at all
1795	match rp_state.table_context.local_validator_is_disabled() {
1796		Some(true) => {
1797			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1798			return Ok(())
1799		},
1800		Some(false) => {}, // we are not disabled - move on
1801		None => {
1802			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1803			return Ok(())
1804		},
1805	}
1806
1807	let candidate_hash = attesting.candidate.hash();
1808	if rp_state.issued_statements.contains(&candidate_hash) {
1809		return Ok(())
1810	}
1811
1812	gum::debug!(
1813		target: LOG_TARGET,
1814		candidate_hash = ?candidate_hash,
1815		candidate_receipt = ?attesting.candidate,
1816		"Kicking off validation",
1817	);
1818
1819	let bg_sender = ctx.sender().clone();
1820	let pov = PoVData::FetchFromValidator {
1821		from_validator: attesting.from_validator,
1822		candidate_hash,
1823		pov_hash: attesting.pov_hash,
1824	};
1825
1826	background_validate_and_make_available(
1827		ctx,
1828		rp_state,
1829		BackgroundValidationParams {
1830			sender: bg_sender,
1831			tx_command: background_validation_tx.clone(),
1832			candidate: attesting.candidate,
1833			relay_parent: rp_state.parent,
1834			node_features: rp_state.node_features.clone(),
1835			executor_params: Arc::clone(&rp_state.executor_params),
1836			persisted_validation_data,
1837			pov,
1838			n_validators: rp_state.table_context.validators.len(),
1839			make_command: ValidatedCandidateCommand::Attest,
1840		},
1841	)
1842	.await
1843}
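
// Illustrative sketch (hypothetical helper): the three-way `Option<bool>` check at the top of
// `kick_off_validation_work` above - `Some(true)` means we are a disabled validator, `Some(false)`
// an enabled one, and `None` means we are not a validator at all; only the middle case proceeds.
#[cfg(test)]
mod disabled_tristate_sketch {
	fn should_kick_off(local_validator_is_disabled: Option<bool>) -> bool {
		match local_validator_is_disabled {
			Some(true) => false, // disabled validator: do nothing
			Some(false) => true, // enabled validator: proceed
			None => false,       // not a validator: nothing to validate
		}
	}

	#[test]
	fn only_enabled_validators_proceed() {
		assert!(should_kick_off(Some(false)));
		assert!(!should_kick_off(Some(true)));
		assert!(!should_kick_off(None));
	}
}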
1844
1845/// Import the statement and kick off validation work if it is a part of our assignment.
1846#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1847async fn maybe_validate_and_import<Context>(
1848	ctx: &mut Context,
1849	state: &mut State,
1850	relay_parent: Hash,
1851	statement: SignedFullStatementWithPVD,
1852) -> Result<(), Error> {
1853	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1854		Some(r) => r,
1855		None => {
1856			gum::trace!(
1857				target: LOG_TARGET,
1858				?relay_parent,
1859				"Received statement for unknown relay-parent"
1860			);
1861
1862			return Ok(())
1863		},
1864	};
1865
1866	// Don't import statement if the sender is disabled
1867	if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1868		gum::debug!(
1869			target: LOG_TARGET,
1870			sender_validator_idx = ?statement.validator_index(),
1871			"Not importing statement because the sender is disabled"
1872		);
1873		return Ok(())
1874	}
1875
1876	let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1877
1878	// if we get an Error::RejectedByProspectiveParachains,
1879	// we will do nothing.
1880	if let Err(Error::RejectedByProspectiveParachains) = res {
1881		gum::debug!(
1882			target: LOG_TARGET,
1883			?relay_parent,
1884			"Statement rejected by prospective parachains."
1885		);
1886
1887		return Ok(())
1888	}
1889
1890	let summary = res?;
1891	post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1892
1893	if let Some(summary) = summary {
1894		// import_statement already takes care of communicating with the
1895		// prospective parachains subsystem. At this point, the candidate
1896		// has already been accepted by the subsystem.
1897
1898		let candidate_hash = summary.candidate;
1899
1900		if Some(summary.group_id) != rp_state.assigned_core {
1901			return Ok(())
1902		}
1903
1904		let attesting = match statement.payload() {
1905			StatementWithPVD::Seconded(receipt, _) => {
1906				let attesting = AttestingData {
1907					candidate: rp_state
1908						.table
1909						.get_candidate(&candidate_hash)
1910						.ok_or(Error::CandidateNotFound)?
1911						.to_plain(),
1912					pov_hash: receipt.descriptor.pov_hash(),
1913					from_validator: statement.validator_index(),
1914					backing: Vec::new(),
1915				};
1916				rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1917				attesting
1918			},
1919			StatementWithPVD::Valid(candidate_hash) => {
1920				if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1921					let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1922					if our_index == Some(statement.validator_index()) {
1923						return Ok(())
1924					}
1925
1926					if rp_state.awaiting_validation.contains(candidate_hash) {
1927						// Job already running:
1928						attesting.backing.push(statement.validator_index());
1929						return Ok(())
1930					} else {
1931						// No job, so start another with current validator:
1932						attesting.from_validator = statement.validator_index();
1933						attesting.clone()
1934					}
1935				} else {
1936					return Ok(())
1937				}
1938			},
1939		};
1940
1941		// After `import_statement` succeeds, the candidate entry is guaranteed
1942		// to exist.
1943		if let Some(pvd) = state
1944			.per_candidate
1945			.get(&candidate_hash)
1946			.map(|pc| pc.persisted_validation_data.clone())
1947		{
1948			kick_off_validation_work(
1949				ctx,
1950				rp_state,
1951				pvd,
1952				&state.background_validation_tx,
1953				attesting,
1954			)
1955			.await?;
1956		}
1957	}
1958	Ok(())
1959}
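
// Illustrative sketch (simplified types, std-only): the `fallbacks` bookkeeping in
// `maybe_validate_and_import` above - a `Valid` statement arriving while a validation job is
// already running only records its sender as a backup PoV source; otherwise the sender becomes
// the validator we fetch the PoV from and a new job is started.
#[cfg(test)]
mod fallback_attesting_sketch {
	use std::collections::HashSet;

	#[derive(Clone, Debug, PartialEq)]
	struct Attesting {
		from_validator: u32,
		backing: Vec<u32>,
	}

	/// Returns `Some(..)` when a new validation job should be kicked off.
	fn on_valid_statement(
		attesting: &mut Attesting,
		awaiting_validation: &HashSet<&'static str>,
		candidate: &'static str,
		sender: u32,
	) -> Option<Attesting> {
		if awaiting_validation.contains(candidate) {
			// Job already running: keep the sender as a fallback PoV source.
			attesting.backing.push(sender);
			None
		} else {
			// No job running: fetch the PoV from this validator instead.
			attesting.from_validator = sender;
			Some(attesting.clone())
		}
	}

	#[test]
	fn queues_backups_while_job_runs() {
		let mut attesting = Attesting { from_validator: 0, backing: Vec::new() };
		let mut awaiting = HashSet::new();
		awaiting.insert("candidate");

		assert_eq!(on_valid_statement(&mut attesting, &awaiting, "candidate", 3), None);
		assert_eq!(attesting.backing, vec![3u32]);

		awaiting.clear();
		let started = on_valid_statement(&mut attesting, &awaiting, "candidate", 5).unwrap();
		assert_eq!(started.from_validator, 5);
	}
}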
1960
1961/// Kick off background validation with intent to second.
1962#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1963async fn validate_and_second<Context>(
1964	ctx: &mut Context,
1965	rp_state: &mut PerRelayParentState,
1966	persisted_validation_data: PersistedValidationData,
1967	candidate: &CandidateReceipt,
1968	pov: Arc<PoV>,
1969	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1970) -> Result<(), Error> {
1971	let candidate_hash = candidate.hash();
1972
1973	gum::debug!(
1974		target: LOG_TARGET,
1975		candidate_hash = ?candidate_hash,
1976		candidate_receipt = ?candidate,
1977		"Validate and second candidate",
1978	);
1979
1980	let bg_sender = ctx.sender().clone();
1981	background_validate_and_make_available(
1982		ctx,
1983		rp_state,
1984		BackgroundValidationParams {
1985			sender: bg_sender,
1986			tx_command: background_validation_tx.clone(),
1987			candidate: candidate.clone(),
1988			relay_parent: rp_state.parent,
1989			node_features: rp_state.node_features.clone(),
1990			executor_params: Arc::clone(&rp_state.executor_params),
1991			persisted_validation_data,
1992			pov: PoVData::Ready(pov),
1993			n_validators: rp_state.table_context.validators.len(),
1994			make_command: ValidatedCandidateCommand::Second,
1995		},
1996	)
1997	.await?;
1998
1999	Ok(())
2000}
2001
2002#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2003async fn handle_second_message<Context>(
2004	ctx: &mut Context,
2005	state: &mut State,
2006	candidate: CandidateReceipt,
2007	persisted_validation_data: PersistedValidationData,
2008	pov: PoV,
2009	metrics: &Metrics,
2010) -> Result<(), Error> {
2011	let _timer = metrics.time_process_second();
2012
2013	let candidate_hash = candidate.hash();
2014	let relay_parent = candidate.descriptor().relay_parent();
2015
2016	if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2017		gum::warn!(
2018			target: LOG_TARGET,
2019			?candidate_hash,
2020			"Candidate backing was asked to second candidate with wrong PVD",
2021		);
2022
2023		return Ok(())
2024	}
2025
2026	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2027		None => {
2028			gum::trace!(
2029				target: LOG_TARGET,
2030				?relay_parent,
2031				?candidate_hash,
2032				"We were asked to second a candidate outside of our view."
2033			);
2034
2035			return Ok(())
2036		},
2037		Some(r) => r,
2038	};
2039
2040	// Just return if the local validator is disabled. If we are here, the local node should be a
2041	// validator, but defensively use `unwrap_or(false)` to keep processing if it is not.
2042	if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2043		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2044		return Ok(())
2045	}
2046
2047	let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2048
2049	// Sanity check that candidate is from our assignment.
2050	if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2051		gum::debug!(
2052			target: LOG_TARGET,
2053			our_assignment_core = ?rp_state.assigned_core,
2054			our_assignment_paras = ?assigned_paras,
2055			collation = ?candidate.descriptor().para_id(),
2056			"Subsystem asked to second for para outside of our assignment",
2057		);
2058		return Ok(());
2059	}
2060
2061	gum::debug!(
2062		target: LOG_TARGET,
2063		our_assignment_core = ?rp_state.assigned_core,
2064		our_assignment_paras = ?assigned_paras,
2065		collation = ?candidate.descriptor().para_id(),
2066		"Current assignments vs collation",
2067	);
2068
2069	// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
2070	// Seconded statement only if we have not signed a Valid statement for the requested candidate.
2071	//
2072	// The actual logic of issuing the signed statement checks that this isn't
2073	// conflicting with other seconded candidates. Not doing that check here
2074	// gives other subsystems the ability to get us to execute arbitrary candidates,
2075	// but no more.
2076	if !rp_state.issued_statements.contains(&candidate_hash) {
2077		let pov = Arc::new(pov);
2078
2079		validate_and_second(
2080			ctx,
2081			rp_state,
2082			persisted_validation_data,
2083			&candidate,
2084			pov,
2085			&state.background_validation_tx,
2086		)
2087		.await?;
2088	}
2089
2090	Ok(())
2091}
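
// Illustrative sketch (hypothetical helper, std-only): the assignment sanity check in
// `handle_second_message` above - a collation is only taken forward if its para is currently in
// the claim queue of the core this validator is assigned to.
#[cfg(test)]
mod assignment_check_sketch {
	use std::collections::{HashMap, VecDeque};

	fn is_assigned(
		assigned_core: Option<u32>,
		claim_queue: &HashMap<u32, VecDeque<u32>>,
		para_id: u32,
	) -> bool {
		let assigned_paras = assigned_core.and_then(|core| claim_queue.get(&core));
		matches!(assigned_paras, Some(paras) if paras.contains(&para_id))
	}

	#[test]
	fn rejects_paras_outside_assignment() {
		let claim_queue = HashMap::from([(0u32, VecDeque::from([2000u32, 2001]))]);
		assert!(is_assigned(Some(0), &claim_queue, 2000));
		assert!(!is_assigned(Some(0), &claim_queue, 2002));
		assert!(!is_assigned(None, &claim_queue, 2000));
	}
}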
2092
2093#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2094async fn handle_statement_message<Context>(
2095	ctx: &mut Context,
2096	state: &mut State,
2097	relay_parent: Hash,
2098	statement: SignedFullStatementWithPVD,
2099	metrics: &Metrics,
2100) -> Result<(), Error> {
2101	let _timer = metrics.time_process_statement();
2102
2103	// Validator disabling is handled in `maybe_validate_and_import`
2104	match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2105		Err(Error::ValidationFailed(_)) => Ok(()),
2106		Err(e) => Err(e),
2107		Ok(()) => Ok(()),
2108	}
2109}
2110
2111fn handle_get_backable_candidates_message(
2112	state: &State,
2113	requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2114	tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2115	metrics: &Metrics,
2116) -> Result<(), Error> {
2117	let _timer = metrics.time_get_backed_candidates();
2118
2119	let mut backed = HashMap::with_capacity(requested_candidates.len());
2120
2121	for (para_id, para_candidates) in requested_candidates {
2122		for (candidate_hash, relay_parent) in para_candidates.iter() {
2123			let rp_state = match state.per_relay_parent.get(&relay_parent) {
2124				Some(rp_state) => rp_state,
2125				None => {
2126					gum::debug!(
2127						target: LOG_TARGET,
2128						?relay_parent,
2129						?candidate_hash,
2130						"Requested candidate's relay parent is out of view",
2131					);
2132					break
2133				},
2134			};
2135			let maybe_backed_candidate = rp_state
2136				.table
2137				.attested_candidate(
2138					candidate_hash,
2139					&rp_state.table_context,
2140					rp_state.minimum_backing_votes,
2141				)
2142				.and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context));
2143
2144			if let Some(backed_candidate) = maybe_backed_candidate {
2145				backed
2146					.entry(para_id)
2147					.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2148					.push(backed_candidate);
2149			} else {
2150				break
2151			}
2152		}
2153	}
2154
2155	tx.send(backed).map_err(|data| Error::Send(data))?;
2156	Ok(())
2157}
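
// Illustrative sketch (simplified types, std-only): `handle_get_backable_candidates_message`
// above in miniature - requested candidates are grouped per para in request order, and the loop
// stops at the first one that is not backed, so only a contiguous prefix of each para's chain is
// handed back to the requester.
#[cfg(test)]
mod backable_prefix_sketch {
	use std::collections::{HashMap, HashSet};

	fn backable_prefixes(
		requested: HashMap<u32, Vec<&'static str>>,
		backed: &HashSet<&'static str>,
	) -> HashMap<u32, Vec<&'static str>> {
		let mut out = HashMap::with_capacity(requested.len());
		for (para, candidates) in requested {
			for candidate in candidates {
				if backed.contains(candidate) {
					out.entry(para).or_insert_with(Vec::new).push(candidate);
				} else {
					// Anything after a gap could not be included anyway.
					break
				}
			}
		}
		out
	}

	#[test]
	fn stops_at_first_gap() {
		let backed: HashSet<_> = ["a", "c"].into_iter().collect();
		let requested = HashMap::from([(1000u32, vec!["a", "b", "c"])]);
		let out = backable_prefixes(requested, &backed);
		assert_eq!(out[&1000], vec!["a"]);
	}
}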