referrerpolicy=no-referrer-when-downgrade

polkadot_dispute_distribution/receiver/batches/
batch.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{collections::HashMap, time::Instant};
18
19use gum::CandidateHash;
20use polkadot_node_network_protocol::{
21	request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
22	PeerId,
23};
24use polkadot_node_primitives::SignedDisputeStatement;
25use polkadot_primitives::{CandidateReceiptV2 as CandidateReceipt, ValidatorIndex};
26
27use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};
28
29use super::MAX_BATCH_LIFETIME;
30
31/// A batch of votes to be imported into the `dispute-coordinator`.
32///
33/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
34/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
35/// coordinator.
36///
37/// A `Batch` keeps track of the votes to be imported and the current incoming rate, on rate update
38/// it will "flush" in case the incoming rate dropped too low, preparing the import.
39pub struct Batch {
40	/// The actual candidate this batch is concerned with.
41	candidate_receipt: CandidateReceipt,
42
43	/// Cache of `CandidateHash` (candidate_receipt.hash()).
44	candidate_hash: CandidateHash,
45
46	/// All valid votes received in this batch so far.
47	///
48	/// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
49	/// while still allowing validators to equivocate.
50	///
51	/// Detecting and rejecting duplicates is crucial in order to effectively enforce
52	/// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we would count duplicates
53	/// here, the mechanism would be broken.
54	valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
55
56	/// All invalid votes received in this batch so far.
57	invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
58
59	/// How many votes have been batched since the last tick/creation.
60	votes_batched_since_last_tick: u32,
61
62	/// Expiry time for the batch.
63	///
64	/// By this time the latest this batch will get flushed.
65	best_before: Instant,
66
67	/// Requesters waiting for a response.
68	requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
69}
70
71/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
72pub(super) enum TickResult {
73	/// Batch is still alive, please call `tick` again at the given `Instant`.
74	Alive(Batch, Instant),
75	/// Batch is done, ready for import!
76	Done(PreparedImport),
77}
78
79/// Ready for import.
80pub struct PreparedImport {
81	pub candidate_receipt: CandidateReceipt,
82	pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
83	/// Information about original requesters.
84	pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
85}
86
87impl From<Batch> for PreparedImport {
88	fn from(batch: Batch) -> Self {
89		let Batch {
90			candidate_receipt,
91			valid_votes,
92			invalid_votes,
93			requesters: pending_responses,
94			..
95		} = batch;
96
97		let statements = valid_votes
98			.into_iter()
99			.chain(invalid_votes.into_iter())
100			.map(|(index, statement)| (statement, index))
101			.collect();
102
103		Self { candidate_receipt, statements, requesters: pending_responses }
104	}
105}
106
107impl Batch {
108	/// Create a new empty batch based on the given `CandidateReceipt`.
109	///
110	/// To create a `Batch` use Batches::find_batch`.
111	///
112	/// Arguments:
113	///
114	/// * `candidate_receipt` - The candidate this batch is meant to track votes for.
115	/// * `now` - current time stamp for calculating the first tick.
116	///
117	/// Returns: A batch and the first `Instant` you are supposed to call `tick`.
118	pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
119		let s = Self {
120			candidate_hash: candidate_receipt.hash(),
121			candidate_receipt,
122			valid_votes: HashMap::new(),
123			invalid_votes: HashMap::new(),
124			votes_batched_since_last_tick: 0,
125			best_before: Instant::now() + MAX_BATCH_LIFETIME,
126			requesters: Vec::new(),
127		};
128		let next_tick = s.calculate_next_tick(now);
129		(s, next_tick)
130	}
131
132	/// Receipt of the candidate this batch is batching votes for.
133	pub fn candidate_receipt(&self) -> &CandidateReceipt {
134		&self.candidate_receipt
135	}
136
137	/// Add votes from a validator into the batch.
138	///
139	/// The statements are supposed to be the valid and invalid statements received in a
140	/// `DisputeRequest`.
141	///
142	/// The given `pending_response` is the corresponding response sender for responding to `peer`.
143	/// If at least one of the votes is new as far as this batch is concerned we record the
144	/// pending_response, for later use. In case both votes are known already, we return the
145	/// response sender as an `Err` value.
146	pub fn add_votes(
147		&mut self,
148		valid_vote: (SignedDisputeStatement, ValidatorIndex),
149		invalid_vote: (SignedDisputeStatement, ValidatorIndex),
150		peer: PeerId,
151		pending_response: OutgoingResponseSender<DisputeRequest>,
152	) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
153		debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
154		debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);
155
156		let mut duplicate = true;
157
158		if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
159			self.votes_batched_since_last_tick += 1;
160			duplicate = false;
161		}
162		if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
163			self.votes_batched_since_last_tick += 1;
164			duplicate = false;
165		}
166
167		if duplicate {
168			Err(pending_response)
169		} else {
170			self.requesters.push((peer, pending_response));
171			Ok(())
172		}
173	}
174
175	/// Check batch for liveness.
176	///
177	/// This function is supposed to be called at instants given at construction and as returned as
178	/// part of `TickResult`.
179	pub(super) fn tick(mut self, now: Instant) -> TickResult {
180		if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
181			now < self.best_before
182		{
183			// Still good:
184			let next_tick = self.calculate_next_tick(now);
185			// Reset counter:
186			self.votes_batched_since_last_tick = 0;
187			TickResult::Alive(self, next_tick)
188		} else {
189			TickResult::Done(PreparedImport::from(self))
190		}
191	}
192
193	/// Calculate when the next tick should happen.
194	///
195	/// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this
196	/// batch would exceed `MAX_BATCH_LIFETIME`.
197	///
198	/// # Arguments
199	///
200	/// * `now` - The current time.
201	fn calculate_next_tick(&self, now: Instant) -> Instant {
202		let next_tick = now + BATCH_COLLECTING_INTERVAL;
203		if next_tick < self.best_before {
204			next_tick
205		} else {
206			self.best_before
207		}
208	}
209}