1use super::core::{MessageId, Mixnet, RequestMessage, SessionIndex, Surb, MESSAGE_ID_SIZE};
26use hashlink::{linked_hash_map::Entry, LinkedHashMap};
27use log::{debug, trace};
28use rand::RngCore;
29use std::time::{Duration, Instant};
30
31#[derive(Clone, Debug)]
33pub struct Config {
34 pub log_target: &'static str,
36 pub capacity: usize,
39 pub max_posts: usize,
43 pub cooldown: Duration,
47}
48
49impl Default for Config {
50 fn default() -> Self {
51 Self {
52 log_target: "mixnet",
53 capacity: 400,
54 max_posts: 2,
55 cooldown: Duration::from_secs(10),
56 }
57 }
58}
59
60struct Reply {
61 message_id: MessageId,
63 data: Vec<u8>,
64}
65
66impl Reply {
67 fn new(data: Vec<u8>) -> Self {
68 let mut message_id = [0; MESSAGE_ID_SIZE];
69 rand::thread_rng().fill_bytes(&mut message_id);
70 Self { message_id, data }
71 }
72}
73
74pub struct ReplyContext {
76 session_index: SessionIndex,
77 message_id: MessageId,
79 surbs: Vec<Surb>,
80}
81
82impl ReplyContext {
83 pub fn message_id(&self) -> &MessageId {
85 &self.message_id
86 }
87
88 fn post_reply<X>(&mut self, reply: &Reply, mixnet: &mut Mixnet<X>, config: &Config) {
89 for _ in 0..config.max_posts {
90 if let Err(err) = mixnet.post_reply(
91 &mut self.surbs,
92 self.session_index,
93 &reply.message_id,
94 reply.data.as_slice().into(),
95 ) {
96 debug!(target: config.log_target,
97 "Failed to post reply to request with message ID {:x?}: {err}",
98 self.message_id);
99 break
100 }
101 }
102 }
103}
104
105enum ReplyState {
106 Pending,
108 Complete { reply: Reply, last_post: Instant },
110}
111
112pub struct ReplyManager {
114 config: Config,
115 states: LinkedHashMap<MessageId, ReplyState>,
116}
117
118impl ReplyManager {
119 pub fn new(config: Config) -> Self {
121 let states = LinkedHashMap::with_capacity(
122 config.capacity.saturating_add(1),
124 );
125 Self { config, states }
126 }
127
128 fn maybe_evict(&mut self) {
129 if self.states.len() > self.config.capacity {
130 self.states.pop_front();
131 debug_assert_eq!(self.states.len(), self.config.capacity);
132 }
133 }
134
135 pub fn insert<X>(
144 &mut self,
145 message: RequestMessage,
146 mixnet: &mut Mixnet<X>,
147 ) -> Option<(ReplyContext, Vec<u8>)> {
148 let mut reply_context = ReplyContext {
149 session_index: message.session_index,
150 message_id: message.id,
151 surbs: message.surbs,
152 };
153
154 match self.states.entry(message.id) {
155 Entry::Occupied(mut entry) => {
156 match entry.get_mut() {
157 ReplyState::Pending => trace!(target: self.config.log_target,
158 "Ignoring repeat request with message ID {:x?}; currently handling", message.id),
159 ReplyState::Complete { reply, last_post } => {
160 let now = Instant::now();
161 let since_last = now.saturating_duration_since(*last_post);
162 if since_last < self.config.cooldown {
163 trace!(target: self.config.log_target,
164 "Ignoring repeat request with message ID {:x?}; posted a reply {:.1}s ago",
165 message.id, since_last.as_secs_f32());
166 } else {
167 *last_post = now;
168 reply_context.post_reply(reply, mixnet, &self.config);
169 }
170 },
171 }
172 None
173 },
174 Entry::Vacant(entry) => {
175 entry.insert(ReplyState::Pending);
176 self.maybe_evict();
177 Some((reply_context, message.data))
178 },
179 }
180 }
181
182 pub fn abandon(&mut self, reply_context: ReplyContext) {
186 if let Entry::Occupied(entry) = self.states.entry(reply_context.message_id) {
187 match entry.get() {
188 ReplyState::Pending => {
189 entry.remove();
190 },
191 ReplyState::Complete { .. } => debug!(
192 target: self.config.log_target,
193 "Ignoring abandon of request with message ID {:x?}; already completed",
194 reply_context.message_id
195 ),
196 }
197 }
198 }
199
200 pub fn complete<X>(
202 &mut self,
203 mut reply_context: ReplyContext,
204 data: Vec<u8>,
205 mixnet: &mut Mixnet<X>,
206 ) {
207 let state = match self.states.entry(reply_context.message_id) {
208 Entry::Occupied(entry) => match entry.into_mut() {
209 state @ ReplyState::Pending => state,
210 ReplyState::Complete { .. } => {
211 debug!(target: self.config.log_target,
212 "Request with message ID {:x?} completed twice",
213 reply_context.message_id);
214 return
215 },
216 },
217 Entry::Vacant(entry) => entry.insert(ReplyState::Pending),
218 };
219
220 let reply = Reply::new(data);
221 reply_context.post_reply(&reply, mixnet, &self.config);
222 *state = ReplyState::Complete { reply, last_post: Instant::now() };
223
224 self.maybe_evict();
225 }
226}