referrerpolicy=no-referrer-when-downgrade

messages_relay/
message_lane_loop.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Message delivery loop. Designed to work with messages pallet.
18//!
19//! Single relay instance delivers messages of single lane in single direction.
20//! To serve two-way lane, you would need two instances of relay.
21//! To serve N two-way lanes, you would need N*2 instances of relay.
22//!
23//! Please keep in mind that the best header in this file is actually best
24//! finalized header. I.e. when talking about headers in lane context, we
25//! only care about finalized headers.
26
27use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
28
29use async_trait::async_trait;
30use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
31
32use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
33use relay_utils::{
34	interval, metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient,
35	retry_backoff, FailedClient, TransactionTracker,
36};
37
38use crate::{
39	message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
40	message_race_delivery::run as run_message_delivery_race,
41	message_race_receiving::run as run_message_receiving_race,
42	metrics::{Labeled, MessageLaneLoopMetrics},
43};
44
45/// Message lane loop configuration params.
46#[derive(Debug, Clone)]
47pub struct Params<LaneId> {
48	/// Id of lane this loop is servicing.
49	pub lane: LaneId,
50	/// Interval at which we ask target node about its updates.
51	pub source_tick: Duration,
52	/// Interval at which we ask target node about its updates.
53	pub target_tick: Duration,
54	/// Delay between moments when connection error happens and our reconnect attempt.
55	pub reconnect_delay: Duration,
56	/// Message delivery race parameters.
57	pub delivery_params: MessageDeliveryParams,
58}
59
60/// Message delivery race parameters.
61#[derive(Debug, Clone)]
62pub struct MessageDeliveryParams {
63	/// Maximal number of unconfirmed relayer entries at the inbound lane. If there's that number
64	/// of entries in the `InboundLaneData::relayers` set, all new messages will be rejected until
65	/// reward payment will be proved (by including outbound lane state to the message delivery
66	/// transaction).
67	pub max_unrewarded_relayer_entries_at_target: MessageNonce,
68	/// Message delivery race will stop delivering messages if there are
69	/// `max_unconfirmed_nonces_at_target` unconfirmed nonces on the target node. The race would
70	/// continue once they're confirmed by the receiving race.
71	pub max_unconfirmed_nonces_at_target: MessageNonce,
72	/// Maximal number of relayed messages in single delivery transaction.
73	pub max_messages_in_single_batch: MessageNonce,
74	/// Maximal cumulative dispatch weight of relayed messages in single delivery transaction.
75	pub max_messages_weight_in_single_batch: Weight,
76	/// Maximal cumulative size of relayed messages in single delivery transaction.
77	pub max_messages_size_in_single_batch: u32,
78}
79
80/// Message details.
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub struct MessageDetails<SourceChainBalance> {
83	/// Message dispatch weight.
84	pub dispatch_weight: Weight,
85	/// Message size (number of bytes in encoded payload).
86	pub size: u32,
87	/// The relayer reward paid in the source chain tokens.
88	pub reward: SourceChainBalance,
89}
90
91/// Messages details map.
92pub type MessageDetailsMap<SourceChainBalance> =
93	BTreeMap<MessageNonce, MessageDetails<SourceChainBalance>>;
94
95/// Message delivery race proof parameters.
96#[derive(Debug, PartialEq, Eq)]
97pub struct MessageProofParameters {
98	/// Include outbound lane state proof?
99	pub outbound_state_proof_required: bool,
100	/// Cumulative dispatch weight of messages that we're building proof for.
101	pub dispatch_weight: Weight,
102}
103
104/// Artifacts of submitting nonces proof.
105pub struct NoncesSubmitArtifacts<T> {
106	/// Submitted nonces range.
107	pub nonces: RangeInclusive<MessageNonce>,
108	/// Submitted transaction tracker.
109	pub tx_tracker: T,
110}
111
112/// Batch transaction that already submit some headers and needs to be extended with
113/// messages/delivery proof before sending.
114pub trait BatchTransaction<HeaderId>: Debug + Send + Sync {
115	/// Header that was required in the original call and which is bundled within this
116	/// batch transaction.
117	fn required_header_id(&self) -> HeaderId;
118}
119
120/// Source client trait.
121#[async_trait]
122pub trait SourceClient<P: MessageLane>: RelayClient {
123	/// Type of batch transaction that submits finality and message receiving proof.
124	type BatchTransaction: BatchTransaction<TargetHeaderIdOf<P>> + Clone;
125	/// Transaction tracker to track submitted transactions.
126	type TransactionTracker: TransactionTracker<HeaderId = SourceHeaderIdOf<P>>;
127
128	/// Returns state of the client.
129	async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
130
131	/// Get nonce of instance of latest generated message.
132	async fn latest_generated_nonce(
133		&self,
134		id: SourceHeaderIdOf<P>,
135	) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
136
137	/// Get nonce of the latest message, which receiving has been confirmed by the target chain.
138	async fn latest_confirmed_received_nonce(
139		&self,
140		id: SourceHeaderIdOf<P>,
141	) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
142
143	/// Returns mapping of message nonces, generated on this client, to their weights.
144	///
145	/// Some messages may be missing from returned map, if corresponding messages were pruned at
146	/// the source chain.
147	async fn generated_message_details(
148		&self,
149		id: SourceHeaderIdOf<P>,
150		nonces: RangeInclusive<MessageNonce>,
151	) -> Result<MessageDetailsMap<P::SourceChainBalance>, Self::Error>;
152
153	/// Prove messages in inclusive range [begin; end].
154	async fn prove_messages(
155		&self,
156		id: SourceHeaderIdOf<P>,
157		nonces: RangeInclusive<MessageNonce>,
158		proof_parameters: MessageProofParameters,
159	) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), Self::Error>;
160
161	/// Submit messages receiving proof.
162	async fn submit_messages_receiving_proof(
163		&self,
164		maybe_batch_tx: Option<Self::BatchTransaction>,
165		generated_at_block: TargetHeaderIdOf<P>,
166		proof: P::MessagesReceivingProof,
167	) -> Result<Self::TransactionTracker, Self::Error>;
168
169	/// We need given finalized target header on source to continue synchronization.
170	///
171	/// We assume that the absence of header `id` has already been checked by caller.
172	///
173	/// The client may return `Some(_)`, which means that nothing has happened yet and
174	/// the caller must generate and append message receiving proof to the batch transaction
175	/// to actually send it (along with required header) to the node.
176	///
177	/// If function has returned `None`, it means that the caller now must wait for the
178	/// appearance of the target header `id` at the source client.
179	async fn require_target_header_on_source(
180		&self,
181		id: TargetHeaderIdOf<P>,
182	) -> Result<Option<Self::BatchTransaction>, Self::Error>;
183}
184
185/// Target client trait.
186#[async_trait]
187pub trait TargetClient<P: MessageLane>: RelayClient {
188	/// Type of batch transaction that submits finality and messages proof.
189	type BatchTransaction: BatchTransaction<SourceHeaderIdOf<P>> + Clone;
190	/// Transaction tracker to track submitted transactions.
191	type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>;
192
193	/// Returns state of the client.
194	async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
195
196	/// Get nonce of latest received message.
197	async fn latest_received_nonce(
198		&self,
199		id: TargetHeaderIdOf<P>,
200	) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
201
202	/// Get nonce of the latest confirmed message.
203	async fn latest_confirmed_received_nonce(
204		&self,
205		id: TargetHeaderIdOf<P>,
206	) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
207
208	/// Get state of unrewarded relayers set at the inbound lane.
209	async fn unrewarded_relayers_state(
210		&self,
211		id: TargetHeaderIdOf<P>,
212	) -> Result<(TargetHeaderIdOf<P>, UnrewardedRelayersState), Self::Error>;
213
214	/// Prove messages receiving at given block.
215	async fn prove_messages_receiving(
216		&self,
217		id: TargetHeaderIdOf<P>,
218	) -> Result<(TargetHeaderIdOf<P>, P::MessagesReceivingProof), Self::Error>;
219
220	/// Submit messages proof.
221	async fn submit_messages_proof(
222		&self,
223		maybe_batch_tx: Option<Self::BatchTransaction>,
224		generated_at_header: SourceHeaderIdOf<P>,
225		nonces: RangeInclusive<MessageNonce>,
226		proof: P::MessagesProof,
227	) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
228
229	/// We need given finalized source header on target to continue synchronization.
230	///
231	/// The client may return `Some(_)`, which means that nothing has happened yet and
232	/// the caller must generate and append messages proof to the batch transaction
233	/// to actually send it (along with required header) to the node.
234	///
235	/// If function has returned `None`, it means that the caller now must wait for the
236	/// appearance of the source header `id` at the target client.
237	async fn require_source_header_on_target(
238		&self,
239		id: SourceHeaderIdOf<P>,
240	) -> Result<Option<Self::BatchTransaction>, Self::Error>;
241}
242
243/// State of the client.
244#[derive(Clone, Debug, Default, PartialEq, Eq)]
245pub struct ClientState<SelfHeaderId, PeerHeaderId> {
246	/// The best header id of this chain.
247	pub best_self: SelfHeaderId,
248	/// Best finalized header id of this chain.
249	pub best_finalized_self: SelfHeaderId,
250	/// Best finalized header id of the peer chain read at the best block of this chain (at
251	/// `best_finalized_self`).
252	///
253	/// It may be `None` e,g. if peer is a parachain and we haven't yet relayed any parachain
254	/// heads.
255	pub best_finalized_peer_at_best_self: Option<PeerHeaderId>,
256	/// Header id of the peer chain with the number, matching the
257	/// `best_finalized_peer_at_best_self`.
258	pub actual_best_finalized_peer_at_best_self: Option<PeerHeaderId>,
259}
260
261/// State of source client in one-way message lane.
262pub type SourceClientState<P> = ClientState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>;
263
264/// State of target client in one-way message lane.
265pub type TargetClientState<P> = ClientState<TargetHeaderIdOf<P>, SourceHeaderIdOf<P>>;
266
267/// Both clients state.
268#[derive(Debug, Default)]
269pub struct ClientsState<P: MessageLane> {
270	/// Source client state.
271	pub source: Option<SourceClientState<P>>,
272	/// Target client state.
273	pub target: Option<TargetClientState<P>>,
274}
275
276/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs
277/// sync loop.
278pub fn metrics_prefix<P: MessageLane>(lane: &P::LaneId) -> String {
279	format!("{}_to_{}_MessageLane_{}", P::SOURCE_NAME, P::TARGET_NAME, lane.label())
280}
281
282/// Run message lane service loop.
283pub async fn run<P: MessageLane>(
284	params: Params<P::LaneId>,
285	source_client: impl SourceClient<P>,
286	target_client: impl TargetClient<P>,
287	metrics_params: MetricsParams,
288	exit_signal: impl Future<Output = ()> + Send + 'static,
289) -> Result<(), relay_utils::Error> {
290	let exit_signal = exit_signal.shared();
291	relay_utils::relay_loop(source_client, target_client)
292		.reconnect_delay(params.reconnect_delay)
293		.with_metrics(metrics_params)
294		.loop_metric(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<P>(&params.lane)))?)?
295		.expose()
296		.await?
297		.run(metrics_prefix::<P>(&params.lane), move |source_client, target_client, metrics| {
298			run_until_connection_lost(
299				params.clone(),
300				source_client,
301				target_client,
302				metrics,
303				exit_signal.clone(),
304			)
305		})
306		.await
307}
308
309/// Run one-way message delivery loop until connection with target or source node is lost, or exit
310/// signal is received.
311async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
312	params: Params<P::LaneId>,
313	source_client: SC,
314	target_client: TC,
315	metrics_msg: Option<MessageLaneLoopMetrics>,
316	exit_signal: impl Future<Output = ()>,
317) -> Result<(), FailedClient> {
318	let mut source_retry_backoff = retry_backoff();
319	let mut source_client_is_online = false;
320	let mut source_state_required = true;
321	let source_state = source_client.state().fuse();
322	let source_go_offline_future = futures::future::Fuse::terminated();
323	let source_tick_stream = interval(params.source_tick).fuse();
324
325	let mut target_retry_backoff = retry_backoff();
326	let mut target_client_is_online = false;
327	let mut target_state_required = true;
328	let target_state = target_client.state().fuse();
329	let target_go_offline_future = futures::future::Fuse::terminated();
330	let target_tick_stream = interval(params.target_tick).fuse();
331
332	let (
333		(delivery_source_state_sender, delivery_source_state_receiver),
334		(delivery_target_state_sender, delivery_target_state_receiver),
335	) = (unbounded(), unbounded());
336	let delivery_race_loop = run_message_delivery_race(
337		source_client.clone(),
338		delivery_source_state_receiver,
339		target_client.clone(),
340		delivery_target_state_receiver,
341		metrics_msg.clone(),
342		params.delivery_params,
343	)
344	.fuse();
345
346	let (
347		(receiving_source_state_sender, receiving_source_state_receiver),
348		(receiving_target_state_sender, receiving_target_state_receiver),
349	) = (unbounded(), unbounded());
350	let receiving_race_loop = run_message_receiving_race(
351		source_client.clone(),
352		receiving_source_state_receiver,
353		target_client.clone(),
354		receiving_target_state_receiver,
355		metrics_msg.clone(),
356	)
357	.fuse();
358
359	let exit_signal = exit_signal.fuse();
360
361	futures::pin_mut!(
362		source_state,
363		source_go_offline_future,
364		source_tick_stream,
365		target_state,
366		target_go_offline_future,
367		target_tick_stream,
368		delivery_race_loop,
369		receiving_race_loop,
370		exit_signal
371	);
372
373	loop {
374		futures::select! {
375			new_source_state = source_state => {
376				source_state_required = false;
377
378				source_client_is_online = process_future_result(
379					new_source_state,
380					&mut source_retry_backoff,
381					|new_source_state| {
382						log::debug!(
383							target: "bridge",
384							"Received state from {} node: {:?}",
385							P::SOURCE_NAME,
386							new_source_state,
387						);
388						let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone());
389						let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone());
390
391						if let Some(metrics_msg) = metrics_msg.as_ref() {
392							metrics_msg.update_source_state::<P>(new_source_state);
393						}
394					},
395					&mut source_go_offline_future,
396					async_std::task::sleep,
397					|| format!("Error retrieving state from {} node", P::SOURCE_NAME),
398				).fail_if_connection_error(FailedClient::Source)?;
399			},
400			_ = source_go_offline_future => {
401				source_client_is_online = true;
402			},
403			_ = source_tick_stream.next() => {
404				source_state_required = true;
405			},
406			new_target_state = target_state => {
407				target_state_required = false;
408
409				target_client_is_online = process_future_result(
410					new_target_state,
411					&mut target_retry_backoff,
412					|new_target_state| {
413						log::debug!(
414							target: "bridge",
415							"Received state from {} node: {:?}",
416							P::TARGET_NAME,
417							new_target_state,
418						);
419						let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone());
420						let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone());
421
422						if let Some(metrics_msg) = metrics_msg.as_ref() {
423							metrics_msg.update_target_state::<P>(new_target_state);
424						}
425					},
426					&mut target_go_offline_future,
427					async_std::task::sleep,
428					|| format!("Error retrieving state from {} node", P::TARGET_NAME),
429				).fail_if_connection_error(FailedClient::Target)?;
430			},
431			_ = target_go_offline_future => {
432				target_client_is_online = true;
433			},
434			_ = target_tick_stream.next() => {
435				target_state_required = true;
436			},
437
438			delivery_error = delivery_race_loop => {
439				match delivery_error {
440					Ok(_) => unreachable!("only ends with error; qed"),
441					Err(err) => return Err(err),
442				}
443			},
444			receiving_error = receiving_race_loop => {
445				match receiving_error {
446					Ok(_) => unreachable!("only ends with error; qed"),
447					Err(err) => return Err(err),
448				}
449			},
450
451			() = exit_signal => {
452				return Ok(());
453			}
454		}
455
456		if source_client_is_online && source_state_required {
457			log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
458			source_state.set(source_client.state().fuse());
459			source_client_is_online = false;
460		}
461
462		if target_client_is_online && target_state_required {
463			log::debug!(target: "bridge", "Asking {} node about its state", P::TARGET_NAME);
464			target_state.set(target_client.state().fuse());
465			target_client_is_online = false;
466		}
467	}
468}
469
470#[cfg(test)]
471pub(crate) mod tests {
472	use std::sync::Arc;
473
474	use bp_messages::{HashedLaneId, LaneIdType, LegacyLaneId};
475	use futures::stream::StreamExt;
476	use parking_lot::Mutex;
477	use relay_utils::{HeaderId, MaybeConnectionError, TrackedTransactionStatus};
478
479	use super::*;
480
481	pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId {
482		HeaderId(number, number)
483	}
484
485	pub type TestSourceChainBalance = u64;
486	pub type TestSourceHeaderId = HeaderId<TestSourceHeaderNumber, TestSourceHeaderHash>;
487	pub type TestTargetHeaderId = HeaderId<TestTargetHeaderNumber, TestTargetHeaderHash>;
488
489	pub type TestMessagesProof = (RangeInclusive<MessageNonce>, Option<MessageNonce>);
490	pub type TestMessagesReceivingProof = MessageNonce;
491
492	pub type TestSourceHeaderNumber = u64;
493	pub type TestSourceHeaderHash = u64;
494
495	pub type TestTargetHeaderNumber = u64;
496	pub type TestTargetHeaderHash = u64;
497
498	#[derive(Debug)]
499	pub struct TestError;
500
501	impl MaybeConnectionError for TestError {
502		fn is_connection_error(&self) -> bool {
503			true
504		}
505	}
506
507	/// Lane identifier type used for tests.
508	pub type TestLaneIdType = HashedLaneId;
509
510	#[derive(Clone)]
511	pub struct TestMessageLane;
512
513	impl MessageLane for TestMessageLane {
514		const SOURCE_NAME: &'static str = "TestSource";
515		const TARGET_NAME: &'static str = "TestTarget";
516
517		type MessagesProof = TestMessagesProof;
518		type MessagesReceivingProof = TestMessagesReceivingProof;
519
520		type SourceChainBalance = TestSourceChainBalance;
521		type SourceHeaderNumber = TestSourceHeaderNumber;
522		type SourceHeaderHash = TestSourceHeaderHash;
523
524		type TargetHeaderNumber = TestTargetHeaderNumber;
525		type TargetHeaderHash = TestTargetHeaderHash;
526
527		type LaneId = TestLaneIdType;
528	}
529
530	#[derive(Clone, Debug)]
531	pub struct TestMessagesBatchTransaction {
532		required_header_id: TestSourceHeaderId,
533	}
534
535	#[async_trait]
536	impl BatchTransaction<TestSourceHeaderId> for TestMessagesBatchTransaction {
537		fn required_header_id(&self) -> TestSourceHeaderId {
538			self.required_header_id
539		}
540	}
541
542	#[derive(Clone, Debug)]
543	pub struct TestConfirmationBatchTransaction {
544		required_header_id: TestTargetHeaderId,
545	}
546
547	#[async_trait]
548	impl BatchTransaction<TestTargetHeaderId> for TestConfirmationBatchTransaction {
549		fn required_header_id(&self) -> TestTargetHeaderId {
550			self.required_header_id
551		}
552	}
553
554	#[derive(Clone, Debug)]
555	pub struct TestTransactionTracker(TrackedTransactionStatus<TestTargetHeaderId>);
556
557	impl Default for TestTransactionTracker {
558		fn default() -> TestTransactionTracker {
559			TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
560		}
561	}
562
563	#[async_trait]
564	impl TransactionTracker for TestTransactionTracker {
565		type HeaderId = TestTargetHeaderId;
566
567		async fn wait(self) -> TrackedTransactionStatus<TestTargetHeaderId> {
568			self.0
569		}
570	}
571
572	#[derive(Debug, Clone)]
573	pub struct TestClientData {
574		is_source_fails: bool,
575		is_source_reconnected: bool,
576		source_state: SourceClientState<TestMessageLane>,
577		source_latest_generated_nonce: MessageNonce,
578		source_latest_confirmed_received_nonce: MessageNonce,
579		source_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
580		submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
581		is_target_fails: bool,
582		is_target_reconnected: bool,
583		target_state: SourceClientState<TestMessageLane>,
584		target_latest_received_nonce: MessageNonce,
585		target_latest_confirmed_received_nonce: MessageNonce,
586		target_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
587		submitted_messages_proofs: Vec<TestMessagesProof>,
588		target_to_source_batch_transaction: Option<TestConfirmationBatchTransaction>,
589		target_to_source_header_required: Option<TestTargetHeaderId>,
590		target_to_source_header_requirements: Vec<TestTargetHeaderId>,
591		source_to_target_batch_transaction: Option<TestMessagesBatchTransaction>,
592		source_to_target_header_required: Option<TestSourceHeaderId>,
593		source_to_target_header_requirements: Vec<TestSourceHeaderId>,
594	}
595
596	impl Default for TestClientData {
597		fn default() -> TestClientData {
598			TestClientData {
599				is_source_fails: false,
600				is_source_reconnected: false,
601				source_state: Default::default(),
602				source_latest_generated_nonce: 0,
603				source_latest_confirmed_received_nonce: 0,
604				source_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
605					0,
606					Default::default(),
607				)),
608				submitted_messages_receiving_proofs: Vec::new(),
609				is_target_fails: false,
610				is_target_reconnected: false,
611				target_state: Default::default(),
612				target_latest_received_nonce: 0,
613				target_latest_confirmed_received_nonce: 0,
614				target_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
615					0,
616					Default::default(),
617				)),
618				submitted_messages_proofs: Vec::new(),
619				target_to_source_batch_transaction: None,
620				target_to_source_header_required: None,
621				target_to_source_header_requirements: Vec::new(),
622				source_to_target_batch_transaction: None,
623				source_to_target_header_required: None,
624				source_to_target_header_requirements: Vec::new(),
625			}
626		}
627	}
628
629	impl TestClientData {
630		fn receive_messages(
631			&mut self,
632			maybe_batch_tx: Option<TestMessagesBatchTransaction>,
633			proof: TestMessagesProof,
634		) {
635			self.target_state.best_self =
636				HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
637			self.target_state.best_finalized_self = self.target_state.best_self;
638			self.target_latest_received_nonce = *proof.0.end();
639			if let Some(maybe_batch_tx) = maybe_batch_tx {
640				self.target_state.best_finalized_peer_at_best_self =
641					Some(maybe_batch_tx.required_header_id());
642			}
643			if let Some(target_latest_confirmed_received_nonce) = proof.1 {
644				self.target_latest_confirmed_received_nonce =
645					target_latest_confirmed_received_nonce;
646			}
647			self.submitted_messages_proofs.push(proof);
648		}
649
650		fn receive_messages_delivery_proof(
651			&mut self,
652			maybe_batch_tx: Option<TestConfirmationBatchTransaction>,
653			proof: TestMessagesReceivingProof,
654		) {
655			self.source_state.best_self =
656				HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
657			self.source_state.best_finalized_self = self.source_state.best_self;
658			if let Some(maybe_batch_tx) = maybe_batch_tx {
659				self.source_state.best_finalized_peer_at_best_self =
660					Some(maybe_batch_tx.required_header_id());
661			}
662			self.submitted_messages_receiving_proofs.push(proof);
663			self.source_latest_confirmed_received_nonce = proof;
664		}
665	}
666
667	#[derive(Clone)]
668	pub struct TestSourceClient {
669		data: Arc<Mutex<TestClientData>>,
670		tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
671		post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
672	}
673
674	impl Default for TestSourceClient {
675		fn default() -> Self {
676			TestSourceClient {
677				data: Arc::new(Mutex::new(TestClientData::default())),
678				tick: Arc::new(|_| {}),
679				post_tick: Arc::new(|_| {}),
680			}
681		}
682	}
683
684	#[async_trait]
685	impl RelayClient for TestSourceClient {
686		type Error = TestError;
687
688		async fn reconnect(&mut self) -> Result<(), TestError> {
689			{
690				let mut data = self.data.lock();
691				(self.tick)(&mut data);
692				data.is_source_reconnected = true;
693				(self.post_tick)(&mut data);
694			}
695			Ok(())
696		}
697	}
698
699	#[async_trait]
700	impl SourceClient<TestMessageLane> for TestSourceClient {
701		type BatchTransaction = TestConfirmationBatchTransaction;
702		type TransactionTracker = TestTransactionTracker;
703
704		async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> {
705			let mut data = self.data.lock();
706			(self.tick)(&mut data);
707			if data.is_source_fails {
708				return Err(TestError)
709			}
710			(self.post_tick)(&mut data);
711			Ok(data.source_state.clone())
712		}
713
714		async fn latest_generated_nonce(
715			&self,
716			id: SourceHeaderIdOf<TestMessageLane>,
717		) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
718			let mut data = self.data.lock();
719			(self.tick)(&mut data);
720			if data.is_source_fails {
721				return Err(TestError)
722			}
723			(self.post_tick)(&mut data);
724			Ok((id, data.source_latest_generated_nonce))
725		}
726
727		async fn latest_confirmed_received_nonce(
728			&self,
729			id: SourceHeaderIdOf<TestMessageLane>,
730		) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
731			let mut data = self.data.lock();
732			(self.tick)(&mut data);
733			(self.post_tick)(&mut data);
734			Ok((id, data.source_latest_confirmed_received_nonce))
735		}
736
737		async fn generated_message_details(
738			&self,
739			_id: SourceHeaderIdOf<TestMessageLane>,
740			nonces: RangeInclusive<MessageNonce>,
741		) -> Result<MessageDetailsMap<TestSourceChainBalance>, TestError> {
742			Ok(nonces
743				.map(|nonce| {
744					(
745						nonce,
746						MessageDetails {
747							dispatch_weight: Weight::from_parts(1, 0),
748							size: 1,
749							reward: 1,
750						},
751					)
752				})
753				.collect())
754		}
755
756		async fn prove_messages(
757			&self,
758			id: SourceHeaderIdOf<TestMessageLane>,
759			nonces: RangeInclusive<MessageNonce>,
760			proof_parameters: MessageProofParameters,
761		) -> Result<
762			(SourceHeaderIdOf<TestMessageLane>, RangeInclusive<MessageNonce>, TestMessagesProof),
763			TestError,
764		> {
765			let mut data = self.data.lock();
766			(self.tick)(&mut data);
767			(self.post_tick)(&mut data);
768			Ok((
769				id,
770				nonces.clone(),
771				(
772					nonces,
773					if proof_parameters.outbound_state_proof_required {
774						Some(data.source_latest_confirmed_received_nonce)
775					} else {
776						None
777					},
778				),
779			))
780		}
781
782		async fn submit_messages_receiving_proof(
783			&self,
784			maybe_batch_tx: Option<Self::BatchTransaction>,
785			_generated_at_block: TargetHeaderIdOf<TestMessageLane>,
786			proof: TestMessagesReceivingProof,
787		) -> Result<Self::TransactionTracker, TestError> {
788			let mut data = self.data.lock();
789			(self.tick)(&mut data);
790			data.receive_messages_delivery_proof(maybe_batch_tx, proof);
791			(self.post_tick)(&mut data);
792			Ok(TestTransactionTracker(data.source_tracked_transaction_status))
793		}
794
795		async fn require_target_header_on_source(
796			&self,
797			id: TargetHeaderIdOf<TestMessageLane>,
798		) -> Result<Option<Self::BatchTransaction>, Self::Error> {
799			let mut data = self.data.lock();
800			data.target_to_source_header_required = Some(id);
801			data.target_to_source_header_requirements.push(id);
802			(self.tick)(&mut data);
803			(self.post_tick)(&mut data);
804
805			Ok(data.target_to_source_batch_transaction.take().map(|mut tx| {
806				tx.required_header_id = id;
807				tx
808			}))
809		}
810	}
811
812	#[derive(Clone)]
813	pub struct TestTargetClient {
814		data: Arc<Mutex<TestClientData>>,
815		tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
816		post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
817	}
818
819	impl Default for TestTargetClient {
820		fn default() -> Self {
821			TestTargetClient {
822				data: Arc::new(Mutex::new(TestClientData::default())),
823				tick: Arc::new(|_| {}),
824				post_tick: Arc::new(|_| {}),
825			}
826		}
827	}
828
829	#[async_trait]
830	impl RelayClient for TestTargetClient {
831		type Error = TestError;
832
833		async fn reconnect(&mut self) -> Result<(), TestError> {
834			{
835				let mut data = self.data.lock();
836				(self.tick)(&mut data);
837				data.is_target_reconnected = true;
838				(self.post_tick)(&mut data);
839			}
840			Ok(())
841		}
842	}
843
844	#[async_trait]
845	impl TargetClient<TestMessageLane> for TestTargetClient {
846		type BatchTransaction = TestMessagesBatchTransaction;
847		type TransactionTracker = TestTransactionTracker;
848
849		async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> {
850			let mut data = self.data.lock();
851			(self.tick)(&mut data);
852			if data.is_target_fails {
853				return Err(TestError)
854			}
855			(self.post_tick)(&mut data);
856			Ok(data.target_state.clone())
857		}
858
859		async fn latest_received_nonce(
860			&self,
861			id: TargetHeaderIdOf<TestMessageLane>,
862		) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
863			let mut data = self.data.lock();
864			(self.tick)(&mut data);
865			if data.is_target_fails {
866				return Err(TestError)
867			}
868			(self.post_tick)(&mut data);
869			Ok((id, data.target_latest_received_nonce))
870		}
871
872		async fn unrewarded_relayers_state(
873			&self,
874			id: TargetHeaderIdOf<TestMessageLane>,
875		) -> Result<(TargetHeaderIdOf<TestMessageLane>, UnrewardedRelayersState), TestError> {
876			Ok((
877				id,
878				UnrewardedRelayersState {
879					unrewarded_relayer_entries: 0,
880					messages_in_oldest_entry: 0,
881					total_messages: 0,
882					last_delivered_nonce: 0,
883				},
884			))
885		}
886
887		async fn latest_confirmed_received_nonce(
888			&self,
889			id: TargetHeaderIdOf<TestMessageLane>,
890		) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
891			let mut data = self.data.lock();
892			(self.tick)(&mut data);
893			if data.is_target_fails {
894				return Err(TestError)
895			}
896			(self.post_tick)(&mut data);
897			Ok((id, data.target_latest_confirmed_received_nonce))
898		}
899
900		async fn prove_messages_receiving(
901			&self,
902			id: TargetHeaderIdOf<TestMessageLane>,
903		) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessagesReceivingProof), TestError> {
904			Ok((id, self.data.lock().target_latest_received_nonce))
905		}
906
907		async fn submit_messages_proof(
908			&self,
909			maybe_batch_tx: Option<Self::BatchTransaction>,
910			_generated_at_header: SourceHeaderIdOf<TestMessageLane>,
911			nonces: RangeInclusive<MessageNonce>,
912			proof: TestMessagesProof,
913		) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, TestError> {
914			let mut data = self.data.lock();
915			(self.tick)(&mut data);
916			if data.is_target_fails {
917				return Err(TestError)
918			}
919			data.receive_messages(maybe_batch_tx, proof);
920			(self.post_tick)(&mut data);
921			Ok(NoncesSubmitArtifacts {
922				nonces,
923				tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status),
924			})
925		}
926
927		async fn require_source_header_on_target(
928			&self,
929			id: SourceHeaderIdOf<TestMessageLane>,
930		) -> Result<Option<Self::BatchTransaction>, Self::Error> {
931			let mut data = self.data.lock();
932			data.source_to_target_header_required = Some(id);
933			data.source_to_target_header_requirements.push(id);
934			(self.tick)(&mut data);
935			(self.post_tick)(&mut data);
936
937			Ok(data.source_to_target_batch_transaction.take().map(|mut tx| {
938				tx.required_header_id = id;
939				tx
940			}))
941		}
942	}
943
944	fn run_loop_test(
945		data: Arc<Mutex<TestClientData>>,
946		source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
947		source_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
948		target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
949		target_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
950		exit_signal: impl Future<Output = ()> + 'static + Send,
951	) -> TestClientData {
952		async_std::task::block_on(async {
953			let source_client = TestSourceClient {
954				data: data.clone(),
955				tick: source_tick,
956				post_tick: source_post_tick,
957			};
958			let target_client = TestTargetClient {
959				data: data.clone(),
960				tick: target_tick,
961				post_tick: target_post_tick,
962			};
963			let _ = run(
964				Params {
965					lane: TestLaneIdType::try_new(1, 2).unwrap(),
966					source_tick: Duration::from_millis(100),
967					target_tick: Duration::from_millis(100),
968					reconnect_delay: Duration::from_millis(0),
969					delivery_params: MessageDeliveryParams {
970						max_unrewarded_relayer_entries_at_target: 4,
971						max_unconfirmed_nonces_at_target: 4,
972						max_messages_in_single_batch: 4,
973						max_messages_weight_in_single_batch: Weight::from_parts(4, 0),
974						max_messages_size_in_single_batch: 4,
975					},
976				},
977				source_client,
978				target_client,
979				MetricsParams::disabled(),
980				exit_signal,
981			)
982			.await;
983			let result = data.lock().clone();
984			result
985		})
986	}
987
988	#[test]
989	fn message_lane_loop_is_able_to_recover_from_connection_errors() {
990		// with this configuration, source client will return Err, making source client
991		// reconnect. Then the target client will fail with Err + reconnect. Then we finally
992		// able to deliver messages.
993		let (exit_sender, exit_receiver) = unbounded();
994		let result = run_loop_test(
995			Arc::new(Mutex::new(TestClientData {
996				is_source_fails: true,
997				source_state: ClientState {
998					best_self: HeaderId(0, 0),
999					best_finalized_self: HeaderId(0, 0),
1000					best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1001					actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1002				},
1003				source_latest_generated_nonce: 1,
1004				target_state: ClientState {
1005					best_self: HeaderId(0, 0),
1006					best_finalized_self: HeaderId(0, 0),
1007					best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1008					actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1009				},
1010				target_latest_received_nonce: 0,
1011				..Default::default()
1012			})),
1013			Arc::new(|data: &mut TestClientData| {
1014				if data.is_source_reconnected {
1015					data.is_source_fails = false;
1016					data.is_target_fails = true;
1017				}
1018			}),
1019			Arc::new(|_| {}),
1020			Arc::new(move |data: &mut TestClientData| {
1021				if data.is_target_reconnected {
1022					data.is_target_fails = false;
1023				}
1024				if data.target_state.best_finalized_peer_at_best_self.unwrap().0 < 10 {
1025					data.target_state.best_finalized_peer_at_best_self = Some(HeaderId(
1026						data.target_state.best_finalized_peer_at_best_self.unwrap().0 + 1,
1027						data.target_state.best_finalized_peer_at_best_self.unwrap().0 + 1,
1028					));
1029				}
1030				if !data.submitted_messages_proofs.is_empty() {
1031					exit_sender.unbounded_send(()).unwrap();
1032				}
1033			}),
1034			Arc::new(|_| {}),
1035			exit_receiver.into_future().map(|(_, _)| ()),
1036		);
1037
1038		assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
1039	}
1040
1041	#[test]
1042	fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
1043		// with this configuration, both source and target clients will mine their transactions, but
1044		// their corresponding nonce won't be updated => reconnect will happen
1045		let (exit_sender, exit_receiver) = unbounded();
1046		let result = run_loop_test(
1047			Arc::new(Mutex::new(TestClientData {
1048				source_state: ClientState {
1049					best_self: HeaderId(0, 0),
1050					best_finalized_self: HeaderId(0, 0),
1051					best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1052					actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1053				},
1054				source_latest_generated_nonce: 1,
1055				target_state: ClientState {
1056					best_self: HeaderId(0, 0),
1057					best_finalized_self: HeaderId(0, 0),
1058					best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1059					actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1060				},
1061				target_latest_received_nonce: 0,
1062				..Default::default()
1063			})),
1064			Arc::new(move |data: &mut TestClientData| {
1065				// blocks are produced on every tick
1066				data.source_state.best_self =
1067					HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
1068				data.source_state.best_finalized_self = data.source_state.best_self;
1069				// syncing target headers -> source chain
1070				if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
1071					if *last_requirement !=
1072						data.source_state.best_finalized_peer_at_best_self.unwrap()
1073					{
1074						data.source_state.best_finalized_peer_at_best_self =
1075							Some(*last_requirement);
1076					}
1077				}
1078			}),
1079			Arc::new(move |data: &mut TestClientData| {
1080				// if it is the first time we're submitting delivery proof, let's revert changes
1081				// to source status => then the delivery confirmation transaction is "finalized",
1082				// but the state is not altered
1083				if data.submitted_messages_receiving_proofs.len() == 1 {
1084					data.source_latest_confirmed_received_nonce = 0;
1085				}
1086			}),
1087			Arc::new(move |data: &mut TestClientData| {
1088				// blocks are produced on every tick
1089				data.target_state.best_self =
1090					HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
1091				data.target_state.best_finalized_self = data.target_state.best_self;
1092				// syncing source headers -> target chain
1093				if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
1094					if *last_requirement !=
1095						data.target_state.best_finalized_peer_at_best_self.unwrap()
1096					{
1097						data.target_state.best_finalized_peer_at_best_self =
1098							Some(*last_requirement);
1099					}
1100				}
1101				// if source has received all messages receiving confirmations => stop
1102				if data.source_latest_confirmed_received_nonce == 1 {
1103					exit_sender.unbounded_send(()).unwrap();
1104				}
1105			}),
1106			Arc::new(move |data: &mut TestClientData| {
1107				// if it is the first time we're submitting messages proof, let's revert changes
1108				// to target status => then the messages delivery transaction is "finalized", but
1109				// the state is not altered
1110				if data.submitted_messages_proofs.len() == 1 {
1111					data.target_latest_received_nonce = 0;
1112					data.target_latest_confirmed_received_nonce = 0;
1113				}
1114			}),
1115			exit_receiver.into_future().map(|(_, _)| ()),
1116		);
1117
1118		assert_eq!(result.submitted_messages_proofs.len(), 2);
1119		assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
1120	}
1121
1122	#[test]
1123	fn message_lane_loop_works() {
1124		let (exit_sender, exit_receiver) = unbounded();
1125		let result = run_loop_test(
1126			Arc::new(Mutex::new(TestClientData {
1127				source_state: ClientState {
1128					best_self: HeaderId(10, 10),
1129					best_finalized_self: HeaderId(10, 10),
1130					best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1131					actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1132				},
1133				source_latest_generated_nonce: 10,
1134				target_state: ClientState {
1135					best_self: HeaderId(0, 0),
1136					best_finalized_self: HeaderId(0, 0),
1137					best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1138					actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1139				},
1140				target_latest_received_nonce: 0,
1141				..Default::default()
1142			})),
1143			Arc::new(|data: &mut TestClientData| {
1144				// blocks are produced on every tick
1145				data.source_state.best_self =
1146					HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
1147				data.source_state.best_finalized_self = data.source_state.best_self;
1148				// headers relay must only be started when we need new target headers at source node
1149				if data.target_to_source_header_required.is_some() {
1150					assert!(
1151						data.source_state.best_finalized_peer_at_best_self.unwrap().0 <
1152							data.target_state.best_self.0
1153					);
1154					data.target_to_source_header_required = None;
1155				}
1156				// syncing target headers -> source chain
1157				if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
1158					if *last_requirement !=
1159						data.source_state.best_finalized_peer_at_best_self.unwrap()
1160					{
1161						data.source_state.best_finalized_peer_at_best_self =
1162							Some(*last_requirement);
1163					}
1164				}
1165			}),
1166			Arc::new(|_| {}),
1167			Arc::new(move |data: &mut TestClientData| {
1168				// blocks are produced on every tick
1169				data.target_state.best_self =
1170					HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
1171				data.target_state.best_finalized_self = data.target_state.best_self;
1172				// headers relay must only be started when we need new source headers at target node
1173				if data.source_to_target_header_required.is_some() {
1174					assert!(
1175						data.target_state.best_finalized_peer_at_best_self.unwrap().0 <
1176							data.source_state.best_self.0
1177					);
1178					data.source_to_target_header_required = None;
1179				}
1180				// syncing source headers -> target chain
1181				if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
1182					if *last_requirement !=
1183						data.target_state.best_finalized_peer_at_best_self.unwrap()
1184					{
1185						data.target_state.best_finalized_peer_at_best_self =
1186							Some(*last_requirement);
1187					}
1188				}
1189				// if source has received all messages receiving confirmations => stop
1190				if data.source_latest_confirmed_received_nonce == 10 {
1191					exit_sender.unbounded_send(()).unwrap();
1192				}
1193			}),
1194			Arc::new(|_| {}),
1195			exit_receiver.into_future().map(|(_, _)| ()),
1196		);
1197
1198		// there are no strict restrictions on when reward confirmation should come
1199		// (because `max_unconfirmed_nonces_at_target` is `100` in tests and this confirmation
1200		// depends on the state of both clients)
1201		// => we do not check it here
1202		assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
1203		assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
1204		assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
1205		assert!(!result.submitted_messages_receiving_proofs.is_empty());
1206
1207		// check that we have at least once required new source->target or target->source headers
1208		assert!(!result.target_to_source_header_requirements.is_empty());
1209		assert!(!result.source_to_target_header_requirements.is_empty());
1210	}
1211
1212	#[test]
1213	fn message_lane_loop_works_with_batch_transactions() {
1214		let (exit_sender, exit_receiver) = unbounded();
1215		let original_data = Arc::new(Mutex::new(TestClientData {
1216			source_state: ClientState {
1217				best_self: HeaderId(10, 10),
1218				best_finalized_self: HeaderId(10, 10),
1219				best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1220				actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1221			},
1222			source_latest_generated_nonce: 10,
1223			target_state: ClientState {
1224				best_self: HeaderId(0, 0),
1225				best_finalized_self: HeaderId(0, 0),
1226				best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1227				actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
1228			},
1229			target_latest_received_nonce: 0,
1230			..Default::default()
1231		}));
1232		let result = run_loop_test(
1233			original_data,
1234			Arc::new(|_| {}),
1235			Arc::new(move |data: &mut TestClientData| {
1236				data.source_state.best_self =
1237					HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
1238				data.source_state.best_finalized_self = data.source_state.best_self;
1239				if let Some(target_to_source_header_required) =
1240					data.target_to_source_header_required.take()
1241				{
1242					data.target_to_source_batch_transaction =
1243						Some(TestConfirmationBatchTransaction {
1244							required_header_id: target_to_source_header_required,
1245						})
1246				}
1247			}),
1248			Arc::new(|_| {}),
1249			Arc::new(move |data: &mut TestClientData| {
1250				data.target_state.best_self =
1251					HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
1252				data.target_state.best_finalized_self = data.target_state.best_self;
1253
1254				if let Some(source_to_target_header_required) =
1255					data.source_to_target_header_required.take()
1256				{
1257					data.source_to_target_batch_transaction = Some(TestMessagesBatchTransaction {
1258						required_header_id: source_to_target_header_required,
1259					})
1260				}
1261
1262				if data.source_latest_confirmed_received_nonce == 10 {
1263					exit_sender.unbounded_send(()).unwrap();
1264				}
1265			}),
1266			exit_receiver.into_future().map(|(_, _)| ()),
1267		);
1268
1269		// there are no strict restrictions on when reward confirmation should come
1270		// (because `max_unconfirmed_nonces_at_target` is `100` in tests and this confirmation
1271		// depends on the state of both clients)
1272		// => we do not check it here
1273		assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
1274		assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
1275		assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
1276		assert!(!result.submitted_messages_receiving_proofs.is_empty());
1277
1278		// check that we have at least once required new source->target or target->source headers
1279		assert!(!result.target_to_source_header_requirements.is_empty());
1280		assert!(!result.source_to_target_header_requirements.is_empty());
1281	}
1282
1283	#[test]
1284	fn metrics_prefix_is_valid() {
1285		assert!(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<TestMessageLane>(
1286			&HashedLaneId::try_new(1, 2).unwrap()
1287		)))
1288		.is_ok());
1289
1290		// with LegacyLaneId
1291		#[derive(Clone)]
1292		pub struct LegacyTestMessageLane;
1293		impl MessageLane for LegacyTestMessageLane {
1294			const SOURCE_NAME: &'static str = "LegacyTestSource";
1295			const TARGET_NAME: &'static str = "LegacyTestTarget";
1296
1297			type MessagesProof = TestMessagesProof;
1298			type MessagesReceivingProof = TestMessagesReceivingProof;
1299
1300			type SourceChainBalance = TestSourceChainBalance;
1301			type SourceHeaderNumber = TestSourceHeaderNumber;
1302			type SourceHeaderHash = TestSourceHeaderHash;
1303
1304			type TargetHeaderNumber = TestTargetHeaderNumber;
1305			type TargetHeaderHash = TestTargetHeaderHash;
1306
1307			type LaneId = LegacyLaneId;
1308		}
1309		assert!(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<LegacyTestMessageLane>(
1310			&LegacyLaneId([0, 0, 0, 1])
1311		)))
1312		.is_ok());
1313	}
1314}