malus/variants/
spam_statement_requests.rs1#![allow(missing_docs)]
26
27use polkadot_cli::{
28 service::{
29 AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
30 OverseerGenArgs, OverseerHandle,
31 },
32 validator_overseer_builder, Cli,
33};
34use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
35use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
36use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
37use sp_core::traits::SpawnNamed;
38
39use crate::{interceptor::*, shared::MALUS};
41
42use std::sync::Arc;
43
/// Interceptor state for the request-spamming behaviour.
///
/// The `MessageInterceptor` implementation for this type reads `spam_factor`
/// to decide how many copies of a request end up in the outgoing batch.
#[derive(Clone)]
struct RequestSpammer {
    /// Target number of times each `AttestedCandidateV2` request is sent
    /// (the original request plus its duplicates).
    spam_factor: u32,
}
49
50impl<Sender> MessageInterceptor<Sender> for RequestSpammer
51where
52 Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
53{
54 type Message = NetworkBridgeTxMessage;
55
56 fn intercept_incoming(
59 &self,
60 _subsystem_sender: &mut Sender,
61 msg: FromOrchestra<Self::Message>,
62 ) -> Option<FromOrchestra<Self::Message>> {
63 match msg {
64 FromOrchestra::Communication {
65 msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
66 } => {
67 let mut new_requests = Vec::new();
68
69 for request in requests {
70 match request {
71 Requests::AttestedCandidateV2(ref req) => {
72 let peer_to_duplicate = req.peer.clone();
74 let payload_to_duplicate = req.payload.clone();
75 new_requests.push(request);
77
78 gum::info!(
80 target: MALUS,
81 "๐ Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", self.spam_factor, peer_to_duplicate,
82 );
83 new_requests.extend((0..self.spam_factor - 1).map(|_| {
84 let (new_outgoing_request, _) = OutgoingRequest::new(
85 peer_to_duplicate.clone(),
86 payload_to_duplicate.clone(),
87 );
88 Requests::AttestedCandidateV2(new_outgoing_request)
89 }));
90 },
91 _ => {
92 new_requests.push(request);
93 },
94 }
95 }
96
97 Some(FromOrchestra::Communication {
99 msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
100 })
101 },
102 FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
103 FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
104 }
105 }
106}
107
108#[derive(Debug, clap::Parser)]
111#[clap(rename_all = "kebab-case")]
112#[allow(missing_docs)]
113pub struct SpamStatementRequestsOptions {
114 #[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))]
116 pub spam_factor: u32,
117
118 #[clap(flatten)]
119 pub cli: Cli,
120}
121
/// Overseer generator for the request-spamming variant.
///
/// Its `OverseerGen::generate` implementation copies `spam_factor` into the
/// `RequestSpammer` interceptor that replaces the network bridge tx subsystem.
pub(crate) struct SpamStatementRequests {
    /// Spam factor handed to the `RequestSpammer` interceptor.
    pub spam_factor: u32,
}
127
128impl OverseerGen for SpamStatementRequests {
129 fn generate<Spawner, RuntimeClient>(
130 &self,
131 connector: OverseerConnector,
132 args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
133 ext_args: Option<ExtendedOverseerGenArgs>,
134 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
135 where
136 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
137 Spawner: 'static + SpawnNamed + Clone + Unpin,
138 {
139 gum::info!(
140 target: MALUS,
141 "๐ Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
142 &self.spam_factor,
143 );
144
145 let request_spammer = RequestSpammer { spam_factor: self.spam_factor };
146
147 validator_overseer_builder(
148 args,
149 ext_args.expect("Extended arguments required to build validator overseer are provided"),
150 )?
151 .replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, request_spammer))
152 .build_with_connector(connector)
153 .map_err(|e| e.into())
154 }
155}