referrerpolicy=no-referrer-when-downgrade

polkadot_dispute_distribution/receiver/batches/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{
18	collections::{hash_map, HashMap},
19	time::{Duration, Instant},
20};
21
22use futures::future::pending;
23
24use polkadot_node_network_protocol::request_response::DISPUTE_REQUEST_TIMEOUT;
25use polkadot_primitives::{CandidateHash, CandidateReceiptV2 as CandidateReceipt};
26
27use crate::{
28	receiver::batches::{batch::TickResult, waiting_queue::PendingWake},
29	LOG_TARGET,
30};
31
32pub use self::batch::{Batch, PreparedImport};
33use self::waiting_queue::WaitingQueue;
34
35use super::{
36	error::{JfyiError, JfyiResult},
37	BATCH_COLLECTING_INTERVAL,
38};
39
40/// A single batch (per candidate) as managed by `Batches`.
41mod batch;
42
43/// Queue events in time and wait for them to become ready.
44mod waiting_queue;
45
46/// Safe-guard in case votes trickle in real slow.
47///
48/// If the batch life time exceeded the time the sender is willing to wait for a confirmation, we
49/// would trigger pointless re-sends.
50const MAX_BATCH_LIFETIME: Duration = DISPUTE_REQUEST_TIMEOUT.saturating_sub(Duration::from_secs(2));
51
/// Limit the number of batches that can be alive at any given time.
///
/// Once this many batches exist, `Batches::find_batch` returns an error.
///
/// Reasoning for this number, see guide.
pub const MAX_BATCHES: usize = 1000;
56
57/// Manage batches.
58///
59/// - Batches can be found via `find_batch()` in order to add votes to them/check they exist.
60/// - Batches can be checked for being ready for flushing in order to import contained votes.
61pub struct Batches {
62	/// The batches we manage.
63	///
64	/// Kept invariants:
65	/// For each entry in `batches`, there exists an entry in `waiting_queue` as well - we wait on
66	/// all batches!
67	batches: HashMap<CandidateHash, Batch>,
68	/// Waiting queue for waiting for batches to become ready for `tick`.
69	///
70	/// Kept invariants by `Batches`:
71	/// For each entry in the `waiting_queue` there exists a corresponding entry in `batches`.
72	waiting_queue: WaitingQueue<CandidateHash>,
73}
74
75/// A found batch is either really found or got created so it can be found.
76pub enum FoundBatch<'a> {
77	/// Batch just got created.
78	Created(&'a mut Batch),
79	/// Batch already existed.
80	Found(&'a mut Batch),
81}
82
83impl Batches {
84	/// Create new empty `Batches`.
85	pub fn new() -> Self {
86		debug_assert!(
87			MAX_BATCH_LIFETIME > BATCH_COLLECTING_INTERVAL,
88			"Unexpectedly low `MAX_BATCH_LIFETIME`, please check parameters."
89		);
90		Self { batches: HashMap::new(), waiting_queue: WaitingQueue::new() }
91	}
92
93	/// Find a particular batch.
94	///
95	/// That is either find it, or we create it as reflected by the result `FoundBatch`.
96	pub fn find_batch(
97		&mut self,
98		candidate_hash: CandidateHash,
99		candidate_receipt: CandidateReceipt,
100	) -> JfyiResult<FoundBatch> {
101		if self.batches.len() >= MAX_BATCHES {
102			return Err(JfyiError::MaxBatchLimitReached)
103		}
104		debug_assert!(candidate_hash == candidate_receipt.hash());
105		let result = match self.batches.entry(candidate_hash) {
106			hash_map::Entry::Vacant(vacant) => {
107				let now = Instant::now();
108				let (created, ready_at) = Batch::new(candidate_receipt, now);
109				let pending_wake = PendingWake { payload: candidate_hash, ready_at };
110				self.waiting_queue.push(pending_wake);
111				FoundBatch::Created(vacant.insert(created))
112			},
113			hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.into_mut()),
114		};
115		Ok(result)
116	}
117
118	/// Wait for the next `tick` to check for ready batches.
119	///
120	/// This function blocks (returns `Poll::Pending`) until at least one batch can be
121	/// checked for readiness meaning that `BATCH_COLLECTING_INTERVAL` has passed since the last
122	/// check for that batch or it reached end of life.
123	///
124	/// If this `Batches` instance is empty (does not actually contain any batches), then this
125	/// function will always return `Poll::Pending`.
126	///
127	/// Returns: A `Vec` of all `PreparedImport`s from batches that became ready.
128	pub async fn check_batches(&mut self) -> Vec<PreparedImport> {
129		let now = Instant::now();
130
131		let mut imports = Vec::new();
132
133		// Wait for at least one batch to become ready:
134		self.waiting_queue.wait_ready(now).await;
135
136		// Process all ready entries:
137		while let Some(wake) = self.waiting_queue.pop_ready(now) {
138			let batch = self.batches.remove(&wake.payload);
139			debug_assert!(
140				batch.is_some(),
141				"Entries referenced in `waiting_queue` are supposed to exist!"
142			);
143			let batch = match batch {
144				None => return pending().await,
145				Some(batch) => batch,
146			};
147			match batch.tick(now) {
148				TickResult::Done(import) => {
149					gum::trace!(
150						target: LOG_TARGET,
151						candidate_hash = ?wake.payload,
152						"Batch became ready."
153					);
154					imports.push(import);
155				},
156				TickResult::Alive(old_batch, next_tick) => {
157					gum::trace!(
158						target: LOG_TARGET,
159						candidate_hash = ?wake.payload,
160						"Batch found to be still alive on check."
161					);
162					let pending_wake = PendingWake { payload: wake.payload, ready_at: next_tick };
163					self.waiting_queue.push(pending_wake);
164					self.batches.insert(wake.payload, old_batch);
165				},
166			}
167		}
168		imports
169	}
170}