1mod config;
26mod post_queues;
27
28pub use self::config::Config;
29use self::post_queues::PostQueues;
30use super::core::{
31 MessageId, Mixnet, MixnodeIndex, NetworkStatus, PostErr, RelSessionIndex, Scattered,
32 SessionIndex, SessionPhase, SessionStatus,
33};
34use rand::RngCore;
35use std::{
36 cmp::max,
37 collections::VecDeque,
38 time::{Duration, Instant},
39};
40
/// A request that can be queued in a [`RequestManager`].
///
/// Every method receives a `context` reference; [`RequestManager`] only passes it through
/// from its own callers and never inspects it.
pub trait Request {
    /// Type of the `context` reference handed to every method of this trait.
    type Context;

    /// Calls `f` with the request payload and returns whatever `f` returns.
    ///
    /// [`RequestManager`] invokes this once for every post it makes, so it can be called
    /// several times for the same request.
    fn with_data<T>(&self, f: impl FnOnce(Scattered<u8>) -> T, context: &Self::Context) -> T;
    /// Returns the SURB count that is passed to `Mixnet::post_request` together with the
    /// payload.
    fn num_surbs(&self, context: &Self::Context) -> usize;
    /// Returns the handling delay for the request posted under `message_id`.
    ///
    /// The value is fed into the round-trip-time estimate that determines the retry deadline
    /// after a successful post.
    fn handling_delay(&self, message_id: &MessageId, context: &Self::Context) -> Duration;

    /// Consumes the request after posting failed with any error other than
    /// `PostErr::NotEnoughSpaceInQueue`; the request is dropped from the manager.
    fn handle_post_err(self, err: PostErr, context: &Self::Context);
    /// Consumes the request when a retry is due but neither attempts nor destinations remain;
    /// the request is dropped from the manager.
    fn handle_retry_limit_reached(self, context: &Self::Context);
}
61
/// Bookkeeping kept for a single request while it sits in one of the post queues or in the
/// retry queue.
struct RequestState<R> {
    /// The caller-supplied request.
    request: R,

    /// How many more destinations may be tried once the current one is given up on.
    destinations_remaining: u32,
    /// How many more attempts may be made at the current destination.
    attempts_remaining: u32,
    /// How many more posts are still to be made in the current attempt. `retry` asserts (in
    /// debug builds) that this is 0 when a new attempt starts.
    posts_remaining: u32,

    /// ID the request is posted under; overwritten with random bytes by `new_destination`.
    message_id: MessageId,
    /// Session of the most recent successful post, or `None` if there has been none since the
    /// last call to `new_destination`.
    session_index: Option<SessionIndex>,
    /// Handed to `Mixnet::post_request` by mutable reference and cleared by `new_destination`.
    /// NOTE(review): presumably `post_request` fills this in with the chosen mixnode — confirm.
    destination_index: Option<MixnodeIndex>,
    /// Instant after which the request should be retried; the retry queue is ordered by it.
    retry_deadline: Instant,
}
76
77impl<R> RequestState<R> {
78 fn new_destination(&mut self, past: Instant) {
80 rand::thread_rng().fill_bytes(&mut self.message_id);
91 self.session_index = None;
92 self.destination_index = None;
93 self.retry_deadline = past;
94 }
95}
96
/// Buffers requests for posting to a [`Mixnet`] and keeps them around for retrying.
///
/// A request first waits in one of the post queues; once all posts of an attempt have been
/// made it moves to the retry queue, from where the caller re-submits it with
/// `pop_next_retry`.
pub struct RequestManager<R> {
    /// Capacity and retry limits.
    config: Config,
    /// Instant at which the manager was constructed. Used as the initial/reset value of
    /// `RequestState::retry_deadline`, i.e. a deadline that is already in the past.
    created_at: Instant,
    /// Session status most recently applied by `update_session_status`.
    session_status: SessionStatus,
    /// Requests waiting to be posted: a default queue plus one queue each for the current
    /// and the previous session.
    post_queues: PostQueues<RequestState<R>>,
    /// Requests waiting for their retry deadline, kept sorted by ascending `retry_deadline`.
    retry_queue: VecDeque<RequestState<R>>,
    /// Set when the front of `retry_queue` changes; read and cleared by
    /// `next_retry_deadline_changed`.
    next_retry_deadline_changed: bool,
}
113
114impl<C, R: Request<Context = C>> RequestManager<R> {
115 pub fn new(config: Config) -> Self {
117 let capacity = config.capacity;
118 Self {
119 config,
120 created_at: Instant::now(),
121 session_status: SessionStatus { current_index: 0, phase: SessionPhase::CoverToCurrent },
122 post_queues: PostQueues::new(capacity),
123 retry_queue: VecDeque::with_capacity(capacity),
124 next_retry_deadline_changed: false,
125 }
126 }
127
128 pub fn update_session_status<X>(
131 &mut self,
132 mixnet: &mut Mixnet<X>,
133 ns: &dyn NetworkStatus,
134 context: &C,
135 ) {
136 let session_status = mixnet.session_status();
137 if self.session_status == session_status {
138 return
139 }
140
141 let prev_default_len = self.post_queues.default.len();
142
143 if self.session_status.current_index != session_status.current_index {
144 self.post_queues.default.append(&mut self.post_queues.prev); if session_status.current_index.saturating_sub(self.session_status.current_index) == 1 {
146 std::mem::swap(&mut self.post_queues.current, &mut self.post_queues.prev);
147 } else {
148 self.post_queues.default.append(&mut self.post_queues.current); }
152 }
153
154 if !session_status.phase.allow_requests_and_replies(RelSessionIndex::Current) {
155 self.post_queues.default.append(&mut self.post_queues.current); }
157 if !session_status.phase.allow_requests_and_replies(RelSessionIndex::Prev) {
158 self.post_queues.default.append(&mut self.post_queues.prev); }
160
161 for state in self.post_queues.default.iter_mut().skip(prev_default_len) {
162 state.new_destination(self.created_at);
163 }
164
165 self.session_status = session_status;
166
167 self.process_post_queues(mixnet, ns, context);
170 }
171
172 pub fn has_space(&self) -> bool {
174 let len =
175 self.post_queues.iter().map(VecDeque::len).sum::<usize>() + self.retry_queue.len();
176 len < self.config.capacity
177 }
178
179 pub fn insert<X>(
191 &mut self,
192 request: R,
193 mixnet: &mut Mixnet<X>,
194 ns: &dyn NetworkStatus,
195 context: &C,
196 ) {
197 debug_assert!(self.has_space());
198 let state = RequestState {
199 request,
200
201 destinations_remaining: self.config.num_destinations,
202 attempts_remaining: 0,
203 posts_remaining: 0,
204
205 message_id: Default::default(),
207 session_index: None,
208 destination_index: None,
209 retry_deadline: self.created_at,
210 };
211 self.retry(state, mixnet, ns, context);
212 }
213
214 pub fn remove(&mut self, message_id: &MessageId) -> Option<R> {
217 for post_queue in self.post_queues.iter_mut() {
218 if let Some(i) = post_queue.iter().position(|state| &state.message_id == message_id) {
219 return Some(post_queue.remove(i).expect("i returned by position()").request)
220 }
221 }
222
223 if let Some(i) = self.retry_queue.iter().position(|state| &state.message_id == message_id) {
224 if i == 0 {
225 self.next_retry_deadline_changed = true;
226 }
227 return Some(self.retry_queue.remove(i).expect("i returned by position()").request)
228 }
229
230 None
231 }
232
233 fn process_post_queue<X>(
234 &mut self,
235 rel_session_index: Option<RelSessionIndex>,
236 mixnet: &mut Mixnet<X>,
237 ns: &dyn NetworkStatus,
238 context: &C,
239 ) {
240 let rel_session_index_or_default =
241 rel_session_index.unwrap_or(self.session_status.phase.default_request_session());
242 if (rel_session_index_or_default == RelSessionIndex::Prev) &&
243 (self.session_status.current_index == 0)
244 {
245 debug_assert!(self.post_queues.prev.is_empty());
248 return
249 }
250
251 let session_index = rel_session_index
252 .map(|rel_session_index| rel_session_index + self.session_status.current_index);
253 let session_index_or_default =
254 rel_session_index_or_default + self.session_status.current_index;
255
256 while let Some(mut state) = self.post_queues[rel_session_index].pop_front() {
257 debug_assert_eq!(state.session_index, session_index);
258
259 let res = state.request.with_data(
261 |data| {
262 mixnet.post_request(
263 session_index_or_default,
264 &mut state.destination_index,
265 &state.message_id,
266 data,
267 state.request.num_surbs(context),
268 ns,
269 )
270 },
271 context,
272 );
273
274 match res {
275 Ok(metrics) => {
276 state.session_index = Some(session_index_or_default);
277
278 let handling_delay = state.request.handling_delay(&state.message_id, context);
280 let rtt = metrics.estimate_rtt(handling_delay);
281 state.retry_deadline = max(state.retry_deadline, Instant::now() + rtt);
282
283 match state.posts_remaining.checked_sub(1) {
284 Some(posts_remaining) => {
285 state.posts_remaining = posts_remaining;
286 self.post_queues[Some(rel_session_index_or_default)].push_back(state);
287 },
288 None => {
289 let i = self
290 .retry_queue
291 .partition_point(|s| s.retry_deadline < state.retry_deadline);
292 self.retry_queue.insert(i, state);
293 if i == 0 {
294 self.next_retry_deadline_changed = true;
295 }
296 },
297 }
298 },
299 Err(PostErr::NotEnoughSpaceInQueue) => {
300 self.post_queues[rel_session_index].push_front(state);
303 break
304 },
305 Err(err) => state.request.handle_post_err(err, context),
306 }
307 }
308 }
309
310 pub fn process_post_queues<X>(
315 &mut self,
316 mixnet: &mut Mixnet<X>,
317 ns: &dyn NetworkStatus,
318 context: &C,
319 ) {
320 self.process_post_queue(None, mixnet, ns, context);
323 self.process_post_queue(Some(RelSessionIndex::Current), mixnet, ns, context);
324 self.process_post_queue(Some(RelSessionIndex::Prev), mixnet, ns, context);
325 }
326
327 fn session_post_queues_empty(&self, rel_session_index: Option<RelSessionIndex>) -> bool {
328 if !self.post_queues[rel_session_index].is_empty() {
329 return false
330 }
331 let default = self.session_status.phase.default_request_session();
332 match rel_session_index {
333 Some(rel_session_index) if rel_session_index == default =>
334 self.post_queues.default.is_empty(),
335 Some(_) => true,
336 None => self.post_queues[Some(default)].is_empty(),
337 }
338 }
339
340 fn retry<X>(
341 &mut self,
342 mut state: RequestState<R>,
343 mixnet: &mut Mixnet<X>,
344 ns: &dyn NetworkStatus,
345 context: &C,
346 ) {
347 debug_assert_eq!(state.posts_remaining, 0);
348 match state.attempts_remaining.checked_sub(1) {
349 Some(attempts_remaining) => state.attempts_remaining = attempts_remaining,
350 None => {
351 let Some(destinations_remaining) = state.destinations_remaining.checked_sub(1)
352 else {
353 state.request.handle_retry_limit_reached(context);
354 return
355 };
356 state.destinations_remaining = destinations_remaining;
357 state.attempts_remaining = self.config.num_attempts_per_destination - 1;
358 state.new_destination(self.created_at);
359 },
360 }
361 state.posts_remaining = self.config.num_posts_per_attempt - 1;
362
363 let rel_session_index = state.session_index.and_then(|session_index| {
364 let rel_session_index = RelSessionIndex::from_session_index(
365 session_index,
366 self.session_status.current_index,
367 );
368 if !rel_session_index.map_or(false, |rel_session_index| {
369 self.session_status.phase.allow_requests_and_replies(rel_session_index)
370 }) {
371 state.new_destination(self.created_at);
372 return None
373 }
374 rel_session_index
375 });
376
377 let empty = self.session_post_queues_empty(rel_session_index);
378 self.post_queues[rel_session_index].push_back(state);
379 if empty {
380 self.process_post_queue(rel_session_index, mixnet, ns, context);
382 if rel_session_index.is_none() {
383 self.process_post_queue(
386 Some(self.session_status.phase.default_request_session()),
387 mixnet,
388 ns,
389 context,
390 );
391 }
392 }
393 }
394
395 pub fn next_retry_deadline(&self) -> Option<Instant> {
398 self.retry_queue.front().map(|state| state.retry_deadline)
399 }
400
401 pub fn pop_next_retry<X>(
405 &mut self,
406 mixnet: &mut Mixnet<X>,
407 ns: &dyn NetworkStatus,
408 context: &C,
409 ) -> bool {
410 if let Some(state) = self.retry_queue.pop_front() {
411 self.next_retry_deadline_changed = true;
412 self.retry(state, mixnet, ns, context);
413 true
414 } else {
415 false
416 }
417 }
418
419 pub fn next_retry_deadline_changed(&mut self) -> bool {
422 let changed = self.next_retry_deadline_changed;
423 self.next_retry_deadline_changed = false;
424 changed
425 }
426}