// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{
	collections::{hash_map, HashMap},
	time::{Duration, Instant},
};

use futures::future::pending;

use polkadot_node_network_protocol::request_response::DISPUTE_REQUEST_TIMEOUT;
use polkadot_primitives::{vstaging::CandidateReceiptV2 as CandidateReceipt, CandidateHash};

use crate::{
	receiver::batches::{batch::TickResult, waiting_queue::PendingWake},
	LOG_TARGET,
};

pub use self::batch::{Batch, PreparedImport};

use self::waiting_queue::WaitingQueue;

use super::{
	error::{JfyiError, JfyiResult},
	BATCH_COLLECTING_INTERVAL,
};

/// A single batch (per candidate) as managed by `Batches`.
mod batch;

/// Queue events in time and wait for them to become ready.
mod waiting_queue;
/// Safeguard in case votes trickle in very slowly.
///
/// If the batch lifetime exceeded the time the sender is willing to wait for a confirmation, we
/// would trigger pointless re-sends.
const MAX_BATCH_LIFETIME: Duration = DISPUTE_REQUEST_TIMEOUT.saturating_sub(Duration::from_secs(2));

/// Limit the number of batches that can be alive at any given time.
///
/// For the reasoning behind this number, see the guide.
pub const MAX_BATCHES: usize = 1000;
/// Manage batches.
///
/// - Batches can be found via `find_batch()` in order to add votes to them or to check that they
///   exist.
/// - Batches can be checked for being ready for flushing in order to import contained votes.
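///
/// Illustrative usage sketch (not a doctest; `candidate_receipt` is assumed to come from an
/// incoming dispute request):
///
/// ```ignore
/// let mut batches = Batches::new();
/// let candidate_hash = candidate_receipt.hash();
/// match batches.find_batch(candidate_hash, candidate_receipt)? {
/// 	FoundBatch::Created(batch) => {
/// 		// First vote for this candidate - a fresh batch was created.
/// 	},
/// 	FoundBatch::Found(batch) => {
/// 		// The batch already existed - add the new votes to it.
/// 	},
/// }
/// // Later, in the receiver's main loop, collect batches that became ready:
/// let ready_imports = batches.check_batches().await;
/// ```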
pub struct Batches {
	/// The batches we manage.
	///
	/// Kept invariant:
	/// For each entry in `batches`, there exists an entry in `waiting_queue` as well - we wait
	/// on all batches!
	batches: HashMap<CandidateHash, Batch>,
	/// Waiting queue for batches to become ready for `tick`.
	///
	/// Invariant kept by `Batches`:
	/// For each entry in the `waiting_queue` there exists a corresponding entry in `batches`.
	waiting_queue: WaitingQueue<CandidateHash>,
}
/// The result of `find_batch`: a batch that either already existed or was just created so it can
/// be found from now on.
pub enum FoundBatch<'a> {
	/// Batch just got created.
	Created(&'a mut Batch),
	/// Batch already existed.
	Found(&'a mut Batch),
}
impl Batches {
	/// Create new empty `Batches`.
	pub fn new() -> Self {
		debug_assert!(
			MAX_BATCH_LIFETIME > BATCH_COLLECTING_INTERVAL,
			"Unexpectedly low `MAX_BATCH_LIFETIME`, please check parameters."
		);
		Self { batches: HashMap::new(), waiting_queue: WaitingQueue::new() }
	}
	/// Find a particular batch.
	///
	/// That is, either find an existing batch or create a new one, as reflected by the result
	/// `FoundBatch`.
	pub fn find_batch(
		&mut self,
		candidate_hash: CandidateHash,
		candidate_receipt: CandidateReceipt,
	) -> JfyiResult<FoundBatch> {
		if self.batches.len() >= MAX_BATCHES {
			return Err(JfyiError::MaxBatchLimitReached)
		}
		debug_assert!(candidate_hash == candidate_receipt.hash());
		let result = match self.batches.entry(candidate_hash) {
			hash_map::Entry::Vacant(vacant) => {
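				// No batch for this candidate yet: create one and schedule its first readiness
				// check via the waiting queue.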
				let now = Instant::now();
				let (created, ready_at) = Batch::new(candidate_receipt, now);
				let pending_wake = PendingWake { payload: candidate_hash, ready_at };
				self.waiting_queue.push(pending_wake);
				FoundBatch::Created(vacant.insert(created))
			},
			hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.into_mut()),
		};
		Ok(result)
	}
	/// Wait for the next `tick` to check for ready batches.
	///
	/// This function blocks (returns `Poll::Pending`) until at least one batch can be checked
	/// for readiness, meaning that `BATCH_COLLECTING_INTERVAL` has passed since the last check
	/// for that batch or the batch has reached its end of life.
	///
	/// If this `Batches` instance is empty (does not actually contain any batches), then this
	/// function will always return `Poll::Pending`.
	///
	/// Returns: A `Vec` of all `PreparedImport`s from batches that became ready.
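	///
	/// Minimal polling sketch (not a doctest; `start_import` is a purely illustrative stand-in
	/// for whatever the caller does with ready imports):
	///
	/// ```ignore
	/// loop {
	/// 	for import in batches.check_batches().await {
	/// 		start_import(import).await;
	/// 	}
	/// }
	/// ```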
	pub async fn check_batches(&mut self) -> Vec<PreparedImport> {
		let now = Instant::now();

		let mut imports = Vec::new();

		// Wait for at least one batch to become ready:
		self.waiting_queue.wait_ready(now).await;

		// Process all ready entries:
		while let Some(wake) = self.waiting_queue.pop_ready(now) {
			let batch = self.batches.remove(&wake.payload);
			debug_assert!(
				batch.is_some(),
				"Entries referenced in `waiting_queue` are supposed to exist!"
			);
			let batch = match batch {
				None => return pending().await,
				Some(batch) => batch,
			};
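			// Check whether the batch is done (ready for import) or still collecting votes and
			// therefore needs to be kept around until its next tick.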
			match batch.tick(now) {
				TickResult::Done(import) => {
					gum::trace!(
						target: LOG_TARGET,
						candidate_hash = ?wake.payload,
						"Batch became ready."
					);
					imports.push(import);
				},
				TickResult::Alive(old_batch, next_tick) => {
					gum::trace!(
						target: LOG_TARGET,
						candidate_hash = ?wake.payload,
						"Batch found to be still alive on check."
					);
					let pending_wake = PendingWake { payload: wake.payload, ready_at: next_tick };
					self.waiting_queue.push(pending_wake);
					self.batches.insert(wake.payload, old_batch);
				},
			}
		}

		imports
	}
}