finality_relay/
finality_proofs.rs
1use crate::{base::SourceClientBase, FinalityPipeline};
18
19use bp_header_chain::FinalityProof;
20use futures::{FutureExt, Stream, StreamExt};
21use std::pin::Pin;
22
23#[derive(Default)]
25pub struct FinalityProofsStream<P: FinalityPipeline, SC: SourceClientBase<P>> {
26 stream: Option<Pin<Box<SC::FinalityProofsStream>>>,
28}
29
30impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
31 pub fn new() -> Self {
32 Self { stream: None }
33 }
34
35 pub fn from_stream(stream: SC::FinalityProofsStream) -> Self {
36 Self { stream: Some(Box::pin(stream)) }
37 }
38
39 fn next(&mut self) -> Option<<SC::FinalityProofsStream as Stream>::Item> {
40 let stream = match &mut self.stream {
41 Some(stream) => stream,
42 None => return None,
43 };
44
45 match stream.next().now_or_never() {
46 Some(Some(finality_proof)) => Some(finality_proof),
47 Some(None) => {
48 self.stream = None;
49 None
50 },
51 None => None,
52 }
53 }
54
55 pub async fn ensure_stream(&mut self, source_client: &SC) -> Result<(), SC::Error> {
56 if self.stream.is_none() {
57 log::warn!(target: "bridge", "{} finality proofs stream is being started / restarted",
58 P::SOURCE_NAME);
59
60 let stream = source_client.finality_proofs().await.map_err(|error| {
61 log::error!(
62 target: "bridge",
63 "Failed to subscribe to {} justifications: {:?}",
64 P::SOURCE_NAME,
65 error,
66 );
67
68 error
69 })?;
70 self.stream = Some(Box::pin(stream));
71 }
72
73 Ok(())
74 }
75}
76
77pub struct FinalityProofsBuf<P: FinalityPipeline> {
79 buf: Vec<P::FinalityProof>,
81}
82
83impl<P: FinalityPipeline> FinalityProofsBuf<P> {
84 pub fn new(buf: Vec<P::FinalityProof>) -> Self {
85 Self { buf }
86 }
87
88 pub fn buf(&self) -> &Vec<P::FinalityProof> {
89 &self.buf
90 }
91
92 pub fn fill<SC: SourceClientBase<P>>(&mut self, stream: &mut FinalityProofsStream<P, SC>) {
93 let mut proofs_count = 0;
94 let mut first_header_number = None;
95 let mut last_header_number = None;
96 while let Some(finality_proof) = stream.next() {
97 let target_header_number = finality_proof.target_header_number();
98 first_header_number.get_or_insert(target_header_number);
99 last_header_number = Some(target_header_number);
100 proofs_count += 1;
101
102 self.buf.push(finality_proof);
103 }
104
105 if proofs_count != 0 {
106 log::trace!(
107 target: "bridge",
108 "Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
109 proofs_count,
110 P::SOURCE_NAME,
111 first_header_number,
112 last_header_number,
113 );
114 }
115 }
116
117 pub fn prune(&mut self, first_to_keep: P::Number, maybe_buf_limit: Option<usize>) {
119 let first_to_keep_idx = self
120 .buf
121 .binary_search_by_key(&first_to_keep, |hdr| hdr.target_header_number())
122 .map(|idx| idx + 1)
123 .unwrap_or_else(|idx| idx);
124 let buf_limit_idx = match maybe_buf_limit {
125 Some(buf_limit) => self.buf.len().saturating_sub(buf_limit),
126 None => 0,
127 };
128
129 self.buf = self.buf.split_off(std::cmp::max(first_to_keep_idx, buf_limit_idx));
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mock::*;

    /// Stream holder for the mock pipeline and the mock source client.
    type TestProofsStream = FinalityProofsStream<TestFinalitySyncPipeline, TestSourceClient>;

    /// Builds a buffer holding `proofs`, prunes it and returns what is left in it.
    fn pruned(
        proofs: &[TestFinalityProof],
        first_to_keep: <TestFinalitySyncPipeline as FinalityPipeline>::Number,
        maybe_buf_limit: Option<usize>,
    ) -> Vec<TestFinalityProof> {
        let mut proofs_buf =
            FinalityProofsBuf::<TestFinalitySyncPipeline> { buf: proofs.to_vec() };
        proofs_buf.prune(first_to_keep, maybe_buf_limit);
        proofs_buf.buf
    }

    #[test]
    fn finality_proofs_buf_fill_works() {
        let mut proofs_buf =
            FinalityProofsBuf::<TestFinalitySyncPipeline> { buf: vec![TestFinalityProof(1)] };

        // The stream never yields anything: the buffer is untouched and the stream is kept.
        let mut pending_stream = TestProofsStream::from_stream(Box::pin(futures::stream::pending()));
        proofs_buf.fill(&mut pending_stream);
        assert_eq!(proofs_buf.buf, vec![TestFinalityProof(1)]);
        assert!(pending_stream.stream.is_some());

        // The stream yields one proof and then stays pending: the proof is buffered and the
        // stream is kept.
        let mut one_proof_stream = TestProofsStream::from_stream(Box::pin(
            futures::stream::iter(vec![TestFinalityProof(4)]).chain(futures::stream::pending()),
        ));
        proofs_buf.fill(&mut one_proof_stream);
        assert_eq!(proofs_buf.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]);
        assert!(one_proof_stream.stream.is_some());

        // The stream ends immediately: the buffer is untouched and the stream is dropped.
        let mut ended_stream = TestProofsStream::from_stream(Box::pin(futures::stream::empty()));
        proofs_buf.fill(&mut ended_stream);
        assert_eq!(proofs_buf.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]);
        assert!(ended_stream.stream.is_none());
    }

    #[test]
    fn finality_proofs_buf_prune_works() {
        let proofs: Vec<<TestFinalitySyncPipeline as FinalityPipeline>::FinalityProof> = vec![
            TestFinalityProof(10),
            TestFinalityProof(13),
            TestFinalityProof(15),
            TestFinalityProof(17),
            TestFinalityProof(19),
        ];

        // Exact match without a limit.
        assert_eq!(&proofs[1..], pruned(&proofs, 10, None));

        // No exact match without a limit.
        assert_eq!(&proofs[1..], pruned(&proofs, 11, None));

        // The limit cuts more than the header number does.
        assert_eq!(&proofs[3..], pruned(&proofs, 10, Some(2)));

        // The header number cuts more than the limit does.
        assert_eq!(&proofs[5..], pruned(&proofs, 19, Some(2)));

        // The header number is past the last buffered proof.
        assert_eq!(&proofs[5..], pruned(&proofs, 20, Some(2)));
    }
}