polkadot_dispute_distribution/receiver/batches/batch.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
16
17use std::{collections::HashMap, time::Instant};
18
19use gum::CandidateHash;
20use polkadot_node_network_protocol::{
21 request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
22 PeerId,
23};
24use polkadot_node_primitives::SignedDisputeStatement;
25use polkadot_primitives::{CandidateReceiptV2 as CandidateReceipt, ValidatorIndex};
26
27use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};
28
29use super::MAX_BATCH_LIFETIME;
30
31/// A batch of votes to be imported into the `dispute-coordinator`.
32///
33/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
34/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
35/// coordinator.
36///
37/// A `Batch` keeps track of the votes to be imported and the current incoming rate, on rate update
38/// it will "flush" in case the incoming rate dropped too low, preparing the import.
39pub struct Batch {
40 /// The actual candidate this batch is concerned with.
41 candidate_receipt: CandidateReceipt,
42
43 /// Cache of `CandidateHash` (candidate_receipt.hash()).
44 candidate_hash: CandidateHash,
45
46 /// All valid votes received in this batch so far.
47 ///
48 /// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
49 /// while still allowing validators to equivocate.
50 ///
51 /// Detecting and rejecting duplicates is crucial in order to effectively enforce
52 /// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we would count duplicates
53 /// here, the mechanism would be broken.
54 valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
55
56 /// All invalid votes received in this batch so far.
57 invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
58
59 /// How many votes have been batched since the last tick/creation.
60 votes_batched_since_last_tick: u32,
61
62 /// Expiry time for the batch.
63 ///
64 /// By this time the latest this batch will get flushed.
65 best_before: Instant,
66
67 /// Requesters waiting for a response.
68 requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
69}
70
71/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
72pub(super) enum TickResult {
73 /// Batch is still alive, please call `tick` again at the given `Instant`.
74 Alive(Batch, Instant),
75 /// Batch is done, ready for import!
76 Done(PreparedImport),
77}
78
79/// Ready for import.
80pub struct PreparedImport {
81 pub candidate_receipt: CandidateReceipt,
82 pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
83 /// Information about original requesters.
84 pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
85}
86
87impl From<Batch> for PreparedImport {
88 fn from(batch: Batch) -> Self {
89 let Batch {
90 candidate_receipt,
91 valid_votes,
92 invalid_votes,
93 requesters: pending_responses,
94 ..
95 } = batch;
96
97 let statements = valid_votes
98 .into_iter()
99 .chain(invalid_votes.into_iter())
100 .map(|(index, statement)| (statement, index))
101 .collect();
102
103 Self { candidate_receipt, statements, requesters: pending_responses }
104 }
105}
106
107impl Batch {
108 /// Create a new empty batch based on the given `CandidateReceipt`.
109 ///
110 /// To create a `Batch` use Batches::find_batch`.
111 ///
112 /// Arguments:
113 ///
114 /// * `candidate_receipt` - The candidate this batch is meant to track votes for.
115 /// * `now` - current time stamp for calculating the first tick.
116 ///
117 /// Returns: A batch and the first `Instant` you are supposed to call `tick`.
118 pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
119 let s = Self {
120 candidate_hash: candidate_receipt.hash(),
121 candidate_receipt,
122 valid_votes: HashMap::new(),
123 invalid_votes: HashMap::new(),
124 votes_batched_since_last_tick: 0,
125 best_before: Instant::now() + MAX_BATCH_LIFETIME,
126 requesters: Vec::new(),
127 };
128 let next_tick = s.calculate_next_tick(now);
129 (s, next_tick)
130 }
131
132 /// Receipt of the candidate this batch is batching votes for.
133 pub fn candidate_receipt(&self) -> &CandidateReceipt {
134 &self.candidate_receipt
135 }
136
137 /// Add votes from a validator into the batch.
138 ///
139 /// The statements are supposed to be the valid and invalid statements received in a
140 /// `DisputeRequest`.
141 ///
142 /// The given `pending_response` is the corresponding response sender for responding to `peer`.
143 /// If at least one of the votes is new as far as this batch is concerned we record the
144 /// pending_response, for later use. In case both votes are known already, we return the
145 /// response sender as an `Err` value.
146 pub fn add_votes(
147 &mut self,
148 valid_vote: (SignedDisputeStatement, ValidatorIndex),
149 invalid_vote: (SignedDisputeStatement, ValidatorIndex),
150 peer: PeerId,
151 pending_response: OutgoingResponseSender<DisputeRequest>,
152 ) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
153 debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
154 debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);
155
156 let mut duplicate = true;
157
158 if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
159 self.votes_batched_since_last_tick += 1;
160 duplicate = false;
161 }
162 if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
163 self.votes_batched_since_last_tick += 1;
164 duplicate = false;
165 }
166
167 if duplicate {
168 Err(pending_response)
169 } else {
170 self.requesters.push((peer, pending_response));
171 Ok(())
172 }
173 }
174
175 /// Check batch for liveness.
176 ///
177 /// This function is supposed to be called at instants given at construction and as returned as
178 /// part of `TickResult`.
179 pub(super) fn tick(mut self, now: Instant) -> TickResult {
180 if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
181 now < self.best_before
182 {
183 // Still good:
184 let next_tick = self.calculate_next_tick(now);
185 // Reset counter:
186 self.votes_batched_since_last_tick = 0;
187 TickResult::Alive(self, next_tick)
188 } else {
189 TickResult::Done(PreparedImport::from(self))
190 }
191 }
192
193 /// Calculate when the next tick should happen.
194 ///
195 /// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this
196 /// batch would exceed `MAX_BATCH_LIFETIME`.
197 ///
198 /// # Arguments
199 ///
200 /// * `now` - The current time.
201 fn calculate_next_tick(&self, now: Instant) -> Instant {
202 let next_tick = now + BATCH_COLLECTING_INTERVAL;
203 if next_tick < self.best_before {
204 next_tick
205 } else {
206 self.best_before
207 }
208 }
209}