1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Top-level mixnet service function.

use super::{
	api::ApiBackend,
	config::{Config, SubstrateConfig},
	error::RemoteErr,
	extrinsic_queue::ExtrinsicQueue,
	maybe_inf_delay::MaybeInfDelay,
	packet_dispatcher::PacketDispatcher,
	peer_id::to_core_peer_id,
	request::{extrinsic_delay, Request, SUBMIT_EXTRINSIC},
	sync_with_runtime::sync_with_runtime,
};
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{
	future::{pending, Either},
	stream::FuturesUnordered,
	FutureExt, StreamExt,
};
use log::{debug, error, trace, warn};
use mixnet::{
	core::{Events, Message, Mixnet, Packet},
	reply_manager::{ReplyContext, ReplyManager},
	request_manager::RequestManager,
};
use sc_client_api::{BlockchainEvents, HeaderBackend};
use sc_network::{
	service::traits::{NetworkService, NotificationEvent, ValidationResult},
	NetworkPeers, NetworkStateInfo, NotificationService, ProtocolName,
};
use sc_transaction_pool_api::{
	LocalTransactionPool, OffchainTransactionPoolFactory, TransactionPool,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_consensus::SyncOracle;
use sp_keystore::{KeystoreExt, KeystorePtr};
use sp_mixnet::{runtime_api::MixnetApi, types::Mixnode};
use sp_runtime::{
	traits::{Block, Header},
	transaction_validity::TransactionSource,
	Saturating,
};
use std::{
	sync::Arc,
	time::{Duration, Instant},
};

const LOG_TARGET: &str = "mixnet";

const MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS: u32 = 3;

fn complete_submit_extrinsic<X>(
	reply_manager: &mut ReplyManager,
	reply_context: ReplyContext,
	data: Result<(), RemoteErr>,
	mixnet: &mut Mixnet<X>,
) {
	reply_manager.complete(reply_context, data.encode(), mixnet);
}

fn handle_packet<X, E: Decode>(
	packet: &Packet,
	mixnet: &mut Mixnet<X>,
	request_manager: &mut RequestManager<Request>,
	reply_manager: &mut ReplyManager,
	extrinsic_queue: &mut ExtrinsicQueue<E>,
	config: &SubstrateConfig,
) {
	match mixnet.handle_packet(packet) {
		Some(Message::Request(message)) => {
			let Some((reply_context, data)) = reply_manager.insert(message, mixnet) else { return };

			match data.as_slice() {
				[SUBMIT_EXTRINSIC, encoded_extrinsic @ ..] => {
					if !extrinsic_queue.has_space() {
						debug!(target: LOG_TARGET, "No space in extrinsic queue; dropping request");
						// We don't send a reply in this case; we want the requester to retry
						reply_manager.abandon(reply_context);
						return
					}

					// Decode the extrinsic
					let mut encoded_extrinsic = encoded_extrinsic;
					let extrinsic = match E::decode_all(&mut encoded_extrinsic) {
						Ok(extrinsic) => extrinsic,
						Err(err) => {
							complete_submit_extrinsic(
								reply_manager,
								reply_context,
								Err(RemoteErr::Decode(format!("Bad extrinsic: {}", err))),
								mixnet,
							);
							return
						},
					};

					let deadline =
						Instant::now() + extrinsic_delay(reply_context.message_id(), config);
					extrinsic_queue.insert(deadline, extrinsic, reply_context);
				},
				_ => {
					debug!(target: LOG_TARGET, "Unrecognised request; discarding");
					// To keep things simple we don't bother sending a reply in this case. The
					// requester will give up and try another mixnode eventually.
					reply_manager.abandon(reply_context);
				},
			}
		},
		Some(Message::Reply(message)) => {
			let Some(request) = request_manager.remove(&message.request_id) else {
				trace!(
					target: LOG_TARGET,
					"Received reply to already-completed request with message ID {:x?}",
					message.request_id
				);
				return
			};
			request.send_reply(&message.data);
		},
		None => (),
	}
}

fn time_until(instant: Instant) -> Duration {
	instant.saturating_duration_since(Instant::now())
}

/// Run the mixnet service. If `keystore` is `None`, the service will not attempt to register the
/// local node as a mixnode, even if `config.register` is `true`.
pub async fn run<B, C, S, P>(
	config: Config,
	mut api_backend: ApiBackend,
	client: Arc<C>,
	sync: Arc<S>,
	network: Arc<dyn NetworkService>,
	protocol_name: ProtocolName,
	transaction_pool: Arc<P>,
	keystore: Option<KeystorePtr>,
	mut notification_service: Box<dyn NotificationService>,
) where
	B: Block,
	C: BlockchainEvents<B> + ProvideRuntimeApi<B> + HeaderBackend<B>,
	C::Api: MixnetApi<B>,
	S: SyncOracle,
	P: TransactionPool<Block = B> + LocalTransactionPool<Block = B> + 'static,
{
	let local_peer_id = network.local_peer_id();
	let Some(local_peer_id) = to_core_peer_id(&local_peer_id) else {
		error!(target: LOG_TARGET,
			"Failed to convert libp2p local peer ID {local_peer_id} to mixnet peer ID; \
			mixnet not running");
		return
	};

	let offchain_transaction_pool_factory =
		OffchainTransactionPoolFactory::new(transaction_pool.clone());

	let mut mixnet = Mixnet::new(config.core);
	// It would make sense to reset this to 0 when the session changes, but registrations aren't
	// allowed at the start of a session anyway, so it doesn't really matter
	let mut min_register_block = 0u32.into();
	let mut packet_dispatcher = PacketDispatcher::new(&local_peer_id);
	let mut request_manager = RequestManager::new(config.request_manager);
	let mut reply_manager = ReplyManager::new(config.reply_manager);
	let mut extrinsic_queue = ExtrinsicQueue::new(config.substrate.extrinsic_queue_capacity);

	let mut finality_notifications = client.finality_notification_stream();
	// Import notifications only used for triggering registration attempts
	let mut import_notifications = if config.substrate.register && keystore.is_some() {
		Some(client.import_notification_stream())
	} else {
		None
	};
	let mut next_forward_packet_delay = MaybeInfDelay::new(None);
	let mut next_authored_packet_delay = MaybeInfDelay::new(None);
	let mut ready_peers = FuturesUnordered::new();
	let mut next_retry_delay = MaybeInfDelay::new(None);
	let mut next_extrinsic_delay = MaybeInfDelay::new(None);
	let mut submit_extrinsic_results = FuturesUnordered::new();

	loop {
		let mut next_request = if request_manager.has_space() {
			Either::Left(api_backend.request_receiver.select_next_some())
		} else {
			Either::Right(pending())
		};

		let mut next_import_notification = import_notifications.as_mut().map_or_else(
			|| Either::Right(pending()),
			|notifications| Either::Left(notifications.select_next_some()),
		);

		futures::select! {
			request = next_request =>
				request_manager.insert(request, &mut mixnet, &packet_dispatcher, &config.substrate),

			notification = finality_notifications.select_next_some() => {
				// To avoid trying to connect to old mixnodes, ignore finality notifications while
				// offline or major syncing. This is a bit racy but should be good enough.
				if !sync.is_offline() && !sync.is_major_syncing() {
					let api = client.runtime_api();
					sync_with_runtime(&mut mixnet, api, notification.hash);
					request_manager.update_session_status(
						&mut mixnet, &packet_dispatcher, &config.substrate);
				}
			}

			notification = next_import_notification => {
				if notification.is_new_best && (*notification.header.number() >= min_register_block) {
					let mut api = client.runtime_api();
					api.register_extension(KeystoreExt(keystore.clone().expect(
						"Import notification stream only setup if we have a keystore")));
					api.register_extension(offchain_transaction_pool_factory
						.offchain_transaction_pool(notification.hash));
					let session_index = mixnet.session_status().current_index;
					let mixnode = Mixnode {
						kx_public: *mixnet.next_kx_public(),
						peer_id: local_peer_id,
						external_addresses: network.external_addresses().into_iter()
							.map(|addr| addr.to_string().into_bytes()).collect(),
					};
					match api.maybe_register(notification.hash, session_index, mixnode) {
						Ok(true) => min_register_block = notification.header.number().saturating_add(
							MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS.into()),
						Ok(false) => (),
						Err(err) => debug!(target: LOG_TARGET,
							"Error trying to register for the next session: {err}"),
					}
				}
			}

			event = notification_service.next_event().fuse() => match event {
				None => todo!(),
				Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => {
					let _ = result_tx.send(ValidationResult::Accept);
				},
				Some(NotificationEvent::NotificationStreamOpened { peer, .. }) => {
					packet_dispatcher.add_peer(&peer);
				},
				Some(NotificationEvent::NotificationStreamClosed { peer }) => {
					packet_dispatcher.remove_peer(&peer);
				},
				Some(NotificationEvent::NotificationReceived { peer, notification }) => {
					let notification: Bytes = notification.into();

					match notification.as_ref().try_into() {
						Ok(packet) => handle_packet(packet,
							&mut mixnet, &mut request_manager, &mut reply_manager,
							&mut extrinsic_queue, &config.substrate),
						Err(_) => debug!(target: LOG_TARGET,
							"Dropped incorrectly sized packet ({} bytes) from {peer}",
							notification.len(),
						),
					}
				},
			},

			_ = next_forward_packet_delay => {
				if let Some(packet) = mixnet.pop_next_forward_packet() {
					if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
						if let Some(fut) = ready_peer.send_packet(&notification_service) {
							ready_peers.push(fut);
						}
					}
				} else {
					warn!(target: LOG_TARGET,
						"Next forward packet deadline reached, but no packet in queue; \
						this is a bug");
				}
			}

			_ = next_authored_packet_delay => {
				if let Some(packet) = mixnet.pop_next_authored_packet(&packet_dispatcher) {
					if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
						if let Some(fut) = ready_peer.send_packet(&notification_service) {
							ready_peers.push(fut);
						}
					}
				}
			}

			ready_peer = ready_peers.select_next_some() => {
				if let Some(ready_peer) = ready_peer {
					if let Some(fut) = ready_peer.send_packet(&notification_service) {
						ready_peers.push(fut);
					}
				}
			}

			_ = next_retry_delay => {
				if !request_manager.pop_next_retry(&mut mixnet, &packet_dispatcher, &config.substrate) {
					warn!(target: LOG_TARGET,
						"Next retry deadline reached, but no request in retry queue; \
						this is a bug");
				}
			}

			_ = next_extrinsic_delay => {
				if let Some((extrinsic, reply_context)) = extrinsic_queue.pop() {
					if submit_extrinsic_results.len() < config.substrate.max_pending_extrinsics {
						let fut = transaction_pool.submit_one(
							client.info().best_hash,
							TransactionSource::External,
							extrinsic);
						submit_extrinsic_results.push(async move {
							(fut.await, reply_context)
						});
					} else {
						// There are already too many pending extrinsics, just drop this one. We
						// don't send a reply; we want the requester to retry.
						debug!(target: LOG_TARGET,
							"Too many pending extrinsics; dropped submit extrinsic request");
						reply_manager.abandon(reply_context);
					}
				} else {
					warn!(target: LOG_TARGET,
						"Next extrinsic deadline reached, but no extrinsic in queue; \
						this is a bug");
				}
			}

			res_reply_context = submit_extrinsic_results.select_next_some() => {
				let (res, reply_context) = res_reply_context;
				let res = match res {
					Ok(_) => Ok(()),
					Err(err) => Err(RemoteErr::Other(err.to_string())),
				};
				complete_submit_extrinsic(&mut reply_manager, reply_context, res, &mut mixnet);
			}
		}

		let events = mixnet.take_events();
		if !events.is_empty() {
			if events.contains(Events::RESERVED_PEERS_CHANGED) {
				let reserved_peer_addrs = mixnet
					.reserved_peers()
					.flat_map(|mixnode| mixnode.extra.iter()) // External addresses
					.cloned()
					.collect();
				if let Err(err) =
					network.set_reserved_peers(protocol_name.clone(), reserved_peer_addrs)
				{
					debug!(target: LOG_TARGET, "Setting reserved peers failed: {err}");
				}
			}
			if events.contains(Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED) {
				next_forward_packet_delay
					.reset(mixnet.next_forward_packet_deadline().map(time_until));
			}
			if events.contains(Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED) {
				next_authored_packet_delay.reset(mixnet.next_authored_packet_delay());
			}
			if events.contains(Events::SPACE_IN_AUTHORED_PACKET_QUEUE) {
				// Note this may cause the next retry deadline to change, but should not trigger
				// any mixnet events
				request_manager.process_post_queues(
					&mut mixnet,
					&packet_dispatcher,
					&config.substrate,
				);
			}
		}

		if request_manager.next_retry_deadline_changed() {
			next_retry_delay.reset(request_manager.next_retry_deadline().map(time_until));
		}

		if extrinsic_queue.next_deadline_changed() {
			next_extrinsic_delay.reset(extrinsic_queue.next_deadline().map(time_until));
		}
	}
}