substrate_relay_helper/finality_base/
mod.rs1pub mod engine;
21
22use crate::finality_base::engine::Engine;
23
24use async_trait::async_trait;
25use bp_runtime::{HashOf, HeaderIdOf};
26use codec::Decode;
27use futures::{stream::unfold, Stream, StreamExt};
28use relay_substrate_client::{Chain, Client, Error};
29use std::{fmt::Debug, pin::Pin};
30
31#[async_trait]
33pub trait SubstrateFinalityPipeline: 'static + Clone + Debug + Send + Sync {
34 type SourceChain: Chain;
36 type TargetChain: Chain;
38 type FinalityEngine: Engine<Self::SourceChain>;
40}
41
42pub type SubstrateFinalityProof<P> = <<P as SubstrateFinalityPipeline>::FinalityEngine as Engine<
44 <P as SubstrateFinalityPipeline>::SourceChain,
45>>::FinalityProof;
46
47pub type SubstrateFinalityProofsStream<P> =
49 Pin<Box<dyn Stream<Item = SubstrateFinalityProof<P>> + Send>>;
50
51pub async fn finality_proofs<P: SubstrateFinalityPipeline>(
53 client: &impl Client<P::SourceChain>,
54) -> Result<SubstrateFinalityProofsStream<P>, Error> {
55 Ok(unfold(
56 P::FinalityEngine::source_finality_proofs(client).await?,
57 move |mut subscription| async move {
58 loop {
59 let log_error = |err| {
60 log::error!(
61 target: "bridge",
62 "Failed to read justification target from the {} justifications stream: {:?}",
63 P::SourceChain::NAME,
64 err,
65 );
66 };
67
68 let next_justification = subscription.next().await?;
69
70 let decoded_justification =
71 <P::FinalityEngine as Engine<P::SourceChain>>::FinalityProof::decode(
72 &mut &next_justification[..],
73 );
74
75 let justification = match decoded_justification {
76 Ok(j) => j,
77 Err(err) => {
78 log_error(format!("decode failed with error {err:?}"));
79 continue
80 },
81 };
82
83 return Some((justification, subscription))
84 }
85 },
86 )
87 .boxed())
88}
89
90pub async fn best_synced_header_id<SourceChain, TargetChain>(
95 target_client: &impl Client<TargetChain>,
96 at: HashOf<TargetChain>,
97) -> Result<Option<HeaderIdOf<SourceChain>>, Error>
98where
99 SourceChain: Chain,
100 TargetChain: Chain,
101{
102 target_client
104 .state_call(at, SourceChain::BEST_FINALIZED_HEADER_ID_METHOD.into(), ())
105 .await
106}