1use super::{
22 api::ApiBackend,
23 config::{Config, SubstrateConfig},
24 error::RemoteErr,
25 extrinsic_queue::ExtrinsicQueue,
26 maybe_inf_delay::MaybeInfDelay,
27 packet_dispatcher::PacketDispatcher,
28 peer_id::to_core_peer_id,
29 request::{extrinsic_delay, Request, SUBMIT_EXTRINSIC},
30 sync_with_runtime::sync_with_runtime,
31};
32use bytes::Bytes;
33use codec::{Decode, DecodeAll, Encode};
34use futures::{
35 future::{pending, Either},
36 stream::FuturesUnordered,
37 FutureExt, StreamExt,
38};
39use log::{debug, error, trace, warn};
40use mixnet::{
41 core::{Events, Message, Mixnet, Packet},
42 reply_manager::{ReplyContext, ReplyManager},
43 request_manager::RequestManager,
44};
45use sc_client_api::{BlockchainEvents, HeaderBackend};
46use sc_network::{
47 service::traits::{NetworkService, NotificationEvent, ValidationResult},
48 NetworkPeers, NetworkStateInfo, NotificationService, ProtocolName,
49};
50use sc_transaction_pool_api::{
51 LocalTransactionPool, OffchainTransactionPoolFactory, TransactionPool,
52};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_consensus::SyncOracle;
55use sp_keystore::{KeystoreExt, KeystorePtr};
56use sp_mixnet::{runtime_api::MixnetApi, types::Mixnode};
57use sp_runtime::{
58 traits::{Block, Header},
59 transaction_validity::TransactionSource,
60 Saturating,
61};
62use std::{
63 sync::Arc,
64 time::{Duration, Instant},
65};
66
/// Log target passed as `target:` to every logging macro invocation in this file.
const LOG_TARGET: &str = "mixnet";

/// Number of blocks added to the current block number after `maybe_register` returns
/// `Ok(true)`; no further registration attempt is made until a best block with at least the
/// resulting number is imported.
const MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS: u32 = 3;
70
71fn complete_submit_extrinsic<X>(
72 reply_manager: &mut ReplyManager,
73 reply_context: ReplyContext,
74 data: Result<(), RemoteErr>,
75 mixnet: &mut Mixnet<X>,
76) {
77 reply_manager.complete(reply_context, data.encode(), mixnet);
78}
79
80fn handle_packet<X, E: Decode>(
81 packet: &Packet,
82 mixnet: &mut Mixnet<X>,
83 request_manager: &mut RequestManager<Request>,
84 reply_manager: &mut ReplyManager,
85 extrinsic_queue: &mut ExtrinsicQueue<E>,
86 config: &SubstrateConfig,
87) {
88 match mixnet.handle_packet(packet) {
89 Some(Message::Request(message)) => {
90 let Some((reply_context, data)) = reply_manager.insert(message, mixnet) else { return };
91
92 match data.as_slice() {
93 [SUBMIT_EXTRINSIC, encoded_extrinsic @ ..] => {
94 if !extrinsic_queue.has_space() {
95 debug!(target: LOG_TARGET, "No space in extrinsic queue; dropping request");
96 reply_manager.abandon(reply_context);
98 return
99 }
100
101 let mut encoded_extrinsic = encoded_extrinsic;
103 let extrinsic = match E::decode_all(&mut encoded_extrinsic) {
104 Ok(extrinsic) => extrinsic,
105 Err(err) => {
106 complete_submit_extrinsic(
107 reply_manager,
108 reply_context,
109 Err(RemoteErr::Decode(format!("Bad extrinsic: {}", err))),
110 mixnet,
111 );
112 return
113 },
114 };
115
116 let deadline =
117 Instant::now() + extrinsic_delay(reply_context.message_id(), config);
118 extrinsic_queue.insert(deadline, extrinsic, reply_context);
119 },
120 _ => {
121 debug!(target: LOG_TARGET, "Unrecognised request; discarding");
122 reply_manager.abandon(reply_context);
125 },
126 }
127 },
128 Some(Message::Reply(message)) => {
129 let Some(request) = request_manager.remove(&message.request_id) else {
130 trace!(
131 target: LOG_TARGET,
132 "Received reply to already-completed request with message ID {:x?}",
133 message.request_id
134 );
135 return
136 };
137 request.send_reply(&message.data);
138 },
139 None => (),
140 }
141}
142
/// Returns how long remains until `instant`, or a zero duration if `instant` is not in the
/// future.
fn time_until(instant: Instant) -> Duration {
    let now = Instant::now();
    // `None` means `now` is already past `instant`; `Duration::default()` is zero.
    instant.checked_duration_since(now).unwrap_or_default()
}
146
147pub async fn run<B, C, S, P>(
150 config: Config,
151 mut api_backend: ApiBackend,
152 client: Arc<C>,
153 sync: Arc<S>,
154 network: Arc<dyn NetworkService>,
155 protocol_name: ProtocolName,
156 transaction_pool: Arc<P>,
157 keystore: Option<KeystorePtr>,
158 mut notification_service: Box<dyn NotificationService>,
159) where
160 B: Block,
161 C: BlockchainEvents<B> + ProvideRuntimeApi<B> + HeaderBackend<B>,
162 C::Api: MixnetApi<B>,
163 S: SyncOracle,
164 P: TransactionPool<Block = B> + LocalTransactionPool<Block = B> + 'static,
165{
166 let local_peer_id = network.local_peer_id();
167 let Some(local_peer_id) = to_core_peer_id(&local_peer_id) else {
168 error!(target: LOG_TARGET,
169 "Failed to convert libp2p local peer ID {local_peer_id} to mixnet peer ID; \
170 mixnet not running");
171 return
172 };
173
174 let offchain_transaction_pool_factory =
175 OffchainTransactionPoolFactory::new(transaction_pool.clone());
176
177 let mut mixnet = Mixnet::new(config.core);
178 let mut min_register_block = 0u32.into();
181 let mut packet_dispatcher = PacketDispatcher::new(&local_peer_id);
182 let mut request_manager = RequestManager::new(config.request_manager);
183 let mut reply_manager = ReplyManager::new(config.reply_manager);
184 let mut extrinsic_queue = ExtrinsicQueue::new(config.substrate.extrinsic_queue_capacity);
185
186 let mut finality_notifications = client.finality_notification_stream();
187 let mut import_notifications = if config.substrate.register && keystore.is_some() {
189 Some(client.import_notification_stream())
190 } else {
191 None
192 };
193 let mut next_forward_packet_delay = MaybeInfDelay::new(None);
194 let mut next_authored_packet_delay = MaybeInfDelay::new(None);
195 let mut ready_peers = FuturesUnordered::new();
196 let mut next_retry_delay = MaybeInfDelay::new(None);
197 let mut next_extrinsic_delay = MaybeInfDelay::new(None);
198 let mut submit_extrinsic_results = FuturesUnordered::new();
199
200 loop {
201 let mut next_request = if request_manager.has_space() {
202 Either::Left(api_backend.request_receiver.select_next_some())
203 } else {
204 Either::Right(pending())
205 };
206
207 let mut next_import_notification = import_notifications.as_mut().map_or_else(
208 || Either::Right(pending()),
209 |notifications| Either::Left(notifications.select_next_some()),
210 );
211
212 futures::select! {
213 request = next_request =>
214 request_manager.insert(request, &mut mixnet, &packet_dispatcher, &config.substrate),
215
216 notification = finality_notifications.select_next_some() => {
217 if !sync.is_offline() && !sync.is_major_syncing() {
220 let api = client.runtime_api();
221 sync_with_runtime(&mut mixnet, api, notification.hash);
222 request_manager.update_session_status(
223 &mut mixnet, &packet_dispatcher, &config.substrate);
224 }
225 }
226
227 notification = next_import_notification => {
228 if notification.is_new_best && (*notification.header.number() >= min_register_block) {
229 let mut api = client.runtime_api();
230 api.register_extension(KeystoreExt(keystore.clone().expect(
231 "Import notification stream only setup if we have a keystore")));
232 api.register_extension(offchain_transaction_pool_factory
233 .offchain_transaction_pool(notification.hash));
234 let session_index = mixnet.session_status().current_index;
235 let mixnode = Mixnode {
236 kx_public: *mixnet.next_kx_public(),
237 peer_id: local_peer_id,
238 external_addresses: network.external_addresses().into_iter()
239 .map(|addr| addr.to_string().into_bytes()).collect(),
240 };
241 match api.maybe_register(notification.hash, session_index, mixnode) {
242 Ok(true) => min_register_block = notification.header.number().saturating_add(
243 MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS.into()),
244 Ok(false) => (),
245 Err(err) => debug!(target: LOG_TARGET,
246 "Error trying to register for the next session: {err}"),
247 }
248 }
249 }
250
251 event = notification_service.next_event().fuse() => match event {
252 None => todo!(),
253 Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => {
254 let _ = result_tx.send(ValidationResult::Accept);
255 },
256 Some(NotificationEvent::NotificationStreamOpened { peer, .. }) => {
257 packet_dispatcher.add_peer(&peer);
258 },
259 Some(NotificationEvent::NotificationStreamClosed { peer }) => {
260 packet_dispatcher.remove_peer(&peer);
261 },
262 Some(NotificationEvent::NotificationReceived { peer, notification }) => {
263 let notification: Bytes = notification.into();
264
265 match notification.as_ref().try_into() {
266 Ok(packet) => handle_packet(packet,
267 &mut mixnet, &mut request_manager, &mut reply_manager,
268 &mut extrinsic_queue, &config.substrate),
269 Err(_) => debug!(target: LOG_TARGET,
270 "Dropped incorrectly sized packet ({} bytes) from {peer}",
271 notification.len(),
272 ),
273 }
274 },
275 },
276
277 _ = next_forward_packet_delay => {
278 if let Some(packet) = mixnet.pop_next_forward_packet() {
279 if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
280 if let Some(fut) = ready_peer.send_packet(¬ification_service) {
281 ready_peers.push(fut);
282 }
283 }
284 } else {
285 warn!(target: LOG_TARGET,
286 "Next forward packet deadline reached, but no packet in queue; \
287 this is a bug");
288 }
289 }
290
291 _ = next_authored_packet_delay => {
292 if let Some(packet) = mixnet.pop_next_authored_packet(&packet_dispatcher) {
293 if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
294 if let Some(fut) = ready_peer.send_packet(¬ification_service) {
295 ready_peers.push(fut);
296 }
297 }
298 }
299 }
300
301 ready_peer = ready_peers.select_next_some() => {
302 if let Some(ready_peer) = ready_peer {
303 if let Some(fut) = ready_peer.send_packet(¬ification_service) {
304 ready_peers.push(fut);
305 }
306 }
307 }
308
309 _ = next_retry_delay => {
310 if !request_manager.pop_next_retry(&mut mixnet, &packet_dispatcher, &config.substrate) {
311 warn!(target: LOG_TARGET,
312 "Next retry deadline reached, but no request in retry queue; \
313 this is a bug");
314 }
315 }
316
317 _ = next_extrinsic_delay => {
318 if let Some((extrinsic, reply_context)) = extrinsic_queue.pop() {
319 if submit_extrinsic_results.len() < config.substrate.max_pending_extrinsics {
320 let fut = transaction_pool.submit_one(
321 client.info().best_hash,
322 TransactionSource::External,
323 extrinsic);
324 submit_extrinsic_results.push(async move {
325 (fut.await, reply_context)
326 });
327 } else {
328 debug!(target: LOG_TARGET,
331 "Too many pending extrinsics; dropped submit extrinsic request");
332 reply_manager.abandon(reply_context);
333 }
334 } else {
335 warn!(target: LOG_TARGET,
336 "Next extrinsic deadline reached, but no extrinsic in queue; \
337 this is a bug");
338 }
339 }
340
341 res_reply_context = submit_extrinsic_results.select_next_some() => {
342 let (res, reply_context) = res_reply_context;
343 let res = match res {
344 Ok(_) => Ok(()),
345 Err(err) => Err(RemoteErr::Other(err.to_string())),
346 };
347 complete_submit_extrinsic(&mut reply_manager, reply_context, res, &mut mixnet);
348 }
349 }
350
351 let events = mixnet.take_events();
352 if !events.is_empty() {
353 if events.contains(Events::RESERVED_PEERS_CHANGED) {
354 let reserved_peer_addrs = mixnet
355 .reserved_peers()
356 .flat_map(|mixnode| mixnode.extra.iter()) .cloned()
358 .collect();
359 if let Err(err) =
360 network.set_reserved_peers(protocol_name.clone(), reserved_peer_addrs)
361 {
362 debug!(target: LOG_TARGET, "Setting reserved peers failed: {err}");
363 }
364 }
365 if events.contains(Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED) {
366 next_forward_packet_delay
367 .reset(mixnet.next_forward_packet_deadline().map(time_until));
368 }
369 if events.contains(Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED) {
370 next_authored_packet_delay.reset(mixnet.next_authored_packet_delay());
371 }
372 if events.contains(Events::SPACE_IN_AUTHORED_PACKET_QUEUE) {
373 request_manager.process_post_queues(
376 &mut mixnet,
377 &packet_dispatcher,
378 &config.substrate,
379 );
380 }
381 }
382
383 if request_manager.next_retry_deadline_changed() {
384 next_retry_delay.reset(request_manager.next_retry_deadline().map(time_until));
385 }
386
387 if extrinsic_queue.next_deadline_changed() {
388 next_extrinsic_delay.reset(extrinsic_queue.next_deadline().map(time_until));
389 }
390 }
391}