use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
use async_trait::async_trait;
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
use relay_utils::{
interval, metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient,
retry_backoff, FailedClient, TransactionTracker,
};
use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_race_delivery::run as run_message_delivery_race,
message_race_receiving::run as run_message_receiving_race,
metrics::{Labeled, MessageLaneLoopMetrics},
};
/// Configuration of a single message lane loop.
#[derive(Debug, Clone)]
pub struct Params<LaneId> {
/// Identifier of the lane served by the loop; `run` also derives the metrics prefix from it.
pub lane: LaneId,
/// Period of the timer that makes the loop request a fresh state from the source node.
pub source_tick: Duration,
/// Period of the timer that makes the loop request a fresh state from the target node.
pub target_tick: Duration,
/// Delay handed to the relay loop through its `reconnect_delay(..)` setter.
pub reconnect_delay: Duration,
/// Parameters forwarded, unchanged, to the message delivery race.
pub delivery_params: MessageDeliveryParams,
}
/// Limits handed over to the message delivery race.
///
/// NOTE(review): this file only forwards the struct to the delivery race; the field
/// descriptions below are inferred from the field names - confirm against the race code.
#[derive(Debug, Clone)]
pub struct MessageDeliveryParams {
/// Presumably the maximal number of unrewarded relayer entries tolerated at the target chain.
pub max_unrewarded_relayer_entries_at_target: MessageNonce,
/// Presumably the maximal number of not yet confirmed nonces tolerated at the target chain.
pub max_unconfirmed_nonces_at_target: MessageNonce,
/// Presumably the maximal number of messages in a single delivery transaction.
pub max_messages_in_single_batch: MessageNonce,
/// Presumably the maximal cumulative dispatch weight of messages in a single delivery transaction.
pub max_messages_weight_in_single_batch: Weight,
/// Presumably the maximal cumulative size of messages in a single delivery transaction.
pub max_messages_size_in_single_batch: u32,
}
/// Per-message details, as returned by `SourceClient::generated_message_details`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageDetails<SourceChainBalance> {
/// Dispatch weight of the message.
pub dispatch_weight: Weight,
/// Size of the message (presumably in bytes - TODO confirm).
pub size: u32,
/// Reward attached to the message, expressed in the source chain balance type.
pub reward: SourceChainBalance,
}
/// Ordered map from message nonce to the details of that message.
pub type MessageDetailsMap<SourceChainBalance> =
BTreeMap<MessageNonce, MessageDetails<SourceChainBalance>>;
/// Extra parameters passed to `SourceClient::prove_messages`.
#[derive(Debug, PartialEq, Eq)]
pub struct MessageProofParameters {
/// Whether the proof has to carry the outbound lane state as well.
/// The test client in this file attaches its latest confirmed nonce to the proof
/// only when this flag is set.
pub outbound_state_proof_required: bool,
/// Dispatch weight associated with the proof (presumably cumulative over the proved
/// messages - TODO confirm against the delivery race).
pub dispatch_weight: Weight,
}
/// Result of a successful `TargetClient::submit_messages_proof` call.
pub struct NoncesSubmitArtifacts<T> {
/// Range of nonces reported by the client for the submitted transaction.
pub nonces: RangeInclusive<MessageNonce>,
/// Tracker of the submitted transaction.
pub tx_tracker: T,
}
/// A batch transaction that can be passed to the proof-submitting client methods and that
/// knows the header it is tied to.
pub trait BatchTransaction<HeaderId>: Debug + Send + Sync {
/// Header id associated with this batch transaction.
///
/// NOTE(review): presumably the header that the batch delivers to the peer chain - confirm.
fn required_header_id(&self) -> HeaderId;
}
/// Client of the source chain of a message lane.
///
/// Every method fails with the `Error` type of the underlying `RelayClient`.
///
/// NOTE(review): the returned header ids are presumably the blocks at which the value was
/// actually read; the test client in this file simply echoes the `id` argument - confirm
/// the contract for real clients.
#[async_trait]
pub trait SourceClient<P: MessageLane>: RelayClient {
/// Batch transaction type returned by `require_target_header_on_source`.
type BatchTransaction: BatchTransaction<TargetHeaderIdOf<P>> + Clone;
/// Tracker type of transactions submitted through this client.
type TransactionTracker: TransactionTracker<HeaderId = SourceHeaderIdOf<P>>;
/// Returns the current state of the source client.
async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
/// Returns the nonce of the latest generated message, together with a source header id.
async fn latest_generated_nonce(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Returns the latest nonce known to the source chain as confirmed-received, together
/// with a source header id.
async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Returns details of the generated messages with the given `nonces`.
async fn generated_message_details(
&self,
id: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
) -> Result<MessageDetailsMap<P::SourceChainBalance>, Self::Error>;
/// Builds a proof of the messages with the given `nonces`.
///
/// Returns a source header id, a range of nonces and the proof itself.
async fn prove_messages(
&self,
id: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
proof_parameters: MessageProofParameters,
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), Self::Error>;
/// Submits a messages receiving proof, optionally accompanied by a batch transaction,
/// and returns the tracker of the submitted transaction.
async fn submit_messages_receiving_proof(
&self,
maybe_batch_tx: Option<Self::BatchTransaction>,
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<Self::TransactionTracker, Self::Error>;
/// Signals that the target header `id` is required at the source chain.
///
/// May return a batch transaction; `None` means no batch transaction is offered.
/// NOTE(review): presumably the caller then has to wait for the header to be relayed
/// by other means - confirm.
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
}
/// Client of the target chain of a message lane.
///
/// Every method fails with the `Error` type of the underlying `RelayClient`.
///
/// NOTE(review): the returned header ids are presumably the blocks at which the value was
/// actually read; the test client in this file simply echoes the `id` argument - confirm
/// the contract for real clients.
#[async_trait]
pub trait TargetClient<P: MessageLane>: RelayClient {
/// Batch transaction type returned by `require_source_header_on_target`.
type BatchTransaction: BatchTransaction<SourceHeaderIdOf<P>> + Clone;
/// Tracker type of transactions submitted through this client.
type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>;
/// Returns the current state of the target client.
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
/// Returns the nonce of the latest received message, together with a target header id.
async fn latest_received_nonce(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Returns the latest nonce known to the target chain as confirmed-received, together
/// with a target header id.
async fn latest_confirmed_received_nonce(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Returns the state of unrewarded relayers, together with a target header id.
async fn unrewarded_relayers_state(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, UnrewardedRelayersState), Self::Error>;
/// Builds a proof of messages receiving, together with a target header id.
async fn prove_messages_receiving(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessagesReceivingProof), Self::Error>;
/// Submits a messages proof for `nonces`, optionally accompanied by a batch transaction.
///
/// Returns the range of nonces reported for the transaction and its tracker.
async fn submit_messages_proof(
&self,
maybe_batch_tx: Option<Self::BatchTransaction>,
generated_at_header: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof,
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
/// Signals that the source header `id` is required at the target chain.
///
/// May return a batch transaction; `None` means no batch transaction is offered.
/// NOTE(review): presumably the caller then has to wait for the header to be relayed
/// by other means - confirm.
async fn require_source_header_on_target(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
}
/// State of a chain client, as seen by the lane loop.
///
/// NOTE(review): field descriptions are derived from the field names; this file only
/// forwards the state to the races and to metrics - confirm the exact semantics there.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ClientState<SelfHeaderId, PeerHeaderId> {
/// Best header id of this chain.
pub best_self: SelfHeaderId,
/// Best finalized header id of this chain.
pub best_finalized_self: SelfHeaderId,
/// Best finalized header id of the peer chain, as known at the best block of this chain.
/// `None` presumably means it is unknown - TODO confirm.
pub best_finalized_peer_at_best_self: Option<PeerHeaderId>,
/// Presumably the id that the header referenced by `best_finalized_peer_at_best_self`
/// actually has at the peer chain - TODO confirm.
pub actual_best_finalized_peer_at_best_self: Option<PeerHeaderId>,
}
/// State of the source client: "self" is the source chain, "peer" is the target chain.
pub type SourceClientState<P> = ClientState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>;
/// State of the target client: "self" is the target chain, "peer" is the source chain.
pub type TargetClientState<P> = ClientState<TargetHeaderIdOf<P>, SourceHeaderIdOf<P>>;
/// States of both clients of a lane; each side is `None` until a state is recorded for it.
#[derive(Debug, Default)]
pub struct ClientsState<P: MessageLane> {
/// Latest recorded state of the source client, if any.
pub source: Option<SourceClientState<P>>,
/// Latest recorded state of the target client, if any.
pub target: Option<TargetClientState<P>>,
}
/// Builds the metrics prefix used by the lane loop of `P` that serves `lane`.
///
/// The result has the shape `<SOURCE_NAME>_to_<TARGET_NAME>_MessageLane_<lane label>`.
pub fn metrics_prefix<P: MessageLane>(lane: &P::LaneId) -> String {
    let source_name = P::SOURCE_NAME;
    let target_name = P::TARGET_NAME;
    let lane_label = lane.label();
    format!("{}_to_{}_MessageLane_{}", source_name, target_name, lane_label)
}
/// Runs the message lane loop, reconnecting the clients when a connection is lost.
///
/// The loop is assembled with `relay_utils::relay_loop`: it uses `params.reconnect_delay`
/// as the reconnect delay, registers `MessageLaneLoopMetrics` under the prefix returned by
/// `metrics_prefix`, exposes the metrics and then repeatedly drives
/// `run_until_connection_lost` with clones of `params` and of the (shared) `exit_signal`.
///
/// # Errors
///
/// Fails if the loop metrics cannot be created or registered, if exposing the metrics
/// fails, or if the relay loop itself returns an error.
pub async fn run<P: MessageLane>(
    params: Params<P::LaneId>,
    source_client: impl SourceClient<P>,
    target_client: impl TargetClient<P>,
    metrics_params: MetricsParams,
    exit_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), relay_utils::Error> {
    // The closure below may be invoked several times, so the exit signal has to be cloneable.
    let exit_signal = exit_signal.shared();
    relay_utils::relay_loop(source_client, target_client)
        .reconnect_delay(params.reconnect_delay)
        .with_metrics(metrics_params)
        .loop_metric(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<P>(&params.lane)))?)?
        .expose()
        .await?
        .run(metrics_prefix::<P>(&params.lane), move |source_client, target_client, metrics| {
            run_until_connection_lost(
                params.clone(),
                source_client,
                target_client,
                metrics,
                exit_signal.clone(),
            )
        })
        .await
}
/// Runs the lane loop until a client fails, one of the races ends or `exit_signal` resolves.
///
/// The function periodically polls the state of both clients, forwards every received
/// state to the delivery and receiving races (and to the metrics, when present) and
/// drives both races concurrently.
///
/// Returns `Ok(())` when `exit_signal` resolves. Returns `Err(FailedClient)` when
/// `fail_if_connection_error` reports a failure for a state request, or when either of
/// the races finishes with an error.
async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
params: Params<P::LaneId>,
source_client: SC,
target_client: TC,
metrics_msg: Option<MessageLaneLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
// Source side bookkeeping. The first state request is issued right away, so the client is
// treated as "busy" (`is_online == false`) until that request completes.
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = false;
let mut source_state_required = true;
let source_state = source_client.state().fuse();
// Starts terminated, so `select!` ignores it until `process_future_result` replaces it
// (presumably with a backoff sleep after a failed request - see `relay_utils`).
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(params.source_tick).fuse();
// Target side bookkeeping, mirroring the source side.
let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = false;
let mut target_state_required = true;
let target_state = target_client.state().fuse();
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(params.target_tick).fuse();
// Channels used to feed client states into the delivery race.
let (
(delivery_source_state_sender, delivery_source_state_receiver),
(delivery_target_state_sender, delivery_target_state_receiver),
) = (unbounded(), unbounded());
let delivery_race_loop = run_message_delivery_race(
source_client.clone(),
delivery_source_state_receiver,
target_client.clone(),
delivery_target_state_receiver,
metrics_msg.clone(),
params.delivery_params,
)
.fuse();
// Channels used to feed client states into the receiving (confirmation) race.
let (
(receiving_source_state_sender, receiving_source_state_receiver),
(receiving_target_state_sender, receiving_target_state_receiver),
) = (unbounded(), unbounded());
let receiving_race_loop = run_message_receiving_race(
source_client.clone(),
receiving_source_state_receiver,
target_client.clone(),
receiving_target_state_receiver,
metrics_msg.clone(),
)
.fuse();
let exit_signal = exit_signal.fuse();
// `select!` below polls these futures by reference, so they have to be pinned.
futures::pin_mut!(
source_state,
source_go_offline_future,
source_tick_stream,
target_state,
target_go_offline_future,
target_tick_stream,
delivery_race_loop,
receiving_race_loop,
exit_signal
);
loop {
futures::select! {
// The pending source state request has completed (successfully or not).
new_source_state = source_state => {
source_state_required = false;
source_client_is_online = process_future_result(
new_source_state,
&mut source_retry_backoff,
|new_source_state| {
log::debug!(
target: "bridge",
"Received state from {} node: {:?}",
P::SOURCE_NAME,
new_source_state,
);
// Send errors are ignored on purpose: a closed channel means the race has
// finished, which is reported through the race future itself.
let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone());
let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone());
if let Some(metrics_msg) = metrics_msg.as_ref() {
metrics_msg.update_source_state::<P>(new_source_state);
}
},
&mut source_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving state from {} node", P::SOURCE_NAME),
).fail_if_connection_error(FailedClient::Source)?;
},
// The "offline" period of the source client is over - requests are allowed again.
_ = source_go_offline_future => {
source_client_is_online = true;
},
// Source timer fired - a fresh source state is needed.
_ = source_tick_stream.next() => {
source_state_required = true;
},
// The pending target state request has completed (successfully or not).
new_target_state = target_state => {
target_state_required = false;
target_client_is_online = process_future_result(
new_target_state,
&mut target_retry_backoff,
|new_target_state| {
log::debug!(
target: "bridge",
"Received state from {} node: {:?}",
P::TARGET_NAME,
new_target_state,
);
let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone());
let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone());
if let Some(metrics_msg) = metrics_msg.as_ref() {
metrics_msg.update_target_state::<P>(new_target_state);
}
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving state from {} node", P::TARGET_NAME),
).fail_if_connection_error(FailedClient::Target)?;
},
// The "offline" period of the target client is over - requests are allowed again.
_ = target_go_offline_future => {
target_client_is_online = true;
},
// Target timer fired - a fresh target state is needed.
_ = target_tick_stream.next() => {
target_state_required = true;
},
// Races are expected to run forever; completion is only anticipated with an error.
delivery_error = delivery_race_loop => {
match delivery_error {
Ok(_) => unreachable!("only ends with error; qed"),
Err(err) => return Err(err),
}
},
receiving_error = receiving_race_loop => {
match receiving_error {
Ok(_) => unreachable!("only ends with error; qed"),
Err(err) => return Err(err),
}
},
// Graceful shutdown requested by the caller.
() = exit_signal => {
return Ok(());
}
}
// Issue a new state request only when the client is idle and a state is wanted; the
// client is marked as busy until the request completes.
if source_client_is_online && source_state_required {
log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
source_state.set(source_client.state().fuse());
source_client_is_online = false;
}
if target_client_is_online && target_state_required {
log::debug!(target: "bridge", "Asking {} node about its state", P::TARGET_NAME);
target_state.set(target_client.state().fuse());
target_client_is_online = false;
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;
use bp_messages::{HashedLaneId, LaneIdType, LegacyLaneId};
use futures::stream::StreamExt;
use parking_lot::Mutex;
use relay_utils::{HeaderId, MaybeConnectionError, TrackedTransactionStatus};
use super::*;
/// Builds a test source header id whose hash equals its number.
pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId {
HeaderId(number, number)
}
/// Balance type of the test source chain.
pub type TestSourceChainBalance = u64;
/// Header id of the test source chain.
pub type TestSourceHeaderId = HeaderId<TestSourceHeaderNumber, TestSourceHeaderHash>;
/// Header id of the test target chain.
pub type TestTargetHeaderId = HeaderId<TestTargetHeaderNumber, TestTargetHeaderHash>;
/// Test messages proof: the proved nonces plus an optional confirmed nonce.
pub type TestMessagesProof = (RangeInclusive<MessageNonce>, Option<MessageNonce>);
/// Test messages receiving proof: a single nonce.
pub type TestMessagesReceivingProof = MessageNonce;
/// Header number of the test source chain.
pub type TestSourceHeaderNumber = u64;
/// Header hash of the test source chain.
pub type TestSourceHeaderHash = u64;
/// Header number of the test target chain.
pub type TestTargetHeaderNumber = u64;
/// Header hash of the test target chain.
pub type TestTargetHeaderHash = u64;
/// Error type of the test clients.
#[derive(Debug)]
pub struct TestError;
impl MaybeConnectionError for TestError {
/// Every test error is reported as a connection error.
fn is_connection_error(&self) -> bool {
true
}
}
/// Lane id type used by the test lane.
pub type TestLaneIdType = HashedLaneId;
/// Message lane used by the tests of this module.
#[derive(Clone)]
pub struct TestMessageLane;
impl MessageLane for TestMessageLane {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type MessagesProof = TestMessagesProof;
type MessagesReceivingProof = TestMessagesReceivingProof;
type SourceChainBalance = TestSourceChainBalance;
type SourceHeaderNumber = TestSourceHeaderNumber;
type SourceHeaderHash = TestSourceHeaderHash;
type TargetHeaderNumber = TestTargetHeaderNumber;
type TargetHeaderHash = TestTargetHeaderHash;
type LaneId = TestLaneIdType;
}
/// Batch transaction returned by the test target client
/// (`TestTargetClient::require_source_header_on_target`).
#[derive(Clone, Debug)]
pub struct TestMessagesBatchTransaction {
// Source header id this batch transaction is tied to.
required_header_id: TestSourceHeaderId,
}
#[async_trait]
impl BatchTransaction<TestSourceHeaderId> for TestMessagesBatchTransaction {
/// Returns the stored source header id.
fn required_header_id(&self) -> TestSourceHeaderId {
self.required_header_id
}
}
/// Batch transaction returned by the test source client
/// (`TestSourceClient::require_target_header_on_source`).
#[derive(Clone, Debug)]
pub struct TestConfirmationBatchTransaction {
// Target header id this batch transaction is tied to.
required_header_id: TestTargetHeaderId,
}
#[async_trait]
impl BatchTransaction<TestTargetHeaderId> for TestConfirmationBatchTransaction {
/// Returns the stored target header id.
fn required_header_id(&self) -> TestTargetHeaderId {
self.required_header_id
}
}
/// Transaction tracker that resolves immediately with a preset status.
#[derive(Clone, Debug)]
pub struct TestTransactionTracker(TrackedTransactionStatus<TestTargetHeaderId>);
impl Default for TestTransactionTracker {
/// Creates a tracker that reports the transaction as finalized at the default header id.
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
type HeaderId = TestTargetHeaderId;
/// Returns the preset status without waiting for anything.
async fn wait(self) -> TrackedTransactionStatus<TestTargetHeaderId> {
self.0
}
}
#[derive(Debug, Clone)]
pub struct TestClientData {
is_source_fails: bool,
is_source_reconnected: bool,
source_state: SourceClientState<TestMessageLane>,
source_latest_generated_nonce: MessageNonce,
source_latest_confirmed_received_nonce: MessageNonce,
source_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
is_target_fails: bool,
is_target_reconnected: bool,
target_state: SourceClientState<TestMessageLane>,
target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce,
target_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_proofs: Vec<TestMessagesProof>,
target_to_source_batch_transaction: Option<TestConfirmationBatchTransaction>,
target_to_source_header_required: Option<TestTargetHeaderId>,
target_to_source_header_requirements: Vec<TestTargetHeaderId>,
source_to_target_batch_transaction: Option<TestMessagesBatchTransaction>,
source_to_target_header_required: Option<TestSourceHeaderId>,
source_to_target_header_requirements: Vec<TestSourceHeaderId>,
}
impl Default for TestClientData {
    /// Creates data of two healthy, idle clients: no failures, zero nonces, nothing
    /// submitted or required, and transactions reported as finalized at header number `0`.
    fn default() -> Self {
        Self {
            // Source chain.
            is_source_fails: false,
            is_source_reconnected: false,
            source_state: Default::default(),
            source_latest_generated_nonce: 0,
            source_latest_confirmed_received_nonce: 0,
            source_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
                0,
                Default::default(),
            )),
            submitted_messages_receiving_proofs: vec![],
            // Target chain.
            is_target_fails: false,
            is_target_reconnected: false,
            target_state: Default::default(),
            target_latest_received_nonce: 0,
            target_latest_confirmed_received_nonce: 0,
            target_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
                0,
                Default::default(),
            )),
            submitted_messages_proofs: vec![],
            // Header requirements in both directions.
            target_to_source_batch_transaction: None,
            target_to_source_header_required: None,
            target_to_source_header_requirements: vec![],
            source_to_target_batch_transaction: None,
            source_to_target_header_required: None,
            source_to_target_header_requirements: vec![],
        }
    }
}
impl TestClientData {
fn receive_messages(
&mut self,
maybe_batch_tx: Option<TestMessagesBatchTransaction>,
proof: TestMessagesProof,
) {
self.target_state.best_self =
HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
self.target_state.best_finalized_self = self.target_state.best_self;
self.target_latest_received_nonce = *proof.0.end();
if let Some(maybe_batch_tx) = maybe_batch_tx {
self.target_state.best_finalized_peer_at_best_self =
Some(maybe_batch_tx.required_header_id());
}
if let Some(target_latest_confirmed_received_nonce) = proof.1 {
self.target_latest_confirmed_received_nonce =
target_latest_confirmed_received_nonce;
}
self.submitted_messages_proofs.push(proof);
}
fn receive_messages_delivery_proof(
&mut self,
maybe_batch_tx: Option<TestConfirmationBatchTransaction>,
proof: TestMessagesReceivingProof,
) {
self.source_state.best_self =
HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
self.source_state.best_finalized_self = self.source_state.best_self;
if let Some(maybe_batch_tx) = maybe_batch_tx {
self.source_state.best_finalized_peer_at_best_self =
Some(maybe_batch_tx.required_header_id());
}
self.submitted_messages_receiving_proofs.push(proof);
self.source_latest_confirmed_received_nonce = proof;
}
}
/// Source client used by tests; all of its state lives in the shared `TestClientData`.
#[derive(Clone)]
pub struct TestSourceClient {
    /// Data shared with the target client and with the test body.
    data: Arc<Mutex<TestClientData>>,
    /// Callback invoked by client methods before they act on the data.
    tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
    /// Callback invoked by client methods after they have acted on the data.
    post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}

impl Default for TestSourceClient {
    /// Creates a client over fresh default data, with callbacks that do nothing.
    fn default() -> Self {
        let data = Arc::new(Mutex::new(TestClientData::default()));
        Self { data, tick: Arc::new(|_| {}), post_tick: Arc::new(|_| {}) }
    }
}
#[async_trait]
impl RelayClient for TestSourceClient {
    type Error = TestError;

    /// Marks the source client as reconnected; runs `tick` before and `post_tick` after.
    async fn reconnect(&mut self) -> Result<(), TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        shared.is_source_reconnected = true;
        (self.post_tick)(&mut shared);
        // Release the lock before reporting success.
        drop(shared);
        Ok(())
    }
}
#[async_trait]
impl SourceClient<TestMessageLane> for TestSourceClient {
    type BatchTransaction = TestConfirmationBatchTransaction;
    type TransactionTracker = TestTransactionTracker;

    /// Returns a copy of the stored source state, or `TestError` while the source is failing.
    async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        if shared.is_source_fails {
            Err(TestError)
        } else {
            (self.post_tick)(&mut shared);
            Ok(shared.source_state.clone())
        }
    }

    /// Returns `id` with the stored latest generated nonce, or `TestError` while the source
    /// is failing.
    async fn latest_generated_nonce(
        &self,
        id: SourceHeaderIdOf<TestMessageLane>,
    ) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        if shared.is_source_fails {
            Err(TestError)
        } else {
            (self.post_tick)(&mut shared);
            Ok((id, shared.source_latest_generated_nonce))
        }
    }

    /// Returns `id` with the stored latest confirmed nonce; this method never fails.
    async fn latest_confirmed_received_nonce(
        &self,
        id: SourceHeaderIdOf<TestMessageLane>,
    ) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        (self.post_tick)(&mut shared);
        let confirmed_nonce = shared.source_latest_confirmed_received_nonce;
        Ok((id, confirmed_nonce))
    }

    /// Reports every requested nonce as a message of weight `(1, 0)`, size `1`, reward `1`.
    async fn generated_message_details(
        &self,
        _id: SourceHeaderIdOf<TestMessageLane>,
        nonces: RangeInclusive<MessageNonce>,
    ) -> Result<MessageDetailsMap<TestSourceChainBalance>, TestError> {
        let mut details = MessageDetailsMap::new();
        for nonce in nonces {
            let message =
                MessageDetails { dispatch_weight: Weight::from_parts(1, 0), size: 1, reward: 1 };
            details.insert(nonce, message);
        }
        Ok(details)
    }

    /// Builds a test proof for `nonces`; the stored confirmed nonce is attached only when
    /// an outbound state proof is required.
    async fn prove_messages(
        &self,
        id: SourceHeaderIdOf<TestMessageLane>,
        nonces: RangeInclusive<MessageNonce>,
        proof_parameters: MessageProofParameters,
    ) -> Result<
        (SourceHeaderIdOf<TestMessageLane>, RangeInclusive<MessageNonce>, TestMessagesProof),
        TestError,
    > {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        (self.post_tick)(&mut shared);
        let confirmed_nonce = if proof_parameters.outbound_state_proof_required {
            Some(shared.source_latest_confirmed_received_nonce)
        } else {
            None
        };
        let proof: TestMessagesProof = (nonces.clone(), confirmed_nonce);
        Ok((id, nonces, proof))
    }

    /// Applies the receiving proof to the shared data and returns a tracker carrying the
    /// configured source transaction status.
    async fn submit_messages_receiving_proof(
        &self,
        maybe_batch_tx: Option<Self::BatchTransaction>,
        _generated_at_block: TargetHeaderIdOf<TestMessageLane>,
        proof: TestMessagesReceivingProof,
    ) -> Result<Self::TransactionTracker, TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        shared.receive_messages_delivery_proof(maybe_batch_tx, proof);
        (self.post_tick)(&mut shared);
        let tracker = TestTransactionTracker(shared.source_tracked_transaction_status);
        Ok(tracker)
    }

    /// Records the requirement, then hands out the prepared batch transaction (if any),
    /// retargeted at `id`. The prepared transaction is consumed by the call.
    async fn require_target_header_on_source(
        &self,
        id: TargetHeaderIdOf<TestMessageLane>,
    ) -> Result<Option<Self::BatchTransaction>, Self::Error> {
        let mut shared = self.data.lock();
        // The requirement is recorded before the callbacks run, so they can react to it.
        shared.target_to_source_header_required = Some(id);
        shared.target_to_source_header_requirements.push(id);
        (self.tick)(&mut shared);
        (self.post_tick)(&mut shared);
        let prepared = shared.target_to_source_batch_transaction.take();
        match prepared {
            Some(mut batch_tx) => {
                batch_tx.required_header_id = id;
                Ok(Some(batch_tx))
            },
            None => Ok(None),
        }
    }
}
/// Target client used by tests; all of its state lives in the shared `TestClientData`.
#[derive(Clone)]
pub struct TestTargetClient {
    /// Data shared with the source client and with the test body.
    data: Arc<Mutex<TestClientData>>,
    /// Callback invoked by client methods before they act on the data.
    tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
    /// Callback invoked by client methods after they have acted on the data.
    post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}

impl Default for TestTargetClient {
    /// Creates a client over fresh default data, with callbacks that do nothing.
    fn default() -> Self {
        let data = Arc::new(Mutex::new(TestClientData::default()));
        Self { data, tick: Arc::new(|_| {}), post_tick: Arc::new(|_| {}) }
    }
}
#[async_trait]
impl RelayClient for TestTargetClient {
    type Error = TestError;

    /// Marks the target client as reconnected; runs `tick` before and `post_tick` after.
    async fn reconnect(&mut self) -> Result<(), TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        shared.is_target_reconnected = true;
        (self.post_tick)(&mut shared);
        // Release the lock before reporting success.
        drop(shared);
        Ok(())
    }
}
#[async_trait]
impl TargetClient<TestMessageLane> for TestTargetClient {
    type BatchTransaction = TestMessagesBatchTransaction;
    type TransactionTracker = TestTransactionTracker;

    /// Returns a copy of the stored target state, or `TestError` while the target is failing.
    async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        if shared.is_target_fails {
            Err(TestError)
        } else {
            (self.post_tick)(&mut shared);
            Ok(shared.target_state.clone())
        }
    }

    /// Returns `id` with the stored latest received nonce, or `TestError` while the target
    /// is failing.
    async fn latest_received_nonce(
        &self,
        id: TargetHeaderIdOf<TestMessageLane>,
    ) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        if shared.is_target_fails {
            Err(TestError)
        } else {
            (self.post_tick)(&mut shared);
            Ok((id, shared.target_latest_received_nonce))
        }
    }

    /// Always reports an all-zero unrewarded relayers state; the shared data is not touched.
    async fn unrewarded_relayers_state(
        &self,
        id: TargetHeaderIdOf<TestMessageLane>,
    ) -> Result<(TargetHeaderIdOf<TestMessageLane>, UnrewardedRelayersState), TestError> {
        let no_unrewarded_relayers = UnrewardedRelayersState {
            unrewarded_relayer_entries: 0,
            messages_in_oldest_entry: 0,
            total_messages: 0,
            last_delivered_nonce: 0,
        };
        Ok((id, no_unrewarded_relayers))
    }

    /// Returns `id` with the stored latest confirmed nonce, or `TestError` while the target
    /// is failing.
    async fn latest_confirmed_received_nonce(
        &self,
        id: TargetHeaderIdOf<TestMessageLane>,
    ) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        if shared.is_target_fails {
            Err(TestError)
        } else {
            (self.post_tick)(&mut shared);
            Ok((id, shared.target_latest_confirmed_received_nonce))
        }
    }

    /// Uses the stored latest received nonce as the receiving proof; no callbacks are run.
    async fn prove_messages_receiving(
        &self,
        id: TargetHeaderIdOf<TestMessageLane>,
    ) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessagesReceivingProof), TestError> {
        let received_nonce = self.data.lock().target_latest_received_nonce;
        Ok((id, received_nonce))
    }

    /// Applies the messages proof to the shared data and returns `nonces` along with a
    /// tracker carrying the configured target transaction status. Fails with `TestError`
    /// (before applying anything) while the target is failing.
    async fn submit_messages_proof(
        &self,
        maybe_batch_tx: Option<Self::BatchTransaction>,
        _generated_at_header: SourceHeaderIdOf<TestMessageLane>,
        nonces: RangeInclusive<MessageNonce>,
        proof: TestMessagesProof,
    ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, TestError> {
        let mut shared = self.data.lock();
        (self.tick)(&mut shared);
        if shared.is_target_fails {
            return Err(TestError);
        }
        shared.receive_messages(maybe_batch_tx, proof);
        (self.post_tick)(&mut shared);
        let tx_tracker = TestTransactionTracker(shared.target_tracked_transaction_status);
        Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
    }

    /// Records the requirement, then hands out the prepared batch transaction (if any),
    /// retargeted at `id`. The prepared transaction is consumed by the call.
    async fn require_source_header_on_target(
        &self,
        id: SourceHeaderIdOf<TestMessageLane>,
    ) -> Result<Option<Self::BatchTransaction>, Self::Error> {
        let mut shared = self.data.lock();
        // The requirement is recorded before the callbacks run, so they can react to it.
        shared.source_to_target_header_required = Some(id);
        shared.source_to_target_header_requirements.push(id);
        (self.tick)(&mut shared);
        (self.post_tick)(&mut shared);
        let prepared = shared.source_to_target_batch_transaction.take();
        match prepared {
            Some(mut batch_tx) => {
                batch_tx.required_header_id = id;
                Ok(Some(batch_tx))
            },
            None => Ok(None),
        }
    }
}
/// Runs the message lane loop over test clients sharing `data` until it stops, then
/// returns a snapshot of the shared data.
///
/// The loop uses 100ms ticks for both clients, a zero reconnect delay, delivery limits of
/// `4` everywhere and disabled metrics. The result of `run` itself is ignored.
fn run_loop_test(
    data: Arc<Mutex<TestClientData>>,
    source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
    source_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
    target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
    target_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
    exit_signal: impl Future<Output = ()> + 'static + Send,
) -> TestClientData {
    let source_client = TestSourceClient {
        data: Arc::clone(&data),
        tick: source_tick,
        post_tick: source_post_tick,
    };
    let target_client = TestTargetClient {
        data: Arc::clone(&data),
        tick: target_tick,
        post_tick: target_post_tick,
    };
    let delivery_params = MessageDeliveryParams {
        max_unrewarded_relayer_entries_at_target: 4,
        max_unconfirmed_nonces_at_target: 4,
        max_messages_in_single_batch: 4,
        max_messages_weight_in_single_batch: Weight::from_parts(4, 0),
        max_messages_size_in_single_batch: 4,
    };
    let params = Params {
        lane: TestLaneIdType::try_new(1, 2).unwrap(),
        source_tick: Duration::from_millis(100),
        target_tick: Duration::from_millis(100),
        reconnect_delay: Duration::from_millis(0),
        delivery_params,
    };

    async_std::task::block_on(async move {
        let _ = run(params, source_client, target_client, MetricsParams::disabled(), exit_signal)
            .await;
    });

    // Bind the clone first so that the lock guard is released before returning.
    let snapshot = data.lock().clone();
    snapshot
}
/// The source client fails first, then (after the source has reconnected) the target
/// client fails; the loop must survive both and still deliver message `1`.
#[test]
fn message_lane_loop_is_able_to_recover_from_connection_errors() {
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
// The very first source request fails.
is_source_fails: true,
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
source_latest_generated_nonce: 1,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
target_latest_received_nonce: 0,
..Default::default()
})),
// Source tick: once the source has reconnected, heal it and break the target instead.
Arc::new(|data: &mut TestClientData| {
if data.is_source_reconnected {
data.is_source_fails = false;
data.is_target_fails = true;
}
}),
Arc::new(|_| {}),
// Target tick: heal the target once it has reconnected, advance the source header
// known to the target (up to number 10) and stop the loop after the first delivery.
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.is_target_fails = false;
}
if data.target_state.best_finalized_peer_at_best_self.unwrap().0 < 10 {
data.target_state.best_finalized_peer_at_best_self = Some(HeaderId(
data.target_state.best_finalized_peer_at_best_self.unwrap().0 + 1,
data.target_state.best_finalized_peer_at_best_self.unwrap().0 + 1,
));
}
if !data.submitted_messages_proofs.is_empty() {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// Exactly one proof, for message 1, without an outbound state proof.
assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
}
/// The effects of the first delivery and of the first confirmation transaction are
/// reverted by the post-tick callbacks; the loop must submit both transactions again.
#[test]
fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
source_latest_generated_nonce: 1,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
target_latest_received_nonce: 0,
..Default::default()
})),
// Source tick: produce a new finalized source block and satisfy the latest
// target-header requirement, if it is not satisfied yet.
Arc::new(move |data: &mut TestClientData| {
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
if *last_requirement !=
data.source_state.best_finalized_peer_at_best_self.unwrap()
{
data.source_state.best_finalized_peer_at_best_self =
Some(*last_requirement);
}
}
}),
// Source post-tick: while only one confirmation has been submitted, pretend that it
// had no effect.
Arc::new(move |data: &mut TestClientData| {
if data.submitted_messages_receiving_proofs.len() == 1 {
data.source_latest_confirmed_received_nonce = 0;
}
}),
// Target tick: produce a new finalized target block, satisfy the latest
// source-header requirement and stop the loop once message 1 is confirmed.
Arc::new(move |data: &mut TestClientData| {
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
if *last_requirement !=
data.target_state.best_finalized_peer_at_best_self.unwrap()
{
data.target_state.best_finalized_peer_at_best_self =
Some(*last_requirement);
}
}
if data.source_latest_confirmed_received_nonce == 1 {
exit_sender.unbounded_send(()).unwrap();
}
}),
// Target post-tick: while only one delivery has been submitted, pretend that it had
// no effect.
Arc::new(move |data: &mut TestClientData| {
if data.submitted_messages_proofs.len() == 1 {
data.target_latest_received_nonce = 0;
data.target_latest_confirmed_received_nonce = 0;
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// Both transactions had to be submitted twice.
assert_eq!(result.submitted_messages_proofs.len(), 2);
assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
}
/// Happy path without batch transactions: ten generated messages are delivered in
/// batches of at most four and then confirmed, with header requirements issued in both
/// directions.
#[test]
fn message_lane_loop_works() {
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
source_latest_generated_nonce: 10,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
target_latest_received_nonce: 0,
..Default::default()
})),
// Source tick: produce a new finalized source block, check that a target header is
// only requested while the source lags behind the target, and satisfy the latest
// requirement.
Arc::new(|data: &mut TestClientData| {
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
if data.target_to_source_header_required.is_some() {
assert!(
data.source_state.best_finalized_peer_at_best_self.unwrap().0 <
data.target_state.best_self.0
);
data.target_to_source_header_required = None;
}
if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
if *last_requirement !=
data.source_state.best_finalized_peer_at_best_self.unwrap()
{
data.source_state.best_finalized_peer_at_best_self =
Some(*last_requirement);
}
}
}),
Arc::new(|_| {}),
// Target tick: the mirror image of the source tick; additionally stops the loop once
// all ten messages are confirmed at the source.
Arc::new(move |data: &mut TestClientData| {
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
if data.source_to_target_header_required.is_some() {
assert!(
data.target_state.best_finalized_peer_at_best_self.unwrap().0 <
data.source_state.best_self.0
);
data.source_to_target_header_required = None;
}
if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
if *last_requirement !=
data.target_state.best_finalized_peer_at_best_self.unwrap()
{
data.target_state.best_finalized_peer_at_best_self =
Some(*last_requirement);
}
}
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// Messages are delivered in three batches, limited by `max_messages_in_single_batch`.
assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
// Headers have been requested in both directions.
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
/// Happy path with batch transactions: instead of updating the peer header directly, the
/// post-tick callbacks answer every header requirement with a prepared batch transaction.
#[test]
fn message_lane_loop_works_with_batch_transactions() {
let (exit_sender, exit_receiver) = unbounded();
let original_data = Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
source_latest_generated_nonce: 10,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
target_latest_received_nonce: 0,
..Default::default()
}));
let result = run_loop_test(
original_data,
Arc::new(|_| {}),
// Source post-tick: produce a new finalized source block and turn a pending
// target-header requirement into a prepared confirmation batch transaction.
Arc::new(move |data: &mut TestClientData| {
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
if let Some(target_to_source_header_required) =
data.target_to_source_header_required.take()
{
data.target_to_source_batch_transaction =
Some(TestConfirmationBatchTransaction {
required_header_id: target_to_source_header_required,
})
}
}),
Arc::new(|_| {}),
// Target post-tick: produce a new finalized target block, turn a pending
// source-header requirement into a prepared messages batch transaction and stop the
// loop once all ten messages are confirmed at the source.
Arc::new(move |data: &mut TestClientData| {
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
if let Some(source_to_target_header_required) =
data.source_to_target_header_required.take()
{
data.source_to_target_batch_transaction = Some(TestMessagesBatchTransaction {
required_header_id: source_to_target_header_required,
})
}
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// Messages are delivered in three batches, limited by `max_messages_in_single_batch`.
assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
// Headers have been requested in both directions.
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
/// `metrics_prefix` must produce a prefix that `MessageLaneLoopMetrics` accepts, both for
/// hashed and for legacy lane identifiers.
#[test]
fn metrics_prefix_is_valid() {
    /// Copy of `TestMessageLane` that uses the legacy lane id type.
    #[derive(Clone)]
    pub struct LegacyTestMessageLane;
    impl MessageLane for LegacyTestMessageLane {
        const SOURCE_NAME: &'static str = "LegacyTestSource";
        const TARGET_NAME: &'static str = "LegacyTestTarget";
        type MessagesProof = TestMessagesProof;
        type MessagesReceivingProof = TestMessagesReceivingProof;
        type SourceChainBalance = TestSourceChainBalance;
        type SourceHeaderNumber = TestSourceHeaderNumber;
        type SourceHeaderHash = TestSourceHeaderHash;
        type TargetHeaderNumber = TestTargetHeaderNumber;
        type TargetHeaderHash = TestTargetHeaderHash;
        type LaneId = LegacyLaneId;
    }

    // Hashed lane id.
    let hashed_prefix = metrics_prefix::<TestMessageLane>(&HashedLaneId::try_new(1, 2).unwrap());
    assert!(MessageLaneLoopMetrics::new(Some(&hashed_prefix)).is_ok());

    // Legacy (4-byte) lane id.
    let legacy_prefix = metrics_prefix::<LegacyTestMessageLane>(&LegacyLaneId([0, 0, 0, 1]));
    assert!(MessageLaneLoopMetrics::new(Some(&legacy_prefix)).is_ok());
}
}