referrerpolicy=no-referrer-when-downgrade

relay_substrate_client/
transaction_tracker.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Helper for tracking transaction invalidation events.
18
19use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};
20
21use async_trait::async_trait;
22use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
23use relay_utils::{HeaderId, TrackedTransactionStatus};
24use sp_runtime::traits::Header as _;
25use std::time::Duration;
26
27/// Transaction tracker environment.
28#[async_trait]
29pub trait Environment<C: Chain>: Send + Sync {
30	/// Returns header id by its hash.
31	async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error>;
32}
33
34// TODO (https://github.com/paritytech/parity-bridges-common/issues/2133): remove `Environment` trait
35// after test client is implemented
36#[async_trait]
37impl<C: Chain, T: crate::client::Client<C>> Environment<C> for T {
38	async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error> {
39		self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash))
40	}
41}
42
43/// Substrate transaction tracker implementation.
44///
45/// Substrate node provides RPC API to submit and watch for transaction events. This way
46/// we may know when transaction is included into block, finalized or rejected. There are
47/// some edge cases, when we can't fully trust this mechanism - e.g. transaction may broadcasted
48/// and then dropped out of node transaction pool (some other cases are also possible - node
49/// restarts, connection lost, ...). Then we can't know for sure - what is currently happening
50/// with our transaction. Is the transaction really lost? Is it still alive on the chain network?
51///
52/// We have several options to handle such cases:
53///
54/// 1) hope that the transaction is still alive and wait for its mining until it is spoiled;
55///
56/// 2) assume that the transaction is lost and resubmit another transaction instantly;
57///
58/// 3) wait for some time (if transaction is mortal - then until block where it dies; if it is
59///    immortal - then for some time that we assume is long enough to mine it) and assume that it is
60///    lost.
61///
62/// This struct implements third option as it seems to be the most optimal.
63pub struct TransactionTracker<C: Chain, E> {
64	environment: E,
65	transaction_hash: HashOf<C>,
66	stall_timeout: Duration,
67	subscription: Subscription<TransactionStatusOf<C>>,
68}
69
70impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
71	/// Create transaction tracker.
72	pub fn new(
73		environment: E,
74		stall_timeout: Duration,
75		transaction_hash: HashOf<C>,
76		subscription: Subscription<TransactionStatusOf<C>>,
77	) -> Self {
78		Self { environment, stall_timeout, transaction_hash, subscription }
79	}
80
81	// TODO (https://github.com/paritytech/parity-bridges-common/issues/2133): remove me after
82	// test client is implemented
83	/// Converts self into tracker with different environment.
84	pub fn switch_environment<NewE: Environment<C>>(
85		self,
86		environment: NewE,
87	) -> TransactionTracker<C, NewE> {
88		TransactionTracker {
89			environment,
90			stall_timeout: self.stall_timeout,
91			transaction_hash: self.transaction_hash,
92			subscription: self.subscription,
93		}
94	}
95
96	/// Wait for final transaction status and return it along with last known internal invalidation
97	/// status.
98	async fn do_wait(
99		self,
100		wait_for_stall_timeout: impl Future<Output = ()>,
101		wait_for_stall_timeout_rest: impl Future<Output = ()>,
102	) -> (TrackedTransactionStatus<HeaderIdOf<C>>, Option<InvalidationStatus<HeaderIdOf<C>>>) {
103		// sometimes we want to wait for the rest of the stall timeout even if
104		// `wait_for_invalidation` has been "select"ed first => it is shared
105		let wait_for_invalidation = watch_transaction_status::<_, C, _>(
106			self.environment,
107			self.transaction_hash,
108			self.subscription,
109		);
110		futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation);
111
112		match futures::future::select(wait_for_stall_timeout, wait_for_invalidation).await {
113			Either::Left((_, _)) => {
114				log::trace!(
115					target: "bridge",
116					"{} transaction {:?} is considered lost after timeout (no status response from the node)",
117					C::NAME,
118					self.transaction_hash,
119				);
120
121				(TrackedTransactionStatus::Lost, None)
122			},
123			Either::Right((invalidation_status, _)) => match invalidation_status {
124				InvalidationStatus::Finalized(at_block) =>
125					(TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)),
126				InvalidationStatus::Invalid =>
127					(TrackedTransactionStatus::Lost, Some(invalidation_status)),
128				InvalidationStatus::Lost => {
129					// wait for the rest of stall timeout - this way we'll be sure that the
130					// transaction is actually dead if it has been crafted properly
131					wait_for_stall_timeout_rest.await;
132					// if someone is still watching for our transaction, then we're reporting
133					// an error here (which is treated as "transaction lost")
134					log::trace!(
135						target: "bridge",
136						"{} transaction {:?} is considered lost after timeout",
137						C::NAME,
138						self.transaction_hash,
139					);
140
141					(TrackedTransactionStatus::Lost, Some(invalidation_status))
142				},
143			},
144		}
145	}
146}
147
148#[async_trait]
149impl<C: Chain, E: Environment<C>> relay_utils::TransactionTracker for TransactionTracker<C, E> {
150	type HeaderId = HeaderIdOf<C>;
151
152	async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<C>> {
153		let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared();
154		let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
155		self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0
156	}
157}
158
159/// Transaction invalidation status.
160///
161/// Note that in places where the `TransactionTracker` is used, the finalization event will be
162/// ignored - relay loops are detecting the mining/finalization using their own
163/// techniques. That's why we're using `InvalidationStatus` here.
164#[derive(Debug, PartialEq)]
165enum InvalidationStatus<BlockId> {
166	/// Transaction has been included into block and finalized at given block.
167	Finalized(BlockId),
168	/// Transaction has been invalidated.
169	Invalid,
170	/// We have lost track of transaction status.
171	Lost,
172}
173
174/// Watch for transaction status until transaction is finalized or we lose track of its status.
175async fn watch_transaction_status<
176	E: Environment<C>,
177	C: Chain,
178	S: Stream<Item = TransactionStatusOf<C>>,
179>(
180	environment: E,
181	transaction_hash: HashOf<C>,
182	subscription: S,
183) -> InvalidationStatus<HeaderIdOf<C>> {
184	futures::pin_mut!(subscription);
185
186	loop {
187		match subscription.next().await {
188			Some(TransactionStatusOf::<C>::Finalized((block_hash, _))) => {
189				// the only "successful" outcome of this method is when the block with transaction
190				// has been finalized
191				log::trace!(
192					target: "bridge",
193					"{} transaction {:?} has been finalized at block: {:?}",
194					C::NAME,
195					transaction_hash,
196					block_hash,
197				);
198
199				let header_id = match environment.header_id_by_hash(block_hash).await {
200					Ok(header_id) => header_id,
201					Err(e) => {
202						log::error!(
203							target: "bridge",
204							"Failed to read header {:?} when watching for {} transaction {:?}: {:?}",
205							block_hash,
206							C::NAME,
207							transaction_hash,
208							e,
209						);
210						// that's the best option we have here
211						return InvalidationStatus::Lost
212					},
213				};
214				return InvalidationStatus::Finalized(header_id)
215			},
216			Some(TransactionStatusOf::<C>::Invalid) => {
217				// if node says that the transaction is invalid, there are still chances that
218				// it is not actually invalid - e.g. if the block where transaction has been
219				// revalidated is retracted and transaction (at some other node pool) becomes
220				// valid again on other fork. But let's assume that the chances of this event
221				// are almost zero - there's a lot of things that must happen for this to be the
222				// case.
223				log::trace!(
224					target: "bridge",
225					"{} transaction {:?} has been invalidated",
226					C::NAME,
227					transaction_hash,
228				);
229				return InvalidationStatus::Invalid
230			},
231			Some(TransactionStatusOf::<C>::Future) |
232			Some(TransactionStatusOf::<C>::Ready) |
233			Some(TransactionStatusOf::<C>::Broadcast(_)) => {
234				// nothing important (for us) has happened
235			},
236			Some(TransactionStatusOf::<C>::InBlock(block_hash)) => {
237				// TODO: read matching system event (ExtrinsicSuccess or ExtrinsicFailed), log it
238				// here and use it later (on finality) for reporting invalid transaction
239				// https://github.com/paritytech/parity-bridges-common/issues/1464
240				log::trace!(
241					target: "bridge",
242					"{} transaction {:?} has been included in block: {:?}",
243					C::NAME,
244					transaction_hash,
245					block_hash,
246				);
247			},
248			Some(TransactionStatusOf::<C>::Retracted(block_hash)) => {
249				log::trace!(
250					target: "bridge",
251					"{} transaction {:?} at block {:?} has been retracted",
252					C::NAME,
253					transaction_hash,
254					block_hash,
255				);
256			},
257			Some(TransactionStatusOf::<C>::FinalityTimeout(block_hash)) => {
258				// finality is lagging? let's wait a bit more and report a stall
259				log::trace!(
260					target: "bridge",
261					"{} transaction {:?} block {:?} has not been finalized for too long",
262					C::NAME,
263					transaction_hash,
264					block_hash,
265				);
266				return InvalidationStatus::Lost
267			},
268			Some(TransactionStatusOf::<C>::Usurped(new_transaction_hash)) => {
269				// this may be result of our transaction resubmitter work or some manual
270				// intervention. In both cases - let's start stall timeout, because the meaning
271				// of transaction may have changed
272				log::trace!(
273					target: "bridge",
274					"{} transaction {:?} has been usurped by new transaction: {:?}",
275					C::NAME,
276					transaction_hash,
277					new_transaction_hash,
278				);
279				return InvalidationStatus::Lost
280			},
281			Some(TransactionStatusOf::<C>::Dropped) => {
282				// the transaction has been removed from the pool because of its limits. Let's wait
283				// a bit and report a stall
284				log::trace!(
285					target: "bridge",
286					"{} transaction {:?} has been dropped from the pool",
287					C::NAME,
288					transaction_hash,
289				);
290				return InvalidationStatus::Lost
291			},
292			None => {
293				// the status of transaction is unknown to us (the subscription has been closed?).
294				// Let's wait a bit and report a stall
295				return InvalidationStatus::Lost
296			},
297		}
298	}
299}
300
301#[cfg(test)]
302mod tests {
303	use super::*;
304	use crate::{test_chain::TestChain, StreamDescription};
305	use futures::{FutureExt, SinkExt};
306	use sc_transaction_pool_api::TransactionStatus;
307
308	struct TestEnvironment(Result<HeaderIdOf<TestChain>, Error>);
309
310	#[async_trait]
311	impl Environment<TestChain> for TestEnvironment {
312		async fn header_id_by_hash(
313			&self,
314			_hash: HashOf<TestChain>,
315		) -> Result<HeaderIdOf<TestChain>, Error> {
316			self.0.as_ref().map_err(|_| Error::BridgePalletIsNotInitialized).cloned()
317		}
318	}
319
320	async fn on_transaction_status(
321		status: TransactionStatus<HashOf<TestChain>, HashOf<TestChain>>,
322	) -> Option<(
323		TrackedTransactionStatus<HeaderIdOf<TestChain>>,
324		InvalidationStatus<HeaderIdOf<TestChain>>,
325	)> {
326		let (mut sender, receiver) = futures::channel::mpsc::channel(1);
327		let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
328			TestEnvironment(Ok(HeaderId(0, Default::default()))),
329			Duration::from_secs(0),
330			Default::default(),
331			Subscription::new_forwarded(
332				StreamDescription::new("test".into(), "test".into()),
333				receiver,
334			),
335		);
336
337		// we can't do `.now_or_never()` on `do_wait()` call, because `Subscription` has its own
338		// background thread, which may cause additional async task switches => let's leave some
339		// relatively small timeout here
340		let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100));
341		let wait_for_stall_timeout_rest = futures::future::ready(());
342		sender.send(Ok(status)).await.unwrap();
343
344		let (ts, is) =
345			tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await;
346		is.map(|is| (ts, is))
347	}
348
349	#[async_std::test]
350	async fn returns_finalized_on_finalized() {
351		assert_eq!(
352			on_transaction_status(TransactionStatus::Finalized(Default::default())).await,
353			Some((
354				TrackedTransactionStatus::Finalized(Default::default()),
355				InvalidationStatus::Finalized(Default::default())
356			)),
357		);
358	}
359
360	#[async_std::test]
361	async fn returns_lost_on_finalized_and_environment_error() {
362		assert_eq!(
363			watch_transaction_status::<_, TestChain, _>(
364				TestEnvironment(Err(Error::BridgePalletIsNotInitialized)),
365				Default::default(),
366				futures::stream::iter([TransactionStatus::Finalized(Default::default())])
367			)
368			.now_or_never(),
369			Some(InvalidationStatus::Lost),
370		);
371	}
372
373	#[async_std::test]
374	async fn returns_invalid_on_invalid() {
375		assert_eq!(
376			on_transaction_status(TransactionStatus::Invalid).await,
377			Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)),
378		);
379	}
380
381	#[async_std::test]
382	async fn waits_on_future() {
383		assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,);
384	}
385
386	#[async_std::test]
387	async fn waits_on_ready() {
388		assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,);
389	}
390
391	#[async_std::test]
392	async fn waits_on_broadcast() {
393		assert_eq!(
394			on_transaction_status(TransactionStatus::Broadcast(Default::default())).await,
395			None,
396		);
397	}
398
399	#[async_std::test]
400	async fn waits_on_in_block() {
401		assert_eq!(
402			on_transaction_status(TransactionStatus::InBlock(Default::default())).await,
403			None,
404		);
405	}
406
407	#[async_std::test]
408	async fn waits_on_retracted() {
409		assert_eq!(
410			on_transaction_status(TransactionStatus::Retracted(Default::default())).await,
411			None,
412		);
413	}
414
415	#[async_std::test]
416	async fn lost_on_finality_timeout() {
417		assert_eq!(
418			on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await,
419			Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
420		);
421	}
422
423	#[async_std::test]
424	async fn lost_on_usurped() {
425		assert_eq!(
426			on_transaction_status(TransactionStatus::Usurped(Default::default())).await,
427			Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
428		);
429	}
430
431	#[async_std::test]
432	async fn lost_on_dropped() {
433		assert_eq!(
434			on_transaction_status(TransactionStatus::Dropped).await,
435			Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
436		);
437	}
438
439	#[async_std::test]
440	async fn lost_on_subscription_error() {
441		assert_eq!(
442			watch_transaction_status::<_, TestChain, _>(
443				TestEnvironment(Ok(HeaderId(0, Default::default()))),
444				Default::default(),
445				futures::stream::iter([])
446			)
447			.now_or_never(),
448			Some(InvalidationStatus::Lost),
449		);
450	}
451
452	#[async_std::test]
453	async fn lost_on_timeout_when_waiting_for_invalidation_status() {
454		let (_sender, receiver) = futures::channel::mpsc::channel(1);
455		let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
456			TestEnvironment(Ok(HeaderId(0, Default::default()))),
457			Duration::from_secs(0),
458			Default::default(),
459			Subscription::new_forwarded(
460				StreamDescription::new("test".into(), "test".into()),
461				receiver,
462			),
463		);
464
465		let wait_for_stall_timeout = futures::future::ready(()).shared();
466		let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
467		let wait_result = tx_tracker
468			.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest)
469			.now_or_never();
470
471		assert_eq!(wait_result, Some((TrackedTransactionStatus::Lost, None)));
472	}
473}