1use crate::{
37 configuration::{random_latency, TestAuthorities, TestConfiguration},
38 environment::TestEnvironmentDependencies,
39 NODE_UNDER_TEST,
40};
41use codec::Encode;
42use colored::Colorize;
43use futures::{
44 channel::{
45 mpsc,
46 mpsc::{UnboundedReceiver, UnboundedSender},
47 oneshot,
48 },
49 lock::Mutex,
50 stream::FuturesUnordered,
51 Future, FutureExt, StreamExt,
52};
53use itertools::Itertools;
54use net_protocol::{
55 peer_set::ValidationVersion,
56 request_response::{Recipient, Requests, ResponseSender},
57 ObservedRole, VersionedValidationProtocol, View,
58};
59use polkadot_node_network_protocol::{self as net_protocol, ValidationProtocols};
60use polkadot_node_subsystem::messages::StatementDistributionMessage;
61use polkadot_node_subsystem_types::messages::NetworkBridgeEvent;
62use polkadot_node_subsystem_util::metrics::prometheus::{
63 self, CounterVec, Opts, PrometheusError, Registry,
64};
65use polkadot_overseer::AllMessages;
66use polkadot_primitives::AuthorityDiscoveryId;
67use prometheus_endpoint::U64;
68use rand::{seq::SliceRandom, thread_rng};
69use sc_network::{
70 request_responses::{IncomingRequest, OutgoingResponse},
71 RequestFailure,
72};
73use sc_network_types::PeerId;
74use sc_service::SpawnTaskHandle;
75use std::{
76 collections::HashMap,
77 sync::Arc,
78 task::Poll,
79 time::{Duration, Instant},
80};
81
82const LOG_TARGET: &str = "subsystem-bench::network";
83
84#[derive(Debug)]
86pub struct RateLimit {
87 tick_rate: usize,
89 total_ticks: usize,
91 max_refill: usize,
93 credits: isize,
96 last_refill: Instant,
98}
99
100impl RateLimit {
101 pub fn new(tick_rate: usize, cps: usize) -> Self {
104 let max_refill = cps / tick_rate;
106 RateLimit {
107 tick_rate,
108 total_ticks: 0,
109 max_refill,
110 credits: max_refill as isize,
112 last_refill: Instant::now(),
113 }
114 }
115
116 pub async fn refill(&mut self) {
117 let now = Instant::now();
119 let next_tick_delta =
120 (self.last_refill + Duration::from_millis(1000 / self.tick_rate as u64)) - now;
121
122 if !next_tick_delta.is_zero() {
124 gum::trace!(target: LOG_TARGET, "need to sleep {}ms", next_tick_delta.as_millis());
125 tokio::time::sleep(next_tick_delta).await;
126 }
127
128 self.total_ticks += 1;
129 self.credits += self.max_refill as isize;
130 self.last_refill = Instant::now();
131 }
132
133 pub async fn reap(&mut self, amount: usize) {
136 self.credits -= amount as isize;
137
138 if self.credits >= 0 {
139 return
140 }
141
142 while self.credits < 0 {
143 gum::trace!(target: LOG_TARGET, "Before refill: {:?}", &self);
144 self.refill().await;
145 gum::trace!(target: LOG_TARGET, "After refill: {:?}", &self);
146 }
147 }
148}
149
150#[derive(Debug)]
153pub enum NetworkMessage {
154 MessageFromPeer(PeerId, VersionedValidationProtocol),
156 MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol),
158 RequestFromNode(AuthorityDiscoveryId, Box<Requests>),
160 RequestFromPeer(IncomingRequest),
162}
163
164impl NetworkMessage {
165 pub fn size(&self) -> usize {
167 match &self {
168 NetworkMessage::MessageFromPeer(_, ValidationProtocols::V3(message)) =>
169 message.encoded_size(),
170 NetworkMessage::MessageFromNode(_peer_id, ValidationProtocols::V3(message)) =>
171 message.encoded_size(),
172 NetworkMessage::RequestFromNode(_peer_id, incoming) => incoming.size(),
173 NetworkMessage::RequestFromPeer(request) => request.payload.encoded_size(),
174 }
175 }
176
177 pub fn peer(&self) -> Option<&AuthorityDiscoveryId> {
179 match &self {
180 NetworkMessage::MessageFromNode(peer_id, _) |
181 NetworkMessage::RequestFromNode(peer_id, _) => Some(peer_id),
182 _ => None,
183 }
184 }
185}
186
187pub struct NetworkInterface {
189 bridge_to_interface_sender: UnboundedSender<NetworkMessage>,
191}
192
193pub struct NetworkInterfaceReceiver(pub UnboundedReceiver<NetworkMessage>);
196
197struct ProxiedRequest {
198 sender: Option<oneshot::Sender<OutgoingResponse>>,
199 receiver: oneshot::Receiver<OutgoingResponse>,
200}
201
202struct ProxiedResponse {
203 pub sender: oneshot::Sender<OutgoingResponse>,
204 pub result: Result<Vec<u8>, RequestFailure>,
205}
206
207impl Future for ProxiedRequest {
208 type Output = ProxiedResponse;
210
211 fn poll(
212 mut self: std::pin::Pin<&mut Self>,
213 cx: &mut std::task::Context<'_>,
214 ) -> std::task::Poll<Self::Output> {
215 match self.receiver.poll_unpin(cx) {
216 Poll::Pending => Poll::Pending,
217 Poll::Ready(response) => Poll::Ready(ProxiedResponse {
218 sender: self.sender.take().expect("sender already used"),
219 result: response
220 .expect("Response is always successfully received.")
221 .result
222 .map_err(|_| RequestFailure::Refused),
223 }),
224 }
225 }
226}
227
228impl NetworkInterface {
229 pub fn new(
231 spawn_task_handle: SpawnTaskHandle,
232 network: NetworkEmulatorHandle,
233 bandwidth_bps: usize,
234 mut from_network: UnboundedReceiver<NetworkMessage>,
235 ) -> (NetworkInterface, NetworkInterfaceReceiver) {
236 let rx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps)));
237 let tx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps)));
238
239 let (bridge_to_interface_sender, mut bridge_to_interface_receiver) =
241 mpsc::unbounded::<NetworkMessage>();
242
243 let (interface_to_bridge_sender, interface_to_bridge_receiver) =
245 mpsc::unbounded::<NetworkMessage>();
246
247 let rx_network = network.clone();
248 let tx_network = network;
249
250 let rx_task_bridge_sender = interface_to_bridge_sender.clone();
251
252 let task_rx_limiter = rx_limiter.clone();
253 let task_tx_limiter = tx_limiter.clone();
254
255 let rx_task = async move {
257 let mut proxied_requests = FuturesUnordered::new();
258
259 loop {
260 let mut from_network = from_network.next().fuse();
261 futures::select! {
262 maybe_peer_message = from_network => {
263 if let Some(peer_message) = maybe_peer_message {
264 let size = peer_message.size();
265 task_rx_limiter.lock().await.reap(size).await;
266 rx_network.inc_received(size);
267
268 if let NetworkMessage::RequestFromPeer(request) = peer_message {
273 let (response_sender, response_receiver) = oneshot::channel();
274
275 let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender};
277 proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver});
278
279 rx_task_bridge_sender
281 .unbounded_send(NetworkMessage::RequestFromPeer(new_request))
282 .expect("network bridge subsystem is alive");
283 continue
284 }
285
286 rx_task_bridge_sender
288 .unbounded_send(peer_message)
289 .expect("network bridge subsystem is alive");
290 } else {
291 gum::info!(target: LOG_TARGET, "Uplink channel closed, network interface task exiting");
292 break
293 }
294 },
295 proxied_request = proxied_requests.next() => {
296 if let Some(proxied_request) = proxied_request {
297 match proxied_request.result {
298 Ok(result) => {
299 let bytes = result.encoded_size();
300 gum::trace!(target: LOG_TARGET, size = bytes, "proxied request completed");
301
302 task_tx_limiter.lock().await.reap(bytes).await;
306 rx_network.inc_sent(bytes);
307
308 proxied_request.sender.send(
310 OutgoingResponse {
311 reputation_changes: Vec::new(),
312 result: Ok(result),
313 sent_feedback: None
314 }
315 ).expect("network is alive");
316 }
317 Err(e) => {
318 gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e)
319 }
320 }
321 } else {
322 gum::debug!(target: LOG_TARGET, "No more active proxied requests");
323 }
325 }
326 }
327 }
328 }
329 .boxed();
330
331 let task_spawn_handle = spawn_task_handle.clone();
332 let task_rx_limiter = rx_limiter.clone();
333 let task_tx_limiter = tx_limiter.clone();
334
335 let tx_task = async move {
337 let tx_network = Arc::new(tx_network);
340
341 loop {
342 if let Some(peer_message) = bridge_to_interface_receiver.next().await {
343 let size = peer_message.size();
344 task_tx_limiter.lock().await.reap(size).await;
346
347 match peer_message {
348 NetworkMessage::MessageFromNode(peer, message) =>
349 tx_network.send_message_to_peer(&peer, message),
350 NetworkMessage::RequestFromNode(peer, request) => {
351 let send_task = Self::proxy_send_request(
354 peer.clone(),
355 *request,
356 tx_network.clone(),
357 task_rx_limiter.clone(),
358 )
359 .boxed();
360
361 task_spawn_handle.spawn("request-proxy", "test-environment", send_task);
362 },
363 _ => panic!(
364 "Unexpected network message received from emulated network bridge"
365 ),
366 }
367
368 tx_network.inc_sent(size);
369 } else {
370 gum::info!(target: LOG_TARGET, "Downlink channel closed, network interface task exiting");
371 break
372 }
373 }
374 }
375 .boxed();
376
377 spawn_task_handle.spawn("network-interface-rx", "test-environment", rx_task);
378 spawn_task_handle.spawn("network-interface-tx", "test-environment", tx_task);
379
380 (
381 Self { bridge_to_interface_sender },
382 NetworkInterfaceReceiver(interface_to_bridge_receiver),
383 )
384 }
385
386 pub fn subsystem_sender(&self) -> UnboundedSender<NetworkMessage> {
388 self.bridge_to_interface_sender.clone()
389 }
390
391 async fn proxy_send_request(
394 peer: AuthorityDiscoveryId,
395 mut request: Requests,
396 tx_network: Arc<NetworkEmulatorHandle>,
397 task_rx_limiter: Arc<Mutex<RateLimit>>,
398 ) {
399 let (proxy_sender, proxy_receiver) = oneshot::channel();
400
401 let sender = request.swap_response_sender(proxy_sender);
403
404 tx_network.send_request_to_peer(&peer, request);
406
407 match proxy_receiver.await {
409 Err(_) => {
410 panic!("Emulated peer hangup");
411 },
412 Ok(Err(err)) => {
413 sender.send(Err(err)).expect("Oneshot send always works.");
414 },
415 Ok(Ok((response, protocol_name))) => {
416 let response_size = response.encoded_size();
417 task_rx_limiter.lock().await.reap(response_size).await;
418 tx_network.inc_received(response_size);
419
420 if sender.send(Ok((response, protocol_name))).is_err() {
422 gum::warn!(target: LOG_TARGET, response_size, "response oneshot canceled by node")
423 }
424 },
425 };
426 }
427}
428
429#[derive(Clone)]
431pub struct EmulatedPeerHandle {
432 messages_tx: UnboundedSender<NetworkMessage>,
434 actions_tx: UnboundedSender<NetworkMessage>,
436 peer_id: PeerId,
437 authority_id: AuthorityDiscoveryId,
438}
439
440impl EmulatedPeerHandle {
441 pub fn receive(&self, message: NetworkMessage) {
443 self.messages_tx.unbounded_send(message).expect("Peer message channel hangup");
444 }
445
446 pub fn send_message(&self, message: VersionedValidationProtocol) {
448 self.actions_tx
449 .unbounded_send(NetworkMessage::MessageFromPeer(self.peer_id, message))
450 .expect("Peer action channel hangup");
451 }
452
453 pub fn send_request(&self, request: IncomingRequest) {
455 self.actions_tx
456 .unbounded_send(NetworkMessage::RequestFromPeer(request))
457 .expect("Peer action channel hangup");
458 }
459}
460
461struct EmulatedPeer {
463 spawn_handle: SpawnTaskHandle,
464 to_node: UnboundedSender<NetworkMessage>,
465 tx_limiter: RateLimit,
466 rx_limiter: RateLimit,
467 latency_ms: usize,
468}
469
470impl EmulatedPeer {
471 pub async fn send_message(&mut self, message: NetworkMessage) {
473 self.tx_limiter.reap(message.size()).await;
474
475 if self.latency_ms == 0 {
476 self.to_node.unbounded_send(message).expect("Sending to the node never fails");
477 } else {
478 let to_node = self.to_node.clone();
479 let latency_ms = std::time::Duration::from_millis(self.latency_ms as u64);
480
481 self.spawn_handle
483 .spawn("peer-latency-emulator", "test-environment", async move {
484 tokio::time::sleep(latency_ms).await;
485 to_node.unbounded_send(message).expect("Sending to the node never fails");
486 });
487 }
488 }
489
490 pub fn rx_limiter(&mut self) -> &mut RateLimit {
492 &mut self.rx_limiter
493 }
494}
495
496#[async_trait::async_trait]
498pub trait HandleNetworkMessage {
499 async fn handle(
505 &self,
506 message: NetworkMessage,
507 node_sender: &mut UnboundedSender<NetworkMessage>,
508 ) -> Option<NetworkMessage>;
509}
510
511#[async_trait::async_trait]
512impl<T> HandleNetworkMessage for Arc<T>
513where
514 T: HandleNetworkMessage + Sync + Send,
515{
516 async fn handle(
517 &self,
518 message: NetworkMessage,
519 node_sender: &mut UnboundedSender<NetworkMessage>,
520 ) -> Option<NetworkMessage> {
521 T::handle(self, message, node_sender).await
522 }
523}
524
525async fn emulated_peer_loop(
527 handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
528 stats: Arc<PeerEmulatorStats>,
529 mut emulated_peer: EmulatedPeer,
530 messages_rx: UnboundedReceiver<NetworkMessage>,
531 actions_rx: UnboundedReceiver<NetworkMessage>,
532 mut to_network_interface: UnboundedSender<NetworkMessage>,
533) {
534 let mut proxied_requests = FuturesUnordered::new();
535 let mut messages_rx = messages_rx.fuse();
536 let mut actions_rx = actions_rx.fuse();
537
538 loop {
539 futures::select! {
540 maybe_peer_message = messages_rx.next() => {
541 if let Some(peer_message) = maybe_peer_message {
542 let size = peer_message.size();
543
544 emulated_peer.rx_limiter().reap(size).await;
545 stats.inc_received(size);
546
547 let mut message = Some(peer_message);
548
549 for handler in handlers.iter() {
552 message = handler.handle(message.unwrap(), &mut to_network_interface).await;
555 if message.is_none() {
556 break
557 }
558 }
559 if let Some(message) = message {
560 panic!("Emulated message from peer {:?} not handled", message.peer());
561 }
562 } else {
563 gum::debug!(target: LOG_TARGET, "Downlink channel closed, peer task exiting");
564 break
565 }
566 },
567 maybe_action = actions_rx.next() => {
568 match maybe_action {
569 Some(NetworkMessage::RequestFromPeer(request)) => {
572 let (response_sender, response_receiver) = oneshot::channel();
573 let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender};
575
576 proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver});
577
578 emulated_peer.send_message(NetworkMessage::RequestFromPeer(new_request)).await;
579 },
580 Some(message) => emulated_peer.send_message(message).await,
581 None => {
582 gum::debug!(target: LOG_TARGET, "Action channel closed, peer task exiting");
583 break
584 }
585 }
586 },
587 proxied_request = proxied_requests.next() => {
588 if let Some(proxied_request) = proxied_request {
589 match proxied_request.result {
590 Ok(result) => {
591 let bytes = result.encoded_size();
592 gum::trace!(target: LOG_TARGET, size = bytes, "Peer proxied request completed");
593
594 emulated_peer.rx_limiter().reap(bytes).await;
595 stats.inc_received(bytes);
596
597 proxied_request.sender.send(
598 OutgoingResponse {
599 reputation_changes: Vec::new(),
600 result: Ok(result),
601 sent_feedback: None
602 }
603 ).expect("network is alive");
604 }
605 Err(e) => {
606 gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e)
607 }
608 }
609 }
610 }
611 }
612 }
613}
614
615#[allow(clippy::too_many_arguments)]
617pub fn new_peer(
618 bandwidth: usize,
619 spawn_task_handle: SpawnTaskHandle,
620 handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
621 stats: Arc<PeerEmulatorStats>,
622 to_network_interface: UnboundedSender<NetworkMessage>,
623 latency_ms: usize,
624 peer_id: PeerId,
625 authority_id: AuthorityDiscoveryId,
626) -> EmulatedPeerHandle {
627 let (messages_tx, messages_rx) = mpsc::unbounded::<NetworkMessage>();
628 let (actions_tx, actions_rx) = mpsc::unbounded::<NetworkMessage>();
629
630 let rx_limiter = RateLimit::new(10, bandwidth);
631 let tx_limiter = RateLimit::new(10, bandwidth);
632 let emulated_peer = EmulatedPeer {
633 spawn_handle: spawn_task_handle.clone(),
634 rx_limiter,
635 tx_limiter,
636 to_node: to_network_interface.clone(),
637 latency_ms,
638 };
639
640 spawn_task_handle.clone().spawn(
641 "peer-emulator",
642 "test-environment",
643 emulated_peer_loop(
644 handlers,
645 stats,
646 emulated_peer,
647 messages_rx,
648 actions_rx,
649 to_network_interface,
650 )
651 .boxed(),
652 );
653
654 EmulatedPeerHandle { messages_tx, actions_tx, peer_id, authority_id }
655}
656
657pub struct PeerEmulatorStats {
659 metrics: Metrics,
660 peer_index: usize,
661}
662
663impl PeerEmulatorStats {
664 pub(crate) fn new(peer_index: usize, metrics: Metrics) -> Self {
665 Self { metrics, peer_index }
666 }
667
668 pub fn inc_sent(&self, bytes: usize) {
669 self.metrics.on_peer_sent(self.peer_index, bytes);
670 }
671
672 pub fn inc_received(&self, bytes: usize) {
673 self.metrics.on_peer_received(self.peer_index, bytes);
674 }
675
676 pub fn sent(&self) -> usize {
677 self.metrics
678 .peer_total_sent
679 .get_metric_with_label_values(&[&format!("node{}", self.peer_index)])
680 .expect("Metric exists")
681 .get() as usize
682 }
683
684 pub fn received(&self) -> usize {
685 self.metrics
686 .peer_total_received
687 .get_metric_with_label_values(&[&format!("node{}", self.peer_index)])
688 .expect("Metric exists")
689 .get() as usize
690 }
691}
692
693#[derive(Clone)]
695enum Peer {
696 Connected(EmulatedPeerHandle),
697 Disconnected(EmulatedPeerHandle),
698}
699
700impl Peer {
701 pub fn disconnect(&mut self) {
702 let new_self = match self {
703 Peer::Connected(peer) => Peer::Disconnected(peer.clone()),
704 _ => return,
705 };
706 *self = new_self;
707 }
708
709 pub fn is_connected(&self) -> bool {
710 matches!(self, Peer::Connected(_))
711 }
712
713 pub fn handle(&self) -> &EmulatedPeerHandle {
714 match self {
715 Peer::Connected(ref emulator) => emulator,
716 Peer::Disconnected(ref emulator) => emulator,
717 }
718 }
719
720 pub fn authority_id(&self) -> AuthorityDiscoveryId {
721 match self {
722 Peer::Connected(handle) | Peer::Disconnected(handle) => handle.authority_id.clone(),
723 }
724 }
725
726 pub fn peer_id(&self) -> PeerId {
727 match self {
728 Peer::Connected(handle) | Peer::Disconnected(handle) => handle.peer_id,
729 }
730 }
731}
732
733#[derive(Clone)]
735pub struct NetworkEmulatorHandle {
736 peers: Vec<Peer>,
738 stats: Vec<Arc<PeerEmulatorStats>>,
740 validator_authority_ids: HashMap<AuthorityDiscoveryId, usize>,
742}
743
744impl NetworkEmulatorHandle {
745 pub fn generate_statement_distribution_peer_view_change(&self, view: View) -> Vec<AllMessages> {
746 self.peers
747 .iter()
748 .filter(|peer| peer.is_connected())
749 .map(|peer| {
750 AllMessages::StatementDistribution(
751 StatementDistributionMessage::NetworkBridgeUpdate(
752 NetworkBridgeEvent::PeerViewChange(peer.peer_id(), view.clone()),
753 ),
754 )
755 })
756 .collect_vec()
757 }
758
759 pub fn generate_peer_connected<F, T>(&self, mapper: F) -> Vec<AllMessages>
761 where
762 F: Fn(NetworkBridgeEvent<T>) -> AllMessages,
763 {
764 self.peers
765 .iter()
766 .filter(|peer| peer.is_connected())
767 .map(|peer| {
768 mapper(NetworkBridgeEvent::PeerConnected(
769 peer.handle().peer_id,
770 ObservedRole::Authority,
771 ValidationVersion::V3.into(),
772 Some(vec![peer.authority_id()].into_iter().collect()),
773 ))
774 })
775 .collect_vec()
776 }
777}
778
779pub fn new_network(
782 config: &TestConfiguration,
783 dependencies: &TestEnvironmentDependencies,
784 authorities: &TestAuthorities,
785 handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
786) -> (NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver) {
787 let n_peers = config.n_validators;
788 gum::info!(target: LOG_TARGET, "{}",format!("Initializing emulation for a {n_peers} peer network.").bright_blue());
789 gum::info!(target: LOG_TARGET, "{}",format!("connectivity {}%, latency {:?}", config.connectivity, config.latency).bright_black());
790
791 let metrics =
792 Metrics::new(&dependencies.registry).expect("Metrics always register successfully");
793 let mut validator_authority_id_mapping = HashMap::new();
794
795 let (to_network_interface, from_network) = mpsc::unbounded();
797
798 let (stats, mut peers): (_, Vec<_>) = (0..n_peers)
800 .zip(authorities.validator_authority_id.clone())
801 .map(|(peer_index, authority_id)| {
802 validator_authority_id_mapping.insert(authority_id.clone(), peer_index);
803 let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone()));
804 (
805 stats.clone(),
806 Peer::Connected(new_peer(
807 config.peer_bandwidth,
808 dependencies.task_manager.spawn_handle(),
809 handlers.clone(),
810 stats,
811 to_network_interface.clone(),
812 random_latency(config.latency.as_ref()),
813 *authorities.peer_ids.get(peer_index).unwrap(),
814 authority_id,
815 )),
816 )
817 })
818 .unzip();
819
820 let connected_count = config.connected_count();
821
822 let mut peers_indices = (0..n_peers).collect_vec();
823 let (_connected, to_disconnect) =
824 peers_indices.partial_shuffle(&mut thread_rng(), connected_count);
825
826 peers[NODE_UNDER_TEST as usize].disconnect();
828 for peer in to_disconnect.iter().skip(1) {
829 peers[*peer].disconnect();
830 }
831
832 gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {connected_count}").bright_black());
833
834 let handle = NetworkEmulatorHandle {
835 peers,
836 stats,
837 validator_authority_ids: validator_authority_id_mapping,
838 };
839
840 let (network_interface, network_interface_receiver) = NetworkInterface::new(
842 dependencies.task_manager.spawn_handle(),
843 handle.clone(),
844 config.bandwidth,
845 from_network,
846 );
847
848 (handle, network_interface, network_interface_receiver)
849}
850
851#[derive(Clone, Debug)]
853pub enum EmulatedPeerError {
854 NotConnected,
855}
856
857impl NetworkEmulatorHandle {
858 pub fn is_peer_connected(&self, peer: &AuthorityDiscoveryId) -> bool {
860 self.peer(peer).is_connected()
861 }
862
863 pub fn send_message_to_peer(
866 &self,
867 peer_id: &AuthorityDiscoveryId,
868 message: VersionedValidationProtocol,
869 ) {
870 let peer = self.peer(peer_id);
871 assert!(peer.is_connected(), "forward message only for connected peers.");
872 peer.handle().receive(NetworkMessage::MessageFromNode(peer_id.clone(), message));
873 }
874
875 pub fn send_request_to_peer(&self, peer_id: &AuthorityDiscoveryId, request: Requests) {
878 let peer = self.peer(peer_id);
879 assert!(peer.is_connected(), "forward request only for connected peers.");
880 peer.handle()
881 .receive(NetworkMessage::RequestFromNode(peer_id.clone(), Box::new(request)));
882 }
883
884 pub fn send_message_from_peer(
886 &self,
887 from_peer: &AuthorityDiscoveryId,
888 message: VersionedValidationProtocol,
889 ) -> Result<(), EmulatedPeerError> {
890 let dst_peer = self.peer(from_peer);
891
892 if !dst_peer.is_connected() {
893 gum::warn!(target: LOG_TARGET, "Attempted to send message from a peer not connected to our node, operation ignored");
894 return Err(EmulatedPeerError::NotConnected)
895 }
896
897 dst_peer.handle().send_message(message);
898 Ok(())
899 }
900
901 pub fn send_request_from_peer(
903 &self,
904 from_peer: &AuthorityDiscoveryId,
905 request: IncomingRequest,
906 ) -> Result<(), EmulatedPeerError> {
907 let dst_peer = self.peer(from_peer);
908
909 if !dst_peer.is_connected() {
910 gum::warn!(target: LOG_TARGET, "Attempted to send request from a peer not connected to our node, operation ignored");
911 return Err(EmulatedPeerError::NotConnected)
912 }
913
914 dst_peer.handle().send_request(request);
915 Ok(())
916 }
917
918 pub fn peer_stats(&self, peer_index: usize) -> Arc<PeerEmulatorStats> {
920 self.stats[peer_index].clone()
921 }
922
923 fn peer_index(&self, peer: &AuthorityDiscoveryId) -> usize {
925 *self
926 .validator_authority_ids
927 .get(peer)
928 .expect("all test authorities are valid; qed")
929 }
930
931 fn peer(&self, peer: &AuthorityDiscoveryId) -> &Peer {
933 &self.peers[self.peer_index(peer)]
934 }
935
936 pub fn inc_sent(&self, bytes: usize) {
938 self.peer_stats(0).inc_sent(bytes);
940 }
941
942 pub fn inc_received(&self, bytes: usize) {
944 self.peer_stats(0).inc_received(bytes);
946 }
947}
948
949#[derive(Clone)]
951pub(crate) struct Metrics {
952 peer_total_sent: CounterVec<U64>,
954 peer_total_received: CounterVec<U64>,
956}
957
958impl Metrics {
959 pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
960 Ok(Self {
961 peer_total_sent: prometheus::register(
962 CounterVec::new(
963 Opts::new(
964 "subsystem_benchmark_network_peer_total_bytes_sent",
965 "Total number of bytes a peer has sent.",
966 ),
967 &["peer"],
968 )?,
969 registry,
970 )?,
971 peer_total_received: prometheus::register(
972 CounterVec::new(
973 Opts::new(
974 "subsystem_benchmark_network_peer_total_bytes_received",
975 "Total number of bytes a peer has received.",
976 ),
977 &["peer"],
978 )?,
979 registry,
980 )?,
981 })
982 }
983
984 pub fn on_peer_sent(&self, peer_index: usize, bytes: usize) {
986 self.peer_total_sent
987 .with_label_values(vec![format!("node{peer_index}").as_str()].as_slice())
988 .inc_by(bytes as u64);
989 }
990
991 pub fn on_peer_received(&self, peer_index: usize, bytes: usize) {
993 self.peer_total_received
994 .with_label_values(vec![format!("node{peer_index}").as_str()].as_slice())
995 .inc_by(bytes as u64);
996 }
997}
998
999pub trait RequestExt {
1001 fn authority_id(&self) -> Option<&AuthorityDiscoveryId>;
1003 fn peer_id(&self) -> Option<&PeerId>;
1005 fn into_response_sender(self) -> ResponseSender;
1007 fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender;
1009 fn size(&self) -> usize;
1011}
1012
1013impl RequestExt for Requests {
1014 fn authority_id(&self) -> Option<&AuthorityDiscoveryId> {
1015 match self {
1016 Requests::ChunkFetching(request) => {
1017 if let Recipient::Authority(authority_id) = &request.peer {
1018 Some(authority_id)
1019 } else {
1020 None
1021 }
1022 },
1023 Requests::AvailableDataFetchingV1(request) => {
1024 if let Recipient::Authority(authority_id) = &request.peer {
1025 Some(authority_id)
1026 } else {
1027 None
1028 }
1029 },
1030 Requests::AttestedCandidateV2(_) => None,
1032 Requests::DisputeSendingV1(request) => {
1033 if let Recipient::Authority(authority_id) = &request.peer {
1034 Some(authority_id)
1035 } else {
1036 None
1037 }
1038 },
1039 request => {
1040 unimplemented!("RequestAuthority not implemented for {:?}", request)
1041 },
1042 }
1043 }
1044
1045 fn peer_id(&self) -> Option<&PeerId> {
1046 match self {
1047 Requests::AttestedCandidateV2(request) => match &request.peer {
1048 Recipient::Authority(_) => None,
1049 Recipient::Peer(peer_id) => Some(peer_id),
1050 },
1051 request => {
1052 unimplemented!("peer_id() is not implemented for {:?}", request)
1053 },
1054 }
1055 }
1056
1057 fn into_response_sender(self) -> ResponseSender {
1058 match self {
1059 Requests::ChunkFetching(outgoing_request) => outgoing_request.pending_response,
1060 Requests::AvailableDataFetchingV1(outgoing_request) =>
1061 outgoing_request.pending_response,
1062 Requests::DisputeSendingV1(outgoing_request) => outgoing_request.pending_response,
1063 _ => unimplemented!("unsupported request type"),
1064 }
1065 }
1066
1067 fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender {
1069 match self {
1070 Requests::ChunkFetching(outgoing_request) =>
1071 std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1072 Requests::AvailableDataFetchingV1(outgoing_request) =>
1073 std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1074 Requests::AttestedCandidateV2(outgoing_request) =>
1075 std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1076 Requests::DisputeSendingV1(outgoing_request) =>
1077 std::mem::replace(&mut outgoing_request.pending_response, new_sender),
1078 _ => unimplemented!("unsupported request type"),
1079 }
1080 }
1081
1082 fn size(&self) -> usize {
1084 match self {
1085 Requests::ChunkFetching(outgoing_request) => outgoing_request.payload.encoded_size(),
1086 Requests::AvailableDataFetchingV1(outgoing_request) =>
1087 outgoing_request.payload.encoded_size(),
1088 Requests::AttestedCandidateV2(outgoing_request) =>
1089 outgoing_request.payload.encoded_size(),
1090 Requests::DisputeSendingV1(outgoing_request) => outgoing_request.payload.encoded_size(),
1091 _ => unimplemented!("received an unexpected request"),
1092 }
1093 }
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098 use super::RateLimit;
1099 use std::time::Instant;
1100
1101 #[tokio::test]
1102 async fn test_expected_rate() {
1103 let tick_rate = 200;
1104 let budget = 1_000_000;
1105 let mut rate_limiter = RateLimit::new(tick_rate, budget);
1107 let mut total_sent = 0usize;
1108 let start = Instant::now();
1109
1110 let mut reap_amount = 0;
1111 while rate_limiter.total_ticks < tick_rate {
1112 reap_amount += 1;
1113 reap_amount %= 100;
1114
1115 rate_limiter.reap(reap_amount).await;
1116 total_sent += reap_amount;
1117 }
1118
1119 let end = Instant::now();
1120
1121 println!("duration: {}", (end - start).as_millis());
1122
1123 let lower_bound = budget as u128 * ((end - start).as_millis() / 1000u128);
1125 let upper_bound = budget as u128 *
1126 ((end - start).as_millis() / 1000u128 + rate_limiter.max_refill as u128);
1127 assert!(total_sent as u128 >= lower_bound);
1128 assert!(total_sent as u128 <= upper_bound);
1129 }
1130}