use crate::message_lane_loop::{BatchTransaction, ClientState, NoncesSubmitArtifacts};
use async_trait::async_trait;
use bp_messages::MessageNonce;
use futures::{
future::{FutureExt, TryFutureExt},
stream::{FusedStream, StreamExt},
};
use relay_utils::{
process_future_result, retry_backoff, FailedClient, MaybeConnectionError,
TrackedTransactionStatus, TransactionTracker,
};
use std::{
fmt::Debug,
ops::RangeInclusive,
time::{Duration, Instant},
};
pub trait MessageRace {
type SourceHeaderId: Debug + Clone + PartialEq + Send + Sync;
type TargetHeaderId: Debug + Clone + PartialEq + Send + Sync;
type MessageNonce: Debug + Clone;
type Proof: Debug + Clone + Send + Sync;
fn source_name() -> String;
fn target_name() -> String;
}
type SourceClientState<P> =
ClientState<<P as MessageRace>::SourceHeaderId, <P as MessageRace>::TargetHeaderId>;
type TargetClientState<P> =
ClientState<<P as MessageRace>::TargetHeaderId, <P as MessageRace>::SourceHeaderId>;
pub trait NoncesRange: Debug + Sized {
fn begin(&self) -> MessageNonce;
fn end(&self) -> MessageNonce;
fn greater_than(self, nonce: MessageNonce) -> Option<Self>;
}
#[derive(Debug, Clone)]
pub struct SourceClientNonces<NoncesRange> {
pub new_nonces: NoncesRange,
pub confirmed_nonce: Option<MessageNonce>,
}
#[derive(Debug, Clone)]
pub struct TargetClientNonces<TargetNoncesData> {
pub latest_nonce: MessageNonce,
pub nonces_data: TargetNoncesData,
}
#[async_trait]
pub trait SourceClient<P: MessageRace> {
type Error: std::fmt::Debug + MaybeConnectionError;
type NoncesRange: NoncesRange;
type ProofParameters;
async fn nonces(
&self,
at_block: P::SourceHeaderId,
prev_latest_nonce: MessageNonce,
) -> Result<(P::SourceHeaderId, SourceClientNonces<Self::NoncesRange>), Self::Error>;
async fn generate_proof(
&self,
at_block: P::SourceHeaderId,
nonces: RangeInclusive<MessageNonce>,
proof_parameters: Self::ProofParameters,
) -> Result<(P::SourceHeaderId, RangeInclusive<MessageNonce>, P::Proof), Self::Error>;
}
#[async_trait]
pub trait TargetClient<P: MessageRace> {
type Error: std::fmt::Debug + MaybeConnectionError;
type TargetNoncesData: std::fmt::Debug;
type BatchTransaction: BatchTransaction<P::SourceHeaderId> + Clone;
type TransactionTracker: TransactionTracker<HeaderId = P::TargetHeaderId>;
async fn require_source_header(
&self,
id: P::SourceHeaderId,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
async fn nonces(
&self,
at_block: P::TargetHeaderId,
update_metrics: bool,
) -> Result<(P::TargetHeaderId, TargetClientNonces<Self::TargetNoncesData>), Self::Error>;
async fn submit_proof(
&self,
maybe_batch_tx: Option<Self::BatchTransaction>,
generated_at_block: P::SourceHeaderId,
nonces: RangeInclusive<MessageNonce>,
proof: P::Proof,
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
}
#[async_trait]
pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
type SourceNoncesRange: NoncesRange;
type ProofParameters;
type TargetNoncesData;
fn is_empty(&self) -> bool;
async fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
&self,
race_state: RS,
) -> Option<SourceHeaderId>;
fn best_at_source(&self) -> Option<MessageNonce>;
fn best_at_target(&self) -> Option<MessageNonce>;
fn source_nonces_updated(
&mut self,
at_block: SourceHeaderId,
nonces: SourceClientNonces<Self::SourceNoncesRange>,
);
fn reset_best_target_nonces(&mut self);
fn best_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
&mut self,
nonces: TargetClientNonces<Self::TargetNoncesData>,
race_state: &mut RS,
);
fn finalized_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
&mut self,
nonces: TargetClientNonces<Self::TargetNoncesData>,
race_state: &mut RS,
);
async fn select_nonces_to_deliver<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
&self,
race_state: RS,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>;
}
pub trait RaceState<SourceHeaderId, TargetHeaderId>: Clone + Send + Sync {
fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId);
fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId>;
fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>>;
fn reset_nonces_to_submit(&mut self);
fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>>;
fn reset_nonces_submitted(&mut self);
}
#[derive(Debug, Clone)]
pub(crate) struct RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> {
pub best_finalized_source_header_id_at_source: Option<SourceHeaderId>,
pub best_finalized_source_header_id_at_best_target: Option<SourceHeaderId>,
pub best_target_header_id: Option<TargetHeaderId>,
pub best_finalized_target_header_id: Option<TargetHeaderId>,
pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>,
pub nonces_to_submit_batch: Option<BatchTx>,
pub nonces_submitted: Option<RangeInclusive<MessageNonce>>,
}
impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> Default
for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
{
fn default() -> Self {
RaceStateImpl {
best_finalized_source_header_id_at_source: None,
best_finalized_source_header_id_at_best_target: None,
best_target_header_id: None,
best_finalized_target_header_id: None,
nonces_to_submit: None,
nonces_to_submit_batch: None,
nonces_submitted: None,
}
}
}
impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> RaceState<SourceHeaderId, TargetHeaderId>
for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
where
SourceHeaderId: Clone + Send + Sync,
TargetHeaderId: Clone + Send + Sync,
Proof: Clone + Send + Sync,
BatchTx: Clone + Send + Sync,
{
fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId) {
self.best_finalized_source_header_id_at_best_target = Some(id);
}
fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId> {
self.best_finalized_source_header_id_at_best_target.clone()
}
fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>> {
self.nonces_to_submit.clone().map(|(_, nonces, _)| nonces)
}
fn reset_nonces_to_submit(&mut self) {
self.nonces_to_submit = None;
self.nonces_to_submit_batch = None;
}
fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>> {
self.nonces_submitted.clone()
}
fn reset_nonces_submitted(&mut self) {
self.nonces_submitted = None;
}
}
pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
race_source: SC,
race_source_updated: impl FusedStream<Item = SourceClientState<P>>,
race_target: TC,
race_target_updated: impl FusedStream<Item = TargetClientState<P>>,
mut strategy: impl RaceStrategy<
P::SourceHeaderId,
P::TargetHeaderId,
P::Proof,
SourceNoncesRange = SC::NoncesRange,
ProofParameters = SC::ProofParameters,
TargetNoncesData = TC::TargetNoncesData,
>,
) -> Result<(), FailedClient> {
let mut progress_context = Instant::now();
let mut race_state = RaceStateImpl::default();
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = true;
let mut source_nonces_required = false;
let mut source_required_header = None;
let source_nonces = futures::future::Fuse::terminated();
let source_generate_proof = futures::future::Fuse::terminated();
let source_go_offline_future = futures::future::Fuse::terminated();
let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = true;
let mut target_best_nonces_required = false;
let mut target_finalized_nonces_required = false;
let mut target_batch_transaction = None;
let target_require_source_header = futures::future::Fuse::terminated();
let target_best_nonces = futures::future::Fuse::terminated();
let target_finalized_nonces = futures::future::Fuse::terminated();
let target_submit_proof = futures::future::Fuse::terminated();
let target_tx_tracker = futures::future::Fuse::terminated();
let target_go_offline_future = futures::future::Fuse::terminated();
futures::pin_mut!(
race_source_updated,
source_nonces,
source_generate_proof,
source_go_offline_future,
race_target_updated,
target_require_source_header,
target_best_nonces,
target_finalized_nonces,
target_submit_proof,
target_tx_tracker,
target_go_offline_future,
);
loop {
futures::select! {
source_state = race_source_updated.next() => {
if let Some(source_state) = source_state {
let is_source_state_updated = race_state.best_finalized_source_header_id_at_source.as_ref()
!= Some(&source_state.best_finalized_self);
if is_source_state_updated {
source_nonces_required = true;
race_state.best_finalized_source_header_id_at_source
= Some(source_state.best_finalized_self);
}
}
},
target_state = race_target_updated.next() => {
if let Some(target_state) = target_state {
let is_target_best_state_updated = race_state.best_target_header_id.as_ref()
!= Some(&target_state.best_self);
if is_target_best_state_updated {
target_best_nonces_required = true;
race_state.best_target_header_id = Some(target_state.best_self);
race_state.best_finalized_source_header_id_at_best_target
= target_state.best_finalized_peer_at_best_self;
}
let is_target_finalized_state_updated = race_state.best_finalized_target_header_id.as_ref()
!= Some(&target_state.best_finalized_self);
if is_target_finalized_state_updated {
target_finalized_nonces_required = true;
race_state.best_finalized_target_header_id = Some(target_state.best_finalized_self);
}
}
},
nonces = source_nonces => {
source_nonces_required = false;
source_client_is_online = process_future_result(
nonces,
&mut source_retry_backoff,
|(at_block, nonces)| {
log::debug!(
target: "bridge",
"Received nonces from {}: {:?}",
P::source_name(),
nonces,
);
strategy.source_nonces_updated(at_block, nonces);
},
&mut source_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving nonces from {}", P::source_name()),
).fail_if_connection_error(FailedClient::Source)?;
source_required_header = strategy
.required_source_header_at_target(race_state.clone())
.await;
},
nonces = target_best_nonces => {
target_best_nonces_required = false;
target_client_is_online = process_future_result(
nonces,
&mut target_retry_backoff,
|(_, nonces)| {
log::debug!(
target: "bridge",
"Received best nonces from {}: {:?}",
P::target_name(),
nonces,
);
strategy.best_target_nonces_updated(nonces, &mut race_state);
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving best nonces from {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
},
nonces = target_finalized_nonces => {
target_finalized_nonces_required = false;
target_client_is_online = process_future_result(
nonces,
&mut target_retry_backoff,
|(_, nonces)| {
log::debug!(
target: "bridge",
"Received finalized nonces from {}: {:?}",
P::target_name(),
nonces,
);
strategy.finalized_target_nonces_updated(nonces, &mut race_state);
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving finalized nonces from {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
},
maybe_batch_transaction = target_require_source_header => {
source_required_header = None;
target_client_is_online = process_future_result(
maybe_batch_transaction,
&mut target_retry_backoff,
|maybe_batch_transaction: Option<TC::BatchTransaction>| {
log::debug!(
target: "bridge",
"Target {} client has been asked for more {} headers. Batch tx: {}",
P::target_name(),
P::source_name(),
maybe_batch_transaction
.as_ref()
.map(|bt| format!("yes ({:?})", bt.required_header_id()))
.unwrap_or_else(|| "no".into()),
);
target_batch_transaction = maybe_batch_transaction;
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error asking for source headers at {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
},
proof = source_generate_proof => {
source_client_is_online = process_future_result(
proof,
&mut source_retry_backoff,
|(at_block, nonces_range, proof, batch_transaction)| {
log::debug!(
target: "bridge",
"Received proof for nonces in range {:?} from {}",
nonces_range,
P::source_name(),
);
race_state.nonces_to_submit = Some((at_block, nonces_range, proof));
race_state.nonces_to_submit_batch = batch_transaction;
},
&mut source_go_offline_future,
async_std::task::sleep,
|| format!("Error generating proof at {}", P::source_name()),
).fail_if_error(FailedClient::Source).map(|_| true)?;
},
proof_submit_result = target_submit_proof => {
target_client_is_online = process_future_result(
proof_submit_result,
&mut target_retry_backoff,
|artifacts: NoncesSubmitArtifacts<TC::TransactionTracker>| {
log::debug!(
target: "bridge",
"Successfully submitted proof of nonces {:?} to {}",
artifacts.nonces,
P::target_name(),
);
race_state.nonces_submitted = Some(artifacts.nonces);
target_tx_tracker.set(artifacts.tx_tracker.wait().fuse());
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error submitting proof {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
race_state.reset_nonces_to_submit();
if !target_client_is_online {
strategy.reset_best_target_nonces();
}
},
target_transaction_status = target_tx_tracker => {
match (target_transaction_status, race_state.nonces_submitted.as_ref()) {
(TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => {
let _ = race_target.nonces(at_block, false)
.await
.map_err(|e| format!("failed to read nonces from target node: {e:?}"))
.and_then(|(_, nonces_at_target)| {
if nonces_at_target.latest_nonce < *nonces_submitted.end() {
Err(format!(
"best nonce at target after tx is {:?} and we've submitted {:?}",
nonces_at_target.latest_nonce,
nonces_submitted.end(),
))
} else {
Ok(())
}
})
.map_err(|e| {
log::error!(
target: "bridge",
"{} -> {} race transaction failed: {}",
P::source_name(),
P::target_name(),
e,
);
race_state.reset_nonces_submitted();
});
},
(TrackedTransactionStatus::Lost, _) => {
log::warn!(
target: "bridge",
"{} -> {} race transaction has been lost. State: {:?}. Strategy: {:?}",
P::source_name(),
P::target_name(),
race_state,
strategy,
);
race_state.reset_nonces_submitted();
},
_ => (),
}
},
_ = source_go_offline_future => {
source_client_is_online = true;
},
_ = target_go_offline_future => {
target_client_is_online = true;
},
}
progress_context = print_race_progress::<P, _>(progress_context, &strategy);
if source_client_is_online {
source_client_is_online = false;
let target_batch_transaction = target_batch_transaction.take();
let expected_race_state =
if let Some(ref target_batch_transaction) = target_batch_transaction {
let required_source_header_at_target =
target_batch_transaction.required_header_id();
let mut expected_race_state = race_state.clone();
expected_race_state.best_finalized_source_header_id_at_best_target =
Some(required_source_header_at_target);
expected_race_state
} else {
race_state.clone()
};
let nonces_to_deliver = select_nonces_to_deliver(expected_race_state, &strategy).await;
let best_at_source = strategy.best_at_source();
if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
log::debug!(
target: "bridge",
"Asking {} to prove nonces in range {:?} at block {:?}",
P::source_name(),
nonces_range,
at_block,
);
source_generate_proof.set(
race_source
.generate_proof(at_block, nonces_range, proof_parameters)
.and_then(|(at_source_block, nonces, proof)| async {
Ok((at_source_block, nonces, proof, target_batch_transaction))
})
.fuse(),
);
} else if let (true, Some(best_at_source)) = (source_nonces_required, best_at_source) {
log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name());
let at_block = race_state
.best_finalized_source_header_id_at_source
.as_ref()
.expect(
"source_nonces_required is only true when\
best_finalized_source_header_id_at_source is Some; qed",
)
.clone();
source_nonces.set(race_source.nonces(at_block, best_at_source).fuse());
} else {
source_client_is_online = true;
}
}
if target_client_is_online {
target_client_is_online = false;
if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() {
log::debug!(
target: "bridge",
"Going to submit proof of messages in range {:?} to {} node{}",
nonces_range,
P::target_name(),
race_state.nonces_to_submit_batch.as_ref().map(|tx| format!(
". This transaction is batched with sending the proof for header {:?}.",
tx.required_header_id())
).unwrap_or_default(),
);
target_submit_proof.set(
race_target
.submit_proof(
race_state.nonces_to_submit_batch.clone(),
at_block.clone(),
nonces_range.clone(),
proof.clone(),
)
.fuse(),
);
} else if let Some(source_required_header) = source_required_header.clone() {
log::debug!(
target: "bridge",
"Going to require {} header {:?} at {}",
P::source_name(),
source_required_header,
P::target_name(),
);
target_require_source_header
.set(race_target.require_source_header(source_required_header).fuse());
} else if target_best_nonces_required {
log::debug!(target: "bridge", "Asking {} about best message nonces", P::target_name());
let at_block = race_state
.best_target_header_id
.as_ref()
.expect("target_best_nonces_required is only true when best_target_header_id is Some; qed")
.clone();
target_best_nonces.set(race_target.nonces(at_block, false).fuse());
} else if target_finalized_nonces_required {
log::debug!(target: "bridge", "Asking {} about finalized message nonces", P::target_name());
let at_block = race_state
.best_finalized_target_header_id
.as_ref()
.expect(
"target_finalized_nonces_required is only true when\
best_finalized_target_header_id is Some; qed",
)
.clone();
target_finalized_nonces.set(race_target.nonces(at_block, true).fuse());
} else {
target_client_is_online = true;
}
}
}
}
fn print_race_progress<P, S>(prev_time: Instant, strategy: &S) -> Instant
where
P: MessageRace,
S: RaceStrategy<P::SourceHeaderId, P::TargetHeaderId, P::Proof>,
{
let now_time = Instant::now();
let need_update = now_time.saturating_duration_since(prev_time) > Duration::from_secs(10);
if !need_update {
return prev_time
}
let now_best_nonce_at_source = strategy.best_at_source();
let now_best_nonce_at_target = strategy.best_at_target();
log::info!(
target: "bridge",
"Synced {:?} of {:?} nonces in {} -> {} race",
now_best_nonce_at_target,
now_best_nonce_at_source,
P::source_name(),
P::target_name(),
);
now_time
}
async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>(
race_state: impl RaceState<SourceHeaderId, TargetHeaderId>,
strategy: &Strategy,
) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)>
where
SourceHeaderId: Clone,
Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>,
{
let best_finalized_source_header_id_at_best_target =
race_state.best_finalized_source_header_id_at_best_target()?;
strategy
.select_nonces_to_deliver(race_state)
.await
.map(|(nonces_range, proof_parameters)| {
(best_finalized_source_header_id_at_best_target, nonces_range, proof_parameters)
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message_race_strategy::BasicStrategy;
use relay_utils::HeaderId;
#[async_std::test]
async fn proof_is_generated_at_best_block_known_to_target_node() {
const GENERATED_AT: u64 = 6;
const BEST_AT_SOURCE: u64 = 10;
const BEST_AT_TARGET: u64 = 8;
let mut race_state = RaceStateImpl::<_, _, (), ()> {
best_finalized_source_header_id_at_source: Some(HeaderId(
BEST_AT_SOURCE,
BEST_AT_SOURCE,
)),
best_finalized_source_header_id_at_best_target: Some(HeaderId(
BEST_AT_TARGET,
BEST_AT_TARGET,
)),
best_target_header_id: Some(HeaderId(0, 0)),
best_finalized_target_header_id: Some(HeaderId(0, 0)),
nonces_to_submit: None,
nonces_to_submit_batch: None,
nonces_submitted: None,
};
let mut strategy = BasicStrategy::<_, _, _, _, _, ()>::new();
strategy.source_nonces_updated(
HeaderId(GENERATED_AT, GENERATED_AT),
SourceClientNonces { new_nonces: 0..=10, confirmed_nonce: None },
);
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 5u64, nonces_data: () },
&mut race_state,
);
assert_eq!(
select_nonces_to_deliver(race_state, &strategy).await,
Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),))
);
}
}