sc_transaction_pool/graph/
watcher.rs1use futures::Stream;
22use sc_transaction_pool_api::TransactionStatus;
23use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
24
25#[derive(Debug)]
29pub struct Watcher<H, BH> {
30 receiver: TracingUnboundedReceiver<TransactionStatus<H, BH>>,
31 hash: H,
33}
34
35impl<H, BH> Watcher<H, BH> {
36 pub fn hash(&self) -> &H {
38 &self.hash
39 }
40
41 pub fn into_stream(self) -> impl Stream<Item = TransactionStatus<H, BH>> {
45 self.receiver
46 }
47}
48
49#[derive(Debug)]
51pub struct Sender<H, BH> {
52 receivers: Vec<TracingUnboundedSender<TransactionStatus<H, BH>>>,
53 is_finalized: bool,
54}
55
56impl<H, BH> Default for Sender<H, BH> {
57 fn default() -> Self {
58 Sender { receivers: Default::default(), is_finalized: false }
59 }
60}
61
62impl<H: Clone, BH: Clone> Sender<H, BH> {
63 pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
65 let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
66 self.receivers.push(tx);
67 Watcher { receiver, hash }
68 }
69
70 pub fn ready(&mut self) {
72 self.send(TransactionStatus::Ready)
73 }
74
75 pub fn future(&mut self) {
77 self.send(TransactionStatus::Future)
78 }
79
80 pub fn usurped(&mut self, hash: H) {
82 self.send(TransactionStatus::Usurped(hash));
83 self.is_finalized = true;
84 }
85
86 pub fn in_block(&mut self, hash: BH, index: usize) {
88 self.send(TransactionStatus::InBlock((hash, index)));
89 }
90
91 pub fn finalized(&mut self, hash: BH, index: usize) {
93 self.send(TransactionStatus::Finalized((hash, index)));
94 self.is_finalized = true;
95 }
96
97 pub fn finality_timeout(&mut self, hash: BH) {
99 self.send(TransactionStatus::FinalityTimeout(hash));
100 self.is_finalized = true;
101 }
102
103 pub fn retracted(&mut self, hash: BH) {
105 self.send(TransactionStatus::Retracted(hash));
106 }
107
108 pub fn invalid(&mut self) {
110 self.send(TransactionStatus::Invalid);
111 self.is_finalized = true;
113 }
114
115 pub fn limit_enforced(&mut self) {
117 self.send(TransactionStatus::Dropped);
118 self.is_finalized = true;
119 }
120
121 pub fn dropped(&mut self) {
123 self.send(TransactionStatus::Dropped);
124 self.is_finalized = true;
125 }
126
127 pub fn broadcast(&mut self, peers: Vec<String>) {
129 self.send(TransactionStatus::Broadcast(peers))
130 }
131
132 pub fn is_done(&self) -> bool {
134 self.is_finalized || self.receivers.is_empty()
135 }
136
137 fn send(&mut self, status: TransactionStatus<H, BH>) {
138 self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
139 }
140}