referrerpolicy=no-referrer-when-downgrade

equivocation_detector/
equivocation_loop.rs

1// Copyright 2019-2023 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	handle_client_error, reporter::EquivocationsReporter, EquivocationDetectionPipeline,
19	SourceClient, TargetClient,
20};
21
22use crate::block_checker::BlockChecker;
23use finality_relay::{FinalityProofsBuf, FinalityProofsStream};
24use futures::{select_biased, FutureExt};
25use num_traits::Saturating;
26use relay_utils::{metrics::MetricsParams, FailedClient};
27use std::{future::Future, time::Duration};
28
29/// Equivocations detection loop state.
30struct EquivocationDetectionLoop<
31	P: EquivocationDetectionPipeline,
32	SC: SourceClient<P>,
33	TC: TargetClient<P>,
34> {
35	source_client: SC,
36	target_client: TC,
37
38	from_block_num: Option<P::TargetNumber>,
39	until_block_num: Option<P::TargetNumber>,
40
41	reporter: EquivocationsReporter<'static, P, SC>,
42
43	finality_proofs_stream: FinalityProofsStream<P, SC>,
44	finality_proofs_buf: FinalityProofsBuf<P>,
45}
46
47impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
48	EquivocationDetectionLoop<P, SC, TC>
49{
50	async fn ensure_finality_proofs_stream(&mut self) {
51		match self.finality_proofs_stream.ensure_stream(&self.source_client).await {
52			Ok(_) => {},
53			Err(e) => {
54				log::error!(
55					target: "bridge",
56					"Could not connect to the {} `FinalityProofsStream`: {e:?}",
57					P::SOURCE_NAME,
58				);
59
60				// Reconnect to the source client if needed
61				handle_client_error(&mut self.source_client, e).await;
62			},
63		}
64	}
65
66	async fn best_finalized_target_block_number(&mut self) -> Option<P::TargetNumber> {
67		match self.target_client.best_finalized_header_number().await {
68			Ok(block_num) => Some(block_num),
69			Err(e) => {
70				log::error!(
71					target: "bridge",
72					"Could not read best finalized header number from {}: {e:?}",
73					P::TARGET_NAME,
74				);
75
76				// Reconnect target client and move on
77				handle_client_error(&mut self.target_client, e).await;
78
79				None
80			},
81		}
82	}
83
84	async fn do_run(&mut self, tick: Duration, exit_signal: impl Future<Output = ()>) {
85		let exit_signal = exit_signal.fuse();
86		futures::pin_mut!(exit_signal);
87
88		loop {
89			// Make sure that we are connected to the source finality proofs stream.
90			self.ensure_finality_proofs_stream().await;
91			// Check the status of the pending equivocation reports
92			self.reporter.process_pending_reports().await;
93
94			// Update blocks range.
95			if let Some(block_number) = self.best_finalized_target_block_number().await {
96				self.from_block_num.get_or_insert(block_number);
97				self.until_block_num = Some(block_number);
98			}
99			let (from, until) = match (self.from_block_num, self.until_block_num) {
100				(Some(from), Some(until)) => (from, until),
101				_ => continue,
102			};
103
104			// Check the available blocks
105			let mut current_block_number = from;
106			while current_block_number <= until {
107				self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
108				let block_checker = BlockChecker::new(current_block_number);
109				let _ = block_checker
110					.run(
111						&mut self.source_client,
112						&mut self.target_client,
113						&mut self.finality_proofs_buf,
114						&mut self.reporter,
115					)
116					.await;
117				current_block_number = current_block_number.saturating_add(1.into());
118			}
119			self.from_block_num = Some(current_block_number);
120
121			select_biased! {
122				_ = exit_signal => return,
123				_ = async_std::task::sleep(tick).fuse() => {},
124			}
125		}
126	}
127
128	pub async fn run(
129		source_client: SC,
130		target_client: TC,
131		tick: Duration,
132		exit_signal: impl Future<Output = ()>,
133	) -> Result<(), FailedClient> {
134		let mut equivocation_detection_loop = Self {
135			source_client,
136			target_client,
137			from_block_num: None,
138			until_block_num: None,
139			reporter: EquivocationsReporter::<P, SC>::new(),
140			finality_proofs_stream: FinalityProofsStream::new(),
141			finality_proofs_buf: FinalityProofsBuf::new(vec![]),
142		};
143
144		equivocation_detection_loop.do_run(tick, exit_signal).await;
145		Ok(())
146	}
147}
148
149/// Spawn the equivocations detection loop.
150pub async fn run<P: EquivocationDetectionPipeline>(
151	source_client: impl SourceClient<P>,
152	target_client: impl TargetClient<P>,
153	tick: Duration,
154	metrics_params: MetricsParams,
155	exit_signal: impl Future<Output = ()> + 'static + Send,
156) -> Result<(), relay_utils::Error> {
157	let exit_signal = exit_signal.shared();
158	relay_utils::relay_loop(source_client, target_client)
159		.with_metrics(metrics_params)
160		.expose()
161		.await?
162		.run(
163			format!("{}_to_{}_EquivocationDetection", P::SOURCE_NAME, P::TARGET_NAME),
164			move |source_client, target_client, _metrics| {
165				EquivocationDetectionLoop::run(
166					source_client,
167					target_client,
168					tick,
169					exit_signal.clone(),
170				)
171			},
172		)
173		.await
174}
175
#[cfg(test)]
mod tests {
	use super::*;
	use crate::mock::*;
	use futures::{channel::mpsc::UnboundedSender, StreamExt};
	use std::{
		collections::{HashMap, VecDeque},
		sync::{Arc, Mutex},
	};

	/// Scripted implementation of the target client's `best_finalized_header_number`.
	///
	/// Pops and returns the next scripted answer. Once the last answer has been handed out,
	/// a message is sent through `exit_sender`, which the tests use as the loop exit signal.
	fn best_finalized_header_number(
		best_finalized_headers: &Mutex<VecDeque<Result<TestTargetNumber, TestClientError>>>,
		exit_sender: &UnboundedSender<()>,
	) -> Result<TestTargetNumber, TestClientError> {
		let mut best_finalized_headers = best_finalized_headers.lock().unwrap();
		let result = best_finalized_headers
			.pop_front()
			.expect("the target client was polled after all scripted answers were consumed");
		if best_finalized_headers.is_empty() {
			exit_sender.unbounded_send(()).unwrap();
		}
		result
	}

	/// The loop is fed three best finalized target block numbers (10, 12, 13), so the
	/// target blocks 10..=13 are expected to be checked over three iterations.
	#[async_std::test]
	async fn multiple_blocks_are_checked_correctly() {
		// One scripted answer per loop iteration. The closure below takes ownership of the
		// queue, so a plain `Mutex` is enough here.
		let best_finalized_headers = Mutex::new(VecDeque::from([Ok(10), Ok(12), Ok(13)]));
		let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();

		let source_client = TestSourceClient {
			finality_proofs: Arc::new(Mutex::new(vec![
				TestFinalityProof(2, vec!["2-1"]),
				TestFinalityProof(3, vec!["3-1", "3-2"]),
				TestFinalityProof(4, vec!["4-1"]),
				TestFinalityProof(5, vec!["5-1"]),
				TestFinalityProof(6, vec!["6-1", "6-2"]),
				TestFinalityProof(7, vec!["7-1", "7-2"]),
			])),
			..Default::default()
		};
		// Keep a handle on the reported equivocations before the client is moved into the loop.
		let reported_equivocations = source_client.reported_equivocations.clone();
		let target_client = TestTargetClient {
			best_finalized_header_number: Arc::new(move || {
				best_finalized_header_number(&best_finalized_headers, &exit_sender)
			}),
			best_synced_header_hash: HashMap::from([
				(9, Ok(Some(1))),
				(10, Ok(Some(3))),
				(11, Ok(Some(5))),
				(12, Ok(Some(6))),
			]),
			finality_verification_context: HashMap::from([
				(9, Ok(TestFinalityVerificationContext { check_equivocations: true })),
				(10, Ok(TestFinalityVerificationContext { check_equivocations: true })),
				(11, Ok(TestFinalityVerificationContext { check_equivocations: false })),
				(12, Ok(TestFinalityVerificationContext { check_equivocations: true })),
			]),
			synced_headers_finality_info: HashMap::from([
				(
					10,
					Ok(vec![new_header_finality_info(2, None), new_header_finality_info(3, None)]),
				),
				(
					11,
					Ok(vec![
						new_header_finality_info(4, None),
						new_header_finality_info(5, Some(false)),
					]),
				),
				(12, Ok(vec![new_header_finality_info(6, None)])),
				(13, Ok(vec![new_header_finality_info(7, None)])),
			]),
			..Default::default()
		};

		assert!(run::<TestEquivocationDetectionPipeline>(
			source_client,
			target_client,
			Duration::from_secs(0),
			MetricsParams { address: None, registry: Default::default() },
			exit_receiver.into_future().map(|(_, _)| ()),
		)
		.await
		.is_ok());
		assert_eq!(
			*reported_equivocations.lock().unwrap(),
			HashMap::from([
				(1, vec!["2-1", "3-1", "3-2"]),
				(3, vec!["4-1", "5-1"]),
				(6, vec!["7-1", "7-2"])
			])
		);
	}

	/// Reading the synced headers finality info fails for target block 10; block 11 must
	/// still be checked and its equivocations reported.
	#[async_std::test]
	async fn blocks_following_error_are_checked_correctly() {
		// One scripted answer per loop iteration.
		let best_finalized_headers = Mutex::new(VecDeque::from([Ok(10), Ok(11)]));
		let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();

		let source_client = TestSourceClient {
			finality_proofs: Arc::new(Mutex::new(vec![
				TestFinalityProof(2, vec!["2-1"]),
				TestFinalityProof(3, vec!["3-1"]),
			])),
			..Default::default()
		};
		// Keep a handle on the reported equivocations before the client is moved into the loop.
		let reported_equivocations = source_client.reported_equivocations.clone();
		let target_client = TestTargetClient {
			best_finalized_header_number: Arc::new(move || {
				best_finalized_header_number(&best_finalized_headers, &exit_sender)
			}),
			best_synced_header_hash: HashMap::from([(9, Ok(Some(1))), (10, Ok(Some(2)))]),
			finality_verification_context: HashMap::from([
				(9, Ok(TestFinalityVerificationContext { check_equivocations: true })),
				(10, Ok(TestFinalityVerificationContext { check_equivocations: true })),
			]),
			synced_headers_finality_info: HashMap::from([
				(10, Err(TestClientError::NonConnection)),
				(11, Ok(vec![new_header_finality_info(3, None)])),
			]),
			..Default::default()
		};

		assert!(run::<TestEquivocationDetectionPipeline>(
			source_client,
			target_client,
			Duration::from_secs(0),
			MetricsParams { address: None, registry: Default::default() },
			exit_receiver.into_future().map(|(_, _)| ()),
		)
		.await
		.is_ok());
		assert_eq!(*reported_equivocations.lock().unwrap(), HashMap::from([(2, vec!["3-1"]),]));
	}
}