referrerpolicy=no-referrer-when-downgrade

parachains_relay/
parachains_loop.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};
18
19use async_trait::async_trait;
20use bp_polkadot_core::{
21	parachains::{ParaHash, ParaHeadsProof, ParaId},
22	BlockNumber as RelayBlockNumber,
23};
24use futures::{
25	future::{FutureExt, Shared},
26	poll, select_biased,
27};
28use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase};
29use relay_utils::{
30	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
31	TrackedTransactionStatus, TransactionTracker,
32};
33use std::{future::Future, pin::Pin, task::Poll};
34
/// Availability of a parachain header, as reported by a client.
#[derive(Clone, Copy, Debug)]
pub enum AvailableHeader<T> {
	/// The client is unable to report the actual parachain head right now.
	///
	/// This is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
	/// It must be treated as "we don't want to update parachain head value at the target
	/// chain at this moment".
	Unavailable,
	/// The relay chain has no header of the parachain.
	///
	/// Normally it means that the parachain is not registered there.
	Missing,
	/// The parachain head is available at the source chain.
	Available(T),
}

impl<T> AvailableHeader<T> {
	/// Borrow the header if this is the `Available` variant; return `None` otherwise.
	pub fn as_available(&self) -> Option<&T> {
		if let AvailableHeader::Available(header) = self {
			Some(header)
		} else {
			None
		}
	}
}

impl<T> From<Option<T>> for AvailableHeader<T> {
	/// `Some(header)` is converted to `Available(header)` and `None` to `Missing`.
	fn from(maybe_header: Option<T>) -> AvailableHeader<T> {
		maybe_header.map_or(AvailableHeader::Missing, AvailableHeader::Available)
	}
}
70
/// Source client used in parachain heads synchronization loop.
///
/// All block arguments are ids of the source **relay** chain headers.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
	/// Returns `Ok(true)` if client is in synced state.
	///
	/// The loop skips the iteration when `Ok(false)` is returned and fails with
	/// `FailedClient::Source` on `Err(_)`.
	async fn ensure_synced(&self) -> Result<bool, Self::Error>;

	/// Get parachain head id at given source relay chain block.
	///
	/// See [`AvailableHeader`] for the meaning of the non-`Available` results.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, Self::Error>;

	/// Get parachain head proof at given source relay chain block.
	///
	/// The second element of the returned pair is passed to the target client as the hash
	/// of the proved parachain head.
	async fn prove_parachain_head(
		&self,
		at_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<(ParaHeadsProof, ParaHash), Self::Error>;
}
89
/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
	/// Transaction tracker to track submitted transactions.
	type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;

	/// Get best block id of the target chain.
	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;

	/// Get id of the best finalized source relay chain block, known to the target chain at
	/// given target chain block.
	async fn best_finalized_source_relay_chain_block(
		&self,
		at_block: &HeaderIdOf<P::TargetChain>,
	) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
	/// Get free source **relay** headers submission interval, if it is configured in the
	/// target runtime. We assume that the target chain will accept parachain header, proved
	/// at such relay header for free.
	async fn free_source_relay_headers_interval(
		&self,
	) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;

	/// Get parachain head id at given target chain block.
	///
	/// On success, returns `None` if the target chain knows no head of the parachain.
	/// Otherwise returns the pair of (source relay chain header id that has been used to
	/// prove the head, parachain head id).
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::TargetChain>,
	) -> Result<
		Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
		Self::Error,
	>;

	/// Submit parachain heads proof.
	///
	/// `at_source_block` is the source relay chain header at which the proof has been
	/// generated and `para_head_hash` is the hash of the proved parachain head. On success,
	/// returns the tracker of the submitted transaction.
	async fn submit_parachain_head_proof(
		&self,
		at_source_block: HeaderIdOf<P::SourceRelayChain>,
		para_head_hash: ParaHash,
		proof: ParaHeadsProof,
		is_free_execution_expected: bool,
	) -> Result<Self::TransactionTracker, Self::Error>;
}
130
131/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
132/// sync loop.
133pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
134	format!(
135		"{}_to_{}_Parachains_{}",
136		P::SourceRelayChain::NAME,
137		P::TargetChain::NAME,
138		P::SourceParachain::PARACHAIN_ID
139	)
140}
141
142/// Relay single parachain head.
143pub async fn relay_single_head<P: ParachainsPipeline>(
144	source_client: impl SourceClient<P>,
145	target_client: impl TargetClient<P>,
146	at_relay_block: HeaderIdOf<P::SourceRelayChain>,
147) -> Result<(), ()>
148where
149	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
150{
151	let tx_tracker =
152		submit_selected_head::<P, _>(&source_client, &target_client, at_relay_block, false)
153			.await
154			.map_err(drop)?;
155	match tx_tracker.wait().await {
156		TrackedTransactionStatus::Finalized(_) => Ok(()),
157		TrackedTransactionStatus::Lost => {
158			log::error!(
159				"Transaction with {} header at relay header {:?} is considered lost at {}",
160				P::SourceParachain::NAME,
161				at_relay_block,
162				P::TargetChain::NAME,
163			);
164			Err(())
165		},
166	}
167}
168
169/// Run parachain heads synchronization.
170pub async fn run<P: ParachainsPipeline>(
171	source_client: impl SourceClient<P>,
172	target_client: impl TargetClient<P>,
173	metrics_params: MetricsParams,
174	only_free_headers: bool,
175	exit_signal: impl Future<Output = ()> + 'static + Send,
176) -> Result<(), relay_utils::Error>
177where
178	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
179{
180	log::info!(
181		target: "bridge",
182		"Starting {} -> {} finality proof relay: relaying (only_free_headers: {:?}) headers",
183		P::SourceParachain::NAME,
184		P::TargetChain::NAME,
185		only_free_headers,
186	);
187
188	let exit_signal = exit_signal.shared();
189	relay_utils::relay_loop(source_client, target_client)
190		.with_metrics(metrics_params)
191		.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
192		.expose()
193		.await?
194		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
195			run_until_connection_lost(
196				source_client,
197				target_client,
198				metrics,
199				only_free_headers,
200				exit_signal.clone(),
201			)
202		})
203		.await
204}
205
206/// Run parachain heads synchronization.
207async fn run_until_connection_lost<P: ParachainsPipeline>(
208	source_client: impl SourceClient<P>,
209	target_client: impl TargetClient<P>,
210	metrics: Option<ParachainsLoopMetrics>,
211	only_free_headers: bool,
212	exit_signal: impl Future<Output = ()> + Send,
213) -> Result<(), FailedClient>
214where
215	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
216{
217	let exit_signal = exit_signal.fuse();
218	let min_block_interval = std::cmp::min(
219		P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
220		P::TargetChain::AVERAGE_BLOCK_INTERVAL,
221	);
222
223	// free parachain header = header, available (proved) at free relay chain block. Let's
224	// read interval of free source relay chain blocks from target client
225	let free_source_relay_headers_interval = if only_free_headers {
226		let free_source_relay_headers_interval =
227			target_client.free_source_relay_headers_interval().await.map_err(|e| {
228				log::warn!(
229					target: "bridge",
230					"Failed to read free {} headers interval at {}: {:?}",
231					P::SourceRelayChain::NAME,
232					P::TargetChain::NAME,
233					e,
234				);
235				FailedClient::Target
236			})?;
237		match free_source_relay_headers_interval {
238			Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => {
239				log::trace!(
240					target: "bridge",
241					"Free {} headers interval at {}: {:?}",
242					P::SourceRelayChain::NAME,
243					P::TargetChain::NAME,
244					free_source_relay_headers_interval,
245				);
246				free_source_relay_headers_interval
247			},
248			_ => {
249				log::warn!(
250					target: "bridge",
251					"Invalid free {} headers interval at {}: {:?}",
252					P::SourceRelayChain::NAME,
253					P::TargetChain::NAME,
254					free_source_relay_headers_interval,
255				);
256				return Err(FailedClient::Target)
257			},
258		}
259	} else {
260		// ignore - we don't need it
261		0
262	};
263
264	let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;
265
266	futures::pin_mut!(exit_signal);
267
268	// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
269	// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
270	// regular errors.
271
272	loop {
273		// Either wait for new block, or exit signal.
274		// Please note that we are prioritizing the exit signal since if both events happen at once
275		// it doesn't make sense to perform one more loop iteration.
276		select_biased! {
277			_ = exit_signal => return Ok(()),
278			_ = async_std::task::sleep(min_block_interval).fuse() => {},
279		}
280
281		// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
282		// much redundant transactions
283		match source_client.ensure_synced().await {
284			Ok(true) => (),
285			Ok(false) => {
286				log::warn!(
287					target: "bridge",
288					"{} client is syncing. Won't do anything until it is synced",
289					P::SourceRelayChain::NAME,
290				);
291				continue
292			},
293			Err(e) => {
294				log::warn!(
295					target: "bridge",
296					"{} client has failed to return its sync status: {:?}",
297					P::SourceRelayChain::NAME,
298					e,
299				);
300				return Err(FailedClient::Source)
301			},
302		}
303
304		// if we have active transaction, we'll need to wait until it is mined or dropped
305		let best_target_block = target_client.best_block().await.map_err(|e| {
306			log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e);
307			FailedClient::Target
308		})?;
309		let (relay_of_head_at_target, head_at_target) =
310			read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;
311
312		// check if our transaction has been mined
313		if let Some(tracker) = submitted_heads_tracker.take() {
314			match tracker.update(&best_target_block, &head_at_target).await {
315				SubmittedHeadStatus::Waiting(tracker) => {
316					// no news about our transaction and we shall keep waiting
317					submitted_heads_tracker = Some(tracker);
318					continue
319				},
320				SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
321					// all heads have been updated, we don't need this tracker anymore
322				},
323				SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => {
324					log::warn!(
325						target: "bridge",
326						"Parachains synchronization from {} to {} has stalled. Going to restart",
327						P::SourceRelayChain::NAME,
328						P::TargetChain::NAME,
329					);
330
331					return Err(FailedClient::Both)
332				},
333			}
334		}
335
336		// in all-headers strategy we'll be submitting para head, available at
337		// `best_finalized_relay_block_at_target`
338		let best_finalized_relay_block_at_target = target_client
339			.best_finalized_source_relay_chain_block(&best_target_block)
340			.await
341			.map_err(|e| {
342				log::warn!(
343					target: "bridge",
344					"Failed to read best finalized {} block from {}: {:?}",
345					P::SourceRelayChain::NAME,
346					P::TargetChain::NAME,
347					e,
348				);
349				FailedClient::Target
350			})?;
351
352		// ..but if we only need to submit free headers, we need to submit para
353		// head, available at best free source relay chain header, known to the
354		// target chain
355		let prove_at_relay_block = if only_free_headers {
356			match relay_of_head_at_target {
357				Some(relay_of_head_at_target) => {
358					// find last free relay chain header in the range that we are interested in
359					let scan_range_begin = relay_of_head_at_target.number();
360					let scan_range_end = best_finalized_relay_block_at_target.number();
361					if scan_range_end.saturating_sub(scan_range_begin) <
362						free_source_relay_headers_interval
363					{
364						// there are no new **free** relay chain headers in the range
365						log::trace!(
366							target: "bridge",
367							"Waiting for new free {} headers at {}: scanned {:?}..={:?}",
368							P::SourceRelayChain::NAME,
369							P::TargetChain::NAME,
370							scan_range_begin,
371							scan_range_end,
372						);
373						continue;
374					}
375
376					// we may submit new parachain head for free
377					best_finalized_relay_block_at_target
378				},
379				None => {
380					// no parachain head at target => let's submit first one
381					best_finalized_relay_block_at_target
382				},
383			}
384		} else {
385			best_finalized_relay_block_at_target
386		};
387
388		// now let's check if we need to update parachain head at all
389		let head_at_source =
390			read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?;
391		let is_update_required = is_update_required::<P>(
392			head_at_source,
393			head_at_target,
394			prove_at_relay_block,
395			best_target_block,
396		);
397
398		if is_update_required {
399			let transaction_tracker = submit_selected_head::<P, _>(
400				&source_client,
401				&target_client,
402				prove_at_relay_block,
403				only_free_headers,
404			)
405			.await?;
406			submitted_heads_tracker =
407				Some(SubmittedHeadsTracker::<P>::new(head_at_source, transaction_tracker));
408		}
409	}
410}
411
412/// Prove and submit parachain head at given relay chain block.
413async fn submit_selected_head<P: ParachainsPipeline, TC: TargetClient<P>>(
414	source_client: &impl SourceClient<P>,
415	target_client: &TC,
416	prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
417	only_free_headers: bool,
418) -> Result<TC::TransactionTracker, FailedClient> {
419	let (head_proof, head_hash) =
420		source_client.prove_parachain_head(prove_at_relay_block).await.map_err(|e| {
421			log::warn!(
422				target: "bridge",
423				"Failed to prove {} parachain ParaId({}) heads: {:?}",
424				P::SourceRelayChain::NAME,
425				P::SourceParachain::PARACHAIN_ID,
426				e,
427			);
428			FailedClient::Source
429		})?;
430	log::info!(
431		target: "bridge",
432		"Submitting {} parachain ParaId({}) head update transaction to {}. Para hash at source relay {:?}: {:?}",
433		P::SourceRelayChain::NAME,
434		P::SourceParachain::PARACHAIN_ID,
435		P::TargetChain::NAME,
436		prove_at_relay_block,
437		head_hash,
438	);
439
440	target_client
441		.submit_parachain_head_proof(prove_at_relay_block, head_hash, head_proof, only_free_headers)
442		.await
443		.map_err(|e| {
444			log::warn!(
445				target: "bridge",
446				"Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}",
447				P::SourceRelayChain::NAME,
448				P::SourceParachain::PARACHAIN_ID,
449				P::TargetChain::NAME,
450				e,
451			);
452			FailedClient::Target
453		})
454}
455
456/// Returns `true` if we need to submit parachain-head-update transaction.
457fn is_update_required<P: ParachainsPipeline>(
458	head_at_source: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
459	head_at_target: Option<HeaderIdOf<P::SourceParachain>>,
460	prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
461	best_target_block: HeaderIdOf<P::TargetChain>,
462) -> bool
463where
464	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
465{
466	log::trace!(
467		target: "bridge",
468		"Checking if {} parachain ParaId({}) needs update at {}:\n\t\
469			At {} ({:?}): {:?}\n\t\
470			At {} ({:?}): {:?}",
471		P::SourceRelayChain::NAME,
472		P::SourceParachain::PARACHAIN_ID,
473		P::TargetChain::NAME,
474		P::SourceRelayChain::NAME,
475		prove_at_relay_block,
476		head_at_source,
477		P::TargetChain::NAME,
478		best_target_block,
479		head_at_target,
480	);
481
482	let needs_update = match (head_at_source, head_at_target) {
483		(AvailableHeader::Unavailable, _) => {
484			// source client has politely asked us not to update current parachain head
485			// at the target chain
486			false
487		},
488		(AvailableHeader::Available(head_at_source), Some(head_at_target))
489			if head_at_source.number() > head_at_target.number() =>
490		{
491			// source client knows head that is better than the head known to the target
492			// client
493			true
494		},
495		(AvailableHeader::Available(_), Some(_)) => {
496			// this is normal case when relay has recently updated heads, when parachain is
497			// not progressing, or when our source client is still syncing
498			false
499		},
500		(AvailableHeader::Available(_), None) => {
501			// parachain is not yet known to the target client. This is true when parachain
502			// or bridge has been just onboarded/started
503			true
504		},
505		(AvailableHeader::Missing, Some(_)) => {
506			// parachain/parathread has been offboarded removed from the system. It needs to
507			// be propagated to the target client
508			true
509		},
510		(AvailableHeader::Missing, None) => {
511			// all's good - parachain is unknown to both clients
512			false
513		},
514	};
515
516	if needs_update {
517		log::trace!(
518			target: "bridge",
519			"{} parachain ParaId({}) needs update at {}: {:?} vs {:?}",
520			P::SourceRelayChain::NAME,
521			P::SourceParachain::PARACHAIN_ID,
522			P::TargetChain::NAME,
523			head_at_source,
524			head_at_target,
525		);
526	}
527
528	needs_update
529}
530
531/// Reads parachain head from the source client.
532async fn read_head_at_source<P: ParachainsPipeline>(
533	source_client: &impl SourceClient<P>,
534	metrics: Option<&ParachainsLoopMetrics>,
535	at_relay_block: &HeaderIdOf<P::SourceRelayChain>,
536) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, FailedClient> {
537	let para_head = source_client.parachain_head(*at_relay_block).await;
538	match para_head {
539		Ok(AvailableHeader::Available(para_head)) => {
540			if let Some(metrics) = metrics {
541				metrics.update_best_parachain_block_at_source(
542					ParaId(P::SourceParachain::PARACHAIN_ID),
543					para_head.number(),
544				);
545			}
546			Ok(AvailableHeader::Available(para_head))
547		},
548		Ok(r) => Ok(r),
549		Err(e) => {
550			log::warn!(
551				target: "bridge",
552				"Failed to read head of {} parachain ParaId({:?}): {:?}",
553				P::SourceRelayChain::NAME,
554				P::SourceParachain::PARACHAIN_ID,
555				e,
556			);
557			Err(FailedClient::Source)
558		},
559	}
560}
561
562/// Reads parachain head from the target client. Also returns source relay chain header
563/// that has been used to prove that head.
564async fn read_head_at_target<P: ParachainsPipeline>(
565	target_client: &impl TargetClient<P>,
566	metrics: Option<&ParachainsLoopMetrics>,
567	at_block: &HeaderIdOf<P::TargetChain>,
568) -> Result<
569	(Option<HeaderIdOf<P::SourceRelayChain>>, Option<HeaderIdOf<P::SourceParachain>>),
570	FailedClient,
571> {
572	let para_head_id = target_client.parachain_head(*at_block).await;
573	match para_head_id {
574		Ok(Some((relay_header_id, para_head_id))) => {
575			if let Some(metrics) = metrics {
576				metrics.update_best_parachain_block_at_target(
577					ParaId(P::SourceParachain::PARACHAIN_ID),
578					para_head_id.number(),
579				);
580			}
581			Ok((Some(relay_header_id), Some(para_head_id)))
582		},
583		Ok(None) => Ok((None, None)),
584		Err(e) => {
585			log::warn!(
586				target: "bridge",
587				"Failed to read head of {} parachain ParaId({}) at {}: {:?}",
588				P::SourceRelayChain::NAME,
589				P::SourceParachain::PARACHAIN_ID,
590				P::TargetChain::NAME,
591				e,
592			);
593			Err(FailedClient::Target)
594		},
595	}
596}
597
/// Submitted heads status, returned by `SubmittedHeadsTracker::update`.
enum SubmittedHeadStatus<P: ParachainsPipeline> {
	/// Heads are not yet updated. The tracker is given back to the caller, so that it can be
	/// polled again later.
	Waiting(SubmittedHeadsTracker<P>),
	/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
	Final(TrackedTransactionStatus<HeaderIdOf<P::TargetChain>>),
}
605
/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using.
///
/// This is the boxed future that resolves to the final status of the transaction at the
/// target chain. It needs to be shared because of `poll` macro and our consuming `update`
/// method.
type SharedTransactionTracker<P> = Shared<
	Pin<
		Box<
			dyn Future<
					Output = TrackedTransactionStatus<
						HeaderIdOf<<P as ParachainsPipeline>::TargetChain>,
					>,
				> + Send,
		>,
	>,
>;
620
/// Submitted parachain heads transaction.
///
/// Keeps what has been submitted along with the future that tracks the transaction.
struct SubmittedHeadsTracker<P: ParachainsPipeline> {
	/// Parachain header id that we have submitted. It is the head that has been read from
	/// the source client, so it may also be `Missing`.
	submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
	/// Future that waits for submitted transaction finality or loss.
	///
	/// It needs to be shared because of `poll` macro and our consuming `update` method.
	transaction_tracker: SharedTransactionTracker<P>,
}
630
631impl<P: ParachainsPipeline> SubmittedHeadsTracker<P> {
632	/// Creates new parachain heads transaction tracker.
633	pub fn new(
634		submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
635		transaction_tracker: impl TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>> + 'static,
636	) -> Self {
637		SubmittedHeadsTracker {
638			submitted_head,
639			transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(),
640		}
641	}
642
643	/// Returns `None` if all submitted parachain heads have been updated.
644	pub async fn update(
645		self,
646		at_target_block: &HeaderIdOf<P::TargetChain>,
647		head_at_target: &Option<HeaderIdOf<P::SourceParachain>>,
648	) -> SubmittedHeadStatus<P> {
649		// check if our head has been updated
650		let is_head_updated = match (self.submitted_head, head_at_target) {
651			(AvailableHeader::Available(submitted_head), Some(head_at_target))
652				if head_at_target.number() >= submitted_head.number() =>
653				true,
654			(AvailableHeader::Missing, None) => true,
655			_ => false,
656		};
657		if is_head_updated {
658			log::trace!(
659				target: "bridge",
660				"Head of parachain ParaId({}) has been updated at {}: {:?}",
661				P::SourceParachain::PARACHAIN_ID,
662				P::TargetChain::NAME,
663				head_at_target,
664			);
665
666			return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block))
667		}
668
669		// if underlying transaction tracker has reported that the transaction is lost, we may
670		// then restart our sync
671		let transaction_tracker = self.transaction_tracker.clone();
672		match poll!(transaction_tracker) {
673			Poll::Ready(TrackedTransactionStatus::Lost) =>
674				return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
675			Poll::Ready(TrackedTransactionStatus::Finalized(_)) => {
676				// so we are here and our transaction is mined+finalized, but some of heads were not
677				// updated => we're considering our loop as stalled
678				return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost)
679			},
680			_ => (),
681		}
682
683		SubmittedHeadStatus::Waiting(self)
684	}
685}
686
687#[cfg(test)]
688mod tests {
689	use super::*;
690	use async_std::sync::{Arc, Mutex};
691	use futures::{SinkExt, StreamExt};
692	use relay_substrate_client::test_chain::{TestChain, TestParachain};
693	use relay_utils::{HeaderId, MaybeConnectionError};
694	use sp_core::H256;
695	use std::collections::HashMap;
696
697	const PARA_10_HASH: ParaHash = H256([10u8; 32]);
698	const PARA_20_HASH: ParaHash = H256([20u8; 32]);
699
	/// Error type of the test clients.
	#[derive(Clone, Debug)]
	enum TestError {
		/// The only error that test clients may return.
		Error,
	}

	impl MaybeConnectionError for TestError {
		/// Test errors are never reported as connection errors.
		fn is_connection_error(&self) -> bool {
			false
		}
	}
710
	/// Pipeline used in tests: heads of the `TestParachain` are relayed from the `TestChain`
	/// (source relay chain) to the `TestChain` (target chain).
	#[derive(Clone, Debug, PartialEq, Eq)]
	struct TestParachainsPipeline;

	impl ParachainsPipeline for TestParachainsPipeline {
		type SourceRelayChain = TestChain;
		type SourceParachain = TestParachain;
		type TargetChain = TestChain;
	}
719
	/// Test client, which implements both the source and the target client traits. All its
	/// responses are read from the shared `TestClientData`.
	#[derive(Clone, Debug)]
	struct TestClient {
		/// Predefined responses and recorded calls. Clones of the client share the same data.
		data: Arc<Mutex<TestClientData>>,
	}
724
725	#[derive(Clone, Debug)]
726	struct TestTransactionTracker(Option<TrackedTransactionStatus<HeaderIdOf<TestChain>>>);
727
728	#[async_trait]
729	impl TransactionTracker for TestTransactionTracker {
730		type HeaderId = HeaderIdOf<TestChain>;
731
732		async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<TestChain>> {
733			match self.0 {
734				Some(status) => status,
735				None => futures::future::pending().await,
736			}
737		}
738	}
739
	/// Predefined responses of the `TestClient` and the calls that it has recorded.
	#[derive(Clone, Debug)]
	struct TestClientData {
		/// Response of the `SourceClient::ensure_synced`.
		source_sync_status: Result<bool, TestError>,
		/// Responses of the `SourceClient::parachain_head`, keyed by the number of the source
		/// relay chain block.
		source_head: HashMap<
			BlockNumberOf<TestChain>,
			Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError>,
		>,
		/// If it is an error, then `SourceClient::prove_parachain_head` fails with it.
		source_proof: Result<(), TestError>,

		/// Response of the `TargetClient::free_source_relay_headers_interval`.
		target_free_source_relay_headers_interval:
			Result<Option<BlockNumberOf<TestChain>>, TestError>,
		/// Response of the `TargetClient::best_block`.
		target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
		/// Response of the `TargetClient::best_finalized_source_relay_chain_block`.
		target_best_finalized_source_block: Result<HeaderIdOf<TestChain>, TestError>,
		/// Response of the `TargetClient::parachain_head`.
		#[allow(clippy::type_complexity)]
		target_head: Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError>,
		/// If it is an error, then `TargetClient::submit_parachain_head_proof` fails with it.
		target_submit_result: Result<(), TestError>,

		/// Source relay chain block of the last successfully submitted proof.
		submitted_proof_at_source_relay_block: Option<HeaderIdOf<TestChain>>,
		/// Sender that is taken and notified when the proof is successfully submitted.
		exit_signal_sender: Option<Box<futures::channel::mpsc::UnboundedSender<()>>>,
	}
760
761	impl TestClientData {
762		pub fn minimal() -> Self {
763			TestClientData {
764				source_sync_status: Ok(true),
765				source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))]
766					.into_iter()
767					.collect(),
768				source_proof: Ok(()),
769
770				target_free_source_relay_headers_interval: Ok(None),
771				target_best_block: Ok(HeaderId(0, Default::default())),
772				target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
773				target_head: Ok(None),
774				target_submit_result: Ok(()),
775
776				submitted_proof_at_source_relay_block: None,
777				exit_signal_sender: None,
778			}
779		}
780
781		pub fn with_exit_signal_sender(
782			sender: futures::channel::mpsc::UnboundedSender<()>,
783		) -> Self {
784			let mut client = Self::minimal();
785			client.exit_signal_sender = Some(Box::new(sender));
786			client
787		}
788	}
789
790	impl From<TestClientData> for TestClient {
791		fn from(data: TestClientData) -> TestClient {
792			TestClient { data: Arc::new(Mutex::new(data)) }
793		}
794	}
795
	#[async_trait]
	impl RelayClient for TestClient {
		type Error = TestError;

		/// Not implemented: calling it panics.
		async fn reconnect(&mut self) -> Result<(), TestError> {
			unimplemented!()
		}
	}
804
805	#[async_trait]
806	impl SourceClient<TestParachainsPipeline> for TestClient {
807		async fn ensure_synced(&self) -> Result<bool, TestError> {
808			self.data.lock().await.source_sync_status.clone()
809		}
810
811		async fn parachain_head(
812			&self,
813			at_block: HeaderIdOf<TestChain>,
814		) -> Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError> {
815			self.data
816				.lock()
817				.await
818				.source_head
819				.get(&at_block.0)
820				.expect(&format!("SourceClient::parachain_head({})", at_block.0))
821				.clone()
822		}
823
824		async fn prove_parachain_head(
825			&self,
826			at_block: HeaderIdOf<TestChain>,
827		) -> Result<(ParaHeadsProof, ParaHash), TestError> {
828			let head_result =
829				SourceClient::<TestParachainsPipeline>::parachain_head(self, at_block).await?;
830			let head = head_result.as_available().unwrap();
831			let proof = (ParaHeadsProof { storage_proof: Default::default() }, head.hash());
832			self.data.lock().await.source_proof.clone().map(|_| proof)
833		}
834	}
835
836	#[async_trait]
837	impl TargetClient<TestParachainsPipeline> for TestClient {
838		type TransactionTracker = TestTransactionTracker;
839
840		async fn best_block(&self) -> Result<HeaderIdOf<TestChain>, TestError> {
841			self.data.lock().await.target_best_block.clone()
842		}
843
844		async fn best_finalized_source_relay_chain_block(
845			&self,
846			_at_block: &HeaderIdOf<TestChain>,
847		) -> Result<HeaderIdOf<TestChain>, TestError> {
848			self.data.lock().await.target_best_finalized_source_block.clone()
849		}
850
851		async fn free_source_relay_headers_interval(
852			&self,
853		) -> Result<Option<BlockNumberOf<TestParachain>>, TestError> {
854			self.data.lock().await.target_free_source_relay_headers_interval.clone()
855		}
856
857		async fn parachain_head(
858			&self,
859			_at_block: HeaderIdOf<TestChain>,
860		) -> Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError> {
861			self.data.lock().await.target_head.clone()
862		}
863
864		async fn submit_parachain_head_proof(
865			&self,
866			at_source_block: HeaderIdOf<TestChain>,
867			_updated_parachain_head: ParaHash,
868			_proof: ParaHeadsProof,
869			_is_free_execution_expected: bool,
870		) -> Result<TestTransactionTracker, Self::Error> {
871			let mut data = self.data.lock().await;
872			data.target_submit_result.clone()?;
873			data.submitted_proof_at_source_relay_block = Some(at_source_block);
874
875			if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
876				exit_signal_sender.send(()).await.unwrap();
877			}
878			Ok(TestTransactionTracker(Some(
879				TrackedTransactionStatus::Finalized(Default::default()),
880			)))
881		}
882	}
883
884	#[test]
885	fn when_source_client_fails_to_return_sync_state() {
886		let mut test_source_client = TestClientData::minimal();
887		test_source_client.source_sync_status = Err(TestError::Error);
888
889		assert_eq!(
890			async_std::task::block_on(run_until_connection_lost(
891				TestClient::from(test_source_client),
892				TestClient::from(TestClientData::minimal()),
893				None,
894				false,
895				futures::future::pending(),
896			)),
897			Err(FailedClient::Source),
898		);
899	}
900
901	#[test]
902	fn when_target_client_fails_to_return_best_block() {
903		let mut test_target_client = TestClientData::minimal();
904		test_target_client.target_best_block = Err(TestError::Error);
905
906		assert_eq!(
907			async_std::task::block_on(run_until_connection_lost(
908				TestClient::from(TestClientData::minimal()),
909				TestClient::from(test_target_client),
910				None,
911				false,
912				futures::future::pending(),
913			)),
914			Err(FailedClient::Target),
915		);
916	}
917
918	#[test]
919	fn when_target_client_fails_to_read_heads() {
920		let mut test_target_client = TestClientData::minimal();
921		test_target_client.target_head = Err(TestError::Error);
922
923		assert_eq!(
924			async_std::task::block_on(run_until_connection_lost(
925				TestClient::from(TestClientData::minimal()),
926				TestClient::from(test_target_client),
927				None,
928				false,
929				futures::future::pending(),
930			)),
931			Err(FailedClient::Target),
932		);
933	}
934
935	#[test]
936	fn when_target_client_fails_to_read_best_finalized_source_block() {
937		let mut test_target_client = TestClientData::minimal();
938		test_target_client.target_best_finalized_source_block = Err(TestError::Error);
939
940		assert_eq!(
941			async_std::task::block_on(run_until_connection_lost(
942				TestClient::from(TestClientData::minimal()),
943				TestClient::from(test_target_client),
944				None,
945				false,
946				futures::future::pending(),
947			)),
948			Err(FailedClient::Target),
949		);
950	}
951
952	#[test]
953	fn when_source_client_fails_to_read_heads() {
954		let mut test_source_client = TestClientData::minimal();
955		test_source_client.source_head.insert(0, Err(TestError::Error));
956
957		assert_eq!(
958			async_std::task::block_on(run_until_connection_lost(
959				TestClient::from(test_source_client),
960				TestClient::from(TestClientData::minimal()),
961				None,
962				false,
963				futures::future::pending(),
964			)),
965			Err(FailedClient::Source),
966		);
967	}
968
969	#[test]
970	fn when_source_client_fails_to_prove_heads() {
971		let mut test_source_client = TestClientData::minimal();
972		test_source_client.source_proof = Err(TestError::Error);
973
974		assert_eq!(
975			async_std::task::block_on(run_until_connection_lost(
976				TestClient::from(test_source_client),
977				TestClient::from(TestClientData::minimal()),
978				None,
979				false,
980				futures::future::pending(),
981			)),
982			Err(FailedClient::Source),
983		);
984	}
985
986	#[test]
987	fn when_target_client_rejects_update_transaction() {
988		let mut test_target_client = TestClientData::minimal();
989		test_target_client.target_submit_result = Err(TestError::Error);
990
991		assert_eq!(
992			async_std::task::block_on(run_until_connection_lost(
993				TestClient::from(TestClientData::minimal()),
994				TestClient::from(test_target_client),
995				None,
996				false,
997				futures::future::pending(),
998			)),
999			Err(FailedClient::Target),
1000		);
1001	}
1002
1003	#[test]
1004	fn minimal_working_case() {
1005		let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
1006		assert_eq!(
1007			async_std::task::block_on(run_until_connection_lost(
1008				TestClient::from(TestClientData::minimal()),
1009				TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
1010				None,
1011				false,
1012				exit_signal.into_future().map(|(_, _)| ()),
1013			)),
1014			Ok(()),
1015		);
1016	}
1017
1018	#[async_std::test]
1019	async fn free_headers_are_relayed() {
1020		// prepare following case:
1021		// 1) best source relay at target: 95
1022		// 2) best source parachain at target: 5 at relay 50
1023		// 3) free headers interval: 10
1024		// 4) at source relay chain block 90 source parachain block is 9
1025		// +
1026		// 5) best finalized source relay chain block is 95
1027		// 6) at source relay chain block 95 source parachain block is 42
1028		// =>
1029		// parachain block 42 would have been relayed, because 95 - 50 > 10
1030		let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
1031		let clients_data = TestClientData {
1032			source_sync_status: Ok(true),
1033			source_head: vec![
1034				(90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))),
1035				(95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))),
1036			]
1037			.into_iter()
1038			.collect(),
1039			source_proof: Ok(()),
1040
1041			target_free_source_relay_headers_interval: Ok(Some(10)),
1042			target_best_block: Ok(HeaderId(200, [200u8; 32].into())),
1043			target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())),
1044			target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))),
1045			target_submit_result: Ok(()),
1046
1047			submitted_proof_at_source_relay_block: None,
1048			exit_signal_sender: Some(Box::new(exit_signal_sender)),
1049		};
1050
1051		let source_client = TestClient::from(clients_data.clone());
1052		let target_client = TestClient::from(clients_data);
1053		assert_eq!(
1054			run_until_connection_lost(
1055				source_client,
1056				target_client.clone(),
1057				None,
1058				true,
1059				exit_signal.into_future().map(|(_, _)| ()),
1060			)
1061			.await,
1062			Ok(()),
1063		);
1064
1065		assert_eq!(
1066			target_client
1067				.data
1068				.lock()
1069				.await
1070				.submitted_proof_at_source_relay_block
1071				.map(|id| id.0),
1072			Some(95)
1073		);
1074
1075		// now source relay block chain 104 is mined with parachain head #84
1076		// => since 104 - 95 < 10, there are no free headers
1077		// => nothing is submitted
1078		let mut clients_data: TestClientData = target_client.data.lock().await.clone();
1079		clients_data
1080			.source_head
1081			.insert(104, Ok(AvailableHeader::Available(HeaderId(84, [84u8; 32].into()))));
1082		clients_data.target_best_finalized_source_block = Ok(HeaderId(104, [104u8; 32].into()));
1083		clients_data.target_head =
1084			Ok(Some((HeaderId(95, [95u8; 32].into()), HeaderId(42, [42u8; 32].into()))));
1085		clients_data.target_best_block = Ok(HeaderId(255, [255u8; 32].into()));
1086		clients_data.exit_signal_sender = None;
1087
1088		let source_client = TestClient::from(clients_data.clone());
1089		let target_client = TestClient::from(clients_data);
1090		assert_eq!(
1091			run_until_connection_lost(
1092				source_client,
1093				target_client.clone(),
1094				None,
1095				true,
1096				async_std::task::sleep(std::time::Duration::from_millis(100)),
1097			)
1098			.await,
1099			Ok(()),
1100		);
1101
1102		assert_eq!(
1103			target_client
1104				.data
1105				.lock()
1106				.await
1107				.submitted_proof_at_source_relay_block
1108				.map(|id| id.0),
1109			Some(95)
1110		);
1111	}
1112
1113	fn test_tx_tracker() -> SubmittedHeadsTracker<TestParachainsPipeline> {
1114		SubmittedHeadsTracker::new(
1115			AvailableHeader::Available(HeaderId(20, PARA_20_HASH)),
1116			TestTransactionTracker(None),
1117		)
1118	}
1119
1120	impl From<SubmittedHeadStatus<TestParachainsPipeline>> for Option<()> {
1121		fn from(status: SubmittedHeadStatus<TestParachainsPipeline>) -> Option<()> {
1122			match status {
1123				SubmittedHeadStatus::Waiting(_) => Some(()),
1124				_ => None,
1125			}
1126		}
1127	}
1128
1129	#[async_std::test]
1130	async fn tx_tracker_update_when_head_at_target_has_none_value() {
1131		assert_eq!(
1132			Some(()),
1133			test_tx_tracker()
1134				.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
1135				.await
1136				.into(),
1137		);
1138	}
1139
1140	#[async_std::test]
1141	async fn tx_tracker_update_when_head_at_target_has_old_value() {
1142		assert_eq!(
1143			Some(()),
1144			test_tx_tracker()
1145				.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
1146				.await
1147				.into(),
1148		);
1149	}
1150
1151	#[async_std::test]
1152	async fn tx_tracker_update_when_head_at_target_has_same_value() {
1153		assert!(matches!(
1154			test_tx_tracker()
1155				.update(&HeaderId(0, Default::default()), &Some(HeaderId(20, PARA_20_HASH)))
1156				.await,
1157			SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
1158		));
1159	}
1160
1161	#[async_std::test]
1162	async fn tx_tracker_update_when_head_at_target_has_better_value() {
1163		assert!(matches!(
1164			test_tx_tracker()
1165				.update(&HeaderId(0, Default::default()), &Some(HeaderId(30, PARA_20_HASH)))
1166				.await,
1167			SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
1168		));
1169	}
1170
1171	#[async_std::test]
1172	async fn tx_tracker_update_when_tx_is_lost() {
1173		let mut tx_tracker = test_tx_tracker();
1174		tx_tracker.transaction_tracker =
1175			futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared();
1176		assert!(matches!(
1177			tx_tracker
1178				.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
1179				.await,
1180			SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
1181		));
1182	}
1183
1184	#[async_std::test]
1185	async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() {
1186		let mut tx_tracker = test_tx_tracker();
1187		tx_tracker.transaction_tracker =
1188			futures::future::ready(TrackedTransactionStatus::Finalized(Default::default()))
1189				.boxed()
1190				.shared();
1191		assert!(matches!(
1192			tx_tracker
1193				.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
1194				.await,
1195			SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
1196		));
1197	}
1198
1199	#[test]
1200	fn parachain_is_not_updated_if_it_is_unavailable() {
1201		assert!(!is_update_required::<TestParachainsPipeline>(
1202			AvailableHeader::Unavailable,
1203			None,
1204			Default::default(),
1205			Default::default(),
1206		));
1207		assert!(!is_update_required::<TestParachainsPipeline>(
1208			AvailableHeader::Unavailable,
1209			Some(HeaderId(10, PARA_10_HASH)),
1210			Default::default(),
1211			Default::default(),
1212		));
1213	}
1214
1215	#[test]
1216	fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
1217		assert!(!is_update_required::<TestParachainsPipeline>(
1218			AvailableHeader::Missing,
1219			None,
1220			Default::default(),
1221			Default::default(),
1222		),);
1223	}
1224
1225	#[test]
1226	fn parachain_is_not_updated_if_target_has_better_head() {
1227		assert!(!is_update_required::<TestParachainsPipeline>(
1228			AvailableHeader::Available(HeaderId(10, Default::default())),
1229			Some(HeaderId(20, Default::default())),
1230			Default::default(),
1231			Default::default(),
1232		),);
1233	}
1234
1235	#[test]
1236	fn parachain_is_updated_after_offboarding() {
1237		assert!(is_update_required::<TestParachainsPipeline>(
1238			AvailableHeader::Missing,
1239			Some(HeaderId(20, Default::default())),
1240			Default::default(),
1241			Default::default(),
1242		),);
1243	}
1244
1245	#[test]
1246	fn parachain_is_updated_after_onboarding() {
1247		assert!(is_update_required::<TestParachainsPipeline>(
1248			AvailableHeader::Available(HeaderId(30, Default::default())),
1249			None,
1250			Default::default(),
1251			Default::default(),
1252		),);
1253	}
1254
1255	#[test]
1256	fn parachain_is_updated_if_newer_head_is_known() {
1257		assert!(is_update_required::<TestParachainsPipeline>(
1258			AvailableHeader::Available(HeaderId(40, Default::default())),
1259			Some(HeaderId(30, Default::default())),
1260			Default::default(),
1261			Default::default(),
1262		),);
1263	}
1264}