// polkadot_node_core_approval_voting/lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Approval Voting Subsystem.
18//!
19//! This subsystem is responsible for determining candidates to do approval checks
20//! on, performing those approval checks, and tracking the assignments and approvals
21//! of others. It uses this information to determine when candidates and blocks have
22//! been sufficiently approved to finalize.
23
24use futures_timer::Delay;
25use polkadot_node_primitives::{
26	approval::{
27		v1::{BlockApprovalMeta, DelayTranche},
28		v2::{
29			AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
30			IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2,
31		},
32	},
33	ValidationResult, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36	errors::RecoveryError,
37	messages::{
38		ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
39		ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
40		AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
41		ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote,
42		DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage,
43		RuntimeApiRequest,
44	},
45	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
46	SubsystemSender,
47};
48use polkadot_node_subsystem_util::{
49	self,
50	database::Database,
51	metrics::{self, prometheus},
52	runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
53	TimeoutExt,
54};
55use polkadot_primitives::{
56	ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
57	CandidateIndex, CandidateReceiptV2 as CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex,
58	Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair,
59	ValidatorSignature,
60};
61use sc_keystore::LocalKeystore;
62use sp_application_crypto::Pair;
63use sp_consensus::SyncOracle;
64use sp_consensus_slots::Slot;
65use std::time::Instant;
66
// The max number of blocks we keep track of assignments gathering times. Normally,
// this would never be reached because we prune the data on finalization, but we need
// to also ensure the data is not growing unnecessarily large.
70const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
71
72use futures::{
73	channel::oneshot,
74	future::{BoxFuture, RemoteHandle},
75	prelude::*,
76	stream::FuturesUnordered,
77	StreamExt,
78};
79
80use std::{
81	cmp::min,
82	collections::{
83		btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
84	},
85	sync::Arc,
86	time::Duration,
87};
88
89use schnellru::{ByLength, LruMap};
90
91use approval_checking::RequiredTranches;
92use bitvec::{order::Lsb0, vec::BitVec};
93pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
94use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
95use polkadot_node_primitives::approval::time::{
96	slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick,
97};
98
99mod approval_checking;
100pub mod approval_db;
101mod backend;
102pub mod criteria;
103mod import;
104mod ops;
105mod persisted_entries;
106
107use crate::{
108	approval_checking::{Check, TranchesToApproveResult},
109	approval_db::common::{Config as DatabaseConfig, DbBackend},
110	backend::{Backend, OverlayedBackend},
111	criteria::InvalidAssignmentReason,
112	persisted_entries::OurApproval,
113};
114
115#[cfg(test)]
116mod tests;
117
118const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
119/// How long are we willing to wait for approval signatures?
120///
121/// Value rather arbitrarily: Should not be hit in practice, it exists to more easily diagnose dead
122/// lock issues for example.
123const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
124const APPROVAL_CACHE_SIZE: u32 = 1024;
125
/// The maximum number of times we retry to approve a block if it is still needed.
127const MAX_APPROVAL_RETRIES: u32 = 16;
128
129const APPROVAL_DELAY: Tick = 2;
130pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
131
132// The max number of ticks we delay sending the approval after we are ready to issue the approval
133const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;
134
// If the node restarted and the tranche has passed without the assignment
// being triggered, we won't trigger the assignment at restart because we don't
// have a wakeup scheduled for it.
// The solution is to always schedule a wake-up after the restart and let
// `process_wakeup` decide whether the assignment needs to be triggered.
// We need to have a delay after restart to give the node time to catch up with
// messages and not trigger its assignment unnecessarily, because it hasn't seen
// the assignments from the other validators.
143const RESTART_WAKEUP_DELAY: Tick = 12;
144
/// Configuration for the approval voting subsystem.
#[derive(Debug, Clone)]
pub struct Config {
	/// The column family in the DB where approval-voting data is stored.
	pub col_approval_data: u32,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500 (the tick duration used by this subsystem).
	pub slot_duration_millis: u64,
}
154
// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
//
// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
// the node follows the new incoming blocks and finalized number, but does not yet participate.
//
// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
// subsystem of all unfinalized blocks and the candidates included within them, as well as all
// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
	// Fully participating in the approvals protocol.
	Active,
	// Still syncing to the chain head; the oracle reports when major sync is done.
	Syncing(Box<dyn SyncOracle + Send>),
}
168
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
	/// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys.
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
	keystore: Arc<LocalKeystore>,
	/// Column configuration for the approval-voting database.
	db_config: DatabaseConfig,
	/// Slot duration of the consensus algorithm, in milliseconds.
	slot_duration_millis: u64,
	/// Handle to the underlying database.
	db: Arc<dyn Database>,
	/// Whether the node is actively participating or still syncing (see `Mode`).
	mode: Mode,
	/// Prometheus metrics handle (no-op when no registry was provided).
	metrics: Metrics,
	/// Source of ticks/current time; injectable via `with_config_and_clock`.
	clock: Arc<dyn Clock + Send + Sync>,
	/// Spawner used to launch background tasks.
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	/// The maximum time we retry to approve a block if it is still needed and PoV fetch failed.
	max_approval_retries: u32,
	/// The backoff before we retry the approval.
	retry_backoff: Duration,
}
187
// Concrete Prometheus metric instances. Wrapped in `Metrics(Option<MetricsInner>)`
// so that all recording becomes a no-op when no registry was provided.
#[derive(Clone)]
struct MetricsInner {
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	// Histogram over the tranche number at which assignments were produced.
	assignments_produced: prometheus::Histogram,
	// Labelled by outcome: "success", "stale", "invalid", "unavailable", "internal error".
	approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
	no_shows_total: prometheus::Counter<prometheus::U64>,
	// The difference from `no_shows_total` is that this counts all observed no-shows at any
	// moment in time. While `no_shows_total` catches that the no-shows at the moment the candidate
	// is approved, approvals might arrive late and `no_shows_total` wouldn't catch that number.
	observed_no_shows: prometheus::Counter<prometheus::U64>,
	approved_by_one_third: prometheus::Counter<prometheus::U64>,
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	// Heat-map of how many candidates were covered by a single coalesced approval.
	coalesced_approvals_buckets: prometheus::Histogram,
	// Ticks by which sending of an approval was delayed for coalescing.
	coalesced_approvals_delay: prometheus::Histogram,
	candidate_approval_time_ticks: prometheus::Histogram,
	block_approval_time_ticks: prometheus::Histogram,
	time_db_transaction: prometheus::Histogram,
	time_recover_and_approve: prometheus::Histogram,
	candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
	unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
	// The time it takes in each stage to gather enough assignments.
	// We defined a `stage` as being the entire process of gathering enough assignments to
	// be able to approve a candidate:
	// E.g:
	// - Stage 0: We wait for the needed_approvals assignments to be gathered.
	// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
	// - Stage 2: We wait for enough tranches to cover all no-shows  of stage 1.
	assignments_gathering_time_by_stage: prometheus::HistogramVec,
}
217
/// Approval Voting metrics.
///
/// Wraps `Option<MetricsInner>`; the `Default` (`None`) disables metric recording.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
221
222impl Metrics {
223	fn on_candidate_imported(&self) {
224		if let Some(metrics) = &self.0 {
225			metrics.imported_candidates_total.inc();
226		}
227	}
228
229	fn on_assignment_produced(&self, tranche: DelayTranche) {
230		if let Some(metrics) = &self.0 {
231			metrics.assignments_produced.observe(tranche as f64);
232		}
233	}
234
235	fn on_approval_coalesce(&self, num_coalesced: u32) {
236		if let Some(metrics) = &self.0 {
237			// Count how many candidates we covered with this coalesced approvals,
238			// so that the heat-map really gives a good understanding of the scales.
239			for _ in 0..num_coalesced {
240				metrics.coalesced_approvals_buckets.observe(num_coalesced as f64)
241			}
242		}
243	}
244
245	fn on_delayed_approval(&self, delayed_ticks: u64) {
246		if let Some(metrics) = &self.0 {
247			metrics.coalesced_approvals_delay.observe(delayed_ticks as f64)
248		}
249	}
250
251	fn on_approval_stale(&self) {
252		if let Some(metrics) = &self.0 {
253			metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
254		}
255	}
256
257	fn on_approval_invalid(&self) {
258		if let Some(metrics) = &self.0 {
259			metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
260		}
261	}
262
263	fn on_approval_unavailable(&self) {
264		if let Some(metrics) = &self.0 {
265			metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
266		}
267	}
268
269	fn on_approval_error(&self) {
270		if let Some(metrics) = &self.0 {
271			metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
272		}
273	}
274
275	fn on_approval_produced(&self) {
276		if let Some(metrics) = &self.0 {
277			metrics.approvals_produced_total.with_label_values(&["success"]).inc()
278		}
279	}
280
281	fn on_no_shows(&self, n: usize) {
282		if let Some(metrics) = &self.0 {
283			metrics.no_shows_total.inc_by(n as u64);
284		}
285	}
286
287	fn on_observed_no_shows(&self, n: usize) {
288		if let Some(metrics) = &self.0 {
289			metrics.observed_no_shows.inc_by(n as u64);
290		}
291	}
292
293	fn on_approved_by_one_third(&self) {
294		if let Some(metrics) = &self.0 {
295			metrics.approved_by_one_third.inc();
296		}
297	}
298
299	fn on_wakeup(&self) {
300		if let Some(metrics) = &self.0 {
301			metrics.wakeups_triggered_total.inc();
302		}
303	}
304
305	fn on_candidate_approved(&self, ticks: Tick) {
306		if let Some(metrics) = &self.0 {
307			metrics.candidate_approval_time_ticks.observe(ticks as f64);
308		}
309	}
310
311	fn on_block_approved(&self, ticks: Tick) {
312		if let Some(metrics) = &self.0 {
313			metrics.block_approval_time_ticks.observe(ticks as f64);
314		}
315	}
316
317	fn on_candidate_signatures_request(&self) {
318		if let Some(metrics) = &self.0 {
319			metrics.candidate_signatures_requests_total.inc();
320		}
321	}
322
323	fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
324		self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
325	}
326
327	fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
328		self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
329	}
330
331	fn on_unapproved_candidates_in_unfinalized_chain(&self, count: usize) {
332		if let Some(metrics) = &self.0 {
333			metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
334		}
335	}
336
337	pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
338		if let Some(metrics) = &self.0 {
339			let stage_string = stage.to_string();
340			// We don't want to have too many metrics entries with this label to not put unncessary
341			// pressure on the metrics infrastructure, so we cap the stage at 10, which is
342			// equivalent to having already a finalization lag to 10 * no_show_slots, so it should
343			// be more than enough.
344			metrics
345				.assignments_gathering_time_by_stage
346				.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
347				.observe(elapsed_as_millis as f64);
348		}
349	}
350}
351
impl metrics::Metrics for Metrics {
	/// Register all approval-voting metrics with the given Prometheus `registry`.
	///
	/// Returns a live `Metrics` handle on success; fails if any metric name is
	/// already registered.
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
			assignments_produced: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_assignments_produced",
						"Assignments and tranches produced by the approval voting subsystem",
					).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"polkadot_parachain_approvals_produced_total",
						"Number of approvals produced by the approval voting subsystem",
					),
					&["status"]
				)?,
				registry,
			)?,
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			observed_no_shows: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_observed_no_shows_total",
					"Number of observed no shows at any moment in time",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			coalesced_approvals_buckets: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_coalesced_approvals_buckets",
						"Number of coalesced approvals.",
					).buckets(vec![1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]),
				)?,
				registry,
			)?,
			coalesced_approvals_delay: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_coalescing_delay",
						"Number of ticks we delay the sending of a candidate approval",
					).buckets(vec![1.1, 2.1, 3.1, 4.1, 6.1, 8.1, 12.1, 20.1, 32.1]),
				)?,
				registry,
			)?,
			approved_by_one_third: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approved_by_one_third",
					"Number of candidates where more than one third had to vote ",
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			time_db_transaction: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_time_approval_db_transaction",
						"Time spent writing an approval db transaction.",
					)
				)?,
				registry,
			)?,
			time_recover_and_approve: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_time_recover_and_approve",
						"Time spent recovering and approving data in approval voting",
					)
				)?,
				registry,
			)?,
			candidate_signatures_requests_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approval_candidate_signatures_requests_total",
					"Number of times signatures got requested by other subsystems",
				)?,
				registry,
			)?,
			unapproved_candidates_in_unfinalized_chain: prometheus::register(
				prometheus::Gauge::new(
					"polkadot_parachain_approval_unapproved_candidates_in_unfinalized_chain",
					"Number of unapproved candidates in unfinalized chain",
				)?,
				registry,
			)?,
			assignments_gathering_time_by_stage: prometheus::register(
				prometheus::HistogramVec::new(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_assignments_gather_time_by_stage_ms",
						"The time in ms it takes for each stage to gather enough assignments needed for approval",
					)
					.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
					&["stage"],
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}
495
496impl ApprovalVotingSubsystem {
497	/// Create a new approval voting subsystem with the given keystore, config, and database.
498	pub fn with_config(
499		config: Config,
500		db: Arc<dyn Database>,
501		keystore: Arc<LocalKeystore>,
502		sync_oracle: Box<dyn SyncOracle + Send>,
503		metrics: Metrics,
504		spawner: Arc<dyn overseer::gen::Spawner + 'static>,
505	) -> Self {
506		ApprovalVotingSubsystem::with_config_and_clock(
507			config,
508			db,
509			keystore,
510			sync_oracle,
511			metrics,
512			Arc::new(SystemClock {}),
513			spawner,
514			MAX_APPROVAL_RETRIES,
515			APPROVAL_CHECKING_TIMEOUT / 2,
516		)
517	}
518
519	/// Create a new approval voting subsystem with the given keystore, config, and database.
520	pub fn with_config_and_clock(
521		config: Config,
522		db: Arc<dyn Database>,
523		keystore: Arc<LocalKeystore>,
524		sync_oracle: Box<dyn SyncOracle + Send>,
525		metrics: Metrics,
526		clock: Arc<dyn Clock + Send + Sync>,
527		spawner: Arc<dyn overseer::gen::Spawner + 'static>,
528		max_approval_retries: u32,
529		retry_backoff: Duration,
530	) -> Self {
531		ApprovalVotingSubsystem {
532			keystore,
533			slot_duration_millis: config.slot_duration_millis,
534			db,
535			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
536			mode: Mode::Syncing(sync_oracle),
537			metrics,
538			clock,
539			spawner,
540			max_approval_retries,
541			retry_backoff,
542		}
543	}
544
545	/// Revert to the block corresponding to the specified `hash`.
546	/// The operation is not allowed for blocks older than the last finalized one.
547	pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
548		let config =
549			approval_db::common::Config { col_approval_data: self.db_config.col_approval_data };
550		let mut backend = approval_db::common::DbBackend::new(self.db.clone(), config);
551		let mut overlay = OverlayedBackend::new(&backend);
552
553		ops::revert_to(&mut overlay, hash)?;
554
555		let ops = overlay.into_write_ops();
556		backend.write(ops)
557	}
558}
559
560// Checks and logs approval vote db state. It is perfectly normal to start with an
561// empty approval vote DB if we changed DB type or the node will sync from scratch.
562fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemResult<()> {
563	let backend = DbBackend::new(db, config);
564	let all_blocks = backend.load_all_blocks()?;
565
566	if all_blocks.is_empty() {
567		gum::info!(target: LOG_TARGET, "Starting with an empty approval vote DB.",);
568	} else {
569		gum::debug!(
570			target: LOG_TARGET,
571			"Starting with {} blocks in approval vote DB.",
572			all_blocks.len()
573		);
574	}
575
576	Ok(())
577}
578
#[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)]
impl<Context: Send> ApprovalVotingSubsystem {
	/// Spawn the subsystem's main loop (`run`, defined elsewhere in this file).
	fn start(self, mut ctx: Context) -> SpawnedSubsystem {
		let backend = DbBackend::new(self.db.clone(), self.db_config);
		// Two independent sender handles cloned from the context: one for general
		// subsystem traffic and a dedicated one for approval-distribution messages.
		let to_other_subsystems = ctx.sender().clone();
		let to_approval_distr = ctx.sender().clone();

		let future = run::<DbBackend, _, _, _>(
			ctx,
			to_other_subsystems,
			to_approval_distr,
			self,
			Box::new(RealAssignmentCriteria),
			backend,
		)
		.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		.boxed();

		SpawnedSubsystem { name: "approval-voting-subsystem", future }
	}
}
600
// Identifies an approval vote: which validator voted, on which relay block.
#[derive(Debug, Clone)]
struct ApprovalVoteRequest {
	validator_index: ValidatorIndex,
	block_hash: Hash,
}
606
// Scheduled wakeups for (relay block, candidate) pairs, indexed both by tick and
// by pair, plus a block-number index used for pruning on finalization.
#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	// Reverse index: (Relay Block, Candidate Hash) -> scheduled tick.
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
	// Block number -> block hashes seen at that height; used by `prune_finalized_wakeups`.
	block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
614
615impl Wakeups {
616	// Returns the first tick there exist wakeups for, if any.
617	fn first(&self) -> Option<Tick> {
618		self.wakeups.keys().next().map(|t| *t)
619	}
620
621	fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
622		self.block_numbers.entry(block_number).or_default().insert(block_hash);
623	}
624
625	// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
626	// for these values. replaces any later wakeup.
627	fn schedule(
628		&mut self,
629		block_hash: Hash,
630		block_number: BlockNumber,
631		candidate_hash: CandidateHash,
632		tick: Tick,
633	) {
634		if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
635			if prev <= &tick {
636				return
637			}
638
639			// we are replacing previous wakeup with an earlier one.
640			if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
641				if let Some(pos) =
642					entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
643				{
644					entry.get_mut().remove(pos);
645				}
646
647				if entry.get().is_empty() {
648					let _ = entry.remove_entry();
649				}
650			}
651		} else {
652			self.note_block(block_hash, block_number);
653		}
654
655		self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
656		self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
657	}
658
659	fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
660		let after = self.block_numbers.split_off(&(finalized_number + 1));
661		let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
662			.into_iter()
663			.flat_map(|(_number, hashes)| hashes)
664			.collect();
665
666		let mut pruned_wakeups = BTreeMap::new();
667		self.reverse_wakeups.retain(|(h, c_h), tick| {
668			let live = !pruned_blocks.contains(h);
669			if !live {
670				pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
671			}
672			live
673		});
674
675		for (tick, pruned) in pruned_wakeups {
676			if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
677				entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
678				if entry.get().is_empty() {
679					let _ = entry.remove();
680				}
681			}
682		}
683	}
684
685	// Get the wakeup for a particular block/candidate combo, if any.
686	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
687		self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
688	}
689
690	// Returns the next wakeup. this future never returns if there are no wakeups.
691	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
692		match self.first() {
693			None => future::pending().await,
694			Some(tick) => {
695				clock.wait(tick).await;
696				match self.wakeups.entry(tick) {
697					BTMEntry::Vacant(_) => {
698						panic!("entry is known to exist since `first` was `Some`; qed")
699					},
700					BTMEntry::Occupied(mut entry) => {
701						let (hash, candidate_hash) = entry.get_mut().pop()
702							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
703
704						if entry.get().is_empty() {
705							let _ = entry.remove();
706						}
707
708						self.reverse_wakeups.remove(&(hash, candidate_hash));
709
710						(tick, hash, candidate_hash)
711					},
712				}
713			},
714		}
715	}
716}
717
// A snapshot of a candidate's approval progress within a particular block.
struct ApprovalStatus {
	// Which tranches must be covered before the candidate counts as approved.
	required_tranches: RequiredTranches,
	// The delay tranche corresponding to the current time.
	tranche_now: DelayTranche,
	// The tick associated with the block — presumably derived from its slot
	// via `slot_number_to_tick`; confirm at the construction site.
	block_tick: Tick,
	// Number of no-shows observed in the most recent gathering stage.
	last_no_shows: usize,
	// Validators recorded as no-shows for this candidate.
	no_show_validators: Vec<ValidatorIndex>,
}
725
// The terminal result of one approval-checking attempt.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
	// Validation succeeded; the candidate can be approved.
	Approved,
	// Validation failed or could not complete.
	Failed,
	// The check exceeded `APPROVAL_CHECKING_TIMEOUT`.
	TimedOut,
}
732
// Everything needed to retry an approval check after a failure.
#[derive(Clone)]
struct RetryApprovalInfo {
	// The candidate being checked.
	candidate: CandidateReceipt,
	// Backing group, used for availability recovery.
	backing_group: GroupIndex,
	// Executor parameters for the session.
	executor_params: ExecutorParams,
	// Core the candidate occupied, if known.
	core_index: Option<CoreIndex>,
	// Session in which the candidate was included.
	session_index: SessionIndex,
	// How many retry attempts are left (bounded by `max_approval_retries`).
	attempts_remaining: u32,
	// Delay to wait before the next attempt.
	backoff: Duration,
}
743
// The result of an approval-checking task: who checked what, how it ended, and
// whether (and how) it should be retried.
struct ApprovalState {
	validator_index: ValidatorIndex,
	candidate_hash: CandidateHash,
	approval_outcome: ApprovalOutcome,
	// `Some` only for failures that should be retried.
	retry_info: Option<RetryApprovalInfo>,
}
750
751impl ApprovalState {
752	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
753		Self {
754			validator_index,
755			candidate_hash,
756			approval_outcome: ApprovalOutcome::Approved,
757			retry_info: None,
758		}
759	}
760	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
761		Self {
762			validator_index,
763			candidate_hash,
764			approval_outcome: ApprovalOutcome::Failed,
765			retry_info: None,
766		}
767	}
768
769	fn failed_with_retry(
770		validator_index: ValidatorIndex,
771		candidate_hash: CandidateHash,
772		retry_info: Option<RetryApprovalInfo>,
773	) -> Self {
774		Self {
775			validator_index,
776			candidate_hash,
777			approval_outcome: ApprovalOutcome::Failed,
778			retry_info,
779		}
780	}
781}
782
// Tracks candidates currently undergoing approval checking.
struct CurrentlyCheckingSet {
	// For each candidate being checked, the set of relay blocks in which it is included.
	candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
	// In-flight checking futures, each resolving to the final `ApprovalState`.
	currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}
787
788impl Default for CurrentlyCheckingSet {
789	fn default() -> Self {
790		Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
791	}
792}
793
impl CurrentlyCheckingSet {
	// This function will lazily launch approval voting work whenever the
	// candidate is not already undergoing validation.
	//
	// If a check for `candidate_hash` is already in flight, only records the
	// additional `relay_block`; otherwise awaits `launch_work` and pushes the
	// resulting handle (bounded by `APPROVAL_CHECKING_TIMEOUT`) onto
	// `currently_checking`.
	pub async fn insert_relay_block_hash(
		&mut self,
		candidate_hash: CandidateHash,
		validator_index: ValidatorIndex,
		relay_block: Hash,
		launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
	) -> SubsystemResult<()> {
		match self.candidate_hash_map.entry(candidate_hash) {
			HMEntry::Occupied(mut entry) => {
				// validation already undergoing. just add the relay hash if unknown.
				entry.get_mut().insert(relay_block);
			},
			HMEntry::Vacant(entry) => {
				// validation not ongoing. launch work and time out the remote handle.
				entry.insert(HashSet::new()).insert(relay_block);
				let work = launch_work.await?;
				self.currently_checking.push(Box::pin(async move {
					match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
						// Timeout elapsed: synthesize a `TimedOut` state.
						None => ApprovalState {
							candidate_hash,
							validator_index,
							approval_outcome: ApprovalOutcome::TimedOut,
							retry_info: None,
						},
						Some(approval_state) => approval_state,
					}
				}));
			},
		}

		Ok(())
	}

	// Await the next finished check. Returns the set of relay blocks the candidate
	// was tracked under together with its `ApprovalState`, caching the outcome in
	// `approvals_cache`. Pends forever when nothing is being checked.
	pub async fn next(
		&mut self,
		approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
	) -> (HashSet<Hash>, ApprovalState) {
		if !self.currently_checking.is_empty() {
			if let Some(approval_state) = self.currently_checking.next().await {
				let out = self
					.candidate_hash_map
					.remove(&approval_state.candidate_hash)
					.unwrap_or_default();
				approvals_cache
					.insert(approval_state.candidate_hash, approval_state.approval_outcome);
				return (out, approval_state)
			}
		}

		future::pending().await
	}
}
849
850async fn get_extended_session_info<'a, Sender>(
851	runtime_info: &'a mut RuntimeInfo,
852	sender: &mut Sender,
853	relay_parent: Hash,
854) -> Option<&'a ExtendedSessionInfo>
855where
856	Sender: SubsystemSender<RuntimeApiMessage>,
857{
858	match runtime_info.get_session_info(sender, relay_parent).await {
859		Ok(extended_info) => Some(&extended_info),
860		Err(_) => {
861			gum::debug!(
862				target: LOG_TARGET,
863				?relay_parent,
864				"Can't obtain SessionInfo or ExecutorParams"
865			);
866			None
867		},
868	}
869}
870
871async fn get_extended_session_info_by_index<'a, Sender>(
872	runtime_info: &'a mut RuntimeInfo,
873	sender: &mut Sender,
874	relay_parent: Hash,
875	session_index: SessionIndex,
876) -> Option<&'a ExtendedSessionInfo>
877where
878	Sender: SubsystemSender<RuntimeApiMessage>,
879{
880	match runtime_info
881		.get_session_info_by_index(sender, relay_parent, session_index)
882		.await
883	{
884		Ok(extended_info) => Some(&extended_info),
885		Err(_) => {
886			gum::debug!(
887				target: LOG_TARGET,
888				session = session_index,
889				?relay_parent,
890				"Can't obtain SessionInfo or ExecutorParams"
891			);
892			None
893		},
894	}
895}
896
897async fn get_session_info_by_index<'a, Sender>(
898	runtime_info: &'a mut RuntimeInfo,
899	sender: &mut Sender,
900	relay_parent: Hash,
901	session_index: SessionIndex,
902) -> Option<&'a SessionInfo>
903where
904	Sender: SubsystemSender<RuntimeApiMessage>,
905{
906	get_extended_session_info_by_index(runtime_info, sender, relay_parent, session_index)
907		.await
908		.map(|extended_info| &extended_info.session_info)
909}
910
// Mutable state of the approval-voting subsystem's main loop.
struct State {
	// Keystore holding the assignment (and possibly approval) keys.
	keystore: Arc<LocalKeystore>,
	// Slot duration in milliseconds (from `Config`).
	slot_duration_millis: u64,
	// Source of ticks/current time.
	clock: Arc<dyn Clock + Send + Sync>,
	// Strategy used to compute our assignments (mockable in tests).
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
	// Per block, candidate records about how long we take until we gather enough
	// assignments, this is relevant because it gives us a good idea about how many
	// tranches we trigger and why.
	per_block_assignments_gathering_times:
		LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
	// Running no-show statistics, periodically dumped to the log.
	no_show_stats: NoShowStats,
}
923
// Regularly dump the no-show stats at this block number frequency; see
// `NoShowStats::maybe_print`, which prints at most once per this many blocks.
const NO_SHOW_DUMP_FREQUENCY: BlockNumber = 50;
// The maximum number of validators we record no-shows for, per candidate.
pub(crate) const MAX_RECORDED_NO_SHOW_VALIDATORS_PER_CANDIDATE: usize = 20;
928
// No show stats per validator and per parachain.
// This is valuable information when we have to debug live network issue, because
// it gives information if things are going wrong only for some validators or just
// for some parachains.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct NoShowStats {
	// Count of no-shows per validator, grouped by session.
	per_validator_no_show: HashMap<SessionIndex, HashMap<ValidatorIndex, usize>>,
	// Count of no-shows per parachain id.
	per_parachain_no_show: HashMap<u32, usize>,
	// Block number at which the stats were last dumped (and cleared).
	last_dumped_block_number: BlockNumber,
}
939
940impl NoShowStats {
941	// Print the no-show stats if NO_SHOW_DUMP_FREQUENCY blocks have passed since the last
942	// print.
943	fn maybe_print(&mut self, current_block_number: BlockNumber) {
944		if self.last_dumped_block_number > current_block_number ||
945			current_block_number - self.last_dumped_block_number < NO_SHOW_DUMP_FREQUENCY
946		{
947			return
948		}
949		if self.per_parachain_no_show.is_empty() && self.per_validator_no_show.is_empty() {
950			return
951		}
952
953		gum::debug!(
954			target: LOG_TARGET,
955			"Validators with no_show {:?} and parachains with no_shows {:?} since {:}",
956			self.per_validator_no_show,
957			self.per_parachain_no_show,
958			self.last_dumped_block_number
959		);
960
961		self.last_dumped_block_number = current_block_number;
962
963		self.per_validator_no_show.clear();
964		self.per_parachain_no_show.clear();
965	}
966}
967
#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
	// The stage we are in.
	// Candidate assignment gathering goes in stages, first we wait for needed_approvals(stage 0)
	// Then if we have no-shows, we move into stage 1 and wait for enough tranches to cover all
	// no-shows.
	stage: usize,
	// The time we started the stage.
	// `None` means the current stage was completed (the timestamp was consumed by
	// `State::mark_gathered_enough_assignments`).
	stage_start: Option<Instant>,
}
978
979impl Default for AssignmentGatheringRecord {
980	fn default() -> Self {
981		AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
982	}
983}
984
985#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
986impl State {
987	// Compute the required tranches for approval for this block and candidate combo.
988	// Fails if there is no approval entry for the block under the candidate or no candidate entry
989	// under the block, or if the session is out of bounds.
990	async fn approval_status<Sender, 'a, 'b>(
991		&'a self,
992		sender: &mut Sender,
993		session_info_provider: &'a mut RuntimeInfo,
994		block_entry: &'a BlockEntry,
995		candidate_entry: &'b CandidateEntry,
996	) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
997	where
998		Sender: SubsystemSender<RuntimeApiMessage>,
999	{
1000		let session_info = match get_session_info_by_index(
1001			session_info_provider,
1002			sender,
1003			block_entry.parent_hash(),
1004			block_entry.session(),
1005		)
1006		.await
1007		{
1008			Some(s) => s,
1009			None => return None,
1010		};
1011		let block_hash = block_entry.block_hash();
1012
1013		let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
1014		let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
1015		let no_show_duration = slot_number_to_tick(
1016			self.slot_duration_millis,
1017			Slot::from(u64::from(session_info.no_show_slots)),
1018		);
1019
1020		if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
1021			let TranchesToApproveResult {
1022				required_tranches,
1023				total_observed_no_shows,
1024				no_show_validators,
1025			} = approval_checking::tranches_to_approve(
1026				approval_entry,
1027				candidate_entry.approvals(),
1028				tranche_now,
1029				block_tick,
1030				no_show_duration,
1031				session_info.needed_approvals as _,
1032			);
1033
1034			let status = ApprovalStatus {
1035				required_tranches,
1036				block_tick,
1037				tranche_now,
1038				last_no_shows: total_observed_no_shows,
1039				no_show_validators,
1040			};
1041
1042			Some((approval_entry, status))
1043		} else {
1044			None
1045		}
1046	}
1047
1048	// Returns the approval voting params from the RuntimeApi.
1049	async fn get_approval_voting_params_or_default<Sender: SubsystemSender<RuntimeApiMessage>>(
1050		&self,
1051		sender: &mut Sender,
1052		session_index: SessionIndex,
1053		block_hash: Hash,
1054	) -> Option<ApprovalVotingParams> {
1055		let (s_tx, s_rx) = oneshot::channel();
1056
1057		sender
1058			.send_message(RuntimeApiMessage::Request(
1059				block_hash,
1060				RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx),
1061			))
1062			.await;
1063
1064		match s_rx.await {
1065			Ok(Ok(params)) => {
1066				gum::trace!(
1067					target: LOG_TARGET,
1068					approval_voting_params = ?params,
1069					session = ?session_index,
1070					"Using the following subsystem params"
1071				);
1072				Some(params)
1073			},
1074			Ok(Err(err)) => {
1075				gum::debug!(
1076					target: LOG_TARGET,
1077					?err,
1078					"Could not request approval voting params from runtime"
1079				);
1080				None
1081			},
1082			Err(err) => {
1083				gum::debug!(
1084					target: LOG_TARGET,
1085					?err,
1086					"Could not request approval voting params from runtime"
1087				);
1088				None
1089			},
1090		}
1091	}
1092
1093	fn mark_begining_of_gathering_assignments(
1094		&mut self,
1095		block_number: BlockNumber,
1096		block_hash: Hash,
1097		candidate: CandidateHash,
1098	) {
1099		if let Some(record) = self
1100			.per_block_assignments_gathering_times
1101			.get_or_insert(block_number, HashMap::new)
1102			.and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
1103		{
1104			if record.stage_start.is_none() {
1105				record.stage += 1;
1106				gum::debug!(
1107					target: LOG_TARGET,
1108					stage = ?record.stage,
1109					?block_hash,
1110					?candidate,
1111					"Started a new assignment gathering stage",
1112				);
1113				record.stage_start = Some(Instant::now());
1114			}
1115		}
1116	}
1117
1118	fn mark_gathered_enough_assignments(
1119		&mut self,
1120		block_number: BlockNumber,
1121		block_hash: Hash,
1122		candidate: CandidateHash,
1123	) -> AssignmentGatheringRecord {
1124		let record = self
1125			.per_block_assignments_gathering_times
1126			.get(&block_number)
1127			.and_then(|entry| entry.get_mut(&(block_hash, candidate)));
1128		let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
1129		AssignmentGatheringRecord {
1130			stage,
1131			stage_start: record.and_then(|record| record.stage_start.take()),
1132		}
1133	}
1134
1135	fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
1136		while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
1137		{
1138			if *block_number < remove_lower_than {
1139				self.per_block_assignments_gathering_times.pop_oldest();
1140			} else {
1141				break
1142			}
1143		}
1144	}
1145
1146	fn observe_assignment_gathering_status(
1147		&mut self,
1148		metrics: &Metrics,
1149		required_tranches: &RequiredTranches,
1150		block_hash: Hash,
1151		block_number: BlockNumber,
1152		candidate_hash: CandidateHash,
1153	) {
1154		match required_tranches {
1155			RequiredTranches::All | RequiredTranches::Pending { .. } => {
1156				self.mark_begining_of_gathering_assignments(
1157					block_number,
1158					block_hash,
1159					candidate_hash,
1160				);
1161			},
1162			RequiredTranches::Exact { .. } => {
1163				let time_to_gather =
1164					self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
1165				if let Some(gathering_started) = time_to_gather.stage_start {
1166					if gathering_started.elapsed().as_millis() > 6000 {
1167						gum::trace!(
1168							target: LOG_TARGET,
1169							?block_hash,
1170							?candidate_hash,
1171							"Long assignment gathering time",
1172						);
1173					}
1174					metrics.observe_assignment_gathering_time(
1175						time_to_gather.stage,
1176						gathering_started.elapsed().as_millis() as usize,
1177					)
1178				}
1179			},
1180		}
1181	}
1182
1183	fn record_no_shows(
1184		&mut self,
1185		session_index: SessionIndex,
1186		para_id: u32,
1187		no_show_validators: &Vec<ValidatorIndex>,
1188	) {
1189		if !no_show_validators.is_empty() {
1190			*self.no_show_stats.per_parachain_no_show.entry(para_id.into()).or_default() += 1;
1191		}
1192		for validator_index in no_show_validators {
1193			*self
1194				.no_show_stats
1195				.per_validator_no_show
1196				.entry(session_index)
1197				.or_default()
1198				.entry(*validator_index)
1199				.or_default() += 1;
1200		}
1201	}
1202}
1203
/// Actions produced while handling events; executed serially by `handle_actions`.
#[derive(Debug, Clone)]
enum Action {
	/// Schedule a wakeup for the candidate under the given block at `tick`.
	ScheduleWakeup {
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	/// Launch the approval-check work for a candidate we are assigned to,
	/// optionally distributing our assignment first.
	LaunchApproval {
		claimed_candidate_indices: CandidateBitfield,
		candidate_hash: CandidateHash,
		indirect_cert: IndirectAssignmentCertV2,
		assignment_tranche: DelayTranche,
		relay_block_hash: Hash,
		session: SessionIndex,
		executor_params: ExecutorParams,
		candidate: CandidateReceipt,
		backing_group: GroupIndex,
		// Whether to also gossip the assignment via approval distribution.
		distribute_assignment: bool,
		core_index: Option<CoreIndex>,
	},
	/// Inform chain selection that the given block is approved.
	NoteApprovedInChainSelection(Hash),
	/// Sign and issue an approval vote for the candidate.
	IssueApproval(CandidateHash, ApprovalVoteRequest),
	/// Leave `Mode::Syncing` and start actively participating.
	BecomeActive,
	/// Shut down the main loop.
	Conclude,
}
1230
/// Trait for providing approval voting subsystem with work.
#[async_trait::async_trait]
pub trait ApprovalVotingWorkProvider {
	/// Receive the next incoming message for the subsystem.
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>>;
}
1236
// The overseer context itself is a work provider: messages come straight from the
// overseer channel.
#[async_trait::async_trait]
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
impl<Context> ApprovalVotingWorkProvider for Context {
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
		// Delegates to the context's own inherent `recv`.
		self.recv().await
	}
}
1244
// Main loop of the approval voting subsystem. Multiplexes four event sources —
// candidate wakeups, incoming overseer messages, finished approval checks and
// delayed-signature timers — into lists of `Action`s, executes them serially via
// `handle_actions`, and commits the iteration's staged DB changes to `backend`.
// Returns when a `Conclude` action is processed or an unrecoverable error occurs.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn run<
	B,
	WorkProvider: ApprovalVotingWorkProvider,
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	mut work_provider: WorkProvider,
	mut to_other_subsystems: Sender,
	mut to_approval_distr: ADSender,
	mut subsystem: ApprovalVotingSubsystem,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
	mut backend: B,
) -> SubsystemResult<()>
where
	B: Backend,
{
	// Best-effort DB sanity check: failure is logged but not fatal.
	if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) {
		gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
	}

	let mut state = State {
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
		clock: subsystem.clock,
		assignment_criteria,
		per_block_assignments_gathering_times: LruMap::new(ByLength::new(
			MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
		)),
		no_show_stats: NoShowStats::default(),
	};

	// Seed the finalized height from the chain API; `None` if the request fails.
	let mut last_finalized_height: Option<BlockNumber> = {
		let (tx, rx) = oneshot::channel();
		to_other_subsystems
			.send_message(ChainApiMessage::FinalizedBlockNumber(tx))
			.await;
		match rx.await? {
			Ok(number) => Some(number),
			Err(err) => {
				gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number");
				None
			},
		}
	};

	// `None` on start-up. Gets initialized/updated on leaf update
	let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
		keystore: None,
		session_cache_lru_size: DISPUTE_WINDOW.get(),
	});

	let mut wakeups = Wakeups::default();
	let mut currently_checking_set = CurrentlyCheckingSet::default();
	let mut delayed_approvals_timers = DelayedApprovalTimer::default();
	let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE));

	loop {
		// All DB writes of one iteration are staged in an overlay and committed at
		// the bottom of the loop, so each event's effects land atomically.
		let mut overlayed_db = OverlayedBackend::new(&backend);
		let actions = futures::select! {
			// A candidate's scheduled wakeup tick was reached.
			(_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
				subsystem.metrics.on_wakeup();
				process_wakeup(
					&mut to_other_subsystems,
					&mut state,
					&mut overlayed_db,
					&mut session_info_provider,
					woken_block,
					woken_candidate,
					&subsystem.metrics,
					&wakeups,
				).await?
			}
			// A message arrived from the overseer (or other work provider).
			next_msg = work_provider.recv().fuse() => {
				let mut actions = handle_from_overseer(
					&mut to_other_subsystems,
					&mut to_approval_distr,
					&subsystem.spawner,
					&mut state,
					&mut overlayed_db,
					&mut session_info_provider,
					&subsystem.metrics,
					next_msg?,
					&mut last_finalized_height,
					&mut wakeups,
				).await?;

				if let Mode::Syncing(ref mut oracle) = subsystem.mode {
					if !oracle.is_major_syncing() {
						// note that we're active before processing other actions.
						actions.insert(0, Action::BecomeActive)
					}
				}

				actions
			}
			// An in-flight approval check finished (possibly requesting a retry).
			approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
				let mut actions = Vec::new();
				let (
					relay_block_hashes,
					ApprovalState {
						validator_index,
						candidate_hash,
						approval_outcome,
						retry_info,
					}
				) = approval_state;

				// On success, issue an approval for every relay block the
				// candidate was included under.
				if matches!(approval_outcome, ApprovalOutcome::Approved) {
					let mut approvals: Vec<Action> = relay_block_hashes
						.iter()
						.map(|block_hash|
							Action::IssueApproval(
								candidate_hash,
								ApprovalVoteRequest {
									validator_index,
									block_hash: *block_hash,
								},
							)
						)
						.collect();
					actions.append(&mut approvals);
				}

				if let Some(retry_info) = retry_info {
					for block_hash in relay_block_hashes {
						// Only retry for blocks we still have an entry for.
						if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
							let sender = to_other_subsystems.clone();
							let spawn_handle = subsystem.spawner.clone();
							let metrics = subsystem.metrics.clone();
							let retry_info = retry_info.clone();
							let executor_params = retry_info.executor_params.clone();
							let candidate = retry_info.candidate.clone();

							currently_checking_set
								.insert_relay_block_hash(
									candidate_hash,
									validator_index,
									block_hash,
									async move {
										launch_approval(
											sender,
											spawn_handle,
											metrics,
											retry_info.session_index,
											candidate,
											validator_index,
											block_hash,
											retry_info.backing_group,
											executor_params,
											retry_info.core_index,
											retry_info,
										)
										.await
									},
								)
								.await?;
						}
					}
				}

				actions
			},
			// A delayed-approval (multi-candidate signature) timer fired.
			(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
				gum::debug!(
					target: LOG_TARGET,
					?block_hash,
					?validator_index,
					"Sign approval for multiple candidates",
				);

				match maybe_create_signature(
					&mut overlayed_db,
					&mut session_info_provider,
					&state,
					&mut to_other_subsystems,
					&mut to_approval_distr,
					block_hash,
					validator_index,
					&subsystem.metrics,
				).await {
					Ok(Some(next_wakeup)) => {
						delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index);
					},
					Ok(None) => {}
					Err(err) => {
						gum::error!(
							target: LOG_TARGET,
							?err,
							"Failed to create signature",
						);
					}
				}
				vec![]
			}
		};

		// `true` means a `Conclude` action was processed — shut down.
		if handle_actions(
			&mut to_other_subsystems,
			&mut to_approval_distr,
			&subsystem.spawner,
			&mut state,
			&mut overlayed_db,
			&mut session_info_provider,
			&subsystem.metrics,
			&mut wakeups,
			&mut currently_checking_set,
			&mut delayed_approvals_timers,
			&mut approvals_cache,
			&mut subsystem.mode,
			actions,
			subsystem.max_approval_retries,
			subsystem.retry_backoff,
		)
		.await?
		{
			break
		}

		// Flush this iteration's staged DB changes.
		if !overlayed_db.is_empty() {
			let _timer = subsystem.metrics.time_db_transaction();
			let ops = overlayed_db.into_write_ops();
			backend.write(ops)?;
		}
	}

	Ok(())
}
1479
/// Starts a worker thread that runs the approval voting subsystem.
///
/// Builds the subsystem from `config`, wires it to the given senders and spawns
/// the main loop (`run`) on a blocking task named `task_name` within `group_name`.
/// Always returns `Ok(())`: errors from the worker loop are logged from inside the
/// spawned task, not propagated to the caller.
pub async fn start_approval_worker<
	WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	work_provider: WorkProvider,
	to_other_subsystems: Sender,
	to_approval_distr: ADSender,
	config: Config,
	db: Arc<dyn Database>,
	keystore: Arc<LocalKeystore>,
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	task_name: &'static str,
	group_name: &'static str,
	clock: Arc<dyn Clock + Send + Sync>,
) -> SubsystemResult<()> {
	let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
		config,
		db.clone(),
		keystore,
		sync_oracle,
		metrics,
		clock,
		spawner,
		MAX_APPROVAL_RETRIES,
		// Retry backoff is half the overall approval-checking timeout.
		APPROVAL_CHECKING_TIMEOUT / 2,
	);
	let backend = DbBackend::new(db.clone(), approval_voting.db_config);
	let spawner = approval_voting.spawner.clone();
	spawner.spawn_blocking(
		task_name,
		Some(group_name),
		Box::pin(async move {
			if let Err(err) = run(
				work_provider,
				to_other_subsystems,
				to_approval_distr,
				approval_voting,
				Box::new(RealAssignmentCriteria),
				backend,
			)
			.await
			{
				gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
			};
		}),
	);
	Ok(())
}
1538
// Handle actions is a function that accepts a set of instructions
// and subsequently updates the underlying approvals_db in accordance
// with the linear set of instructions passed in. Therefore, actions
// must be processed in series to ensure that earlier actions are not
// negated/corrupted by later actions being executed out-of-order.
//
// However, certain Actions can cause additional actions to need to be
// processed by this function. In order to preserve linearity, we would
// need to handle these newly generated actions before we finalize
// completing additional actions in the submitted sequence of actions.
//
// Since recursive async functions are not stable yet, we are
// forced to modify the actions iterator on the fly whenever a new set
// of actions are generated by handling a single action.
//
// This particular problem statement is specified in issue 3311:
// 	https://github.com/paritytech/polkadot/issues/3311
//
// returns `true` if any of the actions was a `Conclude` command.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn handle_actions<
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
	state: &mut State,
	overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	wakeups: &mut Wakeups,
	currently_checking_set: &mut CurrentlyCheckingSet,
	delayed_approvals_timers: &mut DelayedApprovalTimer,
	approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
	mode: &mut Mode,
	actions: Vec<Action>,
	max_approval_retries: u32,
	retry_backoff: Duration,
) -> SubsystemResult<bool> {
	let mut conclude = false;
	let mut actions_iter = actions.into_iter();
	while let Some(action) = actions_iter.next() {
		match action {
			Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
				wakeups.schedule(block_hash, block_number, candidate_hash, tick);
			},
			Action::IssueApproval(candidate_hash, approval_request) => {
				// Note that the IssueApproval action will create additional
				// actions that will need to all be processed before we can
				// handle the next action in the set passed to the ambient
				// function.
				//
				// In order to achieve this, we append the existing iterator
				// to the end of the iterator made up of these newly generated
				// actions.
				//
				// Note that chaining these iterators is O(n) as we must consume
				// the prior iterator.
				let next_actions: Vec<Action> = issue_approval(
					sender,
					approval_voting_sender,
					state,
					overlayed_db,
					session_info_provider,
					metrics,
					candidate_hash,
					delayed_approvals_timers,
					approval_request,
					&wakeups,
				)
				.await?
				.into_iter()
				// NOTE(review): `.map(|v| v.clone())` clones owned values and is a
				// no-op semantically — kept byte-identical here; candidate cleanup.
				.map(|v| v.clone())
				.chain(actions_iter)
				.collect();

				actions_iter = next_actions.into_iter();
			},
			Action::LaunchApproval {
				claimed_candidate_indices,
				candidate_hash,
				indirect_cert,
				assignment_tranche,
				relay_block_hash,
				session,
				executor_params,
				candidate,
				backing_group,
				distribute_assignment,
				core_index,
			} => {
				// Don't launch approval work if the node is syncing.
				if let Mode::Syncing(_) = *mode {
					continue
				}

				metrics.on_assignment_produced(assignment_tranche);
				let block_hash = indirect_cert.block_hash;
				let validator_index = indirect_cert.validator;

				if distribute_assignment {
					approval_voting_sender.send_unbounded_message(
						ApprovalDistributionMessage::DistributeAssignment(
							indirect_cert,
							claimed_candidate_indices,
						),
					);
				}

				match approvals_cache.get(&candidate_hash) {
					// Already known-approved: skip the check, issue the approval
					// directly (spliced ahead of the remaining actions).
					Some(ApprovalOutcome::Approved) => {
						let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
							candidate_hash,
							ApprovalVoteRequest { validator_index, block_hash },
						))
						.map(|v| v.clone())
						.chain(actions_iter)
						.collect();
						actions_iter = new_actions.into_iter();
					},
					// Not cached: spawn the recovery + validation work.
					None => {
						let sender = sender.clone();
						let spawn_handle = spawn_handle.clone();

						let retry = RetryApprovalInfo {
							candidate: candidate.clone(),
							backing_group,
							executor_params: executor_params.clone(),
							core_index,
							session_index: session,
							attempts_remaining: max_approval_retries,
							backoff: retry_backoff,
						};

						currently_checking_set
							.insert_relay_block_hash(
								candidate_hash,
								validator_index,
								relay_block_hash,
								async move {
									launch_approval(
										sender,
										spawn_handle,
										metrics.clone(),
										session,
										candidate,
										validator_index,
										block_hash,
										backing_group,
										executor_params,
										core_index,
										retry,
									)
									.await
								},
							)
							.await?;
					},
					// Cached but not approved: nothing to do.
					Some(_) => {},
				}
			},
			Action::NoteApprovedInChainSelection(block_hash) => {
				sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
			},
			Action::BecomeActive => {
				*mode = Mode::Active;

				let (messages, next_actions) = distribution_messages_for_activation(
					sender,
					overlayed_db,
					state,
					delayed_approvals_timers,
					session_info_provider,
				)
				.await?;
				for message in messages.into_iter() {
					approval_voting_sender.send_unbounded_message(message);
				}
				// Splice the activation's follow-up actions ahead of the rest.
				let next_actions: Vec<Action> =
					next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();

				actions_iter = next_actions.into_iter();
			},
			Action::Conclude => {
				conclude = true;
			},
		}
	}

	Ok(conclude)
}
1737
1738fn cores_to_candidate_indices(
1739	core_indices: &CoreBitfield,
1740	block_entry: &BlockEntry,
1741) -> Result<CandidateBitfield, BitfieldError> {
1742	let mut candidate_indices = Vec::new();
1743
1744	// Map from core index to candidate index.
1745	for claimed_core_index in core_indices.iter_ones() {
1746		if let Some(candidate_index) = block_entry
1747			.candidates()
1748			.iter()
1749			.position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
1750		{
1751			candidate_indices.push(candidate_index as _);
1752		}
1753	}
1754
1755	CandidateBitfield::try_from(candidate_indices)
1756}
1757
1758// Returns the claimed core bitfield from the assignment cert and the core index
1759// from the block entry.
1760fn get_core_indices_on_startup(
1761	assignment: &AssignmentCertKindV2,
1762	block_entry_core_index: CoreIndex,
1763) -> CoreBitfield {
1764	match &assignment {
1765		AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
1766		AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
1767			CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed"),
1768		AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1769			CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"),
1770	}
1771}
1772
1773// Returns the claimed core bitfield from the assignment cert, the candidate hash and a
1774// `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot find the candidate hash
1775// in the block entry which indicates a bug or corrupted storage.
1776fn get_assignment_core_indices(
1777	assignment: &AssignmentCertKindV2,
1778	candidate_hash: &CandidateHash,
1779	block_entry: &BlockEntry,
1780) -> Option<CoreBitfield> {
1781	match &assignment {
1782		AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
1783			Some(core_bitfield.clone()),
1784		AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
1785			.candidates()
1786			.iter()
1787			.find(|(_core_index, h)| candidate_hash == h)
1788			.map(|(core_index, _candidate_hash)| {
1789				CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1790			}),
1791		AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1792			Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")),
1793	}
1794}
1795
1796#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1797async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApiMessage>>(
1798	sender: &mut Sender,
1799	db: &OverlayedBackend<'_, impl Backend>,
1800	state: &State,
1801	delayed_approvals_timers: &mut DelayedApprovalTimer,
1802	session_info_provider: &mut RuntimeInfo,
1803) -> SubsystemResult<(Vec<ApprovalDistributionMessage>, Vec<Action>)> {
1804	let all_blocks: Vec<Hash> = db.load_all_blocks()?;
1805
1806	let mut approval_meta = Vec::with_capacity(all_blocks.len());
1807	let mut messages = Vec::new();
1808	let mut approvals = Vec::new();
1809	let mut actions = Vec::new();
1810
1811	messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
1812
1813	for block_hash in all_blocks {
1814		let block_entry = match db.load_block_entry(&block_hash)? {
1815			Some(b) => b,
1816			None => {
1817				gum::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry");
1818
1819				continue
1820			},
1821		};
1822
1823		approval_meta.push(BlockApprovalMeta {
1824			hash: block_hash,
1825			number: block_entry.block_number(),
1826			parent_hash: block_entry.parent_hash(),
1827			candidates: block_entry
1828				.candidates()
1829				.iter()
1830				.map(|(core_index, c_hash)| {
1831					let candidate = db.load_candidate_entry(c_hash).ok().flatten();
1832					let group_index = candidate
1833						.and_then(|entry| {
1834							entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
1835						})
1836						.unwrap_or_else(|| {
1837							gum::warn!(
1838								target: LOG_TARGET,
1839								?block_hash,
1840								?c_hash,
1841								"Missing candidate entry or approval entry",
1842							);
1843							GroupIndex::default()
1844						});
1845					(*c_hash, *core_index, group_index)
1846				})
1847				.collect(),
1848			slot: block_entry.slot(),
1849			session: block_entry.session(),
1850			vrf_story: block_entry.relay_vrf_story(),
1851		});
1852		let mut signatures_queued = HashSet::new();
1853		for (core_index, candidate_hash) in block_entry.candidates() {
1854			let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
1855				Some(c) => c,
1856				None => {
1857					gum::warn!(
1858						target: LOG_TARGET,
1859						?block_hash,
1860						?candidate_hash,
1861						"Missing candidate entry",
1862					);
1863
1864					continue
1865				},
1866			};
1867
1868			match candidate_entry.approval_entry(&block_hash) {
1869				Some(approval_entry) => {
1870					match approval_entry.local_statements() {
1871						(None, None) =>
1872							if approval_entry
1873								.our_assignment()
1874								.map(|assignment| !assignment.triggered())
1875								.unwrap_or(false)
1876							{
1877								actions.push(Action::ScheduleWakeup {
1878									block_hash,
1879									block_number: block_entry.block_number(),
1880									candidate_hash: *candidate_hash,
1881									tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
1882								})
1883							},
1884						(None, Some(_)) => {}, // second is impossible case.
1885						(Some(assignment), None) => {
1886							let claimed_core_indices =
1887								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1888
1889							if block_entry.has_candidates_pending_signature() {
1890								delayed_approvals_timers.maybe_arm_timer(
1891									state.clock.tick_now(),
1892									state.clock.as_ref(),
1893									block_entry.block_hash(),
1894									assignment.validator_index(),
1895								)
1896							}
1897
1898							match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1899								Ok(bitfield) => {
1900									gum::debug!(
1901										target: LOG_TARGET,
1902										candidate_hash = ?candidate_entry.candidate_receipt().hash(),
1903										?block_hash,
1904										"Discovered, triggered assignment, not approved yet",
1905									);
1906
1907									let indirect_cert = IndirectAssignmentCertV2 {
1908										block_hash,
1909										validator: assignment.validator_index(),
1910										cert: assignment.cert().clone(),
1911									};
1912									messages.push(
1913										ApprovalDistributionMessage::DistributeAssignment(
1914											indirect_cert.clone(),
1915											bitfield.clone(),
1916										),
1917									);
1918
1919									if !block_entry.candidate_is_pending_signature(*candidate_hash)
1920									{
1921										let ExtendedSessionInfo { ref executor_params, .. } =
1922											match get_extended_session_info(
1923												session_info_provider,
1924												sender,
1925												candidate_entry
1926													.candidate_receipt()
1927													.descriptor()
1928													.relay_parent(),
1929											)
1930											.await
1931											{
1932												Some(i) => i,
1933												None => continue,
1934											};
1935
1936										actions.push(Action::LaunchApproval {
1937											claimed_candidate_indices: bitfield,
1938											candidate_hash: candidate_entry
1939												.candidate_receipt()
1940												.hash(),
1941											indirect_cert,
1942											assignment_tranche: assignment.tranche(),
1943											relay_block_hash: block_hash,
1944											session: block_entry.session(),
1945											executor_params: executor_params.clone(),
1946											candidate: candidate_entry.candidate_receipt().clone(),
1947											backing_group: approval_entry.backing_group(),
1948											distribute_assignment: false,
1949											core_index: Some(*core_index),
1950										});
1951									}
1952								},
1953								Err(err) => {
1954									// Should never happen. If we fail here it means the
1955									// assignment is null (no cores claimed).
1956									gum::warn!(
1957										target: LOG_TARGET,
1958										?block_hash,
1959										?candidate_hash,
1960										?err,
1961										"Failed to create assignment bitfield",
1962									);
1963								},
1964							}
1965						},
1966						(Some(assignment), Some(approval_sig)) => {
1967							let claimed_core_indices =
1968								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1969							match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1970								Ok(bitfield) => messages.push(
1971									ApprovalDistributionMessage::DistributeAssignment(
1972										IndirectAssignmentCertV2 {
1973											block_hash,
1974											validator: assignment.validator_index(),
1975											cert: assignment.cert().clone(),
1976										},
1977										bitfield,
1978									),
1979								),
1980								Err(err) => {
1981									gum::warn!(
1982										target: LOG_TARGET,
1983										?block_hash,
1984										?candidate_hash,
1985										?err,
1986										"Failed to create assignment bitfield",
1987									);
1988									// If we didn't send assignment, we don't send approval.
1989									continue
1990								},
1991							}
1992							if signatures_queued
1993								.insert(approval_sig.signed_candidates_indices.clone())
1994							{
1995								approvals.push(ApprovalDistributionMessage::DistributeApproval(
1996									IndirectSignedApprovalVoteV2 {
1997										block_hash,
1998										candidate_indices: approval_sig.signed_candidates_indices,
1999										validator: assignment.validator_index(),
2000										signature: approval_sig.signature,
2001									},
2002								))
2003							};
2004						},
2005					}
2006				},
2007				None => {
2008					gum::warn!(
2009						target: LOG_TARGET,
2010						?block_hash,
2011						?candidate_hash,
2012						"Missing approval entry",
2013					);
2014				},
2015			}
2016		}
2017	}
2018
2019	messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
2020	// Approvals are appended at the end, to make sure all assignments are sent
2021	// before the approvals, otherwise if they arrive ahead in approval-distribution
2022	// they will be ignored.
2023	messages.extend(approvals.into_iter());
2024	Ok((messages, actions))
2025}
2026
// Handle an incoming signal or message from the overseer.
//
// Returns the `Action`s the caller should process next. Shutdown is not signalled via
// the return value directly: a `Conclude` overseer signal is mapped to
// `Action::Conclude` in the returned list.
async fn handle_from_overseer<
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	x: FromOrchestra<ApprovalVotingMessage>,
	last_finalized_height: &mut Option<BlockNumber>,
	wakeups: &mut Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let actions = match x {
		FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
			let mut actions = Vec::new();
			if let Some(activated) = update.activated {
				let head = activated.hash;
				match import::handle_new_head(
					sender,
					approval_voting_sender,
					state,
					db,
					session_info_provider,
					head,
					last_finalized_height,
				)
				.await
				{
					Err(e) => return Err(SubsystemError::with_origin("db", e)),
					Ok(block_imported_candidates) => {
						// Schedule wakeups for all imported candidates.
						for block_batch in block_imported_candidates {
							gum::debug!(
								target: LOG_TARGET,
								block_number = ?block_batch.block_number,
								block_hash = ?block_batch.block_hash,
								num_candidates = block_batch.imported_candidates.len(),
								"Imported new block.",
							);

							state.no_show_stats.maybe_print(block_batch.block_number);

							for (c_hash, c_entry) in block_batch.imported_candidates {
								metrics.on_candidate_imported();

								// The tranche of our own assignment (if any) bounds the
								// earliest point we might need to act on this candidate.
								let our_tranche = c_entry
									.approval_entry(&block_batch.block_hash)
									.and_then(|a| a.our_assignment().map(|a| a.tranche()));

								if let Some(our_tranche) = our_tranche {
									let tick = our_tranche as Tick + block_batch.block_tick;
									gum::trace!(
										target: LOG_TARGET,
										tranche = our_tranche,
										candidate_hash = ?c_hash,
										block_hash = ?block_batch.block_hash,
										block_tick = block_batch.block_tick,
										"Scheduling first wakeup.",
									);

									// Our first wakeup will just be the tranche of our assignment,
									// if any. This will likely be superseded by incoming
									// assignments and approvals which trigger rescheduling.
									actions.push(Action::ScheduleWakeup {
										block_hash: block_batch.block_hash,
										block_number: block_batch.block_number,
										candidate_hash: c_hash,
										tick,
									});
								}
							}
						}
					},
				}
			}

			actions
		},
		FromOrchestra::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
			gum::debug!(target: LOG_TARGET, ?block_hash, ?block_number, "Block finalized");
			*last_finalized_height = Some(block_number);

			// Drop database state at or below the finalized height.
			crate::ops::canonicalize(db, block_number, block_hash)
				.map_err(|e| SubsystemError::with_origin("db", e))?;

			// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
			// accordingly.
			wakeups.prune_finalized_wakeups(block_number);
			state.cleanup_assignments_gathering_timestamp(block_number);

			Vec::new()
		},
		FromOrchestra::Signal(OverseerSignal::Conclude) => {
			vec![Action::Conclude]
		},
		FromOrchestra::Communication { msg } => match msg {
			ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => {
				let (check_outcome, actions) =
					import_assignment(sender, state, db, session_info_provider, checked_assignment)
						.await?;
				// approval-distribution makes sure this assignment is valid and expected,
				// so this import should never fail, if it does it might mean one of two things,
				// there is a bug in the code or the two subsystems got out of sync.
				if let AssignmentCheckResult::Bad(ref err) = check_outcome {
					gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an assignment");
				}
				let _ = tx.map(|tx| tx.send(check_outcome));
				actions
			},
			ApprovalVotingMessage::ImportApproval(a, tx) => {
				let result =
					import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups)
						.await?;
				// approval-distribution makes sure this vote is valid and expected,
				// so this import should never fail, if it does it might mean one of two things,
				// there is a bug in the code or the two subsystems got out of sync.
				if let ApprovalCheckResult::Bad(ref err) = result.1 {
					gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an approval");
				}
				let _ = tx.map(|tx| tx.send(result.1));

				result.0
			},
			ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
				match handle_approved_ancestor(sender, db, target, lower_bound, wakeups, &metrics)
					.await
				{
					Ok(v) => {
						let _ = res.send(v);
					},
					Err(e) => {
						// Answer the requester before bailing so it is not left hanging.
						let _ = res.send(None);
						return Err(e)
					},
				}

				Vec::new()
			},
			ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
				metrics.on_candidate_signatures_request();
				get_approval_signatures_for_candidate(
					approval_voting_sender.clone(),
					spawn_handle,
					db,
					candidate_hash,
					tx,
				)
				.await?;
				Vec::new()
			},
		},
	};

	Ok(actions)
}
2194
/// Retrieve approval signatures.
///
/// This involves an unbounded message send to approval-distribution, the caller has to ensure that
/// calls to this function are infrequent and bounded.
///
/// The fetch from approval-distribution happens on a freshly spawned task, so this function
/// itself only blocks on the database lookups needed to translate the candidate hash into the
/// per-relay-block candidate indices that approval-distribution works with.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn get_approval_signatures_for_candidate<
	Sender: SubsystemSender<ApprovalDistributionMessage>,
>(
	mut sender: Sender,
	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
	db: &OverlayedBackend<'_, impl Backend>,
	candidate_hash: CandidateHash,
	tx: oneshot::Sender<HashMap<ValidatorIndex, (Vec<CandidateHash>, ValidatorSignature)>>,
) -> SubsystemResult<()> {
	// A closed receiver just means the requester went away; log and carry on.
	let send_votes = |votes| {
		if let Err(_) = tx.send(votes) {
			gum::debug!(
				target: LOG_TARGET,
				"Sending approval signatures back failed, as receiver got closed."
			);
		}
	};
	let entry = match db.load_candidate_entry(&candidate_hash)? {
		None => {
			send_votes(HashMap::new());
			gum::debug!(
				target: LOG_TARGET,
				?candidate_hash,
				"Sent back empty votes because the candidate was not found in db."
			);
			return Ok(())
		},
		Some(e) => e,
	};

	// All relay blocks this candidate has assignments under.
	let relay_hashes = entry.block_assignments.keys();

	// `(relay block, candidate index)` pairs identifying this candidate for
	// approval-distribution.
	let mut candidate_indices = HashSet::new();
	// Reverse lookup used later to translate signed candidate indices back into hashes.
	let mut candidate_indices_to_candidate_hashes: HashMap<
		Hash,
		HashMap<CandidateIndex, CandidateHash>,
	> = HashMap::new();

	// Retrieve `CoreIndices`/`CandidateIndices` as required by approval-distribution:
	for hash in relay_hashes {
		let entry = match db.load_block_entry(hash)? {
			None => {
				gum::debug!(
					target: LOG_TARGET,
					?candidate_hash,
					?hash,
					"Block entry for assignment missing."
				);
				continue
			},
			Some(e) => e,
		};
		for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
			if c_hash == &candidate_hash {
				candidate_indices.insert((*hash, candidate_index as u32));
			}
			// Record every candidate of the block, not just the requested one: a single
			// signature may cover multiple candidates.
			candidate_indices_to_candidate_hashes
				.entry(*hash)
				.or_default()
				.insert(candidate_index as _, *c_hash);
		}
	}

	let get_approvals = async move {
		let (tx_distribution, rx_distribution) = oneshot::channel();
		sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
			candidate_indices,
			tx_distribution,
		));

		// Because of the unbounded sending and the nature of the call (just fetching data from
		// state), this should not block long:
		match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
			None => {
				gum::warn!(
					target: LOG_TARGET,
					"Waiting for approval signatures timed out - dead lock?"
				);
			},
			Some(Err(_)) => gum::debug!(
				target: LOG_TARGET,
				"Request for approval signatures got cancelled by `approval-distribution`."
			),
			Some(Ok(votes)) => {
				// Translate the signed candidate indices back into candidate hashes,
				// dropping any vote that cannot be fully translated.
				let votes = votes
					.into_iter()
					.filter_map(|(validator_index, (hash, signed_candidates_indices, signature))| {
						let candidates_hashes = candidate_indices_to_candidate_hashes.get(&hash);

						if candidates_hashes.is_none() {
							gum::warn!(
								target: LOG_TARGET,
								?hash,
								"Possible bug! Could not find map of candidate_hashes for block hash received from approval-distribution"
							);
						}

						let num_signed_candidates = signed_candidates_indices.len();

						let signed_candidates_hashes: Vec<CandidateHash> =
							signed_candidates_indices
								.into_iter()
								.filter_map(|candidate_index| {
									candidates_hashes.and_then(|candidate_hashes| {
										if let Some(candidate_hash) =
											candidate_hashes.get(&candidate_index)
										{
											Some(*candidate_hash)
										} else {
											gum::warn!(
												target: LOG_TARGET,
												?candidate_index,
												"Possible bug! Could not find candidate hash for candidate_index coming from approval-distribution"
											);
											None
										}
									})
								})
								.collect();
						// Forward the vote only when every signed index resolved: a
						// partial translation would mis-attribute the signature.
						if num_signed_candidates == signed_candidates_hashes.len() {
							Some((validator_index, (signed_candidates_hashes, signature)))
						} else {
							gum::warn!(
								target: LOG_TARGET,
								"Possible bug! Could not find all hashes for candidates coming from approval-distribution"
							);
							None
						}
					})
					.collect();
				send_votes(votes)
			},
		}
	};

	// No need to block subsystem on this (also required to break cycle).
	// We should not be sending this message frequently - caller must make sure this is bounded.
	gum::trace!(
		target: LOG_TARGET,
		?candidate_hash,
		"Spawning task for fetching signatures from approval-distribution"
	);
	spawn_handle.spawn(
		"get-approval-signatures",
		Some("approval-voting-subsystem"),
		Box::pin(get_approvals),
	);
	Ok(())
}
2349
/// Find the highest ancestor of `target` (inclusive) above `lower_bound` such that the
/// entire chain from that ancestor up to `target` is fully approved.
///
/// Returns `Ok(None)` when `target` is unknown, at or below `lower_bound`, or when any block
/// of the walked chain is missing from the database. As a side effect, logs diagnostics
/// about unapproved candidates found deep in the unfinalized chain.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn handle_approved_ancestor<Sender: SubsystemSender<ChainApiMessage>>(
	sender: &mut Sender,
	db: &OverlayedBackend<'_, impl Backend>,
	target: Hash,
	lower_bound: BlockNumber,
	wakeups: &Wakeups,
	metrics: &Metrics,
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
	// Max number of approval bits rendered in the trace log below.
	const MAX_TRACING_WINDOW: usize = 200;
	// Depth past which an unapproved block is considered abnormal and gets diagnostics.
	const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
	const LOGGING_DEPTH_THRESHOLD: usize = 10;

	let mut all_approved_max = None;

	let target_number = {
		let (tx, rx) = oneshot::channel();

		sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await;

		match rx.await {
			Ok(Ok(Some(n))) => n,
			Ok(Ok(None)) => return Ok(None),
			Ok(Err(_)) | Err(_) => return Ok(None),
		}
	};

	if target_number <= lower_bound {
		return Ok(None)
	}

	// request ancestors up to but not including the lower bound,
	// as a vote on the lower bound is implied if we cannot find
	// anything else.
	let ancestry = if target_number > lower_bound + 1 {
		let (tx, rx) = oneshot::channel();

		sender
			.send_message(ChainApiMessage::Ancestors {
				hash: target,
				k: (target_number - (lower_bound + 1)) as usize,
				response_channel: tx,
			})
			.await;

		match rx.await {
			Ok(Ok(a)) => a,
			Err(_) | Ok(Err(_)) => return Ok(None),
		}
	} else {
		Vec::new()
	};
	let ancestry_len = ancestry.len();

	let mut block_descriptions = Vec::new();

	// One bit per visited block: `true` iff fully approved. Used only for tracing.
	let mut bits: BitVec<u8, Lsb0> = Default::default();
	for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
		// Block entries should be present as the assumption is that
		// nothing here is finalized. If we encounter any missing block
		// entries we can fail.
		let entry = match db.load_block_entry(&block_hash)? {
			None => {
				let block_number = target_number.saturating_sub(i as u32);
				gum::info!(
					target: LOG_TARGET,
					unknown_number = ?block_number,
					unknown_hash = ?block_hash,
					"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
					target,
					target_number,
					lower_bound,
					lower_bound,
				);
				return Ok(None)
			},
			Some(b) => b,
		};

		// even if traversing millions of blocks this is fairly cheap and always dwarfed by the
		// disk lookups.
		bits.push(entry.is_fully_approved());
		if entry.is_fully_approved() {
			if all_approved_max.is_none() {
				// First iteration of the loop is target, i = 0. After that,
				// ancestry is moving backwards.
				all_approved_max = Some((block_hash, target_number - i as BlockNumber));
			}
			block_descriptions.push(BlockDescription {
				block_hash,
				session: entry.session(),
				candidates: entry
					.candidates()
					.iter()
					.map(|(_idx, candidate_hash)| *candidate_hash)
					.collect(),
			});
		} else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
			// Unapproved, but still shallow: reset the running maximum quietly.
			all_approved_max = None;
			block_descriptions.clear();
		} else {
			// Unapproved at abnormal depth: reset and emit diagnostics about what is
			// holding up approval.
			all_approved_max = None;
			block_descriptions.clear();

			let unapproved: Vec<_> = entry.unapproved_candidates().collect();
			gum::debug!(
				target: LOG_TARGET,
				"Block {} is {} blocks deep and has {}/{} candidates unapproved",
				block_hash,
				bits.len() - 1,
				unapproved.len(),
				entry.candidates().len(),
			);
			if ancestry_len >= LOGGING_DEPTH_THRESHOLD && i > ancestry_len - LOGGING_DEPTH_THRESHOLD
			{
				gum::trace!(
					target: LOG_TARGET,
					?block_hash,
					"Unapproved candidates at depth {}: {:?}",
					bits.len(),
					unapproved
				)
			}
			metrics.on_unapproved_candidates_in_unfinalized_chain(unapproved.len());
			// Log the per-candidate assignment/approval status for each laggard.
			for candidate_hash in unapproved {
				match db.load_candidate_entry(&candidate_hash)? {
					None => {
						gum::warn!(
							target: LOG_TARGET,
							?candidate_hash,
							"Missing expected candidate in DB",
						);

						continue
					},
					Some(c_entry) => match c_entry.approval_entry(&block_hash) {
						None => {
							gum::warn!(
								target: LOG_TARGET,
								?candidate_hash,
								?block_hash,
								"Missing expected approval entry under candidate.",
							);
						},
						Some(a_entry) => {
							// "assignments/approvals/validators" summary string, computed
							// lazily since it is only needed on the logging paths.
							let status = || {
								let n_assignments = a_entry.n_assignments();

								// Take the approvals, filtered by the assignments
								// for this block.
								let n_approvals = c_entry
									.approvals()
									.iter()
									.by_vals()
									.enumerate()
									.filter(|(i, approved)| {
										*approved && a_entry.is_assigned(ValidatorIndex(*i as _))
									})
									.count();

								format!(
									"{}/{}/{}",
									n_assignments,
									n_approvals,
									a_entry.n_validators(),
								)
							};

							match a_entry.our_assignment() {
								None => gum::debug!(
									target: LOG_TARGET,
									?candidate_hash,
									?block_hash,
									status = %status(),
									"no assignment."
								),
								Some(a) => {
									let tranche = a.tranche();
									let triggered = a.triggered();

									let next_wakeup =
										wakeups.wakeup_for(block_hash, candidate_hash);

									let approved =
										triggered && { a_entry.local_statements().1.is_some() };

									gum::debug!(
										target: LOG_TARGET,
										?candidate_hash,
										?block_hash,
										tranche,
										?next_wakeup,
										status = %status(),
										triggered,
										approved,
										"assigned."
									);
								},
							}
						},
					},
				}
			}
		}
	}

	gum::debug!(
		target: LOG_TARGET,
		"approved blocks {}-[{}]-{}",
		target_number,
		{
			// formatting to divide bits by groups of 10.
			// when comparing logs on multiple machines where the exact vote
			// targets may differ, this grouping is useful.
			let mut s = String::with_capacity(bits.len());
			for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
				s.push(if *bit { '1' } else { '0' });
				if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 {
					s.push(' ');
				}
			}

			s
		},
		if bits.len() > MAX_TRACING_WINDOW {
			format!(
				"{}... (truncated due to large window)",
				target_number - MAX_TRACING_WINDOW as u32 + 1,
			)
		} else {
			format!("{}", lower_bound + 1)
		},
	);

	// `reverse()` to obtain the ascending order from lowest to highest
	// block within the candidates, which is the expected order
	block_descriptions.reverse();

	let all_approved_max =
		all_approved_max.map(|(hash, block_number)| HighestApprovedAncestorBlock {
			hash,
			number: block_number,
			descriptions: block_descriptions,
		});

	Ok(all_approved_max)
}
2597
// Minimum of two `Option`s, where a present value always beats an absent one.
// (Plain `Option::cmp` would instead order `None` below any `Some`.)
fn min_prefer_some<T: std::cmp::Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
	match (a, b) {
		(Some(x), Some(y)) => Some(if x <= y { x } else { y }),
		(only, None) | (None, only) => only,
	}
}
2606
2607fn schedule_wakeup_action(
2608	approval_entry: &ApprovalEntry,
2609	block_hash: Hash,
2610	block_number: BlockNumber,
2611	candidate_hash: CandidateHash,
2612	block_tick: Tick,
2613	tick_now: Tick,
2614	required_tranches: RequiredTranches,
2615) -> Option<Action> {
2616	let maybe_action = match required_tranches {
2617		_ if approval_entry.is_approved() => None,
2618		RequiredTranches::All => None,
2619		RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
2620			// Take the earlier of the next no show or the last assignment tick + required delay,
2621			// only considering the latter if it is after the current moment.
2622			min_prefer_some(
2623				last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
2624				next_no_show,
2625			)
2626			.map(|tick| Action::ScheduleWakeup {
2627				block_hash,
2628				block_number,
2629				candidate_hash,
2630				tick,
2631			})
2632		},
2633		RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
2634			// select the minimum of `next_no_show`, or the tick of the next non-empty tranche
2635			// after `considered`, including any tranche that might contain our own untriggered
2636			// assignment.
2637			let next_non_empty_tranche = {
2638				let next_announced = approval_entry
2639					.tranches()
2640					.iter()
2641					.skip_while(|t| t.tranche() <= considered)
2642					.map(|t| t.tranche())
2643					.next();
2644
2645				let our_untriggered = approval_entry.our_assignment().and_then(|t| {
2646					if !t.triggered() && t.tranche() > considered {
2647						Some(t.tranche())
2648					} else {
2649						None
2650					}
2651				});
2652
2653				// Apply the clock drift to these tranches.
2654				min_prefer_some(next_announced, our_untriggered)
2655					.map(|t| t as Tick + block_tick + clock_drift)
2656			};
2657
2658			min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
2659				Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }
2660			})
2661		},
2662	};
2663
2664	match maybe_action {
2665		Some(Action::ScheduleWakeup { ref tick, .. }) => gum::trace!(
2666			target: LOG_TARGET,
2667			tick,
2668			?candidate_hash,
2669			?block_hash,
2670			block_tick,
2671			"Scheduling next wakeup.",
2672		),
2673		None => gum::trace!(
2674			target: LOG_TARGET,
2675			?candidate_hash,
2676			?block_hash,
2677			block_tick,
2678			"No wakeup needed.",
2679		),
2680		Some(_) => {}, // unreachable
2681	}
2682
2683	maybe_action
2684}
2685
/// Import a checked assignment, updating the approval entry of every claimed candidate and
/// scheduling no-show wakeups for them.
///
/// Returns the `AssignmentCheckResult` to report back to approval-distribution together with
/// any follow-up `Action`s. An `Err` is only produced on database failure; all validation
/// problems are reported through `AssignmentCheckResult::Bad`.
async fn import_assignment<Sender>(
	sender: &mut Sender,
	state: &State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	checked_assignment: CheckedIndirectAssignment,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let tick_now = state.clock.tick_now();
	let assignment = checked_assignment.assignment();
	let candidate_indices = checked_assignment.candidate_indices();
	let tranche = checked_assignment.tranche();

	let block_entry = match db.load_block_entry(&assignment.block_hash)? {
		Some(b) => b,
		None =>
			return Ok((
				AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(
					assignment.block_hash,
				)),
				Vec::new(),
			)),
	};

	let session_info = match get_session_info_by_index(
		session_info_provider,
		sender,
		block_entry.parent_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(s) => s,
		None =>
			return Ok((
				AssignmentCheckResult::Bad(AssignmentCheckError::UnknownSessionIndex(
					block_entry.session(),
				)),
				Vec::new(),
			)),
	};

	let n_cores = session_info.n_cores as usize;

	// Early check the candidate bitfield and core bitfields lengths < `n_cores`.
	// Core bitfield length is checked later in `check_assignment_cert`.
	if candidate_indices.len() > n_cores {
		gum::debug!(
			target: LOG_TARGET,
			validator = assignment.validator.0,
			n_cores,
			candidate_bitfield_len = ?candidate_indices.len(),
			"Oversized bitfield",
		);

		return Ok((
			AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
				candidate_indices.len(),
			)),
			Vec::new(),
		))
	}

	// First pass: validate every claimed candidate index before mutating anything, so a
	// bad assignment is rejected without partial imports.
	let mut claimed_core_indices = Vec::new();
	let mut assigned_candidate_hashes = Vec::new();

	for candidate_index in candidate_indices.iter_ones() {
		let (claimed_core_index, assigned_candidate_hash) =
			match block_entry.candidate(candidate_index) {
				Some((c, h)) => (*c, *h),
				None =>
					return Ok((
						AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
							candidate_index as _,
						)),
						Vec::new(),
					)), // no candidate at core.
			};

		let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
			Some(c) => c,
			None =>
				return Ok((
					AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
						candidate_index as _,
						assigned_candidate_hash,
					)),
					Vec::new(),
				)), // no candidate at core.
		};

		if candidate_entry.approval_entry_mut(&assignment.block_hash).is_none() {
			return Ok((
				AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
					assignment.block_hash,
					assigned_candidate_hash,
				)),
				Vec::new(),
			));
		};

		claimed_core_indices.push(claimed_core_index);
		assigned_candidate_hashes.push(assigned_candidate_hash);
	}

	// Error on null assignments.
	if claimed_core_indices.is_empty() {
		return Ok((
			AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
				assignment.validator,
				format!("{:?}", InvalidAssignmentReason::NullAssignment),
			)),
			Vec::new(),
		))
	}

	// Second pass: actually import the assignment for each claimed candidate.
	let mut actions = Vec::new();
	let res = {
		// Stays `true` only if the validator was already assigned on *every* claimed
		// candidate.
		let mut is_duplicate = true;
		// Import the assignments for all cores in the cert.
		for (assigned_candidate_hash, candidate_index) in
			assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
		{
			let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
				Some(c) => c,
				None =>
					return Ok((
						AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
							candidate_index as _,
							*assigned_candidate_hash,
						)),
						Vec::new(),
					)),
			};

			let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
				Some(a) => a,
				None =>
					return Ok((
						AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
							assignment.block_hash,
							*assigned_candidate_hash,
						)),
						Vec::new(),
					)),
			};

			let is_duplicate_for_candidate = approval_entry.is_assigned(assignment.validator);
			is_duplicate &= is_duplicate_for_candidate;
			approval_entry.import_assignment(
				tranche,
				assignment.validator,
				tick_now,
				is_duplicate_for_candidate,
			);

			// We've imported a new assignment, so we need to schedule a wake-up for when that might
			// no-show.
			if let Some((approval_entry, status)) = state
				.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
				.await
			{
				actions.extend(schedule_wakeup_action(
					approval_entry,
					block_entry.block_hash(),
					block_entry.block_number(),
					*assigned_candidate_hash,
					status.block_tick,
					tick_now,
					status.required_tranches,
				));
			}

			// We also write the candidate entry as it now contains the new candidate.
			db.write_candidate_entry(candidate_entry.into());
		}

		// Since we don't account for tranche in distribution message fingerprinting, some
		// validators can be assigned to the same core (VRF modulo vs VRF delay). These can be
		// safely ignored. However, if an assignment is for multiple cores (these are only
		// tranche0), we cannot ignore it, because it would mean ignoring other non duplicate
		// assignments.
		if is_duplicate {
			AssignmentCheckResult::AcceptedDuplicate
		} else if candidate_indices.count_ones() > 1 {
			gum::trace!(
				target: LOG_TARGET,
				validator = assignment.validator.0,
				candidate_hashes = ?assigned_candidate_hashes,
				assigned_cores = ?claimed_core_indices,
				?tranche,
				"Imported assignments for multiple cores.",
			);

			AssignmentCheckResult::Accepted
		} else {
			gum::trace!(
				target: LOG_TARGET,
				validator = assignment.validator.0,
				candidate_hashes = ?assigned_candidate_hashes,
				assigned_cores = ?claimed_core_indices,
				"Imported assignment for a single core.",
			);

			AssignmentCheckResult::Accepted
		}
	};

	Ok((res, actions))
}
2898
// Import a checked approval vote, advancing the approval state of every candidate the
// vote claims (a v2 vote may cover multiple candidates via its `candidate_indices`
// bitfield).
//
// Each claimed index is resolved against the block entry; the vote is rejected unless
// the validator has a prior assignment for the candidate. On success returns the
// accumulated follow-up `Action`s plus `ApprovalCheckResult::Accepted`; any failure
// responds early with `ApprovalCheckResult::Bad` and no actions.
async fn import_approval<Sender>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	approval: CheckedIndirectSignedApprovalVote,
	wakeups: &Wakeups,
) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	// Bail out with a check failure and no actions.
	macro_rules! respond_early {
		($e: expr) => {{
			return Ok((Vec::new(), $e))
		}};
	}

	let block_entry = match db.load_block_entry(&approval.block_hash)? {
		Some(b) => b,
		None => {
			respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
				approval.block_hash
			),))
		},
	};

	// Resolve every claimed candidate index to its hash up-front; a single bad index
	// rejects the whole vote.
	let approved_candidates_info: Result<Vec<(CandidateIndex, CandidateHash)>, ApprovalCheckError> =
		approval
			.candidate_indices
			.iter_ones()
			.map(|candidate_index| {
				block_entry
					.candidate(candidate_index)
					.ok_or(ApprovalCheckError::InvalidCandidateIndex(candidate_index as _))
					.map(|candidate| (candidate_index as _, candidate.1))
			})
			.collect();

	let approved_candidates_info = match approved_candidates_info {
		Ok(approved_candidates_info) => approved_candidates_info,
		Err(err) => {
			respond_early!(ApprovalCheckResult::Bad(err))
		},
	};

	gum::trace!(
		target: LOG_TARGET,
		"Received approval for num_candidates {:}",
		approval.candidate_indices.count_ones()
	);

	let mut actions = Vec::new();
	for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
		// Re-load the block entry on every iteration: `advance_approval_state` consumes it
		// by value and may persist an updated version for the previous candidate.
		let block_entry = match db.load_block_entry(&approval.block_hash)? {
			Some(b) => b,
			None => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
					approval.block_hash
				),))
			},
		};

		let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
			Some(c) => c,
			None => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(
					approval_candidate_index,
					approved_candidate_hash
				),))
			},
		};

		// Don't accept approvals until assignment.
		match candidate_entry.approval_entry(&approval.block_hash) {
			None => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::Internal(
					approval.block_hash,
					approved_candidate_hash
				),))
			},
			Some(e) if !e.is_assigned(approval.validator) => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(
					approval.validator
				),))
			},
			_ => {},
		}

		gum::trace!(
			target: LOG_TARGET,
			validator_index = approval.validator.0,
			candidate_hash = ?approved_candidate_hash,
			para_id = ?candidate_entry.candidate_receipt().descriptor.para_id(),
			"Importing approval vote",
		);

		let new_actions = advance_approval_state(
			sender,
			state,
			db,
			session_info_provider,
			&metrics,
			block_entry,
			approved_candidate_hash,
			candidate_entry,
			ApprovalStateTransition::RemoteApproval(approval.validator),
			wakeups,
		)
		.await;
		actions.extend(new_actions);
	}

	// importing the approval can be heavy as it may trigger acceptance for a series of blocks.
	Ok((actions, ApprovalCheckResult::Accepted))
}
3015
// The kind of event driving `advance_approval_state` for a candidate under a block.
#[derive(Debug)]
enum ApprovalStateTransition {
	// An approval vote received from another validator.
	RemoteApproval(ValidatorIndex),
	// An approval vote issued locally by this node.
	LocalApproval(ValidatorIndex),
	// A scheduled wakeup was processed; no new vote is being imported.
	WakeupProcessed,
}
3022
3023impl ApprovalStateTransition {
3024	fn validator_index(&self) -> Option<ValidatorIndex> {
3025		match *self {
3026			ApprovalStateTransition::RemoteApproval(v) |
3027			ApprovalStateTransition::LocalApproval(v) => Some(v),
3028			ApprovalStateTransition::WakeupProcessed => None,
3029		}
3030	}
3031
3032	fn is_local_approval(&self) -> bool {
3033		match *self {
3034			ApprovalStateTransition::RemoteApproval(_) => false,
3035			ApprovalStateTransition::LocalApproval(_) => true,
3036			ApprovalStateTransition::WakeupProcessed => false,
3037		}
3038	}
3039
3040	fn is_remote_approval(&self) -> bool {
3041		matches!(*self, ApprovalStateTransition::RemoteApproval(_))
3042	}
3043}
3044
// Advance the approval state, either by importing an approval vote which is already checked to be
// valid and corresponding to an assigned validator on the candidate and block, or by noting that
// there are no further wakeups or tranches needed. This updates the block entry and candidate entry
// as necessary and schedules any further wakeups.
//
// Returns the follow-up actions (scheduled wakeups, chain-selection approval notes) for the
// caller to process.
async fn advance_approval_state<Sender>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	mut block_entry: BlockEntry,
	candidate_hash: CandidateHash,
	mut candidate_entry: CandidateEntry,
	transition: ApprovalStateTransition,
	wakeups: &Wakeups,
) -> Vec<Action>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let validator_index = transition.validator_index();

	// Record the vote (if any) in the candidate's approval bitfield; the returned flag says
	// whether this validator had already approved, which decides below whether the candidate
	// entry needs rewriting.
	let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
	let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);

	// Check for early exits.
	//
	// If the candidate was approved
	// but not the block, it means that we still need more approvals for the candidate under the
	// block.
	//
	// If the block was approved, but the validator hadn't approved it yet, we should still hold
	// onto the approval vote on-disk in case we restart and rebroadcast votes. Otherwise, our
	// assignment might manifest as a no-show.
	if !transition.is_local_approval() {
		// We don't store remote votes and there's nothing to store for processed wakeups,
		// so we can early exit as long at the candidate is already concluded under the
		// block i.e. we don't need more approvals.
		if candidate_approved_in_block {
			return Vec::new()
		}
	}

	let mut actions = Vec::new();
	let block_hash = block_entry.block_hash();
	let block_number = block_entry.block_number();
	let session_index = block_entry.session();
	let para_id = candidate_entry.candidate_receipt().descriptor().para_id();
	let tick_now = state.clock.tick_now();

	let (is_approved, status) = if let Some((approval_entry, status)) = state
		.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
		.await
	{
		let check = approval_checking::check_approval(
			&candidate_entry,
			approval_entry,
			status.required_tranches.clone(),
		);
		state.observe_assignment_gathering_status(
			&metrics,
			&status.required_tranches,
			block_hash,
			block_entry.block_number(),
			candidate_hash,
		);

		// Check whether this is approved, while allowing a maximum
		// assignment tick of `now - APPROVAL_DELAY` - that is, that
		// all counted assignments are at least `APPROVAL_DELAY` ticks old.
		let is_approved = check.is_approved(tick_now.saturating_sub(APPROVAL_DELAY));
		if status.last_no_shows != 0 {
			metrics.on_observed_no_shows(status.last_no_shows);
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				last_no_shows = ?status.last_no_shows,
				"Observed no_shows",
			);
		}
		if is_approved {
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				"Candidate approved under block.",
			);

			let no_shows = check.known_no_shows();

			// Capture full-approval state before and after marking this candidate, to detect
			// the transition that makes the whole block approved.
			let was_block_approved = block_entry.is_fully_approved();
			block_entry.mark_approved_by_hash(&candidate_hash);
			let is_block_approved = block_entry.is_fully_approved();

			if no_shows != 0 {
				metrics.on_no_shows(no_shows);
			}
			if check == Check::ApprovedOneThird {
				// No-shows are not counted when more than one third of validators approve a
				// candidate, so count candidates where more than one third of validators had to
				// approve it, this is indicative of something breaking.
				metrics.on_approved_by_one_third()
			}

			metrics.on_candidate_approved(status.tranche_now as _);

			if is_block_approved && !was_block_approved {
				metrics.on_block_approved(status.tranche_now as _);
				actions.push(Action::NoteApprovedInChainSelection(block_hash));
			}

			db.write_block_entry(block_entry.into());
		} else if transition.is_local_approval() {
			// Local approvals always update the block_entry, so we need to flush it to
			// the database.
			db.write_block_entry(block_entry.into());
		}

		(is_approved, status)
	} else {
		gum::warn!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,
			?validator_index,
			"No approval entry for approval under block",
		);

		return Vec::new()
	};

	{
		let approval_entry = candidate_entry
			.approval_entry_mut(&block_hash)
			.expect("Approval entry just fetched; qed");

		let was_approved = approval_entry.is_approved();
		let newly_approved = is_approved && !was_approved;

		if is_approved {
			approval_entry.mark_approved();
		}
		if newly_approved {
			state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
		}
		actions.extend(schedule_wakeup_action(
			&approval_entry,
			block_hash,
			block_number,
			candidate_hash,
			status.block_tick,
			tick_now,
			status.required_tranches,
		));

		if is_approved && transition.is_remote_approval() {
			// Make sure we wake other blocks in case they have
			// a no-show that might be covered by this approval.
			for (fork_block_hash, fork_approval_entry) in candidate_entry
				.block_assignments
				.iter()
				.filter(|(hash, _)| **hash != block_hash)
			{
				let assigned_on_fork_block = validator_index
					.as_ref()
					.map(|validator_index| fork_approval_entry.is_assigned(*validator_index))
					.unwrap_or_default();
				if wakeups.wakeup_for(*fork_block_hash, candidate_hash).is_none() &&
					!fork_approval_entry.is_approved() &&
					assigned_on_fork_block
				{
					let fork_block_entry = db.load_block_entry(fork_block_hash);
					if let Ok(Some(fork_block_entry)) = fork_block_entry {
						actions.push(Action::ScheduleWakeup {
							block_hash: *fork_block_hash,
							block_number: fork_block_entry.block_number(),
							candidate_hash,
							// Schedule the wakeup next tick, since the assignment must be a
							// no-show, because there is no-wakeup scheduled.
							tick: tick_now + 1,
						})
					} else {
						gum::debug!(
							target: LOG_TARGET,
							?fork_block_entry,
							?fork_block_hash,
							"Failed to load block entry"
						)
					}
				}
			}
		}
		// We have no need to write the candidate entry if all of the following
		// is true:
		//
		// 1. This is not a local approval, as we don't store anything new in the approval entry.
		// 2. The candidate is not newly approved, as we haven't altered the approval entry's
		//    approved flag with `mark_approved` above.
		// 3. The approver, if any, had already approved the candidate, as we haven't altered the
		// bitfield.
		if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
		{
			// In all other cases, we need to write the candidate entry.
			db.write_candidate_entry(candidate_entry);
		}
	}

	actions
}
3254
3255fn should_trigger_assignment(
3256	approval_entry: &ApprovalEntry,
3257	candidate_entry: &CandidateEntry,
3258	required_tranches: RequiredTranches,
3259	tranche_now: DelayTranche,
3260) -> bool {
3261	match approval_entry.our_assignment() {
3262		None => false,
3263		Some(ref assignment) if assignment.triggered() => false,
3264		Some(ref assignment) if assignment.tranche() == 0 => true,
3265		Some(ref assignment) => {
3266			match required_tranches {
3267				RequiredTranches::All => !approval_checking::check_approval(
3268					&candidate_entry,
3269					&approval_entry,
3270					RequiredTranches::All,
3271				)
3272				// when all are required, we are just waiting for the first 1/3+
3273				.is_approved(Tick::max_value()),
3274				RequiredTranches::Pending { maximum_broadcast, clock_drift, .. } => {
3275					let drifted_tranche_now =
3276						tranche_now.saturating_sub(clock_drift as DelayTranche);
3277					assignment.tranche() <= maximum_broadcast &&
3278						assignment.tranche() <= drifted_tranche_now
3279				},
3280				RequiredTranches::Exact { .. } => {
3281					// indicates that no new assignments are needed at the moment.
3282					false
3283				},
3284			}
3285		},
3286	}
3287}
3288
// Process a scheduled wakeup for a (relay block, candidate) pair: decide whether our own
// assignment should be triggered now (and if so, emit a `LaunchApproval` action), then
// re-run the approval state machine, which may conclude approval and/or schedule the
// next wakeup.
async fn process_wakeup<Sender: SubsystemSender<RuntimeApiMessage>>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	relay_block: Hash,
	candidate_hash: CandidateHash,
	metrics: &Metrics,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let block_entry = db.load_block_entry(&relay_block)?;
	let candidate_entry = db.load_candidate_entry(&candidate_hash)?;

	// If either is not present, we have nothing to wakeup. Might have lost a race with finality
	let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
		(Some(b), Some(c)) => (b, c),
		_ => return Ok(Vec::new()),
	};

	let (no_show_slots, needed_approvals) = match get_session_info_by_index(
		session_info_provider,
		sender,
		block_entry.block_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(i) => (i.no_show_slots, i.needed_approvals),
		None => return Ok(Vec::new()),
	};

	let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
	let no_show_duration =
		slot_number_to_tick(state.slot_duration_millis, Slot::from(u64::from(no_show_slots)));
	let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());

	gum::trace!(
		target: LOG_TARGET,
		tranche = tranche_now,
		?candidate_hash,
		block_hash = ?relay_block,
		"Processing wakeup",
	);

	// Evaluate (immutably) whether our assignment should fire given the tranches still
	// required at this point in time.
	let (should_trigger, backing_group) = {
		let approval_entry = match candidate_entry.approval_entry(&relay_block) {
			Some(e) => e,
			None => return Ok(Vec::new()),
		};

		let tranches_to_approve = approval_checking::tranches_to_approve(
			&approval_entry,
			candidate_entry.approvals(),
			tranche_now,
			block_tick,
			no_show_duration,
			needed_approvals as _,
		);

		let should_trigger = should_trigger_assignment(
			&approval_entry,
			&candidate_entry,
			tranches_to_approve.required_tranches,
			tranche_now,
		);

		(should_trigger, approval_entry.backing_group())
	};

	gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);

	let mut actions = Vec::new();
	let candidate_receipt = candidate_entry.candidate_receipt().clone();

	// Trigger our assignment (if due) and persist the candidate entry before distributing.
	let maybe_cert = if should_trigger {
		let maybe_cert = {
			let approval_entry = candidate_entry
				.approval_entry_mut(&relay_block)
				.expect("should_trigger only true if this fetched earlier; qed");

			approval_entry.trigger_our_assignment(state.clock.tick_now())
		};

		db.write_candidate_entry(candidate_entry.clone());

		maybe_cert
	} else {
		None
	};

	if let Some((cert, val_index, tranche)) = maybe_cert {
		let ExtendedSessionInfo { ref executor_params, .. } = match get_extended_session_info(
			session_info_provider,
			sender,
			candidate_entry.candidate_receipt().descriptor().relay_parent(),
		)
		.await
		{
			Some(i) => i,
			None => return Ok(actions),
		};
		let indirect_cert =
			IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };

		gum::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			para_id = ?candidate_receipt.descriptor.para_id(),
			block_hash = ?relay_block,
			"Launching approval work.",
		);

		let candidate_core_index = block_entry
			.candidates()
			.iter()
			.find_map(|(core_index, h)| (h == &candidate_hash).then_some(*core_index));

		if let Some(claimed_core_indices) =
			get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
		{
			match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
				Ok(claimed_candidate_indices) => {
					// Ensure we distribute multiple core assignments just once.
					let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
						!block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
					} else {
						true
					};
					db.write_block_entry(block_entry.clone());
					actions.push(Action::LaunchApproval {
						claimed_candidate_indices,
						candidate_hash,
						indirect_cert,
						assignment_tranche: tranche,
						relay_block_hash: relay_block,
						session: block_entry.session(),
						executor_params: executor_params.clone(),
						candidate: candidate_receipt,
						backing_group,
						distribute_assignment,
						core_index: candidate_core_index,
					});
				},
				Err(err) => {
					// Never happens, it should only happen if no cores are claimed, which is a
					// bug.
					gum::warn!(
						target: LOG_TARGET,
						block_hash = ?relay_block,
						?err,
						"Failed to create assignment bitfield"
					);
				},
			};
		} else {
			gum::warn!(
				target: LOG_TARGET,
				block_hash = ?relay_block,
				?candidate_hash,
				"Cannot get assignment claimed core indices",
			);
		}
	}
	// Although we checked approval earlier in this function,
	// this wakeup might have advanced the state to approved via
	// a no-show that was immediately covered and therefore
	// we need to check for that and advance the state on-disk.
	//
	// Note that this function also schedules a wakeup as necessary.
	actions.extend(
		advance_approval_state(
			sender,
			state,
			db,
			session_info_provider,
			metrics,
			block_entry,
			candidate_hash,
			candidate_entry,
			ApprovalStateTransition::WakeupProcessed,
			wakeups,
		)
		.await,
	);

	Ok(actions)
}
3476
// Launch approval work, returning a `RemoteHandle` which corresponds to the background task
// spawned. When the background work is no longer needed, the handle should be dropped
// to cancel the background work and any requests it has spawned.
//
// The background task recovers the available data, fetches the validation code, runs the
// candidate through validation and resolves to an `ApprovalState` (approved / failed /
// failed-with-retry).
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn launch_approval<
	Sender: SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>,
>(
	mut sender: Sender,
	spawn_handle: Arc<dyn overseer::gen::Spawner + 'static>,
	metrics: Metrics,
	session_index: SessionIndex,
	candidate: CandidateReceipt,
	validator_index: ValidatorIndex,
	block_hash: Hash,
	backing_group: GroupIndex,
	executor_params: ExecutorParams,
	core_index: Option<CoreIndex>,
	retry: RetryApprovalInfo,
) -> SubsystemResult<RemoteHandle<ApprovalState>> {
	let (a_tx, a_rx) = oneshot::channel();
	let (code_tx, code_rx) = oneshot::channel();

	// The background future returned by this function may
	// be dropped before completing. This guard is used to ensure that the approval
	// work is correctly counted as stale even if so.
	struct StaleGuard(Option<Metrics>);

	impl StaleGuard {
		// Disarm the guard, returning the metrics handle for a final (non-stale) report.
		fn take(mut self) -> Metrics {
			self.0.take().expect(
				"
				consumed after take; so this cannot be called twice; \
				nothing in this function reaches into the struct to avoid this API; \
				qed
			",
			)
		}
	}

	impl Drop for StaleGuard {
		fn drop(&mut self) {
			// Only fires if `take` was never called, i.e. the work was abandoned.
			if let Some(metrics) = self.0.as_ref() {
				metrics.on_approval_stale();
			}
		}
	}

	let candidate_hash = candidate.hash();
	let para_id = candidate.descriptor.para_id();
	// Captured by the background task; set when availability recovery should be retried.
	let mut next_retry = None;
	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");

	// Kick off data recovery and the validation-code fetch before spawning the background
	// task; their results arrive on the oneshot channels awaited below.
	let timer = metrics.time_recover_and_approve();
	sender
		.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
			candidate.clone(),
			session_index,
			Some(backing_group),
			core_index,
			a_tx,
		))
		.await;

	sender
		.send_message(RuntimeApiMessage::Request(
			block_hash,
			RuntimeApiRequest::ValidationCodeByHash(
				candidate.descriptor.validation_code_hash(),
				code_tx,
			),
		))
		.await;

	let candidate = candidate.clone();
	let metrics_guard = StaleGuard(Some(metrics));
	let background = async move {
		// Force the move of the timer into the background task.
		let _timer = timer;
		let available_data = match a_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(a)) => a,
			Ok(Err(e)) => {
				match &e {
					&RecoveryError::Unavailable => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							attempts_remaining = retry.attempts_remaining,
							"Data unavailable for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// Availability could fail if we did not discover much of the network, so
						// let's back off and order the subsystem to retry at a later point if the
						// approval is still needed, because no-show wasn't covered yet.
						if retry.attempts_remaining > 0 {
							Delay::new(retry.backoff).await;
							next_retry = Some(RetryApprovalInfo {
								candidate,
								backing_group,
								executor_params,
								core_index,
								session_index,
								attempts_remaining: retry.attempts_remaining - 1,
								backoff: retry.backoff,
							});
						} else {
							next_retry = None;
						}
						metrics_guard.take().on_approval_unavailable();
					},
					&RecoveryError::ChannelClosed => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							"Channel closed while recovering data for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// do nothing. we'll just be a no-show and that'll cause others to rise up.
						metrics_guard.take().on_approval_unavailable();
					},
					&RecoveryError::Invalid => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							"Data recovery invalid for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// Provably invalid data: raise a dispute rather than just failing.
						issue_local_invalid_statement(
							&mut sender,
							session_index,
							candidate_hash,
							candidate.clone(),
						);
						metrics_guard.take().on_approval_invalid();
					},
				}
				return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
			},
		};

		let validation_code = match code_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Err(_)) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(Some(code))) => code,
			Ok(Ok(None)) => {
				gum::warn!(
					target: LOG_TARGET,
					"Validation code unavailable for block {:?} in the state of block {:?} (a recent descendant)",
					candidate.descriptor.relay_parent(),
					block_hash,
				);

				// No dispute necessary, as this indicates that the chain is not behaving
				// according to expectations.
				metrics_guard.take().on_approval_unavailable();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
		};

		let (val_tx, val_rx) = oneshot::channel();
		sender
			.send_message(CandidateValidationMessage::ValidateFromExhaustive {
				validation_data: available_data.validation_data,
				validation_code,
				candidate_receipt: candidate.clone(),
				pov: available_data.pov,
				executor_params,
				exec_kind: PvfExecKind::Approval,
				response_sender: val_tx,
			})
			.await;

		match val_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(ValidationResult::Valid(_, _))) => {
				// Validation checked out. Issue an approval command. If the underlying service is
				// unreachable, then there isn't anything we can do.

				gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Candidate Valid");

				let _ = metrics_guard.take();
				return ApprovalState::approved(validator_index, candidate_hash)
			},
			Ok(Ok(ValidationResult::Invalid(reason))) => {
				gum::warn!(
					target: LOG_TARGET,
					?reason,
					?candidate_hash,
					?para_id,
					"Detected invalid candidate as an approval checker.",
				);

				issue_local_invalid_statement(
					&mut sender,
					session_index,
					candidate_hash,
					candidate.clone(),
				);
				metrics_guard.take().on_approval_invalid();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
			Ok(Err(e)) => {
				gum::error!(
					target: LOG_TARGET,
					err = ?e,
					?candidate_hash,
					?para_id,
					"Failed to validate candidate due to internal error",
				);
				metrics_guard.take().on_approval_error();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
		}
	};
	let (background, remote_handle) = background.remote_handle();
	spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background));
	Ok(remote_handle)
}
3701
// Issue and import a local approval vote. Should only be invoked after approval checks
// have been done.
//
// The vote's signature is deferred (batched) via `defer_candidate_signature`; the approval
// state is advanced immediately with `LocalApproval`, and `maybe_create_signature` decides
// whether the pending signatures should be produced now or at a later tick.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn issue_approval<
	Sender: SubsystemSender<RuntimeApiMessage>,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	candidate_hash: CandidateHash,
	delayed_approvals_timers: &mut DelayedApprovalTimer,
	ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let mut block_entry = match db.load_block_entry(&block_hash)? {
		Some(b) => b,
		None => {
			// not a cause for alarm - just lost a race with pruning, most likely.
			metrics.on_approval_stale();
			return Ok(Vec::new())
		},
	};

	let candidate_index = match block_entry.candidates().iter().position(|e| e.1 == candidate_hash)
	{
		None => {
			// NOTE(review): the message says "relay block" but logs `parent_hash()`; this is
			// possibly meant to be `block_hash` — confirm before relying on this log line.
			gum::warn!(
				target: LOG_TARGET,
				"Candidate hash {} is not present in the block entry's candidates for relay block {}",
				candidate_hash,
				block_entry.parent_hash(),
			);

			metrics.on_approval_error();
			return Ok(Vec::new())
		},
	Some(idx) => idx,
	};

	// Re-derive the candidate hash from the index just found above; since the index was
	// computed from `candidate_hash`, this shadows it with the same value and the `None`
	// arm is effectively unreachable.
	let candidate_hash = match block_entry.candidate(candidate_index as usize) {
		Some((_, h)) => *h,
		None => {
			gum::warn!(
				target: LOG_TARGET,
				"Received malformed request to approve out-of-bounds candidate index {} included at block {:?}",
				candidate_index,
				block_hash,
			);

			metrics.on_approval_error();
			return Ok(Vec::new())
		},
	};

	let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
		Some(c) => c,
		None => {
			gum::warn!(
				target: LOG_TARGET,
				"Missing entry for candidate index {} included at block {:?}",
				candidate_index,
				block_hash,
			);

			metrics.on_approval_error();
			return Ok(Vec::new())
		},
	};

	let session_info = match get_session_info_by_index(
		session_info_provider,
		sender,
		block_entry.parent_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(s) => s,
		None => return Ok(Vec::new()),
	};

	// Queue the candidate for (possibly delayed) signing; a `Some` return means it was
	// already queued, which should never happen.
	if block_entry
		.defer_candidate_signature(
			candidate_index as _,
			candidate_hash,
			compute_delayed_approval_sending_tick(
				state,
				&block_entry,
				&candidate_entry,
				session_info,
				&metrics,
			),
		)
		.is_some()
	{
		gum::error!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,
			validator_index = validator_index.0,
			"Possible bug, we shouldn't have to defer a candidate more than once",
		);
	}

	gum::debug!(
		target: LOG_TARGET,
		?candidate_hash,
		?block_hash,
		validator_index = validator_index.0,
		"Ready to issue approval vote",
	);

	let actions = advance_approval_state(
		sender,
		state,
		db,
		session_info_provider,
		metrics,
		block_entry,
		candidate_hash,
		candidate_entry,
		ApprovalStateTransition::LocalApproval(validator_index as _),
		wakeups,
	)
	.await;

	// If signing was deferred, arm a timer so the signature is produced at `next_wakeup`.
	if let Some(next_wakeup) = maybe_create_signature(
		db,
		session_info_provider,
		state,
		sender,
		approval_voting_sender,
		block_hash,
		validator_index,
		metrics,
	)
	.await?
	{
		delayed_approvals_timers.maybe_arm_timer(
			next_wakeup,
			state.clock.as_ref(),
			block_hash,
			validator_index,
		);
	}
	Ok(actions)
}
3853
// Create signature for the approved candidates pending signatures
//
// Coalesces the candidates of `block_hash` that are pending our signature into a
// single approval vote (bounded by the runtime's `max_approval_coalesce_count`),
// stores that signature in each covered candidate's approval entry, and hands the
// vote to approval-distribution.
//
// Returns `Ok(Some(tick))` when signing is still being deferred and should be
// retried no later than `tick`; `Ok(None)` when a vote was issued or nothing
// more can be done for this block.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn maybe_create_signature<
	Sender: SubsystemSender<RuntimeApiMessage>,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	state: &State,
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	block_hash: Hash,
	validator_index: ValidatorIndex,
	metrics: &Metrics,
) -> SubsystemResult<Option<Tick>> {
	let mut block_entry = match db.load_block_entry(&block_hash)? {
		Some(b) => b,
		None => {
			// not a cause for alarm - just lost a race with pruning, most likely.
			metrics.on_approval_stale();
			gum::debug!(
				target: LOG_TARGET,
				"Could not find block that needs signature {:}", block_hash
			);
			return Ok(None)
		},
	};

	// Fetch the runtime's coalescing parameters; on failure fall back to the
	// defaults rather than aborting the signing attempt.
	let approval_params = state
		.get_approval_voting_params_or_default(sender, block_entry.session(), block_hash)
		.await
		.unwrap_or_default();

	gum::trace!(
		target: LOG_TARGET,
		"Candidates pending signatures {:}", block_entry.num_candidates_pending_signature()
	);
	let tick_now = state.clock.tick_now();

	let (candidates_to_sign, sign_no_later_then) = block_entry
		.get_candidates_that_need_signature(tick_now, approval_params.max_approval_coalesce_count);

	let (candidates_hashes, candidates_indices) = match candidates_to_sign {
		Some(candidates_to_sign) => candidates_to_sign,
		// Nothing must be signed right now; propagate the deadline so the caller
		// can arm a timer and retry by then.
		None => return Ok(sign_no_later_then),
	};

	let session_info = match get_session_info_by_index(
		session_info_provider,
		sender,
		block_entry.parent_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(s) => s,
		None => {
			metrics.on_approval_error();
			gum::error!(
				target: LOG_TARGET,
				"Could not retrieve the session"
			);
			return Ok(None)
		},
	};

	// Our own validator key for this session; an out-of-bounds index here would
	// indicate corrupted state, so log and bail.
	let validator_pubkey = match session_info.validators.get(validator_index) {
		Some(p) => p,
		None => {
			gum::error!(
				target: LOG_TARGET,
				"Validator index {} out of bounds in session {}",
				validator_index.0,
				block_entry.session(),
			);

			metrics.on_approval_error();
			return Ok(None)
		},
	};

	// Produce one signature covering ALL coalesced candidate hashes at once.
	let signature = match sign_approval(
		&state.keystore,
		&validator_pubkey,
		&candidates_hashes,
		block_entry.session(),
	) {
		Some(sig) => sig,
		None => {
			gum::error!(
				target: LOG_TARGET,
				validator_index = ?validator_index,
				session = ?block_entry.session(),
				"Could not issue approval signature. Assignment key present but not validator key?",
			);

			metrics.on_approval_error();
			return Ok(None)
		},
	};
	metrics.on_approval_coalesce(candidates_hashes.len() as u32);

	// Load every covered candidate's entry; DB errors abort, missing entries are
	// carried through as `None` and reported below.
	let candidate_entries = candidates_hashes
		.iter()
		.map(|candidate_hash| db.load_candidate_entry(candidate_hash))
		.collect::<SubsystemResult<Vec<Option<CandidateEntry>>>>()?;

	// Record the same coalesced signature (and the full set of indices it covers)
	// in each candidate's approval entry for this block.
	for mut candidate_entry in candidate_entries {
		let approval_entry = candidate_entry.as_mut().and_then(|candidate_entry| {
			candidate_entry.approval_entry_mut(&block_entry.block_hash())
		});

		match approval_entry {
			Some(approval_entry) => approval_entry.import_approval_sig(OurApproval {
				signature: signature.clone(),
				signed_candidates_indices: candidates_indices.clone(),
			}),
			None => {
				gum::error!(
					target: LOG_TARGET,
					candidate_entry = ?candidate_entry,
					"Candidate scheduled for signing approval entry should not be None"
				);
			},
		};
		// Persist the updated entry; `None` entries (already logged above) are
		// simply skipped.
		candidate_entry.map(|candidate_entry| db.write_candidate_entry(candidate_entry));
	}

	metrics.on_approval_produced();

	approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval(
		IndirectSignedApprovalVoteV2 {
			block_hash: block_entry.block_hash(),
			candidate_indices: candidates_indices,
			validator: validator_index,
			signature,
		},
	));

	gum::trace!(
		target: LOG_TARGET,
		?block_hash,
		signed_candidates = ?block_entry.num_candidates_pending_signature(),
		"Issue approval votes",
	);
	// Mark the pending candidates as signed and persist the block entry.
	block_entry.issued_approval();
	db.write_block_entry(block_entry.into());
	Ok(None)
}
4003
4004// Sign an approval vote. Fails if the key isn't present in the store.
4005fn sign_approval(
4006	keystore: &LocalKeystore,
4007	public: &ValidatorId,
4008	candidate_hashes: &[CandidateHash],
4009	session_index: SessionIndex,
4010) -> Option<ValidatorSignature> {
4011	let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
4012
4013	let payload = ApprovalVoteMultipleCandidates(candidate_hashes).signing_payload(session_index);
4014
4015	Some(key.sign(&payload[..]))
4016}
4017
4018/// Send `IssueLocalStatement` to dispute-coordinator.
4019fn issue_local_invalid_statement<Sender>(
4020	sender: &mut Sender,
4021	session_index: SessionIndex,
4022	candidate_hash: CandidateHash,
4023	candidate: CandidateReceipt,
4024) where
4025	Sender: SubsystemSender<DisputeCoordinatorMessage>,
4026{
4027	// We need to send an unbounded message here to break a cycle:
4028	// DisputeCoordinatorMessage::IssueLocalStatement ->
4029	// ApprovalVotingMessage::GetApprovalSignaturesForCandidate.
4030	//
4031	// Use of unbounded _should_ be fine here as raising a dispute should be an
4032	// exceptional event. Even in case of bugs: There can be no more than
4033	// number of slots per block requests every block. Also for sending this
4034	// message a full recovery and validation procedure took place, which takes
4035	// longer than issuing a local statement + import.
4036	sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
4037		session_index,
4038		candidate_hash,
4039		candidate.clone(),
4040		false,
4041	));
4042}
4043
4044// Computes what is the latest tick we can send an approval
4045fn compute_delayed_approval_sending_tick(
4046	state: &State,
4047	block_entry: &BlockEntry,
4048	candidate_entry: &CandidateEntry,
4049	session_info: &SessionInfo,
4050	metrics: &Metrics,
4051) -> Tick {
4052	let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
4053	let assignment_tranche = candidate_entry
4054		.approval_entry(&block_entry.block_hash())
4055		.and_then(|approval_entry| approval_entry.our_assignment())
4056		.map(|our_assignment| our_assignment.tranche())
4057		.unwrap_or_default();
4058
4059	let assignment_triggered_tick = current_block_tick + assignment_tranche as Tick;
4060
4061	let no_show_duration_ticks = slot_number_to_tick(
4062		state.slot_duration_millis,
4063		Slot::from(u64::from(session_info.no_show_slots)),
4064	);
4065	let tick_now = state.clock.tick_now();
4066
4067	let sign_no_later_than = min(
4068		tick_now + MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick,
4069		// We don't want to accidentally cause no-shows, so if we are past
4070		// the second half of the no show time, force the sending of the
4071		// approval immediately.
4072		assignment_triggered_tick + no_show_duration_ticks / 2,
4073	);
4074
4075	metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
4076	sign_no_later_than
4077}