mixnet/core/
mod.rs

1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Mixnet core logic.
22
23// Get a bunch of these from [mut_]array_refs
24#![allow(clippy::ptr_offset_with_cast)]
25
26mod config;
27mod cover;
28mod fragment;
29mod kx_pair;
30mod packet_queues;
31mod replay_filter;
32mod request_builder;
33mod scattered;
34mod sessions;
35mod sphinx;
36mod surb_keystore;
37mod topology;
38mod util;
39
40pub use self::{
41	config::{Config, SessionConfig},
42	fragment::{MessageId, MESSAGE_ID_SIZE},
43	packet_queues::AddressedPacket,
44	scattered::Scattered,
45	sessions::{RelSessionIndex, SessionIndex, SessionPhase, SessionStatus},
46	sphinx::{
47		Delay, KxPublic, KxSecret, MixnodeIndex, Packet, PeerId, RawMixnodeIndex, Surb,
48		KX_PUBLIC_SIZE, MAX_HOPS, MAX_MIXNODE_INDEX, PACKET_SIZE, PEER_ID_SIZE, SURB_SIZE,
49	},
50	topology::{Mixnode, NetworkStatus, TopologyErr},
51};
52use self::{
53	cover::{gen_cover_packet, CoverKind},
54	fragment::{fragment_blueprints, FragmentAssembler},
55	kx_pair::KxPair,
56	packet_queues::{AuthoredPacketQueue, CheckSpaceErr, ForwardPacketQueue},
57	replay_filter::ReplayFilter,
58	request_builder::RequestBuilder,
59	sessions::{Session, SessionSlot, Sessions},
60	sphinx::{
61		complete_reply_packet, decrypt_reply_payload, kx_public, mut_payload_data, peel, Action,
62		PeelErr, PAYLOAD_DATA_SIZE, PAYLOAD_SIZE,
63	},
64	surb_keystore::SurbKeystore,
65	topology::Topology,
66	util::default_boxed_array,
67};
68use arrayref::{array_mut_ref, array_ref};
69use arrayvec::ArrayVec;
70use bitflags::bitflags;
71use either::Either;
72use log::{debug, info, trace};
73use rand::Rng;
74use std::{
75	cmp::{max, min},
76	time::{Duration, Instant},
77};
78
/// Error returned by the mixnodes query callback passed to [`Mixnet::maybe_set_mixnodes`].
///
/// The variant tells the mixnet whether the session slot should stay available for a later retry
/// or be disabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MixnodesErr {
	/// Transient error. The query might succeed later. Do not disable the mixnet for the session.
	Transient,
	/// Permanent error. The query will never succeed. Disable the mixnet for the session.
	Permanent,
}
86
87/// A request from another node.
88#[derive(Debug, PartialEq, Eq)]
89pub struct RequestMessage {
90	/// Index of the session this message was received in. This session index should be used when
91	/// sending replies.
92	pub session_index: SessionIndex,
93	/// Message identifier, explicitly provided by the request sender.
94	pub id: MessageId,
95	/// The message contents.
96	pub data: Vec<u8>,
97	/// SURBs that were attached to the message. These can be used to send replies.
98	pub surbs: Vec<Surb>,
99}
100
101/// A reply to a previously sent request.
102#[derive(Debug, PartialEq, Eq)]
103pub struct ReplyMessage {
104	/// ID of the request message this reply was sent in response to.
105	pub request_id: MessageId,
106	/// The message contents.
107	pub data: Vec<u8>,
108}
109
110/// A message received over the mixnet.
111#[derive(Debug, PartialEq, Eq)]
112pub enum Message {
113	/// A request from another node.
114	Request(RequestMessage),
115	/// A reply to a previously sent request.
116	Reply(ReplyMessage),
117}
118
119/// Request/reply posting error.
120#[derive(Debug, thiserror::Error)]
121pub enum PostErr {
122	/// Message contents too large or too many SURBs.
123	#[error("Message would need to be split into too many fragments")]
124	TooManyFragments,
125	/// The session is no longer active.
126	#[error("Session {0} is no longer active")]
127	SessionNoLongerActive(SessionIndex),
128	/// The session is not active yet.
129	#[error("Session {0} is not active yet")]
130	SessionNotActiveYet(SessionIndex),
131	/// Mixnodes not yet known for the session.
132	#[error("Mixnodes not yet known for session {0}")]
133	SessionMixnodesNotKnown(SessionIndex),
134	/// Mixnet disabled for the session.
135	#[error("Mixnet disabled for session {0}")]
136	SessionDisabled(SessionIndex),
137	/// Not enough space in the authored packet queue.
138	#[error("There is not enough space in the authored packet queue")]
139	NotEnoughSpaceInQueue,
140	/// Topology error.
141	#[error("Topology error: {0}")]
142	Topology(#[from] TopologyErr),
143	/// Bad SURB.
144	#[error("Bad SURB")]
145	BadSurb,
146}
147
148fn post_session<X>(
149	sessions: &mut Sessions<X>,
150	status: SessionStatus,
151	index: SessionIndex,
152) -> Result<&mut Session<X>, PostErr> {
153	let Some(rel_index) = RelSessionIndex::from_session_index(index, status.current_index) else {
154		return Err(if index < status.current_index {
155			PostErr::SessionNoLongerActive(index)
156		} else {
157			PostErr::SessionNotActiveYet(index)
158		})
159	};
160	if !status.phase.allow_requests_and_replies(rel_index) {
161		return Err(match rel_index {
162			RelSessionIndex::Prev => PostErr::SessionNoLongerActive(index),
163			RelSessionIndex::Current => PostErr::SessionNotActiveYet(index),
164		})
165	}
166	match &mut sessions[rel_index] {
167		SessionSlot::Empty | SessionSlot::KxPair(_) => Err(PostErr::SessionMixnodesNotKnown(index)),
168		// Note that in the case where the session has been disabled because it is no longer
169		// needed, we will enter the !allow_requests_and_replies if above and not get here
170		SessionSlot::Disabled => Err(PostErr::SessionDisabled(index)),
171		SessionSlot::Full(session) => Ok(session),
172	}
173}
174
175impl From<CheckSpaceErr> for PostErr {
176	fn from(value: CheckSpaceErr) -> Self {
177		match value {
178			CheckSpaceErr::Capacity => PostErr::TooManyFragments,
179			CheckSpaceErr::Len => PostErr::NotEnoughSpaceInQueue,
180		}
181	}
182}
183
184/// Returns a conservative estimate of the time taken for the last packet in the authored packet
185/// queue to get dispatched plus the time taken for all reply packets to get through the authored
186/// packet queue at the far end.
187fn estimate_authored_packet_queue_delay<X>(config: &Config, session: &Session<X>) -> Duration {
188	let rate_mul =
189		// When transitioning between sessions, the rate is halved
190		0.5 *
191		// Loop cover packets are never replaced with packets from the authored packet queue
192		(1.0 - config.loop_cover_proportion);
193	let request_period = session.mean_authored_packet_period.div_f64(rate_mul);
194	let request_len = session.authored_packet_queue.len();
195	// Assume that the destination mixnode is using the same configuration as us
196	let reply_period = config.mixnode_session.mean_authored_packet_period.div_f64(rate_mul);
197	let reply_len = config.mixnode_session.authored_packet_queue.capacity; // Worst case
198
199	// The delays between authored packet queue pops follow an exponential distribution. The sum of
200	// n independent exponential random variables with scale s follows a gamma distribution with
201	// shape n and scale s. A reasonable approximation to the 99.995th percentile of the gamma
202	// distribution with shape n and scale s is:
203	//
204	// s * (4.92582 + (3.87809 * sqrt(n)) + n)
205	//
206	// The constants were obtained by fitting to the actual values for n=1..200.
207	//
208	// This isn't quite what we want here; we are interested in the sum of two gamma-distributed
209	// random variables with different scales (request_period and reply_period). An approximation
210	// to the 99.995th percentile of such a sum is:
211	//
212	// s * (4.92582 + (3.87809 * sqrt(n + (r^3 * m))) + n + (r * m))
213	//
214	// Where:
215	//
216	// - s is the larger scale.
217	// - n is the corresponding shape.
218	// - m is the other shape.
219	// - r is the other scale divided by s (between 0 and 1).
220	//
221	// Note that when r is 0 this matches the first approximation, and when r is 1 this matches the
222	// first approximation with n replaced by (n + m).
223	let (s, n, m, rs) = if request_period > reply_period {
224		(request_period, request_len, reply_len, reply_period)
225	} else {
226		(reply_period, reply_len, request_len, request_period)
227	};
228	let n = n as f64;
229	let m = m as f64;
230	let r = rs.as_secs_f64() / s.as_secs_f64();
231	s.mul_f64(4.92582 + (3.87809 * (n + (r * r * r * m)).sqrt()) + n + (r * m))
232}
233
/// Metrics that can be used to estimate a request's round-trip time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestMetrics {
	/// The maximum number of hops for any of the fragments to reach the destination, plus the
	/// maximum number of hops for any of the SURBs to come back.
	pub num_hops: usize,
	/// Conservative estimate of the network (and processing) delay per hop.
	pub per_hop_net_delay: Duration,
	/// The maximum total forwarding delay for any request fragment, plus the maximum total
	/// forwarding delay for any SURB.
	pub forwarding_delay: Duration,
	/// A conservative estimate of the total delay through the authored packet queues at the source
	/// and destination.
	pub authored_packet_queue_delay: Duration,
}

impl RequestMetrics {
	/// Returns a conservative estimate of the round-trip time, suitable for use as a timeout.
	/// `handling_delay` should be a conservative estimate of the time taken to handle the request
	/// at the destination and post the reply.
	///
	/// The estimate is the sum of the forwarding delay, the authored packet queue delay, the
	/// per-hop network delay multiplied by the number of hops, and `handling_delay`. All of the
	/// arithmetic saturates: if the total is not representable, [`Duration::MAX`] is returned
	/// instead of panicking.
	pub fn estimate_rtt(&self, handling_delay: Duration) -> Duration {
		// Clamp before narrowing so that a huge hop count cannot silently wrap to a small one
		let num_hops = self.num_hops.min(u32::MAX as usize) as u32;
		let net_delay = self.per_hop_net_delay.saturating_mul(num_hops);
		self.forwarding_delay
			.saturating_add(self.authored_packet_queue_delay)
			.saturating_add(net_delay)
			.saturating_add(handling_delay)
	}
}
258
259bitflags! {
260	/// Flags to indicate events that have occurred. Note that these may be set spuriously.
261	pub struct Events: u32 {
262		/// The reserved peers returned by [`Mixnet::reserved_peers`] have changed.
263		const RESERVED_PEERS_CHANGED = 0b1;
264		/// The deadline returned by [`Mixnet::next_forward_packet_deadline`] has changed.
265		const NEXT_FORWARD_PACKET_DEADLINE_CHANGED = 0b10;
266		/// The effective deadline returned by [`Mixnet::next_authored_packet_delay`] has changed.
267		/// The delay (and thus the effective deadline) is randomly generated according to an
268		/// exponential distribution each time the function is called, but the last returned
269		/// deadline remains valid until this bit indicates otherwise. Due to the memoryless nature
270		/// of exponential distributions, it is harmless for this bit to be set spuriously.
271		const NEXT_AUTHORED_PACKET_DEADLINE_CHANGED = 0b100;
272		/// Space has become available in an authored packet queue.
273		const SPACE_IN_AUTHORED_PACKET_QUEUE = 0b1000;
274	}
275}
276
277/// Mixnet core state. `X` is the type of the extra data stored for each mixnode
278/// ([`Mixnode::extra`]).
279pub struct Mixnet<X> {
280	config: Config,
281
282	/// Index and phase of current session.
283	session_status: SessionStatus,
284	/// Current and previous sessions.
285	sessions: Sessions<X>,
286	/// Key-exchange key pair for the next session.
287	next_kx_pair: Option<KxPair>,
288
289	/// Queue of packets to be forwarded, after some delay.
290	forward_packet_queue: ForwardPacketQueue,
291
292	/// Keystore for SURB payload encryption keys.
293	surb_keystore: SurbKeystore,
294	/// Reassembles fragments into messages. Note that for simplicity there is just one assembler
295	/// for everything (requests and replies across all sessions).
296	fragment_assembler: FragmentAssembler,
297
298	/// Flags to indicate events that have occurred.
299	events: Events,
300}
301
302impl<X> Mixnet<X> {
303	/// Create a new `Mixnet`.
304	pub fn new(config: Config) -> Self {
305		let sessions = Sessions {
306			current: config
307				.session_0_kx_secret
308				.map_or(SessionSlot::Empty, |secret| SessionSlot::KxPair(secret.into())),
309			prev: SessionSlot::Disabled,
310		};
311
312		let forward_packet_queue = ForwardPacketQueue::new(config.forward_packet_queue_capacity);
313
314		let surb_keystore = SurbKeystore::new(config.surb_keystore_capacity);
315		let fragment_assembler = FragmentAssembler::new(
316			config.max_incomplete_messages,
317			config.max_incomplete_fragments,
318			config.max_fragments_per_message,
319		);
320
321		Self {
322			config,
323
324			session_status: SessionStatus { current_index: 0, phase: SessionPhase::CoverToCurrent },
325			sessions,
326			next_kx_pair: None,
327
328			forward_packet_queue,
329
330			surb_keystore,
331			fragment_assembler,
332
333			events: Events::empty(),
334		}
335	}
336
337	/// Returns the current session index and phase.
338	pub fn session_status(&self) -> SessionStatus {
339		self.session_status
340	}
341
342	/// Sets the current session index and phase. The current and previous mixnodes may need to be
343	/// provided after calling this; see [`maybe_set_mixnodes`](Self::maybe_set_mixnodes).
344	pub fn set_session_status(&mut self, session_status: SessionStatus) {
345		if self.session_status == session_status {
346			return
347		}
348
349		// Shift sessions when current session index changes
350		if self.session_status.current_index != session_status.current_index {
351			let next_session = std::mem::take(&mut self.next_kx_pair)
352				.map_or(SessionSlot::Empty, SessionSlot::KxPair);
353			match session_status.current_index.saturating_sub(self.session_status.current_index) {
354				1 =>
355					self.sessions.prev = std::mem::replace(&mut self.sessions.current, next_session),
356				2 => {
357					self.sessions.prev = next_session;
358					self.sessions.current = SessionSlot::Empty;
359				},
360				_ =>
361					if !self.sessions.is_empty() || !next_session.is_empty() {
362						debug!(
363							target: self.config.log_target,
364							"Unexpected session index {}; previous session index was {}",
365							session_status.current_index,
366							self.session_status.current_index
367						);
368						self.sessions =
369							Sessions { current: SessionSlot::Empty, prev: SessionSlot::Empty };
370					},
371			}
372		}
373
374		// Discard previous session if it is not needed. Also, avoid ever having a previous session
375		// when the current session index is 0... there is no sensible index for it.
376		if !session_status.phase.need_prev() || (session_status.current_index == 0) {
377			self.sessions.prev = SessionSlot::Disabled;
378		}
379
380		// For simplicity just assume these have changed. This should happen at most once a minute
381		// or so.
382		self.events |=
383			Events::RESERVED_PEERS_CHANGED | Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
384
385		self.session_status = session_status;
386
387		info!(target: self.config.log_target, "Session status changed: {session_status}");
388	}
389
390	/// Sets the mixnodes for the specified session, if they are needed. If `mixnodes()` returns
391	/// `Err(MixnodesErr::Permanent)`, the session slot will be disabled, and later calls to
392	/// `maybe_set_mixnodes` for the session will return immediately. If `mixnodes()` returns
393	/// `Err(MixnodesErr::Transient)`, the session slot will merely remain empty, and later calls to
394	/// `maybe_set_mixnodes` may succeed.
395	///
396	/// The mixnode peer IDs are used for two things:
397	///
398	/// - Checking for connectivity (they are passed to [`NetworkStatus::is_connected`]).
399	/// - Sending packets (they are put in [`AddressedPacket::peer_id`]).
400	pub fn maybe_set_mixnodes(
401		&mut self,
402		rel_session_index: RelSessionIndex,
403		mixnodes: &mut dyn FnMut() -> Result<Vec<Mixnode<X>>, MixnodesErr>,
404	) {
405		let session = &mut self.sessions[rel_session_index];
406		if !matches!(session, SessionSlot::Empty | SessionSlot::KxPair(_)) {
407			return
408		}
409
410		let session_index = rel_session_index + self.session_status.current_index;
411		let mut rng = rand::thread_rng();
412
413		// Determine mixnodes
414		let mut mixnodes = match mixnodes() {
415			Ok(mixnodes) => mixnodes,
416			Err(err) => {
417				if matches!(err, MixnodesErr::Permanent) {
418					*session = SessionSlot::Disabled;
419				}
420				return
421			},
422		};
423		let max_mixnodes = (MAX_MIXNODE_INDEX + 1) as usize;
424		if mixnodes.len() > max_mixnodes {
425			debug!(
426				target: self.config.log_target,
427				"Session {session_index}: Too many mixnodes ({}, max {max_mixnodes}); ignoring excess",
428				mixnodes.len()
429			);
430			mixnodes.truncate(max_mixnodes);
431		}
432
433		// Determine key-exchange key pair for the local node. Note that from this point on, we are
434		// guaranteed to either panic or overwrite *session.
435		let kx_pair = match std::mem::replace(session, SessionSlot::Empty) {
436			SessionSlot::KxPair(kx_pair) => kx_pair,
437			_ => KxPair::gen(&mut rng),
438		};
439
440		// Build Topology struct
441		let topology =
442			Topology::new(&mut rng, mixnodes, kx_pair.public(), self.config.num_gateway_mixnodes);
443
444		// Determine session config
445		let config = if topology.is_mixnode() {
446			&self.config.mixnode_session
447		} else {
448			match &self.config.non_mixnode_session {
449				Some(config) => config,
450				None => {
451					info!(target: self.config.log_target,
452						"Session {session_index}: Local node is not a mixnode; \
453						disabling mixnet as per configuration");
454					*session = SessionSlot::Disabled;
455					return
456				},
457			}
458		};
459
460		info!(target: self.config.log_target, "Session {session_index}: {topology}");
461
462		// Build Session struct
463		*session = SessionSlot::Full(Session {
464			kx_pair,
465			topology,
466			authored_packet_queue: AuthoredPacketQueue::new(config.authored_packet_queue),
467			mean_authored_packet_period: config.mean_authored_packet_period,
468			replay_filter: ReplayFilter::new(&mut rng),
469		});
470
471		self.events |=
472			Events::RESERVED_PEERS_CHANGED | Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
473	}
474
475	/// Returns the key-exchange public key for the next session.
476	pub fn next_kx_public(&mut self) -> &KxPublic {
477		self.next_kx_pair
478			.get_or_insert_with(|| KxPair::gen(&mut rand::thread_rng()))
479			.public()
480	}
481
482	/// Returns the mixnodes we should try to maintain connections to.
483	pub fn reserved_peers(&self) -> impl Iterator<Item = &Mixnode<X>> {
484		self.sessions.iter().flat_map(|session| session.topology.reserved_peers())
485	}
486
487	/// Handle an incoming packet. If the packet completes a message, the message is returned.
488	/// Otherwise, [`None`] is returned.
489	pub fn handle_packet(&mut self, packet: &Packet) -> Option<Message> {
490		let mut out = [0; PACKET_SIZE];
491		let res = self.sessions.enumerate_mut().find_map(|(rel_session_index, session)| {
492			let kx_shared_secret = session.kx_pair.exchange(kx_public(packet));
493
494			let replay_tag = session.replay_filter.tag(&kx_shared_secret);
495			if session.replay_filter.contains(replay_tag) {
496				return Some(Err(Either::Left("Packet found in replay filter")))
497			}
498
499			match peel(&mut out, packet, &kx_shared_secret) {
500				// Bad MAC possibly means we used the wrong secret; try other session
501				Err(PeelErr::Mac) => None,
502				// Any other error means the packet is bad; just discard it
503				Err(err) => Some(Err(Either::Right(err))),
504				Ok(action) => Some(Ok((action, rel_session_index, session, replay_tag))),
505			}
506		});
507
508		let (action, rel_session_index, session, replay_tag) = match res {
509			None => {
510				// This will usually get hit quite a bit on session changeover after we discard the
511				// keys for the previous session. It may get hit just before a new session if other
512				// nodes switch sooner.
513				trace!(
514					target: self.config.log_target,
515					"Failed to peel packet; either bad MAC or unknown secret"
516				);
517				return None
518			},
519			Some(Err(err)) => {
520				debug!(target: self.config.log_target, "Failed to peel packet: {err}");
521				return None
522			},
523			Some(Ok(x)) => x,
524		};
525
526		match action {
527			Action::ForwardTo { target, delay } => {
528				if !session.topology.is_mixnode() {
529					debug!(target: self.config.log_target,
530						"Received packet to forward despite not being a mixnode in the session; discarding");
531					return None
532				}
533
534				if !self.forward_packet_queue.has_space() {
535					debug!(target: self.config.log_target, "Dropped forward packet; forward queue full");
536					return None
537				}
538
539				// After the is_mixnode check to avoid inserting anything into the replay filters
540				// for sessions where we are not a mixnode
541				session.replay_filter.insert(replay_tag);
542
543				match session.topology.target_to_peer_id(&target) {
544					Ok(peer_id) => {
545						let deadline =
546							Instant::now() + delay.to_duration(self.config.mean_forwarding_delay);
547						let packet = AddressedPacket { peer_id, packet: out.into() };
548						if self.forward_packet_queue.insert(deadline, packet) {
549							self.events |= Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED;
550						}
551					},
552					Err(err) => debug!(
553						target: self.config.log_target,
554						"Failed to map target {target:?} to peer ID: {err}"
555					),
556				}
557
558				None
559			},
560			Action::DeliverRequest => {
561				let payload_data = array_ref![out, 0, PAYLOAD_DATA_SIZE];
562
563				if !session.topology.is_mixnode() {
564					debug!(target: self.config.log_target,
565						"Received request packet despite not being a mixnode in the session; discarding");
566					return None
567				}
568
569				// After the is_mixnode check to avoid inserting anything into the replay filters
570				// for sessions where we are not a mixnode
571				session.replay_filter.insert(replay_tag);
572
573				// Add to fragment assembler and return any completed message
574				self.fragment_assembler.insert(payload_data, self.config.log_target).map(
575					|message| {
576						Message::Request(RequestMessage {
577							session_index: rel_session_index + self.session_status.current_index,
578							id: message.id,
579							data: message.data,
580							surbs: message.surbs,
581						})
582					},
583				)
584			},
585			Action::DeliverReply { surb_id } => {
586				let payload = array_mut_ref![out, 0, PAYLOAD_SIZE];
587
588				// Note that we do not insert anything into the replay filter here. The SURB ID
589				// lookup will fail for replayed SURBs, so explicit replay prevention is not
590				// necessary. The main reason for avoiding the replay filter here is so that it
591				// does not need to be allocated at all for sessions where we are not a mixnode.
592
593				// Lookup payload encryption keys and decrypt payload. The original request message
594				// ID is stored alongside the keys; it is simply returned with any completed
595				// message to provide context.
596				let Some(entry) = self.surb_keystore.entry(&surb_id) else {
597					debug!(target: self.config.log_target,
598						"Received reply with unrecognised SURB ID {surb_id:x?}; discarding");
599					return None
600				};
601				let request_id = *entry.message_id();
602				let res = decrypt_reply_payload(payload, entry.keys());
603				entry.remove();
604				if let Err(err) = res {
605					debug!(target: self.config.log_target, "Failed to decrypt reply payload: {err}");
606					return None
607				}
608				let payload_data = array_ref![payload, 0, PAYLOAD_DATA_SIZE];
609
610				// Add to fragment assembler and return any completed message
611				self.fragment_assembler.insert(payload_data, self.config.log_target).map(
612					|message| {
613						if !message.surbs.is_empty() {
614							debug!(target: self.config.log_target,
615								"Reply message included SURBs; discarding them");
616						}
617						Message::Reply(ReplyMessage { request_id, data: message.data })
618					},
619				)
620			},
621			Action::DeliverCover { cover_id: _ } => None,
622		}
623	}
624
625	/// Returns the next instant at which
626	/// [`pop_next_forward_packet`](Self::pop_next_forward_packet) should be called. [`None`] means
627	/// never.
628	pub fn next_forward_packet_deadline(&self) -> Option<Instant> {
629		self.forward_packet_queue.next_deadline()
630	}
631
632	/// Pop and return the packet at the head of the forward packet queue. Returns [`None`] if the
633	/// queue is empty.
634	pub fn pop_next_forward_packet(&mut self) -> Option<AddressedPacket> {
635		self.events |= Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED;
636		self.forward_packet_queue.pop()
637	}
638
639	/// Returns the delay after which [`pop_next_authored_packet`](Self::pop_next_authored_packet)
640	/// should be called. [`None`] means an infinite delay.
641	pub fn next_authored_packet_delay(&self) -> Option<Duration> {
642		// Determine the mean period
643		let means: ArrayVec<_, 2> = self
644			.sessions
645			.iter()
646			.map(|session| session.mean_authored_packet_period.as_secs_f64())
647			.collect();
648		let mean = match means.into_inner() {
649			// Both sessions active. Send at half rate in each. Note that pop_next_authored_packet
650			// will choose between the sessions randomly based on their rates.
651			Ok(means) => (2.0 * means[0] * means[1]) / (means[0] + means[1]),
652			Err(mut means) => {
653				let mean = means.pop()?;
654				// Just one session active
655				if self.session_status.phase.need_prev() {
656					// Both sessions _should_ be active. Send at half rate.
657					2.0 * mean
658				} else {
659					mean
660				}
661			},
662		};
663
664		let delay: f64 = rand::thread_rng().sample(rand_distr::Exp1);
665		// Cap at 10x the mean; this is about the 99.995th percentile. This avoids potential panics
666		// in from_secs_f64() due to overflow.
667		Some(Duration::from_secs_f64(delay.min(10.0) * mean))
668	}
669
670	/// Either generate and return a cover packet or pop and return the packet at the head of one
671	/// of the authored packet queues. May return [`None`] if cover packets are disabled, we fail
672	/// to generate a cover packet, or there are no active sessions (though in the no active
673	/// sessions case [`next_authored_packet_delay`](Self::next_authored_packet_delay) should
674	/// return [`None`] and so this function should not really be called).
675	pub fn pop_next_authored_packet(&mut self, ns: &dyn NetworkStatus) -> Option<AddressedPacket> {
676		// This function should be called according to a Poisson process. Randomly choosing between
677		// sessions and cover kinds here is equivalent to there being multiple independent Poisson
678		// processes; see https://www.randomservices.org/random/poisson/Splitting.html
679		let mut rng = rand::thread_rng();
680
681		// First pick the session
682		let sessions: ArrayVec<_, 2> = self.sessions.enumerate_mut().collect();
683		let (rel_session_index, session) = match sessions.into_inner() {
684			Ok(sessions) => {
685				// Both sessions active. We choose randomly based on their rates.
686				let periods = sessions
687					// TODO This could be replaced with .each_ref() once it is stabilised, allowing
688					// the collect/into_inner/expect at the end to be dropped
689					.iter()
690					.map(|(_, session)| session.mean_authored_packet_period.as_secs_f64())
691					.collect::<ArrayVec<_, 2>>()
692					.into_inner()
693					.expect("Input is array of length 2");
694				let [session_0, session_1] = sessions;
695				// Rate is 1/period, and (1/a)/((1/a)+(1/b)) = b/(a+b)
696				if rng.gen_bool(periods[1] / (periods[0] + periods[1])) {
697					session_0
698				} else {
699					session_1
700				}
701			},
702			// Either just one active session or no active sessions. This function shouldn't really
703			// be called in the latter case, as next_authored_packet_delay() should return None.
704			Err(mut sessions) => sessions.pop()?,
705		};
706
707		self.events |= Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
708
709		// Choose randomly between drop and loop cover packet
710		let cover_kind = if rng.gen_bool(self.config.loop_cover_proportion) {
711			CoverKind::Loop
712		} else {
713			CoverKind::Drop
714		};
715
716		// Maybe replace drop cover packet with request or reply packet from queue
717		if (cover_kind == CoverKind::Drop) &&
718			self.session_status.phase.allow_requests_and_replies(rel_session_index)
719		{
720			let (packet, space) = session.authored_packet_queue.pop();
721			if space {
722				self.events |= Events::SPACE_IN_AUTHORED_PACKET_QUEUE;
723			}
724			if packet.is_some() {
725				return packet
726			}
727		}
728
729		if !self.config.gen_cover_packets {
730			return None
731		}
732
733		// Generate cover packet
734		match gen_cover_packet(&mut rng, &session.topology, ns, cover_kind, self.config.num_hops) {
735			Ok(packet) => Some(packet),
736			Err(err) => {
737				if (self.session_status.phase == SessionPhase::CoverToCurrent) &&
738					(rel_session_index == RelSessionIndex::Current) &&
739					matches!(err, TopologyErr::NoConnectedGatewayMixnodes)
740				{
741					// Possibly still connecting to mixnodes
742					trace!(target: self.config.log_target, "Failed to generate cover packet: {err}");
743				} else {
744					debug!(target: self.config.log_target, "Failed to generate cover packet: {err}");
745				}
746				None
747			},
748		}
749	}
750
751	/// Post a request message. If `destination_index` is [`None`], a destination mixnode is chosen
752	/// at random and (on success) its index is written back to `destination_index`. The message is
753	/// split into fragments and each fragment is sent over a different path to the destination.
754	pub fn post_request(
755		&mut self,
756		session_index: SessionIndex,
757		destination_index: &mut Option<MixnodeIndex>,
758		message_id: &MessageId,
759		data: Scattered<u8>,
760		num_surbs: usize,
761		ns: &dyn NetworkStatus,
762	) -> Result<RequestMetrics, PostErr> {
763		// Split the message into fragments
764		let fragment_blueprints = match fragment_blueprints(message_id, data, num_surbs) {
765			Some(fragment_blueprints)
766				if fragment_blueprints.len() <= self.config.max_fragments_per_message =>
767				fragment_blueprints,
768			_ => return Err(PostErr::TooManyFragments),
769		};
770
771		// Grab the session and check there's room in the queue
772		let session = post_session(&mut self.sessions, self.session_status, session_index)?;
773		session.authored_packet_queue.check_space(fragment_blueprints.len())?;
774
775		// Generate the packets and push them into the queue
776		let mut rng = rand::thread_rng();
777		let request_builder =
778			RequestBuilder::new(&mut rng, &session.topology, ns, *destination_index)?;
779		let mut request_hops = 0;
780		let mut request_forwarding_delay = Delay::zero();
781		let mut reply_hops = 0;
782		let mut reply_forwarding_delay = Delay::zero();
783		for fragment_blueprint in fragment_blueprints {
784			let (packet, metrics) = request_builder.build_packet(
785				&mut rng,
786				|fragment, rng| {
787					fragment_blueprint.write_except_surbs(fragment);
788					for surb in fragment_blueprint.surbs(fragment) {
789						// TODO Currently we don't clean up keystore entries on failure
790						let (id, keys) =
791							self.surb_keystore.insert(rng, message_id, self.config.log_target);
792						let num_hops = self.config.num_hops;
793						let metrics = request_builder.build_surb(surb, keys, rng, &id, num_hops)?;
794						reply_hops = max(reply_hops, metrics.num_hops);
795						reply_forwarding_delay =
796							max(reply_forwarding_delay, metrics.forwarding_delay);
797					}
798					Ok(())
799				},
800				self.config.num_hops,
801			)?;
802			session.authored_packet_queue.push(packet);
803			request_hops = max(request_hops, metrics.num_hops);
804			request_forwarding_delay = max(request_forwarding_delay, metrics.forwarding_delay);
805		}
806
807		// Calculate metrics
808		let metrics = RequestMetrics {
809			num_hops: request_hops + reply_hops,
810			per_hop_net_delay: self.config.per_hop_net_delay,
811			forwarding_delay: (request_forwarding_delay + reply_forwarding_delay)
812				.to_duration(self.config.mean_forwarding_delay),
813			authored_packet_queue_delay: estimate_authored_packet_queue_delay(
814				&self.config,
815				session,
816			),
817		};
818
819		*destination_index = Some(request_builder.destination_index());
820		Ok(metrics)
821	}
822
823	/// Post a reply message using SURBs. The session index must match the session the SURBs were
824	/// generated for. SURBs are removed from `surbs` on use.
825	pub fn post_reply(
826		&mut self,
827		surbs: &mut Vec<Surb>,
828		session_index: SessionIndex,
829		message_id: &MessageId,
830		data: Scattered<u8>,
831	) -> Result<(), PostErr> {
832		// Split the message into fragments
833		let fragment_blueprints = match fragment_blueprints(message_id, data, 0) {
834			Some(fragment_blueprints)
835				if fragment_blueprints.len() <=
836					min(self.config.max_fragments_per_message, surbs.len()) =>
837				fragment_blueprints,
838			_ => return Err(PostErr::TooManyFragments),
839		};
840
841		// Grab the session and check there's room in the queue
842		let session = post_session(&mut self.sessions, self.session_status, session_index)?;
843		session.authored_packet_queue.check_space(fragment_blueprints.len())?;
844
845		// Generate the packets and push them into the queue
846		for fragment_blueprint in fragment_blueprints {
847			let mut packet = default_boxed_array();
848			fragment_blueprint.write_except_surbs(mut_payload_data(&mut packet));
849			let mixnode_index = complete_reply_packet(
850				&mut packet,
851				&surbs.pop().expect("Checked number of SURBs above"),
852			)
853			.ok_or(PostErr::BadSurb)?;
854			let peer_id = session.topology.mixnode_index_to_peer_id(mixnode_index)?;
855			session.authored_packet_queue.push(AddressedPacket { peer_id, packet });
856		}
857
858		Ok(())
859	}
860
861	/// Clear the event flags. Returns the flags that were cleared.
862	pub fn take_events(&mut self) -> Events {
863		let events = self.events;
864		self.events = Events::empty();
865		events
866	}
867}