#![allow(missing_docs)]
use polkadot_cli::{
service::{
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
use sp_core::traits::SpawnNamed;
use crate::{interceptor::*, shared::MALUS};
use std::sync::Arc;
/// Interceptor state for the request-spamming behaviour.
///
/// Holds the configured multiplier that the `MessageInterceptor`
/// implementation reads when it duplicates outgoing requests.
#[derive(Clone)]
struct RequestSpammer {
    /// Multiplier applied to each duplicated request.
    spam_factor: u32,
}
/// Multiplies every `Requests::AttestedCandidateV2` request found in a
/// `NetworkBridgeTxMessage::SendRequests` message; everything else passes
/// through unchanged.
impl<Sender> MessageInterceptor<Sender> for RequestSpammer
where
    Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
{
    type Message = NetworkBridgeTxMessage;

    /// Rewrites `SendRequests` messages so that each `AttestedCandidateV2`
    /// request is sent `spam_factor` times in total: the original request
    /// followed by `spam_factor - 1` freshly built copies that target the same
    /// peer with the same payload.
    ///
    /// A `spam_factor` of `0` or `1` adds no copies; the original request is
    /// still forwarded once. All other request kinds, all other communication
    /// messages and all signals are returned untouched. This method never
    /// returns `None`.
    fn intercept_incoming(
        &self,
        _subsystem_sender: &mut Sender,
        msg: FromOrchestra<Self::Message>,
    ) -> Option<FromOrchestra<Self::Message>> {
        match msg {
            FromOrchestra::Communication {
                msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
            } => {
                // Saturate instead of subtracting directly: `spam_factor` may be 0,
                // and `0 - 1` on a `u32` panics in debug builds and wraps to
                // `u32::MAX` in release builds.
                let extra_copies = self.spam_factor.saturating_sub(1);

                // At least every original request ends up in the output.
                let mut new_requests = Vec::with_capacity(requests.len());
                for request in requests {
                    // Grab what is needed for the copies before the request is moved
                    // into the output vector.
                    let duplicate_source = match &request {
                        Requests::AttestedCandidateV2(req) =>
                            Some((req.peer.clone(), req.payload.clone())),
                        _ => None,
                    };

                    // The original request always goes out first.
                    new_requests.push(request);

                    if let Some((peer_to_duplicate, payload_to_duplicate)) = duplicate_source {
                        // Log the number of copies that are actually added on top of
                        // the original request.
                        gum::info!(
                            target: MALUS,
                            "๐ Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", extra_copies, peer_to_duplicate,
                        );
                        new_requests.extend((0..extra_copies).map(|_| {
                            // The second tuple element (presumably the response
                            // receiver) is dropped: nothing here consumes the
                            // responses to the spammed copies.
                            let (new_outgoing_request, _) = OutgoingRequest::new(
                                peer_to_duplicate.clone(),
                                payload_to_duplicate.clone(),
                            );
                            Requests::AttestedCandidateV2(new_outgoing_request)
                        }));
                    }
                }

                Some(FromOrchestra::Communication {
                    msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
                })
            },
            // Any other message for the subsystem is forwarded as is.
            FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
            // Signals are never modified.
            FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
        }
    }
}
// Command-line options for the statement-request spamming variant.
//
// Plain `//` comments are used on purpose: clap's derive macro turns `///`
// doc comments into help text, which would change the CLI output.
#[derive(Debug, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct SpamStatementRequestsOptions {
    // Multiplier for the spammed requests. Defaults to 1000 and must lie in
    // the inclusive range 0..=10_000_000.
    #[clap(
        long,
        ignore_case = true,
        default_value_t = 1000,
        value_parser = clap::value_parser!(u32).range(0..=10_000_000)
    )]
    pub spam_factor: u32,

    // Options of the wrapped `Cli`, flattened into this command.
    #[clap(flatten)]
    pub cli: Cli,
}
/// Overseer generator for the request-spamming variant.
///
/// Its `OverseerGen` implementation hands `spam_factor` to the
/// `RequestSpammer` interceptor it installs.
pub(crate) struct SpamStatementRequests {
    /// Multiplier forwarded to the interceptor.
    pub spam_factor: u32,
}
impl OverseerGen for SpamStatementRequests {
    /// Builds a validator overseer whose network-bridge-tx subsystem is wrapped
    /// in an `InterceptedSubsystem` driven by a `RequestSpammer`.
    ///
    /// # Panics
    ///
    /// Panics if `ext_args` is `None`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `validator_overseer_builder` and converts a
    /// failure of `build_with_connector` into `Error`.
    fn generate<Spawner, RuntimeClient>(
        &self,
        connector: OverseerConnector,
        args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
        ext_args: Option<ExtendedOverseerGenArgs>,
    ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
    where
        RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
        Spawner: 'static + SpawnNamed + Clone + Unpin,
    {
        let spam_factor = self.spam_factor;

        // Announce the configuration before anything below can fail.
        gum::info!(
            target: MALUS,
            "๐ Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
            spam_factor,
        );

        let extended_args = ext_args
            .expect("Extended arguments required to build validator overseer are provided");
        let builder = validator_overseer_builder(args, extended_args)?;

        // The interceptor is moved into the closure that replaces the stock
        // network-bridge-tx subsystem.
        let interceptor = RequestSpammer { spam_factor };
        builder
            .replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, interceptor))
            .build_with_connector(connector)
            .map_err(Into::into)
    }
}