#![allow(clippy::ptr_offset_with_cast)]
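//! Core mixnet logic, independent of any particular networking stack: Sphinx packet peeling
//! and forwarding, cover traffic generation, SURB handling, and fragmentation/reassembly of
//! request and reply messages. The embedder drives a [`Mixnet`] instance and reacts to the
//! [`Events`] it reports.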
mod config;
mod cover;
mod fragment;
mod kx_pair;
mod packet_queues;
mod replay_filter;
mod request_builder;
mod scattered;
mod sessions;
mod sphinx;
mod surb_keystore;
mod topology;
mod util;
pub use self::{
config::{Config, SessionConfig},
fragment::{MessageId, MESSAGE_ID_SIZE},
packet_queues::AddressedPacket,
scattered::Scattered,
sessions::{RelSessionIndex, SessionIndex, SessionPhase, SessionStatus},
sphinx::{
Delay, KxPublic, KxSecret, MixnodeIndex, Packet, PeerId, RawMixnodeIndex, Surb,
KX_PUBLIC_SIZE, MAX_HOPS, MAX_MIXNODE_INDEX, PACKET_SIZE, PEER_ID_SIZE, SURB_SIZE,
},
topology::{Mixnode, NetworkStatus, TopologyErr},
};
use self::{
cover::{gen_cover_packet, CoverKind},
fragment::{fragment_blueprints, FragmentAssembler},
kx_pair::KxPair,
packet_queues::{AuthoredPacketQueue, CheckSpaceErr, ForwardPacketQueue},
replay_filter::ReplayFilter,
request_builder::RequestBuilder,
sessions::{Session, SessionSlot, Sessions},
sphinx::{
complete_reply_packet, decrypt_reply_payload, kx_public, mut_payload_data, peel, Action,
PeelErr, PAYLOAD_DATA_SIZE, PAYLOAD_SIZE,
},
surb_keystore::SurbKeystore,
topology::Topology,
util::default_boxed_array,
};
use arrayref::{array_mut_ref, array_ref};
use arrayvec::ArrayVec;
use bitflags::bitflags;
use either::Either;
use log::{debug, info, trace};
use rand::Rng;
use std::{
cmp::{max, min},
time::{Duration, Instant},
};
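/// Error returned by the mixnode-fetching callback passed to [`Mixnet::maybe_set_mixnodes`]. A
/// `Transient` error leaves the session slot untouched so the fetch can be retried later; a
/// `Permanent` error disables the mixnet for the session.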
pub enum MixnodesErr {
Transient,
Permanent,
}
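/// A request message reassembled from packets delivered to the local node, along with any SURBs
/// the sender attached for replying.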
#[derive(Debug, PartialEq, Eq)]
pub struct RequestMessage {
pub session_index: SessionIndex,
pub id: MessageId,
pub data: Vec<u8>,
pub surbs: Vec<Surb>,
}
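/// A reply message, matched to the originating request via the SURB keystore.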
#[derive(Debug, PartialEq, Eq)]
pub struct ReplyMessage {
pub request_id: MessageId,
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq)]
pub enum Message {
Request(RequestMessage),
Reply(ReplyMessage),
}
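/// Error posting a request ([`Mixnet::post_request`]) or reply ([`Mixnet::post_reply`]).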
#[derive(Debug, thiserror::Error)]
pub enum PostErr {
#[error("Message would need to be split into too many fragments")]
TooManyFragments,
#[error("Session {0} is no longer active")]
SessionNoLongerActive(SessionIndex),
#[error("Session {0} is not active yet")]
SessionNotActiveYet(SessionIndex),
#[error("Mixnodes not yet known for session {0}")]
SessionMixnodesNotKnown(SessionIndex),
#[error("Mixnet disabled for session {0}")]
SessionDisabled(SessionIndex),
#[error("There is not enough space in the authored packet queue")]
NotEnoughSpaceInQueue,
#[error("Topology error: {0}")]
Topology(#[from] TopologyErr),
#[error("Bad SURB")]
BadSurb,
}
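/// Look up the session with the given index, checking that requests and replies may currently be
/// posted to it.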
fn post_session<X>(
sessions: &mut Sessions<X>,
status: SessionStatus,
index: SessionIndex,
) -> Result<&mut Session<X>, PostErr> {
let Some(rel_index) = RelSessionIndex::from_session_index(index, status.current_index) else {
return Err(if index < status.current_index {
PostErr::SessionNoLongerActive(index)
} else {
PostErr::SessionNotActiveYet(index)
})
};
if !status.phase.allow_requests_and_replies(rel_index) {
return Err(match rel_index {
RelSessionIndex::Prev => PostErr::SessionNoLongerActive(index),
RelSessionIndex::Current => PostErr::SessionNotActiveYet(index),
})
}
match &mut sessions[rel_index] {
SessionSlot::Empty | SessionSlot::KxPair(_) => Err(PostErr::SessionMixnodesNotKnown(index)),
SessionSlot::Disabled => Err(PostErr::SessionDisabled(index)),
SessionSlot::Full(session) => Ok(session),
}
}
impl From<CheckSpaceErr> for PostErr {
fn from(value: CheckSpaceErr) -> Self {
match value {
CheckSpaceErr::Capacity => PostErr::TooManyFragments,
CheckSpaceErr::Len => PostErr::NotEnoughSpaceInQueue,
}
}
}
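/// Rough estimate of how long a packet pushed onto an authored packet queue now would wait before
/// being dispatched. Only drop-cover dispatches (a `1 - loop_cover_proportion` fraction) carry
/// queued packets, and the reply side is estimated from the mixnode session configuration,
/// pessimistically assuming a full reply queue at the far end.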
fn estimate_authored_packet_queue_delay<X>(config: &Config, session: &Session<X>) -> Duration {
	let rate_mul = 0.5 * (1.0 - config.loop_cover_proportion);
	let request_period = session.mean_authored_packet_period.div_f64(rate_mul);
	let request_len = session.authored_packet_queue.len();
	let reply_period = config.mixnode_session.mean_authored_packet_period.div_f64(rate_mul);
	let reply_len = config.mixnode_session.authored_packet_queue.capacity;
	let (s, n, m, rs) = if request_period > reply_period {
(request_period, request_len, reply_len, reply_period)
} else {
(reply_period, reply_len, request_len, request_period)
};
let n = n as f64;
let m = m as f64;
let r = rs.as_secs_f64() / s.as_secs_f64();
s.mul_f64(4.92582 + (3.87809 * (n + (r * r * r * m)).sqrt()) + n + (r * m))
}
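/// Metrics for a posted request, returned by [`Mixnet::post_request`] and used to estimate its
/// round-trip time.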
pub struct RequestMetrics {
pub num_hops: usize,
pub per_hop_net_delay: Duration,
pub forwarding_delay: Duration,
pub authored_packet_queue_delay: Duration,
}
impl RequestMetrics {
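	/// Estimate the round-trip time of the request, given an estimate of how long the destination
	/// will take to handle it and post the reply (`handling_delay`).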
pub fn estimate_rtt(&self, handling_delay: Duration) -> Duration {
let net_delay = self.per_hop_net_delay * (self.num_hops as u32);
self.forwarding_delay + self.authored_packet_queue_delay + net_delay + handling_delay
}
}
bitflags! {
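	/// Flags indicating state changes the embedder may need to react to; collected and cleared
	/// with [`Mixnet::take_events`].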
pub struct Events: u32 {
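		/// The set of reserved peers ([`Mixnet::reserved_peers`]) may have changed.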
const RESERVED_PEERS_CHANGED = 0b1;
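		/// The next forward packet deadline ([`Mixnet::next_forward_packet_deadline`]) may have
		/// changed.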
const NEXT_FORWARD_PACKET_DEADLINE_CHANGED = 0b10;
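		/// The next authored packet delay ([`Mixnet::next_authored_packet_delay`]) may have
		/// changed.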
const NEXT_AUTHORED_PACKET_DEADLINE_CHANGED = 0b100;
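		/// Space may have become available in an authored packet queue.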
const SPACE_IN_AUTHORED_PACKET_QUEUE = 0b1000;
}
}
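/// Core mixnet state: per-session key-exchange keys, topology, replay filters and packet queues,
/// plus the SURB keystore and fragment assembler. `X` is the extra per-mixnode data carried by
/// [`Mixnode`].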
pub struct Mixnet<X> {
config: Config,
session_status: SessionStatus,
sessions: Sessions<X>,
next_kx_pair: Option<KxPair>,
forward_packet_queue: ForwardPacketQueue,
surb_keystore: SurbKeystore,
fragment_assembler: FragmentAssembler,
events: Events,
}
impl<X> Mixnet<X> {
pub fn new(config: Config) -> Self {
let sessions = Sessions {
current: config
.session_0_kx_secret
.map_or(SessionSlot::Empty, |secret| SessionSlot::KxPair(secret.into())),
prev: SessionSlot::Disabled,
};
let forward_packet_queue = ForwardPacketQueue::new(config.forward_packet_queue_capacity);
let surb_keystore = SurbKeystore::new(config.surb_keystore_capacity);
let fragment_assembler = FragmentAssembler::new(
config.max_incomplete_messages,
config.max_incomplete_fragments,
config.max_fragments_per_message,
);
Self {
config,
session_status: SessionStatus { current_index: 0, phase: SessionPhase::CoverToCurrent },
sessions,
next_kx_pair: None,
forward_packet_queue,
surb_keystore,
fragment_assembler,
events: Events::empty(),
}
}
pub fn session_status(&self) -> SessionStatus {
self.session_status
}
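	/// Set the session status. Session state is shifted or discarded as the current session index
	/// advances, and the previous session is disabled once the phase no longer needs it.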
pub fn set_session_status(&mut self, session_status: SessionStatus) {
if self.session_status == session_status {
return
}
if self.session_status.current_index != session_status.current_index {
let next_session = std::mem::take(&mut self.next_kx_pair)
.map_or(SessionSlot::Empty, SessionSlot::KxPair);
match session_status.current_index.saturating_sub(self.session_status.current_index) {
1 =>
self.sessions.prev = std::mem::replace(&mut self.sessions.current, next_session),
2 => {
self.sessions.prev = next_session;
self.sessions.current = SessionSlot::Empty;
},
_ =>
if !self.sessions.is_empty() || !next_session.is_empty() {
debug!(
target: self.config.log_target,
"Unexpected session index {}; previous session index was {}",
session_status.current_index,
self.session_status.current_index
);
self.sessions =
Sessions { current: SessionSlot::Empty, prev: SessionSlot::Empty };
},
}
}
if !session_status.phase.need_prev() || (session_status.current_index == 0) {
self.sessions.prev = SessionSlot::Disabled;
}
self.events |=
Events::RESERVED_PEERS_CHANGED | Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
self.session_status = session_status;
info!(target: self.config.log_target, "Session status changed: {session_status}");
}
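	/// Provide the mixnodes for a session. The `mixnodes` callback is only invoked if the
	/// session's mixnodes are not already known; a [`MixnodesErr::Permanent`] error from it
	/// disables the mixnet for the session, while a [`MixnodesErr::Transient`] error leaves the
	/// slot untouched so a later call can retry.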
pub fn maybe_set_mixnodes(
&mut self,
rel_session_index: RelSessionIndex,
mixnodes: &mut dyn FnMut() -> Result<Vec<Mixnode<X>>, MixnodesErr>,
) {
let session = &mut self.sessions[rel_session_index];
if !matches!(session, SessionSlot::Empty | SessionSlot::KxPair(_)) {
return
}
let session_index = rel_session_index + self.session_status.current_index;
let mut rng = rand::thread_rng();
let mut mixnodes = match mixnodes() {
Ok(mixnodes) => mixnodes,
Err(err) => {
if matches!(err, MixnodesErr::Permanent) {
*session = SessionSlot::Disabled;
}
return
},
};
let max_mixnodes = (MAX_MIXNODE_INDEX + 1) as usize;
if mixnodes.len() > max_mixnodes {
debug!(
target: self.config.log_target,
"Session {session_index}: Too many mixnodes ({}, max {max_mixnodes}); ignoring excess",
mixnodes.len()
);
mixnodes.truncate(max_mixnodes);
}
let kx_pair = match std::mem::replace(session, SessionSlot::Empty) {
SessionSlot::KxPair(kx_pair) => kx_pair,
_ => KxPair::gen(&mut rng),
};
let topology =
Topology::new(&mut rng, mixnodes, kx_pair.public(), self.config.num_gateway_mixnodes);
let config = if topology.is_mixnode() {
&self.config.mixnode_session
} else {
match &self.config.non_mixnode_session {
Some(config) => config,
None => {
info!(target: self.config.log_target,
"Session {session_index}: Local node is not a mixnode; \
disabling mixnet as per configuration");
*session = SessionSlot::Disabled;
return
},
}
};
info!(target: self.config.log_target, "Session {session_index}: {topology}");
*session = SessionSlot::Full(Session {
kx_pair,
topology,
authored_packet_queue: AuthoredPacketQueue::new(config.authored_packet_queue),
mean_authored_packet_period: config.mean_authored_packet_period,
replay_filter: ReplayFilter::new(&mut rng),
});
self.events |=
Events::RESERVED_PEERS_CHANGED | Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
}
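	/// Key-exchange public key for the next session, generating the key pair if it does not exist
	/// yet.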
pub fn next_kx_public(&mut self) -> &KxPublic {
self.next_kx_pair
.get_or_insert_with(|| KxPair::gen(&mut rand::thread_rng()))
.public()
}
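	/// Mixnodes the local node should try to stay connected to, across all active sessions.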
pub fn reserved_peers(&self) -> impl Iterator<Item = &Mixnode<X>> {
self.sessions.iter().flat_map(|session| session.topology.reserved_peers())
}
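	/// Handle an incoming packet: try to peel it with each active session's keys, then either
	/// queue it for forwarding or deliver its payload locally. Returns a message if this packet
	/// completed its reassembly.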
pub fn handle_packet(&mut self, packet: &Packet) -> Option<Message> {
let mut out = [0; PACKET_SIZE];
let res = self.sessions.enumerate_mut().find_map(|(rel_session_index, session)| {
let kx_shared_secret = session.kx_pair.exchange(kx_public(packet));
let replay_tag = session.replay_filter.tag(&kx_shared_secret);
if session.replay_filter.contains(replay_tag) {
return Some(Err(Either::Left("Packet found in replay filter")))
}
match peel(&mut out, packet, &kx_shared_secret) {
Err(PeelErr::Mac) => None,
Err(err) => Some(Err(Either::Right(err))),
Ok(action) => Some(Ok((action, rel_session_index, session, replay_tag))),
}
});
let (action, rel_session_index, session, replay_tag) = match res {
None => {
				trace!(
					target: self.config.log_target,
					"Failed to peel packet; bad MAC or packet not built for any of our key-exchange keys"
				);
return None
},
Some(Err(err)) => {
debug!(target: self.config.log_target, "Failed to peel packet: {err}");
return None
},
Some(Ok(x)) => x,
};
match action {
Action::ForwardTo { target, delay } => {
if !session.topology.is_mixnode() {
debug!(target: self.config.log_target,
"Received packet to forward despite not being a mixnode in the session; discarding");
return None
}
if !self.forward_packet_queue.has_space() {
debug!(target: self.config.log_target, "Dropped forward packet; forward queue full");
return None
}
session.replay_filter.insert(replay_tag);
match session.topology.target_to_peer_id(&target) {
Ok(peer_id) => {
let deadline =
Instant::now() + delay.to_duration(self.config.mean_forwarding_delay);
let packet = AddressedPacket { peer_id, packet: out.into() };
if self.forward_packet_queue.insert(deadline, packet) {
self.events |= Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED;
}
},
Err(err) => debug!(
target: self.config.log_target,
"Failed to map target {target:?} to peer ID: {err}"
),
}
None
},
Action::DeliverRequest => {
let payload_data = array_ref![out, 0, PAYLOAD_DATA_SIZE];
if !session.topology.is_mixnode() {
debug!(target: self.config.log_target,
"Received request packet despite not being a mixnode in the session; discarding");
return None
}
session.replay_filter.insert(replay_tag);
self.fragment_assembler.insert(payload_data, self.config.log_target).map(
|message| {
Message::Request(RequestMessage {
session_index: rel_session_index + self.session_status.current_index,
id: message.id,
data: message.data,
surbs: message.surbs,
})
},
)
},
Action::DeliverReply { surb_id } => {
let payload = array_mut_ref![out, 0, PAYLOAD_SIZE];
let Some(entry) = self.surb_keystore.entry(&surb_id) else {
debug!(target: self.config.log_target,
"Received reply with unrecognised SURB ID {surb_id:x?}; discarding");
return None
};
let request_id = *entry.message_id();
let res = decrypt_reply_payload(payload, entry.keys());
entry.remove();
if let Err(err) = res {
debug!(target: self.config.log_target, "Failed to decrypt reply payload: {err}");
return None
}
let payload_data = array_ref![payload, 0, PAYLOAD_DATA_SIZE];
self.fragment_assembler.insert(payload_data, self.config.log_target).map(
|message| {
if !message.surbs.is_empty() {
debug!(target: self.config.log_target,
"Reply message included SURBs; discarding them");
}
Message::Reply(ReplyMessage { request_id, data: message.data })
},
)
},
Action::DeliverCover { cover_id: _ } => None,
}
}
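	/// Deadline of the earliest queued forward packet, if any.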
pub fn next_forward_packet_deadline(&self) -> Option<Instant> {
self.forward_packet_queue.next_deadline()
}
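	/// Pop the forward packet with the earliest deadline, if any.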
pub fn pop_next_forward_packet(&mut self) -> Option<AddressedPacket> {
self.events |= Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED;
self.forward_packet_queue.pop()
}
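	/// Randomly sampled delay until the next authored (request/reply or cover) packet should be
	/// dispatched, or `None` if no session is active.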
pub fn next_authored_packet_delay(&self) -> Option<Duration> {
let means: ArrayVec<_, 2> = self
.sessions
.iter()
.map(|session| session.mean_authored_packet_period.as_secs_f64())
.collect();
let mean = match means.into_inner() {
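			// Both sessions are active: the combined period is the harmonic mean of the two
			// periods, so that (together with the session choice in pop_next_authored_packet)
			// each session effectively dispatches at half its configured rate.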
Ok(means) => (2.0 * means[0] * means[1]) / (means[0] + means[1]),
Err(mut means) => {
let mean = means.pop()?;
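				// Only one session is active; halve its dispatch rate if the phase expects the
				// previous session to still be in use.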
if self.session_status.phase.need_prev() {
2.0 * mean
} else {
mean
}
},
};
let delay: f64 = rand::thread_rng().sample(rand_distr::Exp1);
Some(Duration::from_secs_f64(delay.min(10.0) * mean))
}
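	/// Pop the next authored packet: either a queued request/reply packet or a freshly generated
	/// cover packet. Returns `None` if no session is active or no packet could be produced.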
pub fn pop_next_authored_packet(&mut self, ns: &dyn NetworkStatus) -> Option<AddressedPacket> {
let mut rng = rand::thread_rng();
let sessions: ArrayVec<_, 2> = self.sessions.enumerate_mut().collect();
let (rel_session_index, session) = match sessions.into_inner() {
Ok(sessions) => {
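				// Two sessions are active: pick one with probability proportional to its dispatch
				// rate (the inverse of its mean authored packet period).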
let periods = sessions
.iter()
.map(|(_, session)| session.mean_authored_packet_period.as_secs_f64())
.collect::<ArrayVec<_, 2>>()
.into_inner()
.expect("Input is array of length 2");
let [session_0, session_1] = sessions;
if rng.gen_bool(periods[1] / (periods[0] + periods[1])) {
session_0
} else {
session_1
}
},
Err(mut sessions) => sessions.pop()?,
};
self.events |= Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED;
let cover_kind = if rng.gen_bool(self.config.loop_cover_proportion) {
CoverKind::Loop
} else {
CoverKind::Drop
};
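		// Drop cover packets are replaced by queued request/reply packets whenever the phase
		// allows requests and replies for the chosen session; loop cover packets are always sent
		// as cover.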
if (cover_kind == CoverKind::Drop) &&
self.session_status.phase.allow_requests_and_replies(rel_session_index)
{
let (packet, space) = session.authored_packet_queue.pop();
if space {
self.events |= Events::SPACE_IN_AUTHORED_PACKET_QUEUE;
}
if packet.is_some() {
return packet
}
}
if !self.config.gen_cover_packets {
return None
}
match gen_cover_packet(&mut rng, &session.topology, ns, cover_kind, self.config.num_hops) {
Ok(packet) => Some(packet),
Err(err) => {
if (self.session_status.phase == SessionPhase::CoverToCurrent) &&
(rel_session_index == RelSessionIndex::Current) &&
matches!(err, TopologyErr::NoConnectedGatewayMixnodes)
{
trace!(target: self.config.log_target, "Failed to generate cover packet: {err}");
} else {
debug!(target: self.config.log_target, "Failed to generate cover packet: {err}");
}
None
},
}
}
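	/// Post a request message in the given session. The message is split into fragments carrying
	/// a total of `num_surbs` SURBs, each fragment is wrapped in a Sphinx packet, and the packets
	/// are pushed onto the session's authored packet queue. On success, the destination mixnode
	/// index chosen by the request builder is written back to `destination_index`.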
pub fn post_request(
&mut self,
session_index: SessionIndex,
destination_index: &mut Option<MixnodeIndex>,
message_id: &MessageId,
data: Scattered<u8>,
num_surbs: usize,
ns: &dyn NetworkStatus,
) -> Result<RequestMetrics, PostErr> {
let fragment_blueprints = match fragment_blueprints(message_id, data, num_surbs) {
Some(fragment_blueprints)
if fragment_blueprints.len() <= self.config.max_fragments_per_message =>
fragment_blueprints,
_ => return Err(PostErr::TooManyFragments),
};
let session = post_session(&mut self.sessions, self.session_status, session_index)?;
session.authored_packet_queue.check_space(fragment_blueprints.len())?;
let mut rng = rand::thread_rng();
let request_builder =
RequestBuilder::new(&mut rng, &session.topology, ns, *destination_index)?;
let mut request_hops = 0;
let mut request_forwarding_delay = Delay::zero();
let mut reply_hops = 0;
let mut reply_forwarding_delay = Delay::zero();
for fragment_blueprint in fragment_blueprints {
let (packet, metrics) = request_builder.build_packet(
&mut rng,
|fragment, rng| {
fragment_blueprint.write_except_surbs(fragment);
for surb in fragment_blueprint.surbs(fragment) {
let (id, keys) =
self.surb_keystore.insert(rng, message_id, self.config.log_target);
let num_hops = self.config.num_hops;
let metrics = request_builder.build_surb(surb, keys, rng, &id, num_hops)?;
reply_hops = max(reply_hops, metrics.num_hops);
reply_forwarding_delay =
max(reply_forwarding_delay, metrics.forwarding_delay);
}
Ok(())
},
self.config.num_hops,
)?;
session.authored_packet_queue.push(packet);
request_hops = max(request_hops, metrics.num_hops);
request_forwarding_delay = max(request_forwarding_delay, metrics.forwarding_delay);
}
let metrics = RequestMetrics {
num_hops: request_hops + reply_hops,
per_hop_net_delay: self.config.per_hop_net_delay,
forwarding_delay: (request_forwarding_delay + reply_forwarding_delay)
.to_duration(self.config.mean_forwarding_delay),
authored_packet_queue_delay: estimate_authored_packet_queue_delay(
&self.config,
session,
),
};
*destination_index = Some(request_builder.destination_index());
Ok(metrics)
}
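	/// Post a reply message using the provided SURBs. One SURB is consumed from `surbs` per
	/// fragment.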
pub fn post_reply(
&mut self,
surbs: &mut Vec<Surb>,
session_index: SessionIndex,
message_id: &MessageId,
data: Scattered<u8>,
) -> Result<(), PostErr> {
let fragment_blueprints = match fragment_blueprints(message_id, data, 0) {
Some(fragment_blueprints)
if fragment_blueprints.len() <=
min(self.config.max_fragments_per_message, surbs.len()) =>
fragment_blueprints,
_ => return Err(PostErr::TooManyFragments),
};
let session = post_session(&mut self.sessions, self.session_status, session_index)?;
session.authored_packet_queue.check_space(fragment_blueprints.len())?;
for fragment_blueprint in fragment_blueprints {
let mut packet = default_boxed_array();
fragment_blueprint.write_except_surbs(mut_payload_data(&mut packet));
let mixnode_index = complete_reply_packet(
&mut packet,
&surbs.pop().expect("Checked number of SURBs above"),
)
.ok_or(PostErr::BadSurb)?;
let peer_id = session.topology.mixnode_index_to_peer_id(mixnode_index)?;
session.authored_packet_queue.push(AddressedPacket { peer_id, packet });
}
Ok(())
}
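	/// Return the pending events and clear them.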
pub fn take_events(&mut self) -> Events {
let events = self.events;
self.events = Events::empty();
events
}
}