referrerpolicy=no-referrer-when-downgrade

messages_relay/
message_race_delivery.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14//! Message delivery race delivers proof-of-messages from "lane.source" to "lane.target".
15
16use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive};
17
18use async_trait::async_trait;
19use futures::stream::FusedStream;
20
21use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
22use relay_utils::{FailedClient, TrackedTransactionStatus, TransactionTracker};
23
24use crate::{
25	message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
26	message_lane_loop::{
27		MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, NoncesSubmitArtifacts,
28		SourceClient as MessageLaneSourceClient, SourceClientState,
29		TargetClient as MessageLaneTargetClient, TargetClientState,
30	},
31	message_race_limits::{MessageRaceLimits, RelayMessagesBatchReference},
32	message_race_loop::{
33		MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces,
34		TargetClient, TargetClientNonces,
35	},
36	message_race_strategy::BasicStrategy,
37	metrics::MessageLaneLoopMetrics,
38};
39
40/// Run message delivery race.
41pub async fn run<P: MessageLane>(
42	source_client: impl MessageLaneSourceClient<P>,
43	source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
44	target_client: impl MessageLaneTargetClient<P>,
45	target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
46	metrics_msg: Option<MessageLaneLoopMetrics>,
47	params: MessageDeliveryParams,
48) -> Result<(), FailedClient> {
49	crate::message_race_loop::run(
50		MessageDeliveryRaceSource {
51			client: source_client.clone(),
52			metrics_msg: metrics_msg.clone(),
53			_phantom: Default::default(),
54		},
55		source_state_updates,
56		MessageDeliveryRaceTarget {
57			client: target_client.clone(),
58			metrics_msg: metrics_msg.clone(),
59			_phantom: Default::default(),
60		},
61		target_state_updates,
62		MessageDeliveryStrategy::<P> {
63			max_unrewarded_relayer_entries_at_target: params
64				.max_unrewarded_relayer_entries_at_target,
65			max_unconfirmed_nonces_at_target: params.max_unconfirmed_nonces_at_target,
66			max_messages_in_single_batch: params.max_messages_in_single_batch,
67			max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch,
68			max_messages_size_in_single_batch: params.max_messages_size_in_single_batch,
69			latest_confirmed_nonces_at_source: VecDeque::new(),
70			target_nonces: None,
71			strategy: BasicStrategy::new(),
72		},
73	)
74	.await
75}
76
77/// Relay range of messages.
78pub async fn relay_messages_range<P: MessageLane>(
79	source_client: impl MessageLaneSourceClient<P>,
80	target_client: impl MessageLaneTargetClient<P>,
81	at: SourceHeaderIdOf<P>,
82	range: RangeInclusive<MessageNonce>,
83	outbound_state_proof_required: bool,
84) -> Result<(), ()> {
85	// compute cumulative dispatch weight of all messages in given range
86	let dispatch_weight = source_client
87		.generated_message_details(at.clone(), range.clone())
88		.await
89		.map_err(|e| {
90			log::error!(
91				target: "bridge",
92				"Failed to get generated message details at {:?} for messages {:?}: {:?}",
93				at,
94				range,
95				e,
96			);
97		})?
98		.values()
99		.fold(Weight::zero(), |total, details| total.saturating_add(details.dispatch_weight));
100	// prepare messages proof
101	let (at, range, proof) = source_client
102		.prove_messages(
103			at.clone(),
104			range.clone(),
105			MessageProofParameters { outbound_state_proof_required, dispatch_weight },
106		)
107		.await
108		.map_err(|e| {
109			log::error!(
110				target: "bridge",
111				"Failed to generate messages proof at {:?} for messages {:?}: {:?}",
112				at,
113				range,
114				e,
115			);
116		})?;
117	// submit messages proof to the target node
118	let tx_tracker = target_client
119		.submit_messages_proof(None, at, range.clone(), proof)
120		.await
121		.map_err(|e| {
122			log::error!(
123				target: "bridge",
124				"Failed to submit messages proof for messages {:?}: {:?}",
125				range,
126				e,
127			);
128		})?
129		.tx_tracker;
130
131	match tx_tracker.wait().await {
132		TrackedTransactionStatus::Finalized(_) => Ok(()),
133		TrackedTransactionStatus::Lost => {
134			log::error!("Transaction with messages {:?} is considered lost", range,);
135			Err(())
136		},
137	}
138}
139
/// Message delivery race.
///
/// Marker type that carries no data: it only binds the lane type `P` so that race-related
/// traits can be implemented for it.
struct MessageDeliveryRace<P>(PhantomData<P>);
142
143impl<P: MessageLane> MessageRace for MessageDeliveryRace<P> {
144	type SourceHeaderId = SourceHeaderIdOf<P>;
145	type TargetHeaderId = TargetHeaderIdOf<P>;
146
147	type MessageNonce = MessageNonce;
148	type Proof = P::MessagesProof;
149
150	fn source_name() -> String {
151		format!("{}::MessagesDelivery", P::SOURCE_NAME)
152	}
153
154	fn target_name() -> String {
155		format!("{}::MessagesDelivery", P::TARGET_NAME)
156	}
157}
158
159/// Message delivery race source, which is a source of the lane.
160struct MessageDeliveryRaceSource<P: MessageLane, C> {
161	client: C,
162	metrics_msg: Option<MessageLaneLoopMetrics>,
163	_phantom: PhantomData<P>,
164}
165
166#[async_trait]
167impl<P, C> SourceClient<MessageDeliveryRace<P>> for MessageDeliveryRaceSource<P, C>
168where
169	P: MessageLane,
170	C: MessageLaneSourceClient<P>,
171{
172	type Error = C::Error;
173	type NoncesRange = MessageDetailsMap<P::SourceChainBalance>;
174	type ProofParameters = MessageProofParameters;
175
176	async fn nonces(
177		&self,
178		at_block: SourceHeaderIdOf<P>,
179		prev_latest_nonce: MessageNonce,
180	) -> Result<(SourceHeaderIdOf<P>, SourceClientNonces<Self::NoncesRange>), Self::Error> {
181		let (at_block, latest_generated_nonce) =
182			self.client.latest_generated_nonce(at_block).await?;
183		let (at_block, latest_confirmed_nonce) =
184			self.client.latest_confirmed_received_nonce(at_block).await?;
185
186		if let Some(metrics_msg) = self.metrics_msg.as_ref() {
187			metrics_msg.update_source_latest_generated_nonce(latest_generated_nonce);
188			metrics_msg.update_source_latest_confirmed_nonce(latest_confirmed_nonce);
189		}
190
191		let new_nonces = if latest_generated_nonce > prev_latest_nonce {
192			self.client
193				.generated_message_details(
194					at_block.clone(),
195					prev_latest_nonce + 1..=latest_generated_nonce,
196				)
197				.await?
198		} else {
199			MessageDetailsMap::new()
200		};
201
202		Ok((
203			at_block,
204			SourceClientNonces { new_nonces, confirmed_nonce: Some(latest_confirmed_nonce) },
205		))
206	}
207
208	async fn generate_proof(
209		&self,
210		at_block: SourceHeaderIdOf<P>,
211		nonces: RangeInclusive<MessageNonce>,
212		proof_parameters: Self::ProofParameters,
213	) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), Self::Error>
214	{
215		self.client.prove_messages(at_block, nonces, proof_parameters).await
216	}
217}
218
219/// Message delivery race target, which is a target of the lane.
220struct MessageDeliveryRaceTarget<P: MessageLane, C> {
221	client: C,
222	metrics_msg: Option<MessageLaneLoopMetrics>,
223	_phantom: PhantomData<P>,
224}
225
226#[async_trait]
227impl<P, C> TargetClient<MessageDeliveryRace<P>> for MessageDeliveryRaceTarget<P, C>
228where
229	P: MessageLane,
230	C: MessageLaneTargetClient<P>,
231{
232	type Error = C::Error;
233	type TargetNoncesData = DeliveryRaceTargetNoncesData;
234	type BatchTransaction = C::BatchTransaction;
235	type TransactionTracker = C::TransactionTracker;
236
237	async fn require_source_header(
238		&self,
239		id: SourceHeaderIdOf<P>,
240	) -> Result<Option<C::BatchTransaction>, Self::Error> {
241		self.client.require_source_header_on_target(id).await
242	}
243
244	async fn nonces(
245		&self,
246		at_block: TargetHeaderIdOf<P>,
247		update_metrics: bool,
248	) -> Result<(TargetHeaderIdOf<P>, TargetClientNonces<DeliveryRaceTargetNoncesData>), Self::Error>
249	{
250		let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
251		let (at_block, latest_confirmed_nonce) =
252			self.client.latest_confirmed_received_nonce(at_block).await?;
253		let (at_block, unrewarded_relayers) =
254			self.client.unrewarded_relayers_state(at_block).await?;
255
256		if update_metrics {
257			if let Some(metrics_msg) = self.metrics_msg.as_ref() {
258				metrics_msg.update_target_latest_received_nonce(latest_received_nonce);
259				metrics_msg.update_target_latest_confirmed_nonce(latest_confirmed_nonce);
260			}
261		}
262
263		Ok((
264			at_block,
265			TargetClientNonces {
266				latest_nonce: latest_received_nonce,
267				nonces_data: DeliveryRaceTargetNoncesData {
268					confirmed_nonce: latest_confirmed_nonce,
269					unrewarded_relayers,
270				},
271			},
272		))
273	}
274
275	async fn submit_proof(
276		&self,
277		maybe_batch_tx: Option<Self::BatchTransaction>,
278		generated_at_block: SourceHeaderIdOf<P>,
279		nonces: RangeInclusive<MessageNonce>,
280		proof: P::MessagesProof,
281	) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error> {
282		self.client
283			.submit_messages_proof(maybe_batch_tx, generated_at_block, nonces, proof)
284			.await
285	}
286}
287
288/// Additional nonces data from the target client used by message delivery race.
289#[derive(Debug, Clone)]
290struct DeliveryRaceTargetNoncesData {
291	/// The latest nonce that we know: (1) has been delivered to us (2) has been confirmed
292	/// back to the source node (by confirmations race) and (3) relayer has received
293	/// reward for (and this has been confirmed by the message delivery race).
294	confirmed_nonce: MessageNonce,
295	/// State of the unrewarded relayers set at the target node.
296	unrewarded_relayers: UnrewardedRelayersState,
297}
298
299/// Messages delivery strategy.
300struct MessageDeliveryStrategy<P: MessageLane> {
301	/// Maximal unrewarded relayer entries at target client.
302	max_unrewarded_relayer_entries_at_target: MessageNonce,
303	/// Maximal unconfirmed nonces at target client.
304	max_unconfirmed_nonces_at_target: MessageNonce,
305	/// Maximal number of messages in the single delivery transaction.
306	max_messages_in_single_batch: MessageNonce,
307	/// Maximal cumulative messages weight in the single delivery transaction.
308	max_messages_weight_in_single_batch: Weight,
309	/// Maximal messages size in the single delivery transaction.
310	max_messages_size_in_single_batch: u32,
311	/// Latest confirmed nonces at the source client + the header id where we have first met this
312	/// nonce.
313	latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>,
314	/// Target nonces available at the **best** block of the target chain.
315	target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
316	/// Basic delivery strategy.
317	strategy: MessageDeliveryStrategyBase<P>,
318}
319
/// `BasicStrategy`, parametrized with the header number/hash types of both chains of the
/// lane `P`, with the message details map as a range of source nonces and with the lane
/// messages proof as a proof type.
type MessageDeliveryStrategyBase<P> = BasicStrategy<
	<P as MessageLane>::SourceHeaderNumber,
	<P as MessageLane>::SourceHeaderHash,
	<P as MessageLane>::TargetHeaderNumber,
	<P as MessageLane>::TargetHeaderHash,
	MessageDetailsMap<<P as MessageLane>::SourceChainBalance>,
	<P as MessageLane>::MessagesProof,
>;
328
329impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> {
330	fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
331		fmt.debug_struct("MessageDeliveryStrategy")
332			.field(
333				"max_unrewarded_relayer_entries_at_target",
334				&self.max_unrewarded_relayer_entries_at_target,
335			)
336			.field("max_unconfirmed_nonces_at_target", &self.max_unconfirmed_nonces_at_target)
337			.field("max_messages_in_single_batch", &self.max_messages_in_single_batch)
338			.field("max_messages_weight_in_single_batch", &self.max_messages_weight_in_single_batch)
339			.field("max_messages_size_in_single_batch", &self.max_messages_size_in_single_batch)
340			.field("latest_confirmed_nonces_at_source", &self.latest_confirmed_nonces_at_source)
341			.field("target_nonces", &self.target_nonces)
342			.field("strategy", &self.strategy)
343			.finish()
344	}
345}
346
347impl<P: MessageLane> MessageDeliveryStrategy<P>
348where
349	P: MessageLane,
350{
351	/// Returns true if some race action can be selected (with `select_race_action`) at given
352	/// `best_finalized_source_header_id_at_best_target` source header at target.
353	async fn can_submit_transaction_with<
354		RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>,
355	>(
356		&self,
357		mut race_state: RS,
358		maybe_best_finalized_source_header_id_at_best_target: Option<SourceHeaderIdOf<P>>,
359	) -> bool {
360		if let Some(best_finalized_source_header_id_at_best_target) =
361			maybe_best_finalized_source_header_id_at_best_target
362		{
363			race_state.set_best_finalized_source_header_id_at_best_target(
364				best_finalized_source_header_id_at_best_target,
365			);
366
367			return self.select_race_action(race_state).await.is_some()
368		}
369
370		false
371	}
372
373	async fn select_race_action<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
374		&self,
375		race_state: RS,
376	) -> Option<(RangeInclusive<MessageNonce>, MessageProofParameters)> {
377		// if we have already selected nonces that we want to submit, do nothing
378		if race_state.nonces_to_submit().is_some() {
379			return None
380		}
381
382		// if we already submitted some nonces, do nothing
383		if race_state.nonces_submitted().is_some() {
384			return None
385		}
386
387		let best_target_nonce = self.strategy.best_at_target()?;
388		let best_finalized_source_header_id_at_best_target =
389			race_state.best_finalized_source_header_id_at_best_target()?;
390		let target_nonces = self.target_nonces.as_ref()?;
391		let latest_confirmed_nonce_at_source = self
392			.latest_confirmed_nonce_at_source(&best_finalized_source_header_id_at_best_target)
393			.unwrap_or(target_nonces.nonces_data.confirmed_nonce);
394
395		// There's additional condition in the message delivery race: target would reject messages
396		// if there are too much unconfirmed messages at the inbound lane.
397
398		// Ok - we may have new nonces to deliver. But target may still reject new messages, because
399		// we haven't notified it that (some) messages have been confirmed. So we may want to
400		// include updated `source.latest_confirmed` in the proof.
401		//
402		// Important note: we're including outbound state lane proof whenever there are unconfirmed
403		// nonces on the target chain. Other strategy is to include it only if it's absolutely
404		// necessary.
405		let latest_received_nonce_at_target = target_nonces.latest_nonce;
406		let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce;
407		let outbound_state_proof_required =
408			latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
409
410		// The target node would also reject messages if there are too many entries in the
411		// "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then
412		// we should wait for confirmations race.
413		let unrewarded_limit_reached =
414			target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >=
415				self.max_unrewarded_relayer_entries_at_target ||
416				target_nonces.nonces_data.unrewarded_relayers.total_messages >=
417					self.max_unconfirmed_nonces_at_target;
418		if unrewarded_limit_reached {
419			// so there are already too many unrewarded relayer entries in the set
420			//
421			// => check if we can prove enough rewards. If not, we should wait for more rewards to
422			// be paid
423			let number_of_rewards_being_proved =
424				latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target);
425			let enough_rewards_being_proved = number_of_rewards_being_proved >=
426				target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry;
427			if !enough_rewards_being_proved {
428				return None
429			}
430		}
431
432		// If we're here, then the confirmations race did its job && sending side now knows that
433		// messages have been delivered. Now let's select nonces that we want to deliver.
434		//
435		// We may deliver at most:
436		//
437		// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
438		// latest_confirmed_nonce_at_target)
439		//
440		// messages in the batch. But since we're including outbound state proof in the batch, then
441		// it may be increased to:
442		//
443		// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
444		// latest_confirmed_nonce_at_source)
445		let future_confirmed_nonce_at_target = if outbound_state_proof_required {
446			latest_confirmed_nonce_at_source
447		} else {
448			latest_confirmed_nonce_at_target
449		};
450		let max_nonces = latest_received_nonce_at_target
451			.checked_sub(future_confirmed_nonce_at_target)
452			.and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff))
453			.unwrap_or_default();
454		let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch);
455		let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch;
456		let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch;
457
458		// select nonces from nonces, available for delivery
459		let selected_nonces = match self.strategy.available_source_queue_indices(race_state) {
460			Some(available_source_queue_indices) => {
461				let source_queue = self.strategy.source_queue();
462				let reference = RelayMessagesBatchReference::<P> {
463					max_messages_in_this_batch: max_nonces,
464					max_messages_weight_in_single_batch,
465					max_messages_size_in_single_batch,
466					best_target_nonce,
467					nonces_queue: source_queue.clone(),
468					nonces_queue_range: available_source_queue_indices,
469				};
470
471				MessageRaceLimits::decide(reference).await
472			},
473			None => {
474				// we still may need to submit delivery transaction with zero messages to
475				// unblock the lane. But it'll only be accepted if the lane is blocked
476				// (i.e. when `unrewarded_limit_reached` is `true`)
477				None
478			},
479		};
480
481		// check if we need unblocking transaction and we may submit it
482		#[allow(clippy::reversed_empty_ranges)]
483		let selected_nonces = match selected_nonces {
484			Some(selected_nonces) => selected_nonces,
485			None if unrewarded_limit_reached && outbound_state_proof_required => 1..=0,
486			_ => return None,
487		};
488
489		let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);
490		Some((
491			selected_nonces,
492			MessageProofParameters { outbound_state_proof_required, dispatch_weight },
493		))
494	}
495
496	/// Returns latest confirmed message at source chain, given source block.
497	fn latest_confirmed_nonce_at_source(&self, at: &SourceHeaderIdOf<P>) -> Option<MessageNonce> {
498		self.latest_confirmed_nonces_at_source
499			.iter()
500			.take_while(|(id, _)| id.0 <= at.0)
501			.last()
502			.map(|(_, nonce)| *nonce)
503	}
504
505	/// Returns total weight of all undelivered messages.
506	fn dispatch_weight_for_range(&self, range: &RangeInclusive<MessageNonce>) -> Weight {
507		self.strategy
508			.source_queue()
509			.iter()
510			.flat_map(|(_, subrange)| {
511				subrange
512					.iter()
513					.filter(|(nonce, _)| range.contains(nonce))
514					.map(|(_, details)| details.dispatch_weight)
515			})
516			.fold(Weight::zero(), |total, weight| total.saturating_add(weight))
517	}
518}
519
520#[async_trait]
521impl<P> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>
522	for MessageDeliveryStrategy<P>
523where
524	P: MessageLane,
525{
526	type SourceNoncesRange = MessageDetailsMap<P::SourceChainBalance>;
527	type ProofParameters = MessageProofParameters;
528	type TargetNoncesData = DeliveryRaceTargetNoncesData;
529
530	fn is_empty(&self) -> bool {
531		self.strategy.is_empty()
532	}
533
534	async fn required_source_header_at_target<
535		RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>,
536	>(
537		&self,
538		race_state: RS,
539	) -> Option<SourceHeaderIdOf<P>> {
540		// we have already submitted something - let's wait until it is mined
541		if race_state.nonces_submitted().is_some() {
542			return None
543		}
544
545		// if we can deliver something using current race state, go on
546		let selected_nonces = self.select_race_action(race_state.clone()).await;
547		if selected_nonces.is_some() {
548			return None
549		}
550
551		// check if we may deliver some messages if we'll relay require source header
552		// to target first
553		let maybe_source_header_for_delivery =
554			self.strategy.source_queue().back().map(|(id, _)| id.clone());
555		if self
556			.can_submit_transaction_with(
557				race_state.clone(),
558				maybe_source_header_for_delivery.clone(),
559			)
560			.await
561		{
562			return maybe_source_header_for_delivery
563		}
564
565		// ok, we can't delivery anything even if we relay some source blocks first. But maybe
566		// the lane is blocked and we need to submit unblock transaction?
567		let maybe_source_header_for_reward_confirmation =
568			self.latest_confirmed_nonces_at_source.back().map(|(id, _)| id.clone());
569		if self
570			.can_submit_transaction_with(
571				race_state.clone(),
572				maybe_source_header_for_reward_confirmation.clone(),
573			)
574			.await
575		{
576			return maybe_source_header_for_reward_confirmation
577		}
578
579		None
580	}
581
582	fn best_at_source(&self) -> Option<MessageNonce> {
583		self.strategy.best_at_source()
584	}
585
586	fn best_at_target(&self) -> Option<MessageNonce> {
587		self.strategy.best_at_target()
588	}
589
590	fn source_nonces_updated(
591		&mut self,
592		at_block: SourceHeaderIdOf<P>,
593		nonces: SourceClientNonces<Self::SourceNoncesRange>,
594	) {
595		if let Some(confirmed_nonce) = nonces.confirmed_nonce {
596			let is_confirmed_nonce_updated = self
597				.latest_confirmed_nonces_at_source
598				.back()
599				.map(|(_, prev_nonce)| *prev_nonce != confirmed_nonce)
600				.unwrap_or(true);
601			if is_confirmed_nonce_updated {
602				self.latest_confirmed_nonces_at_source
603					.push_back((at_block.clone(), confirmed_nonce));
604			}
605		}
606		self.strategy.source_nonces_updated(at_block, nonces)
607	}
608
609	fn reset_best_target_nonces(&mut self) {
610		self.target_nonces = None;
611		self.strategy.reset_best_target_nonces();
612	}
613
614	fn best_target_nonces_updated<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
615		&mut self,
616		nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
617		race_state: &mut RS,
618	) {
619		// best target nonces must always be ge than finalized target nonces
620		let latest_nonce = nonces.latest_nonce;
621		self.target_nonces = Some(nonces);
622
623		self.strategy.best_target_nonces_updated(
624			TargetClientNonces { latest_nonce, nonces_data: () },
625			race_state,
626		)
627	}
628
629	fn finalized_target_nonces_updated<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
630		&mut self,
631		nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
632		race_state: &mut RS,
633	) {
634		if let Some(ref best_finalized_source_header_id_at_best_target) =
635			race_state.best_finalized_source_header_id_at_best_target()
636		{
637			let oldest_header_number_to_keep = best_finalized_source_header_id_at_best_target.0;
638			while self
639				.latest_confirmed_nonces_at_source
640				.front()
641				.map(|(id, _)| id.0 < oldest_header_number_to_keep)
642				.unwrap_or(false)
643			{
644				self.latest_confirmed_nonces_at_source.pop_front();
645			}
646		}
647
648		if let Some(ref mut target_nonces) = self.target_nonces {
649			target_nonces.latest_nonce =
650				std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
651		}
652
653		self.strategy.finalized_target_nonces_updated(
654			TargetClientNonces { latest_nonce: nonces.latest_nonce, nonces_data: () },
655			race_state,
656		)
657	}
658
659	async fn select_nonces_to_deliver<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
660		&self,
661		race_state: RS,
662	) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
663		self.select_race_action(race_state).await
664	}
665}
666
667impl<SourceChainBalance: std::fmt::Debug> NoncesRange for MessageDetailsMap<SourceChainBalance> {
668	fn begin(&self) -> MessageNonce {
669		self.keys().next().cloned().unwrap_or_default()
670	}
671
672	fn end(&self) -> MessageNonce {
673		self.keys().next_back().cloned().unwrap_or_default()
674	}
675
676	fn greater_than(mut self, nonce: MessageNonce) -> Option<Self> {
677		let gte = self.split_off(&(nonce + 1));
678		if gte.is_empty() {
679			None
680		} else {
681			Some(gte)
682		}
683	}
684}
685
686#[cfg(test)]
687mod tests {
688	use crate::{
689		message_lane_loop::{
690			tests::{
691				header_id, TestMessageLane, TestMessagesBatchTransaction, TestMessagesProof,
692				TestSourceChainBalance, TestSourceHeaderId, TestTargetHeaderId,
693			},
694			MessageDetails,
695		},
696		message_race_loop::RaceStateImpl,
697	};
698
699	use super::*;
700
701	const DEFAULT_DISPATCH_WEIGHT: Weight = Weight::from_parts(1, 0);
702	const DEFAULT_SIZE: u32 = 1;
703
704	type TestRaceState = RaceStateImpl<
705		TestSourceHeaderId,
706		TestTargetHeaderId,
707		TestMessagesProof,
708		TestMessagesBatchTransaction,
709	>;
710	type TestStrategy = MessageDeliveryStrategy<TestMessageLane>;
711
712	fn source_nonces(
713		new_nonces: RangeInclusive<MessageNonce>,
714		confirmed_nonce: MessageNonce,
715		reward: TestSourceChainBalance,
716	) -> SourceClientNonces<MessageDetailsMap<TestSourceChainBalance>> {
717		SourceClientNonces {
718			new_nonces: new_nonces
719				.into_iter()
720				.map(|nonce| {
721					(
722						nonce,
723						MessageDetails {
724							dispatch_weight: DEFAULT_DISPATCH_WEIGHT,
725							size: DEFAULT_SIZE,
726							reward,
727						},
728					)
729				})
730				.collect(),
731			confirmed_nonce: Some(confirmed_nonce),
732		}
733	}
734
735	fn prepare_strategy() -> (TestRaceState, TestStrategy) {
736		let mut race_state = RaceStateImpl {
737			best_finalized_source_header_id_at_source: Some(header_id(1)),
738			best_finalized_source_header_id_at_best_target: Some(header_id(1)),
739			best_target_header_id: Some(header_id(1)),
740			best_finalized_target_header_id: Some(header_id(1)),
741			nonces_to_submit: None,
742			nonces_to_submit_batch: None,
743			nonces_submitted: None,
744		};
745
746		let mut race_strategy = TestStrategy {
747			max_unrewarded_relayer_entries_at_target: 4,
748			max_unconfirmed_nonces_at_target: 4,
749			max_messages_in_single_batch: 4,
750			max_messages_weight_in_single_batch: Weight::from_parts(4, 0),
751			max_messages_size_in_single_batch: 4,
752			latest_confirmed_nonces_at_source: vec![(header_id(1), 19)].into_iter().collect(),
753			target_nonces: Some(TargetClientNonces {
754				latest_nonce: 19,
755				nonces_data: DeliveryRaceTargetNoncesData {
756					confirmed_nonce: 19,
757					unrewarded_relayers: UnrewardedRelayersState {
758						unrewarded_relayer_entries: 0,
759						messages_in_oldest_entry: 0,
760						total_messages: 0,
761						last_delivered_nonce: 0,
762					},
763				},
764			}),
765			strategy: BasicStrategy::new(),
766		};
767
768		race_strategy
769			.strategy
770			.source_nonces_updated(header_id(1), source_nonces(20..=23, 19, 0));
771
772		let target_nonces = TargetClientNonces { latest_nonce: 19, nonces_data: () };
773		race_strategy
774			.strategy
775			.best_target_nonces_updated(target_nonces.clone(), &mut race_state);
776		race_strategy
777			.strategy
778			.finalized_target_nonces_updated(target_nonces, &mut race_state);
779
780		(race_state, race_strategy)
781	}
782
783	fn proof_parameters(state_required: bool, weight: u32) -> MessageProofParameters {
784		MessageProofParameters {
785			outbound_state_proof_required: state_required,
786			dispatch_weight: Weight::from_parts(weight as u64, 0),
787		}
788	}
789
790	#[test]
791	fn weights_map_works_as_nonces_range() {
792		fn build_map(
793			range: RangeInclusive<MessageNonce>,
794		) -> MessageDetailsMap<TestSourceChainBalance> {
795			range
796				.map(|idx| {
797					(
798						idx,
799						MessageDetails {
800							dispatch_weight: Weight::from_parts(idx, 0),
801							size: idx as _,
802							reward: idx as _,
803						},
804					)
805				})
806				.collect()
807		}
808
809		let map = build_map(20..=30);
810
811		assert_eq!(map.begin(), 20);
812		assert_eq!(map.end(), 30);
813		assert_eq!(map.clone().greater_than(10), Some(build_map(20..=30)));
814		assert_eq!(map.clone().greater_than(19), Some(build_map(20..=30)));
815		assert_eq!(map.clone().greater_than(20), Some(build_map(21..=30)));
816		assert_eq!(map.clone().greater_than(25), Some(build_map(26..=30)));
817		assert_eq!(map.clone().greater_than(29), Some(build_map(30..=30)));
818		assert_eq!(map.greater_than(30), None);
819	}
820
821	#[async_std::test]
822	async fn message_delivery_strategy_selects_messages_to_deliver() {
823		let (state, strategy) = prepare_strategy();
824
825		// both sides are ready to relay new messages
826		assert_eq!(
827			strategy.select_nonces_to_deliver(state).await,
828			Some(((20..=23), proof_parameters(false, 4)))
829		);
830	}
831
832	#[async_std::test]
833	async fn message_delivery_strategy_includes_outbound_state_proof_when_new_nonces_are_available()
834	{
835		let (state, mut strategy) = prepare_strategy();
836
837		// if there are new confirmed nonces on source, we want to relay this information
838		// to target to prune rewards queue
839		let prev_confirmed_nonce_at_source =
840			strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
841		strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce =
842			prev_confirmed_nonce_at_source - 1;
843		assert_eq!(
844			strategy.select_nonces_to_deliver(state).await,
845			Some(((20..=23), proof_parameters(true, 4)))
846		);
847	}
848
849	#[async_std::test]
850	async fn message_delivery_strategy_selects_nothing_if_there_are_too_many_unrewarded_relayers() {
851		let (state, mut strategy) = prepare_strategy();
852
853		// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
854		// we need to wait until rewards will be paid
855		{
856			let unrewarded_relayers =
857				&mut strategy.target_nonces.as_mut().unwrap().nonces_data.unrewarded_relayers;
858			unrewarded_relayers.unrewarded_relayer_entries =
859				strategy.max_unrewarded_relayer_entries_at_target;
860			unrewarded_relayers.messages_in_oldest_entry = 4;
861		}
862		assert_eq!(strategy.select_nonces_to_deliver(state).await, None);
863	}
864
865	#[async_std::test]
866	async fn message_delivery_strategy_selects_nothing_if_proved_rewards_is_not_enough_to_remove_oldest_unrewarded_entry(
867	) {
868		let (state, mut strategy) = prepare_strategy();
869
870		// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
871		// we need to prove at least `messages_in_oldest_entry` rewards
872		let prev_confirmed_nonce_at_source =
873			strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
874		{
875			let nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
876			nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
877			let unrewarded_relayers = &mut nonces_data.unrewarded_relayers;
878			unrewarded_relayers.unrewarded_relayer_entries =
879				strategy.max_unrewarded_relayer_entries_at_target;
880			unrewarded_relayers.messages_in_oldest_entry = 4;
881		}
882		assert_eq!(strategy.select_nonces_to_deliver(state).await, None);
883	}
884
885	#[async_std::test]
886	async fn message_delivery_strategy_includes_outbound_state_proof_if_proved_rewards_is_enough() {
887		let (state, mut strategy) = prepare_strategy();
888
889		// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
890		// we need to prove at least `messages_in_oldest_entry` rewards
891		let prev_confirmed_nonce_at_source =
892			strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
893		{
894			let nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
895			nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 3;
896			let unrewarded_relayers = &mut nonces_data.unrewarded_relayers;
897			unrewarded_relayers.unrewarded_relayer_entries =
898				strategy.max_unrewarded_relayer_entries_at_target;
899			unrewarded_relayers.messages_in_oldest_entry = 3;
900		}
901		assert_eq!(
902			strategy.select_nonces_to_deliver(state).await,
903			Some(((20..=23), proof_parameters(true, 4)))
904		);
905	}
906
907	#[async_std::test]
908	async fn message_delivery_strategy_limits_batch_by_messages_weight() {
909		let (state, mut strategy) = prepare_strategy();
910
911		// not all queued messages may fit in the batch, because batch has max weight
912		strategy.max_messages_weight_in_single_batch = Weight::from_parts(3, 0);
913		assert_eq!(
914			strategy.select_nonces_to_deliver(state).await,
915			Some(((20..=22), proof_parameters(false, 3)))
916		);
917	}
918
919	#[async_std::test]
920	async fn message_delivery_strategy_accepts_single_message_even_if_its_weight_overflows_maximal_weight(
921	) {
922		let (state, mut strategy) = prepare_strategy();
923
924		// first message doesn't fit in the batch, because it has weight (10) that overflows max
925		// weight (4)
926		strategy.strategy.source_queue_mut()[0].1.get_mut(&20).unwrap().dispatch_weight =
927			Weight::from_parts(10, 0);
928		assert_eq!(
929			strategy.select_nonces_to_deliver(state).await,
930			Some(((20..=20), proof_parameters(false, 10)))
931		);
932	}
933
934	#[async_std::test]
935	async fn message_delivery_strategy_limits_batch_by_messages_size() {
936		let (state, mut strategy) = prepare_strategy();
937
938		// not all queued messages may fit in the batch, because batch has max weight
939		strategy.max_messages_size_in_single_batch = 3;
940		assert_eq!(
941			strategy.select_nonces_to_deliver(state).await,
942			Some(((20..=22), proof_parameters(false, 3)))
943		);
944	}
945
946	#[async_std::test]
947	async fn message_delivery_strategy_accepts_single_message_even_if_its_weight_overflows_maximal_size(
948	) {
949		let (state, mut strategy) = prepare_strategy();
950
951		// first message doesn't fit in the batch, because it has weight (10) that overflows max
952		// weight (4)
953		strategy.strategy.source_queue_mut()[0].1.get_mut(&20).unwrap().size = 10;
954		assert_eq!(
955			strategy.select_nonces_to_deliver(state).await,
956			Some(((20..=20), proof_parameters(false, 1)))
957		);
958	}
959
960	#[async_std::test]
961	async fn message_delivery_strategy_limits_batch_by_messages_count_when_there_is_upper_limit() {
962		let (state, mut strategy) = prepare_strategy();
963
964		// not all queued messages may fit in the batch, because batch has max number of messages
965		// limit
966		strategy.max_messages_in_single_batch = 3;
967		assert_eq!(
968			strategy.select_nonces_to_deliver(state).await,
969			Some(((20..=22), proof_parameters(false, 3)))
970		);
971	}
972
973	#[async_std::test]
974	async fn message_delivery_strategy_limits_batch_by_messages_count_when_there_are_unconfirmed_nonces(
975	) {
976		let (state, mut strategy) = prepare_strategy();
977
978		// 1 delivery confirmation from target to source is still missing, so we may only
979		// relay 3 new messages
980		let prev_confirmed_nonce_at_source =
981			strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
982		strategy.latest_confirmed_nonces_at_source =
983			vec![(header_id(1), prev_confirmed_nonce_at_source - 1)].into_iter().collect();
984		strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce =
985			prev_confirmed_nonce_at_source - 1;
986		assert_eq!(
987			strategy.select_nonces_to_deliver(state).await,
988			Some(((20..=22), proof_parameters(false, 3)))
989		);
990	}
991
992	#[async_std::test]
993	async fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target() {
994		// 1 delivery confirmation from target to source is still missing, so we may deliver
995		// reward confirmation with our message delivery transaction. But the problem is that
996		// the reward has been paid at header 2 && this header is still unknown to target node.
997		//
998		// => so we can't deliver more than 3 messages
999		let (mut state, mut strategy) = prepare_strategy();
1000		let prev_confirmed_nonce_at_source =
1001			strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
1002		strategy.latest_confirmed_nonces_at_source = vec![
1003			(header_id(1), prev_confirmed_nonce_at_source - 1),
1004			(header_id(2), prev_confirmed_nonce_at_source),
1005		]
1006		.into_iter()
1007		.collect();
1008		strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce =
1009			prev_confirmed_nonce_at_source - 1;
1010		state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
1011		assert_eq!(
1012			strategy.select_nonces_to_deliver(state).await,
1013			Some(((20..=22), proof_parameters(false, 3)))
1014		);
1015
1016		// the same situation, but the header 2 is known to the target node, so we may deliver
1017		// reward confirmation
1018		let (mut state, mut strategy) = prepare_strategy();
1019		let prev_confirmed_nonce_at_source =
1020			strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
1021		strategy.latest_confirmed_nonces_at_source = vec![
1022			(header_id(1), prev_confirmed_nonce_at_source - 1),
1023			(header_id(2), prev_confirmed_nonce_at_source),
1024		]
1025		.into_iter()
1026		.collect();
1027		strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce =
1028			prev_confirmed_nonce_at_source - 1;
1029		state.best_finalized_source_header_id_at_source = Some(header_id(2));
1030		state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
1031		assert_eq!(
1032			strategy.select_nonces_to_deliver(state).await,
1033			Some(((20..=23), proof_parameters(true, 4)))
1034		);
1035	}
1036
1037	#[async_std::test]
1038	async fn source_header_is_required_when_confirmations_are_required() {
1039		// let's prepare situation when:
1040		// - all messages [20; 23] have been generated at source block#1;
1041		let (mut state, mut strategy) = prepare_strategy();
1042		//
1043		// - messages [20; 23] have been delivered
1044		assert_eq!(
1045			strategy.select_nonces_to_deliver(state.clone()).await,
1046			Some(((20..=23), proof_parameters(false, 4)))
1047		);
1048		strategy.finalized_target_nonces_updated(
1049			TargetClientNonces {
1050				latest_nonce: 23,
1051				nonces_data: DeliveryRaceTargetNoncesData {
1052					confirmed_nonce: 19,
1053					unrewarded_relayers: UnrewardedRelayersState {
1054						unrewarded_relayer_entries: 1,
1055						messages_in_oldest_entry: 4,
1056						total_messages: 4,
1057						last_delivered_nonce: 23,
1058					},
1059				},
1060			},
1061			&mut state,
1062		);
1063		// nothing needs to be delivered now and we don't need any new headers
1064		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
1065		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
1066
1067		// block#2 is generated
1068		state.best_finalized_source_header_id_at_source = Some(header_id(2));
1069		state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
1070		state.best_target_header_id = Some(header_id(2));
1071		state.best_finalized_target_header_id = Some(header_id(2));
1072
1073		// now let's generate two more nonces [24; 25] at the source;
1074		strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0));
1075		//
1076		// we don't need to relay more headers to target, because messages [20; 23] have
1077		// not confirmed to source yet
1078		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
1079		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
1080
1081		// let's relay source block#3
1082		state.best_finalized_source_header_id_at_source = Some(header_id(3));
1083		state.best_finalized_source_header_id_at_best_target = Some(header_id(3));
1084		state.best_target_header_id = Some(header_id(3));
1085		state.best_finalized_target_header_id = Some(header_id(3));
1086
1087		// and ask strategy again => still nothing to deliver, because parallel confirmations
1088		// race need to be pushed further
1089		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
1090		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
1091
1092		// let's relay source block#3
1093		state.best_finalized_source_header_id_at_source = Some(header_id(4));
1094		state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
1095		state.best_target_header_id = Some(header_id(4));
1096		state.best_finalized_target_header_id = Some(header_id(4));
1097
1098		// let's confirm messages [20; 23]
1099		strategy.source_nonces_updated(header_id(4), source_nonces(24..=25, 23, 0));
1100
1101		// and ask strategy again => now we have everything required to deliver remaining
1102		// [24; 25] nonces and proof of [20; 23] confirmation
1103		assert_eq!(
1104			strategy.select_nonces_to_deliver(state.clone()).await,
1105			Some(((24..=25), proof_parameters(true, 2))),
1106		);
1107		assert_eq!(strategy.required_source_header_at_target(state).await, None);
1108	}
1109
1110	#[async_std::test]
1111	async fn relayer_uses_flattened_view_of_the_source_queue_to_select_nonces() {
1112		// Real scenario that has happened on test deployments:
1113		// 1) relayer witnessed M1 at block 1 => it has separate entry in the `source_queue`
1114		// 2) relayer witnessed M2 at block 2 => it has separate entry in the `source_queue`
1115		// 3) if block 2 is known to the target node, then both M1 and M2 are selected for single
1116		// delivery,    even though weight(M1+M2) > larger than largest allowed weight
1117		//
1118		// This was happening because selector (`select_nonces_for_delivery_transaction`) has been
1119		// called for every `source_queue` entry separately without preserving any context.
1120		let (mut state, mut strategy) = prepare_strategy();
1121		let nonces = source_nonces(24..=25, 19, 0);
1122		strategy.strategy.source_nonces_updated(header_id(2), nonces);
1123		strategy.max_unrewarded_relayer_entries_at_target = 100;
1124		strategy.max_unconfirmed_nonces_at_target = 100;
1125		strategy.max_messages_in_single_batch = 5;
1126		strategy.max_messages_weight_in_single_batch = Weight::from_parts(100, 0);
1127		strategy.max_messages_size_in_single_batch = 100;
1128		state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
1129
1130		assert_eq!(
1131			strategy.select_nonces_to_deliver(state).await,
1132			Some(((20..=24), proof_parameters(false, 5)))
1133		);
1134	}
1135
1136	#[async_std::test]
1137	#[allow(clippy::reversed_empty_ranges)]
1138	async fn no_source_headers_required_at_target_if_lanes_are_empty() {
1139		let (state, _) = prepare_strategy();
1140		let mut strategy = TestStrategy {
1141			max_unrewarded_relayer_entries_at_target: 4,
1142			max_unconfirmed_nonces_at_target: 4,
1143			max_messages_in_single_batch: 4,
1144			max_messages_weight_in_single_batch: Weight::from_parts(4, 0),
1145			max_messages_size_in_single_batch: 4,
1146			latest_confirmed_nonces_at_source: VecDeque::new(),
1147			target_nonces: None,
1148			strategy: BasicStrategy::new(),
1149		};
1150
1151		let source_header_id = header_id(10);
1152		strategy.source_nonces_updated(
1153			source_header_id,
1154			// MessageDeliveryRaceSource::nonces returns Some(0), because that's how it is
1155			// represented in memory (there's no Options in OutboundLaneState)
1156			source_nonces(1u64..=0u64, 0, 0),
1157		);
1158
1159		// even though `latest_confirmed_nonces_at_source` is not empty, new headers are not
1160		// requested
1161		assert_eq!(
1162			strategy.latest_confirmed_nonces_at_source,
1163			VecDeque::from([(source_header_id, 0)])
1164		);
1165		assert_eq!(strategy.required_source_header_at_target(state).await, None);
1166	}
1167
1168	#[async_std::test]
1169	async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() {
1170		// this is the copy of the similar test in the `mesage_race_strategy.rs`, but it also tests
1171		// that the `MessageDeliveryStrategy` acts properly in the similar scenario
1172
1173		// tune parameters to allow 5 nonces per delivery transaction
1174		let (mut state, mut strategy) = prepare_strategy();
1175		strategy.max_unrewarded_relayer_entries_at_target = 5;
1176		strategy.max_unconfirmed_nonces_at_target = 5;
1177		strategy.max_messages_in_single_batch = 5;
1178		strategy.max_messages_weight_in_single_batch = Weight::from_parts(5, 0);
1179		strategy.max_messages_size_in_single_batch = 5;
1180
1181		// in this state we have 4 available nonces for delivery
1182		assert_eq!(
1183			strategy.select_nonces_to_deliver(state.clone()).await,
1184			Some((
1185				20..=23,
1186				MessageProofParameters {
1187					outbound_state_proof_required: false,
1188					dispatch_weight: Weight::from_parts(4, 0),
1189				}
1190			)),
1191		);
1192
1193		// let's say we have submitted 20..=23
1194		state.nonces_submitted = Some(20..=23);
1195
1196		// then new nonce 24 appear at the source block 2
1197		let new_nonce_24 = vec![(
1198			24,
1199			MessageDetails { dispatch_weight: Weight::from_parts(1, 0), size: 0, reward: 0 },
1200		)]
1201		.into_iter()
1202		.collect();
1203		let source_header_2 = header_id(2);
1204		state.best_finalized_source_header_id_at_source = Some(source_header_2);
1205		strategy.source_nonces_updated(
1206			source_header_2,
1207			SourceClientNonces { new_nonces: new_nonce_24, confirmed_nonce: None },
1208		);
1209		// and nonce 23 appear at the best block of the target node (best finalized still has 0
1210		// nonces)
1211		let target_nonces_data = DeliveryRaceTargetNoncesData {
1212			confirmed_nonce: 19,
1213			unrewarded_relayers: UnrewardedRelayersState::default(),
1214		};
1215		let target_header_2 = header_id(2);
1216		state.best_target_header_id = Some(target_header_2);
1217		strategy.best_target_nonces_updated(
1218			TargetClientNonces { latest_nonce: 23, nonces_data: target_nonces_data.clone() },
1219			&mut state,
1220		);
1221
1222		// then best target header is retracted
1223		strategy.best_target_nonces_updated(
1224			TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() },
1225			&mut state,
1226		);
1227
1228		// ... and some fork with 19 delivered nonces is finalized
1229		let target_header_2_fork = header_id(2_1);
1230		state.best_finalized_source_header_id_at_source = Some(source_header_2);
1231		state.best_finalized_source_header_id_at_best_target = Some(source_header_2);
1232		state.best_target_header_id = Some(target_header_2_fork);
1233		state.best_finalized_target_header_id = Some(target_header_2_fork);
1234		strategy.finalized_target_nonces_updated(
1235			TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() },
1236			&mut state,
1237		);
1238
1239		// now we have to select nonces 20..=23 for delivery again
1240		assert_eq!(
1241			strategy.select_nonces_to_deliver(state.clone()).await,
1242			Some((
1243				20..=24,
1244				MessageProofParameters {
1245					outbound_state_proof_required: false,
1246					dispatch_weight: Weight::from_parts(5, 0),
1247				}
1248			)),
1249		);
1250	}
1251
1252	#[async_std::test]
1253	#[allow(clippy::reversed_empty_ranges)]
1254	async fn delivery_race_is_able_to_unblock_lane() {
1255		// step 1: messages 20..=23 are delivered from source to target at target block 2
1256		fn at_target_block_2_deliver_messages(
1257			strategy: &mut TestStrategy,
1258			state: &mut TestRaceState,
1259			occupied_relayer_slots: MessageNonce,
1260			occupied_message_slots: MessageNonce,
1261		) {
1262			let nonces_at_target = TargetClientNonces {
1263				latest_nonce: 23,
1264				nonces_data: DeliveryRaceTargetNoncesData {
1265					confirmed_nonce: 19,
1266					unrewarded_relayers: UnrewardedRelayersState {
1267						unrewarded_relayer_entries: occupied_relayer_slots,
1268						total_messages: occupied_message_slots,
1269						..Default::default()
1270					},
1271				},
1272			};
1273
1274			state.best_target_header_id = Some(header_id(2));
1275			state.best_finalized_target_header_id = Some(header_id(2));
1276
1277			strategy.best_target_nonces_updated(nonces_at_target.clone(), state);
1278			strategy.finalized_target_nonces_updated(nonces_at_target, state);
1279		}
1280
1281		// step 2: delivery of messages 20..=23 is confirmed to the source node at source block 2
1282		fn at_source_block_2_deliver_confirmations(
1283			strategy: &mut TestStrategy,
1284			state: &mut TestRaceState,
1285		) {
1286			state.best_finalized_source_header_id_at_source = Some(header_id(2));
1287
1288			strategy.source_nonces_updated(
1289				header_id(2),
1290				SourceClientNonces { new_nonces: Default::default(), confirmed_nonce: Some(23) },
1291			);
1292		}
1293
1294		// step 3: finalize source block 2 at target block 3 and select nonces to deliver
1295		async fn at_target_block_3_select_nonces_to_deliver(
1296			strategy: &TestStrategy,
1297			mut state: TestRaceState,
1298		) -> Option<(RangeInclusive<MessageNonce>, MessageProofParameters)> {
1299			state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
1300			state.best_target_header_id = Some(header_id(3));
1301			state.best_finalized_target_header_id = Some(header_id(3));
1302
1303			strategy.select_nonces_to_deliver(state).await
1304		}
1305
1306		let max_unrewarded_relayer_entries_at_target = 4;
1307		let max_unconfirmed_nonces_at_target = 4;
1308		let expected_rewards_proof = Some((
1309			1..=0,
1310			MessageProofParameters {
1311				outbound_state_proof_required: true,
1312				dispatch_weight: Weight::zero(),
1313			},
1314		));
1315
1316		// when lane is NOT blocked
1317		let (mut state, mut strategy) = prepare_strategy();
1318		at_target_block_2_deliver_messages(
1319			&mut strategy,
1320			&mut state,
1321			max_unrewarded_relayer_entries_at_target - 1,
1322			max_unconfirmed_nonces_at_target - 1,
1323		);
1324		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
1325		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
1326		assert_eq!(at_target_block_3_select_nonces_to_deliver(&strategy, state).await, None);
1327
1328		// when lane is blocked by no-relayer-slots in unrewarded relayers vector
1329		let (mut state, mut strategy) = prepare_strategy();
1330		at_target_block_2_deliver_messages(
1331			&mut strategy,
1332			&mut state,
1333			max_unrewarded_relayer_entries_at_target,
1334			max_unconfirmed_nonces_at_target - 1,
1335		);
1336		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
1337		assert_eq!(
1338			strategy.required_source_header_at_target(state.clone()).await,
1339			Some(header_id(2))
1340		);
1341		assert_eq!(
1342			at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
1343			expected_rewards_proof
1344		);
1345
1346		// when lane is blocked by no-message-slots in unrewarded relayers vector
1347		let (mut state, mut strategy) = prepare_strategy();
1348		at_target_block_2_deliver_messages(
1349			&mut strategy,
1350			&mut state,
1351			max_unrewarded_relayer_entries_at_target - 1,
1352			max_unconfirmed_nonces_at_target,
1353		);
1354		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
1355		assert_eq!(
1356			strategy.required_source_header_at_target(state.clone()).await,
1357			Some(header_id(2))
1358		);
1359		assert_eq!(
1360			at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
1361			expected_rewards_proof
1362		);
1363
1364		// when lane is blocked by no-message-slots and no-message-slots in unrewarded relayers
1365		// vector
1366		let (mut state, mut strategy) = prepare_strategy();
1367		at_target_block_2_deliver_messages(
1368			&mut strategy,
1369			&mut state,
1370			max_unrewarded_relayer_entries_at_target - 1,
1371			max_unconfirmed_nonces_at_target,
1372		);
1373		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
1374		assert_eq!(
1375			strategy.required_source_header_at_target(state.clone()).await,
1376			Some(header_id(2))
1377		);
1378		assert_eq!(
1379			at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
1380			expected_rewards_proof
1381		);
1382
1383		// when we have already selected some nonces to deliver, we don't need to select anything
1384		let (mut state, mut strategy) = prepare_strategy();
1385		at_target_block_2_deliver_messages(
1386			&mut strategy,
1387			&mut state,
1388			max_unrewarded_relayer_entries_at_target - 1,
1389			max_unconfirmed_nonces_at_target,
1390		);
1391		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
1392		state.nonces_to_submit = Some((header_id(2), 1..=0, (1..=0, None)));
1393		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
1394		assert_eq!(at_target_block_3_select_nonces_to_deliver(&strategy, state).await, None);
1395
1396		// when we have already submitted some nonces, we don't need to select anything
1397		let (mut state, mut strategy) = prepare_strategy();
1398		at_target_block_2_deliver_messages(
1399			&mut strategy,
1400			&mut state,
1401			max_unrewarded_relayer_entries_at_target - 1,
1402			max_unconfirmed_nonces_at_target,
1403		);
1404		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
1405		state.nonces_submitted = Some(1..=0);
1406		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
1407		assert_eq!(at_target_block_3_select_nonces_to_deliver(&strategy, state).await, None);
1408	}
1409
1410	#[async_std::test]
1411	async fn outbound_state_proof_is_not_required_when_we_have_no_new_confirmations() {
1412		let (mut state, mut strategy) = prepare_strategy();
1413
1414		// pretend that we haven't seen any confirmations yet (or they're at the future target chain
1415		// blocks)
1416		strategy.latest_confirmed_nonces_at_source.clear();
1417
1418		// emulate delivery of some nonces (20..=23 are generated, but we only deliver 20..=21)
1419		let nonces_at_target = TargetClientNonces {
1420			latest_nonce: 21,
1421			nonces_data: DeliveryRaceTargetNoncesData {
1422				confirmed_nonce: 19,
1423				unrewarded_relayers: UnrewardedRelayersState {
1424					unrewarded_relayer_entries: 1,
1425					total_messages: 2,
1426					..Default::default()
1427				},
1428			},
1429		};
1430		state.best_target_header_id = Some(header_id(2));
1431		state.best_finalized_target_header_id = Some(header_id(2));
1432		strategy.best_target_nonces_updated(nonces_at_target.clone(), &mut state);
1433		strategy.finalized_target_nonces_updated(nonces_at_target, &mut state);
1434
1435		// we won't include outbound lane state proof into 22..=23 delivery transaction
1436		// because it brings no new reward confirmations
1437		assert_eq!(
1438			strategy.select_nonces_to_deliver(state).await,
1439			Some(((22..=23), proof_parameters(false, 2)))
1440		);
1441	}
1442}