referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
watcher.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Extrinsics status updates.
20
21use futures::Stream;
22use sc_transaction_pool_api::TransactionStatus;
23use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
24
25/// Extrinsic watcher.
26///
27/// Represents a stream of status updates for a particular extrinsic.
28#[derive(Debug)]
29pub struct Watcher<H, BH> {
30	receiver: TracingUnboundedReceiver<TransactionStatus<H, BH>>,
31	/// transaction hash of watched extrinsic
32	hash: H,
33}
34
35impl<H, BH> Watcher<H, BH> {
36	/// Returns the transaction hash.
37	pub fn hash(&self) -> &H {
38		&self.hash
39	}
40
41	/// Pipe the notifications to given sink.
42	///
43	/// Make sure to drive the future to completion.
44	pub fn into_stream(self) -> impl Stream<Item = TransactionStatus<H, BH>> {
45		self.receiver
46	}
47}
48
49/// Sender part of the watcher. Exposed only for testing purposes.
50#[derive(Debug)]
51pub struct Sender<H, BH> {
52	receivers: Vec<TracingUnboundedSender<TransactionStatus<H, BH>>>,
53	is_finalized: bool,
54}
55
56impl<H, BH> Default for Sender<H, BH> {
57	fn default() -> Self {
58		Sender { receivers: Default::default(), is_finalized: false }
59	}
60}
61
62impl<H: Clone, BH: Clone> Sender<H, BH> {
63	/// Add a new watcher to this sender object.
64	pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
65		let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
66		self.receivers.push(tx);
67		Watcher { receiver, hash }
68	}
69
70	/// Transaction became ready.
71	pub fn ready(&mut self) {
72		self.send(TransactionStatus::Ready)
73	}
74
75	/// Transaction was moved to future.
76	pub fn future(&mut self) {
77		self.send(TransactionStatus::Future)
78	}
79
80	/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
81	pub fn usurped(&mut self, hash: H) {
82		self.send(TransactionStatus::Usurped(hash));
83		self.is_finalized = true;
84	}
85
86	/// Extrinsic has been included in block with given hash.
87	pub fn in_block(&mut self, hash: BH, index: usize) {
88		self.send(TransactionStatus::InBlock((hash, index)));
89	}
90
91	/// Extrinsic has been finalized by a finality gadget.
92	pub fn finalized(&mut self, hash: BH, index: usize) {
93		self.send(TransactionStatus::Finalized((hash, index)));
94		self.is_finalized = true;
95	}
96
97	/// The block this extrinsic was included in has been retracted
98	pub fn finality_timeout(&mut self, hash: BH) {
99		self.send(TransactionStatus::FinalityTimeout(hash));
100		self.is_finalized = true;
101	}
102
103	/// The block this extrinsic was included in has been retracted
104	pub fn retracted(&mut self, hash: BH) {
105		self.send(TransactionStatus::Retracted(hash));
106	}
107
108	/// Extrinsic has been marked as invalid by the block builder.
109	pub fn invalid(&mut self) {
110		self.send(TransactionStatus::Invalid);
111		// we mark as finalized as there are no more notifications
112		self.is_finalized = true;
113	}
114
115	/// Transaction has been dropped from the pool because of the limit.
116	pub fn limit_enforced(&mut self) {
117		self.send(TransactionStatus::Dropped);
118		self.is_finalized = true;
119	}
120
121	/// Transaction has been dropped from the pool.
122	pub fn dropped(&mut self) {
123		self.send(TransactionStatus::Dropped);
124		self.is_finalized = true;
125	}
126
127	/// The extrinsic has been broadcast to the given peers.
128	pub fn broadcast(&mut self, peers: Vec<String>) {
129		self.send(TransactionStatus::Broadcast(peers))
130	}
131
132	/// Returns true if there are no more listeners for this extrinsic, or it was finalized.
133	pub fn is_done(&self) -> bool {
134		self.is_finalized || self.receivers.is_empty()
135	}
136
137	fn send(&mut self, status: TransactionStatus<H, BH>) {
138		self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
139	}
140}