sc_mixnet/run.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Top-level mixnet service function.
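//!
//! The [`run`] function drives the mixnet: it exchanges packets with peers over the notification
//! protocol, services remote submit-extrinsic requests, and, when a keystore is provided and
//! registration is enabled, attempts to register the local node as a mixnode.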

use super::{
	api::ApiBackend,
	config::{Config, SubstrateConfig},
	error::RemoteErr,
	extrinsic_queue::ExtrinsicQueue,
	maybe_inf_delay::MaybeInfDelay,
	packet_dispatcher::PacketDispatcher,
	peer_id::to_core_peer_id,
	request::{extrinsic_delay, Request, SUBMIT_EXTRINSIC},
	sync_with_runtime::sync_with_runtime,
};
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{
	future::{pending, Either},
	stream::FuturesUnordered,
	FutureExt, StreamExt,
};
use log::{debug, error, trace, warn};
use mixnet::{
	core::{Events, Message, Mixnet, Packet},
	reply_manager::{ReplyContext, ReplyManager},
	request_manager::RequestManager,
};
use sc_client_api::{BlockchainEvents, HeaderBackend};
use sc_network::{
	service::traits::{NetworkService, NotificationEvent, ValidationResult},
	NetworkPeers, NetworkStateInfo, NotificationService, ProtocolName,
};
use sc_transaction_pool_api::{
	LocalTransactionPool, OffchainTransactionPoolFactory, TransactionPool,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_consensus::SyncOracle;
use sp_keystore::{KeystoreExt, KeystorePtr};
use sp_mixnet::{runtime_api::MixnetApi, types::Mixnode};
use sp_runtime::{
	traits::{Block, Header},
	transaction_validity::TransactionSource,
	Saturating,
};
use std::{
	sync::Arc,
	time::{Duration, Instant},
};

const LOG_TARGET: &str = "mixnet";

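/// Minimum number of blocks to wait after submitting a registration before trying to register
/// the local node again.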
const MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS: u32 = 3;

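/// Encode the outcome of a submit-extrinsic request and send it back through the mixnet as a
/// reply.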
fn complete_submit_extrinsic<X>(
	reply_manager: &mut ReplyManager,
	reply_context: ReplyContext,
	data: Result<(), RemoteErr>,
	mixnet: &mut Mixnet<X>,
) {
	reply_manager.complete(reply_context, data.encode(), mixnet);
}

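/// Handle an incoming mixnet packet. Completed request messages are either queued for extrinsic
/// submission or discarded; reply messages complete the corresponding pending request.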
fn handle_packet<X, E: Decode>(
	packet: &Packet,
	mixnet: &mut Mixnet<X>,
	request_manager: &mut RequestManager<Request>,
	reply_manager: &mut ReplyManager,
	extrinsic_queue: &mut ExtrinsicQueue<E>,
	config: &SubstrateConfig,
) {
	match mixnet.handle_packet(packet) {
		Some(Message::Request(message)) => {
			let Some((reply_context, data)) = reply_manager.insert(message, mixnet) else { return };

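			// The first byte of the request identifies its kind; the remaining bytes are
			// interpreted according to that kind.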
			match data.as_slice() {
				[SUBMIT_EXTRINSIC, encoded_extrinsic @ ..] => {
					if !extrinsic_queue.has_space() {
						debug!(target: LOG_TARGET, "No space in extrinsic queue; dropping request");
						// We don't send a reply in this case; we want the requester to retry
						reply_manager.abandon(reply_context);
						return
					}

					// Decode the extrinsic
					let mut encoded_extrinsic = encoded_extrinsic;
					let extrinsic = match E::decode_all(&mut encoded_extrinsic) {
						Ok(extrinsic) => extrinsic,
						Err(err) => {
							complete_submit_extrinsic(
								reply_manager,
								reply_context,
								Err(RemoteErr::Decode(format!("Bad extrinsic: {}", err))),
								mixnet,
							);
							return
						},
					};

					let deadline =
						Instant::now() + extrinsic_delay(reply_context.message_id(), config);
					extrinsic_queue.insert(deadline, extrinsic, reply_context);
				},
				_ => {
					debug!(target: LOG_TARGET, "Unrecognised request; discarding");
					// To keep things simple we don't bother sending a reply in this case. The
					// requester will give up and try another mixnode eventually.
					reply_manager.abandon(reply_context);
				},
			}
		},
		Some(Message::Reply(message)) => {
			let Some(request) = request_manager.remove(&message.request_id) else {
				trace!(
					target: LOG_TARGET,
					"Received reply to already-completed request with message ID {:x?}",
					message.request_id
				);
				return
			};
			request.send_reply(&message.data);
		},
		None => (),
	}
}

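/// Time remaining until `instant`, or zero if `instant` is already in the past.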
fn time_until(instant: Instant) -> Duration {
	instant.saturating_duration_since(Instant::now())
}

/// Run the mixnet service. If `keystore` is `None`, the service will not attempt to register the
/// local node as a mixnode, even if `config.substrate.register` is `true`.
pub async fn run<B, C, S, P>(
	config: Config,
	mut api_backend: ApiBackend,
	client: Arc<C>,
	sync: Arc<S>,
	network: Arc<dyn NetworkService>,
	protocol_name: ProtocolName,
	transaction_pool: Arc<P>,
	keystore: Option<KeystorePtr>,
	mut notification_service: Box<dyn NotificationService>,
) where
	B: Block,
	C: BlockchainEvents<B> + ProvideRuntimeApi<B> + HeaderBackend<B>,
	C::Api: MixnetApi<B>,
	S: SyncOracle,
	P: TransactionPool<Block = B> + LocalTransactionPool<Block = B> + 'static,
{
	let local_peer_id = network.local_peer_id();
	let Some(local_peer_id) = to_core_peer_id(&local_peer_id) else {
		error!(target: LOG_TARGET,
			"Failed to convert libp2p local peer ID {local_peer_id} to mixnet peer ID; \
			mixnet not running");
		return
	};

	let offchain_transaction_pool_factory =
		OffchainTransactionPoolFactory::new(transaction_pool.clone());

	let mut mixnet = Mixnet::new(config.core);
	// It would make sense to reset this to 0 when the session changes, but registrations aren't
	// allowed at the start of a session anyway, so it doesn't really matter
	let mut min_register_block = 0u32.into();
	let mut packet_dispatcher = PacketDispatcher::new(&local_peer_id);
	let mut request_manager = RequestManager::new(config.request_manager);
	let mut reply_manager = ReplyManager::new(config.reply_manager);
	let mut extrinsic_queue = ExtrinsicQueue::new(config.substrate.extrinsic_queue_capacity);

	let mut finality_notifications = client.finality_notification_stream();
	// Import notifications are only used for triggering registration attempts
	let mut import_notifications = if config.substrate.register && keystore.is_some() {
		Some(client.import_notification_stream())
	} else {
		None
	};
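	// Timers and in-flight futures driven by the main loop below. A `MaybeInfDelay` stays
	// dormant while reset with `None`, i.e. while there is no corresponding deadline.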
	let mut next_forward_packet_delay = MaybeInfDelay::new(None);
	let mut next_authored_packet_delay = MaybeInfDelay::new(None);
	let mut ready_peers = FuturesUnordered::new();
	let mut next_retry_delay = MaybeInfDelay::new(None);
	let mut next_extrinsic_delay = MaybeInfDelay::new(None);
	let mut submit_extrinsic_results = FuturesUnordered::new();

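	// Main event loop: multiplex local requests, chain notifications, notification-protocol
	// events, and the various timers with `futures::select!`, then react to any mixnet state
	// changes afterwards.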
	loop {
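		// Apply backpressure: only poll for new requests from the API when the request manager
		// has space for them.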
		let mut next_request = if request_manager.has_space() {
			Either::Left(api_backend.request_receiver.select_next_some())
		} else {
			Either::Right(pending())
		};

		let mut next_import_notification = import_notifications.as_mut().map_or_else(
			|| Either::Right(pending()),
			|notifications| Either::Left(notifications.select_next_some()),
		);

		futures::select! {
			request = next_request =>
				request_manager.insert(request, &mut mixnet, &packet_dispatcher, &config.substrate),

			notification = finality_notifications.select_next_some() => {
				// To avoid trying to connect to old mixnodes, ignore finality notifications while
				// offline or major syncing. This is a bit racy but should be good enough.
				if !sync.is_offline() && !sync.is_major_syncing() {
					let api = client.runtime_api();
					sync_with_runtime(&mut mixnet, api, notification.hash);
					request_manager.update_session_status(
						&mut mixnet, &packet_dispatcher, &config.substrate);
				}
			}

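			// On each new best block, give the runtime a chance to register the local node as a
			// mixnode for the next session. When `maybe_register` returns `Ok(true)`, back off
			// for MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS blocks before trying again.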
			notification = next_import_notification => {
				if notification.is_new_best && (*notification.header.number() >= min_register_block) {
					let mut api = client.runtime_api();
					api.register_extension(KeystoreExt(keystore.clone().expect(
						"Import notification stream only setup if we have a keystore")));
					api.register_extension(offchain_transaction_pool_factory
						.offchain_transaction_pool(notification.hash));
					let session_index = mixnet.session_status().current_index;
					let mixnode = Mixnode {
						kx_public: *mixnet.next_kx_public(),
						peer_id: local_peer_id,
						external_addresses: network.external_addresses().into_iter()
							.map(|addr| addr.to_string().into_bytes()).collect(),
					};
					match api.maybe_register(notification.hash, session_index, mixnode) {
						Ok(true) => min_register_block = notification.header.number().saturating_add(
							MIN_BLOCKS_BETWEEN_REGISTRATION_ATTEMPTS.into()),
						Ok(false) => (),
						Err(err) => debug!(target: LOG_TARGET,
							"Error trying to register for the next session: {err}"),
					}
				}
			}

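			// Notification protocol events: accept all inbound substreams, track open and closed
			// substreams in the packet dispatcher, and treat received notifications as mixnet
			// packets.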
			event = notification_service.next_event().fuse() => match event {
				None => todo!(),
				Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => {
					let _ = result_tx.send(ValidationResult::Accept);
				},
				Some(NotificationEvent::NotificationStreamOpened { peer, .. }) => {
					packet_dispatcher.add_peer(&peer);
				},
				Some(NotificationEvent::NotificationStreamClosed { peer }) => {
					packet_dispatcher.remove_peer(&peer);
				},
				Some(NotificationEvent::NotificationReceived { peer, notification }) => {
					let notification: Bytes = notification.into();

					match notification.as_ref().try_into() {
						Ok(packet) => handle_packet(packet,
							&mut mixnet, &mut request_manager, &mut reply_manager,
							&mut extrinsic_queue, &config.substrate),
						Err(_) => debug!(target: LOG_TARGET,
							"Dropped incorrectly sized packet ({} bytes) from {peer}",
							notification.len(),
						),
					}
				},
			},

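			// Packet timers: when the next forward or authored packet deadline is reached, pop
			// the packet, hand it to the dispatcher, and poll the resulting send future via
			// `ready_peers`.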
			_ = next_forward_packet_delay => {
				if let Some(packet) = mixnet.pop_next_forward_packet() {
					if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
						if let Some(fut) = ready_peer.send_packet(&notification_service) {
							ready_peers.push(fut);
						}
					}
				} else {
					warn!(target: LOG_TARGET,
						"Next forward packet deadline reached, but no packet in queue; \
						this is a bug");
				}
			}

			_ = next_authored_packet_delay => {
				if let Some(packet) = mixnet.pop_next_authored_packet(&packet_dispatcher) {
					if let Some(ready_peer) = packet_dispatcher.dispatch(packet) {
						if let Some(fut) = ready_peer.send_packet(&notification_service) {
							ready_peers.push(fut);
						}
					}
				}
			}

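			// A packet send to a peer completed. If that peer still has queued packets, queue
			// another send for it.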
			ready_peer = ready_peers.select_next_some() => {
				if let Some(ready_peer) = ready_peer {
					if let Some(fut) = ready_peer.send_packet(&notification_service) {
						ready_peers.push(fut);
					}
				}
			}

			_ = next_retry_delay => {
				if !request_manager.pop_next_retry(&mut mixnet, &packet_dispatcher, &config.substrate) {
					warn!(target: LOG_TARGET,
						"Next retry deadline reached, but no request in retry queue; \
						this is a bug");
				}
			}

			_ = next_extrinsic_delay => {
				if let Some((extrinsic, reply_context)) = extrinsic_queue.pop() {
					if submit_extrinsic_results.len() < config.substrate.max_pending_extrinsics {
						let fut = transaction_pool.submit_one(
							client.info().best_hash,
							TransactionSource::External,
							extrinsic);
						submit_extrinsic_results.push(async move {
							(fut.await, reply_context)
						});
					} else {
						// There are already too many pending extrinsics, just drop this one. We
						// don't send a reply; we want the requester to retry.
						debug!(target: LOG_TARGET,
							"Too many pending extrinsics; dropped submit extrinsic request");
						reply_manager.abandon(reply_context);
					}
				} else {
					warn!(target: LOG_TARGET,
						"Next extrinsic deadline reached, but no extrinsic in queue; \
						this is a bug");
				}
			}

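			// A previously submitted extrinsic was accepted or rejected by the transaction pool;
			// relay the outcome to the remote requester.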
			res_reply_context = submit_extrinsic_results.select_next_some() => {
				let (res, reply_context) = res_reply_context;
				let res = match res {
					Ok(_) => Ok(()),
					Err(err) => Err(RemoteErr::Other(err.to_string())),
				};
				complete_submit_extrinsic(&mut reply_manager, reply_context, res, &mut mixnet);
			}
		}

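		// After handling the event above, process any events the mixnet raised (reserved peer
		// changes, packet deadline changes, space freeing up in the authored packet queue) and
		// refresh the retry and extrinsic timers if their deadlines changed.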
		let events = mixnet.take_events();
		if !events.is_empty() {
			if events.contains(Events::RESERVED_PEERS_CHANGED) {
				let reserved_peer_addrs = mixnet
					.reserved_peers()
					.flat_map(|mixnode| mixnode.extra.iter()) // External addresses
					.cloned()
					.collect();
				if let Err(err) =
					network.set_reserved_peers(protocol_name.clone(), reserved_peer_addrs)
				{
					debug!(target: LOG_TARGET, "Setting reserved peers failed: {err}");
				}
			}
			if events.contains(Events::NEXT_FORWARD_PACKET_DEADLINE_CHANGED) {
				next_forward_packet_delay
					.reset(mixnet.next_forward_packet_deadline().map(time_until));
			}
			if events.contains(Events::NEXT_AUTHORED_PACKET_DEADLINE_CHANGED) {
				next_authored_packet_delay.reset(mixnet.next_authored_packet_delay());
			}
			if events.contains(Events::SPACE_IN_AUTHORED_PACKET_QUEUE) {
				// Note this may cause the next retry deadline to change, but should not trigger
				// any mixnet events
				request_manager.process_post_queues(
					&mut mixnet,
					&packet_dispatcher,
					&config.substrate,
				);
			}
		}

		if request_manager.next_retry_deadline_changed() {
			next_retry_delay.reset(request_manager.next_retry_deadline().map(time_until));
		}

		if extrinsic_queue.next_deadline_changed() {
			next_extrinsic_delay.reset(extrinsic_queue.next_deadline().map(time_until));
		}
	}
}