referrerpolicy=no-referrer-when-downgrade

finality_relay/
finality_loop.rs

// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
//! The loop basically reads all missing headers and their finality proofs from the source client.
//! The proof for the best possible header is then submitted to the target node. The only exception
//! is the mandatory headers, which we always submit to the target node. For such headers, we
//! assume that the persistent proof either exists, or will eventually become available.
21
22use crate::{sync_loop_metrics::SyncLoopMetrics, Error, FinalitySyncPipeline, SourceHeader};
23
24use crate::{
25	base::SourceClientBase,
26	finality_proofs::{FinalityProofsBuf, FinalityProofsStream},
27	headers::{JustifiedHeader, JustifiedHeaderSelector},
28};
29use async_trait::async_trait;
30use backoff::{backoff::Backoff, ExponentialBackoff};
31use futures::{future::Fuse, select, Future, FutureExt};
32use num_traits::{Saturating, Zero};
33use relay_utils::{
34	metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
35	HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
36};
37use std::{
38	fmt::Debug,
39	time::{Duration, Instant},
40};
41
/// Type of headers that we relay.
///
/// The value is compared with `==` by the finality loop to decide which selection strategy
/// to use and whether free execution is expected for a submitted proof.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeadersToRelay {
	/// Relay all headers.
	All,
	/// Relay only mandatory headers.
	Mandatory,
	/// Relay only free (including mandatory) headers.
	Free,
}
52
53/// Finality proof synchronization loop parameters.
54#[derive(Debug, Clone)]
55pub struct FinalitySyncParams {
56	/// Interval at which we check updates on both clients. Normally should be larger than
57	/// `min(source_block_time, target_block_time)`.
58	///
59	/// This parameter may be used to limit transactions rate. Increase the value && you'll get
60	/// infrequent updates => sparse headers => potential slow down of bridge applications, but
61	/// pallet storage won't be super large. Decrease the value to near `source_block_time` and
62	/// you'll get transaction for (almost) every block of the source chain => all source headers
63	/// will be known to the target chain => bridge applications will run faster, but pallet
64	/// storage may explode (but if pruning is there, then it's fine).
65	pub tick: Duration,
66	/// Number of finality proofs to keep in internal buffer between loop iterations.
67	///
68	/// While in "major syncing" state, we still read finality proofs from the stream. They're
69	/// stored in the internal buffer between loop iterations. When we're close to the tip of the
70	/// chain, we may meet finality delays if headers are not finalized frequently. So instead of
71	/// waiting for next finality proof to appear in the stream, we may use existing proof from
72	/// that buffer.
73	pub recent_finality_proofs_limit: usize,
74	/// Timeout before we treat our transactions as lost and restart the whole sync process.
75	pub stall_timeout: Duration,
76	/// If true, only mandatory headers are relayed.
77	pub headers_to_relay: HeadersToRelay,
78}
79
80/// Source client used in finality synchronization loop.
81#[async_trait]
82pub trait SourceClient<P: FinalitySyncPipeline>: SourceClientBase<P> {
83	/// Get best finalized block number.
84	async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;
85
86	/// Get canonical header and its finality proof by number.
87	async fn header_and_finality_proof(
88		&self,
89		number: P::Number,
90	) -> Result<(P::Header, Option<P::FinalityProof>), Self::Error>;
91}
92
93/// Target client used in finality synchronization loop.
94#[async_trait]
95pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
96	/// Transaction tracker to track submitted transactions.
97	type TransactionTracker: TransactionTracker;
98
99	/// Get best finalized source block number.
100	async fn best_finalized_source_block_id(
101		&self,
102	) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;
103
104	/// Get free source headers submission interval, if it is configured in the
105	/// target runtime.
106	async fn free_source_headers_interval(&self) -> Result<Option<P::Number>, Self::Error>;
107
108	/// Submit header finality proof.
109	async fn submit_finality_proof(
110		&self,
111		header: P::Header,
112		proof: P::FinalityProof,
113		is_free_execution_expected: bool,
114	) -> Result<Self::TransactionTracker, Self::Error>;
115}
116
117/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs
118/// sync loop.
119pub fn metrics_prefix<P: FinalitySyncPipeline>() -> String {
120	format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
121}
122
123/// Finality sync information.
124pub struct SyncInfo<P: FinalitySyncPipeline> {
125	/// Best finalized header at the source client.
126	pub best_number_at_source: P::Number,
127	/// Best source header, known to the target client.
128	pub best_number_at_target: P::Number,
129	/// Whether the target client follows the same fork as the source client do.
130	pub is_using_same_fork: bool,
131}
132
133impl<P: FinalitySyncPipeline> SyncInfo<P> {
134	/// Checks if both clients are on the same fork.
135	async fn is_on_same_fork<SC: SourceClient<P>>(
136		source_client: &SC,
137		id_at_target: &HeaderId<P::Hash, P::Number>,
138	) -> Result<bool, SC::Error> {
139		let header_at_source = source_client.header_and_finality_proof(id_at_target.0).await?.0;
140		let header_hash_at_source = header_at_source.hash();
141		Ok(if id_at_target.1 == header_hash_at_source {
142			true
143		} else {
144			log::error!(
145				target: "bridge",
146				"Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
147				at-source {:?} vs at-target {:?}",
148				P::SOURCE_NAME,
149				P::TARGET_NAME,
150				id_at_target.0,
151				header_hash_at_source,
152				id_at_target.1,
153			);
154
155			false
156		})
157	}
158
159	async fn new<SC: SourceClient<P>, TC: TargetClient<P>>(
160		source_client: &SC,
161		target_client: &TC,
162	) -> Result<Self, Error<P, SC::Error, TC::Error>> {
163		let best_number_at_source =
164			source_client.best_finalized_block_number().await.map_err(Error::Source)?;
165		let best_id_at_target =
166			target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
167		let best_number_at_target = best_id_at_target.0;
168
169		let is_using_same_fork = Self::is_on_same_fork(source_client, &best_id_at_target)
170			.await
171			.map_err(Error::Source)?;
172
173		Ok(Self { best_number_at_source, best_number_at_target, is_using_same_fork })
174	}
175
176	fn update_metrics(&self, metrics_sync: &Option<SyncLoopMetrics>) {
177		if let Some(metrics_sync) = metrics_sync {
178			metrics_sync.update_best_block_at_source(self.best_number_at_source);
179			metrics_sync.update_best_block_at_target(self.best_number_at_target);
180			metrics_sync.update_using_same_fork(self.is_using_same_fork);
181		}
182	}
183
184	pub fn num_headers(&self) -> P::Number {
185		self.best_number_at_source.saturating_sub(self.best_number_at_target)
186	}
187}
188
/// Information about transaction that we have submitted.
#[derive(Debug, Clone)]
pub struct Transaction<Tracker, Number> {
	/// Submitted transaction tracker.
	tracker: Tracker,
	/// The number of the header we have submitted.
	header_number: Number,
}
197
198impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracker, Number> {
199	pub async fn submit<
200		P: FinalitySyncPipeline<Number = Number>,
201		TC: TargetClient<P, TransactionTracker = Tracker>,
202	>(
203		target_client: &TC,
204		header: P::Header,
205		justification: P::FinalityProof,
206		is_free_execution_expected: bool,
207	) -> Result<Self, TC::Error> {
208		let header_number = header.number();
209		log::debug!(
210			target: "bridge",
211			"Going to submit finality proof of {} header #{:?} to {}",
212			P::SOURCE_NAME,
213			header_number,
214			P::TARGET_NAME,
215		);
216
217		let tracker = target_client
218			.submit_finality_proof(header, justification, is_free_execution_expected)
219			.await?;
220		Ok(Transaction { tracker, header_number })
221	}
222
223	async fn track<
224		P: FinalitySyncPipeline<Number = Number>,
225		SC: SourceClient<P>,
226		TC: TargetClient<P>,
227	>(
228		self,
229		target_client: TC,
230	) -> Result<(), Error<P, SC::Error, TC::Error>> {
231		match self.tracker.wait().await {
232			TrackedTransactionStatus::Finalized(_) => {
233				// The transaction has been finalized, but it may have been finalized in the
234				// "failed" state. So let's check if the block number was actually updated.
235				target_client
236					.best_finalized_source_block_id()
237					.await
238					.map_err(Error::Target)
239					.and_then(|best_id_at_target| {
240						if self.header_number > best_id_at_target.0 {
241							return Err(Error::ProofSubmissionTxFailed {
242								submitted_number: self.header_number,
243								best_number_at_target: best_id_at_target.0,
244							})
245						}
246						Ok(())
247					})
248			},
249			TrackedTransactionStatus::Lost => Err(Error::ProofSubmissionTxLost),
250		}
251	}
252}
253
254/// Finality synchronization loop state.
255struct FinalityLoop<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> {
256	source_client: SC,
257	target_client: TC,
258
259	sync_params: FinalitySyncParams,
260	metrics_sync: Option<SyncLoopMetrics>,
261
262	progress: (Instant, Option<P::Number>),
263	retry_backoff: ExponentialBackoff,
264	finality_proofs_stream: FinalityProofsStream<P, SC>,
265	finality_proofs_buf: FinalityProofsBuf<P>,
266	best_submitted_number: Option<P::Number>,
267}
268
269impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> FinalityLoop<P, SC, TC> {
270	pub fn new(
271		source_client: SC,
272		target_client: TC,
273		sync_params: FinalitySyncParams,
274		metrics_sync: Option<SyncLoopMetrics>,
275	) -> Self {
276		Self {
277			source_client,
278			target_client,
279			sync_params,
280			metrics_sync,
281			progress: (Instant::now(), None),
282			retry_backoff: retry_backoff(),
283			finality_proofs_stream: FinalityProofsStream::new(),
284			finality_proofs_buf: FinalityProofsBuf::new(vec![]),
285			best_submitted_number: None,
286		}
287	}
288
289	fn update_progress(&mut self, info: &SyncInfo<P>) {
290		let (prev_time, prev_best_number_at_target) = self.progress;
291		let now = Instant::now();
292
293		let needs_update = now - prev_time > Duration::from_secs(10) ||
294			prev_best_number_at_target
295				.map(|prev_best_number_at_target| {
296					info.best_number_at_target.saturating_sub(prev_best_number_at_target) >
297						10.into()
298				})
299				.unwrap_or(true);
300
301		if !needs_update {
302			return
303		}
304
305		log::info!(
306			target: "bridge",
307			"Synced {:?} of {:?} headers",
308			info.best_number_at_target,
309			info.best_number_at_source,
310		);
311
312		self.progress = (now, Some(info.best_number_at_target))
313	}
314
315	pub async fn select_header_to_submit(
316		&mut self,
317		info: &SyncInfo<P>,
318		free_headers_interval: Option<P::Number>,
319	) -> Result<Option<JustifiedHeader<P>>, Error<P, SC::Error, TC::Error>> {
320		// to see that the loop is progressing
321		log::trace!(
322			target: "bridge",
323			"Considering range of headers ({}; {}]",
324			info.best_number_at_target,
325			info.best_number_at_source
326		);
327
328		// read missing headers
329		let selector = JustifiedHeaderSelector::new::<SC, TC>(
330			&self.source_client,
331			info,
332			self.sync_params.headers_to_relay,
333			free_headers_interval,
334		)
335		.await?;
336		// if we see that the header schedules GRANDPA change, we need to submit it
337		if self.sync_params.headers_to_relay == HeadersToRelay::Mandatory {
338			return Ok(selector.select_mandatory())
339		}
340
341		// all headers that are missing from the target client are non-mandatory
342		// => even if we have already selected some header and its persistent finality proof,
343		// we may try to select better header by reading non-persistent proofs from the stream
344		self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
345		let maybe_justified_header = selector.select(
346			info,
347			self.sync_params.headers_to_relay,
348			free_headers_interval,
349			&self.finality_proofs_buf,
350		);
351
352		// remove obsolete 'recent' finality proofs + keep its size under certain limit
353		let oldest_finality_proof_to_keep = maybe_justified_header
354			.as_ref()
355			.map(|justified_header| justified_header.number())
356			.unwrap_or(info.best_number_at_target);
357		self.finality_proofs_buf.prune(
358			oldest_finality_proof_to_keep,
359			Some(self.sync_params.recent_finality_proofs_limit),
360		);
361
362		Ok(maybe_justified_header)
363	}
364
365	pub async fn run_iteration(
366		&mut self,
367		free_headers_interval: Option<P::Number>,
368	) -> Result<
369		Option<Transaction<TC::TransactionTracker, P::Number>>,
370		Error<P, SC::Error, TC::Error>,
371	> {
372		// read best source headers ids from source and target nodes
373		let info = SyncInfo::new(&self.source_client, &self.target_client).await?;
374		info.update_metrics(&self.metrics_sync);
375		self.update_progress(&info);
376
377		// if we have already submitted header, then we just need to wait for it
378		// if we're waiting too much, then we believe our transaction has been lost and restart sync
379		if Some(info.best_number_at_target) < self.best_submitted_number {
380			return Ok(None)
381		}
382
383		// submit new header if we have something new
384		match self.select_header_to_submit(&info, free_headers_interval).await? {
385			Some(header) => {
386				let transaction = Transaction::submit(
387					&self.target_client,
388					header.header,
389					header.proof,
390					self.sync_params.headers_to_relay == HeadersToRelay::Free,
391				)
392				.await
393				.map_err(Error::Target)?;
394				self.best_submitted_number = Some(transaction.header_number);
395				Ok(Some(transaction))
396			},
397			None => Ok(None),
398		}
399	}
400
401	async fn ensure_finality_proofs_stream(&mut self) -> Result<(), FailedClient> {
402		if let Err(e) = self.finality_proofs_stream.ensure_stream(&self.source_client).await {
403			if e.is_connection_error() {
404				return Err(FailedClient::Source)
405			}
406		}
407
408		Ok(())
409	}
410
411	/// Run finality relay loop until connection to one of nodes is lost.
412	async fn run_until_connection_lost(
413		&mut self,
414		exit_signal: impl Future<Output = ()>,
415	) -> Result<(), FailedClient> {
416		self.ensure_finality_proofs_stream().await?;
417		let proof_submission_tx_tracker = Fuse::terminated();
418		let exit_signal = exit_signal.fuse();
419		futures::pin_mut!(exit_signal, proof_submission_tx_tracker);
420
421		let free_headers_interval = free_headers_interval(&self.target_client).await?;
422
423		loop {
424			// run loop iteration
425			let next_tick = match self.run_iteration(free_headers_interval).await {
426				Ok(Some(tx)) => {
427					proof_submission_tx_tracker
428						.set(tx.track::<P, SC, _>(self.target_client.clone()).fuse());
429					self.retry_backoff.reset();
430					self.sync_params.tick
431				},
432				Ok(None) => {
433					self.retry_backoff.reset();
434					self.sync_params.tick
435				},
436				Err(error) => {
437					log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
438					error.fail_if_connection_error()?;
439					self.retry_backoff
440						.next_backoff()
441						.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
442				},
443			};
444			self.ensure_finality_proofs_stream().await?;
445
446			// wait till exit signal, or new source block
447			select! {
448				proof_submission_result = proof_submission_tx_tracker => {
449					if let Err(e) = proof_submission_result {
450						log::error!(
451							target: "bridge",
452							"Finality sync proof submission tx to {} has failed with error: {:?}.",
453							P::TARGET_NAME,
454							e,
455						);
456						self.best_submitted_number = None;
457						e.fail_if_connection_error()?;
458					}
459				},
460				_ = async_std::task::sleep(next_tick).fuse() => {},
461				_ = exit_signal => return Ok(()),
462			}
463		}
464	}
465
466	pub async fn run(
467		source_client: SC,
468		target_client: TC,
469		sync_params: FinalitySyncParams,
470		metrics_sync: Option<SyncLoopMetrics>,
471		exit_signal: impl Future<Output = ()>,
472	) -> Result<(), FailedClient> {
473		let mut finality_loop = Self::new(source_client, target_client, sync_params, metrics_sync);
474		finality_loop.run_until_connection_lost(exit_signal).await
475	}
476}
477
478async fn free_headers_interval<P: FinalitySyncPipeline>(
479	target_client: &impl TargetClient<P>,
480) -> Result<Option<P::Number>, FailedClient> {
481	match target_client.free_source_headers_interval().await {
482		Ok(Some(free_headers_interval)) if !free_headers_interval.is_zero() => {
483			log::trace!(
484				target: "bridge",
485				"Free headers interval for {} headers at {} is: {:?}",
486				P::SOURCE_NAME,
487				P::TARGET_NAME,
488				free_headers_interval,
489			);
490			Ok(Some(free_headers_interval))
491		},
492		Ok(Some(_free_headers_interval)) => {
493			log::trace!(
494				target: "bridge",
495				"Free headers interval for {} headers at {} is zero. Not submitting any free headers",
496				P::SOURCE_NAME,
497				P::TARGET_NAME,
498			);
499			Ok(None)
500		},
501		Ok(None) => {
502			log::trace!(
503				target: "bridge",
504				"Free headers interval for {} headers at {} is None. Not submitting any free headers",
505				P::SOURCE_NAME,
506				P::TARGET_NAME,
507			);
508
509			Ok(None)
510		},
511		Err(e) => {
512			log::error!(
513				target: "bridge",
514				"Failed to read free headers interval for {} headers at {}: {:?}",
515				P::SOURCE_NAME,
516				P::TARGET_NAME,
517				e,
518			);
519			Err(FailedClient::Target)
520		},
521	}
522}
523
524/// Run finality proofs synchronization loop.
525pub async fn run<P: FinalitySyncPipeline>(
526	source_client: impl SourceClient<P>,
527	target_client: impl TargetClient<P>,
528	sync_params: FinalitySyncParams,
529	metrics_params: MetricsParams,
530	exit_signal: impl Future<Output = ()> + 'static + Send,
531) -> Result<(), relay_utils::Error> {
532	let exit_signal = exit_signal.shared();
533	relay_utils::relay_loop(source_client, target_client)
534		.with_metrics(metrics_params)
535		.loop_metric(SyncLoopMetrics::new(
536			Some(&metrics_prefix::<P>()),
537			"source",
538			"source_at_target",
539		)?)?
540		.expose()
541		.await?
542		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
543			FinalityLoop::run(
544				source_client,
545				target_client,
546				sync_params.clone(),
547				metrics,
548				exit_signal.clone(),
549			)
550		})
551		.await
552}
553
#[cfg(test)]
mod tests {
	use super::*;

	use crate::mock::*;
	use futures::{FutureExt, StreamExt};
	use parking_lot::Mutex;
	use relay_utils::{FailedClient, HeaderId, TrackedTransactionStatus};
	use std::{collections::HashMap, sync::Arc};

	/// Creates source and target test clients that share the same `ClientsData`.
	///
	/// `state_function` is called by the clients with the shared data; once it returns
	/// `true`, the exit signal is sent through `exit_sender`.
	fn prepare_test_clients(
		exit_sender: futures::channel::mpsc::UnboundedSender<()>,
		state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
		source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
	) -> (TestSourceClient, TestTargetClient) {
		let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> =
			Arc::new(move |data| {
				if state_function(data) {
					exit_sender.unbounded_send(()).unwrap();
				}
			});
		let clients_data = Arc::new(Mutex::new(ClientsData {
			source_best_block_number: 10,
			source_headers,
			source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],

			target_best_block_id: HeaderId(5, 5),
			target_headers: vec![],
			target_transaction_tracker: TestTransactionTracker(
				TrackedTransactionStatus::Finalized(Default::default()),
			),
		}));
		(
			TestSourceClient {
				on_method_call: internal_state_function.clone(),
				data: clients_data.clone(),
			},
			TestTargetClient { on_method_call: internal_state_function, data: clients_data },
		)
	}

	/// Loop parameters used by tests: zero tick and all headers are relayed.
	fn test_sync_params() -> FinalitySyncParams {
		FinalitySyncParams {
			tick: Duration::from_secs(0),
			recent_finality_proofs_limit: 1024,
			stall_timeout: Duration::from_secs(1),
			headers_to_relay: HeadersToRelay::All,
		}
	}

	/// Runs the whole loop until `state_function` returns `true` and returns the final
	/// clients data together with the loop result.
	fn run_sync_loop(
		state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
	) -> (ClientsData, Result<(), FailedClient>) {
		let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
		let (source_client, target_client) = prepare_test_clients(
			exit_sender,
			state_function,
			vec![
				(5, (TestSourceHeader(false, 5, 5), None)),
				(6, (TestSourceHeader(false, 6, 6), None)),
				(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
				(8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))),
				(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
				(10, (TestSourceHeader(false, 10, 10), None)),
			]
			.into_iter()
			.collect(),
		);
		let sync_params = test_sync_params();

		let clients_data = source_client.data.clone();
		let result = async_std::task::block_on(FinalityLoop::run(
			source_client,
			target_client,
			sync_params,
			None,
			exit_receiver.into_future().map(|(_, _)| ()),
		));

		let clients_data = clients_data.lock().clone();
		(clients_data, result)
	}

	#[test]
	fn finality_sync_loop_works() {
		let (client_data, result) = run_sync_loop(|data| {
			// header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted,
			// because header#8 has persistent finality proof and it is mandatory => it is submitted
			// header#9 has persistent finality proof, but it isn't mandatory => it is submitted,
			// because there are no more persistent finality proofs
			//
			// once this ^^^ is done, we generate more blocks and read proof for blocks 12 and 14
			// from the stream
			if data.target_best_block_id.0 == 9 {
				data.source_best_block_number = 14;
				data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None));
				data.source_headers
					.insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12))));
				data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None));
				data.source_headers
					.insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14))));
			}
			// once this ^^^ is done, we generate more blocks and read persistent proof for block 16
			if data.target_best_block_id.0 == 14 {
				data.source_best_block_number = 17;
				data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None));
				data.source_headers
					.insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16))));
				data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None));
			}

			data.target_best_block_id.0 == 16
		});

		assert_eq!(result, Ok(()));
		assert_eq!(
			client_data.target_headers,
			vec![
				// before adding 11..14: finality proof for mandatory header#8
				(TestSourceHeader(true, 8, 8), TestFinalityProof(8)),
				// before adding 11..14: persistent finality proof for non-mandatory header#9
				(TestSourceHeader(false, 9, 9), TestFinalityProof(9)),
				// after adding 11..14: ephemeral finality proof for non-mandatory header#14
				(TestSourceHeader(false, 14, 14), TestFinalityProof(14)),
				// after adding 15..17: persistent finality proof for non-mandatory header#16
				(TestSourceHeader(false, 16, 16), TestFinalityProof(16)),
			],
		);
	}

	/// Calls `select_header_to_submit` for source headers 6..=10 (all with persistent proofs)
	/// in the given relay mode. Header#8 is mandatory iff `has_mandatory_headers` is `true`.
	fn run_headers_to_relay_mode_test(
		headers_to_relay: HeadersToRelay,
		has_mandatory_headers: bool,
	) -> Option<JustifiedHeader<TestFinalitySyncPipeline>> {
		// the receiver is never used, because the state function never requests exit
		let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
		let (source_client, target_client) = prepare_test_clients(
			exit_sender,
			|_| false,
			vec![
				(6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))),
				(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
				(8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))),
				(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
				(10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))),
			]
			.into_iter()
			.collect(),
		);
		async_std::task::block_on(async {
			let mut finality_loop = FinalityLoop::new(
				source_client,
				target_client,
				FinalitySyncParams {
					tick: Duration::from_secs(0),
					recent_finality_proofs_limit: 0,
					stall_timeout: Duration::from_secs(0),
					headers_to_relay,
				},
				None,
			);
			let info = SyncInfo {
				best_number_at_source: 10,
				best_number_at_target: 5,
				is_using_same_fork: true,
			};
			finality_loop.select_header_to_submit(&info, Some(3)).await.unwrap()
		})
	}

	#[test]
	fn select_header_to_submit_may_select_non_mandatory_header() {
		assert_eq!(run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, false), None);
		assert_eq!(
			run_headers_to_relay_mode_test(HeadersToRelay::Free, false),
			Some(JustifiedHeader {
				header: TestSourceHeader(false, 10, 10),
				proof: TestFinalityProof(10)
			}),
		);
		assert_eq!(
			run_headers_to_relay_mode_test(HeadersToRelay::All, false),
			Some(JustifiedHeader {
				header: TestSourceHeader(false, 10, 10),
				proof: TestFinalityProof(10)
			}),
		);
	}

	#[test]
	fn select_header_to_submit_may_select_mandatory_header() {
		assert_eq!(
			run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, true),
			Some(JustifiedHeader {
				header: TestSourceHeader(true, 8, 8),
				proof: TestFinalityProof(8)
			}),
		);
		assert_eq!(
			run_headers_to_relay_mode_test(HeadersToRelay::Free, true),
			Some(JustifiedHeader {
				header: TestSourceHeader(true, 8, 8),
				proof: TestFinalityProof(8)
			}),
		);
		assert_eq!(
			run_headers_to_relay_mode_test(HeadersToRelay::All, true),
			Some(JustifiedHeader {
				header: TestSourceHeader(true, 8, 8),
				proof: TestFinalityProof(8)
			}),
		);
	}

	#[test]
	fn different_forks_at_source_and_at_target_are_detected() {
		let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
		let (source_client, target_client) = prepare_test_clients(
			exit_sender,
			|_| false,
			vec![
				// the target knows header#5 with hash 5, while the source has hash 42
				(5, (TestSourceHeader(false, 5, 42), None)),
				(6, (TestSourceHeader(false, 6, 6), None)),
				(7, (TestSourceHeader(false, 7, 7), None)),
				(8, (TestSourceHeader(false, 8, 8), None)),
				(9, (TestSourceHeader(false, 9, 9), None)),
				(10, (TestSourceHeader(false, 10, 10), None)),
			]
			.into_iter()
			.collect(),
		);

		let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
		async_std::task::block_on(async {
			let mut finality_loop = FinalityLoop::new(
				source_client,
				target_client,
				test_sync_params(),
				Some(metrics_sync.clone()),
			);
			finality_loop.run_iteration(None).await.unwrap()
		});

		assert!(!metrics_sync.is_using_same_fork());
	}
}