polkadot_dispute_distribution/receiver/batches/
mod.rs1use std::{
18 collections::{hash_map, HashMap},
19 time::{Duration, Instant},
20};
21
22use futures::future::pending;
23
24use polkadot_node_network_protocol::request_response::DISPUTE_REQUEST_TIMEOUT;
25use polkadot_primitives::{CandidateHash, CandidateReceiptV2 as CandidateReceipt};
26
27use crate::{
28 receiver::batches::{batch::TickResult, waiting_queue::PendingWake},
29 LOG_TARGET,
30};
31
32pub use self::batch::{Batch, PreparedImport};
33use self::waiting_queue::WaitingQueue;
34
35use super::{
36 error::{JfyiError, JfyiResult},
37 BATCH_COLLECTING_INTERVAL,
38};
39
40mod batch;
42
43mod waiting_queue;
45
46const MAX_BATCH_LIFETIME: Duration = DISPUTE_REQUEST_TIMEOUT.saturating_sub(Duration::from_secs(2));
51
52pub const MAX_BATCHES: usize = 1000;
56
57pub struct Batches {
62 batches: HashMap<CandidateHash, Batch>,
68 waiting_queue: WaitingQueue<CandidateHash>,
73}
74
75pub enum FoundBatch<'a> {
77 Created(&'a mut Batch),
79 Found(&'a mut Batch),
81}
82
83impl Batches {
84 pub fn new() -> Self {
86 debug_assert!(
87 MAX_BATCH_LIFETIME > BATCH_COLLECTING_INTERVAL,
88 "Unexpectedly low `MAX_BATCH_LIFETIME`, please check parameters."
89 );
90 Self { batches: HashMap::new(), waiting_queue: WaitingQueue::new() }
91 }
92
93 pub fn find_batch(
97 &mut self,
98 candidate_hash: CandidateHash,
99 candidate_receipt: CandidateReceipt,
100 ) -> JfyiResult<FoundBatch> {
101 if self.batches.len() >= MAX_BATCHES {
102 return Err(JfyiError::MaxBatchLimitReached)
103 }
104 debug_assert!(candidate_hash == candidate_receipt.hash());
105 let result = match self.batches.entry(candidate_hash) {
106 hash_map::Entry::Vacant(vacant) => {
107 let now = Instant::now();
108 let (created, ready_at) = Batch::new(candidate_receipt, now);
109 let pending_wake = PendingWake { payload: candidate_hash, ready_at };
110 self.waiting_queue.push(pending_wake);
111 FoundBatch::Created(vacant.insert(created))
112 },
113 hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.into_mut()),
114 };
115 Ok(result)
116 }
117
118 pub async fn check_batches(&mut self) -> Vec<PreparedImport> {
129 let now = Instant::now();
130
131 let mut imports = Vec::new();
132
133 self.waiting_queue.wait_ready(now).await;
135
136 while let Some(wake) = self.waiting_queue.pop_ready(now) {
138 let batch = self.batches.remove(&wake.payload);
139 debug_assert!(
140 batch.is_some(),
141 "Entries referenced in `waiting_queue` are supposed to exist!"
142 );
143 let batch = match batch {
144 None => return pending().await,
145 Some(batch) => batch,
146 };
147 match batch.tick(now) {
148 TickResult::Done(import) => {
149 gum::trace!(
150 target: LOG_TARGET,
151 candidate_hash = ?wake.payload,
152 "Batch became ready."
153 );
154 imports.push(import);
155 },
156 TickResult::Alive(old_batch, next_tick) => {
157 gum::trace!(
158 target: LOG_TARGET,
159 candidate_hash = ?wake.payload,
160 "Batch found to be still alive on check."
161 );
162 let pending_wake = PendingWake { payload: wake.payload, ready_at: next_tick };
163 self.waiting_queue.push(pending_wake);
164 self.batches.insert(wake.payload, old_batch);
165 },
166 }
167 }
168 imports
169 }
170}