referrerpolicy=no-referrer-when-downgrade

substrate_relay_helper/finality/
source.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Default generic implementation of finality source for basic Substrate client.
18
19use crate::{
20	finality::{FinalitySyncPipelineAdapter, SubstrateFinalitySyncPipeline},
21	finality_base::{
22		engine::Engine, finality_proofs, SubstrateFinalityProof, SubstrateFinalityProofsStream,
23	},
24};
25
26use async_std::sync::{Arc, Mutex};
27use async_trait::async_trait;
28use bp_header_chain::FinalityProof;
29use codec::Decode;
30use finality_relay::{SourceClient, SourceClientBase};
31use futures::{
32	select,
33	stream::{try_unfold, Stream, StreamExt, TryStreamExt},
34};
35use num_traits::One;
36use relay_substrate_client::{BlockNumberOf, BlockWithJustification, Client, Error, HeaderOf};
37use relay_utils::{relay_loop::Client as RelayClient, UniqueSaturatedInto};
38
39/// Shared updatable reference to the maximal header number that we want to sync from the source.
40pub type RequiredHeaderNumberRef<C> = Arc<Mutex<<C as bp_runtime::Chain>::BlockNumber>>;
41
42/// Substrate node as finality source.
43pub struct SubstrateFinalitySource<P: SubstrateFinalitySyncPipeline, SourceClnt> {
44	client: SourceClnt,
45	maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceChain>>,
46}
47
48impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>>
49	SubstrateFinalitySource<P, SourceClnt>
50{
51	/// Create new headers source using given client.
52	pub fn new(
53		client: SourceClnt,
54		maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceChain>>,
55	) -> Self {
56		SubstrateFinalitySource { client, maximal_header_number }
57	}
58
59	/// Returns reference to the underlying RPC client.
60	pub fn client(&self) -> &SourceClnt {
61		&self.client
62	}
63
64	/// Returns best finalized block number.
65	pub async fn on_chain_best_finalized_block_number(
66		&self,
67	) -> Result<BlockNumberOf<P::SourceChain>, Error> {
68		// we **CAN** continue to relay finality proofs if source node is out of sync, because
69		// target node may be missing proofs that are already available at the source
70		self.client.best_finalized_header_number().await
71	}
72
73	/// Return header and its justification of the given block or its descendant that
74	/// has a GRANDPA justification.
75	///
76	/// This method is optimized for cases when `block_number` is close to the best finalized
77	/// chain block.
78	pub async fn prove_block_finality(
79		&self,
80		block_number: BlockNumberOf<P::SourceChain>,
81	) -> Result<
82		(relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>, SubstrateFinalityProof<P>),
83		Error,
84	> {
85		// first, subscribe to proofs
86		let next_persistent_proof =
87			self.persistent_proofs_stream(block_number + One::one()).await?.fuse();
88		let next_ephemeral_proof = self.ephemeral_proofs_stream(block_number).await?.fuse();
89
90		// in perfect world we'll need to return justfication for the requested `block_number`
91		let (header, maybe_proof) = self.header_and_finality_proof(block_number).await?;
92		if let Some(proof) = maybe_proof {
93			return Ok((header, proof))
94		}
95
96		// otherwise we don't care which header to return, so let's select first
97		futures::pin_mut!(next_persistent_proof, next_ephemeral_proof);
98		loop {
99			select! {
100				maybe_header_and_proof = next_persistent_proof.next() => match maybe_header_and_proof {
101					Some(header_and_proof) => return header_and_proof,
102					None => continue,
103				},
104				maybe_header_and_proof = next_ephemeral_proof.next() => match maybe_header_and_proof {
105					Some(header_and_proof) => return header_and_proof,
106					None => continue,
107				},
108				complete => return Err(Error::FinalityProofNotFound(block_number.unique_saturated_into()))
109			}
110		}
111	}
112
113	/// Returns stream of headers and their persistent proofs, starting from given block.
114	async fn persistent_proofs_stream(
115		&self,
116		block_number: BlockNumberOf<P::SourceChain>,
117	) -> Result<
118		impl Stream<
119			Item = Result<
120				(
121					relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
122					SubstrateFinalityProof<P>,
123				),
124				Error,
125			>,
126		>,
127		Error,
128	> {
129		let client = self.client.clone();
130		let best_finalized_block_number = client.best_finalized_header_number().await?;
131		Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move {
132			// if we've passed the `best_finalized_block_number`, we no longer need persistent
133			// justifications
134			if current_block_number > best_finalized_block_number {
135				return Ok(None)
136			}
137
138			let (header, maybe_proof) =
139				header_and_finality_proof::<P>(&client, current_block_number).await?;
140			let next_block_number = current_block_number + One::one();
141			let next_state = (client, next_block_number);
142
143			Ok(Some((maybe_proof.map(|proof| (header, proof)), next_state)))
144		})
145		.try_filter_map(|maybe_result| async { Ok(maybe_result) }))
146	}
147
148	/// Returns stream of headers and their ephemeral proofs, starting from given block.
149	async fn ephemeral_proofs_stream(
150		&self,
151		block_number: BlockNumberOf<P::SourceChain>,
152	) -> Result<
153		impl Stream<
154			Item = Result<
155				(
156					relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
157					SubstrateFinalityProof<P>,
158				),
159				Error,
160			>,
161		>,
162		Error,
163	> {
164		let client = self.client.clone();
165		Ok(self.finality_proofs().await?.map(Ok).try_filter_map(move |proof| {
166			let client = client.clone();
167			async move {
168				if proof.target_header_number() < block_number {
169					return Ok(None)
170				}
171
172				let header = client.header_by_number(proof.target_header_number()).await?;
173				Ok(Some((header.into(), proof)))
174			}
175		}))
176	}
177}
178
179impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Clone> Clone
180	for SubstrateFinalitySource<P, SourceClnt>
181{
182	fn clone(&self) -> Self {
183		SubstrateFinalitySource {
184			client: self.client.clone(),
185			maximal_header_number: self.maximal_header_number.clone(),
186		}
187	}
188}
189
190#[async_trait]
191impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>> RelayClient
192	for SubstrateFinalitySource<P, SourceClnt>
193{
194	type Error = Error;
195
196	async fn reconnect(&mut self) -> Result<(), Error> {
197		self.client.reconnect().await
198	}
199}
200
201#[async_trait]
202impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>>
203	SourceClientBase<FinalitySyncPipelineAdapter<P>> for SubstrateFinalitySource<P, SourceClnt>
204{
205	type FinalityProofsStream = SubstrateFinalityProofsStream<P>;
206
207	async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Error> {
208		finality_proofs::<P>(&self.client).await
209	}
210}
211
212#[async_trait]
213impl<P: SubstrateFinalitySyncPipeline, SourceClnt: Client<P::SourceChain>>
214	SourceClient<FinalitySyncPipelineAdapter<P>> for SubstrateFinalitySource<P, SourceClnt>
215{
216	async fn best_finalized_block_number(&self) -> Result<BlockNumberOf<P::SourceChain>, Error> {
217		let mut finalized_header_number = self.on_chain_best_finalized_block_number().await?;
218		// never return block number larger than requested. This way we'll never sync headers
219		// past `maximal_header_number`
220		if let Some(ref maximal_header_number) = self.maximal_header_number {
221			let maximal_header_number = *maximal_header_number.lock().await;
222			if finalized_header_number > maximal_header_number {
223				finalized_header_number = maximal_header_number;
224			}
225		}
226		Ok(finalized_header_number)
227	}
228
229	async fn header_and_finality_proof(
230		&self,
231		number: BlockNumberOf<P::SourceChain>,
232	) -> Result<
233		(
234			relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
235			Option<SubstrateFinalityProof<P>>,
236		),
237		Error,
238	> {
239		header_and_finality_proof::<P>(&self.client, number).await
240	}
241}
242
243async fn header_and_finality_proof<P: SubstrateFinalitySyncPipeline>(
244	client: &impl Client<P::SourceChain>,
245	number: BlockNumberOf<P::SourceChain>,
246) -> Result<
247	(
248		relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
249		Option<SubstrateFinalityProof<P>>,
250	),
251	Error,
252> {
253	let header_hash = client.header_hash_by_number(number).await?;
254	let signed_block = client.block_by_hash(header_hash).await?;
255
256	let justification = signed_block
257		.justification(P::FinalityEngine::ID)
258		.map(|raw_justification| {
259			SubstrateFinalityProof::<P>::decode(&mut raw_justification.as_slice())
260		})
261		.transpose()
262		.map_err(Error::ResponseParseFailed)?;
263
264	Ok((signed_block.header().into(), justification))
265}