referrerpolicy=no-referrer-when-downgrade

messages_relay/
message_race_loop.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14//! Loop that is serving single race within message lane. This could be
15//! message delivery race, receiving confirmations race or processing
16//! confirmations race.
17//!
18//! The idea of the race is simple - we have `nonce`-s on source and target
19//! nodes. We're trying to prove that the source node has this nonce (and
20//! associated data - like messages, lane state, etc) to the target node by
21//! generating and submitting proof.
22
23use crate::message_lane_loop::{BatchTransaction, ClientState, NoncesSubmitArtifacts};
24
25use async_trait::async_trait;
26use bp_messages::MessageNonce;
27use futures::{
28	future::{FutureExt, TryFutureExt},
29	stream::{FusedStream, StreamExt},
30};
31use relay_utils::{
32	process_future_result, retry_backoff, FailedClient, MaybeConnectionError,
33	TrackedTransactionStatus, TransactionTracker,
34};
35use std::{
36	fmt::Debug,
37	ops::RangeInclusive,
38	time::{Duration, Instant},
39};
40
/// One of races within lane.
///
/// Groups the types shared by the source client, the target client and the strategy
/// of a single race, together with the chain names that the race loop puts into its
/// log messages.
pub trait MessageRace {
	/// Header id of the race source.
	type SourceHeaderId: Debug + Clone + PartialEq + Send + Sync;
	/// Header id of the race target.
	type TargetHeaderId: Debug + Clone + PartialEq + Send + Sync;

	/// Message nonce used in the race.
	type MessageNonce: Debug + Clone;
	/// Proof that is generated and delivered in this race.
	type Proof: Debug + Clone + Send + Sync;

	/// Name of the race source.
	fn source_name() -> String;
	/// Name of the race target.
	fn target_name() -> String;
}
58
/// State of race source client.
///
/// From the point of view of this state, the race source is "self" and the race
/// target is the peer.
type SourceClientState<P> =
	ClientState<<P as MessageRace>::SourceHeaderId, <P as MessageRace>::TargetHeaderId>;

/// State of race target client.
///
/// From the point of view of this state, the race target is "self" and the race
/// source is the peer.
type TargetClientState<P> =
	ClientState<<P as MessageRace>::TargetHeaderId, <P as MessageRace>::SourceHeaderId>;
66
/// Inclusive nonces range.
pub trait NoncesRange: Debug + Sized {
	/// Get begin of the range (the first nonce that belongs to the range).
	fn begin(&self) -> MessageNonce;
	/// Get end of the range (the last nonce that belongs to the range).
	fn end(&self) -> MessageNonce;
	/// Returns new range with current range nonces that are greater than the passed `nonce`.
	/// If there are no such nonces, `None` is returned.
	///
	/// The method consumes the range.
	fn greater_than(self, nonce: MessageNonce) -> Option<Self>;
}
77
/// Nonces on the race source client.
///
/// This is what `SourceClient::nonces` returns (along with the header id) and what is
/// later passed to `RaceStrategy::source_nonces_updated`.
#[derive(Debug, Clone)]
pub struct SourceClientNonces<NoncesRange> {
	/// New nonces range known to the client. `New` here means all nonces generated after
	/// `prev_latest_nonce` passed to the `SourceClient::nonces` method.
	pub new_nonces: NoncesRange,
	/// The latest nonce that is confirmed to the bridged client. This nonce only makes
	/// sense in some races. In other races it is `None`.
	pub confirmed_nonce: Option<MessageNonce>,
}
88
/// Nonces on the race target client.
///
/// This is what `TargetClient::nonces` returns (along with the header id) and what is
/// later passed to the `RaceStrategy::*_target_nonces_updated` methods.
#[derive(Debug, Clone)]
pub struct TargetClientNonces<TargetNoncesData> {
	/// The latest nonce that is known to the target client.
	pub latest_nonce: MessageNonce,
	/// Additional data from target node that may be used by the race.
	pub nonces_data: TargetNoncesData,
}
97
/// One of message lane clients, which is source client for the race.
#[async_trait]
pub trait SourceClient<P: MessageRace> {
	/// Type of error these clients returns.
	type Error: std::fmt::Debug + MaybeConnectionError;
	/// Type of nonces range returned by the source client.
	type NoncesRange: NoncesRange;
	/// Additional proof parameters required to generate proof.
	type ProofParameters;

	/// Return nonces that are known to the source client.
	///
	/// `at_block` is the source header at which nonces are read and `prev_latest_nonce`
	/// is the latest nonce that the caller already knows about. On success, the header
	/// id is returned together with the nonces.
	async fn nonces(
		&self,
		at_block: P::SourceHeaderId,
		prev_latest_nonce: MessageNonce,
	) -> Result<(P::SourceHeaderId, SourceClientNonces<Self::NoncesRange>), Self::Error>;
	/// Generate proof for delivering to the target client.
	///
	/// The proof is generated for the `nonces` range at the source header `at_block`,
	/// using `proof_parameters` provided by the race strategy. On success, the header
	/// id, the range of proved nonces and the proof itself are returned.
	async fn generate_proof(
		&self,
		at_block: P::SourceHeaderId,
		nonces: RangeInclusive<MessageNonce>,
		proof_parameters: Self::ProofParameters,
	) -> Result<(P::SourceHeaderId, RangeInclusive<MessageNonce>, P::Proof), Self::Error>;
}
122
/// One of message lane clients, which is target client for the race.
#[async_trait]
pub trait TargetClient<P: MessageRace> {
	/// Type of error these clients returns.
	type Error: std::fmt::Debug + MaybeConnectionError;
	/// Type of the additional data from the target client, used by the race.
	type TargetNoncesData: std::fmt::Debug;
	/// Type of batch transaction that submits finality and proof to the target node.
	type BatchTransaction: BatchTransaction<P::SourceHeaderId> + Clone;
	/// Transaction tracker to track submitted transactions.
	type TransactionTracker: TransactionTracker<HeaderId = P::TargetHeaderId>;

	/// Ask headers relay to relay finalized headers up to (and including) given header
	/// from race source to race target.
	///
	/// The client may return `Some(_)`, which means that nothing has happened yet and
	/// the caller must generate and append proof to the batch transaction
	/// to actually send it (along with required header) to the node.
	///
	/// If function has returned `None`, it means that the caller now must wait for the
	/// appearance of the required header `id` at the target client.
	async fn require_source_header(
		&self,
		id: P::SourceHeaderId,
	) -> Result<Option<Self::BatchTransaction>, Self::Error>;

	/// Return nonces that are known to the target client.
	///
	/// `at_block` is the target header at which nonces are read. The race loop passes
	/// `update_metrics = true` only when it reads nonces at the best finalized target
	/// header and `false` otherwise.
	// NOTE(review): what exactly is updated when `update_metrics` is `true` is up to the
	// implementation - confirm against implementors.
	async fn nonces(
		&self,
		at_block: P::TargetHeaderId,
		update_metrics: bool,
	) -> Result<(P::TargetHeaderId, TargetClientNonces<Self::TargetNoncesData>), Self::Error>;
	/// Submit proof to the target client.
	///
	/// `proof` proves `nonces` and has been generated at the source header
	/// `generated_at_block`. If `maybe_batch_tx` is `Some(_)`, it is the batch transaction
	/// previously returned by the `require_source_header` call.
	async fn submit_proof(
		&self,
		maybe_batch_tx: Option<Self::BatchTransaction>,
		generated_at_block: P::SourceHeaderId,
		nonces: RangeInclusive<MessageNonce>,
		proof: P::Proof,
	) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
}
164
/// Race strategy.
///
/// The strategy is fed with nonces that the race loop reads from the source and target
/// clients and decides which source header must be present at the target and which
/// nonces must be proved and delivered next.
#[async_trait]
pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
	/// Type of nonces range expected from the source client.
	type SourceNoncesRange: NoncesRange;
	/// Additional proof parameters required to generate proof.
	type ProofParameters;
	/// Additional data expected from the target client.
	type TargetNoncesData;

	/// Should return true if nothing has to be synced.
	fn is_empty(&self) -> bool;
	/// Return id of source header that is required to be on target to continue synchronization.
	///
	/// `None` means that no additional source header is required.
	async fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
		&self,
		race_state: RS,
	) -> Option<SourceHeaderId>;
	/// Return the best nonce at source node.
	///
	/// `Some` is returned only if we are sure that the value is greater or equal
	/// than the result of `best_at_target`.
	fn best_at_source(&self) -> Option<MessageNonce>;
	/// Return the best nonce at target node.
	///
	/// May return `None` if value is yet unknown.
	fn best_at_target(&self) -> Option<MessageNonce>;

	/// Called when nonces are updated at source node of the race.
	///
	/// `at_block` is the source header id returned by the source client along with `nonces`.
	fn source_nonces_updated(
		&mut self,
		at_block: SourceHeaderId,
		nonces: SourceClientNonces<Self::SourceNoncesRange>,
	);
	/// Called when we want to wait until next `best_target_nonces_updated` before selecting
	/// any nonces for delivery.
	fn reset_best_target_nonces(&mut self);
	/// Called when best nonces are updated at target node of the race.
	///
	/// The strategy is allowed to change the `race_state`.
	fn best_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
		&mut self,
		nonces: TargetClientNonces<Self::TargetNoncesData>,
		race_state: &mut RS,
	);
	/// Called when finalized nonces are updated at target node of the race.
	///
	/// The strategy is allowed to change the `race_state`.
	fn finalized_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
		&mut self,
		nonces: TargetClientNonces<Self::TargetNoncesData>,
		race_state: &mut RS,
	);
	/// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated
	/// data) from source to target node.
	/// Additionally, parameters required to generate proof are returned.
	async fn select_nonces_to_deliver<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
		&self,
		race_state: RS,
	) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>;
}
221
/// State of the race.
///
/// This is the view of the race state that is exposed to the `RaceStrategy`.
pub trait RaceState<SourceHeaderId, TargetHeaderId>: Clone + Send + Sync {
	/// Set best finalized source header id, known to the best block of the target
	/// client.
	fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId);

	/// Best finalized source header id, known to the best block of the target client.
	/// `None` is returned if it is not yet known.
	fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId>;

	/// Returns `Some(nonces)` if we have selected `nonces` to submit to the target node.
	/// Otherwise, `None` is returned.
	fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>>;
	/// Reset our nonces selection.
	fn reset_nonces_to_submit(&mut self);

	/// Returns `Some(nonces)` if we have submitted `nonces` to the target node and are
	/// waiting for them to appear there. Otherwise, `None` is returned.
	fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>>;
	/// Reset our nonces submission.
	fn reset_nonces_submitted(&mut self);
}
243
/// State of the race and prepared batch transaction (if available).
///
/// All fields are `None` until the corresponding information is received by the race
/// loop.
#[derive(Debug, Clone)]
pub(crate) struct RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> {
	/// Best finalized source header id at the source client.
	pub best_finalized_source_header_id_at_source: Option<SourceHeaderId>,
	/// Best finalized source header id at the best block on the target
	/// client (at the `best_target_header_id`).
	pub best_finalized_source_header_id_at_best_target: Option<SourceHeaderId>,
	/// The best header id at the target client.
	pub best_target_header_id: Option<TargetHeaderId>,
	/// Best finalized header id at the target client.
	pub best_finalized_target_header_id: Option<TargetHeaderId>,
	/// Range of nonces that we have selected to submit, along with the source header id
	/// that the proof has been generated at and the proof itself.
	pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>,
	/// Batch transaction ready to include and deliver selected `nonces_to_submit` from the
	/// `state`.
	pub nonces_to_submit_batch: Option<BatchTx>,
	/// Range of nonces that is currently submitted.
	pub nonces_submitted: Option<RangeInclusive<MessageNonce>>,
}
264
265impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> Default
266	for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
267{
268	fn default() -> Self {
269		RaceStateImpl {
270			best_finalized_source_header_id_at_source: None,
271			best_finalized_source_header_id_at_best_target: None,
272			best_target_header_id: None,
273			best_finalized_target_header_id: None,
274			nonces_to_submit: None,
275			nonces_to_submit_batch: None,
276			nonces_submitted: None,
277		}
278	}
279}
280
281impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> RaceState<SourceHeaderId, TargetHeaderId>
282	for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
283where
284	SourceHeaderId: Clone + Send + Sync,
285	TargetHeaderId: Clone + Send + Sync,
286	Proof: Clone + Send + Sync,
287	BatchTx: Clone + Send + Sync,
288{
289	fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId) {
290		self.best_finalized_source_header_id_at_best_target = Some(id);
291	}
292
293	fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId> {
294		self.best_finalized_source_header_id_at_best_target.clone()
295	}
296
297	fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>> {
298		self.nonces_to_submit.clone().map(|(_, nonces, _)| nonces)
299	}
300
301	fn reset_nonces_to_submit(&mut self) {
302		self.nonces_to_submit = None;
303		self.nonces_to_submit_batch = None;
304	}
305
306	fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>> {
307		self.nonces_submitted.clone()
308	}
309
310	fn reset_nonces_submitted(&mut self) {
311		self.nonces_submitted = None;
312	}
313}
314
315/// Run race loop until connection with target or source node is lost.
316pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
317	race_source: SC,
318	race_source_updated: impl FusedStream<Item = SourceClientState<P>>,
319	race_target: TC,
320	race_target_updated: impl FusedStream<Item = TargetClientState<P>>,
321	mut strategy: impl RaceStrategy<
322		P::SourceHeaderId,
323		P::TargetHeaderId,
324		P::Proof,
325		SourceNoncesRange = SC::NoncesRange,
326		ProofParameters = SC::ProofParameters,
327		TargetNoncesData = TC::TargetNoncesData,
328	>,
329) -> Result<(), FailedClient> {
330	let mut progress_context = Instant::now();
331	let mut race_state = RaceStateImpl::default();
332
333	let mut source_retry_backoff = retry_backoff();
334	let mut source_client_is_online = true;
335	let mut source_nonces_required = false;
336	let mut source_required_header = None;
337	let source_nonces = futures::future::Fuse::terminated();
338	let source_generate_proof = futures::future::Fuse::terminated();
339	let source_go_offline_future = futures::future::Fuse::terminated();
340
341	let mut target_retry_backoff = retry_backoff();
342	let mut target_client_is_online = true;
343	let mut target_best_nonces_required = false;
344	let mut target_finalized_nonces_required = false;
345	let mut target_batch_transaction = None;
346	let target_require_source_header = futures::future::Fuse::terminated();
347	let target_best_nonces = futures::future::Fuse::terminated();
348	let target_finalized_nonces = futures::future::Fuse::terminated();
349	let target_submit_proof = futures::future::Fuse::terminated();
350	let target_tx_tracker = futures::future::Fuse::terminated();
351	let target_go_offline_future = futures::future::Fuse::terminated();
352
353	futures::pin_mut!(
354		race_source_updated,
355		source_nonces,
356		source_generate_proof,
357		source_go_offline_future,
358		race_target_updated,
359		target_require_source_header,
360		target_best_nonces,
361		target_finalized_nonces,
362		target_submit_proof,
363		target_tx_tracker,
364		target_go_offline_future,
365	);
366
367	loop {
368		futures::select! {
369			// when headers ids are updated
370			source_state = race_source_updated.next() => {
371				if let Some(source_state) = source_state {
372					let is_source_state_updated = race_state.best_finalized_source_header_id_at_source.as_ref()
373						!= Some(&source_state.best_finalized_self);
374					if is_source_state_updated {
375						source_nonces_required = true;
376						race_state.best_finalized_source_header_id_at_source
377							= Some(source_state.best_finalized_self);
378					}
379				}
380			},
381			target_state = race_target_updated.next() => {
382				if let Some(target_state) = target_state {
383					let is_target_best_state_updated = race_state.best_target_header_id.as_ref()
384						!= Some(&target_state.best_self);
385
386					if is_target_best_state_updated {
387						target_best_nonces_required = true;
388						race_state.best_target_header_id = Some(target_state.best_self);
389						race_state.best_finalized_source_header_id_at_best_target
390							= target_state.best_finalized_peer_at_best_self;
391					}
392
393					let is_target_finalized_state_updated = race_state.best_finalized_target_header_id.as_ref()
394						!= Some(&target_state.best_finalized_self);
395					if  is_target_finalized_state_updated {
396						target_finalized_nonces_required = true;
397						race_state.best_finalized_target_header_id = Some(target_state.best_finalized_self);
398					}
399				}
400			},
401
402			// when nonces are updated
403			nonces = source_nonces => {
404				source_nonces_required = false;
405
406				source_client_is_online = process_future_result(
407					nonces,
408					&mut source_retry_backoff,
409					|(at_block, nonces)| {
410						log::debug!(
411							target: "bridge",
412							"Received nonces from {}: {:?}",
413							P::source_name(),
414							nonces,
415						);
416
417						strategy.source_nonces_updated(at_block, nonces);
418					},
419					&mut source_go_offline_future,
420					async_std::task::sleep,
421					|| format!("Error retrieving nonces from {}", P::source_name()),
422				).fail_if_connection_error(FailedClient::Source)?;
423
424				// ask for more headers if we have nonces to deliver and required headers are missing
425				source_required_header = strategy
426					.required_source_header_at_target(race_state.clone())
427					.await;
428			},
429			nonces = target_best_nonces => {
430				target_best_nonces_required = false;
431
432				target_client_is_online = process_future_result(
433					nonces,
434					&mut target_retry_backoff,
435					|(_, nonces)| {
436						log::debug!(
437							target: "bridge",
438							"Received best nonces from {}: {:?}",
439							P::target_name(),
440							nonces,
441						);
442
443						strategy.best_target_nonces_updated(nonces, &mut race_state);
444					},
445					&mut target_go_offline_future,
446					async_std::task::sleep,
447					|| format!("Error retrieving best nonces from {}", P::target_name()),
448				).fail_if_connection_error(FailedClient::Target)?;
449			},
450			nonces = target_finalized_nonces => {
451				target_finalized_nonces_required = false;
452
453				target_client_is_online = process_future_result(
454					nonces,
455					&mut target_retry_backoff,
456					|(_, nonces)| {
457						log::debug!(
458							target: "bridge",
459							"Received finalized nonces from {}: {:?}",
460							P::target_name(),
461							nonces,
462						);
463
464						strategy.finalized_target_nonces_updated(nonces, &mut race_state);
465					},
466					&mut target_go_offline_future,
467					async_std::task::sleep,
468					|| format!("Error retrieving finalized nonces from {}", P::target_name()),
469				).fail_if_connection_error(FailedClient::Target)?;
470			},
471
472			// proof generation and submission
473			maybe_batch_transaction = target_require_source_header => {
474				source_required_header = None;
475
476				target_client_is_online = process_future_result(
477					maybe_batch_transaction,
478					&mut target_retry_backoff,
479					|maybe_batch_transaction: Option<TC::BatchTransaction>| {
480						log::debug!(
481							target: "bridge",
482							"Target {} client has been asked for more {} headers. Batch tx: {}",
483							P::target_name(),
484							P::source_name(),
485							maybe_batch_transaction
486								.as_ref()
487								.map(|bt| format!("yes ({:?})", bt.required_header_id()))
488								.unwrap_or_else(|| "no".into()),
489						);
490
491						target_batch_transaction = maybe_batch_transaction;
492					},
493					&mut target_go_offline_future,
494					async_std::task::sleep,
495					|| format!("Error asking for source headers at {}", P::target_name()),
496				).fail_if_connection_error(FailedClient::Target)?;
497			},
498			proof = source_generate_proof => {
499				source_client_is_online = process_future_result(
500					proof,
501					&mut source_retry_backoff,
502					|(at_block, nonces_range, proof, batch_transaction)| {
503						log::debug!(
504							target: "bridge",
505							"Received proof for nonces in range {:?} from {}",
506							nonces_range,
507							P::source_name(),
508						);
509
510						race_state.nonces_to_submit = Some((at_block, nonces_range, proof));
511						race_state.nonces_to_submit_batch = batch_transaction;
512					},
513					&mut source_go_offline_future,
514					async_std::task::sleep,
515					|| format!("Error generating proof at {}", P::source_name()),
516				).fail_if_error(FailedClient::Source).map(|_| true)?;
517			},
518			proof_submit_result = target_submit_proof => {
519				target_client_is_online = process_future_result(
520					proof_submit_result,
521					&mut target_retry_backoff,
522					|artifacts: NoncesSubmitArtifacts<TC::TransactionTracker>| {
523						log::debug!(
524							target: "bridge",
525							"Successfully submitted proof of nonces {:?} to {}",
526							artifacts.nonces,
527							P::target_name(),
528						);
529
530						race_state.nonces_submitted = Some(artifacts.nonces);
531						target_tx_tracker.set(artifacts.tx_tracker.wait().fuse());
532					},
533					&mut target_go_offline_future,
534					async_std::task::sleep,
535					|| format!("Error submitting proof {}", P::target_name()),
536				).fail_if_connection_error(FailedClient::Target)?;
537
538				// in any case - we don't need to retry submitting the same nonces again until
539				// we read nonces from the target client
540				race_state.reset_nonces_to_submit();
541				// if we have failed to submit transaction AND that is not the connection issue,
542				// then we need to read best target nonces before selecting nonces again
543				if !target_client_is_online {
544					strategy.reset_best_target_nonces();
545				}
546			},
547			target_transaction_status = target_tx_tracker => {
548				match (target_transaction_status, race_state.nonces_submitted.as_ref()) {
549					(TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => {
550						// our transaction has been mined, but was it successful or not? let's check the best
551						// nonce at the target node.
552						let _ = race_target.nonces(at_block, false)
553							.await
554							.map_err(|e| format!("failed to read nonces from target node: {e:?}"))
555							.and_then(|(_, nonces_at_target)| {
556								if nonces_at_target.latest_nonce < *nonces_submitted.end() {
557									Err(format!(
558										"best nonce at target after tx is {:?} and we've submitted {:?}",
559										nonces_at_target.latest_nonce,
560										nonces_submitted.end(),
561									))
562								} else {
563									Ok(())
564								}
565							})
566							.map_err(|e| {
567								log::error!(
568									target: "bridge",
569									"{} -> {} race transaction failed: {}",
570									P::source_name(),
571									P::target_name(),
572									e,
573								);
574
575								race_state.reset_nonces_submitted();
576							});
577					},
578					(TrackedTransactionStatus::Lost, _) => {
579						log::warn!(
580							target: "bridge",
581							"{} -> {} race transaction has been lost. State: {:?}. Strategy: {:?}",
582							P::source_name(),
583							P::target_name(),
584							race_state,
585							strategy,
586						);
587
588						race_state.reset_nonces_submitted();
589					},
590					_ => (),
591				}
592			},
593
594			// when we're ready to retry request
595			_ = source_go_offline_future => {
596				source_client_is_online = true;
597			},
598			_ = target_go_offline_future => {
599				target_client_is_online = true;
600			},
601		}
602
603		progress_context = print_race_progress::<P, _>(progress_context, &strategy);
604
605		if source_client_is_online {
606			source_client_is_online = false;
607
608			// if we've started to submit batch transaction, let's prioritize it
609			//
610			// we're using `take` here, because we don't need batch transaction (i.e. some
611			// underlying finality proof) anymore for our future calls - we were unable to
612			// use it for our current state, so why would we need to keep an obsolete proof
613			// for the future?
614			let target_batch_transaction = target_batch_transaction.take();
615			let expected_race_state =
616				if let Some(ref target_batch_transaction) = target_batch_transaction {
617					// when selecting nonces for the batch transaction, we assume that the required
618					// source header is already at the target chain
619					let required_source_header_at_target =
620						target_batch_transaction.required_header_id();
621					let mut expected_race_state = race_state.clone();
622					expected_race_state.best_finalized_source_header_id_at_best_target =
623						Some(required_source_header_at_target);
624					expected_race_state
625				} else {
626					race_state.clone()
627				};
628
629			let nonces_to_deliver = select_nonces_to_deliver(expected_race_state, &strategy).await;
630			let best_at_source = strategy.best_at_source();
631
632			if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
633				log::debug!(
634					target: "bridge",
635					"Asking {} to prove nonces in range {:?} at block {:?}",
636					P::source_name(),
637					nonces_range,
638					at_block,
639				);
640
641				source_generate_proof.set(
642					race_source
643						.generate_proof(at_block, nonces_range, proof_parameters)
644						.and_then(|(at_source_block, nonces, proof)| async {
645							Ok((at_source_block, nonces, proof, target_batch_transaction))
646						})
647						.fuse(),
648				);
649			} else if let (true, Some(best_at_source)) = (source_nonces_required, best_at_source) {
650				log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name());
651				let at_block = race_state
652					.best_finalized_source_header_id_at_source
653					.as_ref()
654					.expect(
655						"source_nonces_required is only true when\
656						best_finalized_source_header_id_at_source is Some; qed",
657					)
658					.clone();
659				source_nonces.set(race_source.nonces(at_block, best_at_source).fuse());
660			} else {
661				source_client_is_online = true;
662			}
663		}
664
665		if target_client_is_online {
666			target_client_is_online = false;
667
668			if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() {
669				log::debug!(
670					target: "bridge",
671					"Going to submit proof of messages in range {:?} to {} node{}",
672					nonces_range,
673					P::target_name(),
674					race_state.nonces_to_submit_batch.as_ref().map(|tx| format!(
675						". This transaction is batched with sending the proof for header {:?}.",
676						tx.required_header_id())
677					).unwrap_or_default(),
678				);
679
680				target_submit_proof.set(
681					race_target
682						.submit_proof(
683							race_state.nonces_to_submit_batch.clone(),
684							at_block.clone(),
685							nonces_range.clone(),
686							proof.clone(),
687						)
688						.fuse(),
689				);
690			} else if let Some(source_required_header) = source_required_header.clone() {
691				log::debug!(
692					target: "bridge",
693					"Going to require {} header {:?} at {}",
694					P::source_name(),
695					source_required_header,
696					P::target_name(),
697				);
698				target_require_source_header
699					.set(race_target.require_source_header(source_required_header).fuse());
700			} else if target_best_nonces_required {
701				log::debug!(target: "bridge", "Asking {} about best message nonces", P::target_name());
702				let at_block = race_state
703					.best_target_header_id
704					.as_ref()
705					.expect("target_best_nonces_required is only true when best_target_header_id is Some; qed")
706					.clone();
707				target_best_nonces.set(race_target.nonces(at_block, false).fuse());
708			} else if target_finalized_nonces_required {
709				log::debug!(target: "bridge", "Asking {} about finalized message nonces", P::target_name());
710				let at_block = race_state
711					.best_finalized_target_header_id
712					.as_ref()
713					.expect(
714						"target_finalized_nonces_required is only true when\
715						best_finalized_target_header_id is Some; qed",
716					)
717					.clone();
718				target_finalized_nonces.set(race_target.nonces(at_block, true).fuse());
719			} else {
720				target_client_is_online = true;
721			}
722		}
723	}
724}
725
726/// Print race progress.
727fn print_race_progress<P, S>(prev_time: Instant, strategy: &S) -> Instant
728where
729	P: MessageRace,
730	S: RaceStrategy<P::SourceHeaderId, P::TargetHeaderId, P::Proof>,
731{
732	let now_time = Instant::now();
733
734	let need_update = now_time.saturating_duration_since(prev_time) > Duration::from_secs(10);
735	if !need_update {
736		return prev_time
737	}
738
739	let now_best_nonce_at_source = strategy.best_at_source();
740	let now_best_nonce_at_target = strategy.best_at_target();
741	log::info!(
742		target: "bridge",
743		"Synced {:?} of {:?} nonces in {} -> {} race",
744		now_best_nonce_at_target,
745		now_best_nonce_at_source,
746		P::source_name(),
747		P::target_name(),
748	);
749	now_time
750}
751
752async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>(
753	race_state: impl RaceState<SourceHeaderId, TargetHeaderId>,
754	strategy: &Strategy,
755) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)>
756where
757	SourceHeaderId: Clone,
758	Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>,
759{
760	let best_finalized_source_header_id_at_best_target =
761		race_state.best_finalized_source_header_id_at_best_target()?;
762	strategy
763		.select_nonces_to_deliver(race_state)
764		.await
765		.map(|(nonces_range, proof_parameters)| {
766			(best_finalized_source_header_id_at_best_target, nonces_range, proof_parameters)
767		})
768}
769
#[cfg(test)]
mod tests {
	use super::*;
	use crate::message_race_strategy::BasicStrategy;
	use relay_utils::HeaderId;

	/// The proof must be generated at the best finalized source header that is known to
	/// the target node, even when source nonces have been read at an older source header
	/// and the source node itself has a better finalized header.
	#[async_std::test]
	async fn proof_is_generated_at_best_block_known_to_target_node() {
		const GENERATED_AT: u64 = 6;
		const BEST_AT_SOURCE: u64 = 10;
		const BEST_AT_TARGET: u64 = 8;

		// source node has finalized BEST_AT_SOURCE block, but target node only knows
		// about source' BEST_AT_TARGET < BEST_AT_SOURCE block
		let mut state = RaceStateImpl::<_, _, (), ()> {
			best_finalized_source_header_id_at_source: Some(HeaderId(
				BEST_AT_SOURCE,
				BEST_AT_SOURCE,
			)),
			best_finalized_source_header_id_at_best_target: Some(HeaderId(
				BEST_AT_TARGET,
				BEST_AT_TARGET,
			)),
			best_target_header_id: Some(HeaderId(0, 0)),
			best_finalized_target_header_id: Some(HeaderId(0, 0)),
			// nothing is selected or submitted yet
			..Default::default()
		};

		// source nonces 0..=10 are read at GENERATED_AT < BEST_AT_SOURCE block and
		// the target node has already received nonces up to 5
		let mut basic_strategy = BasicStrategy::<_, _, _, _, _, ()>::new();
		let source_nonces = SourceClientNonces { new_nonces: 0..=10, confirmed_nonce: None };
		basic_strategy.source_nonces_updated(HeaderId(GENERATED_AT, GENERATED_AT), source_nonces);
		let target_nonces = TargetClientNonces { latest_nonce: 5u64, nonces_data: () };
		basic_strategy.best_target_nonces_updated(target_nonces, &mut state);

		// the proof will be generated on source, but using BEST_AT_TARGET block
		let selected = select_nonces_to_deliver(state, &basic_strategy).await;
		assert_eq!(selected, Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),)));
	}
}
817}