referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_approval_voting/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Approval Voting Subsystem.
18//!
19//! This subsystem is responsible for determining candidates to do approval checks
20//! on, performing those approval checks, and tracking the assignments and approvals
21//! of others. It uses this information to determine when candidates and blocks have
22//! been sufficiently approved to finalize.
23
24use futures_timer::Delay;
25use polkadot_node_primitives::{
26	approval::{
27		v1::{BlockApprovalMeta, DelayTranche},
28		v2::{
29			AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
30			IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2,
31		},
32	},
33	ValidationResult, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36	errors::RecoveryError,
37	messages::{
38		ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
39		ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
40		AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
41		ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote,
42		DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage,
43		RuntimeApiRequest,
44	},
45	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
46	SubsystemSender,
47};
48use polkadot_node_subsystem_util::{
49	self,
50	database::Database,
51	metrics::{self, prometheus},
52	runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
53	TimeoutExt,
54};
55use polkadot_primitives::{
56	ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
57	CandidateIndex, CandidateReceiptV2 as CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex,
58	Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair,
59	ValidatorSignature,
60};
61use sc_keystore::LocalKeystore;
62use sp_application_crypto::Pair;
63use sp_consensus::SyncOracle;
64use sp_consensus_slots::Slot;
65use std::time::Instant;
66
// The max number of blocks we keep track of assignments gathering times for. Normally,
// this would never be reached because we prune the data on finalization, but we need
// to also ensure the data is not growing unnecessarily large.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
71
72use futures::{
73	channel::oneshot,
74	future::{BoxFuture, RemoteHandle},
75	prelude::*,
76	stream::FuturesUnordered,
77	StreamExt,
78};
79
80use std::{
81	cmp::min,
82	collections::{
83		btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
84	},
85	sync::Arc,
86	time::Duration,
87};
88
89use schnellru::{ByLength, LruMap};
90
91use approval_checking::RequiredTranches;
92use bitvec::{order::Lsb0, vec::BitVec};
93pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
94use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
95use polkadot_node_primitives::approval::time::{
96	slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick,
97};
98
99mod approval_checking;
100pub mod approval_db;
101mod backend;
102pub mod criteria;
103mod import;
104mod ops;
105mod persisted_entries;
106
107use crate::{
108	approval_checking::{Check, TranchesToApproveResult},
109	approval_db::common::{Config as DatabaseConfig, DbBackend},
110	backend::{Backend, OverlayedBackend},
111	criteria::InvalidAssignmentReason,
112	persisted_entries::OurApproval,
113};
114
115#[cfg(test)]
116mod tests;
117
/// How long we wait for an approval check before giving up and treating it as timed out.
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
/// How long are we willing to wait for approval signatures?
///
/// Value chosen rather arbitrarily: it should not be hit in practice; it exists to more easily
/// diagnose deadlock issues, for example.
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
/// Capacity of the LRU cache of recent approval outcomes per candidate.
const APPROVAL_CACHE_SIZE: u32 = 1024;

/// The maximum number of times we retry to approve a block if it is still needed.
const MAX_APPROVAL_RETRIES: u32 = 16;

// A small fixed delay, in ticks. NOTE(review): applied at use sites outside this
// chunk — confirm its exact role there.
const APPROVAL_DELAY: Tick = 2;
/// Log target shared by all approval-voting log lines.
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

// The max number of ticks we delay sending the approval after we are ready to issue the approval.
const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;

// If the node restarted and the tranche has passed without the assignment
// being triggered, we won't trigger the assignment at restart because we don't have
// a wakeup schedule for it.
// The solution is to always schedule a wake up after the restart and let
// `process_wakeup` decide if the assignment needs to be triggered.
// We need to have a delay after restart to give time to the node to catch up with
// messages and not trigger its assignment unnecessarily, because it hasn't seen
// the assignments from the other validators.
const RESTART_WAKEUP_DELAY: Tick = 12;
144
/// Configuration for the approval voting subsystem.
#[derive(Debug, Clone)]
pub struct Config {
	/// The column family in the DB where approval-voting data is stored.
	pub col_approval_data: u32,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500.
	pub slot_duration_millis: u64,
}
154
// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
//
// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
// the node follows the new incoming blocks and finalized number, but does not yet participate.
//
// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
// subsystem of all unfinalized blocks and the candidates included within them, as well as all
// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
	// Fully participating in the approvals protocol.
	Active,
	// Still syncing; carries the oracle used to detect when sync completes.
	Syncing(Box<dyn SyncOracle + Send>),
}
168
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
	/// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys.
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
	keystore: Arc<LocalKeystore>,
	/// Configuration of the approval DB (column indices).
	db_config: DatabaseConfig,
	/// Slot duration of the consensus algorithm, in milliseconds.
	slot_duration_millis: u64,
	/// Handle to the underlying database.
	db: Arc<dyn Database>,
	/// Whether we are actively participating in the protocol or still syncing.
	mode: Mode,
	/// Prometheus metrics for this subsystem.
	metrics: Metrics,
	/// Clock abstraction; injectable via `with_config_and_clock`.
	clock: Arc<dyn Clock + Send + Sync>,
	/// Spawner used to launch background work.
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	/// The maximum number of times we retry to approve a block if it is still needed and the PoV
	/// fetch failed.
	max_approval_retries: u32,
	/// The backoff before we retry the approval.
	retry_backoff: Duration,
}
187
// The actual metric handles; present only when a Prometheus registry was provided
// at registration time (see `Metrics::try_register`).
#[derive(Clone)]
struct MetricsInner {
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	assignments_produced: prometheus::Histogram,
	approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
	no_shows_total: prometheus::Counter<prometheus::U64>,
	// The difference from `no_shows_total` is that this counts all observed no-shows at any
	// moment in time. While `no_shows_total` catches the no-shows at the moment the candidate
	// is approved, approvals might arrive late and `no_shows_total` wouldn't catch that number.
	observed_no_shows: prometheus::Counter<prometheus::U64>,
	approved_by_one_third: prometheus::Counter<prometheus::U64>,
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	coalesced_approvals_buckets: prometheus::Histogram,
	coalesced_approvals_delay: prometheus::Histogram,
	candidate_approval_time_ticks: prometheus::Histogram,
	block_approval_time_ticks: prometheus::Histogram,
	time_db_transaction: prometheus::Histogram,
	time_recover_and_approve: prometheus::Histogram,
	candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
	unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
	// The time it takes in each stage to gather enough assignments.
	// We defined a `stage` as being the entire process of gathering enough assignments to
	// be able to approve a candidate:
	// E.g:
	// - Stage 0: We wait for the needed_approvals assignments to be gathered.
	// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
	// - Stage 2: We wait for enough tranches to cover all no-shows of stage 1.
	assignments_gathering_time_by_stage: prometheus::HistogramVec,
}
217
/// Approval Voting metrics.
///
/// The inner value is `None` when no Prometheus registry was supplied, in which
/// case every recording method is a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
221
222impl Metrics {
223	fn on_candidate_imported(&self) {
224		if let Some(metrics) = &self.0 {
225			metrics.imported_candidates_total.inc();
226		}
227	}
228
229	fn on_assignment_produced(&self, tranche: DelayTranche) {
230		if let Some(metrics) = &self.0 {
231			metrics.assignments_produced.observe(tranche as f64);
232		}
233	}
234
235	fn on_approval_coalesce(&self, num_coalesced: u32) {
236		if let Some(metrics) = &self.0 {
237			// Count how many candidates we covered with this coalesced approvals,
238			// so that the heat-map really gives a good understanding of the scales.
239			for _ in 0..num_coalesced {
240				metrics.coalesced_approvals_buckets.observe(num_coalesced as f64)
241			}
242		}
243	}
244
245	fn on_delayed_approval(&self, delayed_ticks: u64) {
246		if let Some(metrics) = &self.0 {
247			metrics.coalesced_approvals_delay.observe(delayed_ticks as f64)
248		}
249	}
250
251	fn on_approval_stale(&self) {
252		if let Some(metrics) = &self.0 {
253			metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
254		}
255	}
256
257	fn on_approval_invalid(&self) {
258		if let Some(metrics) = &self.0 {
259			metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
260		}
261	}
262
263	fn on_approval_unavailable(&self) {
264		if let Some(metrics) = &self.0 {
265			metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
266		}
267	}
268
269	fn on_approval_error(&self) {
270		if let Some(metrics) = &self.0 {
271			metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
272		}
273	}
274
275	fn on_approval_produced(&self) {
276		if let Some(metrics) = &self.0 {
277			metrics.approvals_produced_total.with_label_values(&["success"]).inc()
278		}
279	}
280
281	fn on_no_shows(&self, n: usize) {
282		if let Some(metrics) = &self.0 {
283			metrics.no_shows_total.inc_by(n as u64);
284		}
285	}
286
287	fn on_observed_no_shows(&self, n: usize) {
288		if let Some(metrics) = &self.0 {
289			metrics.observed_no_shows.inc_by(n as u64);
290		}
291	}
292
293	fn on_approved_by_one_third(&self) {
294		if let Some(metrics) = &self.0 {
295			metrics.approved_by_one_third.inc();
296		}
297	}
298
299	fn on_wakeup(&self) {
300		if let Some(metrics) = &self.0 {
301			metrics.wakeups_triggered_total.inc();
302		}
303	}
304
305	fn on_candidate_approved(&self, ticks: Tick) {
306		if let Some(metrics) = &self.0 {
307			metrics.candidate_approval_time_ticks.observe(ticks as f64);
308		}
309	}
310
311	fn on_block_approved(&self, ticks: Tick) {
312		if let Some(metrics) = &self.0 {
313			metrics.block_approval_time_ticks.observe(ticks as f64);
314		}
315	}
316
317	fn on_candidate_signatures_request(&self) {
318		if let Some(metrics) = &self.0 {
319			metrics.candidate_signatures_requests_total.inc();
320		}
321	}
322
323	fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
324		self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
325	}
326
327	fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
328		self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
329	}
330
331	fn on_unapproved_candidates_in_unfinalized_chain(&self, count: usize) {
332		if let Some(metrics) = &self.0 {
333			metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
334		}
335	}
336
337	pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
338		if let Some(metrics) = &self.0 {
339			let stage_string = stage.to_string();
340			// We don't want to have too many metrics entries with this label to not put unncessary
341			// pressure on the metrics infrastructure, so we cap the stage at 10, which is
342			// equivalent to having already a finalization lag to 10 * no_show_slots, so it should
343			// be more than enough.
344			metrics
345				.assignments_gathering_time_by_stage
346				.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
347				.observe(elapsed_as_millis as f64);
348		}
349	}
350}
351
impl metrics::Metrics for Metrics {
	// Registers every approval-voting metric with the given Prometheus registry and
	// returns the populated `Metrics` wrapper, or the first registration error.
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
			assignments_produced: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_assignments_produced",
						"Assignments and tranches produced by the approval voting subsystem",
					).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
				// Labelled by outcome status: "success", "stale", "invalid",
				// "unavailable", "internal error" (see the `on_approval_*` methods).
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"polkadot_parachain_approvals_produced_total",
						"Number of approvals produced by the approval voting subsystem",
					),
					&["status"]
				)?,
				registry,
			)?,
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			observed_no_shows: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_observed_no_shows_total",
					"Number of observed no shows at any moment in time",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			coalesced_approvals_buckets: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_coalesced_approvals_buckets",
						"Number of coalesced approvals.",
					).buckets(vec![1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]),
				)?,
				registry,
			)?,
			coalesced_approvals_delay: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_coalescing_delay",
						"Number of ticks we delay the sending of a candidate approval",
					).buckets(vec![1.1, 2.1, 3.1, 4.1, 6.1, 8.1, 12.1, 20.1, 32.1]),
				)?,
				registry,
			)?,
			approved_by_one_third: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approved_by_one_third",
					"Number of candidates where more than one third had to vote ",
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			time_db_transaction: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_time_approval_db_transaction",
						"Time spent writing an approval db transaction.",
					)
				)?,
				registry,
			)?,
			time_recover_and_approve: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_time_recover_and_approve",
						"Time spent recovering and approving data in approval voting",
					)
				)?,
				registry,
			)?,
			candidate_signatures_requests_total: prometheus::register(
				prometheus::Counter::new(
					"polkadot_parachain_approval_candidate_signatures_requests_total",
					"Number of times signatures got requested by other subsystems",
				)?,
				registry,
			)?,
			unapproved_candidates_in_unfinalized_chain: prometheus::register(
				prometheus::Gauge::new(
					"polkadot_parachain_approval_unapproved_candidates_in_unfinalized_chain",
					"Number of unapproved candidates in unfinalized chain",
				)?,
				registry,
			)?,
			assignments_gathering_time_by_stage: prometheus::register(
				// Labelled by stage index, capped at "inf"
				// (see `observe_assignment_gathering_time`).
				prometheus::HistogramVec::new(
					prometheus::HistogramOpts::new(
						"polkadot_parachain_assignments_gather_time_by_stage_ms",
						"The time in ms it takes for each stage to gather enough assignments needed for approval",
					)
					.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
					&["stage"],
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}
495
496impl ApprovalVotingSubsystem {
497	/// Create a new approval voting subsystem with the given keystore, config, and database.
498	pub fn with_config(
499		config: Config,
500		db: Arc<dyn Database>,
501		keystore: Arc<LocalKeystore>,
502		sync_oracle: Box<dyn SyncOracle + Send>,
503		metrics: Metrics,
504		spawner: Arc<dyn overseer::gen::Spawner + 'static>,
505	) -> Self {
506		ApprovalVotingSubsystem::with_config_and_clock(
507			config,
508			db,
509			keystore,
510			sync_oracle,
511			metrics,
512			Arc::new(SystemClock {}),
513			spawner,
514			MAX_APPROVAL_RETRIES,
515			APPROVAL_CHECKING_TIMEOUT / 2,
516		)
517	}
518
519	/// Create a new approval voting subsystem with the given keystore, config, and database.
520	pub fn with_config_and_clock(
521		config: Config,
522		db: Arc<dyn Database>,
523		keystore: Arc<LocalKeystore>,
524		sync_oracle: Box<dyn SyncOracle + Send>,
525		metrics: Metrics,
526		clock: Arc<dyn Clock + Send + Sync>,
527		spawner: Arc<dyn overseer::gen::Spawner + 'static>,
528		max_approval_retries: u32,
529		retry_backoff: Duration,
530	) -> Self {
531		ApprovalVotingSubsystem {
532			keystore,
533			slot_duration_millis: config.slot_duration_millis,
534			db,
535			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
536			mode: Mode::Syncing(sync_oracle),
537			metrics,
538			clock,
539			spawner,
540			max_approval_retries,
541			retry_backoff,
542		}
543	}
544
545	/// Revert to the block corresponding to the specified `hash`.
546	/// The operation is not allowed for blocks older than the last finalized one.
547	pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
548		let config =
549			approval_db::common::Config { col_approval_data: self.db_config.col_approval_data };
550		let mut backend = approval_db::common::DbBackend::new(self.db.clone(), config);
551		let mut overlay = OverlayedBackend::new(&backend);
552
553		ops::revert_to(&mut overlay, hash)?;
554
555		let ops = overlay.into_write_ops();
556		backend.write(ops)
557	}
558}
559
560// Checks and logs approval vote db state. It is perfectly normal to start with an
561// empty approval vote DB if we changed DB type or the node will sync from scratch.
562fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemResult<()> {
563	let backend = DbBackend::new(db, config);
564	let all_blocks = backend.load_all_blocks()?;
565
566	if all_blocks.is_empty() {
567		gum::info!(target: LOG_TARGET, "Starting with an empty approval vote DB.",);
568	} else {
569		gum::debug!(
570			target: LOG_TARGET,
571			"Starting with {} blocks in approval vote DB.",
572			all_blocks.len()
573		);
574	}
575
576	Ok(())
577}
578
#[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)]
impl<Context: Send> ApprovalVotingSubsystem {
	// Entry point invoked by the overseer: builds the DB backend and launches the
	// subsystem's main `run` loop as a boxed future.
	fn start(self, mut ctx: Context) -> SpawnedSubsystem {
		let backend = DbBackend::new(self.db.clone(), self.db_config);
		// Two independent sender handles cloned from the context; `run` takes them
		// separately (the second name suggests dedicated approval-distribution
		// traffic — confirm in `run`'s definition).
		let to_other_subsystems = ctx.sender().clone();
		let to_approval_distr = ctx.sender().clone();

		let future = run::<DbBackend, _, _, _>(
			ctx,
			to_other_subsystems,
			to_approval_distr,
			self,
			Box::new(RealAssignmentCriteria),
			backend,
		)
		// Tag any subsystem error with this subsystem's origin for diagnostics.
		.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		.boxed();

		SpawnedSubsystem { name: "approval-voting-subsystem", future }
	}
}
600
// Identifies an approval-vote request by the issuing validator and the relay
// block the vote pertains to.
#[derive(Debug, Clone)]
struct ApprovalVoteRequest {
	validator_index: ValidatorIndex,
	block_hash: Hash,
}
606
// Tracker for scheduled per-candidate wakeups, indexed both by tick and by
// (block, candidate) so lookups are cheap in either direction.
#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	// Reverse index: (relay block, candidate) -> scheduled tick.
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
	// Block-number index of block hashes, used to prune wakeups on finalization.
	block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
614
615impl Wakeups {
616	// Returns the first tick there exist wakeups for, if any.
617	fn first(&self) -> Option<Tick> {
618		self.wakeups.keys().next().map(|t| *t)
619	}
620
621	fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
622		self.block_numbers.entry(block_number).or_default().insert(block_hash);
623	}
624
625	// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
626	// for these values. replaces any later wakeup.
627	fn schedule(
628		&mut self,
629		block_hash: Hash,
630		block_number: BlockNumber,
631		candidate_hash: CandidateHash,
632		tick: Tick,
633	) {
634		if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
635			if prev <= &tick {
636				return
637			}
638
639			// we are replacing previous wakeup with an earlier one.
640			if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
641				if let Some(pos) =
642					entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
643				{
644					entry.get_mut().remove(pos);
645				}
646
647				if entry.get().is_empty() {
648					let _ = entry.remove_entry();
649				}
650			}
651		} else {
652			self.note_block(block_hash, block_number);
653		}
654
655		self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
656		self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
657	}
658
659	fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
660		let after = self.block_numbers.split_off(&(finalized_number + 1));
661		let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
662			.into_iter()
663			.flat_map(|(_number, hashes)| hashes)
664			.collect();
665
666		let mut pruned_wakeups = BTreeMap::new();
667		self.reverse_wakeups.retain(|(h, c_h), tick| {
668			let live = !pruned_blocks.contains(h);
669			if !live {
670				pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
671			}
672			live
673		});
674
675		for (tick, pruned) in pruned_wakeups {
676			if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
677				entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
678				if entry.get().is_empty() {
679					let _ = entry.remove();
680				}
681			}
682		}
683	}
684
685	// Get the wakeup for a particular block/candidate combo, if any.
686	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
687		self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
688	}
689
690	// Returns the next wakeup. this future never returns if there are no wakeups.
691	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
692		match self.first() {
693			None => future::pending().await,
694			Some(tick) => {
695				clock.wait(tick).await;
696				match self.wakeups.entry(tick) {
697					BTMEntry::Vacant(_) => {
698						panic!("entry is known to exist since `first` was `Some`; qed")
699					},
700					BTMEntry::Occupied(mut entry) => {
701						let (hash, candidate_hash) = entry.get_mut().pop()
702							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
703
704						if entry.get().is_empty() {
705							let _ = entry.remove();
706						}
707
708						self.reverse_wakeups.remove(&(hash, candidate_hash));
709
710						(tick, hash, candidate_hash)
711					},
712				}
713			},
714		}
715	}
716}
717
// A snapshot of approval progress for a candidate within a particular block.
struct ApprovalStatus {
	// The tranches required for this candidate to count as approved.
	required_tranches: RequiredTranches,
	// The delay tranche at the time this status was computed.
	tranche_now: DelayTranche,
	// The tick associated with the block — NOTE(review): presumably derived from
	// the block's slot via `slot_number_to_tick`; confirm at the construction site.
	block_tick: Tick,
	// NOTE(review): appears to be the no-show count from the most recent
	// tranche computation — confirm against `approval_checking`.
	last_no_shows: usize,
	// The validators recorded as no-shows.
	no_show_validators: Vec<ValidatorIndex>,
}
725
// Outcome of a single approval check on a candidate.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
	// The check completed and the candidate was approved.
	Approved,
	// The check failed.
	Failed,
	// The check did not complete within `APPROVAL_CHECKING_TIMEOUT`
	// (see `CurrentlyCheckingSet::insert_relay_block_hash`).
	TimedOut,
}
732
// Everything needed to retry the approval check of a candidate.
#[derive(Clone)]
struct RetryApprovalInfo {
	// The candidate receipt to re-check.
	candidate: CandidateReceipt,
	// The backing group of the candidate.
	backing_group: GroupIndex,
	// Executor parameters to use for validation.
	executor_params: ExecutorParams,
	// The core the candidate occupied, if known.
	core_index: Option<CoreIndex>,
	// The session the candidate belongs to.
	session_index: SessionIndex,
	// How many retry attempts remain before giving up.
	attempts_remaining: u32,
	// The backoff to wait before the next attempt.
	backoff: Duration,
}
743
// The result of a (possibly timed-out) approval check for one candidate, as
// yielded by the futures in `CurrentlyCheckingSet`.
struct ApprovalState {
	validator_index: ValidatorIndex,
	candidate_hash: CandidateHash,
	approval_outcome: ApprovalOutcome,
	// Set when a failed check should be attempted again; `None` otherwise.
	retry_info: Option<RetryApprovalInfo>,
}
750
751impl ApprovalState {
752	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
753		Self {
754			validator_index,
755			candidate_hash,
756			approval_outcome: ApprovalOutcome::Approved,
757			retry_info: None,
758		}
759	}
760	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
761		Self {
762			validator_index,
763			candidate_hash,
764			approval_outcome: ApprovalOutcome::Failed,
765			retry_info: None,
766		}
767	}
768
769	fn failed_with_retry(
770		validator_index: ValidatorIndex,
771		candidate_hash: CandidateHash,
772		retry_info: Option<RetryApprovalInfo>,
773	) -> Self {
774		Self {
775			validator_index,
776			candidate_hash,
777			approval_outcome: ApprovalOutcome::Failed,
778			retry_info,
779		}
780	}
781}
782
// The set of candidates currently undergoing approval checks, together with the
// relay blocks each pending check applies to.
struct CurrentlyCheckingSet {
	// Candidate -> relay blocks awaiting that candidate's check result.
	candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
	// In-flight check futures, one per candidate in `candidate_hash_map`.
	currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}
787
788impl Default for CurrentlyCheckingSet {
789	fn default() -> Self {
790		Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
791	}
792}
793
impl CurrentlyCheckingSet {
	// This function will lazily launch approval voting work whenever the
	// candidate is not already undergoing validation.
	pub async fn insert_relay_block_hash(
		&mut self,
		candidate_hash: CandidateHash,
		validator_index: ValidatorIndex,
		relay_block: Hash,
		launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
	) -> SubsystemResult<()> {
		match self.candidate_hash_map.entry(candidate_hash) {
			HMEntry::Occupied(mut entry) => {
				// validation already undergoing. just add the relay hash if unknown.
				entry.get_mut().insert(relay_block);
			},
			HMEntry::Vacant(entry) => {
				// validation not ongoing. launch work and time out the remote handle.
				entry.insert(HashSet::new()).insert(relay_block);
				let work = launch_work.await?;
				self.currently_checking.push(Box::pin(async move {
					match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
						// Timeout elapsed: synthesize a `TimedOut` state instead of
						// waiting on the remote handle forever.
						None => ApprovalState {
							candidate_hash,
							validator_index,
							approval_outcome: ApprovalOutcome::TimedOut,
							retry_info: None,
						},
						Some(approval_state) => approval_state,
					}
				}));
			},
		}

		Ok(())
	}

	// Waits for the next in-flight check to conclude, removes the candidate's
	// relay-block set, caches its outcome, and returns both. Never resolves if
	// no checks are in flight.
	pub async fn next(
		&mut self,
		approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
	) -> (HashSet<Hash>, ApprovalState) {
		if !self.currently_checking.is_empty() {
			if let Some(approval_state) = self.currently_checking.next().await {
				let out = self
					.candidate_hash_map
					.remove(&approval_state.candidate_hash)
					.unwrap_or_default();
				approvals_cache
					.insert(approval_state.candidate_hash, approval_state.approval_outcome);
				return (out, approval_state)
			}
		}

		// Nothing in flight: park forever.
		future::pending().await
	}
}
849
850async fn get_extended_session_info<'a, Sender>(
851	runtime_info: &'a mut RuntimeInfo,
852	sender: &mut Sender,
853	relay_parent: Hash,
854	session_index: SessionIndex,
855) -> Option<&'a ExtendedSessionInfo>
856where
857	Sender: SubsystemSender<RuntimeApiMessage>,
858{
859	match runtime_info
860		.get_session_info_by_index(sender, relay_parent, session_index)
861		.await
862	{
863		Ok(extended_info) => Some(&extended_info),
864		Err(_) => {
865			gum::debug!(
866				target: LOG_TARGET,
867				session = session_index,
868				?relay_parent,
869				"Can't obtain SessionInfo or ExecutorParams"
870			);
871			None
872		},
873	}
874}
875
876async fn get_session_info<'a, Sender>(
877	runtime_info: &'a mut RuntimeInfo,
878	sender: &mut Sender,
879	relay_parent: Hash,
880	session_index: SessionIndex,
881) -> Option<&'a SessionInfo>
882where
883	Sender: SubsystemSender<RuntimeApiMessage>,
884{
885	get_extended_session_info(runtime_info, sender, relay_parent, session_index)
886		.await
887		.map(|extended_info| &extended_info.session_info)
888}
889
// Shared state used by the approval-voting subsystem's main loop.
struct State {
	// Keystore holding our assignment (and possibly approval) keys.
	keystore: Arc<LocalKeystore>,
	// Slot duration of the consensus algorithm, in milliseconds.
	slot_duration_millis: u64,
	// Clock abstraction used for tick computations and waits.
	clock: Arc<dyn Clock + Send + Sync>,
	// The strategy used to compute and check assignments.
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
	// Per block, candidate records about how long we take until we gather enough
	// assignments, this is relevant because it gives us a good idea about how many
	// tranches we trigger and why.
	per_block_assignments_gathering_times:
		LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
	// Rolling no-show statistics, periodically dumped to the logs.
	no_show_stats: NoShowStats,
}
902
// Regularly dump the no-show stats at this block-number frequency
// (see `NoShowStats::maybe_print`).
const NO_SHOW_DUMP_FREQUENCY: BlockNumber = 50;
// The maximum number of validators we record no-shows for, per candidate.
pub(crate) const MAX_RECORDED_NO_SHOW_VALIDATORS_PER_CANDIDATE: usize = 20;
907
// No show stats per validator and per parachain.
// This is valuable information when we have to debug a live network issue, because
// it tells us whether things are going wrong only for some validators or just
// for some parachains.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct NoShowStats {
	// Session -> per-validator no-show counts.
	per_validator_no_show: HashMap<SessionIndex, HashMap<ValidatorIndex, usize>>,
	// Parachain id -> no-show count.
	per_parachain_no_show: HashMap<u32, usize>,
	// The block number at which stats were last dumped to the logs.
	last_dumped_block_number: BlockNumber,
}
918
919impl NoShowStats {
920	// Print the no-show stats if NO_SHOW_DUMP_FREQUENCY blocks have passed since the last
921	// print.
922	fn maybe_print(&mut self, current_block_number: BlockNumber) {
923		if self.last_dumped_block_number > current_block_number ||
924			current_block_number - self.last_dumped_block_number < NO_SHOW_DUMP_FREQUENCY
925		{
926			return
927		}
928		if self.per_parachain_no_show.is_empty() && self.per_validator_no_show.is_empty() {
929			return
930		}
931
932		gum::debug!(
933			target: LOG_TARGET,
934			"Validators with no_show {:?} and parachains with no_shows {:?} since {:}",
935			self.per_validator_no_show,
936			self.per_parachain_no_show,
937			self.last_dumped_block_number
938		);
939
940		self.last_dumped_block_number = current_block_number;
941
942		self.per_validator_no_show.clear();
943		self.per_parachain_no_show.clear();
944	}
945}
946
#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
	// The stage we are in.
	// Candidate assignment gathering goes in stages, first we wait for needed_approvals(stage 0)
	// Then if we have no-shows, we move into stage 1 and wait for enough tranches to cover all
	// no-shows.
	stage: usize,
	// The time we started the stage. `None` after the stage completed — the
	// value is `take`n by `mark_gathered_enough_assignments`, which is what
	// allows a later no-show to start the next stage.
	stage_start: Option<Instant>,
}
957
958impl Default for AssignmentGatheringRecord {
959	fn default() -> Self {
960		AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
961	}
962}
963
964#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
965impl State {
966	// Compute the required tranches for approval for this block and candidate combo.
967	// Fails if there is no approval entry for the block under the candidate or no candidate entry
968	// under the block, or if the session is out of bounds.
969	async fn approval_status<Sender, 'a, 'b>(
970		&'a self,
971		sender: &mut Sender,
972		session_info_provider: &'a mut RuntimeInfo,
973		block_entry: &'a BlockEntry,
974		candidate_entry: &'b CandidateEntry,
975	) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
976	where
977		Sender: SubsystemSender<RuntimeApiMessage>,
978	{
979		let session_info = match get_session_info(
980			session_info_provider,
981			sender,
982			block_entry.parent_hash(),
983			block_entry.session(),
984		)
985		.await
986		{
987			Some(s) => s,
988			None => return None,
989		};
990		let block_hash = block_entry.block_hash();
991
992		let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
993		let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
994		let no_show_duration = slot_number_to_tick(
995			self.slot_duration_millis,
996			Slot::from(u64::from(session_info.no_show_slots)),
997		);
998
999		if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
1000			let TranchesToApproveResult {
1001				required_tranches,
1002				total_observed_no_shows,
1003				no_show_validators,
1004			} = approval_checking::tranches_to_approve(
1005				approval_entry,
1006				candidate_entry.approvals(),
1007				tranche_now,
1008				block_tick,
1009				no_show_duration,
1010				session_info.needed_approvals as _,
1011			);
1012
1013			let status = ApprovalStatus {
1014				required_tranches,
1015				block_tick,
1016				tranche_now,
1017				last_no_shows: total_observed_no_shows,
1018				no_show_validators,
1019			};
1020
1021			Some((approval_entry, status))
1022		} else {
1023			None
1024		}
1025	}
1026
1027	// Returns the approval voting params from the RuntimeApi.
1028	async fn get_approval_voting_params_or_default<Sender: SubsystemSender<RuntimeApiMessage>>(
1029		&self,
1030		sender: &mut Sender,
1031		session_index: SessionIndex,
1032		block_hash: Hash,
1033	) -> Option<ApprovalVotingParams> {
1034		let (s_tx, s_rx) = oneshot::channel();
1035
1036		sender
1037			.send_message(RuntimeApiMessage::Request(
1038				block_hash,
1039				RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx),
1040			))
1041			.await;
1042
1043		match s_rx.await {
1044			Ok(Ok(params)) => {
1045				gum::trace!(
1046					target: LOG_TARGET,
1047					approval_voting_params = ?params,
1048					session = ?session_index,
1049					"Using the following subsystem params"
1050				);
1051				Some(params)
1052			},
1053			Ok(Err(err)) => {
1054				gum::debug!(
1055					target: LOG_TARGET,
1056					?err,
1057					"Could not request approval voting params from runtime"
1058				);
1059				None
1060			},
1061			Err(err) => {
1062				gum::debug!(
1063					target: LOG_TARGET,
1064					?err,
1065					"Could not request approval voting params from runtime"
1066				);
1067				None
1068			},
1069		}
1070	}
1071
1072	fn mark_begining_of_gathering_assignments(
1073		&mut self,
1074		block_number: BlockNumber,
1075		block_hash: Hash,
1076		candidate: CandidateHash,
1077	) {
1078		if let Some(record) = self
1079			.per_block_assignments_gathering_times
1080			.get_or_insert(block_number, HashMap::new)
1081			.and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
1082		{
1083			if record.stage_start.is_none() {
1084				record.stage += 1;
1085				gum::debug!(
1086					target: LOG_TARGET,
1087					stage = ?record.stage,
1088					?block_hash,
1089					?candidate,
1090					"Started a new assignment gathering stage",
1091				);
1092				record.stage_start = Some(Instant::now());
1093			}
1094		}
1095	}
1096
1097	fn mark_gathered_enough_assignments(
1098		&mut self,
1099		block_number: BlockNumber,
1100		block_hash: Hash,
1101		candidate: CandidateHash,
1102	) -> AssignmentGatheringRecord {
1103		let record = self
1104			.per_block_assignments_gathering_times
1105			.get(&block_number)
1106			.and_then(|entry| entry.get_mut(&(block_hash, candidate)));
1107		let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
1108		AssignmentGatheringRecord {
1109			stage,
1110			stage_start: record.and_then(|record| record.stage_start.take()),
1111		}
1112	}
1113
1114	fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
1115		while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
1116		{
1117			if *block_number < remove_lower_than {
1118				self.per_block_assignments_gathering_times.pop_oldest();
1119			} else {
1120				break
1121			}
1122		}
1123	}
1124
1125	fn observe_assignment_gathering_status(
1126		&mut self,
1127		metrics: &Metrics,
1128		required_tranches: &RequiredTranches,
1129		block_hash: Hash,
1130		block_number: BlockNumber,
1131		candidate_hash: CandidateHash,
1132	) {
1133		match required_tranches {
1134			RequiredTranches::All | RequiredTranches::Pending { .. } => {
1135				self.mark_begining_of_gathering_assignments(
1136					block_number,
1137					block_hash,
1138					candidate_hash,
1139				);
1140			},
1141			RequiredTranches::Exact { .. } => {
1142				let time_to_gather =
1143					self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
1144				if let Some(gathering_started) = time_to_gather.stage_start {
1145					if gathering_started.elapsed().as_millis() > 6000 {
1146						gum::trace!(
1147							target: LOG_TARGET,
1148							?block_hash,
1149							?candidate_hash,
1150							"Long assignment gathering time",
1151						);
1152					}
1153					metrics.observe_assignment_gathering_time(
1154						time_to_gather.stage,
1155						gathering_started.elapsed().as_millis() as usize,
1156					)
1157				}
1158			},
1159		}
1160	}
1161
1162	fn record_no_shows(
1163		&mut self,
1164		session_index: SessionIndex,
1165		para_id: u32,
1166		no_show_validators: &Vec<ValidatorIndex>,
1167	) {
1168		if !no_show_validators.is_empty() {
1169			*self.no_show_stats.per_parachain_no_show.entry(para_id.into()).or_default() += 1;
1170		}
1171		for validator_index in no_show_validators {
1172			*self
1173				.no_show_stats
1174				.per_validator_no_show
1175				.entry(session_index)
1176				.or_default()
1177				.entry(*validator_index)
1178				.or_default() += 1;
1179		}
1180	}
1181}
1182
/// Actions produced while handling messages and wakeups; applied in order by
/// `handle_actions`.
#[derive(Debug, Clone)]
enum Action {
	/// Schedule a future wakeup for the candidate under the given block at `tick`.
	ScheduleWakeup {
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	/// Launch the approval-checking work for a candidate we are assigned to,
	/// optionally distributing the assignment certificate first
	/// (`distribute_assignment`).
	LaunchApproval {
		claimed_candidate_indices: CandidateBitfield,
		candidate_hash: CandidateHash,
		indirect_cert: IndirectAssignmentCertV2,
		assignment_tranche: DelayTranche,
		relay_block_hash: Hash,
		session: SessionIndex,
		executor_params: ExecutorParams,
		candidate: CandidateReceipt,
		backing_group: GroupIndex,
		distribute_assignment: bool,
		core_index: Option<CoreIndex>,
	},
	/// Notify chain selection that the given block is approved.
	NoteApprovedInChainSelection(Hash),
	/// Issue an approval vote for the candidate (handled via `issue_approval`).
	IssueApproval(CandidateHash, ApprovalVoteRequest),
	/// Leave `Mode::Syncing` and become an active participant.
	BecomeActive,
	/// Terminate the subsystem's main loop.
	Conclude,
}
1209
/// Trait for providing approval voting subsystem with work.
#[async_trait::async_trait]
pub trait ApprovalVotingWorkProvider {
	/// Await the next message from the overseer destined for this subsystem.
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>>;
}
1215
// The subsystem `Context` itself is the default work provider: it simply
// forwards to the context's own `recv`.
#[async_trait::async_trait]
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
impl<Context> ApprovalVotingWorkProvider for Context {
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
		self.recv().await
	}
}
1223
/// Main event loop of the approval voting subsystem.
///
/// Each iteration selects over the possible sources of work — a scheduled
/// candidate wakeup, a message from the overseer, a finished approval check,
/// or a delayed-signature timer — turns the event into a list of [`Action`]s,
/// applies them via `handle_actions`, and flushes the accumulated DB overlay.
/// Returns once a `Conclude` action has been processed or an error occurs.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn run<
	B,
	WorkProvider: ApprovalVotingWorkProvider,
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	mut work_provider: WorkProvider,
	mut to_other_subsystems: Sender,
	mut to_approval_distr: ADSender,
	mut subsystem: ApprovalVotingSubsystem,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
	mut backend: B,
) -> SubsystemResult<()>
where
	B: Backend,
{
	// A failed DB sanity check is logged but deliberately non-fatal.
	if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) {
		gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
	}

	let mut state = State {
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
		clock: subsystem.clock,
		assignment_criteria,
		per_block_assignments_gathering_times: LruMap::new(ByLength::new(
			MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
		)),
		no_show_stats: NoShowStats::default(),
	};

	// Best-effort fetch of the current finalized height; `None` if the chain
	// API reports an error (a dropped channel still aborts via `?`).
	let mut last_finalized_height: Option<BlockNumber> = {
		let (tx, rx) = oneshot::channel();
		to_other_subsystems
			.send_message(ChainApiMessage::FinalizedBlockNumber(tx))
			.await;
		match rx.await? {
			Ok(number) => Some(number),
			Err(err) => {
				gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number");
				None
			},
		}
	};

	// `None` on start-up. Gets initialized/updated on leaf update
	let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
		keystore: None,
		session_cache_lru_size: DISPUTE_WINDOW.get(),
	});

	let mut wakeups = Wakeups::default();
	let mut currently_checking_set = CurrentlyCheckingSet::default();
	let mut delayed_approvals_timers = DelayedApprovalTimer::default();
	let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE));

	loop {
		// Fresh overlay per iteration; written back to `backend` at the bottom
		// of the loop only if something changed.
		let mut overlayed_db = OverlayedBackend::new(&backend);
		let actions = futures::select! {
			// A scheduled per-candidate wakeup is due.
			(_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
				subsystem.metrics.on_wakeup();
				process_wakeup(
					&mut to_other_subsystems,
					&mut state,
					&mut overlayed_db,
					&mut session_info_provider,
					woken_block,
					woken_candidate,
					&subsystem.metrics,
					&wakeups,
				).await?
			}
			// A message from the overseer.
			next_msg = work_provider.recv().fuse() => {
				let mut actions = handle_from_overseer(
					&mut to_other_subsystems,
					&mut to_approval_distr,
					&subsystem.spawner,
					&mut state,
					&mut overlayed_db,
					&mut session_info_provider,
					&subsystem.metrics,
					next_msg?,
					&mut last_finalized_height,
					&mut wakeups,
				).await?;

				if let Mode::Syncing(ref mut oracle) = subsystem.mode {
					if !oracle.is_major_syncing() {
						// note that we're active before processing other actions.
						actions.insert(0, Action::BecomeActive)
					}
				}

				actions
			}
			// An in-flight approval check finished (or failed and may retry).
			approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
				let mut actions = Vec::new();
				let (
					relay_block_hashes,
					ApprovalState {
						validator_index,
						candidate_hash,
						approval_outcome,
						retry_info,
					}
				) = approval_state;

				if matches!(approval_outcome, ApprovalOutcome::Approved) {
					// Issue an approval for the candidate under every relay
					// block it was being checked against.
					let mut approvals: Vec<Action> = relay_block_hashes
						.iter()
						.map(|block_hash|
							Action::IssueApproval(
								candidate_hash,
								ApprovalVoteRequest {
									validator_index,
									block_hash: *block_hash,
								},
							)
						)
						.collect();
					actions.append(&mut approvals);
				}

				if let Some(retry_info) = retry_info {
					for block_hash in relay_block_hashes {
						// Only retry if the block is still known to us.
						if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
							let sender = to_other_subsystems.clone();
							let spawn_handle = subsystem.spawner.clone();
							let metrics = subsystem.metrics.clone();
							let retry_info = retry_info.clone();
							let executor_params = retry_info.executor_params.clone();
							let candidate = retry_info.candidate.clone();

							currently_checking_set
								.insert_relay_block_hash(
									candidate_hash,
									validator_index,
									block_hash,
									async move {
										launch_approval(
											sender,
											spawn_handle,
											metrics,
											retry_info.session_index,
											candidate,
											validator_index,
											block_hash,
											retry_info.backing_group,
											executor_params,
											retry_info.core_index,
											retry_info,
										)
										.await
									},
								)
								.await?;
						}
					}
				}

				actions
			},
			// A delayed-approval (multi-candidate signature) timer fired.
			(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
				gum::debug!(
					target: LOG_TARGET,
					?block_hash,
					?validator_index,
					"Sign approval for multiple candidates",
				);

				match maybe_create_signature(
					&mut overlayed_db,
					&mut session_info_provider,
					&state,
					&mut to_other_subsystems,
					&mut to_approval_distr,
					block_hash,
					validator_index,
					&subsystem.metrics,
				).await {
					Ok(Some(next_wakeup)) => {
						delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index);
					},
					Ok(None) => {}
					Err(err) => {
						gum::error!(
							target: LOG_TARGET,
							?err,
							"Failed to create signature",
						);
					}
				}
				vec![]
			}
		};

		// `handle_actions` returns `true` when a `Conclude` action was seen.
		if handle_actions(
			&mut to_other_subsystems,
			&mut to_approval_distr,
			&subsystem.spawner,
			&mut state,
			&mut overlayed_db,
			&mut session_info_provider,
			&subsystem.metrics,
			&mut wakeups,
			&mut currently_checking_set,
			&mut delayed_approvals_timers,
			&mut approvals_cache,
			&mut subsystem.mode,
			actions,
			subsystem.max_approval_retries,
			subsystem.retry_backoff,
		)
		.await?
		{
			break
		}

		// Persist whatever the handlers wrote into the overlay.
		if !overlayed_db.is_empty() {
			let _timer = subsystem.metrics.time_db_transaction();
			let ops = overlayed_db.into_write_ops();
			backend.write(ops)?;
		}
	}

	Ok(())
}
1458
/// Starts a worker task that runs the approval voting subsystem.
///
/// Builds an `ApprovalVotingSubsystem` from `config`, wires it up to the given
/// work provider and senders, and spawns the `run` loop on the blocking pool
/// under `task_name`/`group_name` (presumably because the loop performs
/// synchronous DB I/O — confirm). Returns immediately after spawning.
pub async fn start_approval_worker<
	WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	work_provider: WorkProvider,
	to_other_subsystems: Sender,
	to_approval_distr: ADSender,
	config: Config,
	db: Arc<dyn Database>,
	keystore: Arc<LocalKeystore>,
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	task_name: &'static str,
	group_name: &'static str,
	clock: Arc<dyn Clock + Send + Sync>,
) -> SubsystemResult<()> {
	// The last two arguments look like the retry count and retry backoff used
	// by `handle_actions` — verify against `with_config_and_clock`'s signature.
	let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
		config,
		db.clone(),
		keystore,
		sync_oracle,
		metrics,
		clock,
		spawner,
		MAX_APPROVAL_RETRIES,
		APPROVAL_CHECKING_TIMEOUT / 2,
	);
	let backend = DbBackend::new(db.clone(), approval_voting.db_config);
	let spawner = approval_voting.spawner.clone();
	spawner.spawn_blocking(
		task_name,
		Some(group_name),
		Box::pin(async move {
			if let Err(err) = run(
				work_provider,
				to_other_subsystems,
				to_approval_distr,
				approval_voting,
				Box::new(RealAssignmentCriteria),
				backend,
			)
			.await
			{
				gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
			};
		}),
	);
	Ok(())
}
1517
1518// Handle actions is a function that accepts a set of instructions
1519// and subsequently updates the underlying approvals_db in accordance
1520// with the linear set of instructions passed in. Therefore, actions
1521// must be processed in series to ensure that earlier actions are not
1522// negated/corrupted by later actions being executed out-of-order.
1523//
1524// However, certain Actions can cause additional actions to need to be
1525// processed by this function. In order to preserve linearity, we would
1526// need to handle these newly generated actions before we finalize
1527// completing additional actions in the submitted sequence of actions.
1528//
1529// Since recursive async functions are not stable yet, we are
1530// forced to modify the actions iterator on the fly whenever a new set
1531// of actions are generated by handling a single action.
1532//
1533// This particular problem statement is specified in issue 3311:
1534// 	https://github.com/paritytech/polkadot/issues/3311
1535//
1536// returns `true` if any of the actions was a `Conclude` command.
1537#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1538async fn handle_actions<
1539	Sender: SubsystemSender<ChainApiMessage>
1540		+ SubsystemSender<RuntimeApiMessage>
1541		+ SubsystemSender<ChainSelectionMessage>
1542		+ SubsystemSender<AvailabilityRecoveryMessage>
1543		+ SubsystemSender<DisputeCoordinatorMessage>
1544		+ SubsystemSender<CandidateValidationMessage>
1545		+ Clone,
1546	ADSender: SubsystemSender<ApprovalDistributionMessage>,
1547>(
1548	sender: &mut Sender,
1549	approval_voting_sender: &mut ADSender,
1550	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1551	state: &mut State,
1552	overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
1553	session_info_provider: &mut RuntimeInfo,
1554	metrics: &Metrics,
1555	wakeups: &mut Wakeups,
1556	currently_checking_set: &mut CurrentlyCheckingSet,
1557	delayed_approvals_timers: &mut DelayedApprovalTimer,
1558	approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
1559	mode: &mut Mode,
1560	actions: Vec<Action>,
1561	max_approval_retries: u32,
1562	retry_backoff: Duration,
1563) -> SubsystemResult<bool> {
1564	let mut conclude = false;
1565	let mut actions_iter = actions.into_iter();
1566	while let Some(action) = actions_iter.next() {
1567		match action {
1568			Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
1569				wakeups.schedule(block_hash, block_number, candidate_hash, tick);
1570			},
1571			Action::IssueApproval(candidate_hash, approval_request) => {
1572				// Note that the IssueApproval action will create additional
1573				// actions that will need to all be processed before we can
1574				// handle the next action in the set passed to the ambient
1575				// function.
1576				//
1577				// In order to achieve this, we append the existing iterator
1578				// to the end of the iterator made up of these newly generated
1579				// actions.
1580				//
1581				// Note that chaining these iterators is O(n) as we must consume
1582				// the prior iterator.
1583				let next_actions: Vec<Action> = issue_approval(
1584					sender,
1585					approval_voting_sender,
1586					state,
1587					overlayed_db,
1588					session_info_provider,
1589					metrics,
1590					candidate_hash,
1591					delayed_approvals_timers,
1592					approval_request,
1593					&wakeups,
1594				)
1595				.await?
1596				.into_iter()
1597				.map(|v| v.clone())
1598				.chain(actions_iter)
1599				.collect();
1600
1601				actions_iter = next_actions.into_iter();
1602			},
1603			Action::LaunchApproval {
1604				claimed_candidate_indices,
1605				candidate_hash,
1606				indirect_cert,
1607				assignment_tranche,
1608				relay_block_hash,
1609				session,
1610				executor_params,
1611				candidate,
1612				backing_group,
1613				distribute_assignment,
1614				core_index,
1615			} => {
1616				// Don't launch approval work if the node is syncing.
1617				if let Mode::Syncing(_) = *mode {
1618					continue
1619				}
1620
1621				metrics.on_assignment_produced(assignment_tranche);
1622				let block_hash = indirect_cert.block_hash;
1623				let validator_index = indirect_cert.validator;
1624
1625				if distribute_assignment {
1626					approval_voting_sender.send_unbounded_message(
1627						ApprovalDistributionMessage::DistributeAssignment(
1628							indirect_cert,
1629							claimed_candidate_indices,
1630						),
1631					);
1632				}
1633
1634				match approvals_cache.get(&candidate_hash) {
1635					Some(ApprovalOutcome::Approved) => {
1636						let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
1637							candidate_hash,
1638							ApprovalVoteRequest { validator_index, block_hash },
1639						))
1640						.map(|v| v.clone())
1641						.chain(actions_iter)
1642						.collect();
1643						actions_iter = new_actions.into_iter();
1644					},
1645					None => {
1646						let sender = sender.clone();
1647						let spawn_handle = spawn_handle.clone();
1648
1649						let retry = RetryApprovalInfo {
1650							candidate: candidate.clone(),
1651							backing_group,
1652							executor_params: executor_params.clone(),
1653							core_index,
1654							session_index: session,
1655							attempts_remaining: max_approval_retries,
1656							backoff: retry_backoff,
1657						};
1658
1659						currently_checking_set
1660							.insert_relay_block_hash(
1661								candidate_hash,
1662								validator_index,
1663								relay_block_hash,
1664								async move {
1665									launch_approval(
1666										sender,
1667										spawn_handle,
1668										metrics.clone(),
1669										session,
1670										candidate,
1671										validator_index,
1672										block_hash,
1673										backing_group,
1674										executor_params,
1675										core_index,
1676										retry,
1677									)
1678									.await
1679								},
1680							)
1681							.await?;
1682					},
1683					Some(_) => {},
1684				}
1685			},
1686			Action::NoteApprovedInChainSelection(block_hash) => {
1687				sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
1688			},
1689			Action::BecomeActive => {
1690				*mode = Mode::Active;
1691
1692				let (messages, next_actions) = distribution_messages_for_activation(
1693					sender,
1694					overlayed_db,
1695					state,
1696					delayed_approvals_timers,
1697					session_info_provider,
1698				)
1699				.await?;
1700				for message in messages.into_iter() {
1701					approval_voting_sender.send_unbounded_message(message);
1702				}
1703				let next_actions: Vec<Action> =
1704					next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
1705
1706				actions_iter = next_actions.into_iter();
1707			},
1708			Action::Conclude => {
1709				conclude = true;
1710			},
1711		}
1712	}
1713
1714	Ok(conclude)
1715}
1716
1717fn cores_to_candidate_indices(
1718	core_indices: &CoreBitfield,
1719	block_entry: &BlockEntry,
1720) -> Result<CandidateBitfield, BitfieldError> {
1721	let mut candidate_indices = Vec::new();
1722
1723	// Map from core index to candidate index.
1724	for claimed_core_index in core_indices.iter_ones() {
1725		if let Some(candidate_index) = block_entry
1726			.candidates()
1727			.iter()
1728			.position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
1729		{
1730			candidate_indices.push(candidate_index as _);
1731		}
1732	}
1733
1734	CandidateBitfield::try_from(candidate_indices)
1735}
1736
1737// Returns the claimed core bitfield from the assignment cert and the core index
1738// from the block entry.
1739fn get_core_indices_on_startup(
1740	assignment: &AssignmentCertKindV2,
1741	block_entry_core_index: CoreIndex,
1742) -> CoreBitfield {
1743	match &assignment {
1744		AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
1745		AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
1746			CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed"),
1747		AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1748			CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"),
1749	}
1750}
1751
1752// Returns the claimed core bitfield from the assignment cert, the candidate hash and a
1753// `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot find the candidate hash
1754// in the block entry which indicates a bug or corrupted storage.
1755fn get_assignment_core_indices(
1756	assignment: &AssignmentCertKindV2,
1757	candidate_hash: &CandidateHash,
1758	block_entry: &BlockEntry,
1759) -> Option<CoreBitfield> {
1760	match &assignment {
1761		AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
1762			Some(core_bitfield.clone()),
1763		AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
1764			.candidates()
1765			.iter()
1766			.find(|(_core_index, h)| candidate_hash == h)
1767			.map(|(core_index, _candidate_hash)| {
1768				CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1769			}),
1770		AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1771			Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")),
1772	}
1773}
1774
1775#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1776async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApiMessage>>(
1777	sender: &mut Sender,
1778	db: &OverlayedBackend<'_, impl Backend>,
1779	state: &State,
1780	delayed_approvals_timers: &mut DelayedApprovalTimer,
1781	session_info_provider: &mut RuntimeInfo,
1782) -> SubsystemResult<(Vec<ApprovalDistributionMessage>, Vec<Action>)> {
1783	let all_blocks: Vec<Hash> = db.load_all_blocks()?;
1784
1785	let mut approval_meta = Vec::with_capacity(all_blocks.len());
1786	let mut messages = Vec::new();
1787	let mut approvals = Vec::new();
1788	let mut actions = Vec::new();
1789
1790	messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
1791
1792	for block_hash in all_blocks {
1793		let block_entry = match db.load_block_entry(&block_hash)? {
1794			Some(b) => b,
1795			None => {
1796				gum::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry");
1797
1798				continue
1799			},
1800		};
1801
1802		approval_meta.push(BlockApprovalMeta {
1803			hash: block_hash,
1804			number: block_entry.block_number(),
1805			parent_hash: block_entry.parent_hash(),
1806			candidates: block_entry
1807				.candidates()
1808				.iter()
1809				.map(|(core_index, c_hash)| {
1810					let candidate = db.load_candidate_entry(c_hash).ok().flatten();
1811					let group_index = candidate
1812						.and_then(|entry| {
1813							entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
1814						})
1815						.unwrap_or_else(|| {
1816							gum::warn!(
1817								target: LOG_TARGET,
1818								?block_hash,
1819								?c_hash,
1820								"Missing candidate entry or approval entry",
1821							);
1822							GroupIndex::default()
1823						});
1824					(*c_hash, *core_index, group_index)
1825				})
1826				.collect(),
1827			slot: block_entry.slot(),
1828			session: block_entry.session(),
1829			vrf_story: block_entry.relay_vrf_story(),
1830		});
1831		let mut signatures_queued = HashSet::new();
1832		for (core_index, candidate_hash) in block_entry.candidates() {
1833			let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
1834				Some(c) => c,
1835				None => {
1836					gum::warn!(
1837						target: LOG_TARGET,
1838						?block_hash,
1839						?candidate_hash,
1840						"Missing candidate entry",
1841					);
1842
1843					continue
1844				},
1845			};
1846
1847			match candidate_entry.approval_entry(&block_hash) {
1848				Some(approval_entry) => {
1849					match approval_entry.local_statements() {
1850						(None, None) =>
1851							if approval_entry
1852								.our_assignment()
1853								.map(|assignment| !assignment.triggered())
1854								.unwrap_or(false)
1855							{
1856								actions.push(Action::ScheduleWakeup {
1857									block_hash,
1858									block_number: block_entry.block_number(),
1859									candidate_hash: *candidate_hash,
1860									tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
1861								})
1862							},
1863						(None, Some(_)) => {}, // second is impossible case.
1864						(Some(assignment), None) => {
1865							let claimed_core_indices =
1866								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1867
1868							if block_entry.has_candidates_pending_signature() {
1869								delayed_approvals_timers.maybe_arm_timer(
1870									state.clock.tick_now(),
1871									state.clock.as_ref(),
1872									block_entry.block_hash(),
1873									assignment.validator_index(),
1874								)
1875							}
1876
1877							match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1878								Ok(bitfield) => {
1879									gum::debug!(
1880										target: LOG_TARGET,
1881										candidate_hash = ?candidate_entry.candidate_receipt().hash(),
1882										?block_hash,
1883										"Discovered, triggered assignment, not approved yet",
1884									);
1885
1886									let indirect_cert = IndirectAssignmentCertV2 {
1887										block_hash,
1888										validator: assignment.validator_index(),
1889										cert: assignment.cert().clone(),
1890									};
1891									messages.push(
1892										ApprovalDistributionMessage::DistributeAssignment(
1893											indirect_cert.clone(),
1894											bitfield.clone(),
1895										),
1896									);
1897
1898									if !block_entry.candidate_is_pending_signature(*candidate_hash)
1899									{
1900										let ExtendedSessionInfo { ref executor_params, .. } =
1901											match get_extended_session_info(
1902												session_info_provider,
1903												sender,
1904												block_entry.block_hash(),
1905												block_entry.session(),
1906											)
1907											.await
1908											{
1909												Some(i) => i,
1910												None => continue,
1911											};
1912
1913										actions.push(Action::LaunchApproval {
1914											claimed_candidate_indices: bitfield,
1915											candidate_hash: candidate_entry
1916												.candidate_receipt()
1917												.hash(),
1918											indirect_cert,
1919											assignment_tranche: assignment.tranche(),
1920											relay_block_hash: block_hash,
1921											session: block_entry.session(),
1922											executor_params: executor_params.clone(),
1923											candidate: candidate_entry.candidate_receipt().clone(),
1924											backing_group: approval_entry.backing_group(),
1925											distribute_assignment: false,
1926											core_index: Some(*core_index),
1927										});
1928									}
1929								},
1930								Err(err) => {
1931									// Should never happen. If we fail here it means the
1932									// assignment is null (no cores claimed).
1933									gum::warn!(
1934										target: LOG_TARGET,
1935										?block_hash,
1936										?candidate_hash,
1937										?err,
1938										"Failed to create assignment bitfield",
1939									);
1940								},
1941							}
1942						},
1943						(Some(assignment), Some(approval_sig)) => {
1944							let claimed_core_indices =
1945								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1946							match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1947								Ok(bitfield) => messages.push(
1948									ApprovalDistributionMessage::DistributeAssignment(
1949										IndirectAssignmentCertV2 {
1950											block_hash,
1951											validator: assignment.validator_index(),
1952											cert: assignment.cert().clone(),
1953										},
1954										bitfield,
1955									),
1956								),
1957								Err(err) => {
1958									gum::warn!(
1959										target: LOG_TARGET,
1960										?block_hash,
1961										?candidate_hash,
1962										?err,
1963										"Failed to create assignment bitfield",
1964									);
1965									// If we didn't send assignment, we don't send approval.
1966									continue
1967								},
1968							}
1969							if signatures_queued
1970								.insert(approval_sig.signed_candidates_indices.clone())
1971							{
1972								approvals.push(ApprovalDistributionMessage::DistributeApproval(
1973									IndirectSignedApprovalVoteV2 {
1974										block_hash,
1975										candidate_indices: approval_sig.signed_candidates_indices,
1976										validator: assignment.validator_index(),
1977										signature: approval_sig.signature,
1978									},
1979								))
1980							};
1981						},
1982					}
1983				},
1984				None => {
1985					gum::warn!(
1986						target: LOG_TARGET,
1987						?block_hash,
1988						?candidate_hash,
1989						"Missing approval entry",
1990					);
1991				},
1992			}
1993		}
1994	}
1995
1996	messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
1997	// Approvals are appended at the end, to make sure all assignments are sent
1998	// before the approvals, otherwise if they arrive ahead in approval-distribution
1999	// they will be ignored.
2000	messages.extend(approvals.into_iter());
2001	Ok((messages, actions))
2002}
2003
// Handle an incoming signal or message from the overseer. Returns the follow-up
// `Action`s for the caller to execute. Shutdown is signalled by including
// `Action::Conclude` in the returned list (on `OverseerSignal::Conclude`), not
// by the return value itself.
async fn handle_from_overseer<
	Sender: SubsystemSender<ChainApiMessage>
		+ SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<ChainSelectionMessage>
		+ Clone,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	x: FromOrchestra<ApprovalVotingMessage>,
	last_finalized_height: &mut Option<BlockNumber>,
	wakeups: &mut Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let actions = match x {
		FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
			let mut actions = Vec::new();
			if let Some(activated) = update.activated {
				let head = activated.hash;
				match import::handle_new_head(
					sender,
					approval_voting_sender,
					state,
					db,
					session_info_provider,
					head,
					last_finalized_height,
				)
				.await
				{
					Err(e) => return Err(SubsystemError::with_origin("db", e)),
					Ok(block_imported_candidates) => {
						// Schedule wakeups for all imported candidates.
						for block_batch in block_imported_candidates {
							gum::debug!(
								target: LOG_TARGET,
								block_number = ?block_batch.block_number,
								block_hash = ?block_batch.block_hash,
								num_candidates = block_batch.imported_candidates.len(),
								"Imported new block.",
							);

							state.no_show_stats.maybe_print(block_batch.block_number);

							for (c_hash, c_entry) in block_batch.imported_candidates {
								metrics.on_candidate_imported();

								// The tranche of our own assignment for this candidate under
								// this block, if we have one.
								let our_tranche = c_entry
									.approval_entry(&block_batch.block_hash)
									.and_then(|a| a.our_assignment().map(|a| a.tranche()));

								if let Some(our_tranche) = our_tranche {
									let tick = our_tranche as Tick + block_batch.block_tick;
									gum::trace!(
										target: LOG_TARGET,
										tranche = our_tranche,
										candidate_hash = ?c_hash,
										block_hash = ?block_batch.block_hash,
										block_tick = block_batch.block_tick,
										"Scheduling first wakeup.",
									);

									// Our first wakeup will just be the tranche of our assignment,
									// if any. This will likely be superseded by incoming
									// assignments and approvals which trigger rescheduling.
									actions.push(Action::ScheduleWakeup {
										block_hash: block_batch.block_hash,
										block_number: block_batch.block_number,
										candidate_hash: c_hash,
										tick,
									});
								}
							}
						}
					},
				}
			}

			actions
		},
		FromOrchestra::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
			gum::debug!(target: LOG_TARGET, ?block_hash, ?block_number, "Block finalized");
			*last_finalized_height = Some(block_number);

			// Drop DB state for blocks at or below the finalized height that are not
			// on the finalized chain.
			crate::ops::canonicalize(db, block_number, block_hash)
				.map_err(|e| SubsystemError::with_origin("db", e))?;

			// `prune_finalized_wakeups` prunes all finalized block hashes; also clean up
			// the assignment-gathering timestamps tracked per block.
			wakeups.prune_finalized_wakeups(block_number);
			state.cleanup_assignments_gathering_timestamp(block_number);

			Vec::new()
		},
		FromOrchestra::Signal(OverseerSignal::Conclude) => {
			vec![Action::Conclude]
		},
		FromOrchestra::Communication { msg } => match msg {
			ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => {
				let (check_outcome, actions) =
					import_assignment(sender, state, db, session_info_provider, checked_assignment)
						.await?;
				// approval-distribution makes sure this assignment is valid and expected,
				// so this import should never fail, if it does it might mean one of two things,
				// there is a bug in the code or the two subsystems got out of sync.
				if let AssignmentCheckResult::Bad(ref err) = check_outcome {
					gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an assignment");
				}
				let _ = tx.map(|tx| tx.send(check_outcome));
				actions
			},
			ApprovalVotingMessage::ImportApproval(a, tx) => {
				let result =
					import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups)
						.await?;
				// approval-distribution makes sure this vote is valid and expected,
				// so this import should never fail, if it does it might mean one of two things,
				// there is a bug in the code or the two subsystems got out of sync.
				if let ApprovalCheckResult::Bad(ref err) = result.1 {
					gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an approval");
				}
				let _ = tx.map(|tx| tx.send(result.1));

				result.0
			},
			ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
				match handle_approved_ancestor(sender, db, target, lower_bound, wakeups, &metrics)
					.await
				{
					Ok(v) => {
						let _ = res.send(v);
					},
					Err(e) => {
						// Answer the requester before propagating the error so it is
						// not left waiting on a dropped channel.
						let _ = res.send(None);
						return Err(e)
					},
				}

				Vec::new()
			},
			ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
				metrics.on_candidate_signatures_request();
				get_approval_signatures_for_candidate(
					approval_voting_sender.clone(),
					spawn_handle,
					db,
					candidate_hash,
					tx,
				)
				.await?;
				Vec::new()
			},
		},
	};

	Ok(actions)
}
2171
/// Retrieve approval signatures.
///
/// This involves an unbounded message send to approval-distribution, the caller has to ensure that
/// calls to this function are infrequent and bounded.
///
/// The query itself runs on a freshly spawned background task; this function only
/// blocks long enough to read the relevant block/candidate entries from the DB.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn get_approval_signatures_for_candidate<
	Sender: SubsystemSender<ApprovalDistributionMessage>,
>(
	mut sender: Sender,
	spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
	db: &OverlayedBackend<'_, impl Backend>,
	candidate_hash: CandidateHash,
	tx: oneshot::Sender<HashMap<ValidatorIndex, (Vec<CandidateHash>, ValidatorSignature)>>,
) -> SubsystemResult<()> {
	// Reply helper; a closed receiver is not an error, just worth a debug log.
	let send_votes = |votes| {
		if let Err(_) = tx.send(votes) {
			gum::debug!(
				target: LOG_TARGET,
				"Sending approval signatures back failed, as receiver got closed."
			);
		}
	};
	let entry = match db.load_candidate_entry(&candidate_hash)? {
		None => {
			// Unknown candidate: answer with an empty map rather than erroring.
			send_votes(HashMap::new());
			gum::debug!(
				target: LOG_TARGET,
				?candidate_hash,
				"Sent back empty votes because the candidate was not found in db."
			);
			return Ok(())
		},
		Some(e) => e,
	};

	// All relay-chain blocks under which this candidate has assignments.
	let relay_hashes = entry.block_assignments.keys();

	// `(block_hash, candidate_index)` pairs identifying this candidate in each block,
	// as required by the approval-distribution query below.
	let mut candidate_indices = HashSet::new();
	// Per block: index -> hash for *all* of its candidates, used afterwards to
	// translate signed candidate indices back into candidate hashes.
	let mut candidate_indices_to_candidate_hashes: HashMap<
		Hash,
		HashMap<CandidateIndex, CandidateHash>,
	> = HashMap::new();

	// Retrieve `CoreIndices`/`CandidateIndices` as required by approval-distribution:
	for hash in relay_hashes {
		let entry = match db.load_block_entry(hash)? {
			None => {
				gum::debug!(
					target: LOG_TARGET,
					?candidate_hash,
					?hash,
					"Block entry for assignment missing."
				);
				continue
			},
			Some(e) => e,
		};
		for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
			if c_hash == &candidate_hash {
				candidate_indices.insert((*hash, candidate_index as u32));
			}
			candidate_indices_to_candidate_hashes
				.entry(*hash)
				.or_default()
				.insert(candidate_index as _, *c_hash);
		}
	}

	// Background task performing the actual query and the index -> hash translation.
	let get_approvals = async move {
		let (tx_distribution, rx_distribution) = oneshot::channel();
		sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
			candidate_indices,
			tx_distribution,
		));

		// Because of the unbounded sending and the nature of the call (just fetching data from
		// state), this should not block long:
		match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
			None => {
				gum::warn!(
					target: LOG_TARGET,
					"Waiting for approval signatures timed out - dead lock?"
				);
			},
			Some(Err(_)) => gum::debug!(
				target: LOG_TARGET,
				"Request for approval signatures got cancelled by `approval-distribution`."
			),
			Some(Ok(votes)) => {
				let votes = votes
					.into_iter()
					.filter_map(|(validator_index, (hash, signed_candidates_indices, signature))| {
						let candidates_hashes = candidate_indices_to_candidate_hashes.get(&hash);

						if candidates_hashes.is_none() {
							gum::warn!(
								target: LOG_TARGET,
								?hash,
								"Possible bug! Could not find map of candidate_hashes for block hash received from approval-distribution"
							);
						}

						let num_signed_candidates = signed_candidates_indices.len();

						// Resolve every signed candidate index to its hash; unresolvable
						// indices are logged and dropped from the list.
						let signed_candidates_hashes: Vec<CandidateHash> =
							signed_candidates_indices
								.into_iter()
								.filter_map(|candidate_index| {
									candidates_hashes.and_then(|candidate_hashes| {
										if let Some(candidate_hash) =
											candidate_hashes.get(&candidate_index)
										{
											Some(*candidate_hash)
										} else {
											gum::warn!(
												target: LOG_TARGET,
												?candidate_index,
												"Possible bug! Could not find candidate hash for candidate_index coming from approval-distribution"
											);
											None
										}
									})
								})
								.collect();
						// Drop the whole vote if any index failed to resolve — a partial
						// hash list would not correspond to what was signed.
						if num_signed_candidates == signed_candidates_hashes.len() {
							Some((validator_index, (signed_candidates_hashes, signature)))
						} else {
							gum::warn!(
								target: LOG_TARGET,
								"Possible bug! Could not find all hashes for candidates coming from approval-distribution"
							);
							None
						}
					})
					.collect();
				send_votes(votes)
			},
		}
	};

	// No need to block subsystem on this (also required to break cycle).
	// We should not be sending this message frequently - caller must make sure this is bounded.
	gum::trace!(
		target: LOG_TARGET,
		?candidate_hash,
		"Spawning task for fetching signatures from approval-distribution"
	);
	spawn_handle.spawn(
		"get-approval-signatures",
		Some("approval-voting-subsystem"),
		Box::pin(get_approvals),
	);
	Ok(())
}
2326
/// Walk the ancestry of `target` down to (but not including) `lower_bound` and
/// determine the highest block such that it and every visited block below it is
/// fully approved, along with `BlockDescription`s for that approved span (ordered
/// ascending by height).
///
/// Returns `Ok(None)` when the target is unknown, at or below `lower_bound`, when
/// part of the chain is missing from the DB, or when no fully-approved ancestor
/// exists in the window.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn handle_approved_ancestor<Sender: SubsystemSender<ChainApiMessage>>(
	sender: &mut Sender,
	db: &OverlayedBackend<'_, impl Backend>,
	target: Hash,
	lower_bound: BlockNumber,
	wakeups: &Wakeups,
	metrics: &Metrics,
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
	// Cap on the bit-string printed in the summary log at the end.
	const MAX_TRACING_WINDOW: usize = 200;
	// Unapproved blocks deeper than this get detailed per-candidate diagnostics.
	const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
	const LOGGING_DEPTH_THRESHOLD: usize = 10;

	let mut all_approved_max = None;

	let target_number = {
		let (tx, rx) = oneshot::channel();

		sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await;

		match rx.await {
			Ok(Ok(Some(n))) => n,
			Ok(Ok(None)) => return Ok(None),
			Ok(Err(_)) | Err(_) => return Ok(None),
		}
	};

	if target_number <= lower_bound {
		return Ok(None)
	}

	// request ancestors up to but not including the lower bound,
	// as a vote on the lower bound is implied if we cannot find
	// anything else.
	let ancestry = if target_number > lower_bound + 1 {
		let (tx, rx) = oneshot::channel();

		sender
			.send_message(ChainApiMessage::Ancestors {
				hash: target,
				k: (target_number - (lower_bound + 1)) as usize,
				response_channel: tx,
			})
			.await;

		match rx.await {
			Ok(Ok(a)) => a,
			Err(_) | Ok(Err(_)) => return Ok(None),
		}
	} else {
		Vec::new()
	};
	let ancestry_len = ancestry.len();

	let mut block_descriptions = Vec::new();

	// One bit per visited block (target first, then descending ancestry):
	// `true` iff the block is fully approved. Used only for the summary log.
	let mut bits: BitVec<u8, Lsb0> = Default::default();
	for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
		// Block entries should be present as the assumption is that
		// nothing here is finalized. If we encounter any missing block
		// entries we can fail.
		let entry = match db.load_block_entry(&block_hash)? {
			None => {
				let block_number = target_number.saturating_sub(i as u32);
				gum::info!(
					target: LOG_TARGET,
					unknown_number = ?block_number,
					unknown_hash = ?block_hash,
					"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
					target,
					target_number,
					lower_bound,
					lower_bound,
				);
				return Ok(None)
			},
			Some(b) => b,
		};

		// even if traversing millions of blocks this is fairly cheap and always dwarfed by the
		// disk lookups.
		bits.push(entry.is_fully_approved());
		if entry.is_fully_approved() {
			if all_approved_max.is_none() {
				// First iteration of the loop is target, i = 0. After that,
				// ancestry is moving backwards.
				all_approved_max = Some((block_hash, target_number - i as BlockNumber));
			}
			block_descriptions.push(BlockDescription {
				block_hash,
				session: entry.session(),
				candidates: entry
					.candidates()
					.iter()
					.map(|(_idx, candidate_hash)| *candidate_hash)
					.collect(),
			});
		} else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
			// An unapproved block invalidates any approved span found above it.
			all_approved_max = None;
			block_descriptions.clear();
		} else {
			// Same reset as above, but the block is abnormally deep and still
			// unapproved — emit diagnostics about its unapproved candidates.
			all_approved_max = None;
			block_descriptions.clear();

			let unapproved: Vec<_> = entry.unapproved_candidates().collect();
			gum::debug!(
				target: LOG_TARGET,
				"Block {} is {} blocks deep and has {}/{} candidates unapproved",
				block_hash,
				bits.len() - 1,
				unapproved.len(),
				entry.candidates().len(),
			);
			if ancestry_len >= LOGGING_DEPTH_THRESHOLD && i > ancestry_len - LOGGING_DEPTH_THRESHOLD
			{
				gum::trace!(
					target: LOG_TARGET,
					?block_hash,
					"Unapproved candidates at depth {}: {:?}",
					bits.len(),
					unapproved
				)
			}
			metrics.on_unapproved_candidates_in_unfinalized_chain(unapproved.len());
			// Log assignment/approval status for every unapproved candidate.
			for candidate_hash in unapproved {
				match db.load_candidate_entry(&candidate_hash)? {
					None => {
						gum::warn!(
							target: LOG_TARGET,
							?candidate_hash,
							"Missing expected candidate in DB",
						);

						continue
					},
					Some(c_entry) => match c_entry.approval_entry(&block_hash) {
						None => {
							gum::warn!(
								target: LOG_TARGET,
								?candidate_hash,
								?block_hash,
								"Missing expected approval entry under candidate.",
							);
						},
						Some(a_entry) => {
							// Lazily-formatted "assignments/approvals/validators" summary.
							let status = || {
								let n_assignments = a_entry.n_assignments();

								// Take the approvals, filtered by the assignments
								// for this block.
								let n_approvals = c_entry
									.approvals()
									.iter()
									.by_vals()
									.enumerate()
									.filter(|(i, approved)| {
										*approved && a_entry.is_assigned(ValidatorIndex(*i as _))
									})
									.count();

								format!(
									"{}/{}/{}",
									n_assignments,
									n_approvals,
									a_entry.n_validators(),
								)
							};

							match a_entry.our_assignment() {
								None => gum::debug!(
									target: LOG_TARGET,
									?candidate_hash,
									?block_hash,
									status = %status(),
									"no assignment."
								),
								Some(a) => {
									let tranche = a.tranche();
									let triggered = a.triggered();

									let next_wakeup =
										wakeups.wakeup_for(block_hash, candidate_hash);

									let approved =
										triggered && { a_entry.local_statements().1.is_some() };

									gum::debug!(
										target: LOG_TARGET,
										?candidate_hash,
										?block_hash,
										tranche,
										?next_wakeup,
										status = %status(),
										triggered,
										approved,
										"assigned."
									);
								},
							}
						},
					},
				}
			}
		}
	}

	gum::debug!(
		target: LOG_TARGET,
		"approved blocks {}-[{}]-{}",
		target_number,
		{
			// formatting to divide bits by groups of 10.
			// when comparing logs on multiple machines where the exact vote
			// targets may differ, this grouping is useful.
			let mut s = String::with_capacity(bits.len());
			for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
				s.push(if *bit { '1' } else { '0' });
				if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 {
					s.push(' ');
				}
			}

			s
		},
		if bits.len() > MAX_TRACING_WINDOW {
			format!(
				"{}... (truncated due to large window)",
				target_number - MAX_TRACING_WINDOW as u32 + 1,
			)
		} else {
			format!("{}", lower_bound + 1)
		},
	);

	// `reverse()` to obtain the ascending order from lowest to highest
	// block within the candidates, which is the expected order
	block_descriptions.reverse();

	let all_approved_max =
		all_approved_max.map(|(hash, block_number)| HighestApprovedAncestorBlock {
			hash,
			number: block_number,
			descriptions: block_descriptions,
		});

	Ok(all_approved_max)
}
2574
// The minimum of two optional values, with a present value always beating an
// absent one (`Option::cmp` would instead order `None` below `Some`).
fn min_prefer_some<T: std::cmp::Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
	match (a, b) {
		(Some(x), Some(y)) => Some(x.min(y)),
		(x, y) => x.or(y),
	}
}
2583
2584fn schedule_wakeup_action(
2585	approval_entry: &ApprovalEntry,
2586	block_hash: Hash,
2587	block_number: BlockNumber,
2588	candidate_hash: CandidateHash,
2589	block_tick: Tick,
2590	tick_now: Tick,
2591	required_tranches: RequiredTranches,
2592) -> Option<Action> {
2593	let maybe_action = match required_tranches {
2594		_ if approval_entry.is_approved() => None,
2595		RequiredTranches::All => None,
2596		RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
2597			// Take the earlier of the next no show or the last assignment tick + required delay,
2598			// only considering the latter if it is after the current moment.
2599			min_prefer_some(
2600				last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
2601				next_no_show,
2602			)
2603			.map(|tick| Action::ScheduleWakeup {
2604				block_hash,
2605				block_number,
2606				candidate_hash,
2607				tick,
2608			})
2609		},
2610		RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
2611			// select the minimum of `next_no_show`, or the tick of the next non-empty tranche
2612			// after `considered`, including any tranche that might contain our own untriggered
2613			// assignment.
2614			let next_non_empty_tranche = {
2615				let next_announced = approval_entry
2616					.tranches()
2617					.iter()
2618					.skip_while(|t| t.tranche() <= considered)
2619					.map(|t| t.tranche())
2620					.next();
2621
2622				let our_untriggered = approval_entry.our_assignment().and_then(|t| {
2623					if !t.triggered() && t.tranche() > considered {
2624						Some(t.tranche())
2625					} else {
2626						None
2627					}
2628				});
2629
2630				// Apply the clock drift to these tranches.
2631				min_prefer_some(next_announced, our_untriggered)
2632					.map(|t| t as Tick + block_tick + clock_drift)
2633			};
2634
2635			min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
2636				Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }
2637			})
2638		},
2639	};
2640
2641	match maybe_action {
2642		Some(Action::ScheduleWakeup { ref tick, .. }) => gum::trace!(
2643			target: LOG_TARGET,
2644			tick,
2645			?candidate_hash,
2646			?block_hash,
2647			block_tick,
2648			"Scheduling next wakeup.",
2649		),
2650		None => gum::trace!(
2651			target: LOG_TARGET,
2652			?candidate_hash,
2653			?block_hash,
2654			block_tick,
2655			"No wakeup needed.",
2656		),
2657		Some(_) => {}, // unreachable
2658	}
2659
2660	maybe_action
2661}
2662
/// Import an assignment (already validated by approval-distribution) into the
/// approval entries of every candidate it claims under its block.
///
/// Runs two passes over the claimed candidate indices: a validation pass that
/// resolves each index against the block entry and the DB without mutating
/// anything, then an import pass that records the assignment per candidate and
/// schedules no-show wakeups. Returns the check result to report back together
/// with the follow-up actions.
async fn import_assignment<Sender>(
	sender: &mut Sender,
	state: &State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	checked_assignment: CheckedIndirectAssignment,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let tick_now = state.clock.tick_now();
	let assignment = checked_assignment.assignment();
	let candidate_indices = checked_assignment.candidate_indices();
	let tranche = checked_assignment.tranche();

	let block_entry = match db.load_block_entry(&assignment.block_hash)? {
		Some(b) => b,
		None =>
			return Ok((
				AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(
					assignment.block_hash,
				)),
				Vec::new(),
			)),
	};

	let session_info = match get_session_info(
		session_info_provider,
		sender,
		block_entry.parent_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(s) => s,
		None =>
			return Ok((
				AssignmentCheckResult::Bad(AssignmentCheckError::UnknownSessionIndex(
					block_entry.session(),
				)),
				Vec::new(),
			)),
	};

	let n_cores = session_info.n_cores as usize;

	// Early check the candidate bitfield and core bitfields lengths < `n_cores`.
	// Core bitfield length is checked later in `check_assignment_cert`.
	if candidate_indices.len() > n_cores {
		gum::debug!(
			target: LOG_TARGET,
			validator = assignment.validator.0,
			n_cores,
			candidate_bitfield_len = ?candidate_indices.len(),
			"Oversized bitfield",
		);

		return Ok((
			AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
				candidate_indices.len(),
			)),
			Vec::new(),
		))
	}

	let mut claimed_core_indices = Vec::new();
	let mut assigned_candidate_hashes = Vec::new();

	// Validation pass: resolve every claimed candidate index to its core and
	// hash, and check the corresponding DB entries exist, before any mutation.
	for candidate_index in candidate_indices.iter_ones() {
		let (claimed_core_index, assigned_candidate_hash) =
			match block_entry.candidate(candidate_index) {
				Some((c, h)) => (*c, *h),
				None =>
					return Ok((
						AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
							candidate_index as _,
						)),
						Vec::new(),
					)), // no candidate at core.
			};

		let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
			Some(c) => c,
			None =>
				return Ok((
					AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
						candidate_index as _,
						assigned_candidate_hash,
					)),
					Vec::new(),
				)), // no candidate at core.
		};

		// The candidate must have an approval entry under this block.
		if candidate_entry.approval_entry_mut(&assignment.block_hash).is_none() {
			return Ok((
				AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
					assignment.block_hash,
					assigned_candidate_hash,
				)),
				Vec::new(),
			));
		};

		claimed_core_indices.push(claimed_core_index);
		assigned_candidate_hashes.push(assigned_candidate_hash);
	}

	// Error on null assignments.
	if claimed_core_indices.is_empty() {
		return Ok((
			AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
				assignment.validator,
				format!("{:?}", InvalidAssignmentReason::NullAssignment),
			)),
			Vec::new(),
		))
	}

	let mut actions = Vec::new();
	let res = {
		// The assignment counts as a duplicate only if it duplicates for *every*
		// claimed candidate; see the comment below the loop.
		let mut is_duplicate = true;
		// Import the assignments for all cores in the cert.
		for (assigned_candidate_hash, candidate_index) in
			assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
		{
			let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
				Some(c) => c,
				None =>
					return Ok((
						AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
							candidate_index as _,
							*assigned_candidate_hash,
						)),
						Vec::new(),
					)),
			};

			let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
				Some(a) => a,
				None =>
					return Ok((
						AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
							assignment.block_hash,
							*assigned_candidate_hash,
						)),
						Vec::new(),
					)),
			};

			let is_duplicate_for_candidate = approval_entry.is_assigned(assignment.validator);
			is_duplicate &= is_duplicate_for_candidate;
			approval_entry.import_assignment(
				tranche,
				assignment.validator,
				tick_now,
				is_duplicate_for_candidate,
			);

			// We've imported a new assignment, so we need to schedule a wake-up for when that might
			// no-show.
			if let Some((approval_entry, status)) = state
				.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
				.await
			{
				actions.extend(schedule_wakeup_action(
					approval_entry,
					block_entry.block_hash(),
					block_entry.block_number(),
					*assigned_candidate_hash,
					status.block_tick,
					tick_now,
					status.required_tranches,
				));
			}

			// We also write the candidate entry as it now contains the new candidate.
			db.write_candidate_entry(candidate_entry.into());
		}

		// Since we don't account for tranche in distribution message fingerprinting, some
		// validators can be assigned to the same core (VRF modulo vs VRF delay). These can be
		// safely ignored. However, if an assignment is for multiple cores (these are only
		// tranche0), we cannot ignore it, because it would mean ignoring other non duplicate
		// assignments.
		if is_duplicate {
			AssignmentCheckResult::AcceptedDuplicate
		} else if candidate_indices.count_ones() > 1 {
			gum::trace!(
				target: LOG_TARGET,
				validator = assignment.validator.0,
				candidate_hashes = ?assigned_candidate_hashes,
				assigned_cores = ?claimed_core_indices,
				?tranche,
				"Imported assignments for multiple cores.",
			);

			AssignmentCheckResult::Accepted
		} else {
			gum::trace!(
				target: LOG_TARGET,
				validator = assignment.validator.0,
				candidate_hashes = ?assigned_candidate_hashes,
				assigned_cores = ?claimed_core_indices,
				"Imported assignment for a single core.",
			);

			AssignmentCheckResult::Accepted
		}
	};

	Ok((res, actions))
}
2875
/// Import an approval vote that has already been checked (signature and
/// assignment validity) upstream, as witnessed by the
/// `CheckedIndirectSignedApprovalVote` type.
///
/// The vote may be coalesced, i.e. claim several candidate indices under the
/// same block. For each claimed candidate this advances its approval state and
/// accumulates any resulting follow-up `Action`s (wakeups, chain-selection
/// notes, ...). Returns those actions together with the `ApprovalCheckResult`
/// to report back.
async fn import_approval<Sender>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	approval: CheckedIndirectSignedApprovalVote,
	wakeups: &Wakeups,
) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	// Bail out of the whole import with the given result and no actions.
	macro_rules! respond_early {
		($e: expr) => {{
			return Ok((Vec::new(), $e))
		}};
	}

	// The block must be known; otherwise we can't resolve candidate indices.
	let block_entry = match db.load_block_entry(&approval.block_hash)? {
		Some(b) => b,
		None => {
			respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
				approval.block_hash
			),))
		},
	};

	// Resolve every claimed candidate index to its candidate hash under this
	// block. A single out-of-range index invalidates the entire vote.
	let approved_candidates_info: Result<Vec<(CandidateIndex, CandidateHash)>, ApprovalCheckError> =
		approval
			.candidate_indices
			.iter_ones()
			.map(|candidate_index| {
				block_entry
					.candidate(candidate_index)
					.ok_or(ApprovalCheckError::InvalidCandidateIndex(candidate_index as _))
					.map(|candidate| (candidate_index as _, candidate.1))
			})
			.collect();

	let approved_candidates_info = match approved_candidates_info {
		Ok(approved_candidates_info) => approved_candidates_info,
		Err(err) => {
			respond_early!(ApprovalCheckResult::Bad(err))
		},
	};

	gum::trace!(
		target: LOG_TARGET,
		"Received approval for num_candidates {:}",
		approval.candidate_indices.count_ones()
	);

	let mut actions = Vec::new();
	for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
		// Re-load the block entry on every iteration: the previous iteration's
		// `advance_approval_state` consumed its copy and may have written an
		// updated entry back to the overlay.
		let block_entry = match db.load_block_entry(&approval.block_hash)? {
			Some(b) => b,
			None => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
					approval.block_hash
				),))
			},
		};

		let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
			Some(c) => c,
			None => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(
					approval_candidate_index,
					approved_candidate_hash
				),))
			},
		};

		// Don't accept approvals until assignment.
		match candidate_entry.approval_entry(&approval.block_hash) {
			None => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::Internal(
					approval.block_hash,
					approved_candidate_hash
				),))
			},
			Some(e) if !e.is_assigned(approval.validator) => {
				respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(
					approval.validator
				),))
			},
			_ => {},
		}

		gum::trace!(
			target: LOG_TARGET,
			validator_index = approval.validator.0,
			candidate_hash = ?approved_candidate_hash,
			para_id = ?candidate_entry.candidate_receipt().descriptor.para_id(),
			"Importing approval vote",
		);

		// Advance the per-candidate approval state; this handles marking the
		// approval, checking whether the candidate/block became approved and
		// scheduling any necessary wakeups.
		let new_actions = advance_approval_state(
			sender,
			state,
			db,
			session_info_provider,
			&metrics,
			block_entry,
			approved_candidate_hash,
			candidate_entry,
			ApprovalStateTransition::RemoteApproval(approval.validator),
			wakeups,
		)
		.await;
		actions.extend(new_actions);
	}

	// importing the approval can be heavy as it may trigger acceptance for a series of blocks.
	Ok((actions, ApprovalCheckResult::Accepted))
}
2992
/// The kind of state transition being applied by `advance_approval_state`.
#[derive(Debug)]
enum ApprovalStateTransition {
	// An approval vote received from another validator.
	RemoteApproval(ValidatorIndex),
	// An approval vote issued locally by this node.
	LocalApproval(ValidatorIndex),
	// No new vote; a scheduled wakeup for the candidate was processed.
	WakeupProcessed,
}
2999
3000impl ApprovalStateTransition {
3001	fn validator_index(&self) -> Option<ValidatorIndex> {
3002		match *self {
3003			ApprovalStateTransition::RemoteApproval(v) |
3004			ApprovalStateTransition::LocalApproval(v) => Some(v),
3005			ApprovalStateTransition::WakeupProcessed => None,
3006		}
3007	}
3008
3009	fn is_local_approval(&self) -> bool {
3010		match *self {
3011			ApprovalStateTransition::RemoteApproval(_) => false,
3012			ApprovalStateTransition::LocalApproval(_) => true,
3013			ApprovalStateTransition::WakeupProcessed => false,
3014		}
3015	}
3016
3017	fn is_remote_approval(&self) -> bool {
3018		matches!(*self, ApprovalStateTransition::RemoteApproval(_))
3019	}
3020}
3021
// Advance the approval state, either by importing an approval vote which is already checked to be
// valid and corresponding to an assigned validator on the candidate and block, or by noting that
// there are no further wakeups or tranches needed. This updates the block entry and candidate entry
// as necessary and schedules any further wakeups.
//
// Returns the follow-up `Action`s to be executed by the caller (wakeup
// scheduling, chain-selection approval notes). Database writes for the block
// and candidate entries are performed here, but only when something actually
// changed — see the inline comments on the write conditions below.
async fn advance_approval_state<Sender>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	mut block_entry: BlockEntry,
	candidate_hash: CandidateHash,
	mut candidate_entry: CandidateEntry,
	transition: ApprovalStateTransition,
	wakeups: &Wakeups,
) -> Vec<Action>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let validator_index = transition.validator_index();

	// Mark the approval in the candidate's bitfield up-front; `mark_approval`
	// returns whether this validator had already approved, which feeds the
	// candidate-entry write condition at the bottom.
	let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
	let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);

	// Check for early exits.
	//
	// If the candidate was approved
	// but not the block, it means that we still need more approvals for the candidate under the
	// block.
	//
	// If the block was approved, but the validator hadn't approved it yet, we should still hold
	// onto the approval vote on-disk in case we restart and rebroadcast votes. Otherwise, our
	// assignment might manifest as a no-show.
	if !transition.is_local_approval() {
		// We don't store remote votes and there's nothing to store for processed wakeups,
		// so we can early exit as long at the candidate is already concluded under the
		// block i.e. we don't need more approvals.
		if candidate_approved_in_block {
			return Vec::new()
		}
	}

	let mut actions = Vec::new();
	let block_hash = block_entry.block_hash();
	let block_number = block_entry.block_number();
	let session_index = block_entry.session();
	let para_id = candidate_entry.candidate_receipt().descriptor().para_id();
	let tick_now = state.clock.tick_now();

	// Recompute the candidate's approval status under this block and derive
	// whether it is now approved.
	let (is_approved, status) = if let Some((approval_entry, status)) = state
		.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
		.await
	{
		let check = approval_checking::check_approval(
			&candidate_entry,
			approval_entry,
			status.required_tranches.clone(),
		);
		state.observe_assignment_gathering_status(
			&metrics,
			&status.required_tranches,
			block_hash,
			block_entry.block_number(),
			candidate_hash,
		);

		// Check whether this is approved, while allowing a maximum
		// assignment tick of `now - APPROVAL_DELAY` - that is, that
		// all counted assignments are at least `APPROVAL_DELAY` ticks old.
		let is_approved = check.is_approved(tick_now.saturating_sub(APPROVAL_DELAY));
		if status.last_no_shows != 0 {
			metrics.on_observed_no_shows(status.last_no_shows);
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				last_no_shows = ?status.last_no_shows,
				"Observed no_shows",
			);
		}
		if is_approved {
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				"Candidate approved under block.",
			);

			let no_shows = check.known_no_shows();

			// Track whether marking this candidate flipped the whole block to
			// fully-approved, so we only notify chain selection once.
			let was_block_approved = block_entry.is_fully_approved();
			block_entry.mark_approved_by_hash(&candidate_hash);
			let is_block_approved = block_entry.is_fully_approved();

			if no_shows != 0 {
				metrics.on_no_shows(no_shows);
			}
			if check == Check::ApprovedOneThird {
				// No-shows are not counted when more than one third of validators approve a
				// candidate, so count candidates where more than one third of validators had to
				// approve it, this is indicative of something breaking.
				metrics.on_approved_by_one_third()
			}

			metrics.on_candidate_approved(status.tranche_now as _);

			if is_block_approved && !was_block_approved {
				metrics.on_block_approved(status.tranche_now as _);
				actions.push(Action::NoteApprovedInChainSelection(block_hash));
			}

			db.write_block_entry(block_entry.into());
		} else if transition.is_local_approval() {
			// Local approvals always update the block_entry, so we need to flush it to
			// the database.
			db.write_block_entry(block_entry.into());
		}

		(is_approved, status)
	} else {
		gum::warn!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,
			?validator_index,
			"No approval entry for approval under block",
		);

		return Vec::new()
	};

	{
		// The `approval_status` call above succeeded, so the entry must exist.
		let approval_entry = candidate_entry
			.approval_entry_mut(&block_hash)
			.expect("Approval entry just fetched; qed");

		let was_approved = approval_entry.is_approved();
		let newly_approved = is_approved && !was_approved;

		if is_approved {
			approval_entry.mark_approved();
		}
		if newly_approved {
			state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
		}
		actions.extend(schedule_wakeup_action(
			&approval_entry,
			block_hash,
			block_number,
			candidate_hash,
			status.block_tick,
			tick_now,
			status.required_tranches,
		));

		if is_approved && transition.is_remote_approval() {
			// Make sure we wake other blocks in case they have
			// a no-show that might be covered by this approval.
			for (fork_block_hash, fork_approval_entry) in candidate_entry
				.block_assignments
				.iter()
				.filter(|(hash, _)| **hash != block_hash)
			{
				let assigned_on_fork_block = validator_index
					.as_ref()
					.map(|validator_index| fork_approval_entry.is_assigned(*validator_index))
					.unwrap_or_default();
				if wakeups.wakeup_for(*fork_block_hash, candidate_hash).is_none() &&
					!fork_approval_entry.is_approved() &&
					assigned_on_fork_block
				{
					let fork_block_entry = db.load_block_entry(fork_block_hash);
					if let Ok(Some(fork_block_entry)) = fork_block_entry {
						actions.push(Action::ScheduleWakeup {
							block_hash: *fork_block_hash,
							block_number: fork_block_entry.block_number(),
							candidate_hash,
							// Schedule the wakeup next tick, since the assignment must be a
							// no-show, because there is no-wakeup scheduled.
							tick: tick_now + 1,
						})
					} else {
						gum::debug!(
							target: LOG_TARGET,
							?fork_block_entry,
							?fork_block_hash,
							"Failed to load block entry"
						)
					}
				}
			}
		}
		// We have no need to write the candidate entry if all of the following
		// is true:
		//
		// 1. This is not a local approval, as we don't store anything new in the approval entry.
		// 2. The candidate is not newly approved, as we haven't altered the approval entry's
		//    approved flag with `mark_approved` above.
		// 3. The approver, if any, had already approved the candidate, as we haven't altered the
		// bitfield.
		if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
		{
			// In all other cases, we need to write the candidate entry.
			db.write_candidate_entry(candidate_entry);
		}
	}

	actions
}
3231
3232fn should_trigger_assignment(
3233	approval_entry: &ApprovalEntry,
3234	candidate_entry: &CandidateEntry,
3235	required_tranches: RequiredTranches,
3236	tranche_now: DelayTranche,
3237) -> bool {
3238	match approval_entry.our_assignment() {
3239		None => false,
3240		Some(ref assignment) if assignment.triggered() => false,
3241		Some(ref assignment) if assignment.tranche() == 0 => true,
3242		Some(ref assignment) => {
3243			match required_tranches {
3244				RequiredTranches::All => !approval_checking::check_approval(
3245					&candidate_entry,
3246					&approval_entry,
3247					RequiredTranches::All,
3248				)
3249				// when all are required, we are just waiting for the first 1/3+
3250				.is_approved(Tick::max_value()),
3251				RequiredTranches::Pending { maximum_broadcast, clock_drift, .. } => {
3252					let drifted_tranche_now =
3253						tranche_now.saturating_sub(clock_drift as DelayTranche);
3254					assignment.tranche() <= maximum_broadcast &&
3255						assignment.tranche() <= drifted_tranche_now
3256				},
3257				RequiredTranches::Exact { .. } => {
3258					// indicates that no new assignments are needed at the moment.
3259					false
3260				},
3261			}
3262		},
3263	}
3264}
3265
/// Process a scheduled wakeup for `(relay_block, candidate_hash)`.
///
/// Recomputes the tranches required to approve the candidate, triggers our own
/// assignment if warranted (emitting an `Action::LaunchApproval`), and finally
/// re-advances the approval state, which also schedules the next wakeup as
/// necessary. Missing block/candidate entries are treated as a benign race
/// (e.g. with finality/pruning) and yield no actions.
async fn process_wakeup<Sender: SubsystemSender<RuntimeApiMessage>>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	relay_block: Hash,
	candidate_hash: CandidateHash,
	metrics: &Metrics,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let block_entry = db.load_block_entry(&relay_block)?;
	let candidate_entry = db.load_candidate_entry(&candidate_hash)?;

	// If either is not present, we have nothing to wakeup. Might have lost a race with finality
	let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
		(Some(b), Some(c)) => (b, c),
		_ => return Ok(Vec::new()),
	};

	let ExtendedSessionInfo { ref session_info, ref executor_params, .. } =
		match get_extended_session_info(
			session_info_provider,
			sender,
			block_entry.block_hash(),
			block_entry.session(),
		)
		.await
		{
			Some(i) => i,
			None => return Ok(Vec::new()),
		};

	// Convert slots to ticks for the no-show / tranche arithmetic below.
	let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
	let no_show_duration = slot_number_to_tick(
		state.slot_duration_millis,
		Slot::from(u64::from(session_info.no_show_slots)),
	);
	let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());

	gum::trace!(
		target: LOG_TARGET,
		tranche = tranche_now,
		?candidate_hash,
		block_hash = ?relay_block,
		"Processing wakeup",
	);

	// Determine, from the current assignment/approval picture, whether our own
	// assignment should be triggered now.
	let (should_trigger, backing_group) = {
		let approval_entry = match candidate_entry.approval_entry(&relay_block) {
			Some(e) => e,
			None => return Ok(Vec::new()),
		};

		let tranches_to_approve = approval_checking::tranches_to_approve(
			&approval_entry,
			candidate_entry.approvals(),
			tranche_now,
			block_tick,
			no_show_duration,
			session_info.needed_approvals as _,
		);

		let should_trigger = should_trigger_assignment(
			&approval_entry,
			&candidate_entry,
			tranches_to_approve.required_tranches,
			tranche_now,
		);

		(should_trigger, approval_entry.backing_group())
	};

	gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);

	let mut actions = Vec::new();
	let candidate_receipt = candidate_entry.candidate_receipt().clone();

	// Trigger our assignment, persisting the updated candidate entry so the
	// trigger survives restarts. `maybe_cert` is our assignment certificate
	// (with validator index and tranche) if one was produced.
	let maybe_cert = if should_trigger {
		let maybe_cert = {
			let approval_entry = candidate_entry
				.approval_entry_mut(&relay_block)
				.expect("should_trigger only true if this fetched earlier; qed");

			approval_entry.trigger_our_assignment(state.clock.tick_now())
		};

		db.write_candidate_entry(candidate_entry.clone());

		maybe_cert
	} else {
		None
	};

	if let Some((cert, val_index, tranche)) = maybe_cert {
		let indirect_cert =
			IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };

		gum::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			para_id = ?candidate_receipt.descriptor.para_id(),
			block_hash = ?relay_block,
			"Launching approval work.",
		);

		// Find which core this candidate occupies under the relay block.
		let candidate_core_index = block_entry
			.candidates()
			.iter()
			.find_map(|(core_index, h)| (h == &candidate_hash).then_some(*core_index));

		if let Some(claimed_core_indices) =
			get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
		{
			match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
				Ok(claimed_candidate_indices) => {
					// Ensure we distribute multiple core assignments just once.
					let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
						!block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
					} else {
						true
					};
					db.write_block_entry(block_entry.clone());
					actions.push(Action::LaunchApproval {
						claimed_candidate_indices,
						candidate_hash,
						indirect_cert,
						assignment_tranche: tranche,
						relay_block_hash: relay_block,
						session: block_entry.session(),
						executor_params: executor_params.clone(),
						candidate: candidate_receipt,
						backing_group,
						distribute_assignment,
						core_index: candidate_core_index,
					});
				},
				Err(err) => {
					// Never happens, it should only happen if no cores are claimed, which is a
					// bug.
					gum::warn!(
						target: LOG_TARGET,
						block_hash = ?relay_block,
						?err,
						"Failed to create assignment bitfield"
					);
				},
			};
		} else {
			gum::warn!(
				target: LOG_TARGET,
				block_hash = ?relay_block,
				?candidate_hash,
				"Cannot get assignment claimed core indices",
			);
		}
	}
	// Although we checked approval earlier in this function,
	// this wakeup might have advanced the state to approved via
	// a no-show that was immediately covered and therefore
	// we need to check for that and advance the state on-disk.
	//
	// Note that this function also schedules a wakeup as necessary.
	actions.extend(
		advance_approval_state(
			sender,
			state,
			db,
			session_info_provider,
			metrics,
			block_entry,
			candidate_hash,
			candidate_entry,
			ApprovalStateTransition::WakeupProcessed,
			wakeups,
		)
		.await,
	);

	Ok(actions)
}
3446
// Launch approval work, returning a `RemoteHandle` which corresponds to the background task
// spawned. When the background work is no longer needed, the `RemoteHandle` should be dropped
// to cancel the background work and any requests it has spawned.
//
// The background task recovers the available data, fetches the validation
// code, validates the candidate, and resolves to an `ApprovalState` describing
// the outcome (approved / failed / failed-with-retry). On recovery
// unavailability it may request a retry via `RetryApprovalInfo`; on invalidity
// it issues a local dispute statement.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn launch_approval<
	Sender: SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>,
>(
	mut sender: Sender,
	spawn_handle: Arc<dyn overseer::gen::Spawner + 'static>,
	metrics: Metrics,
	session_index: SessionIndex,
	candidate: CandidateReceipt,
	validator_index: ValidatorIndex,
	block_hash: Hash,
	backing_group: GroupIndex,
	executor_params: ExecutorParams,
	core_index: Option<CoreIndex>,
	retry: RetryApprovalInfo,
) -> SubsystemResult<RemoteHandle<ApprovalState>> {
	let (a_tx, a_rx) = oneshot::channel();
	let (code_tx, code_rx) = oneshot::channel();

	// The background future returned by this function may
	// be dropped before completing. This guard is used to ensure that the approval
	// work is correctly counted as stale even if so.
	struct StaleGuard(Option<Metrics>);

	impl StaleGuard {
		// Disarm the guard, returning the metrics for explicit outcome
		// reporting (invalid/unavailable/error/approved).
		fn take(mut self) -> Metrics {
			self.0.take().expect(
				"
				consumed after take; so this cannot be called twice; \
				nothing in this function reaches into the struct to avoid this API; \
				qed
			",
			)
		}
	}

	impl Drop for StaleGuard {
		fn drop(&mut self) {
			// Only fires if the guard was never disarmed via `take`, i.e. the
			// future was dropped mid-flight.
			if let Some(metrics) = self.0.as_ref() {
				metrics.on_approval_stale();
			}
		}
	}

	let candidate_hash = candidate.hash();
	let para_id = candidate.descriptor.para_id();
	let mut next_retry = None;
	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");

	let timer = metrics.time_recover_and_approve();
	// Kick off data recovery and validation-code fetching before entering the
	// background task; responses arrive on the oneshot channels.
	sender
		.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
			candidate.clone(),
			session_index,
			Some(backing_group),
			core_index,
			a_tx,
		))
		.await;

	sender
		.send_message(RuntimeApiMessage::Request(
			block_hash,
			RuntimeApiRequest::ValidationCodeByHash(
				candidate.descriptor.validation_code_hash(),
				code_tx,
			),
		))
		.await;

	let candidate = candidate.clone();
	let metrics_guard = StaleGuard(Some(metrics));
	let background = async move {
		// Force the move of the timer into the background task.
		let _timer = timer;
		let available_data = match a_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(a)) => a,
			Ok(Err(e)) => {
				match &e {
					&RecoveryError::Unavailable => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							attempts_remaining = retry.attempts_remaining,
							"Data unavailable for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// Availability could fail if we did not discover much of the network, so
						// let's back off and order the subsystem to retry at a later point if the
						// approval is still needed, because no-show wasn't covered yet.
						if retry.attempts_remaining > 0 {
							Delay::new(retry.backoff).await;
							next_retry = Some(RetryApprovalInfo {
								candidate,
								backing_group,
								executor_params,
								core_index,
								session_index,
								attempts_remaining: retry.attempts_remaining - 1,
								backoff: retry.backoff,
							});
						} else {
							next_retry = None;
						}
						metrics_guard.take().on_approval_unavailable();
					},
					&RecoveryError::ChannelClosed => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							"Channel closed while recovering data for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// do nothing. we'll just be a no-show and that'll cause others to rise up.
						metrics_guard.take().on_approval_unavailable();
					},
					&RecoveryError::Invalid => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							"Data recovery invalid for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// Recovered data didn't match the erasure root: raise a
						// dispute rather than just failing silently.
						issue_local_invalid_statement(
							&mut sender,
							session_index,
							candidate_hash,
							candidate.clone(),
						);
						metrics_guard.take().on_approval_invalid();
					},
				}
				return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
			},
		};

		let validation_code = match code_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Err(_)) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(Some(code))) => code,
			Ok(Ok(None)) => {
				gum::warn!(
					target: LOG_TARGET,
					"Validation code unavailable for block {:?} in the state of block {:?} (a recent descendant)",
					candidate.descriptor.relay_parent(),
					block_hash,
				);

				// No dispute necessary, as this indicates that the chain is not behaving
				// according to expectations.
				metrics_guard.take().on_approval_unavailable();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
		};

		let (val_tx, val_rx) = oneshot::channel();
		sender
			.send_message(CandidateValidationMessage::ValidateFromExhaustive {
				validation_data: available_data.validation_data,
				validation_code,
				candidate_receipt: candidate.clone(),
				pov: available_data.pov,
				executor_params,
				exec_kind: PvfExecKind::Approval,
				response_sender: val_tx,
			})
			.await;

		match val_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(ValidationResult::Valid(_, _))) => {
				// Validation checked out. Issue an approval command. If the underlying service is
				// unreachable, then there isn't anything we can do.

				gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Candidate Valid");

				let _ = metrics_guard.take();
				return ApprovalState::approved(validator_index, candidate_hash)
			},
			Ok(Ok(ValidationResult::Invalid(reason))) => {
				gum::warn!(
					target: LOG_TARGET,
					?reason,
					?candidate_hash,
					?para_id,
					"Detected invalid candidate as an approval checker.",
				);

				issue_local_invalid_statement(
					&mut sender,
					session_index,
					candidate_hash,
					candidate.clone(),
				);
				metrics_guard.take().on_approval_invalid();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
			Ok(Err(e)) => {
				gum::error!(
					target: LOG_TARGET,
					err = ?e,
					?candidate_hash,
					?para_id,
					"Failed to validate candidate due to internal error",
				);
				metrics_guard.take().on_approval_error();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
		}
	};
	// Split the future from its handle; dropping the handle cancels the task.
	let (background, remote_handle) = background.remote_handle();
	spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background));
	Ok(remote_handle)
}
3671
// Issue and import a local approval vote. Should only be invoked after approval checks
// have been done.
//
// The vote is not necessarily signed and distributed immediately: the
// candidate signature is deferred via `defer_candidate_signature` to allow
// coalescing with other pending approvals, and `maybe_create_signature`
// decides whether to sign now or arm a timer for later.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn issue_approval<
	Sender: SubsystemSender<RuntimeApiMessage>,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	candidate_hash: CandidateHash,
	delayed_approvals_timers: &mut DelayedApprovalTimer,
	ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let mut block_entry = match db.load_block_entry(&block_hash)? {
		Some(b) => b,
		None => {
			// not a cause for alarm - just lost a race with pruning, most likely.
			metrics.on_approval_stale();
			return Ok(Vec::new())
		},
	};

	// Locate the candidate's index within the block entry's candidate list.
	let candidate_index = match block_entry.candidates().iter().position(|e| e.1 == candidate_hash)
	{
		None => {
			gum::warn!(
				target: LOG_TARGET,
				"Candidate hash {} is not present in the block entry's candidates for relay block {}",
				candidate_hash,
				block_entry.parent_hash(),
			);

			metrics.on_approval_error();
			return Ok(Vec::new())
		},
		Some(idx) => idx,
	};

	let candidate_hash = match block_entry.candidate(candidate_index as usize) {
		Some((_, h)) => *h,
		None => {
			gum::warn!(
				target: LOG_TARGET,
				"Received malformed request to approve out-of-bounds candidate index {} included at block {:?}",
				candidate_index,
				block_hash,
			);

			metrics.on_approval_error();
			return Ok(Vec::new())
		},
	};

	let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
		Some(c) => c,
		None => {
			gum::warn!(
				target: LOG_TARGET,
				"Missing entry for candidate index {} included at block {:?}",
				candidate_index,
				block_hash,
			);

			metrics.on_approval_error();
			return Ok(Vec::new())
		},
	};

	let session_info = match get_session_info(
		session_info_provider,
		sender,
		block_entry.parent_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(s) => s,
		None => return Ok(Vec::new()),
	};

	// Queue the candidate for (possibly coalesced) signing no later than the
	// computed tick. `defer_candidate_signature` returning `Some` means the
	// candidate already had a pending signature entry — unexpected.
	if block_entry
		.defer_candidate_signature(
			candidate_index as _,
			candidate_hash,
			compute_delayed_approval_sending_tick(
				state,
				&block_entry,
				&candidate_entry,
				session_info,
				&metrics,
			),
		)
		.is_some()
	{
		gum::error!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,
			validator_index = validator_index.0,
			"Possible bug, we shouldn't have to defer a candidate more than once",
		);
	}

	gum::debug!(
		target: LOG_TARGET,
		?candidate_hash,
		?block_hash,
		validator_index = validator_index.0,
		"Ready to issue approval vote",
	);

	// Record our own approval in the state machine; this also persists the
	// updated block entry (local approvals always write it).
	let actions = advance_approval_state(
		sender,
		state,
		db,
		session_info_provider,
		metrics,
		block_entry,
		candidate_hash,
		candidate_entry,
		ApprovalStateTransition::LocalApproval(validator_index as _),
		wakeups,
	)
	.await;

	// Sign now if enough approvals are pending or the deadline passed;
	// otherwise arm a timer for the returned next wakeup tick.
	if let Some(next_wakeup) = maybe_create_signature(
		db,
		session_info_provider,
		state,
		sender,
		approval_voting_sender,
		block_hash,
		validator_index,
		metrics,
	)
	.await?
	{
		delayed_approvals_timers.maybe_arm_timer(
			next_wakeup,
			state.clock.as_ref(),
			block_hash,
			validator_index,
		);
	}
	Ok(actions)
}
3823
// Create signature for the approved candidates pending signatures
//
// Loads the block entry for `block_hash` and asks it which candidates are due
// for signing (either enough approvals accumulated to coalesce, or a deadline
// passed). If candidates are due, produces ONE signature covering all of them,
// records it on each covered candidate's approval entry, distributes the
// coalesced vote via approval-distribution, and marks the block entry as
// having issued its approval.
//
// Returns `Ok(Some(tick))` when signing is still deferred — the caller should
// arm a wakeup timer for that tick — and `Ok(None)` otherwise (signed, or
// nothing to do, or a recoverable error was logged).
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn maybe_create_signature<
	Sender: SubsystemSender<RuntimeApiMessage>,
	ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	state: &State,
	sender: &mut Sender,
	approval_voting_sender: &mut ADSender,
	block_hash: Hash,
	validator_index: ValidatorIndex,
	metrics: &Metrics,
) -> SubsystemResult<Option<Tick>> {
	let mut block_entry = match db.load_block_entry(&block_hash)? {
		Some(b) => b,
		None => {
			// not a cause for alarm - just lost a race with pruning, most likely.
			metrics.on_approval_stale();
			gum::debug!(
				target: LOG_TARGET,
				"Could not find block that needs signature {:}", block_hash
			);
			return Ok(None)
		},
	};

	// Runtime-provided coalescing parameters (e.g. max number of approvals
	// per signature); fall back to defaults when the runtime query fails.
	let approval_params = state
		.get_approval_voting_params_or_default(sender, block_entry.session(), block_hash)
		.await
		.unwrap_or_default();

	gum::trace!(
		target: LOG_TARGET,
		"Candidates pending signatures {:}", block_entry.num_candidates_pending_signature()
	);
	let tick_now = state.clock.tick_now();

	// `candidates_to_sign` is `Some` when signing should happen now; otherwise
	// `sign_no_later_then` carries the tick at which we should try again.
	let (candidates_to_sign, sign_no_later_then) = block_entry
		.get_candidates_that_need_signature(tick_now, approval_params.max_approval_coalesce_count);

	let (candidates_hashes, candidates_indices) = match candidates_to_sign {
		Some(candidates_to_sign) => candidates_to_sign,
		None => return Ok(sign_no_later_then),
	};

	let session_info = match get_session_info(
		session_info_provider,
		sender,
		block_entry.parent_hash(),
		block_entry.session(),
	)
	.await
	{
		Some(s) => s,
		None => {
			metrics.on_approval_error();
			gum::error!(
				target: LOG_TARGET,
				"Could not retrieve the session"
			);
			return Ok(None)
		},
	};

	// Resolve our validator public key from the session's validator set.
	let validator_pubkey = match session_info.validators.get(validator_index) {
		Some(p) => p,
		None => {
			gum::error!(
				target: LOG_TARGET,
				"Validator index {} out of bounds in session {}",
				validator_index.0,
				block_entry.session(),
			);

			metrics.on_approval_error();
			return Ok(None)
		},
	};

	// A single signature covers all coalesced candidate hashes at once.
	let signature = match sign_approval(
		&state.keystore,
		&validator_pubkey,
		&candidates_hashes,
		block_entry.session(),
	) {
		Some(sig) => sig,
		None => {
			gum::error!(
				target: LOG_TARGET,
				validator_index = ?validator_index,
				session = ?block_entry.session(),
				"Could not issue approval signature. Assignment key present but not validator key?",
			);

			metrics.on_approval_error();
			return Ok(None)
		},
	};
	metrics.on_approval_coalesce(candidates_hashes.len() as u32);

	let candidate_entries = candidates_hashes
		.iter()
		.map(|candidate_hash| db.load_candidate_entry(candidate_hash))
		.collect::<SubsystemResult<Vec<Option<CandidateEntry>>>>()?;

	// Record the coalesced signature on each covered candidate's approval
	// entry and persist the updated entries.
	for mut candidate_entry in candidate_entries {
		let approval_entry = candidate_entry.as_mut().and_then(|candidate_entry| {
			candidate_entry.approval_entry_mut(&block_entry.block_hash())
		});

		match approval_entry {
			Some(approval_entry) => approval_entry.import_approval_sig(OurApproval {
				signature: signature.clone(),
				signed_candidates_indices: candidates_indices.clone(),
			}),
			None => {
				// Should not happen: every candidate scheduled for signing was
				// expected to have an approval entry under this block.
				gum::error!(
					target: LOG_TARGET,
					candidate_entry = ?candidate_entry,
					"Candidate scheduled for signing approval entry should not be None"
				);
			},
		};
		candidate_entry.map(|candidate_entry| db.write_candidate_entry(candidate_entry));
	}

	metrics.on_approval_produced();

	// Distribute the coalesced vote to the network via approval-distribution.
	approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval(
		IndirectSignedApprovalVoteV2 {
			block_hash: block_entry.block_hash(),
			candidate_indices: candidates_indices,
			validator: validator_index,
			signature,
		},
	));

	gum::trace!(
		target: LOG_TARGET,
		?block_hash,
		signed_candidates = ?block_entry.num_candidates_pending_signature(),
		"Issue approval votes",
	);
	// Mark the block entry as having issued the approval and persist it.
	block_entry.issued_approval();
	db.write_block_entry(block_entry.into());
	Ok(None)
}
3973
3974// Sign an approval vote. Fails if the key isn't present in the store.
3975fn sign_approval(
3976	keystore: &LocalKeystore,
3977	public: &ValidatorId,
3978	candidate_hashes: &[CandidateHash],
3979	session_index: SessionIndex,
3980) -> Option<ValidatorSignature> {
3981	let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
3982
3983	let payload = ApprovalVoteMultipleCandidates(candidate_hashes).signing_payload(session_index);
3984
3985	Some(key.sign(&payload[..]))
3986}
3987
3988/// Send `IssueLocalStatement` to dispute-coordinator.
3989fn issue_local_invalid_statement<Sender>(
3990	sender: &mut Sender,
3991	session_index: SessionIndex,
3992	candidate_hash: CandidateHash,
3993	candidate: CandidateReceipt,
3994) where
3995	Sender: SubsystemSender<DisputeCoordinatorMessage>,
3996{
3997	// We need to send an unbounded message here to break a cycle:
3998	// DisputeCoordinatorMessage::IssueLocalStatement ->
3999	// ApprovalVotingMessage::GetApprovalSignaturesForCandidate.
4000	//
4001	// Use of unbounded _should_ be fine here as raising a dispute should be an
4002	// exceptional event. Even in case of bugs: There can be no more than
4003	// number of slots per block requests every block. Also for sending this
4004	// message a full recovery and validation procedure took place, which takes
4005	// longer than issuing a local statement + import.
4006	sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
4007		session_index,
4008		candidate_hash,
4009		candidate.clone(),
4010		false,
4011	));
4012}
4013
4014// Computes what is the latest tick we can send an approval
4015fn compute_delayed_approval_sending_tick(
4016	state: &State,
4017	block_entry: &BlockEntry,
4018	candidate_entry: &CandidateEntry,
4019	session_info: &SessionInfo,
4020	metrics: &Metrics,
4021) -> Tick {
4022	let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
4023	let assignment_tranche = candidate_entry
4024		.approval_entry(&block_entry.block_hash())
4025		.and_then(|approval_entry| approval_entry.our_assignment())
4026		.map(|our_assignment| our_assignment.tranche())
4027		.unwrap_or_default();
4028
4029	let assignment_triggered_tick = current_block_tick + assignment_tranche as Tick;
4030
4031	let no_show_duration_ticks = slot_number_to_tick(
4032		state.slot_duration_millis,
4033		Slot::from(u64::from(session_info.no_show_slots)),
4034	);
4035	let tick_now = state.clock.tick_now();
4036
4037	let sign_no_later_than = min(
4038		tick_now + MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick,
4039		// We don't want to accidentally cause no-shows, so if we are past
4040		// the second half of the no show time, force the sending of the
4041		// approval immediately.
4042		assignment_triggered_tick + no_show_duration_ticks / 2,
4043	);
4044
4045	metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
4046	sign_no_later_than
4047}