use crate::{
configuration::{random_latency, TestAuthorities, TestConfiguration},
environment::TestEnvironmentDependencies,
NODE_UNDER_TEST,
};
use codec::Encode;
use colored::Colorize;
use futures::{
channel::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
lock::Mutex,
stream::FuturesUnordered,
Future, FutureExt, StreamExt,
};
use itertools::Itertools;
use net_protocol::{
peer_set::ValidationVersion,
request_response::{Recipient, Requests, ResponseSender},
ObservedRole, VersionedValidationProtocol, View,
};
use polkadot_node_network_protocol::{self as net_protocol, Versioned};
use polkadot_node_subsystem::messages::StatementDistributionMessage;
use polkadot_node_subsystem_types::messages::NetworkBridgeEvent;
use polkadot_node_subsystem_util::metrics::prometheus::{
self, CounterVec, Opts, PrometheusError, Registry,
};
use polkadot_overseer::AllMessages;
use polkadot_primitives::AuthorityDiscoveryId;
use prometheus_endpoint::U64;
use rand::{seq::SliceRandom, thread_rng};
use sc_network::{
request_responses::{IncomingRequest, OutgoingResponse},
RequestFailure,
};
use sc_network_types::PeerId;
use sc_service::SpawnTaskHandle;
use std::{
collections::HashMap,
sync::Arc,
task::Poll,
time::{Duration, Instant},
};
const LOG_TARGET: &str = "subsystem-bench::network";
#[derive(Debug)]
pub struct RateLimit {
tick_rate: usize,
total_ticks: usize,
max_refill: usize,
credits: isize,
last_refill: Instant,
}
impl RateLimit {
pub fn new(tick_rate: usize, cps: usize) -> Self {
let max_refill = cps / tick_rate;
RateLimit {
tick_rate,
total_ticks: 0,
max_refill,
credits: max_refill as isize,
last_refill: Instant::now(),
}
}
pub async fn refill(&mut self) {
let now = Instant::now();
let next_tick_delta =
(self.last_refill + Duration::from_millis(1000 / self.tick_rate as u64)) - now;
if !next_tick_delta.is_zero() {
gum::trace!(target: LOG_TARGET, "need to sleep {}ms", next_tick_delta.as_millis());
tokio::time::sleep(next_tick_delta).await;
}
self.total_ticks += 1;
self.credits += self.max_refill as isize;
self.last_refill = Instant::now();
}
pub async fn reap(&mut self, amount: usize) {
self.credits -= amount as isize;
if self.credits >= 0 {
return
}
while self.credits < 0 {
gum::trace!(target: LOG_TARGET, "Before refill: {:?}", &self);
self.refill().await;
gum::trace!(target: LOG_TARGET, "After refill: {:?}", &self);
}
}
}
pub enum NetworkMessage {
MessageFromPeer(PeerId, VersionedValidationProtocol),
MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol),
RequestFromNode(AuthorityDiscoveryId, Requests),
RequestFromPeer(IncomingRequest),
}
impl NetworkMessage {
pub fn size(&self) -> usize {
match &self {
NetworkMessage::MessageFromPeer(_, Versioned::V2(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(_, Versioned::V1(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(_, Versioned::V3(message)) => message.encoded_size(),
NetworkMessage::MessageFromNode(_peer_id, Versioned::V2(message)) =>
message.encoded_size(),
NetworkMessage::MessageFromNode(_peer_id, Versioned::V1(message)) =>
message.encoded_size(),
NetworkMessage::MessageFromNode(_peer_id, Versioned::V3(message)) =>
message.encoded_size(),
NetworkMessage::RequestFromNode(_peer_id, incoming) => incoming.size(),
NetworkMessage::RequestFromPeer(request) => request.payload.encoded_size(),
}
}
pub fn peer(&self) -> Option<&AuthorityDiscoveryId> {
match &self {
NetworkMessage::MessageFromNode(peer_id, _) |
NetworkMessage::RequestFromNode(peer_id, _) => Some(peer_id),
_ => None,
}
}
}
pub struct NetworkInterface {
bridge_to_interface_sender: UnboundedSender<NetworkMessage>,
}
pub struct NetworkInterfaceReceiver(pub UnboundedReceiver<NetworkMessage>);
struct ProxiedRequest {
sender: Option<oneshot::Sender<OutgoingResponse>>,
receiver: oneshot::Receiver<OutgoingResponse>,
}
struct ProxiedResponse {
pub sender: oneshot::Sender<OutgoingResponse>,
pub result: Result<Vec<u8>, RequestFailure>,
}
impl Future for ProxiedRequest {
type Output = ProxiedResponse;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.receiver.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(response) => Poll::Ready(ProxiedResponse {
sender: self.sender.take().expect("sender already used"),
result: response
.expect("Response is always successfully received.")
.result
.map_err(|_| RequestFailure::Refused),
}),
}
}
}
impl NetworkInterface {
pub fn new(
spawn_task_handle: SpawnTaskHandle,
network: NetworkEmulatorHandle,
bandwidth_bps: usize,
mut from_network: UnboundedReceiver<NetworkMessage>,
) -> (NetworkInterface, NetworkInterfaceReceiver) {
let rx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps)));
let tx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps)));
let (bridge_to_interface_sender, mut bridge_to_interface_receiver) =
mpsc::unbounded::<NetworkMessage>();
let (interface_to_bridge_sender, interface_to_bridge_receiver) =
mpsc::unbounded::<NetworkMessage>();
let rx_network = network.clone();
let tx_network = network;
let rx_task_bridge_sender = interface_to_bridge_sender.clone();
let task_rx_limiter = rx_limiter.clone();
let task_tx_limiter = tx_limiter.clone();
let rx_task = async move {
let mut proxied_requests = FuturesUnordered::new();
loop {
let mut from_network = from_network.next().fuse();
futures::select! {
maybe_peer_message = from_network => {
if let Some(peer_message) = maybe_peer_message {
let size = peer_message.size();
task_rx_limiter.lock().await.reap(size).await;
rx_network.inc_received(size);
if let NetworkMessage::RequestFromPeer(request) = peer_message {
let (response_sender, response_receiver) = oneshot::channel();
let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender};
proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver});
rx_task_bridge_sender
.unbounded_send(NetworkMessage::RequestFromPeer(new_request))
.expect("network bridge subsystem is alive");
continue
}
rx_task_bridge_sender
.unbounded_send(peer_message)
.expect("network bridge subsystem is alive");
} else {
gum::info!(target: LOG_TARGET, "Uplink channel closed, network interface task exiting");
break
}
},
proxied_request = proxied_requests.next() => {
if let Some(proxied_request) = proxied_request {
match proxied_request.result {
Ok(result) => {
let bytes = result.encoded_size();
gum::trace!(target: LOG_TARGET, size = bytes, "proxied request completed");
task_tx_limiter.lock().await.reap(bytes).await;
rx_network.inc_sent(bytes);
proxied_request.sender.send(
OutgoingResponse {
reputation_changes: Vec::new(),
result: Ok(result),
sent_feedback: None
}
).expect("network is alive");
}
Err(e) => {
gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e)
}
}
} else {
gum::debug!(target: LOG_TARGET, "No more active proxied requests");
}
}
}
}
}
.boxed();
let task_spawn_handle = spawn_task_handle.clone();
let task_rx_limiter = rx_limiter.clone();
let task_tx_limiter = tx_limiter.clone();
let tx_task = async move {
let tx_network = Arc::new(tx_network);
loop {
if let Some(peer_message) = bridge_to_interface_receiver.next().await {
let size = peer_message.size();
task_tx_limiter.lock().await.reap(size).await;
match peer_message {
NetworkMessage::MessageFromNode(peer, message) =>
tx_network.send_message_to_peer(&peer, message),
NetworkMessage::RequestFromNode(peer, request) => {
let send_task = Self::proxy_send_request(
peer.clone(),
request,
tx_network.clone(),
task_rx_limiter.clone(),
)
.boxed();
task_spawn_handle.spawn("request-proxy", "test-environment", send_task);
},
_ => panic!(
"Unexpected network message received from emulated network bridge"
),
}
tx_network.inc_sent(size);
} else {
gum::info!(target: LOG_TARGET, "Downlink channel closed, network interface task exiting");
break
}
}
}
.boxed();
spawn_task_handle.spawn("network-interface-rx", "test-environment", rx_task);
spawn_task_handle.spawn("network-interface-tx", "test-environment", tx_task);
(
Self { bridge_to_interface_sender },
NetworkInterfaceReceiver(interface_to_bridge_receiver),
)
}
pub fn subsystem_sender(&self) -> UnboundedSender<NetworkMessage> {
self.bridge_to_interface_sender.clone()
}
async fn proxy_send_request(
peer: AuthorityDiscoveryId,
mut request: Requests,
tx_network: Arc<NetworkEmulatorHandle>,
task_rx_limiter: Arc<Mutex<RateLimit>>,
) {
let (proxy_sender, proxy_receiver) = oneshot::channel();
let sender = request.swap_response_sender(proxy_sender);
tx_network.send_request_to_peer(&peer, request);
match proxy_receiver.await {
Err(_) => {
panic!("Emulated peer hangup");
},
Ok(Err(err)) => {
sender.send(Err(err)).expect("Oneshot send always works.");
},
Ok(Ok((response, protocol_name))) => {
let response_size = response.encoded_size();
task_rx_limiter.lock().await.reap(response_size).await;
tx_network.inc_received(response_size);
if sender.send(Ok((response, protocol_name))).is_err() {
gum::warn!(target: LOG_TARGET, response_size, "response oneshot canceled by node")
}
},
};
}
}
#[derive(Clone)]
pub struct EmulatedPeerHandle {
messages_tx: UnboundedSender<NetworkMessage>,
actions_tx: UnboundedSender<NetworkMessage>,
peer_id: PeerId,
authority_id: AuthorityDiscoveryId,
}
impl EmulatedPeerHandle {
pub fn receive(&self, message: NetworkMessage) {
self.messages_tx.unbounded_send(message).expect("Peer message channel hangup");
}
pub fn send_message(&self, message: VersionedValidationProtocol) {
self.actions_tx
.unbounded_send(NetworkMessage::MessageFromPeer(self.peer_id, message))
.expect("Peer action channel hangup");
}
pub fn send_request(&self, request: IncomingRequest) {
self.actions_tx
.unbounded_send(NetworkMessage::RequestFromPeer(request))
.expect("Peer action channel hangup");
}
}
struct EmulatedPeer {
spawn_handle: SpawnTaskHandle,
to_node: UnboundedSender<NetworkMessage>,
tx_limiter: RateLimit,
rx_limiter: RateLimit,
latency_ms: usize,
}
impl EmulatedPeer {
pub async fn send_message(&mut self, message: NetworkMessage) {
self.tx_limiter.reap(message.size()).await;
if self.latency_ms == 0 {
self.to_node.unbounded_send(message).expect("Sending to the node never fails");
} else {
let to_node = self.to_node.clone();
let latency_ms = std::time::Duration::from_millis(self.latency_ms as u64);
self.spawn_handle
.spawn("peer-latency-emulator", "test-environment", async move {
tokio::time::sleep(latency_ms).await;
to_node.unbounded_send(message).expect("Sending to the node never fails");
});
}
}
pub fn rx_limiter(&mut self) -> &mut RateLimit {
&mut self.rx_limiter
}
}
#[async_trait::async_trait]
pub trait HandleNetworkMessage {
async fn handle(
&self,
message: NetworkMessage,
node_sender: &mut UnboundedSender<NetworkMessage>,
) -> Option<NetworkMessage>;
}
#[async_trait::async_trait]
impl<T> HandleNetworkMessage for Arc<T>
where
T: HandleNetworkMessage + Sync + Send,
{
async fn handle(
&self,
message: NetworkMessage,
node_sender: &mut UnboundedSender<NetworkMessage>,
) -> Option<NetworkMessage> {
T::handle(self, message, node_sender).await
}
}
async fn emulated_peer_loop(
handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
stats: Arc<PeerEmulatorStats>,
mut emulated_peer: EmulatedPeer,
messages_rx: UnboundedReceiver<NetworkMessage>,
actions_rx: UnboundedReceiver<NetworkMessage>,
mut to_network_interface: UnboundedSender<NetworkMessage>,
) {
let mut proxied_requests = FuturesUnordered::new();
let mut messages_rx = messages_rx.fuse();
let mut actions_rx = actions_rx.fuse();
loop {
futures::select! {
maybe_peer_message = messages_rx.next() => {
if let Some(peer_message) = maybe_peer_message {
let size = peer_message.size();
emulated_peer.rx_limiter().reap(size).await;
stats.inc_received(size);
let mut message = Some(peer_message);
for handler in handlers.iter() {
message = handler.handle(message.unwrap(), &mut to_network_interface).await;
if message.is_none() {
break
}
}
if let Some(message) = message {
panic!("Emulated message from peer {:?} not handled", message.peer());
}
} else {
gum::debug!(target: LOG_TARGET, "Downlink channel closed, peer task exiting");
break
}
},
maybe_action = actions_rx.next() => {
match maybe_action {
Some(NetworkMessage::RequestFromPeer(request)) => {
let (response_sender, response_receiver) = oneshot::channel();
let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender};
proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver});
emulated_peer.send_message(NetworkMessage::RequestFromPeer(new_request)).await;
},
Some(message) => emulated_peer.send_message(message).await,
None => {
gum::debug!(target: LOG_TARGET, "Action channel closed, peer task exiting");
break
}
}
},
proxied_request = proxied_requests.next() => {
if let Some(proxied_request) = proxied_request {
match proxied_request.result {
Ok(result) => {
let bytes = result.encoded_size();
gum::trace!(target: LOG_TARGET, size = bytes, "Peer proxied request completed");
emulated_peer.rx_limiter().reap(bytes).await;
stats.inc_received(bytes);
proxied_request.sender.send(
OutgoingResponse {
reputation_changes: Vec::new(),
result: Ok(result),
sent_feedback: None
}
).expect("network is alive");
}
Err(e) => {
gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e)
}
}
}
}
}
}
}
#[allow(clippy::too_many_arguments)]
pub fn new_peer(
bandwidth: usize,
spawn_task_handle: SpawnTaskHandle,
handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
stats: Arc<PeerEmulatorStats>,
to_network_interface: UnboundedSender<NetworkMessage>,
latency_ms: usize,
peer_id: PeerId,
authority_id: AuthorityDiscoveryId,
) -> EmulatedPeerHandle {
let (messages_tx, messages_rx) = mpsc::unbounded::<NetworkMessage>();
let (actions_tx, actions_rx) = mpsc::unbounded::<NetworkMessage>();
let rx_limiter = RateLimit::new(10, bandwidth);
let tx_limiter = RateLimit::new(10, bandwidth);
let emulated_peer = EmulatedPeer {
spawn_handle: spawn_task_handle.clone(),
rx_limiter,
tx_limiter,
to_node: to_network_interface.clone(),
latency_ms,
};
spawn_task_handle.clone().spawn(
"peer-emulator",
"test-environment",
emulated_peer_loop(
handlers,
stats,
emulated_peer,
messages_rx,
actions_rx,
to_network_interface,
)
.boxed(),
);
EmulatedPeerHandle { messages_tx, actions_tx, peer_id, authority_id }
}
pub struct PeerEmulatorStats {
metrics: Metrics,
peer_index: usize,
}
impl PeerEmulatorStats {
pub(crate) fn new(peer_index: usize, metrics: Metrics) -> Self {
Self { metrics, peer_index }
}
pub fn inc_sent(&self, bytes: usize) {
self.metrics.on_peer_sent(self.peer_index, bytes);
}
pub fn inc_received(&self, bytes: usize) {
self.metrics.on_peer_received(self.peer_index, bytes);
}
pub fn sent(&self) -> usize {
self.metrics
.peer_total_sent
.get_metric_with_label_values(&[&format!("node{}", self.peer_index)])
.expect("Metric exists")
.get() as usize
}
pub fn received(&self) -> usize {
self.metrics
.peer_total_received
.get_metric_with_label_values(&[&format!("node{}", self.peer_index)])
.expect("Metric exists")
.get() as usize
}
}
#[derive(Clone)]
enum Peer {
Connected(EmulatedPeerHandle),
Disconnected(EmulatedPeerHandle),
}
impl Peer {
pub fn disconnect(&mut self) {
let new_self = match self {
Peer::Connected(peer) => Peer::Disconnected(peer.clone()),
_ => return,
};
*self = new_self;
}
pub fn is_connected(&self) -> bool {
matches!(self, Peer::Connected(_))
}
pub fn handle(&self) -> &EmulatedPeerHandle {
match self {
Peer::Connected(ref emulator) => emulator,
Peer::Disconnected(ref emulator) => emulator,
}
}
pub fn authority_id(&self) -> AuthorityDiscoveryId {
match self {
Peer::Connected(handle) | Peer::Disconnected(handle) => handle.authority_id.clone(),
}
}
pub fn peer_id(&self) -> PeerId {
match self {
Peer::Connected(handle) | Peer::Disconnected(handle) => handle.peer_id,
}
}
}
#[derive(Clone)]
pub struct NetworkEmulatorHandle {
peers: Vec<Peer>,
stats: Vec<Arc<PeerEmulatorStats>>,
validator_authority_ids: HashMap<AuthorityDiscoveryId, usize>,
}
impl NetworkEmulatorHandle {
pub fn generate_statement_distribution_peer_view_change(&self, view: View) -> Vec<AllMessages> {
self.peers
.iter()
.filter(|peer| peer.is_connected())
.map(|peer| {
AllMessages::StatementDistribution(
StatementDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(peer.peer_id(), view.clone()),
),
)
})
.collect_vec()
}
pub fn generate_peer_connected<F, T>(&self, mapper: F) -> Vec<AllMessages>
where
F: Fn(NetworkBridgeEvent<T>) -> AllMessages,
{
self.peers
.iter()
.filter(|peer| peer.is_connected())
.map(|peer| {
mapper(NetworkBridgeEvent::PeerConnected(
peer.handle().peer_id,
ObservedRole::Authority,
ValidationVersion::V3.into(),
Some(vec![peer.authority_id()].into_iter().collect()),
))
})
.collect_vec()
}
}
pub fn new_network(
config: &TestConfiguration,
dependencies: &TestEnvironmentDependencies,
authorities: &TestAuthorities,
handlers: Vec<Arc<dyn HandleNetworkMessage + Sync + Send>>,
) -> (NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver) {
let n_peers = config.n_validators;
gum::info!(target: LOG_TARGET, "{}",format!("Initializing emulation for a {} peer network.", n_peers).bright_blue());
gum::info!(target: LOG_TARGET, "{}",format!("connectivity {}%, latency {:?}", config.connectivity, config.latency).bright_black());
let metrics =
Metrics::new(&dependencies.registry).expect("Metrics always register successfully");
let mut validator_authority_id_mapping = HashMap::new();
let (to_network_interface, from_network) = mpsc::unbounded();
let (stats, mut peers): (_, Vec<_>) = (0..n_peers)
.zip(authorities.validator_authority_id.clone())
.map(|(peer_index, authority_id)| {
validator_authority_id_mapping.insert(authority_id.clone(), peer_index);
let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone()));
(
stats.clone(),
Peer::Connected(new_peer(
config.peer_bandwidth,
dependencies.task_manager.spawn_handle(),
handlers.clone(),
stats,
to_network_interface.clone(),
random_latency(config.latency.as_ref()),
*authorities.peer_ids.get(peer_index).unwrap(),
authority_id,
)),
)
})
.unzip();
let connected_count = config.connected_count();
let mut peers_indices = (0..n_peers).collect_vec();
let (_connected, to_disconnect) =
peers_indices.partial_shuffle(&mut thread_rng(), connected_count);
peers[NODE_UNDER_TEST as usize].disconnect();
for peer in to_disconnect.iter().skip(1) {
peers[*peer].disconnect();
}
gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {}", connected_count).bright_black());
let handle = NetworkEmulatorHandle {
peers,
stats,
validator_authority_ids: validator_authority_id_mapping,
};
let (network_interface, network_interface_receiver) = NetworkInterface::new(
dependencies.task_manager.spawn_handle(),
handle.clone(),
config.bandwidth,
from_network,
);
(handle, network_interface, network_interface_receiver)
}
#[derive(Clone, Debug)]
pub enum EmulatedPeerError {
NotConnected,
}
impl NetworkEmulatorHandle {
pub fn is_peer_connected(&self, peer: &AuthorityDiscoveryId) -> bool {
self.peer(peer).is_connected()
}
pub fn send_message_to_peer(
&self,
peer_id: &AuthorityDiscoveryId,
message: VersionedValidationProtocol,
) {
let peer = self.peer(peer_id);
assert!(peer.is_connected(), "forward message only for connected peers.");
peer.handle().receive(NetworkMessage::MessageFromNode(peer_id.clone(), message));
}
pub fn send_request_to_peer(&self, peer_id: &AuthorityDiscoveryId, request: Requests) {
let peer = self.peer(peer_id);
assert!(peer.is_connected(), "forward request only for connected peers.");
peer.handle().receive(NetworkMessage::RequestFromNode(peer_id.clone(), request));
}
pub fn send_message_from_peer(
&self,
from_peer: &AuthorityDiscoveryId,
message: VersionedValidationProtocol,
) -> Result<(), EmulatedPeerError> {
let dst_peer = self.peer(from_peer);
if !dst_peer.is_connected() {
gum::warn!(target: LOG_TARGET, "Attempted to send message from a peer not connected to our node, operation ignored");
return Err(EmulatedPeerError::NotConnected)
}
dst_peer.handle().send_message(message);
Ok(())
}
pub fn send_request_from_peer(
&self,
from_peer: &AuthorityDiscoveryId,
request: IncomingRequest,
) -> Result<(), EmulatedPeerError> {
let dst_peer = self.peer(from_peer);
if !dst_peer.is_connected() {
gum::warn!(target: LOG_TARGET, "Attempted to send request from a peer not connected to our node, operation ignored");
return Err(EmulatedPeerError::NotConnected)
}
dst_peer.handle().send_request(request);
Ok(())
}
pub fn peer_stats(&self, peer_index: usize) -> Arc<PeerEmulatorStats> {
self.stats[peer_index].clone()
}
fn peer_index(&self, peer: &AuthorityDiscoveryId) -> usize {
*self
.validator_authority_ids
.get(peer)
.expect("all test authorities are valid; qed")
}
fn peer(&self, peer: &AuthorityDiscoveryId) -> &Peer {
&self.peers[self.peer_index(peer)]
}
pub fn inc_sent(&self, bytes: usize) {
self.peer_stats(0).inc_sent(bytes);
}
pub fn inc_received(&self, bytes: usize) {
self.peer_stats(0).inc_received(bytes);
}
}
#[derive(Clone)]
pub(crate) struct Metrics {
peer_total_sent: CounterVec<U64>,
peer_total_received: CounterVec<U64>,
}
impl Metrics {
pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
peer_total_sent: prometheus::register(
CounterVec::new(
Opts::new(
"subsystem_benchmark_network_peer_total_bytes_sent",
"Total number of bytes a peer has sent.",
),
&["peer"],
)?,
registry,
)?,
peer_total_received: prometheus::register(
CounterVec::new(
Opts::new(
"subsystem_benchmark_network_peer_total_bytes_received",
"Total number of bytes a peer has received.",
),
&["peer"],
)?,
registry,
)?,
})
}
pub fn on_peer_sent(&self, peer_index: usize, bytes: usize) {
self.peer_total_sent
.with_label_values(vec![format!("node{}", peer_index).as_str()].as_slice())
.inc_by(bytes as u64);
}
pub fn on_peer_received(&self, peer_index: usize, bytes: usize) {
self.peer_total_received
.with_label_values(vec![format!("node{}", peer_index).as_str()].as_slice())
.inc_by(bytes as u64);
}
}
pub trait RequestExt {
fn authority_id(&self) -> Option<&AuthorityDiscoveryId>;
fn peer_id(&self) -> Option<&PeerId>;
fn into_response_sender(self) -> ResponseSender;
fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender;
fn size(&self) -> usize;
}
impl RequestExt for Requests {
fn authority_id(&self) -> Option<&AuthorityDiscoveryId> {
match self {
Requests::ChunkFetching(request) => {
if let Recipient::Authority(authority_id) = &request.peer {
Some(authority_id)
} else {
None
}
},
Requests::AvailableDataFetchingV1(request) => {
if let Recipient::Authority(authority_id) = &request.peer {
Some(authority_id)
} else {
None
}
},
Requests::AttestedCandidateV2(_) => None,
request => {
unimplemented!("RequestAuthority not implemented for {:?}", request)
},
}
}
fn peer_id(&self) -> Option<&PeerId> {
match self {
Requests::AttestedCandidateV2(request) => match &request.peer {
Recipient::Authority(_) => None,
Recipient::Peer(peer_id) => Some(peer_id),
},
request => {
unimplemented!("peer_id() is not implemented for {:?}", request)
},
}
}
fn into_response_sender(self) -> ResponseSender {
match self {
Requests::ChunkFetching(outgoing_request) => outgoing_request.pending_response,
Requests::AvailableDataFetchingV1(outgoing_request) =>
outgoing_request.pending_response,
_ => unimplemented!("unsupported request type"),
}
}
fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender {
match self {
Requests::ChunkFetching(outgoing_request) =>
std::mem::replace(&mut outgoing_request.pending_response, new_sender),
Requests::AvailableDataFetchingV1(outgoing_request) =>
std::mem::replace(&mut outgoing_request.pending_response, new_sender),
Requests::AttestedCandidateV2(outgoing_request) =>
std::mem::replace(&mut outgoing_request.pending_response, new_sender),
_ => unimplemented!("unsupported request type"),
}
}
fn size(&self) -> usize {
match self {
Requests::ChunkFetching(outgoing_request) => outgoing_request.payload.encoded_size(),
Requests::AvailableDataFetchingV1(outgoing_request) =>
outgoing_request.payload.encoded_size(),
Requests::AttestedCandidateV2(outgoing_request) =>
outgoing_request.payload.encoded_size(),
_ => unimplemented!("received an unexpected request"),
}
}
}
#[cfg(test)]
mod tests {
use super::RateLimit;
use std::time::Instant;
#[tokio::test]
async fn test_expected_rate() {
let tick_rate = 200;
let budget = 1_000_000;
let mut rate_limiter = RateLimit::new(tick_rate, budget);
let mut total_sent = 0usize;
let start = Instant::now();
let mut reap_amount = 0;
while rate_limiter.total_ticks < tick_rate {
reap_amount += 1;
reap_amount %= 100;
rate_limiter.reap(reap_amount).await;
total_sent += reap_amount;
}
let end = Instant::now();
println!("duration: {}", (end - start).as_millis());
let lower_bound = budget as u128 * ((end - start).as_millis() / 1000u128);
let upper_bound = budget as u128 *
((end - start).as_millis() / 1000u128 + rate_limiter.max_refill as u128);
assert!(total_sent as u128 >= lower_bound);
assert!(total_sent as u128 <= upper_bound);
}
}