1use futures_timer::Delay;
25use polkadot_node_primitives::{
26 approval::{
27 v1::{BlockApprovalMeta, DelayTranche},
28 v2::{
29 AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
30 IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2,
31 },
32 },
33 ValidationResult, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36 errors::RecoveryError,
37 messages::{
38 ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
39 ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
40 AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
41 ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote,
42 DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage,
43 RuntimeApiRequest,
44 },
45 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
46 SubsystemSender,
47};
48use polkadot_node_subsystem_util::{
49 self,
50 database::Database,
51 metrics::{self, prometheus},
52 runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
53 TimeoutExt,
54};
55use polkadot_primitives::{
56 ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash,
57 CandidateIndex, CandidateReceiptV2 as CandidateReceipt, CoreIndex, GroupIndex, Hash,
58 SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
59};
60use sc_keystore::LocalKeystore;
61use sp_application_crypto::Pair;
62use sp_consensus::SyncOracle;
63use sp_consensus_slots::Slot;
64use std::time::Instant;
65
66const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
70
71use futures::{
72 channel::oneshot,
73 future::{BoxFuture, RemoteHandle},
74 prelude::*,
75 stream::FuturesUnordered,
76 StreamExt,
77};
78
79use std::{
80 cmp::min,
81 collections::{
82 btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
83 },
84 sync::Arc,
85 time::Duration,
86};
87
88use schnellru::{ByLength, LruMap};
89
90use approval_checking::RequiredTranches;
91use bitvec::{order::Lsb0, vec::BitVec};
92pub use criteria::{AssignmentCriteria, Config as AssignmentConfig, RealAssignmentCriteria};
93use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
94use polkadot_node_primitives::approval::time::{
95 slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick,
96};
97
98mod approval_checking;
99pub mod approval_db;
100mod backend;
101pub mod criteria;
102mod import;
103mod ops;
104mod persisted_entries;
105
106use crate::{
107 approval_checking::{Check, TranchesToApproveResult},
108 approval_db::common::{Config as DatabaseConfig, DbBackend},
109 backend::{Backend, OverlayedBackend},
110 criteria::InvalidAssignmentReason,
111 persisted_entries::OurApproval,
112};
113
114#[cfg(test)]
115mod tests;
116
117const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
118const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
123const APPROVAL_CACHE_SIZE: u32 = 1024;
124
125const MAX_APPROVAL_RETRIES: u32 = 16;
127
128const APPROVAL_DELAY: Tick = 2;
129pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
130
131const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;
133
134const RESTART_WAKEUP_DELAY: Tick = 12;
143
144#[derive(Debug, Clone)]
146pub struct Config {
147 pub col_approval_data: u32,
149 pub slot_duration_millis: u64,
152}
153
154enum Mode {
164 Active,
165 Syncing(Box<dyn SyncOracle + Send>),
166}
167
168pub struct ApprovalVotingSubsystem {
170 keystore: Arc<LocalKeystore>,
174 db_config: DatabaseConfig,
175 slot_duration_millis: u64,
176 db: Arc<dyn Database>,
177 mode: Mode,
178 metrics: Metrics,
179 clock: Arc<dyn Clock + Send + Sync>,
180 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
181 max_approval_retries: u32,
183 retry_backoff: Duration,
185}
186
187#[derive(Clone)]
188struct MetricsInner {
189 imported_candidates_total: prometheus::Counter<prometheus::U64>,
190 assignments_produced: prometheus::Histogram,
191 approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
192 no_shows_total: prometheus::Counter<prometheus::U64>,
193 observed_no_shows: prometheus::Counter<prometheus::U64>,
197 approved_by_one_third: prometheus::Counter<prometheus::U64>,
198 wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
199 coalesced_approvals_buckets: prometheus::Histogram,
200 coalesced_approvals_delay: prometheus::Histogram,
201 candidate_approval_time_ticks: prometheus::Histogram,
202 block_approval_time_ticks: prometheus::Histogram,
203 time_db_transaction: prometheus::Histogram,
204 time_recover_and_approve: prometheus::Histogram,
205 candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
206 unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
207 assignments_gathering_time_by_stage: prometheus::HistogramVec,
215}
216
217#[derive(Default, Clone)]
219pub struct Metrics(Option<MetricsInner>);
220
221impl Metrics {
222 fn on_candidate_imported(&self) {
223 if let Some(metrics) = &self.0 {
224 metrics.imported_candidates_total.inc();
225 }
226 }
227
228 fn on_assignment_produced(&self, tranche: DelayTranche) {
229 if let Some(metrics) = &self.0 {
230 metrics.assignments_produced.observe(tranche as f64);
231 }
232 }
233
234 fn on_approval_coalesce(&self, num_coalesced: u32) {
235 if let Some(metrics) = &self.0 {
236 for _ in 0..num_coalesced {
239 metrics.coalesced_approvals_buckets.observe(num_coalesced as f64)
240 }
241 }
242 }
243
244 fn on_delayed_approval(&self, delayed_ticks: u64) {
245 if let Some(metrics) = &self.0 {
246 metrics.coalesced_approvals_delay.observe(delayed_ticks as f64)
247 }
248 }
249
250 fn on_approval_stale(&self) {
251 if let Some(metrics) = &self.0 {
252 metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
253 }
254 }
255
256 fn on_approval_invalid(&self) {
257 if let Some(metrics) = &self.0 {
258 metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
259 }
260 }
261
262 fn on_approval_unavailable(&self) {
263 if let Some(metrics) = &self.0 {
264 metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
265 }
266 }
267
268 fn on_approval_error(&self) {
269 if let Some(metrics) = &self.0 {
270 metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
271 }
272 }
273
274 fn on_approval_produced(&self) {
275 if let Some(metrics) = &self.0 {
276 metrics.approvals_produced_total.with_label_values(&["success"]).inc()
277 }
278 }
279
280 fn on_no_shows(&self, n: usize) {
281 if let Some(metrics) = &self.0 {
282 metrics.no_shows_total.inc_by(n as u64);
283 }
284 }
285
286 fn on_observed_no_shows(&self, n: usize) {
287 if let Some(metrics) = &self.0 {
288 metrics.observed_no_shows.inc_by(n as u64);
289 }
290 }
291
292 fn on_approved_by_one_third(&self) {
293 if let Some(metrics) = &self.0 {
294 metrics.approved_by_one_third.inc();
295 }
296 }
297
298 fn on_wakeup(&self) {
299 if let Some(metrics) = &self.0 {
300 metrics.wakeups_triggered_total.inc();
301 }
302 }
303
304 fn on_candidate_approved(&self, ticks: Tick) {
305 if let Some(metrics) = &self.0 {
306 metrics.candidate_approval_time_ticks.observe(ticks as f64);
307 }
308 }
309
310 fn on_block_approved(&self, ticks: Tick) {
311 if let Some(metrics) = &self.0 {
312 metrics.block_approval_time_ticks.observe(ticks as f64);
313 }
314 }
315
316 fn on_candidate_signatures_request(&self) {
317 if let Some(metrics) = &self.0 {
318 metrics.candidate_signatures_requests_total.inc();
319 }
320 }
321
322 fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
323 self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
324 }
325
326 fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
327 self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
328 }
329
330 fn on_unapproved_candidates_in_unfinalized_chain(&self, count: usize) {
331 if let Some(metrics) = &self.0 {
332 metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
333 }
334 }
335
336 pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
337 if let Some(metrics) = &self.0 {
338 let stage_string = stage.to_string();
339 metrics
344 .assignments_gathering_time_by_stage
345 .with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
346 .observe(elapsed_as_millis as f64);
347 }
348 }
349}
350
351impl metrics::Metrics for Metrics {
352 fn try_register(
353 registry: &prometheus::Registry,
354 ) -> std::result::Result<Self, prometheus::PrometheusError> {
355 let metrics = MetricsInner {
356 imported_candidates_total: prometheus::register(
357 prometheus::Counter::new(
358 "polkadot_parachain_imported_candidates_total",
359 "Number of candidates imported by the approval voting subsystem",
360 )?,
361 registry,
362 )?,
363 assignments_produced: prometheus::register(
364 prometheus::Histogram::with_opts(
365 prometheus::HistogramOpts::new(
366 "polkadot_parachain_assignments_produced",
367 "Assignments and tranches produced by the approval voting subsystem",
368 ).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
369 )?,
370 registry,
371 )?,
372 approvals_produced_total: prometheus::register(
373 prometheus::CounterVec::new(
374 prometheus::Opts::new(
375 "polkadot_parachain_approvals_produced_total",
376 "Number of approvals produced by the approval voting subsystem",
377 ),
378 &["status"]
379 )?,
380 registry,
381 )?,
382 no_shows_total: prometheus::register(
383 prometheus::Counter::new(
384 "polkadot_parachain_approvals_no_shows_total",
385 "Number of assignments which became no-shows in the approval voting subsystem",
386 )?,
387 registry,
388 )?,
389 observed_no_shows: prometheus::register(
390 prometheus::Counter::new(
391 "polkadot_parachain_approvals_observed_no_shows_total",
392 "Number of observed no shows at any moment in time",
393 )?,
394 registry,
395 )?,
396 wakeups_triggered_total: prometheus::register(
397 prometheus::Counter::new(
398 "polkadot_parachain_approvals_wakeups_total",
399 "Number of times we woke up to process a candidate in the approval voting subsystem",
400 )?,
401 registry,
402 )?,
403 candidate_approval_time_ticks: prometheus::register(
404 prometheus::Histogram::with_opts(
405 prometheus::HistogramOpts::new(
406 "polkadot_parachain_approvals_candidate_approval_time_ticks",
407 "Number of ticks (500ms) to approve candidates.",
408 ).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
409 )?,
410 registry,
411 )?,
412 coalesced_approvals_buckets: prometheus::register(
413 prometheus::Histogram::with_opts(
414 prometheus::HistogramOpts::new(
415 "polkadot_parachain_approvals_coalesced_approvals_buckets",
416 "Number of coalesced approvals.",
417 ).buckets(vec![1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]),
418 )?,
419 registry,
420 )?,
421 coalesced_approvals_delay: prometheus::register(
422 prometheus::Histogram::with_opts(
423 prometheus::HistogramOpts::new(
424 "polkadot_parachain_approvals_coalescing_delay",
425 "Number of ticks we delay the sending of a candidate approval",
426 ).buckets(vec![1.1, 2.1, 3.1, 4.1, 6.1, 8.1, 12.1, 20.1, 32.1]),
427 )?,
428 registry,
429 )?,
430 approved_by_one_third: prometheus::register(
431 prometheus::Counter::new(
432 "polkadot_parachain_approved_by_one_third",
433 "Number of candidates where more than one third had to vote ",
434 )?,
435 registry,
436 )?,
437 block_approval_time_ticks: prometheus::register(
438 prometheus::Histogram::with_opts(
439 prometheus::HistogramOpts::new(
440 "polkadot_parachain_approvals_blockapproval_time_ticks",
441 "Number of ticks (500ms) to approve blocks.",
442 ).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
443 )?,
444 registry,
445 )?,
446 time_db_transaction: prometheus::register(
447 prometheus::Histogram::with_opts(
448 prometheus::HistogramOpts::new(
449 "polkadot_parachain_time_approval_db_transaction",
450 "Time spent writing an approval db transaction.",
451 )
452 )?,
453 registry,
454 )?,
455 time_recover_and_approve: prometheus::register(
456 prometheus::Histogram::with_opts(
457 prometheus::HistogramOpts::new(
458 "polkadot_parachain_time_recover_and_approve",
459 "Time spent recovering and approving data in approval voting",
460 )
461 )?,
462 registry,
463 )?,
464 candidate_signatures_requests_total: prometheus::register(
465 prometheus::Counter::new(
466 "polkadot_parachain_approval_candidate_signatures_requests_total",
467 "Number of times signatures got requested by other subsystems",
468 )?,
469 registry,
470 )?,
471 unapproved_candidates_in_unfinalized_chain: prometheus::register(
472 prometheus::Gauge::new(
473 "polkadot_parachain_approval_unapproved_candidates_in_unfinalized_chain",
474 "Number of unapproved candidates in unfinalized chain",
475 )?,
476 registry,
477 )?,
478 assignments_gathering_time_by_stage: prometheus::register(
479 prometheus::HistogramVec::new(
480 prometheus::HistogramOpts::new(
481 "polkadot_parachain_assignments_gather_time_by_stage_ms",
482 "The time in ms it takes for each stage to gather enough assignments needed for approval",
483 )
484 .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
485 &["stage"],
486 )?,
487 registry,
488 )?,
489 };
490
491 Ok(Metrics(Some(metrics)))
492 }
493}
494
495impl ApprovalVotingSubsystem {
496 pub fn with_config(
498 config: Config,
499 db: Arc<dyn Database>,
500 keystore: Arc<LocalKeystore>,
501 sync_oracle: Box<dyn SyncOracle + Send>,
502 metrics: Metrics,
503 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
504 ) -> Self {
505 ApprovalVotingSubsystem::with_config_and_clock(
506 config,
507 db,
508 keystore,
509 sync_oracle,
510 metrics,
511 Arc::new(SystemClock {}),
512 spawner,
513 MAX_APPROVAL_RETRIES,
514 APPROVAL_CHECKING_TIMEOUT / 2,
515 )
516 }
517
518 pub fn with_config_and_clock(
520 config: Config,
521 db: Arc<dyn Database>,
522 keystore: Arc<LocalKeystore>,
523 sync_oracle: Box<dyn SyncOracle + Send>,
524 metrics: Metrics,
525 clock: Arc<dyn Clock + Send + Sync>,
526 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
527 max_approval_retries: u32,
528 retry_backoff: Duration,
529 ) -> Self {
530 ApprovalVotingSubsystem {
531 keystore,
532 slot_duration_millis: config.slot_duration_millis,
533 db,
534 db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
535 mode: Mode::Syncing(sync_oracle),
536 metrics,
537 clock,
538 spawner,
539 max_approval_retries,
540 retry_backoff,
541 }
542 }
543
544 pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
547 let config =
548 approval_db::common::Config { col_approval_data: self.db_config.col_approval_data };
549 let mut backend = approval_db::common::DbBackend::new(self.db.clone(), config);
550 let mut overlay = OverlayedBackend::new(&backend);
551
552 ops::revert_to(&mut overlay, hash)?;
553
554 let ops = overlay.into_write_ops();
555 backend.write(ops)
556 }
557}
558
559fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemResult<()> {
562 let backend = DbBackend::new(db, config);
563 let all_blocks = backend.load_all_blocks()?;
564
565 if all_blocks.is_empty() {
566 gum::info!(target: LOG_TARGET, "Starting with an empty approval vote DB.",);
567 } else {
568 gum::debug!(
569 target: LOG_TARGET,
570 "Starting with {} blocks in approval vote DB.",
571 all_blocks.len()
572 );
573 }
574
575 Ok(())
576}
577
578#[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)]
579impl<Context: Send> ApprovalVotingSubsystem {
580 fn start(self, mut ctx: Context) -> SpawnedSubsystem {
581 let backend = DbBackend::new(self.db.clone(), self.db_config);
582 let to_other_subsystems = ctx.sender().clone();
583 let to_approval_distr = ctx.sender().clone();
584
585 let future = run::<DbBackend, _, _, _>(
586 ctx,
587 to_other_subsystems,
588 to_approval_distr,
589 self,
590 Box::new(RealAssignmentCriteria),
591 backend,
592 )
593 .map_err(|e| SubsystemError::with_origin("approval-voting", e))
594 .boxed();
595
596 SpawnedSubsystem { name: "approval-voting-subsystem", future }
597 }
598}
599
600#[derive(Debug, Clone)]
601struct ApprovalVoteRequest {
602 validator_index: ValidatorIndex,
603 block_hash: Hash,
604}
605
606#[derive(Default)]
607struct Wakeups {
608 wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
610 reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
611 block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
612}
613
614impl Wakeups {
615 fn first(&self) -> Option<Tick> {
617 self.wakeups.keys().next().map(|t| *t)
618 }
619
620 fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
621 self.block_numbers.entry(block_number).or_default().insert(block_hash);
622 }
623
624 fn schedule(
627 &mut self,
628 block_hash: Hash,
629 block_number: BlockNumber,
630 candidate_hash: CandidateHash,
631 tick: Tick,
632 ) {
633 if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
634 if prev <= &tick {
635 return;
636 }
637
638 if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) {
640 if let Some(pos) =
641 entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
642 {
643 entry.get_mut().remove(pos);
644 }
645
646 if entry.get().is_empty() {
647 let _ = entry.remove_entry();
648 }
649 }
650 } else {
651 self.note_block(block_hash, block_number);
652 }
653
654 self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
655 self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
656 }
657
658 fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
659 let after = self.block_numbers.split_off(&(finalized_number + 1));
660 let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
661 .into_iter()
662 .flat_map(|(_number, hashes)| hashes)
663 .collect();
664
665 let mut pruned_wakeups = BTreeMap::new();
666 self.reverse_wakeups.retain(|(h, c_h), tick| {
667 let live = !pruned_blocks.contains(h);
668 if !live {
669 pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
670 }
671 live
672 });
673
674 for (tick, pruned) in pruned_wakeups {
675 if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) {
676 entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
677 if entry.get().is_empty() {
678 let _ = entry.remove();
679 }
680 }
681 }
682 }
683
684 fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
686 self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
687 }
688
689 async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
691 match self.first() {
692 None => future::pending().await,
693 Some(tick) => {
694 clock.wait(tick).await;
695 match self.wakeups.entry(tick) {
696 BTMEntry::Vacant(_) => {
697 panic!("entry is known to exist since `first` was `Some`; qed")
698 },
699 BTMEntry::Occupied(mut entry) => {
700 let (hash, candidate_hash) = entry.get_mut().pop()
701 .expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
702
703 if entry.get().is_empty() {
704 let _ = entry.remove();
705 }
706
707 self.reverse_wakeups.remove(&(hash, candidate_hash));
708
709 (tick, hash, candidate_hash)
710 },
711 }
712 },
713 }
714 }
715}
716
717struct ApprovalStatus {
718 required_tranches: RequiredTranches,
719 tranche_now: DelayTranche,
720 block_tick: Tick,
721 last_no_shows: usize,
722 no_show_validators: Vec<ValidatorIndex>,
723}
724
725#[derive(Copy, Clone)]
726enum ApprovalOutcome {
727 Approved,
728 Failed,
729 TimedOut,
730}
731
732#[derive(Clone)]
733struct RetryApprovalInfo {
734 candidate: CandidateReceipt,
735 backing_group: GroupIndex,
736 core_index: Option<CoreIndex>,
737 session_index: SessionIndex,
738 attempts_remaining: u32,
739 backoff: Duration,
740}
741
742struct ApprovalState {
743 validator_index: ValidatorIndex,
744 candidate_hash: CandidateHash,
745 approval_outcome: ApprovalOutcome,
746 retry_info: Option<RetryApprovalInfo>,
747}
748
749impl ApprovalState {
750 fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
751 Self {
752 validator_index,
753 candidate_hash,
754 approval_outcome: ApprovalOutcome::Approved,
755 retry_info: None,
756 }
757 }
758 fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
759 Self {
760 validator_index,
761 candidate_hash,
762 approval_outcome: ApprovalOutcome::Failed,
763 retry_info: None,
764 }
765 }
766
767 fn failed_with_retry(
768 validator_index: ValidatorIndex,
769 candidate_hash: CandidateHash,
770 retry_info: Option<RetryApprovalInfo>,
771 ) -> Self {
772 Self {
773 validator_index,
774 candidate_hash,
775 approval_outcome: ApprovalOutcome::Failed,
776 retry_info,
777 }
778 }
779}
780
781struct CurrentlyCheckingSet {
782 candidate_hash_map: HashMap<CandidateHash, HashSet<Hash>>,
783 currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
784}
785
786impl Default for CurrentlyCheckingSet {
787 fn default() -> Self {
788 Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
789 }
790}
791
792impl CurrentlyCheckingSet {
793 pub async fn insert_relay_block_hash(
796 &mut self,
797 candidate_hash: CandidateHash,
798 validator_index: ValidatorIndex,
799 relay_block: Hash,
800 launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
801 ) -> SubsystemResult<()> {
802 match self.candidate_hash_map.entry(candidate_hash) {
803 HMEntry::Occupied(mut entry) => {
804 entry.get_mut().insert(relay_block);
806 },
807 HMEntry::Vacant(entry) => {
808 entry.insert(HashSet::new()).insert(relay_block);
810 let work = launch_work.await?;
811 self.currently_checking.push(Box::pin(async move {
812 match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
813 None => ApprovalState {
814 candidate_hash,
815 validator_index,
816 approval_outcome: ApprovalOutcome::TimedOut,
817 retry_info: None,
818 },
819 Some(approval_state) => approval_state,
820 }
821 }));
822 },
823 }
824
825 Ok(())
826 }
827
828 pub async fn next(
829 &mut self,
830 approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
831 ) -> (HashSet<Hash>, ApprovalState) {
832 if !self.currently_checking.is_empty() {
833 if let Some(approval_state) = self.currently_checking.next().await {
834 let out = self
835 .candidate_hash_map
836 .remove(&approval_state.candidate_hash)
837 .unwrap_or_default();
838 approvals_cache
839 .insert(approval_state.candidate_hash, approval_state.approval_outcome);
840 return (out, approval_state);
841 }
842 }
843
844 future::pending().await
845 }
846}
847
848async fn get_extended_session_info_by_index<'a, Sender>(
849 runtime_info: &'a mut RuntimeInfo,
850 sender: &mut Sender,
851 block_hash: Hash,
852 session_index: SessionIndex,
853) -> Option<&'a ExtendedSessionInfo>
854where
855 Sender: SubsystemSender<RuntimeApiMessage>,
856{
857 match runtime_info.get_session_info_by_index(sender, block_hash, session_index).await {
858 Ok(extended_info) => Some(&extended_info),
859 Err(_) => {
860 gum::debug!(
861 target: LOG_TARGET,
862 session = session_index,
863 ?block_hash,
864 "Can't obtain SessionInfo or ExecutorParams"
865 );
866 None
867 },
868 }
869}
870
871async fn get_session_info_by_index<'a, Sender>(
872 runtime_info: &'a mut RuntimeInfo,
873 sender: &mut Sender,
874 block_hash: Hash,
875 session_index: SessionIndex,
876) -> Option<&'a SessionInfo>
877where
878 Sender: SubsystemSender<RuntimeApiMessage>,
879{
880 get_extended_session_info_by_index(runtime_info, sender, block_hash, session_index)
881 .await
882 .map(|extended_info| &extended_info.session_info)
883}
884
885struct State {
886 keystore: Arc<LocalKeystore>,
887 slot_duration_millis: u64,
888 clock: Arc<dyn Clock + Send + Sync>,
889 assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
890 per_block_assignments_gathering_times:
894 LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
895 no_show_stats: NoShowStats,
896}
897
898const NO_SHOW_DUMP_FREQUENCY: BlockNumber = 50;
900pub(crate) const MAX_RECORDED_NO_SHOW_VALIDATORS_PER_CANDIDATE: usize = 20;
902
903#[derive(Debug, Clone, PartialEq, Eq, Default)]
908struct NoShowStats {
909 per_validator_no_show: HashMap<SessionIndex, HashMap<ValidatorIndex, usize>>,
910 per_parachain_no_show: HashMap<u32, usize>,
911 last_dumped_block_number: BlockNumber,
912}
913
914impl NoShowStats {
915 fn maybe_print(&mut self, current_block_number: BlockNumber) {
918 if self.last_dumped_block_number > current_block_number ||
919 current_block_number - self.last_dumped_block_number < NO_SHOW_DUMP_FREQUENCY
920 {
921 return;
922 }
923 if self.per_parachain_no_show.is_empty() && self.per_validator_no_show.is_empty() {
924 return;
925 }
926
927 gum::debug!(
928 target: LOG_TARGET,
929 "Validators with no_show {:?} and parachains with no_shows {:?} since {:}",
930 self.per_validator_no_show,
931 self.per_parachain_no_show,
932 self.last_dumped_block_number
933 );
934
935 self.last_dumped_block_number = current_block_number;
936
937 self.per_validator_no_show.clear();
938 self.per_parachain_no_show.clear();
939 }
940}
941
942#[derive(Debug, Clone, PartialEq, Eq)]
943struct AssignmentGatheringRecord {
944 stage: usize,
949 stage_start: Option<Instant>,
951}
952
953impl Default for AssignmentGatheringRecord {
954 fn default() -> Self {
955 AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
956 }
957}
958
959#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
960impl State {
961 async fn approval_status<Sender, 'a, 'b>(
965 &'a self,
966 sender: &mut Sender,
967 session_info_provider: &'a mut RuntimeInfo,
968 block_entry: &'a BlockEntry,
969 candidate_entry: &'b CandidateEntry,
970 ) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
971 where
972 Sender: SubsystemSender<RuntimeApiMessage>,
973 {
974 let session_info = match get_session_info_by_index(
975 session_info_provider,
976 sender,
977 block_entry.parent_hash(),
978 block_entry.session(),
979 )
980 .await
981 {
982 Some(s) => s,
983 None => return None,
984 };
985 let block_hash = block_entry.block_hash();
986
987 let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
988 let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
989 let no_show_duration = slot_number_to_tick(
990 self.slot_duration_millis,
991 Slot::from(u64::from(session_info.no_show_slots)),
992 );
993
994 if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
995 let TranchesToApproveResult {
996 required_tranches,
997 total_observed_no_shows,
998 no_show_validators,
999 } = approval_checking::tranches_to_approve(
1000 approval_entry,
1001 candidate_entry.approvals(),
1002 tranche_now,
1003 block_tick,
1004 no_show_duration,
1005 session_info.needed_approvals as _,
1006 );
1007
1008 let status = ApprovalStatus {
1009 required_tranches,
1010 block_tick,
1011 tranche_now,
1012 last_no_shows: total_observed_no_shows,
1013 no_show_validators,
1014 };
1015
1016 Some((approval_entry, status))
1017 } else {
1018 None
1019 }
1020 }
1021
1022 async fn get_approval_voting_params_or_default<Sender: SubsystemSender<RuntimeApiMessage>>(
1024 &self,
1025 sender: &mut Sender,
1026 session_index: SessionIndex,
1027 block_hash: Hash,
1028 ) -> Option<ApprovalVotingParams> {
1029 let (s_tx, s_rx) = oneshot::channel();
1030
1031 sender
1032 .send_message(RuntimeApiMessage::Request(
1033 block_hash,
1034 RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx),
1035 ))
1036 .await;
1037
1038 match s_rx.await {
1039 Ok(Ok(params)) => {
1040 gum::trace!(
1041 target: LOG_TARGET,
1042 approval_voting_params = ?params,
1043 session = ?session_index,
1044 "Using the following subsystem params"
1045 );
1046 Some(params)
1047 },
1048 Ok(Err(err)) => {
1049 gum::debug!(
1050 target: LOG_TARGET,
1051 ?err,
1052 "Could not request approval voting params from runtime"
1053 );
1054 None
1055 },
1056 Err(err) => {
1057 gum::debug!(
1058 target: LOG_TARGET,
1059 ?err,
1060 "Could not request approval voting params from runtime"
1061 );
1062 None
1063 },
1064 }
1065 }
1066
1067 fn mark_begining_of_gathering_assignments(
1068 &mut self,
1069 block_number: BlockNumber,
1070 block_hash: Hash,
1071 candidate: CandidateHash,
1072 ) {
1073 if let Some(record) = self
1074 .per_block_assignments_gathering_times
1075 .get_or_insert(block_number, HashMap::new)
1076 .and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
1077 {
1078 if record.stage_start.is_none() {
1079 record.stage += 1;
1080 gum::debug!(
1081 target: LOG_TARGET,
1082 stage = ?record.stage,
1083 ?block_hash,
1084 ?candidate,
1085 "Started a new assignment gathering stage",
1086 );
1087 record.stage_start = Some(Instant::now());
1088 }
1089 }
1090 }
1091
1092 fn mark_gathered_enough_assignments(
1093 &mut self,
1094 block_number: BlockNumber,
1095 block_hash: Hash,
1096 candidate: CandidateHash,
1097 ) -> AssignmentGatheringRecord {
1098 let record = self
1099 .per_block_assignments_gathering_times
1100 .get(&block_number)
1101 .and_then(|entry| entry.get_mut(&(block_hash, candidate)));
1102 let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
1103 AssignmentGatheringRecord {
1104 stage,
1105 stage_start: record.and_then(|record| record.stage_start.take()),
1106 }
1107 }
1108
1109 fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
1110 while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
1111 {
1112 if *block_number < remove_lower_than {
1113 self.per_block_assignments_gathering_times.pop_oldest();
1114 } else {
1115 break;
1116 }
1117 }
1118 }
1119
1120 fn observe_assignment_gathering_status(
1121 &mut self,
1122 metrics: &Metrics,
1123 required_tranches: &RequiredTranches,
1124 block_hash: Hash,
1125 block_number: BlockNumber,
1126 candidate_hash: CandidateHash,
1127 ) {
1128 match required_tranches {
1129 RequiredTranches::All | RequiredTranches::Pending { .. } => {
1130 self.mark_begining_of_gathering_assignments(
1131 block_number,
1132 block_hash,
1133 candidate_hash,
1134 );
1135 },
1136 RequiredTranches::Exact { .. } => {
1137 let time_to_gather =
1138 self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
1139 if let Some(gathering_started) = time_to_gather.stage_start {
1140 if gathering_started.elapsed().as_millis() > 6000 {
1141 gum::trace!(
1142 target: LOG_TARGET,
1143 ?block_hash,
1144 ?candidate_hash,
1145 "Long assignment gathering time",
1146 );
1147 }
1148 metrics.observe_assignment_gathering_time(
1149 time_to_gather.stage,
1150 gathering_started.elapsed().as_millis() as usize,
1151 )
1152 }
1153 },
1154 }
1155 }
1156
1157 fn record_no_shows(
1158 &mut self,
1159 session_index: SessionIndex,
1160 para_id: u32,
1161 no_show_validators: &Vec<ValidatorIndex>,
1162 ) {
1163 if !no_show_validators.is_empty() {
1164 *self.no_show_stats.per_parachain_no_show.entry(para_id.into()).or_default() += 1;
1165 }
1166 for validator_index in no_show_validators {
1167 *self
1168 .no_show_stats
1169 .per_validator_no_show
1170 .entry(session_index)
1171 .or_default()
1172 .entry(*validator_index)
1173 .or_default() += 1;
1174 }
1175 }
1176}
1177
1178#[derive(Debug, Clone)]
1179enum Action {
1180 ScheduleWakeup {
1181 block_hash: Hash,
1182 block_number: BlockNumber,
1183 candidate_hash: CandidateHash,
1184 tick: Tick,
1185 },
1186 LaunchApproval {
1187 claimed_candidate_indices: CandidateBitfield,
1188 candidate_hash: CandidateHash,
1189 indirect_cert: IndirectAssignmentCertV2,
1190 assignment_tranche: DelayTranche,
1191 relay_block_hash: Hash,
1192 session: SessionIndex,
1193 candidate: CandidateReceipt,
1194 backing_group: GroupIndex,
1195 distribute_assignment: bool,
1196 core_index: Option<CoreIndex>,
1197 },
1198 NoteApprovedInChainSelection(Hash),
1199 IssueApproval(CandidateHash, ApprovalVoteRequest),
1200 BecomeActive,
1201 Conclude,
1202}
1203
1204#[async_trait::async_trait]
1206pub trait ApprovalVotingWorkProvider {
1207 async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>>;
1208}
1209
1210#[async_trait::async_trait]
1211#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1212impl<Context> ApprovalVotingWorkProvider for Context {
1213 async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
1214 self.recv().await
1215 }
1216}
1217
1218#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1219async fn run<
1220 B,
1221 WorkProvider: ApprovalVotingWorkProvider,
1222 Sender: SubsystemSender<ChainApiMessage>
1223 + SubsystemSender<RuntimeApiMessage>
1224 + SubsystemSender<ChainSelectionMessage>
1225 + SubsystemSender<AvailabilityRecoveryMessage>
1226 + SubsystemSender<DisputeCoordinatorMessage>
1227 + SubsystemSender<CandidateValidationMessage>
1228 + Clone,
1229 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1230>(
1231 mut work_provider: WorkProvider,
1232 mut to_other_subsystems: Sender,
1233 mut to_approval_distr: ADSender,
1234 mut subsystem: ApprovalVotingSubsystem,
1235 assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
1236 mut backend: B,
1237) -> SubsystemResult<()>
1238where
1239 B: Backend,
1240{
1241 if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) {
1242 gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
1243 }
1244
1245 let mut state = State {
1246 keystore: subsystem.keystore,
1247 slot_duration_millis: subsystem.slot_duration_millis,
1248 clock: subsystem.clock,
1249 assignment_criteria,
1250 per_block_assignments_gathering_times: LruMap::new(ByLength::new(
1251 MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
1252 )),
1253 no_show_stats: NoShowStats::default(),
1254 };
1255
1256 let mut last_finalized_height: Option<BlockNumber> = {
1257 let (tx, rx) = oneshot::channel();
1258 to_other_subsystems
1259 .send_message(ChainApiMessage::FinalizedBlockNumber(tx))
1260 .await;
1261 match rx.await? {
1262 Ok(number) => Some(number),
1263 Err(err) => {
1264 gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number");
1265 None
1266 },
1267 }
1268 };
1269
1270 let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
1272 keystore: None,
1273 session_cache_lru_size: DISPUTE_WINDOW.get(),
1274 });
1275
1276 let mut wakeups = Wakeups::default();
1277 let mut currently_checking_set = CurrentlyCheckingSet::default();
1278 let mut delayed_approvals_timers = DelayedApprovalTimer::default();
1279 let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE));
1280
1281 loop {
1282 let mut overlayed_db = OverlayedBackend::new(&backend);
1283 let actions = futures::select! {
1284 (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
1285 subsystem.metrics.on_wakeup();
1286 process_wakeup(
1287 &mut to_other_subsystems,
1288 &mut state,
1289 &mut overlayed_db,
1290 &mut session_info_provider,
1291 woken_block,
1292 woken_candidate,
1293 &subsystem.metrics,
1294 &wakeups,
1295 ).await?
1296 }
1297 next_msg = work_provider.recv().fuse() => {
1298 let mut actions = handle_from_overseer(
1299 &mut to_other_subsystems,
1300 &mut to_approval_distr,
1301 &subsystem.spawner,
1302 &mut state,
1303 &mut overlayed_db,
1304 &mut session_info_provider,
1305 &subsystem.metrics,
1306 next_msg?,
1307 &mut last_finalized_height,
1308 &mut wakeups,
1309 ).await?;
1310
1311 if let Mode::Syncing(ref mut oracle) = subsystem.mode {
1312 if !oracle.is_major_syncing() {
1313 actions.insert(0, Action::BecomeActive)
1315 }
1316 }
1317
1318 actions
1319 }
1320 approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
1321 let mut actions = Vec::new();
1322 let (
1323 relay_block_hashes,
1324 ApprovalState {
1325 validator_index,
1326 candidate_hash,
1327 approval_outcome,
1328 retry_info,
1329 }
1330 ) = approval_state;
1331
1332 if matches!(approval_outcome, ApprovalOutcome::Approved) {
1333 let mut approvals: Vec<Action> = relay_block_hashes
1334 .iter()
1335 .map(|block_hash|
1336 Action::IssueApproval(
1337 candidate_hash,
1338 ApprovalVoteRequest {
1339 validator_index,
1340 block_hash: *block_hash,
1341 },
1342 )
1343 )
1344 .collect();
1345 actions.append(&mut approvals);
1346 }
1347
1348 if let Some(retry_info) = retry_info {
1349 for block_hash in relay_block_hashes {
1350 if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
1351 let sender = to_other_subsystems.clone();
1352 let spawn_handle = subsystem.spawner.clone();
1353 let metrics = subsystem.metrics.clone();
1354 let retry_info = retry_info.clone();
1355 let candidate = retry_info.candidate.clone();
1356
1357 currently_checking_set
1358 .insert_relay_block_hash(
1359 candidate_hash,
1360 validator_index,
1361 block_hash,
1362 async move {
1363 launch_approval(
1364 sender,
1365 spawn_handle,
1366 metrics,
1367 retry_info.session_index,
1368 candidate,
1369 validator_index,
1370 block_hash,
1371 retry_info.backing_group,
1372 retry_info.core_index,
1373 retry_info,
1374 )
1375 .await
1376 },
1377 )
1378 .await?;
1379 }
1380 }
1381 }
1382
1383 actions
1384 },
1385 (block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
1386 gum::debug!(
1387 target: LOG_TARGET,
1388 ?block_hash,
1389 ?validator_index,
1390 "Sign approval for multiple candidates",
1391 );
1392
1393 match maybe_create_signature(
1394 &mut overlayed_db,
1395 &mut session_info_provider,
1396 &state,
1397 &mut to_other_subsystems,
1398 &mut to_approval_distr,
1399 block_hash,
1400 validator_index,
1401 &subsystem.metrics,
1402 ).await {
1403 Ok(Some(next_wakeup)) => {
1404 delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index);
1405 },
1406 Ok(None) => {}
1407 Err(err) => {
1408 gum::error!(
1409 target: LOG_TARGET,
1410 ?err,
1411 "Failed to create signature",
1412 );
1413 }
1414 }
1415 vec![]
1416 }
1417 };
1418
1419 if handle_actions(
1420 &mut to_other_subsystems,
1421 &mut to_approval_distr,
1422 &subsystem.spawner,
1423 &mut state,
1424 &mut overlayed_db,
1425 &mut session_info_provider,
1426 &subsystem.metrics,
1427 &mut wakeups,
1428 &mut currently_checking_set,
1429 &mut delayed_approvals_timers,
1430 &mut approvals_cache,
1431 &mut subsystem.mode,
1432 actions,
1433 subsystem.max_approval_retries,
1434 subsystem.retry_backoff,
1435 )
1436 .await?
1437 {
1438 break;
1439 }
1440
1441 if !overlayed_db.is_empty() {
1442 let _timer = subsystem.metrics.time_db_transaction();
1443 let ops = overlayed_db.into_write_ops();
1444 backend.write(ops)?;
1445 }
1446 }
1447
1448 Ok(())
1449}
1450
1451pub async fn start_approval_worker<
1453 WorkProvider: ApprovalVotingWorkProvider + Send + 'static,
1454 Sender: SubsystemSender<ChainApiMessage>
1455 + SubsystemSender<RuntimeApiMessage>
1456 + SubsystemSender<ChainSelectionMessage>
1457 + SubsystemSender<AvailabilityRecoveryMessage>
1458 + SubsystemSender<DisputeCoordinatorMessage>
1459 + SubsystemSender<CandidateValidationMessage>
1460 + Clone,
1461 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1462>(
1463 work_provider: WorkProvider,
1464 to_other_subsystems: Sender,
1465 to_approval_distr: ADSender,
1466 config: Config,
1467 db: Arc<dyn Database>,
1468 keystore: Arc<LocalKeystore>,
1469 sync_oracle: Box<dyn SyncOracle + Send>,
1470 metrics: Metrics,
1471 spawner: Arc<dyn overseer::gen::Spawner + 'static>,
1472 task_name: &'static str,
1473 group_name: &'static str,
1474 clock: Arc<dyn Clock + Send + Sync>,
1475) -> SubsystemResult<()> {
1476 let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
1477 config,
1478 db.clone(),
1479 keystore,
1480 sync_oracle,
1481 metrics,
1482 clock,
1483 spawner,
1484 MAX_APPROVAL_RETRIES,
1485 APPROVAL_CHECKING_TIMEOUT / 2,
1486 );
1487 let backend = DbBackend::new(db.clone(), approval_voting.db_config);
1488 let spawner = approval_voting.spawner.clone();
1489 spawner.spawn_blocking(
1490 task_name,
1491 Some(group_name),
1492 Box::pin(async move {
1493 if let Err(err) = run(
1494 work_provider,
1495 to_other_subsystems,
1496 to_approval_distr,
1497 approval_voting,
1498 Box::new(RealAssignmentCriteria),
1499 backend,
1500 )
1501 .await
1502 {
1503 gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages");
1504 };
1505 }),
1506 );
1507 Ok(())
1508}
1509
1510#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
1530async fn handle_actions<
1531 Sender: SubsystemSender<ChainApiMessage>
1532 + SubsystemSender<RuntimeApiMessage>
1533 + SubsystemSender<ChainSelectionMessage>
1534 + SubsystemSender<AvailabilityRecoveryMessage>
1535 + SubsystemSender<DisputeCoordinatorMessage>
1536 + SubsystemSender<CandidateValidationMessage>
1537 + Clone,
1538 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1539>(
1540 sender: &mut Sender,
1541 approval_voting_sender: &mut ADSender,
1542 spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1543 state: &mut State,
1544 overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
1545 session_info_provider: &mut RuntimeInfo,
1546 metrics: &Metrics,
1547 wakeups: &mut Wakeups,
1548 currently_checking_set: &mut CurrentlyCheckingSet,
1549 delayed_approvals_timers: &mut DelayedApprovalTimer,
1550 approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
1551 mode: &mut Mode,
1552 actions: Vec<Action>,
1553 max_approval_retries: u32,
1554 retry_backoff: Duration,
1555) -> SubsystemResult<bool> {
1556 let mut conclude = false;
1557 let mut actions_iter = actions.into_iter();
1558 while let Some(action) = actions_iter.next() {
1559 match action {
1560 Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
1561 wakeups.schedule(block_hash, block_number, candidate_hash, tick);
1562 },
1563 Action::IssueApproval(candidate_hash, approval_request) => {
1564 let next_actions: Vec<Action> = issue_approval(
1576 sender,
1577 approval_voting_sender,
1578 state,
1579 overlayed_db,
1580 session_info_provider,
1581 metrics,
1582 candidate_hash,
1583 delayed_approvals_timers,
1584 approval_request,
1585 &wakeups,
1586 )
1587 .await?
1588 .into_iter()
1589 .map(|v| v.clone())
1590 .chain(actions_iter)
1591 .collect();
1592
1593 actions_iter = next_actions.into_iter();
1594 },
1595 Action::LaunchApproval {
1596 claimed_candidate_indices,
1597 candidate_hash,
1598 indirect_cert,
1599 assignment_tranche,
1600 relay_block_hash,
1601 session,
1602 candidate,
1603 backing_group,
1604 distribute_assignment,
1605 core_index,
1606 } => {
1607 if let Mode::Syncing(_) = *mode {
1609 continue;
1610 }
1611
1612 metrics.on_assignment_produced(assignment_tranche);
1613 let block_hash = indirect_cert.block_hash;
1614 let validator_index = indirect_cert.validator;
1615
1616 if distribute_assignment {
1617 approval_voting_sender.send_unbounded_message(
1618 ApprovalDistributionMessage::DistributeAssignment(
1619 indirect_cert,
1620 claimed_candidate_indices,
1621 ),
1622 );
1623 }
1624
1625 match approvals_cache.get(&candidate_hash) {
1626 Some(ApprovalOutcome::Approved) => {
1627 let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
1628 candidate_hash,
1629 ApprovalVoteRequest { validator_index, block_hash },
1630 ))
1631 .map(|v| v.clone())
1632 .chain(actions_iter)
1633 .collect();
1634 actions_iter = new_actions.into_iter();
1635 },
1636 None => {
1637 let sender = sender.clone();
1638 let spawn_handle = spawn_handle.clone();
1639
1640 let retry = RetryApprovalInfo {
1641 candidate: candidate.clone(),
1642 backing_group,
1643 core_index,
1644 session_index: session,
1645 attempts_remaining: max_approval_retries,
1646 backoff: retry_backoff,
1647 };
1648
1649 currently_checking_set
1650 .insert_relay_block_hash(
1651 candidate_hash,
1652 validator_index,
1653 relay_block_hash,
1654 async move {
1655 launch_approval(
1656 sender,
1657 spawn_handle,
1658 metrics.clone(),
1659 session,
1660 candidate,
1661 validator_index,
1662 block_hash,
1663 backing_group,
1664 core_index,
1665 retry,
1666 )
1667 .await
1668 },
1669 )
1670 .await?;
1671 },
1672 Some(_) => {},
1673 }
1674 },
1675 Action::NoteApprovedInChainSelection(block_hash) => {
1676 sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
1677 },
1678 Action::BecomeActive => {
1679 *mode = Mode::Active;
1680
1681 let (messages, next_actions) = distribution_messages_for_activation(
1682 overlayed_db,
1683 state,
1684 delayed_approvals_timers,
1685 )
1686 .await?;
1687 for message in messages.into_iter() {
1688 approval_voting_sender.send_unbounded_message(message);
1689 }
1690 let next_actions: Vec<Action> =
1691 next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
1692
1693 actions_iter = next_actions.into_iter();
1694 },
1695 Action::Conclude => {
1696 conclude = true;
1697 },
1698 }
1699 }
1700
1701 Ok(conclude)
1702}
1703
1704fn cores_to_candidate_indices(
1705 core_indices: &CoreBitfield,
1706 block_entry: &BlockEntry,
1707) -> Result<CandidateBitfield, BitfieldError> {
1708 let mut candidate_indices = Vec::new();
1709
1710 for claimed_core_index in core_indices.iter_ones() {
1712 if let Some(candidate_index) = block_entry
1713 .candidates()
1714 .iter()
1715 .position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
1716 {
1717 candidate_indices.push(candidate_index as _);
1718 }
1719 }
1720
1721 CandidateBitfield::try_from(candidate_indices)
1722}
1723
1724fn get_core_indices_on_startup(
1727 assignment: &AssignmentCertKindV2,
1728 block_entry_core_index: CoreIndex,
1729) -> CoreBitfield {
1730 match &assignment {
1731 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
1732 AssignmentCertKindV2::RelayVRFModulo { sample: _ } => {
1733 CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed")
1734 },
1735 AssignmentCertKindV2::RelayVRFDelay { core_index } => {
1736 CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1737 },
1738 }
1739}
1740
1741fn get_assignment_core_indices(
1745 assignment: &AssignmentCertKindV2,
1746 candidate_hash: &CandidateHash,
1747 block_entry: &BlockEntry,
1748) -> Option<CoreBitfield> {
1749 match &assignment {
1750 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => {
1751 Some(core_bitfield.clone())
1752 },
1753 AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
1754 .candidates()
1755 .iter()
1756 .find(|(_core_index, h)| candidate_hash == h)
1757 .map(|(core_index, _candidate_hash)| {
1758 CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
1759 }),
1760 AssignmentCertKindV2::RelayVRFDelay { core_index } => {
1761 Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"))
1762 },
1763 }
1764}
1765
1766async fn distribution_messages_for_activation(
1767 db: &OverlayedBackend<'_, impl Backend>,
1768 state: &State,
1769 delayed_approvals_timers: &mut DelayedApprovalTimer,
1770) -> SubsystemResult<(Vec<ApprovalDistributionMessage>, Vec<Action>)> {
1771 let all_blocks: Vec<Hash> = db.load_all_blocks()?;
1772
1773 let mut approval_meta = Vec::with_capacity(all_blocks.len());
1774 let mut messages = Vec::new();
1775 let mut approvals = Vec::new();
1776 let mut actions = Vec::new();
1777
1778 messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); for block_hash in all_blocks {
1781 let block_entry = match db.load_block_entry(&block_hash)? {
1782 Some(b) => b,
1783 None => {
1784 gum::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry");
1785
1786 continue;
1787 },
1788 };
1789
1790 approval_meta.push(BlockApprovalMeta {
1791 hash: block_hash,
1792 number: block_entry.block_number(),
1793 parent_hash: block_entry.parent_hash(),
1794 candidates: block_entry
1795 .candidates()
1796 .iter()
1797 .map(|(core_index, c_hash)| {
1798 let candidate = db.load_candidate_entry(c_hash).ok().flatten();
1799 let group_index = candidate
1800 .and_then(|entry| {
1801 entry.approval_entry(&block_hash).map(|entry| entry.backing_group())
1802 })
1803 .unwrap_or_else(|| {
1804 gum::warn!(
1805 target: LOG_TARGET,
1806 ?block_hash,
1807 ?c_hash,
1808 "Missing candidate entry or approval entry",
1809 );
1810 GroupIndex::default()
1811 });
1812 (*c_hash, *core_index, group_index)
1813 })
1814 .collect(),
1815 slot: block_entry.slot(),
1816 session: block_entry.session(),
1817 vrf_story: block_entry.relay_vrf_story(),
1818 });
1819 let mut signatures_queued = HashSet::new();
1820 for (core_index, candidate_hash) in block_entry.candidates() {
1821 let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
1822 Some(c) => c,
1823 None => {
1824 gum::warn!(
1825 target: LOG_TARGET,
1826 ?block_hash,
1827 ?candidate_hash,
1828 "Missing candidate entry",
1829 );
1830
1831 continue;
1832 },
1833 };
1834
1835 match candidate_entry.approval_entry(&block_hash) {
1836 Some(approval_entry) => {
1837 match approval_entry.local_statements() {
1838 (None, None) => {
1839 if approval_entry
1840 .our_assignment()
1841 .map(|assignment| !assignment.triggered())
1842 .unwrap_or(false)
1843 {
1844 actions.push(Action::ScheduleWakeup {
1845 block_hash,
1846 block_number: block_entry.block_number(),
1847 candidate_hash: *candidate_hash,
1848 tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
1849 })
1850 }
1851 },
1852 (None, Some(_)) => {}, (Some(assignment), None) => {
1854 let claimed_core_indices =
1855 get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1856
1857 if block_entry.has_candidates_pending_signature() {
1858 delayed_approvals_timers.maybe_arm_timer(
1859 state.clock.tick_now(),
1860 state.clock.as_ref(),
1861 block_entry.block_hash(),
1862 assignment.validator_index(),
1863 )
1864 }
1865
1866 match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1867 Ok(bitfield) => {
1868 gum::debug!(
1869 target: LOG_TARGET,
1870 candidate_hash = ?candidate_entry.candidate_receipt().hash(),
1871 ?block_hash,
1872 "Discovered, triggered assignment, not approved yet",
1873 );
1874
1875 let indirect_cert = IndirectAssignmentCertV2 {
1876 block_hash,
1877 validator: assignment.validator_index(),
1878 cert: assignment.cert().clone(),
1879 };
1880 messages.push(
1881 ApprovalDistributionMessage::DistributeAssignment(
1882 indirect_cert.clone(),
1883 bitfield.clone(),
1884 ),
1885 );
1886
1887 if !block_entry.candidate_is_pending_signature(*candidate_hash)
1888 {
1889 actions.push(Action::LaunchApproval {
1890 claimed_candidate_indices: bitfield,
1891 candidate_hash: candidate_entry
1892 .candidate_receipt()
1893 .hash(),
1894 indirect_cert,
1895 assignment_tranche: assignment.tranche(),
1896 relay_block_hash: block_hash,
1897 session: block_entry.session(),
1898 candidate: candidate_entry.candidate_receipt().clone(),
1899 backing_group: approval_entry.backing_group(),
1900 distribute_assignment: false,
1901 core_index: Some(*core_index),
1902 });
1903 }
1904 },
1905 Err(err) => {
1906 gum::warn!(
1909 target: LOG_TARGET,
1910 ?block_hash,
1911 ?candidate_hash,
1912 ?err,
1913 "Failed to create assignment bitfield",
1914 );
1915 },
1916 }
1917 },
1918 (Some(assignment), Some(approval_sig)) => {
1919 let claimed_core_indices =
1920 get_core_indices_on_startup(&assignment.cert().kind, *core_index);
1921 match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
1922 Ok(bitfield) => messages.push(
1923 ApprovalDistributionMessage::DistributeAssignment(
1924 IndirectAssignmentCertV2 {
1925 block_hash,
1926 validator: assignment.validator_index(),
1927 cert: assignment.cert().clone(),
1928 },
1929 bitfield,
1930 ),
1931 ),
1932 Err(err) => {
1933 gum::warn!(
1934 target: LOG_TARGET,
1935 ?block_hash,
1936 ?candidate_hash,
1937 ?err,
1938 "Failed to create assignment bitfield",
1939 );
1940 continue;
1942 },
1943 }
1944 if signatures_queued
1945 .insert(approval_sig.signed_candidates_indices.clone())
1946 {
1947 approvals.push(ApprovalDistributionMessage::DistributeApproval(
1948 IndirectSignedApprovalVoteV2 {
1949 block_hash,
1950 candidate_indices: approval_sig.signed_candidates_indices,
1951 validator: assignment.validator_index(),
1952 signature: approval_sig.signature,
1953 },
1954 ))
1955 };
1956 },
1957 }
1958 },
1959 None => {
1960 gum::warn!(
1961 target: LOG_TARGET,
1962 ?block_hash,
1963 ?candidate_hash,
1964 "Missing approval entry",
1965 );
1966 },
1967 }
1968 }
1969 }
1970
1971 messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
1972 messages.extend(approvals.into_iter());
1976 Ok((messages, actions))
1977}
1978
1979async fn handle_from_overseer<
1981 Sender: SubsystemSender<ChainApiMessage>
1982 + SubsystemSender<RuntimeApiMessage>
1983 + SubsystemSender<ChainSelectionMessage>
1984 + Clone,
1985 ADSender: SubsystemSender<ApprovalDistributionMessage>,
1986>(
1987 sender: &mut Sender,
1988 approval_voting_sender: &mut ADSender,
1989 spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
1990 state: &mut State,
1991 db: &mut OverlayedBackend<'_, impl Backend>,
1992 session_info_provider: &mut RuntimeInfo,
1993 metrics: &Metrics,
1994 x: FromOrchestra<ApprovalVotingMessage>,
1995 last_finalized_height: &mut Option<BlockNumber>,
1996 wakeups: &mut Wakeups,
1997) -> SubsystemResult<Vec<Action>> {
1998 let actions = match x {
1999 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
2000 let mut actions = Vec::new();
2001 if let Some(activated) = update.activated {
2002 let head = activated.hash;
2003 match import::handle_new_head(
2004 sender,
2005 approval_voting_sender,
2006 state,
2007 db,
2008 session_info_provider,
2009 head,
2010 last_finalized_height,
2011 )
2012 .await
2013 {
2014 Err(e) => return Err(SubsystemError::with_origin("db", e)),
2015 Ok(block_imported_candidates) => {
2016 for block_batch in block_imported_candidates {
2018 gum::debug!(
2019 target: LOG_TARGET,
2020 block_number = ?block_batch.block_number,
2021 block_hash = ?block_batch.block_hash,
2022 num_candidates = block_batch.imported_candidates.len(),
2023 "Imported new block.",
2024 );
2025
2026 state.no_show_stats.maybe_print(block_batch.block_number);
2027
2028 for (c_hash, c_entry) in block_batch.imported_candidates {
2029 metrics.on_candidate_imported();
2030
2031 let our_tranche = c_entry
2032 .approval_entry(&block_batch.block_hash)
2033 .and_then(|a| a.our_assignment().map(|a| a.tranche()));
2034
2035 if let Some(our_tranche) = our_tranche {
2036 let tick = our_tranche as Tick + block_batch.block_tick;
2037 gum::trace!(
2038 target: LOG_TARGET,
2039 tranche = our_tranche,
2040 candidate_hash = ?c_hash,
2041 block_hash = ?block_batch.block_hash,
2042 block_tick = block_batch.block_tick,
2043 "Scheduling first wakeup.",
2044 );
2045
2046 actions.push(Action::ScheduleWakeup {
2050 block_hash: block_batch.block_hash,
2051 block_number: block_batch.block_number,
2052 candidate_hash: c_hash,
2053 tick,
2054 });
2055 }
2056 }
2057 }
2058 },
2059 }
2060 }
2061
2062 actions
2063 },
2064 FromOrchestra::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
2065 gum::debug!(target: LOG_TARGET, ?block_hash, ?block_number, "Block finalized");
2066 *last_finalized_height = Some(block_number);
2067
2068 crate::ops::canonicalize(db, block_number, block_hash)
2069 .map_err(|e| SubsystemError::with_origin("db", e))?;
2070
2071 wakeups.prune_finalized_wakeups(block_number);
2074 state.cleanup_assignments_gathering_timestamp(block_number);
2075
2076 Vec::new()
2082 },
2083 FromOrchestra::Signal(OverseerSignal::Conclude) => {
2084 vec![Action::Conclude]
2085 },
2086 FromOrchestra::Communication { msg } => match msg {
2087 ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => {
2088 let (check_outcome, actions) =
2089 import_assignment(sender, state, db, session_info_provider, checked_assignment)
2090 .await?;
2091 if let AssignmentCheckResult::Bad(ref err) = check_outcome {
2095 gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an assignment");
2096 }
2097 let _ = tx.map(|tx| tx.send(check_outcome));
2098 actions
2099 },
2100 ApprovalVotingMessage::ImportApproval(a, tx) => {
2101 let result =
2102 import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups)
2103 .await?;
2104 if let ApprovalCheckResult::Bad(ref err) = result.1 {
2108 gum::debug!(target: LOG_TARGET, ?err, "Unexpected fail when importing an approval");
2109 }
2110 let _ = tx.map(|tx| tx.send(result.1));
2111
2112 result.0
2113 },
2114 ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
2115 match handle_approved_ancestor(sender, db, target, lower_bound, wakeups, &metrics)
2116 .await
2117 {
2118 Ok(v) => {
2119 let _ = res.send(v);
2120 },
2121 Err(e) => {
2122 let _ = res.send(None);
2123 return Err(e);
2124 },
2125 }
2126
2127 Vec::new()
2128 },
2129 ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => {
2130 metrics.on_candidate_signatures_request();
2131 get_approval_signatures_for_candidate(
2132 approval_voting_sender.clone(),
2133 spawn_handle,
2134 db,
2135 candidate_hash,
2136 tx,
2137 )
2138 .await?;
2139 Vec::new()
2140 },
2141 },
2142 };
2143
2144 Ok(actions)
2145}
2146
2147#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
2152async fn get_approval_signatures_for_candidate<
2153 Sender: SubsystemSender<ApprovalDistributionMessage>,
2154>(
2155 mut sender: Sender,
2156 spawn_handle: &Arc<dyn overseer::gen::Spawner + 'static>,
2157 db: &OverlayedBackend<'_, impl Backend>,
2158 candidate_hash: CandidateHash,
2159 tx: oneshot::Sender<HashMap<ValidatorIndex, (Vec<CandidateHash>, ValidatorSignature)>>,
2160) -> SubsystemResult<()> {
2161 let send_votes = |votes| {
2162 if let Err(_) = tx.send(votes) {
2163 gum::debug!(
2164 target: LOG_TARGET,
2165 "Sending approval signatures back failed, as receiver got closed."
2166 );
2167 }
2168 };
2169 let entry = match db.load_candidate_entry(&candidate_hash)? {
2170 None => {
2171 send_votes(HashMap::new());
2172 gum::debug!(
2173 target: LOG_TARGET,
2174 ?candidate_hash,
2175 "Sent back empty votes because the candidate was not found in db."
2176 );
2177 return Ok(());
2178 },
2179 Some(e) => e,
2180 };
2181
2182 let relay_hashes = entry.block_assignments.keys();
2183
2184 let mut candidate_indices = HashSet::new();
2185 let mut candidate_indices_to_candidate_hashes: HashMap<
2186 Hash,
2187 HashMap<CandidateIndex, CandidateHash>,
2188 > = HashMap::new();
2189
2190 for hash in relay_hashes {
2192 let entry = match db.load_block_entry(hash)? {
2193 None => {
2194 gum::debug!(
2195 target: LOG_TARGET,
2196 ?candidate_hash,
2197 ?hash,
2198 "Block entry for assignment missing."
2199 );
2200 continue;
2201 },
2202 Some(e) => e,
2203 };
2204 for (candidate_index, (_core_index, c_hash)) in entry.candidates().iter().enumerate() {
2205 if c_hash == &candidate_hash {
2206 candidate_indices.insert((*hash, candidate_index as u32));
2207 }
2208 candidate_indices_to_candidate_hashes
2209 .entry(*hash)
2210 .or_default()
2211 .insert(candidate_index as _, *c_hash);
2212 }
2213 }
2214
2215 let get_approvals = async move {
2216 let (tx_distribution, rx_distribution) = oneshot::channel();
2217 sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
2218 candidate_indices,
2219 tx_distribution,
2220 ));
2221
2222 match rx_distribution.timeout(WAIT_FOR_SIGS_TIMEOUT).await {
2225 None => {
2226 gum::warn!(
2227 target: LOG_TARGET,
2228 "Waiting for approval signatures timed out - dead lock?"
2229 );
2230 },
2231 Some(Err(_)) => gum::debug!(
2232 target: LOG_TARGET,
2233 "Request for approval signatures got cancelled by `approval-distribution`."
2234 ),
2235 Some(Ok(votes)) => {
2236 let votes = votes
2237 .into_iter()
2238 .filter_map(|(validator_index, (hash, signed_candidates_indices, signature))| {
2239 let candidates_hashes = candidate_indices_to_candidate_hashes.get(&hash);
2240
2241 if candidates_hashes.is_none() {
2242 gum::warn!(
2243 target: LOG_TARGET,
2244 ?hash,
2245 "Possible bug! Could not find map of candidate_hashes for block hash received from approval-distribution"
2246 );
2247 }
2248
2249 let num_signed_candidates = signed_candidates_indices.len();
2250
2251 let signed_candidates_hashes: Vec<CandidateHash> =
2252 signed_candidates_indices
2253 .into_iter()
2254 .filter_map(|candidate_index| {
2255 candidates_hashes.and_then(|candidate_hashes| {
2256 if let Some(candidate_hash) =
2257 candidate_hashes.get(&candidate_index)
2258 {
2259 Some(*candidate_hash)
2260 } else {
2261 gum::warn!(
2262 target: LOG_TARGET,
2263 ?candidate_index,
2264 "Possible bug! Could not find candidate hash for candidate_index coming from approval-distribution"
2265 );
2266 None
2267 }
2268 })
2269 })
2270 .collect();
2271 if num_signed_candidates == signed_candidates_hashes.len() {
2272 Some((validator_index, (signed_candidates_hashes, signature)))
2273 } else {
2274 gum::warn!(
2275 target: LOG_TARGET,
2276 "Possible bug! Could not find all hashes for candidates coming from approval-distribution"
2277 );
2278 None
2279 }
2280 })
2281 .collect();
2282 send_votes(votes)
2283 },
2284 }
2285 };
2286
2287 gum::trace!(
2290 target: LOG_TARGET,
2291 ?candidate_hash,
2292 "Spawning task for fetching signatures from approval-distribution"
2293 );
2294 spawn_handle.spawn(
2295 "get-approval-signatures",
2296 Some("approval-voting-subsystem"),
2297 Box::pin(get_approvals),
2298 );
2299 Ok(())
2300}
2301
2302#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
2303async fn handle_approved_ancestor<Sender: SubsystemSender<ChainApiMessage>>(
2304 sender: &mut Sender,
2305 db: &OverlayedBackend<'_, impl Backend>,
2306 target: Hash,
2307 lower_bound: BlockNumber,
2308 wakeups: &Wakeups,
2309 metrics: &Metrics,
2310) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
2311 const MAX_TRACING_WINDOW: usize = 200;
2312 const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
2313 const LOGGING_DEPTH_THRESHOLD: usize = 10;
2314
2315 let mut all_approved_max = None;
2316
2317 let target_number = {
2318 let (tx, rx) = oneshot::channel();
2319
2320 sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await;
2321
2322 match rx.await {
2323 Ok(Ok(Some(n))) => n,
2324 Ok(Ok(None)) => return Ok(None),
2325 Ok(Err(_)) | Err(_) => return Ok(None),
2326 }
2327 };
2328
2329 if target_number <= lower_bound {
2330 return Ok(None);
2331 }
2332
2333 let ancestry = if target_number > lower_bound + 1 {
2337 let (tx, rx) = oneshot::channel();
2338
2339 sender
2340 .send_message(ChainApiMessage::Ancestors {
2341 hash: target,
2342 k: (target_number - (lower_bound + 1)) as usize,
2343 response_channel: tx,
2344 })
2345 .await;
2346
2347 match rx.await {
2348 Ok(Ok(a)) => a,
2349 Err(_) | Ok(Err(_)) => return Ok(None),
2350 }
2351 } else {
2352 Vec::new()
2353 };
2354 let ancestry_len = ancestry.len();
2355
2356 let mut block_descriptions = Vec::new();
2357
2358 let mut bits: BitVec<u8, Lsb0> = Default::default();
2359 for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
2360 let entry = match db.load_block_entry(&block_hash)? {
2364 None => {
2365 let block_number = target_number.saturating_sub(i as u32);
2366 gum::info!(
2367 target: LOG_TARGET,
2368 unknown_number = ?block_number,
2369 unknown_hash = ?block_hash,
2370 "Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
2371 target,
2372 target_number,
2373 lower_bound,
2374 lower_bound,
2375 );
2376 return Ok(None);
2377 },
2378 Some(b) => b,
2379 };
2380
2381 bits.push(entry.is_fully_approved());
2384 if entry.is_fully_approved() {
2385 if all_approved_max.is_none() {
2386 all_approved_max = Some((block_hash, target_number - i as BlockNumber));
2389 }
2390 block_descriptions.push(BlockDescription {
2391 block_hash,
2392 session: entry.session(),
2393 candidates: entry
2394 .candidates()
2395 .iter()
2396 .map(|(_idx, candidate_hash)| *candidate_hash)
2397 .collect(),
2398 });
2399 } else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
2400 all_approved_max = None;
2401 block_descriptions.clear();
2402 } else {
2403 all_approved_max = None;
2404 block_descriptions.clear();
2405
2406 let unapproved: Vec<_> = entry.unapproved_candidates().collect();
2407 gum::debug!(
2408 target: LOG_TARGET,
2409 "Block {} is {} blocks deep and has {}/{} candidates unapproved",
2410 block_hash,
2411 bits.len() - 1,
2412 unapproved.len(),
2413 entry.candidates().len(),
2414 );
2415 if ancestry_len >= LOGGING_DEPTH_THRESHOLD && i > ancestry_len - LOGGING_DEPTH_THRESHOLD
2416 {
2417 gum::trace!(
2418 target: LOG_TARGET,
2419 ?block_hash,
2420 "Unapproved candidates at depth {}: {:?}",
2421 bits.len(),
2422 unapproved
2423 )
2424 }
2425 metrics.on_unapproved_candidates_in_unfinalized_chain(unapproved.len());
2426 for candidate_hash in unapproved {
2427 match db.load_candidate_entry(&candidate_hash)? {
2428 None => {
2429 gum::warn!(
2430 target: LOG_TARGET,
2431 ?candidate_hash,
2432 "Missing expected candidate in DB",
2433 );
2434
2435 continue;
2436 },
2437 Some(c_entry) => match c_entry.approval_entry(&block_hash) {
2438 None => {
2439 gum::warn!(
2440 target: LOG_TARGET,
2441 ?candidate_hash,
2442 ?block_hash,
2443 "Missing expected approval entry under candidate.",
2444 );
2445 },
2446 Some(a_entry) => {
2447 let status = || {
2448 let n_assignments = a_entry.n_assignments();
2449
2450 let n_approvals = c_entry
2453 .approvals()
2454 .iter()
2455 .by_vals()
2456 .enumerate()
2457 .filter(|(i, approved)| {
2458 *approved && a_entry.is_assigned(ValidatorIndex(*i as _))
2459 })
2460 .count();
2461
2462 format!(
2463 "{}/{}/{}",
2464 n_assignments,
2465 n_approvals,
2466 a_entry.n_validators(),
2467 )
2468 };
2469
2470 match a_entry.our_assignment() {
2471 None => gum::debug!(
2472 target: LOG_TARGET,
2473 ?candidate_hash,
2474 ?block_hash,
2475 status = %status(),
2476 "no assignment."
2477 ),
2478 Some(a) => {
2479 let tranche = a.tranche();
2480 let triggered = a.triggered();
2481
2482 let next_wakeup =
2483 wakeups.wakeup_for(block_hash, candidate_hash);
2484
2485 let approved =
2486 triggered && { a_entry.local_statements().1.is_some() };
2487
2488 gum::debug!(
2489 target: LOG_TARGET,
2490 ?candidate_hash,
2491 ?block_hash,
2492 tranche,
2493 ?next_wakeup,
2494 status = %status(),
2495 triggered,
2496 approved,
2497 "assigned."
2498 );
2499 },
2500 }
2501 },
2502 },
2503 }
2504 }
2505 }
2506 }
2507
2508 gum::debug!(
2509 target: LOG_TARGET,
2510 "approved blocks {}-[{}]-{}",
2511 target_number,
2512 {
2513 let mut s = String::with_capacity(bits.len());
2517 for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
2518 s.push(if *bit { '1' } else { '0' });
2519 if (target_number - i as u32).is_multiple_of(10) && i != bits.len() - 1 {
2520 s.push(' ');
2521 }
2522 }
2523
2524 s
2525 },
2526 if bits.len() > MAX_TRACING_WINDOW {
2527 format!(
2528 "{}... (truncated due to large window)",
2529 target_number - MAX_TRACING_WINDOW as u32 + 1,
2530 )
2531 } else {
2532 format!("{}", lower_bound + 1)
2533 },
2534 );
2535
2536 block_descriptions.reverse();
2539
2540 let all_approved_max =
2541 all_approved_max.map(|(hash, block_number)| HighestApprovedAncestorBlock {
2542 hash,
2543 number: block_number,
2544 descriptions: block_descriptions,
2545 });
2546
2547 Ok(all_approved_max)
2548}
2549
2550fn min_prefer_some<T: std::cmp::Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
2552 match (a, b) {
2553 (None, None) => None,
2554 (None, Some(x)) | (Some(x), None) => Some(x),
2555 (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
2556 }
2557}
2558
2559fn schedule_wakeup_action(
2560 approval_entry: &ApprovalEntry,
2561 block_hash: Hash,
2562 block_number: BlockNumber,
2563 candidate_hash: CandidateHash,
2564 block_tick: Tick,
2565 tick_now: Tick,
2566 required_tranches: RequiredTranches,
2567) -> Option<Action> {
2568 let maybe_action = match required_tranches {
2569 _ if approval_entry.is_approved() => None,
2570 RequiredTranches::All => None,
2571 RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
2572 min_prefer_some(
2575 last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
2576 next_no_show,
2577 )
2578 .map(|tick| Action::ScheduleWakeup {
2579 block_hash,
2580 block_number,
2581 candidate_hash,
2582 tick,
2583 })
2584 },
2585 RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
2586 let next_non_empty_tranche = {
2590 let next_announced = approval_entry
2591 .tranches()
2592 .iter()
2593 .skip_while(|t| t.tranche() <= considered)
2594 .map(|t| t.tranche())
2595 .next();
2596
2597 let our_untriggered = approval_entry.our_assignment().and_then(|t| {
2598 if !t.triggered() && t.tranche() > considered {
2599 Some(t.tranche())
2600 } else {
2601 None
2602 }
2603 });
2604
2605 min_prefer_some(next_announced, our_untriggered)
2607 .map(|t| t as Tick + block_tick + clock_drift)
2608 };
2609
2610 min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
2611 Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick }
2612 })
2613 },
2614 };
2615
2616 match maybe_action {
2617 Some(Action::ScheduleWakeup { ref tick, .. }) => gum::trace!(
2618 target: LOG_TARGET,
2619 tick,
2620 ?candidate_hash,
2621 ?block_hash,
2622 block_tick,
2623 "Scheduling next wakeup.",
2624 ),
2625 None => gum::trace!(
2626 target: LOG_TARGET,
2627 ?candidate_hash,
2628 ?block_hash,
2629 block_tick,
2630 "No wakeup needed.",
2631 ),
2632 Some(_) => {}, }
2634
2635 maybe_action
2636}
2637
2638async fn import_assignment<Sender>(
2639 sender: &mut Sender,
2640 state: &State,
2641 db: &mut OverlayedBackend<'_, impl Backend>,
2642 session_info_provider: &mut RuntimeInfo,
2643 checked_assignment: CheckedIndirectAssignment,
2644) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
2645where
2646 Sender: SubsystemSender<RuntimeApiMessage>,
2647{
2648 let tick_now = state.clock.tick_now();
2649 let assignment = checked_assignment.assignment();
2650 let candidate_indices = checked_assignment.candidate_indices();
2651 let tranche = checked_assignment.tranche();
2652
2653 let block_entry = match db.load_block_entry(&assignment.block_hash)? {
2654 Some(b) => b,
2655 None => {
2656 return Ok((
2657 AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(
2658 assignment.block_hash,
2659 )),
2660 Vec::new(),
2661 ))
2662 },
2663 };
2664
2665 let session_info = match get_session_info_by_index(
2666 session_info_provider,
2667 sender,
2668 block_entry.parent_hash(),
2669 block_entry.session(),
2670 )
2671 .await
2672 {
2673 Some(s) => s,
2674 None => {
2675 return Ok((
2676 AssignmentCheckResult::Bad(AssignmentCheckError::UnknownSessionIndex(
2677 block_entry.session(),
2678 )),
2679 Vec::new(),
2680 ))
2681 },
2682 };
2683
2684 let n_cores = session_info.n_cores as usize;
2685
2686 if candidate_indices.len() > n_cores {
2689 gum::debug!(
2690 target: LOG_TARGET,
2691 validator = assignment.validator.0,
2692 n_cores,
2693 candidate_bitfield_len = ?candidate_indices.len(),
2694 "Oversized bitfield",
2695 );
2696
2697 return Ok((
2698 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
2699 candidate_indices.len(),
2700 )),
2701 Vec::new(),
2702 ));
2703 }
2704
2705 let mut claimed_core_indices = Vec::new();
2706 let mut assigned_candidate_hashes = Vec::new();
2707
2708 for candidate_index in candidate_indices.iter_ones() {
2709 let (claimed_core_index, assigned_candidate_hash) =
2710 match block_entry.candidate(candidate_index) {
2711 Some((c, h)) => (*c, *h),
2712 None => {
2713 return Ok((
2714 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
2715 candidate_index as _,
2716 )),
2717 Vec::new(),
2718 ))
2719 }, };
2721
2722 let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2723 Some(c) => c,
2724 None => {
2725 return Ok((
2726 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2727 candidate_index as _,
2728 assigned_candidate_hash,
2729 )),
2730 Vec::new(),
2731 ))
2732 }, };
2734
2735 if candidate_entry.approval_entry_mut(&assignment.block_hash).is_none() {
2736 return Ok((
2737 AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2738 assignment.block_hash,
2739 assigned_candidate_hash,
2740 )),
2741 Vec::new(),
2742 ));
2743 };
2744
2745 claimed_core_indices.push(claimed_core_index);
2746 assigned_candidate_hashes.push(assigned_candidate_hash);
2747 }
2748
2749 if claimed_core_indices.is_empty() {
2751 return Ok((
2752 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
2753 assignment.validator,
2754 format!("{:?}", InvalidAssignmentReason::NullAssignment),
2755 )),
2756 Vec::new(),
2757 ));
2758 }
2759
2760 let mut actions = Vec::new();
2761 let res = {
2762 let mut is_duplicate = true;
2763 for (assigned_candidate_hash, candidate_index) in
2765 assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
2766 {
2767 let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
2768 Some(c) => c,
2769 None => {
2770 return Ok((
2771 AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
2772 candidate_index as _,
2773 *assigned_candidate_hash,
2774 )),
2775 Vec::new(),
2776 ))
2777 },
2778 };
2779
2780 let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
2781 Some(a) => a,
2782 None => {
2783 return Ok((
2784 AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
2785 assignment.block_hash,
2786 *assigned_candidate_hash,
2787 )),
2788 Vec::new(),
2789 ))
2790 },
2791 };
2792
2793 let is_duplicate_for_candidate = approval_entry.is_assigned(assignment.validator);
2794 is_duplicate &= is_duplicate_for_candidate;
2795 approval_entry.import_assignment(
2796 tranche,
2797 assignment.validator,
2798 tick_now,
2799 is_duplicate_for_candidate,
2800 );
2801
2802 if let Some((approval_entry, status)) = state
2805 .approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
2806 .await
2807 {
2808 actions.extend(schedule_wakeup_action(
2809 approval_entry,
2810 block_entry.block_hash(),
2811 block_entry.block_number(),
2812 *assigned_candidate_hash,
2813 status.block_tick,
2814 tick_now,
2815 status.required_tranches,
2816 ));
2817 }
2818
2819 db.write_candidate_entry(candidate_entry.into());
2821 }
2822
2823 if is_duplicate {
2829 AssignmentCheckResult::AcceptedDuplicate
2830 } else if candidate_indices.count_ones() > 1 {
2831 gum::trace!(
2832 target: LOG_TARGET,
2833 validator = assignment.validator.0,
2834 candidate_hashes = ?assigned_candidate_hashes,
2835 assigned_cores = ?claimed_core_indices,
2836 ?tranche,
2837 "Imported assignments for multiple cores.",
2838 );
2839
2840 AssignmentCheckResult::Accepted
2841 } else {
2842 gum::trace!(
2843 target: LOG_TARGET,
2844 validator = assignment.validator.0,
2845 candidate_hashes = ?assigned_candidate_hashes,
2846 assigned_cores = ?claimed_core_indices,
2847 "Imported assignment for a single core.",
2848 );
2849
2850 AssignmentCheckResult::Accepted
2851 }
2852 };
2853
2854 Ok((res, actions))
2855}
2856
2857async fn import_approval<Sender>(
2858 sender: &mut Sender,
2859 state: &mut State,
2860 db: &mut OverlayedBackend<'_, impl Backend>,
2861 session_info_provider: &mut RuntimeInfo,
2862 metrics: &Metrics,
2863 approval: CheckedIndirectSignedApprovalVote,
2864 wakeups: &Wakeups,
2865) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
2866where
2867 Sender: SubsystemSender<RuntimeApiMessage>,
2868{
2869 macro_rules! respond_early {
2870 ($e: expr) => {{
2871 return Ok((Vec::new(), $e));
2872 }};
2873 }
2874
2875 let block_entry = match db.load_block_entry(&approval.block_hash)? {
2876 Some(b) => b,
2877 None => {
2878 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2879 approval.block_hash
2880 ),))
2881 },
2882 };
2883
2884 let approved_candidates_info: Result<Vec<(CandidateIndex, CandidateHash)>, ApprovalCheckError> =
2885 approval
2886 .candidate_indices
2887 .iter_ones()
2888 .map(|candidate_index| {
2889 block_entry
2890 .candidate(candidate_index)
2891 .ok_or(ApprovalCheckError::InvalidCandidateIndex(candidate_index as _))
2892 .map(|candidate| (candidate_index as _, candidate.1))
2893 })
2894 .collect();
2895
2896 let approved_candidates_info = match approved_candidates_info {
2897 Ok(approved_candidates_info) => approved_candidates_info,
2898 Err(err) => {
2899 respond_early!(ApprovalCheckResult::Bad(err))
2900 },
2901 };
2902
2903 gum::trace!(
2904 target: LOG_TARGET,
2905 "Received approval for num_candidates {:}",
2906 approval.candidate_indices.count_ones()
2907 );
2908
2909 let mut actions = Vec::new();
2910 for (approval_candidate_index, approved_candidate_hash) in approved_candidates_info {
2911 let block_entry = match db.load_block_entry(&approval.block_hash)? {
2912 Some(b) => b,
2913 None => {
2914 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(
2915 approval.block_hash
2916 ),))
2917 },
2918 };
2919
2920 let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
2921 Some(c) => c,
2922 None => {
2923 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(
2924 approval_candidate_index,
2925 approved_candidate_hash
2926 ),))
2927 },
2928 };
2929
2930 match candidate_entry.approval_entry(&approval.block_hash) {
2932 None => {
2933 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::Internal(
2934 approval.block_hash,
2935 approved_candidate_hash
2936 ),))
2937 },
2938 Some(e) if !e.is_assigned(approval.validator) => {
2939 respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(
2940 approval.validator
2941 ),))
2942 },
2943 _ => {},
2944 }
2945
2946 gum::trace!(
2947 target: LOG_TARGET,
2948 validator_index = approval.validator.0,
2949 candidate_hash = ?approved_candidate_hash,
2950 para_id = ?candidate_entry.candidate_receipt().descriptor.para_id(),
2951 "Importing approval vote",
2952 );
2953
2954 let new_actions = advance_approval_state(
2955 sender,
2956 state,
2957 db,
2958 session_info_provider,
2959 &metrics,
2960 block_entry,
2961 approved_candidate_hash,
2962 candidate_entry,
2963 ApprovalStateTransition::RemoteApproval(approval.validator),
2964 wakeups,
2965 )
2966 .await;
2967 actions.extend(new_actions);
2968 }
2969
2970 Ok((actions, ApprovalCheckResult::Accepted))
2972}
2973
2974#[derive(Debug)]
2975enum ApprovalStateTransition {
2976 RemoteApproval(ValidatorIndex),
2977 LocalApproval(ValidatorIndex),
2978 WakeupProcessed,
2979}
2980
2981impl ApprovalStateTransition {
2982 fn validator_index(&self) -> Option<ValidatorIndex> {
2983 match *self {
2984 ApprovalStateTransition::RemoteApproval(v) |
2985 ApprovalStateTransition::LocalApproval(v) => Some(v),
2986 ApprovalStateTransition::WakeupProcessed => None,
2987 }
2988 }
2989
2990 fn is_local_approval(&self) -> bool {
2991 match *self {
2992 ApprovalStateTransition::RemoteApproval(_) => false,
2993 ApprovalStateTransition::LocalApproval(_) => true,
2994 ApprovalStateTransition::WakeupProcessed => false,
2995 }
2996 }
2997
2998 fn is_remote_approval(&self) -> bool {
2999 matches!(*self, ApprovalStateTransition::RemoteApproval(_))
3000 }
3001}
3002
3003async fn advance_approval_state<Sender>(
3008 sender: &mut Sender,
3009 state: &mut State,
3010 db: &mut OverlayedBackend<'_, impl Backend>,
3011 session_info_provider: &mut RuntimeInfo,
3012 metrics: &Metrics,
3013 mut block_entry: BlockEntry,
3014 candidate_hash: CandidateHash,
3015 mut candidate_entry: CandidateEntry,
3016 transition: ApprovalStateTransition,
3017 wakeups: &Wakeups,
3018) -> Vec<Action>
3019where
3020 Sender: SubsystemSender<RuntimeApiMessage>,
3021{
3022 let validator_index = transition.validator_index();
3023
3024 let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
3025 let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);
3026
3027 if !transition.is_local_approval() {
3037 if candidate_approved_in_block {
3041 return Vec::new();
3042 }
3043 }
3044
3045 let mut actions = Vec::new();
3046 let block_hash = block_entry.block_hash();
3047 let block_number = block_entry.block_number();
3048 let session_index = block_entry.session();
3049 let para_id = candidate_entry.candidate_receipt().descriptor().para_id();
3050 let tick_now = state.clock.tick_now();
3051
3052 let (is_approved, status) = if let Some((approval_entry, status)) = state
3053 .approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
3054 .await
3055 {
3056 let check = approval_checking::check_approval(
3057 &candidate_entry,
3058 approval_entry,
3059 status.required_tranches.clone(),
3060 );
3061 state.observe_assignment_gathering_status(
3062 &metrics,
3063 &status.required_tranches,
3064 block_hash,
3065 block_entry.block_number(),
3066 candidate_hash,
3067 );
3068
3069 let is_approved = check.is_approved(tick_now.saturating_sub(APPROVAL_DELAY));
3073 if status.last_no_shows != 0 {
3074 metrics.on_observed_no_shows(status.last_no_shows);
3075 gum::trace!(
3076 target: LOG_TARGET,
3077 ?candidate_hash,
3078 ?block_hash,
3079 last_no_shows = ?status.last_no_shows,
3080 "Observed no_shows",
3081 );
3082 }
3083 if is_approved {
3084 gum::trace!(
3085 target: LOG_TARGET,
3086 ?candidate_hash,
3087 ?block_hash,
3088 "Candidate approved under block.",
3089 );
3090
3091 let no_shows = check.known_no_shows();
3092
3093 let was_block_approved = block_entry.is_fully_approved();
3094 block_entry.mark_approved_by_hash(&candidate_hash);
3095 let is_block_approved = block_entry.is_fully_approved();
3096
3097 if no_shows != 0 {
3098 metrics.on_no_shows(no_shows);
3099 }
3100 if check == Check::ApprovedOneThird {
3101 metrics.on_approved_by_one_third()
3105 }
3106
3107 metrics.on_candidate_approved(status.tranche_now as _);
3108
3109 if is_block_approved && !was_block_approved {
3110 metrics.on_block_approved(status.tranche_now as _);
3111 actions.push(Action::NoteApprovedInChainSelection(block_hash));
3112 }
3113
3114 db.write_block_entry(block_entry.into());
3115 } else if transition.is_local_approval() {
3116 db.write_block_entry(block_entry.into());
3119 }
3120
3121 (is_approved, status)
3122 } else {
3123 gum::warn!(
3124 target: LOG_TARGET,
3125 ?candidate_hash,
3126 ?block_hash,
3127 ?validator_index,
3128 "No approval entry for approval under block",
3129 );
3130
3131 return Vec::new();
3132 };
3133
3134 {
3135 let approval_entry = candidate_entry
3136 .approval_entry_mut(&block_hash)
3137 .expect("Approval entry just fetched; qed");
3138
3139 let was_approved = approval_entry.is_approved();
3140 let newly_approved = is_approved && !was_approved;
3141
3142 if is_approved {
3143 approval_entry.mark_approved();
3144 }
3145 if newly_approved {
3146 state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
3147 }
3148 actions.extend(schedule_wakeup_action(
3149 &approval_entry,
3150 block_hash,
3151 block_number,
3152 candidate_hash,
3153 status.block_tick,
3154 tick_now,
3155 status.required_tranches,
3156 ));
3157
3158 if is_approved && transition.is_remote_approval() {
3159 for (fork_block_hash, fork_approval_entry) in candidate_entry
3162 .block_assignments
3163 .iter()
3164 .filter(|(hash, _)| **hash != block_hash)
3165 {
3166 let assigned_on_fork_block = validator_index
3167 .as_ref()
3168 .map(|validator_index| fork_approval_entry.is_assigned(*validator_index))
3169 .unwrap_or_default();
3170 if wakeups.wakeup_for(*fork_block_hash, candidate_hash).is_none() &&
3171 !fork_approval_entry.is_approved() &&
3172 assigned_on_fork_block
3173 {
3174 let fork_block_entry = db.load_block_entry(fork_block_hash);
3175 if let Ok(Some(fork_block_entry)) = fork_block_entry {
3176 actions.push(Action::ScheduleWakeup {
3177 block_hash: *fork_block_hash,
3178 block_number: fork_block_entry.block_number(),
3179 candidate_hash,
3180 tick: tick_now + 1,
3183 })
3184 } else {
3185 gum::debug!(
3186 target: LOG_TARGET,
3187 ?fork_block_entry,
3188 ?fork_block_hash,
3189 "Failed to load block entry"
3190 )
3191 }
3192 }
3193 }
3194 }
3195 if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
3204 {
3205 db.write_candidate_entry(candidate_entry);
3207 }
3208 }
3209
3210 actions
3211}
3212
3213fn should_trigger_assignment(
3214 approval_entry: &ApprovalEntry,
3215 candidate_entry: &CandidateEntry,
3216 required_tranches: RequiredTranches,
3217 tranche_now: DelayTranche,
3218) -> bool {
3219 match approval_entry.our_assignment() {
3220 None => false,
3221 Some(ref assignment) if assignment.triggered() => false,
3222 Some(ref assignment) if assignment.tranche() == 0 => true,
3223 Some(ref assignment) => {
3224 match required_tranches {
3225 RequiredTranches::All => !approval_checking::check_approval(
3226 &candidate_entry,
3227 &approval_entry,
3228 RequiredTranches::All,
3229 )
3230 .is_approved(Tick::max_value()),
3232 RequiredTranches::Pending { maximum_broadcast, clock_drift, .. } => {
3233 let drifted_tranche_now =
3234 tranche_now.saturating_sub(clock_drift as DelayTranche);
3235 assignment.tranche() <= maximum_broadcast &&
3236 assignment.tranche() <= drifted_tranche_now
3237 },
3238 RequiredTranches::Exact { .. } => {
3239 false
3241 },
3242 }
3243 },
3244 }
3245}
3246
3247async fn process_wakeup<Sender: SubsystemSender<RuntimeApiMessage>>(
3248 sender: &mut Sender,
3249 state: &mut State,
3250 db: &mut OverlayedBackend<'_, impl Backend>,
3251 session_info_provider: &mut RuntimeInfo,
3252 relay_block: Hash,
3253 candidate_hash: CandidateHash,
3254 metrics: &Metrics,
3255 wakeups: &Wakeups,
3256) -> SubsystemResult<Vec<Action>> {
3257 let block_entry = db.load_block_entry(&relay_block)?;
3258 let candidate_entry = db.load_candidate_entry(&candidate_hash)?;
3259
3260 let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
3262 (Some(b), Some(c)) => (b, c),
3263 _ => return Ok(Vec::new()),
3264 };
3265
3266 let (no_show_slots, needed_approvals) = match get_session_info_by_index(
3267 session_info_provider,
3268 sender,
3269 block_entry.block_hash(),
3270 block_entry.session(),
3271 )
3272 .await
3273 {
3274 Some(i) => (i.no_show_slots, i.needed_approvals),
3275 None => return Ok(Vec::new()),
3276 };
3277
3278 let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
3279 let no_show_duration =
3280 slot_number_to_tick(state.slot_duration_millis, Slot::from(u64::from(no_show_slots)));
3281 let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
3282
3283 gum::trace!(
3284 target: LOG_TARGET,
3285 tranche = tranche_now,
3286 ?candidate_hash,
3287 block_hash = ?relay_block,
3288 "Processing wakeup",
3289 );
3290
3291 let (should_trigger, backing_group) = {
3292 let approval_entry = match candidate_entry.approval_entry(&relay_block) {
3293 Some(e) => e,
3294 None => return Ok(Vec::new()),
3295 };
3296
3297 let tranches_to_approve = approval_checking::tranches_to_approve(
3298 &approval_entry,
3299 candidate_entry.approvals(),
3300 tranche_now,
3301 block_tick,
3302 no_show_duration,
3303 needed_approvals as _,
3304 );
3305
3306 let should_trigger = should_trigger_assignment(
3307 &approval_entry,
3308 &candidate_entry,
3309 tranches_to_approve.required_tranches,
3310 tranche_now,
3311 );
3312
3313 (should_trigger, approval_entry.backing_group())
3314 };
3315
3316 gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);
3317
3318 let mut actions = Vec::new();
3319 let candidate_receipt = candidate_entry.candidate_receipt().clone();
3320
3321 let maybe_cert = if should_trigger {
3322 let maybe_cert = {
3323 let approval_entry = candidate_entry
3324 .approval_entry_mut(&relay_block)
3325 .expect("should_trigger only true if this fetched earlier; qed");
3326
3327 approval_entry.trigger_our_assignment(state.clock.tick_now())
3328 };
3329
3330 db.write_candidate_entry(candidate_entry.clone());
3331
3332 maybe_cert
3333 } else {
3334 None
3335 };
3336
3337 if let Some((cert, val_index, tranche)) = maybe_cert {
3338 let indirect_cert =
3339 IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };
3340
3341 gum::trace!(
3342 target: LOG_TARGET,
3343 ?candidate_hash,
3344 para_id = ?candidate_receipt.descriptor.para_id(),
3345 block_hash = ?relay_block,
3346 "Launching approval work.",
3347 );
3348
3349 let candidate_core_index = block_entry
3350 .candidates()
3351 .iter()
3352 .find_map(|(core_index, h)| (h == &candidate_hash).then_some(*core_index));
3353
3354 if let Some(claimed_core_indices) =
3355 get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
3356 {
3357 match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
3358 Ok(claimed_candidate_indices) => {
3359 let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
3361 !block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
3362 } else {
3363 true
3364 };
3365 db.write_block_entry(block_entry.clone());
3366 actions.push(Action::LaunchApproval {
3367 claimed_candidate_indices,
3368 candidate_hash,
3369 indirect_cert,
3370 assignment_tranche: tranche,
3371 relay_block_hash: relay_block,
3372 session: block_entry.session(),
3373 candidate: candidate_receipt,
3374 backing_group,
3375 distribute_assignment,
3376 core_index: candidate_core_index,
3377 });
3378 },
3379 Err(err) => {
3380 gum::warn!(
3383 target: LOG_TARGET,
3384 block_hash = ?relay_block,
3385 ?err,
3386 "Failed to create assignment bitfield"
3387 );
3388 },
3389 };
3390 } else {
3391 gum::warn!(
3392 target: LOG_TARGET,
3393 block_hash = ?relay_block,
3394 ?candidate_hash,
3395 "Cannot get assignment claimed core indices",
3396 );
3397 }
3398 }
3399 actions.extend(
3406 advance_approval_state(
3407 sender,
3408 state,
3409 db,
3410 session_info_provider,
3411 metrics,
3412 block_entry,
3413 candidate_hash,
3414 candidate_entry,
3415 ApprovalStateTransition::WakeupProcessed,
3416 wakeups,
3417 )
3418 .await,
3419 );
3420
3421 Ok(actions)
3422}
3423
3424#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3428async fn launch_approval<
3429 Sender: SubsystemSender<RuntimeApiMessage>
3430 + SubsystemSender<AvailabilityRecoveryMessage>
3431 + SubsystemSender<DisputeCoordinatorMessage>
3432 + SubsystemSender<CandidateValidationMessage>,
3433>(
3434 mut sender: Sender,
3435 spawn_handle: Arc<dyn overseer::gen::Spawner + 'static>,
3436 metrics: Metrics,
3437 session_index: SessionIndex,
3438 candidate: CandidateReceipt,
3439 validator_index: ValidatorIndex,
3440 block_hash: Hash,
3441 backing_group: GroupIndex,
3442 core_index: Option<CoreIndex>,
3443 retry: RetryApprovalInfo,
3444) -> SubsystemResult<RemoteHandle<ApprovalState>> {
3445 let (a_tx, a_rx) = oneshot::channel();
3446 let (code_tx, code_rx) = oneshot::channel();
3447
3448 struct StaleGuard(Option<Metrics>);
3452
3453 impl StaleGuard {
3454 fn take(mut self) -> Metrics {
3455 self.0.take().expect(
3456 "
3457 consumed after take; so this cannot be called twice; \
3458 nothing in this function reaches into the struct to avoid this API; \
3459 qed
3460 ",
3461 )
3462 }
3463 }
3464
3465 impl Drop for StaleGuard {
3466 fn drop(&mut self) {
3467 if let Some(metrics) = self.0.as_ref() {
3468 metrics.on_approval_stale();
3469 }
3470 }
3471 }
3472
3473 let candidate_hash = candidate.hash();
3474 let para_id = candidate.descriptor.para_id();
3475 let mut next_retry = None;
3476 gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
3477
3478 let timer = metrics.time_recover_and_approve();
3479 sender
3480 .send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
3481 candidate.clone(),
3482 session_index,
3483 Some(backing_group),
3484 core_index,
3485 a_tx,
3486 ))
3487 .await;
3488
3489 sender
3490 .send_message(RuntimeApiMessage::Request(
3491 block_hash,
3492 RuntimeApiRequest::ValidationCodeByHash(
3493 candidate.descriptor.validation_code_hash(),
3494 code_tx,
3495 ),
3496 ))
3497 .await;
3498
3499 let candidate = candidate.clone();
3500 let metrics_guard = StaleGuard(Some(metrics));
3501 let background = async move {
3502 let _timer = timer;
3504 let available_data = match a_rx.await {
3505 Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3506 Ok(Ok(a)) => a,
3507 Ok(Err(e)) => {
3508 match &e {
3509 &RecoveryError::Unavailable => {
3510 gum::warn!(
3511 target: LOG_TARGET,
3512 ?para_id,
3513 ?candidate_hash,
3514 attempts_remaining = retry.attempts_remaining,
3515 "Data unavailable for candidate {:?}",
3516 (candidate_hash, candidate.descriptor.para_id()),
3517 );
3518 if retry.attempts_remaining > 0 {
3522 Delay::new(retry.backoff).await;
3523 next_retry = Some(RetryApprovalInfo {
3524 candidate,
3525 backing_group,
3526 core_index,
3527 session_index,
3528 attempts_remaining: retry.attempts_remaining - 1,
3529 backoff: retry.backoff,
3530 });
3531 } else {
3532 next_retry = None;
3533 }
3534 metrics_guard.take().on_approval_unavailable();
3535 },
3536 &RecoveryError::ChannelClosed => {
3537 gum::warn!(
3538 target: LOG_TARGET,
3539 ?para_id,
3540 ?candidate_hash,
3541 "Channel closed while recovering data for candidate {:?}",
3542 (candidate_hash, candidate.descriptor.para_id()),
3543 );
3544 metrics_guard.take().on_approval_unavailable();
3546 },
3547 &RecoveryError::Invalid => {
3548 gum::warn!(
3549 target: LOG_TARGET,
3550 ?para_id,
3551 ?candidate_hash,
3552 "Data recovery invalid for candidate {:?}",
3553 (candidate_hash, candidate.descriptor.para_id()),
3554 );
3555 issue_local_invalid_statement(
3556 &mut sender,
3557 session_index,
3558 candidate_hash,
3559 candidate.clone(),
3560 );
3561 metrics_guard.take().on_approval_invalid();
3562 },
3563 }
3564 return ApprovalState::failed_with_retry(
3565 validator_index,
3566 candidate_hash,
3567 next_retry,
3568 );
3569 },
3570 };
3571
3572 let validation_code = match code_rx.await {
3573 Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3574 Ok(Err(_)) => return ApprovalState::failed(validator_index, candidate_hash),
3575 Ok(Ok(Some(code))) => code,
3576 Ok(Ok(None)) => {
3577 gum::warn!(
3578 target: LOG_TARGET,
3579 "Validation code unavailable for block {:?} in the state of block {:?} (a recent descendant)",
3580 candidate.descriptor.relay_parent(),
3581 block_hash,
3582 );
3583
3584 metrics_guard.take().on_approval_unavailable();
3587 return ApprovalState::failed(validator_index, candidate_hash);
3588 },
3589 };
3590
3591 let (val_tx, val_rx) = oneshot::channel();
3592 sender
3593 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
3594 validation_data: available_data.validation_data,
3595 validation_code,
3596 candidate_receipt: candidate.clone(),
3597 pov: available_data.pov,
3598 scheduling_session_index: session_index,
3599 exec_kind: PvfExecKind::Approval,
3600 response_sender: val_tx,
3601 })
3602 .await;
3603
3604 match val_rx.await {
3605 Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
3606 Ok(Ok(ValidationResult::Valid(_, _))) => {
3607 gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Candidate Valid");
3611
3612 let _ = metrics_guard.take();
3613 return ApprovalState::approved(validator_index, candidate_hash);
3614 },
3615 Ok(Ok(ValidationResult::Invalid(reason))) => {
3616 gum::warn!(
3617 target: LOG_TARGET,
3618 ?reason,
3619 ?candidate_hash,
3620 ?para_id,
3621 "Detected invalid candidate as an approval checker.",
3622 );
3623
3624 issue_local_invalid_statement(
3625 &mut sender,
3626 session_index,
3627 candidate_hash,
3628 candidate.clone(),
3629 );
3630 metrics_guard.take().on_approval_invalid();
3631 return ApprovalState::failed(validator_index, candidate_hash);
3632 },
3633 Ok(Err(e)) => {
3634 gum::error!(
3635 target: LOG_TARGET,
3636 err = ?e,
3637 ?candidate_hash,
3638 ?para_id,
3639 "Failed to validate candidate due to internal error",
3640 );
3641 metrics_guard.take().on_approval_error();
3642 return ApprovalState::failed(validator_index, candidate_hash);
3643 },
3644 }
3645 };
3646 let (background, remote_handle) = background.remote_handle();
3647 spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background));
3648 Ok(remote_handle)
3649}
3650
3651#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3654async fn issue_approval<
3655 Sender: SubsystemSender<RuntimeApiMessage>,
3656 ADSender: SubsystemSender<ApprovalDistributionMessage>,
3657>(
3658 sender: &mut Sender,
3659 approval_voting_sender: &mut ADSender,
3660 state: &mut State,
3661 db: &mut OverlayedBackend<'_, impl Backend>,
3662 session_info_provider: &mut RuntimeInfo,
3663 metrics: &Metrics,
3664 candidate_hash: CandidateHash,
3665 delayed_approvals_timers: &mut DelayedApprovalTimer,
3666 ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
3667 wakeups: &Wakeups,
3668) -> SubsystemResult<Vec<Action>> {
3669 let mut block_entry = match db.load_block_entry(&block_hash)? {
3670 Some(b) => b,
3671 None => {
3672 metrics.on_approval_stale();
3674 return Ok(Vec::new());
3675 },
3676 };
3677
3678 let candidate_index = match block_entry.candidates().iter().position(|e| e.1 == candidate_hash)
3679 {
3680 None => {
3681 gum::warn!(
3682 target: LOG_TARGET,
3683 "Candidate hash {} is not present in the block entry's candidates for relay block {}",
3684 candidate_hash,
3685 block_entry.parent_hash(),
3686 );
3687
3688 metrics.on_approval_error();
3689 return Ok(Vec::new());
3690 },
3691 Some(idx) => idx,
3692 };
3693
3694 let candidate_hash = match block_entry.candidate(candidate_index as usize) {
3695 Some((_, h)) => *h,
3696 None => {
3697 gum::warn!(
3698 target: LOG_TARGET,
3699 "Received malformed request to approve out-of-bounds candidate index {} included at block {:?}",
3700 candidate_index,
3701 block_hash,
3702 );
3703
3704 metrics.on_approval_error();
3705 return Ok(Vec::new());
3706 },
3707 };
3708
3709 let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
3710 Some(c) => c,
3711 None => {
3712 gum::warn!(
3713 target: LOG_TARGET,
3714 "Missing entry for candidate index {} included at block {:?}",
3715 candidate_index,
3716 block_hash,
3717 );
3718
3719 metrics.on_approval_error();
3720 return Ok(Vec::new());
3721 },
3722 };
3723
3724 let session_info = match get_session_info_by_index(
3725 session_info_provider,
3726 sender,
3727 block_entry.parent_hash(),
3728 block_entry.session(),
3729 )
3730 .await
3731 {
3732 Some(s) => s,
3733 None => return Ok(Vec::new()),
3734 };
3735
3736 if block_entry
3737 .defer_candidate_signature(
3738 candidate_index as _,
3739 candidate_hash,
3740 compute_delayed_approval_sending_tick(
3741 state,
3742 &block_entry,
3743 &candidate_entry,
3744 session_info,
3745 &metrics,
3746 ),
3747 )
3748 .is_some()
3749 {
3750 gum::error!(
3751 target: LOG_TARGET,
3752 ?candidate_hash,
3753 ?block_hash,
3754 validator_index = validator_index.0,
3755 "Possible bug, we shouldn't have to defer a candidate more than once",
3756 );
3757 }
3758
3759 gum::debug!(
3760 target: LOG_TARGET,
3761 ?candidate_hash,
3762 ?block_hash,
3763 validator_index = validator_index.0,
3764 "Ready to issue approval vote",
3765 );
3766
3767 let actions = advance_approval_state(
3768 sender,
3769 state,
3770 db,
3771 session_info_provider,
3772 metrics,
3773 block_entry,
3774 candidate_hash,
3775 candidate_entry,
3776 ApprovalStateTransition::LocalApproval(validator_index as _),
3777 wakeups,
3778 )
3779 .await;
3780
3781 if let Some(next_wakeup) = maybe_create_signature(
3782 db,
3783 session_info_provider,
3784 state,
3785 sender,
3786 approval_voting_sender,
3787 block_hash,
3788 validator_index,
3789 metrics,
3790 )
3791 .await?
3792 {
3793 delayed_approvals_timers.maybe_arm_timer(
3794 next_wakeup,
3795 state.clock.as_ref(),
3796 block_hash,
3797 validator_index,
3798 );
3799 }
3800 Ok(actions)
3801}
3802
3803#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
3805async fn maybe_create_signature<
3806 Sender: SubsystemSender<RuntimeApiMessage>,
3807 ADSender: SubsystemSender<ApprovalDistributionMessage>,
3808>(
3809 db: &mut OverlayedBackend<'_, impl Backend>,
3810 session_info_provider: &mut RuntimeInfo,
3811 state: &State,
3812 sender: &mut Sender,
3813 approval_voting_sender: &mut ADSender,
3814 block_hash: Hash,
3815 validator_index: ValidatorIndex,
3816 metrics: &Metrics,
3817) -> SubsystemResult<Option<Tick>> {
3818 let mut block_entry = match db.load_block_entry(&block_hash)? {
3819 Some(b) => b,
3820 None => {
3821 metrics.on_approval_stale();
3823 gum::debug!(
3824 target: LOG_TARGET,
3825 "Could not find block that needs signature {:}", block_hash
3826 );
3827 return Ok(None);
3828 },
3829 };
3830
3831 let approval_params = state
3832 .get_approval_voting_params_or_default(sender, block_entry.session(), block_hash)
3833 .await
3834 .unwrap_or_default();
3835
3836 gum::trace!(
3837 target: LOG_TARGET,
3838 "Candidates pending signatures {:}", block_entry.num_candidates_pending_signature()
3839 );
3840 let tick_now = state.clock.tick_now();
3841
3842 let (candidates_to_sign, sign_no_later_then) = block_entry
3843 .get_candidates_that_need_signature(tick_now, approval_params.max_approval_coalesce_count);
3844
3845 let (candidates_hashes, candidates_indices) = match candidates_to_sign {
3846 Some(candidates_to_sign) => candidates_to_sign,
3847 None => return Ok(sign_no_later_then),
3848 };
3849
3850 let session_info = match get_session_info_by_index(
3851 session_info_provider,
3852 sender,
3853 block_entry.parent_hash(),
3854 block_entry.session(),
3855 )
3856 .await
3857 {
3858 Some(s) => s,
3859 None => {
3860 metrics.on_approval_error();
3861 gum::error!(
3862 target: LOG_TARGET,
3863 "Could not retrieve the session"
3864 );
3865 return Ok(None);
3866 },
3867 };
3868
3869 let validator_pubkey = match session_info.validators.get(validator_index) {
3870 Some(p) => p,
3871 None => {
3872 gum::error!(
3873 target: LOG_TARGET,
3874 "Validator index {} out of bounds in session {}",
3875 validator_index.0,
3876 block_entry.session(),
3877 );
3878
3879 metrics.on_approval_error();
3880 return Ok(None);
3881 },
3882 };
3883
3884 let signature = match sign_approval(
3885 &state.keystore,
3886 &validator_pubkey,
3887 &candidates_hashes,
3888 block_entry.session(),
3889 ) {
3890 Some(sig) => sig,
3891 None => {
3892 gum::error!(
3893 target: LOG_TARGET,
3894 validator_index = ?validator_index,
3895 session = ?block_entry.session(),
3896 "Could not issue approval signature. Assignment key present but not validator key?",
3897 );
3898
3899 metrics.on_approval_error();
3900 return Ok(None);
3901 },
3902 };
3903 metrics.on_approval_coalesce(candidates_hashes.len() as u32);
3904
3905 let candidate_entries = candidates_hashes
3906 .iter()
3907 .map(|candidate_hash| db.load_candidate_entry(candidate_hash))
3908 .collect::<SubsystemResult<Vec<Option<CandidateEntry>>>>()?;
3909
3910 for mut candidate_entry in candidate_entries {
3911 let approval_entry = candidate_entry.as_mut().and_then(|candidate_entry| {
3912 candidate_entry.approval_entry_mut(&block_entry.block_hash())
3913 });
3914
3915 match approval_entry {
3916 Some(approval_entry) => approval_entry.import_approval_sig(OurApproval {
3917 signature: signature.clone(),
3918 signed_candidates_indices: candidates_indices.clone(),
3919 }),
3920 None => {
3921 gum::error!(
3922 target: LOG_TARGET,
3923 candidate_entry = ?candidate_entry,
3924 "Candidate scheduled for signing approval entry should not be None"
3925 );
3926 },
3927 };
3928 candidate_entry.map(|candidate_entry| db.write_candidate_entry(candidate_entry));
3929 }
3930
3931 metrics.on_approval_produced();
3932
3933 approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval(
3934 IndirectSignedApprovalVoteV2 {
3935 block_hash: block_entry.block_hash(),
3936 candidate_indices: candidates_indices,
3937 validator: validator_index,
3938 signature,
3939 },
3940 ));
3941
3942 gum::trace!(
3943 target: LOG_TARGET,
3944 ?block_hash,
3945 signed_candidates = ?block_entry.num_candidates_pending_signature(),
3946 "Issue approval votes",
3947 );
3948 block_entry.issued_approval();
3949 db.write_block_entry(block_entry.into());
3950 Ok(None)
3951}
3952
3953fn sign_approval(
3955 keystore: &LocalKeystore,
3956 public: &ValidatorId,
3957 candidate_hashes: &[CandidateHash],
3958 session_index: SessionIndex,
3959) -> Option<ValidatorSignature> {
3960 let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
3961
3962 let payload = ApprovalVoteMultipleCandidates(candidate_hashes).signing_payload(session_index);
3963
3964 Some(key.sign(&payload[..]))
3965}
3966
3967fn issue_local_invalid_statement<Sender>(
3969 sender: &mut Sender,
3970 session_index: SessionIndex,
3971 candidate_hash: CandidateHash,
3972 candidate: CandidateReceipt,
3973) where
3974 Sender: SubsystemSender<DisputeCoordinatorMessage>,
3975{
3976 sender.send_unbounded_message(DisputeCoordinatorMessage::IssueLocalStatement(
3986 session_index,
3987 candidate_hash,
3988 candidate.clone(),
3989 false,
3990 ));
3991}
3992
3993fn compute_delayed_approval_sending_tick(
3995 state: &State,
3996 block_entry: &BlockEntry,
3997 candidate_entry: &CandidateEntry,
3998 session_info: &SessionInfo,
3999 metrics: &Metrics,
4000) -> Tick {
4001 let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
4002 let assignment_tranche = candidate_entry
4003 .approval_entry(&block_entry.block_hash())
4004 .and_then(|approval_entry| approval_entry.our_assignment())
4005 .map(|our_assignment| our_assignment.tranche())
4006 .unwrap_or_default();
4007
4008 let assignment_triggered_tick = current_block_tick + assignment_tranche as Tick;
4009
4010 let no_show_duration_ticks = slot_number_to_tick(
4011 state.slot_duration_millis,
4012 Slot::from(u64::from(session_info.no_show_slots)),
4013 );
4014 let tick_now = state.clock.tick_now();
4015
4016 let sign_no_later_than = min(
4017 tick_now + MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick,
4018 assignment_triggered_tick + no_show_duration_ticks / 2,
4022 );
4023
4024 metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
4025 sign_no_later_than
4026}