1use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
28
29use async_trait::async_trait;
30use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
31
32use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
33use relay_utils::{
34 interval, metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient,
35 retry_backoff, FailedClient, TransactionTracker,
36};
37
38use crate::{
39 message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
40 message_race_delivery::run as run_message_delivery_race,
41 message_race_receiving::run as run_message_receiving_race,
42 metrics::{Labeled, MessageLaneLoopMetrics},
43};
44
45#[derive(Debug, Clone)]
47pub struct Params<LaneId> {
48 pub lane: LaneId,
50 pub source_tick: Duration,
52 pub target_tick: Duration,
54 pub reconnect_delay: Duration,
56 pub delivery_params: MessageDeliveryParams,
58}
59
60#[derive(Debug, Clone)]
62pub struct MessageDeliveryParams {
63 pub max_unrewarded_relayer_entries_at_target: MessageNonce,
68 pub max_unconfirmed_nonces_at_target: MessageNonce,
72 pub max_messages_in_single_batch: MessageNonce,
74 pub max_messages_weight_in_single_batch: Weight,
76 pub max_messages_size_in_single_batch: u32,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub struct MessageDetails<SourceChainBalance> {
83 pub dispatch_weight: Weight,
85 pub size: u32,
87 pub reward: SourceChainBalance,
89}
90
91pub type MessageDetailsMap<SourceChainBalance> =
93 BTreeMap<MessageNonce, MessageDetails<SourceChainBalance>>;
94
95#[derive(Debug, PartialEq, Eq)]
97pub struct MessageProofParameters {
98 pub outbound_state_proof_required: bool,
100 pub dispatch_weight: Weight,
102}
103
104pub struct NoncesSubmitArtifacts<T> {
106 pub nonces: RangeInclusive<MessageNonce>,
108 pub tx_tracker: T,
110}
111
112pub trait BatchTransaction<HeaderId>: Debug + Send + Sync {
115 fn required_header_id(&self) -> HeaderId;
118}
119
120#[async_trait]
122pub trait SourceClient<P: MessageLane>: RelayClient {
123 type BatchTransaction: BatchTransaction<TargetHeaderIdOf<P>> + Clone;
125 type TransactionTracker: TransactionTracker<HeaderId = SourceHeaderIdOf<P>>;
127
128 async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
130
131 async fn latest_generated_nonce(
133 &self,
134 id: SourceHeaderIdOf<P>,
135 ) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
136
137 async fn latest_confirmed_received_nonce(
139 &self,
140 id: SourceHeaderIdOf<P>,
141 ) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
142
143 async fn generated_message_details(
148 &self,
149 id: SourceHeaderIdOf<P>,
150 nonces: RangeInclusive<MessageNonce>,
151 ) -> Result<MessageDetailsMap<P::SourceChainBalance>, Self::Error>;
152
153 async fn prove_messages(
155 &self,
156 id: SourceHeaderIdOf<P>,
157 nonces: RangeInclusive<MessageNonce>,
158 proof_parameters: MessageProofParameters,
159 ) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), Self::Error>;
160
161 async fn submit_messages_receiving_proof(
163 &self,
164 maybe_batch_tx: Option<Self::BatchTransaction>,
165 generated_at_block: TargetHeaderIdOf<P>,
166 proof: P::MessagesReceivingProof,
167 ) -> Result<Self::TransactionTracker, Self::Error>;
168
169 async fn require_target_header_on_source(
180 &self,
181 id: TargetHeaderIdOf<P>,
182 ) -> Result<Option<Self::BatchTransaction>, Self::Error>;
183}
184
185#[async_trait]
187pub trait TargetClient<P: MessageLane>: RelayClient {
188 type BatchTransaction: BatchTransaction<SourceHeaderIdOf<P>> + Clone;
190 type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>;
192
193 async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
195
196 async fn latest_received_nonce(
198 &self,
199 id: TargetHeaderIdOf<P>,
200 ) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
201
202 async fn latest_confirmed_received_nonce(
204 &self,
205 id: TargetHeaderIdOf<P>,
206 ) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
207
208 async fn unrewarded_relayers_state(
210 &self,
211 id: TargetHeaderIdOf<P>,
212 ) -> Result<(TargetHeaderIdOf<P>, UnrewardedRelayersState), Self::Error>;
213
214 async fn prove_messages_receiving(
216 &self,
217 id: TargetHeaderIdOf<P>,
218 ) -> Result<(TargetHeaderIdOf<P>, P::MessagesReceivingProof), Self::Error>;
219
220 async fn submit_messages_proof(
222 &self,
223 maybe_batch_tx: Option<Self::BatchTransaction>,
224 generated_at_header: SourceHeaderIdOf<P>,
225 nonces: RangeInclusive<MessageNonce>,
226 proof: P::MessagesProof,
227 ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
228
229 async fn require_source_header_on_target(
238 &self,
239 id: SourceHeaderIdOf<P>,
240 ) -> Result<Option<Self::BatchTransaction>, Self::Error>;
241}
242
243#[derive(Clone, Debug, Default, PartialEq, Eq)]
245pub struct ClientState<SelfHeaderId, PeerHeaderId> {
246 pub best_self: SelfHeaderId,
248 pub best_finalized_self: SelfHeaderId,
250 pub best_finalized_peer_at_best_self: Option<PeerHeaderId>,
256 pub actual_best_finalized_peer_at_best_self: Option<PeerHeaderId>,
259}
260
261pub type SourceClientState<P> = ClientState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>;
263
264pub type TargetClientState<P> = ClientState<TargetHeaderIdOf<P>, SourceHeaderIdOf<P>>;
266
267#[derive(Debug, Default)]
269pub struct ClientsState<P: MessageLane> {
270 pub source: Option<SourceClientState<P>>,
272 pub target: Option<TargetClientState<P>>,
274}
275
276pub fn metrics_prefix<P: MessageLane>(lane: &P::LaneId) -> String {
279 format!("{}_to_{}_MessageLane_{}", P::SOURCE_NAME, P::TARGET_NAME, lane.label())
280}
281
282pub async fn run<P: MessageLane>(
284 params: Params<P::LaneId>,
285 source_client: impl SourceClient<P>,
286 target_client: impl TargetClient<P>,
287 metrics_params: MetricsParams,
288 exit_signal: impl Future<Output = ()> + Send + 'static,
289) -> Result<(), relay_utils::Error> {
290 let exit_signal = exit_signal.shared();
291 relay_utils::relay_loop(source_client, target_client)
292 .reconnect_delay(params.reconnect_delay)
293 .with_metrics(metrics_params)
294 .loop_metric(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<P>(¶ms.lane)))?)?
295 .expose()
296 .await?
297 .run(metrics_prefix::<P>(¶ms.lane), move |source_client, target_client, metrics| {
298 run_until_connection_lost(
299 params.clone(),
300 source_client,
301 target_client,
302 metrics,
303 exit_signal.clone(),
304 )
305 })
306 .await
307}
308
309async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
312 params: Params<P::LaneId>,
313 source_client: SC,
314 target_client: TC,
315 metrics_msg: Option<MessageLaneLoopMetrics>,
316 exit_signal: impl Future<Output = ()>,
317) -> Result<(), FailedClient> {
318 let mut source_retry_backoff = retry_backoff();
319 let mut source_client_is_online = false;
320 let mut source_state_required = true;
321 let source_state = source_client.state().fuse();
322 let source_go_offline_future = futures::future::Fuse::terminated();
323 let source_tick_stream = interval(params.source_tick).fuse();
324
325 let mut target_retry_backoff = retry_backoff();
326 let mut target_client_is_online = false;
327 let mut target_state_required = true;
328 let target_state = target_client.state().fuse();
329 let target_go_offline_future = futures::future::Fuse::terminated();
330 let target_tick_stream = interval(params.target_tick).fuse();
331
332 let (
333 (delivery_source_state_sender, delivery_source_state_receiver),
334 (delivery_target_state_sender, delivery_target_state_receiver),
335 ) = (unbounded(), unbounded());
336 let delivery_race_loop = run_message_delivery_race(
337 source_client.clone(),
338 delivery_source_state_receiver,
339 target_client.clone(),
340 delivery_target_state_receiver,
341 metrics_msg.clone(),
342 params.delivery_params,
343 )
344 .fuse();
345
346 let (
347 (receiving_source_state_sender, receiving_source_state_receiver),
348 (receiving_target_state_sender, receiving_target_state_receiver),
349 ) = (unbounded(), unbounded());
350 let receiving_race_loop = run_message_receiving_race(
351 source_client.clone(),
352 receiving_source_state_receiver,
353 target_client.clone(),
354 receiving_target_state_receiver,
355 metrics_msg.clone(),
356 )
357 .fuse();
358
359 let exit_signal = exit_signal.fuse();
360
361 futures::pin_mut!(
362 source_state,
363 source_go_offline_future,
364 source_tick_stream,
365 target_state,
366 target_go_offline_future,
367 target_tick_stream,
368 delivery_race_loop,
369 receiving_race_loop,
370 exit_signal
371 );
372
373 loop {
374 futures::select! {
375 new_source_state = source_state => {
376 source_state_required = false;
377
378 source_client_is_online = process_future_result(
379 new_source_state,
380 &mut source_retry_backoff,
381 |new_source_state| {
382 log::debug!(
383 target: "bridge",
384 "Received state from {} node: {:?}",
385 P::SOURCE_NAME,
386 new_source_state,
387 );
388 let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone());
389 let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone());
390
391 if let Some(metrics_msg) = metrics_msg.as_ref() {
392 metrics_msg.update_source_state::<P>(new_source_state);
393 }
394 },
395 &mut source_go_offline_future,
396 async_std::task::sleep,
397 || format!("Error retrieving state from {} node", P::SOURCE_NAME),
398 ).fail_if_connection_error(FailedClient::Source)?;
399 },
400 _ = source_go_offline_future => {
401 source_client_is_online = true;
402 },
403 _ = source_tick_stream.next() => {
404 source_state_required = true;
405 },
406 new_target_state = target_state => {
407 target_state_required = false;
408
409 target_client_is_online = process_future_result(
410 new_target_state,
411 &mut target_retry_backoff,
412 |new_target_state| {
413 log::debug!(
414 target: "bridge",
415 "Received state from {} node: {:?}",
416 P::TARGET_NAME,
417 new_target_state,
418 );
419 let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone());
420 let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone());
421
422 if let Some(metrics_msg) = metrics_msg.as_ref() {
423 metrics_msg.update_target_state::<P>(new_target_state);
424 }
425 },
426 &mut target_go_offline_future,
427 async_std::task::sleep,
428 || format!("Error retrieving state from {} node", P::TARGET_NAME),
429 ).fail_if_connection_error(FailedClient::Target)?;
430 },
431 _ = target_go_offline_future => {
432 target_client_is_online = true;
433 },
434 _ = target_tick_stream.next() => {
435 target_state_required = true;
436 },
437
438 delivery_error = delivery_race_loop => {
439 match delivery_error {
440 Ok(_) => unreachable!("only ends with error; qed"),
441 Err(err) => return Err(err),
442 }
443 },
444 receiving_error = receiving_race_loop => {
445 match receiving_error {
446 Ok(_) => unreachable!("only ends with error; qed"),
447 Err(err) => return Err(err),
448 }
449 },
450
451 () = exit_signal => {
452 return Ok(());
453 }
454 }
455
456 if source_client_is_online && source_state_required {
457 log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
458 source_state.set(source_client.state().fuse());
459 source_client_is_online = false;
460 }
461
462 if target_client_is_online && target_state_required {
463 log::debug!(target: "bridge", "Asking {} node about its state", P::TARGET_NAME);
464 target_state.set(target_client.state().fuse());
465 target_client_is_online = false;
466 }
467 }
468}
469
470#[cfg(test)]
471pub(crate) mod tests {
472 use std::sync::Arc;
473
474 use bp_messages::{HashedLaneId, LaneIdType, LegacyLaneId};
475 use futures::stream::StreamExt;
476 use parking_lot::Mutex;
477 use relay_utils::{HeaderId, MaybeConnectionError, TrackedTransactionStatus};
478
479 use super::*;
480
481 pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId {
482 HeaderId(number, number)
483 }
484
485 pub type TestSourceChainBalance = u64;
486 pub type TestSourceHeaderId = HeaderId<TestSourceHeaderNumber, TestSourceHeaderHash>;
487 pub type TestTargetHeaderId = HeaderId<TestTargetHeaderNumber, TestTargetHeaderHash>;
488
489 pub type TestMessagesProof = (RangeInclusive<MessageNonce>, Option<MessageNonce>);
490 pub type TestMessagesReceivingProof = MessageNonce;
491
492 pub type TestSourceHeaderNumber = u64;
493 pub type TestSourceHeaderHash = u64;
494
495 pub type TestTargetHeaderNumber = u64;
496 pub type TestTargetHeaderHash = u64;
497
498 #[derive(Debug)]
499 pub struct TestError;
500
501 impl MaybeConnectionError for TestError {
502 fn is_connection_error(&self) -> bool {
503 true
504 }
505 }
506
507 pub type TestLaneIdType = HashedLaneId;
509
510 #[derive(Clone)]
511 pub struct TestMessageLane;
512
513 impl MessageLane for TestMessageLane {
514 const SOURCE_NAME: &'static str = "TestSource";
515 const TARGET_NAME: &'static str = "TestTarget";
516
517 type MessagesProof = TestMessagesProof;
518 type MessagesReceivingProof = TestMessagesReceivingProof;
519
520 type SourceChainBalance = TestSourceChainBalance;
521 type SourceHeaderNumber = TestSourceHeaderNumber;
522 type SourceHeaderHash = TestSourceHeaderHash;
523
524 type TargetHeaderNumber = TestTargetHeaderNumber;
525 type TargetHeaderHash = TestTargetHeaderHash;
526
527 type LaneId = TestLaneIdType;
528 }
529
530 #[derive(Clone, Debug)]
531 pub struct TestMessagesBatchTransaction {
532 required_header_id: TestSourceHeaderId,
533 }
534
535 #[async_trait]
536 impl BatchTransaction<TestSourceHeaderId> for TestMessagesBatchTransaction {
537 fn required_header_id(&self) -> TestSourceHeaderId {
538 self.required_header_id
539 }
540 }
541
542 #[derive(Clone, Debug)]
543 pub struct TestConfirmationBatchTransaction {
544 required_header_id: TestTargetHeaderId,
545 }
546
547 #[async_trait]
548 impl BatchTransaction<TestTargetHeaderId> for TestConfirmationBatchTransaction {
549 fn required_header_id(&self) -> TestTargetHeaderId {
550 self.required_header_id
551 }
552 }
553
554 #[derive(Clone, Debug)]
555 pub struct TestTransactionTracker(TrackedTransactionStatus<TestTargetHeaderId>);
556
557 impl Default for TestTransactionTracker {
558 fn default() -> TestTransactionTracker {
559 TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
560 }
561 }
562
563 #[async_trait]
564 impl TransactionTracker for TestTransactionTracker {
565 type HeaderId = TestTargetHeaderId;
566
567 async fn wait(self) -> TrackedTransactionStatus<TestTargetHeaderId> {
568 self.0
569 }
570 }
571
572 #[derive(Debug, Clone)]
573 pub struct TestClientData {
574 is_source_fails: bool,
575 is_source_reconnected: bool,
576 source_state: SourceClientState<TestMessageLane>,
577 source_latest_generated_nonce: MessageNonce,
578 source_latest_confirmed_received_nonce: MessageNonce,
579 source_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
580 submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
581 is_target_fails: bool,
582 is_target_reconnected: bool,
583 target_state: SourceClientState<TestMessageLane>,
584 target_latest_received_nonce: MessageNonce,
585 target_latest_confirmed_received_nonce: MessageNonce,
586 target_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
587 submitted_messages_proofs: Vec<TestMessagesProof>,
588 target_to_source_batch_transaction: Option<TestConfirmationBatchTransaction>,
589 target_to_source_header_required: Option<TestTargetHeaderId>,
590 target_to_source_header_requirements: Vec<TestTargetHeaderId>,
591 source_to_target_batch_transaction: Option<TestMessagesBatchTransaction>,
592 source_to_target_header_required: Option<TestSourceHeaderId>,
593 source_to_target_header_requirements: Vec<TestSourceHeaderId>,
594 }
595
596 impl Default for TestClientData {
597 fn default() -> TestClientData {
598 TestClientData {
599 is_source_fails: false,
600 is_source_reconnected: false,
601 source_state: Default::default(),
602 source_latest_generated_nonce: 0,
603 source_latest_confirmed_received_nonce: 0,
604 source_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
605 0,
606 Default::default(),
607 )),
608 submitted_messages_receiving_proofs: Vec::new(),
609 is_target_fails: false,
610 is_target_reconnected: false,
611 target_state: Default::default(),
612 target_latest_received_nonce: 0,
613 target_latest_confirmed_received_nonce: 0,
614 target_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
615 0,
616 Default::default(),
617 )),
618 submitted_messages_proofs: Vec::new(),
619 target_to_source_batch_transaction: None,
620 target_to_source_header_required: None,
621 target_to_source_header_requirements: Vec::new(),
622 source_to_target_batch_transaction: None,
623 source_to_target_header_required: None,
624 source_to_target_header_requirements: Vec::new(),
625 }
626 }
627 }
628
629 impl TestClientData {
630 fn receive_messages(
631 &mut self,
632 maybe_batch_tx: Option<TestMessagesBatchTransaction>,
633 proof: TestMessagesProof,
634 ) {
635 self.target_state.best_self =
636 HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
637 self.target_state.best_finalized_self = self.target_state.best_self;
638 self.target_latest_received_nonce = *proof.0.end();
639 if let Some(maybe_batch_tx) = maybe_batch_tx {
640 self.target_state.best_finalized_peer_at_best_self =
641 Some(maybe_batch_tx.required_header_id());
642 }
643 if let Some(target_latest_confirmed_received_nonce) = proof.1 {
644 self.target_latest_confirmed_received_nonce =
645 target_latest_confirmed_received_nonce;
646 }
647 self.submitted_messages_proofs.push(proof);
648 }
649
650 fn receive_messages_delivery_proof(
651 &mut self,
652 maybe_batch_tx: Option<TestConfirmationBatchTransaction>,
653 proof: TestMessagesReceivingProof,
654 ) {
655 self.source_state.best_self =
656 HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
657 self.source_state.best_finalized_self = self.source_state.best_self;
658 if let Some(maybe_batch_tx) = maybe_batch_tx {
659 self.source_state.best_finalized_peer_at_best_self =
660 Some(maybe_batch_tx.required_header_id());
661 }
662 self.submitted_messages_receiving_proofs.push(proof);
663 self.source_latest_confirmed_received_nonce = proof;
664 }
665 }
666
667 #[derive(Clone)]
668 pub struct TestSourceClient {
669 data: Arc<Mutex<TestClientData>>,
670 tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
671 post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
672 }
673
674 impl Default for TestSourceClient {
675 fn default() -> Self {
676 TestSourceClient {
677 data: Arc::new(Mutex::new(TestClientData::default())),
678 tick: Arc::new(|_| {}),
679 post_tick: Arc::new(|_| {}),
680 }
681 }
682 }
683
684 #[async_trait]
685 impl RelayClient for TestSourceClient {
686 type Error = TestError;
687
688 async fn reconnect(&mut self) -> Result<(), TestError> {
689 {
690 let mut data = self.data.lock();
691 (self.tick)(&mut data);
692 data.is_source_reconnected = true;
693 (self.post_tick)(&mut data);
694 }
695 Ok(())
696 }
697 }
698
699 #[async_trait]
700 impl SourceClient<TestMessageLane> for TestSourceClient {
701 type BatchTransaction = TestConfirmationBatchTransaction;
702 type TransactionTracker = TestTransactionTracker;
703
704 async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> {
705 let mut data = self.data.lock();
706 (self.tick)(&mut data);
707 if data.is_source_fails {
708 return Err(TestError)
709 }
710 (self.post_tick)(&mut data);
711 Ok(data.source_state.clone())
712 }
713
714 async fn latest_generated_nonce(
715 &self,
716 id: SourceHeaderIdOf<TestMessageLane>,
717 ) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
718 let mut data = self.data.lock();
719 (self.tick)(&mut data);
720 if data.is_source_fails {
721 return Err(TestError)
722 }
723 (self.post_tick)(&mut data);
724 Ok((id, data.source_latest_generated_nonce))
725 }
726
727 async fn latest_confirmed_received_nonce(
728 &self,
729 id: SourceHeaderIdOf<TestMessageLane>,
730 ) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
731 let mut data = self.data.lock();
732 (self.tick)(&mut data);
733 (self.post_tick)(&mut data);
734 Ok((id, data.source_latest_confirmed_received_nonce))
735 }
736
737 async fn generated_message_details(
738 &self,
739 _id: SourceHeaderIdOf<TestMessageLane>,
740 nonces: RangeInclusive<MessageNonce>,
741 ) -> Result<MessageDetailsMap<TestSourceChainBalance>, TestError> {
742 Ok(nonces
743 .map(|nonce| {
744 (
745 nonce,
746 MessageDetails {
747 dispatch_weight: Weight::from_parts(1, 0),
748 size: 1,
749 reward: 1,
750 },
751 )
752 })
753 .collect())
754 }
755
756 async fn prove_messages(
757 &self,
758 id: SourceHeaderIdOf<TestMessageLane>,
759 nonces: RangeInclusive<MessageNonce>,
760 proof_parameters: MessageProofParameters,
761 ) -> Result<
762 (SourceHeaderIdOf<TestMessageLane>, RangeInclusive<MessageNonce>, TestMessagesProof),
763 TestError,
764 > {
765 let mut data = self.data.lock();
766 (self.tick)(&mut data);
767 (self.post_tick)(&mut data);
768 Ok((
769 id,
770 nonces.clone(),
771 (
772 nonces,
773 if proof_parameters.outbound_state_proof_required {
774 Some(data.source_latest_confirmed_received_nonce)
775 } else {
776 None
777 },
778 ),
779 ))
780 }
781
782 async fn submit_messages_receiving_proof(
783 &self,
784 maybe_batch_tx: Option<Self::BatchTransaction>,
785 _generated_at_block: TargetHeaderIdOf<TestMessageLane>,
786 proof: TestMessagesReceivingProof,
787 ) -> Result<Self::TransactionTracker, TestError> {
788 let mut data = self.data.lock();
789 (self.tick)(&mut data);
790 data.receive_messages_delivery_proof(maybe_batch_tx, proof);
791 (self.post_tick)(&mut data);
792 Ok(TestTransactionTracker(data.source_tracked_transaction_status))
793 }
794
795 async fn require_target_header_on_source(
796 &self,
797 id: TargetHeaderIdOf<TestMessageLane>,
798 ) -> Result<Option<Self::BatchTransaction>, Self::Error> {
799 let mut data = self.data.lock();
800 data.target_to_source_header_required = Some(id);
801 data.target_to_source_header_requirements.push(id);
802 (self.tick)(&mut data);
803 (self.post_tick)(&mut data);
804
805 Ok(data.target_to_source_batch_transaction.take().map(|mut tx| {
806 tx.required_header_id = id;
807 tx
808 }))
809 }
810 }
811
812 #[derive(Clone)]
813 pub struct TestTargetClient {
814 data: Arc<Mutex<TestClientData>>,
815 tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
816 post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
817 }
818
819 impl Default for TestTargetClient {
820 fn default() -> Self {
821 TestTargetClient {
822 data: Arc::new(Mutex::new(TestClientData::default())),
823 tick: Arc::new(|_| {}),
824 post_tick: Arc::new(|_| {}),
825 }
826 }
827 }
828
829 #[async_trait]
830 impl RelayClient for TestTargetClient {
831 type Error = TestError;
832
833 async fn reconnect(&mut self) -> Result<(), TestError> {
834 {
835 let mut data = self.data.lock();
836 (self.tick)(&mut data);
837 data.is_target_reconnected = true;
838 (self.post_tick)(&mut data);
839 }
840 Ok(())
841 }
842 }
843
844 #[async_trait]
845 impl TargetClient<TestMessageLane> for TestTargetClient {
846 type BatchTransaction = TestMessagesBatchTransaction;
847 type TransactionTracker = TestTransactionTracker;
848
849 async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> {
850 let mut data = self.data.lock();
851 (self.tick)(&mut data);
852 if data.is_target_fails {
853 return Err(TestError)
854 }
855 (self.post_tick)(&mut data);
856 Ok(data.target_state.clone())
857 }
858
859 async fn latest_received_nonce(
860 &self,
861 id: TargetHeaderIdOf<TestMessageLane>,
862 ) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
863 let mut data = self.data.lock();
864 (self.tick)(&mut data);
865 if data.is_target_fails {
866 return Err(TestError)
867 }
868 (self.post_tick)(&mut data);
869 Ok((id, data.target_latest_received_nonce))
870 }
871
872 async fn unrewarded_relayers_state(
873 &self,
874 id: TargetHeaderIdOf<TestMessageLane>,
875 ) -> Result<(TargetHeaderIdOf<TestMessageLane>, UnrewardedRelayersState), TestError> {
876 Ok((
877 id,
878 UnrewardedRelayersState {
879 unrewarded_relayer_entries: 0,
880 messages_in_oldest_entry: 0,
881 total_messages: 0,
882 last_delivered_nonce: 0,
883 },
884 ))
885 }
886
887 async fn latest_confirmed_received_nonce(
888 &self,
889 id: TargetHeaderIdOf<TestMessageLane>,
890 ) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
891 let mut data = self.data.lock();
892 (self.tick)(&mut data);
893 if data.is_target_fails {
894 return Err(TestError)
895 }
896 (self.post_tick)(&mut data);
897 Ok((id, data.target_latest_confirmed_received_nonce))
898 }
899
900 async fn prove_messages_receiving(
901 &self,
902 id: TargetHeaderIdOf<TestMessageLane>,
903 ) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessagesReceivingProof), TestError> {
904 Ok((id, self.data.lock().target_latest_received_nonce))
905 }
906
907 async fn submit_messages_proof(
908 &self,
909 maybe_batch_tx: Option<Self::BatchTransaction>,
910 _generated_at_header: SourceHeaderIdOf<TestMessageLane>,
911 nonces: RangeInclusive<MessageNonce>,
912 proof: TestMessagesProof,
913 ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, TestError> {
914 let mut data = self.data.lock();
915 (self.tick)(&mut data);
916 if data.is_target_fails {
917 return Err(TestError)
918 }
919 data.receive_messages(maybe_batch_tx, proof);
920 (self.post_tick)(&mut data);
921 Ok(NoncesSubmitArtifacts {
922 nonces,
923 tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status),
924 })
925 }
926
927 async fn require_source_header_on_target(
928 &self,
929 id: SourceHeaderIdOf<TestMessageLane>,
930 ) -> Result<Option<Self::BatchTransaction>, Self::Error> {
931 let mut data = self.data.lock();
932 data.source_to_target_header_required = Some(id);
933 data.source_to_target_header_requirements.push(id);
934 (self.tick)(&mut data);
935 (self.post_tick)(&mut data);
936
937 Ok(data.source_to_target_batch_transaction.take().map(|mut tx| {
938 tx.required_header_id = id;
939 tx
940 }))
941 }
942 }
943
944 fn run_loop_test(
945 data: Arc<Mutex<TestClientData>>,
946 source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
947 source_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
948 target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
949 target_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
950 exit_signal: impl Future<Output = ()> + 'static + Send,
951 ) -> TestClientData {
952 async_std::task::block_on(async {
953 let source_client = TestSourceClient {
954 data: data.clone(),
955 tick: source_tick,
956 post_tick: source_post_tick,
957 };
958 let target_client = TestTargetClient {
959 data: data.clone(),
960 tick: target_tick,
961 post_tick: target_post_tick,
962 };
963 let _ = run(
964 Params {
965 lane: TestLaneIdType::try_new(1, 2).unwrap(),
966 source_tick: Duration::from_millis(100),
967 target_tick: Duration::from_millis(100),
968 reconnect_delay: Duration::from_millis(0),
969 delivery_params: MessageDeliveryParams {
970 max_unrewarded_relayer_entries_at_target: 4,
971 max_unconfirmed_nonces_at_target: 4,
972 max_messages_in_single_batch: 4,
973 max_messages_weight_in_single_batch: Weight::from_parts(4, 0),
974 max_messages_size_in_single_batch: 4,
975 },
976 },
977 source_client,
978 target_client,
979 MetricsParams::disabled(),
980 exit_signal,
981 )
982 .await;
983 let result = data.lock().clone();
984 result
985 })
986 }
987
988 #[test]
989 fn message_lane_loop_is_able_to_recover_from_connection_errors() {
990 let (exit_sender, exit_receiver) = unbounded();
994 let result = run_loop_test(
995 Arc::new(Mutex::new(TestClientData {
996 is_source_fails: true,
997 source_state: ClientState {
998 best_self: HeaderId(0, 0),
999 best_finalized_self: HeaderId(0, 0),
1000 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1001 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1002 },
1003 source_latest_generated_nonce: 1,
1004 target_state: ClientState {
1005 best_self: HeaderId(0, 0),
1006 best_finalized_self: HeaderId(0, 0),
1007 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1008 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1009 },
1010 target_latest_received_nonce: 0,
1011 ..Default::default()
1012 })),
1013 Arc::new(|data: &mut TestClientData| {
1014 if data.is_source_reconnected {
1015 data.is_source_fails = false;
1016 data.is_target_fails = true;
1017 }
1018 }),
1019 Arc::new(|_| {}),
1020 Arc::new(move |data: &mut TestClientData| {
1021 if data.is_target_reconnected {
1022 data.is_target_fails = false;
1023 }
1024 if data.target_state.best_finalized_peer_at_best_self.unwrap().0 < 10 {
1025 data.target_state.best_finalized_peer_at_best_self = Some(HeaderId(
1026 data.target_state.best_finalized_peer_at_best_self.unwrap().0 + 1,
1027 data.target_state.best_finalized_peer_at_best_self.unwrap().0 + 1,
1028 ));
1029 }
1030 if !data.submitted_messages_proofs.is_empty() {
1031 exit_sender.unbounded_send(()).unwrap();
1032 }
1033 }),
1034 Arc::new(|_| {}),
1035 exit_receiver.into_future().map(|(_, _)| ()),
1036 );
1037
1038 assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
1039 }
1040
1041 #[test]
1042 fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
1043 let (exit_sender, exit_receiver) = unbounded();
1046 let result = run_loop_test(
1047 Arc::new(Mutex::new(TestClientData {
1048 source_state: ClientState {
1049 best_self: HeaderId(0, 0),
1050 best_finalized_self: HeaderId(0, 0),
1051 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1052 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1053 },
1054 source_latest_generated_nonce: 1,
1055 target_state: ClientState {
1056 best_self: HeaderId(0, 0),
1057 best_finalized_self: HeaderId(0, 0),
1058 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1059 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1060 },
1061 target_latest_received_nonce: 0,
1062 ..Default::default()
1063 })),
1064 Arc::new(move |data: &mut TestClientData| {
1065 data.source_state.best_self =
1067 HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
1068 data.source_state.best_finalized_self = data.source_state.best_self;
1069 if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
1071 if *last_requirement !=
1072 data.source_state.best_finalized_peer_at_best_self.unwrap()
1073 {
1074 data.source_state.best_finalized_peer_at_best_self =
1075 Some(*last_requirement);
1076 }
1077 }
1078 }),
1079 Arc::new(move |data: &mut TestClientData| {
1080 if data.submitted_messages_receiving_proofs.len() == 1 {
1084 data.source_latest_confirmed_received_nonce = 0;
1085 }
1086 }),
1087 Arc::new(move |data: &mut TestClientData| {
1088 data.target_state.best_self =
1090 HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
1091 data.target_state.best_finalized_self = data.target_state.best_self;
1092 if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
1094 if *last_requirement !=
1095 data.target_state.best_finalized_peer_at_best_self.unwrap()
1096 {
1097 data.target_state.best_finalized_peer_at_best_self =
1098 Some(*last_requirement);
1099 }
1100 }
1101 if data.source_latest_confirmed_received_nonce == 1 {
1103 exit_sender.unbounded_send(()).unwrap();
1104 }
1105 }),
1106 Arc::new(move |data: &mut TestClientData| {
1107 if data.submitted_messages_proofs.len() == 1 {
1111 data.target_latest_received_nonce = 0;
1112 data.target_latest_confirmed_received_nonce = 0;
1113 }
1114 }),
1115 exit_receiver.into_future().map(|(_, _)| ()),
1116 );
1117
1118 assert_eq!(result.submitted_messages_proofs.len(), 2);
1119 assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
1120 }
1121
1122 #[test]
1123 fn message_lane_loop_works() {
1124 let (exit_sender, exit_receiver) = unbounded();
1125 let result = run_loop_test(
1126 Arc::new(Mutex::new(TestClientData {
1127 source_state: ClientState {
1128 best_self: HeaderId(10, 10),
1129 best_finalized_self: HeaderId(10, 10),
1130 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1131 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1132 },
1133 source_latest_generated_nonce: 10,
1134 target_state: ClientState {
1135 best_self: HeaderId(0, 0),
1136 best_finalized_self: HeaderId(0, 0),
1137 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1138 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1139 },
1140 target_latest_received_nonce: 0,
1141 ..Default::default()
1142 })),
1143 Arc::new(|data: &mut TestClientData| {
1144 data.source_state.best_self =
1146 HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
1147 data.source_state.best_finalized_self = data.source_state.best_self;
1148 if data.target_to_source_header_required.is_some() {
1150 assert!(
1151 data.source_state.best_finalized_peer_at_best_self.unwrap().0 <
1152 data.target_state.best_self.0
1153 );
1154 data.target_to_source_header_required = None;
1155 }
1156 if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
1158 if *last_requirement !=
1159 data.source_state.best_finalized_peer_at_best_self.unwrap()
1160 {
1161 data.source_state.best_finalized_peer_at_best_self =
1162 Some(*last_requirement);
1163 }
1164 }
1165 }),
1166 Arc::new(|_| {}),
1167 Arc::new(move |data: &mut TestClientData| {
1168 data.target_state.best_self =
1170 HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
1171 data.target_state.best_finalized_self = data.target_state.best_self;
1172 if data.source_to_target_header_required.is_some() {
1174 assert!(
1175 data.target_state.best_finalized_peer_at_best_self.unwrap().0 <
1176 data.source_state.best_self.0
1177 );
1178 data.source_to_target_header_required = None;
1179 }
1180 if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
1182 if *last_requirement !=
1183 data.target_state.best_finalized_peer_at_best_self.unwrap()
1184 {
1185 data.target_state.best_finalized_peer_at_best_self =
1186 Some(*last_requirement);
1187 }
1188 }
1189 if data.source_latest_confirmed_received_nonce == 10 {
1191 exit_sender.unbounded_send(()).unwrap();
1192 }
1193 }),
1194 Arc::new(|_| {}),
1195 exit_receiver.into_future().map(|(_, _)| ()),
1196 );
1197
1198 assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
1203 assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
1204 assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
1205 assert!(!result.submitted_messages_receiving_proofs.is_empty());
1206
1207 assert!(!result.target_to_source_header_requirements.is_empty());
1209 assert!(!result.source_to_target_header_requirements.is_empty());
1210 }
1211
1212 #[test]
1213 fn message_lane_loop_works_with_batch_transactions() {
1214 let (exit_sender, exit_receiver) = unbounded();
1215 let original_data = Arc::new(Mutex::new(TestClientData {
1216 source_state: ClientState {
1217 best_self: HeaderId(10, 10),
1218 best_finalized_self: HeaderId(10, 10),
1219 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1220 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1221 },
1222 source_latest_generated_nonce: 10,
1223 target_state: ClientState {
1224 best_self: HeaderId(0, 0),
1225 best_finalized_self: HeaderId(0, 0),
1226 best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1227 actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1228 },
1229 target_latest_received_nonce: 0,
1230 ..Default::default()
1231 }));
1232 let result = run_loop_test(
1233 original_data,
1234 Arc::new(|_| {}),
1235 Arc::new(move |data: &mut TestClientData| {
1236 data.source_state.best_self =
1237 HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
1238 data.source_state.best_finalized_self = data.source_state.best_self;
1239 if let Some(target_to_source_header_required) =
1240 data.target_to_source_header_required.take()
1241 {
1242 data.target_to_source_batch_transaction =
1243 Some(TestConfirmationBatchTransaction {
1244 required_header_id: target_to_source_header_required,
1245 })
1246 }
1247 }),
1248 Arc::new(|_| {}),
1249 Arc::new(move |data: &mut TestClientData| {
1250 data.target_state.best_self =
1251 HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
1252 data.target_state.best_finalized_self = data.target_state.best_self;
1253
1254 if let Some(source_to_target_header_required) =
1255 data.source_to_target_header_required.take()
1256 {
1257 data.source_to_target_batch_transaction = Some(TestMessagesBatchTransaction {
1258 required_header_id: source_to_target_header_required,
1259 })
1260 }
1261
1262 if data.source_latest_confirmed_received_nonce == 10 {
1263 exit_sender.unbounded_send(()).unwrap();
1264 }
1265 }),
1266 exit_receiver.into_future().map(|(_, _)| ()),
1267 );
1268
1269 assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
1274 assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
1275 assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
1276 assert!(!result.submitted_messages_receiving_proofs.is_empty());
1277
1278 assert!(!result.target_to_source_header_requirements.is_empty());
1280 assert!(!result.source_to_target_header_requirements.is_empty());
1281 }
1282
1283 #[test]
1284 fn metrics_prefix_is_valid() {
1285 assert!(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<TestMessageLane>(
1286 &HashedLaneId::try_new(1, 2).unwrap()
1287 )))
1288 .is_ok());
1289
1290 #[derive(Clone)]
1292 pub struct LegacyTestMessageLane;
1293 impl MessageLane for LegacyTestMessageLane {
1294 const SOURCE_NAME: &'static str = "LegacyTestSource";
1295 const TARGET_NAME: &'static str = "LegacyTestTarget";
1296
1297 type MessagesProof = TestMessagesProof;
1298 type MessagesReceivingProof = TestMessagesReceivingProof;
1299
1300 type SourceChainBalance = TestSourceChainBalance;
1301 type SourceHeaderNumber = TestSourceHeaderNumber;
1302 type SourceHeaderHash = TestSourceHeaderHash;
1303
1304 type TargetHeaderNumber = TestTargetHeaderNumber;
1305 type TargetHeaderHash = TestTargetHeaderHash;
1306
1307 type LaneId = LegacyLaneId;
1308 }
1309 assert!(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<LegacyTestMessageLane>(
1310 &LegacyLaneId([0, 0, 0, 1])
1311 )))
1312 .is_ok());
1313 }
1314}