substrate_relay_helper/finality/
source.rs
1use crate::{
20 finality::{FinalitySyncPipelineAdapter, SubstrateFinalitySyncPipeline},
21 finality_base::{
22 engine::Engine, finality_proofs, SubstrateFinalityProof, SubstrateFinalityProofsStream,
23 },
24};
25
26use async_std::sync::{Arc, Mutex};
27use async_trait::async_trait;
28use bp_header_chain::FinalityProof;
29use codec::Decode;
30use finality_relay::{SourceClient, SourceClientBase};
31use futures::{
32 select,
33 stream::{try_unfold, Stream, StreamExt, TryStreamExt},
34};
35use num_traits::One;
36use relay_substrate_client::{BlockNumberOf, BlockWithJustification, Client, Error, HeaderOf};
37use relay_utils::{relay_loop::Client as RelayClient, UniqueSaturatedInto};
38
39pub type RequiredHeaderNumberRef<C> = Arc<Mutex<<C as bp_runtime::Chain>::BlockNumber>>;
41
42pub struct SubstrateFinalitySource<P: SubstrateFinalitySyncPipeline, SourceClnt> {
44 client: SourceClnt,
45 maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceChain>>,
46}
47
48impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>>
49 SubstrateFinalitySource<P, SourceClnt>
50{
51 pub fn new(
53 client: SourceClnt,
54 maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceChain>>,
55 ) -> Self {
56 SubstrateFinalitySource { client, maximal_header_number }
57 }
58
59 pub fn client(&self) -> &SourceClnt {
61 &self.client
62 }
63
64 pub async fn on_chain_best_finalized_block_number(
66 &self,
67 ) -> Result<BlockNumberOf<P::SourceChain>, Error> {
68 self.client.best_finalized_header_number().await
71 }
72
73 pub async fn prove_block_finality(
79 &self,
80 block_number: BlockNumberOf<P::SourceChain>,
81 ) -> Result<
82 (relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>, SubstrateFinalityProof<P>),
83 Error,
84 > {
85 let next_persistent_proof =
87 self.persistent_proofs_stream(block_number + One::one()).await?.fuse();
88 let next_ephemeral_proof = self.ephemeral_proofs_stream(block_number).await?.fuse();
89
90 let (header, maybe_proof) = self.header_and_finality_proof(block_number).await?;
92 if let Some(proof) = maybe_proof {
93 return Ok((header, proof))
94 }
95
96 futures::pin_mut!(next_persistent_proof, next_ephemeral_proof);
98 loop {
99 select! {
100 maybe_header_and_proof = next_persistent_proof.next() => match maybe_header_and_proof {
101 Some(header_and_proof) => return header_and_proof,
102 None => continue,
103 },
104 maybe_header_and_proof = next_ephemeral_proof.next() => match maybe_header_and_proof {
105 Some(header_and_proof) => return header_and_proof,
106 None => continue,
107 },
108 complete => return Err(Error::FinalityProofNotFound(block_number.unique_saturated_into()))
109 }
110 }
111 }
112
113 async fn persistent_proofs_stream(
115 &self,
116 block_number: BlockNumberOf<P::SourceChain>,
117 ) -> Result<
118 impl Stream<
119 Item = Result<
120 (
121 relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
122 SubstrateFinalityProof<P>,
123 ),
124 Error,
125 >,
126 >,
127 Error,
128 > {
129 let client = self.client.clone();
130 let best_finalized_block_number = client.best_finalized_header_number().await?;
131 Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move {
132 if current_block_number > best_finalized_block_number {
135 return Ok(None)
136 }
137
138 let (header, maybe_proof) =
139 header_and_finality_proof::<P>(&client, current_block_number).await?;
140 let next_block_number = current_block_number + One::one();
141 let next_state = (client, next_block_number);
142
143 Ok(Some((maybe_proof.map(|proof| (header, proof)), next_state)))
144 })
145 .try_filter_map(|maybe_result| async { Ok(maybe_result) }))
146 }
147
148 async fn ephemeral_proofs_stream(
150 &self,
151 block_number: BlockNumberOf<P::SourceChain>,
152 ) -> Result<
153 impl Stream<
154 Item = Result<
155 (
156 relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
157 SubstrateFinalityProof<P>,
158 ),
159 Error,
160 >,
161 >,
162 Error,
163 > {
164 let client = self.client.clone();
165 Ok(self.finality_proofs().await?.map(Ok).try_filter_map(move |proof| {
166 let client = client.clone();
167 async move {
168 if proof.target_header_number() < block_number {
169 return Ok(None)
170 }
171
172 let header = client.header_by_number(proof.target_header_number()).await?;
173 Ok(Some((header.into(), proof)))
174 }
175 }))
176 }
177}
178
179impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Clone> Clone
180 for SubstrateFinalitySource<P, SourceClnt>
181{
182 fn clone(&self) -> Self {
183 SubstrateFinalitySource {
184 client: self.client.clone(),
185 maximal_header_number: self.maximal_header_number.clone(),
186 }
187 }
188}
189
190#[async_trait]
191impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>> RelayClient
192 for SubstrateFinalitySource<P, SourceClnt>
193{
194 type Error = Error;
195
196 async fn reconnect(&mut self) -> Result<(), Error> {
197 self.client.reconnect().await
198 }
199}
200
201#[async_trait]
202impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>>
203 SourceClientBase<FinalitySyncPipelineAdapter<P>> for SubstrateFinalitySource<P, SourceClnt>
204{
205 type FinalityProofsStream = SubstrateFinalityProofsStream<P>;
206
207 async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Error> {
208 finality_proofs::<P>(&self.client).await
209 }
210}
211
212#[async_trait]
213impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>>
214 SourceClient<FinalitySyncPipelineAdapter<P>> for SubstrateFinalitySource<P, SourceClnt>
215{
216 async fn best_finalized_block_number(&self) -> Result<BlockNumberOf<P::SourceChain>, Error> {
217 let mut finalized_header_number = self.on_chain_best_finalized_block_number().await?;
218 if let Some(ref maximal_header_number) = self.maximal_header_number {
221 let maximal_header_number = *maximal_header_number.lock().await;
222 if finalized_header_number > maximal_header_number {
223 finalized_header_number = maximal_header_number;
224 }
225 }
226 Ok(finalized_header_number)
227 }
228
229 async fn header_and_finality_proof(
230 &self,
231 number: BlockNumberOf<P::SourceChain>,
232 ) -> Result<
233 (
234 relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
235 Option<SubstrateFinalityProof<P>>,
236 ),
237 Error,
238 > {
239 header_and_finality_proof::<P>(&self.client, number).await
240 }
241}
242
243async fn header_and_finality_proof<P: SubstrateFinalitySyncPipeline>(
244 client: &impl Client<P::SourceChain>,
245 number: BlockNumberOf<P::SourceChain>,
246) -> Result<
247 (
248 relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
249 Option<SubstrateFinalityProof<P>>,
250 ),
251 Error,
252> {
253 let header_hash = client.header_hash_by_number(number).await?;
254 let signed_block = client.block_by_hash(header_hash).await?;
255
256 let justification = signed_block
257 .justification(P::FinalityEngine::ID)
258 .map(|raw_justification| {
259 SubstrateFinalityProof::<P>::decode(&mut raw_justification.as_slice())
260 })
261 .transpose()
262 .map_err(Error::ResponseParseFailed)?;
263
264 Ok((signed_block.header().into(), justification))
265}