use crate::{base::SourceClientBase, FinalityPipeline};
use bp_header_chain::FinalityProof;
use futures::{FutureExt, Stream, StreamExt};
use std::pin::Pin;
#[derive(Default)]
pub struct FinalityProofsStream<P: FinalityPipeline, SC: SourceClientBase<P>> {
stream: Option<Pin<Box<SC::FinalityProofsStream>>>,
}
impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
pub fn new() -> Self {
Self { stream: None }
}
pub fn from_stream(stream: SC::FinalityProofsStream) -> Self {
Self { stream: Some(Box::pin(stream)) }
}
fn next(&mut self) -> Option<<SC::FinalityProofsStream as Stream>::Item> {
let stream = match &mut self.stream {
Some(stream) => stream,
None => return None,
};
match stream.next().now_or_never() {
Some(Some(finality_proof)) => Some(finality_proof),
Some(None) => {
self.stream = None;
None
},
None => None,
}
}
pub async fn ensure_stream(&mut self, source_client: &SC) -> Result<(), SC::Error> {
if self.stream.is_none() {
log::warn!(target: "bridge", "{} finality proofs stream is being started / restarted",
P::SOURCE_NAME);
let stream = source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}",
P::SOURCE_NAME,
error,
);
error
})?;
self.stream = Some(Box::pin(stream));
}
Ok(())
}
}
pub struct FinalityProofsBuf<P: FinalityPipeline> {
buf: Vec<P::FinalityProof>,
}
impl<P: FinalityPipeline> FinalityProofsBuf<P> {
pub fn new(buf: Vec<P::FinalityProof>) -> Self {
Self { buf }
}
pub fn buf(&self) -> &Vec<P::FinalityProof> {
&self.buf
}
pub fn fill<SC: SourceClientBase<P>>(&mut self, stream: &mut FinalityProofsStream<P, SC>) {
let mut proofs_count = 0;
let mut first_header_number = None;
let mut last_header_number = None;
while let Some(finality_proof) = stream.next() {
let target_header_number = finality_proof.target_header_number();
first_header_number.get_or_insert(target_header_number);
last_header_number = Some(target_header_number);
proofs_count += 1;
self.buf.push(finality_proof);
}
if proofs_count != 0 {
log::trace!(
target: "bridge",
"Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
proofs_count,
P::SOURCE_NAME,
first_header_number,
last_header_number,
);
}
}
pub fn prune(&mut self, first_to_keep: P::Number, maybe_buf_limit: Option<usize>) {
let first_to_keep_idx = self
.buf
.binary_search_by_key(&first_to_keep, |hdr| hdr.target_header_number())
.map(|idx| idx + 1)
.unwrap_or_else(|idx| idx);
let buf_limit_idx = match maybe_buf_limit {
Some(buf_limit) => self.buf.len().saturating_sub(buf_limit),
None => 0,
};
self.buf = self.buf.split_off(std::cmp::max(first_to_keep_idx, buf_limit_idx));
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::*;
#[test]
fn finality_proofs_buf_fill_works() {
let mut finality_proofs_buf =
FinalityProofsBuf::<TestFinalitySyncPipeline> { buf: vec![TestFinalityProof(1)] };
let mut stream =
FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
Box::pin(futures::stream::pending()),
);
finality_proofs_buf.fill(&mut stream);
assert_eq!(finality_proofs_buf.buf, vec![TestFinalityProof(1)]);
assert!(stream.stream.is_some());
let mut stream =
FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
Box::pin(
futures::stream::iter(vec![TestFinalityProof(4)])
.chain(futures::stream::pending()),
),
);
finality_proofs_buf.fill(&mut stream);
assert_eq!(finality_proofs_buf.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]);
assert!(stream.stream.is_some());
let mut stream =
FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
Box::pin(futures::stream::empty()),
);
finality_proofs_buf.fill(&mut stream);
assert_eq!(finality_proofs_buf.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]);
assert!(stream.stream.is_none());
}
#[test]
fn finality_proofs_buf_prune_works() {
let original_finality_proofs_buf: Vec<
<TestFinalitySyncPipeline as FinalityPipeline>::FinalityProof,
> = vec![
TestFinalityProof(10),
TestFinalityProof(13),
TestFinalityProof(15),
TestFinalityProof(17),
TestFinalityProof(19),
]
.into_iter()
.collect();
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(10, None);
assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,);
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(11, None);
assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,);
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(10, Some(2));
assert_eq!(&original_finality_proofs_buf[3..], finality_proofs_buf.buf,);
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(19, Some(2));
assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,);
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(20, Some(2));
assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,);
}
}