sc_transaction_pool/graph/
listener.rs1use std::{collections::HashMap, fmt::Debug, hash};
20
21use crate::LOG_TARGET;
22use linked_hash_map::LinkedHashMap;
23use log::{debug, trace};
24use serde::Serialize;
25use sp_runtime::traits;
26
27use super::{watcher, BlockHash, ChainApi, ExtrinsicHash};
28
29pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
31 watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
32 finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
33}
34
35const MAX_FINALITY_WATCHERS: usize = 512;
37
38impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
39 fn default() -> Self {
40 Self { watchers: Default::default(), finality_watchers: Default::default() }
41 }
42}
43
44impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
45 fn fire<F>(&mut self, hash: &H, fun: F)
46 where
47 F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>),
48 {
49 let clean = if let Some(h) = self.watchers.get_mut(hash) {
50 fun(h);
51 h.is_done()
52 } else {
53 false
54 };
55
56 if clean {
57 self.watchers.remove(hash);
58 }
59 }
60
61 pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> {
65 let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
66 sender.new_watcher(hash)
67 }
68
69 pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
71 trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash);
72 self.fire(hash, |watcher| watcher.broadcast(peers));
73 }
74
75 pub fn ready(&mut self, tx: &H, old: Option<&H>) {
77 trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old);
78 self.fire(tx, |watcher| watcher.ready());
79 if let Some(old) = old {
80 self.fire(old, |watcher| watcher.usurped(tx.clone()));
81 }
82 }
83
84 pub fn future(&mut self, tx: &H) {
86 trace!(target: LOG_TARGET, "[{:?}] Future", tx);
87 self.fire(tx, |watcher| watcher.future());
88 }
89
90 pub fn dropped(&mut self, tx: &H, by: Option<&H>) {
92 trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by);
93 self.fire(tx, |watcher| match by {
94 Some(t) => watcher.usurped(t.clone()),
95 None => watcher.dropped(),
96 })
97 }
98
99 pub fn invalid(&mut self, tx: &H) {
101 debug!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx);
102 self.fire(tx, |watcher| watcher.invalid());
103 }
104
105 pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
107 debug!(target: LOG_TARGET, "[{:?}] Pruned at {:?}", tx, block_hash);
108 let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]);
110 txs.push(tx.clone());
111 let tx_index = txs.len() - 1;
113
114 self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index));
115
116 while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
117 if let Some((hash, txs)) = self.finality_watchers.pop_front() {
118 for tx in txs {
119 self.fire(&tx, |watcher| watcher.finality_timeout(hash));
120 }
121 }
122 }
123 }
124
125 pub fn retracted(&mut self, block_hash: BlockHash<C>) {
127 if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
128 for hash in hashes {
129 self.fire(&hash, |watcher| watcher.retracted(block_hash))
130 }
131 }
132 }
133
134 pub fn finalized(&mut self, block_hash: BlockHash<C>) {
136 if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
137 for (tx_index, hash) in hashes.into_iter().enumerate() {
138 log::debug!(
139 target: LOG_TARGET,
140 "[{:?}] Sent finalization event (block {:?})",
141 hash,
142 block_hash,
143 );
144 self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index))
145 }
146 }
147 }
148}