#![allow(missing_docs)]
use futures::channel::oneshot;
use polkadot_cli::{
service::{
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_types::{ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient};
use polkadot_node_subsystem_util::request_candidate_events;
use polkadot_primitives::vstaging::CandidateEvent;
use sp_core::traits::SpawnNamed;
use crate::{interceptor::*, shared::MALUS};
use std::sync::Arc;
/// Interceptor state used to dispute a candidate included in an ancestor of
/// each newly finalized block.
#[derive(Clone)]
struct AncestorDisputer<Spawner> {
    /// Handle used to spawn the background task that performs the dispute.
    spawner: Spawner,
    /// Distance (in block numbers) subtracted from the finalized height to
    /// pick the block whose candidate gets disputed.
    dispute_offset: u32,
}
impl<Sender, Spawner> MessageInterceptor<Sender> for AncestorDisputer<Spawner>
where
Sender: overseer::ApprovalVotingSenderTrait + Clone + Send + 'static,
Spawner: overseer::gen::Spawner + Clone + 'static,
{
type Message = ApprovalVotingMessage;
fn intercept_incoming(
&self,
subsystem_sender: &mut Sender,
msg: FromOrchestra<Self::Message>,
) -> Option<FromOrchestra<Self::Message>> {
match msg {
FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
FromOrchestra::Signal(OverseerSignal::BlockFinalized(
finalized_hash,
finalized_height,
)) => {
gum::debug!(
target: MALUS,
"๐ Block Finalization Interception! Block: {:?}", finalized_hash,
);
if finalized_height <= self.dispute_offset {
return Some(FromOrchestra::Signal(OverseerSignal::BlockFinalized(
finalized_hash,
finalized_height,
)))
}
let dispute_offset = self.dispute_offset;
let mut sender = subsystem_sender.clone();
self.spawner.spawn_blocking(
"malus-dispute-finalized-block",
Some("malus"),
Box::pin(async move {
let (tx, rx) = oneshot::channel();
sender
.send_message(ChainApiMessage::FinalizedBlockHash(
finalized_height - dispute_offset,
tx,
))
.await;
let disputable_hash = match rx.await {
Ok(Ok(Some(hash))) => {
gum::debug!(
target: MALUS,
"๐ Time to search {:?}`th ancestor! Block: {:?}", dispute_offset, hash,
);
hash
},
_ => {
gum::debug!(
target: MALUS,
"๐ Seems the target is not yet finalized! Nothing to dispute."
);
return },
};
let events =
request_candidate_events(disputable_hash, &mut sender).await.await;
let events = match events {
Ok(Ok(events)) => events,
Ok(Err(e)) => {
gum::error!(
target: MALUS,
"๐ Failed to fetch candidate events: {:?}", e
);
return },
Err(e) => {
gum::error!(
target: MALUS,
"๐ Failed to fetch candidate events: {:?}", e
);
return },
};
let event = events.iter().find(|event| {
matches!(event, CandidateEvent::CandidateIncluded(_, _, _, _))
});
let candidate = match event {
Some(CandidateEvent::CandidateIncluded(candidate, _, _, _)) =>
candidate,
_ => {
gum::error!(
target: MALUS,
"๐ No candidate included event found! Nothing to dispute."
);
return },
};
let candidate_hash = candidate.hash();
let (tx, rx) = oneshot::channel();
sender
.send_message(RuntimeApiMessage::Request(
disputable_hash,
RuntimeApiRequest::SessionIndexForChild(tx),
))
.await;
let session_index = match rx.await {
Ok(Ok(session_index)) => session_index,
_ => {
gum::error!(
target: MALUS,
"๐ Failed to fetch session index for candidate."
);
return },
};
gum::info!(
target: MALUS,
"๐ Disputing candidate with hash: {:?} in session {:?}", candidate_hash, session_index,
);
sender.send_unbounded_message(
DisputeCoordinatorMessage::IssueLocalStatement(
session_index,
candidate_hash,
candidate.clone(),
false, ),
);
}),
);
Some(FromOrchestra::Signal(OverseerSignal::BlockFinalized(
finalized_hash,
finalized_height,
)))
},
FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
}
}
}
// Command-line options for the malus variant that disputes candidates in
// already-finalized blocks.
//
// NOTE: plain `//` comments are used here on purpose. With `clap`'s derive,
// `///` doc comments are picked up as help/about text, so adding them would
// change the generated CLI output.
#[derive(Debug, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct DisputeFinalizedCandidatesOptions {
// How far below the finalized height the disputed block is chosen.
// Exposed as a long option, defaults to 2 and is restricted to 0..=50.
#[clap(long, ignore_case = true, default_value_t = 2, value_parser = clap::value_parser!(u32).range(0..=50))]
pub dispute_offset: u32,
// The regular `polkadot_cli::Cli` arguments, flattened into this command.
#[clap(flatten)]
pub cli: Cli,
}
/// Overseer generator that wires an `AncestorDisputer` in front of the
/// approval-voting subsystem.
pub(crate) struct DisputeFinalizedCandidates {
    /// Offset below the finalized height at which a candidate is disputed;
    /// copied into the interceptor when the overseer is generated.
    pub dispute_offset: u32,
}
/// Builds a validator overseer whose approval-voting subsystem is wrapped by
/// the finalized-ancestor disputing interceptor.
impl OverseerGen for DisputeFinalizedCandidates {
    /// Creates the overseer and its handle.
    ///
    /// Starts from the regular validator overseer builder, replaces the
    /// approval-voting subsystem with an `InterceptedSubsystem` around it, and
    /// finishes the build with the supplied `connector`.
    ///
    /// # Panics
    ///
    /// Panics if `ext_args` is `None`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `validator_overseer_builder` and from
    /// `build_with_connector` (converted into `Error`).
    fn generate<Spawner, RuntimeClient>(
        &self,
        connector: OverseerConnector,
        args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
        ext_args: Option<ExtendedOverseerGenArgs>,
    ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
    where
        RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
        Spawner: 'static + SpawnNamed + Clone + Unpin,
    {
        let dispute_offset = self.dispute_offset;
        gum::info!(
            target: MALUS,
            "๐ Started Malus node that disputes finalized blocks after they are {:?} finalizations deep.",
            dispute_offset,
        );

        // The interceptor needs its own spawner handle, taken before `args` is consumed.
        let interceptor =
            AncestorDisputer { spawner: SpawnGlue(args.spawner.clone()), dispute_offset };

        let ext_args =
            ext_args.expect("Extended arguments required to build validator overseer are provided");
        let builder = validator_overseer_builder(args, ext_args)?;

        builder
            .replace_approval_voting(move |inner| InterceptedSubsystem::new(inner, interceptor))
            .build_with_connector(connector)
            .map_err(Into::into)
    }
}