sc_transaction_pool/graph/
listener.rs1use std::{collections::HashMap, fmt::Debug, hash};
20
21use linked_hash_map::LinkedHashMap;
22use tracing::trace;
23
24use super::{watcher, BlockHash, ChainApi, ExtrinsicHash};
25
26static LOG_TARGET: &str = "txpool::watcher";
27
28pub trait EventHandler<C: ChainApi> {
35 fn broadcasted(&self, _hash: ExtrinsicHash<C>, _peers: Vec<String>) {}
37
38 fn ready(&self, _tx: ExtrinsicHash<C>) {}
40
41 fn future(&self, _tx: ExtrinsicHash<C>) {}
43
44 fn limits_enforced(&self, _tx: ExtrinsicHash<C>) {}
46
47 fn usurped(&self, _tx: ExtrinsicHash<C>, _by: ExtrinsicHash<C>) {}
49
50 fn dropped(&self, _tx: ExtrinsicHash<C>) {}
52
53 fn invalid(&self, _tx: ExtrinsicHash<C>) {}
55
56 fn pruned(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>, _tx_index: usize) {}
58
59 fn retracted(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>) {}
61
62 fn finality_timeout(&self, _tx: ExtrinsicHash<C>, _hash: BlockHash<C>) {}
64
65 fn finalized(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>, _tx_index: usize) {}
67}
68
69impl<C: ChainApi> EventHandler<C> for () {}
70
71pub struct EventDispatcher<H: hash::Hash + Eq, C: ChainApi, L: EventHandler<C>> {
75 watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
77 finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
78
79 event_handler: Option<L>,
82}
83
84const MAX_FINALITY_WATCHERS: usize = 512;
86
87impl<H: hash::Hash + Eq + Debug, C: ChainApi, L: EventHandler<C>> Default
88 for EventDispatcher<H, C, L>
89{
90 fn default() -> Self {
91 Self {
92 watchers: Default::default(),
93 finality_watchers: Default::default(),
94 event_handler: None,
95 }
96 }
97}
98
99impl<C: ChainApi, L: EventHandler<C>> EventDispatcher<ExtrinsicHash<C>, C, L> {
100 pub fn new_with_event_handler(event_handler: Option<L>) -> Self {
102 Self { event_handler, ..Default::default() }
103 }
104
105 fn fire<F>(&mut self, hash: &ExtrinsicHash<C>, fun: F)
106 where
107 F: FnOnce(&mut watcher::Sender<ExtrinsicHash<C>, ExtrinsicHash<C>>),
108 {
109 let clean = if let Some(h) = self.watchers.get_mut(hash) {
110 fun(h);
111 h.is_done()
112 } else {
113 false
114 };
115
116 if clean {
117 self.watchers.remove(hash);
118 }
119 }
120
121 pub fn create_watcher(
125 &mut self,
126 hash: ExtrinsicHash<C>,
127 ) -> watcher::Watcher<ExtrinsicHash<C>, ExtrinsicHash<C>> {
128 let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default);
129 sender.new_watcher(hash)
130 }
131
132 pub fn broadcasted(&mut self, tx_hash: &ExtrinsicHash<C>, peers: Vec<String>) {
134 trace!(
135 target: LOG_TARGET,
136 ?tx_hash,
137 "Broadcasted."
138 );
139 self.fire(tx_hash, |watcher| watcher.broadcast(peers.clone()));
140 self.event_handler.as_ref().map(|l| l.broadcasted(*tx_hash, peers));
141 }
142
143 pub fn ready(&mut self, tx: &ExtrinsicHash<C>, old: Option<&ExtrinsicHash<C>>) {
145 trace!(
146 target: LOG_TARGET,
147 tx_hash = ?*tx,
148 replaced_with = ?old,
149 "Ready."
150 );
151 self.fire(tx, |watcher| watcher.ready());
152 if let Some(old) = old {
153 self.fire(old, |watcher| watcher.usurped(*tx));
154 }
155
156 self.event_handler.as_ref().map(|l| l.ready(*tx));
157 }
158
159 pub fn future(&mut self, tx_hash: &ExtrinsicHash<C>) {
161 trace!(
162 target: LOG_TARGET,
163 ?tx_hash,
164 "Future."
165 );
166 self.fire(tx_hash, |watcher| watcher.future());
167
168 self.event_handler.as_ref().map(|l| l.future(*tx_hash));
169 }
170
171 pub fn limits_enforced(&mut self, tx_hash: &ExtrinsicHash<C>) {
173 trace!(
174 target: LOG_TARGET,
175 ?tx_hash,
176 "Dropped (limits enforced)."
177 );
178 self.fire(tx_hash, |watcher| watcher.limit_enforced());
179
180 self.event_handler.as_ref().map(|l| l.limits_enforced(*tx_hash));
181 }
182
183 pub fn usurped(&mut self, tx: &ExtrinsicHash<C>, by: &ExtrinsicHash<C>) {
185 trace!(
186 target: LOG_TARGET,
187 tx_hash = ?tx,
188 ?by,
189 "Dropped (replaced)."
190 );
191 self.fire(tx, |watcher| watcher.usurped(*by));
192
193 self.event_handler.as_ref().map(|l| l.usurped(*tx, *by));
194 }
195
196 pub fn dropped(&mut self, tx_hash: &ExtrinsicHash<C>) {
199 trace!(
200 target: LOG_TARGET,
201 ?tx_hash,
202 "Dropped."
203 );
204 self.fire(tx_hash, |watcher| watcher.dropped());
205 self.event_handler.as_ref().map(|l| l.dropped(*tx_hash));
206 }
207
208 pub fn invalid(&mut self, tx_hash: &ExtrinsicHash<C>) {
210 trace!(
211 target: LOG_TARGET,
212 ?tx_hash,
213 "Extrinsic invalid."
214 );
215 self.fire(tx_hash, |watcher| watcher.invalid());
216 self.event_handler.as_ref().map(|l| l.invalid(*tx_hash));
217 }
218
219 pub fn pruned(&mut self, block_hash: BlockHash<C>, tx_hash: &ExtrinsicHash<C>) {
221 trace!(
222 target: LOG_TARGET,
223 ?tx_hash,
224 ?block_hash,
225 "Pruned at."
226 );
227 let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]);
229 txs.push(*tx_hash);
230 let tx_index = txs.len() - 1;
232
233 self.fire(tx_hash, |watcher| watcher.in_block(block_hash, tx_index));
234 self.event_handler.as_ref().map(|l| l.pruned(*tx_hash, block_hash, tx_index));
235
236 while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
237 if let Some((hash, txs)) = self.finality_watchers.pop_front() {
238 for tx in txs {
239 self.fire(&tx, |watcher| watcher.finality_timeout(hash));
240 self.event_handler.as_ref().map(|l| l.finality_timeout(tx, block_hash));
241 }
242 }
243 }
244 }
245
246 pub fn retracted(&mut self, block_hash: BlockHash<C>) {
248 if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
249 for hash in hashes {
250 self.fire(&hash, |watcher| watcher.retracted(block_hash));
251 self.event_handler.as_ref().map(|l| l.retracted(hash, block_hash));
252 }
253 }
254 }
255
256 pub fn finalized(&mut self, block_hash: BlockHash<C>) {
258 if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
259 for (tx_index, tx_hash) in hashes.into_iter().enumerate() {
260 trace!(
261 target: LOG_TARGET,
262 ?tx_hash,
263 ?block_hash,
264 "Sent finalization event."
265 );
266 self.fire(&tx_hash, |watcher| watcher.finalized(block_hash, tx_index));
267 self.event_handler.as_ref().map(|l| l.finalized(tx_hash, block_hash, tx_index));
268 }
269 }
270 }
271
272 pub fn watched_transactions(&self) -> impl Iterator<Item = &ExtrinsicHash<C>> {
274 self.watchers.keys()
275 }
276}