mixnet/request_manager/
mod.rs

1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! For more reliable delivery, a [`RequestManager`] can be used instead of calling
22//! [`Mixnet::post_request`] directly. A [`RequestManager`] serves as an additional buffer for
23//! requests, and will retry posting if requests are not removed within the expected time.
24
25mod config;
26mod post_queues;
27
28pub use self::config::Config;
29use self::post_queues::PostQueues;
30use super::core::{
31	MessageId, Mixnet, MixnodeIndex, NetworkStatus, PostErr, RelSessionIndex, Scattered,
32	SessionIndex, SessionPhase, SessionStatus,
33};
34use rand::RngCore;
35use std::{
36	cmp::max,
37	collections::VecDeque,
38	time::{Duration, Instant},
39};
40
/// Requests managed by a [`RequestManager`] must implement this trait.
///
/// The manager calls [`with_data`](Self::with_data), [`num_surbs`](Self::num_surbs) and
/// [`handling_delay`](Self::handling_delay) each time it posts the request, and calls exactly one
/// of the consuming `handle_*` methods if it gives up on the request.
pub trait Request {
	/// Opaque context type; a `&Context` is passed through [`RequestManager`] methods to `Request`
	/// methods.
	type Context;

	/// Call `f` with the message data. The same data must be provided every time this is called.
	///
	/// The manager posts the message from within `f` and inspects the value returned here to
	/// decide whether the post succeeded.
	fn with_data<T>(&self, f: impl FnOnce(Scattered<u8>) -> T, context: &Self::Context) -> T;
	/// Returns the number of SURBs that should be sent along with the request. The same number
	/// must be returned every time this is called.
	fn num_surbs(&self, context: &Self::Context) -> usize;
	/// Returns a conservative estimate of the handling delay. That is, the maximum time it should
	/// take for the destination mixnode to process the request and post a reply.
	///
	/// `message_id` is the ID the request was just posted with. The returned delay feeds into the
	/// round-trip estimate used to schedule the next retry.
	fn handling_delay(&self, message_id: &MessageId, context: &Self::Context) -> Duration;

	/// Called if an unrecoverable error is encountered while posting to the mixnet.
	///
	/// Consumes the request; the manager no longer tracks it after this call.
	fn handle_post_err(self, err: PostErr, context: &Self::Context);
	/// Called if we cannot retry posting because the configured limit has been reached.
	///
	/// Consumes the request; the manager no longer tracks it after this call.
	fn handle_retry_limit_reached(self, context: &Self::Context);
}
61
/// Per-request bookkeeping held by a `RequestManager` while the request sits in a post queue or
/// in the retry queue.
struct RequestState<R> {
	/// The user-supplied request.
	request: R,

	/// Number of further destinations that may still be tried. Decremented each time a retry
	/// starts on a new destination; once it cannot be decremented the retry limit is reached.
	destinations_remaining: u32,
	/// Number of further attempts that may be made at the current destination. Decremented on
	/// each retry; when exhausted, the next retry moves on to a new destination.
	attempts_remaining: u32,
	/// Number of further copies of the message to post in the current attempt.
	/// This is decremented on insertion into the post queue.
	posts_remaining: u32,

	/// ID the request is posted with and later looked up by. Regenerated randomly whenever a new
	/// destination is chosen.
	message_id: MessageId,
	/// Session the request was last posted in; set after a successful post.
	/// Should be `None` iff `destination_index` is `None`.
	session_index: Option<SessionIndex>,
	/// Passed by mutable reference to `Mixnet::post_request`; presumably filled in with the chosen
	/// destination mixnode when `None` (TODO confirm against `Mixnet::post_request`).
	destination_index: Option<MixnodeIndex>,
	/// Instant after which the request should be retried if it has not been removed. Only ever
	/// pushed later by a successful post; reset to an instant in the past when a new destination
	/// is chosen. The retry queue is ordered by this field.
	retry_deadline: Instant,
}
76
77impl<R> RequestState<R> {
78	/// `past` should be some instant in the past.
79	fn new_destination(&mut self, past: Instant) {
80		// Change message ID when changing destination; a message ID should only be known by the
81		// sender and receiver. Additionally, if we're changing session, and happen to pick the
82		// same node in the new session, we really need a different message ID to avoid old SURBs
83		// getting used in the new session.
84		//
85		// Assuming that message IDs are used to identify replies, this will mean that we no longer
86		// recognise replies from the previous destination. We only switch if there is an issue
87		// with the previous destination (eg the session is ending, or it has not replied), so this
88		// shouldn't matter much. TODO We could keep the old message ID around as well as the new
89		// one and match against it in remove().
90		rand::thread_rng().fill_bytes(&mut self.message_id);
91		self.session_index = None;
92		self.destination_index = None;
93		self.retry_deadline = past;
94	}
95}
96
/// Request manager state. The user is responsible for calling
/// [`update_session_status`](Self::update_session_status),
/// [`process_post_queues`](Self::process_post_queues), and
/// [`pop_next_retry`](Self::pop_next_retry) at the appropriate times to make progress.
pub struct RequestManager<R> {
	/// Configuration supplied at construction (capacity and retry limits).
	config: Config,
	/// Instant at which the manager was created. Used wherever "some instant in the past" is
	/// needed as a placeholder retry deadline.
	created_at: Instant,
	/// Session status as of the last call to
	/// [`update_session_status`](Self::update_session_status).
	session_status: SessionStatus,
	/// `post_queues.prev` should be empty if `session_status.current_index` is 0, or if
	/// previous-session requests are not allowed in the current phase. Similarly,
	/// `post_queues.current` should be empty if current-session requests are not allowed in the
	/// current phase.
	post_queues: PostQueues<RequestState<R>>,
	/// Requests that have been posted and are waiting for removal or for their retry deadline.
	/// Kept sorted by ascending `retry_deadline`, so the front is the next request to retry.
	retry_queue: VecDeque<RequestState<R>>,
	/// Set whenever the front of `retry_queue` may have changed; read and cleared by
	/// [`next_retry_deadline_changed`](Self::next_retry_deadline_changed).
	next_retry_deadline_changed: bool,
}
113
114impl<C, R: Request<Context = C>> RequestManager<R> {
115	/// Create a new `RequestManager` with the given configuration.
116	pub fn new(config: Config) -> Self {
117		let capacity = config.capacity;
118		Self {
119			config,
120			created_at: Instant::now(),
121			session_status: SessionStatus { current_index: 0, phase: SessionPhase::CoverToCurrent },
122			post_queues: PostQueues::new(capacity),
123			retry_queue: VecDeque::with_capacity(capacity),
124			next_retry_deadline_changed: false,
125		}
126	}
127
128	/// Update the current session index and phase. This should be called after
129	/// [`Mixnet::set_session_status`]. This may post messages to `mixnet`.
130	pub fn update_session_status<X>(
131		&mut self,
132		mixnet: &mut Mixnet<X>,
133		ns: &dyn NetworkStatus,
134		context: &C,
135	) {
136		let session_status = mixnet.session_status();
137		if self.session_status == session_status {
138			return
139		}
140
141		let prev_default_len = self.post_queues.default.len();
142
143		if self.session_status.current_index != session_status.current_index {
144			self.post_queues.default.append(&mut self.post_queues.prev); // Clears prev
145			if session_status.current_index.saturating_sub(self.session_status.current_index) == 1 {
146				std::mem::swap(&mut self.post_queues.current, &mut self.post_queues.prev);
147			} else {
148				// Unexpected session index change. Mixnet core will warn about this, don't bother
149				// warning again here.
150				self.post_queues.default.append(&mut self.post_queues.current); // Clears current
151			}
152		}
153
154		if !session_status.phase.allow_requests_and_replies(RelSessionIndex::Current) {
155			self.post_queues.default.append(&mut self.post_queues.current); // Clears current
156		}
157		if !session_status.phase.allow_requests_and_replies(RelSessionIndex::Prev) {
158			self.post_queues.default.append(&mut self.post_queues.prev); // Clears prev
159		}
160
161		for state in self.post_queues.default.iter_mut().skip(prev_default_len) {
162			state.new_destination(self.created_at);
163		}
164
165		self.session_status = session_status;
166
167		// The session status shouldn't change very often. For simplicity just retry posting in all
168		// sessions, rather than trying to figure out if we can skip some.
169		self.process_post_queues(mixnet, ns, context);
170	}
171
172	/// Returns `true` iff there is space for another request.
173	pub fn has_space(&self) -> bool {
174		let len =
175			self.post_queues.iter().map(VecDeque::len).sum::<usize>() + self.retry_queue.len();
176		len < self.config.capacity
177	}
178
179	/// Insert a request. This should only be called if there is space (see
180	/// [`has_space`](Self::has_space)). This may post messages to `mixnet`.
181	///
182	/// A request is only removed when:
183	///
184	/// - [`remove`](Self::remove) is called with the corresponding message ID. This would typically
185	///   happen when a reply is received.
186	/// - An unrecoverable error is encountered while posting to the mixnet. In this case,
187	///   [`Request::handle_post_err`] is called.
188	/// - The retry limit is reached. In this case, [`Request::handle_retry_limit_reached`] is
189	///   called.
190	pub fn insert<X>(
191		&mut self,
192		request: R,
193		mixnet: &mut Mixnet<X>,
194		ns: &dyn NetworkStatus,
195		context: &C,
196	) {
197		debug_assert!(self.has_space());
198		let state = RequestState {
199			request,
200
201			destinations_remaining: self.config.num_destinations,
202			attempts_remaining: 0,
203			posts_remaining: 0,
204
205			// The message ID will get generated when retry (below) calls state.new_destination()
206			message_id: Default::default(),
207			session_index: None,
208			destination_index: None,
209			retry_deadline: self.created_at,
210		};
211		self.retry(state, mixnet, ns, context);
212	}
213
214	/// Remove a request. Typically this would be called when a reply is received. Returns `None`
215	/// if there is no request with the given message ID.
216	pub fn remove(&mut self, message_id: &MessageId) -> Option<R> {
217		for post_queue in self.post_queues.iter_mut() {
218			if let Some(i) = post_queue.iter().position(|state| &state.message_id == message_id) {
219				return Some(post_queue.remove(i).expect("i returned by position()").request)
220			}
221		}
222
223		if let Some(i) = self.retry_queue.iter().position(|state| &state.message_id == message_id) {
224			if i == 0 {
225				self.next_retry_deadline_changed = true;
226			}
227			return Some(self.retry_queue.remove(i).expect("i returned by position()").request)
228		}
229
230		None
231	}
232
	/// Post as many requests as possible from a single post queue.
	///
	/// `rel_session_index` selects the queue: `None` is the default-session queue, whose requests
	/// are not yet tied to a session and get posted to the current phase's default request
	/// session; `Some(_)` is the queue of requests already tied to that session.
	///
	/// Processing stops when the queue is empty or when `mixnet` reports
	/// [`PostErr::NotEnoughSpaceInQueue`]. After a successful post, a request either goes back on
	/// a session-specific post queue (if more posts remain in this attempt) or moves to the retry
	/// queue. On any other error the request is handed to [`Request::handle_post_err`] and
	/// dropped from the manager.
	fn process_post_queue<X>(
		&mut self,
		rel_session_index: Option<RelSessionIndex>,
		mixnet: &mut Mixnet<X>,
		ns: &dyn NetworkStatus,
		context: &C,
	) {
		// The session requests from this queue will actually be posted to
		let rel_session_index_or_default =
			rel_session_index.unwrap_or(self.session_status.phase.default_request_session());
		if (rel_session_index_or_default == RelSessionIndex::Prev) &&
			(self.session_status.current_index == 0)
		{
			// The session does not exist. If this is the default session queue, just wait for the
			// default session to change.
			debug_assert!(self.post_queues.prev.is_empty());
			return
		}

		// Absolute session index every request in this queue is expected to carry (`None` for the
		// default queue); only used for the sanity check below
		let session_index = rel_session_index
			.map(|rel_session_index| rel_session_index + self.session_status.current_index);
		// Absolute index of the session to post to
		let session_index_or_default =
			rel_session_index_or_default + self.session_status.current_index;

		while let Some(mut state) = self.post_queues[rel_session_index].pop_front() {
			debug_assert_eq!(state.session_index, session_index);

			// Attempt to post a request message
			let res = state.request.with_data(
				|data| {
					mixnet.post_request(
						session_index_or_default,
						&mut state.destination_index,
						&state.message_id,
						data,
						state.request.num_surbs(context),
						ns,
					)
				},
				context,
			);

			match res {
				Ok(metrics) => {
					// The request is now tied to the session it was posted in
					state.session_index = Some(session_index_or_default);

					// Extend the retry deadline
					let handling_delay = state.request.handling_delay(&state.message_id, context);
					let rtt = metrics.estimate_rtt(handling_delay);
					// Never move the deadline earlier than one set by a previous post
					state.retry_deadline = max(state.retry_deadline, Instant::now() + rtt);

					match state.posts_remaining.checked_sub(1) {
						Some(posts_remaining) => {
							// More copies to post in this attempt. Queue on the session-specific
							// queue (never the default queue), as the request now has a session.
							state.posts_remaining = posts_remaining;
							self.post_queues[Some(rel_session_index_or_default)].push_back(state);
						},
						None => {
							// All posts for this attempt are done; wait for removal or for the
							// retry deadline. Insert so the retry queue stays sorted by deadline,
							// after any entries with an equal deadline.
							let i = self
								.retry_queue
								.partition_point(|s| s.retry_deadline < state.retry_deadline);
							self.retry_queue.insert(i, state);
							if i == 0 {
								// New front of the retry queue, so the next deadline has changed
								self.next_retry_deadline_changed = true;
							}
						},
					}
				},
				Err(PostErr::NotEnoughSpaceInQueue) => {
					// In this case, nothing should have changed. Just push the request back on the
					// front of the queue and try again later.
					self.post_queues[rel_session_index].push_front(state);
					break
				},
				// Any other error is treated as unrecoverable: the request is consumed by the
				// handler and not requeued
				Err(err) => state.request.handle_post_err(err, context),
			}
		}
	}
309
310	/// Attempt to post messages from the internal post queues to `mixnet`. This should be called
311	/// when the
312	/// [`SPACE_IN_AUTHORED_PACKET_QUEUE`](super::core::Events::SPACE_IN_AUTHORED_PACKET_QUEUE)
313	/// event fires.
314	pub fn process_post_queues<X>(
315		&mut self,
316		mixnet: &mut Mixnet<X>,
317		ns: &dyn NetworkStatus,
318		context: &C,
319	) {
320		// Process the default session queue first, as doing so might result in requests getting
321		// pushed onto the other queues
322		self.process_post_queue(None, mixnet, ns, context);
323		self.process_post_queue(Some(RelSessionIndex::Current), mixnet, ns, context);
324		self.process_post_queue(Some(RelSessionIndex::Prev), mixnet, ns, context);
325	}
326
327	fn session_post_queues_empty(&self, rel_session_index: Option<RelSessionIndex>) -> bool {
328		if !self.post_queues[rel_session_index].is_empty() {
329			return false
330		}
331		let default = self.session_status.phase.default_request_session();
332		match rel_session_index {
333			Some(rel_session_index) if rel_session_index == default =>
334				self.post_queues.default.is_empty(),
335			Some(_) => true,
336			None => self.post_queues[Some(default)].is_empty(),
337		}
338	}
339
340	fn retry<X>(
341		&mut self,
342		mut state: RequestState<R>,
343		mixnet: &mut Mixnet<X>,
344		ns: &dyn NetworkStatus,
345		context: &C,
346	) {
347		debug_assert_eq!(state.posts_remaining, 0);
348		match state.attempts_remaining.checked_sub(1) {
349			Some(attempts_remaining) => state.attempts_remaining = attempts_remaining,
350			None => {
351				let Some(destinations_remaining) = state.destinations_remaining.checked_sub(1)
352				else {
353					state.request.handle_retry_limit_reached(context);
354					return
355				};
356				state.destinations_remaining = destinations_remaining;
357				state.attempts_remaining = self.config.num_attempts_per_destination - 1;
358				state.new_destination(self.created_at);
359			},
360		}
361		state.posts_remaining = self.config.num_posts_per_attempt - 1;
362
363		let rel_session_index = state.session_index.and_then(|session_index| {
364			let rel_session_index = RelSessionIndex::from_session_index(
365				session_index,
366				self.session_status.current_index,
367			);
368			if !rel_session_index.map_or(false, |rel_session_index| {
369				self.session_status.phase.allow_requests_and_replies(rel_session_index)
370			}) {
371				state.new_destination(self.created_at);
372				return None
373			}
374			rel_session_index
375		});
376
377		let empty = self.session_post_queues_empty(rel_session_index);
378		self.post_queues[rel_session_index].push_back(state);
379		if empty {
380			// There were no requests waiting. It might be possible to post immediately.
381			self.process_post_queue(rel_session_index, mixnet, ns, context);
382			if rel_session_index.is_none() {
383				// Might have pushed requests onto this queue while processing the default session
384				// queue
385				self.process_post_queue(
386					Some(self.session_status.phase.default_request_session()),
387					mixnet,
388					ns,
389					context,
390				);
391			}
392		}
393	}
394
395	/// Returns the next instant at which [`pop_next_retry`](Self::pop_next_retry) should be
396	/// called.
397	pub fn next_retry_deadline(&self) -> Option<Instant> {
398		self.retry_queue.front().map(|state| state.retry_deadline)
399	}
400
401	/// Pop the next request from the internal retry queue. This should be called whenever the
402	/// deadline returned by [`next_retry_deadline`](Self::next_retry_deadline) is reached. This
403	/// may post messages to `mixnet`. Returns `false` if the internal retry queue is empty.
404	pub fn pop_next_retry<X>(
405		&mut self,
406		mixnet: &mut Mixnet<X>,
407		ns: &dyn NetworkStatus,
408		context: &C,
409	) -> bool {
410		if let Some(state) = self.retry_queue.pop_front() {
411			self.next_retry_deadline_changed = true;
412			self.retry(state, mixnet, ns, context);
413			true
414		} else {
415			false
416		}
417	}
418
419	/// Returns `true` if the next retry deadline (see
420	/// [`next_retry_deadline`](Self::next_retry_deadline)) has changed since the last call.
421	pub fn next_retry_deadline_changed(&mut self) -> bool {
422		let changed = self.next_retry_deadline_changed;
423		self.next_retry_deadline_changed = false;
424		changed
425	}
426}