1#![allow(missing_docs)]
33
34use futures::channel::oneshot;
35use polkadot_cli::{
36 service::{
37 AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
38 OverseerGenArgs, OverseerHandle,
39 },
40 validator_overseer_builder, Cli,
41};
42use polkadot_node_subsystem::SpawnGlue;
43use polkadot_node_subsystem_types::{ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient};
44use polkadot_node_subsystem_util::request_candidate_events;
45use polkadot_primitives::CandidateEvent;
46use sp_core::traits::SpawnNamed;
47
48use crate::{interceptor::*, shared::MALUS};
50
51use std::sync::Arc;
52
/// State carried by the interceptor that disputes ancestors of finalized blocks.
///
/// An instance is handed to `InterceptedSubsystem::new` in place of the plain
/// approval-voting subsystem (see `DisputeFinalizedCandidates::generate`).
#[derive(Clone)]
struct AncestorDisputer<Spawner> {
    /// Handle used to spawn the background task that performs the dispute.
    spawner: Spawner,
    /// Number of blocks to step back from the finalized block: the block at
    /// height `finalized_height - dispute_offset` is the one searched for a
    /// candidate to dispute.
    dispute_offset: u32,
}
61
62impl<Sender, Spawner> MessageInterceptor<Sender> for AncestorDisputer<Spawner>
63where
64 Sender: overseer::ApprovalVotingSenderTrait + Clone + Send + 'static,
65 Spawner: overseer::gen::Spawner + Clone + 'static,
66{
67 type Message = ApprovalVotingMessage;
68
69 fn intercept_incoming(
71 &self,
72 subsystem_sender: &mut Sender,
73 msg: FromOrchestra<Self::Message>,
74 ) -> Option<FromOrchestra<Self::Message>> {
75 match msg {
76 FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
77 FromOrchestra::Signal(OverseerSignal::BlockFinalized(
78 finalized_hash,
79 finalized_height,
80 )) => {
81 gum::debug!(
82 target: MALUS,
83 "๐ Block Finalization Interception! Block: {:?}", finalized_hash,
84 );
85
86 if finalized_height <= self.dispute_offset {
88 return Some(FromOrchestra::Signal(OverseerSignal::BlockFinalized(
89 finalized_hash,
90 finalized_height,
91 )))
92 }
93
94 let dispute_offset = self.dispute_offset;
95 let mut sender = subsystem_sender.clone();
96 self.spawner.spawn_blocking(
97 "malus-dispute-finalized-block",
98 Some("malus"),
99 Box::pin(async move {
100 let (tx, rx) = oneshot::channel();
102 sender
103 .send_message(ChainApiMessage::FinalizedBlockHash(
104 finalized_height - dispute_offset,
105 tx,
106 ))
107 .await;
108 let disputable_hash = match rx.await {
109 Ok(Ok(Some(hash))) => {
110 gum::debug!(
111 target: MALUS,
112 "๐ Time to search {:?}`th ancestor! Block: {:?}", dispute_offset, hash,
113 );
114 hash
115 },
116 _ => {
117 gum::debug!(
118 target: MALUS,
119 "๐ Seems the target is not yet finalized! Nothing to dispute."
120 );
121 return },
123 };
124
125 let events =
127 request_candidate_events(disputable_hash, &mut sender).await.await;
128 let events = match events {
129 Ok(Ok(events)) => events,
130 Ok(Err(e)) => {
131 gum::error!(
132 target: MALUS,
133 "๐ Failed to fetch candidate events: {:?}", e
134 );
135 return },
137 Err(e) => {
138 gum::error!(
139 target: MALUS,
140 "๐ Failed to fetch candidate events: {:?}", e
141 );
142 return },
144 };
145
146 let event = events.iter().find(|event| {
148 matches!(event, CandidateEvent::CandidateIncluded(_, _, _, _))
149 });
150 let candidate = match event {
151 Some(CandidateEvent::CandidateIncluded(candidate, _, _, _)) =>
152 candidate,
153 _ => {
154 gum::error!(
155 target: MALUS,
156 "๐ No candidate included event found! Nothing to dispute."
157 );
158 return },
160 };
161
162 let candidate_hash = candidate.hash();
164
165 let (tx, rx) = oneshot::channel();
167 sender
168 .send_message(RuntimeApiMessage::Request(
169 disputable_hash,
170 RuntimeApiRequest::SessionIndexForChild(tx),
171 ))
172 .await;
173 let session_index = match rx.await {
174 Ok(Ok(session_index)) => session_index,
175 _ => {
176 gum::error!(
177 target: MALUS,
178 "๐ Failed to fetch session index for candidate."
179 );
180 return },
182 };
183 gum::info!(
184 target: MALUS,
185 "๐ Disputing candidate with hash: {:?} in session {:?}", candidate_hash, session_index,
186 );
187
188 sender.send_unbounded_message(
190 DisputeCoordinatorMessage::IssueLocalStatement(
191 session_index,
192 candidate_hash,
193 candidate.clone(),
194 false, ),
196 );
197 }),
198 );
199
200 Some(FromOrchestra::Signal(OverseerSignal::BlockFinalized(
202 finalized_hash,
203 finalized_height,
204 )))
205 },
206 FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
207 }
208 }
209}
210
211#[derive(Debug, clap::Parser)]
214#[clap(rename_all = "kebab-case")]
215#[allow(missing_docs)]
216pub struct DisputeFinalizedCandidatesOptions {
217 #[clap(long, ignore_case = true, default_value_t = 2, value_parser = clap::value_parser!(u32).range(0..=50))]
220 pub dispute_offset: u32,
221
222 #[clap(flatten)]
223 pub cli: Cli,
224}
225
/// Overseer generator whose approval-voting subsystem is wrapped by an
/// `AncestorDisputer` (see its `OverseerGen` implementation).
pub(crate) struct DisputeFinalizedCandidates {
    /// Offset copied into the `AncestorDisputer` when the overseer is
    /// generated; it is also reported in the start-up log line as the
    /// finalization depth at which blocks get disputed.
    pub dispute_offset: u32,
}
232
233impl OverseerGen for DisputeFinalizedCandidates {
234 fn generate<Spawner, RuntimeClient>(
235 &self,
236 connector: OverseerConnector,
237 args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
238 ext_args: Option<ExtendedOverseerGenArgs>,
239 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
240 where
241 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
242 Spawner: 'static + SpawnNamed + Clone + Unpin,
243 {
244 gum::info!(
245 target: MALUS,
246 "๐ Started Malus node that disputes finalized blocks after they are {:?} finalizations deep.",
247 &self.dispute_offset,
248 );
249
250 let ancestor_disputer = AncestorDisputer {
251 spawner: SpawnGlue(args.spawner.clone()),
252 dispute_offset: self.dispute_offset,
253 };
254
255 validator_overseer_builder(
256 args,
257 ext_args.expect("Extended arguments required to build validator overseer are provided"),
258 )?
259 .replace_approval_voting(move |cb| InterceptedSubsystem::new(cb, ancestor_disputer))
260 .build_with_connector(connector)
261 .map_err(|e| e.into())
262 }
263}