1use crate::{sync_loop_metrics::SyncLoopMetrics, Error, FinalitySyncPipeline, SourceHeader};
23
24use crate::{
25 base::SourceClientBase,
26 finality_proofs::{FinalityProofsBuf, FinalityProofsStream},
27 headers::{JustifiedHeader, JustifiedHeaderSelector},
28};
29use async_trait::async_trait;
30use backoff::{backoff::Backoff, ExponentialBackoff};
31use futures::{future::Fuse, select, Future, FutureExt};
32use num_traits::{Saturating, Zero};
33use relay_utils::{
34 metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
35 HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
36};
37use std::{
38 fmt::Debug,
39 time::{Duration, Instant},
40};
41
/// Selects which finalized source headers the loop relays to the target.
///
/// The value is forwarded to the header selector; the per-variant notes below only
/// describe what the sync loop itself does with each mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeadersToRelay {
    /// No restriction is applied by the loop; the header selector picks the header.
    All,
    /// The loop only asks the selector for a mandatory header and skips the finality
    /// proofs buffer entirely.
    Mandatory,
    /// Same selection path as [`HeadersToRelay::All`], but proofs are submitted with the
    /// "free execution expected" flag set.
    Free,
}
52
/// Parameters of the finality synchronization loop.
#[derive(Debug, Clone)]
pub struct FinalitySyncParams {
    /// Delay before the next loop iteration after an iteration that did not fail.
    pub tick: Duration,
    /// Limit passed to the finality proofs buffer whenever it is pruned.
    pub recent_finality_proofs_limit: usize,
    /// NOTE(review): not read by the loop code visible here; presumably the time after
    /// which a submitted transaction is considered lost — confirm against the callers.
    pub stall_timeout: Duration,
    /// Which headers the loop is allowed to relay.
    pub headers_to_relay: HeadersToRelay,
}
79
/// Source chain client, as used by the finality synchronization loop.
#[async_trait]
pub trait SourceClient<P: FinalitySyncPipeline>: SourceClientBase<P> {
    /// Returns the number of the best finalized block at the source.
    async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;

    /// Returns the header with the given number, together with its finality proof when
    /// one is available.
    async fn header_and_finality_proof(
        &self,
        number: P::Number,
    ) -> Result<(P::Header, Option<P::FinalityProof>), Self::Error>;
}
92
/// Target chain client, as used by the finality synchronization loop.
#[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
    /// Tracker type returned for every submitted finality proof transaction.
    type TransactionTracker: TransactionTracker;

    /// Returns the id (number and hash) of the best finalized source block known to the
    /// target.
    async fn best_finalized_source_block_id(
        &self,
    ) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;

    /// Returns the free source headers interval, if any.
    ///
    /// The loop treats both `None` and a zero interval as "no free headers".
    async fn free_source_headers_interval(&self) -> Result<Option<P::Number>, Self::Error>;

    /// Submits the finality proof of `header` to the target and returns a tracker for the
    /// resulting transaction.
    ///
    /// The loop sets `is_free_execution_expected` only when it runs in
    /// [`HeadersToRelay::Free`] mode.
    async fn submit_finality_proof(
        &self,
        header: P::Header,
        proof: P::FinalityProof,
        is_free_execution_expected: bool,
    ) -> Result<Self::TransactionTracker, Self::Error>;
}
116
117pub fn metrics_prefix<P: FinalitySyncPipeline>() -> String {
120 format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
121}
122
/// Snapshot of the synchronization state, assembled from both clients.
pub struct SyncInfo<P: FinalitySyncPipeline> {
    /// Best finalized block number, as reported by the source client.
    pub best_number_at_source: P::Number,
    /// Number of the best finalized source block, as reported by the target client.
    pub best_number_at_target: P::Number,
    /// Whether the hash of the target's best source header matches the hash of the
    /// source header at the same height.
    pub is_using_same_fork: bool,
}
132
133impl<P: FinalitySyncPipeline> SyncInfo<P> {
134 async fn is_on_same_fork<SC: SourceClient<P>>(
136 source_client: &SC,
137 id_at_target: &HeaderId<P::Hash, P::Number>,
138 ) -> Result<bool, SC::Error> {
139 let header_at_source = source_client.header_and_finality_proof(id_at_target.0).await?.0;
140 let header_hash_at_source = header_at_source.hash();
141 Ok(if id_at_target.1 == header_hash_at_source {
142 true
143 } else {
144 log::error!(
145 target: "bridge",
146 "Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
147 at-source {:?} vs at-target {:?}",
148 P::SOURCE_NAME,
149 P::TARGET_NAME,
150 id_at_target.0,
151 header_hash_at_source,
152 id_at_target.1,
153 );
154
155 false
156 })
157 }
158
159 async fn new<SC: SourceClient<P>, TC: TargetClient<P>>(
160 source_client: &SC,
161 target_client: &TC,
162 ) -> Result<Self, Error<P, SC::Error, TC::Error>> {
163 let best_number_at_source =
164 source_client.best_finalized_block_number().await.map_err(Error::Source)?;
165 let best_id_at_target =
166 target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
167 let best_number_at_target = best_id_at_target.0;
168
169 let is_using_same_fork = Self::is_on_same_fork(source_client, &best_id_at_target)
170 .await
171 .map_err(Error::Source)?;
172
173 Ok(Self { best_number_at_source, best_number_at_target, is_using_same_fork })
174 }
175
176 fn update_metrics(&self, metrics_sync: &Option<SyncLoopMetrics>) {
177 if let Some(metrics_sync) = metrics_sync {
178 metrics_sync.update_best_block_at_source(self.best_number_at_source);
179 metrics_sync.update_best_block_at_target(self.best_number_at_target);
180 metrics_sync.update_using_same_fork(self.is_using_same_fork);
181 }
182 }
183
184 pub fn num_headers(&self) -> P::Number {
185 self.best_number_at_source.saturating_sub(self.best_number_at_target)
186 }
187}
188
/// A submitted finality proof transaction, together with the number of the header it
/// carries.
#[derive(Debug, Clone)]
pub struct Transaction<Tracker, Number> {
    /// Tracker used to wait for the transaction status.
    tracker: Tracker,
    /// Number of the header whose finality proof has been submitted.
    header_number: Number,
}
197
198impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracker, Number> {
199 pub async fn submit<
200 P: FinalitySyncPipeline<Number = Number>,
201 TC: TargetClient<P, TransactionTracker = Tracker>,
202 >(
203 target_client: &TC,
204 header: P::Header,
205 justification: P::FinalityProof,
206 is_free_execution_expected: bool,
207 ) -> Result<Self, TC::Error> {
208 let header_number = header.number();
209 log::debug!(
210 target: "bridge",
211 "Going to submit finality proof of {} header #{:?} to {}",
212 P::SOURCE_NAME,
213 header_number,
214 P::TARGET_NAME,
215 );
216
217 let tracker = target_client
218 .submit_finality_proof(header, justification, is_free_execution_expected)
219 .await?;
220 Ok(Transaction { tracker, header_number })
221 }
222
223 async fn track<
224 P: FinalitySyncPipeline<Number = Number>,
225 SC: SourceClient<P>,
226 TC: TargetClient<P>,
227 >(
228 self,
229 target_client: TC,
230 ) -> Result<(), Error<P, SC::Error, TC::Error>> {
231 match self.tracker.wait().await {
232 TrackedTransactionStatus::Finalized(_) => {
233 target_client
236 .best_finalized_source_block_id()
237 .await
238 .map_err(Error::Target)
239 .and_then(|best_id_at_target| {
240 if self.header_number > best_id_at_target.0 {
241 return Err(Error::ProofSubmissionTxFailed {
242 submitted_number: self.header_number,
243 best_number_at_target: best_id_at_target.0,
244 })
245 }
246 Ok(())
247 })
248 },
249 TrackedTransactionStatus::Lost => Err(Error::ProofSubmissionTxLost),
250 }
251 }
252}
253
/// State of the finality synchronization loop.
struct FinalityLoop<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> {
    /// Client of the source chain.
    source_client: SC,
    /// Client of the target chain.
    target_client: TC,

    /// Loop parameters.
    sync_params: FinalitySyncParams,
    /// Optional metrics, updated on every iteration.
    metrics_sync: Option<SyncLoopMetrics>,

    /// Time of the last progress log entry and the target's best number logged then
    /// (`None` until the first entry is written).
    progress: (Instant, Option<P::Number>),
    /// Backoff used to delay the next iteration after a failed one.
    retry_backoff: ExponentialBackoff,
    /// Stream of finality proofs, (re)established from the source client.
    finality_proofs_stream: FinalityProofsStream<P, SC>,
    /// Buffer that is filled from `finality_proofs_stream` and pruned after selection.
    finality_proofs_buf: FinalityProofsBuf<P>,
    /// Number of the header carried by the last submitted transaction; reset to `None`
    /// when tracking of that transaction reports an error.
    best_submitted_number: Option<P::Number>,
}
268
269impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> FinalityLoop<P, SC, TC> {
270 pub fn new(
271 source_client: SC,
272 target_client: TC,
273 sync_params: FinalitySyncParams,
274 metrics_sync: Option<SyncLoopMetrics>,
275 ) -> Self {
276 Self {
277 source_client,
278 target_client,
279 sync_params,
280 metrics_sync,
281 progress: (Instant::now(), None),
282 retry_backoff: retry_backoff(),
283 finality_proofs_stream: FinalityProofsStream::new(),
284 finality_proofs_buf: FinalityProofsBuf::new(vec![]),
285 best_submitted_number: None,
286 }
287 }
288
289 fn update_progress(&mut self, info: &SyncInfo<P>) {
290 let (prev_time, prev_best_number_at_target) = self.progress;
291 let now = Instant::now();
292
293 let needs_update = now - prev_time > Duration::from_secs(10) ||
294 prev_best_number_at_target
295 .map(|prev_best_number_at_target| {
296 info.best_number_at_target.saturating_sub(prev_best_number_at_target) >
297 10.into()
298 })
299 .unwrap_or(true);
300
301 if !needs_update {
302 return
303 }
304
305 log::info!(
306 target: "bridge",
307 "Synced {:?} of {:?} headers",
308 info.best_number_at_target,
309 info.best_number_at_source,
310 );
311
312 self.progress = (now, Some(info.best_number_at_target))
313 }
314
315 pub async fn select_header_to_submit(
316 &mut self,
317 info: &SyncInfo<P>,
318 free_headers_interval: Option<P::Number>,
319 ) -> Result<Option<JustifiedHeader<P>>, Error<P, SC::Error, TC::Error>> {
320 log::trace!(
322 target: "bridge",
323 "Considering range of headers ({}; {}]",
324 info.best_number_at_target,
325 info.best_number_at_source
326 );
327
328 let selector = JustifiedHeaderSelector::new::<SC, TC>(
330 &self.source_client,
331 info,
332 self.sync_params.headers_to_relay,
333 free_headers_interval,
334 )
335 .await?;
336 if self.sync_params.headers_to_relay == HeadersToRelay::Mandatory {
338 return Ok(selector.select_mandatory())
339 }
340
341 self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
345 let maybe_justified_header = selector.select(
346 info,
347 self.sync_params.headers_to_relay,
348 free_headers_interval,
349 &self.finality_proofs_buf,
350 );
351
352 let oldest_finality_proof_to_keep = maybe_justified_header
354 .as_ref()
355 .map(|justified_header| justified_header.number())
356 .unwrap_or(info.best_number_at_target);
357 self.finality_proofs_buf.prune(
358 oldest_finality_proof_to_keep,
359 Some(self.sync_params.recent_finality_proofs_limit),
360 );
361
362 Ok(maybe_justified_header)
363 }
364
365 pub async fn run_iteration(
366 &mut self,
367 free_headers_interval: Option<P::Number>,
368 ) -> Result<
369 Option<Transaction<TC::TransactionTracker, P::Number>>,
370 Error<P, SC::Error, TC::Error>,
371 > {
372 let info = SyncInfo::new(&self.source_client, &self.target_client).await?;
374 info.update_metrics(&self.metrics_sync);
375 self.update_progress(&info);
376
377 if Some(info.best_number_at_target) < self.best_submitted_number {
380 return Ok(None)
381 }
382
383 match self.select_header_to_submit(&info, free_headers_interval).await? {
385 Some(header) => {
386 let transaction = Transaction::submit(
387 &self.target_client,
388 header.header,
389 header.proof,
390 self.sync_params.headers_to_relay == HeadersToRelay::Free,
391 )
392 .await
393 .map_err(Error::Target)?;
394 self.best_submitted_number = Some(transaction.header_number);
395 Ok(Some(transaction))
396 },
397 None => Ok(None),
398 }
399 }
400
401 async fn ensure_finality_proofs_stream(&mut self) -> Result<(), FailedClient> {
402 if let Err(e) = self.finality_proofs_stream.ensure_stream(&self.source_client).await {
403 if e.is_connection_error() {
404 return Err(FailedClient::Source)
405 }
406 }
407
408 Ok(())
409 }
410
411 async fn run_until_connection_lost(
413 &mut self,
414 exit_signal: impl Future<Output = ()>,
415 ) -> Result<(), FailedClient> {
416 self.ensure_finality_proofs_stream().await?;
417 let proof_submission_tx_tracker = Fuse::terminated();
418 let exit_signal = exit_signal.fuse();
419 futures::pin_mut!(exit_signal, proof_submission_tx_tracker);
420
421 let free_headers_interval = free_headers_interval(&self.target_client).await?;
422
423 loop {
424 let next_tick = match self.run_iteration(free_headers_interval).await {
426 Ok(Some(tx)) => {
427 proof_submission_tx_tracker
428 .set(tx.track::<P, SC, _>(self.target_client.clone()).fuse());
429 self.retry_backoff.reset();
430 self.sync_params.tick
431 },
432 Ok(None) => {
433 self.retry_backoff.reset();
434 self.sync_params.tick
435 },
436 Err(error) => {
437 log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
438 error.fail_if_connection_error()?;
439 self.retry_backoff
440 .next_backoff()
441 .unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
442 },
443 };
444 self.ensure_finality_proofs_stream().await?;
445
446 select! {
448 proof_submission_result = proof_submission_tx_tracker => {
449 if let Err(e) = proof_submission_result {
450 log::error!(
451 target: "bridge",
452 "Finality sync proof submission tx to {} has failed with error: {:?}.",
453 P::TARGET_NAME,
454 e,
455 );
456 self.best_submitted_number = None;
457 e.fail_if_connection_error()?;
458 }
459 },
460 _ = async_std::task::sleep(next_tick).fuse() => {},
461 _ = exit_signal => return Ok(()),
462 }
463 }
464 }
465
466 pub async fn run(
467 source_client: SC,
468 target_client: TC,
469 sync_params: FinalitySyncParams,
470 metrics_sync: Option<SyncLoopMetrics>,
471 exit_signal: impl Future<Output = ()>,
472 ) -> Result<(), FailedClient> {
473 let mut finality_loop = Self::new(source_client, target_client, sync_params, metrics_sync);
474 finality_loop.run_until_connection_lost(exit_signal).await
475 }
476}
477
478async fn free_headers_interval<P: FinalitySyncPipeline>(
479 target_client: &impl TargetClient<P>,
480) -> Result<Option<P::Number>, FailedClient> {
481 match target_client.free_source_headers_interval().await {
482 Ok(Some(free_headers_interval)) if !free_headers_interval.is_zero() => {
483 log::trace!(
484 target: "bridge",
485 "Free headers interval for {} headers at {} is: {:?}",
486 P::SOURCE_NAME,
487 P::TARGET_NAME,
488 free_headers_interval,
489 );
490 Ok(Some(free_headers_interval))
491 },
492 Ok(Some(_free_headers_interval)) => {
493 log::trace!(
494 target: "bridge",
495 "Free headers interval for {} headers at {} is zero. Not submitting any free headers",
496 P::SOURCE_NAME,
497 P::TARGET_NAME,
498 );
499 Ok(None)
500 },
501 Ok(None) => {
502 log::trace!(
503 target: "bridge",
504 "Free headers interval for {} headers at {} is None. Not submitting any free headers",
505 P::SOURCE_NAME,
506 P::TARGET_NAME,
507 );
508
509 Ok(None)
510 },
511 Err(e) => {
512 log::error!(
513 target: "bridge",
514 "Failed to read free headers interval for {} headers at {}: {:?}",
515 P::SOURCE_NAME,
516 P::TARGET_NAME,
517 e,
518 );
519 Err(FailedClient::Target)
520 },
521 }
522}
523
524pub async fn run<P: FinalitySyncPipeline>(
526 source_client: impl SourceClient<P>,
527 target_client: impl TargetClient<P>,
528 sync_params: FinalitySyncParams,
529 metrics_params: MetricsParams,
530 exit_signal: impl Future<Output = ()> + 'static + Send,
531) -> Result<(), relay_utils::Error> {
532 let exit_signal = exit_signal.shared();
533 relay_utils::relay_loop(source_client, target_client)
534 .with_metrics(metrics_params)
535 .loop_metric(SyncLoopMetrics::new(
536 Some(&metrics_prefix::<P>()),
537 "source",
538 "source_at_target",
539 )?)?
540 .expose()
541 .await?
542 .run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
543 FinalityLoop::run(
544 source_client,
545 target_client,
546 sync_params.clone(),
547 metrics,
548 exit_signal.clone(),
549 )
550 })
551 .await
552}
553
#[cfg(test)]
mod tests {
    use super::*;

    use crate::mock::*;
    use futures::{FutureExt, StreamExt};
    use parking_lot::Mutex;
    use relay_utils::{FailedClient, HeaderId, TrackedTransactionStatus};
    use std::{collections::HashMap, sync::Arc};

    /// Builds a pair of mock clients over shared state.
    ///
    /// The state starts with source best block 10 and target best block `HeaderId(5, 5)`.
    /// `state_function` is handed to both clients as their method-call hook; once it
    /// returns `true`, a message is sent through `exit_sender`.
    fn prepare_test_clients(
        exit_sender: futures::channel::mpsc::UnboundedSender<()>,
        state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
        source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
    ) -> (TestSourceClient, TestTargetClient) {
        let on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync> =
            Arc::new(move |data| {
                let should_exit = state_function(data);
                if should_exit {
                    exit_sender.unbounded_send(()).unwrap();
                }
            });

        let initial_data = ClientsData {
            source_best_block_number: 10,
            source_headers,
            source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],

            target_best_block_id: HeaderId(5, 5),
            target_headers: vec![],
            target_transaction_tracker: TestTransactionTracker(
                TrackedTransactionStatus::Finalized(Default::default()),
            ),
        };
        let shared_data = Arc::new(Mutex::new(initial_data));

        let source_client = TestSourceClient {
            on_method_call: on_method_call.clone(),
            data: shared_data.clone(),
        };
        let target_client = TestTargetClient { on_method_call, data: shared_data };
        (source_client, target_client)
    }

    /// Default parameters for tests: zero tick and all headers relayed.
    fn test_sync_params() -> FinalitySyncParams {
        FinalitySyncParams {
            tick: Duration::from_secs(0),
            recent_finality_proofs_limit: 1024,
            stall_timeout: Duration::from_secs(1),
            headers_to_relay: HeadersToRelay::All,
        }
    }

    /// Runs the whole loop against mock clients until `state_function` returns `true`,
    /// then returns the final clients state and the loop result.
    fn run_sync_loop(
        state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
    ) -> (ClientsData, Result<(), FailedClient>) {
        let source_headers: HashMap<_, _> = vec![
            (5, (TestSourceHeader(false, 5, 5), None)),
            (6, (TestSourceHeader(false, 6, 6), None)),
            (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
            (8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))),
            (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
            (10, (TestSourceHeader(false, 10, 10), None)),
        ]
        .into_iter()
        .collect();

        let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
        let (source_client, target_client) =
            prepare_test_clients(exit_sender, state_function, source_headers);

        // Keep a handle to the shared state, since the clients are moved into the loop.
        let shared_data = source_client.data.clone();
        let exit_signal = exit_receiver.into_future().map(|(_, _)| ());
        let result = async_std::task::block_on(FinalityLoop::run(
            source_client,
            target_client,
            test_sync_params(),
            None,
            exit_signal,
        ));

        let final_data = shared_data.lock().clone();
        (final_data, result)
    }

    #[test]
    fn finality_sync_loop_works() {
        let (client_data, result) = run_sync_loop(|data| {
            match data.target_best_block_id.0 {
                // Target has caught up with header 9: extend the source chain to 14.
                9 => {
                    data.source_best_block_number = 14;
                    data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None));
                    data.source_headers.insert(
                        12,
                        (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12))),
                    );
                    data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None));
                    data.source_headers.insert(
                        14,
                        (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14))),
                    );
                },
                // Target has caught up with header 14: extend the source chain to 17.
                14 => {
                    data.source_best_block_number = 17;
                    data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None));
                    data.source_headers.insert(
                        16,
                        (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16))),
                    );
                    data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None));
                },
                _ => {},
            }

            // Stop the loop as soon as header 16 is the best one at the target.
            data.target_best_block_id.0 == 16
        });

        assert_eq!(result, Ok(()));

        let expected_target_headers = vec![
            (TestSourceHeader(true, 8, 8), TestFinalityProof(8)),
            (TestSourceHeader(false, 9, 9), TestFinalityProof(9)),
            (TestSourceHeader(false, 14, 14), TestFinalityProof(14)),
            (TestSourceHeader(false, 16, 16), TestFinalityProof(16)),
        ];
        assert_eq!(client_data.target_headers, expected_target_headers);
    }

    /// Runs `select_header_to_submit` once in the given mode, with headers 6..=10 all
    /// having proofs, header 8 optionally mandatory, and a free headers interval of 3.
    fn run_headers_to_relay_mode_test(
        headers_to_relay: HeadersToRelay,
        has_mandatory_headers: bool,
    ) -> Option<JustifiedHeader<TestFinalitySyncPipeline>> {
        let source_headers: HashMap<_, _> = vec![
            (6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))),
            (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
            (8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))),
            (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
            (10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))),
        ]
        .into_iter()
        .collect();

        let (exit_sender, _) = futures::channel::mpsc::unbounded();
        let (source_client, target_client) =
            prepare_test_clients(exit_sender, |_| false, source_headers);

        let sync_params = FinalitySyncParams {
            tick: Duration::from_secs(0),
            recent_finality_proofs_limit: 0,
            stall_timeout: Duration::from_secs(0),
            headers_to_relay,
        };
        let info = SyncInfo {
            best_number_at_source: 10,
            best_number_at_target: 5,
            is_using_same_fork: true,
        };

        async_std::task::block_on(async {
            let mut finality_loop =
                FinalityLoop::new(source_client, target_client, sync_params, None);
            finality_loop.select_header_to_submit(&info, Some(3)).await.unwrap()
        })
    }

    #[test]
    fn select_header_to_submit_may_select_non_mandatory_header() {
        assert_eq!(run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, false), None);

        for headers_to_relay in [HeadersToRelay::Free, HeadersToRelay::All] {
            assert_eq!(
                run_headers_to_relay_mode_test(headers_to_relay, false),
                Some(JustifiedHeader {
                    header: TestSourceHeader(false, 10, 10),
                    proof: TestFinalityProof(10)
                }),
            );
        }
    }

    #[test]
    fn select_header_to_submit_may_select_mandatory_header() {
        for headers_to_relay in
            [HeadersToRelay::Mandatory, HeadersToRelay::Free, HeadersToRelay::All]
        {
            assert_eq!(
                run_headers_to_relay_mode_test(headers_to_relay, true),
                Some(JustifiedHeader {
                    header: TestSourceHeader(true, 8, 8),
                    proof: TestFinalityProof(8)
                }),
            );
        }
    }

    #[test]
    fn different_forks_at_source_and_at_target_are_detected() {
        // The source header at height 5 carries hash 42, while the target knows (5, 5).
        let source_headers: HashMap<_, _> = vec![
            (5, (TestSourceHeader(false, 5, 42), None)),
            (6, (TestSourceHeader(false, 6, 6), None)),
            (7, (TestSourceHeader(false, 7, 7), None)),
            (8, (TestSourceHeader(false, 8, 8), None)),
            (9, (TestSourceHeader(false, 9, 9), None)),
            (10, (TestSourceHeader(false, 10, 10), None)),
        ]
        .into_iter()
        .collect();

        let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
        let (source_client, target_client) =
            prepare_test_clients(exit_sender, |_| false, source_headers);

        let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
        async_std::task::block_on(async {
            let mut finality_loop = FinalityLoop::new(
                source_client,
                target_client,
                test_sync_params(),
                Some(metrics_sync.clone()),
            );
            finality_loop.run_iteration(None).await.unwrap()
        });

        assert!(!metrics_sync.is_using_same_fork());
    }
}