referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_approval_voting/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Approval Voting Subsystem.
18//!
19//! This subsystem is responsible for determining candidates to do approval checks
20//! on, performing those approval checks, and tracking the assignments and approvals
21//! of others. It uses this information to determine when candidates and blocks have
22//! been sufficiently approved to finalize.
23
24use futures_timer::Delay;
25use polkadot_node_primitives::{
26	approval::{
27		v1::{BlockApprovalMeta, DelayTranche},
28		v2::{
29			AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
30			IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2,
31		},
32	},
33	ValidationResult, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36	errors::RecoveryError,
37	messages::{
38		ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
39		ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
40		AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
41		ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote,
42		DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage,
43		RuntimeApiRequest,
44	},
45	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
46	SubsystemSender,
47};
48use polkadot_node_subsystem_util::{
49	self,
50	database::Database,
51	metrics::{self, prometheus},
52	runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
53	TimeoutExt,
54};
55use polkadot_primitives::{
56	ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
57	CandidateIndex, CandidateReceiptV2 as CandidateReceipt, CoreIndex, GroupIndex, Hash,
58	SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
59};
60use sc_keystore::LocalKeystore;
61use sp_application_crypto::Pair;
62use sp_consensus::SyncOracle;
63use sp_consensus_slots::Slot;
64use std::time::Instant;
65
/// The max number of blocks for which we keep track of assignment gathering times. Normally,
/// this would never be reached because we prune the data on finalization, but we need
/// to also ensure the data does not grow unnecessarily large.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
70
71use futures::{
72	channel::oneshot,
73	future::{BoxFuture, RemoteHandle},
74	prelude::*,
75	stream::FuturesUnordered,
76	StreamExt,
77};
78
79use std::{
80	cmp::min,
81	collections::{
82		btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
83	},
84	sync::Arc,
85	time::Duration,
86};
87
88use schnellru::{ByLength, LruMap};
89
90use approval_checking::RequiredTranches;
91use bitvec::{order::Lsb0, vec::BitVec};
92pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
93use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
94use polkadot_node_primitives::approval::time::{
95	slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick,
96};
97
98mod approval_checking;
99pub mod approval_db;
100mod backend;
101pub mod criteria;
102mod import;
103mod ops;
104mod persisted_entries;
105
106use crate::{
107	approval_checking::{Check, TranchesToApproveResult},
108	approval_db::common::{Config as DatabaseConfig, DbBackend},
109	backend::{Backend, OverlayedBackend},
110	criteria::InvalidAssignmentReason,
111	persisted_entries::OurApproval,
112};
113
114#[cfg(test)]
115mod tests;
116
/// How long a launched approval check may run before its outcome is recorded as
/// `ApprovalOutcome::TimedOut`. Half of this value is also the default retry backoff passed by
/// `ApprovalVotingSubsystem::with_config`.
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
/// How long are we willing to wait for approval signatures?
///
/// Value chosen rather arbitrarily: it should not be hit in practice; it exists to make it
/// easier to diagnose issues such as deadlocks.
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
// NOTE(review): presumably the capacity of the `LruMap<CandidateHash, ApprovalOutcome>` approvals
// cache; its use site is outside this view — confirm.
const APPROVAL_CACHE_SIZE: u32 = 1024;

/// The maximum number of times we retry to approve a block if it is still needed.
const MAX_APPROVAL_RETRIES: u32 = 16;

// NOTE(review): a delay expressed in ticks; its use site is outside this view.
const APPROVAL_DELAY: Tick = 2;
/// Log target used by this subsystem's `gum` logging.
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

/// The max number of ticks we delay sending the approval after we are ready to issue it
/// (see the `coalesced_approvals_*` metrics).
const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;

// If the node restarted and the tranche has passed without the assignment
// being triggered, we won't trigger the assignment at restart because we don't have
// a wakeup scheduled for it.
// The solution is to always schedule a wakeup after the restart and let
// `process_wakeup` decide if the assignment needs to be triggered.
// We need to have a delay after restart to give the node time to catch up with
// messages and not trigger its assignment unnecessarily, because it hasn't seen
// the assignments from the other validators yet.
const RESTART_WAKEUP_DELAY: Tick = 12;
143
/// Configuration for the approval voting subsystem.
#[derive(Debug, Clone)]
pub struct Config {
	/// The column family in the DB where approval-voting data is stored.
	pub col_approval_data: u32,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500 (presumably because approval-voting ticks are 500 ms long, as the
	/// tick-based metrics' help texts state — confirm).
	pub slot_duration_millis: u64,
}
153
/// The mode of the approval voting subsystem. It should start in `Syncing` mode when it first
/// starts, and once it has reached the head of the chain it should move into `Active` mode.
///
/// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
/// the node follows the new incoming blocks and the finalized number, but does not yet
/// participate.
///
/// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
/// subsystem of all unfinalized blocks and the candidates included within them, as well as all
/// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
	/// Participating in the approvals protocol.
	Active,
	/// Still syncing. Carries the `SyncOracle` (presumably consulted to decide when to switch
	/// to `Active`).
	Syncing(Box<dyn SyncOracle + Send>),
}
167
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
	/// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys.
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
	keystore: Arc<LocalKeystore>,
	/// Column configuration used to open the approval-voting DB backend.
	db_config: DatabaseConfig,
	/// Slot duration in milliseconds, taken from [`Config::slot_duration_millis`].
	slot_duration_millis: u64,
	/// Handle to the underlying database.
	db: Arc<dyn Database>,
	/// Current operating mode; a freshly constructed subsystem starts in `Mode::Syncing`.
	mode: Mode,
	metrics: Metrics,
	/// Clock used for tick computations (`SystemClock` unless injected).
	clock: Arc<dyn Clock + Send + Sync>,
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	/// The maximum number of times we retry to approve a block if it is still needed and the
	/// PoV fetch failed.
	max_approval_retries: u32,
	/// The backoff before we retry the approval.
	retry_backoff: Duration,
}
186
/// Registered Prometheus handles backing [`Metrics`]; see `try_register` for names and help texts.
#[derive(Clone)]
struct MetricsInner {
	/// Candidates imported by the approval voting subsystem.
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	/// Delay tranches at which our assignments were produced.
	assignments_produced: prometheus::Histogram,
	/// Approvals produced, labelled by `status` (e.g. "success", "stale", "invalid").
	approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
	/// Assignments that became no-shows, counted at the moment a candidate is approved.
	no_shows_total: prometheus::Counter<prometheus::U64>,
	// The difference from `no_shows_total` is that this counts all observed no-shows at any
	// moment in time. `no_shows_total` only counts the no-shows present at the moment the
	// candidate is approved; approvals might arrive late and `no_shows_total` wouldn't capture them.
	observed_no_shows: prometheus::Counter<prometheus::U64>,
	/// Candidates where more than one third of validators had to vote.
	approved_by_one_third: prometheus::Counter<prometheus::U64>,
	/// Wakeups processed for candidates.
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	/// Number of candidate approvals covered by each coalesced approval.
	coalesced_approvals_buckets: prometheus::Histogram,
	/// Ticks by which sending a candidate approval was delayed.
	coalesced_approvals_delay: prometheus::Histogram,
	/// Ticks taken to approve a candidate.
	candidate_approval_time_ticks: prometheus::Histogram,
	/// Ticks taken to approve a block.
	block_approval_time_ticks: prometheus::Histogram,
	/// Time spent writing an approval DB transaction.
	time_db_transaction: prometheus::Histogram,
	/// Time spent recovering and approving data.
	time_recover_and_approve: prometheus::Histogram,
	/// Requests for candidate signatures from other subsystems.
	candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
	/// Current number of unapproved candidates in the unfinalized chain.
	unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
	// The time it takes in each stage to gather enough assignments.
	// We define a `stage` as the entire process of gathering enough assignments to
	// be able to approve a candidate:
	// E.g:
	// - Stage 0: We wait for the needed_approvals assignments to be gathered.
	// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
	// - Stage 2: We wait for enough tranches to cover all no-shows of stage 1.
	assignments_gathering_time_by_stage: prometheus::HistogramVec,
}
216
/// Approval Voting metrics.
///
/// Holds `None` when no metrics were registered (e.g. `Metrics::default()`); every recording
/// method is then a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
220
221impl Metrics {
222	fn on_candidate_imported(&self) {
223		if let Some(metrics) = &self.0 {
224			metrics.imported_candidates_total.inc();
225		}
226	}
227
228	fn on_assignment_produced(&self, tranche: DelayTranche) {
229		if let Some(metrics) = &self.0 {
230			metrics.assignments_produced.observe(tranche as f64);
231		}
232	}
233
234	fn on_approval_coalesce(&self, num_coalesced: u32) {
235		if let Some(metrics) = &self.0 {
236			// Count how many candidates we covered with this coalesced approvals,
237			// so that the heat-map really gives a good understanding of the scales.
238			for _ in 0..num_coalesced {
239				metrics.coalesced_approvals_buckets.observe(num_coalesced as f64)
240			}
241		}
242	}
243
244	fn on_delayed_approval(&self, delayed_ticks: u64) {
245		if let Some(metrics) = &self.0 {
246			metrics.coalesced_approvals_delay.observe(delayed_ticks as f64)
247		}
248	}
249
250	fn on_approval_stale(&self) {
251		if let Some(metrics) = &self.0 {
252			metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
253		}
254	}
255
256	fn on_approval_invalid(&self) {
257		if let Some(metrics) = &self.0 {
258			metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
259		}
260	}
261
262	fn on_approval_unavailable(&self) {
263		if let Some(metrics) = &self.0 {
264			metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
265		}
266	}
267
268	fn on_approval_error(&self) {
269		if let Some(metrics) = &self.0 {
270			metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
271		}
272	}
273
274	fn on_approval_produced(&self) {
275		if let Some(metrics) = &self.0 {
276			metrics.approvals_produced_total.with_label_values(&["success"]).inc()
277		}
278	}
279
280	fn on_no_shows(&self, n: usize) {
281		if let Some(metrics) = &self.0 {
282			metrics.no_shows_total.inc_by(n as u64);
283		}
284	}
285
286	fn on_observed_no_shows(&self, n: usize) {
287		if let Some(metrics) = &self.0 {
288			metrics.observed_no_shows.inc_by(n as u64);
289		}
290	}
291
292	fn on_approved_by_one_third(&self) {
293		if let Some(metrics) = &self.0 {
294			metrics.approved_by_one_third.inc();
295		}
296	}
297
298	fn on_wakeup(&self) {
299		if let Some(metrics) = &self.0 {
300			metrics.wakeups_triggered_total.inc();
301		}
302	}
303
304	fn on_candidate_approved(&self, ticks: Tick) {
305		if let Some(metrics) = &self.0 {
306			metrics.candidate_approval_time_ticks.observe(ticks as f64);
307		}
308	}
309
310	fn on_block_approved(&self, ticks: Tick) {
311		if let Some(metrics) = &self.0 {
312			metrics.block_approval_time_ticks.observe(ticks as f64);
313		}
314	}
315
316	fn on_candidate_signatures_request(&self) {
317		if let Some(metrics) = &self.0 {
318			metrics.candidate_signatures_requests_total.inc();
319		}
320	}
321
322	fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
323		self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
324	}
325
326	fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
327		self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
328	}
329
330	fn on_unapproved_candidates_in_unfinalized_chain(&self, count: usize) {
331		if let Some(metrics) = &self.0 {
332			metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
333		}
334	}
335
336	pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
337		if let Some(metrics) = &self.0 {
338			let stage_string = stage.to_string();
339			// We don't want to have too many metrics entries with this label to not put unncessary
340			// pressure on the metrics infrastructure, so we cap the stage at 10, which is
341			// equivalent to having already a finalization lag to 10 * no_show_slots, so it should
342			// be more than enough.
343			metrics
344				.assignments_gathering_time_by_stage
345				.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
346				.observe(elapsed_as_millis as f64);
347		}
348	}
349}
350
impl metrics::Metrics for Metrics {
	/// Registers every approval-voting metric in `registry`.
	///
	/// Fails with the first `PrometheusError` hit while constructing or registering a metric.
	/// Metric names and help texts are part of the external monitoring interface.
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
			assignments_produced: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_assignments_produced",
						"Assignments and tranches produced by the approval voting subsystem",
					).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"polkadot_parachain_approvals_produced_total",
						"Number of approvals produced by the approval voting subsystem",
					),
					&["status"]
				)?,
				registry,
			)?,
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			observed_no_shows: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_observed_no_shows_total",
					"Number of observed no shows at any moment in time",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			// Half-integer bucket bounds so that each integer count of coalesced approvals
			// falls unambiguously into one bucket.
			coalesced_approvals_buckets: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_coalesced_approvals_buckets",
						"Number of coalesced approvals.",
					).buckets(vec![1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]),
				)?,
				registry,
			)?,
			coalesced_approvals_delay: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_coalescing_delay",
						"Number of ticks we delay the sending of a candidate approval",
					).buckets(vec![1.1, 2.1, 3.1, 4.1, 6.1, 8.1, 12.1, 20.1, 32.1]),
				)?,
				registry,
			)?,
			// NOTE(review): the help text below ends with a trailing space.
			approved_by_one_third: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approved_by_one_third",
					"Number of candidates where more than one third had to vote ",
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			// Uses the Prometheus default buckets.
			time_db_transaction: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_time_approval_db_transaction",
						"Time spent writing an approval db transaction.",
					)
				)?,
				registry,
			)?,
			// Uses the Prometheus default buckets.
			time_recover_and_approve: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_time_recover_and_approve",
						"Time spent recovering and approving data in approval voting",
					)
				)?,
				registry,
			)?,
			candidate_signatures_requests_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approval_candidate_signatures_requests_total",
					"Number of times signatures got requested by other subsystems",
				)?,
				registry,
			)?,
			unapproved_candidates_in_unfinalized_chain: prometheus::register(
				prometheus::Gauge::new(
					"polkadot_parachain_approval_unapproved_candidates_in_unfinalized_chain",
					"Number of unapproved candidates in unfinalized chain",
				)?,
				registry,
			)?,
			// Labelled by `stage` ("0".."9" or "inf", see `observe_assignment_gathering_time`).
			assignments_gathering_time_by_stage: prometheus::register(
				prometheus::HistogramVec::new(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_assignments_gather_time_by_stage_ms",
						"The time in ms it takes for each stage to gather enough assignments needed for approval",
					)
					.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
					&["stage"],
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}
494
495impl ApprovalVotingSubsystem {
496	/// Create a new approval voting subsystem with the given keystore, config, and database.
497	pub fn with_config(
498		config: Config,
499		db: Arc<dyn Database>,
500		keystore: Arc<LocalKeystore>,
501		sync_oracle: Box<dyn SyncOracle + Send>,
502		metrics: Metrics,
503		spawner: Arc<dyn overseer::gen::Spawner + 'static>,
504	) -> Self {
505		ApprovalVotingSubsystem::with_config_and_clock(
506			config,
507			db,
508			keystore,
509			sync_oracle,
510			metrics,
511			Arc::new(SystemClock {}),
512			spawner,
513			MAX_APPROVAL_RETRIES,
514			APPROVAL_CHECKING_TIMEOUT / 2,
515		)
516	}
517
518	/// Create a new approval voting subsystem with the given keystore, config, and database.
519	pub fn with_config_and_clock(
520		config: Config,
521		db: Arc<dyn Database>,
522		keystore: Arc<LocalKeystore>,
523		sync_oracle: Box<dyn SyncOracle + Send>,
524		metrics: Metrics,
525		clock: Arc<dyn Clock + Send + Sync>,
526		spawner: Arc<dyn overseer::gen::Spawner + 'static>,
527		max_approval_retries: u32,
528		retry_backoff: Duration,
529	) -> Self {
530		ApprovalVotingSubsystem {
531			keystore,
532			slot_duration_millis: config.slot_duration_millis,
533			db,
534			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
535			mode: Mode::Syncing(sync_oracle),
536			metrics,
537			clock,
538			spawner,
539			max_approval_retries,
540			retry_backoff,
541		}
542	}
543
544	/// Revert to the block corresponding to the specified `hash`.
545	/// The operation is not allowed for blocks older than the last finalized one.
546	pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
547		let config =
548			approval_db::common::Config { col_approval_data: self.db_config.col_approval_data };
549		let mut backend = approval_db::common::DbBackend::new(self.db.clone(), config);
550		let mut overlay = OverlayedBackend::new(&backend);
551
552		ops::revert_to(&mut overlay, hash)?;
553
554		let ops = overlay.into_write_ops();
555		backend.write(ops)
556	}
557}
558
559// Checks and logs approval vote db state. It is perfectly normal to start with an
560// empty approval vote DB if we changed DB type or the node will sync from scratch.
561fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemResult<()> {
562	let backend = DbBackend::new(db, config);
563	let all_blocks = backend.load_all_blocks()?;
564
565	if all_blocks.is_empty() {
566		gum::info!(target: LOG_TARGET, "Starting with an empty approval vote DB.",);
567	} else {
568		gum::debug!(
569			target: LOG_TARGET,
570			"Starting with {} blocks in approval vote DB.",
571			all_blocks.len()
572		);
573	}
574
575	Ok(())
576}
577
578#[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)]
579impl<Context: Send> ApprovalVotingSubsystem {
580	fn start(self, mut ctx: Context) -> SpawnedSubsystem {
581		let backend = DbBackend::new(self.db.clone(), self.db_config);
582		let to_other_subsystems = ctx.sender().clone();
583		let to_approval_distr = ctx.sender().clone();
584
585		let future = run::<DbBackend, _, _, _>(
586			ctx,
587			to_other_subsystems,
588			to_approval_distr,
589			self,
590			Box::new(RealAssignmentCriteria),
591			backend,
592		)
593		.map_err(|e| SubsystemError::with_origin("approval-voting", e))
594		.boxed();
595
596		SpawnedSubsystem { name: "approval-voting-subsystem", future }
597	}
598}
599
// NOTE(review): not used in the visible part of this file; presumably identifies which
// validator's approval vote is requested and for which relay block — confirm at the use site.
#[derive(Debug, Clone)]
struct ApprovalVoteRequest {
	validator_index: ValidatorIndex,
	block_hash: Hash,
}
605
/// Scheduled wakeups for (relay block, candidate) pairs, indexed by tick and by pair.
#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	// (Relay Block, Candidate Hash) -> the tick it is scheduled at; the inverse of `wakeups`.
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
	// Block number -> relay blocks with scheduled wakeups; used to prune on finalization.
	block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
613
614impl Wakeups {
615	// Returns the first tick there exist wakeups for, if any.
616	fn first(&self) -> Option<Tick> {
617		self.wakeups.keys().next().map(|t| *t)
618	}
619
620	fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
621		self.block_numbers.entry(block_number).or_default().insert(block_hash);
622	}
623
624	// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
625	// for these values. replaces any later wakeup.
626	fn schedule(
627		&mut self,
628		block_hash: Hash,
629		block_number: BlockNumber,
630		candidate_hash: CandidateHash,
631		tick: Tick,
632	) {
633		if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
634			if prev <= &tick {
635				return;
636			}
637
638			// we are replacing previous wakeup with an earlier one.
639			if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
640				if let Some(pos) =
641					entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
642				{
643					entry.get_mut().remove(pos);
644				}
645
646				if entry.get().is_empty() {
647					let _ = entry.remove_entry();
648				}
649			}
650		} else {
651			self.note_block(block_hash, block_number);
652		}
653
654		self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
655		self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
656	}
657
658	fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
659		let after = self.block_numbers.split_off(&(finalized_number + 1));
660		let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
661			.into_iter()
662			.flat_map(|(_number, hashes)| hashes)
663			.collect();
664
665		let mut pruned_wakeups = BTreeMap::new();
666		self.reverse_wakeups.retain(|(h, c_h), tick| {
667			let live = !pruned_blocks.contains(h);
668			if !live {
669				pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
670			}
671			live
672		});
673
674		for (tick, pruned) in pruned_wakeups {
675			if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
676				entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
677				if entry.get().is_empty() {
678					let _ = entry.remove();
679				}
680			}
681		}
682	}
683
684	// Get the wakeup for a particular block/candidate combo, if any.
685	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
686		self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
687	}
688
689	// Returns the next wakeup. this future never returns if there are no wakeups.
690	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
691		match self.first() {
692			None => future::pending().await,
693			Some(tick) => {
694				clock.wait(tick).await;
695				match self.wakeups.entry(tick) {
696					BTMEntry::Vacant(_) => {
697						panic!("entry is known to exist since `first` was `Some`; qed")
698					},
699					BTMEntry::Occupied(mut entry) => {
700						let (hash, candidate_hash) = entry.get_mut().pop()
701							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
702
703						if entry.get().is_empty() {
704							let _ = entry.remove();
705						}
706
707						self.reverse_wakeups.remove(&(hash, candidate_hash));
708
709						(tick, hash, candidate_hash)
710					},
711				}
712			},
713		}
714	}
715}
716
// NOTE(review): snapshot of a candidate's approval progress; it is constructed and consumed
// outside this view, so the field meanings below are inferred from names and types — confirm.
struct ApprovalStatus {
	// Tranches still required for approval, as computed by `approval_checking`.
	required_tranches: RequiredTranches,
	// Presumably the delay tranche corresponding to "now".
	tranche_now: DelayTranche,
	// Presumably the tick of the relay block.
	block_tick: Tick,
	// Number of no-shows at the last evaluation.
	last_no_shows: usize,
	// Validators that were no-shows.
	no_show_validators: Vec<ValidatorIndex>,
}
724
/// Outcome of one approval-checking task.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
	/// The check succeeded.
	Approved,
	/// The check failed (see `ApprovalState::retry_info` for a possible retry).
	Failed,
	/// The task did not finish within `APPROVAL_CHECKING_TIMEOUT`.
	TimedOut,
}
731
/// Everything needed to re-launch an approval check after a failure.
// NOTE(review): fields are consumed outside this view; descriptions are inferred from names.
#[derive(Clone)]
struct RetryApprovalInfo {
	candidate: CandidateReceipt,
	backing_group: GroupIndex,
	core_index: Option<CoreIndex>,
	session_index: SessionIndex,
	// How many more attempts may be made.
	attempts_remaining: u32,
	// Delay before the next attempt.
	backoff: Duration,
}
741
/// Result produced by an approval-checking task tracked in `CurrentlyCheckingSet`.
struct ApprovalState {
	/// Our validator index used for the check.
	validator_index: ValidatorIndex,
	/// The candidate that was checked.
	candidate_hash: CandidateHash,
	/// How the check ended.
	approval_outcome: ApprovalOutcome,
	/// Present when a failed check may be retried.
	retry_info: Option<RetryApprovalInfo>,
}
748
749impl ApprovalState {
750	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
751		Self {
752			validator_index,
753			candidate_hash,
754			approval_outcome: ApprovalOutcome::Approved,
755			retry_info: None,
756		}
757	}
758	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
759		Self {
760			validator_index,
761			candidate_hash,
762			approval_outcome: ApprovalOutcome::Failed,
763			retry_info: None,
764		}
765	}
766
767	fn failed_with_retry(
768		validator_index: ValidatorIndex,
769		candidate_hash: CandidateHash,
770		retry_info: Option<RetryApprovalInfo>,
771	) -> Self {
772		Self {
773			validator_index,
774			candidate_hash,
775			approval_outcome: ApprovalOutcome::Failed,
776			retry_info,
777		}
778	}
779}
780
/// Approval checks currently in flight.
struct CurrentlyCheckingSet {
	// Candidate -> relay blocks waiting on the in-flight check of that candidate.
	candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
	// The in-flight check futures, each resolving to the candidate's `ApprovalState`.
	currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}
785
786impl Default for CurrentlyCheckingSet {
787	fn default() -> Self {
788		Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
789	}
790}
791
792impl CurrentlyCheckingSet {
793	// This function will lazily launch approval voting work whenever the
794	// candidate is not already undergoing validation.
795	pub async fn insert_relay_block_hash(
796		&mut self,
797		candidate_hash: CandidateHash,
798		validator_index: ValidatorIndex,
799		relay_block: Hash,
800		launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
801	) -> SubsystemResult<()> {
802		match self.candidate_hash_map.entry(candidate_hash) {
803			HMEntry::Occupied(mut entry) => {
804				// validation already undergoing. just add the relay hash if unknown.
805				entry.get_mut().insert(relay_block);
806			},
807			HMEntry::Vacant(entry) => {
808				// validation not ongoing. launch work and time out the remote handle.
809				entry.insert(HashSet::new()).insert(relay_block);
810				let work = launch_work.await?;
811				self.currently_checking.push(Box::pin(async move {
812					match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
813						None => ApprovalState {
814							candidate_hash,
815							validator_index,
816							approval_outcome: ApprovalOutcome::TimedOut,
817							retry_info: None,
818						},
819						Some(approval_state) => approval_state,
820					}
821				}));
822			},
823		}
824
825		Ok(())
826	}
827
828	pub async fn next(
829		&mut self,
830		approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
831	) -> (HashSet<Hash>, ApprovalState) {
832		if !self.currently_checking.is_empty() {
833			if let Some(approval_state) = self.currently_checking.next().await {
834				let out = self
835					.candidate_hash_map
836					.remove(&approval_state.candidate_hash)
837					.unwrap_or_default();
838				approvals_cache
839					.insert(approval_state.candidate_hash, approval_state.approval_outcome);
840				return (out, approval_state);
841			}
842		}
843
844		future::pending().await
845	}
846}
847
848async fn get_extended_session_info_by_index<'a, Sender>(
849	runtime_info: &'a mut RuntimeInfo,
850	sender: &mut Sender,
851	block_hash: Hash,
852	session_index: SessionIndex,
853) -> Option<&'a ExtendedSessionInfo>
854where
855	Sender: SubsystemSender<RuntimeApiMessage>,
856{
857	match runtime_info.get_session_info_by_index(sender, block_hash, session_index).await {
858		Ok(extended_info) => Some(&extended_info),
859		Err(_) => {
860			gum::debug!(
861				target: LOG_TARGET,
862				session = session_index,
863				?block_hash,
864				"Can't obtain SessionInfo or ExecutorParams"
865			);
866			None
867		},
868	}
869}
870
871async fn get_session_info_by_index<'a, Sender>(
872	runtime_info: &'a mut RuntimeInfo,
873	sender: &mut Sender,
874	block_hash: Hash,
875	session_index: SessionIndex,
876) -> Option<&'a SessionInfo>
877where
878	Sender: SubsystemSender<RuntimeApiMessage>,
879{
880	get_extended_session_info_by_index(runtime_info, sender, block_hash, session_index)
881		.await
882		.map(|extended_info| &extended_info.session_info)
883}
884
/// Working state of the approval voting subsystem (only the field list is visible here).
struct State {
	// Local keystore used for assignment signing.
	keystore: Arc<LocalKeystore>,
	// Slot duration in milliseconds.
	slot_duration_millis: u64,
	// Clock used for tick computations.
	clock: Arc<dyn Clock + Send + Sync>,
	// Criteria used to compute and check assignments.
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
	// Per block, candidate records about how long we take until we gather enough
	// assignments. This is relevant because it gives us a good idea about how many
	// tranches we trigger and why.
	per_block_assignments_gathering_times:
		LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
	// Accumulated no-show statistics, periodically logged by `NoShowStats::maybe_print`.
	no_show_stats: NoShowStats,
}
897
/// Regularly dump the no-show stats at this block number frequency.
const NO_SHOW_DUMP_FREQUENCY: BlockNumber = 50;
/// The maximum number of validators we record no-shows for, per candidate.
pub(crate) const MAX_RECORDED_NO_SHOW_VALIDATORS_PER_CANDIDATE: usize = 20;
902
/// No-show stats per validator and per parachain.
///
/// This is valuable information when we have to debug live network issues, because
/// it shows whether things are going wrong only for some validators or only
/// for some parachains.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct NoShowStats {
	/// Session -> validator -> number of recorded no-shows.
	per_validator_no_show: HashMap<SessionIndex, HashMap<ValidatorIndex, usize>>,
	/// Parachain id (presumably) -> number of recorded no-shows.
	per_parachain_no_show: HashMap<u32, usize>,
	/// Block number at which the stats were last dumped (0 initially).
	last_dumped_block_number: BlockNumber,
}
913
914impl NoShowStats {
915	// Print the no-show stats if NO_SHOW_DUMP_FREQUENCY blocks have passed since the last
916	// print.
917	fn maybe_print(&mut self, current_block_number: BlockNumber) {
918		if self.last_dumped_block_number > current_block_number ||
919			current_block_number - self.last_dumped_block_number < NO_SHOW_DUMP_FREQUENCY
920		{
921			return;
922		}
923		if self.per_parachain_no_show.is_empty() && self.per_validator_no_show.is_empty() {
924			return;
925		}
926
927		gum::debug!(
928			target: LOG_TARGET,
929			"Validators with no_show {:?} and parachains with no_shows {:?} since {:}",
930			self.per_validator_no_show,
931			self.per_parachain_no_show,
932			self.last_dumped_block_number
933		);
934
935		self.last_dumped_block_number = current_block_number;
936
937		self.per_validator_no_show.clear();
938		self.per_parachain_no_show.clear();
939	}
940}
941
/// Timing record of assignment gathering for one (block, candidate) pair.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
	// The stage we are in.
	// Candidate assignment gathering goes in stages: first we wait for needed_approvals (stage 0).
	// Then, if we have no-shows, we move into stage 1 and wait for enough tranches to cover all
	// no-shows; every further restart of gathering increments the stage again.
	stage: usize,
	// The time we started the stage. Taken (set to `None`) once enough assignments have been
	// gathered, which is what marks the stage as finished.
	stage_start: Option<Instant>,
}
952
953impl Default for AssignmentGatheringRecord {
954	fn default() -> Self {
955		AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
956	}
957}
958
959#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
960impl State {
961	// Compute the required tranches for approval for this block and candidate combo.
962	// Fails if there is no approval entry for the block under the candidate or no candidate entry
963	// under the block, or if the session is out of bounds.
964	async fn approval_status<Sender, 'a, 'b>(
965		&'a self,
966		sender: &mut Sender,
967		session_info_provider: &'a mut RuntimeInfo,
968		block_entry: &'a BlockEntry,
969		candidate_entry: &'b CandidateEntry,
970	) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
971	where
972		Sender: SubsystemSender<RuntimeApiMessage>,
973	{
974		let session_info = match get_session_info_by_index(
975			session_info_provider,
976			sender,
977			block_entry.parent_hash(),
978			block_entry.session(),
979		)
980		.await
981		{
982			Some(s) => s,
983			None => return None,
984		};
985		let block_hash = block_entry.block_hash();
986
987		let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
988		let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
989		let no_show_duration = slot_number_to_tick(
990			self.slot_duration_millis,
991			Slot::from(u64::from(session_info.no_show_slots)),
992		);
993
994		if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
995			let TranchesToApproveResult {
996				required_tranches,
997				total_observed_no_shows,
998				no_show_validators,
999			} = approval_checking::tranches_to_approve(
1000				approval_entry,
1001				candidate_entry.approvals(),
1002				tranche_now,
1003				block_tick,
1004				no_show_duration,
1005				session_info.needed_approvals as _,
1006			);
1007
1008			let status = ApprovalStatus {
1009				required_tranches,
1010				block_tick,
1011				tranche_now,
1012				last_no_shows: total_observed_no_shows,
1013				no_show_validators,
1014			};
1015
1016			Some((approval_entry, status))
1017		} else {
1018			None
1019		}
1020	}
1021
1022	// Returns the approval voting params from the RuntimeApi.
1023	async fn get_approval_voting_params_or_default<Sender: SubsystemSender<RuntimeApiMessage>>(
1024		&self,
1025		sender: &mut Sender,
1026		session_index: SessionIndex,
1027		block_hash: Hash,
1028	) -> Option<ApprovalVotingParams> {
1029		let (s_tx, s_rx) = oneshot::channel();
1030
1031		sender
1032			.send_message(RuntimeApiMessage::Request(
1033				block_hash,
1034				RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx),
1035			))
1036			.await;
1037
1038		match s_rx.await {
1039			Ok(Ok(params)) => {
1040				gum::trace!(
1041					target: LOG_TARGET,
1042					approval_voting_params = ?params,
1043					session = ?session_index,
1044					"Using the following subsystem params"
1045				);
1046				Some(params)
1047			},
1048			Ok(Err(err)) => {
1049				gum::debug!(
1050					target: LOG_TARGET,
1051					?err,
1052					"Could not request approval voting params from runtime"
1053				);
1054				None
1055			},
1056			Err(err) => {
1057				gum::debug!(
1058					target: LOG_TARGET,
1059					?err,
1060					"Could not request approval voting params from runtime"
1061				);
1062				None
1063			},
1064		}
1065	}
1066
1067	fn mark_begining_of_gathering_assignments(
1068		&mut self,
1069		block_number: BlockNumber,
1070		block_hash: Hash,
1071		candidate: CandidateHash,
1072	) {
1073		if let Some(record) = self
1074			.per_block_assignments_gathering_times
1075			.get_or_insert(block_number, HashMap::new)
1076			.and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
1077		{
1078			if record.stage_start.is_none() {
1079				record.stage += 1;
1080				gum::debug!(
1081					target: LOG_TARGET,
1082					stage = ?record.stage,
1083					?block_hash,
1084					?candidate,
1085					"Started a new assignment gathering stage",
1086				);
1087				record.stage_start = Some(Instant::now());
1088			}
1089		}
1090	}
1091
1092	fn mark_gathered_enough_assignments(
1093		&mut self,
1094		block_number: BlockNumber,
1095		block_hash: Hash,
1096		candidate: CandidateHash,
1097	) -> AssignmentGatheringRecord {
1098		let record = self
1099			.per_block_assignments_gathering_times
1100			.get(&block_number)
1101			.and_then(|entry| entry.get_mut(&(block_hash, candidate)));
1102		let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
1103		AssignmentGatheringRecord {
1104			stage,
1105			stage_start: record.and_then(|record| record.stage_start.take()),
1106		}
1107	}
1108
1109	fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
1110		while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
1111		{
1112			if *block_number < remove_lower_than {
1113				self.per_block_assignments_gathering_times.pop_oldest();
1114			} else {
1115				break;
1116			}
1117		}
1118	}
1119
1120	fn observe_assignment_gathering_status(
1121		&mut self,
1122		metrics: &Metrics,
1123		required_tranches: &RequiredTranches,
1124		block_hash: Hash,
1125		block_number: BlockNumber,
1126		candidate_hash: CandidateHash,
1127	) {
1128		match required_tranches {
1129			RequiredTranches::All | RequiredTranches::Pending { .. } => {
1130				self.mark_begining_of_gathering_assignments(
1131					block_number,
1132					block_hash,
1133					candidate_hash,
1134				);
1135			},
1136			RequiredTranches::Exact { .. } => {
1137				let time_to_gather =
1138					self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
1139				if let Some(gathering_started) = time_to_gather.stage_start {
1140					if gathering_started.elapsed().as_millis() > 6000 {
1141						gum::trace!(
1142							target: LOG_TARGET,
1143							?block_hash,
1144							?candidate_hash,
1145							"Long assignment gathering time",
1146						);
1147					}
1148					metrics.observe_assignment_gathering_time(
1149						time_to_gather.stage,
1150						gathering_started.elapsed().as_millis() as usize,
1151					)
1152				}
1153			},
1154		}
1155	}
1156
1157	fn record_no_shows(
1158		&mut self,
1159		session_index: SessionIndex,
1160		para_id: u32,
1161		no_show_validators: &Vec<ValidatorIndex>,
1162	) {
1163		if !no_show_validators.is_empty() {
1164			*self.no_show_stats.per_parachain_no_show.entry(para_id.into()).or_default() += 1;
1165		}
1166		for validator_index in no_show_validators {
1167			*self
1168				.no_show_stats
1169				.per_validator_no_show
1170				.entry(session_index)
1171				.or_default()
1172				.entry(*validator_index)
1173				.or_default() += 1;
1174		}
1175	}
1176}
1177
/// A unit of follow-up work, produced while processing wakeups, incoming messages or finished
/// approval checks, and applied in order by `handle_actions`.
#[derive(Debug, Clone)]
enum Action {
	/// Schedule a wakeup for `candidate_hash` under `block_hash` at `tick`.
	ScheduleWakeup {
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	/// Start approval checking of `candidate` (skipped while the node is syncing).
	LaunchApproval {
		claimed_candidate_indices: CandidateBitfield,
		candidate_hash: CandidateHash,
		indirect_cert: IndirectAssignmentCertV2,
		assignment_tranche: DelayTranche,
		relay_block_hash: Hash,
		session: SessionIndex,
		candidate: CandidateReceipt,
		backing_group: GroupIndex,
		// Whether our assignment is sent to approval-distribution before checking starts.
		distribute_assignment: bool,
		core_index: Option<CoreIndex>,
	},
	/// Send `ChainSelectionMessage::Approved` for the given block.
	NoteApprovedInChainSelection(Hash),
	/// Issue an approval vote for the candidate; handled by `issue_approval`.
	IssueApproval(CandidateHash, ApprovalVoteRequest),
	/// Switch from syncing to active mode and (re-)send the distribution messages.
	BecomeActive,
	/// Stop the worker's main loop.
	Conclude,
}
1203
/// Trait for providing approval voting subsystem with work.
///
/// Abstracts where incoming [`ApprovalVotingMessage`]s come from, so that the worker loop can
/// be driven by the overseer context or by any other provider.
#[async_trait::async_trait]
pub trait ApprovalVotingWorkProvider {
	/// Waits for the next `FromOrchestra` item to process.
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>>;
}
1209
// Lets a subsystem context be used directly as the work provider of the worker loop.
#[async_trait::async_trait]
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
impl<Context> ApprovalVotingWorkProvider for Context {
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
		// NOTE(review): presumably resolves to the context's own receive method (brought in by
		// the `contextbounds` bounds) rather than recursing into this trait method — confirm.
		self.recv().await
	}
}
1217
/// Main loop of the approval voting worker.
///
/// Each iteration waits for the first of: a scheduled wakeup, a message from `work_provider`,
/// a finished approval check in `currently_checking_set`, or an expired delayed-approval
/// timer. The resulting actions are applied via `handle_actions` against a fresh
/// `OverlayedBackend`, whose accumulated writes are then flushed to `backend` with a single
/// `write` call. Returns `Ok(())` once a `Conclude` action has been handled; errors from
/// message handling, action handling or the DB write are propagated.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn run<
	B,
	WorkProvider: ApprovalVotingWorkProvider,
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	mut work_provider: WorkProvider,
	mut to_other_subsystems: Sender,
	mut to_approval_distr: ADSender,
	mut subsystem: ApprovalVotingSubsystem,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
	mut backend: B,
) -> SubsystemResult<()>
where
	B: Backend,
{
	// A failed sanity check is only logged; the worker still starts.
	if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) {
		gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
	}

	let mut state = State {
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
		clock: subsystem.clock,
		assignment_criteria,
		per_block_assignments_gathering_times: LruMap::new(ByLength::new(
			MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
		)),
		no_show_stats: NoShowStats::default(),
	};

	// Best effort: if the finalized block number cannot be fetched we start with `None`.
	let mut last_finalized_height: Option<BlockNumber> = {
		let (tx, rx) = oneshot::channel();
		to_other_subsystems
			.send_message(ChainApiMessage::FinalizedBlockNumber(tx))
			.await;
		match rx.await? {
			Ok(number) => Some(number),
			Err(err) => {
				gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number");
				None
			},
		}
	};

	// Session info cache, sized to hold `DISPUTE_WINDOW` sessions.
	let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
		keystore: None,
		session_cache_lru_size: DISPUTE_WINDOW.get(),
	});

	let mut wakeups = Wakeups::default();
	let mut currently_checking_set = CurrentlyCheckingSet::default();
	let mut delayed_approvals_timers = DelayedApprovalTimer::default();
	let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE));

	loop {
		// Collects every DB write performed during this iteration.
		let mut overlayed_db = OverlayedBackend::new(&backend);
		let actions = futures::select! {
			(_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
				subsystem.metrics.on_wakeup();
				process_wakeup(
					&mut to_other_subsystems,
					&mut state,
					&mut overlayed_db,
					&mut session_info_provider,
					woken_block,
					woken_candidate,
					&subsystem.metrics,
					&wakeups,
				).await?
			}
			next_msg = work_provider.recv().fuse() => {
				let mut actions = handle_from_overseer(
					&mut to_other_subsystems,
					&mut to_approval_distr,
					&subsystem.spawner,
					&mut state,
					&mut overlayed_db,
					&mut session_info_provider,
					&subsystem.metrics,
					next_msg?,
					&mut last_finalized_height,
					&mut wakeups,
				).await?;

				if let Mode::Syncing(ref mut oracle) = subsystem.mode {
					if !oracle.is_major_syncing() {
						// note that we're active before processing other actions.
						actions.insert(0, Action::BecomeActive)
					}
				}

				actions
			}
			approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
				let mut actions = Vec::new();
				let (
					relay_block_hashes,
					ApprovalState {
						validator_index,
						candidate_hash,
						approval_outcome,
						retry_info,
					}
				) = approval_state;

				// An approved candidate gets an approval issued under every relay block it
				// was checked for.
				if matches!(approval_outcome, ApprovalOutcome::Approved) {
					let mut approvals: Vec<Action> = relay_block_hashes
						.iter()
						.map(|block_hash|
							Action::IssueApproval(
								candidate_hash,
								ApprovalVoteRequest {
									validator_index,
									block_hash: *block_hash,
								},
							)
						)
						.collect();
					actions.append(&mut approvals);
				}

				// If the check can be retried, relaunch it for every relay block we still
				// have a block entry for.
				if let Some(retry_info) = retry_info {
					for block_hash in relay_block_hashes {
						if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
							let sender = to_other_subsystems.clone();
							let spawn_handle = subsystem.spawner.clone();
							let metrics = subsystem.metrics.clone();
							let retry_info = retry_info.clone();
							let candidate = retry_info.candidate.clone();

							currently_checking_set
								.insert_relay_block_hash(
									candidate_hash,
									validator_index,
									block_hash,
									async move {
										launch_approval(
											sender,
											spawn_handle,
											metrics,
											retry_info.session_index,
											candidate,
											validator_index,
											block_hash,
											retry_info.backing_group,
											retry_info.core_index,
											retry_info,
										)
										.await
									},
								)
								.await?;
						}
					}
				}

				actions
			},
			(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
				gum::debug!(
					target: LOG_TARGET,
					?block_hash,
					?validator_index,
					"Sign approval for multiple candidates",
				);

				// Errors are logged and do not stop the worker; a returned tick re-arms the
				// timer for this block/validator.
				match maybe_create_signature(
					&mut overlayed_db,
					&mut session_info_provider,
					&state,
					&mut to_other_subsystems,
					&mut to_approval_distr,
					block_hash,
					validator_index,
					&subsystem.metrics,
				).await {
					Ok(Some(next_wakeup)) => {
						delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index);
					},
					Ok(None) => {}
					Err(err) => {
						gum::error!(
							target: LOG_TARGET,
							?err,
							"Failed to create signature",
						);
					}
				}
				vec![]
			}
		};

		// `true` means a `Conclude` action was handled: leave the loop.
		if handle_actions(
			&mut to_other_subsystems,
			&mut to_approval_distr,
			&subsystem.spawner,
			&mut state,
			&mut overlayed_db,
			&mut session_info_provider,
			&subsystem.metrics,
			&mut wakeups,
			&mut currently_checking_set,
			&mut delayed_approvals_timers,
			&mut approvals_cache,
			&mut subsystem.mode,
			actions,
			subsystem.max_approval_retries,
			subsystem.retry_backoff,
		)
		.await?
		{
			break;
		}

		// Flush this iteration's writes, if any.
		if !overlayed_db.is_empty() {
			let _timer = subsystem.metrics.time_db_transaction();
			let ops = overlayed_db.into_write_ops();
			backend.write(ops)?;
		}
	}

	Ok(())
}
1450
1451// Starts a worker thread that runs the approval voting subsystem.
1452pub async fn start_approval_worker<
1453	WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
1454	Sender: SubsystemSender<ChainApiMessage>
1455		+ SubsystemSender<RuntimeApiMessage>
1456		+ SubsystemSender<ChainSelectionMessage>
1457		+ SubsystemSender<AvailabilityRecoveryMessage>
1458		+ SubsystemSender<DisputeCoordinatorMessage>
1459		+ SubsystemSender<CandidateValidationMessage>
1460		+ Clone,
1461	ADSender: SubsystemSender<ApprovalDistributionMessage>,
1462>(
1463	work_provider: WorkProvider,
1464	to_other_subsystems: Sender,
1465	to_approval_distr: ADSender,
1466	config: Config,
1467	db: Arc<dyn Database>,
1468	keystore: Arc<LocalKeystore>,
1469	sync_oracle: Box<dyn SyncOracle + Send>,
1470	metrics: Metrics,
1471	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
1472	task_name: &'static str,
1473	group_name: &'static str,
1474	clock: Arc<dyn Clock + Send + Sync>,
1475) -> SubsystemResult<()> {
1476	let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
1477		config,
1478		db.clone(),
1479		keystore,
1480		sync_oracle,
1481		metrics,
1482		clock,
1483		spawner,
1484		MAX_APPROVAL_RETRIES,
1485		APPROVAL_CHECKING_TIMEOUT / 2,
1486	);
1487	let backend = DbBackend::new(db.clone(), approval_voting.db_config);
1488	let spawner = approval_voting.spawner.clone();
1489	spawner.spawn_blocking(
1490		task_name,
1491		Some(group_name),
1492		Box::pin(async move {
1493			if let Err(err) = run(
1494				work_provider,
1495				to_other_subsystems,
1496				to_approval_distr,
1497				approval_voting,
1498				Box::new(RealAssignmentCriteria),
1499				backend,
1500			)
1501			.await
1502			{
1503				gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
1504			};
1505		}),
1506	);
1507	Ok(())
1508}
1509
1510// Handle actions is a function that accepts a set of instructions
1511// and subsequently updates the underlying approvals_db in accordance
1512// with the linear set of instructions passed in. Therefore, actions
1513// must be processed in series to ensure that earlier actions are not
1514// negated/corrupted by later actions being executed out-of-order.
1515//
1516// However, certain Actions can cause additional actions to need to be
1517// processed by this function. In order to preserve linearity, we would
1518// need to handle these newly generated actions before we finalize
1519// completing additional actions in the submitted sequence of actions.
1520//
1521// Since recursive async functions are not stable yet, we are
1522// forced to modify the actions iterator on the fly whenever a new set
1523// of actions are generated by handling a single action.
1524//
1525// This particular problem statement is specified in issue 3311:
1526// 	https://github.com/paritytech/polkadot/issues/3311
1527//
1528// returns `true` if any of the actions was a `Conclude` command.
1529#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1530async fn handle_actions<
1531	Sender: SubsystemSender<ChainApiMessage>
1532		+ SubsystemSender<RuntimeApiMessage>
1533		+ SubsystemSender<ChainSelectionMessage>
1534		+ SubsystemSender<AvailabilityRecoveryMessage>
1535		+ SubsystemSender<DisputeCoordinatorMessage>
1536		+ SubsystemSender<CandidateValidationMessage>
1537		+ Clone,
1538	ADSender: SubsystemSender<ApprovalDistributionMessage>,
1539>(
1540	sender: &mut Sender,
1541	approval_voting_sender: &mut ADSender,
1542	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1543	state: &mut State,
1544	overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
1545	session_info_provider: &mut RuntimeInfo,
1546	metrics: &Metrics,
1547	wakeups: &mut Wakeups,
1548	currently_checking_set: &mut CurrentlyCheckingSet,
1549	delayed_approvals_timers: &mut DelayedApprovalTimer,
1550	approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
1551	mode: &mut Mode,
1552	actions: Vec<Action>,
1553	max_approval_retries: u32,
1554	retry_backoff: Duration,
1555) -> SubsystemResult<bool> {
1556	let mut conclude = false;
1557	let mut actions_iter = actions.into_iter();
1558	while let Some(action) = actions_iter.next() {
1559		match action {
1560			Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
1561				wakeups.schedule(block_hash, block_number, candidate_hash, tick);
1562			},
1563			Action::IssueApproval(candidate_hash, approval_request) => {
1564				// Note that the IssueApproval action will create additional
1565				// actions that will need to all be processed before we can
1566				// handle the next action in the set passed to the ambient
1567				// function.
1568				//
1569				// In order to achieve this, we append the existing iterator
1570				// to the end of the iterator made up of these newly generated
1571				// actions.
1572				//
1573				// Note that chaining these iterators is O(n) as we must consume
1574				// the prior iterator.
1575				let next_actions: Vec<Action> = issue_approval(
1576					sender,
1577					approval_voting_sender,
1578					state,
1579					overlayed_db,
1580					session_info_provider,
1581					metrics,
1582					candidate_hash,
1583					delayed_approvals_timers,
1584					approval_request,
1585					&wakeups,
1586				)
1587				.await?
1588				.into_iter()
1589				.map(|v| v.clone())
1590				.chain(actions_iter)
1591				.collect();
1592
1593				actions_iter = next_actions.into_iter();
1594			},
1595			Action::LaunchApproval {
1596				claimed_candidate_indices,
1597				candidate_hash,
1598				indirect_cert,
1599				assignment_tranche,
1600				relay_block_hash,
1601				session,
1602				candidate,
1603				backing_group,
1604				distribute_assignment,
1605				core_index,
1606			} => {
1607				// Don't launch approval work if the node is syncing.
1608				if let Mode::Syncing(_) = *mode {
1609					continue;
1610				}
1611
1612				metrics.on_assignment_produced(assignment_tranche);
1613				let block_hash = indirect_cert.block_hash;
1614				let validator_index = indirect_cert.validator;
1615
1616				if distribute_assignment {
1617					approval_voting_sender.send_unbounded_message(
1618						ApprovalDistributionMessage::DistributeAssignment(
1619							indirect_cert,
1620							claimed_candidate_indices,
1621						),
1622					);
1623				}
1624
1625				match approvals_cache.get(&candidate_hash) {
1626					Some(ApprovalOutcome::Approved) => {
1627						let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
1628							candidate_hash,
1629							ApprovalVoteRequest { validator_index, block_hash },
1630						))
1631						.map(|v| v.clone())
1632						.chain(actions_iter)
1633						.collect();
1634						actions_iter = new_actions.into_iter();
1635					},
1636					None => {
1637						let sender = sender.clone();
1638						let spawn_handle = spawn_handle.clone();
1639
1640						let retry = RetryApprovalInfo {
1641							candidate: candidate.clone(),
1642							backing_group,
1643							core_index,
1644							session_index: session,
1645							attempts_remaining: max_approval_retries,
1646							backoff: retry_backoff,
1647						};
1648
1649						currently_checking_set
1650							.insert_relay_block_hash(
1651								candidate_hash,
1652								validator_index,
1653								relay_block_hash,
1654								async move {
1655									launch_approval(
1656										sender,
1657										spawn_handle,
1658										metrics.clone(),
1659										session,
1660										candidate,
1661										validator_index,
1662										block_hash,
1663										backing_group,
1664										core_index,
1665										retry,
1666									)
1667									.await
1668								},
1669							)
1670							.await?;
1671					},
1672					Some(_) => {},
1673				}
1674			},
1675			Action::NoteApprovedInChainSelection(block_hash) => {
1676				sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
1677			},
1678			Action::BecomeActive => {
1679				*mode = Mode::Active;
1680
1681				let (messages, next_actions) = distribution_messages_for_activation(
1682					overlayed_db,
1683					state,
1684					delayed_approvals_timers,
1685				)
1686				.await?;
1687				for message in messages.into_iter() {
1688					approval_voting_sender.send_unbounded_message(message);
1689				}
1690				let next_actions: Vec<Action> =
1691					next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
1692
1693				actions_iter = next_actions.into_iter();
1694			},
1695			Action::Conclude => {
1696				conclude = true;
1697			},
1698		}
1699	}
1700
1701	Ok(conclude)
1702}
1703
1704fn cores_to_candidate_indices(
1705	core_indices: &CoreBitfield,
1706	block_entry: &BlockEntry,
1707) -> Result<CandidateBitfield, BitfieldError> {
1708	let mut candidate_indices = Vec::new();
1709
1710	// Map from core index to candidate index.
1711	for claimed_core_index in core_indices.iter_ones() {
1712		if let Some(candidate_index) = block_entry
1713			.candidates()
1714			.iter()
1715			.position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
1716		{
1717			candidate_indices.push(candidate_index as _);
1718		}
1719	}
1720
1721	CandidateBitfield::try_from(candidate_indices)
1722}
1723
1724// Returns the claimed core bitfield from the assignment cert and the core index
1725// from the block entry.
1726fn get_core_indices_on_startup(
1727	assignment: &AssignmentCertKindV2,
1728	block_entry_core_index: CoreIndex,
1729) -> CoreBitfield {
1730	match &assignment {
1731		AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
1732		AssignmentCertKindV2::RelayVRFModulo { sample: _ } => {
1733			CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed")
1734		},
1735		AssignmentCertKindV2::RelayVRFDelay { core_index } => {
1736			CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1737		},
1738	}
1739}
1740
1741// Returns the claimed core bitfield from the assignment cert, the candidate hash and a
1742// `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot find the candidate hash
1743// in the block entry which indicates a bug or corrupted storage.
1744fn get_assignment_core_indices(
1745	assignment: &AssignmentCertKindV2,
1746	candidate_hash: &CandidateHash,
1747	block_entry: &BlockEntry,
1748) -> Option<CoreBitfield> {
1749	match &assignment {
1750		AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => {
1751			Some(core_bitfield.clone())
1752		},
1753		AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
1754			.candidates()
1755			.iter()
1756			.find(|(_core_index, h)| candidate_hash == h)
1757			.map(|(core_index, _candidate_hash)| {
1758				CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1759			}),
1760		AssignmentCertKindV2::RelayVRFDelay { core_index } => {
1761			Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"))
1762		},
1763	}
1764}
1765
1766async fn distribution_messages_for_activation(
1767	db: &OverlayedBackend<'_, impl Backend>,
1768	state: &State,
1769	delayed_approvals_timers: &mut DelayedApprovalTimer,
1770) -> SubsystemResult<(Vec<ApprovalDistributionMessage>, Vec<Action>)> {
1771	let all_blocks: Vec<Hash> = db.load_all_blocks()?;
1772
1773	let mut approval_meta = Vec::with_capacity(all_blocks.len());
1774	let mut messages = Vec::new();
1775	let mut approvals = Vec::new();
1776	let mut actions = Vec::new();
1777
1778	messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
1779
1780	for block_hash in all_blocks {
1781		let block_entry = match db.load_block_entry(&block_hash)? {
1782			Some(b) => b,
1783			None => {
1784				gum::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry");
1785
1786				continue;
1787			},
1788		};
1789
1790		approval_meta.push(BlockApprovalMeta {
1791			hash: block_hash,
1792			number: block_entry.block_number(),
1793			parent_hash: block_entry.parent_hash(),
1794			candidates: block_entry
1795				.candidates()
1796				.iter()
1797				.map(|(core_index, c_hash)| {
1798					let candidate = db.load_candidate_entry(c_hash).ok().flatten();
1799					let group_index = candidate
1800						.and_then(|entry| {
1801							entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
1802						})
1803						.unwrap_or_else(|| {
1804							gum::warn!(
1805								target: LOG_TARGET,
1806								?block_hash,
1807								?c_hash,
1808								"Missing candidate entry or approval entry",
1809							);
1810							GroupIndex::default()
1811						});
1812					(*c_hash, *core_index, group_index)
1813				})
1814				.collect(),
1815			slot: block_entry.slot(),
1816			session: block_entry.session(),
1817			vrf_story: block_entry.relay_vrf_story(),
1818		});
1819		let mut signatures_queued = HashSet::new();
1820		for (core_index, candidate_hash) in block_entry.candidates() {
1821			let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
1822				Some(c) => c,
1823				None => {
1824					gum::warn!(
1825						target: LOG_TARGET,
1826						?block_hash,
1827						?candidate_hash,
1828						"Missing candidate entry",
1829					);
1830
1831					continue;
1832				},
1833			};
1834
1835			match candidate_entry.approval_entry(&block_hash) {
1836				Some(approval_entry) => {
1837					match approval_entry.local_statements() {
1838						(None, None) => {
1839							if approval_entry
1840								.our_assignment()
1841								.map(|assignment| !assignment.triggered())
1842								.unwrap_or(false)
1843							{
1844								actions.push(Action::ScheduleWakeup {
1845									block_hash,
1846									block_number: block_entry.block_number(),
1847									candidate_hash: *candidate_hash,
1848									tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
1849								})
1850							}
1851						},
1852						(None, Some(_)) => {}, // second is impossible case.
1853						(Some(assignment), None) => {
1854							let claimed_core_indices =
1855								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1856
1857							if block_entry.has_candidates_pending_signature() {
1858								delayed_approvals_timers.maybe_arm_timer(
1859									state.clock.tick_now(),
1860									state.clock.as_ref(),
1861									block_entry.block_hash(),
1862									assignment.validator_index(),
1863								)
1864							}
1865
1866							match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1867								Ok(bitfield) => {
1868									gum::debug!(
1869										target: LOG_TARGET,
1870										candidate_hash = ?candidate_entry.candidate_receipt().hash(),
1871										?block_hash,
1872										"Discovered, triggered assignment, not approved yet",
1873									);
1874
1875									let indirect_cert = IndirectAssignmentCertV2 {
1876										block_hash,
1877										validator: assignment.validator_index(),
1878										cert: assignment.cert().clone(),
1879									};
1880									messages.push(
1881										ApprovalDistributionMessage::DistributeAssignment(
1882											indirect_cert.clone(),
1883											bitfield.clone(),
1884										),
1885									);
1886
1887									if !block_entry.candidate_is_pending_signature(*candidate_hash)
1888									{
1889										actions.push(Action::LaunchApproval {
1890											claimed_candidate_indices: bitfield,
1891											candidate_hash: candidate_entry
1892												.candidate_receipt()
1893												.hash(),
1894											indirect_cert,
1895											assignment_tranche: assignment.tranche(),
1896											relay_block_hash: block_hash,
1897											session: block_entry.session(),
1898											candidate: candidate_entry.candidate_receipt().clone(),
1899											backing_group: approval_entry.backing_group(),
1900											distribute_assignment: false,
1901											core_index: Some(*core_index),
1902										});
1903									}
1904								},
1905								Err(err) => {
1906									// Should never happen. If we fail here it means the
1907									// assignment is null (no cores claimed).
1908									gum::warn!(
1909										target: LOG_TARGET,
1910										?block_hash,
1911										?candidate_hash,
1912										?err,
1913										"Failed to create assignment bitfield",
1914									);
1915								},
1916							}
1917						},
1918						(Some(assignment), Some(approval_sig)) => {
1919							let claimed_core_indices =
1920								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1921							match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1922								Ok(bitfield) => messages.push(
1923									ApprovalDistributionMessage::DistributeAssignment(
1924										IndirectAssignmentCertV2 {
1925											block_hash,
1926											validator: assignment.validator_index(),
1927											cert: assignment.cert().clone(),
1928										},
1929										bitfield,
1930									),
1931								),
1932								Err(err) => {
1933									gum::warn!(
1934										target: LOG_TARGET,
1935										?block_hash,
1936										?candidate_hash,
1937										?err,
1938										"Failed to create assignment bitfield",
1939									);
1940									// If we didn't send assignment, we don't send approval.
1941									continue;
1942								},
1943							}
1944							if signatures_queued
1945								.insert(approval_sig.signed_candidates_indices.clone())
1946							{
1947								approvals.push(ApprovalDistributionMessage::DistributeApproval(
1948									IndirectSignedApprovalVoteV2 {
1949										block_hash,
1950										candidate_indices: approval_sig.signed_candidates_indices,
1951										validator: assignment.validator_index(),
1952										signature: approval_sig.signature,
1953									},
1954								))
1955							};
1956						},
1957					}
1958				},
1959				None => {
1960					gum::warn!(
1961						target: LOG_TARGET,
1962						?block_hash,
1963						?candidate_hash,
1964						"Missing approval entry",
1965					);
1966				},
1967			}
1968		}
1969	}
1970
1971	messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
1972	// Approvals are appended at the end, to make sure all assignments are sent
1973	// before the approvals, otherwise if they arrive ahead in approval-distribution
1974	// they will be ignored.
1975	messages.extend(approvals.into_iter());
1976	Ok((messages, actions))
1977}
1978
1979// Handle an incoming signal from the overseer. Returns true if execution should conclude.
1980async fn handle_from_overseer<
1981	Sender: SubsystemSender<ChainApiMessage>
1982		+ SubsystemSender<RuntimeApiMessage>
1983		+ SubsystemSender<ChainSelectionMessage>
1984		+ Clone,
1985	ADSender: SubsystemSender<ApprovalDistributionMessage>,
1986>(
1987	sender: &mut Sender,
1988	approval_voting_sender: &mut ADSender,
1989	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1990	state: &mut State,
1991	db: &mut OverlayedBackend<'_, impl Backend>,
1992	session_info_provider: &mut RuntimeInfo,
1993	metrics: &Metrics,
1994	x: FromOrchestra<ApprovalVotingMessage>,
1995	last_finalized_height: &mut Option<BlockNumber>,
1996	wakeups: &mut Wakeups,
1997) -> SubsystemResult<Vec<Action>> {
1998	let actions = match x {
1999		FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
2000			let mut actions = Vec::new();
2001			if let Some(activated) = update.activated {
2002				let head = activated.hash;
2003				match import::handle_new_head(
2004					sender,
2005					approval_voting_sender,
2006					state,
2007					db,
2008					session_info_provider,
2009					head,
2010					last_finalized_height,
2011				)
2012				.await
2013				{
2014					Err(e) => return Err(SubsystemError::with_origin("db", e)),
2015					Ok(block_imported_candidates) => {
2016						// Schedule wakeups for all imported candidates.
2017						for block_batch in block_imported_candidates {
2018							gum::debug!(
2019								target: LOG_TARGET,
2020								block_number = ?block_batch.block_number,
2021								block_hash = ?block_batch.block_hash,
2022								num_candidates = block_batch.imported_candidates.len(),
2023								"Imported new block.",
2024							);
2025
2026							state.no_show_stats.maybe_print(block_batch.block_number);
2027
2028							for (c_hash, c_entry) in block_batch.imported_candidates {
2029								metrics.on_candidate_imported();
2030
2031								let our_tranche = c_entry
2032									.approval_entry(&block_batch.block_hash)
2033									.and_then(|a| a.our_assignment().map(|a| a.tranche()));
2034
2035								if let Some(our_tranche) = our_tranche {
2036									let tick = our_tranche as Tick + block_batch.block_tick;
2037									gum::trace!(
2038										target: LOG_TARGET,
2039										tranche = our_tranche,
2040										candidate_hash = ?c_hash,
2041										block_hash = ?block_batch.block_hash,
2042										block_tick = block_batch.block_tick,
2043										"Scheduling first wakeup.",
2044									);
2045
2046									// Our first wakeup will just be the tranche of our assignment,
2047									// if any. This will likely be superseded by incoming
2048									// assignments and approvals which trigger rescheduling.
2049									actions.push(Action::ScheduleWakeup {
2050										block_hash: block_batch.block_hash,
2051										block_number: block_batch.block_number,
2052										candidate_hash: c_hash,
2053										tick,
2054									});
2055								}
2056							}
2057						}
2058					},
2059				}
2060			}
2061
2062			actions
2063		},
2064		FromOrchestra::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
2065			gum::debug!(target: LOG_TARGET, ?block_hash, ?block_number, "Block finalized");
2066			*last_finalized_height = Some(block_number);
2067
2068			crate::ops::canonicalize(db, block_number, block_hash)
2069				.map_err(|e| SubsystemError::with_origin("db", e))?;
2070
2071			// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
2072			// accordingly.
2073			wakeups.prune_finalized_wakeups(block_number);
2074			state.cleanup_assignments_gathering_timestamp(block_number);
2075
2076			// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
2077			// accordingly. let hash_set =
2078			// wakeups.block_numbers.values().flatten().collect::<HashSet<_>>(); state.spans.
2079			// retain(|hash, _| hash_set.contains(hash));
2080
2081			Vec::new()
2082		},
2083		FromOrchestra::Signal(OverseerSignal::Conclude) => {
2084			vec![Action::Conclude]
2085		},
2086		FromOrchestra::Communication { msg } => match msg {
2087			ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => {
2088				let (check_outcome, actions) =
2089					import_assignment(sender, state, db, session_info_provider, checked_assignment)
2090						.await?;
2091				// approval-distribution makes sure this assignment is valid and expected,
2092				// so this import should never fail, if it does it might mean one of two things,
2093				// there is a bug in the code or the two subsystems got out of sync.
2094				if let AssignmentCheckResult::Bad(ref err) = check_outcome {
2095					gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an assignment");
2096				}
2097				let _ = tx.map(|tx| tx.send(check_outcome));
2098				actions
2099			},
2100			ApprovalVotingMessage::ImportApproval(a, tx) => {
2101				let result =
2102					import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups)
2103						.await?;
2104				// approval-distribution makes sure this vote is valid and expected,
2105				// so this import should never fail, if it does it might mean one of two things,
2106				// there is a bug in the code or the two subsystems got out of sync.
2107				if let ApprovalCheckResult::Bad(ref err) = result.1 {
2108					gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an approval");
2109				}
2110				let _ = tx.map(|tx| tx.send(result.1));
2111
2112				result.0
2113			},
2114			ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
2115				match handle_approved_ancestor(sender, db, target, lower_bound, wakeups, &metrics)
2116					.await
2117				{
2118					Ok(v) => {
2119						let _ = res.send(v);
2120					},
2121					Err(e) => {
2122						let _ = res.send(None);
2123						return Err(e);
2124					},
2125				}
2126
2127				Vec::new()
2128			},
2129			ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
2130				metrics.on_candidate_signatures_request();
2131				get_approval_signatures_for_candidate(
2132					approval_voting_sender.clone(),
2133					spawn_handle,
2134					db,
2135					candidate_hash,
2136					tx,
2137				)
2138				.await?;
2139				Vec::new()
2140			},
2141		},
2142	};
2143
2144	Ok(actions)
2145}
2146
2147/// Retrieve approval signatures.
2148///
2149/// This involves an unbounded message send to approval-distribution, the caller has to ensure that
2150/// calls to this function are infrequent and bounded.
2151#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
2152async fn get_approval_signatures_for_candidate<
2153	Sender: SubsystemSender<ApprovalDistributionMessage>,
2154>(
2155	mut sender: Sender,
2156	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
2157	db: &OverlayedBackend<'_, impl Backend>,
2158	candidate_hash: CandidateHash,
2159	tx: oneshot::Sender<HashMap<ValidatorIndex, (Vec<CandidateHash>, ValidatorSignature)>>,
2160) -> SubsystemResult<()> {
2161	let send_votes = |votes| {
2162		if let Err(_) = tx.send(votes) {
2163			gum::debug!(
2164				target: LOG_TARGET,
2165				"Sending approval signatures back failed, as receiver got closed."
2166			);
2167		}
2168	};
2169	let entry = match db.load_candidate_entry(&candidate_hash)? {
2170		None => {
2171			send_votes(HashMap::new());
2172			gum::debug!(
2173				target: LOG_TARGET,
2174				?candidate_hash,
2175				"Sent back empty votes because the candidate was not found in db."
2176			);
2177			return Ok(());
2178		},
2179		Some(e) => e,
2180	};
2181
2182	let relay_hashes = entry.block_assignments.keys();
2183
2184	let mut candidate_indices = HashSet::new();
2185	let mut candidate_indices_to_candidate_hashes: HashMap<
2186		Hash,
2187		HashMap<CandidateIndex, CandidateHash>,
2188	> = HashMap::new();
2189
2190	// Retrieve `CoreIndices`/`CandidateIndices` as required by approval-distribution:
2191	for hash in relay_hashes {
2192		let entry = match db.load_block_entry(hash)? {
2193			None => {
2194				gum::debug!(
2195					target: LOG_TARGET,
2196					?candidate_hash,
2197					?hash,
2198					"Block entry for assignment missing."
2199				);
2200				continue;
2201			},
2202			Some(e) => e,
2203		};
2204		for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
2205			if c_hash == &candidate_hash {
2206				candidate_indices.insert((*hash, candidate_index as u32));
2207			}
2208			candidate_indices_to_candidate_hashes
2209				.entry(*hash)
2210				.or_default()
2211				.insert(candidate_index as _, *c_hash);
2212		}
2213	}
2214
2215	let get_approvals = async move {
2216		let (tx_distribution, rx_distribution) = oneshot::channel();
2217		sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
2218			candidate_indices,
2219			tx_distribution,
2220		));
2221
2222		// Because of the unbounded sending and the nature of the call (just fetching data from
2223		// state), this should not block long:
2224		match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
2225			None => {
2226				gum::warn!(
2227					target: LOG_TARGET,
2228					"Waiting for approval signatures timed out - dead lock?"
2229				);
2230			},
2231			Some(Err(_)) => gum::debug!(
2232				target: LOG_TARGET,
2233				"Request for approval signatures got cancelled by `approval-distribution`."
2234			),
2235			Some(Ok(votes)) => {
2236				let votes = votes
2237					.into_iter()
2238					.filter_map(|(validator_index, (hash, signed_candidates_indices, signature))| {
2239						let candidates_hashes = candidate_indices_to_candidate_hashes.get(&hash);
2240
2241						if candidates_hashes.is_none() {
2242							gum::warn!(
2243								target: LOG_TARGET,
2244								?hash,
2245								"Possible bug! Could not find map of candidate_hashes for block hash received from approval-distribution"
2246							);
2247						}
2248
2249						let num_signed_candidates = signed_candidates_indices.len();
2250
2251						let signed_candidates_hashes: Vec<CandidateHash> =
2252							signed_candidates_indices
2253								.into_iter()
2254								.filter_map(|candidate_index| {
2255									candidates_hashes.and_then(|candidate_hashes| {
2256										if let Some(candidate_hash) =
2257											candidate_hashes.get(&candidate_index)
2258										{
2259											Some(*candidate_hash)
2260										} else {
2261											gum::warn!(
2262												target: LOG_TARGET,
2263												?candidate_index,
2264												"Possible bug! Could not find candidate hash for candidate_index coming from approval-distribution"
2265											);
2266											None
2267										}
2268									})
2269								})
2270								.collect();
2271						if num_signed_candidates == signed_candidates_hashes.len() {
2272							Some((validator_index, (signed_candidates_hashes, signature)))
2273						} else {
2274							gum::warn!(
2275								target: LOG_TARGET,
2276								"Possible bug! Could not find all hashes for candidates coming from approval-distribution"
2277							);
2278							None
2279						}
2280					})
2281					.collect();
2282				send_votes(votes)
2283			},
2284		}
2285	};
2286
2287	// No need to block subsystem on this (also required to break cycle).
2288	// We should not be sending this message frequently - caller must make sure this is bounded.
2289	gum::trace!(
2290		target: LOG_TARGET,
2291		?candidate_hash,
2292		"Spawning task for fetching signatures from approval-distribution"
2293	);
2294	spawn_handle.spawn(
2295		"get-approval-signatures",
2296		Some("approval-voting-subsystem"),
2297		Box::pin(get_approvals),
2298	);
2299	Ok(())
2300}
2301
/// Find the highest block in `(lower_bound, target]` that is fully approved together with
/// every block below it in that range.
///
/// The chain is walked from `target` downwards to block number `lower_bound + 1`. Any
/// unapproved block resets the running result, so the answer is the highest block that has
/// only approved blocks beneath it within the traversed range.
///
/// Returns `Ok(None)` in any of these cases:
/// - the target's number cannot be resolved;
/// - `target_number <= lower_bound`;
/// - the ancestry request fails;
/// - any traversed block has no entry in `db`;
/// - no qualifying block exists.
///
/// Otherwise returns the block's hash and number, together with a `BlockDescription` for it
/// and for every block below it in the range, in ascending block order.
///
/// As a side effect, it logs the approval bit pattern of the traversed range. For unapproved
/// blocks more than `ABNORMAL_DEPTH_THRESHOLD` entries deep, it also logs per-candidate
/// diagnostics and reports their unapproved-candidate count to `metrics`.
///
/// # Errors
///
/// Propagates database errors from loading block and candidate entries.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn handle_approved_ancestor<Sender: SubsystemSender<ChainApiMessage>>(
	sender: &mut Sender,
	db: &OverlayedBackend<'_, impl Backend>,
	target: Hash,
	lower_bound: BlockNumber,
	wakeups: &Wakeups,
	metrics: &Metrics,
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
	const MAX_TRACING_WINDOW: usize = 200;
	const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
	const LOGGING_DEPTH_THRESHOLD: usize = 10;

	// Running candidate for the result: (hash, number) of the highest fully-approved block
	// seen since the last unapproved one.
	let mut all_approved_max = None;

	// Resolve the target's block number via the Chain API; any failure yields `Ok(None)`.
	let target_number = {
		let (tx, rx) = oneshot::channel();

		sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await;

		match rx.await {
			Ok(Ok(Some(n))) => n,
			Ok(Ok(None)) => return Ok(None),
			Ok(Err(_)) | Err(_) => return Ok(None),
		}
	};

	if target_number <= lower_bound {
		return Ok(None);
	}

	// request ancestors up to but not including the lower bound,
	// as a vote on the lower bound is implied if we cannot find
	// anything else.
	let ancestry = if target_number > lower_bound + 1 {
		let (tx, rx) = oneshot::channel();

		sender
			.send_message(ChainApiMessage::Ancestors {
				hash: target,
				k: (target_number - (lower_bound + 1)) as usize,
				response_channel: tx,
			})
			.await;

		match rx.await {
			Ok(Ok(a)) => a,
			Err(_) | Ok(Err(_)) => return Ok(None),
		}
	} else {
		Vec::new()
	};
	let ancestry_len = ancestry.len();

	// Descriptions of the approved blocks since the last reset, collected top-down.
	let mut block_descriptions = Vec::new();

	// One bit per traversed block, starting at `target`: whether it is fully approved.
	let mut bits: BitVec<u8, Lsb0> = Default::default();
	for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
		// Block entries should be present as the assumption is that
		// nothing here is finalized. If we encounter any missing block
		// entries we can fail.
		let entry = match db.load_block_entry(&block_hash)? {
			None => {
				let block_number = target_number.saturating_sub(i as u32);
				gum::info!(
					target: LOG_TARGET,
					unknown_number = ?block_number,
					unknown_hash = ?block_hash,
					"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
					target,
					target_number,
					lower_bound,
					lower_bound,
				);
				return Ok(None);
			},
			Some(b) => b,
		};

		// even if traversing millions of blocks this is fairly cheap and always dwarfed by the
		// disk lookups.
		bits.push(entry.is_fully_approved());
		if entry.is_fully_approved() {
			if all_approved_max.is_none() {
				// First iteration of the loop is target, i = 0. After that,
				// ancestry is moving backwards.
				all_approved_max = Some((block_hash, target_number - i as BlockNumber));
			}
			block_descriptions.push(BlockDescription {
				block_hash,
				session: entry.session(),
				candidates: entry
					.candidates()
					.iter()
					.map(|(_idx, candidate_hash)| *candidate_hash)
					.collect(),
			});
		} else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
			// Unapproved block within `ABNORMAL_DEPTH_THRESHOLD` entries of the target: only
			// reset the running result, without extra diagnostics.
			all_approved_max = None;
			block_descriptions.clear();
		} else {
			// Unapproved block deeper than the threshold: reset the running result and emit
			// diagnostics about why it is not approved yet.
			all_approved_max = None;
			block_descriptions.clear();

			let unapproved: Vec<_> = entry.unapproved_candidates().collect();
			// NOTE(review): this message counts depth from 0 (`bits.len() - 1`), while the trace
			// below reports `bits.len()`; the two logs disagree by one for the same block.
			gum::debug!(
				target: LOG_TARGET,
				"Block {} is {} blocks deep and has {}/{} candidates unapproved",
				block_hash,
				bits.len() - 1,
				unapproved.len(),
				entry.candidates().len(),
			);
			// Only trace the full unapproved-candidate list for the lowest few blocks of a
			// sufficiently long ancestry.
			if ancestry_len >= LOGGING_DEPTH_THRESHOLD && i > ancestry_len - LOGGING_DEPTH_THRESHOLD
			{
				gum::trace!(
					target: LOG_TARGET,
					?block_hash,
					"Unapproved candidates at depth {}: {:?}",
					bits.len(),
					unapproved
				)
			}
			metrics.on_unapproved_candidates_in_unfinalized_chain(unapproved.len());
			for candidate_hash in unapproved {
				match db.load_candidate_entry(&candidate_hash)? {
					None => {
						gum::warn!(
							target: LOG_TARGET,
							?candidate_hash,
							"Missing expected candidate in DB",
						);

						continue;
					},
					Some(c_entry) => match c_entry.approval_entry(&block_hash) {
						None => {
							gum::warn!(
								target: LOG_TARGET,
								?candidate_hash,
								?block_hash,
								"Missing expected approval entry under candidate.",
							);
						},
						Some(a_entry) => {
							// Renders "assignments/approvals/validators" for this block; only
							// approvals from validators assigned under this block are counted.
							let status = || {
								let n_assignments = a_entry.n_assignments();

								// Take the approvals, filtered by the assignments
								// for this block.
								let n_approvals = c_entry
									.approvals()
									.iter()
									.by_vals()
									.enumerate()
									.filter(|(i, approved)| {
										*approved && a_entry.is_assigned(ValidatorIndex(*i as _))
									})
									.count();

								format!(
									"{}/{}/{}",
									n_assignments,
									n_approvals,
									a_entry.n_validators(),
								)
							};

							match a_entry.our_assignment() {
								None => gum::debug!(
									target: LOG_TARGET,
									?candidate_hash,
									?block_hash,
									status = %status(),
									"no assignment."
								),
								Some(a) => {
									let tranche = a.tranche();
									let triggered = a.triggered();

									let next_wakeup =
										wakeups.wakeup_for(block_hash, candidate_hash);

									// Presumably "our approval signature is recorded": the second
									// local statement is checked only once our assignment has
									// triggered. TODO confirm against `local_statements`.
									let approved =
										triggered && { a_entry.local_statements().1.is_some() };

									gum::debug!(
										target: LOG_TARGET,
										?candidate_hash,
										?block_hash,
										tranche,
										?next_wakeup,
										status = %status(),
										triggered,
										approved,
										"assigned."
									);
								},
							}
						},
					},
				}
			}
		}
	}

	gum::debug!(
		target: LOG_TARGET,
		"approved blocks {}-[{}]-{}",
		target_number,
		{
			// formatting to divide bits by groups of 10.
			// when comparing logs on multiple machines where the exact vote
			// targets may differ, this grouping is useful.
			// At most `MAX_TRACING_WINDOW` bits are rendered; a space follows each block whose
			// number is a multiple of 10.
			let mut s = String::with_capacity(bits.len());
			for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
				s.push(if *bit { '1' } else { '0' });
				if (target_number - i as u32).is_multiple_of(10) && i != bits.len() - 1 {
					s.push(' ');
				}
			}

			s
		},
		// Lowest block number covered by the rendered bits.
		if bits.len() > MAX_TRACING_WINDOW {
			format!(
				"{}... (truncated due to large window)",
				target_number - MAX_TRACING_WINDOW as u32 + 1,
			)
		} else {
			format!("{}", lower_bound + 1)
		},
	);

	// `reverse()` to obtain the ascending order from lowest to highest
	// block within the candidates, which is the expected order
	block_descriptions.reverse();

	let all_approved_max =
		all_approved_max.map(|(hash, block_number)| HighestApprovedAncestorBlock {
			hash,
			number: block_number,
			descriptions: block_descriptions,
		});

	Ok(all_approved_max)
}
2549
/// Return the smaller of two optional values, treating `None` as "absent"
/// rather than as the smallest value.
///
/// Plain `std::cmp::min` on `Option`s would not work here: `Option::cmp` orders
/// `None` before every `Some`. If both values are present and compare equal, the
/// first one (`a`) is returned.
fn min_prefer_some<T: std::cmp::Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
	// `Iterator::min` returns the first of several equal minima, matching `std::cmp::min`.
	a.into_iter().chain(b).min()
}
2558
2559fn schedule_wakeup_action(
2560	approval_entry: &ApprovalEntry,
2561	block_hash: Hash,
2562	block_number: BlockNumber,
2563	candidate_hash: CandidateHash,
2564	block_tick: Tick,
2565	tick_now: Tick,
2566	required_tranches: RequiredTranches,
2567) -> Option<Action> {
2568	let maybe_action = match required_tranches {
2569		_ if approval_entry.is_approved() => None,
2570		RequiredTranches::All => None,
2571		RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
2572			// Take the earlier of the next no show or the last assignment tick + required delay,
2573			// only considering the latter if it is after the current moment.
2574			min_prefer_some(
2575				last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
2576				next_no_show,
2577			)
2578			.map(|tick| Action::ScheduleWakeup {
2579				block_hash,
2580				block_number,
2581				candidate_hash,
2582				tick,
2583			})
2584		},
2585		RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
2586			// select the minimum of `next_no_show`, or the tick of the next non-empty tranche
2587			// after `considered`, including any tranche that might contain our own untriggered
2588			// assignment.
2589			let next_non_empty_tranche = {
2590				let next_announced = approval_entry
2591					.tranches()
2592					.iter()
2593					.skip_while(|t| t.tranche() <= considered)
2594					.map(|t| t.tranche())
2595					.next();
2596
2597				let our_untriggered = approval_entry.our_assignment().and_then(|t| {
2598					if !t.triggered() && t.tranche() > considered {
2599						Some(t.tranche())
2600					} else {
2601						None
2602					}
2603				});
2604
2605				// Apply the clock drift to these tranches.
2606				min_prefer_some(next_announced, our_untriggered)
2607					.map(|t| t as Tick + block_tick + clock_drift)
2608			};
2609
2610			min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
2611				Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }
2612			})
2613		},
2614	};
2615
2616	match maybe_action {
2617		Some(Action::ScheduleWakeup { ref tick, .. }) => gum::trace!(
2618			target: LOG_TARGET,
2619			tick,
2620			?candidate_hash,
2621			?block_hash,
2622			block_tick,
2623			"Scheduling next wakeup.",
2624		),
2625		None => gum::trace!(
2626			target: LOG_TARGET,
2627			?candidate_hash,
2628			?block_hash,
2629			block_tick,
2630			"No wakeup needed.",
2631		),
2632		Some(_) => {}, // unreachable
2633	}
2634
2635	maybe_action
2636}
2637
2638async fn import_assignment<Sender>(
2639	sender: &mut Sender,
2640	state: &State,
2641	db: &mut OverlayedBackend<'_, impl Backend>,
2642	session_info_provider: &mut RuntimeInfo,
2643	checked_assignment: CheckedIndirectAssignment,
2644) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
2645where
2646	Sender: SubsystemSender<RuntimeApiMessage>,
2647{
2648	let tick_now = state.clock.tick_now();
2649	let assignment = checked_assignment.assignment();
2650	let candidate_indices = checked_assignment.candidate_indices();
2651	let tranche = checked_assignment.tranche();
2652
2653	let block_entry = match db.load_block_entry(&assignment.block_hash)? {
2654		Some(b) => b,
2655		None => {
2656			return Ok((
2657				AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(
2658					assignment.block_hash,
2659				)),
2660				Vec::new(),
2661			))
2662		},
2663	};
2664
2665	let session_info = match get_session_info_by_index(
2666		session_info_provider,
2667		sender,
2668		block_entry.parent_hash(),
2669		block_entry.session(),
2670	)
2671	.await
2672	{
2673		Some(s) => s,
2674		None => {
2675			return Ok((
2676				AssignmentCheckResult::Bad(AssignmentCheckError::UnknownSessionIndex(
2677					block_entry.session(),
2678				)),
2679				Vec::new(),
2680			))
2681		},
2682	};
2683
2684	let n_cores = session_info.n_cores as usize;
2685
2686	// Early check the candidate bitfield and core bitfields lengths < `n_cores`.
2687	// Core bitfield length is checked later in `check_assignment_cert`.
2688	if candidate_indices.len() > n_cores {
2689		gum::debug!(
2690			target: LOG_TARGET,
2691			validator = assignment.validator.0,
2692			n_cores,
2693			candidate_bitfield_len = ?candidate_indices.len(),
2694			"Oversized bitfield",
2695		);
2696
2697		return Ok((
2698			AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
2699				candidate_indices.len(),
2700			)),
2701			Vec::new(),
2702		));
2703	}
2704
2705	let mut claimed_core_indices = Vec::new();
2706	let mut assigned_candidate_hashes = Vec::new();
2707
2708	for candidate_index in candidate_indices.iter_ones() {
2709		let (claimed_core_index, assigned_candidate_hash) =
2710			match block_entry.candidate(candidate_index) {
2711				Some((c, h)) => (*c, *h),
2712				None => {
2713					return Ok((
2714						AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
2715							candidate_index as _,
2716						)),
2717						Vec::new(),
2718					))
2719				}, // no candidate at core.
2720			};
2721
2722		let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2723			Some(c) => c,
2724			None => {
2725				return Ok((
2726					AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2727						candidate_index as _,
2728						assigned_candidate_hash,
2729					)),
2730					Vec::new(),
2731				))
2732			}, // no candidate at core.
2733		};
2734
2735		if candidate_entry.approval_entry_mut(&assignment.block_hash).is_none() {
2736			return Ok((
2737				AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2738					assignment.block_hash,
2739					assigned_candidate_hash,
2740				)),
2741				Vec::new(),
2742			));
2743		};
2744
2745		claimed_core_indices.push(claimed_core_index);
2746		assigned_candidate_hashes.push(assigned_candidate_hash);
2747	}
2748
2749	// Error on null assignments.
2750	if claimed_core_indices.is_empty() {
2751		return Ok((
2752			AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
2753				assignment.validator,
2754				format!("{:?}", InvalidAssignmentReason::NullAssignment),
2755			)),
2756			Vec::new(),
2757		));
2758	}
2759
2760	let mut actions = Vec::new();
2761	let res = {
2762		let mut is_duplicate = true;
2763		// Import the assignments for all cores in the cert.
2764		for (assigned_candidate_hash, candidate_index) in
2765			assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
2766		{
2767			let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2768				Some(c) => c,
2769				None => {
2770					return Ok((
2771						AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2772							candidate_index as _,
2773							*assigned_candidate_hash,
2774						)),
2775						Vec::new(),
2776					))
2777				},
2778			};
2779
2780			let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
2781				Some(a) => a,
2782				None => {
2783					return Ok((
2784						AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2785							assignment.block_hash,
2786							*assigned_candidate_hash,
2787						)),
2788						Vec::new(),
2789					))
2790				},
2791			};
2792
2793			let is_duplicate_for_candidate = approval_entry.is_assigned(assignment.validator);
2794			is_duplicate &= is_duplicate_for_candidate;
2795			approval_entry.import_assignment(
2796				tranche,
2797				assignment.validator,
2798				tick_now,
2799				is_duplicate_for_candidate,
2800			);
2801
2802			// We've imported a new assignment, so we need to schedule a wake-up for when that might
2803			// no-show.
2804			if let Some((approval_entry, status)) = state
2805				.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
2806				.await
2807			{
2808				actions.extend(schedule_wakeup_action(
2809					approval_entry,
2810					block_entry.block_hash(),
2811					block_entry.block_number(),
2812					*assigned_candidate_hash,
2813					status.block_tick,
2814					tick_now,
2815					status.required_tranches,
2816				));
2817			}
2818
2819			// We also write the candidate entry as it now contains the new candidate.
2820			db.write_candidate_entry(candidate_entry.into());
2821		}
2822
2823		// Since we don't account for tranche in distribution message fingerprinting, some
2824		// validators can be assigned to the same core (VRF modulo vs VRF delay). These can be
2825		// safely ignored. However, if an assignment is for multiple cores (these are only
2826		// tranche0), we cannot ignore it, because it would mean ignoring other non duplicate
2827		// assignments.
2828		if is_duplicate {
2829			AssignmentCheckResult::AcceptedDuplicate
2830		} else if candidate_indices.count_ones() > 1 {
2831			gum::trace!(
2832				target: LOG_TARGET,
2833				validator = assignment.validator.0,
2834				candidate_hashes = ?assigned_candidate_hashes,
2835				assigned_cores = ?claimed_core_indices,
2836				?tranche,
2837				"Imported assignments for multiple cores.",
2838			);
2839
2840			AssignmentCheckResult::Accepted
2841		} else {
2842			gum::trace!(
2843				target: LOG_TARGET,
2844				validator = assignment.validator.0,
2845				candidate_hashes = ?assigned_candidate_hashes,
2846				assigned_cores = ?claimed_core_indices,
2847				"Imported assignment for a single core.",
2848			);
2849
2850			AssignmentCheckResult::Accepted
2851		}
2852	};
2853
2854	Ok((res, actions))
2855}
2856
2857async fn import_approval<Sender>(
2858	sender: &mut Sender,
2859	state: &mut State,
2860	db: &mut OverlayedBackend<'_, impl Backend>,
2861	session_info_provider: &mut RuntimeInfo,
2862	metrics: &Metrics,
2863	approval: CheckedIndirectSignedApprovalVote,
2864	wakeups: &Wakeups,
2865) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
2866where
2867	Sender: SubsystemSender<RuntimeApiMessage>,
2868{
2869	macro_rules! respond_early {
2870		($e: expr) => {{
2871			return Ok((Vec::new(), $e));
2872		}};
2873	}
2874
2875	let block_entry = match db.load_block_entry(&approval.block_hash)? {
2876		Some(b) => b,
2877		None => {
2878			respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2879				approval.block_hash
2880			),))
2881		},
2882	};
2883
2884	let approved_candidates_info: Result<Vec<(CandidateIndex, CandidateHash)>, ApprovalCheckError> =
2885		approval
2886			.candidate_indices
2887			.iter_ones()
2888			.map(|candidate_index| {
2889				block_entry
2890					.candidate(candidate_index)
2891					.ok_or(ApprovalCheckError::InvalidCandidateIndex(candidate_index as _))
2892					.map(|candidate| (candidate_index as _, candidate.1))
2893			})
2894			.collect();
2895
2896	let approved_candidates_info = match approved_candidates_info {
2897		Ok(approved_candidates_info) => approved_candidates_info,
2898		Err(err) => {
2899			respond_early!(ApprovalCheckResult::Bad(err))
2900		},
2901	};
2902
2903	gum::trace!(
2904		target: LOG_TARGET,
2905		"Received approval for num_candidates {:}",
2906		approval.candidate_indices.count_ones()
2907	);
2908
2909	let mut actions = Vec::new();
2910	for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
2911		let block_entry = match db.load_block_entry(&approval.block_hash)? {
2912			Some(b) => b,
2913			None => {
2914				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2915					approval.block_hash
2916				),))
2917			},
2918		};
2919
2920		let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
2921			Some(c) => c,
2922			None => {
2923				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(
2924					approval_candidate_index,
2925					approved_candidate_hash
2926				),))
2927			},
2928		};
2929
2930		// Don't accept approvals until assignment.
2931		match candidate_entry.approval_entry(&approval.block_hash) {
2932			None => {
2933				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::Internal(
2934					approval.block_hash,
2935					approved_candidate_hash
2936				),))
2937			},
2938			Some(e) if !e.is_assigned(approval.validator) => {
2939				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(
2940					approval.validator
2941				),))
2942			},
2943			_ => {},
2944		}
2945
2946		gum::trace!(
2947			target: LOG_TARGET,
2948			validator_index = approval.validator.0,
2949			candidate_hash = ?approved_candidate_hash,
2950			para_id = ?candidate_entry.candidate_receipt().descriptor.para_id(),
2951			"Importing approval vote",
2952		);
2953
2954		let new_actions = advance_approval_state(
2955			sender,
2956			state,
2957			db,
2958			session_info_provider,
2959			&metrics,
2960			block_entry,
2961			approved_candidate_hash,
2962			candidate_entry,
2963			ApprovalStateTransition::RemoteApproval(approval.validator),
2964			wakeups,
2965		)
2966		.await;
2967		actions.extend(new_actions);
2968	}
2969
2970	// importing the approval can be heavy as it may trigger acceptance for a series of blocks.
2971	Ok((actions, ApprovalCheckResult::Accepted))
2972}
2973
2974#[derive(Debug)]
2975enum ApprovalStateTransition {
2976	RemoteApproval(ValidatorIndex),
2977	LocalApproval(ValidatorIndex),
2978	WakeupProcessed,
2979}
2980
2981impl ApprovalStateTransition {
2982	fn validator_index(&self) -> Option<ValidatorIndex> {
2983		match *self {
2984			ApprovalStateTransition::RemoteApproval(v) |
2985			ApprovalStateTransition::LocalApproval(v) => Some(v),
2986			ApprovalStateTransition::WakeupProcessed => None,
2987		}
2988	}
2989
2990	fn is_local_approval(&self) -> bool {
2991		match *self {
2992			ApprovalStateTransition::RemoteApproval(_) => false,
2993			ApprovalStateTransition::LocalApproval(_) => true,
2994			ApprovalStateTransition::WakeupProcessed => false,
2995		}
2996	}
2997
2998	fn is_remote_approval(&self) -> bool {
2999		matches!(*self, ApprovalStateTransition::RemoteApproval(_))
3000	}
3001}
3002
/// Advance the approval state of `candidate_hash` under the block of `block_entry`.
///
/// The transition is one of two kinds:
/// - The import of an approval vote that has already been checked to be valid and to come from a
///   validator assigned to the candidate under the block (`RemoteApproval` / `LocalApproval`).
/// - A processed wakeup (`WakeupProcessed`), after which the passage of time alone may have
///   changed the approval status.
///
/// The block entry and candidate entry are updated and written back to `db` as needed. The
/// returned actions contain:
/// - any wakeup to schedule for this candidate under this block;
/// - `NoteApprovedInChainSelection` when the block has just become fully approved;
/// - for remote approvals that approve the candidate, next-tick wakeups for the same candidate
///   under other blocks where the approver is assigned.
///
/// Returns no actions in two cases:
/// - a non-local transition arrives after the candidate is already approved under the block;
/// - `approval_status` yields nothing for the candidate under the block, which is logged as
///   "No approval entry".
async fn advance_approval_state<Sender>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	mut block_entry: BlockEntry,
	candidate_hash: CandidateHash,
	mut candidate_entry: CandidateEntry,
	transition: ApprovalStateTransition,
	wakeups: &Wakeups,
) -> Vec<Action>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let validator_index = transition.validator_index();

	// Record the vote (if any) in the candidate entry. The result is used at the end of the
	// function to decide whether the candidate entry must be written. Per the comment there,
	// `true` means the approver had already approved.
	let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
	let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);

	// Check for early exits.
	//
	// Local approvals never take this early exit. Even if the candidate or the whole block is
	// already approved, our own approval vote must still be held on-disk in case we restart and
	// rebroadcast votes. Otherwise, our assignment might manifest as a no-show.
	if !transition.is_local_approval() {
		// We don't store remote votes and there's nothing to store for processed wakeups,
		// so we can early exit as long as the candidate is already concluded under the
		// block i.e. we don't need more approvals.
		if candidate_approved_in_block {
			return Vec::new();
		}
	}

	let mut actions = Vec::new();
	let block_hash = block_entry.block_hash();
	let block_number = block_entry.block_number();
	let session_index = block_entry.session();
	let para_id = candidate_entry.candidate_receipt().descriptor().para_id();
	let tick_now = state.clock.tick_now();

	let (is_approved, status) = if let Some((approval_entry, status)) = state
		.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
		.await
	{
		let check = approval_checking::check_approval(
			&candidate_entry,
			approval_entry,
			status.required_tranches.clone(),
		);
		state.observe_assignment_gathering_status(
			&metrics,
			&status.required_tranches,
			block_hash,
			block_entry.block_number(),
			candidate_hash,
		);

		// Check whether this is approved, while allowing a maximum
		// assignment tick of `now - APPROVAL_DELAY` - that is, that
		// all counted assignments are at least `APPROVAL_DELAY` ticks old.
		let is_approved = check.is_approved(tick_now.saturating_sub(APPROVAL_DELAY));
		if status.last_no_shows != 0 {
			metrics.on_observed_no_shows(status.last_no_shows);
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				last_no_shows = ?status.last_no_shows,
				"Observed no_shows",
			);
		}
		if is_approved {
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				"Candidate approved under block.",
			);

			let no_shows = check.known_no_shows();

			// Compare before/after so chain selection is notified only on the transition to
			// fully approved.
			let was_block_approved = block_entry.is_fully_approved();
			block_entry.mark_approved_by_hash(&candidate_hash);
			let is_block_approved = block_entry.is_fully_approved();

			if no_shows != 0 {
				metrics.on_no_shows(no_shows);
			}
			if check == Check::ApprovedOneThird {
				// No-shows are not counted when more than one third of validators approve a
				// candidate, so count candidates where more than one third of validators had to
				// approve it, this is indicative of something breaking.
				metrics.on_approved_by_one_third()
			}

			metrics.on_candidate_approved(status.tranche_now as _);

			if is_block_approved && !was_block_approved {
				metrics.on_block_approved(status.tranche_now as _);
				actions.push(Action::NoteApprovedInChainSelection(block_hash));
			}

			db.write_block_entry(block_entry.into());
		} else if transition.is_local_approval() {
			// Local approvals always update the block_entry, so we need to flush it to
			// the database.
			db.write_block_entry(block_entry.into());
		}

		(is_approved, status)
	} else {
		gum::warn!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,
			?validator_index,
			"No approval entry for approval under block",
		);

		return Vec::new();
	};

	{
		let approval_entry = candidate_entry
			.approval_entry_mut(&block_hash)
			.expect("Approval entry just fetched; qed");

		let was_approved = approval_entry.is_approved();
		let newly_approved = is_approved && !was_approved;

		if is_approved {
			approval_entry.mark_approved();
		}
		// No-shows are recorded only once per approval entry: on the transition to approved.
		if newly_approved {
			state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
		}
		actions.extend(schedule_wakeup_action(
			&approval_entry,
			block_hash,
			block_number,
			candidate_hash,
			status.block_tick,
			tick_now,
			status.required_tranches,
		));

		if is_approved && transition.is_remote_approval() {
			// Make sure we wake other blocks in case they have
			// a no-show that might be covered by this approval.
			// Only forks where the same validator is assigned, the candidate is not yet
			// approved, and no wakeup is already pending are considered.
			for (fork_block_hash, fork_approval_entry) in candidate_entry
				.block_assignments
				.iter()
				.filter(|(hash, _)| **hash != block_hash)
			{
				let assigned_on_fork_block = validator_index
					.as_ref()
					.map(|validator_index| fork_approval_entry.is_assigned(*validator_index))
					.unwrap_or_default();
				if wakeups.wakeup_for(*fork_block_hash, candidate_hash).is_none() &&
					!fork_approval_entry.is_approved() &&
					assigned_on_fork_block
				{
					let fork_block_entry = db.load_block_entry(fork_block_hash);
					if let Ok(Some(fork_block_entry)) = fork_block_entry {
						actions.push(Action::ScheduleWakeup {
							block_hash: *fork_block_hash,
							block_number: fork_block_entry.block_number(),
							candidate_hash,
							// Schedule the wakeup next tick, since the assignment must be a
							// no-show, because there is no-wakeup scheduled.
							tick: tick_now + 1,
						})
					} else {
						// Load errors and missing entries are only logged; the fork is skipped.
						gum::debug!(
							target: LOG_TARGET,
							?fork_block_entry,
							?fork_block_hash,
							"Failed to load block entry"
						)
					}
				}
			}
		}
		// We have no need to write the candidate entry if all of the following
		// is true:
		//
		// 1. This is not a local approval, as we don't store anything new in the approval entry.
		// 2. The candidate is not newly approved, as we haven't altered the approval entry's
		//    approved flag with `mark_approved` above.
		// 3. The approver, if any, had already approved the candidate, as we haven't altered the
		// bitfield.
		if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
		{
			// In all other cases, we need to write the candidate entry.
			db.write_candidate_entry(candidate_entry);
		}
	}

	actions
}
3212
3213fn should_trigger_assignment(
3214	approval_entry: &ApprovalEntry,
3215	candidate_entry: &CandidateEntry,
3216	required_tranches: RequiredTranches,
3217	tranche_now: DelayTranche,
3218) -> bool {
3219	match approval_entry.our_assignment() {
3220		None => false,
3221		Some(ref assignment) if assignment.triggered() => false,
3222		Some(ref assignment) if assignment.tranche() == 0 => true,
3223		Some(ref assignment) => {
3224			match required_tranches {
3225				RequiredTranches::All => !approval_checking::check_approval(
3226					&candidate_entry,
3227					&approval_entry,
3228					RequiredTranches::All,
3229				)
3230				// when all are required, we are just waiting for the first 1/3+
3231				.is_approved(Tick::max_value()),
3232				RequiredTranches::Pending { maximum_broadcast, clock_drift, .. } => {
3233					let drifted_tranche_now =
3234						tranche_now.saturating_sub(clock_drift as DelayTranche);
3235					assignment.tranche() <= maximum_broadcast &&
3236						assignment.tranche() <= drifted_tranche_now
3237				},
3238				RequiredTranches::Exact { .. } => {
3239					// indicates that no new assignments are needed at the moment.
3240					false
3241				},
3242			}
3243		},
3244	}
3245}
3246
/// Process a scheduled wakeup for `candidate_hash` under `relay_block`.
///
/// The function:
/// - Recomputes the tranches required to approve the candidate and decides, via
///   `should_trigger_assignment`, whether our own assignment should be triggered now.
/// - If the assignment is triggered, marks it triggered in the candidate entry, writes that entry
///   and, when a certificate is produced, emits `Action::LaunchApproval`. Assignments claiming
///   multiple cores are distributed only once (tracked in the block entry).
/// - Finally calls `advance_approval_state` with `WakeupProcessed`, since the wakeup itself may
///   have changed the approval status. That call also schedules the next wakeup as needed.
///
/// Returns no actions when the block entry, candidate entry, session info or approval entry is
/// unavailable. Database errors are propagated as `Err`.
async fn process_wakeup<Sender: SubsystemSender<RuntimeApiMessage>>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	relay_block: Hash,
	candidate_hash: CandidateHash,
	metrics: &Metrics,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let block_entry = db.load_block_entry(&relay_block)?;
	let candidate_entry = db.load_candidate_entry(&candidate_hash)?;

	// If either is not present, we have nothing to wakeup. Might have lost a race with finality
	let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
		(Some(b), Some(c)) => (b, c),
		_ => return Ok(Vec::new()),
	};

	let (no_show_slots, needed_approvals) = match get_session_info_by_index(
		session_info_provider,
		sender,
		block_entry.block_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(i) => (i.no_show_slots, i.needed_approvals),
		None => return Ok(Vec::new()),
	};

	// Convert slot-based quantities into ticks / tranches relative to the block's slot.
	let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
	let no_show_duration =
		slot_number_to_tick(state.slot_duration_millis, Slot::from(u64::from(no_show_slots)));
	let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());

	gum::trace!(
		target: LOG_TARGET,
		tranche = tranche_now,
		?candidate_hash,
		block_hash = ?relay_block,
		"Processing wakeup",
	);

	// Scoped so the shared borrow of `candidate_entry` ends before it is mutated below.
	let (should_trigger, backing_group) = {
		let approval_entry = match candidate_entry.approval_entry(&relay_block) {
			Some(e) => e,
			None => return Ok(Vec::new()),
		};

		let tranches_to_approve = approval_checking::tranches_to_approve(
			&approval_entry,
			candidate_entry.approvals(),
			tranche_now,
			block_tick,
			no_show_duration,
			needed_approvals as _,
		);

		let should_trigger = should_trigger_assignment(
			&approval_entry,
			&candidate_entry,
			tranches_to_approve.required_tranches,
			tranche_now,
		);

		(should_trigger, approval_entry.backing_group())
	};

	gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);

	let mut actions = Vec::new();
	let candidate_receipt = candidate_entry.candidate_receipt().clone();

	// Triggering mutates the approval entry, so the candidate entry is written right away.
	let maybe_cert = if should_trigger {
		let maybe_cert = {
			let approval_entry = candidate_entry
				.approval_entry_mut(&relay_block)
				.expect("should_trigger only true if this fetched earlier; qed");

			approval_entry.trigger_our_assignment(state.clock.tick_now())
		};

		db.write_candidate_entry(candidate_entry.clone());

		maybe_cert
	} else {
		None
	};

	if let Some((cert, val_index, tranche)) = maybe_cert {
		let indirect_cert =
			IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };

		gum::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			para_id = ?candidate_receipt.descriptor.para_id(),
			block_hash = ?relay_block,
			"Launching approval work.",
		);

		// Core the candidate occupies in this block, if found among the block's candidates.
		let candidate_core_index = block_entry
			.candidates()
			.iter()
			.find_map(|(core_index, h)| (h == &candidate_hash).then_some(*core_index));

		if let Some(claimed_core_indices) =
			get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
		{
			match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
				Ok(claimed_candidate_indices) => {
					// Ensure we distribute multiple core assignments just once.
					let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
						!block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
					} else {
						true
					};
					db.write_block_entry(block_entry.clone());
					actions.push(Action::LaunchApproval {
						claimed_candidate_indices,
						candidate_hash,
						indirect_cert,
						assignment_tranche: tranche,
						relay_block_hash: relay_block,
						session: block_entry.session(),
						candidate: candidate_receipt,
						backing_group,
						distribute_assignment,
						core_index: candidate_core_index,
					});
				},
				Err(err) => {
					// Never happens, it should only happen if no cores are claimed, which is a
					// bug.
					gum::warn!(
						target: LOG_TARGET,
						block_hash = ?relay_block,
						?err,
						"Failed to create assignment bitfield"
					);
				},
			};
		} else {
			gum::warn!(
				target: LOG_TARGET,
				block_hash = ?relay_block,
				?candidate_hash,
				"Cannot get assignment claimed core indices",
			);
		}
	}
	// Although we checked approval earlier in this function,
	// this wakeup might have advanced the state to approved via
	// a no-show that was immediately covered and therefore
	// we need to check for that and advance the state on-disk.
	//
	// Note that this function also schedules a wakeup as necessary.
	actions.extend(
		advance_approval_state(
			sender,
			state,
			db,
			session_info_provider,
			metrics,
			block_entry,
			candidate_hash,
			candidate_entry,
			ApprovalStateTransition::WakeupProcessed,
			wakeups,
		)
		.await,
	);

	Ok(actions)
}
3423
3424// Launch approval work, returning an `AbortHandle` which corresponds to the background task
3425// spawned. When the background work is no longer needed, the `AbortHandle` should be dropped
3426// to cancel the background work and any requests it has spawned.
3427#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3428async fn launch_approval<
3429	Sender: SubsystemSender<RuntimeApiMessage>
3430		+ SubsystemSender<AvailabilityRecoveryMessage>
3431		+ SubsystemSender<DisputeCoordinatorMessage>
3432		+ SubsystemSender<CandidateValidationMessage>,
3433>(
3434	mut sender: Sender,
3435	spawn_handle: Arc<dyn overseer::gen::Spawner + 'static>,
3436	metrics: Metrics,
3437	session_index: SessionIndex,
3438	candidate: CandidateReceipt,
3439	validator_index: ValidatorIndex,
3440	block_hash: Hash,
3441	backing_group: GroupIndex,
3442	core_index: Option<CoreIndex>,
3443	retry: RetryApprovalInfo,
3444) -> SubsystemResult<RemoteHandle<ApprovalState>> {
3445	let (a_tx, a_rx) = oneshot::channel();
3446	let (code_tx, code_rx) = oneshot::channel();
3447
3448	// The background future returned by this function may
3449	// be dropped before completing. This guard is used to ensure that the approval
3450	// work is correctly counted as stale even if so.
3451	struct StaleGuard(Option<Metrics>);
3452
3453	impl StaleGuard {
3454		fn take(mut self) -> Metrics {
3455			self.0.take().expect(
3456				"
3457				consumed after take; so this cannot be called twice; \
3458				nothing in this function reaches into the struct to avoid this API; \
3459				qed
3460			",
3461			)
3462		}
3463	}
3464
3465	impl Drop for StaleGuard {
3466		fn drop(&mut self) {
3467			if let Some(metrics) = self.0.as_ref() {
3468				metrics.on_approval_stale();
3469			}
3470		}
3471	}
3472
3473	let candidate_hash = candidate.hash();
3474	let para_id = candidate.descriptor.para_id();
3475	let mut next_retry = None;
3476	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
3477
3478	let timer = metrics.time_recover_and_approve();
3479	sender
3480		.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
3481			candidate.clone(),
3482			session_index,
3483			Some(backing_group),
3484			core_index,
3485			a_tx,
3486		))
3487		.await;
3488
3489	sender
3490		.send_message(RuntimeApiMessage::Request(
3491			block_hash,
3492			RuntimeApiRequest::ValidationCodeByHash(
3493				candidate.descriptor.validation_code_hash(),
3494				code_tx,
3495			),
3496		))
3497		.await;
3498
3499	let candidate = candidate.clone();
3500	let metrics_guard = StaleGuard(Some(metrics));
3501	let background = async move {
3502		// Force the move of the timer into the background task.
3503		let _timer = timer;
3504		let available_data = match a_rx.await {
3505			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3506			Ok(Ok(a)) => a,
3507			Ok(Err(e)) => {
3508				match &e {
3509					&RecoveryError::Unavailable => {
3510						gum::warn!(
3511							target: LOG_TARGET,
3512							?para_id,
3513							?candidate_hash,
3514							attempts_remaining = retry.attempts_remaining,
3515							"Data unavailable for candidate {:?}",
3516							(candidate_hash, candidate.descriptor.para_id()),
3517						);
3518						// Availability could fail if we did not discover much of the network, so
3519						// let's back off and order the subsystem to retry at a later point if the
3520						// approval is still needed, because no-show wasn't covered yet.
3521						if retry.attempts_remaining > 0 {
3522							Delay::new(retry.backoff).await;
3523							next_retry = Some(RetryApprovalInfo {
3524								candidate,
3525								backing_group,
3526								core_index,
3527								session_index,
3528								attempts_remaining: retry.attempts_remaining - 1,
3529								backoff: retry.backoff,
3530							});
3531						} else {
3532							next_retry = None;
3533						}
3534						metrics_guard.take().on_approval_unavailable();
3535					},
3536					&RecoveryError::ChannelClosed => {
3537						gum::warn!(
3538							target: LOG_TARGET,
3539							?para_id,
3540							?candidate_hash,
3541							"Channel closed while recovering data for candidate {:?}",
3542							(candidate_hash, candidate.descriptor.para_id()),
3543						);
3544						// do nothing. we'll just be a no-show and that'll cause others to rise up.
3545						metrics_guard.take().on_approval_unavailable();
3546					},
3547					&RecoveryError::Invalid => {
3548						gum::warn!(
3549							target: LOG_TARGET,
3550							?para_id,
3551							?candidate_hash,
3552							"Data recovery invalid for candidate {:?}",
3553							(candidate_hash, candidate.descriptor.para_id()),
3554						);
3555						issue_local_invalid_statement(
3556							&mut sender,
3557							session_index,
3558							candidate_hash,
3559							candidate.clone(),
3560						);
3561						metrics_guard.take().on_approval_invalid();
3562					},
3563				}
3564				return ApprovalState::failed_with_retry(
3565					validator_index,
3566					candidate_hash,
3567					next_retry,
3568				);
3569			},
3570		};
3571
3572		let validation_code = match code_rx.await {
3573			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3574			Ok(Err(_)) => return ApprovalState::failed(validator_index, candidate_hash),
3575			Ok(Ok(Some(code))) => code,
3576			Ok(Ok(None)) => {
3577				gum::warn!(
3578					target: LOG_TARGET,
3579					"Validation code unavailable for block {:?} in the state of block {:?} (a recent descendant)",
3580					candidate.descriptor.relay_parent(),
3581					block_hash,
3582				);
3583
3584				// No dispute necessary, as this indicates that the chain is not behaving
3585				// according to expectations.
3586				metrics_guard.take().on_approval_unavailable();
3587				return ApprovalState::failed(validator_index, candidate_hash);
3588			},
3589		};
3590
3591		let (val_tx, val_rx) = oneshot::channel();
3592		sender
3593			.send_message(CandidateValidationMessage::ValidateFromExhaustive {
3594				validation_data: available_data.validation_data,
3595				validation_code,
3596				candidate_receipt: candidate.clone(),
3597				pov: available_data.pov,
3598				scheduling_session_index: session_index,
3599				exec_kind: PvfExecKind::Approval,
3600				response_sender: val_tx,
3601			})
3602			.await;
3603
3604		match val_rx.await {
3605			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3606			Ok(Ok(ValidationResult::Valid(_, _))) => {
3607				// Validation checked out. Issue an approval command. If the underlying service is
3608				// unreachable, then there isn't anything we can do.
3609
3610				gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Candidate Valid");
3611
3612				let _ = metrics_guard.take();
3613				return ApprovalState::approved(validator_index, candidate_hash);
3614			},
3615			Ok(Ok(ValidationResult::Invalid(reason))) => {
3616				gum::warn!(
3617					target: LOG_TARGET,
3618					?reason,
3619					?candidate_hash,
3620					?para_id,
3621					"Detected invalid candidate as an approval checker.",
3622				);
3623
3624				issue_local_invalid_statement(
3625					&mut sender,
3626					session_index,
3627					candidate_hash,
3628					candidate.clone(),
3629				);
3630				metrics_guard.take().on_approval_invalid();
3631				return ApprovalState::failed(validator_index, candidate_hash);
3632			},
3633			Ok(Err(e)) => {
3634				gum::error!(
3635					target: LOG_TARGET,
3636					err = ?e,
3637					?candidate_hash,
3638					?para_id,
3639					"Failed to validate candidate due to internal error",
3640				);
3641				metrics_guard.take().on_approval_error();
3642				return ApprovalState::failed(validator_index, candidate_hash);
3643			},
3644		}
3645	};
3646	let (background, remote_handle) = background.remote_handle();
3647	spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background));
3648	Ok(remote_handle)
3649}
3650
3651// Issue and import a local approval vote. Should only be invoked after approval checks
3652// have been done.
3653#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3654async fn issue_approval<
3655	Sender: SubsystemSender<RuntimeApiMessage>,
3656	ADSender: SubsystemSender<ApprovalDistributionMessage>,
3657>(
3658	sender: &mut Sender,
3659	approval_voting_sender: &mut ADSender,
3660	state: &mut State,
3661	db: &mut OverlayedBackend<'_, impl Backend>,
3662	session_info_provider: &mut RuntimeInfo,
3663	metrics: &Metrics,
3664	candidate_hash: CandidateHash,
3665	delayed_approvals_timers: &mut DelayedApprovalTimer,
3666	ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
3667	wakeups: &Wakeups,
3668) -> SubsystemResult<Vec<Action>> {
3669	let mut block_entry = match db.load_block_entry(&block_hash)? {
3670		Some(b) => b,
3671		None => {
3672			// not a cause for alarm - just lost a race with pruning, most likely.
3673			metrics.on_approval_stale();
3674			return Ok(Vec::new());
3675		},
3676	};
3677
3678	let candidate_index = match block_entry.candidates().iter().position(|e| e.1 == candidate_hash)
3679	{
3680		None => {
3681			gum::warn!(
3682				target: LOG_TARGET,
3683				"Candidate hash {} is not present in the block entry's candidates for relay block {}",
3684				candidate_hash,
3685				block_entry.parent_hash(),
3686			);
3687
3688			metrics.on_approval_error();
3689			return Ok(Vec::new());
3690		},
3691		Some(idx) => idx,
3692	};
3693
3694	let candidate_hash = match block_entry.candidate(candidate_index as usize) {
3695		Some((_, h)) => *h,
3696		None => {
3697			gum::warn!(
3698				target: LOG_TARGET,
3699				"Received malformed request to approve out-of-bounds candidate index {} included at block {:?}",
3700				candidate_index,
3701				block_hash,
3702			);
3703
3704			metrics.on_approval_error();
3705			return Ok(Vec::new());
3706		},
3707	};
3708
3709	let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
3710		Some(c) => c,
3711		None => {
3712			gum::warn!(
3713				target: LOG_TARGET,
3714				"Missing entry for candidate index {} included at block {:?}",
3715				candidate_index,
3716				block_hash,
3717			);
3718
3719			metrics.on_approval_error();
3720			return Ok(Vec::new());
3721		},
3722	};
3723
3724	let session_info = match get_session_info_by_index(
3725		session_info_provider,
3726		sender,
3727		block_entry.parent_hash(),
3728		block_entry.session(),
3729	)
3730	.await
3731	{
3732		Some(s) => s,
3733		None => return Ok(Vec::new()),
3734	};
3735
3736	if block_entry
3737		.defer_candidate_signature(
3738			candidate_index as _,
3739			candidate_hash,
3740			compute_delayed_approval_sending_tick(
3741				state,
3742				&block_entry,
3743				&candidate_entry,
3744				session_info,
3745				&metrics,
3746			),
3747		)
3748		.is_some()
3749	{
3750		gum::error!(
3751			target: LOG_TARGET,
3752			?candidate_hash,
3753			?block_hash,
3754			validator_index = validator_index.0,
3755			"Possible bug, we shouldn't have to defer a candidate more than once",
3756		);
3757	}
3758
3759	gum::debug!(
3760		target: LOG_TARGET,
3761		?candidate_hash,
3762		?block_hash,
3763		validator_index = validator_index.0,
3764		"Ready to issue approval vote",
3765	);
3766
3767	let actions = advance_approval_state(
3768		sender,
3769		state,
3770		db,
3771		session_info_provider,
3772		metrics,
3773		block_entry,
3774		candidate_hash,
3775		candidate_entry,
3776		ApprovalStateTransition::LocalApproval(validator_index as _),
3777		wakeups,
3778	)
3779	.await;
3780
3781	if let Some(next_wakeup) = maybe_create_signature(
3782		db,
3783		session_info_provider,
3784		state,
3785		sender,
3786		approval_voting_sender,
3787		block_hash,
3788		validator_index,
3789		metrics,
3790	)
3791	.await?
3792	{
3793		delayed_approvals_timers.maybe_arm_timer(
3794			next_wakeup,
3795			state.clock.as_ref(),
3796			block_hash,
3797			validator_index,
3798		);
3799	}
3800	Ok(actions)
3801}
3802
3803// Create signature for the approved candidates pending signatures
3804#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3805async fn maybe_create_signature<
3806	Sender: SubsystemSender<RuntimeApiMessage>,
3807	ADSender: SubsystemSender<ApprovalDistributionMessage>,
3808>(
3809	db: &mut OverlayedBackend<'_, impl Backend>,
3810	session_info_provider: &mut RuntimeInfo,
3811	state: &State,
3812	sender: &mut Sender,
3813	approval_voting_sender: &mut ADSender,
3814	block_hash: Hash,
3815	validator_index: ValidatorIndex,
3816	metrics: &Metrics,
3817) -> SubsystemResult<Option<Tick>> {
3818	let mut block_entry = match db.load_block_entry(&block_hash)? {
3819		Some(b) => b,
3820		None => {
3821			// not a cause for alarm - just lost a race with pruning, most likely.
3822			metrics.on_approval_stale();
3823			gum::debug!(
3824				target: LOG_TARGET,
3825				"Could not find block that needs signature {:}", block_hash
3826			);
3827			return Ok(None);
3828		},
3829	};
3830
3831	let approval_params = state
3832		.get_approval_voting_params_or_default(sender, block_entry.session(), block_hash)
3833		.await
3834		.unwrap_or_default();
3835
3836	gum::trace!(
3837		target: LOG_TARGET,
3838		"Candidates pending signatures {:}", block_entry.num_candidates_pending_signature()
3839	);
3840	let tick_now = state.clock.tick_now();
3841
3842	let (candidates_to_sign, sign_no_later_then) = block_entry
3843		.get_candidates_that_need_signature(tick_now, approval_params.max_approval_coalesce_count);
3844
3845	let (candidates_hashes, candidates_indices) = match candidates_to_sign {
3846		Some(candidates_to_sign) => candidates_to_sign,
3847		None => return Ok(sign_no_later_then),
3848	};
3849
3850	let session_info = match get_session_info_by_index(
3851		session_info_provider,
3852		sender,
3853		block_entry.parent_hash(),
3854		block_entry.session(),
3855	)
3856	.await
3857	{
3858		Some(s) => s,
3859		None => {
3860			metrics.on_approval_error();
3861			gum::error!(
3862				target: LOG_TARGET,
3863				"Could not retrieve the session"
3864			);
3865			return Ok(None);
3866		},
3867	};
3868
3869	let validator_pubkey = match session_info.validators.get(validator_index) {
3870		Some(p) => p,
3871		None => {
3872			gum::error!(
3873				target: LOG_TARGET,
3874				"Validator index {} out of bounds in session {}",
3875				validator_index.0,
3876				block_entry.session(),
3877			);
3878
3879			metrics.on_approval_error();
3880			return Ok(None);
3881		},
3882	};
3883
3884	let signature = match sign_approval(
3885		&state.keystore,
3886		&validator_pubkey,
3887		&candidates_hashes,
3888		block_entry.session(),
3889	) {
3890		Some(sig) => sig,
3891		None => {
3892			gum::error!(
3893				target: LOG_TARGET,
3894				validator_index = ?validator_index,
3895				session = ?block_entry.session(),
3896				"Could not issue approval signature. Assignment key present but not validator key?",
3897			);
3898
3899			metrics.on_approval_error();
3900			return Ok(None);
3901		},
3902	};
3903	metrics.on_approval_coalesce(candidates_hashes.len() as u32);
3904
3905	let candidate_entries = candidates_hashes
3906		.iter()
3907		.map(|candidate_hash| db.load_candidate_entry(candidate_hash))
3908		.collect::<SubsystemResult<Vec<Option<CandidateEntry>>>>()?;
3909
3910	for mut candidate_entry in candidate_entries {
3911		let approval_entry = candidate_entry.as_mut().and_then(|candidate_entry| {
3912			candidate_entry.approval_entry_mut(&block_entry.block_hash())
3913		});
3914
3915		match approval_entry {
3916			Some(approval_entry) => approval_entry.import_approval_sig(OurApproval {
3917				signature: signature.clone(),
3918				signed_candidates_indices: candidates_indices.clone(),
3919			}),
3920			None => {
3921				gum::error!(
3922					target: LOG_TARGET,
3923					candidate_entry = ?candidate_entry,
3924					"Candidate scheduled for signing approval entry should not be None"
3925				);
3926			},
3927		};
3928		candidate_entry.map(|candidate_entry| db.write_candidate_entry(candidate_entry));
3929	}
3930
3931	metrics.on_approval_produced();
3932
3933	approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval(
3934		IndirectSignedApprovalVoteV2 {
3935			block_hash: block_entry.block_hash(),
3936			candidate_indices: candidates_indices,
3937			validator: validator_index,
3938			signature,
3939		},
3940	));
3941
3942	gum::trace!(
3943		target: LOG_TARGET,
3944		?block_hash,
3945		signed_candidates = ?block_entry.num_candidates_pending_signature(),
3946		"Issue approval votes",
3947	);
3948	block_entry.issued_approval();
3949	db.write_block_entry(block_entry.into());
3950	Ok(None)
3951}
3952
3953// Sign an approval vote. Fails if the key isn't present in the store.
3954fn sign_approval(
3955	keystore: &LocalKeystore,
3956	public: &ValidatorId,
3957	candidate_hashes: &[CandidateHash],
3958	session_index: SessionIndex,
3959) -> Option<ValidatorSignature> {
3960	let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
3961
3962	let payload = ApprovalVoteMultipleCandidates(candidate_hashes).signing_payload(session_index);
3963
3964	Some(key.sign(&payload[..]))
3965}
3966
3967/// Send `IssueLocalStatement` to dispute-coordinator.
3968fn issue_local_invalid_statement<Sender>(
3969	sender: &mut Sender,
3970	session_index: SessionIndex,
3971	candidate_hash: CandidateHash,
3972	candidate: CandidateReceipt,
3973) where
3974	Sender: SubsystemSender<DisputeCoordinatorMessage>,
3975{
3976	// We need to send an unbounded message here to break a cycle:
3977	// DisputeCoordinatorMessage::IssueLocalStatement ->
3978	// ApprovalVotingMessage::GetApprovalSignaturesForCandidate.
3979	//
3980	// Use of unbounded _should_ be fine here as raising a dispute should be an
3981	// exceptional event. Even in case of bugs: There can be no more than
3982	// number of slots per block requests every block. Also for sending this
3983	// message a full recovery and validation procedure took place, which takes
3984	// longer than issuing a local statement + import.
3985	sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
3986		session_index,
3987		candidate_hash,
3988		candidate.clone(),
3989		false,
3990	));
3991}
3992
3993// Computes what is the latest tick we can send an approval
3994fn compute_delayed_approval_sending_tick(
3995	state: &State,
3996	block_entry: &BlockEntry,
3997	candidate_entry: &CandidateEntry,
3998	session_info: &SessionInfo,
3999	metrics: &Metrics,
4000) -> Tick {
4001	let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
4002	let assignment_tranche = candidate_entry
4003		.approval_entry(&block_entry.block_hash())
4004		.and_then(|approval_entry| approval_entry.our_assignment())
4005		.map(|our_assignment| our_assignment.tranche())
4006		.unwrap_or_default();
4007
4008	let assignment_triggered_tick = current_block_tick + assignment_tranche as Tick;
4009
4010	let no_show_duration_ticks = slot_number_to_tick(
4011		state.slot_duration_millis,
4012		Slot::from(u64::from(session_info.no_show_slots)),
4013	);
4014	let tick_now = state.clock.tick_now();
4015
4016	let sign_no_later_than = min(
4017		tick_now + MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick,
4018		// We don't want to accidentally cause no-shows, so if we are past
4019		// the second half of the no show time, force the sending of the
4020		// approval immediately.
4021		assignment_triggered_tick + no_show_duration_ticks / 2,
4022	);
4023
4024	metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
4025	sign_no_later_than
4026}