1use futures_timer::Delay;
25use polkadot_node_primitives::{
26 approval::{
27 v1::{BlockApprovalMeta, DelayTranche},
28 v2::{
29 AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
30 IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2,
31 },
32 },
33 ValidationResult, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36 errors::RecoveryError,
37 messages::{
38 ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
39 ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
40 AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
41 ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote,
42 DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage,
43 RuntimeApiRequest,
44 },
45 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
46 SubsystemSender,
47};
48use polkadot_node_subsystem_util::{
49 self,
50 database::Database,
51 metrics::{self, prometheus},
52 runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
53 TimeoutExt,
54};
55use polkadot_primitives::{
56 ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
57 CandidateIndex, CandidateReceiptV2 as CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex,
58 Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair,
59 ValidatorSignature,
60};
61use sc_keystore::LocalKeystore;
62use sp_application_crypto::Pair;
63use sp_consensus::SyncOracle;
64use sp_consensus_slots::Slot;
65use std::time::Instant;
66
// NOTE(review): not referenced in the visible part of this file. Judging by the name it
// presumably caps how many blocks keep assignment-gathering timestamps (see
// `State::per_block_assignments_gathering_times`) — confirm at the use site.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
71
72use futures::{
73 channel::oneshot,
74 future::{BoxFuture, RemoteHandle},
75 prelude::*,
76 stream::FuturesUnordered,
77 StreamExt,
78};
79
80use std::{
81 cmp::min,
82 collections::{
83 btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
84 },
85 sync::Arc,
86 time::Duration,
87};
88
89use schnellru::{ByLength, LruMap};
90
91use approval_checking::RequiredTranches;
92use bitvec::{order::Lsb0, vec::BitVec};
93pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
94use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
95use polkadot_node_primitives::approval::time::{
96 slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick,
97};
98
99mod approval_checking;
100pub mod approval_db;
101mod backend;
102pub mod criteria;
103mod import;
104mod ops;
105mod persisted_entries;
106
107use crate::{
108 approval_checking::{Check, TranchesToApproveResult},
109 approval_db::common::{Config as DatabaseConfig, DbBackend},
110 backend::{Backend, OverlayedBackend},
111 criteria::InvalidAssignmentReason,
112 persisted_entries::OurApproval,
113};
114
115#[cfg(test)]
116mod tests;
117
/// Timeout applied to a launched approval check: `CurrentlyCheckingSet` reports the check as
/// `ApprovalOutcome::TimedOut` once it elapses. `ApprovalVotingSubsystem::with_config` also
/// uses half of this value as the default retry backoff.
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
// NOTE(review): not used in the visible part of the file; presumably bounds how long we wait
// when collecting signatures — confirm at the use site.
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
// NOTE(review): presumably the capacity of the approvals LRU cache handed to
// `CurrentlyCheckingSet::next` — confirm where the cache is constructed.
const APPROVAL_CACHE_SIZE: u32 = 1024;

/// Default number of approval retries passed by `ApprovalVotingSubsystem::with_config`.
const MAX_APPROVAL_RETRIES: u32 = 16;

// NOTE(review): a delay expressed in ticks; its exact role is defined outside this chunk.
const APPROVAL_DELAY: Tick = 2;
/// Log target used by every `gum` log statement of this subsystem.
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

// NOTE(review): presumably the longest time (in ticks) an approval may be held back for
// coalescing — confirm at the use site.
const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;

// NOTE(review): presumably the wakeup delay (in ticks) applied after a restart — confirm at
// the use site.
const RESTART_WAKEUP_DELAY: Tick = 12;
144
/// Configuration for the approval voting subsystem.
#[derive(Debug, Clone)]
pub struct Config {
    /// Database column identifier; forwarded into the approval DB configuration.
    pub col_approval_data: u32,
    /// Slot duration in milliseconds; copied into the subsystem on construction.
    pub slot_duration_millis: u64,
}
154
/// Operating mode of the subsystem.
enum Mode {
    // NOTE(review): the behavioural difference between the modes is implemented outside this
    // chunk; presumably `Active` means the node has finished syncing.
    Active,
    /// Carries the sync oracle. A freshly constructed subsystem starts in this mode.
    Syncing(Box<dyn SyncOracle + Send>),
}
168
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
    /// Local keystore handle.
    keystore: Arc<LocalKeystore>,
    /// Approval DB configuration, built from `Config::col_approval_data`.
    db_config: DatabaseConfig,
    /// Slot duration in milliseconds, copied from `Config::slot_duration_millis`.
    slot_duration_millis: u64,
    /// Database handle used to build the `DbBackend`.
    db: Arc<dyn Database>,
    /// Current operating mode; starts as `Mode::Syncing`.
    mode: Mode,
    /// Metrics sink.
    metrics: Metrics,
    /// Clock abstraction; `with_config` installs `SystemClock`.
    clock: Arc<dyn Clock + Send + Sync>,
    /// Task spawner.
    spawner: Arc<dyn overseer::gen::Spawner + 'static>,
    /// Maximum number of approval retries; `with_config` passes `MAX_APPROVAL_RETRIES`.
    max_approval_retries: u32,
    /// Backoff between retries; `with_config` passes `APPROVAL_CHECKING_TIMEOUT / 2`.
    retry_backoff: Duration,
}
187
/// The registered Prometheus metrics of the subsystem. Field descriptions mirror the help
/// texts used when the metrics are registered in `try_register`.
#[derive(Clone)]
struct MetricsInner {
    /// Number of candidates imported by the approval voting subsystem.
    imported_candidates_total: prometheus::Counter<prometheus::U64>,
    /// Assignments and tranches produced by the approval voting subsystem.
    assignments_produced: prometheus::Histogram,
    /// Number of approvals produced, labelled by `status`.
    approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
    /// Number of assignments which became no-shows.
    no_shows_total: prometheus::Counter<prometheus::U64>,
    /// Number of observed no shows at any moment in time.
    observed_no_shows: prometheus::Counter<prometheus::U64>,
    /// Number of candidates where more than one third had to vote.
    approved_by_one_third: prometheus::Counter<prometheus::U64>,
    /// Number of times we woke up to process a candidate.
    wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
    /// Number of coalesced approvals.
    coalesced_approvals_buckets: prometheus::Histogram,
    /// Number of ticks we delay the sending of a candidate approval.
    coalesced_approvals_delay: prometheus::Histogram,
    /// Number of ticks (500ms) to approve candidates.
    candidate_approval_time_ticks: prometheus::Histogram,
    /// Number of ticks (500ms) to approve blocks.
    block_approval_time_ticks: prometheus::Histogram,
    /// Time spent writing an approval db transaction.
    time_db_transaction: prometheus::Histogram,
    /// Time spent recovering and approving data in approval voting.
    time_recover_and_approve: prometheus::Histogram,
    /// Number of times signatures got requested by other subsystems.
    candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
    /// Number of unapproved candidates in unfinalized chain.
    unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
    /// The time in ms it takes for each stage to gather enough assignments needed for
    /// approval, labelled by `stage`.
    assignments_gathering_time_by_stage: prometheus::HistogramVec,
}
217
218#[derive(Default, Clone)]
220pub struct Metrics(Option<MetricsInner>);
221
222impl Metrics {
223 fn on_candidate_imported(&self) {
224 if let Some(metrics) = &self.0 {
225 metrics.imported_candidates_total.inc();
226 }
227 }
228
229 fn on_assignment_produced(&self, tranche: DelayTranche) {
230 if let Some(metrics) = &self.0 {
231 metrics.assignments_produced.observe(tranche as f64);
232 }
233 }
234
235 fn on_approval_coalesce(&self, num_coalesced: u32) {
236 if let Some(metrics) = &self.0 {
237 for _ in 0..num_coalesced {
240 metrics.coalesced_approvals_buckets.observe(num_coalesced as f64)
241 }
242 }
243 }
244
245 fn on_delayed_approval(&self, delayed_ticks: u64) {
246 if let Some(metrics) = &self.0 {
247 metrics.coalesced_approvals_delay.observe(delayed_ticks as f64)
248 }
249 }
250
251 fn on_approval_stale(&self) {
252 if let Some(metrics) = &self.0 {
253 metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
254 }
255 }
256
257 fn on_approval_invalid(&self) {
258 if let Some(metrics) = &self.0 {
259 metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
260 }
261 }
262
263 fn on_approval_unavailable(&self) {
264 if let Some(metrics) = &self.0 {
265 metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
266 }
267 }
268
269 fn on_approval_error(&self) {
270 if let Some(metrics) = &self.0 {
271 metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
272 }
273 }
274
275 fn on_approval_produced(&self) {
276 if let Some(metrics) = &self.0 {
277 metrics.approvals_produced_total.with_label_values(&["success"]).inc()
278 }
279 }
280
281 fn on_no_shows(&self, n: usize) {
282 if let Some(metrics) = &self.0 {
283 metrics.no_shows_total.inc_by(n as u64);
284 }
285 }
286
287 fn on_observed_no_shows(&self, n: usize) {
288 if let Some(metrics) = &self.0 {
289 metrics.observed_no_shows.inc_by(n as u64);
290 }
291 }
292
293 fn on_approved_by_one_third(&self) {
294 if let Some(metrics) = &self.0 {
295 metrics.approved_by_one_third.inc();
296 }
297 }
298
299 fn on_wakeup(&self) {
300 if let Some(metrics) = &self.0 {
301 metrics.wakeups_triggered_total.inc();
302 }
303 }
304
305 fn on_candidate_approved(&self, ticks: Tick) {
306 if let Some(metrics) = &self.0 {
307 metrics.candidate_approval_time_ticks.observe(ticks as f64);
308 }
309 }
310
311 fn on_block_approved(&self, ticks: Tick) {
312 if let Some(metrics) = &self.0 {
313 metrics.block_approval_time_ticks.observe(ticks as f64);
314 }
315 }
316
317 fn on_candidate_signatures_request(&self) {
318 if let Some(metrics) = &self.0 {
319 metrics.candidate_signatures_requests_total.inc();
320 }
321 }
322
323 fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
324 self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
325 }
326
327 fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
328 self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
329 }
330
331 fn on_unapproved_candidates_in_unfinalized_chain(&self, count: usize) {
332 if let Some(metrics) = &self.0 {
333 metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
334 }
335 }
336
337 pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
338 if let Some(metrics) = &self.0 {
339 let stage_string = stage.to_string();
340 metrics
345 .assignments_gathering_time_by_stage
346 .with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
347 .observe(elapsed_as_millis as f64);
348 }
349 }
350}
351
352impl metrics::Metrics for Metrics {
353 fn try_register(
354 registry: &prometheus::Registry,
355 ) -> std::result::Result<Self, prometheus::PrometheusError> {
356 let metrics = MetricsInner {
357 imported_candidates_total: prometheus::register(
358 prometheus::Counter::new(
359 "polkadot_parachain_imported_candidates_total",
360 "Number of candidates imported by the approval voting subsystem",
361 )?,
362 registry,
363 )?,
364 assignments_produced: prometheus::register(
365 prometheus::Histogram::with_opts(
366 prometheus::HistogramOpts::new(
367 "polkadot_parachain_assignments_produced",
368 "Assignments and tranches produced by the approval voting subsystem",
369 ).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
370 )?,
371 registry,
372 )?,
373 approvals_produced_total: prometheus::register(
374 prometheus::CounterVec::new(
375 prometheus::Opts::new(
376 "polkadot_parachain_approvals_produced_total",
377 "Number of approvals produced by the approval voting subsystem",
378 ),
379 &["status"]
380 )?,
381 registry,
382 )?,
383 no_shows_total: prometheus::register(
384 prometheus::Counter::new(
385 "polkadot_parachain_approvals_no_shows_total",
386 "Number of assignments which became no-shows in the approval voting subsystem",
387 )?,
388 registry,
389 )?,
390 observed_no_shows: prometheus::register(
391 prometheus::Counter::new(
392 "polkadot_parachain_approvals_observed_no_shows_total",
393 "Number of observed no shows at any moment in time",
394 )?,
395 registry,
396 )?,
397 wakeups_triggered_total: prometheus::register(
398 prometheus::Counter::new(
399 "polkadot_parachain_approvals_wakeups_total",
400 "Number of times we woke up to process a candidate in the approval voting subsystem",
401 )?,
402 registry,
403 )?,
404 candidate_approval_time_ticks: prometheus::register(
405 prometheus::Histogram::with_opts(
406 prometheus::HistogramOpts::new(
407 "polkadot_parachain_approvals_candidate_approval_time_ticks",
408 "Number of ticks (500ms) to approve candidates.",
409 ).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
410 )?,
411 registry,
412 )?,
413 coalesced_approvals_buckets: prometheus::register(
414 prometheus::Histogram::with_opts(
415 prometheus::HistogramOpts::new(
416 "polkadot_parachain_approvals_coalesced_approvals_buckets",
417 "Number of coalesced approvals.",
418 ).buckets(vec![1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]),
419 )?,
420 registry,
421 )?,
422 coalesced_approvals_delay: prometheus::register(
423 prometheus::Histogram::with_opts(
424 prometheus::HistogramOpts::new(
425 "polkadot_parachain_approvals_coalescing_delay",
426 "Number of ticks we delay the sending of a candidate approval",
427 ).buckets(vec![1.1, 2.1, 3.1, 4.1, 6.1, 8.1, 12.1, 20.1, 32.1]),
428 )?,
429 registry,
430 )?,
431 approved_by_one_third: prometheus::register(
432 prometheus::Counter::new(
433 "polkadot_parachain_approved_by_one_third",
434 "Number of candidates where more than one third had to vote ",
435 )?,
436 registry,
437 )?,
438 block_approval_time_ticks: prometheus::register(
439 prometheus::Histogram::with_opts(
440 prometheus::HistogramOpts::new(
441 "polkadot_parachain_approvals_blockapproval_time_ticks",
442 "Number of ticks (500ms) to approve blocks.",
443 ).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
444 )?,
445 registry,
446 )?,
447 time_db_transaction: prometheus::register(
448 prometheus::Histogram::with_opts(
449 prometheus::HistogramOpts::new(
450 "polkadot_parachain_time_approval_db_transaction",
451 "Time spent writing an approval db transaction.",
452 )
453 )?,
454 registry,
455 )?,
456 time_recover_and_approve: prometheus::register(
457 prometheus::Histogram::with_opts(
458 prometheus::HistogramOpts::new(
459 "polkadot_parachain_time_recover_and_approve",
460 "Time spent recovering and approving data in approval voting",
461 )
462 )?,
463 registry,
464 )?,
465 candidate_signatures_requests_total: prometheus::register(
466 prometheus::Counter::new(
467 "polkadot_parachain_approval_candidate_signatures_requests_total",
468 "Number of times signatures got requested by other subsystems",
469 )?,
470 registry,
471 )?,
472 unapproved_candidates_in_unfinalized_chain: prometheus::register(
473 prometheus::Gauge::new(
474 "polkadot_parachain_approval_unapproved_candidates_in_unfinalized_chain",
475 "Number of unapproved candidates in unfinalized chain",
476 )?,
477 registry,
478 )?,
479 assignments_gathering_time_by_stage: prometheus::register(
480 prometheus::HistogramVec::new(
481 prometheus::HistogramOpts::new(
482 "polkadot_parachain_assignments_gather_time_by_stage_ms",
483 "The time in ms it takes for each stage to gather enough assignments needed for approval",
484 )
485 .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
486 &["stage"],
487 )?,
488 registry,
489 )?,
490 };
491
492 Ok(Metrics(Some(metrics)))
493 }
494}
495
496impl ApprovalVotingSubsystem {
497 pub fn with_config(
499 config: Config,
500 db: Arc<dyn Database>,
501 keystore: Arc<LocalKeystore>,
502 sync_oracle: Box<dyn SyncOracle + Send>,
503 metrics: Metrics,
504 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
505 ) -> Self {
506 ApprovalVotingSubsystem::with_config_and_clock(
507 config,
508 db,
509 keystore,
510 sync_oracle,
511 metrics,
512 Arc::new(SystemClock {}),
513 spawner,
514 MAX_APPROVAL_RETRIES,
515 APPROVAL_CHECKING_TIMEOUT / 2,
516 )
517 }
518
519 pub fn with_config_and_clock(
521 config: Config,
522 db: Arc<dyn Database>,
523 keystore: Arc<LocalKeystore>,
524 sync_oracle: Box<dyn SyncOracle + Send>,
525 metrics: Metrics,
526 clock: Arc<dyn Clock + Send + Sync>,
527 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
528 max_approval_retries: u32,
529 retry_backoff: Duration,
530 ) -> Self {
531 ApprovalVotingSubsystem {
532 keystore,
533 slot_duration_millis: config.slot_duration_millis,
534 db,
535 db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
536 mode: Mode::Syncing(sync_oracle),
537 metrics,
538 clock,
539 spawner,
540 max_approval_retries,
541 retry_backoff,
542 }
543 }
544
545 pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
548 let config =
549 approval_db::common::Config { col_approval_data: self.db_config.col_approval_data };
550 let mut backend = approval_db::common::DbBackend::new(self.db.clone(), config);
551 let mut overlay = OverlayedBackend::new(&backend);
552
553 ops::revert_to(&mut overlay, hash)?;
554
555 let ops = overlay.into_write_ops();
556 backend.write(ops)
557 }
558}
559
560fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemResult<()> {
563 let backend = DbBackend::new(db, config);
564 let all_blocks = backend.load_all_blocks()?;
565
566 if all_blocks.is_empty() {
567 gum::info!(target: LOG_TARGET, "Starting with an empty approval vote DB.",);
568 } else {
569 gum::debug!(
570 target: LOG_TARGET,
571 "Starting with {} blocks in approval vote DB.",
572 all_blocks.len()
573 );
574 }
575
576 Ok(())
577}
578
579#[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)]
580impl<Context: Send> ApprovalVotingSubsystem {
581 fn start(self, mut ctx: Context) -> SpawnedSubsystem {
582 let backend = DbBackend::new(self.db.clone(), self.db_config);
583 let to_other_subsystems = ctx.sender().clone();
584 let to_approval_distr = ctx.sender().clone();
585
586 let future = run::<DbBackend, _, _, _>(
587 ctx,
588 to_other_subsystems,
589 to_approval_distr,
590 self,
591 Box::new(RealAssignmentCriteria),
592 backend,
593 )
594 .map_err(|e| SubsystemError::with_origin("approval-voting", e))
595 .boxed();
596
597 SpawnedSubsystem { name: "approval-voting-subsystem", future }
598 }
599}
600
/// A request to issue an approval vote; carried by `Action::IssueApproval`.
#[derive(Debug, Clone)]
struct ApprovalVoteRequest {
    /// Index of the validator the vote is issued for.
    validator_index: ValidatorIndex,
    /// Hash of the block the vote refers to.
    block_hash: Hash,
}
606
607#[derive(Default)]
608struct Wakeups {
609 wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
611 reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
612 block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
613}
614
615impl Wakeups {
616 fn first(&self) -> Option<Tick> {
618 self.wakeups.keys().next().map(|t| *t)
619 }
620
621 fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
622 self.block_numbers.entry(block_number).or_default().insert(block_hash);
623 }
624
625 fn schedule(
628 &mut self,
629 block_hash: Hash,
630 block_number: BlockNumber,
631 candidate_hash: CandidateHash,
632 tick: Tick,
633 ) {
634 if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
635 if prev <= &tick {
636 return
637 }
638
639 if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
641 if let Some(pos) =
642 entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
643 {
644 entry.get_mut().remove(pos);
645 }
646
647 if entry.get().is_empty() {
648 let _ = entry.remove_entry();
649 }
650 }
651 } else {
652 self.note_block(block_hash, block_number);
653 }
654
655 self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
656 self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
657 }
658
659 fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
660 let after = self.block_numbers.split_off(&(finalized_number + 1));
661 let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
662 .into_iter()
663 .flat_map(|(_number, hashes)| hashes)
664 .collect();
665
666 let mut pruned_wakeups = BTreeMap::new();
667 self.reverse_wakeups.retain(|(h, c_h), tick| {
668 let live = !pruned_blocks.contains(h);
669 if !live {
670 pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
671 }
672 live
673 });
674
675 for (tick, pruned) in pruned_wakeups {
676 if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
677 entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
678 if entry.get().is_empty() {
679 let _ = entry.remove();
680 }
681 }
682 }
683 }
684
685 fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
687 self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
688 }
689
690 async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
692 match self.first() {
693 None => future::pending().await,
694 Some(tick) => {
695 clock.wait(tick).await;
696 match self.wakeups.entry(tick) {
697 BTMEntry::Vacant(_) => {
698 panic!("entry is known to exist since `first` was `Some`; qed")
699 },
700 BTMEntry::Occupied(mut entry) => {
701 let (hash, candidate_hash) = entry.get_mut().pop()
702 .expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
703
704 if entry.get().is_empty() {
705 let _ = entry.remove();
706 }
707
708 self.reverse_wakeups.remove(&(hash, candidate_hash));
709
710 (tick, hash, candidate_hash)
711 },
712 }
713 },
714 }
715 }
716}
717
/// Snapshot of a candidate's approval state, as assembled by `State::approval_status`.
struct ApprovalStatus {
    /// Required tranches as computed by `approval_checking::tranches_to_approve`.
    required_tranches: RequiredTranches,
    /// Tranche returned by the clock for the block's slot at the time of the check.
    tranche_now: DelayTranche,
    /// Tick corresponding to the block's slot.
    block_tick: Tick,
    /// Total number of no-shows observed by `tranches_to_approve`.
    last_no_shows: usize,
    /// Validators reported as no-shows by `tranches_to_approve`.
    no_show_validators: Vec<ValidatorIndex>,
}
725
/// Result of an approval check, cached per candidate by `CurrentlyCheckingSet::next`.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
    /// The check finished and the candidate was approved.
    Approved,
    /// The check finished without approving the candidate.
    Failed,
    /// The check did not finish within `APPROVAL_CHECKING_TIMEOUT`.
    TimedOut,
}
732
/// Data attached to a failed approval (see `ApprovalState::failed_with_retry`).
// NOTE(review): the retry logic that consumes this lives outside the visible chunk; the
// field notes below are read off the names and types only.
#[derive(Clone)]
struct RetryApprovalInfo {
    /// Receipt of the candidate.
    candidate: CandidateReceipt,
    /// Backing group of the candidate.
    backing_group: GroupIndex,
    /// Executor parameters.
    executor_params: ExecutorParams,
    /// Core index, when known.
    core_index: Option<CoreIndex>,
    /// Session index.
    session_index: SessionIndex,
    /// Number of attempts left — presumably decremented on each retry; confirm.
    attempts_remaining: u32,
    /// Backoff duration — presumably the wait before the next attempt; confirm.
    backoff: Duration,
}
743
744struct ApprovalState {
745 validator_index: ValidatorIndex,
746 candidate_hash: CandidateHash,
747 approval_outcome: ApprovalOutcome,
748 retry_info: Option<RetryApprovalInfo>,
749}
750
751impl ApprovalState {
752 fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
753 Self {
754 validator_index,
755 candidate_hash,
756 approval_outcome: ApprovalOutcome::Approved,
757 retry_info: None,
758 }
759 }
760 fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
761 Self {
762 validator_index,
763 candidate_hash,
764 approval_outcome: ApprovalOutcome::Failed,
765 retry_info: None,
766 }
767 }
768
769 fn failed_with_retry(
770 validator_index: ValidatorIndex,
771 candidate_hash: CandidateHash,
772 retry_info: Option<RetryApprovalInfo>,
773 ) -> Self {
774 Self {
775 validator_index,
776 candidate_hash,
777 approval_outcome: ApprovalOutcome::Failed,
778 retry_info,
779 }
780 }
781}
782
783struct CurrentlyCheckingSet {
784 candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
785 currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
786}
787
788impl Default for CurrentlyCheckingSet {
789 fn default() -> Self {
790 Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
791 }
792}
793
794impl CurrentlyCheckingSet {
795 pub async fn insert_relay_block_hash(
798 &mut self,
799 candidate_hash: CandidateHash,
800 validator_index: ValidatorIndex,
801 relay_block: Hash,
802 launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
803 ) -> SubsystemResult<()> {
804 match self.candidate_hash_map.entry(candidate_hash) {
805 HMEntry::Occupied(mut entry) => {
806 entry.get_mut().insert(relay_block);
808 },
809 HMEntry::Vacant(entry) => {
810 entry.insert(HashSet::new()).insert(relay_block);
812 let work = launch_work.await?;
813 self.currently_checking.push(Box::pin(async move {
814 match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
815 None => ApprovalState {
816 candidate_hash,
817 validator_index,
818 approval_outcome: ApprovalOutcome::TimedOut,
819 retry_info: None,
820 },
821 Some(approval_state) => approval_state,
822 }
823 }));
824 },
825 }
826
827 Ok(())
828 }
829
830 pub async fn next(
831 &mut self,
832 approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
833 ) -> (HashSet<Hash>, ApprovalState) {
834 if !self.currently_checking.is_empty() {
835 if let Some(approval_state) = self.currently_checking.next().await {
836 let out = self
837 .candidate_hash_map
838 .remove(&approval_state.candidate_hash)
839 .unwrap_or_default();
840 approvals_cache
841 .insert(approval_state.candidate_hash, approval_state.approval_outcome);
842 return (out, approval_state)
843 }
844 }
845
846 future::pending().await
847 }
848}
849
850async fn get_extended_session_info<'a, Sender>(
851 runtime_info: &'a mut RuntimeInfo,
852 sender: &mut Sender,
853 relay_parent: Hash,
854) -> Option<&'a ExtendedSessionInfo>
855where
856 Sender: SubsystemSender<RuntimeApiMessage>,
857{
858 match runtime_info.get_session_info(sender, relay_parent).await {
859 Ok(extended_info) => Some(&extended_info),
860 Err(_) => {
861 gum::debug!(
862 target: LOG_TARGET,
863 ?relay_parent,
864 "Can't obtain SessionInfo or ExecutorParams"
865 );
866 None
867 },
868 }
869}
870
871async fn get_extended_session_info_by_index<'a, Sender>(
872 runtime_info: &'a mut RuntimeInfo,
873 sender: &mut Sender,
874 relay_parent: Hash,
875 session_index: SessionIndex,
876) -> Option<&'a ExtendedSessionInfo>
877where
878 Sender: SubsystemSender<RuntimeApiMessage>,
879{
880 match runtime_info
881 .get_session_info_by_index(sender, relay_parent, session_index)
882 .await
883 {
884 Ok(extended_info) => Some(&extended_info),
885 Err(_) => {
886 gum::debug!(
887 target: LOG_TARGET,
888 session = session_index,
889 ?relay_parent,
890 "Can't obtain SessionInfo or ExecutorParams"
891 );
892 None
893 },
894 }
895}
896
897async fn get_session_info_by_index<'a, Sender>(
898 runtime_info: &'a mut RuntimeInfo,
899 sender: &mut Sender,
900 relay_parent: Hash,
901 session_index: SessionIndex,
902) -> Option<&'a SessionInfo>
903where
904 Sender: SubsystemSender<RuntimeApiMessage>,
905{
906 get_extended_session_info_by_index(runtime_info, sender, relay_parent, session_index)
907 .await
908 .map(|extended_info| &extended_info.session_info)
909}
910
/// Mutable state of the subsystem.
struct State {
    /// Local keystore handle.
    keystore: Arc<LocalKeystore>,
    /// Slot duration in milliseconds; used to convert slots into ticks and tranches.
    slot_duration_millis: u64,
    /// Clock abstraction.
    clock: Arc<dyn Clock + Send + Sync>,
    /// Assignment criteria implementation.
    assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
    /// LRU keyed by block number; each entry maps (block hash, candidate hash) to the record
    /// of the candidate's current assignment gathering stage.
    per_block_assignments_gathering_times:
        LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
    /// No-show counters accumulated by `record_no_shows`.
    no_show_stats: NoShowStats,
}
923
/// Minimum number of blocks between two no-show dumps by `NoShowStats::maybe_print`.
const NO_SHOW_DUMP_FREQUENCY: BlockNumber = 50;
// NOTE(review): not used in the visible part of the file; presumably caps how many no-show
// validators are recorded per candidate — confirm at the use site.
pub(crate) const MAX_RECORDED_NO_SHOW_VALIDATORS_PER_CANDIDATE: usize = 20;
928
929#[derive(Debug, Clone, PartialEq, Eq, Default)]
934struct NoShowStats {
935 per_validator_no_show: HashMap<SessionIndex, HashMap<ValidatorIndex, usize>>,
936 per_parachain_no_show: HashMap<u32, usize>,
937 last_dumped_block_number: BlockNumber,
938}
939
940impl NoShowStats {
941 fn maybe_print(&mut self, current_block_number: BlockNumber) {
944 if self.last_dumped_block_number > current_block_number ||
945 current_block_number - self.last_dumped_block_number < NO_SHOW_DUMP_FREQUENCY
946 {
947 return
948 }
949 if self.per_parachain_no_show.is_empty() && self.per_validator_no_show.is_empty() {
950 return
951 }
952
953 gum::debug!(
954 target: LOG_TARGET,
955 "Validators with no_show {:?} and parachains with no_shows {:?} since {:}",
956 self.per_validator_no_show,
957 self.per_parachain_no_show,
958 self.last_dumped_block_number
959 );
960
961 self.last_dumped_block_number = current_block_number;
962
963 self.per_validator_no_show.clear();
964 self.per_parachain_no_show.clear();
965 }
966}
967
/// Timing record of one candidate's assignment gathering.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
    /// Index of the current gathering stage, starting at 0.
    stage: usize,
    /// When the current stage started; `None` once the start has been taken out.
    stage_start: Option<Instant>,
}

impl Default for AssignmentGatheringRecord {
    /// A record at stage 0 whose timer starts now.
    fn default() -> Self {
        let started_at = Instant::now();
        Self { stage: 0, stage_start: Some(started_at) }
    }
}
984
985#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
986impl State {
987 async fn approval_status<Sender, 'a, 'b>(
991 &'a self,
992 sender: &mut Sender,
993 session_info_provider: &'a mut RuntimeInfo,
994 block_entry: &'a BlockEntry,
995 candidate_entry: &'b CandidateEntry,
996 ) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
997 where
998 Sender: SubsystemSender<RuntimeApiMessage>,
999 {
1000 let session_info = match get_session_info_by_index(
1001 session_info_provider,
1002 sender,
1003 block_entry.parent_hash(),
1004 block_entry.session(),
1005 )
1006 .await
1007 {
1008 Some(s) => s,
1009 None => return None,
1010 };
1011 let block_hash = block_entry.block_hash();
1012
1013 let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
1014 let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
1015 let no_show_duration = slot_number_to_tick(
1016 self.slot_duration_millis,
1017 Slot::from(u64::from(session_info.no_show_slots)),
1018 );
1019
1020 if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
1021 let TranchesToApproveResult {
1022 required_tranches,
1023 total_observed_no_shows,
1024 no_show_validators,
1025 } = approval_checking::tranches_to_approve(
1026 approval_entry,
1027 candidate_entry.approvals(),
1028 tranche_now,
1029 block_tick,
1030 no_show_duration,
1031 session_info.needed_approvals as _,
1032 );
1033
1034 let status = ApprovalStatus {
1035 required_tranches,
1036 block_tick,
1037 tranche_now,
1038 last_no_shows: total_observed_no_shows,
1039 no_show_validators,
1040 };
1041
1042 Some((approval_entry, status))
1043 } else {
1044 None
1045 }
1046 }
1047
1048 async fn get_approval_voting_params_or_default<Sender: SubsystemSender<RuntimeApiMessage>>(
1050 &self,
1051 sender: &mut Sender,
1052 session_index: SessionIndex,
1053 block_hash: Hash,
1054 ) -> Option<ApprovalVotingParams> {
1055 let (s_tx, s_rx) = oneshot::channel();
1056
1057 sender
1058 .send_message(RuntimeApiMessage::Request(
1059 block_hash,
1060 RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx),
1061 ))
1062 .await;
1063
1064 match s_rx.await {
1065 Ok(Ok(params)) => {
1066 gum::trace!(
1067 target: LOG_TARGET,
1068 approval_voting_params = ?params,
1069 session = ?session_index,
1070 "Using the following subsystem params"
1071 );
1072 Some(params)
1073 },
1074 Ok(Err(err)) => {
1075 gum::debug!(
1076 target: LOG_TARGET,
1077 ?err,
1078 "Could not request approval voting params from runtime"
1079 );
1080 None
1081 },
1082 Err(err) => {
1083 gum::debug!(
1084 target: LOG_TARGET,
1085 ?err,
1086 "Could not request approval voting params from runtime"
1087 );
1088 None
1089 },
1090 }
1091 }
1092
1093 fn mark_begining_of_gathering_assignments(
1094 &mut self,
1095 block_number: BlockNumber,
1096 block_hash: Hash,
1097 candidate: CandidateHash,
1098 ) {
1099 if let Some(record) = self
1100 .per_block_assignments_gathering_times
1101 .get_or_insert(block_number, HashMap::new)
1102 .and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
1103 {
1104 if record.stage_start.is_none() {
1105 record.stage += 1;
1106 gum::debug!(
1107 target: LOG_TARGET,
1108 stage = ?record.stage,
1109 ?block_hash,
1110 ?candidate,
1111 "Started a new assignment gathering stage",
1112 );
1113 record.stage_start = Some(Instant::now());
1114 }
1115 }
1116 }
1117
1118 fn mark_gathered_enough_assignments(
1119 &mut self,
1120 block_number: BlockNumber,
1121 block_hash: Hash,
1122 candidate: CandidateHash,
1123 ) -> AssignmentGatheringRecord {
1124 let record = self
1125 .per_block_assignments_gathering_times
1126 .get(&block_number)
1127 .and_then(|entry| entry.get_mut(&(block_hash, candidate)));
1128 let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
1129 AssignmentGatheringRecord {
1130 stage,
1131 stage_start: record.and_then(|record| record.stage_start.take()),
1132 }
1133 }
1134
1135 fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
1136 while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
1137 {
1138 if *block_number < remove_lower_than {
1139 self.per_block_assignments_gathering_times.pop_oldest();
1140 } else {
1141 break
1142 }
1143 }
1144 }
1145
1146 fn observe_assignment_gathering_status(
1147 &mut self,
1148 metrics: &Metrics,
1149 required_tranches: &RequiredTranches,
1150 block_hash: Hash,
1151 block_number: BlockNumber,
1152 candidate_hash: CandidateHash,
1153 ) {
1154 match required_tranches {
1155 RequiredTranches::All | RequiredTranches::Pending { .. } => {
1156 self.mark_begining_of_gathering_assignments(
1157 block_number,
1158 block_hash,
1159 candidate_hash,
1160 );
1161 },
1162 RequiredTranches::Exact { .. } => {
1163 let time_to_gather =
1164 self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
1165 if let Some(gathering_started) = time_to_gather.stage_start {
1166 if gathering_started.elapsed().as_millis() > 6000 {
1167 gum::trace!(
1168 target: LOG_TARGET,
1169 ?block_hash,
1170 ?candidate_hash,
1171 "Long assignment gathering time",
1172 );
1173 }
1174 metrics.observe_assignment_gathering_time(
1175 time_to_gather.stage,
1176 gathering_started.elapsed().as_millis() as usize,
1177 )
1178 }
1179 },
1180 }
1181 }
1182
1183 fn record_no_shows(
1184 &mut self,
1185 session_index: SessionIndex,
1186 para_id: u32,
1187 no_show_validators: &Vec<ValidatorIndex>,
1188 ) {
1189 if !no_show_validators.is_empty() {
1190 *self.no_show_stats.per_parachain_no_show.entry(para_id.into()).or_default() += 1;
1191 }
1192 for validator_index in no_show_validators {
1193 *self
1194 .no_show_stats
1195 .per_validator_no_show
1196 .entry(session_index)
1197 .or_default()
1198 .entry(*validator_index)
1199 .or_default() += 1;
1200 }
1201 }
1202}
1203
/// A unit of follow-up work produced while handling wakeups, overseer messages and finished
/// approval checks. Lists of actions are executed in order by `handle_actions`.
#[derive(Debug, Clone)]
enum Action {
    /// Schedule a wakeup for `candidate_hash` under `block_hash` at `tick`.
    ScheduleWakeup {
        block_hash: Hash,
        block_number: BlockNumber,
        candidate_hash: CandidateHash,
        tick: Tick,
    },
    /// Launch the approval work for a candidate, unless its outcome is already cached.
    ///
    /// Ignored by `handle_actions` while the subsystem is in `Mode::Syncing`.
    LaunchApproval {
        /// Candidate indices sent along with `indirect_cert` when the assignment is
        /// distributed.
        claimed_candidate_indices: CandidateBitfield,
        candidate_hash: CandidateHash,
        /// The assignment certificate; also supplies the block hash and validator index
        /// used for the approval.
        indirect_cert: IndirectAssignmentCertV2,
        /// Tranche reported to the metrics when the action is executed.
        assignment_tranche: DelayTranche,
        /// Relay block under which the check is registered in the currently-checking set.
        relay_block_hash: Hash,
        session: SessionIndex,
        executor_params: ExecutorParams,
        candidate: CandidateReceipt,
        backing_group: GroupIndex,
        /// Whether the assignment should be sent to approval-distribution as part of
        /// executing this action.
        distribute_assignment: bool,
        core_index: Option<CoreIndex>,
    },
    /// Send `ChainSelectionMessage::Approved` for the given block hash.
    NoteApprovedInChainSelection(Hash),
    /// Issue our approval of the candidate as described by the request.
    IssueApproval(CandidateHash, ApprovalVoteRequest),
    /// Switch to `Mode::Active` and emit the messages/actions computed by
    /// `distribution_messages_for_activation`.
    BecomeActive,
    /// Makes `handle_actions` report that the main loop should stop.
    Conclude,
}
1230
/// Source of work for the approval-voting main loop.
///
/// `run` pulls its next overseer signal or `ApprovalVotingMessage` from an implementation of
/// this trait.
#[async_trait::async_trait]
pub trait ApprovalVotingWorkProvider {
    /// Waits for the next signal or message addressed to approval-voting.
    async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>>;
}
1236
// Blanket implementation for every `Context` that satisfies the bounds injected by the
// `overseer::contextbounds` attribute, so such a context can be handed to `run` directly.
#[async_trait::async_trait]
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
impl<Context> ApprovalVotingWorkProvider for Context {
    async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
        // NOTE(review): presumably resolves to the context's own `recv` provided through the
        // `contextbounds` bounds rather than recursing into this method — confirm.
        self.recv().await
    }
}
1244
/// Main loop of the approval-voting worker.
///
/// Builds the in-memory `State` from `subsystem`, asks the chain API for the finalized block
/// number, and then repeatedly waits for whichever of the following becomes ready first:
///
/// * a scheduled wakeup (`wakeups`),
/// * a signal or message from `work_provider`,
/// * the result of an in-flight approval check (`currently_checking_set`),
/// * a fired delayed-approval timer (`delayed_approvals_timers`).
///
/// Every event is turned into a list of `Action`s that is executed by `handle_actions`.
/// Database changes made during one iteration are collected in an `OverlayedBackend` and
/// written to `backend` at the end of that iteration.
///
/// Returns `Ok(())` once `handle_actions` reports that the loop should conclude. Errors from
/// the handlers, from `work_provider.recv()` and from the backend write are propagated.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn run<
    B,
    WorkProvider: ApprovalVotingWorkProvider,
    Sender: SubsystemSender<ChainApiMessage>
        + SubsystemSender<RuntimeApiMessage>
        + SubsystemSender<ChainSelectionMessage>
        + SubsystemSender<AvailabilityRecoveryMessage>
        + SubsystemSender<DisputeCoordinatorMessage>
        + SubsystemSender<CandidateValidationMessage>
        + Clone,
    ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
    mut work_provider: WorkProvider,
    mut to_other_subsystems: Sender,
    mut to_approval_distr: ADSender,
    mut subsystem: ApprovalVotingSubsystem,
    assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
    mut backend: B,
) -> SubsystemResult<()>
where
    B: Backend,
{
    // A failing sanity check is only logged; it does not stop the worker from starting.
    if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) {
        gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
    }

    let mut state = State {
        keystore: subsystem.keystore,
        slot_duration_millis: subsystem.slot_duration_millis,
        clock: subsystem.clock,
        assignment_criteria,
        per_block_assignments_gathering_times: LruMap::new(ByLength::new(
            MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
        )),
        no_show_stats: NoShowStats::default(),
    };

    // `None` when the chain API answers with an error. A dropped response channel aborts
    // `run` through the `?` on `rx.await`.
    let mut last_finalized_height: Option<BlockNumber> = {
        let (tx, rx) = oneshot::channel();
        to_other_subsystems
            .send_message(ChainApiMessage::FinalizedBlockNumber(tx))
            .await;
        match rx.await? {
            Ok(number) => Some(number),
            Err(err) => {
                gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number");
                None
            },
        }
    };

    // Session info cache, created without a keystore and sized to `DISPUTE_WINDOW` sessions.
    let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
        keystore: None,
        session_cache_lru_size: DISPUTE_WINDOW.get(),
    });

    let mut wakeups = Wakeups::default();
    let mut currently_checking_set = CurrentlyCheckingSet::default();
    let mut delayed_approvals_timers = DelayedApprovalTimer::default();
    let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE));

    loop {
        // Fresh overlay per iteration; it is flushed to `backend` at the bottom of the loop.
        let mut overlayed_db = OverlayedBackend::new(&backend);
        let actions = futures::select! {
            // A previously scheduled wakeup for a (block, candidate) pair is due.
            (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
                subsystem.metrics.on_wakeup();
                process_wakeup(
                    &mut to_other_subsystems,
                    &mut state,
                    &mut overlayed_db,
                    &mut session_info_provider,
                    woken_block,
                    woken_candidate,
                    &subsystem.metrics,
                    &wakeups,
                ).await?
            }
            // A signal or message arrived from the work provider.
            next_msg = work_provider.recv().fuse() => {
                let mut actions = handle_from_overseer(
                    &mut to_other_subsystems,
                    &mut to_approval_distr,
                    &subsystem.spawner,
                    &mut state,
                    &mut overlayed_db,
                    &mut session_info_provider,
                    &subsystem.metrics,
                    next_msg?,
                    &mut last_finalized_height,
                    &mut wakeups,
                ).await?;

                // While still syncing, consult the oracle after every message. Once major
                // sync is over, `BecomeActive` is placed in front of all other actions.
                if let Mode::Syncing(ref mut oracle) = subsystem.mode {
                    if !oracle.is_major_syncing() {
                        actions.insert(0, Action::BecomeActive)
                    }
                }

                actions
            }
            // An approval check from the currently-checking set has produced a result.
            approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
                let mut actions = Vec::new();
                let (
                    relay_block_hashes,
                    ApprovalState {
                        validator_index,
                        candidate_hash,
                        approval_outcome,
                        retry_info,
                    }
                ) = approval_state;

                // On approval, issue our vote under every relay block the check was
                // registered for.
                if matches!(approval_outcome, ApprovalOutcome::Approved) {
                    let mut approvals: Vec<Action> = relay_block_hashes
                        .iter()
                        .map(|block_hash|
                            Action::IssueApproval(
                                candidate_hash,
                                ApprovalVoteRequest {
                                    validator_index,
                                    block_hash: *block_hash,
                                },
                            )
                        )
                        .collect();
                    actions.append(&mut approvals);
                }

                // If retry information is attached, launch the check again for every relay
                // block that still has a block entry (a load error is treated as "missing").
                if let Some(retry_info) = retry_info {
                    for block_hash in relay_block_hashes {
                        if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
                            let sender = to_other_subsystems.clone();
                            let spawn_handle = subsystem.spawner.clone();
                            let metrics = subsystem.metrics.clone();
                            let retry_info = retry_info.clone();
                            let executor_params = retry_info.executor_params.clone();
                            let candidate = retry_info.candidate.clone();

                            currently_checking_set
                                .insert_relay_block_hash(
                                    candidate_hash,
                                    validator_index,
                                    block_hash,
                                    async move {
                                        launch_approval(
                                            sender,
                                            spawn_handle,
                                            metrics,
                                            retry_info.session_index,
                                            candidate,
                                            validator_index,
                                            block_hash,
                                            retry_info.backing_group,
                                            executor_params,
                                            retry_info.core_index,
                                            retry_info,
                                        )
                                        .await
                                    },
                                )
                                .await?;
                        }
                    }
                }

                actions
            },
            // A delayed-approval timer fired for a (block, validator) pair.
            (block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
                gum::debug!(
                    target: LOG_TARGET,
                    ?block_hash,
                    ?validator_index,
                    "Sign approval for multiple candidates",
                );

                match maybe_create_signature(
                    &mut overlayed_db,
                    &mut session_info_provider,
                    &state,
                    &mut to_other_subsystems,
                    &mut to_approval_distr,
                    block_hash,
                    validator_index,
                    &subsystem.metrics,
                ).await {
                    // A next wakeup tick was returned: (re-)arm the timer for it.
                    Ok(Some(next_wakeup)) => {
                        delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index);
                    },
                    Ok(None) => {}
                    // Signature failures are logged and do not stop the loop.
                    Err(err) => {
                        gum::error!(
                            target: LOG_TARGET,
                            ?err,
                            "Failed to create signature",
                        );
                    }
                }
                // This arm never produces follow-up actions.
                vec![]
            }
        };

        // `handle_actions` returns `true` once an `Action::Conclude` has been processed.
        if handle_actions(
            &mut to_other_subsystems,
            &mut to_approval_distr,
            &subsystem.spawner,
            &mut state,
            &mut overlayed_db,
            &mut session_info_provider,
            &subsystem.metrics,
            &mut wakeups,
            &mut currently_checking_set,
            &mut delayed_approvals_timers,
            &mut approvals_cache,
            &mut subsystem.mode,
            actions,
            subsystem.max_approval_retries,
            subsystem.retry_backoff,
        )
        .await?
        {
            break
        }

        // Nothing to persist if the overlay recorded no changes.
        if !overlayed_db.is_empty() {
            // NOTE(review): `_timer` is kept alive until the end of this scope; presumably it
            // records the transaction duration on drop — confirm against `Metrics`.
            let _timer = subsystem.metrics.time_db_transaction();
            let ops = overlayed_db.into_write_ops();
            backend.write(ops)?;
        }
    }

    Ok(())
}
1479
1480pub async fn start_approval_worker<
1482 WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
1483 Sender: SubsystemSender<ChainApiMessage>
1484 + SubsystemSender<RuntimeApiMessage>
1485 + SubsystemSender<ChainSelectionMessage>
1486 + SubsystemSender<AvailabilityRecoveryMessage>
1487 + SubsystemSender<DisputeCoordinatorMessage>
1488 + SubsystemSender<CandidateValidationMessage>
1489 + Clone,
1490 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1491>(
1492 work_provider: WorkProvider,
1493 to_other_subsystems: Sender,
1494 to_approval_distr: ADSender,
1495 config: Config,
1496 db: Arc<dyn Database>,
1497 keystore: Arc<LocalKeystore>,
1498 sync_oracle: Box<dyn SyncOracle + Send>,
1499 metrics: Metrics,
1500 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
1501 task_name: &'static str,
1502 group_name: &'static str,
1503 clock: Arc<dyn Clock + Send + Sync>,
1504) -> SubsystemResult<()> {
1505 let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
1506 config,
1507 db.clone(),
1508 keystore,
1509 sync_oracle,
1510 metrics,
1511 clock,
1512 spawner,
1513 MAX_APPROVAL_RETRIES,
1514 APPROVAL_CHECKING_TIMEOUT / 2,
1515 );
1516 let backend = DbBackend::new(db.clone(), approval_voting.db_config);
1517 let spawner = approval_voting.spawner.clone();
1518 spawner.spawn_blocking(
1519 task_name,
1520 Some(group_name),
1521 Box::pin(async move {
1522 if let Err(err) = run(
1523 work_provider,
1524 to_other_subsystems,
1525 to_approval_distr,
1526 approval_voting,
1527 Box::new(RealAssignmentCriteria),
1528 backend,
1529 )
1530 .await
1531 {
1532 gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
1533 };
1534 }),
1535 );
1536 Ok(())
1537}
1538
1539#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1559async fn handle_actions<
1560 Sender: SubsystemSender<ChainApiMessage>
1561 + SubsystemSender<RuntimeApiMessage>
1562 + SubsystemSender<ChainSelectionMessage>
1563 + SubsystemSender<AvailabilityRecoveryMessage>
1564 + SubsystemSender<DisputeCoordinatorMessage>
1565 + SubsystemSender<CandidateValidationMessage>
1566 + Clone,
1567 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1568>(
1569 sender: &mut Sender,
1570 approval_voting_sender: &mut ADSender,
1571 spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1572 state: &mut State,
1573 overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
1574 session_info_provider: &mut RuntimeInfo,
1575 metrics: &Metrics,
1576 wakeups: &mut Wakeups,
1577 currently_checking_set: &mut CurrentlyCheckingSet,
1578 delayed_approvals_timers: &mut DelayedApprovalTimer,
1579 approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
1580 mode: &mut Mode,
1581 actions: Vec<Action>,
1582 max_approval_retries: u32,
1583 retry_backoff: Duration,
1584) -> SubsystemResult<bool> {
1585 let mut conclude = false;
1586 let mut actions_iter = actions.into_iter();
1587 while let Some(action) = actions_iter.next() {
1588 match action {
1589 Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
1590 wakeups.schedule(block_hash, block_number, candidate_hash, tick);
1591 },
1592 Action::IssueApproval(candidate_hash, approval_request) => {
1593 let next_actions: Vec<Action> = issue_approval(
1605 sender,
1606 approval_voting_sender,
1607 state,
1608 overlayed_db,
1609 session_info_provider,
1610 metrics,
1611 candidate_hash,
1612 delayed_approvals_timers,
1613 approval_request,
1614 &wakeups,
1615 )
1616 .await?
1617 .into_iter()
1618 .map(|v| v.clone())
1619 .chain(actions_iter)
1620 .collect();
1621
1622 actions_iter = next_actions.into_iter();
1623 },
1624 Action::LaunchApproval {
1625 claimed_candidate_indices,
1626 candidate_hash,
1627 indirect_cert,
1628 assignment_tranche,
1629 relay_block_hash,
1630 session,
1631 executor_params,
1632 candidate,
1633 backing_group,
1634 distribute_assignment,
1635 core_index,
1636 } => {
1637 if let Mode::Syncing(_) = *mode {
1639 continue
1640 }
1641
1642 metrics.on_assignment_produced(assignment_tranche);
1643 let block_hash = indirect_cert.block_hash;
1644 let validator_index = indirect_cert.validator;
1645
1646 if distribute_assignment {
1647 approval_voting_sender.send_unbounded_message(
1648 ApprovalDistributionMessage::DistributeAssignment(
1649 indirect_cert,
1650 claimed_candidate_indices,
1651 ),
1652 );
1653 }
1654
1655 match approvals_cache.get(&candidate_hash) {
1656 Some(ApprovalOutcome::Approved) => {
1657 let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
1658 candidate_hash,
1659 ApprovalVoteRequest { validator_index, block_hash },
1660 ))
1661 .map(|v| v.clone())
1662 .chain(actions_iter)
1663 .collect();
1664 actions_iter = new_actions.into_iter();
1665 },
1666 None => {
1667 let sender = sender.clone();
1668 let spawn_handle = spawn_handle.clone();
1669
1670 let retry = RetryApprovalInfo {
1671 candidate: candidate.clone(),
1672 backing_group,
1673 executor_params: executor_params.clone(),
1674 core_index,
1675 session_index: session,
1676 attempts_remaining: max_approval_retries,
1677 backoff: retry_backoff,
1678 };
1679
1680 currently_checking_set
1681 .insert_relay_block_hash(
1682 candidate_hash,
1683 validator_index,
1684 relay_block_hash,
1685 async move {
1686 launch_approval(
1687 sender,
1688 spawn_handle,
1689 metrics.clone(),
1690 session,
1691 candidate,
1692 validator_index,
1693 block_hash,
1694 backing_group,
1695 executor_params,
1696 core_index,
1697 retry,
1698 )
1699 .await
1700 },
1701 )
1702 .await?;
1703 },
1704 Some(_) => {},
1705 }
1706 },
1707 Action::NoteApprovedInChainSelection(block_hash) => {
1708 sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
1709 },
1710 Action::BecomeActive => {
1711 *mode = Mode::Active;
1712
1713 let (messages, next_actions) = distribution_messages_for_activation(
1714 sender,
1715 overlayed_db,
1716 state,
1717 delayed_approvals_timers,
1718 session_info_provider,
1719 )
1720 .await?;
1721 for message in messages.into_iter() {
1722 approval_voting_sender.send_unbounded_message(message);
1723 }
1724 let next_actions: Vec<Action> =
1725 next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
1726
1727 actions_iter = next_actions.into_iter();
1728 },
1729 Action::Conclude => {
1730 conclude = true;
1731 },
1732 }
1733 }
1734
1735 Ok(conclude)
1736}
1737
1738fn cores_to_candidate_indices(
1739 core_indices: &CoreBitfield,
1740 block_entry: &BlockEntry,
1741) -> Result<CandidateBitfield, BitfieldError> {
1742 let mut candidate_indices = Vec::new();
1743
1744 for claimed_core_index in core_indices.iter_ones() {
1746 if let Some(candidate_index) = block_entry
1747 .candidates()
1748 .iter()
1749 .position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
1750 {
1751 candidate_indices.push(candidate_index as _);
1752 }
1753 }
1754
1755 CandidateBitfield::try_from(candidate_indices)
1756}
1757
1758fn get_core_indices_on_startup(
1761 assignment: &AssignmentCertKindV2,
1762 block_entry_core_index: CoreIndex,
1763) -> CoreBitfield {
1764 match &assignment {
1765 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
1766 AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
1767 CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed"),
1768 AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1769 CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"),
1770 }
1771}
1772
1773fn get_assignment_core_indices(
1777 assignment: &AssignmentCertKindV2,
1778 candidate_hash: &CandidateHash,
1779 block_entry: &BlockEntry,
1780) -> Option<CoreBitfield> {
1781 match &assignment {
1782 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
1783 Some(core_bitfield.clone()),
1784 AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
1785 .candidates()
1786 .iter()
1787 .find(|(_core_index, h)| candidate_hash == h)
1788 .map(|(core_index, _candidate_hash)| {
1789 CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1790 }),
1791 AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1792 Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")),
1793 }
1794}
1795
/// Computes the approval-distribution messages and the actions to run when the subsystem
/// becomes active.
///
/// Walks every block returned by `db.load_all_blocks()`. For each block with a block entry, a
/// `BlockApprovalMeta` is recorded, and for each of its candidates our local statements for
/// that block are inspected:
///
/// * no assignment statement and no approval, but we hold an assignment that has not been
///   triggered: an `Action::ScheduleWakeup` at `tick_now() + RESTART_WAKEUP_DELAY`;
/// * assignment only: a `DistributeAssignment` message, plus an `Action::LaunchApproval`
///   unless the candidate is pending signature in the block entry;
/// * assignment and approval: a `DistributeAssignment` message plus a `DistributeApproval`
///   message (at most one per distinct set of signed candidate indices within a block).
///
/// Missing block, candidate or approval entries are logged and skipped.
///
/// The returned messages are ordered: one `NewBlocks` message first, then all assignment
/// messages, then all approval messages.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApiMessage>>(
    sender: &mut Sender,
    db: &OverlayedBackend<'_, impl Backend>,
    state: &State,
    delayed_approvals_timers: &mut DelayedApprovalTimer,
    session_info_provider: &mut RuntimeInfo,
) -> SubsystemResult<(Vec<ApprovalDistributionMessage>, Vec<Action>)> {
    let all_blocks: Vec<Hash> = db.load_all_blocks()?;

    let mut approval_meta = Vec::with_capacity(all_blocks.len());
    let mut messages = Vec::new();
    let mut approvals = Vec::new();
    let mut actions = Vec::new();

    // Placeholder that reserves index 0 for the `NewBlocks` message; it is overwritten with
    // the real metadata once all blocks have been visited.
    messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new()));
    for block_hash in all_blocks {
        let block_entry = match db.load_block_entry(&block_hash)? {
            Some(b) => b,
            None => {
                gum::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry");

                continue
            },
        };

        approval_meta.push(BlockApprovalMeta {
            hash: block_hash,
            number: block_entry.block_number(),
            parent_hash: block_entry.parent_hash(),
            candidates: block_entry
                .candidates()
                .iter()
                .map(|(core_index, c_hash)| {
                    // A DB error is treated like a missing candidate entry here.
                    let candidate = db.load_candidate_entry(c_hash).ok().flatten();
                    // Falls back to the default group index (with a warning) when the
                    // candidate entry or its approval entry for this block is missing.
                    let group_index = candidate
                        .and_then(|entry| {
                            entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
                        })
                        .unwrap_or_else(|| {
                            gum::warn!(
                                target: LOG_TARGET,
                                ?block_hash,
                                ?c_hash,
                                "Missing candidate entry or approval entry",
                            );
                            GroupIndex::default()
                        });
                    (*c_hash, *core_index, group_index)
                })
                .collect(),
            slot: block_entry.slot(),
            session: block_entry.session(),
            vrf_story: block_entry.relay_vrf_story(),
        });
        // Signed candidate index sets already queued for this block, used to avoid sending
        // the same approval more than once.
        let mut signatures_queued = HashSet::new();
        for (core_index, candidate_hash) in block_entry.candidates() {
            let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
                Some(c) => c,
                None => {
                    gum::warn!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Missing candidate entry",
                    );

                    continue
                },
            };

            match candidate_entry.approval_entry(&block_hash) {
                Some(approval_entry) => {
                    match approval_entry.local_statements() {
                        // Nothing stated yet: if we hold an assignment that has not been
                        // triggered, wake up for it shortly after the restart.
                        (None, None) =>
                            if approval_entry
                                .our_assignment()
                                .map(|assignment| !assignment.triggered())
                                .unwrap_or(false)
                            {
                                actions.push(Action::ScheduleWakeup {
                                    block_hash,
                                    block_number: block_entry.block_number(),
                                    candidate_hash: *candidate_hash,
                                    tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
                                })
                            },
                        // An approval without an assignment statement: nothing to do.
                        (None, Some(_)) => {},
                        // Assignment without approval.
                        (Some(assignment), None) => {
                            let claimed_core_indices =
                                get_core_indices_on_startup(&assignment.cert().kind, *core_index);

                            // The block has candidates waiting for a signature: arm the
                            // delayed-approval timer for the current tick.
                            if block_entry.has_candidates_pending_signature() {
                                delayed_approvals_timers.maybe_arm_timer(
                                    state.clock.tick_now(),
                                    state.clock.as_ref(),
                                    block_entry.block_hash(),
                                    assignment.validator_index(),
                                )
                            }

                            match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
                                Ok(bitfield) => {
                                    gum::debug!(
                                        target: LOG_TARGET,
                                        candidate_hash = ?candidate_entry.candidate_receipt().hash(),
                                        ?block_hash,
                                        "Discovered, triggered assignment, not approved yet",
                                    );

                                    let indirect_cert = IndirectAssignmentCertV2 {
                                        block_hash,
                                        validator: assignment.validator_index(),
                                        cert: assignment.cert().clone(),
                                    };
                                    messages.push(
                                        ApprovalDistributionMessage::DistributeAssignment(
                                            indirect_cert.clone(),
                                            bitfield.clone(),
                                        ),
                                    );

                                    // Only launch the approval work if the candidate is not
                                    // already pending signature.
                                    if !block_entry.candidate_is_pending_signature(*candidate_hash)
                                    {
                                        // Without session info the candidate is skipped; the
                                        // `continue` moves on to the next candidate.
                                        let ExtendedSessionInfo { ref executor_params, .. } =
                                            match get_extended_session_info(
                                                session_info_provider,
                                                sender,
                                                candidate_entry
                                                    .candidate_receipt()
                                                    .descriptor()
                                                    .relay_parent(),
                                            )
                                            .await
                                            {
                                                Some(i) => i,
                                                None => continue,
                                            };

                                        // The assignment message was already queued above,
                                        // hence `distribute_assignment: false`.
                                        actions.push(Action::LaunchApproval {
                                            claimed_candidate_indices: bitfield,
                                            candidate_hash: candidate_entry
                                                .candidate_receipt()
                                                .hash(),
                                            indirect_cert,
                                            assignment_tranche: assignment.tranche(),
                                            relay_block_hash: block_hash,
                                            session: block_entry.session(),
                                            executor_params: executor_params.clone(),
                                            candidate: candidate_entry.candidate_receipt().clone(),
                                            backing_group: approval_entry.backing_group(),
                                            distribute_assignment: false,
                                            core_index: Some(*core_index),
                                        });
                                    }
                                },
                                Err(err) => {
                                    gum::warn!(
                                        target: LOG_TARGET,
                                        ?block_hash,
                                        ?candidate_hash,
                                        ?err,
                                        "Failed to create assignment bitfield",
                                    );
                                },
                            }
                        },
                        // Both assignment and approval exist: re-distribute both.
                        (Some(assignment), Some(approval_sig)) => {
                            let claimed_core_indices =
                                get_core_indices_on_startup(&assignment.cert().kind, *core_index);
                            match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
                                Ok(bitfield) => messages.push(
                                    ApprovalDistributionMessage::DistributeAssignment(
                                        IndirectAssignmentCertV2 {
                                            block_hash,
                                            validator: assignment.validator_index(),
                                            cert: assignment.cert().clone(),
                                        },
                                        bitfield,
                                    ),
                                ),
                                Err(err) => {
                                    gum::warn!(
                                        target: LOG_TARGET,
                                        ?block_hash,
                                        ?candidate_hash,
                                        ?err,
                                        "Failed to create assignment bitfield",
                                    );
                                    // Without the assignment message the approval is not
                                    // queued either.
                                    continue
                                },
                            }
                            // `insert` returns `false` when this set of signed indices was
                            // already queued for the block.
                            if signatures_queued
                                .insert(approval_sig.signed_candidates_indices.clone())
                            {
                                approvals.push(ApprovalDistributionMessage::DistributeApproval(
                                    IndirectSignedApprovalVoteV2 {
                                        block_hash,
                                        candidate_indices: approval_sig.signed_candidates_indices,
                                        validator: assignment.validator_index(),
                                        signature: approval_sig.signature,
                                    },
                                ))
                            };
                        },
                    }
                },
                None => {
                    gum::warn!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Missing approval entry",
                    );
                },
            }
        }
    }

    // Replace the placeholder, then append the approvals after all assignments.
    messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
    messages.extend(approvals.into_iter());
    Ok((messages, actions))
}
2026
2027async fn handle_from_overseer<
2029 Sender: SubsystemSender<ChainApiMessage>
2030 + SubsystemSender<RuntimeApiMessage>
2031 + SubsystemSender<ChainSelectionMessage>
2032 + Clone,
2033 ADSender: SubsystemSender<ApprovalDistributionMessage>,
2034>(
2035 sender: &mut Sender,
2036 approval_voting_sender: &mut ADSender,
2037 spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
2038 state: &mut State,
2039 db: &mut OverlayedBackend<'_, impl Backend>,
2040 session_info_provider: &mut RuntimeInfo,
2041 metrics: &Metrics,
2042 x: FromOrchestra<ApprovalVotingMessage>,
2043 last_finalized_height: &mut Option<BlockNumber>,
2044 wakeups: &mut Wakeups,
2045) -> SubsystemResult<Vec<Action>> {
2046 let actions = match x {
2047 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
2048 let mut actions = Vec::new();
2049 if let Some(activated) = update.activated {
2050 let head = activated.hash;
2051 match import::handle_new_head(
2052 sender,
2053 approval_voting_sender,
2054 state,
2055 db,
2056 session_info_provider,
2057 head,
2058 last_finalized_height,
2059 )
2060 .await
2061 {
2062 Err(e) => return Err(SubsystemError::with_origin("db", e)),
2063 Ok(block_imported_candidates) => {
2064 for block_batch in block_imported_candidates {
2066 gum::debug!(
2067 target: LOG_TARGET,
2068 block_number = ?block_batch.block_number,
2069 block_hash = ?block_batch.block_hash,
2070 num_candidates = block_batch.imported_candidates.len(),
2071 "Imported new block.",
2072 );
2073
2074 state.no_show_stats.maybe_print(block_batch.block_number);
2075
2076 for (c_hash, c_entry) in block_batch.imported_candidates {
2077 metrics.on_candidate_imported();
2078
2079 let our_tranche = c_entry
2080 .approval_entry(&block_batch.block_hash)
2081 .and_then(|a| a.our_assignment().map(|a| a.tranche()));
2082
2083 if let Some(our_tranche) = our_tranche {
2084 let tick = our_tranche as Tick + block_batch.block_tick;
2085 gum::trace!(
2086 target: LOG_TARGET,
2087 tranche = our_tranche,
2088 candidate_hash = ?c_hash,
2089 block_hash = ?block_batch.block_hash,
2090 block_tick = block_batch.block_tick,
2091 "Scheduling first wakeup.",
2092 );
2093
2094 actions.push(Action::ScheduleWakeup {
2098 block_hash: block_batch.block_hash,
2099 block_number: block_batch.block_number,
2100 candidate_hash: c_hash,
2101 tick,
2102 });
2103 }
2104 }
2105 }
2106 },
2107 }
2108 }
2109
2110 actions
2111 },
2112 FromOrchestra::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
2113 gum::debug!(target: LOG_TARGET, ?block_hash, ?block_number, "Block finalized");
2114 *last_finalized_height = Some(block_number);
2115
2116 crate::ops::canonicalize(db, block_number, block_hash)
2117 .map_err(|e| SubsystemError::with_origin("db", e))?;
2118
2119 wakeups.prune_finalized_wakeups(block_number);
2122 state.cleanup_assignments_gathering_timestamp(block_number);
2123
2124 Vec::new()
2130 },
2131 FromOrchestra::Signal(OverseerSignal::Conclude) => {
2132 vec![Action::Conclude]
2133 },
2134 FromOrchestra::Communication { msg } => match msg {
2135 ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => {
2136 let (check_outcome, actions) =
2137 import_assignment(sender, state, db, session_info_provider, checked_assignment)
2138 .await?;
2139 if let AssignmentCheckResult::Bad(ref err) = check_outcome {
2143 gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an assignment");
2144 }
2145 let _ = tx.map(|tx| tx.send(check_outcome));
2146 actions
2147 },
2148 ApprovalVotingMessage::ImportApproval(a, tx) => {
2149 let result =
2150 import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups)
2151 .await?;
2152 if let ApprovalCheckResult::Bad(ref err) = result.1 {
2156 gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an approval");
2157 }
2158 let _ = tx.map(|tx| tx.send(result.1));
2159
2160 result.0
2161 },
2162 ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
2163 match handle_approved_ancestor(sender, db, target, lower_bound, wakeups, &metrics)
2164 .await
2165 {
2166 Ok(v) => {
2167 let _ = res.send(v);
2168 },
2169 Err(e) => {
2170 let _ = res.send(None);
2171 return Err(e)
2172 },
2173 }
2174
2175 Vec::new()
2176 },
2177 ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
2178 metrics.on_candidate_signatures_request();
2179 get_approval_signatures_for_candidate(
2180 approval_voting_sender.clone(),
2181 spawn_handle,
2182 db,
2183 candidate_hash,
2184 tx,
2185 )
2186 .await?;
2187 Vec::new()
2188 },
2189 },
2190 };
2191
2192 Ok(actions)
2193}
2194
/// Answers a request for all approval signatures known for `candidate_hash`.
///
/// The candidate's entry is loaded from `db`. If there is none, an empty map is sent through
/// `tx` right away. Otherwise, for every relay block the candidate has assignments under, the
/// candidate's index within that block is collected, and a task is spawned on `spawn_handle`
/// which requests the signatures for those `(block hash, candidate index)` pairs from
/// approval-distribution, translates the signed candidate indices back into candidate hashes
/// and sends the result through `tx`.
///
/// The function itself returns as soon as the task has been spawned; only DB errors are
/// returned. If the request to approval-distribution times out or is cancelled, nothing is
/// sent through `tx`.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn get_approval_signatures_for_candidate<
    Sender: SubsystemSender<ApprovalDistributionMessage>,
>(
    mut sender: Sender,
    spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
    db: &OverlayedBackend<'_, impl Backend>,
    candidate_hash: CandidateHash,
    tx: oneshot::Sender<HashMap<ValidatorIndex, (Vec<CandidateHash>, ValidatorSignature)>>,
) -> SubsystemResult<()> {
    // Consumes `tx`, so it can be called once: either for the early "not found" answer or
    // from within the spawned task.
    let send_votes = |votes| {
        if let Err(_) = tx.send(votes) {
            gum::debug!(
                target: LOG_TARGET,
                "Sending approval signatures back failed, as receiver got closed."
            );
        }
    };
    let entry = match db.load_candidate_entry(&candidate_hash)? {
        None => {
            send_votes(HashMap::new());
            gum::debug!(
                target: LOG_TARGET,
                ?candidate_hash,
                "Sent back empty votes because the candidate was not found in db."
            );
            return Ok(())
        },
        Some(e) => e,
    };

    let relay_hashes = entry.block_assignments.keys();

    // `(relay block hash, candidate index)` pairs identifying the requested candidate.
    let mut candidate_indices = HashSet::new();
    // Per relay block: candidate index -> candidate hash, for all candidates of that block.
    let mut candidate_indices_to_candidate_hashes: HashMap<
        Hash,
        HashMap<CandidateIndex, CandidateHash>,
    > = HashMap::new();

    for hash in relay_hashes {
        let entry = match db.load_block_entry(hash)? {
            None => {
                gum::debug!(
                    target: LOG_TARGET,
                    ?candidate_hash,
                    ?hash,
                    "Block entry for assignment missing."
                );
                continue
            },
            Some(e) => e,
        };
        for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
            if c_hash == &candidate_hash {
                candidate_indices.insert((*hash, candidate_index as u32));
            }
            // Every candidate of the block is recorded, since a single signature may cover
            // several candidate indices of that block.
            candidate_indices_to_candidate_hashes
                .entry(*hash)
                .or_default()
                .insert(candidate_index as _, *c_hash);
        }
    }

    let get_approvals = async move {
        let (tx_distribution, rx_distribution) = oneshot::channel();
        sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
            candidate_indices,
            tx_distribution,
        ));

        match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
            // No answer within `WAIT_FOR_SIGS_TIMEOUT`.
            None => {
                gum::warn!(
                    target: LOG_TARGET,
                    "Waiting for approval signatures timed out - dead lock?"
                );
            },
            // The response sender was dropped without answering.
            Some(Err(_)) => gum::debug!(
                target: LOG_TARGET,
                "Request for approval signatures got cancelled by `approval-distribution`."
            ),
            Some(Ok(votes)) => {
                let votes = votes
                    .into_iter()
                    .filter_map(|(validator_index, (hash, signed_candidates_indices, signature))| {
                        let candidates_hashes = candidate_indices_to_candidate_hashes.get(&hash);

                        if candidates_hashes.is_none() {
                            gum::warn!(
                                target: LOG_TARGET,
                                ?hash,
                                "Possible bug! Could not find map of candidate_hashes for block hash received from approval-distribution"
                            );
                        }

                        let num_signed_candidates = signed_candidates_indices.len();

                        // Translate every signed index into a candidate hash; indices that
                        // cannot be resolved are dropped (and logged).
                        let signed_candidates_hashes: Vec<CandidateHash> =
                            signed_candidates_indices
                                .into_iter()
                                .filter_map(|candidate_index| {
                                    candidates_hashes.and_then(|candidate_hashes| {
                                        if let Some(candidate_hash) =
                                            candidate_hashes.get(&candidate_index)
                                        {
                                            Some(*candidate_hash)
                                        } else {
                                            gum::warn!(
                                                target: LOG_TARGET,
                                                ?candidate_index,
                                                "Possible bug! Could not find candidate hash for candidate_index coming from approval-distribution"
                                            );
                                            None
                                        }
                                    })
                                })
                                .collect();
                        // Keep the vote only if every signed index could be resolved.
                        if num_signed_candidates == signed_candidates_hashes.len() {
                            Some((validator_index, (signed_candidates_hashes, signature)))
                        } else {
                            gum::warn!(
                                target: LOG_TARGET,
                                "Possible bug! Could not find all hashes for candidates coming from approval-distribution"
                            );
                            None
                        }
                    })
                    .collect();
                send_votes(votes)
            },
        }
    };

    // The request is awaited in a separate task, so this function does not wait for
    // approval-distribution to answer.
    gum::trace!(
        target: LOG_TARGET,
        ?candidate_hash,
        "Spawning task for fetching signatures from approval-distribution"
    );
    spawn_handle.spawn(
        "get-approval-signatures",
        Some("approval-voting-subsystem"),
        Box::pin(get_approvals),
    );
    Ok(())
}
2349
/// Finds the highest block on the chain ending at `target` from which every block down to
/// `lower_bound + 1` is fully approved.
///
/// The chain is walked from `target` towards `lower_bound`. The block number of `target` and
/// the intermediate ancestors are requested from the chain API through `sender`; the approval
/// state of every block is read from `db`.
///
/// Returns `Ok(None)` when:
/// - the number of `target` is unknown or a chain API request fails,
/// - `target` is not above `lower_bound`,
/// - any block on the walked chain has no entry in `db`,
/// - or the walk ends on an unapproved block, i.e. no approved run reaches the bottom.
///
/// Otherwise returns the hash and number of the top block of that approved run, together
/// with one `BlockDescription` per block of the run (lowest block first).
///
/// `wakeups` and `metrics` are only used for diagnostics on unapproved blocks.
///
/// # Errors
///
/// Propagates errors from loading block or candidate entries out of `db`.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn handle_approved_ancestor<Sender: SubsystemSender<ChainApiMessage>>(
    sender: &mut Sender,
    db: &OverlayedBackend<'_, impl Backend>,
    target: Hash,
    lower_bound: BlockNumber,
    wakeups: &Wakeups,
    metrics: &Metrics,
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
    // Upper bound on how many approval bits are rendered in the summary log line.
    const MAX_TRACING_WINDOW: usize = 200;
    // Unapproved blocks at or below this walk depth are reset silently, without diagnostics.
    const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
    // Width of the window, at the deep end of the walk, that gets trace-level candidate dumps.
    const LOGGING_DEPTH_THRESHOLD: usize = 10;

    // `(hash, number)` of the top of the approved run currently being tracked, if any.
    let mut all_approved_max = None;

    let target_number = {
        let (tx, rx) = oneshot::channel();

        sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await;

        match rx.await {
            Ok(Ok(Some(n))) => n,
            Ok(Ok(None)) => return Ok(None),
            // Both a chain API error and a dropped response channel yield "no answer".
            Ok(Err(_)) | Err(_) => return Ok(None),
        }
    };

    if target_number <= lower_bound {
        return Ok(None)
    }

    // Ancestors are only requested when there is at least one block strictly between
    // `lower_bound` and `target`; `k` is exactly the number of such blocks.
    let ancestry = if target_number > lower_bound + 1 {
        let (tx, rx) = oneshot::channel();

        sender
            .send_message(ChainApiMessage::Ancestors {
                hash: target,
                k: (target_number - (lower_bound + 1)) as usize,
                response_channel: tx,
            })
            .await;

        match rx.await {
            Ok(Ok(a)) => a,
            Err(_) | Ok(Err(_)) => return Ok(None),
        }
    } else {
        Vec::new()
    };
    let ancestry_len = ancestry.len();

    let mut block_descriptions = Vec::new();

    // One bit per visited block, in walk order: `true` means the block is fully approved.
    let mut bits: BitVec<u8, Lsb0> = Default::default();
    // `i` is the distance from `target`; the code treats the i-th visited block as having
    // number `target_number - i`.
    for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
        let entry = match db.load_block_entry(&block_hash)? {
            None => {
                let block_number = target_number.saturating_sub(i as u32);
                gum::info!(
                    target: LOG_TARGET,
                    unknown_number = ?block_number,
                    unknown_hash = ?block_hash,
                    "Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
                    target,
                    target_number,
                    lower_bound,
                    lower_bound,
                );
                return Ok(None)
            },
            Some(b) => b,
        };

        bits.push(entry.is_fully_approved());
        if entry.is_fully_approved() {
            // Only the first approved block after a reset becomes the candidate result;
            // deeper approved blocks just extend the run.
            if all_approved_max.is_none() {
                all_approved_max = Some((block_hash, target_number - i as BlockNumber));
            }
            block_descriptions.push(BlockDescription {
                block_hash,
                session: entry.session(),
                candidates: entry
                    .candidates()
                    .iter()
                    .map(|(_idx, candidate_hash)| *candidate_hash)
                    .collect(),
            });
        } else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
            // An unapproved block breaks the run: everything gathered above it is discarded.
            all_approved_max = None;
            block_descriptions.clear();
        } else {
            // Same reset as above, plus diagnostics because the unapproved block is deeper
            // than `ABNORMAL_DEPTH_THRESHOLD`.
            all_approved_max = None;
            block_descriptions.clear();

            let unapproved: Vec<_> = entry.unapproved_candidates().collect();
            gum::debug!(
                target: LOG_TARGET,
                "Block {} is {} blocks deep and has {}/{} candidates unapproved",
                block_hash,
                bits.len() - 1,
                unapproved.len(),
                entry.candidates().len(),
            );
            // The `>=` test guards the subtraction against underflow.
            if ancestry_len >= LOGGING_DEPTH_THRESHOLD && i > ancestry_len - LOGGING_DEPTH_THRESHOLD
            {
                gum::trace!(
                    target: LOG_TARGET,
                    ?block_hash,
                    "Unapproved candidates at depth {}: {:?}",
                    bits.len(),
                    unapproved
                )
            }
            metrics.on_unapproved_candidates_in_unfinalized_chain(unapproved.len());
            for candidate_hash in unapproved {
                match db.load_candidate_entry(&candidate_hash)? {
                    None => {
                        gum::warn!(
                            target: LOG_TARGET,
                            ?candidate_hash,
                            "Missing expected candidate in DB",
                        );

                        continue
                    },
                    Some(c_entry) => match c_entry.approval_entry(&block_hash) {
                        None => {
                            gum::warn!(
                                target: LOG_TARGET,
                                ?candidate_hash,
                                ?block_hash,
                                "Missing expected approval entry under candidate.",
                            );
                        },
                        Some(a_entry) => {
                            // Renders "assignments/approvals/validators" for the log lines
                            // below; approvals are only counted for assigned validators.
                            let status = || {
                                let n_assignments = a_entry.n_assignments();

                                let n_approvals = c_entry
                                    .approvals()
                                    .iter()
                                    .by_vals()
                                    .enumerate()
                                    .filter(|(i, approved)| {
                                        *approved && a_entry.is_assigned(ValidatorIndex(*i as _))
                                    })
                                    .count();

                                format!(
                                    "{}/{}/{}",
                                    n_assignments,
                                    n_approvals,
                                    a_entry.n_validators(),
                                )
                            };

                            match a_entry.our_assignment() {
                                None => gum::debug!(
                                    target: LOG_TARGET,
                                    ?candidate_hash,
                                    ?block_hash,
                                    status = %status(),
                                    "no assignment."
                                ),
                                Some(a) => {
                                    let tranche = a.tranche();
                                    let triggered = a.triggered();

                                    let next_wakeup =
                                        wakeups.wakeup_for(block_hash, candidate_hash);

                                    // Reported as approved only when our assignment was
                                    // triggered and the second local statement is present.
                                    let approved =
                                        triggered && { a_entry.local_statements().1.is_some() };

                                    gum::debug!(
                                        target: LOG_TARGET,
                                        ?candidate_hash,
                                        ?block_hash,
                                        tranche,
                                        ?next_wakeup,
                                        status = %status(),
                                        triggered,
                                        approved,
                                        "assigned."
                                    );
                                },
                            }
                        },
                    },
                }
            }
        }
    }

    // Summary line: one '1'/'0' per visited block starting at `target`, with a space after
    // every block whose number is a multiple of 10 (except after the last bit).
    gum::debug!(
        target: LOG_TARGET,
        "approved blocks {}-[{}]-{}",
        target_number,
        {
            let mut s = String::with_capacity(bits.len());
            for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
                s.push(if *bit { '1' } else { '0' });
                if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 {
                    s.push(' ');
                }
            }

            s
        },
        if bits.len() > MAX_TRACING_WINDOW {
            format!(
                "{}... (truncated due to large window)",
                target_number - MAX_TRACING_WINDOW as u32 + 1,
            )
        } else {
            format!("{}", lower_bound + 1)
        },
    );

    // Descriptions were pushed while walking downwards from `target`; flip them so the
    // lowest block comes first.
    block_descriptions.reverse();

    let all_approved_max =
        all_approved_max.map(|(hash, block_number)| HighestApprovedAncestorBlock {
            hash,
            number: block_number,
            descriptions: block_descriptions,
        });

    Ok(all_approved_max)
}
2597
/// Returns the smaller of two optional values, where `None` means "absent" rather than
/// "smallest": if exactly one side is `Some`, that side is returned, and `None` is only
/// returned when both sides are `None`.
fn min_prefer_some<T: std::cmp::Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
    match a {
        // Nothing on the left: the right side (present or not) is the answer.
        None => b,
        Some(lhs) => match b {
            None => Some(lhs),
            Some(rhs) => Some(std::cmp::min(lhs, rhs)),
        },
    }
}
2606
2607fn schedule_wakeup_action(
2608 approval_entry: &ApprovalEntry,
2609 block_hash: Hash,
2610 block_number: BlockNumber,
2611 candidate_hash: CandidateHash,
2612 block_tick: Tick,
2613 tick_now: Tick,
2614 required_tranches: RequiredTranches,
2615) -> Option<Action> {
2616 let maybe_action = match required_tranches {
2617 _ if approval_entry.is_approved() => None,
2618 RequiredTranches::All => None,
2619 RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
2620 min_prefer_some(
2623 last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
2624 next_no_show,
2625 )
2626 .map(|tick| Action::ScheduleWakeup {
2627 block_hash,
2628 block_number,
2629 candidate_hash,
2630 tick,
2631 })
2632 },
2633 RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
2634 let next_non_empty_tranche = {
2638 let next_announced = approval_entry
2639 .tranches()
2640 .iter()
2641 .skip_while(|t| t.tranche() <= considered)
2642 .map(|t| t.tranche())
2643 .next();
2644
2645 let our_untriggered = approval_entry.our_assignment().and_then(|t| {
2646 if !t.triggered() && t.tranche() > considered {
2647 Some(t.tranche())
2648 } else {
2649 None
2650 }
2651 });
2652
2653 min_prefer_some(next_announced, our_untriggered)
2655 .map(|t| t as Tick + block_tick + clock_drift)
2656 };
2657
2658 min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
2659 Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }
2660 })
2661 },
2662 };
2663
2664 match maybe_action {
2665 Some(Action::ScheduleWakeup { ref tick, .. }) => gum::trace!(
2666 target: LOG_TARGET,
2667 tick,
2668 ?candidate_hash,
2669 ?block_hash,
2670 block_tick,
2671 "Scheduling next wakeup.",
2672 ),
2673 None => gum::trace!(
2674 target: LOG_TARGET,
2675 ?candidate_hash,
2676 ?block_hash,
2677 block_tick,
2678 "No wakeup needed.",
2679 ),
2680 Some(_) => {}, }
2682
2683 maybe_action
2684}
2685
2686async fn import_assignment<Sender>(
2687 sender: &mut Sender,
2688 state: &State,
2689 db: &mut OverlayedBackend<'_, impl Backend>,
2690 session_info_provider: &mut RuntimeInfo,
2691 checked_assignment: CheckedIndirectAssignment,
2692) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
2693where
2694 Sender: SubsystemSender<RuntimeApiMessage>,
2695{
2696 let tick_now = state.clock.tick_now();
2697 let assignment = checked_assignment.assignment();
2698 let candidate_indices = checked_assignment.candidate_indices();
2699 let tranche = checked_assignment.tranche();
2700
2701 let block_entry = match db.load_block_entry(&assignment.block_hash)? {
2702 Some(b) => b,
2703 None =>
2704 return Ok((
2705 AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(
2706 assignment.block_hash,
2707 )),
2708 Vec::new(),
2709 )),
2710 };
2711
2712 let session_info = match get_session_info_by_index(
2713 session_info_provider,
2714 sender,
2715 block_entry.parent_hash(),
2716 block_entry.session(),
2717 )
2718 .await
2719 {
2720 Some(s) => s,
2721 None =>
2722 return Ok((
2723 AssignmentCheckResult::Bad(AssignmentCheckError::UnknownSessionIndex(
2724 block_entry.session(),
2725 )),
2726 Vec::new(),
2727 )),
2728 };
2729
2730 let n_cores = session_info.n_cores as usize;
2731
2732 if candidate_indices.len() > n_cores {
2735 gum::debug!(
2736 target: LOG_TARGET,
2737 validator = assignment.validator.0,
2738 n_cores,
2739 candidate_bitfield_len = ?candidate_indices.len(),
2740 "Oversized bitfield",
2741 );
2742
2743 return Ok((
2744 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
2745 candidate_indices.len(),
2746 )),
2747 Vec::new(),
2748 ))
2749 }
2750
2751 let mut claimed_core_indices = Vec::new();
2752 let mut assigned_candidate_hashes = Vec::new();
2753
2754 for candidate_index in candidate_indices.iter_ones() {
2755 let (claimed_core_index, assigned_candidate_hash) =
2756 match block_entry.candidate(candidate_index) {
2757 Some((c, h)) => (*c, *h),
2758 None =>
2759 return Ok((
2760 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
2761 candidate_index as _,
2762 )),
2763 Vec::new(),
2764 )), };
2766
2767 let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2768 Some(c) => c,
2769 None =>
2770 return Ok((
2771 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2772 candidate_index as _,
2773 assigned_candidate_hash,
2774 )),
2775 Vec::new(),
2776 )), };
2778
2779 if candidate_entry.approval_entry_mut(&assignment.block_hash).is_none() {
2780 return Ok((
2781 AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2782 assignment.block_hash,
2783 assigned_candidate_hash,
2784 )),
2785 Vec::new(),
2786 ));
2787 };
2788
2789 claimed_core_indices.push(claimed_core_index);
2790 assigned_candidate_hashes.push(assigned_candidate_hash);
2791 }
2792
2793 if claimed_core_indices.is_empty() {
2795 return Ok((
2796 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
2797 assignment.validator,
2798 format!("{:?}", InvalidAssignmentReason::NullAssignment),
2799 )),
2800 Vec::new(),
2801 ))
2802 }
2803
2804 let mut actions = Vec::new();
2805 let res = {
2806 let mut is_duplicate = true;
2807 for (assigned_candidate_hash, candidate_index) in
2809 assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
2810 {
2811 let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2812 Some(c) => c,
2813 None =>
2814 return Ok((
2815 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2816 candidate_index as _,
2817 *assigned_candidate_hash,
2818 )),
2819 Vec::new(),
2820 )),
2821 };
2822
2823 let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
2824 Some(a) => a,
2825 None =>
2826 return Ok((
2827 AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2828 assignment.block_hash,
2829 *assigned_candidate_hash,
2830 )),
2831 Vec::new(),
2832 )),
2833 };
2834
2835 let is_duplicate_for_candidate = approval_entry.is_assigned(assignment.validator);
2836 is_duplicate &= is_duplicate_for_candidate;
2837 approval_entry.import_assignment(
2838 tranche,
2839 assignment.validator,
2840 tick_now,
2841 is_duplicate_for_candidate,
2842 );
2843
2844 if let Some((approval_entry, status)) = state
2847 .approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
2848 .await
2849 {
2850 actions.extend(schedule_wakeup_action(
2851 approval_entry,
2852 block_entry.block_hash(),
2853 block_entry.block_number(),
2854 *assigned_candidate_hash,
2855 status.block_tick,
2856 tick_now,
2857 status.required_tranches,
2858 ));
2859 }
2860
2861 db.write_candidate_entry(candidate_entry.into());
2863 }
2864
2865 if is_duplicate {
2871 AssignmentCheckResult::AcceptedDuplicate
2872 } else if candidate_indices.count_ones() > 1 {
2873 gum::trace!(
2874 target: LOG_TARGET,
2875 validator = assignment.validator.0,
2876 candidate_hashes = ?assigned_candidate_hashes,
2877 assigned_cores = ?claimed_core_indices,
2878 ?tranche,
2879 "Imported assignments for multiple cores.",
2880 );
2881
2882 AssignmentCheckResult::Accepted
2883 } else {
2884 gum::trace!(
2885 target: LOG_TARGET,
2886 validator = assignment.validator.0,
2887 candidate_hashes = ?assigned_candidate_hashes,
2888 assigned_cores = ?claimed_core_indices,
2889 "Imported assignment for a single core.",
2890 );
2891
2892 AssignmentCheckResult::Accepted
2893 }
2894 };
2895
2896 Ok((res, actions))
2897}
2898
2899async fn import_approval<Sender>(
2900 sender: &mut Sender,
2901 state: &mut State,
2902 db: &mut OverlayedBackend<'_, impl Backend>,
2903 session_info_provider: &mut RuntimeInfo,
2904 metrics: &Metrics,
2905 approval: CheckedIndirectSignedApprovalVote,
2906 wakeups: &Wakeups,
2907) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
2908where
2909 Sender: SubsystemSender<RuntimeApiMessage>,
2910{
2911 macro_rules! respond_early {
2912 ($e: expr) => {{
2913 return Ok((Vec::new(), $e))
2914 }};
2915 }
2916
2917 let block_entry = match db.load_block_entry(&approval.block_hash)? {
2918 Some(b) => b,
2919 None => {
2920 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2921 approval.block_hash
2922 ),))
2923 },
2924 };
2925
2926 let approved_candidates_info: Result<Vec<(CandidateIndex, CandidateHash)>, ApprovalCheckError> =
2927 approval
2928 .candidate_indices
2929 .iter_ones()
2930 .map(|candidate_index| {
2931 block_entry
2932 .candidate(candidate_index)
2933 .ok_or(ApprovalCheckError::InvalidCandidateIndex(candidate_index as _))
2934 .map(|candidate| (candidate_index as _, candidate.1))
2935 })
2936 .collect();
2937
2938 let approved_candidates_info = match approved_candidates_info {
2939 Ok(approved_candidates_info) => approved_candidates_info,
2940 Err(err) => {
2941 respond_early!(ApprovalCheckResult::Bad(err))
2942 },
2943 };
2944
2945 gum::trace!(
2946 target: LOG_TARGET,
2947 "Received approval for num_candidates {:}",
2948 approval.candidate_indices.count_ones()
2949 );
2950
2951 let mut actions = Vec::new();
2952 for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
2953 let block_entry = match db.load_block_entry(&approval.block_hash)? {
2954 Some(b) => b,
2955 None => {
2956 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2957 approval.block_hash
2958 ),))
2959 },
2960 };
2961
2962 let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
2963 Some(c) => c,
2964 None => {
2965 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(
2966 approval_candidate_index,
2967 approved_candidate_hash
2968 ),))
2969 },
2970 };
2971
2972 match candidate_entry.approval_entry(&approval.block_hash) {
2974 None => {
2975 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::Internal(
2976 approval.block_hash,
2977 approved_candidate_hash
2978 ),))
2979 },
2980 Some(e) if !e.is_assigned(approval.validator) => {
2981 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(
2982 approval.validator
2983 ),))
2984 },
2985 _ => {},
2986 }
2987
2988 gum::trace!(
2989 target: LOG_TARGET,
2990 validator_index = approval.validator.0,
2991 candidate_hash = ?approved_candidate_hash,
2992 para_id = ?candidate_entry.candidate_receipt().descriptor.para_id(),
2993 "Importing approval vote",
2994 );
2995
2996 let new_actions = advance_approval_state(
2997 sender,
2998 state,
2999 db,
3000 session_info_provider,
3001 &metrics,
3002 block_entry,
3003 approved_candidate_hash,
3004 candidate_entry,
3005 ApprovalStateTransition::RemoteApproval(approval.validator),
3006 wakeups,
3007 )
3008 .await;
3009 actions.extend(new_actions);
3010 }
3011
3012 Ok((actions, ApprovalCheckResult::Accepted))
3014}
3015
/// The event that triggers a re-evaluation of a candidate's approval state in
/// `advance_approval_state`.
#[derive(Debug)]
enum ApprovalStateTransition {
    /// An approval by the given validator that was imported through `import_approval`.
    RemoteApproval(ValidatorIndex),
    /// An approval by the given validator that is treated as local.
    /// NOTE(review): presumably issued by this node itself; its construction site is not
    /// visible next to this definition, so confirm against the caller.
    LocalApproval(ValidatorIndex),
    /// A scheduled wakeup was processed (see `process_wakeup`); carries no validator.
    WakeupProcessed,
}
3022
3023impl ApprovalStateTransition {
3024 fn validator_index(&self) -> Option<ValidatorIndex> {
3025 match *self {
3026 ApprovalStateTransition::RemoteApproval(v) |
3027 ApprovalStateTransition::LocalApproval(v) => Some(v),
3028 ApprovalStateTransition::WakeupProcessed => None,
3029 }
3030 }
3031
3032 fn is_local_approval(&self) -> bool {
3033 match *self {
3034 ApprovalStateTransition::RemoteApproval(_) => false,
3035 ApprovalStateTransition::LocalApproval(_) => true,
3036 ApprovalStateTransition::WakeupProcessed => false,
3037 }
3038 }
3039
3040 fn is_remote_approval(&self) -> bool {
3041 matches!(*self, ApprovalStateTransition::RemoteApproval(_))
3042 }
3043}
3044
/// Re-evaluates the approval state of `candidate_hash` under the block described by
/// `block_entry` after `transition` happened, persisting changes to `db` and returning the
/// follow-up actions.
///
/// In order, this function:
/// 1. marks the transition's validator (if any) as having approved the candidate,
/// 2. returns early with no actions for non-local transitions when the block entry already
///    records the candidate as approved,
/// 3. computes the approval status and checks whether the candidate is now approved,
///    updating metrics and, if approved, the block entry (emitting
///    `Action::NoteApprovedInChainSelection` when the whole block just became approved),
/// 4. marks the approval entry approved if applicable, records no-shows on a fresh approval
///    and schedules the next wakeup,
/// 5. for an approving remote approval, schedules an immediate wakeup for the same
///    candidate on other blocks where the validator is assigned, the candidate is not yet
///    approved and no wakeup is pending,
/// 6. writes the candidate entry back when something worth persisting changed.
///
/// Returns an empty list if no approval status can be computed for the candidate under
/// this block.
async fn advance_approval_state<Sender>(
    sender: &mut Sender,
    state: &mut State,
    db: &mut OverlayedBackend<'_, impl Backend>,
    session_info_provider: &mut RuntimeInfo,
    metrics: &Metrics,
    mut block_entry: BlockEntry,
    candidate_hash: CandidateHash,
    mut candidate_entry: CandidateEntry,
    transition: ApprovalStateTransition,
    wakeups: &Wakeups,
) -> Vec<Action>
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let validator_index = transition.validator_index();

    // `None` for wakeups (no validator involved).
    // NOTE(review): judging by its use below, `mark_approval` returns whether the
    // validator's approval had already been recorded - confirm against its definition.
    let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
    let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);

    // Nothing more to do for remote approvals and wakeups once the block entry already
    // records the candidate as approved. Note that nothing is written on this path, so the
    // approval marked above on the in-memory `candidate_entry` is not persisted.
    // Local approvals always continue.
    if !transition.is_local_approval() {
        if candidate_approved_in_block {
            return Vec::new()
        }
    }

    let mut actions = Vec::new();
    let block_hash = block_entry.block_hash();
    let block_number = block_entry.block_number();
    let session_index = block_entry.session();
    let para_id = candidate_entry.candidate_receipt().descriptor().para_id();
    let tick_now = state.clock.tick_now();

    let (is_approved, status) = if let Some((approval_entry, status)) = state
        .approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
        .await
    {
        let check = approval_checking::check_approval(
            &candidate_entry,
            approval_entry,
            status.required_tranches.clone(),
        );
        state.observe_assignment_gathering_status(
            &metrics,
            &status.required_tranches,
            block_hash,
            block_entry.block_number(),
            candidate_hash,
        );

        // The check is evaluated against the current tick shifted back by `APPROVAL_DELAY`.
        let is_approved = check.is_approved(tick_now.saturating_sub(APPROVAL_DELAY));
        if status.last_no_shows != 0 {
            metrics.on_observed_no_shows(status.last_no_shows);
            gum::trace!(
                target: LOG_TARGET,
                ?candidate_hash,
                ?block_hash,
                last_no_shows = ?status.last_no_shows,
                "Observed no_shows",
            );
        }
        if is_approved {
            gum::trace!(
                target: LOG_TARGET,
                ?candidate_hash,
                ?block_hash,
                "Candidate approved under block.",
            );

            let no_shows = check.known_no_shows();

            // Sample full-approval before and after marking this candidate, so that the
            // block-level notification below fires only on the transition.
            let was_block_approved = block_entry.is_fully_approved();
            block_entry.mark_approved_by_hash(&candidate_hash);
            let is_block_approved = block_entry.is_fully_approved();

            if no_shows != 0 {
                metrics.on_no_shows(no_shows);
            }
            if check == Check::ApprovedOneThird {
                metrics.on_approved_by_one_third()
            }

            metrics.on_candidate_approved(status.tranche_now as _);

            if is_block_approved && !was_block_approved {
                metrics.on_block_approved(status.tranche_now as _);
                actions.push(Action::NoteApprovedInChainSelection(block_hash));
            }

            db.write_block_entry(block_entry.into());
        } else if transition.is_local_approval() {
            // Local approvals write the block entry back even when the candidate is not
            // approved yet.
            db.write_block_entry(block_entry.into());
        }

        (is_approved, status)
    } else {
        gum::warn!(
            target: LOG_TARGET,
            ?candidate_hash,
            ?block_hash,
            ?validator_index,
            "No approval entry for approval under block",
        );

        return Vec::new()
    };

    {
        let approval_entry = candidate_entry
            .approval_entry_mut(&block_hash)
            .expect("Approval entry just fetched; qed");

        let was_approved = approval_entry.is_approved();
        let newly_approved = is_approved && !was_approved;

        if is_approved {
            approval_entry.mark_approved();
        }
        // No-shows are recorded once, at the moment the approval entry flips to approved.
        if newly_approved {
            state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
        }
        actions.extend(schedule_wakeup_action(
            &approval_entry,
            block_hash,
            block_number,
            candidate_hash,
            status.block_tick,
            tick_now,
            status.required_tranches,
        ));

        // A remote approval that approves the candidate here is also relevant for the same
        // candidate under other blocks: wake those up right away, provided the validator
        // is assigned there, the candidate is not approved there yet and no wakeup is
        // already pending.
        if is_approved && transition.is_remote_approval() {
            for (fork_block_hash, fork_approval_entry) in candidate_entry
                .block_assignments
                .iter()
                .filter(|(hash, _)| **hash != block_hash)
            {
                let assigned_on_fork_block = validator_index
                    .as_ref()
                    .map(|validator_index| fork_approval_entry.is_assigned(*validator_index))
                    .unwrap_or_default();
                if wakeups.wakeup_for(*fork_block_hash, candidate_hash).is_none() &&
                    !fork_approval_entry.is_approved() &&
                    assigned_on_fork_block
                {
                    let fork_block_entry = db.load_block_entry(fork_block_hash);
                    if let Ok(Some(fork_block_entry)) = fork_block_entry {
                        actions.push(Action::ScheduleWakeup {
                            block_hash: *fork_block_hash,
                            block_number: fork_block_entry.block_number(),
                            candidate_hash,
                            tick: tick_now + 1,
                        })
                    } else {
                        // Best effort: a missing or unreadable block entry only skips the
                        // extra wakeup for that block.
                        gum::debug!(
                            target: LOG_TARGET,
                            ?fork_block_entry,
                            ?fork_block_hash,
                            "Failed to load block entry"
                        )
                    }
                }
            }
        }
        // Persist the candidate entry for local approvals, on a fresh approval, or when
        // `mark_approval` reported `false` for the validator; wakeups (no validator)
        // default to "no write" via `unwrap_or(true)`.
        if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
        {
            db.write_candidate_entry(candidate_entry);
        }
    }

    actions
}
3254
3255fn should_trigger_assignment(
3256 approval_entry: &ApprovalEntry,
3257 candidate_entry: &CandidateEntry,
3258 required_tranches: RequiredTranches,
3259 tranche_now: DelayTranche,
3260) -> bool {
3261 match approval_entry.our_assignment() {
3262 None => false,
3263 Some(ref assignment) if assignment.triggered() => false,
3264 Some(ref assignment) if assignment.tranche() == 0 => true,
3265 Some(ref assignment) => {
3266 match required_tranches {
3267 RequiredTranches::All => !approval_checking::check_approval(
3268 &candidate_entry,
3269 &approval_entry,
3270 RequiredTranches::All,
3271 )
3272 .is_approved(Tick::max_value()),
3274 RequiredTranches::Pending { maximum_broadcast, clock_drift, .. } => {
3275 let drifted_tranche_now =
3276 tranche_now.saturating_sub(clock_drift as DelayTranche);
3277 assignment.tranche() <= maximum_broadcast &&
3278 assignment.tranche() <= drifted_tranche_now
3279 },
3280 RequiredTranches::Exact { .. } => {
3281 false
3283 },
3284 }
3285 },
3286 }
3287}
3288
/// Handles a wakeup for `candidate_hash` under `relay_block`.
///
/// Decides whether our own assignment for the candidate should be triggered now. If so, the
/// triggered state is written to `db` and, when the claimed cores can be resolved to
/// candidate indices, an `Action::LaunchApproval` is emitted. Afterwards the approval state
/// is advanced with `ApprovalStateTransition::WakeupProcessed` (which may add more actions).
///
/// Returns an empty action list when the block entry, the candidate entry, the session
/// info or the approval entry under `relay_block` is missing. If the assignment was
/// triggered but the extended session info cannot be fetched, returns before launching
/// approval work and before advancing the approval state.
///
/// # Errors
///
/// Propagates errors from loading entries out of `db`.
async fn process_wakeup<Sender: SubsystemSender<RuntimeApiMessage>>(
    sender: &mut Sender,
    state: &mut State,
    db: &mut OverlayedBackend<'_, impl Backend>,
    session_info_provider: &mut RuntimeInfo,
    relay_block: Hash,
    candidate_hash: CandidateHash,
    metrics: &Metrics,
    wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
    let block_entry = db.load_block_entry(&relay_block)?;
    let candidate_entry = db.load_candidate_entry(&candidate_hash)?;

    // Both entries are required; a wakeup for anything unknown is silently dropped.
    let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
        (Some(b), Some(c)) => (b, c),
        _ => return Ok(Vec::new()),
    };

    let (no_show_slots, needed_approvals) = match get_session_info_by_index(
        session_info_provider,
        sender,
        block_entry.block_hash(),
        block_entry.session(),
    )
    .await
    {
        Some(i) => (i.no_show_slots, i.needed_approvals),
        None => return Ok(Vec::new()),
    };

    let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
    // The no-show period is configured in slots; convert it with the same slot-to-tick
    // helper used for the block's slot.
    let no_show_duration =
        slot_number_to_tick(state.slot_duration_millis, Slot::from(u64::from(no_show_slots)));
    let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());

    gum::trace!(
        target: LOG_TARGET,
        tranche = tranche_now,
        ?candidate_hash,
        block_hash = ?relay_block,
        "Processing wakeup",
    );

    // Scoped so that the shared borrow of `candidate_entry` ends before it is mutated below.
    let (should_trigger, backing_group) = {
        let approval_entry = match candidate_entry.approval_entry(&relay_block) {
            Some(e) => e,
            None => return Ok(Vec::new()),
        };

        let tranches_to_approve = approval_checking::tranches_to_approve(
            &approval_entry,
            candidate_entry.approvals(),
            tranche_now,
            block_tick,
            no_show_duration,
            needed_approvals as _,
        );

        let should_trigger = should_trigger_assignment(
            &approval_entry,
            &candidate_entry,
            tranches_to_approve.required_tranches,
            tranche_now,
        );

        (should_trigger, approval_entry.backing_group())
    };

    gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);

    let mut actions = Vec::new();
    let candidate_receipt = candidate_entry.candidate_receipt().clone();

    let maybe_cert = if should_trigger {
        let maybe_cert = {
            let approval_entry = candidate_entry
                .approval_entry_mut(&relay_block)
                .expect("should_trigger only true if this fetched earlier; qed");

            approval_entry.trigger_our_assignment(state.clock.tick_now())
        };

        // Persist the triggered assignment right away; `candidate_entry` itself is still
        // needed below, hence the clone.
        db.write_candidate_entry(candidate_entry.clone());

        maybe_cert
    } else {
        None
    };

    if let Some((cert, val_index, tranche)) = maybe_cert {
        let ExtendedSessionInfo { ref executor_params, .. } = match get_extended_session_info(
            session_info_provider,
            sender,
            candidate_entry.candidate_receipt().descriptor().relay_parent(),
        )
        .await
        {
            Some(i) => i,
            // `actions` is still empty here; the approval state is not advanced on this path.
            None => return Ok(actions),
        };
        let indirect_cert =
            IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };

        gum::trace!(
            target: LOG_TARGET,
            ?candidate_hash,
            para_id = ?candidate_receipt.descriptor.para_id(),
            block_hash = ?relay_block,
            "Launching approval work.",
        );

        // Core of this candidate within the block, if the block entry lists the candidate.
        let candidate_core_index = block_entry
            .candidates()
            .iter()
            .find_map(|(core_index, h)| (h == &candidate_hash).then_some(*core_index));

        if let Some(claimed_core_indices) =
            get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
        {
            match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
                Ok(claimed_candidate_indices) => {
                    // An assignment covering a single candidate is always distributed. One
                    // covering several candidates is distributed only when
                    // `mark_assignment_distributed` returns `false`.
                    // NOTE(review): presumably that return value means "was already
                    // distributed" - confirm against its definition.
                    let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
                        !block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
                    } else {
                        true
                    };
                    db.write_block_entry(block_entry.clone());
                    actions.push(Action::LaunchApproval {
                        claimed_candidate_indices,
                        candidate_hash,
                        indirect_cert,
                        assignment_tranche: tranche,
                        relay_block_hash: relay_block,
                        session: block_entry.session(),
                        executor_params: executor_params.clone(),
                        candidate: candidate_receipt,
                        backing_group,
                        distribute_assignment,
                        core_index: candidate_core_index,
                    });
                },
                Err(err) => {
                    gum::warn!(
                        target: LOG_TARGET,
                        block_hash = ?relay_block,
                        ?err,
                        "Failed to create assignment bitfield"
                    );
                },
            };
        } else {
            gum::warn!(
                target: LOG_TARGET,
                block_hash = ?relay_block,
                ?candidate_hash,
                "Cannot get assignment claimed core indices",
            );
        }
    }
    // Whether or not approval work was launched, re-evaluate the approval state; this also
    // schedules the next wakeup for the candidate when one is needed.
    actions.extend(
        advance_approval_state(
            sender,
            state,
            db,
            session_info_provider,
            metrics,
            block_entry,
            candidate_hash,
            candidate_entry,
            ApprovalStateTransition::WakeupProcessed,
            wakeups,
        )
        .await,
    );

    Ok(actions)
}
3476
3477#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3481async fn launch_approval<
3482 Sender: SubsystemSender<RuntimeApiMessage>
3483 + SubsystemSender<AvailabilityRecoveryMessage>
3484 + SubsystemSender<DisputeCoordinatorMessage>
3485 + SubsystemSender<CandidateValidationMessage>,
3486>(
3487 mut sender: Sender,
3488 spawn_handle: Arc<dyn overseer::gen::Spawner + 'static>,
3489 metrics: Metrics,
3490 session_index: SessionIndex,
3491 candidate: CandidateReceipt,
3492 validator_index: ValidatorIndex,
3493 block_hash: Hash,
3494 backing_group: GroupIndex,
3495 executor_params: ExecutorParams,
3496 core_index: Option<CoreIndex>,
3497 retry: RetryApprovalInfo,
3498) -> SubsystemResult<RemoteHandle<ApprovalState>> {
3499 let (a_tx, a_rx) = oneshot::channel();
3500 let (code_tx, code_rx) = oneshot::channel();
3501
3502 struct StaleGuard(Option<Metrics>);
3506
3507 impl StaleGuard {
3508 fn take(mut self) -> Metrics {
3509 self.0.take().expect(
3510 "
3511 consumed after take; so this cannot be called twice; \
3512 nothing in this function reaches into the struct to avoid this API; \
3513 qed
3514 ",
3515 )
3516 }
3517 }
3518
3519 impl Drop for StaleGuard {
3520 fn drop(&mut self) {
3521 if let Some(metrics) = self.0.as_ref() {
3522 metrics.on_approval_stale();
3523 }
3524 }
3525 }
3526
3527 let candidate_hash = candidate.hash();
3528 let para_id = candidate.descriptor.para_id();
3529 let mut next_retry = None;
3530 gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
3531
3532 let timer = metrics.time_recover_and_approve();
3533 sender
3534 .send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
3535 candidate.clone(),
3536 session_index,
3537 Some(backing_group),
3538 core_index,
3539 a_tx,
3540 ))
3541 .await;
3542
3543 sender
3544 .send_message(RuntimeApiMessage::Request(
3545 block_hash,
3546 RuntimeApiRequest::ValidationCodeByHash(
3547 candidate.descriptor.validation_code_hash(),
3548 code_tx,
3549 ),
3550 ))
3551 .await;
3552
3553 let candidate = candidate.clone();
3554 let metrics_guard = StaleGuard(Some(metrics));
3555 let background = async move {
3556 let _timer = timer;
3558 let available_data = match a_rx.await {
3559 Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3560 Ok(Ok(a)) => a,
3561 Ok(Err(e)) => {
3562 match &e {
3563 &RecoveryError::Unavailable => {
3564 gum::warn!(
3565 target: LOG_TARGET,
3566 ?para_id,
3567 ?candidate_hash,
3568 attempts_remaining = retry.attempts_remaining,
3569 "Data unavailable for candidate {:?}",
3570 (candidate_hash, candidate.descriptor.para_id()),
3571 );
3572 if retry.attempts_remaining > 0 {
3576 Delay::new(retry.backoff).await;
3577 next_retry = Some(RetryApprovalInfo {
3578 candidate,
3579 backing_group,
3580 executor_params,
3581 core_index,
3582 session_index,
3583 attempts_remaining: retry.attempts_remaining - 1,
3584 backoff: retry.backoff,
3585 });
3586 } else {
3587 next_retry = None;
3588 }
3589 metrics_guard.take().on_approval_unavailable();
3590 },
3591 &RecoveryError::ChannelClosed => {
3592 gum::warn!(
3593 target: LOG_TARGET,
3594 ?para_id,
3595 ?candidate_hash,
3596 "Channel closed while recovering data for candidate {:?}",
3597 (candidate_hash, candidate.descriptor.para_id()),
3598 );
3599 metrics_guard.take().on_approval_unavailable();
3601 },
3602 &RecoveryError::Invalid => {
3603 gum::warn!(
3604 target: LOG_TARGET,
3605 ?para_id,
3606 ?candidate_hash,
3607 "Data recovery invalid for candidate {:?}",
3608 (candidate_hash, candidate.descriptor.para_id()),
3609 );
3610 issue_local_invalid_statement(
3611 &mut sender,
3612 session_index,
3613 candidate_hash,
3614 candidate.clone(),
3615 );
3616 metrics_guard.take().on_approval_invalid();
3617 },
3618 }
3619 return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
3620 },
3621 };
3622
3623 let validation_code = match code_rx.await {
3624 Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3625 Ok(Err(_)) => return ApprovalState::failed(validator_index, candidate_hash),
3626 Ok(Ok(Some(code))) => code,
3627 Ok(Ok(None)) => {
3628 gum::warn!(
3629 target: LOG_TARGET,
3630 "Validation code unavailable for block {:?} in the state of block {:?} (a recent descendant)",
3631 candidate.descriptor.relay_parent(),
3632 block_hash,
3633 );
3634
3635 metrics_guard.take().on_approval_unavailable();
3638 return ApprovalState::failed(validator_index, candidate_hash)
3639 },
3640 };
3641
3642 let (val_tx, val_rx) = oneshot::channel();
3643 sender
3644 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
3645 validation_data: available_data.validation_data,
3646 validation_code,
3647 candidate_receipt: candidate.clone(),
3648 pov: available_data.pov,
3649 executor_params,
3650 exec_kind: PvfExecKind::Approval,
3651 response_sender: val_tx,
3652 })
3653 .await;
3654
3655 match val_rx.await {
3656 Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3657 Ok(Ok(ValidationResult::Valid(_, _))) => {
3658 gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Candidate Valid");
3662
3663 let _ = metrics_guard.take();
3664 return ApprovalState::approved(validator_index, candidate_hash)
3665 },
3666 Ok(Ok(ValidationResult::Invalid(reason))) => {
3667 gum::warn!(
3668 target: LOG_TARGET,
3669 ?reason,
3670 ?candidate_hash,
3671 ?para_id,
3672 "Detected invalid candidate as an approval checker.",
3673 );
3674
3675 issue_local_invalid_statement(
3676 &mut sender,
3677 session_index,
3678 candidate_hash,
3679 candidate.clone(),
3680 );
3681 metrics_guard.take().on_approval_invalid();
3682 return ApprovalState::failed(validator_index, candidate_hash)
3683 },
3684 Ok(Err(e)) => {
3685 gum::error!(
3686 target: LOG_TARGET,
3687 err = ?e,
3688 ?candidate_hash,
3689 ?para_id,
3690 "Failed to validate candidate due to internal error",
3691 );
3692 metrics_guard.take().on_approval_error();
3693 return ApprovalState::failed(validator_index, candidate_hash)
3694 },
3695 }
3696 };
3697 let (background, remote_handle) = background.remote_handle();
3698 spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background));
3699 Ok(remote_handle)
3700}
3701
3702#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3705async fn issue_approval<
3706 Sender: SubsystemSender<RuntimeApiMessage>,
3707 ADSender: SubsystemSender<ApprovalDistributionMessage>,
3708>(
3709 sender: &mut Sender,
3710 approval_voting_sender: &mut ADSender,
3711 state: &mut State,
3712 db: &mut OverlayedBackend<'_, impl Backend>,
3713 session_info_provider: &mut RuntimeInfo,
3714 metrics: &Metrics,
3715 candidate_hash: CandidateHash,
3716 delayed_approvals_timers: &mut DelayedApprovalTimer,
3717 ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
3718 wakeups: &Wakeups,
3719) -> SubsystemResult<Vec<Action>> {
3720 let mut block_entry = match db.load_block_entry(&block_hash)? {
3721 Some(b) => b,
3722 None => {
3723 metrics.on_approval_stale();
3725 return Ok(Vec::new())
3726 },
3727 };
3728
3729 let candidate_index = match block_entry.candidates().iter().position(|e| e.1 == candidate_hash)
3730 {
3731 None => {
3732 gum::warn!(
3733 target: LOG_TARGET,
3734 "Candidate hash {} is not present in the block entry's candidates for relay block {}",
3735 candidate_hash,
3736 block_entry.parent_hash(),
3737 );
3738
3739 metrics.on_approval_error();
3740 return Ok(Vec::new())
3741 },
3742 Some(idx) => idx,
3743 };
3744
3745 let candidate_hash = match block_entry.candidate(candidate_index as usize) {
3746 Some((_, h)) => *h,
3747 None => {
3748 gum::warn!(
3749 target: LOG_TARGET,
3750 "Received malformed request to approve out-of-bounds candidate index {} included at block {:?}",
3751 candidate_index,
3752 block_hash,
3753 );
3754
3755 metrics.on_approval_error();
3756 return Ok(Vec::new())
3757 },
3758 };
3759
3760 let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
3761 Some(c) => c,
3762 None => {
3763 gum::warn!(
3764 target: LOG_TARGET,
3765 "Missing entry for candidate index {} included at block {:?}",
3766 candidate_index,
3767 block_hash,
3768 );
3769
3770 metrics.on_approval_error();
3771 return Ok(Vec::new())
3772 },
3773 };
3774
3775 let session_info = match get_session_info_by_index(
3776 session_info_provider,
3777 sender,
3778 block_entry.parent_hash(),
3779 block_entry.session(),
3780 )
3781 .await
3782 {
3783 Some(s) => s,
3784 None => return Ok(Vec::new()),
3785 };
3786
3787 if block_entry
3788 .defer_candidate_signature(
3789 candidate_index as _,
3790 candidate_hash,
3791 compute_delayed_approval_sending_tick(
3792 state,
3793 &block_entry,
3794 &candidate_entry,
3795 session_info,
3796 &metrics,
3797 ),
3798 )
3799 .is_some()
3800 {
3801 gum::error!(
3802 target: LOG_TARGET,
3803 ?candidate_hash,
3804 ?block_hash,
3805 validator_index = validator_index.0,
3806 "Possible bug, we shouldn't have to defer a candidate more than once",
3807 );
3808 }
3809
3810 gum::debug!(
3811 target: LOG_TARGET,
3812 ?candidate_hash,
3813 ?block_hash,
3814 validator_index = validator_index.0,
3815 "Ready to issue approval vote",
3816 );
3817
3818 let actions = advance_approval_state(
3819 sender,
3820 state,
3821 db,
3822 session_info_provider,
3823 metrics,
3824 block_entry,
3825 candidate_hash,
3826 candidate_entry,
3827 ApprovalStateTransition::LocalApproval(validator_index as _),
3828 wakeups,
3829 )
3830 .await;
3831
3832 if let Some(next_wakeup) = maybe_create_signature(
3833 db,
3834 session_info_provider,
3835 state,
3836 sender,
3837 approval_voting_sender,
3838 block_hash,
3839 validator_index,
3840 metrics,
3841 )
3842 .await?
3843 {
3844 delayed_approvals_timers.maybe_arm_timer(
3845 next_wakeup,
3846 state.clock.as_ref(),
3847 block_hash,
3848 validator_index,
3849 );
3850 }
3851 Ok(actions)
3852}
3853
3854#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3856async fn maybe_create_signature<
3857 Sender: SubsystemSender<RuntimeApiMessage>,
3858 ADSender: SubsystemSender<ApprovalDistributionMessage>,
3859>(
3860 db: &mut OverlayedBackend<'_, impl Backend>,
3861 session_info_provider: &mut RuntimeInfo,
3862 state: &State,
3863 sender: &mut Sender,
3864 approval_voting_sender: &mut ADSender,
3865 block_hash: Hash,
3866 validator_index: ValidatorIndex,
3867 metrics: &Metrics,
3868) -> SubsystemResult<Option<Tick>> {
3869 let mut block_entry = match db.load_block_entry(&block_hash)? {
3870 Some(b) => b,
3871 None => {
3872 metrics.on_approval_stale();
3874 gum::debug!(
3875 target: LOG_TARGET,
3876 "Could not find block that needs signature {:}", block_hash
3877 );
3878 return Ok(None)
3879 },
3880 };
3881
3882 let approval_params = state
3883 .get_approval_voting_params_or_default(sender, block_entry.session(), block_hash)
3884 .await
3885 .unwrap_or_default();
3886
3887 gum::trace!(
3888 target: LOG_TARGET,
3889 "Candidates pending signatures {:}", block_entry.num_candidates_pending_signature()
3890 );
3891 let tick_now = state.clock.tick_now();
3892
3893 let (candidates_to_sign, sign_no_later_then) = block_entry
3894 .get_candidates_that_need_signature(tick_now, approval_params.max_approval_coalesce_count);
3895
3896 let (candidates_hashes, candidates_indices) = match candidates_to_sign {
3897 Some(candidates_to_sign) => candidates_to_sign,
3898 None => return Ok(sign_no_later_then),
3899 };
3900
3901 let session_info = match get_session_info_by_index(
3902 session_info_provider,
3903 sender,
3904 block_entry.parent_hash(),
3905 block_entry.session(),
3906 )
3907 .await
3908 {
3909 Some(s) => s,
3910 None => {
3911 metrics.on_approval_error();
3912 gum::error!(
3913 target: LOG_TARGET,
3914 "Could not retrieve the session"
3915 );
3916 return Ok(None)
3917 },
3918 };
3919
3920 let validator_pubkey = match session_info.validators.get(validator_index) {
3921 Some(p) => p,
3922 None => {
3923 gum::error!(
3924 target: LOG_TARGET,
3925 "Validator index {} out of bounds in session {}",
3926 validator_index.0,
3927 block_entry.session(),
3928 );
3929
3930 metrics.on_approval_error();
3931 return Ok(None)
3932 },
3933 };
3934
3935 let signature = match sign_approval(
3936 &state.keystore,
3937 &validator_pubkey,
3938 &candidates_hashes,
3939 block_entry.session(),
3940 ) {
3941 Some(sig) => sig,
3942 None => {
3943 gum::error!(
3944 target: LOG_TARGET,
3945 validator_index = ?validator_index,
3946 session = ?block_entry.session(),
3947 "Could not issue approval signature. Assignment key present but not validator key?",
3948 );
3949
3950 metrics.on_approval_error();
3951 return Ok(None)
3952 },
3953 };
3954 metrics.on_approval_coalesce(candidates_hashes.len() as u32);
3955
3956 let candidate_entries = candidates_hashes
3957 .iter()
3958 .map(|candidate_hash| db.load_candidate_entry(candidate_hash))
3959 .collect::<SubsystemResult<Vec<Option<CandidateEntry>>>>()?;
3960
3961 for mut candidate_entry in candidate_entries {
3962 let approval_entry = candidate_entry.as_mut().and_then(|candidate_entry| {
3963 candidate_entry.approval_entry_mut(&block_entry.block_hash())
3964 });
3965
3966 match approval_entry {
3967 Some(approval_entry) => approval_entry.import_approval_sig(OurApproval {
3968 signature: signature.clone(),
3969 signed_candidates_indices: candidates_indices.clone(),
3970 }),
3971 None => {
3972 gum::error!(
3973 target: LOG_TARGET,
3974 candidate_entry = ?candidate_entry,
3975 "Candidate scheduled for signing approval entry should not be None"
3976 );
3977 },
3978 };
3979 candidate_entry.map(|candidate_entry| db.write_candidate_entry(candidate_entry));
3980 }
3981
3982 metrics.on_approval_produced();
3983
3984 approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval(
3985 IndirectSignedApprovalVoteV2 {
3986 block_hash: block_entry.block_hash(),
3987 candidate_indices: candidates_indices,
3988 validator: validator_index,
3989 signature,
3990 },
3991 ));
3992
3993 gum::trace!(
3994 target: LOG_TARGET,
3995 ?block_hash,
3996 signed_candidates = ?block_entry.num_candidates_pending_signature(),
3997 "Issue approval votes",
3998 );
3999 block_entry.issued_approval();
4000 db.write_block_entry(block_entry.into());
4001 Ok(None)
4002}
4003
4004fn sign_approval(
4006 keystore: &LocalKeystore,
4007 public: &ValidatorId,
4008 candidate_hashes: &[CandidateHash],
4009 session_index: SessionIndex,
4010) -> Option<ValidatorSignature> {
4011 let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
4012
4013 let payload = ApprovalVoteMultipleCandidates(candidate_hashes).signing_payload(session_index);
4014
4015 Some(key.sign(&payload[..]))
4016}
4017
4018fn issue_local_invalid_statement<Sender>(
4020 sender: &mut Sender,
4021 session_index: SessionIndex,
4022 candidate_hash: CandidateHash,
4023 candidate: CandidateReceipt,
4024) where
4025 Sender: SubsystemSender<DisputeCoordinatorMessage>,
4026{
4027 sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
4037 session_index,
4038 candidate_hash,
4039 candidate.clone(),
4040 false,
4041 ));
4042}
4043
4044fn compute_delayed_approval_sending_tick(
4046 state: &State,
4047 block_entry: &BlockEntry,
4048 candidate_entry: &CandidateEntry,
4049 session_info: &SessionInfo,
4050 metrics: &Metrics,
4051) -> Tick {
4052 let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
4053 let assignment_tranche = candidate_entry
4054 .approval_entry(&block_entry.block_hash())
4055 .and_then(|approval_entry| approval_entry.our_assignment())
4056 .map(|our_assignment| our_assignment.tranche())
4057 .unwrap_or_default();
4058
4059 let assignment_triggered_tick = current_block_tick + assignment_tranche as Tick;
4060
4061 let no_show_duration_ticks = slot_number_to_tick(
4062 state.slot_duration_millis,
4063 Slot::from(u64::from(session_info.no_show_slots)),
4064 );
4065 let tick_now = state.clock.tick_now();
4066
4067 let sign_no_later_than = min(
4068 tick_now + MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick,
4069 assignment_triggered_tick + no_show_duration_ticks / 2,
4073 );
4074
4075 metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
4076 sign_no_later_than
4077}