1use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};
20
21use async_trait::async_trait;
22use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
23use relay_utils::{HeaderId, TrackedTransactionStatus};
24use sp_runtime::traits::Header as _;
25use std::time::Duration;
26
27#[async_trait]
29pub trait Environment<C: Chain>: Send + Sync {
30 async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error>;
32}
33
34#[async_trait]
37impl<C: Chain, T: crate::client::Client<C>> Environment<C> for T {
38 async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error> {
39 self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash))
40 }
41}
42
43pub struct TransactionTracker<C: Chain, E> {
64 environment: E,
65 transaction_hash: HashOf<C>,
66 stall_timeout: Duration,
67 subscription: Subscription<TransactionStatusOf<C>>,
68}
69
70impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
71 pub fn new(
73 environment: E,
74 stall_timeout: Duration,
75 transaction_hash: HashOf<C>,
76 subscription: Subscription<TransactionStatusOf<C>>,
77 ) -> Self {
78 Self { environment, stall_timeout, transaction_hash, subscription }
79 }
80
81 pub fn switch_environment<NewE: Environment<C>>(
85 self,
86 environment: NewE,
87 ) -> TransactionTracker<C, NewE> {
88 TransactionTracker {
89 environment,
90 stall_timeout: self.stall_timeout,
91 transaction_hash: self.transaction_hash,
92 subscription: self.subscription,
93 }
94 }
95
96 async fn do_wait(
99 self,
100 wait_for_stall_timeout: impl Future<Output = ()>,
101 wait_for_stall_timeout_rest: impl Future<Output = ()>,
102 ) -> (TrackedTransactionStatus<HeaderIdOf<C>>, Option<InvalidationStatus<HeaderIdOf<C>>>) {
103 let wait_for_invalidation = watch_transaction_status::<_, C, _>(
106 self.environment,
107 self.transaction_hash,
108 self.subscription,
109 );
110 futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation);
111
112 match futures::future::select(wait_for_stall_timeout, wait_for_invalidation).await {
113 Either::Left((_, _)) => {
114 log::trace!(
115 target: "bridge",
116 "{} transaction {:?} is considered lost after timeout (no status response from the node)",
117 C::NAME,
118 self.transaction_hash,
119 );
120
121 (TrackedTransactionStatus::Lost, None)
122 },
123 Either::Right((invalidation_status, _)) => match invalidation_status {
124 InvalidationStatus::Finalized(at_block) =>
125 (TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)),
126 InvalidationStatus::Invalid =>
127 (TrackedTransactionStatus::Lost, Some(invalidation_status)),
128 InvalidationStatus::Lost => {
129 wait_for_stall_timeout_rest.await;
132 log::trace!(
135 target: "bridge",
136 "{} transaction {:?} is considered lost after timeout",
137 C::NAME,
138 self.transaction_hash,
139 );
140
141 (TrackedTransactionStatus::Lost, Some(invalidation_status))
142 },
143 },
144 }
145 }
146}
147
148#[async_trait]
149impl<C: Chain, E: Environment<C>> relay_utils::TransactionTracker for TransactionTracker<C, E> {
150 type HeaderId = HeaderIdOf<C>;
151
152 async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<C>> {
153 let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared();
154 let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
155 self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0
156 }
157}
158
159#[derive(Debug, PartialEq)]
165enum InvalidationStatus<BlockId> {
166 Finalized(BlockId),
168 Invalid,
170 Lost,
172}
173
174async fn watch_transaction_status<
176 E: Environment<C>,
177 C: Chain,
178 S: Stream<Item = TransactionStatusOf<C>>,
179>(
180 environment: E,
181 transaction_hash: HashOf<C>,
182 subscription: S,
183) -> InvalidationStatus<HeaderIdOf<C>> {
184 futures::pin_mut!(subscription);
185
186 loop {
187 match subscription.next().await {
188 Some(TransactionStatusOf::<C>::Finalized((block_hash, _))) => {
189 log::trace!(
192 target: "bridge",
193 "{} transaction {:?} has been finalized at block: {:?}",
194 C::NAME,
195 transaction_hash,
196 block_hash,
197 );
198
199 let header_id = match environment.header_id_by_hash(block_hash).await {
200 Ok(header_id) => header_id,
201 Err(e) => {
202 log::error!(
203 target: "bridge",
204 "Failed to read header {:?} when watching for {} transaction {:?}: {:?}",
205 block_hash,
206 C::NAME,
207 transaction_hash,
208 e,
209 );
210 return InvalidationStatus::Lost
212 },
213 };
214 return InvalidationStatus::Finalized(header_id)
215 },
216 Some(TransactionStatusOf::<C>::Invalid) => {
217 log::trace!(
224 target: "bridge",
225 "{} transaction {:?} has been invalidated",
226 C::NAME,
227 transaction_hash,
228 );
229 return InvalidationStatus::Invalid
230 },
231 Some(TransactionStatusOf::<C>::Future) |
232 Some(TransactionStatusOf::<C>::Ready) |
233 Some(TransactionStatusOf::<C>::Broadcast(_)) => {
234 },
236 Some(TransactionStatusOf::<C>::InBlock(block_hash)) => {
237 log::trace!(
241 target: "bridge",
242 "{} transaction {:?} has been included in block: {:?}",
243 C::NAME,
244 transaction_hash,
245 block_hash,
246 );
247 },
248 Some(TransactionStatusOf::<C>::Retracted(block_hash)) => {
249 log::trace!(
250 target: "bridge",
251 "{} transaction {:?} at block {:?} has been retracted",
252 C::NAME,
253 transaction_hash,
254 block_hash,
255 );
256 },
257 Some(TransactionStatusOf::<C>::FinalityTimeout(block_hash)) => {
258 log::trace!(
260 target: "bridge",
261 "{} transaction {:?} block {:?} has not been finalized for too long",
262 C::NAME,
263 transaction_hash,
264 block_hash,
265 );
266 return InvalidationStatus::Lost
267 },
268 Some(TransactionStatusOf::<C>::Usurped(new_transaction_hash)) => {
269 log::trace!(
273 target: "bridge",
274 "{} transaction {:?} has been usurped by new transaction: {:?}",
275 C::NAME,
276 transaction_hash,
277 new_transaction_hash,
278 );
279 return InvalidationStatus::Lost
280 },
281 Some(TransactionStatusOf::<C>::Dropped) => {
282 log::trace!(
285 target: "bridge",
286 "{} transaction {:?} has been dropped from the pool",
287 C::NAME,
288 transaction_hash,
289 );
290 return InvalidationStatus::Lost
291 },
292 None => {
293 return InvalidationStatus::Lost
296 },
297 }
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304 use crate::{test_chain::TestChain, StreamDescription};
305 use futures::{FutureExt, SinkExt};
306 use sc_transaction_pool_api::TransactionStatus;
307
308 struct TestEnvironment(Result<HeaderIdOf<TestChain>, Error>);
309
310 #[async_trait]
311 impl Environment<TestChain> for TestEnvironment {
312 async fn header_id_by_hash(
313 &self,
314 _hash: HashOf<TestChain>,
315 ) -> Result<HeaderIdOf<TestChain>, Error> {
316 self.0.as_ref().map_err(|_| Error::BridgePalletIsNotInitialized).cloned()
317 }
318 }
319
320 async fn on_transaction_status(
321 status: TransactionStatus<HashOf<TestChain>, HashOf<TestChain>>,
322 ) -> Option<(
323 TrackedTransactionStatus<HeaderIdOf<TestChain>>,
324 InvalidationStatus<HeaderIdOf<TestChain>>,
325 )> {
326 let (mut sender, receiver) = futures::channel::mpsc::channel(1);
327 let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
328 TestEnvironment(Ok(HeaderId(0, Default::default()))),
329 Duration::from_secs(0),
330 Default::default(),
331 Subscription::new_forwarded(
332 StreamDescription::new("test".into(), "test".into()),
333 receiver,
334 ),
335 );
336
337 let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100));
341 let wait_for_stall_timeout_rest = futures::future::ready(());
342 sender.send(Ok(status)).await.unwrap();
343
344 let (ts, is) =
345 tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await;
346 is.map(|is| (ts, is))
347 }
348
349 #[async_std::test]
350 async fn returns_finalized_on_finalized() {
351 assert_eq!(
352 on_transaction_status(TransactionStatus::Finalized(Default::default())).await,
353 Some((
354 TrackedTransactionStatus::Finalized(Default::default()),
355 InvalidationStatus::Finalized(Default::default())
356 )),
357 );
358 }
359
360 #[async_std::test]
361 async fn returns_lost_on_finalized_and_environment_error() {
362 assert_eq!(
363 watch_transaction_status::<_, TestChain, _>(
364 TestEnvironment(Err(Error::BridgePalletIsNotInitialized)),
365 Default::default(),
366 futures::stream::iter([TransactionStatus::Finalized(Default::default())])
367 )
368 .now_or_never(),
369 Some(InvalidationStatus::Lost),
370 );
371 }
372
373 #[async_std::test]
374 async fn returns_invalid_on_invalid() {
375 assert_eq!(
376 on_transaction_status(TransactionStatus::Invalid).await,
377 Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)),
378 );
379 }
380
381 #[async_std::test]
382 async fn waits_on_future() {
383 assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,);
384 }
385
386 #[async_std::test]
387 async fn waits_on_ready() {
388 assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,);
389 }
390
391 #[async_std::test]
392 async fn waits_on_broadcast() {
393 assert_eq!(
394 on_transaction_status(TransactionStatus::Broadcast(Default::default())).await,
395 None,
396 );
397 }
398
399 #[async_std::test]
400 async fn waits_on_in_block() {
401 assert_eq!(
402 on_transaction_status(TransactionStatus::InBlock(Default::default())).await,
403 None,
404 );
405 }
406
407 #[async_std::test]
408 async fn waits_on_retracted() {
409 assert_eq!(
410 on_transaction_status(TransactionStatus::Retracted(Default::default())).await,
411 None,
412 );
413 }
414
415 #[async_std::test]
416 async fn lost_on_finality_timeout() {
417 assert_eq!(
418 on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await,
419 Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
420 );
421 }
422
423 #[async_std::test]
424 async fn lost_on_usurped() {
425 assert_eq!(
426 on_transaction_status(TransactionStatus::Usurped(Default::default())).await,
427 Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
428 );
429 }
430
431 #[async_std::test]
432 async fn lost_on_dropped() {
433 assert_eq!(
434 on_transaction_status(TransactionStatus::Dropped).await,
435 Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
436 );
437 }
438
439 #[async_std::test]
440 async fn lost_on_subscription_error() {
441 assert_eq!(
442 watch_transaction_status::<_, TestChain, _>(
443 TestEnvironment(Ok(HeaderId(0, Default::default()))),
444 Default::default(),
445 futures::stream::iter([])
446 )
447 .now_or_never(),
448 Some(InvalidationStatus::Lost),
449 );
450 }
451
452 #[async_std::test]
453 async fn lost_on_timeout_when_waiting_for_invalidation_status() {
454 let (_sender, receiver) = futures::channel::mpsc::channel(1);
455 let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
456 TestEnvironment(Ok(HeaderId(0, Default::default()))),
457 Duration::from_secs(0),
458 Default::default(),
459 Subscription::new_forwarded(
460 StreamDescription::new("test".into(), "test".into()),
461 receiver,
462 ),
463 );
464
465 let wait_for_stall_timeout = futures::future::ready(()).shared();
466 let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
467 let wait_result = tx_tracker
468 .do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest)
469 .now_or_never();
470
471 assert_eq!(wait_result, Some((TrackedTransactionStatus::Lost, None)));
472 }
473}