// finality_relay/finality_proofs.rs

// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{base::SourceClientBase, FinalityPipeline};
18
19use bp_header_chain::FinalityProof;
20use futures::{FutureExt, Stream, StreamExt};
21use std::pin::Pin;
22
23/// Source finality proofs stream that may be restarted.
24#[derive(Default)]
25pub struct FinalityProofsStream<P: FinalityPipeline, SC: SourceClientBase<P>> {
26	/// The underlying stream.
27	stream: Option<Pin<Box<SC::FinalityProofsStream>>>,
28}
29
30impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
31	pub fn new() -> Self {
32		Self { stream: None }
33	}
34
35	pub fn from_stream(stream: SC::FinalityProofsStream) -> Self {
36		Self { stream: Some(Box::pin(stream)) }
37	}
38
39	fn next(&mut self) -> Option<<SC::FinalityProofsStream as Stream>::Item> {
40		let stream = match &mut self.stream {
41			Some(stream) => stream,
42			None => return None,
43		};
44
45		match stream.next().now_or_never() {
46			Some(Some(finality_proof)) => Some(finality_proof),
47			Some(None) => {
48				self.stream = None;
49				None
50			},
51			None => None,
52		}
53	}
54
55	pub async fn ensure_stream(&mut self, source_client: &SC) -> Result<(), SC::Error> {
56		if self.stream.is_none() {
57			log::warn!(target: "bridge", "{} finality proofs stream is being started / restarted",
58				P::SOURCE_NAME);
59
60			let stream = source_client.finality_proofs().await.map_err(|error| {
61				log::error!(
62					target: "bridge",
63					"Failed to subscribe to {} justifications: {:?}",
64					P::SOURCE_NAME,
65					error,
66				);
67
68				error
69			})?;
70			self.stream = Some(Box::pin(stream));
71		}
72
73		Ok(())
74	}
75}
76
/// Source finality proofs buffer.
///
/// Proofs are appended by `fill` in the order they are read from the stream and removed from
/// the front by `prune`. Nothing in this type sorts the proofs: `prune` looks up its cut-off
/// position assuming the order stated on the `buf` field.
pub struct FinalityProofsBuf<P: FinalityPipeline> {
	/// Proofs buffer. Ordered by target header number.
	buf: Vec<P::FinalityProof>,
}
82
83impl<P: FinalityPipeline> FinalityProofsBuf<P> {
84	pub fn new(buf: Vec<P::FinalityProof>) -> Self {
85		Self { buf }
86	}
87
88	pub fn buf(&self) -> &Vec<P::FinalityProof> {
89		&self.buf
90	}
91
92	pub fn fill<SC: SourceClientBase<P>>(&mut self, stream: &mut FinalityProofsStream<P, SC>) {
93		let mut proofs_count = 0;
94		let mut first_header_number = None;
95		let mut last_header_number = None;
96		while let Some(finality_proof) = stream.next() {
97			let target_header_number = finality_proof.target_header_number();
98			first_header_number.get_or_insert(target_header_number);
99			last_header_number = Some(target_header_number);
100			proofs_count += 1;
101
102			self.buf.push(finality_proof);
103		}
104
105		if proofs_count != 0 {
106			log::trace!(
107				target: "bridge",
108				"Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
109				proofs_count,
110				P::SOURCE_NAME,
111				first_header_number,
112				last_header_number,
113			);
114		}
115	}
116
117	/// Prune all finality proofs that target header numbers older than `first_to_keep`.
118	pub fn prune(&mut self, first_to_keep: P::Number, maybe_buf_limit: Option<usize>) {
119		let first_to_keep_idx = self
120			.buf
121			.binary_search_by_key(&first_to_keep, |hdr| hdr.target_header_number())
122			.map(|idx| idx + 1)
123			.unwrap_or_else(|idx| idx);
124		let buf_limit_idx = match maybe_buf_limit {
125			Some(buf_limit) => self.buf.len().saturating_sub(buf_limit),
126			None => 0,
127		};
128
129		self.buf = self.buf.split_off(std::cmp::max(first_to_keep_idx, buf_limit_idx));
130	}
131}
132
#[cfg(test)]
mod tests {
	use super::*;
	use crate::mock::*;

	/// Finality proofs stream of the mock pipeline and the mock source client.
	type TestProofsStream = FinalityProofsStream<TestFinalitySyncPipeline, TestSourceClient>;
	/// Finality proofs buffer of the mock pipeline.
	type TestProofsBuf = FinalityProofsBuf<TestFinalitySyncPipeline>;

	#[test]
	fn finality_proofs_buf_fill_works() {
		let mut proofs = TestProofsBuf { buf: vec![TestFinalityProof(1)] };

		// the stream is alive, but nothing is ready => the buffer and the stream are untouched
		let mut idle_stream = TestProofsStream::from_stream(Box::pin(futures::stream::pending()));
		proofs.fill(&mut idle_stream);
		assert_eq!(proofs.buf, vec![TestFinalityProof(1)]);
		assert!(idle_stream.stream.is_some());

		// the stream has one ready proof => it is appended to the buffer, the stream is kept
		let mut ready_stream = TestProofsStream::from_stream(Box::pin(
			futures::stream::iter(vec![TestFinalityProof(4)]).chain(futures::stream::pending()),
		));
		proofs.fill(&mut ready_stream);
		assert_eq!(proofs.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]);
		assert!(ready_stream.stream.is_some());

		// the stream has ended => the buffer is untouched, the stream is dropped for restart
		let mut ended_stream = TestProofsStream::from_stream(Box::pin(futures::stream::empty()));
		proofs.fill(&mut ended_stream);
		assert_eq!(proofs.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]);
		assert!(ended_stream.stream.is_none());
	}

	#[test]
	fn finality_proofs_buf_prune_works() {
		let original = vec![
			TestFinalityProof(10),
			TestFinalityProof(13),
			TestFinalityProof(15),
			TestFinalityProof(17),
			TestFinalityProof(19),
		];

		// (`first_to_keep`, `maybe_buf_limit`, index of the first proof that must survive)
		let cases: [(_, Option<usize>, usize); 5] = [
			// when there's proof for justified header in the vec
			(10, None, 1),
			// when there are no proof for justified header in the vec
			(11, None, 1),
			// when there are too many entries after initial prune && they also need to be pruned
			(10, Some(2), 3),
			// when last entry is pruned
			(19, Some(2), 5),
			// when post-last entry is pruned
			(20, Some(2), 5),
		];

		for (first_to_keep, maybe_buf_limit, first_kept_idx) in cases {
			let mut proofs = TestProofsBuf { buf: original.clone() };
			proofs.prune(first_to_keep, maybe_buf_limit);
			assert_eq!(&original[first_kept_idx..], proofs.buf);
		}
	}
}