use super::{
api::ApiBackend,
config::{Config, SubstrateConfig},
error::RemoteErr,
extrinsic_queue::ExtrinsicQueue,
maybe_inf_delay::MaybeInfDelay,
packet_dispatcher::PacketDispatcher,
peer_id::to_core_peer_id,
request::{extrinsic_delay, Request, SUBMIT_EXTRINSIC},
sync_with_runtime::sync_with_runtime,
};
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{
future::{pending, Either},
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use log::{debug, error, trace, warn};
use mixnet::{
core::{Events, Message, Mixnet, Packet},
reply_manager::{ReplyContext, ReplyManager},
request_manager::RequestManager,
};
use sc_client_api::{BlockchainEvents, HeaderBackend};
use sc_network::{
service::traits::{NetworkService, NotificationEvent, ValidationResult},
NetworkPeers, NetworkStateInfo, NotificationService, ProtocolName,
};
use sc_transaction_pool_api::{
LocalTransactionPool, OffchainTransactionPoolFactory, TransactionPool,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_consensus::SyncOracle;
use sp_keystore::{KeystoreExt, KeystorePtr};
use sp_mixnet::{runtime_api::MixnetApi, types::Mixnode};
use sp_runtime::{
traits::{Block, Header},
transaction_validity::TransactionSource,
Saturating,
};
use std::{
sync::Arc,
time::{Duration, Instant},
};
const LOG_TARGET: &str = "mixnet";
const MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS: u32 = 3;
fn complete_submit_extrinsic<X>(
reply_manager: &mut ReplyManager,
reply_context: ReplyContext,
data: Result<(), RemoteErr>,
mixnet: &mut Mixnet<X>,
) {
reply_manager.complete(reply_context, data.encode(), mixnet);
}
fn handle_packet<X, E: Decode>(
packet: &Packet,
mixnet: &mut Mixnet<X>,
request_manager: &mut RequestManager<Request>,
reply_manager: &mut ReplyManager,
extrinsic_queue: &mut ExtrinsicQueue<E>,
config: &SubstrateConfig,
) {
match mixnet.handle_packet(packet) {
Some(Message::Request(message)) => {
let Some((reply_context, data)) = reply_manager.insert(message, mixnet) else { return };
match data.as_slice() {
[SUBMIT_EXTRINSIC, encoded_extrinsic @ ..] => {
if !extrinsic_queue.has_space() {
debug!(target: LOG_TARGET, "No space in extrinsic queue; dropping request");
reply_manager.abandon(reply_context);
return
}
let mut encoded_extrinsic = encoded_extrinsic;
let extrinsic = match E::decode_all(&mut encoded_extrinsic) {
Ok(extrinsic) => extrinsic,
Err(err) => {
complete_submit_extrinsic(
reply_manager,
reply_context,
Err(RemoteErr::Decode(format!("Bad extrinsic: {}", err))),
mixnet,
);
return
},
};
let deadline =
Instant::now() + extrinsic_delay(reply_context.message_id(), config);
extrinsic_queue.insert(deadline, extrinsic, reply_context);
},
_ => {
debug!(target: LOG_TARGET, "Unrecognised request; discarding");
reply_manager.abandon(reply_context);
},
}
},
Some(Message::Reply(message)) => {
let Some(request) = request_manager.remove(&message.request_id) else {
trace!(
target: LOG_TARGET,
"Received reply to already-completed request with message ID {:x?}",
message.request_id
);
return
};
request.send_reply(&message.data);
},
None => (),
}
}
fn time_until(instant: Instant) -> Duration {
instant.saturating_duration_since(Instant::now())
}
pub async fn run<B, C, S, P>(
config: Config,
mut api_backend: ApiBackend,
client: Arc<C>,
sync: Arc<S>,
network: Arc<dyn NetworkService>,
protocol_name: ProtocolName,
transaction_pool: Arc<P>,
keystore: Option<KeystorePtr>,
mut notification_service: Box<dyn NotificationService>,
) where
B: Block,
C: BlockchainEvents<B> + ProvideRuntimeApi<B> + HeaderBackend<B>,
C::Api: MixnetApi<B>,
S: SyncOracle,
P: TransactionPool<Block = B> + LocalTransactionPool<Block = B> + 'static,
{
let local_peer_id = network.local_peer_id();
let Some(local_peer_id) = to_core_peer_id(&local_peer_id) else {
error!(target: LOG_TARGET,
"Failed to convert libp2p local peer ID {local_peer_id} to mixnet peer ID; \
mixnet not running");
return
};
let offchain_transaction_pool_factory =
OffchainTransactionPoolFactory::new(transaction_pool.clone());
let mut mixnet = Mixnet::new(config.core);
let mut min_register_block = 0u32.into();
let mut packet_dispatcher = PacketDispatcher::new(&local_peer_id);
let mut request_manager = RequestManager::new(config.request_manager);
let mut reply_manager = ReplyManager::new(config.reply_manager);
let mut extrinsic_queue = ExtrinsicQueue::new(config.substrate.extrinsic_queue_capacity);
let mut finality_notifications = client.finality_notification_stream();
let mut import_notifications = if config.substrate.register && keystore.is_some() {
Some(client.import_notification_stream())
} else {
None
};
let mut next_forward_packet_delay = MaybeInfDelay::new(None);
let mut next_authored_packet_delay = MaybeInfDelay::new(None);
let mut ready_peers = FuturesUnordered::new();
let mut next_retry_delay = MaybeInfDelay::new(None);
let mut next_extrinsic_delay = MaybeInfDelay::new(None);
let mut submit_extrinsic_results = FuturesUnordered::new();
loop {
let mut next_request = if request_manager.has_space() {
Either::Left(api_backend.request_receiver.select_next_some())
} else {
Either::Right(pending())
};
let mut next_import_notification = import_notifications.as_mut().map_or_else(
|| Either::Right(pending()),
|notifications| Either::Left(notifications.select_next_some()),
);
futures::select! {
request = next_request =>
request_manager.insert(request, &mut mixnet, &packet_dispatcher, &config.substrate),
notification = finality_notifications.select_next_some() => {
if !sync.is_offline() && !sync.is_major_syncing() {
let api = client.runtime_api();
sync_with_runtime(&mut mixnet, api, notification.hash);
request_manager.update_session_status(
&mut mixnet, &packet_dispatcher, &config.substrate);
}
}
notification = next_import_notification => {
if notification.is_new_best && (*notification.header.number() >= min_register_block) {
let mut api = client.runtime_api();
api.register_extension(KeystoreExt(keystore.clone().expect(
"Import notification stream only setup if we have a keystore")));
api.register_extension(offchain_transaction_pool_factory
.offchain_transaction_pool(notification.hash));
let session_index = mixnet.session_status().current_index;
let mixnode = Mixnode {
kx_public: *mixnet.next_kx_public(),
peer_id: local_peer_id,
external_addresses: network.external_addresses().into_iter()
.map(|addr| addr.to_string().into_bytes()).collect(),
};
match api.maybe_register(notification.hash, session_index, mixnode) {
Ok(true) => min_register_block = notification.header.number().saturating_add(
MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS.into()),
Ok(false) => (),
Err(err) => debug!(target: LOG_TARGET,
"Error trying to register for the next session: {err}"),
}
}
}
event = notification_service.next_event().fuse() => match event {
None => todo!(),
Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => {
let _ = result_tx.send(ValidationResult::Accept);
},
Some(NotificationEvent::NotificationStreamOpened { peer, .. }) => {
packet_dispatcher.add_peer(&peer);
},
Some(NotificationEvent::NotificationStreamClosed { peer }) => {
packet_dispatcher.remove_peer(&peer);
},
Some(NotificationEvent::NotificationReceived { peer, notification }) => {
let notification: Bytes = notification.into();
match notification.as_ref().try_into() {
Ok(packet) => handle_packet(packet,
&mut mixnet, &mut request_manager, &mut reply_manager,
&mut extrinsic_queue, &config.substrate),
Err(_) => debug!(target: LOG_TARGET,
"Dropped incorrectly sized packet ({} bytes) from {peer}",
notification.len(),
),
}
},
},
_ = next_forward_packet_delay => {
if let Some(packet) = mixnet.pop_next_forward_packet() {
if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
if let Some(fut) = ready_peer.send_packet(¬ification_service) {
ready_peers.push(fut);
}
}
} else {
warn!(target: LOG_TARGET,
"Next forward packet deadline reached, but no packet in queue; \
this is a bug");
}
}
_ = next_authored_packet_delay => {
if let Some(packet) = mixnet.pop_next_authored_packet(&packet_dispatcher) {
if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
if let Some(fut) = ready_peer.send_packet(¬ification_service) {
ready_peers.push(fut);
}
}
}
}
ready_peer = ready_peers.select_next_some() => {
if let Some(ready_peer) = ready_peer {
if let Some(fut) = ready_peer.send_packet(¬ification_service) {
ready_peers.push(fut);
}
}
}
_ = next_retry_delay => {
if !request_manager.pop_next_retry(&mut mixnet, &packet_dispatcher, &config.substrate) {
warn!(target: LOG_TARGET,
"Next retry deadline reached, but no request in retry queue; \
this is a bug");
}
}
_ = next_extrinsic_delay => {
if let Some((extrinsic, reply_context)) = extrinsic_queue.pop() {
if submit_extrinsic_results.len() < config.substrate.max_pending_extrinsics {
let fut = transaction_pool.submit_one(
client.info().best_hash,
TransactionSource::External,
extrinsic);
submit_extrinsic_results.push(async move {
(fut.await, reply_context)
});
} else {
debug!(target: LOG_TARGET,
"Too many pending extrinsics; dropped submit extrinsic request");
reply_manager.abandon(reply_context);
}
} else {
warn!(target: LOG_TARGET,
"Next extrinsic deadline reached, but no extrinsic in queue; \
this is a bug");
}
}
res_reply_context = submit_extrinsic_results.select_next_some() => {
let (res, reply_context) = res_reply_context;
let res = match res {
Ok(_) => Ok(()),
Err(err) => Err(RemoteErr::Other(err.to_string())),
};
complete_submit_extrinsic(&mut reply_manager, reply_context, res, &mut mixnet);
}
}
let events = mixnet.take_events();
if !events.is_empty() {
if events.contains(Events::RESERVED_PEERS_CHANGED) {
let reserved_peer_addrs = mixnet
.reserved_peers()
.flat_map(|mixnode| mixnode.extra.iter()) .cloned()
.collect();
if let Err(err) =
network.set_reserved_peers(protocol_name.clone(), reserved_peer_addrs)
{
debug!(target: LOG_TARGET, "Setting reserved peers failed: {err}");
}
}
if events.contains(Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED) {
next_forward_packet_delay
.reset(mixnet.next_forward_packet_deadline().map(time_until));
}
if events.contains(Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED) {
next_authored_packet_delay.reset(mixnet.next_authored_packet_delay());
}
if events.contains(Events::SPACE_IN_AUTHORED_PACKET_QUEUE) {
request_manager.process_post_queues(
&mut mixnet,
&packet_dispatcher,
&config.substrate,
);
}
}
if request_manager.next_retry_deadline_changed() {
next_retry_delay.reset(request_manager.next_retry_deadline().map(time_until));
}
if extrinsic_queue.next_deadline_changed() {
next_extrinsic_delay.reset(extrinsic_queue.next_deadline().map(time_until));
}
}
}