use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};
use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
use relay_utils::{HeaderId, TrackedTransactionStatus};
use sp_runtime::traits::Header as _;
use std::time::Duration;
/// Chain access required by the transaction tracker.
///
/// The tracker uses it to turn the hash of the block in which a transaction has been
/// finalized into a full header id.
#[async_trait]
pub trait Environment<C: Chain>: Send + Sync {
/// Returns the id of the header with the given `hash`, or an error if it cannot be obtained.
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error>;
}
/// Every chain client can act as a tracker environment.
#[async_trait]
impl<C: Chain, T: crate::client::Client<C>> Environment<C> for T {
    /// Fetches the header by `hash` and pairs its number with that same hash.
    ///
    /// Any error returned by the header lookup is passed through unchanged.
    async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error> {
        let header = self.header_by_hash(hash).await?;
        Ok(HeaderId(*header.number(), hash))
    }
}
/// Tracks a single submitted transaction of chain `C` by watching its status subscription.
///
/// Waiting on the tracker races the status subscription against `stall_timeout` and reports
/// the transaction either as finalized (with the id of the finalizing header) or as lost.
pub struct TransactionTracker<C: Chain, E> {
/// Resolves the hash of the block where the transaction was finalized into a header id.
environment: E,
/// Hash of the tracked transaction; it is passed to the status watcher and used in log messages.
transaction_hash: HashOf<C>,
/// How long `wait` sleeps before giving up on the transaction and reporting it as lost.
stall_timeout: Duration,
/// Stream of status updates for the tracked transaction.
subscription: Subscription<TransactionStatusOf<C>>,
}
impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
pub fn new(
environment: E,
stall_timeout: Duration,
transaction_hash: HashOf<C>,
subscription: Subscription<TransactionStatusOf<C>>,
) -> Self {
Self { environment, stall_timeout, transaction_hash, subscription }
}
pub fn switch_environment<NewE: Environment<C>>(
self,
environment: NewE,
) -> TransactionTracker<C, NewE> {
TransactionTracker {
environment,
stall_timeout: self.stall_timeout,
transaction_hash: self.transaction_hash,
subscription: self.subscription,
}
}
async fn do_wait(
self,
wait_for_stall_timeout: impl Future<Output = ()>,
wait_for_stall_timeout_rest: impl Future<Output = ()>,
) -> (TrackedTransactionStatus<HeaderIdOf<C>>, Option<InvalidationStatus<HeaderIdOf<C>>>) {
let wait_for_invalidation = watch_transaction_status::<_, C, _>(
self.environment,
self.transaction_hash,
self.subscription,
);
futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation);
match futures::future::select(wait_for_stall_timeout, wait_for_invalidation).await {
Either::Left((_, _)) => {
log::trace!(
target: "bridge",
"{} transaction {:?} is considered lost after timeout (no status response from the node)",
C::NAME,
self.transaction_hash,
);
(TrackedTransactionStatus::Lost, None)
},
Either::Right((invalidation_status, _)) => match invalidation_status {
InvalidationStatus::Finalized(at_block) =>
(TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)),
InvalidationStatus::Invalid =>
(TrackedTransactionStatus::Lost, Some(invalidation_status)),
InvalidationStatus::Lost => {
wait_for_stall_timeout_rest.await;
log::trace!(
target: "bridge",
"{} transaction {:?} is considered lost after timeout",
C::NAME,
self.transaction_hash,
);
(TrackedTransactionStatus::Lost, Some(invalidation_status))
},
},
}
}
}
#[async_trait]
impl<C: Chain, E: Environment<C>> relay_utils::TransactionTracker for TransactionTracker<C, E> {
    type HeaderId = HeaderIdOf<C>;

    /// Waits until the transaction is either finalized or considered lost.
    ///
    /// A single sleep of `stall_timeout` is created and shared between two handles: one
    /// races against the status watcher, the other lets `do_wait` await the same timer
    /// again after the race has been decided.
    async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<C>> {
        let stall_timer = async_std::task::sleep(self.stall_timeout).shared();
        let stall_timer_rest = stall_timer.clone();

        let (tracked_status, _invalidation_status) =
            self.do_wait(stall_timer, stall_timer_rest).await;
        tracked_status
    }
}
/// Outcome of watching the status subscription of a transaction.
#[derive(Debug, PartialEq)]
enum InvalidationStatus<BlockId> {
/// The transaction has been finalized; the payload identifies the finalizing block.
Finalized(BlockId),
/// The subscription has reported the transaction as invalid.
Invalid,
/// The transaction can no longer be tracked: it has been dropped, usurped, has hit the
/// finality timeout, the finalizing header could not be read, or the subscription has ended.
Lost,
}
/// Consumes `subscription` until it yields a status that settles the fate of the transaction.
///
/// Returns:
/// - `InvalidationStatus::Finalized` with the id of the finalizing header when the
///   transaction has been finalized and `environment` has resolved the block hash;
/// - `InvalidationStatus::Invalid` when the transaction has been reported as invalid;
/// - `InvalidationStatus::Lost` when the transaction has been dropped, usurped, has hit the
///   finality timeout, when the finalizing header could not be read, or when the
///   subscription has ended without a decisive status.
///
/// `Future`, `Ready`, `Broadcast`, `InBlock` and `Retracted` statuses do not end the watch.
async fn watch_transaction_status<
    E: Environment<C>,
    C: Chain,
    S: Stream<Item = TransactionStatusOf<C>>,
>(
    environment: E,
    transaction_hash: HashOf<C>,
    subscription: S,
) -> InvalidationStatus<HeaderIdOf<C>> {
    futures::pin_mut!(subscription);

    while let Some(status) = subscription.next().await {
        match status {
            // intermediate statuses: keep watching
            TransactionStatusOf::<C>::Future
            | TransactionStatusOf::<C>::Ready
            | TransactionStatusOf::<C>::Broadcast(_) => {}
            TransactionStatusOf::<C>::InBlock(block_hash) => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} has been included in block: {:?}",
                    C::NAME,
                    transaction_hash,
                    block_hash,
                );
            }
            TransactionStatusOf::<C>::Retracted(block_hash) => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} at block {:?} has been retracted",
                    C::NAME,
                    transaction_hash,
                    block_hash,
                );
            }
            // decisive statuses: stop watching and report the verdict
            TransactionStatusOf::<C>::Finalized((block_hash, _)) => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} has been finalized at block: {:?}",
                    C::NAME,
                    transaction_hash,
                    block_hash,
                );
                return match environment.header_id_by_hash(block_hash).await {
                    Ok(header_id) => InvalidationStatus::Finalized(header_id),
                    Err(e) => {
                        log::error!(
                            target: "bridge",
                            "Failed to read header {:?} when watching for {} transaction {:?}: {:?}",
                            block_hash,
                            C::NAME,
                            transaction_hash,
                            e,
                        );
                        // without the header id the finalization cannot be reported
                        InvalidationStatus::Lost
                    }
                };
            }
            TransactionStatusOf::<C>::Invalid => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} has been invalidated",
                    C::NAME,
                    transaction_hash,
                );
                return InvalidationStatus::Invalid;
            }
            TransactionStatusOf::<C>::FinalityTimeout(block_hash) => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} block {:?} has not been finalized for too long",
                    C::NAME,
                    transaction_hash,
                    block_hash,
                );
                return InvalidationStatus::Lost;
            }
            TransactionStatusOf::<C>::Usurped(new_transaction_hash) => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} has been usurped by new transaction: {:?}",
                    C::NAME,
                    transaction_hash,
                    new_transaction_hash,
                );
                return InvalidationStatus::Lost;
            }
            TransactionStatusOf::<C>::Dropped => {
                log::trace!(
                    target: "bridge",
                    "{} transaction {:?} has been dropped from the pool",
                    C::NAME,
                    transaction_hash,
                );
                return InvalidationStatus::Lost;
            }
        }
    }

    // the stream has ended without a decisive status
    InvalidationStatus::Lost
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{test_chain::TestChain, StreamDescription};
use futures::{FutureExt, SinkExt};
use sc_transaction_pool_api::TransactionStatus;
/// Environment stub that answers every header lookup with a preconfigured result.
struct TestEnvironment(Result<HeaderIdOf<TestChain>, Error>);
#[async_trait]
impl Environment<TestChain> for TestEnvironment {
async fn header_id_by_hash(
&self,
_hash: HashOf<TestChain>,
) -> Result<HeaderIdOf<TestChain>, Error> {
// whatever error is stored, `Error::BridgePalletIsNotInitialized` is returned instead
// (presumably because `Error` cannot be cloned - confirm)
self.0.as_ref().map_err(|_| Error::BridgePalletIsNotInitialized).cloned()
}
}
/// Sends a single `status` to the subscription of a fresh tracker and runs `do_wait`.
///
/// Returns `None` when `do_wait` reports no invalidation status, i.e. when the 100 ms stall
/// timeout has completed before the status watcher produced a verdict. Otherwise returns
/// both the tracked status and the invalidation status.
async fn on_transaction_status(
status: TransactionStatus<HashOf<TestChain>, HashOf<TestChain>>,
) -> Option<(
TrackedTransactionStatus<HeaderIdOf<TestChain>>,
InvalidationStatus<HeaderIdOf<TestChain>>,
)> {
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
// the stall timeout stored in the tracker is not read by `do_wait` - the timeout
// futures are passed explicitly below
Duration::from_secs(0),
Default::default(),
Subscription::new_forwarded(
StreamDescription::new("test".into(), "test".into()),
receiver,
),
);
// NOTE(review): a real (short) sleep is used here instead of `.now_or_never()`, presumably
// because the subscription needs some time to forward the item - confirm
let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100));
// the "rest" of the timeout is already complete, so a `Lost` verdict is reported at once
let wait_for_stall_timeout_rest = futures::future::ready(());
sender.send(Ok(status)).await.unwrap();
let (ts, is) =
tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await;
is.map(|is| (ts, is))
}
// `Finalized` status + successful header lookup => transaction is reported as finalized
#[async_std::test]
async fn returns_finalized_on_finalized() {
assert_eq!(
on_transaction_status(TransactionStatus::Finalized(Default::default())).await,
Some((
TrackedTransactionStatus::Finalized(Default::default()),
InvalidationStatus::Finalized(Default::default())
)),
);
}
// `Finalized` status + failed header lookup => the watcher reports `Lost`
#[async_std::test]
async fn returns_lost_on_finalized_and_environment_error() {
assert_eq!(
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Err(Error::BridgePalletIsNotInitialized)),
Default::default(),
futures::stream::iter([TransactionStatus::Finalized(Default::default())])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}
// `Invalid` status => the watcher reports `Invalid`, the tracker reports `Lost`
#[async_std::test]
async fn returns_invalid_on_invalid() {
assert_eq!(
on_transaction_status(TransactionStatus::Invalid).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)),
);
}
// the following statuses do not end the watch, so the stall timeout wins the race
#[async_std::test]
async fn waits_on_future() {
assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,);
}
#[async_std::test]
async fn waits_on_ready() {
assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,);
}
#[async_std::test]
async fn waits_on_broadcast() {
assert_eq!(
on_transaction_status(TransactionStatus::Broadcast(Default::default())).await,
None,
);
}
#[async_std::test]
async fn waits_on_in_block() {
assert_eq!(
on_transaction_status(TransactionStatus::InBlock(Default::default())).await,
None,
);
}
#[async_std::test]
async fn waits_on_retracted() {
assert_eq!(
on_transaction_status(TransactionStatus::Retracted(Default::default())).await,
None,
);
}
// the following statuses make both the watcher and the tracker report `Lost`
#[async_std::test]
async fn lost_on_finality_timeout() {
assert_eq!(
on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
);
}
#[async_std::test]
async fn lost_on_usurped() {
assert_eq!(
on_transaction_status(TransactionStatus::Usurped(Default::default())).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
);
}
#[async_std::test]
async fn lost_on_dropped() {
assert_eq!(
on_transaction_status(TransactionStatus::Dropped).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
);
}
// a status stream that ends without yielding any status => the watcher reports `Lost`
#[async_std::test]
async fn lost_on_subscription_error() {
assert_eq!(
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Default::default(),
futures::stream::iter([])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}
// the stall timeout is already complete and nothing is ever sent to the subscription =>
// `do_wait` reports `Lost` without any invalidation status
#[async_std::test]
async fn lost_on_timeout_when_waiting_for_invalidation_status() {
// the sender is kept alive (but unused), so the channel is not closed during the test
let (_sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription::new_forwarded(
StreamDescription::new("test".into(), "test".into()),
receiver,
),
);
let wait_for_stall_timeout = futures::future::ready(()).shared();
let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
let wait_result = tx_tracker
.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest)
.now_or_never();
assert_eq!(wait_result, Some((TrackedTransactionStatus::Lost, None)));
}
}