referrerpolicy=no-referrer-when-downgrade

polkadot_subsystem_bench/
network.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13// You should have received a copy of the GNU General Public License
14// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
15
16//! Implements network emulation and interfaces to control and specialize
17//! network peer behaviour.
18
19//	     [TestEnvironment]
20// 	  [NetworkEmulatorHandle]
21// 			    ||
22//   +-------+--||--+-------+
23//   |       |      |       |
24//  Peer1	Peer2  Peer3  Peer4
25//    \      |	    |	    /
26//     \     |      |	   /
27//      \    |      |    /
28//       \   |      |   /
29//        \  |      |  /
30//     [Network Interface]
31//               |
32//    [Emulated Network Bridge]
33//               |
34//     Subsystems under test
35
36use crate::{
37	configuration::{random_latency, TestAuthorities, TestConfiguration},
38	environment::TestEnvironmentDependencies,
39	NODE_UNDER_TEST,
40};
41use codec::Encode;
42use colored::Colorize;
43use futures::{
44	channel::{
45		mpsc,
46		mpsc::{UnboundedReceiver, UnboundedSender},
47		oneshot,
48	},
49	lock::Mutex,
50	stream::FuturesUnordered,
51	Future, FutureExt, StreamExt,
52};
53use itertools::Itertools;
54use net_protocol::{
55	peer_set::ValidationVersion,
56	request_response::{Recipient, Requests, ResponseSender},
57	ObservedRole, VersionedValidationProtocol, View,
58};
59use polkadot_node_network_protocol::{self as net_protocol, ValidationProtocols};
60use polkadot_node_subsystem::messages::StatementDistributionMessage;
61use polkadot_node_subsystem_types::messages::NetworkBridgeEvent;
62use polkadot_node_subsystem_util::metrics::prometheus::{
63	self, CounterVec, Opts, PrometheusError, Registry,
64};
65use polkadot_overseer::AllMessages;
66use polkadot_primitives::AuthorityDiscoveryId;
67use prometheus_endpoint::U64;
68use rand::{seq::SliceRandom, thread_rng};
69use sc_network::{
70	request_responses::{IncomingRequest, OutgoingResponse},
71	RequestFailure,
72};
73use sc_network_types::PeerId;
74use sc_service::SpawnTaskHandle;
75use std::{
76	collections::HashMap,
77	sync::Arc,
78	task::Poll,
79	time::{Duration, Instant},
80};
81
/// Log target shared by every log statement emitted from this module.
const LOG_TARGET: &str = "subsystem-bench::network";
83
/// Rate limiter used to emulate the egress traffic budget of a node.
#[derive(Debug)]
pub struct RateLimit {
	/// How often the bucket is refilled with credits.
	tick_rate: usize,
	/// Number of refills performed so far.
	total_ticks: usize,
	/// Upper bound of credits added by a single tick.
	max_refill: usize,
	/// Credits currently available. Bursts above the per-tick share of the budget are
	/// allowed and are accounted for as a negative balance.
	credits: isize,
	/// Point in time of the most recent refill.
	last_refill: Instant,
}
99
100impl RateLimit {
101	// Create a new `RateLimit` from a `cps` (credits per second) budget and
102	// `tick_rate`.
103	pub fn new(tick_rate: usize, cps: usize) -> Self {
104		// Compute how much refill for each tick
105		let max_refill = cps / tick_rate;
106		RateLimit {
107			tick_rate,
108			total_ticks: 0,
109			max_refill,
110			// A fresh start
111			credits: max_refill as isize,
112			last_refill: Instant::now(),
113		}
114	}
115
116	pub async fn refill(&mut self) {
117		// If this is called to early, we need to sleep until next tick.
118		let now = Instant::now();
119		let next_tick_delta =
120			(self.last_refill + Duration::from_millis(1000 / self.tick_rate as u64)) - now;
121
122		// Sleep until next tick.
123		if !next_tick_delta.is_zero() {
124			gum::trace!(target: LOG_TARGET, "need to sleep {}ms", next_tick_delta.as_millis());
125			tokio::time::sleep(next_tick_delta).await;
126		}
127
128		self.total_ticks += 1;
129		self.credits += self.max_refill as isize;
130		self.last_refill = Instant::now();
131	}
132
133	// Reap credits from the bucket.
134	// Blocks if credits budged goes negative during call.
135	pub async fn reap(&mut self, amount: usize) {
136		self.credits -= amount as isize;
137
138		if self.credits >= 0 {
139			return
140		}
141
142		while self.credits < 0 {
143			gum::trace!(target: LOG_TARGET, "Before refill: {:?}", &self);
144			self.refill().await;
145			gum::trace!(target: LOG_TARGET, "After refill: {:?}", &self);
146		}
147	}
148}
149
150/// A wrapper for both gossip and request/response protocols along with the destination
151/// peer(`AuthorityDiscoveryId``).
152#[derive(Debug)]
153pub enum NetworkMessage {
154	/// A gossip message from peer to node.
155	MessageFromPeer(PeerId, VersionedValidationProtocol),
156	/// A gossip message from node to a peer.
157	MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol),
158	/// A request originating from our node
159	RequestFromNode(AuthorityDiscoveryId, Box<Requests>),
160	/// A request originating from an emulated peer
161	RequestFromPeer(IncomingRequest),
162}
163
164impl NetworkMessage {
165	/// Returns the size of the encoded message or request
166	pub fn size(&self) -> usize {
167		match &self {
168			NetworkMessage::MessageFromPeer(_, ValidationProtocols::V3(message)) =>
169				message.encoded_size(),
170			NetworkMessage::MessageFromNode(_peer_id, ValidationProtocols::V3(message)) =>
171				message.encoded_size(),
172			NetworkMessage::RequestFromNode(_peer_id, incoming) => incoming.size(),
173			NetworkMessage::RequestFromPeer(request) => request.payload.encoded_size(),
174		}
175	}
176
177	/// Returns the destination peer from the message or `None` if it originates from a peer.
178	pub fn peer(&self) -> Option<&AuthorityDiscoveryId> {
179		match &self {
180			NetworkMessage::MessageFromNode(peer_id, _) |
181			NetworkMessage::RequestFromNode(peer_id, _) => Some(peer_id),
182			_ => None,
183		}
184	}
185}
186
187/// A network interface of the node under test.
188pub struct NetworkInterface {
189	// Sender for subsystems.
190	bridge_to_interface_sender: UnboundedSender<NetworkMessage>,
191}
192
193// Wraps the receiving side of a interface to bridge channel. It is a required
194// parameter of the `network-bridge` mock.
195pub struct NetworkInterfaceReceiver(pub UnboundedReceiver<NetworkMessage>);
196
197struct ProxiedRequest {
198	sender: Option<oneshot::Sender<OutgoingResponse>>,
199	receiver: oneshot::Receiver<OutgoingResponse>,
200}
201
202struct ProxiedResponse {
203	pub sender: oneshot::Sender<OutgoingResponse>,
204	pub result: Result<Vec<u8>, RequestFailure>,
205}
206
207impl Future for ProxiedRequest {
208	// The sender and result.
209	type Output = ProxiedResponse;
210
211	fn poll(
212		mut self: std::pin::Pin<&mut Self>,
213		cx: &mut std::task::Context<'_>,
214	) -> std::task::Poll<Self::Output> {
215		match self.receiver.poll_unpin(cx) {
216			Poll::Pending => Poll::Pending,
217			Poll::Ready(response) => Poll::Ready(ProxiedResponse {
218				sender: self.sender.take().expect("sender already used"),
219				result: response
220					.expect("Response is always successfully received.")
221					.result
222					.map_err(|_| RequestFailure::Refused),
223			}),
224		}
225	}
226}
227
228impl NetworkInterface {
229	/// Create a new `NetworkInterface`
230	pub fn new(
231		spawn_task_handle: SpawnTaskHandle,
232		network: NetworkEmulatorHandle,
233		bandwidth_bps: usize,
234		mut from_network: UnboundedReceiver<NetworkMessage>,
235	) -> (NetworkInterface, NetworkInterfaceReceiver) {
236		let rx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps)));
237		let tx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps)));
238
239		// Channel for receiving messages from the network bridge subsystem.
240		let (bridge_to_interface_sender, mut bridge_to_interface_receiver) =
241			mpsc::unbounded::<NetworkMessage>();
242
243		// Channel for forwarding messages to the network bridge subsystem.
244		let (interface_to_bridge_sender, interface_to_bridge_receiver) =
245			mpsc::unbounded::<NetworkMessage>();
246
247		let rx_network = network.clone();
248		let tx_network = network;
249
250		let rx_task_bridge_sender = interface_to_bridge_sender.clone();
251
252		let task_rx_limiter = rx_limiter.clone();
253		let task_tx_limiter = tx_limiter.clone();
254
255		// A task that forwards messages from emulated peers to the node (emulated network bridge).
256		let rx_task = async move {
257			let mut proxied_requests = FuturesUnordered::new();
258
259			loop {
260				let mut from_network = from_network.next().fuse();
261				futures::select! {
262					maybe_peer_message = from_network => {
263						if let Some(peer_message) = maybe_peer_message {
264							let size = peer_message.size();
265							task_rx_limiter.lock().await.reap(size).await;
266							rx_network.inc_received(size);
267
268							// To be able to apply the configured bandwidth limits for responses being sent
269							// over channels, we need to implement a simple proxy that allows this loop
270							// to receive the response and enforce the configured bandwidth before
271							// sending it to the original recipient.
272							if let NetworkMessage::RequestFromPeer(request) = peer_message {
273								let (response_sender, response_receiver) = oneshot::channel();
274
275								// Create a new `IncomingRequest` that we forward to the network bridge.
276								let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender};
277								proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver});
278
279								// Send the new message to network bridge subsystem.
280								rx_task_bridge_sender
281									.unbounded_send(NetworkMessage::RequestFromPeer(new_request))
282									.expect("network bridge subsystem is alive");
283								continue
284							}
285
286							// Forward the message to the bridge.
287							rx_task_bridge_sender
288								.unbounded_send(peer_message)
289								.expect("network bridge subsystem is alive");
290						} else {
291							gum::info!(target: LOG_TARGET, "Uplink channel closed, network interface task exiting");
292							break
293						}
294					},
295					proxied_request = proxied_requests.next() => {
296						if let Some(proxied_request) = proxied_request {
297							match proxied_request.result {
298								Ok(result) => {
299									let bytes = result.encoded_size();
300									gum::trace!(target: LOG_TARGET, size = bytes, "proxied request completed");
301
302									// Enforce bandwidth based on the response the node has sent.
303									// TODO: Fix the stall of RX when TX lock() takes a while to refill
304									// the token bucket. Good idea would be to create a task for each request.
305									task_tx_limiter.lock().await.reap(bytes).await;
306									rx_network.inc_sent(bytes);
307
308									// Forward the response to original recipient.
309									proxied_request.sender.send(
310										OutgoingResponse {
311											reputation_changes: Vec::new(),
312											result: Ok(result),
313											sent_feedback: None
314										}
315									).expect("network is alive");
316								}
317								Err(e) => {
318									gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e)
319								}
320							}
321						} else {
322							gum::debug!(target: LOG_TARGET, "No more active proxied requests");
323							// break
324						}
325					}
326				}
327			}
328		}
329		.boxed();
330
331		let task_spawn_handle = spawn_task_handle.clone();
332		let task_rx_limiter = rx_limiter.clone();
333		let task_tx_limiter = tx_limiter.clone();
334
335		// A task that forwards messages from the node to emulated peers.
336		let tx_task = async move {
337			// Wrap it in an `Arc` to avoid `clone()` the inner data as we need to share it across
338			// many send tasks.
339			let tx_network = Arc::new(tx_network);
340
341			loop {
342				if let Some(peer_message) = bridge_to_interface_receiver.next().await {
343					let size = peer_message.size();
344					// Ensure bandwidth used is limited.
345					task_tx_limiter.lock().await.reap(size).await;
346
347					match peer_message {
348						NetworkMessage::MessageFromNode(peer, message) =>
349							tx_network.send_message_to_peer(&peer, message),
350						NetworkMessage::RequestFromNode(peer, request) => {
351							// Send request through a proxy so we can account and limit bandwidth
352							// usage for the node.
353							let send_task = Self::proxy_send_request(
354								peer.clone(),
355								*request,
356								tx_network.clone(),
357								task_rx_limiter.clone(),
358							)
359							.boxed();
360
361							task_spawn_handle.spawn("request-proxy", "test-environment", send_task);
362						},
363						_ => panic!(
364							"Unexpected network message received from emulated network bridge"
365						),
366					}
367
368					tx_network.inc_sent(size);
369				} else {
370					gum::info!(target: LOG_TARGET, "Downlink channel closed, network interface task exiting");
371					break
372				}
373			}
374		}
375		.boxed();
376
377		spawn_task_handle.spawn("network-interface-rx", "test-environment", rx_task);
378		spawn_task_handle.spawn("network-interface-tx", "test-environment", tx_task);
379
380		(
381			Self { bridge_to_interface_sender },
382			NetworkInterfaceReceiver(interface_to_bridge_receiver),
383		)
384	}
385
386	/// Get a sender that can be used by a subsystem to send network actions to the network.
387	pub fn subsystem_sender(&self) -> UnboundedSender<NetworkMessage> {
388		self.bridge_to_interface_sender.clone()
389	}
390
391	/// Helper method that proxies a request from node to peer and implements rate limiting and
392	/// accounting.
393	async fn proxy_send_request(
394		peer: AuthorityDiscoveryId,
395		mut request: Requests,
396		tx_network: Arc<NetworkEmulatorHandle>,
397		task_rx_limiter: Arc<Mutex<RateLimit>>,
398	) {
399		let (proxy_sender, proxy_receiver) = oneshot::channel();
400
401		// Modify the request response sender so we can intercept the answer
402		let sender = request.swap_response_sender(proxy_sender);
403
404		// Send the modified request to the peer.
405		tx_network.send_request_to_peer(&peer, request);
406
407		// Wait for answer (intercept the response).
408		match proxy_receiver.await {
409			Err(_) => {
410				panic!("Emulated peer hangup");
411			},
412			Ok(Err(err)) => {
413				sender.send(Err(err)).expect("Oneshot send always works.");
414			},
415			Ok(Ok((response, protocol_name))) => {
416				let response_size = response.encoded_size();
417				task_rx_limiter.lock().await.reap(response_size).await;
418				tx_network.inc_received(response_size);
419
420				// Send the response to the original request sender.
421				if sender.send(Ok((response, protocol_name))).is_err() {
422					gum::warn!(target: LOG_TARGET, response_size, "response oneshot canceled by node")
423				}
424			},
425		};
426	}
427}
428
429/// A handle for controlling an emulated peer.
430#[derive(Clone)]
431pub struct EmulatedPeerHandle {
432	/// Send messages to be processed by the peer.
433	messages_tx: UnboundedSender<NetworkMessage>,
434	/// Send actions to be performed by the peer.
435	actions_tx: UnboundedSender<NetworkMessage>,
436	peer_id: PeerId,
437	authority_id: AuthorityDiscoveryId,
438}
439
440impl EmulatedPeerHandle {
441	/// Receive and process a message from the node.
442	pub fn receive(&self, message: NetworkMessage) {
443		self.messages_tx.unbounded_send(message).expect("Peer message channel hangup");
444	}
445
446	/// Send a message to the node.
447	pub fn send_message(&self, message: VersionedValidationProtocol) {
448		self.actions_tx
449			.unbounded_send(NetworkMessage::MessageFromPeer(self.peer_id, message))
450			.expect("Peer action channel hangup");
451	}
452
453	/// Send a `request` to the node.
454	pub fn send_request(&self, request: IncomingRequest) {
455		self.actions_tx
456			.unbounded_send(NetworkMessage::RequestFromPeer(request))
457			.expect("Peer action channel hangup");
458	}
459}
460
461// A network peer emulator.
462struct EmulatedPeer {
463	spawn_handle: SpawnTaskHandle,
464	to_node: UnboundedSender<NetworkMessage>,
465	tx_limiter: RateLimit,
466	rx_limiter: RateLimit,
467	latency_ms: usize,
468}
469
470impl EmulatedPeer {
471	/// Send a message to the node.
472	pub async fn send_message(&mut self, message: NetworkMessage) {
473		self.tx_limiter.reap(message.size()).await;
474
475		if self.latency_ms == 0 {
476			self.to_node.unbounded_send(message).expect("Sending to the node never fails");
477		} else {
478			let to_node = self.to_node.clone();
479			let latency_ms = std::time::Duration::from_millis(self.latency_ms as u64);
480
481			// Emulate RTT latency
482			self.spawn_handle
483				.spawn("peer-latency-emulator", "test-environment", async move {
484					tokio::time::sleep(latency_ms).await;
485					to_node.unbounded_send(message).expect("Sending to the node never fails");
486				});
487		}
488	}
489
490	/// Returns the rx bandwidth limiter.
491	pub fn rx_limiter(&mut self) -> &mut RateLimit {
492		&mut self.rx_limiter
493	}
494}
495
496/// Interceptor pattern for handling messages.
497#[async_trait::async_trait]
498pub trait HandleNetworkMessage {
499	/// Returns `None` if the message was handled, or the `message`
500	/// otherwise.
501	///
502	/// `node_sender` allows sending of messages to the node in response
503	/// to the handled message.
504	async fn handle(
505		&self,
506		message: NetworkMessage,
507		node_sender: &mut UnboundedSender<NetworkMessage>,
508	) -> Option<NetworkMessage>;
509}
510
511#[async_trait::async_trait]
512impl<T> HandleNetworkMessage for Arc<T>
513where
514	T: HandleNetworkMessage + Sync + Send,
515{
516	async fn handle(
517		&self,
518		message: NetworkMessage,
519		node_sender: &mut UnboundedSender<NetworkMessage>,
520	) -> Option<NetworkMessage> {
521		T::handle(self, message, node_sender).await
522	}
523}
524
525// This loop is responsible for handling of messages/requests between the peer and the node.
526async fn emulated_peer_loop(
527	handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
528	stats: Arc<PeerEmulatorStats>,
529	mut emulated_peer: EmulatedPeer,
530	messages_rx: UnboundedReceiver<NetworkMessage>,
531	actions_rx: UnboundedReceiver<NetworkMessage>,
532	mut to_network_interface: UnboundedSender<NetworkMessage>,
533) {
534	let mut proxied_requests = FuturesUnordered::new();
535	let mut messages_rx = messages_rx.fuse();
536	let mut actions_rx = actions_rx.fuse();
537
538	loop {
539		futures::select! {
540			maybe_peer_message = messages_rx.next() => {
541				if let Some(peer_message) = maybe_peer_message {
542					let size = peer_message.size();
543
544					emulated_peer.rx_limiter().reap(size).await;
545					stats.inc_received(size);
546
547					let mut message = Some(peer_message);
548
549					// Try all handlers until the message gets processed.
550					// Panic if the message is not consumed.
551					for handler in handlers.iter() {
552						// The check below guarantees that message is always `Some`: we are still
553						// inside the loop.
554						message = handler.handle(message.unwrap(), &mut to_network_interface).await;
555						if message.is_none() {
556							break
557						}
558					}
559					if let Some(message) = message {
560						panic!("Emulated message from peer {:?} not handled", message.peer());
561					}
562				} else {
563					gum::debug!(target: LOG_TARGET, "Downlink channel closed, peer task exiting");
564					break
565				}
566			},
567			maybe_action = actions_rx.next() => {
568				match maybe_action {
569					// We proxy any request being sent to the node to limit bandwidth as we
570					// do in the `NetworkInterface` task.
571					Some(NetworkMessage::RequestFromPeer(request)) => {
572						let (response_sender, response_receiver) = oneshot::channel();
573						// Create a new `IncomingRequest` that we forward to the network interface.
574						let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender};
575
576						proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver});
577
578						emulated_peer.send_message(NetworkMessage::RequestFromPeer(new_request)).await;
579					},
580					Some(message) => emulated_peer.send_message(message).await,
581					None => {
582						gum::debug!(target: LOG_TARGET, "Action channel closed, peer task exiting");
583						break
584					}
585				}
586			},
587			proxied_request = proxied_requests.next() => {
588				if let Some(proxied_request) = proxied_request {
589					match proxied_request.result {
590						Ok(result) => {
591							let bytes = result.encoded_size();
592							gum::trace!(target: LOG_TARGET, size = bytes, "Peer proxied request completed");
593
594							emulated_peer.rx_limiter().reap(bytes).await;
595							stats.inc_received(bytes);
596
597							proxied_request.sender.send(
598								OutgoingResponse {
599									reputation_changes: Vec::new(),
600									result: Ok(result),
601									sent_feedback: None
602								}
603							).expect("network is alive");
604						}
605						Err(e) => {
606							gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e)
607						}
608					}
609				}
610			}
611		}
612	}
613}
614
615/// Creates a new peer emulator task and returns a handle to it.
616#[allow(clippy::too_many_arguments)]
617pub fn new_peer(
618	bandwidth: usize,
619	spawn_task_handle: SpawnTaskHandle,
620	handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
621	stats: Arc<PeerEmulatorStats>,
622	to_network_interface: UnboundedSender<NetworkMessage>,
623	latency_ms: usize,
624	peer_id: PeerId,
625	authority_id: AuthorityDiscoveryId,
626) -> EmulatedPeerHandle {
627	let (messages_tx, messages_rx) = mpsc::unbounded::<NetworkMessage>();
628	let (actions_tx, actions_rx) = mpsc::unbounded::<NetworkMessage>();
629
630	let rx_limiter = RateLimit::new(10, bandwidth);
631	let tx_limiter = RateLimit::new(10, bandwidth);
632	let emulated_peer = EmulatedPeer {
633		spawn_handle: spawn_task_handle.clone(),
634		rx_limiter,
635		tx_limiter,
636		to_node: to_network_interface.clone(),
637		latency_ms,
638	};
639
640	spawn_task_handle.clone().spawn(
641		"peer-emulator",
642		"test-environment",
643		emulated_peer_loop(
644			handlers,
645			stats,
646			emulated_peer,
647			messages_rx,
648			actions_rx,
649			to_network_interface,
650		)
651		.boxed(),
652	);
653
654	EmulatedPeerHandle { messages_tx, actions_tx, peer_id, authority_id }
655}
656
657/// Book keeping of sent and received bytes.
658pub struct PeerEmulatorStats {
659	metrics: Metrics,
660	peer_index: usize,
661}
662
663impl PeerEmulatorStats {
664	pub(crate) fn new(peer_index: usize, metrics: Metrics) -> Self {
665		Self { metrics, peer_index }
666	}
667
668	pub fn inc_sent(&self, bytes: usize) {
669		self.metrics.on_peer_sent(self.peer_index, bytes);
670	}
671
672	pub fn inc_received(&self, bytes: usize) {
673		self.metrics.on_peer_received(self.peer_index, bytes);
674	}
675
676	pub fn sent(&self) -> usize {
677		self.metrics
678			.peer_total_sent
679			.get_metric_with_label_values(&[&format!("node{}", self.peer_index)])
680			.expect("Metric exists")
681			.get() as usize
682	}
683
684	pub fn received(&self) -> usize {
685		self.metrics
686			.peer_total_received
687			.get_metric_with_label_values(&[&format!("node{}", self.peer_index)])
688			.expect("Metric exists")
689			.get() as usize
690	}
691}
692
693/// The state of a peer on the emulated network.
694#[derive(Clone)]
695enum Peer {
696	Connected(EmulatedPeerHandle),
697	Disconnected(EmulatedPeerHandle),
698}
699
700impl Peer {
701	pub fn disconnect(&mut self) {
702		let new_self = match self {
703			Peer::Connected(peer) => Peer::Disconnected(peer.clone()),
704			_ => return,
705		};
706		*self = new_self;
707	}
708
709	pub fn is_connected(&self) -> bool {
710		matches!(self, Peer::Connected(_))
711	}
712
713	pub fn handle(&self) -> &EmulatedPeerHandle {
714		match self {
715			Peer::Connected(ref emulator) => emulator,
716			Peer::Disconnected(ref emulator) => emulator,
717		}
718	}
719
720	pub fn authority_id(&self) -> AuthorityDiscoveryId {
721		match self {
722			Peer::Connected(handle) | Peer::Disconnected(handle) => handle.authority_id.clone(),
723		}
724	}
725
726	pub fn peer_id(&self) -> PeerId {
727		match self {
728			Peer::Connected(handle) | Peer::Disconnected(handle) => handle.peer_id,
729		}
730	}
731}
732
733/// A ha emulated network implementation.
734#[derive(Clone)]
735pub struct NetworkEmulatorHandle {
736	// Per peer network emulation.
737	peers: Vec<Peer>,
738	/// Per peer stats.
739	stats: Vec<Arc<PeerEmulatorStats>>,
740	/// Each emulated peer is a validator.
741	validator_authority_ids: HashMap<AuthorityDiscoveryId, usize>,
742}
743
744impl NetworkEmulatorHandle {
745	pub fn generate_statement_distribution_peer_view_change(&self, view: View) -> Vec<AllMessages> {
746		self.peers
747			.iter()
748			.filter(|peer| peer.is_connected())
749			.map(|peer| {
750				AllMessages::StatementDistribution(
751					StatementDistributionMessage::NetworkBridgeUpdate(
752						NetworkBridgeEvent::PeerViewChange(peer.peer_id(), view.clone()),
753					),
754				)
755			})
756			.collect_vec()
757	}
758
759	/// Generates peer_connected messages for all peers in `test_authorities`
760	pub fn generate_peer_connected<F, T>(&self, mapper: F) -> Vec<AllMessages>
761	where
762		F: Fn(NetworkBridgeEvent<T>) -> AllMessages,
763	{
764		self.peers
765			.iter()
766			.filter(|peer| peer.is_connected())
767			.map(|peer| {
768				mapper(NetworkBridgeEvent::PeerConnected(
769					peer.handle().peer_id,
770					ObservedRole::Authority,
771					ValidationVersion::V3.into(),
772					Some(vec![peer.authority_id()].into_iter().collect()),
773				))
774			})
775			.collect_vec()
776	}
777}
778
779/// Create a new emulated network based on `config`.
780/// Each emulated peer will run the specified `handlers` to process incoming messages.
781pub fn new_network(
782	config: &TestConfiguration,
783	dependencies: &TestEnvironmentDependencies,
784	authorities: &TestAuthorities,
785	handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
786) -> (NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver) {
787	let n_peers = config.n_validators;
788	gum::info!(target: LOG_TARGET, "{}",format!("Initializing emulation for a {n_peers} peer network.").bright_blue());
789	gum::info!(target: LOG_TARGET, "{}",format!("connectivity {}%, latency {:?}", config.connectivity, config.latency).bright_black());
790
791	let metrics =
792		Metrics::new(&dependencies.registry).expect("Metrics always register successfully");
793	let mut validator_authority_id_mapping = HashMap::new();
794
795	// Create the channel from `peer` to `NetworkInterface` .
796	let (to_network_interface, from_network) = mpsc::unbounded();
797
798	// Create a `PeerEmulator` for each peer.
799	let (stats, mut peers): (_, Vec<_>) = (0..n_peers)
800		.zip(authorities.validator_authority_id.clone())
801		.map(|(peer_index, authority_id)| {
802			validator_authority_id_mapping.insert(authority_id.clone(), peer_index);
803			let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone()));
804			(
805				stats.clone(),
806				Peer::Connected(new_peer(
807					config.peer_bandwidth,
808					dependencies.task_manager.spawn_handle(),
809					handlers.clone(),
810					stats,
811					to_network_interface.clone(),
812					random_latency(config.latency.as_ref()),
813					*authorities.peer_ids.get(peer_index).unwrap(),
814					authority_id,
815				)),
816			)
817		})
818		.unzip();
819
820	let connected_count = config.connected_count();
821
822	let mut peers_indices = (0..n_peers).collect_vec();
823	let (_connected, to_disconnect) =
824		peers_indices.partial_shuffle(&mut thread_rng(), connected_count);
825
826	// Node under test is always mark as disconnected.
827	peers[NODE_UNDER_TEST as usize].disconnect();
828	for peer in to_disconnect.iter().skip(1) {
829		peers[*peer].disconnect();
830	}
831
832	gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {connected_count}").bright_black());
833
834	let handle = NetworkEmulatorHandle {
835		peers,
836		stats,
837		validator_authority_ids: validator_authority_id_mapping,
838	};
839
840	// Finally create the `NetworkInterface` with the `from_network` receiver.
841	let (network_interface, network_interface_receiver) = NetworkInterface::new(
842		dependencies.task_manager.spawn_handle(),
843		handle.clone(),
844		config.bandwidth,
845		from_network,
846	);
847
848	(handle, network_interface, network_interface_receiver)
849}
850
/// Failures that can occur when sending data through an emulated peer.
#[derive(Clone, Debug)]
pub enum EmulatedPeerError {
	/// The peer is not connected to the node under test.
	NotConnected,
}
856
857impl NetworkEmulatorHandle {
858	/// Returns true if the emulated peer is connected to the node under test.
859	pub fn is_peer_connected(&self, peer: &AuthorityDiscoveryId) -> bool {
860		self.peer(peer).is_connected()
861	}
862
863	/// Forward notification `message` to an emulated `peer`.
864	/// Panics if peer is not connected.
865	pub fn send_message_to_peer(
866		&self,
867		peer_id: &AuthorityDiscoveryId,
868		message: VersionedValidationProtocol,
869	) {
870		let peer = self.peer(peer_id);
871		assert!(peer.is_connected(), "forward message only for connected peers.");
872		peer.handle().receive(NetworkMessage::MessageFromNode(peer_id.clone(), message));
873	}
874
875	/// Forward a `request`` to an emulated `peer`.
876	/// Panics if peer is not connected.
877	pub fn send_request_to_peer(&self, peer_id: &AuthorityDiscoveryId, request: Requests) {
878		let peer = self.peer(peer_id);
879		assert!(peer.is_connected(), "forward request only for connected peers.");
880		peer.handle()
881			.receive(NetworkMessage::RequestFromNode(peer_id.clone(), Box::new(request)));
882	}
883
884	/// Send a message from a peer to the node.
885	pub fn send_message_from_peer(
886		&self,
887		from_peer: &AuthorityDiscoveryId,
888		message: VersionedValidationProtocol,
889	) -> Result<(), EmulatedPeerError> {
890		let dst_peer = self.peer(from_peer);
891
892		if !dst_peer.is_connected() {
893			gum::warn!(target: LOG_TARGET, "Attempted to send message from a peer not connected to our node, operation ignored");
894			return Err(EmulatedPeerError::NotConnected)
895		}
896
897		dst_peer.handle().send_message(message);
898		Ok(())
899	}
900
901	/// Send a request from a peer to the node.
902	pub fn send_request_from_peer(
903		&self,
904		from_peer: &AuthorityDiscoveryId,
905		request: IncomingRequest,
906	) -> Result<(), EmulatedPeerError> {
907		let dst_peer = self.peer(from_peer);
908
909		if !dst_peer.is_connected() {
910			gum::warn!(target: LOG_TARGET, "Attempted to send request from a peer not connected to our node, operation ignored");
911			return Err(EmulatedPeerError::NotConnected)
912		}
913
914		dst_peer.handle().send_request(request);
915		Ok(())
916	}
917
918	// Returns the sent/received stats for `peer_index`.
919	pub fn peer_stats(&self, peer_index: usize) -> Arc<PeerEmulatorStats> {
920		self.stats[peer_index].clone()
921	}
922
923	// Helper to get peer index by `AuthorityDiscoveryId`
924	fn peer_index(&self, peer: &AuthorityDiscoveryId) -> usize {
925		*self
926			.validator_authority_ids
927			.get(peer)
928			.expect("all test authorities are valid; qed")
929	}
930
931	// Return the Peer entry for a given `AuthorityDiscoveryId`.
932	fn peer(&self, peer: &AuthorityDiscoveryId) -> &Peer {
933		&self.peers[self.peer_index(peer)]
934	}
935
936	// Increment bytes sent by our node (the node that contains the subsystem under test)
937	pub fn inc_sent(&self, bytes: usize) {
938		// Our node is always peer 0.
939		self.peer_stats(0).inc_sent(bytes);
940	}
941
942	// Increment bytes received by our node (the node that contains the subsystem under test)
943	pub fn inc_received(&self, bytes: usize) {
944		// Our node is always peer 0.
945		self.peer_stats(0).inc_received(bytes);
946	}
947}
948
/// Emulated network metrics.
///
/// Holds one counter family per traffic direction; both are labelled by `peer`.
#[derive(Clone)]
pub(crate) struct Metrics {
	/// Number of bytes sent per peer.
	peer_total_sent: CounterVec<U64>,
	/// Number of bytes received per peer.
	peer_total_received: CounterVec<U64>,
}
957
958impl Metrics {
959	pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
960		Ok(Self {
961			peer_total_sent: prometheus::register(
962				CounterVec::new(
963					Opts::new(
964						"subsystem_benchmark_network_peer_total_bytes_sent",
965						"Total number of bytes a peer has sent.",
966					),
967					&["peer"],
968				)?,
969				registry,
970			)?,
971			peer_total_received: prometheus::register(
972				CounterVec::new(
973					Opts::new(
974						"subsystem_benchmark_network_peer_total_bytes_received",
975						"Total number of bytes a peer has received.",
976					),
977					&["peer"],
978				)?,
979				registry,
980			)?,
981		})
982	}
983
984	/// Increment total sent for a peer.
985	pub fn on_peer_sent(&self, peer_index: usize, bytes: usize) {
986		self.peer_total_sent
987			.with_label_values(vec![format!("node{peer_index}").as_str()].as_slice())
988			.inc_by(bytes as u64);
989	}
990
991	/// Increment total received for a peer.
992	pub fn on_peer_received(&self, peer_index: usize, bytes: usize) {
993		self.peer_total_received
994			.with_label_values(vec![format!("node{peer_index}").as_str()].as_slice())
995			.inc_by(bytes as u64);
996	}
997}
998
/// Helper trait for low level access to `Requests` variants.
///
/// Note: the implementation for `Requests` in this file only supports a subset of the
/// request variants and panics (`unimplemented!`) for the others.
pub trait RequestExt {
	/// Get the authority id if any from the request.
	fn authority_id(&self) -> Option<&AuthorityDiscoveryId>;
	/// Get the peer id if any from the request.
	fn peer_id(&self) -> Option<&PeerId>;
	/// Consume self and return the response sender.
	fn into_response_sender(self) -> ResponseSender;
	/// Allows to change the `ResponseSender` in place. Returns the sender that was
	/// previously stored in the request.
	fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender;
	/// Returns the size in bytes of the request payload.
	fn size(&self) -> usize;
}
1012
1013impl RequestExt for Requests {
1014	fn authority_id(&self) -> Option<&AuthorityDiscoveryId> {
1015		match self {
1016			Requests::ChunkFetching(request) => {
1017				if let Recipient::Authority(authority_id) = &request.peer {
1018					Some(authority_id)
1019				} else {
1020					None
1021				}
1022			},
1023			Requests::AvailableDataFetchingV1(request) => {
1024				if let Recipient::Authority(authority_id) = &request.peer {
1025					Some(authority_id)
1026				} else {
1027					None
1028				}
1029			},
1030			// Requested by PeerId
1031			Requests::AttestedCandidateV2(_) => None,
1032			Requests::DisputeSendingV1(request) => {
1033				if let Recipient::Authority(authority_id) = &request.peer {
1034					Some(authority_id)
1035				} else {
1036					None
1037				}
1038			},
1039			request => {
1040				unimplemented!("RequestAuthority not implemented for {:?}", request)
1041			},
1042		}
1043	}
1044
1045	fn peer_id(&self) -> Option<&PeerId> {
1046		match self {
1047			Requests::AttestedCandidateV2(request) => match &request.peer {
1048				Recipient::Authority(_) => None,
1049				Recipient::Peer(peer_id) => Some(peer_id),
1050			},
1051			request => {
1052				unimplemented!("peer_id() is not implemented for {:?}", request)
1053			},
1054		}
1055	}
1056
1057	fn into_response_sender(self) -> ResponseSender {
1058		match self {
1059			Requests::ChunkFetching(outgoing_request) => outgoing_request.pending_response,
1060			Requests::AvailableDataFetchingV1(outgoing_request) =>
1061				outgoing_request.pending_response,
1062			Requests::DisputeSendingV1(outgoing_request) => outgoing_request.pending_response,
1063			_ => unimplemented!("unsupported request type"),
1064		}
1065	}
1066
1067	/// Swaps the `ResponseSender` and returns the previous value.
1068	fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender {
1069		match self {
1070			Requests::ChunkFetching(outgoing_request) =>
1071				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1072			Requests::AvailableDataFetchingV1(outgoing_request) =>
1073				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1074			Requests::AttestedCandidateV2(outgoing_request) =>
1075				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1076			Requests::DisputeSendingV1(outgoing_request) =>
1077				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1078			_ => unimplemented!("unsupported request type"),
1079		}
1080	}
1081
1082	/// Returns the size in bytes of the request payload.
1083	fn size(&self) -> usize {
1084		match self {
1085			Requests::ChunkFetching(outgoing_request) => outgoing_request.payload.encoded_size(),
1086			Requests::AvailableDataFetchingV1(outgoing_request) =>
1087				outgoing_request.payload.encoded_size(),
1088			Requests::AttestedCandidateV2(outgoing_request) =>
1089				outgoing_request.payload.encoded_size(),
1090			Requests::DisputeSendingV1(outgoing_request) => outgoing_request.payload.encoded_size(),
1091			_ => unimplemented!("received an unexpected request"),
1092		}
1093	}
1094}
1095
#[cfg(test)]
mod tests {
	use super::RateLimit;
	use std::time::Instant;

	/// Drives a `RateLimit` with varying reap sizes until it has ticked `tick_rate` times
	/// and checks the total reaped amount against bounds derived from the elapsed time.
	#[tokio::test]
	async fn test_expected_rate() {
		let tick_rate = 200;
		let budget = 1_000_000;
		// NOTE(review): an earlier comment here stated "rate must not exceed 100 credits per
		// second", which does not match `budget = 1_000_000` - confirm the intended limit.
		let mut rate_limiter = RateLimit::new(tick_rate, budget);
		let mut total_sent = 0usize;
		let mut reap_amount = 0;

		let start = Instant::now();
		while rate_limiter.total_ticks < tick_rate {
			// Cycle the requested amount through 1, 2, ..., 99, 0.
			reap_amount = (reap_amount + 1) % 100;

			rate_limiter.reap(reap_amount).await;
			total_sent += reap_amount;
		}
		let elapsed_ms = start.elapsed().as_millis();

		println!("duration: {}", elapsed_ms);

		// Whole seconds elapsed (integer division truncates).
		let elapsed_secs = elapsed_ms / 1000u128;

		// Allow up to `budget/max_refill` error tolerance
		let lower_bound = budget as u128 * elapsed_secs;
		let upper_bound = budget as u128 * (elapsed_secs + rate_limiter.max_refill as u128);
		let sent = total_sent as u128;
		assert!(sent >= lower_bound);
		assert!(sent <= upper_bound);
	}
}