referrerpolicy=no-referrer-when-downgrade

substrate_relay_helper/on_demand/
headers.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! On-demand Substrate -> Substrate header finality relay.
18
19use crate::finality::SubmitFinalityProofCallBuilder;
20
21use async_std::sync::{Arc, Mutex};
22use async_trait::async_trait;
23use bp_header_chain::ConsensusLogReader;
24use bp_runtime::HeaderIdProvider;
25use futures::{select, FutureExt};
26use num_traits::{One, Saturating, Zero};
27use sp_runtime::traits::Header;
28
29use finality_relay::{FinalitySyncParams, HeadersToRelay, TargetClient as FinalityTargetClient};
30use relay_substrate_client::{
31	AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError,
32	HeaderIdOf,
33};
34use relay_utils::{
35	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError,
36	STALL_TIMEOUT,
37};
38
39use crate::{
40	finality::{
41		source::{RequiredHeaderNumberRef, SubstrateFinalitySource},
42		target::SubstrateFinalityTarget,
43		SubstrateFinalitySyncPipeline, RECENT_FINALITY_PROOFS_LIMIT,
44	},
45	finality_base::engine::Engine,
46	on_demand::OnDemandRelay,
47	TransactionParams,
48};
49
50/// On-demand Substrate <-> Substrate header finality relay.
51///
52/// This relay may be requested to sync more headers, whenever some other relay (e.g. messages
53/// relay) needs it to continue its regular work. When enough headers are relayed, on-demand stops
54/// syncing headers.
55#[derive(Clone)]
56pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline, SourceClnt, TargetClnt> {
57	/// Relay task name.
58	relay_task_name: String,
59	/// Shared reference to maximal required finalized header number.
60	required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
61	/// Client of the source chain.
62	source_client: SourceClnt,
63	/// Client of the target chain.
64	target_client: TargetClnt,
65}
66
67impl<
68		P: SubstrateFinalitySyncPipeline,
69		SourceClnt: Client<P::SourceChain>,
70		TargetClnt: Client<P::TargetChain>,
71	> OnDemandHeadersRelay<P, SourceClnt, TargetClnt>
72{
73	/// Create new on-demand headers relay.
74	///
75	/// If `metrics_params` is `Some(_)`, the metrics of the finality relay are registered.
76	/// Otherwise, all required metrics must be exposed outside of this method.
77	pub fn new(
78		source_client: SourceClnt,
79		target_client: TargetClnt,
80		target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
81		headers_to_relay: HeadersToRelay,
82		metrics_params: Option<MetricsParams>,
83	) -> Self
84	where
85		AccountIdOf<P::TargetChain>:
86			From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
87	{
88		let required_header_number = Arc::new(Mutex::new(Zero::zero()));
89		let this = OnDemandHeadersRelay {
90			relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
91			required_header_number: required_header_number.clone(),
92			source_client: source_client.clone(),
93			target_client: target_client.clone(),
94		};
95		async_std::task::spawn(async move {
96			background_task::<P>(
97				source_client,
98				target_client,
99				target_transaction_params,
100				headers_to_relay,
101				required_header_number,
102				metrics_params,
103			)
104			.await;
105		});
106
107		this
108	}
109}
110
111#[async_trait]
112impl<
113		P: SubstrateFinalitySyncPipeline,
114		SourceClnt: Client<P::SourceChain>,
115		TargetClnt: Client<P::TargetChain>,
116	> OnDemandRelay<P::SourceChain, P::TargetChain>
117	for OnDemandHeadersRelay<P, SourceClnt, TargetClnt>
118{
119	async fn reconnect(&self) -> Result<(), SubstrateError> {
120		// using clone is fine here (to avoid mut requirement), because clone on Client clones
121		// internal references
122		self.source_client.clone().reconnect().await?;
123		self.target_client.clone().reconnect().await
124	}
125
126	async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
127		let mut required_header_number = self.required_header_number.lock().await;
128		if required_header > *required_header_number {
129			log::trace!(
130				target: "bridge",
131				"[{}] More {} headers required. Going to sync up to the {}",
132				self.relay_task_name,
133				P::SourceChain::NAME,
134				required_header,
135			);
136
137			*required_header_number = required_header;
138		}
139	}
140
141	async fn prove_header(
142		&self,
143		required_header: BlockNumberOf<P::SourceChain>,
144	) -> Result<(HeaderIdOf<P::SourceChain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
145		const MAX_ITERATIONS: u32 = 4;
146		let mut iterations = 0;
147		let mut current_required_header = required_header;
148		loop {
149			// first find proper header (either `current_required_header`) or its descendant
150			let finality_source =
151				SubstrateFinalitySource::<P, _>::new(self.source_client.clone(), None);
152			let (header, mut proof) =
153				finality_source.prove_block_finality(current_required_header).await?;
154			let header_id = header.id();
155
156			// verify and optimize justification before including it into the call
157			let context = P::FinalityEngine::verify_and_optimize_proof(
158				&self.target_client,
159				&header,
160				&mut proof,
161			)
162			.await?;
163
164			// now we have the header and its proof, but we want to minimize our losses, so let's
165			// check if we'll get the full refund for submitting this header
166			let check_result = P::FinalityEngine::check_max_expected_call_limits(&header, &proof);
167			if check_result.is_weight_limit_exceeded || check_result.extra_size != 0 {
168				iterations += 1;
169				current_required_header = header_id.number().saturating_add(One::one());
170				if iterations < MAX_ITERATIONS {
171					log::debug!(
172						target: "bridge",
173						"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?}. But it exceeds limits: {:?}. \
174						Going to select next header",
175						self.relay_task_name,
176						P::SourceChain::NAME,
177						required_header,
178						P::SourceChain::NAME,
179						header_id,
180						check_result,
181					);
182
183					continue;
184				}
185			}
186
187			log::debug!(
188				target: "bridge",
189				"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} (after {} iterations)",
190				self.relay_task_name,
191				P::SourceChain::NAME,
192				required_header,
193				P::SourceChain::NAME,
194				header_id,
195				iterations,
196			);
197
198			// and then craft the submit-proof call
199			let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(
200				header, proof, false, context,
201			);
202
203			return Ok((header_id, vec![call]));
204		}
205	}
206}
207
208/// Background task that is responsible for starting headers relay.
209async fn background_task<P: SubstrateFinalitySyncPipeline>(
210	source_client: impl Client<P::SourceChain>,
211	target_client: impl Client<P::TargetChain>,
212	target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
213	headers_to_relay: HeadersToRelay,
214	required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
215	metrics_params: Option<MetricsParams>,
216) where
217	AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
218{
219	let relay_task_name = on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>();
220	let target_transactions_mortality = target_transaction_params.mortality;
221	let mut finality_source = SubstrateFinalitySource::<P, _>::new(
222		source_client.clone(),
223		Some(required_header_number.clone()),
224	);
225	let mut finality_target =
226		SubstrateFinalityTarget::new(target_client.clone(), target_transaction_params);
227	let mut latest_non_mandatory_at_source = Zero::zero();
228
229	let mut restart_relay = true;
230	let finality_relay_task = futures::future::Fuse::terminated();
231	futures::pin_mut!(finality_relay_task);
232
233	loop {
234		select! {
235			_ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
236			_ = finality_relay_task => {
237				// this should never happen in practice given the current code
238				restart_relay = true;
239			},
240		}
241
242		// read best finalized source header number from source
243		let best_finalized_source_header_at_source =
244			best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
245		if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) {
246			relay_utils::relay_loop::reconnect_failed_client(
247				FailedClient::Source,
248				relay_utils::relay_loop::RECONNECT_DELAY,
249				&mut finality_source,
250				&mut finality_target,
251			)
252			.await;
253			continue
254		}
255
256		// read best finalized source header number from target
257		let best_finalized_source_header_at_target =
258			best_finalized_source_header_at_target::<P, _>(&finality_target, &relay_task_name)
259				.await;
260		if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) {
261			relay_utils::relay_loop::reconnect_failed_client(
262				FailedClient::Target,
263				relay_utils::relay_loop::RECONNECT_DELAY,
264				&mut finality_source,
265				&mut finality_target,
266			)
267			.await;
268			continue
269		}
270
271		// submit mandatory header if some headers are missing
272		let best_finalized_source_header_at_source_fmt =
273			format!("{best_finalized_source_header_at_source:?}");
274		let best_finalized_source_header_at_target_fmt =
275			format!("{best_finalized_source_header_at_target:?}");
276		let required_header_number_value = *required_header_number.lock().await;
277		let mandatory_scan_range = mandatory_headers_scan_range::<P::SourceChain>(
278			best_finalized_source_header_at_source.ok(),
279			best_finalized_source_header_at_target.ok(),
280			required_header_number_value,
281		)
282		.await;
283
284		log::trace!(
285			target: "bridge",
286			"[{}] Mandatory headers scan range: ({:?}, {:?}, {:?}) -> {:?}",
287			relay_task_name,
288			required_header_number_value,
289			best_finalized_source_header_at_source_fmt,
290			best_finalized_source_header_at_target_fmt,
291			mandatory_scan_range,
292		);
293
294		if let Some(mandatory_scan_range) = mandatory_scan_range {
295			let relay_mandatory_header_result = relay_mandatory_header_from_range(
296				&finality_source,
297				&required_header_number,
298				best_finalized_source_header_at_target_fmt,
299				(
300					std::cmp::max(mandatory_scan_range.0, latest_non_mandatory_at_source),
301					mandatory_scan_range.1,
302				),
303				&relay_task_name,
304			)
305			.await;
306			match relay_mandatory_header_result {
307				Ok(true) => (),
308				Ok(false) => {
309					// there are no (or we don't need to relay them) mandatory headers in the range
310					// => to avoid scanning the same headers over and over again, remember that
311					latest_non_mandatory_at_source = mandatory_scan_range.1;
312
313					log::trace!(
314						target: "bridge",
315						"[{}] No mandatory {} headers in the range {:?}",
316						relay_task_name,
317						P::SourceChain::NAME,
318						mandatory_scan_range,
319					);
320				},
321				Err(e) => {
322					log::warn!(
323						target: "bridge",
324						"[{}] Failed to scan mandatory {} headers range ({:?}): {:?}",
325						relay_task_name,
326						P::SourceChain::NAME,
327						mandatory_scan_range,
328						e,
329					);
330
331					if e.is_connection_error() {
332						relay_utils::relay_loop::reconnect_failed_client(
333							FailedClient::Source,
334							relay_utils::relay_loop::RECONNECT_DELAY,
335							&mut finality_source,
336							&mut finality_target,
337						)
338						.await;
339						continue
340					}
341				},
342			}
343		}
344
345		// start/restart relay
346		if restart_relay {
347			let stall_timeout = relay_substrate_client::transaction_stall_timeout(
348				target_transactions_mortality,
349				P::TargetChain::AVERAGE_BLOCK_INTERVAL,
350				STALL_TIMEOUT,
351			);
352
353			log::info!(
354				target: "bridge",
355				"[{}] Starting on-demand headers relay task\n\t\
356					Headers to relay: {:?}\n\t\
357					Tx mortality: {:?} (~{}m)\n\t\
358					Stall timeout: {:?}",
359				relay_task_name,
360				headers_to_relay,
361				target_transactions_mortality,
362				stall_timeout.as_secs_f64() / 60.0f64,
363				stall_timeout,
364			);
365
366			finality_relay_task.set(
367				finality_relay::run(
368					finality_source.clone(),
369					finality_target.clone(),
370					FinalitySyncParams {
371						tick: std::cmp::max(
372							P::SourceChain::AVERAGE_BLOCK_INTERVAL,
373							P::TargetChain::AVERAGE_BLOCK_INTERVAL,
374						),
375						recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
376						stall_timeout,
377						headers_to_relay,
378					},
379					metrics_params.clone().unwrap_or_else(MetricsParams::disabled),
380					futures::future::pending(),
381				)
382				.fuse(),
383			);
384
385			restart_relay = false;
386		}
387	}
388}
389
390/// Returns `Some()` with inclusive range of headers which must be scanned for mandatory headers
391/// and the first of such headers must be submitted to the target node.
392async fn mandatory_headers_scan_range<C: Chain>(
393	best_finalized_source_header_at_source: Option<C::BlockNumber>,
394	best_finalized_source_header_at_target: Option<C::BlockNumber>,
395	required_header_number: BlockNumberOf<C>,
396) -> Option<(C::BlockNumber, C::BlockNumber)> {
397	// if we have been unable to read header number from the target, then let's assume
398	// that it is the same as required header number. Otherwise we risk submitting
399	// unneeded transactions
400	let best_finalized_source_header_at_target =
401		best_finalized_source_header_at_target.unwrap_or(required_header_number);
402
403	// if we have been unable to read header number from the source, then let's assume
404	// that it is the same as at the target
405	let best_finalized_source_header_at_source =
406		best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
407
408	// if relay is already asked to sync more headers than we have at source, don't do anything yet
409	if required_header_number >= best_finalized_source_header_at_source {
410		return None
411	}
412
413	Some((
414		best_finalized_source_header_at_target + One::one(),
415		best_finalized_source_header_at_source,
416	))
417}
418
419/// Try to find mandatory header in the inclusive headers range and, if one is found, ask to relay
420/// it.
421///
422/// Returns `true` if header was found and (asked to be) relayed and `false` otherwise.
423async fn relay_mandatory_header_from_range<P, SourceClnt>(
424	finality_source: &SubstrateFinalitySource<P, SourceClnt>,
425	required_header_number: &RequiredHeaderNumberRef<P::SourceChain>,
426	best_finalized_source_header_at_target: String,
427	range: (BlockNumberOf<P::SourceChain>, BlockNumberOf<P::SourceChain>),
428	relay_task_name: &str,
429) -> Result<bool, relay_substrate_client::Error>
430where
431	P: SubstrateFinalitySyncPipeline,
432	SourceClnt: Client<P::SourceChain>,
433{
434	// search for mandatory header first
435	let mandatory_source_header_number =
436		find_mandatory_header_in_range(finality_source, range).await?;
437
438	// if there are no mandatory headers - we have nothing to do
439	let mandatory_source_header_number = match mandatory_source_header_number {
440		Some(mandatory_source_header_number) => mandatory_source_header_number,
441		None => return Ok(false),
442	};
443
444	// `find_mandatory_header` call may take a while => check if `required_header_number` is still
445	// less than our `mandatory_source_header_number` before logging anything
446	let mut required_header_number = required_header_number.lock().await;
447	if *required_header_number >= mandatory_source_header_number {
448		return Ok(false)
449	}
450
451	log::trace!(
452		target: "bridge",
453		"[{}] Too many {} headers missing at target ({} vs {}). Going to sync up to the mandatory {}",
454		relay_task_name,
455		P::SourceChain::NAME,
456		best_finalized_source_header_at_target,
457		range.1,
458		mandatory_source_header_number,
459	);
460
461	*required_header_number = mandatory_source_header_number;
462	Ok(true)
463}
464
465/// Read best finalized source block number from source client.
466///
467/// Returns `None` if we have failed to read the number.
468async fn best_finalized_source_header_at_source<P, SourceClnt>(
469	finality_source: &SubstrateFinalitySource<P, SourceClnt>,
470	relay_task_name: &str,
471) -> Result<BlockNumberOf<P::SourceChain>, relay_substrate_client::Error>
472where
473	P: SubstrateFinalitySyncPipeline,
474	SourceClnt: Client<P::SourceChain>,
475{
476	finality_source.on_chain_best_finalized_block_number().await.map_err(|error| {
477		log::error!(
478			target: "bridge",
479			"[{}] Failed to read best finalized source header from source: {:?}",
480			relay_task_name,
481			error,
482		);
483
484		error
485	})
486}
487
488/// Read best finalized source block number from target client.
489///
490/// Returns `None` if we have failed to read the number.
491async fn best_finalized_source_header_at_target<P, TargetClnt>(
492	finality_target: &SubstrateFinalityTarget<P, TargetClnt>,
493	relay_task_name: &str,
494) -> Result<
495	BlockNumberOf<P::SourceChain>,
496	<SubstrateFinalityTarget<P, TargetClnt> as RelayClient>::Error,
497>
498where
499	P: SubstrateFinalitySyncPipeline,
500	TargetClnt: Client<P::TargetChain>,
501	AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
502{
503	finality_target
504		.best_finalized_source_block_id()
505		.await
506		.map_err(|error| {
507			log::error!(
508				target: "bridge",
509				"[{}] Failed to read best finalized source header from target: {:?}",
510				relay_task_name,
511				error,
512			);
513
514			error
515		})
516		.map(|id| id.0)
517}
518
519/// Read first mandatory header in given inclusive range.
520///
521/// Returns `Ok(None)` if there were no mandatory headers in the range.
522async fn find_mandatory_header_in_range<P, SourceClnt>(
523	finality_source: &SubstrateFinalitySource<P, SourceClnt>,
524	range: (BlockNumberOf<P::SourceChain>, BlockNumberOf<P::SourceChain>),
525) -> Result<Option<BlockNumberOf<P::SourceChain>>, relay_substrate_client::Error>
526where
527	P: SubstrateFinalitySyncPipeline,
528	SourceClnt: Client<P::SourceChain>,
529{
530	let mut current = range.0;
531	while current <= range.1 {
532		let header = finality_source.client().header_by_number(current).await?;
533		if <P::FinalityEngine as Engine<P::SourceChain>>::ConsensusLogReader::schedules_authorities_change(
534			header.digest(),
535		) {
536			return Ok(Some(current))
537		}
538
539		current += One::one();
540	}
541
542	Ok(None)
543}
544
545/// On-demand headers relay task name.
546fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
547	format!("{}-to-{}-on-demand-headers", SourceChain::NAME, TargetChain::NAME)
548}
549
550#[cfg(test)]
551mod tests {
552	use super::*;
553	use relay_substrate_client::test_chain::TestChain;
554
555	const AT_SOURCE: Option<BlockNumberOf<TestChain>> = Some(10);
556	const AT_TARGET: Option<BlockNumberOf<TestChain>> = Some(1);
557
558	#[async_std::test]
559	async fn mandatory_headers_scan_range_selects_range_if_some_headers_are_missing() {
560		assert_eq!(
561			mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 0,).await,
562			Some((AT_TARGET.unwrap() + 1, AT_SOURCE.unwrap())),
563		);
564	}
565
566	#[async_std::test]
567	async fn mandatory_headers_scan_range_selects_nothing_if_already_queued() {
568		assert_eq!(
569			mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, AT_SOURCE.unwrap(),)
570				.await,
571			None,
572		);
573	}
574}