mixnet/
reply_manager.rs

1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! A mixnode may receive the same request multiple times due to retransmission (see eg
22//! [`request_manager`](super::request_manager)). A [`ReplyManager`] can be used to cache replies,
23//! to avoid needing to execute requests more than once.
24
25use super::core::{MessageId, Mixnet, RequestMessage, SessionIndex, Surb, MESSAGE_ID_SIZE};
26use hashlink::{linked_hash_map::Entry, LinkedHashMap};
27use log::{debug, trace};
28use rand::RngCore;
29use std::time::{Duration, Instant};
30
31/// Reply manager configuration.
32#[derive(Clone, Debug)]
33pub struct Config {
34	/// The target for log messages.
35	pub log_target: &'static str,
36	/// Maximum number of requests to remember. When this limit is reached, old requests will be
37	/// automatically discarded to make space for new ones.
38	pub capacity: usize,
39	/// Maximum number of copies of a reply message to post in response to a single request
40	/// message. Note that the number of copies is also limited by the number of SURBs provided in
41	/// the request message.
42	pub max_posts: usize,
43	/// After replying to a request, ignore repeats of the request for this length of time. This
44	/// should ideally be set such that extra copies of a request message posted at the same time
45	/// as the first received one get ignored, but retries posted after a timeout do not.
46	pub cooldown: Duration,
47}
48
49impl Default for Config {
50	fn default() -> Self {
51		Self {
52			log_target: "mixnet",
53			capacity: 400,
54			max_posts: 2,
55			cooldown: Duration::from_secs(10),
56		}
57	}
58}
59
60struct Reply {
61	/// The _reply_ message ID.
62	message_id: MessageId,
63	data: Vec<u8>,
64}
65
66impl Reply {
67	fn new(data: Vec<u8>) -> Self {
68		let mut message_id = [0; MESSAGE_ID_SIZE];
69		rand::thread_rng().fill_bytes(&mut message_id);
70		Self { message_id, data }
71	}
72}
73
74/// Context needed to reply to a request.
75pub struct ReplyContext {
76	session_index: SessionIndex,
77	/// The _request_ message ID.
78	message_id: MessageId,
79	surbs: Vec<Surb>,
80}
81
82impl ReplyContext {
83	/// Returns a reference to the request message ID.
84	pub fn message_id(&self) -> &MessageId {
85		&self.message_id
86	}
87
88	fn post_reply<X>(&mut self, reply: &Reply, mixnet: &mut Mixnet<X>, config: &Config) {
89		for _ in 0..config.max_posts {
90			if let Err(err) = mixnet.post_reply(
91				&mut self.surbs,
92				self.session_index,
93				&reply.message_id,
94				reply.data.as_slice().into(),
95			) {
96				debug!(target: config.log_target,
97					"Failed to post reply to request with message ID {:x?}: {err}",
98					self.message_id);
99				break
100			}
101		}
102	}
103}
104
105enum ReplyState {
106	/// The request is currently being handled.
107	Pending,
108	/// The request has been handled already.
109	Complete { reply: Reply, last_post: Instant },
110}
111
112/// Reply manager state.
113pub struct ReplyManager {
114	config: Config,
115	states: LinkedHashMap<MessageId, ReplyState>,
116}
117
118impl ReplyManager {
119	/// Create a new `ReplyManager` with the given configuration.
120	pub fn new(config: Config) -> Self {
121		let states = LinkedHashMap::with_capacity(
122			// Plus one because we only evict _after_ going over the limit
123			config.capacity.saturating_add(1),
124		);
125		Self { config, states }
126	}
127
128	fn maybe_evict(&mut self) {
129		if self.states.len() > self.config.capacity {
130			self.states.pop_front();
131			debug_assert_eq!(self.states.len(), self.config.capacity);
132		}
133	}
134
135	/// Attempt to insert a request.
136	///
137	/// If the request is already present, posts the reply if necessary, and returns `None`. The
138	/// caller does not need to do anything more.
139	///
140	/// If `Some` is returned, the caller should handle the request and then call either
141	/// [`abandon`](Self::abandon) or [`complete`](Self::complete) with the [`ReplyContext`]. The
142	/// `Vec<u8>` contains the request message data.
143	pub fn insert<X>(
144		&mut self,
145		message: RequestMessage,
146		mixnet: &mut Mixnet<X>,
147	) -> Option<(ReplyContext, Vec<u8>)> {
148		let mut reply_context = ReplyContext {
149			session_index: message.session_index,
150			message_id: message.id,
151			surbs: message.surbs,
152		};
153
154		match self.states.entry(message.id) {
155			Entry::Occupied(mut entry) => {
156				match entry.get_mut() {
157					ReplyState::Pending => trace!(target: self.config.log_target,
158						"Ignoring repeat request with message ID {:x?}; currently handling", message.id),
159					ReplyState::Complete { reply, last_post } => {
160						let now = Instant::now();
161						let since_last = now.saturating_duration_since(*last_post);
162						if since_last < self.config.cooldown {
163							trace!(target: self.config.log_target,
164								"Ignoring repeat request with message ID {:x?}; posted a reply {:.1}s ago",
165								message.id, since_last.as_secs_f32());
166						} else {
167							*last_post = now;
168							reply_context.post_reply(reply, mixnet, &self.config);
169						}
170					},
171				}
172				None
173			},
174			Entry::Vacant(entry) => {
175				entry.insert(ReplyState::Pending);
176				self.maybe_evict();
177				Some((reply_context, message.data))
178			},
179		}
180	}
181
182	/// Abandon a request. This should be called if you do not wish to reply at this time. If
183	/// [`insert`](Self::insert) is called again with a matching message (same ID), it will return
184	/// `Some`, and you will have another chance to handle the request.
185	pub fn abandon(&mut self, reply_context: ReplyContext) {
186		if let Entry::Occupied(entry) = self.states.entry(reply_context.message_id) {
187			match entry.get() {
188				ReplyState::Pending => {
189					entry.remove();
190				},
191				ReplyState::Complete { .. } => debug!(
192					target: self.config.log_target,
193					"Ignoring abandon of request with message ID {:x?}; already completed",
194					reply_context.message_id
195				),
196			}
197		}
198	}
199
200	/// Complete a request. This will post the reply and cache it for repeat requests.
201	pub fn complete<X>(
202		&mut self,
203		mut reply_context: ReplyContext,
204		data: Vec<u8>,
205		mixnet: &mut Mixnet<X>,
206	) {
207		let state = match self.states.entry(reply_context.message_id) {
208			Entry::Occupied(entry) => match entry.into_mut() {
209				state @ ReplyState::Pending => state,
210				ReplyState::Complete { .. } => {
211					debug!(target: self.config.log_target,
212						"Request with message ID {:x?} completed twice",
213						reply_context.message_id);
214					return
215				},
216			},
217			Entry::Vacant(entry) => entry.insert(ReplyState::Pending),
218		};
219
220		let reply = Reply::new(data);
221		reply_context.post_reply(&reply, mixnet, &self.config);
222		*state = ReplyState::Complete { reply, last_post: Instant::now() };
223
224		self.maybe_evict();
225	}
226}