#![allow(clippy::ptr_offset_with_cast)]

mod config;
mod cover;
mod fragment;
mod kx_pair;
mod packet_queues;
mod replay_filter;
mod request_builder;
mod scattered;
mod sessions;
mod sphinx;
mod surb_keystore;
mod topology;
mod util;

pub use self::{
	config::{Config, SessionConfig},
	fragment::{MessageId, MESSAGE_ID_SIZE},
	packet_queues::AddressedPacket,
	scattered::Scattered,
	sessions::{RelSessionIndex, SessionIndex, SessionPhase, SessionStatus},
	sphinx::{
		Delay, KxPublic, KxSecret, MixnodeIndex, Packet, PeerId, RawMixnodeIndex, Surb,
		KX_PUBLIC_SIZE, MAX_HOPS, MAX_MIXNODE_INDEX, PACKET_SIZE, PEER_ID_SIZE, SURB_SIZE,
	},
	topology::{Mixnode, NetworkStatus, TopologyErr},
};
use self::{
	cover::{gen_cover_packet, CoverKind},
	fragment::{fragment_blueprints, FragmentAssembler},
	kx_pair::KxPair,
	packet_queues::{AuthoredPacketQueue, CheckSpaceErr, ForwardPacketQueue},
	replay_filter::ReplayFilter,
	request_builder::RequestBuilder,
	sessions::{Session, SessionSlot, Sessions},
	sphinx::{
		complete_reply_packet, decrypt_reply_payload, kx_public, mut_payload_data, peel, Action,
		PeelErr, PAYLOAD_DATA_SIZE, PAYLOAD_SIZE,
	},
	surb_keystore::SurbKeystore,
	topology::Topology,
	util::default_boxed_array,
};
use arrayref::{array_mut_ref, array_ref};
use arrayvec::ArrayVec;
use bitflags::bitflags;
use either::Either;
use log::{debug, info, trace};
use rand::Rng;
use std::{
	cmp::{max, min},
	time::{Duration, Instant},
};

/// Error returned by the `mixnodes` callback passed to [`Mixnet::maybe_set_mixnodes`].
pub enum MixnodesErr {
	/// Transient failure; the mixnodes for the session may become available later.
	Transient,
	/// Permanent failure; the mixnet will be disabled for the session.
	Permanent,
}

/// A request message received over the mixnet, assembled from one or more packets.
#[derive(Debug, PartialEq, Eq)]
pub struct RequestMessage {
	/// Index of the session the request was received in.
	pub session_index: SessionIndex,
	/// Message identifier, chosen by the sender.
	pub id: MessageId,
	/// The message contents.
	pub data: Vec<u8>,
	/// SURBs that can be used to send a reply back to the sender.
	pub surbs: Vec<Surb>,
}

/// A reply message received over the mixnet, assembled from one or more packets.
#[derive(Debug, PartialEq, Eq)]
pub struct ReplyMessage {
	/// Identifier of the request message this is a reply to.
	pub request_id: MessageId,
	/// The message contents.
	pub data: Vec<u8>,
}

/// A message received over the mixnet.
#[derive(Debug, PartialEq, Eq)]
pub enum Message {
	/// A request message.
	Request(RequestMessage),
	/// A reply to a previously sent request.
	Reply(ReplyMessage),
}

/// Error posting a request or reply.
#[derive(Debug, thiserror::Error)]
pub enum PostErr {
	#[error("Message would need to be split into too many fragments")]
	TooManyFragments,
	#[error("Session {0} is no longer active")]
	SessionNoLongerActive(SessionIndex),
	#[error("Session {0} is not active yet")]
	SessionNotActiveYet(SessionIndex),
	#[error("Mixnodes not yet known for session {0}")]
	SessionMixnodesNotKnown(SessionIndex),
	#[error("Mixnet disabled for session {0}")]
	SessionDisabled(SessionIndex),
	#[error("There is not enough space in the authored packet queue")]
	NotEnoughSpaceInQueue,
	#[error("Topology error: {0}")]
	Topology(#[from] TopologyErr),
	#[error("Bad SURB")]
	BadSurb,
}

fn post_session<X>(
	sessions: &mut Sessions<X>,
	status: SessionStatus,
	index: SessionIndex,
) -> Result<&mut Session<X>, PostErr> {
	let Some(rel_index) = RelSessionIndex::from_session_index(index, status.current_index) else {
		return Err(if index < status.current_index {
			PostErr::SessionNoLongerActive(index)
		} else {
			PostErr::SessionNotActiveYet(index)
		})
	};
	if !status.phase.allow_requests_and_replies(rel_index) {
		return Err(match rel_index {
			RelSessionIndex::Prev => PostErr::SessionNoLongerActive(index),
			RelSessionIndex::Current => PostErr::SessionNotActiveYet(index),
		})
	}
	match &mut sessions[rel_index] {
		SessionSlot::Empty | SessionSlot::KxPair(_) => Err(PostErr::SessionMixnodesNotKnown(index)),
		SessionSlot::Disabled => Err(PostErr::SessionDisabled(index)),
		SessionSlot::Full(session) => Ok(session),
	}
}

impl From<CheckSpaceErr> for PostErr {
	fn from(value: CheckSpaceErr) -> Self {
		match value {
			CheckSpaceErr::Capacity => PostErr::TooManyFragments,
			CheckSpaceErr::Len => PostErr::NotEnoughSpaceInQueue,
		}
	}
}

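// Estimate how long a fragment posted now would wait in the authored packet queue before being
// dispatched. This accounts for both the local request queue and, pessimistically (assuming it
// is full), the reply queue at the destination mixnode.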
fn estimate_authored_packet_queue_delay<X>(config: &Config, session: &Session<X>) -> Duration {
	// Fraction of authored packets that can carry request/reply fragments: only non-loop-cover
	// packets are usable, and a session's authored packet rate is (roughly) halved because
	// authored packets are shared between the current and previous sessions; see
	// `next_authored_packet_delay`.
	let rate_mul = 0.5 * (1.0 - config.loop_cover_proportion);
	let request_period = session.mean_authored_packet_period.div_f64(rate_mul);
	let request_len = session.authored_packet_queue.len();
	let reply_period = config.mixnode_session.mean_authored_packet_period.div_f64(rate_mul);
	let reply_len = config.mixnode_session.authored_packet_queue.capacity;
	let (s, n, m, rs) = if request_period > reply_period {
		(request_period, request_len, reply_len, reply_period)
	} else {
		(reply_period, reply_len, request_len, request_period)
	};
	let n = n as f64;
	let m = m as f64;
	let r = rs.as_secs_f64() / s.as_secs_f64();
	// Heuristic approximation of the expected time for both queues to drain; the constants are
	// taken as given from the original derivation, which is not reproduced here.
	s.mul_f64(4.92582 + (3.87809 * (n + (r * r * r * m)).sqrt()) + n + (r * m))
}

/// Metrics for a posted request, for use in estimating the round-trip time.
pub struct RequestMetrics {
	/// Total number of hops across the request and reply paths.
	pub num_hops: usize,
	/// Estimated network delay per hop.
	pub per_hop_net_delay: Duration,
	/// Total forwarding delay across the request and reply paths.
	pub forwarding_delay: Duration,
	/// Estimated delay in the authored packet queues.
	pub authored_packet_queue_delay: Duration,
}

impl RequestMetrics {
	/// Estimate the round-trip time, given the expected time for the destination to handle the
	/// request and post the reply (`handling_delay`).
	pub fn estimate_rtt(&self, handling_delay: Duration) -> Duration {
		let net_delay = self.per_hop_net_delay * (self.num_hops as u32);
		self.forwarding_delay + self.authored_packet_queue_delay + net_delay + handling_delay
	}
}

bitflags! {
	/// Events raised by the mixnet core for the owner to handle.
	pub struct Events: u32 {
		/// The set of reserved peers has changed; see [`Mixnet::reserved_peers`].
		const RESERVED_PEERS_CHANGED = 0b1;
		/// The next forward packet deadline may have changed; see
		/// [`Mixnet::next_forward_packet_deadline`].
		const NEXT_FORWARD_PACKET_DEADLINE_CHANGED = 0b10;
		/// The next authored packet delay may have changed; see
		/// [`Mixnet::next_authored_packet_delay`].
		const NEXT_AUTHORED_PACKET_DEADLINE_CHANGED = 0b100;
		/// Space may have become available in an authored packet queue.
		const SPACE_IN_AUTHORED_PACKET_QUEUE = 0b1000;
	}
}

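/// Mixnet core state. This is network agnostic: the owner is expected to drive it by feeding in
/// received packets, sending out the packets it produces, and honouring the deadlines and delays
/// it reports.
///
/// A rough usage sketch, not taken from the original documentation; `config`, `received_packet`,
/// `send_to_peer`, and the surrounding event loop are assumed to be provided by the caller:
///
/// ```ignore
/// let mut mixnet: Mixnet<()> = Mixnet::new(config);
/// loop {
///     // Send forward packets once their deadlines are reached.
///     if let Some(deadline) = mixnet.next_forward_packet_deadline() {
///         if Instant::now() >= deadline {
///             if let Some(packet) = mixnet.pop_next_forward_packet() {
///                 send_to_peer(packet.peer_id, packet.packet);
///             }
///         }
///     }
///     // Feed in packets received from peers; completed messages are returned.
///     if let Some(message) = mixnet.handle_packet(&received_packet) {
///         // Handle the request or reply...
///     }
/// }
/// ```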
pub struct Mixnet<X> {
	/// Configuration, supplied at construction.
	config: Config,

	/// Index and phase of the current session.
	session_status: SessionStatus,
	/// Current and previous sessions.
	sessions: Sessions<X>,
	/// Key-exchange key pair for the next session, if it has been generated already.
	next_kx_pair: Option<KxPair>,

	/// Queue of packets to be forwarded to other nodes after a delay.
	forward_packet_queue: ForwardPacketQueue,

	/// Keys for the SURBs sent out with our requests, used to decrypt the corresponding replies.
	surb_keystore: SurbKeystore,
	/// Assembles received fragments into complete messages.
	fragment_assembler: FragmentAssembler,

	/// Pending events, returned and cleared by [`Mixnet::take_events`].
	events: Events,
}

impl<X> Mixnet<X> {
	/// Create a new `Mixnet` with the given configuration.
	pub fn new(config: Config) -> Self {
		let sessions = Sessions {
			current: config
				.session_0_kx_secret
				.map_or(SessionSlot::Empty, |secret| SessionSlot::KxPair(secret.into())),
			prev: SessionSlot::Disabled,
		};

		let forward_packet_queue = ForwardPacketQueue::new(config.forward_packet_queue_capacity);

		let surb_keystore = SurbKeystore::new(config.surb_keystore_capacity);
		let fragment_assembler = FragmentAssembler::new(
			config.max_incomplete_messages,
			config.max_incomplete_fragments,
			config.max_fragments_per_message,
		);

		Self {
			config,

			session_status: SessionStatus { current_index: 0, phase: SessionPhase::CoverToCurrent },
			sessions,
			next_kx_pair: None,

			forward_packet_queue,

			surb_keystore,
			fragment_assembler,

			events: Events::empty(),
		}
	}

	/// Returns the current session status (index and phase).
	pub fn session_status(&self) -> SessionStatus {
		self.session_status
	}

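	/// Sets the session status. The owner should call this whenever the current session index or
	/// phase changes, so that sessions can be created or discarded and peers updated accordingly.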
	pub fn set_session_status(&mut self, session_status: SessionStatus) {
		if self.session_status == session_status {
			return
		}

		if self.session_status.current_index != session_status.current_index {
			let next_session = std::mem::take(&mut self.next_kx_pair)
				.map_or(SessionSlot::Empty, SessionSlot::KxPair);
			match session_status.current_index.saturating_sub(self.session_status.current_index) {
				1 =>
					self.sessions.prev = std::mem::replace(&mut self.sessions.current, next_session),
				2 => {
					self.sessions.prev = next_session;
					self.sessions.current = SessionSlot::Empty;
				},
				_ =>
					if !self.sessions.is_empty() || !next_session.is_empty() {
						debug!(
							target: self.config.log_target,
							"Unexpected session index {}; previous session index was {}",
							session_status.current_index,
							self.session_status.current_index
						);
						self.sessions =
							Sessions { current: SessionSlot::Empty, prev: SessionSlot::Empty };
					},
			}
		}

		// Discard the previous session if it is not needed in the new phase. Note there is never
		// a session before session 0.
		if !session_status.phase.need_prev() || (session_status.current_index == 0) {
			self.sessions.prev = SessionSlot::Disabled;
		}

		self.events |=
			Events::RESERVED_PEERS_CHANGED | Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;

		self.session_status = session_status;

		info!(target: self.config.log_target, "Session status changed: {session_status}");
	}

	/// Sets the mixnodes for the specified session, if they are not already known; the
	/// `mixnodes` closure is only called if they are needed. If it returns
	/// [`MixnodesErr::Permanent`], the session is disabled; if it returns
	/// [`MixnodesErr::Transient`], the session is left empty so a later call can try again.
	/// Mixnodes beyond [`MAX_MIXNODE_INDEX`] + 1 are ignored.
	pub fn maybe_set_mixnodes(
		&mut self,
		rel_session_index: RelSessionIndex,
		mixnodes: &mut dyn FnMut() -> Result<Vec<Mixnode<X>>, MixnodesErr>,
	) {
		let session = &mut self.sessions[rel_session_index];
		if !matches!(session, SessionSlot::Empty | SessionSlot::KxPair(_)) {
			return
		}

		let session_index = rel_session_index + self.session_status.current_index;
		let mut rng = rand::thread_rng();

		let mut mixnodes = match mixnodes() {
			Ok(mixnodes) => mixnodes,
			Err(err) => {
				if matches!(err, MixnodesErr::Permanent) {
					*session = SessionSlot::Disabled;
				}
				return
			},
		};
		let max_mixnodes = (MAX_MIXNODE_INDEX + 1) as usize;
		if mixnodes.len() > max_mixnodes {
			debug!(
				target: self.config.log_target,
				"Session {session_index}: Too many mixnodes ({}, max {max_mixnodes}); ignoring excess",
				mixnodes.len()
			);
			mixnodes.truncate(max_mixnodes);
		}

		// Use the key-exchange pair already generated for the session if there is one; otherwise
		// generate one now.
		let kx_pair = match std::mem::replace(session, SessionSlot::Empty) {
			SessionSlot::KxPair(kx_pair) => kx_pair,
			_ => KxPair::gen(&mut rng),
		};

		let topology =
			Topology::new(&mut rng, mixnodes, kx_pair.public(), self.config.num_gateway_mixnodes);

		let config = if topology.is_mixnode() {
			&self.config.mixnode_session
		} else {
			match &self.config.non_mixnode_session {
				Some(config) => config,
				None => {
					info!(target: self.config.log_target,
						"Session {session_index}: Local node is not a mixnode; \
						disabling mixnet as per configuration");
					*session = SessionSlot::Disabled;
					return
				},
			}
		};

		info!(target: self.config.log_target, "Session {session_index}: {topology}");

		*session = SessionSlot::Full(Session {
			kx_pair,
			topology,
			authored_packet_queue: AuthoredPacketQueue::new(config.authored_packet_queue),
			mean_authored_packet_period: config.mean_authored_packet_period,
			replay_filter: ReplayFilter::new(&mut rng),
		});

		self.events |=
			Events::RESERVED_PEERS_CHANGED | Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
	}

	/// Returns the key-exchange public key for the next session, generating a key pair if
	/// necessary.
	pub fn next_kx_public(&mut self) -> &KxPublic {
		self.next_kx_pair
			.get_or_insert_with(|| KxPair::gen(&mut rand::thread_rng()))
			.public()
	}

	/// Returns the peers we should try to maintain connections to, across all current sessions.
	pub fn reserved_peers(&self) -> impl Iterator<Item = &Mixnode<X>> {
		self.sessions.iter().flat_map(|session| session.topology.reserved_peers())
	}

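	/// Handles an incoming packet: peels one layer of encryption and either queues the packet
	/// for forwarding or, if it is destined for us, feeds it to the fragment assembler. Returns
	/// a message if this packet completed one.
	///
	/// A rough usage sketch; receiving `packet` from a peer is assumed to be handled by the
	/// caller:
	///
	/// ```ignore
	/// if let Some(message) = mixnet.handle_packet(&packet) {
	///     match message {
	///         Message::Request(request) => {
	///             // Handle `request.data`; keep `request.surbs` to post a reply later.
	///         },
	///         Message::Reply(reply) => {
	///             // Match `reply.request_id` against outstanding requests.
	///         },
	///     }
	/// }
	/// ```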
	pub fn handle_packet(&mut self, packet: &Packet) -> Option<Message> {
		let mut out = [0; PACKET_SIZE];
		let res = self.sessions.enumerate_mut().find_map(|(rel_session_index, session)| {
			let kx_shared_secret = session.kx_pair.exchange(kx_public(packet));

			let replay_tag = session.replay_filter.tag(&kx_shared_secret);
			if session.replay_filter.contains(replay_tag) {
				return Some(Err(Either::Left("Packet found in replay filter")))
			}

			match peel(&mut out, packet, &kx_shared_secret) {
				// A bad MAC most likely just means the packet was authored for a different
				// session; try the other one.
				Err(PeelErr::Mac) => None,
				Err(err) => Some(Err(Either::Right(err))),
				Ok(action) => Some(Ok((action, rel_session_index, session, replay_tag))),
			}
		});

		let (action, rel_session_index, session, replay_tag) = match res {
			None => {
				trace!(
					target: self.config.log_target,
					"Failed to peel packet; either bad MAC or unknown secret"
				);
				return None
			},
			Some(Err(err)) => {
				debug!(target: self.config.log_target, "Failed to peel packet: {err}");
				return None
			},
			Some(Ok(x)) => x,
		};

		match action {
			Action::ForwardTo { target, delay } => {
				if !session.topology.is_mixnode() {
					debug!(target: self.config.log_target,
						"Received packet to forward despite not being a mixnode in the session; discarding");
					return None
				}

				if !self.forward_packet_queue.has_space() {
					debug!(target: self.config.log_target, "Dropped forward packet; forward queue full");
					return None
				}

				// Only update the replay filter once we know there is space in the forward queue.
				session.replay_filter.insert(replay_tag);

				match session.topology.target_to_peer_id(&target) {
					Ok(peer_id) => {
						let deadline =
							Instant::now() + delay.to_duration(self.config.mean_forwarding_delay);
						let packet = AddressedPacket { peer_id, packet: out.into() };
						if self.forward_packet_queue.insert(deadline, packet) {
							self.events |= Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED;
						}
					},
					Err(err) => debug!(
						target: self.config.log_target,
						"Failed to map target {target:?} to peer ID: {err}"
					),
				}

				None
			},
			Action::DeliverRequest => {
				let payload_data = array_ref![out, 0, PAYLOAD_DATA_SIZE];

				if !session.topology.is_mixnode() {
					debug!(target: self.config.log_target,
						"Received request packet despite not being a mixnode in the session; discarding");
					return None
				}

				session.replay_filter.insert(replay_tag);

				self.fragment_assembler.insert(payload_data, self.config.log_target).map(
					|message| {
						Message::Request(RequestMessage {
							session_index: rel_session_index + self.session_status.current_index,
							id: message.id,
							data: message.data,
							surbs: message.surbs,
						})
					},
				)
			},
			Action::DeliverReply { surb_id } => {
				let payload = array_mut_ref![out, 0, PAYLOAD_SIZE];

				// Look up the SURB keys and the ID of the request the SURB was sent with.
				let Some(entry) = self.surb_keystore.entry(&surb_id) else {
					debug!(target: self.config.log_target,
						"Received reply with unrecognised SURB ID {surb_id:x?}; discarding");
					return None
				};
				let request_id = *entry.message_id();
				let res = decrypt_reply_payload(payload, entry.keys());
				entry.remove();
				if let Err(err) = res {
					debug!(target: self.config.log_target, "Failed to decrypt reply payload: {err}");
					return None
				}
				let payload_data = array_ref![payload, 0, PAYLOAD_DATA_SIZE];

				self.fragment_assembler.insert(payload_data, self.config.log_target).map(
					|message| {
						if !message.surbs.is_empty() {
							debug!(target: self.config.log_target,
								"Reply message included SURBs; discarding them");
						}
						Message::Reply(ReplyMessage { request_id, data: message.data })
					},
				)
			},
			Action::DeliverCover { cover_id: _ } => None,
		}
	}

	/// Returns the deadline for sending the packet at the head of the forward packet queue, if
	/// the queue is not empty.
	pub fn next_forward_packet_deadline(&self) -> Option<Instant> {
		self.forward_packet_queue.next_deadline()
	}

	/// Pops the packet at the head of the forward packet queue. This should be called once the
	/// deadline returned by [`Mixnet::next_forward_packet_deadline`] is reached.
	pub fn pop_next_forward_packet(&mut self) -> Option<AddressedPacket> {
		self.events |= Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED;
		self.forward_packet_queue.pop()
	}

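	/// Returns the delay until the next authored packet (cover, request, or reply) should be
	/// sent, sampled from an exponential distribution, or `None` if there is no session to send
	/// in. The exponential spacing helps real traffic blend in with the cover traffic.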
	pub fn next_authored_packet_delay(&self) -> Option<Duration> {
		let means: ArrayVec<_, 2> = self
			.sessions
			.iter()
			.map(|session| session.mean_authored_packet_period.as_secs_f64())
			.collect();
		let mean = match means.into_inner() {
			// Both sessions are active. The combined rate is the average of the two sessions'
			// rates, so the combined mean period is the harmonic mean of the two periods.
			Ok(means) => (2.0 * means[0] * means[1]) / (means[0] + means[1]),
			Err(mut means) => {
				let mean = means.pop()?;
				if self.session_status.phase.need_prev() {
					// Only one session exists, but the phase calls for two. Keep this session's
					// share of the rate the same as if both existed.
					2.0 * mean
				} else {
					mean
				}
			},
		};

		// Cap the sampled multiplier to bound the worst-case delay.
		let delay: f64 = rand::thread_rng().sample(rand_distr::Exp1);
		Some(Duration::from_secs_f64(delay.min(10.0) * mean))
	}

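	/// Pops the next authored packet to send. This should be called after the delay returned by
	/// [`Mixnet::next_authored_packet_delay`] has elapsed. Returns a packet from one of the
	/// authored packet queues if possible, otherwise a freshly generated cover packet (provided
	/// cover packets are enabled in the configuration). `ns` provides the connectivity
	/// information used when picking routes.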
	pub fn pop_next_authored_packet(&mut self, ns: &dyn NetworkStatus) -> Option<AddressedPacket> {
		let mut rng = rand::thread_rng();

		// Pick the session to send a packet in. If both sessions are active, pick one at random,
		// weighted by their authored packet rates.
		let sessions: ArrayVec<_, 2> = self.sessions.enumerate_mut().collect();
		let (rel_session_index, session) = match sessions.into_inner() {
			Ok(sessions) => {
				let periods = sessions
					.iter()
					.map(|(_, session)| session.mean_authored_packet_period.as_secs_f64())
					.collect::<ArrayVec<_, 2>>()
					.into_inner()
					.expect("Input is array of length 2");
				let [session_0, session_1] = sessions;
				// The probability of picking session 0 is proportional to its rate, ie inversely
				// proportional to its period.
				if rng.gen_bool(periods[1] / (periods[0] + periods[1])) {
					session_0
				} else {
					session_1
				}
			},
			Err(mut sessions) => sessions.pop()?,
		};

		self.events |= Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;

		let cover_kind = if rng.gen_bool(self.config.loop_cover_proportion) {
			CoverKind::Loop
		} else {
			CoverKind::Drop
		};

		// Drop cover packets may be replaced by packets from the authored packet queue, provided
		// requests and replies are currently allowed in the chosen session.
		if (cover_kind == CoverKind::Drop) &&
			self.session_status.phase.allow_requests_and_replies(rel_session_index)
		{
			let (packet, space) = session.authored_packet_queue.pop();
			if space {
				self.events |= Events::SPACE_IN_AUTHORED_PACKET_QUEUE;
			}
			if packet.is_some() {
				return packet
			}
		}

		if !self.config.gen_cover_packets {
			return None
		}

		match gen_cover_packet(&mut rng, &session.topology, ns, cover_kind, self.config.num_hops) {
			Ok(packet) => Some(packet),
			Err(err) => {
				// During the cover-to-current phase it is expected that we might not have
				// connected to any gateway mixnodes for the current session yet; log quietly.
				if (self.session_status.phase == SessionPhase::CoverToCurrent) &&
					(rel_session_index == RelSessionIndex::Current) &&
					matches!(err, TopologyErr::NoConnectedGatewayMixnodes)
				{
					trace!(target: self.config.log_target, "Failed to generate cover packet: {err}");
				} else {
					debug!(target: self.config.log_target, "Failed to generate cover packet: {err}");
				}
				None
			},
		}
	}

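	/// Posts a request message to `destination_index` (a destination is chosen, and written
	/// back, if this is `None`). The message is split into fragments, which are pushed onto the
	/// session's authored packet queue along with `num_surbs` SURBs allowing the destination to
	/// reply. Returns metrics that can be used to estimate the round-trip time.
	///
	/// A rough usage sketch; `session_index`, `message_id`, `data`, `num_surbs`,
	/// `network_status`, `expected_handling_delay`, and the error handling are assumed to be
	/// provided by the caller:
	///
	/// ```ignore
	/// let mut destination_index = None; // Let the mixnet pick the destination
	/// let metrics = mixnet.post_request(
	///     session_index, &mut destination_index, &message_id, data, num_surbs, network_status)?;
	/// let reply_timeout = metrics.estimate_rtt(expected_handling_delay);
	/// ```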
	pub fn post_request(
		&mut self,
		session_index: SessionIndex,
		destination_index: &mut Option<MixnodeIndex>,
		message_id: &MessageId,
		data: Scattered<u8>,
		num_surbs: usize,
		ns: &dyn NetworkStatus,
	) -> Result<RequestMetrics, PostErr> {
		let fragment_blueprints = match fragment_blueprints(message_id, data, num_surbs) {
			Some(fragment_blueprints)
				if fragment_blueprints.len() <= self.config.max_fragments_per_message =>
				fragment_blueprints,
			_ => return Err(PostErr::TooManyFragments),
		};

		let session = post_session(&mut self.sessions, self.session_status, session_index)?;
		session.authored_packet_queue.check_space(fragment_blueprints.len())?;

		let mut rng = rand::thread_rng();
		let request_builder =
			RequestBuilder::new(&mut rng, &session.topology, ns, *destination_index)?;
		let mut request_hops = 0;
		let mut request_forwarding_delay = Delay::zero();
		let mut reply_hops = 0;
		let mut reply_forwarding_delay = Delay::zero();
		for fragment_blueprint in fragment_blueprints {
			let (packet, metrics) = request_builder.build_packet(
				&mut rng,
				|fragment, rng| {
					fragment_blueprint.write_except_surbs(fragment);
					for surb in fragment_blueprint.surbs(fragment) {
						let (id, keys) =
							self.surb_keystore.insert(rng, message_id, self.config.log_target);
						let num_hops = self.config.num_hops;
						let metrics = request_builder.build_surb(surb, keys, rng, &id, num_hops)?;
						reply_hops = max(reply_hops, metrics.num_hops);
						reply_forwarding_delay =
							max(reply_forwarding_delay, metrics.forwarding_delay);
					}
					Ok(())
				},
				self.config.num_hops,
			)?;
			session.authored_packet_queue.push(packet);
			request_hops = max(request_hops, metrics.num_hops);
			request_forwarding_delay = max(request_forwarding_delay, metrics.forwarding_delay);
		}

		// The hop counts and forwarding delays are worst cases over all fragments (see the `max`
		// calls above).
		let metrics = RequestMetrics {
			num_hops: request_hops + reply_hops,
			per_hop_net_delay: self.config.per_hop_net_delay,
			forwarding_delay: (request_forwarding_delay + reply_forwarding_delay)
				.to_duration(self.config.mean_forwarding_delay),
			authored_packet_queue_delay: estimate_authored_packet_queue_delay(
				&self.config,
				session,
			),
		};

		*destination_index = Some(request_builder.destination_index());
		Ok(metrics)
	}

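	/// Posts a reply message using SURBs from a previously received request. The message is
	/// split into fragments, one per SURB consumed from `surbs`, and the resulting packets are
	/// pushed onto the session's authored packet queue.
	///
	/// A rough usage sketch; `request` is a [`RequestMessage`] previously returned by
	/// [`Mixnet::handle_packet`], and the reply ID and data are assumed to come from the caller:
	///
	/// ```ignore
	/// mixnet.post_reply(&mut request.surbs, request.session_index, &reply_id, reply_data)?;
	/// ```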
	pub fn post_reply(
		&mut self,
		surbs: &mut Vec<Surb>,
		session_index: SessionIndex,
		message_id: &MessageId,
		data: Scattered<u8>,
	) -> Result<(), PostErr> {
		let fragment_blueprints = match fragment_blueprints(message_id, data, 0) {
			Some(fragment_blueprints)
				if fragment_blueprints.len() <=
					min(self.config.max_fragments_per_message, surbs.len()) =>
				fragment_blueprints,
			_ => return Err(PostErr::TooManyFragments),
		};

		let session = post_session(&mut self.sessions, self.session_status, session_index)?;
		session.authored_packet_queue.check_space(fragment_blueprints.len())?;

		for fragment_blueprint in fragment_blueprints {
			let mut packet = default_boxed_array();
			fragment_blueprint.write_except_surbs(mut_payload_data(&mut packet));
			let mixnode_index = complete_reply_packet(
				&mut packet,
				&surbs.pop().expect("Checked number of SURBs above"),
			)
			.ok_or(PostErr::BadSurb)?;
			let peer_id = session.topology.mixnode_index_to_peer_id(mixnode_index)?;
			session.authored_packet_queue.push(AddressedPacket { peer_id, packet });
		}

		Ok(())
	}

	/// Returns (and clears) the pending events.
	pub fn take_events(&mut self) -> Events {
		let events = self.events;
		self.events = Events::empty();
		events
	}
}