1use crate::{
18 handle_client_error, reporter::EquivocationsReporter, EquivocationDetectionPipeline,
19 SourceClient, TargetClient,
20};
21
22use crate::block_checker::BlockChecker;
23use finality_relay::{FinalityProofsBuf, FinalityProofsStream};
24use futures::{select_biased, FutureExt};
25use num_traits::Saturating;
26use relay_utils::{metrics::MetricsParams, FailedClient};
27use std::{future::Future, time::Duration};
28
29struct EquivocationDetectionLoop<
31 P: EquivocationDetectionPipeline,
32 SC: SourceClient<P>,
33 TC: TargetClient<P>,
34> {
35 source_client: SC,
36 target_client: TC,
37
38 from_block_num: Option<P::TargetNumber>,
39 until_block_num: Option<P::TargetNumber>,
40
41 reporter: EquivocationsReporter<'static, P, SC>,
42
43 finality_proofs_stream: FinalityProofsStream<P, SC>,
44 finality_proofs_buf: FinalityProofsBuf<P>,
45}
46
47impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
48 EquivocationDetectionLoop<P, SC, TC>
49{
50 async fn ensure_finality_proofs_stream(&mut self) {
51 match self.finality_proofs_stream.ensure_stream(&self.source_client).await {
52 Ok(_) => {},
53 Err(e) => {
54 log::error!(
55 target: "bridge",
56 "Could not connect to the {} `FinalityProofsStream`: {e:?}",
57 P::SOURCE_NAME,
58 );
59
60 handle_client_error(&mut self.source_client, e).await;
62 },
63 }
64 }
65
66 async fn best_finalized_target_block_number(&mut self) -> Option<P::TargetNumber> {
67 match self.target_client.best_finalized_header_number().await {
68 Ok(block_num) => Some(block_num),
69 Err(e) => {
70 log::error!(
71 target: "bridge",
72 "Could not read best finalized header number from {}: {e:?}",
73 P::TARGET_NAME,
74 );
75
76 handle_client_error(&mut self.target_client, e).await;
78
79 None
80 },
81 }
82 }
83
84 async fn do_run(&mut self, tick: Duration, exit_signal: impl Future<Output = ()>) {
85 let exit_signal = exit_signal.fuse();
86 futures::pin_mut!(exit_signal);
87
88 loop {
89 self.ensure_finality_proofs_stream().await;
91 self.reporter.process_pending_reports().await;
93
94 if let Some(block_number) = self.best_finalized_target_block_number().await {
96 self.from_block_num.get_or_insert(block_number);
97 self.until_block_num = Some(block_number);
98 }
99 let (from, until) = match (self.from_block_num, self.until_block_num) {
100 (Some(from), Some(until)) => (from, until),
101 _ => continue,
102 };
103
104 let mut current_block_number = from;
106 while current_block_number <= until {
107 self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
108 let block_checker = BlockChecker::new(current_block_number);
109 let _ = block_checker
110 .run(
111 &mut self.source_client,
112 &mut self.target_client,
113 &mut self.finality_proofs_buf,
114 &mut self.reporter,
115 )
116 .await;
117 current_block_number = current_block_number.saturating_add(1.into());
118 }
119 self.from_block_num = Some(current_block_number);
120
121 select_biased! {
122 _ = exit_signal => return,
123 _ = async_std::task::sleep(tick).fuse() => {},
124 }
125 }
126 }
127
128 pub async fn run(
129 source_client: SC,
130 target_client: TC,
131 tick: Duration,
132 exit_signal: impl Future<Output = ()>,
133 ) -> Result<(), FailedClient> {
134 let mut equivocation_detection_loop = Self {
135 source_client,
136 target_client,
137 from_block_num: None,
138 until_block_num: None,
139 reporter: EquivocationsReporter::<P, SC>::new(),
140 finality_proofs_stream: FinalityProofsStream::new(),
141 finality_proofs_buf: FinalityProofsBuf::new(vec![]),
142 };
143
144 equivocation_detection_loop.do_run(tick, exit_signal).await;
145 Ok(())
146 }
147}
148
149pub async fn run<P: EquivocationDetectionPipeline>(
151 source_client: impl SourceClient<P>,
152 target_client: impl TargetClient<P>,
153 tick: Duration,
154 metrics_params: MetricsParams,
155 exit_signal: impl Future<Output = ()> + 'static + Send,
156) -> Result<(), relay_utils::Error> {
157 let exit_signal = exit_signal.shared();
158 relay_utils::relay_loop(source_client, target_client)
159 .with_metrics(metrics_params)
160 .expose()
161 .await?
162 .run(
163 format!("{}_to_{}_EquivocationDetection", P::SOURCE_NAME, P::TARGET_NAME),
164 move |source_client, target_client, _metrics| {
165 EquivocationDetectionLoop::run(
166 source_client,
167 target_client,
168 tick,
169 exit_signal.clone(),
170 )
171 },
172 )
173 .await
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mock::*;
    use futures::{channel::mpsc::UnboundedSender, StreamExt};
    use std::{
        collections::{HashMap, VecDeque},
        sync::{Arc, Mutex},
    };

    /// Pops the next scripted "best finalized header number" response.
    ///
    /// Once the script is exhausted by this call, an exit request is sent through
    /// `exit_sender` so that the loop under test stops after the current iteration.
    /// Panics if the script is already empty.
    fn best_finalized_header_number(
        best_finalized_headers: &Mutex<VecDeque<Result<TestTargetNumber, TestClientError>>>,
        exit_sender: &UnboundedSender<()>,
    ) -> Result<TestTargetNumber, TestClientError> {
        let mut scripted = best_finalized_headers.lock().unwrap();
        let next = scripted.pop_front().unwrap();
        if scripted.is_empty() {
            exit_sender.unbounded_send(()).unwrap();
        }
        next
    }

    /// The target reports finalized heads 10, 12 and 13, so blocks 10..=13 get checked over
    /// three loop iterations.
    #[async_std::test]
    async fn multiple_blocks_are_checked_correctly() {
        let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
        let best_finalized_headers = Arc::new(Mutex::new(VecDeque::from([Ok(10), Ok(12), Ok(13)])));

        let source_client = TestSourceClient {
            finality_proofs: Arc::new(Mutex::new(vec![
                TestFinalityProof(2, vec!["2-1"]),
                TestFinalityProof(3, vec!["3-1", "3-2"]),
                TestFinalityProof(4, vec!["4-1"]),
                TestFinalityProof(5, vec!["5-1"]),
                TestFinalityProof(6, vec!["6-1", "6-2"]),
                TestFinalityProof(7, vec!["7-1", "7-2"]),
            ])),
            ..Default::default()
        };
        let reported_equivocations = source_client.reported_equivocations.clone();

        // Per-target-block fixtures, keyed by target block number.
        let best_synced_header_hash = HashMap::from([
            (9, Ok(Some(1))),
            (10, Ok(Some(3))),
            (11, Ok(Some(5))),
            (12, Ok(Some(6))),
        ]);
        let finality_verification_context = HashMap::from([
            (9, Ok(TestFinalityVerificationContext { check_equivocations: true })),
            (10, Ok(TestFinalityVerificationContext { check_equivocations: true })),
            (11, Ok(TestFinalityVerificationContext { check_equivocations: false })),
            (12, Ok(TestFinalityVerificationContext { check_equivocations: true })),
        ]);
        let synced_headers_finality_info = HashMap::from([
            (10, Ok(vec![new_header_finality_info(2, None), new_header_finality_info(3, None)])),
            (
                11,
                Ok(vec![
                    new_header_finality_info(4, None),
                    new_header_finality_info(5, Some(false)),
                ]),
            ),
            (12, Ok(vec![new_header_finality_info(6, None)])),
            (13, Ok(vec![new_header_finality_info(7, None)])),
        ]);

        let target_client = TestTargetClient {
            best_finalized_header_number: Arc::new(move || {
                best_finalized_header_number(&best_finalized_headers, &exit_sender)
            }),
            best_synced_header_hash,
            finality_verification_context,
            synced_headers_finality_info,
            ..Default::default()
        };

        let outcome = run::<TestEquivocationDetectionPipeline>(
            source_client,
            target_client,
            Duration::from_secs(0),
            MetricsParams { address: None, registry: Default::default() },
            exit_receiver.into_future().map(|(_, _)| ()),
        )
        .await;
        assert!(outcome.is_ok());

        assert_eq!(
            *reported_equivocations.lock().unwrap(),
            HashMap::from([
                (1, vec!["2-1", "3-1", "3-2"]),
                (3, vec!["4-1", "5-1"]),
                (6, vec!["7-1", "7-2"])
            ])
        );
    }

    /// Reading the finality info for block 10 fails with a non-connection error; block 11
    /// must still be checked and its equivocation reported.
    #[async_std::test]
    async fn blocks_following_error_are_checked_correctly() {
        let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
        let best_finalized_headers = Mutex::new(VecDeque::from([Ok(10), Ok(11)]));

        let source_client = TestSourceClient {
            finality_proofs: Arc::new(Mutex::new(vec![
                TestFinalityProof(2, vec!["2-1"]),
                TestFinalityProof(3, vec!["3-1"]),
            ])),
            ..Default::default()
        };
        let reported_equivocations = source_client.reported_equivocations.clone();

        // Per-target-block fixtures, keyed by target block number.
        let best_synced_header_hash = HashMap::from([(9, Ok(Some(1))), (10, Ok(Some(2)))]);
        let finality_verification_context = HashMap::from([
            (9, Ok(TestFinalityVerificationContext { check_equivocations: true })),
            (10, Ok(TestFinalityVerificationContext { check_equivocations: true })),
        ]);
        let synced_headers_finality_info = HashMap::from([
            (10, Err(TestClientError::NonConnection)),
            (11, Ok(vec![new_header_finality_info(3, None)])),
        ]);

        let target_client = TestTargetClient {
            best_finalized_header_number: Arc::new(move || {
                best_finalized_header_number(&best_finalized_headers, &exit_sender)
            }),
            best_synced_header_hash,
            finality_verification_context,
            synced_headers_finality_info,
            ..Default::default()
        };

        let outcome = run::<TestEquivocationDetectionPipeline>(
            source_client,
            target_client,
            Duration::from_secs(0),
            MetricsParams { address: None, registry: Default::default() },
            exit_receiver.into_future().map(|(_, _)| ()),
        )
        .await;
        assert!(outcome.is_ok());

        assert_eq!(*reported_equivocations.lock().unwrap(), HashMap::from([(2, vec!["3-1"]),]));
    }
}