sc_transaction_pool/graph/
future.rs1use std::{
20 collections::{HashMap, HashSet},
21 fmt, hash,
22 sync::Arc,
23};
24
25use sp_core::hexdisplay::HexDisplay;
26use sp_runtime::transaction_validity::TransactionTag as Tag;
27use std::time::Instant;
28
29use super::base_pool::Transaction;
30use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
31
/// A transaction that is held back until the tags it still requires become available.
pub struct WaitingTransaction<Hash, Ex> {
    /// The transaction itself, shared behind an `Arc`.
    pub transaction: Arc<Transaction<Hash, Ex>>,
    /// Tags from the transaction's `requires` list that were not yet provided
    /// when the entry was created and have not been satisfied since.
    pub missing_tags: HashSet<Tag>,
    /// Moment at which this entry was created (`Instant::now()` in `new`).
    pub imported_at: Instant,
}
41
42impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, Ex> {
43 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
44 write!(fmt, "WaitingTransaction {{ ")?;
45 write!(fmt, "imported_at: {:?}, ", self.imported_at)?;
46 write!(fmt, "transaction: {:?}, ", self.transaction)?;
47 write!(
48 fmt,
49 "missing_tags: {{{}}}",
50 self.missing_tags
51 .iter()
52 .map(|tag| HexDisplay::from(tag).to_string())
53 .collect::<Vec<_>>()
54 .join(", "),
55 )?;
56 write!(fmt, "}}")
57 }
58}
59
60impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
61 fn clone(&self) -> Self {
62 Self {
63 transaction: self.transaction.clone(),
64 missing_tags: self.missing_tags.clone(),
65 imported_at: self.imported_at,
66 }
67 }
68}
69
70impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
71 pub fn new(
76 transaction: Transaction<Hash, Ex>,
77 provided: &HashMap<Tag, Hash>,
78 recently_pruned: &[HashSet<Tag>],
79 ) -> Self {
80 let missing_tags = transaction
81 .requires
82 .iter()
83 .filter(|tag| {
84 let is_provided = provided.contains_key(&**tag) ||
87 recently_pruned.iter().any(|x| x.contains(&**tag));
88 !is_provided
89 })
90 .cloned()
91 .collect();
92
93 Self { transaction: Arc::new(transaction), missing_tags, imported_at: Instant::now() }
94 }
95
96 pub fn satisfy_tag(&mut self, tag: &Tag) {
98 self.missing_tags.remove(tag);
99 }
100
101 pub fn is_ready(&self) -> bool {
103 self.missing_tags.is_empty()
104 }
105}
106
/// A collection of transactions that are still waiting for some of their required tags.
#[derive(Clone, Debug)]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
    /// For every tag that is still wanted, the hashes of the waiting transactions
    /// that list it among their missing tags.
    wanted_tags: HashMap<Tag, HashSet<Hash>>,
    /// Waiting transactions, keyed by transaction hash.
    waiting: HashMap<Hash, WaitingTransaction<Hash, Ex>>,
}
118
119impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
120 fn default() -> Self {
121 Self { wanted_tags: Default::default(), waiting: Default::default() }
122 }
123}
124
/// Justification passed to `expect` when a hash taken from `wanted_tags` is looked
/// up in `waiting`.
///
/// Uses `r#"…"#` delimiters so that no stray `#` characters end up in the message.
const WAITING_PROOF: &str = r#"
In import we always insert to `waiting` if we push to `wanted_tags`;
when removing from `waiting` we always clear `wanted_tags`;
every hash from `wanted_tags` is always present in `waiting`;
qed
"#;
131
132impl<Hash: hash::Hash + Eq + Clone + std::fmt::Debug, Ex: std::fmt::Debug>
133 FutureTransactions<Hash, Ex>
134{
135 pub fn import(&mut self, tx: WaitingTransaction<Hash, Ex>) {
142 assert!(!tx.is_ready(), "Transaction is ready.");
143 assert!(
144 !self.waiting.contains_key(&tx.transaction.hash),
145 "Transaction is already imported."
146 );
147
148 for tag in &tx.missing_tags {
150 let entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new);
151 entry.insert(tx.transaction.hash.clone());
152 }
153
154 self.waiting.insert(tx.transaction.hash.clone(), tx);
156 }
157
158 pub fn contains(&self, hash: &Hash) -> bool {
160 self.waiting.contains_key(hash)
161 }
162
163 pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
165 hashes
166 .iter()
167 .map(|h| self.waiting.get(h).map(|x| x.transaction.clone()))
168 .collect()
169 }
170
171 pub fn prune_tags(&mut self, tags: &Vec<Tag>) -> Vec<Arc<Transaction<Hash, Ex>>> {
175 let pruned = self
176 .waiting
177 .values()
178 .filter_map(|tx| {
179 tx.transaction
180 .provides
181 .iter()
182 .any(|provided_tag| tags.contains(provided_tag))
183 .then(|| tx.transaction.hash.clone())
184 })
185 .collect::<Vec<_>>();
186
187 log_xt_trace!(target: LOG_TARGET, &pruned, "FutureTransactions: removed while pruning tags.");
188 self.remove(&pruned)
189 }
190
191 pub fn satisfy_tags<T: AsRef<Tag>>(
196 &mut self,
197 tags: impl IntoIterator<Item = T>,
198 ) -> Vec<WaitingTransaction<Hash, Ex>> {
199 let mut became_ready = vec![];
200
201 for tag in tags {
202 if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) {
203 for hash in hashes {
204 let is_ready = {
205 let tx = self.waiting.get_mut(&hash).expect(WAITING_PROOF);
206 tx.satisfy_tag(tag.as_ref());
207 tx.is_ready()
208 };
209
210 if is_ready {
211 let tx = self.waiting.remove(&hash).expect(WAITING_PROOF);
212 became_ready.push(tx);
213 }
214 }
215 }
216 }
217
218 became_ready
219 }
220
221 pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
225 let mut removed = vec![];
226 for hash in hashes {
227 if let Some(waiting_tx) = self.waiting.remove(hash) {
228 for tag in waiting_tx.missing_tags {
230 let remove = if let Some(wanted) = self.wanted_tags.get_mut(&tag) {
231 wanted.remove(hash);
232 wanted.is_empty()
233 } else {
234 false
235 };
236 if remove {
237 self.wanted_tags.remove(&tag);
238 }
239 }
240 removed.push(waiting_tx.transaction)
242 }
243 }
244
245 removed
246 }
247
248 pub fn fold<R, F: FnMut(Option<R>, &WaitingTransaction<Hash, Ex>) -> Option<R>>(
250 &mut self,
251 f: F,
252 ) -> Option<R> {
253 self.waiting.values().fold(None, f)
254 }
255
256 pub fn all(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
258 self.waiting.values().map(|waiting| &*waiting.transaction)
259 }
260
261 pub fn clear(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
263 self.wanted_tags.clear();
264 self.waiting.drain().map(|(_, tx)| tx.transaction).collect()
265 }
266
267 pub fn len(&self) -> usize {
269 self.waiting.len()
270 }
271
272 pub fn bytes(&self) -> usize {
274 self.waiting.values().fold(0, |acc, tx| acc + tx.transaction.bytes)
275 }
276}