equivocation_detector/
reporter.rs1use crate::{EquivocationDetectionPipeline, SourceClient};
20
21use futures::FutureExt;
22use relay_utils::{TrackedTransactionFuture, TrackedTransactionStatus, TransactionTracker};
23use std::{
24 future::poll_fn,
25 task::{Context, Poll},
26};
27
28pub struct EquivocationsReporter<'a, P: EquivocationDetectionPipeline, SC: SourceClient<P>> {
29 pending_reports: Vec<TrackedTransactionFuture<'a, SC::TransactionTracker>>,
30}
31
32impl<'a, P: EquivocationDetectionPipeline, SC: SourceClient<P>> EquivocationsReporter<'a, P, SC> {
33 pub fn new() -> Self {
34 Self { pending_reports: vec![] }
35 }
36
37 pub async fn submit_report(
41 &mut self,
42 source_client: &SC,
43 at: P::Hash,
44 equivocation: P::EquivocationProof,
45 ) -> Result<(), SC::Error> {
46 let pending_report = source_client.report_equivocation(at, equivocation).await?;
47 self.pending_reports.push(pending_report.wait());
48
49 Ok(())
50 }
51
52 fn do_process_pending_reports(&mut self, cx: &mut Context<'_>) -> Poll<()> {
53 self.pending_reports.retain_mut(|pending_report| {
54 match pending_report.poll_unpin(cx) {
55 Poll::Ready(tx_status) => {
56 match tx_status {
57 TrackedTransactionStatus::Lost => {
58 log::error!(target: "bridge", "Equivocation report tx was lost");
59 },
60 TrackedTransactionStatus::Finalized(id) => {
61 log::error!(target: "bridge", "Equivocation report tx was finalized in source block {id:?}");
62 },
63 }
64
65 false
67 },
68 Poll::Pending => {
69 true
71 },
72 }
73 });
74
75 Poll::Ready(())
76 }
77
78 pub async fn process_pending_reports(&mut self) {
81 poll_fn(|cx| self.do_process_pending_reports(cx)).await
82 }
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use crate::mock::*;
89 use relay_utils::HeaderId;
90 use std::sync::Mutex;
91
92 #[async_std::test]
93 async fn process_pending_reports_works() {
94 let polled_reports = Mutex::new(vec![]);
95 let finished_reports = Mutex::new(vec![]);
96
97 let mut reporter =
98 EquivocationsReporter::<TestEquivocationDetectionPipeline, TestSourceClient> {
99 pending_reports: vec![
100 Box::pin(async {
101 polled_reports.lock().unwrap().push(1);
102 finished_reports.lock().unwrap().push(1);
103 TrackedTransactionStatus::Finalized(HeaderId(1, 1))
104 }),
105 Box::pin(async {
106 polled_reports.lock().unwrap().push(2);
107 finished_reports.lock().unwrap().push(2);
108 TrackedTransactionStatus::Finalized(HeaderId(2, 2))
109 }),
110 Box::pin(async {
111 polled_reports.lock().unwrap().push(3);
112 std::future::pending::<()>().await;
113 finished_reports.lock().unwrap().push(3);
114 TrackedTransactionStatus::Finalized(HeaderId(3, 3))
115 }),
116 Box::pin(async {
117 polled_reports.lock().unwrap().push(4);
118 finished_reports.lock().unwrap().push(4);
119 TrackedTransactionStatus::Finalized(HeaderId(4, 4))
120 }),
121 ],
122 };
123
124 reporter.process_pending_reports().await;
125 assert_eq!(*polled_reports.lock().unwrap(), vec![1, 2, 3, 4]);
126 assert_eq!(*finished_reports.lock().unwrap(), vec![1, 2, 4]);
127 assert_eq!(reporter.pending_reports.len(), 1);
128 }
129}