1use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};
18
19use async_trait::async_trait;
20use bp_polkadot_core::{
21 parachains::{ParaHash, ParaHeadsProof, ParaId},
22 BlockNumber as RelayBlockNumber,
23};
24use futures::{
25 future::{FutureExt, Shared},
26 poll, select_biased,
27};
28use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase};
29use relay_utils::{
30 metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
31 TrackedTransactionStatus, TransactionTracker,
32};
33use std::{future::Future, pin::Pin, task::Poll};
34
/// Availability of a parachain head, as reported by a source client.
#[derive(Clone, Copy, Debug)]
pub enum AvailableHeader<T> {
    /// The head cannot be reported right now. `is_update_required` never requests an update
    /// while the source is in this state.
    Unavailable,
    /// There is no head at all. This is what `None` converts into.
    Missing,
    /// The head is known; the payload describes it.
    Available(T),
}

impl<T> AvailableHeader<T> {
    /// Borrows the payload of [`AvailableHeader::Available`]; every other variant gives `None`.
    pub fn as_available(&self) -> Option<&T> {
        if let Self::Available(header) = self {
            Some(header)
        } else {
            None
        }
    }
}

impl<T> From<Option<T>> for AvailableHeader<T> {
    /// Maps `Some(header)` to `Available(header)` and `None` to `Missing`.
    fn from(maybe_header: Option<T>) -> AvailableHeader<T> {
        maybe_header.map_or(AvailableHeader::Missing, AvailableHeader::Available)
    }
}
70
71#[async_trait]
73pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
74 async fn ensure_synced(&self) -> Result<bool, Self::Error>;
76
77 async fn parachain_head(
79 &self,
80 at_block: HeaderIdOf<P::SourceRelayChain>,
81 ) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, Self::Error>;
82
83 async fn prove_parachain_head(
85 &self,
86 at_block: HeaderIdOf<P::SourceRelayChain>,
87 ) -> Result<(ParaHeadsProof, ParaHash), Self::Error>;
88}
89
90#[async_trait]
92pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
93 type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
95
96 async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
98
99 async fn best_finalized_source_relay_chain_block(
102 &self,
103 at_block: &HeaderIdOf<P::TargetChain>,
104 ) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
105 async fn free_source_relay_headers_interval(
109 &self,
110 ) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;
111
112 async fn parachain_head(
114 &self,
115 at_block: HeaderIdOf<P::TargetChain>,
116 ) -> Result<
117 Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
118 Self::Error,
119 >;
120
121 async fn submit_parachain_head_proof(
123 &self,
124 at_source_block: HeaderIdOf<P::SourceRelayChain>,
125 para_head_hash: ParaHash,
126 proof: ParaHeadsProof,
127 is_free_execution_expected: bool,
128 ) -> Result<Self::TransactionTracker, Self::Error>;
129}
130
131pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
134 format!(
135 "{}_to_{}_Parachains_{}",
136 P::SourceRelayChain::NAME,
137 P::TargetChain::NAME,
138 P::SourceParachain::PARACHAIN_ID
139 )
140}
141
142pub async fn relay_single_head<P: ParachainsPipeline>(
144 source_client: impl SourceClient<P>,
145 target_client: impl TargetClient<P>,
146 at_relay_block: HeaderIdOf<P::SourceRelayChain>,
147) -> Result<(), ()>
148where
149 P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
150{
151 let tx_tracker =
152 submit_selected_head::<P, _>(&source_client, &target_client, at_relay_block, false)
153 .await
154 .map_err(drop)?;
155 match tx_tracker.wait().await {
156 TrackedTransactionStatus::Finalized(_) => Ok(()),
157 TrackedTransactionStatus::Lost => {
158 log::error!(
159 "Transaction with {} header at relay header {:?} is considered lost at {}",
160 P::SourceParachain::NAME,
161 at_relay_block,
162 P::TargetChain::NAME,
163 );
164 Err(())
165 },
166 }
167}
168
169pub async fn run<P: ParachainsPipeline>(
171 source_client: impl SourceClient<P>,
172 target_client: impl TargetClient<P>,
173 metrics_params: MetricsParams,
174 only_free_headers: bool,
175 exit_signal: impl Future<Output = ()> + 'static + Send,
176) -> Result<(), relay_utils::Error>
177where
178 P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
179{
180 log::info!(
181 target: "bridge",
182 "Starting {} -> {} finality proof relay: relaying (only_free_headers: {:?}) headers",
183 P::SourceParachain::NAME,
184 P::TargetChain::NAME,
185 only_free_headers,
186 );
187
188 let exit_signal = exit_signal.shared();
189 relay_utils::relay_loop(source_client, target_client)
190 .with_metrics(metrics_params)
191 .loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
192 .expose()
193 .await?
194 .run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
195 run_until_connection_lost(
196 source_client,
197 target_client,
198 metrics,
199 only_free_headers,
200 exit_signal.clone(),
201 )
202 })
203 .await
204}
205
206async fn run_until_connection_lost<P: ParachainsPipeline>(
208 source_client: impl SourceClient<P>,
209 target_client: impl TargetClient<P>,
210 metrics: Option<ParachainsLoopMetrics>,
211 only_free_headers: bool,
212 exit_signal: impl Future<Output = ()> + Send,
213) -> Result<(), FailedClient>
214where
215 P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
216{
217 let exit_signal = exit_signal.fuse();
218 let min_block_interval = std::cmp::min(
219 P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
220 P::TargetChain::AVERAGE_BLOCK_INTERVAL,
221 );
222
223 let free_source_relay_headers_interval = if only_free_headers {
226 let free_source_relay_headers_interval =
227 target_client.free_source_relay_headers_interval().await.map_err(|e| {
228 log::warn!(
229 target: "bridge",
230 "Failed to read free {} headers interval at {}: {:?}",
231 P::SourceRelayChain::NAME,
232 P::TargetChain::NAME,
233 e,
234 );
235 FailedClient::Target
236 })?;
237 match free_source_relay_headers_interval {
238 Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => {
239 log::trace!(
240 target: "bridge",
241 "Free {} headers interval at {}: {:?}",
242 P::SourceRelayChain::NAME,
243 P::TargetChain::NAME,
244 free_source_relay_headers_interval,
245 );
246 free_source_relay_headers_interval
247 },
248 _ => {
249 log::warn!(
250 target: "bridge",
251 "Invalid free {} headers interval at {}: {:?}",
252 P::SourceRelayChain::NAME,
253 P::TargetChain::NAME,
254 free_source_relay_headers_interval,
255 );
256 return Err(FailedClient::Target)
257 },
258 }
259 } else {
260 0
262 };
263
264 let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;
265
266 futures::pin_mut!(exit_signal);
267
268 loop {
273 select_biased! {
277 _ = exit_signal => return Ok(()),
278 _ = async_std::task::sleep(min_block_interval).fuse() => {},
279 }
280
281 match source_client.ensure_synced().await {
284 Ok(true) => (),
285 Ok(false) => {
286 log::warn!(
287 target: "bridge",
288 "{} client is syncing. Won't do anything until it is synced",
289 P::SourceRelayChain::NAME,
290 );
291 continue
292 },
293 Err(e) => {
294 log::warn!(
295 target: "bridge",
296 "{} client has failed to return its sync status: {:?}",
297 P::SourceRelayChain::NAME,
298 e,
299 );
300 return Err(FailedClient::Source)
301 },
302 }
303
304 let best_target_block = target_client.best_block().await.map_err(|e| {
306 log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e);
307 FailedClient::Target
308 })?;
309 let (relay_of_head_at_target, head_at_target) =
310 read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;
311
312 if let Some(tracker) = submitted_heads_tracker.take() {
314 match tracker.update(&best_target_block, &head_at_target).await {
315 SubmittedHeadStatus::Waiting(tracker) => {
316 submitted_heads_tracker = Some(tracker);
318 continue
319 },
320 SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
321 },
323 SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => {
324 log::warn!(
325 target: "bridge",
326 "Parachains synchronization from {} to {} has stalled. Going to restart",
327 P::SourceRelayChain::NAME,
328 P::TargetChain::NAME,
329 );
330
331 return Err(FailedClient::Both)
332 },
333 }
334 }
335
336 let best_finalized_relay_block_at_target = target_client
339 .best_finalized_source_relay_chain_block(&best_target_block)
340 .await
341 .map_err(|e| {
342 log::warn!(
343 target: "bridge",
344 "Failed to read best finalized {} block from {}: {:?}",
345 P::SourceRelayChain::NAME,
346 P::TargetChain::NAME,
347 e,
348 );
349 FailedClient::Target
350 })?;
351
352 let prove_at_relay_block = if only_free_headers {
356 match relay_of_head_at_target {
357 Some(relay_of_head_at_target) => {
358 let scan_range_begin = relay_of_head_at_target.number();
360 let scan_range_end = best_finalized_relay_block_at_target.number();
361 if scan_range_end.saturating_sub(scan_range_begin) <
362 free_source_relay_headers_interval
363 {
364 log::trace!(
366 target: "bridge",
367 "Waiting for new free {} headers at {}: scanned {:?}..={:?}",
368 P::SourceRelayChain::NAME,
369 P::TargetChain::NAME,
370 scan_range_begin,
371 scan_range_end,
372 );
373 continue;
374 }
375
376 best_finalized_relay_block_at_target
378 },
379 None => {
380 best_finalized_relay_block_at_target
382 },
383 }
384 } else {
385 best_finalized_relay_block_at_target
386 };
387
388 let head_at_source =
390 read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?;
391 let is_update_required = is_update_required::<P>(
392 head_at_source,
393 head_at_target,
394 prove_at_relay_block,
395 best_target_block,
396 );
397
398 if is_update_required {
399 let transaction_tracker = submit_selected_head::<P, _>(
400 &source_client,
401 &target_client,
402 prove_at_relay_block,
403 only_free_headers,
404 )
405 .await?;
406 submitted_heads_tracker =
407 Some(SubmittedHeadsTracker::<P>::new(head_at_source, transaction_tracker));
408 }
409 }
410}
411
412async fn submit_selected_head<P: ParachainsPipeline, TC: TargetClient<P>>(
414 source_client: &impl SourceClient<P>,
415 target_client: &TC,
416 prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
417 only_free_headers: bool,
418) -> Result<TC::TransactionTracker, FailedClient> {
419 let (head_proof, head_hash) =
420 source_client.prove_parachain_head(prove_at_relay_block).await.map_err(|e| {
421 log::warn!(
422 target: "bridge",
423 "Failed to prove {} parachain ParaId({}) heads: {:?}",
424 P::SourceRelayChain::NAME,
425 P::SourceParachain::PARACHAIN_ID,
426 e,
427 );
428 FailedClient::Source
429 })?;
430 log::info!(
431 target: "bridge",
432 "Submitting {} parachain ParaId({}) head update transaction to {}. Para hash at source relay {:?}: {:?}",
433 P::SourceRelayChain::NAME,
434 P::SourceParachain::PARACHAIN_ID,
435 P::TargetChain::NAME,
436 prove_at_relay_block,
437 head_hash,
438 );
439
440 target_client
441 .submit_parachain_head_proof(prove_at_relay_block, head_hash, head_proof, only_free_headers)
442 .await
443 .map_err(|e| {
444 log::warn!(
445 target: "bridge",
446 "Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}",
447 P::SourceRelayChain::NAME,
448 P::SourceParachain::PARACHAIN_ID,
449 P::TargetChain::NAME,
450 e,
451 );
452 FailedClient::Target
453 })
454}
455
456fn is_update_required<P: ParachainsPipeline>(
458 head_at_source: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
459 head_at_target: Option<HeaderIdOf<P::SourceParachain>>,
460 prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
461 best_target_block: HeaderIdOf<P::TargetChain>,
462) -> bool
463where
464 P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
465{
466 log::trace!(
467 target: "bridge",
468 "Checking if {} parachain ParaId({}) needs update at {}:\n\t\
469 At {} ({:?}): {:?}\n\t\
470 At {} ({:?}): {:?}",
471 P::SourceRelayChain::NAME,
472 P::SourceParachain::PARACHAIN_ID,
473 P::TargetChain::NAME,
474 P::SourceRelayChain::NAME,
475 prove_at_relay_block,
476 head_at_source,
477 P::TargetChain::NAME,
478 best_target_block,
479 head_at_target,
480 );
481
482 let needs_update = match (head_at_source, head_at_target) {
483 (AvailableHeader::Unavailable, _) => {
484 false
487 },
488 (AvailableHeader::Available(head_at_source), Some(head_at_target))
489 if head_at_source.number() > head_at_target.number() =>
490 {
491 true
494 },
495 (AvailableHeader::Available(_), Some(_)) => {
496 false
499 },
500 (AvailableHeader::Available(_), None) => {
501 true
504 },
505 (AvailableHeader::Missing, Some(_)) => {
506 true
509 },
510 (AvailableHeader::Missing, None) => {
511 false
513 },
514 };
515
516 if needs_update {
517 log::trace!(
518 target: "bridge",
519 "{} parachain ParaId({}) needs update at {}: {:?} vs {:?}",
520 P::SourceRelayChain::NAME,
521 P::SourceParachain::PARACHAIN_ID,
522 P::TargetChain::NAME,
523 head_at_source,
524 head_at_target,
525 );
526 }
527
528 needs_update
529}
530
531async fn read_head_at_source<P: ParachainsPipeline>(
533 source_client: &impl SourceClient<P>,
534 metrics: Option<&ParachainsLoopMetrics>,
535 at_relay_block: &HeaderIdOf<P::SourceRelayChain>,
536) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, FailedClient> {
537 let para_head = source_client.parachain_head(*at_relay_block).await;
538 match para_head {
539 Ok(AvailableHeader::Available(para_head)) => {
540 if let Some(metrics) = metrics {
541 metrics.update_best_parachain_block_at_source(
542 ParaId(P::SourceParachain::PARACHAIN_ID),
543 para_head.number(),
544 );
545 }
546 Ok(AvailableHeader::Available(para_head))
547 },
548 Ok(r) => Ok(r),
549 Err(e) => {
550 log::warn!(
551 target: "bridge",
552 "Failed to read head of {} parachain ParaId({:?}): {:?}",
553 P::SourceRelayChain::NAME,
554 P::SourceParachain::PARACHAIN_ID,
555 e,
556 );
557 Err(FailedClient::Source)
558 },
559 }
560}
561
562async fn read_head_at_target<P: ParachainsPipeline>(
565 target_client: &impl TargetClient<P>,
566 metrics: Option<&ParachainsLoopMetrics>,
567 at_block: &HeaderIdOf<P::TargetChain>,
568) -> Result<
569 (Option<HeaderIdOf<P::SourceRelayChain>>, Option<HeaderIdOf<P::SourceParachain>>),
570 FailedClient,
571> {
572 let para_head_id = target_client.parachain_head(*at_block).await;
573 match para_head_id {
574 Ok(Some((relay_header_id, para_head_id))) => {
575 if let Some(metrics) = metrics {
576 metrics.update_best_parachain_block_at_target(
577 ParaId(P::SourceParachain::PARACHAIN_ID),
578 para_head_id.number(),
579 );
580 }
581 Ok((Some(relay_header_id), Some(para_head_id)))
582 },
583 Ok(None) => Ok((None, None)),
584 Err(e) => {
585 log::warn!(
586 target: "bridge",
587 "Failed to read head of {} parachain ParaId({}) at {}: {:?}",
588 P::SourceRelayChain::NAME,
589 P::SourceParachain::PARACHAIN_ID,
590 P::TargetChain::NAME,
591 e,
592 );
593 Err(FailedClient::Target)
594 },
595 }
596}
597
598enum SubmittedHeadStatus<P: ParachainsPipeline> {
600 Waiting(SubmittedHeadsTracker<P>),
602 Final(TrackedTransactionStatus<HeaderIdOf<P::TargetChain>>),
604}
605
606type SharedTransactionTracker<P> = Shared<
610 Pin<
611 Box<
612 dyn Future<
613 Output = TrackedTransactionStatus<
614 HeaderIdOf<<P as ParachainsPipeline>::TargetChain>,
615 >,
616 > + Send,
617 >,
618 >,
619>;
620
621struct SubmittedHeadsTracker<P: ParachainsPipeline> {
623 submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
625 transaction_tracker: SharedTransactionTracker<P>,
629}
630
631impl<P: ParachainsPipeline> SubmittedHeadsTracker<P> {
632 pub fn new(
634 submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
635 transaction_tracker: impl TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>> + 'static,
636 ) -> Self {
637 SubmittedHeadsTracker {
638 submitted_head,
639 transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(),
640 }
641 }
642
643 pub async fn update(
645 self,
646 at_target_block: &HeaderIdOf<P::TargetChain>,
647 head_at_target: &Option<HeaderIdOf<P::SourceParachain>>,
648 ) -> SubmittedHeadStatus<P> {
649 let is_head_updated = match (self.submitted_head, head_at_target) {
651 (AvailableHeader::Available(submitted_head), Some(head_at_target))
652 if head_at_target.number() >= submitted_head.number() =>
653 true,
654 (AvailableHeader::Missing, None) => true,
655 _ => false,
656 };
657 if is_head_updated {
658 log::trace!(
659 target: "bridge",
660 "Head of parachain ParaId({}) has been updated at {}: {:?}",
661 P::SourceParachain::PARACHAIN_ID,
662 P::TargetChain::NAME,
663 head_at_target,
664 );
665
666 return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block))
667 }
668
669 let transaction_tracker = self.transaction_tracker.clone();
672 match poll!(transaction_tracker) {
673 Poll::Ready(TrackedTransactionStatus::Lost) =>
674 return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
675 Poll::Ready(TrackedTransactionStatus::Finalized(_)) => {
676 return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost)
679 },
680 _ => (),
681 }
682
683 SubmittedHeadStatus::Waiting(self)
684 }
685}
686
#[cfg(test)]
mod tests {
    use super::*;
    use async_std::sync::{Arc, Mutex};
    use futures::{SinkExt, StreamExt};
    use relay_substrate_client::test_chain::{TestChain, TestParachain};
    use relay_utils::{HeaderId, MaybeConnectionError};
    use sp_core::H256;
    use std::collections::HashMap;

    const PARA_10_HASH: ParaHash = H256([10u8; 32]);
    const PARA_20_HASH: ParaHash = H256([20u8; 32]);

    /// The only error that test clients ever return.
    #[derive(Clone, Debug)]
    enum TestError {
        Error,
    }

    impl MaybeConnectionError for TestError {
        fn is_connection_error(&self) -> bool {
            false
        }
    }

    /// Pipeline in which `TestChain` is both the source relay chain and the target chain.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestParachainsPipeline;

    impl ParachainsPipeline for TestParachainsPipeline {
        type SourceRelayChain = TestChain;
        type SourceParachain = TestParachain;
        type TargetChain = TestChain;
    }

    /// Client that serves both as source and as target; all answers come from `TestClientData`.
    #[derive(Clone, Debug)]
    struct TestClient {
        data: Arc<Mutex<TestClientData>>,
    }

    /// Tracker that resolves immediately with the given status, or never if it is `None`.
    #[derive(Clone, Debug)]
    struct TestTransactionTracker(Option<TrackedTransactionStatus<HeaderIdOf<TestChain>>>);

    #[async_trait]
    impl TransactionTracker for TestTransactionTracker {
        type HeaderId = HeaderIdOf<TestChain>;

        async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<TestChain>> {
            match self.0 {
                Some(status) => status,
                None => futures::future::pending().await,
            }
        }
    }

    /// Canned answers of `TestClient`, plus records of what has been submitted to it.
    #[derive(Clone, Debug)]
    struct TestClientData {
        source_sync_status: Result<bool, TestError>,
        /// Parachain heads at source, keyed by the relay chain block number.
        source_head: HashMap<
            BlockNumberOf<TestChain>,
            Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError>,
        >,
        source_proof: Result<(), TestError>,

        target_free_source_relay_headers_interval:
            Result<Option<BlockNumberOf<TestChain>>, TestError>,
        target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
        target_best_finalized_source_block: Result<HeaderIdOf<TestChain>, TestError>,
        #[allow(clippy::type_complexity)]
        target_head: Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError>,
        target_submit_result: Result<(), TestError>,

        /// Relay block of the last successfully submitted proof.
        submitted_proof_at_source_relay_block: Option<HeaderIdOf<TestChain>>,
        /// When set, the first successful submission sends the exit signal through it.
        exit_signal_sender: Option<Box<futures::channel::mpsc::UnboundedSender<()>>>,
    }

    impl TestClientData {
        /// Data with which the loop submits the head from relay block 0 on its first iteration.
        pub fn minimal() -> Self {
            TestClientData {
                source_sync_status: Ok(true),
                source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))]
                    .into_iter()
                    .collect(),
                source_proof: Ok(()),

                target_free_source_relay_headers_interval: Ok(None),
                target_best_block: Ok(HeaderId(0, Default::default())),
                target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
                target_head: Ok(None),
                target_submit_result: Ok(()),

                submitted_proof_at_source_relay_block: None,
                exit_signal_sender: None,
            }
        }

        /// Same as `minimal`, but the first successful submission fires the exit signal.
        pub fn with_exit_signal_sender(
            sender: futures::channel::mpsc::UnboundedSender<()>,
        ) -> Self {
            let mut client = Self::minimal();
            client.exit_signal_sender = Some(Box::new(sender));
            client
        }
    }

    impl From<TestClientData> for TestClient {
        fn from(data: TestClientData) -> TestClient {
            TestClient { data: Arc::new(Mutex::new(data)) }
        }
    }

    #[async_trait]
    impl RelayClient for TestClient {
        type Error = TestError;

        async fn reconnect(&mut self) -> Result<(), TestError> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl SourceClient<TestParachainsPipeline> for TestClient {
        async fn ensure_synced(&self) -> Result<bool, TestError> {
            self.data.lock().await.source_sync_status.clone()
        }

        async fn parachain_head(
            &self,
            at_block: HeaderIdOf<TestChain>,
        ) -> Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError> {
            self.data
                .lock()
                .await
                .source_head
                .get(&at_block.0)
                // The panic message is only built when the lookup actually fails.
                .unwrap_or_else(|| panic!("SourceClient::parachain_head({})", at_block.0))
                .clone()
        }

        async fn prove_parachain_head(
            &self,
            at_block: HeaderIdOf<TestChain>,
        ) -> Result<(ParaHeadsProof, ParaHash), TestError> {
            let head_result =
                SourceClient::<TestParachainsPipeline>::parachain_head(self, at_block).await?;
            let head = head_result.as_available().unwrap();
            let proof = (ParaHeadsProof { storage_proof: Default::default() }, head.hash());
            self.data.lock().await.source_proof.clone().map(|_| proof)
        }
    }

    #[async_trait]
    impl TargetClient<TestParachainsPipeline> for TestClient {
        type TransactionTracker = TestTransactionTracker;

        async fn best_block(&self) -> Result<HeaderIdOf<TestChain>, TestError> {
            self.data.lock().await.target_best_block.clone()
        }

        async fn best_finalized_source_relay_chain_block(
            &self,
            _at_block: &HeaderIdOf<TestChain>,
        ) -> Result<HeaderIdOf<TestChain>, TestError> {
            self.data.lock().await.target_best_finalized_source_block.clone()
        }

        // The trait returns the block number of the source relay chain, which is `TestChain`.
        async fn free_source_relay_headers_interval(
            &self,
        ) -> Result<Option<BlockNumberOf<TestChain>>, TestError> {
            self.data.lock().await.target_free_source_relay_headers_interval.clone()
        }

        async fn parachain_head(
            &self,
            _at_block: HeaderIdOf<TestChain>,
        ) -> Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError> {
            self.data.lock().await.target_head.clone()
        }

        async fn submit_parachain_head_proof(
            &self,
            at_source_block: HeaderIdOf<TestChain>,
            _updated_parachain_head: ParaHash,
            _proof: ParaHeadsProof,
            _is_free_execution_expected: bool,
        ) -> Result<TestTransactionTracker, Self::Error> {
            let mut data = self.data.lock().await;
            data.target_submit_result.clone()?;
            data.submitted_proof_at_source_relay_block = Some(at_source_block);

            if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
                exit_signal_sender.send(()).await.unwrap();
            }
            Ok(TestTransactionTracker(Some(
                TrackedTransactionStatus::Finalized(Default::default()),
            )))
        }
    }

    /// Runs the loop in the all-headers mode with an exit signal that never fires, so the
    /// call only returns once one of the clients fails.
    fn run_loop_until_failure(
        source_data: TestClientData,
        target_data: TestClientData,
    ) -> Result<(), FailedClient> {
        async_std::task::block_on(run_until_connection_lost(
            TestClient::from(source_data),
            TestClient::from(target_data),
            None,
            false,
            futures::future::pending(),
        ))
    }

    #[test]
    fn when_source_client_fails_to_return_sync_state() {
        let mut test_source_client = TestClientData::minimal();
        test_source_client.source_sync_status = Err(TestError::Error);

        assert_eq!(
            run_loop_until_failure(test_source_client, TestClientData::minimal()),
            Err(FailedClient::Source),
        );
    }

    #[test]
    fn when_target_client_fails_to_return_best_block() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_best_block = Err(TestError::Error);

        assert_eq!(
            run_loop_until_failure(TestClientData::minimal(), test_target_client),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn when_target_client_fails_to_read_heads() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_head = Err(TestError::Error);

        assert_eq!(
            run_loop_until_failure(TestClientData::minimal(), test_target_client),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn when_target_client_fails_to_read_best_finalized_source_block() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_best_finalized_source_block = Err(TestError::Error);

        assert_eq!(
            run_loop_until_failure(TestClientData::minimal(), test_target_client),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn when_source_client_fails_to_read_heads() {
        let mut test_source_client = TestClientData::minimal();
        test_source_client.source_head.insert(0, Err(TestError::Error));

        assert_eq!(
            run_loop_until_failure(test_source_client, TestClientData::minimal()),
            Err(FailedClient::Source),
        );
    }

    #[test]
    fn when_source_client_fails_to_prove_heads() {
        let mut test_source_client = TestClientData::minimal();
        test_source_client.source_proof = Err(TestError::Error);

        assert_eq!(
            run_loop_until_failure(test_source_client, TestClientData::minimal()),
            Err(FailedClient::Source),
        );
    }

    #[test]
    fn when_target_client_rejects_update_transaction() {
        let mut test_target_client = TestClientData::minimal();
        test_target_client.target_submit_result = Err(TestError::Error);

        assert_eq!(
            run_loop_until_failure(TestClientData::minimal(), test_target_client),
            Err(FailedClient::Target),
        );
    }

    #[test]
    fn minimal_working_case() {
        // The first successful submission fires the exit signal, which stops the loop.
        let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
        assert_eq!(
            async_std::task::block_on(run_until_connection_lost(
                TestClient::from(TestClientData::minimal()),
                TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
                None,
                false,
                exit_signal.into_future().map(|(_, _)| ()),
            )),
            Ok(()),
        );
    }

    #[async_std::test]
    async fn free_headers_are_relayed() {
        // First run: the head at the target was proven at relay block 50, the best finalized
        // relay block at the target is 95 and the free interval is 10. Since 95 - 50 >= 10,
        // the head available at relay block 95 is submitted.
        let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
        let clients_data = TestClientData {
            source_sync_status: Ok(true),
            source_head: vec![
                (90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))),
                (95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))),
            ]
            .into_iter()
            .collect(),
            source_proof: Ok(()),

            target_free_source_relay_headers_interval: Ok(Some(10)),
            target_best_block: Ok(HeaderId(200, [200u8; 32].into())),
            target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())),
            target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))),
            target_submit_result: Ok(()),

            submitted_proof_at_source_relay_block: None,
            exit_signal_sender: Some(Box::new(exit_signal_sender)),
        };

        let source_client = TestClient::from(clients_data.clone());
        let target_client = TestClient::from(clients_data);
        assert_eq!(
            run_until_connection_lost(
                source_client,
                target_client.clone(),
                None,
                true,
                exit_signal.into_future().map(|(_, _)| ()),
            )
            .await,
            Ok(()),
        );

        assert_eq!(
            target_client
                .data
                .lock()
                .await
                .submitted_proof_at_source_relay_block
                .map(|id| id.0),
            Some(95)
        );

        // Second run: the head at the target is now the one proven at relay block 95 and the
        // best finalized relay block is 104. Since 104 - 95 < 10, nothing may be submitted
        // and the recorded submission must stay at 95.
        let mut clients_data: TestClientData = target_client.data.lock().await.clone();
        clients_data
            .source_head
            .insert(104, Ok(AvailableHeader::Available(HeaderId(84, [84u8; 32].into()))));
        clients_data.target_best_finalized_source_block = Ok(HeaderId(104, [104u8; 32].into()));
        clients_data.target_head =
            Ok(Some((HeaderId(95, [95u8; 32].into()), HeaderId(42, [42u8; 32].into()))));
        clients_data.target_best_block = Ok(HeaderId(255, [255u8; 32].into()));
        clients_data.exit_signal_sender = None;

        let source_client = TestClient::from(clients_data.clone());
        let target_client = TestClient::from(clients_data);
        assert_eq!(
            run_until_connection_lost(
                source_client,
                target_client.clone(),
                None,
                true,
                async_std::task::sleep(std::time::Duration::from_millis(100)),
            )
            .await,
            Ok(()),
        );

        assert_eq!(
            target_client
                .data
                .lock()
                .await
                .submitted_proof_at_source_relay_block
                .map(|id| id.0),
            Some(95)
        );
    }

    /// Tracker of a submitted head #20 whose transaction never resolves.
    fn test_tx_tracker() -> SubmittedHeadsTracker<TestParachainsPipeline> {
        SubmittedHeadsTracker::new(
            AvailableHeader::Available(HeaderId(20, PARA_20_HASH)),
            TestTransactionTracker(None),
        )
    }

    /// `Some(())` stands for "still waiting", `None` for any final status.
    impl From<SubmittedHeadStatus<TestParachainsPipeline>> for Option<()> {
        fn from(status: SubmittedHeadStatus<TestParachainsPipeline>) -> Option<()> {
            match status {
                SubmittedHeadStatus::Waiting(_) => Some(()),
                _ => None,
            }
        }
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_none_value() {
        // No head at the target at all: the tracker has to keep waiting.
        assert_eq!(
            Some(()),
            test_tx_tracker().update(&HeaderId(0, Default::default()), &None).await.into(),
        );
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_old_value() {
        assert_eq!(
            Some(()),
            test_tx_tracker()
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
                .await
                .into(),
        );
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_same_value() {
        assert!(matches!(
            test_tx_tracker()
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(20, PARA_20_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
        ));
    }

    #[async_std::test]
    async fn tx_tracker_update_when_head_at_target_has_better_value() {
        assert!(matches!(
            test_tx_tracker()
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(30, PARA_20_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
        ));
    }

    #[async_std::test]
    async fn tx_tracker_update_when_tx_is_lost() {
        let mut tx_tracker = test_tx_tracker();
        tx_tracker.transaction_tracker =
            futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared();
        assert!(matches!(
            tx_tracker
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
        ));
    }

    #[async_std::test]
    async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() {
        let mut tx_tracker = test_tx_tracker();
        tx_tracker.transaction_tracker =
            futures::future::ready(TrackedTransactionStatus::Finalized(Default::default()))
                .boxed()
                .shared();
        assert!(matches!(
            tx_tracker
                .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
                .await,
            SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
        ));
    }

    #[test]
    fn parachain_is_not_updated_if_it_is_unavailable() {
        assert!(!is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Unavailable,
            None,
            Default::default(),
            Default::default(),
        ));
        assert!(!is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Unavailable,
            Some(HeaderId(10, PARA_10_HASH)),
            Default::default(),
            Default::default(),
        ));
    }

    #[test]
    fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
        assert!(!is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Missing,
            None,
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn parachain_is_not_updated_if_target_has_better_head() {
        assert!(!is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Available(HeaderId(10, Default::default())),
            Some(HeaderId(20, Default::default())),
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn parachain_is_updated_after_offboarding() {
        assert!(is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Missing,
            Some(HeaderId(20, Default::default())),
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn parachain_is_updated_after_onboarding() {
        assert!(is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Available(HeaderId(30, Default::default())),
            None,
            Default::default(),
            Default::default(),
        ),);
    }

    #[test]
    fn parachain_is_updated_if_newer_head_is_known() {
        assert!(is_update_required::<TestParachainsPipeline>(
            AvailableHeader::Available(HeaderId(40, Default::default())),
            Some(HeaderId(30, Default::default())),
            Default::default(),
            Default::default(),
        ),);
    }
}