1use futures_timer::Delay;
25use polkadot_node_primitives::{
26 approval::{
27 v1::{BlockApprovalMeta, DelayTranche},
28 v2::{
29 AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
30 IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2,
31 },
32 },
33 ValidationResult, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36 errors::RecoveryError,
37 messages::{
38 ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
39 ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
40 AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
41 ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote,
42 DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage,
43 RuntimeApiRequest,
44 },
45 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
46 SubsystemSender,
47};
48use polkadot_node_subsystem_util::{
49 self,
50 database::Database,
51 metrics::{self, prometheus},
52 runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
53 TimeoutExt,
54};
55use polkadot_primitives::{
56 ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
57 CandidateIndex, CandidateReceiptV2 as CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex,
58 Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair,
59 ValidatorSignature,
60};
61use sc_keystore::LocalKeystore;
62use sp_application_crypto::Pair;
63use sp_consensus::SyncOracle;
64use sp_consensus_slots::Slot;
65use std::time::Instant;
66
/// Upper bound, in blocks, used for the assignment-gathering timestamp bookkeeping.
// NOTE(review): presumably the capacity of `State::per_block_assignments_gathering_times`;
// the use site is cut off in this view — confirm.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
71
72use futures::{
73 channel::oneshot,
74 future::{BoxFuture, RemoteHandle},
75 prelude::*,
76 stream::FuturesUnordered,
77 StreamExt,
78};
79
80use std::{
81 cmp::min,
82 collections::{
83 btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
84 },
85 sync::Arc,
86 time::Duration,
87};
88
89use schnellru::{ByLength, LruMap};
90
91use approval_checking::RequiredTranches;
92use bitvec::{order::Lsb0, vec::BitVec};
93pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
94use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
95use polkadot_node_primitives::approval::time::{
96 slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick,
97};
98
99mod approval_checking;
100pub mod approval_db;
101mod backend;
102pub mod criteria;
103mod import;
104mod ops;
105mod persisted_entries;
106
107use crate::{
108 approval_checking::{Check, TranchesToApproveResult},
109 approval_db::common::{Config as DatabaseConfig, DbBackend},
110 backend::{Backend, OverlayedBackend},
111 criteria::InvalidAssignmentReason,
112 persisted_entries::OurApproval,
113};
114
115#[cfg(test)]
116mod tests;
117
/// Maximum time a launched approval check may run before `CurrentlyCheckingSet` reports it as
/// `ApprovalOutcome::TimedOut`. Half of this value is the default retry back-off used by
/// `ApprovalVotingSubsystem::with_config`.
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
// NOTE(review): presumably bounds how long a signature request may take; the use site is not
// visible in this part of the file — confirm.
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
// NOTE(review): presumably the capacity of the `LruMap<CandidateHash, ApprovalOutcome>` cache
// passed to `CurrentlyCheckingSet::next`; the construction site is not visible here — confirm.
const APPROVAL_CACHE_SIZE: u32 = 1024;

/// Default value of `ApprovalVotingSubsystem::max_approval_retries` (see `with_config`).
const MAX_APPROVAL_RETRIES: u32 = 16;

// NOTE(review): a delay expressed in ticks; its consumer is outside this view — confirm meaning.
const APPROVAL_DELAY: Tick = 2;
/// Log target used by every `gum` log statement of this subsystem.
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

// NOTE(review): presumably the maximum number of ticks an approval may be held back for
// coalescing; the use site is outside this view — confirm.
const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;

// NOTE(review): presumably the wakeup delay, in ticks, applied after a restart; the use site is
// outside this view — confirm.
const RESTART_WAKEUP_DELAY: Tick = 12;
144
/// Configuration for the approval voting subsystem.
#[derive(Debug, Clone)]
pub struct Config {
    /// Database column holding the approval data; forwarded into
    /// `DatabaseConfig::col_approval_data` when the subsystem is built.
    pub col_approval_data: u32,
    /// Slot duration in milliseconds; used for slot-to-tick and tranche computations.
    pub slot_duration_millis: u64,
}
154
/// Operating mode of the subsystem.
///
/// A freshly constructed `ApprovalVotingSubsystem` starts in `Syncing`.
enum Mode {
    /// The subsystem is fully active.
    // NOTE(review): presumably entered via `Action::BecomeActive`; the transition is outside
    // this view — confirm.
    Active,
    /// The node is still syncing; carries the oracle supplied at construction time.
    Syncing(Box<dyn SyncOracle + Send>),
}
168
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
    /// Local keystore; handed to the running `State`.
    keystore: Arc<LocalKeystore>,
    /// Approval database configuration, derived from `Config::col_approval_data`.
    db_config: DatabaseConfig,
    /// Slot duration in milliseconds, copied from `Config`.
    slot_duration_millis: u64,
    /// Handle to the underlying database.
    db: Arc<dyn Database>,
    /// Current operating mode; initialised to `Mode::Syncing`.
    mode: Mode,
    /// Metrics handle.
    metrics: Metrics,
    /// Clock used for tick/tranche computations and wakeups.
    clock: Arc<dyn Clock + Send + Sync>,
    /// Task spawner.
    spawner: Arc<dyn overseer::gen::Spawner + 'static>,
    /// Retry budget for approval work; `with_config` sets it to `MAX_APPROVAL_RETRIES`.
    max_approval_retries: u32,
    /// Back-off between retries; `with_config` sets it to `APPROVAL_CHECKING_TIMEOUT / 2`.
    retry_backoff: Duration,
}
187
/// The Prometheus collectors backing [`Metrics`].
///
/// Every field is registered in `Metrics::try_register` and updated through the `on_*`,
/// `time_*` and `observe_*` helpers on [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
    imported_candidates_total: prometheus::Counter<prometheus::U64>,
    assignments_produced: prometheus::Histogram,
    // Labelled by `status`; see the `on_approval_*` helpers for the label values in use.
    approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
    no_shows_total: prometheus::Counter<prometheus::U64>,
    observed_no_shows: prometheus::Counter<prometheus::U64>,
    approved_by_one_third: prometheus::Counter<prometheus::U64>,
    wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
    coalesced_approvals_buckets: prometheus::Histogram,
    coalesced_approvals_delay: prometheus::Histogram,
    candidate_approval_time_ticks: prometheus::Histogram,
    block_approval_time_ticks: prometheus::Histogram,
    time_db_transaction: prometheus::Histogram,
    time_recover_and_approve: prometheus::Histogram,
    candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
    unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
    // Labelled by `stage`.
    assignments_gathering_time_by_stage: prometheus::HistogramVec,
}
217
/// Approval voting metrics.
///
/// Wraps an optional set of collectors: the `Default` value holds `None`, in which case every
/// recording helper is a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
221
222impl Metrics {
223 fn on_candidate_imported(&self) {
224 if let Some(metrics) = &self.0 {
225 metrics.imported_candidates_total.inc();
226 }
227 }
228
229 fn on_assignment_produced(&self, tranche: DelayTranche) {
230 if let Some(metrics) = &self.0 {
231 metrics.assignments_produced.observe(tranche as f64);
232 }
233 }
234
235 fn on_approval_coalesce(&self, num_coalesced: u32) {
236 if let Some(metrics) = &self.0 {
237 for _ in 0..num_coalesced {
240 metrics.coalesced_approvals_buckets.observe(num_coalesced as f64)
241 }
242 }
243 }
244
245 fn on_delayed_approval(&self, delayed_ticks: u64) {
246 if let Some(metrics) = &self.0 {
247 metrics.coalesced_approvals_delay.observe(delayed_ticks as f64)
248 }
249 }
250
251 fn on_approval_stale(&self) {
252 if let Some(metrics) = &self.0 {
253 metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
254 }
255 }
256
257 fn on_approval_invalid(&self) {
258 if let Some(metrics) = &self.0 {
259 metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
260 }
261 }
262
263 fn on_approval_unavailable(&self) {
264 if let Some(metrics) = &self.0 {
265 metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
266 }
267 }
268
269 fn on_approval_error(&self) {
270 if let Some(metrics) = &self.0 {
271 metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
272 }
273 }
274
275 fn on_approval_produced(&self) {
276 if let Some(metrics) = &self.0 {
277 metrics.approvals_produced_total.with_label_values(&["success"]).inc()
278 }
279 }
280
281 fn on_no_shows(&self, n: usize) {
282 if let Some(metrics) = &self.0 {
283 metrics.no_shows_total.inc_by(n as u64);
284 }
285 }
286
287 fn on_observed_no_shows(&self, n: usize) {
288 if let Some(metrics) = &self.0 {
289 metrics.observed_no_shows.inc_by(n as u64);
290 }
291 }
292
293 fn on_approved_by_one_third(&self) {
294 if let Some(metrics) = &self.0 {
295 metrics.approved_by_one_third.inc();
296 }
297 }
298
299 fn on_wakeup(&self) {
300 if let Some(metrics) = &self.0 {
301 metrics.wakeups_triggered_total.inc();
302 }
303 }
304
305 fn on_candidate_approved(&self, ticks: Tick) {
306 if let Some(metrics) = &self.0 {
307 metrics.candidate_approval_time_ticks.observe(ticks as f64);
308 }
309 }
310
311 fn on_block_approved(&self, ticks: Tick) {
312 if let Some(metrics) = &self.0 {
313 metrics.block_approval_time_ticks.observe(ticks as f64);
314 }
315 }
316
317 fn on_candidate_signatures_request(&self) {
318 if let Some(metrics) = &self.0 {
319 metrics.candidate_signatures_requests_total.inc();
320 }
321 }
322
323 fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
324 self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
325 }
326
327 fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
328 self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
329 }
330
331 fn on_unapproved_candidates_in_unfinalized_chain(&self, count: usize) {
332 if let Some(metrics) = &self.0 {
333 metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
334 }
335 }
336
337 pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
338 if let Some(metrics) = &self.0 {
339 let stage_string = stage.to_string();
340 metrics
345 .assignments_gathering_time_by_stage
346 .with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
347 .observe(elapsed_as_millis as f64);
348 }
349 }
350}
351
impl metrics::Metrics for Metrics {
    /// Creates every collector of [`MetricsInner`], registers it with `registry` and returns an
    /// enabled [`Metrics`] handle.
    ///
    /// # Errors
    ///
    /// Returns the first `PrometheusError` raised while creating or registering a collector.
    fn try_register(
        registry: &prometheus::Registry,
    ) -> std::result::Result<Self, prometheus::PrometheusError> {
        // Collectors are created and registered in the order the fields are written below.
        let metrics = MetricsInner {
            imported_candidates_total: prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_imported_candidates_total",
                    "Number of candidates imported by the approval voting subsystem",
                )?,
                registry,
            )?,
            // Samples are tranche numbers (see `on_assignment_produced`).
            assignments_produced: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_assignments_produced",
                        "Assignments and tranches produced by the approval voting subsystem",
                    ).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
                )?,
                registry,
            )?,
            // One counter per `status` label value.
            approvals_produced_total: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "polkadot_parachain_approvals_produced_total",
                        "Number of approvals produced by the approval voting subsystem",
                    ),
                    &["status"]
                )?,
                registry,
            )?,
            no_shows_total: prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_approvals_no_shows_total",
                    "Number of assignments which became no-shows in the approval voting subsystem",
                )?,
                registry,
            )?,
            observed_no_shows: prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_approvals_observed_no_shows_total",
                    "Number of observed no shows at any moment in time",
                )?,
                registry,
            )?,
            wakeups_triggered_total: prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_approvals_wakeups_total",
                    "Number of times we woke up to process a candidate in the approval voting subsystem",
                )?,
                registry,
            )?,
            // Samples are tick counts (see `on_candidate_approved`).
            candidate_approval_time_ticks: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_approvals_candidate_approval_time_ticks",
                        "Number of ticks (500ms) to approve candidates.",
                    ).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
                )?,
                registry,
            )?,
            // Half-integer bucket bounds keep each integer sample in its own bucket.
            coalesced_approvals_buckets: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_approvals_coalesced_approvals_buckets",
                        "Number of coalesced approvals.",
                    ).buckets(vec![1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]),
                )?,
                registry,
            )?,
            // Samples are tick counts (see `on_delayed_approval`).
            coalesced_approvals_delay: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_approvals_coalescing_delay",
                        "Number of ticks we delay the sending of a candidate approval",
                    ).buckets(vec![1.1, 2.1, 3.1, 4.1, 6.1, 8.1, 12.1, 20.1, 32.1]),
                )?,
                registry,
            )?,
            approved_by_one_third: prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_approved_by_one_third",
                    "Number of candidates where more than one third had to vote ",
                )?,
                registry,
            )?,
            // Samples are tick counts (see `on_block_approved`).
            block_approval_time_ticks: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_approvals_blockapproval_time_ticks",
                        "Number of ticks (500ms) to approve blocks.",
                    ).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
                )?,
                registry,
            )?,
            // No explicit buckets: the library defaults apply.
            time_db_transaction: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_time_approval_db_transaction",
                        "Time spent writing an approval db transaction.",
                    )
                )?,
                registry,
            )?,
            // No explicit buckets: the library defaults apply.
            time_recover_and_approve: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_time_recover_and_approve",
                        "Time spent recovering and approving data in approval voting",
                    )
                )?,
                registry,
            )?,
            candidate_signatures_requests_total: prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_approval_candidate_signatures_requests_total",
                    "Number of times signatures got requested by other subsystems",
                )?,
                registry,
            )?,
            unapproved_candidates_in_unfinalized_chain: prometheus::register(
                prometheus::Gauge::new(
                    "polkadot_parachain_approval_unapproved_candidates_in_unfinalized_chain",
                    "Number of unapproved candidates in unfinalized chain",
                )?,
                registry,
            )?,
            // One histogram per `stage` label value; samples are milliseconds.
            assignments_gathering_time_by_stage: prometheus::register(
                prometheus::HistogramVec::new(
                    prometheus::HistogramOpts::new(
                        "polkadot_parachain_assignments_gather_time_by_stage_ms",
                        "The time in ms it takes for each stage to gather enough assignments needed for approval",
                    )
                    .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
                    &["stage"],
                )?,
                registry,
            )?,
        };

        Ok(Metrics(Some(metrics)))
    }
}
495
496impl ApprovalVotingSubsystem {
497 pub fn with_config(
499 config: Config,
500 db: Arc<dyn Database>,
501 keystore: Arc<LocalKeystore>,
502 sync_oracle: Box<dyn SyncOracle + Send>,
503 metrics: Metrics,
504 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
505 ) -> Self {
506 ApprovalVotingSubsystem::with_config_and_clock(
507 config,
508 db,
509 keystore,
510 sync_oracle,
511 metrics,
512 Arc::new(SystemClock {}),
513 spawner,
514 MAX_APPROVAL_RETRIES,
515 APPROVAL_CHECKING_TIMEOUT / 2,
516 )
517 }
518
519 pub fn with_config_and_clock(
521 config: Config,
522 db: Arc<dyn Database>,
523 keystore: Arc<LocalKeystore>,
524 sync_oracle: Box<dyn SyncOracle + Send>,
525 metrics: Metrics,
526 clock: Arc<dyn Clock + Send + Sync>,
527 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
528 max_approval_retries: u32,
529 retry_backoff: Duration,
530 ) -> Self {
531 ApprovalVotingSubsystem {
532 keystore,
533 slot_duration_millis: config.slot_duration_millis,
534 db,
535 db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
536 mode: Mode::Syncing(sync_oracle),
537 metrics,
538 clock,
539 spawner,
540 max_approval_retries,
541 retry_backoff,
542 }
543 }
544
545 pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
548 let config =
549 approval_db::common::Config { col_approval_data: self.db_config.col_approval_data };
550 let mut backend = approval_db::common::DbBackend::new(self.db.clone(), config);
551 let mut overlay = OverlayedBackend::new(&backend);
552
553 ops::revert_to(&mut overlay, hash)?;
554
555 let ops = overlay.into_write_ops();
556 backend.write(ops)
557 }
558}
559
560fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemResult<()> {
563 let backend = DbBackend::new(db, config);
564 let all_blocks = backend.load_all_blocks()?;
565
566 if all_blocks.is_empty() {
567 gum::info!(target: LOG_TARGET, "Starting with an empty approval vote DB.",);
568 } else {
569 gum::debug!(
570 target: LOG_TARGET,
571 "Starting with {} blocks in approval vote DB.",
572 all_blocks.len()
573 );
574 }
575
576 Ok(())
577}
578
579#[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)]
580impl<Context: Send> ApprovalVotingSubsystem {
581 fn start(self, mut ctx: Context) -> SpawnedSubsystem {
582 let backend = DbBackend::new(self.db.clone(), self.db_config);
583 let to_other_subsystems = ctx.sender().clone();
584 let to_approval_distr = ctx.sender().clone();
585
586 let future = run::<DbBackend, _, _, _>(
587 ctx,
588 to_other_subsystems,
589 to_approval_distr,
590 self,
591 Box::new(RealAssignmentCriteria),
592 backend,
593 )
594 .map_err(|e| SubsystemError::with_origin("approval-voting", e))
595 .boxed();
596
597 SpawnedSubsystem { name: "approval-voting-subsystem", future }
598 }
599}
600
/// A request to issue an approval vote; carried by `Action::IssueApproval` together with the
/// candidate hash.
#[derive(Debug, Clone)]
struct ApprovalVoteRequest {
    /// Index of the validator the vote is issued for.
    validator_index: ValidatorIndex,
    /// Hash of the block the vote relates to.
    block_hash: Hash,
}
606
/// Scheduled wakeups for `(block, candidate)` pairs, indexed three ways.
#[derive(Default)]
struct Wakeups {
    /// Tick -> pairs to wake at that tick. Entries are removed once their list becomes empty.
    wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
    /// Pair -> the tick it is currently scheduled at.
    reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
    /// Block number -> hashes of the noted blocks at that height; drives pruning on finality.
    block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
614
615impl Wakeups {
616 fn first(&self) -> Option<Tick> {
618 self.wakeups.keys().next().map(|t| *t)
619 }
620
621 fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
622 self.block_numbers.entry(block_number).or_default().insert(block_hash);
623 }
624
625 fn schedule(
628 &mut self,
629 block_hash: Hash,
630 block_number: BlockNumber,
631 candidate_hash: CandidateHash,
632 tick: Tick,
633 ) {
634 if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
635 if prev <= &tick {
636 return
637 }
638
639 if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
641 if let Some(pos) =
642 entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
643 {
644 entry.get_mut().remove(pos);
645 }
646
647 if entry.get().is_empty() {
648 let _ = entry.remove_entry();
649 }
650 }
651 } else {
652 self.note_block(block_hash, block_number);
653 }
654
655 self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
656 self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
657 }
658
659 fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
660 let after = self.block_numbers.split_off(&(finalized_number + 1));
661 let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
662 .into_iter()
663 .flat_map(|(_number, hashes)| hashes)
664 .collect();
665
666 let mut pruned_wakeups = BTreeMap::new();
667 self.reverse_wakeups.retain(|(h, c_h), tick| {
668 let live = !pruned_blocks.contains(h);
669 if !live {
670 pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
671 }
672 live
673 });
674
675 for (tick, pruned) in pruned_wakeups {
676 if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
677 entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
678 if entry.get().is_empty() {
679 let _ = entry.remove();
680 }
681 }
682 }
683 }
684
685 fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
687 self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
688 }
689
690 async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
692 match self.first() {
693 None => future::pending().await,
694 Some(tick) => {
695 clock.wait(tick).await;
696 match self.wakeups.entry(tick) {
697 BTMEntry::Vacant(_) => {
698 panic!("entry is known to exist since `first` was `Some`; qed")
699 },
700 BTMEntry::Occupied(mut entry) => {
701 let (hash, candidate_hash) = entry.get_mut().pop()
702 .expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
703
704 if entry.get().is_empty() {
705 let _ = entry.remove();
706 }
707
708 self.reverse_wakeups.remove(&(hash, candidate_hash));
709
710 (tick, hash, candidate_hash)
711 },
712 }
713 },
714 }
715 }
716}
717
/// Snapshot of a candidate's approval progress under one block, as computed by
/// `State::approval_status`.
struct ApprovalStatus {
    /// Tranche requirement returned by `approval_checking::tranches_to_approve`.
    required_tranches: RequiredTranches,
    /// The tranche the clock reported for the block's slot when the status was computed.
    tranche_now: DelayTranche,
    /// Tick corresponding to the block's slot.
    block_tick: Tick,
    /// Total number of no-shows observed by `tranches_to_approve`.
    last_no_shows: usize,
    /// Validators reported as no-shows by `tranches_to_approve`.
    no_show_validators: Vec<ValidatorIndex>,
}
725
/// Final outcome of one approval check.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
    /// The check finished and approved the candidate.
    Approved,
    /// The check finished without approving the candidate.
    Failed,
    /// The check did not finish within `APPROVAL_CHECKING_TIMEOUT`.
    TimedOut,
}
732
/// Everything needed to launch an approval check again after a failed attempt.
// NOTE(review): the retry logic that consumes this struct is outside this view; the field
// descriptions below follow the field names and types only — confirm against the consumer.
#[derive(Clone)]
struct RetryApprovalInfo {
    /// Receipt of the candidate to check.
    candidate: CandidateReceipt,
    /// Backing group of the candidate.
    backing_group: GroupIndex,
    /// Executor parameters to validate with.
    executor_params: ExecutorParams,
    /// Core of the candidate, when known.
    core_index: Option<CoreIndex>,
    /// Session of the candidate.
    session_index: SessionIndex,
    /// Number of attempts still allowed.
    attempts_remaining: u32,
    /// Delay to apply before retrying.
    backoff: Duration,
}
743
/// Result of one approval check, as yielded by `CurrentlyCheckingSet::next`.
struct ApprovalState {
    /// Validator the check was run for.
    validator_index: ValidatorIndex,
    /// Candidate that was checked.
    candidate_hash: CandidateHash,
    /// How the check ended.
    approval_outcome: ApprovalOutcome,
    /// Retry parameters; only ever set through `failed_with_retry`.
    retry_info: Option<RetryApprovalInfo>,
}
750
751impl ApprovalState {
752 fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
753 Self {
754 validator_index,
755 candidate_hash,
756 approval_outcome: ApprovalOutcome::Approved,
757 retry_info: None,
758 }
759 }
760 fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
761 Self {
762 validator_index,
763 candidate_hash,
764 approval_outcome: ApprovalOutcome::Failed,
765 retry_info: None,
766 }
767 }
768
769 fn failed_with_retry(
770 validator_index: ValidatorIndex,
771 candidate_hash: CandidateHash,
772 retry_info: Option<RetryApprovalInfo>,
773 ) -> Self {
774 Self {
775 validator_index,
776 candidate_hash,
777 approval_outcome: ApprovalOutcome::Failed,
778 retry_info,
779 }
780 }
781}
782
783struct CurrentlyCheckingSet {
784 candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
785 currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
786}
787
788impl Default for CurrentlyCheckingSet {
789 fn default() -> Self {
790 Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
791 }
792}
793
794impl CurrentlyCheckingSet {
795 pub async fn insert_relay_block_hash(
798 &mut self,
799 candidate_hash: CandidateHash,
800 validator_index: ValidatorIndex,
801 relay_block: Hash,
802 launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
803 ) -> SubsystemResult<()> {
804 match self.candidate_hash_map.entry(candidate_hash) {
805 HMEntry::Occupied(mut entry) => {
806 entry.get_mut().insert(relay_block);
808 },
809 HMEntry::Vacant(entry) => {
810 entry.insert(HashSet::new()).insert(relay_block);
812 let work = launch_work.await?;
813 self.currently_checking.push(Box::pin(async move {
814 match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
815 None => ApprovalState {
816 candidate_hash,
817 validator_index,
818 approval_outcome: ApprovalOutcome::TimedOut,
819 retry_info: None,
820 },
821 Some(approval_state) => approval_state,
822 }
823 }));
824 },
825 }
826
827 Ok(())
828 }
829
830 pub async fn next(
831 &mut self,
832 approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
833 ) -> (HashSet<Hash>, ApprovalState) {
834 if !self.currently_checking.is_empty() {
835 if let Some(approval_state) = self.currently_checking.next().await {
836 let out = self
837 .candidate_hash_map
838 .remove(&approval_state.candidate_hash)
839 .unwrap_or_default();
840 approvals_cache
841 .insert(approval_state.candidate_hash, approval_state.approval_outcome);
842 return (out, approval_state)
843 }
844 }
845
846 future::pending().await
847 }
848}
849
850async fn get_extended_session_info<'a, Sender>(
851 runtime_info: &'a mut RuntimeInfo,
852 sender: &mut Sender,
853 relay_parent: Hash,
854 session_index: SessionIndex,
855) -> Option<&'a ExtendedSessionInfo>
856where
857 Sender: SubsystemSender<RuntimeApiMessage>,
858{
859 match runtime_info
860 .get_session_info_by_index(sender, relay_parent, session_index)
861 .await
862 {
863 Ok(extended_info) => Some(&extended_info),
864 Err(_) => {
865 gum::debug!(
866 target: LOG_TARGET,
867 session = session_index,
868 ?relay_parent,
869 "Can't obtain SessionInfo or ExecutorParams"
870 );
871 None
872 },
873 }
874}
875
876async fn get_session_info<'a, Sender>(
877 runtime_info: &'a mut RuntimeInfo,
878 sender: &mut Sender,
879 relay_parent: Hash,
880 session_index: SessionIndex,
881) -> Option<&'a SessionInfo>
882where
883 Sender: SubsystemSender<RuntimeApiMessage>,
884{
885 get_extended_session_info(runtime_info, sender, relay_parent, session_index)
886 .await
887 .map(|extended_info| &extended_info.session_info)
888}
889
/// Mutable state of the running subsystem.
struct State {
    /// Local keystore, taken over from the subsystem.
    keystore: Arc<LocalKeystore>,
    /// Slot duration in milliseconds.
    slot_duration_millis: u64,
    /// Clock used for tick and tranche computations.
    clock: Arc<dyn Clock + Send + Sync>,
    /// Assignment criteria implementation.
    assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
    /// Per block number, the assignment-gathering record of each `(block, candidate)` pair.
    per_block_assignments_gathering_times:
        LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
    /// No-show statistics accumulated since the last dump.
    no_show_stats: NoShowStats,
}
902
/// Minimum number of blocks between two dumps of the no-show statistics
/// (see `NoShowStats::maybe_print`).
const NO_SHOW_DUMP_FREQUENCY: BlockNumber = 50;
// NOTE(review): presumably caps how many no-show validators are recorded per candidate; the
// use site is outside this view — confirm.
pub(crate) const MAX_RECORDED_NO_SHOW_VALIDATORS_PER_CANDIDATE: usize = 20;
907
/// No-show statistics accumulated between two dumps to the log.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct NoShowStats {
    /// Per session, how many times each validator was recorded as a no-show.
    per_validator_no_show: HashMap<SessionIndex, HashMap<ValidatorIndex, usize>>,
    /// Per parachain id, how many records contained at least one no-show.
    per_parachain_no_show: HashMap<u32, usize>,
    /// Block number at which the statistics were last dumped.
    last_dumped_block_number: BlockNumber,
}
918
919impl NoShowStats {
920 fn maybe_print(&mut self, current_block_number: BlockNumber) {
923 if self.last_dumped_block_number > current_block_number ||
924 current_block_number - self.last_dumped_block_number < NO_SHOW_DUMP_FREQUENCY
925 {
926 return
927 }
928 if self.per_parachain_no_show.is_empty() && self.per_validator_no_show.is_empty() {
929 return
930 }
931
932 gum::debug!(
933 target: LOG_TARGET,
934 "Validators with no_show {:?} and parachains with no_shows {:?} since {:}",
935 self.per_validator_no_show,
936 self.per_parachain_no_show,
937 self.last_dumped_block_number
938 );
939
940 self.last_dumped_block_number = current_block_number;
941
942 self.per_validator_no_show.clear();
943 self.per_parachain_no_show.clear();
944 }
945}
946
/// Progress of assignment gathering for one `(block, candidate)` pair.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
    /// Number of the current gathering stage, starting at 0.
    stage: usize,
    /// When the current stage started; `None` once the stage has been reported.
    stage_start: Option<Instant>,
}

impl Default for AssignmentGatheringRecord {
    /// A fresh record: stage 0, started right now.
    fn default() -> Self {
        let started_at = Instant::now();
        Self { stage: 0, stage_start: Some(started_at) }
    }
}
963
964#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
965impl State {
966 async fn approval_status<Sender, 'a, 'b>(
970 &'a self,
971 sender: &mut Sender,
972 session_info_provider: &'a mut RuntimeInfo,
973 block_entry: &'a BlockEntry,
974 candidate_entry: &'b CandidateEntry,
975 ) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
976 where
977 Sender: SubsystemSender<RuntimeApiMessage>,
978 {
979 let session_info = match get_session_info(
980 session_info_provider,
981 sender,
982 block_entry.parent_hash(),
983 block_entry.session(),
984 )
985 .await
986 {
987 Some(s) => s,
988 None => return None,
989 };
990 let block_hash = block_entry.block_hash();
991
992 let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
993 let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
994 let no_show_duration = slot_number_to_tick(
995 self.slot_duration_millis,
996 Slot::from(u64::from(session_info.no_show_slots)),
997 );
998
999 if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
1000 let TranchesToApproveResult {
1001 required_tranches,
1002 total_observed_no_shows,
1003 no_show_validators,
1004 } = approval_checking::tranches_to_approve(
1005 approval_entry,
1006 candidate_entry.approvals(),
1007 tranche_now,
1008 block_tick,
1009 no_show_duration,
1010 session_info.needed_approvals as _,
1011 );
1012
1013 let status = ApprovalStatus {
1014 required_tranches,
1015 block_tick,
1016 tranche_now,
1017 last_no_shows: total_observed_no_shows,
1018 no_show_validators,
1019 };
1020
1021 Some((approval_entry, status))
1022 } else {
1023 None
1024 }
1025 }
1026
1027 async fn get_approval_voting_params_or_default<Sender: SubsystemSender<RuntimeApiMessage>>(
1029 &self,
1030 sender: &mut Sender,
1031 session_index: SessionIndex,
1032 block_hash: Hash,
1033 ) -> Option<ApprovalVotingParams> {
1034 let (s_tx, s_rx) = oneshot::channel();
1035
1036 sender
1037 .send_message(RuntimeApiMessage::Request(
1038 block_hash,
1039 RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx),
1040 ))
1041 .await;
1042
1043 match s_rx.await {
1044 Ok(Ok(params)) => {
1045 gum::trace!(
1046 target: LOG_TARGET,
1047 approval_voting_params = ?params,
1048 session = ?session_index,
1049 "Using the following subsystem params"
1050 );
1051 Some(params)
1052 },
1053 Ok(Err(err)) => {
1054 gum::debug!(
1055 target: LOG_TARGET,
1056 ?err,
1057 "Could not request approval voting params from runtime"
1058 );
1059 None
1060 },
1061 Err(err) => {
1062 gum::debug!(
1063 target: LOG_TARGET,
1064 ?err,
1065 "Could not request approval voting params from runtime"
1066 );
1067 None
1068 },
1069 }
1070 }
1071
1072 fn mark_begining_of_gathering_assignments(
1073 &mut self,
1074 block_number: BlockNumber,
1075 block_hash: Hash,
1076 candidate: CandidateHash,
1077 ) {
1078 if let Some(record) = self
1079 .per_block_assignments_gathering_times
1080 .get_or_insert(block_number, HashMap::new)
1081 .and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
1082 {
1083 if record.stage_start.is_none() {
1084 record.stage += 1;
1085 gum::debug!(
1086 target: LOG_TARGET,
1087 stage = ?record.stage,
1088 ?block_hash,
1089 ?candidate,
1090 "Started a new assignment gathering stage",
1091 );
1092 record.stage_start = Some(Instant::now());
1093 }
1094 }
1095 }
1096
1097 fn mark_gathered_enough_assignments(
1098 &mut self,
1099 block_number: BlockNumber,
1100 block_hash: Hash,
1101 candidate: CandidateHash,
1102 ) -> AssignmentGatheringRecord {
1103 let record = self
1104 .per_block_assignments_gathering_times
1105 .get(&block_number)
1106 .and_then(|entry| entry.get_mut(&(block_hash, candidate)));
1107 let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
1108 AssignmentGatheringRecord {
1109 stage,
1110 stage_start: record.and_then(|record| record.stage_start.take()),
1111 }
1112 }
1113
1114 fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
1115 while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
1116 {
1117 if *block_number < remove_lower_than {
1118 self.per_block_assignments_gathering_times.pop_oldest();
1119 } else {
1120 break
1121 }
1122 }
1123 }
1124
1125 fn observe_assignment_gathering_status(
1126 &mut self,
1127 metrics: &Metrics,
1128 required_tranches: &RequiredTranches,
1129 block_hash: Hash,
1130 block_number: BlockNumber,
1131 candidate_hash: CandidateHash,
1132 ) {
1133 match required_tranches {
1134 RequiredTranches::All | RequiredTranches::Pending { .. } => {
1135 self.mark_begining_of_gathering_assignments(
1136 block_number,
1137 block_hash,
1138 candidate_hash,
1139 );
1140 },
1141 RequiredTranches::Exact { .. } => {
1142 let time_to_gather =
1143 self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
1144 if let Some(gathering_started) = time_to_gather.stage_start {
1145 if gathering_started.elapsed().as_millis() > 6000 {
1146 gum::trace!(
1147 target: LOG_TARGET,
1148 ?block_hash,
1149 ?candidate_hash,
1150 "Long assignment gathering time",
1151 );
1152 }
1153 metrics.observe_assignment_gathering_time(
1154 time_to_gather.stage,
1155 gathering_started.elapsed().as_millis() as usize,
1156 )
1157 }
1158 },
1159 }
1160 }
1161
1162 fn record_no_shows(
1163 &mut self,
1164 session_index: SessionIndex,
1165 para_id: u32,
1166 no_show_validators: &Vec<ValidatorIndex>,
1167 ) {
1168 if !no_show_validators.is_empty() {
1169 *self.no_show_stats.per_parachain_no_show.entry(para_id.into()).or_default() += 1;
1170 }
1171 for validator_index in no_show_validators {
1172 *self
1173 .no_show_stats
1174 .per_validator_no_show
1175 .entry(session_index)
1176 .or_default()
1177 .entry(*validator_index)
1178 .or_default() += 1;
1179 }
1180 }
1181}
1182
/// A follow-up step for the subsystem to carry out.
// NOTE(review): the code that produces and handles these actions is outside this view; the
// variant descriptions below follow the variant and field names only — confirm against it.
#[derive(Debug, Clone)]
enum Action {
    /// Schedule a wakeup; the fields mirror the parameters of `Wakeups::schedule`.
    ScheduleWakeup {
        block_hash: Hash,
        block_number: BlockNumber,
        candidate_hash: CandidateHash,
        tick: Tick,
    },
    /// Launch the approval work for a candidate.
    LaunchApproval {
        claimed_candidate_indices: CandidateBitfield,
        candidate_hash: CandidateHash,
        indirect_cert: IndirectAssignmentCertV2,
        assignment_tranche: DelayTranche,
        relay_block_hash: Hash,
        session: SessionIndex,
        executor_params: ExecutorParams,
        candidate: CandidateReceipt,
        backing_group: GroupIndex,
        distribute_assignment: bool,
        core_index: Option<CoreIndex>,
    },
    /// Note the block with the given hash as approved in chain selection.
    NoteApprovedInChainSelection(Hash),
    /// Issue an approval vote for the given candidate.
    IssueApproval(CandidateHash, ApprovalVoteRequest),
    /// Switch the subsystem to active mode.
    BecomeActive,
    /// Stop the subsystem.
    Conclude,
}
1209
#[async_trait::async_trait]
/// Source of incoming work (overseer signals and `ApprovalVotingMessage`s) for the
/// approval-voting main loop.
pub trait ApprovalVotingWorkProvider {
    /// Waits for the next signal or message addressed to approval-voting.
    async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>>;
}
1215
/// Blanket implementation that lets a subsystem context act as the work provider.
#[async_trait::async_trait]
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
impl<Context> ApprovalVotingWorkProvider for Context {
    async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
        // NOTE(review): presumably resolves to the context's own `recv`, made available through
        // the bounds added by `contextbounds` — confirm.
        self.recv().await
    }
}
1223
/// Main loop of the approval-voting worker.
///
/// Builds the in-memory `State` from `subsystem`, then repeatedly waits for one of four events:
/// a scheduled wakeup, a message from `work_provider`, a finished approval check, or an expired
/// delayed-approval timer. The `Action`s produced by the event are applied by `handle_actions`,
/// and whatever was accumulated in the database overlay is written to `backend` at the end of
/// the iteration.
///
/// Returns `Ok(())` once `handle_actions` reports that the loop should conclude. Errors from the
/// awaited helpers and from `backend.write` are propagated with `?`.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn run<
    B,
    WorkProvider: ApprovalVotingWorkProvider,
    Sender: SubsystemSender<ChainApiMessage>
        + SubsystemSender<RuntimeApiMessage>
        + SubsystemSender<ChainSelectionMessage>
        + SubsystemSender<AvailabilityRecoveryMessage>
        + SubsystemSender<DisputeCoordinatorMessage>
        + SubsystemSender<CandidateValidationMessage>
        + Clone,
    ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
    mut work_provider: WorkProvider,
    mut to_other_subsystems: Sender,
    mut to_approval_distr: ADSender,
    mut subsystem: ApprovalVotingSubsystem,
    assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
    mut backend: B,
) -> SubsystemResult<()>
where
    B: Backend,
{
    // A failing sanity check is only logged; the loop still starts.
    if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) {
        gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
    }

    let mut state = State {
        keystore: subsystem.keystore,
        slot_duration_millis: subsystem.slot_duration_millis,
        clock: subsystem.clock,
        assignment_criteria,
        per_block_assignments_gathering_times: LruMap::new(ByLength::new(
            MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
        )),
        no_show_stats: NoShowStats::default(),
    };

    // Ask the chain API for the finalized block number; a failed query leaves it as `None`.
    let mut last_finalized_height: Option<BlockNumber> = {
        let (tx, rx) = oneshot::channel();
        to_other_subsystems
            .send_message(ChainApiMessage::FinalizedBlockNumber(tx))
            .await;
        match rx.await? {
            Ok(number) => Some(number),
            Err(err) => {
                gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number");
                None
            },
        }
    };

    let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
        keystore: None,
        session_cache_lru_size: DISPUTE_WINDOW.get(),
    });

    let mut wakeups = Wakeups::default();
    let mut currently_checking_set = CurrentlyCheckingSet::default();
    let mut delayed_approvals_timers = DelayedApprovalTimer::default();
    let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE));

    loop {
        // Every iteration works on a fresh overlay over `backend`; it is flushed at the bottom.
        let mut overlayed_db = OverlayedBackend::new(&backend);
        let actions = futures::select! {
            // A previously scheduled wakeup fired.
            (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
                subsystem.metrics.on_wakeup();
                process_wakeup(
                    &mut to_other_subsystems,
                    &mut state,
                    &mut overlayed_db,
                    &mut session_info_provider,
                    woken_block,
                    woken_candidate,
                    &subsystem.metrics,
                    &wakeups,
                ).await?
            }
            // A signal or message arrived from the work provider.
            next_msg = work_provider.recv().fuse() => {
                let mut actions = handle_from_overseer(
                    &mut to_other_subsystems,
                    &mut to_approval_distr,
                    &subsystem.spawner,
                    &mut state,
                    &mut overlayed_db,
                    &mut session_info_provider,
                    &subsystem.metrics,
                    next_msg?,
                    &mut last_finalized_height,
                    &mut wakeups,
                ).await?;

                // While in syncing mode, check the oracle after each message; once major sync
                // is over, `BecomeActive` is put in front of the other actions.
                if let Mode::Syncing(ref mut oracle) = subsystem.mode {
                    if !oracle.is_major_syncing() {
                        actions.insert(0, Action::BecomeActive)
                    }
                }

                actions
            }
            // An approval check finished.
            approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
                let mut actions = Vec::new();
                let (
                    relay_block_hashes,
                    ApprovalState {
                        validator_index,
                        candidate_hash,
                        approval_outcome,
                        retry_info,
                    }
                ) = approval_state;

                // An approved candidate yields one `IssueApproval` per relay block hash.
                if matches!(approval_outcome, ApprovalOutcome::Approved) {
                    let mut approvals: Vec<Action> = relay_block_hashes
                        .iter()
                        .map(|block_hash|
                            Action::IssueApproval(
                                candidate_hash,
                                ApprovalVoteRequest {
                                    validator_index,
                                    block_hash: *block_hash,
                                },
                            )
                        )
                        .collect();
                    actions.append(&mut approvals);
                }

                // If the finished check carries retry information, launch the check again for
                // every relay block whose block entry can still be loaded (load errors count as
                // "not present").
                if let Some(retry_info) = retry_info {
                    for block_hash in relay_block_hashes {
                        if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
                            let sender = to_other_subsystems.clone();
                            let spawn_handle = subsystem.spawner.clone();
                            let metrics = subsystem.metrics.clone();
                            let retry_info = retry_info.clone();
                            let executor_params = retry_info.executor_params.clone();
                            let candidate = retry_info.candidate.clone();

                            currently_checking_set
                                .insert_relay_block_hash(
                                    candidate_hash,
                                    validator_index,
                                    block_hash,
                                    async move {
                                        launch_approval(
                                            sender,
                                            spawn_handle,
                                            metrics,
                                            retry_info.session_index,
                                            candidate,
                                            validator_index,
                                            block_hash,
                                            retry_info.backing_group,
                                            executor_params,
                                            retry_info.core_index,
                                            retry_info,
                                        )
                                        .await
                                    },
                                )
                                .await?;
                        }
                    }
                }

                actions
            },
            // A delayed-approval timer expired for (block, validator).
            (block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
                gum::debug!(
                    target: LOG_TARGET,
                    ?block_hash,
                    ?validator_index,
                    "Sign approval for multiple candidates",
                );

                match maybe_create_signature(
                    &mut overlayed_db,
                    &mut session_info_provider,
                    &state,
                    &mut to_other_subsystems,
                    &mut to_approval_distr,
                    block_hash,
                    validator_index,
                    &subsystem.metrics,
                ).await {
                    // A returned tick means the timer has to be armed again.
                    Ok(Some(next_wakeup)) => {
                        delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index);
                    },
                    Ok(None) => {}
                    // Signature failures are logged and do not stop the loop.
                    Err(err) => {
                        gum::error!(
                            target: LOG_TARGET,
                            ?err,
                            "Failed to create signature",
                        );
                    }
                }
                vec![]
            }
        };

        // `handle_actions` returns `true` when a `Conclude` action was processed.
        if handle_actions(
            &mut to_other_subsystems,
            &mut to_approval_distr,
            &subsystem.spawner,
            &mut state,
            &mut overlayed_db,
            &mut session_info_provider,
            &subsystem.metrics,
            &mut wakeups,
            &mut currently_checking_set,
            &mut delayed_approvals_timers,
            &mut approvals_cache,
            &mut subsystem.mode,
            actions,
            subsystem.max_approval_retries,
            subsystem.retry_backoff,
        )
        .await?
        {
            break
        }

        // Persist the overlay only when something was actually changed.
        if !overlayed_db.is_empty() {
            let _timer = subsystem.metrics.time_db_transaction();
            let ops = overlayed_db.into_write_ops();
            backend.write(ops)?;
        }
    }

    Ok(())
}
1458
1459pub async fn start_approval_worker<
1461 WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
1462 Sender: SubsystemSender<ChainApiMessage>
1463 + SubsystemSender<RuntimeApiMessage>
1464 + SubsystemSender<ChainSelectionMessage>
1465 + SubsystemSender<AvailabilityRecoveryMessage>
1466 + SubsystemSender<DisputeCoordinatorMessage>
1467 + SubsystemSender<CandidateValidationMessage>
1468 + Clone,
1469 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1470>(
1471 work_provider: WorkProvider,
1472 to_other_subsystems: Sender,
1473 to_approval_distr: ADSender,
1474 config: Config,
1475 db: Arc<dyn Database>,
1476 keystore: Arc<LocalKeystore>,
1477 sync_oracle: Box<dyn SyncOracle + Send>,
1478 metrics: Metrics,
1479 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
1480 task_name: &'static str,
1481 group_name: &'static str,
1482 clock: Arc<dyn Clock + Send + Sync>,
1483) -> SubsystemResult<()> {
1484 let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
1485 config,
1486 db.clone(),
1487 keystore,
1488 sync_oracle,
1489 metrics,
1490 clock,
1491 spawner,
1492 MAX_APPROVAL_RETRIES,
1493 APPROVAL_CHECKING_TIMEOUT / 2,
1494 );
1495 let backend = DbBackend::new(db.clone(), approval_voting.db_config);
1496 let spawner = approval_voting.spawner.clone();
1497 spawner.spawn_blocking(
1498 task_name,
1499 Some(group_name),
1500 Box::pin(async move {
1501 if let Err(err) = run(
1502 work_provider,
1503 to_other_subsystems,
1504 to_approval_distr,
1505 approval_voting,
1506 Box::new(RealAssignmentCriteria),
1507 backend,
1508 )
1509 .await
1510 {
1511 gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
1512 };
1513 }),
1514 );
1515 Ok(())
1516}
1517
1518#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1538async fn handle_actions<
1539 Sender: SubsystemSender<ChainApiMessage>
1540 + SubsystemSender<RuntimeApiMessage>
1541 + SubsystemSender<ChainSelectionMessage>
1542 + SubsystemSender<AvailabilityRecoveryMessage>
1543 + SubsystemSender<DisputeCoordinatorMessage>
1544 + SubsystemSender<CandidateValidationMessage>
1545 + Clone,
1546 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1547>(
1548 sender: &mut Sender,
1549 approval_voting_sender: &mut ADSender,
1550 spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1551 state: &mut State,
1552 overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
1553 session_info_provider: &mut RuntimeInfo,
1554 metrics: &Metrics,
1555 wakeups: &mut Wakeups,
1556 currently_checking_set: &mut CurrentlyCheckingSet,
1557 delayed_approvals_timers: &mut DelayedApprovalTimer,
1558 approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
1559 mode: &mut Mode,
1560 actions: Vec<Action>,
1561 max_approval_retries: u32,
1562 retry_backoff: Duration,
1563) -> SubsystemResult<bool> {
1564 let mut conclude = false;
1565 let mut actions_iter = actions.into_iter();
1566 while let Some(action) = actions_iter.next() {
1567 match action {
1568 Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
1569 wakeups.schedule(block_hash, block_number, candidate_hash, tick);
1570 },
1571 Action::IssueApproval(candidate_hash, approval_request) => {
1572 let next_actions: Vec<Action> = issue_approval(
1584 sender,
1585 approval_voting_sender,
1586 state,
1587 overlayed_db,
1588 session_info_provider,
1589 metrics,
1590 candidate_hash,
1591 delayed_approvals_timers,
1592 approval_request,
1593 &wakeups,
1594 )
1595 .await?
1596 .into_iter()
1597 .map(|v| v.clone())
1598 .chain(actions_iter)
1599 .collect();
1600
1601 actions_iter = next_actions.into_iter();
1602 },
1603 Action::LaunchApproval {
1604 claimed_candidate_indices,
1605 candidate_hash,
1606 indirect_cert,
1607 assignment_tranche,
1608 relay_block_hash,
1609 session,
1610 executor_params,
1611 candidate,
1612 backing_group,
1613 distribute_assignment,
1614 core_index,
1615 } => {
1616 if let Mode::Syncing(_) = *mode {
1618 continue
1619 }
1620
1621 metrics.on_assignment_produced(assignment_tranche);
1622 let block_hash = indirect_cert.block_hash;
1623 let validator_index = indirect_cert.validator;
1624
1625 if distribute_assignment {
1626 approval_voting_sender.send_unbounded_message(
1627 ApprovalDistributionMessage::DistributeAssignment(
1628 indirect_cert,
1629 claimed_candidate_indices,
1630 ),
1631 );
1632 }
1633
1634 match approvals_cache.get(&candidate_hash) {
1635 Some(ApprovalOutcome::Approved) => {
1636 let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
1637 candidate_hash,
1638 ApprovalVoteRequest { validator_index, block_hash },
1639 ))
1640 .map(|v| v.clone())
1641 .chain(actions_iter)
1642 .collect();
1643 actions_iter = new_actions.into_iter();
1644 },
1645 None => {
1646 let sender = sender.clone();
1647 let spawn_handle = spawn_handle.clone();
1648
1649 let retry = RetryApprovalInfo {
1650 candidate: candidate.clone(),
1651 backing_group,
1652 executor_params: executor_params.clone(),
1653 core_index,
1654 session_index: session,
1655 attempts_remaining: max_approval_retries,
1656 backoff: retry_backoff,
1657 };
1658
1659 currently_checking_set
1660 .insert_relay_block_hash(
1661 candidate_hash,
1662 validator_index,
1663 relay_block_hash,
1664 async move {
1665 launch_approval(
1666 sender,
1667 spawn_handle,
1668 metrics.clone(),
1669 session,
1670 candidate,
1671 validator_index,
1672 block_hash,
1673 backing_group,
1674 executor_params,
1675 core_index,
1676 retry,
1677 )
1678 .await
1679 },
1680 )
1681 .await?;
1682 },
1683 Some(_) => {},
1684 }
1685 },
1686 Action::NoteApprovedInChainSelection(block_hash) => {
1687 sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
1688 },
1689 Action::BecomeActive => {
1690 *mode = Mode::Active;
1691
1692 let (messages, next_actions) = distribution_messages_for_activation(
1693 sender,
1694 overlayed_db,
1695 state,
1696 delayed_approvals_timers,
1697 session_info_provider,
1698 )
1699 .await?;
1700 for message in messages.into_iter() {
1701 approval_voting_sender.send_unbounded_message(message);
1702 }
1703 let next_actions: Vec<Action> =
1704 next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
1705
1706 actions_iter = next_actions.into_iter();
1707 },
1708 Action::Conclude => {
1709 conclude = true;
1710 },
1711 }
1712 }
1713
1714 Ok(conclude)
1715}
1716
1717fn cores_to_candidate_indices(
1718 core_indices: &CoreBitfield,
1719 block_entry: &BlockEntry,
1720) -> Result<CandidateBitfield, BitfieldError> {
1721 let mut candidate_indices = Vec::new();
1722
1723 for claimed_core_index in core_indices.iter_ones() {
1725 if let Some(candidate_index) = block_entry
1726 .candidates()
1727 .iter()
1728 .position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
1729 {
1730 candidate_indices.push(candidate_index as _);
1731 }
1732 }
1733
1734 CandidateBitfield::try_from(candidate_indices)
1735}
1736
1737fn get_core_indices_on_startup(
1740 assignment: &AssignmentCertKindV2,
1741 block_entry_core_index: CoreIndex,
1742) -> CoreBitfield {
1743 match &assignment {
1744 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
1745 AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
1746 CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed"),
1747 AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1748 CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"),
1749 }
1750}
1751
1752fn get_assignment_core_indices(
1756 assignment: &AssignmentCertKindV2,
1757 candidate_hash: &CandidateHash,
1758 block_entry: &BlockEntry,
1759) -> Option<CoreBitfield> {
1760 match &assignment {
1761 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
1762 Some(core_bitfield.clone()),
1763 AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
1764 .candidates()
1765 .iter()
1766 .find(|(_core_index, h)| candidate_hash == h)
1767 .map(|(core_index, _candidate_hash)| {
1768 CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1769 }),
1770 AssignmentCertKindV2::RelayVRFDelay { core_index } =>
1771 Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")),
1772 }
1773}
1774
/// Builds what has to be (re)sent and (re)done when the subsystem becomes active.
///
/// Walks every block returned by `db.load_all_blocks()` and, for each candidate of each block,
/// inspects our local statements on its approval entry:
///
/// * no assignment and no approval, but an untriggered assignment of ours exists: a
///   `ScheduleWakeup` action is queued for `RESTART_WAKEUP_DELAY` ticks from now;
/// * assignment only: the assignment is queued for distribution and, unless the candidate is
///   pending signature, a `LaunchApproval` action is queued (with `distribute_assignment:
///   false`);
/// * assignment and approval: the assignment is queued for distribution, and the approval is
///   queued once per distinct set of signed candidate indices within the block.
///
/// Returns the distribution messages — `NewBlocks` first, then the assignments, then the
/// approvals — together with the queued actions. Database errors are propagated.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApiMessage>>(
    sender: &mut Sender,
    db: &OverlayedBackend<'_, impl Backend>,
    state: &State,
    delayed_approvals_timers: &mut DelayedApprovalTimer,
    session_info_provider: &mut RuntimeInfo,
) -> SubsystemResult<(Vec<ApprovalDistributionMessage>, Vec<Action>)> {
    let all_blocks: Vec<Hash> = db.load_all_blocks()?;

    let mut approval_meta = Vec::with_capacity(all_blocks.len());
    let mut messages = Vec::new();
    let mut approvals = Vec::new();
    let mut actions = Vec::new();

    // Placeholder so that `NewBlocks` stays the first message; it is overwritten with the real
    // block metadata after the loop.
    messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new()));
    for block_hash in all_blocks {
        let block_entry = match db.load_block_entry(&block_hash)? {
            Some(b) => b,
            None => {
                gum::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry");

                continue
            },
        };

        approval_meta.push(BlockApprovalMeta {
            hash: block_hash,
            number: block_entry.block_number(),
            parent_hash: block_entry.parent_hash(),
            candidates: block_entry
                .candidates()
                .iter()
                .map(|(core_index, c_hash)| {
                    // A load error or a missing candidate/approval entry falls back to the
                    // default group index (with a warning).
                    let candidate = db.load_candidate_entry(c_hash).ok().flatten();
                    let group_index = candidate
                        .and_then(|entry| {
                            entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
                        })
                        .unwrap_or_else(|| {
                            gum::warn!(
                                target: LOG_TARGET,
                                ?block_hash,
                                ?c_hash,
                                "Missing candidate entry or approval entry",
                            );
                            GroupIndex::default()
                        });
                    (*c_hash, *core_index, group_index)
                })
                .collect(),
            slot: block_entry.slot(),
            session: block_entry.session(),
            vrf_story: block_entry.relay_vrf_story(),
        });
        // Sets of signed candidate indices already queued for this block; used to avoid queuing
        // the same approval more than once.
        let mut signatures_queued = HashSet::new();
        for (core_index, candidate_hash) in block_entry.candidates() {
            let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
                Some(c) => c,
                None => {
                    gum::warn!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Missing candidate entry",
                    );

                    continue
                },
            };

            match candidate_entry.approval_entry(&block_hash) {
                Some(approval_entry) => {
                    match approval_entry.local_statements() {
                        // Nothing sent yet: if we hold an assignment that has not been
                        // triggered, schedule a wakeup for it.
                        (None, None) =>
                            if approval_entry
                                .our_assignment()
                                .map(|assignment| !assignment.triggered())
                                .unwrap_or(false)
                            {
                                actions.push(Action::ScheduleWakeup {
                                    block_hash,
                                    block_number: block_entry.block_number(),
                                    candidate_hash: *candidate_hash,
                                    tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
                                })
                            },
                        // An approval without an assignment: nothing is done.
                        (None, Some(_)) => {},
                        // Assignment without approval.
                        (Some(assignment), None) => {
                            let claimed_core_indices =
                                get_core_indices_on_startup(&assignment.cert().kind, *core_index);

                            // Make sure a timer is armed when the block has candidates whose
                            // approval still awaits signing.
                            if block_entry.has_candidates_pending_signature() {
                                delayed_approvals_timers.maybe_arm_timer(
                                    state.clock.tick_now(),
                                    state.clock.as_ref(),
                                    block_entry.block_hash(),
                                    assignment.validator_index(),
                                )
                            }

                            match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
                                Ok(bitfield) => {
                                    gum::debug!(
                                        target: LOG_TARGET,
                                        candidate_hash = ?candidate_entry.candidate_receipt().hash(),
                                        ?block_hash,
                                        "Discovered, triggered assignment, not approved yet",
                                    );

                                    let indirect_cert = IndirectAssignmentCertV2 {
                                        block_hash,
                                        validator: assignment.validator_index(),
                                        cert: assignment.cert().clone(),
                                    };
                                    messages.push(
                                        ApprovalDistributionMessage::DistributeAssignment(
                                            indirect_cert.clone(),
                                            bitfield.clone(),
                                        ),
                                    );

                                    // The approval check is only relaunched for candidates that
                                    // are not already waiting for their signature.
                                    if !block_entry.candidate_is_pending_signature(*candidate_hash)
                                    {
                                        // Without session info the candidate is skipped.
                                        let ExtendedSessionInfo { ref executor_params, .. } =
                                            match get_extended_session_info(
                                                session_info_provider,
                                                sender,
                                                block_entry.block_hash(),
                                                block_entry.session(),
                                            )
                                            .await
                                            {
                                                Some(i) => i,
                                                None => continue,
                                            };

                                        actions.push(Action::LaunchApproval {
                                            claimed_candidate_indices: bitfield,
                                            candidate_hash: candidate_entry
                                                .candidate_receipt()
                                                .hash(),
                                            indirect_cert,
                                            assignment_tranche: assignment.tranche(),
                                            relay_block_hash: block_hash,
                                            session: block_entry.session(),
                                            executor_params: executor_params.clone(),
                                            candidate: candidate_entry.candidate_receipt().clone(),
                                            backing_group: approval_entry.backing_group(),
                                            // The assignment was already queued in `messages`.
                                            distribute_assignment: false,
                                            core_index: Some(*core_index),
                                        });
                                    }
                                },
                                Err(err) => {
                                    gum::warn!(
                                        target: LOG_TARGET,
                                        ?block_hash,
                                        ?candidate_hash,
                                        ?err,
                                        "Failed to create assignment bitfield",
                                    );
                                },
                            }
                        },
                        // Both assignment and approval exist: queue both for distribution.
                        (Some(assignment), Some(approval_sig)) => {
                            let claimed_core_indices =
                                get_core_indices_on_startup(&assignment.cert().kind, *core_index);
                            match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
                                Ok(bitfield) => messages.push(
                                    ApprovalDistributionMessage::DistributeAssignment(
                                        IndirectAssignmentCertV2 {
                                            block_hash,
                                            validator: assignment.validator_index(),
                                            cert: assignment.cert().clone(),
                                        },
                                        bitfield,
                                    ),
                                ),
                                // Without the assignment the approval is not queued either.
                                Err(err) => {
                                    gum::warn!(
                                        target: LOG_TARGET,
                                        ?block_hash,
                                        ?candidate_hash,
                                        ?err,
                                        "Failed to create assignment bitfield",
                                    );
                                    continue
                                },
                            }
                            // `insert` returns `false` for an index set that was queued before.
                            if signatures_queued
                                .insert(approval_sig.signed_candidates_indices.clone())
                            {
                                approvals.push(ApprovalDistributionMessage::DistributeApproval(
                                    IndirectSignedApprovalVoteV2 {
                                        block_hash,
                                        candidate_indices: approval_sig.signed_candidates_indices,
                                        validator: assignment.validator_index(),
                                        signature: approval_sig.signature,
                                    },
                                ))
                            };
                        },
                    }
                },
                None => {
                    gum::warn!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Missing approval entry",
                    );
                },
            }
        }
    }

    // Fill in the placeholder, then append the approvals after all assignments.
    messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
    messages.extend(approvals.into_iter());
    Ok((messages, actions))
}
2003
/// Handles one incoming overseer signal or `ApprovalVotingMessage` and returns the resulting
/// `Action`s.
///
/// * `ActiveLeaves`: imports the activated head (if any) and schedules a first wakeup for every
///   imported candidate we hold an assignment for.
/// * `BlockFinalized`: records the finalized height, canonicalizes the database and prunes
///   wakeups and assignment-gathering timestamps.
/// * `Conclude`: yields a single `Action::Conclude`.
/// * `ImportAssignment` / `ImportApproval`: imports the item and reports the check result on
///   the optional response channel.
/// * `ApprovedAncestor`: answers on the response channel; on error `None` is sent and the error
///   is returned.
/// * `GetApprovalSignaturesForCandidate`: starts fetching the signatures.
///
/// Import and canonicalization failures are wrapped with origin `"db"`; other helper errors are
/// propagated unchanged.
async fn handle_from_overseer<
    Sender: SubsystemSender<ChainApiMessage>
        + SubsystemSender<RuntimeApiMessage>
        + SubsystemSender<ChainSelectionMessage>
        + Clone,
    ADSender: SubsystemSender<ApprovalDistributionMessage>,
>(
    sender: &mut Sender,
    approval_voting_sender: &mut ADSender,
    spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
    state: &mut State,
    db: &mut OverlayedBackend<'_, impl Backend>,
    session_info_provider: &mut RuntimeInfo,
    metrics: &Metrics,
    x: FromOrchestra<ApprovalVotingMessage>,
    last_finalized_height: &mut Option<BlockNumber>,
    wakeups: &mut Wakeups,
) -> SubsystemResult<Vec<Action>> {
    let actions = match x {
        FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
            let mut actions = Vec::new();
            // Only a newly activated leaf triggers an import.
            if let Some(activated) = update.activated {
                let head = activated.hash;
                match import::handle_new_head(
                    sender,
                    approval_voting_sender,
                    state,
                    db,
                    session_info_provider,
                    head,
                    last_finalized_height,
                )
                .await
                {
                    Err(e) => return Err(SubsystemError::with_origin("db", e)),
                    Ok(block_imported_candidates) => {
                        for block_batch in block_imported_candidates {
                            gum::debug!(
                                target: LOG_TARGET,
                                block_number = ?block_batch.block_number,
                                block_hash = ?block_batch.block_hash,
                                num_candidates = block_batch.imported_candidates.len(),
                                "Imported new block.",
                            );

                            state.no_show_stats.maybe_print(block_batch.block_number);

                            for (c_hash, c_entry) in block_batch.imported_candidates {
                                metrics.on_candidate_imported();

                                // Tranche of our own assignment for this candidate under the
                                // imported block, if we have one.
                                let our_tranche = c_entry
                                    .approval_entry(&block_batch.block_hash)
                                    .and_then(|a| a.our_assignment().map(|a| a.tranche()));

                                if let Some(our_tranche) = our_tranche {
                                    // First wakeup: block tick plus our tranche.
                                    let tick = our_tranche as Tick + block_batch.block_tick;
                                    gum::trace!(
                                        target: LOG_TARGET,
                                        tranche = our_tranche,
                                        candidate_hash = ?c_hash,
                                        block_hash = ?block_batch.block_hash,
                                        block_tick = block_batch.block_tick,
                                        "Scheduling first wakeup.",
                                    );

                                    actions.push(Action::ScheduleWakeup {
                                        block_hash: block_batch.block_hash,
                                        block_number: block_batch.block_number,
                                        candidate_hash: c_hash,
                                        tick,
                                    });
                                }
                            }
                        }
                    },
                }
            }

            actions
        },
        FromOrchestra::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
            gum::debug!(target: LOG_TARGET, ?block_hash, ?block_number, "Block finalized");
            *last_finalized_height = Some(block_number);

            crate::ops::canonicalize(db, block_number, block_hash)
                .map_err(|e| SubsystemError::with_origin("db", e))?;

            // Drop bookkeeping tied to the finalized height.
            wakeups.prune_finalized_wakeups(block_number);
            state.cleanup_assignments_gathering_timestamp(block_number);

            Vec::new()
        },
        FromOrchestra::Signal(OverseerSignal::Conclude) => {
            vec![Action::Conclude]
        },
        FromOrchestra::Communication { msg } => match msg {
            ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => {
                let (check_outcome, actions) =
                    import_assignment(sender, state, db, session_info_provider, checked_assignment)
                        .await?;
                if let AssignmentCheckResult::Bad(ref err) = check_outcome {
                    gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an assignment");
                }
                // The response channel is optional and a failed send is ignored.
                let _ = tx.map(|tx| tx.send(check_outcome));
                actions
            },
            ApprovalVotingMessage::ImportApproval(a, tx) => {
                let result =
                    import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups)
                        .await?;
                if let ApprovalCheckResult::Bad(ref err) = result.1 {
                    gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an approval");
                }
                // The response channel is optional and a failed send is ignored.
                let _ = tx.map(|tx| tx.send(result.1));

                result.0
            },
            ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
                match handle_approved_ancestor(sender, db, target, lower_bound, wakeups, &metrics)
                    .await
                {
                    Ok(v) => {
                        let _ = res.send(v);
                    },
                    // Answer the requester with `None` before surfacing the error.
                    Err(e) => {
                        let _ = res.send(None);
                        return Err(e)
                    },
                }

                Vec::new()
            },
            ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
                metrics.on_candidate_signatures_request();
                get_approval_signatures_for_candidate(
                    approval_voting_sender.clone(),
                    spawn_handle,
                    db,
                    candidate_hash,
                    tx,
                )
                .await?;
                Vec::new()
            },
        },
    };

    Ok(actions)
}
2171
/// Answers a request for all approval signatures known for `candidate_hash`.
///
/// If the candidate is not in `db`, an empty map is sent on `tx` immediately. Otherwise the
/// relay blocks the candidate is assigned under are looked up, and a task is spawned on
/// `spawn_handle` that asks approval-distribution for the signatures, waits up to
/// `WAIT_FOR_SIGS_TIMEOUT`, translates candidate indices back to candidate hashes, and sends the
/// result on `tx`. On timeout or cancellation nothing is sent on `tx`.
///
/// Returns once the task is spawned; only database errors are returned.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn get_approval_signatures_for_candidate<
    Sender: SubsystemSender<ApprovalDistributionMessage>,
>(
    mut sender: Sender,
    spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
    db: &OverlayedBackend<'_, impl Backend>,
    candidate_hash: CandidateHash,
    tx: oneshot::Sender<HashMap<ValidatorIndex, (Vec<CandidateHash>, ValidatorSignature)>>,
) -> SubsystemResult<()> {
    // Sends the final answer; a closed receiver is only logged.
    let send_votes = |votes| {
        if let Err(_) = tx.send(votes) {
            gum::debug!(
                target: LOG_TARGET,
                "Sending approval signatures back failed, as receiver got closed."
            );
        }
    };
    let entry = match db.load_candidate_entry(&candidate_hash)? {
        None => {
            send_votes(HashMap::new());
            gum::debug!(
                target: LOG_TARGET,
                ?candidate_hash,
                "Sent back empty votes because the candidate was not found in db."
            );
            return Ok(())
        },
        Some(e) => e,
    };

    let relay_hashes = entry.block_assignments.keys();

    // (relay block, candidate index) pairs that identify the requested candidate.
    let mut candidate_indices = HashSet::new();
    // Per relay block: candidate index -> candidate hash, for every candidate of that block.
    let mut candidate_indices_to_candidate_hashes: HashMap<
        Hash,
        HashMap<CandidateIndex, CandidateHash>,
    > = HashMap::new();

    for hash in relay_hashes {
        let entry = match db.load_block_entry(hash)? {
            None => {
                gum::debug!(
                    target: LOG_TARGET,
                    ?candidate_hash,
                    ?hash,
                    "Block entry for assignment missing."
                );
                continue
            },
            Some(e) => e,
        };
        for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
            if c_hash == &candidate_hash {
                candidate_indices.insert((*hash, candidate_index as u32));
            }
            candidate_indices_to_candidate_hashes
                .entry(*hash)
                .or_default()
                .insert(candidate_index as _, *c_hash);
        }
    }

    // Runs in a spawned task so this function does not wait for approval-distribution.
    let get_approvals = async move {
        let (tx_distribution, rx_distribution) = oneshot::channel();
        sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
            candidate_indices,
            tx_distribution,
        ));

        match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
            // `None` means the timeout elapsed before an answer arrived.
            None => {
                gum::warn!(
                    target: LOG_TARGET,
                    "Waiting for approval signatures timed out - dead lock?"
                );
            },
            Some(Err(_)) => gum::debug!(
                target: LOG_TARGET,
                "Request for approval signatures got cancelled by `approval-distribution`."
            ),
            Some(Ok(votes)) => {
                let votes = votes
                    .into_iter()
                    .filter_map(|(validator_index, (hash, signed_candidates_indices, signature))| {
                        let candidates_hashes = candidate_indices_to_candidate_hashes.get(&hash);

                        if candidates_hashes.is_none() {
                            gum::warn!(
                                target: LOG_TARGET,
                                ?hash,
                                "Possible bug! Could not find map of candidate_hashes for block hash received from approval-distribution"
                            );
                        }

                        let num_signed_candidates = signed_candidates_indices.len();

                        // Translate every signed index into a hash; unknown indices are dropped
                        // here and detected by the length comparison below.
                        let signed_candidates_hashes: Vec<CandidateHash> =
                            signed_candidates_indices
                                .into_iter()
                                .filter_map(|candidate_index| {
                                    candidates_hashes.and_then(|candidate_hashes| {
                                        if let Some(candidate_hash) =
                                            candidate_hashes.get(&candidate_index)
                                        {
                                            Some(*candidate_hash)
                                        } else {
                                            gum::warn!(
                                                target: LOG_TARGET,
                                                ?candidate_index,
                                                "Possible bug! Could not find candidate hash for candidate_index coming from approval-distribution"
                                            );
                                            None
                                        }
                                    })
                                })
                                .collect();
                        // A vote is kept only when all of its indices could be resolved.
                        if num_signed_candidates == signed_candidates_hashes.len() {
                            Some((validator_index, (signed_candidates_hashes, signature)))
                        } else {
                            gum::warn!(
                                target: LOG_TARGET,
                                "Possible bug! Could not find all hashes for candidates coming from approval-distribution"
                            );
                            None
                        }
                    })
                    .collect();
                send_votes(votes)
            },
        }
    };

    gum::trace!(
        target: LOG_TARGET,
        ?candidate_hash,
        "Spawning task for fetching signatures from approval-distribution"
    );
    spawn_handle.spawn(
        "get-approval-signatures",
        Some("approval-voting-subsystem"),
        Box::pin(get_approvals),
    );
    Ok(())
}
2326
/// Walks the chain from `target` back towards `lower_bound` and reports the highest block from
/// which every visited block further down is fully approved.
///
/// The number of `target` is requested with `ChainApiMessage::BlockNumber`. When more than one
/// block lies above `lower_bound`, the ancestors of `target` are requested with
/// `ChainApiMessage::Ancestors`, limited so that `lower_bound` itself is not requested.
///
/// Returns `Ok(None)` when
/// - the number of `target` cannot be obtained (unknown block, API error or dropped channel),
/// - `target` is not above `lower_bound`,
/// - the ancestry request fails,
/// - a visited block has no block entry in `db`, or
/// - the last visited block is not fully approved.
///
/// Otherwise the result carries the hash and number of the highest block of the uninterrupted
/// run of fully approved blocks that ends at the last visited block, plus one
/// `BlockDescription` per block of that run. Errors from `db` are propagated.
///
/// `wakeups` and `metrics` are only used for diagnostics about unapproved candidates.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn handle_approved_ancestor<Sender: SubsystemSender<ChainApiMessage>>(
	sender: &mut Sender,
	db: &OverlayedBackend<'_, impl Backend>,
	target: Hash,
	lower_bound: BlockNumber,
	wakeups: &Wakeups,
	metrics: &Metrics,
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
	// Upper bound on the number of per-block bits printed in the summary log line.
	const MAX_TRACING_WINDOW: usize = 200;
	// Unapproved blocks within this many visited blocks are not logged in detail.
	const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
	// Used to restrict the trace-level candidate dump to indices near the end of the ancestry.
	const LOGGING_DEPTH_THRESHOLD: usize = 10;

	// Highest block (hash, number) of the current run of fully approved blocks, if any.
	let mut all_approved_max = None;

	// Resolve the block number of `target`; every failure mode results in `None`.
	let target_number = {
		let (tx, rx) = oneshot::channel();

		sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await;

		match rx.await {
			Ok(Ok(Some(n))) => n,
			Ok(Ok(None)) => return Ok(None),
			Ok(Err(_)) | Err(_) => return Ok(None),
		}
	};

	if target_number <= lower_bound {
		return Ok(None)
	}

	// Request the ancestors between `target` and `lower_bound`, excluding `lower_bound` itself.
	// If `target` is the direct child of `lower_bound` there is nothing to request.
	let ancestry = if target_number > lower_bound + 1 {
		let (tx, rx) = oneshot::channel();

		sender
			.send_message(ChainApiMessage::Ancestors {
				hash: target,
				k: (target_number - (lower_bound + 1)) as usize,
				response_channel: tx,
			})
			.await;

		match rx.await {
			Ok(Ok(a)) => a,
			Err(_) | Ok(Err(_)) => return Ok(None),
		}
	} else {
		Vec::new()
	};
	let ancestry_len = ancestry.len();

	let mut block_descriptions = Vec::new();

	// One bit per visited block: whether it is fully approved. Only used for logging.
	let mut bits: BitVec<u8, Lsb0> = Default::default();
	// `i == 0` is `target`. The block number arithmetic below (`target_number - i`) assumes the
	// ancestry is ordered from the parent of `target` downwards — NOTE(review): confirm against
	// the ChainApi contract.
	for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
		// A missing block entry aborts the whole search.
		let entry = match db.load_block_entry(&block_hash)? {
			None => {
				let block_number = target_number.saturating_sub(i as u32);
				gum::info!(
					target: LOG_TARGET,
					unknown_number = ?block_number,
					unknown_hash = ?block_hash,
					"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
					target,
					target_number,
					lower_bound,
					lower_bound,
				);
				return Ok(None)
			},
			Some(b) => b,
		};

		bits.push(entry.is_fully_approved());
		if entry.is_fully_approved() {
			// Only the first block of a run of approved blocks becomes the candidate result.
			if all_approved_max.is_none() {
				all_approved_max = Some((block_hash, target_number - i as BlockNumber));
			}
			block_descriptions.push(BlockDescription {
				block_hash,
				session: entry.session(),
				candidates: entry
					.candidates()
					.iter()
					.map(|(_idx, candidate_hash)| *candidate_hash)
					.collect(),
			});
		} else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
			// An unapproved block interrupts the run: forget everything collected above it.
			all_approved_max = None;
			block_descriptions.clear();
		} else {
			// Same reset as above, but past the threshold the unapproved candidates are also
			// logged and counted.
			all_approved_max = None;
			block_descriptions.clear();

			let unapproved: Vec<_> = entry.unapproved_candidates().collect();
			gum::debug!(
				target: LOG_TARGET,
				"Block {} is {} blocks deep and has {}/{} candidates unapproved",
				block_hash,
				bits.len() - 1,
				unapproved.len(),
				entry.candidates().len(),
			);
			if ancestry_len >= LOGGING_DEPTH_THRESHOLD && i > ancestry_len - LOGGING_DEPTH_THRESHOLD
			{
				gum::trace!(
					target: LOG_TARGET,
					?block_hash,
					"Unapproved candidates at depth {}: {:?}",
					bits.len(),
					unapproved
				)
			}
			metrics.on_unapproved_candidates_in_unfinalized_chain(unapproved.len());
			for candidate_hash in unapproved {
				match db.load_candidate_entry(&candidate_hash)? {
					None => {
						gum::warn!(
							target: LOG_TARGET,
							?candidate_hash,
							"Missing expected candidate in DB",
						);

						continue
					},
					Some(c_entry) => match c_entry.approval_entry(&block_hash) {
						None => {
							gum::warn!(
								target: LOG_TARGET,
								?candidate_hash,
								?block_hash,
								"Missing expected approval entry under candidate.",
							);
						},
						Some(a_entry) => {
							// Lazily renders "<assignments>/<approvals>/<validators>" for the
							// log lines below.
							let status = || {
								let n_assignments = a_entry.n_assignments();

								// Count only approvals of validators that are assigned under
								// this block.
								let n_approvals = c_entry
									.approvals()
									.iter()
									.by_vals()
									.enumerate()
									.filter(|(i, approved)| {
										*approved && a_entry.is_assigned(ValidatorIndex(*i as _))
									})
									.count();

								format!(
									"{}/{}/{}",
									n_assignments,
									n_approvals,
									a_entry.n_validators(),
								)
							};

							match a_entry.our_assignment() {
								None => gum::debug!(
									target: LOG_TARGET,
									?candidate_hash,
									?block_hash,
									status = %status(),
									"no assignment."
								),
								Some(a) => {
									let tranche = a.tranche();
									let triggered = a.triggered();

									let next_wakeup =
										wakeups.wakeup_for(block_hash, candidate_hash);

									// Reported as approved only if our assignment was triggered
									// and the second local statement is present.
									let approved =
										triggered && { a_entry.local_statements().1.is_some() };

									gum::debug!(
										target: LOG_TARGET,
										?candidate_hash,
										?block_hash,
										tranche,
										?next_wakeup,
										status = %status(),
										triggered,
										approved,
										"assigned."
									);
								},
							}
						},
					},
				}
			}
		}
	}

	gum::debug!(
		target: LOG_TARGET,
		"approved blocks {}-[{}]-{}",
		target_number,
		{
			// Render the approval bits, inserting a space after every block whose number is a
			// multiple of 10 (except after the very last bit).
			let mut s = String::with_capacity(bits.len());
			for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
				s.push(if *bit { '1' } else { '0' });
				if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 {
					s.push(' ');
				}
			}

			s
		},
		if bits.len() > MAX_TRACING_WINDOW {
			format!(
				"{}... (truncated due to large window)",
				target_number - MAX_TRACING_WINDOW as u32 + 1,
			)
		} else {
			format!("{}", lower_bound + 1)
		},
	);

	// Descriptions were collected starting at `target`; flip them into the opposite order.
	block_descriptions.reverse();

	let all_approved_max =
		all_approved_max.map(|(hash, block_number)| HighestApprovedAncestorBlock {
			hash,
			number: block_number,
			descriptions: block_descriptions,
		});

	Ok(all_approved_max)
}
2574
/// Returns the smaller of the two values, treating `None` as "no value" rather than as the
/// smallest one.
///
/// The result is `None` only when both inputs are `None`. When both values compare equal, `a`
/// is returned.
fn min_prefer_some<T: std::cmp::Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
	// Absent inputs contribute no element, so the minimum is taken over what is present.
	a.into_iter().chain(b).min()
}
2583
2584fn schedule_wakeup_action(
2585 approval_entry: &ApprovalEntry,
2586 block_hash: Hash,
2587 block_number: BlockNumber,
2588 candidate_hash: CandidateHash,
2589 block_tick: Tick,
2590 tick_now: Tick,
2591 required_tranches: RequiredTranches,
2592) -> Option<Action> {
2593 let maybe_action = match required_tranches {
2594 _ if approval_entry.is_approved() => None,
2595 RequiredTranches::All => None,
2596 RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
2597 min_prefer_some(
2600 last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
2601 next_no_show,
2602 )
2603 .map(|tick| Action::ScheduleWakeup {
2604 block_hash,
2605 block_number,
2606 candidate_hash,
2607 tick,
2608 })
2609 },
2610 RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
2611 let next_non_empty_tranche = {
2615 let next_announced = approval_entry
2616 .tranches()
2617 .iter()
2618 .skip_while(|t| t.tranche() <= considered)
2619 .map(|t| t.tranche())
2620 .next();
2621
2622 let our_untriggered = approval_entry.our_assignment().and_then(|t| {
2623 if !t.triggered() && t.tranche() > considered {
2624 Some(t.tranche())
2625 } else {
2626 None
2627 }
2628 });
2629
2630 min_prefer_some(next_announced, our_untriggered)
2632 .map(|t| t as Tick + block_tick + clock_drift)
2633 };
2634
2635 min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
2636 Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }
2637 })
2638 },
2639 };
2640
2641 match maybe_action {
2642 Some(Action::ScheduleWakeup { ref tick, .. }) => gum::trace!(
2643 target: LOG_TARGET,
2644 tick,
2645 ?candidate_hash,
2646 ?block_hash,
2647 block_tick,
2648 "Scheduling next wakeup.",
2649 ),
2650 None => gum::trace!(
2651 target: LOG_TARGET,
2652 ?candidate_hash,
2653 ?block_hash,
2654 block_tick,
2655 "No wakeup needed.",
2656 ),
2657 Some(_) => {}, }
2659
2660 maybe_action
2661}
2662
2663async fn import_assignment<Sender>(
2664 sender: &mut Sender,
2665 state: &State,
2666 db: &mut OverlayedBackend<'_, impl Backend>,
2667 session_info_provider: &mut RuntimeInfo,
2668 checked_assignment: CheckedIndirectAssignment,
2669) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
2670where
2671 Sender: SubsystemSender<RuntimeApiMessage>,
2672{
2673 let tick_now = state.clock.tick_now();
2674 let assignment = checked_assignment.assignment();
2675 let candidate_indices = checked_assignment.candidate_indices();
2676 let tranche = checked_assignment.tranche();
2677
2678 let block_entry = match db.load_block_entry(&assignment.block_hash)? {
2679 Some(b) => b,
2680 None =>
2681 return Ok((
2682 AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(
2683 assignment.block_hash,
2684 )),
2685 Vec::new(),
2686 )),
2687 };
2688
2689 let session_info = match get_session_info(
2690 session_info_provider,
2691 sender,
2692 block_entry.parent_hash(),
2693 block_entry.session(),
2694 )
2695 .await
2696 {
2697 Some(s) => s,
2698 None =>
2699 return Ok((
2700 AssignmentCheckResult::Bad(AssignmentCheckError::UnknownSessionIndex(
2701 block_entry.session(),
2702 )),
2703 Vec::new(),
2704 )),
2705 };
2706
2707 let n_cores = session_info.n_cores as usize;
2708
2709 if candidate_indices.len() > n_cores {
2712 gum::debug!(
2713 target: LOG_TARGET,
2714 validator = assignment.validator.0,
2715 n_cores,
2716 candidate_bitfield_len = ?candidate_indices.len(),
2717 "Oversized bitfield",
2718 );
2719
2720 return Ok((
2721 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
2722 candidate_indices.len(),
2723 )),
2724 Vec::new(),
2725 ))
2726 }
2727
2728 let mut claimed_core_indices = Vec::new();
2729 let mut assigned_candidate_hashes = Vec::new();
2730
2731 for candidate_index in candidate_indices.iter_ones() {
2732 let (claimed_core_index, assigned_candidate_hash) =
2733 match block_entry.candidate(candidate_index) {
2734 Some((c, h)) => (*c, *h),
2735 None =>
2736 return Ok((
2737 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
2738 candidate_index as _,
2739 )),
2740 Vec::new(),
2741 )), };
2743
2744 let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2745 Some(c) => c,
2746 None =>
2747 return Ok((
2748 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2749 candidate_index as _,
2750 assigned_candidate_hash,
2751 )),
2752 Vec::new(),
2753 )), };
2755
2756 if candidate_entry.approval_entry_mut(&assignment.block_hash).is_none() {
2757 return Ok((
2758 AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2759 assignment.block_hash,
2760 assigned_candidate_hash,
2761 )),
2762 Vec::new(),
2763 ));
2764 };
2765
2766 claimed_core_indices.push(claimed_core_index);
2767 assigned_candidate_hashes.push(assigned_candidate_hash);
2768 }
2769
2770 if claimed_core_indices.is_empty() {
2772 return Ok((
2773 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
2774 assignment.validator,
2775 format!("{:?}", InvalidAssignmentReason::NullAssignment),
2776 )),
2777 Vec::new(),
2778 ))
2779 }
2780
2781 let mut actions = Vec::new();
2782 let res = {
2783 let mut is_duplicate = true;
2784 for (assigned_candidate_hash, candidate_index) in
2786 assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
2787 {
2788 let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2789 Some(c) => c,
2790 None =>
2791 return Ok((
2792 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2793 candidate_index as _,
2794 *assigned_candidate_hash,
2795 )),
2796 Vec::new(),
2797 )),
2798 };
2799
2800 let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
2801 Some(a) => a,
2802 None =>
2803 return Ok((
2804 AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2805 assignment.block_hash,
2806 *assigned_candidate_hash,
2807 )),
2808 Vec::new(),
2809 )),
2810 };
2811
2812 let is_duplicate_for_candidate = approval_entry.is_assigned(assignment.validator);
2813 is_duplicate &= is_duplicate_for_candidate;
2814 approval_entry.import_assignment(
2815 tranche,
2816 assignment.validator,
2817 tick_now,
2818 is_duplicate_for_candidate,
2819 );
2820
2821 if let Some((approval_entry, status)) = state
2824 .approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
2825 .await
2826 {
2827 actions.extend(schedule_wakeup_action(
2828 approval_entry,
2829 block_entry.block_hash(),
2830 block_entry.block_number(),
2831 *assigned_candidate_hash,
2832 status.block_tick,
2833 tick_now,
2834 status.required_tranches,
2835 ));
2836 }
2837
2838 db.write_candidate_entry(candidate_entry.into());
2840 }
2841
2842 if is_duplicate {
2848 AssignmentCheckResult::AcceptedDuplicate
2849 } else if candidate_indices.count_ones() > 1 {
2850 gum::trace!(
2851 target: LOG_TARGET,
2852 validator = assignment.validator.0,
2853 candidate_hashes = ?assigned_candidate_hashes,
2854 assigned_cores = ?claimed_core_indices,
2855 ?tranche,
2856 "Imported assignments for multiple cores.",
2857 );
2858
2859 AssignmentCheckResult::Accepted
2860 } else {
2861 gum::trace!(
2862 target: LOG_TARGET,
2863 validator = assignment.validator.0,
2864 candidate_hashes = ?assigned_candidate_hashes,
2865 assigned_cores = ?claimed_core_indices,
2866 "Imported assignment for a single core.",
2867 );
2868
2869 AssignmentCheckResult::Accepted
2870 }
2871 };
2872
2873 Ok((res, actions))
2874}
2875
2876async fn import_approval<Sender>(
2877 sender: &mut Sender,
2878 state: &mut State,
2879 db: &mut OverlayedBackend<'_, impl Backend>,
2880 session_info_provider: &mut RuntimeInfo,
2881 metrics: &Metrics,
2882 approval: CheckedIndirectSignedApprovalVote,
2883 wakeups: &Wakeups,
2884) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
2885where
2886 Sender: SubsystemSender<RuntimeApiMessage>,
2887{
2888 macro_rules! respond_early {
2889 ($e: expr) => {{
2890 return Ok((Vec::new(), $e))
2891 }};
2892 }
2893
2894 let block_entry = match db.load_block_entry(&approval.block_hash)? {
2895 Some(b) => b,
2896 None => {
2897 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2898 approval.block_hash
2899 ),))
2900 },
2901 };
2902
2903 let approved_candidates_info: Result<Vec<(CandidateIndex, CandidateHash)>, ApprovalCheckError> =
2904 approval
2905 .candidate_indices
2906 .iter_ones()
2907 .map(|candidate_index| {
2908 block_entry
2909 .candidate(candidate_index)
2910 .ok_or(ApprovalCheckError::InvalidCandidateIndex(candidate_index as _))
2911 .map(|candidate| (candidate_index as _, candidate.1))
2912 })
2913 .collect();
2914
2915 let approved_candidates_info = match approved_candidates_info {
2916 Ok(approved_candidates_info) => approved_candidates_info,
2917 Err(err) => {
2918 respond_early!(ApprovalCheckResult::Bad(err))
2919 },
2920 };
2921
2922 gum::trace!(
2923 target: LOG_TARGET,
2924 "Received approval for num_candidates {:}",
2925 approval.candidate_indices.count_ones()
2926 );
2927
2928 let mut actions = Vec::new();
2929 for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
2930 let block_entry = match db.load_block_entry(&approval.block_hash)? {
2931 Some(b) => b,
2932 None => {
2933 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2934 approval.block_hash
2935 ),))
2936 },
2937 };
2938
2939 let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
2940 Some(c) => c,
2941 None => {
2942 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(
2943 approval_candidate_index,
2944 approved_candidate_hash
2945 ),))
2946 },
2947 };
2948
2949 match candidate_entry.approval_entry(&approval.block_hash) {
2951 None => {
2952 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::Internal(
2953 approval.block_hash,
2954 approved_candidate_hash
2955 ),))
2956 },
2957 Some(e) if !e.is_assigned(approval.validator) => {
2958 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(
2959 approval.validator
2960 ),))
2961 },
2962 _ => {},
2963 }
2964
2965 gum::trace!(
2966 target: LOG_TARGET,
2967 validator_index = approval.validator.0,
2968 candidate_hash = ?approved_candidate_hash,
2969 para_id = ?candidate_entry.candidate_receipt().descriptor.para_id(),
2970 "Importing approval vote",
2971 );
2972
2973 let new_actions = advance_approval_state(
2974 sender,
2975 state,
2976 db,
2977 session_info_provider,
2978 &metrics,
2979 block_entry,
2980 approved_candidate_hash,
2981 candidate_entry,
2982 ApprovalStateTransition::RemoteApproval(approval.validator),
2983 wakeups,
2984 )
2985 .await;
2986 actions.extend(new_actions);
2987 }
2988
2989 Ok((actions, ApprovalCheckResult::Accepted))
2991}
2992
2993#[derive(Debug)]
2994enum ApprovalStateTransition {
2995 RemoteApproval(ValidatorIndex),
2996 LocalApproval(ValidatorIndex),
2997 WakeupProcessed,
2998}
2999
3000impl ApprovalStateTransition {
3001 fn validator_index(&self) -> Option<ValidatorIndex> {
3002 match *self {
3003 ApprovalStateTransition::RemoteApproval(v) |
3004 ApprovalStateTransition::LocalApproval(v) => Some(v),
3005 ApprovalStateTransition::WakeupProcessed => None,
3006 }
3007 }
3008
3009 fn is_local_approval(&self) -> bool {
3010 match *self {
3011 ApprovalStateTransition::RemoteApproval(_) => false,
3012 ApprovalStateTransition::LocalApproval(_) => true,
3013 ApprovalStateTransition::WakeupProcessed => false,
3014 }
3015 }
3016
3017 fn is_remote_approval(&self) -> bool {
3018 matches!(*self, ApprovalStateTransition::RemoteApproval(_))
3019 }
3020}
3021
/// Advances the approval state of `candidate_hash` under the block of `block_entry` after
/// `transition` happened, and returns the actions that follow from it.
///
/// If the transition carries a validator index, that validator's approval is marked on the
/// candidate entry first. The function then checks whether the candidate is approved under the
/// block, updates and writes the block and candidate entries where something changed, records
/// metrics, and schedules the next wakeup.
///
/// An empty list is returned early when the transition is not a local approval and the
/// candidate is already approved in the block, or when no approval status can be obtained for
/// the candidate under the block.
async fn advance_approval_state<Sender>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	metrics: &Metrics,
	mut block_entry: BlockEntry,
	candidate_hash: CandidateHash,
	mut candidate_entry: CandidateEntry,
	transition: ApprovalStateTransition,
	wakeups: &Wakeups,
) -> Vec<Action>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let validator_index = transition.validator_index();

	// `None` when the transition carries no validator. NOTE(review): judging by the name, the
	// inner value is presumably whether the validator had already approved — confirm against
	// `CandidateEntry::mark_approval`.
	let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
	let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);

	// Remote approvals and processed wakeups have nothing left to do once the candidate is
	// approved in the block. Local approvals always continue.
	if !transition.is_local_approval() {
		if candidate_approved_in_block {
			return Vec::new()
		}
	}

	let mut actions = Vec::new();
	let block_hash = block_entry.block_hash();
	let block_number = block_entry.block_number();
	let session_index = block_entry.session();
	let para_id = candidate_entry.candidate_receipt().descriptor().para_id();
	let tick_now = state.clock.tick_now();

	let (is_approved, status) = if let Some((approval_entry, status)) = state
		.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
		.await
	{
		let check = approval_checking::check_approval(
			&candidate_entry,
			approval_entry,
			status.required_tranches.clone(),
		);
		state.observe_assignment_gathering_status(
			&metrics,
			&status.required_tranches,
			block_hash,
			block_entry.block_number(),
			candidate_hash,
		);

		// The approval check is evaluated against `tick_now - APPROVAL_DELAY` rather than
		// against the current tick.
		let is_approved = check.is_approved(tick_now.saturating_sub(APPROVAL_DELAY));
		if status.last_no_shows != 0 {
			metrics.on_observed_no_shows(status.last_no_shows);
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				last_no_shows = ?status.last_no_shows,
				"Observed no_shows",
			);
		}
		if is_approved {
			gum::trace!(
				target: LOG_TARGET,
				?candidate_hash,
				?block_hash,
				"Candidate approved under block.",
			);

			let no_shows = check.known_no_shows();

			// Compare the block's approval state before and after marking this candidate, so
			// that the transition to "fully approved" is reported exactly once.
			let was_block_approved = block_entry.is_fully_approved();
			block_entry.mark_approved_by_hash(&candidate_hash);
			let is_block_approved = block_entry.is_fully_approved();

			if no_shows != 0 {
				metrics.on_no_shows(no_shows);
			}
			if check == Check::ApprovedOneThird {
				metrics.on_approved_by_one_third()
			}

			metrics.on_candidate_approved(status.tranche_now as _);

			if is_block_approved && !was_block_approved {
				metrics.on_block_approved(status.tranche_now as _);
				actions.push(Action::NoteApprovedInChainSelection(block_hash));
			}

			db.write_block_entry(block_entry.into());
		} else if transition.is_local_approval() {
			// For local approvals the block entry is written even though the candidate is not
			// approved yet.
			db.write_block_entry(block_entry.into());
		}

		(is_approved, status)
	} else {
		gum::warn!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,
			?validator_index,
			"No approval entry for approval under block",
		);

		return Vec::new()
	};

	{
		let approval_entry = candidate_entry
			.approval_entry_mut(&block_hash)
			.expect("Approval entry just fetched; qed");

		let was_approved = approval_entry.is_approved();
		let newly_approved = is_approved && !was_approved;

		if is_approved {
			approval_entry.mark_approved();
		}
		// No-shows are recorded only on the transition to approved, not on repeated calls.
		if newly_approved {
			state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
		}
		actions.extend(schedule_wakeup_action(
			&approval_entry,
			block_hash,
			block_number,
			candidate_hash,
			status.block_tick,
			tick_now,
			status.required_tranches,
		));

		if is_approved && transition.is_remote_approval() {
			// Look at the same candidate under every other block it is included in.
			for (fork_block_hash, fork_approval_entry) in candidate_entry
				.block_assignments
				.iter()
				.filter(|(hash, _)| **hash != block_hash)
			{
				let assigned_on_fork_block = validator_index
					.as_ref()
					.map(|validator_index| fork_approval_entry.is_assigned(*validator_index))
					.unwrap_or_default();
				// Schedule a wakeup only if none is pending for the fork, the fork entry is
				// not approved yet and the approving validator is assigned there as well.
				if wakeups.wakeup_for(*fork_block_hash, candidate_hash).is_none() &&
					!fork_approval_entry.is_approved() &&
					assigned_on_fork_block
				{
					let fork_block_entry = db.load_block_entry(fork_block_hash);
					if let Ok(Some(fork_block_entry)) = fork_block_entry {
						actions.push(Action::ScheduleWakeup {
							block_hash: *fork_block_hash,
							block_number: fork_block_entry.block_number(),
							candidate_hash,
							// Wake up on the tick right after the current one.
							tick: tick_now + 1,
						})
					} else {
						gum::debug!(
							target: LOG_TARGET,
							?fork_block_entry,
							?fork_block_hash,
							"Failed to load block entry"
						)
					}
				}
			}
		}
		// The candidate entry is written for local approvals, when the candidate became newly
		// approved, or when `mark_approval` above returned `false`. A transition without a
		// validator (`already_approved_by == None`) does not trigger a write on its own.
		if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
		{
			db.write_candidate_entry(candidate_entry);
		}
	}

	actions
}
3231
3232fn should_trigger_assignment(
3233 approval_entry: &ApprovalEntry,
3234 candidate_entry: &CandidateEntry,
3235 required_tranches: RequiredTranches,
3236 tranche_now: DelayTranche,
3237) -> bool {
3238 match approval_entry.our_assignment() {
3239 None => false,
3240 Some(ref assignment) if assignment.triggered() => false,
3241 Some(ref assignment) if assignment.tranche() == 0 => true,
3242 Some(ref assignment) => {
3243 match required_tranches {
3244 RequiredTranches::All => !approval_checking::check_approval(
3245 &candidate_entry,
3246 &approval_entry,
3247 RequiredTranches::All,
3248 )
3249 .is_approved(Tick::max_value()),
3251 RequiredTranches::Pending { maximum_broadcast, clock_drift, .. } => {
3252 let drifted_tranche_now =
3253 tranche_now.saturating_sub(clock_drift as DelayTranche);
3254 assignment.tranche() <= maximum_broadcast &&
3255 assignment.tranche() <= drifted_tranche_now
3256 },
3257 RequiredTranches::Exact { .. } => {
3258 false
3260 },
3261 }
3262 },
3263 }
3264}
3265
/// Processes a wakeup for `candidate_hash` under `relay_block`.
///
/// Decides whether our own assignment for the candidate should be triggered now. If so, the
/// assignment is triggered, the candidate entry is written and an `Action::LaunchApproval` is
/// queued. Afterwards the approval state is advanced with a `WakeupProcessed` transition and
/// its actions are appended.
///
/// Returns an empty list when the block entry, the candidate entry, the extended session info
/// or the approval entry under `relay_block` is not available. Errors from `db` are
/// propagated.
async fn process_wakeup<Sender: SubsystemSender<RuntimeApiMessage>>(
	sender: &mut Sender,
	state: &mut State,
	db: &mut OverlayedBackend<'_, impl Backend>,
	session_info_provider: &mut RuntimeInfo,
	relay_block: Hash,
	candidate_hash: CandidateHash,
	metrics: &Metrics,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {
	let block_entry = db.load_block_entry(&relay_block)?;
	let candidate_entry = db.load_candidate_entry(&candidate_hash)?;

	// Both entries are needed; if either is missing there is nothing to wake up.
	let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
		(Some(b), Some(c)) => (b, c),
		_ => return Ok(Vec::new()),
	};

	let ExtendedSessionInfo { ref session_info, ref executor_params, .. } =
		match get_extended_session_info(
			session_info_provider,
			sender,
			block_entry.block_hash(),
			block_entry.session(),
		)
		.await
		{
			Some(i) => i,
			None => return Ok(Vec::new()),
		};

	// Timing inputs for the tranche computation, all derived from the block's slot.
	let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
	let no_show_duration = slot_number_to_tick(
		state.slot_duration_millis,
		Slot::from(u64::from(session_info.no_show_slots)),
	);
	let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());

	gum::trace!(
		target: LOG_TARGET,
		tranche = tranche_now,
		?candidate_hash,
		block_hash = ?relay_block,
		"Processing wakeup",
	);

	// Scoped so that the borrow of `candidate_entry` ends before it is mutated below.
	let (should_trigger, backing_group) = {
		let approval_entry = match candidate_entry.approval_entry(&relay_block) {
			Some(e) => e,
			None => return Ok(Vec::new()),
		};

		let tranches_to_approve = approval_checking::tranches_to_approve(
			&approval_entry,
			candidate_entry.approvals(),
			tranche_now,
			block_tick,
			no_show_duration,
			session_info.needed_approvals as _,
		);

		let should_trigger = should_trigger_assignment(
			&approval_entry,
			&candidate_entry,
			tranches_to_approve.required_tranches,
			tranche_now,
		);

		(should_trigger, approval_entry.backing_group())
	};

	gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);

	let mut actions = Vec::new();
	let candidate_receipt = candidate_entry.candidate_receipt().clone();

	// Trigger our assignment if required and persist the updated candidate entry. A clone is
	// written because `candidate_entry` is still needed further down.
	let maybe_cert = if should_trigger {
		let maybe_cert = {
			let approval_entry = candidate_entry
				.approval_entry_mut(&relay_block)
				.expect("should_trigger only true if this fetched earlier; qed");

			approval_entry.trigger_our_assignment(state.clock.tick_now())
		};

		db.write_candidate_entry(candidate_entry.clone());

		maybe_cert
	} else {
		None
	};

	if let Some((cert, val_index, tranche)) = maybe_cert {
		let indirect_cert =
			IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };

		gum::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			para_id = ?candidate_receipt.descriptor.para_id(),
			block_hash = ?relay_block,
			"Launching approval work.",
		);

		// Core of the candidate within this block, if the block lists the candidate.
		let candidate_core_index = block_entry
			.candidates()
			.iter()
			.find_map(|(core_index, h)| (h == &candidate_hash).then_some(*core_index));

		if let Some(claimed_core_indices) =
			get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
		{
			match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
				Ok(claimed_candidate_indices) => {
					// An assignment claiming a single candidate is always distributed. For one
					// claiming several, the flag is the negation of what
					// `mark_assignment_distributed` returns — NOTE(review): presumably that is
					// "was already distributed"; confirm against `BlockEntry`.
					let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
						!block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
					} else {
						true
					};
					db.write_block_entry(block_entry.clone());
					actions.push(Action::LaunchApproval {
						claimed_candidate_indices,
						candidate_hash,
						indirect_cert,
						assignment_tranche: tranche,
						relay_block_hash: relay_block,
						session: block_entry.session(),
						executor_params: executor_params.clone(),
						candidate: candidate_receipt,
						backing_group,
						distribute_assignment,
						core_index: candidate_core_index,
					});
				},
				Err(err) => {
					// Only logged: no approval work is launched in this case.
					gum::warn!(
						target: LOG_TARGET,
						block_hash = ?relay_block,
						?err,
						"Failed to create assignment bitfield"
					);
				},
			};
		} else {
			gum::warn!(
				target: LOG_TARGET,
				block_hash = ?relay_block,
				?candidate_hash,
				"Cannot get assignment claimed core indices",
			);
		}
	}
	// Re-evaluate the approval state regardless of whether our assignment was triggered. The
	// actions of `advance_approval_state` are appended after any `LaunchApproval` action.
	actions.extend(
		advance_approval_state(
			sender,
			state,
			db,
			session_info_provider,
			metrics,
			block_entry,
			candidate_hash,
			candidate_entry,
			ApprovalStateTransition::WakeupProcessed,
			wakeups,
		)
		.await,
	);

	Ok(actions)
}
3446
/// Launches the approval work for `candidate` and returns a handle to its outcome.
///
/// Two requests are sent right away: recovery of the candidate's available data and the
/// validation code for the candidate's validation code hash, looked up in the state of
/// `block_hash`. A background task is then spawned which waits for both answers, asks
/// candidate validation to validate the candidate and resolves to an `ApprovalState`:
/// - `approved` when validation reports the candidate as valid;
/// - `failed_with_retry` when data recovery fails; a retry description is attached only for
///   `RecoveryError::Unavailable` while `retry.attempts_remaining` is above zero, after
///   waiting for `retry.backoff`;
/// - `failed` in all other cases.
///
/// A local invalid statement is issued when data recovery reports `Invalid` or validation
/// reports the candidate as invalid.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn launch_approval<
	Sender: SubsystemSender<RuntimeApiMessage>
		+ SubsystemSender<AvailabilityRecoveryMessage>
		+ SubsystemSender<DisputeCoordinatorMessage>
		+ SubsystemSender<CandidateValidationMessage>,
>(
	mut sender: Sender,
	spawn_handle: Arc<dyn overseer::gen::Spawner + 'static>,
	metrics: Metrics,
	session_index: SessionIndex,
	candidate: CandidateReceipt,
	validator_index: ValidatorIndex,
	block_hash: Hash,
	backing_group: GroupIndex,
	executor_params: ExecutorParams,
	core_index: Option<CoreIndex>,
	retry: RetryApprovalInfo,
) -> SubsystemResult<RemoteHandle<ApprovalState>> {
	let (a_tx, a_rx) = oneshot::channel();
	let (code_tx, code_rx) = oneshot::channel();

	// Holds the metrics until an outcome is reported. If the guard is dropped while it still
	// holds them (no outcome was reported through `take`), the work is counted as stale.
	struct StaleGuard(Option<Metrics>);

	impl StaleGuard {
		// Consumes the guard and hands out the metrics, which disarms the stale accounting.
		fn take(mut self) -> Metrics {
			self.0.take().expect(
				"
				consumed after take; so this cannot be called twice; \
				nothing in this function reaches into the struct to avoid this API; \
				qed
			",
			)
		}
	}

	impl Drop for StaleGuard {
		fn drop(&mut self) {
			if let Some(metrics) = self.0.as_ref() {
				metrics.on_approval_stale();
			}
		}
	}

	let candidate_hash = candidate.hash();
	let para_id = candidate.descriptor.para_id();
	let mut next_retry = None;
	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");

	let timer = metrics.time_recover_and_approve();
	// Both requests are issued before the background task starts waiting for the answers.
	sender
		.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
			candidate.clone(),
			session_index,
			Some(backing_group),
			core_index,
			a_tx,
		))
		.await;

	sender
		.send_message(RuntimeApiMessage::Request(
			block_hash,
			RuntimeApiRequest::ValidationCodeByHash(
				candidate.descriptor.validation_code_hash(),
				code_tx,
			),
		))
		.await;

	let candidate = candidate.clone();
	let metrics_guard = StaleGuard(Some(metrics));
	let background = async move {
		// Moves the timer into the task so that it lives as long as the task does.
		let _timer = timer;
		let available_data = match a_rx.await {
			// The guard is not taken here, so this path is counted as stale.
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(a)) => a,
			Ok(Err(e)) => {
				match &e {
					&RecoveryError::Unavailable => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							attempts_remaining = retry.attempts_remaining,
							"Data unavailable for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// While attempts remain, wait for the backoff and describe the next
						// attempt with one attempt fewer and the same backoff.
						if retry.attempts_remaining > 0 {
							Delay::new(retry.backoff).await;
							next_retry = Some(RetryApprovalInfo {
								candidate,
								backing_group,
								executor_params,
								core_index,
								session_index,
								attempts_remaining: retry.attempts_remaining - 1,
								backoff: retry.backoff,
							});
						} else {
							next_retry = None;
						}
						metrics_guard.take().on_approval_unavailable();
					},
					&RecoveryError::ChannelClosed => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							"Channel closed while recovering data for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						// Counted as unavailable; no retry is requested.
						metrics_guard.take().on_approval_unavailable();
					},
					&RecoveryError::Invalid => {
						gum::warn!(
							target: LOG_TARGET,
							?para_id,
							?candidate_hash,
							"Data recovery invalid for candidate {:?}",
							(candidate_hash, candidate.descriptor.para_id()),
						);
						issue_local_invalid_statement(
							&mut sender,
							session_index,
							candidate_hash,
							candidate.clone(),
						);
						metrics_guard.take().on_approval_invalid();
					},
				}
				// `next_retry` is only `Some` for the `Unavailable` case with attempts left.
				return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
			},
		};

		let validation_code = match code_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Err(_)) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(Some(code))) => code,
			Ok(Ok(None)) => {
				gum::warn!(
					target: LOG_TARGET,
					"Validation code unavailable for block {:?} in the state of block {:?} (a recent descendant)",
					candidate.descriptor.relay_parent(),
					block_hash,
				);

				// Missing validation code is counted as unavailable; no invalid statement is
				// issued.
				metrics_guard.take().on_approval_unavailable();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
		};

		let (val_tx, val_rx) = oneshot::channel();
		sender
			.send_message(CandidateValidationMessage::ValidateFromExhaustive {
				validation_data: available_data.validation_data,
				validation_code,
				candidate_receipt: candidate.clone(),
				pov: available_data.pov,
				executor_params,
				exec_kind: PvfExecKind::Approval,
				response_sender: val_tx,
			})
			.await;

		match val_rx.await {
			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
			Ok(Ok(ValidationResult::Valid(_, _))) => {
				gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Candidate Valid");

				// Disarm the guard without recording any failure metric.
				let _ = metrics_guard.take();
				return ApprovalState::approved(validator_index, candidate_hash)
			},
			Ok(Ok(ValidationResult::Invalid(reason))) => {
				gum::warn!(
					target: LOG_TARGET,
					?reason,
					?candidate_hash,
					?para_id,
					"Detected invalid candidate as an approval checker.",
				);

				issue_local_invalid_statement(
					&mut sender,
					session_index,
					candidate_hash,
					candidate.clone(),
				);
				metrics_guard.take().on_approval_invalid();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
			Ok(Err(e)) => {
				gum::error!(
					target: LOG_TARGET,
					err = ?e,
					?candidate_hash,
					?para_id,
					"Failed to validate candidate due to internal error",
				);
				metrics_guard.take().on_approval_error();
				return ApprovalState::failed(validator_index, candidate_hash)
			},
		}
	};
	// Split the task into the future that is spawned and the handle returned to the caller.
	let (background, remote_handle) = background.remote_handle();
	spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background));
	Ok(remote_handle)
}
3671
3672#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3675async fn issue_approval<
3676 Sender: SubsystemSender<RuntimeApiMessage>,
3677 ADSender: SubsystemSender<ApprovalDistributionMessage>,
3678>(
3679 sender: &mut Sender,
3680 approval_voting_sender: &mut ADSender,
3681 state: &mut State,
3682 db: &mut OverlayedBackend<'_, impl Backend>,
3683 session_info_provider: &mut RuntimeInfo,
3684 metrics: &Metrics,
3685 candidate_hash: CandidateHash,
3686 delayed_approvals_timers: &mut DelayedApprovalTimer,
3687 ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
3688 wakeups: &Wakeups,
3689) -> SubsystemResult<Vec<Action>> {
3690 let mut block_entry = match db.load_block_entry(&block_hash)? {
3691 Some(b) => b,
3692 None => {
3693 metrics.on_approval_stale();
3695 return Ok(Vec::new())
3696 },
3697 };
3698
3699 let candidate_index = match block_entry.candidates().iter().position(|e| e.1 == candidate_hash)
3700 {
3701 None => {
3702 gum::warn!(
3703 target: LOG_TARGET,
3704 "Candidate hash {} is not present in the block entry's candidates for relay block {}",
3705 candidate_hash,
3706 block_entry.parent_hash(),
3707 );
3708
3709 metrics.on_approval_error();
3710 return Ok(Vec::new())
3711 },
3712 Some(idx) => idx,
3713 };
3714
3715 let candidate_hash = match block_entry.candidate(candidate_index as usize) {
3716 Some((_, h)) => *h,
3717 None => {
3718 gum::warn!(
3719 target: LOG_TARGET,
3720 "Received malformed request to approve out-of-bounds candidate index {} included at block {:?}",
3721 candidate_index,
3722 block_hash,
3723 );
3724
3725 metrics.on_approval_error();
3726 return Ok(Vec::new())
3727 },
3728 };
3729
3730 let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
3731 Some(c) => c,
3732 None => {
3733 gum::warn!(
3734 target: LOG_TARGET,
3735 "Missing entry for candidate index {} included at block {:?}",
3736 candidate_index,
3737 block_hash,
3738 );
3739
3740 metrics.on_approval_error();
3741 return Ok(Vec::new())
3742 },
3743 };
3744
3745 let session_info = match get_session_info(
3746 session_info_provider,
3747 sender,
3748 block_entry.parent_hash(),
3749 block_entry.session(),
3750 )
3751 .await
3752 {
3753 Some(s) => s,
3754 None => return Ok(Vec::new()),
3755 };
3756
3757 if block_entry
3758 .defer_candidate_signature(
3759 candidate_index as _,
3760 candidate_hash,
3761 compute_delayed_approval_sending_tick(
3762 state,
3763 &block_entry,
3764 &candidate_entry,
3765 session_info,
3766 &metrics,
3767 ),
3768 )
3769 .is_some()
3770 {
3771 gum::error!(
3772 target: LOG_TARGET,
3773 ?candidate_hash,
3774 ?block_hash,
3775 validator_index = validator_index.0,
3776 "Possible bug, we shouldn't have to defer a candidate more than once",
3777 );
3778 }
3779
3780 gum::debug!(
3781 target: LOG_TARGET,
3782 ?candidate_hash,
3783 ?block_hash,
3784 validator_index = validator_index.0,
3785 "Ready to issue approval vote",
3786 );
3787
3788 let actions = advance_approval_state(
3789 sender,
3790 state,
3791 db,
3792 session_info_provider,
3793 metrics,
3794 block_entry,
3795 candidate_hash,
3796 candidate_entry,
3797 ApprovalStateTransition::LocalApproval(validator_index as _),
3798 wakeups,
3799 )
3800 .await;
3801
3802 if let Some(next_wakeup) = maybe_create_signature(
3803 db,
3804 session_info_provider,
3805 state,
3806 sender,
3807 approval_voting_sender,
3808 block_hash,
3809 validator_index,
3810 metrics,
3811 )
3812 .await?
3813 {
3814 delayed_approvals_timers.maybe_arm_timer(
3815 next_wakeup,
3816 state.clock.as_ref(),
3817 block_hash,
3818 validator_index,
3819 );
3820 }
3821 Ok(actions)
3822}
3823
3824#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3826async fn maybe_create_signature<
3827 Sender: SubsystemSender<RuntimeApiMessage>,
3828 ADSender: SubsystemSender<ApprovalDistributionMessage>,
3829>(
3830 db: &mut OverlayedBackend<'_, impl Backend>,
3831 session_info_provider: &mut RuntimeInfo,
3832 state: &State,
3833 sender: &mut Sender,
3834 approval_voting_sender: &mut ADSender,
3835 block_hash: Hash,
3836 validator_index: ValidatorIndex,
3837 metrics: &Metrics,
3838) -> SubsystemResult<Option<Tick>> {
3839 let mut block_entry = match db.load_block_entry(&block_hash)? {
3840 Some(b) => b,
3841 None => {
3842 metrics.on_approval_stale();
3844 gum::debug!(
3845 target: LOG_TARGET,
3846 "Could not find block that needs signature {:}", block_hash
3847 );
3848 return Ok(None)
3849 },
3850 };
3851
3852 let approval_params = state
3853 .get_approval_voting_params_or_default(sender, block_entry.session(), block_hash)
3854 .await
3855 .unwrap_or_default();
3856
3857 gum::trace!(
3858 target: LOG_TARGET,
3859 "Candidates pending signatures {:}", block_entry.num_candidates_pending_signature()
3860 );
3861 let tick_now = state.clock.tick_now();
3862
3863 let (candidates_to_sign, sign_no_later_then) = block_entry
3864 .get_candidates_that_need_signature(tick_now, approval_params.max_approval_coalesce_count);
3865
3866 let (candidates_hashes, candidates_indices) = match candidates_to_sign {
3867 Some(candidates_to_sign) => candidates_to_sign,
3868 None => return Ok(sign_no_later_then),
3869 };
3870
3871 let session_info = match get_session_info(
3872 session_info_provider,
3873 sender,
3874 block_entry.parent_hash(),
3875 block_entry.session(),
3876 )
3877 .await
3878 {
3879 Some(s) => s,
3880 None => {
3881 metrics.on_approval_error();
3882 gum::error!(
3883 target: LOG_TARGET,
3884 "Could not retrieve the session"
3885 );
3886 return Ok(None)
3887 },
3888 };
3889
3890 let validator_pubkey = match session_info.validators.get(validator_index) {
3891 Some(p) => p,
3892 None => {
3893 gum::error!(
3894 target: LOG_TARGET,
3895 "Validator index {} out of bounds in session {}",
3896 validator_index.0,
3897 block_entry.session(),
3898 );
3899
3900 metrics.on_approval_error();
3901 return Ok(None)
3902 },
3903 };
3904
3905 let signature = match sign_approval(
3906 &state.keystore,
3907 &validator_pubkey,
3908 &candidates_hashes,
3909 block_entry.session(),
3910 ) {
3911 Some(sig) => sig,
3912 None => {
3913 gum::error!(
3914 target: LOG_TARGET,
3915 validator_index = ?validator_index,
3916 session = ?block_entry.session(),
3917 "Could not issue approval signature. Assignment key present but not validator key?",
3918 );
3919
3920 metrics.on_approval_error();
3921 return Ok(None)
3922 },
3923 };
3924 metrics.on_approval_coalesce(candidates_hashes.len() as u32);
3925
3926 let candidate_entries = candidates_hashes
3927 .iter()
3928 .map(|candidate_hash| db.load_candidate_entry(candidate_hash))
3929 .collect::<SubsystemResult<Vec<Option<CandidateEntry>>>>()?;
3930
3931 for mut candidate_entry in candidate_entries {
3932 let approval_entry = candidate_entry.as_mut().and_then(|candidate_entry| {
3933 candidate_entry.approval_entry_mut(&block_entry.block_hash())
3934 });
3935
3936 match approval_entry {
3937 Some(approval_entry) => approval_entry.import_approval_sig(OurApproval {
3938 signature: signature.clone(),
3939 signed_candidates_indices: candidates_indices.clone(),
3940 }),
3941 None => {
3942 gum::error!(
3943 target: LOG_TARGET,
3944 candidate_entry = ?candidate_entry,
3945 "Candidate scheduled for signing approval entry should not be None"
3946 );
3947 },
3948 };
3949 candidate_entry.map(|candidate_entry| db.write_candidate_entry(candidate_entry));
3950 }
3951
3952 metrics.on_approval_produced();
3953
3954 approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval(
3955 IndirectSignedApprovalVoteV2 {
3956 block_hash: block_entry.block_hash(),
3957 candidate_indices: candidates_indices,
3958 validator: validator_index,
3959 signature,
3960 },
3961 ));
3962
3963 gum::trace!(
3964 target: LOG_TARGET,
3965 ?block_hash,
3966 signed_candidates = ?block_entry.num_candidates_pending_signature(),
3967 "Issue approval votes",
3968 );
3969 block_entry.issued_approval();
3970 db.write_block_entry(block_entry.into());
3971 Ok(None)
3972}
3973
3974fn sign_approval(
3976 keystore: &LocalKeystore,
3977 public: &ValidatorId,
3978 candidate_hashes: &[CandidateHash],
3979 session_index: SessionIndex,
3980) -> Option<ValidatorSignature> {
3981 let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
3982
3983 let payload = ApprovalVoteMultipleCandidates(candidate_hashes).signing_payload(session_index);
3984
3985 Some(key.sign(&payload[..]))
3986}
3987
3988fn issue_local_invalid_statement<Sender>(
3990 sender: &mut Sender,
3991 session_index: SessionIndex,
3992 candidate_hash: CandidateHash,
3993 candidate: CandidateReceipt,
3994) where
3995 Sender: SubsystemSender<DisputeCoordinatorMessage>,
3996{
3997 sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
4007 session_index,
4008 candidate_hash,
4009 candidate.clone(),
4010 false,
4011 ));
4012}
4013
4014fn compute_delayed_approval_sending_tick(
4016 state: &State,
4017 block_entry: &BlockEntry,
4018 candidate_entry: &CandidateEntry,
4019 session_info: &SessionInfo,
4020 metrics: &Metrics,
4021) -> Tick {
4022 let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
4023 let assignment_tranche = candidate_entry
4024 .approval_entry(&block_entry.block_hash())
4025 .and_then(|approval_entry| approval_entry.our_assignment())
4026 .map(|our_assignment| our_assignment.tranche())
4027 .unwrap_or_default();
4028
4029 let assignment_triggered_tick = current_block_tick + assignment_tranche as Tick;
4030
4031 let no_show_duration_ticks = slot_number_to_tick(
4032 state.slot_duration_millis,
4033 Slot::from(u64::from(session_info.no_show_slots)),
4034 );
4035 let tick_now = state.clock.tick_now();
4036
4037 let sign_no_later_than = min(
4038 tick_now + MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick,
4039 assignment_triggered_tick + no_show_duration_ticks / 2,
4043 );
4044
4045 metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
4046 sign_no_later_than
4047}