1use std::{
20 cmp,
21 collections::{BTreeSet, HashMap, HashSet},
22 hash,
23 sync::Arc,
24};
25
26use crate::LOG_TARGET;
27use log::{debug, trace};
28use sc_transaction_pool_api::error;
29use serde::Serialize;
30use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag};
31
32use super::{
33 base_pool::Transaction,
34 future::WaitingTransaction,
35 tracked_map::{self, TrackedMap},
36};
37
/// A handle to a pool transaction together with the id it was given on insertion.
///
/// The transaction itself lives behind an `Arc`, so copies of this handle share it.
#[derive(Debug)]
pub struct TransactionRef<Hash, Ex> {
    /// Shared pointer to the transaction.
    pub transaction: Arc<Transaction<Hash, Ex>>,
    /// Insertion counter value assigned to this reference (`ReadyTransactions::import` takes it
    /// from its own incrementing counter). It is the last tie-breaker in the `Ord` implementation.
    pub insertion_id: u64,
}
48
49impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
50 fn clone(&self) -> Self {
51 Self { transaction: self.transaction.clone(), insertion_id: self.insertion_id }
52 }
53}
54
55impl<Hash, Ex> Ord for TransactionRef<Hash, Ex> {
56 fn cmp(&self, other: &Self) -> cmp::Ordering {
57 self.transaction
58 .priority
59 .cmp(&other.transaction.priority)
60 .then_with(|| other.transaction.valid_till.cmp(&self.transaction.valid_till))
61 .then_with(|| other.insertion_id.cmp(&self.insertion_id))
62 }
63}
64
65impl<Hash, Ex> PartialOrd for TransactionRef<Hash, Ex> {
66 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
67 Some(self.cmp(other))
68 }
69}
70
71impl<Hash, Ex> PartialEq for TransactionRef<Hash, Ex> {
72 fn eq(&self, other: &Self) -> bool {
73 self.cmp(other) == cmp::Ordering::Equal
74 }
75}
// Marker impl: equality is derived from the total order implemented in `Ord::cmp`.
impl<Hash, Ex> Eq for TransactionRef<Hash, Ex> {}
77
/// An entry of the ready queue: a transaction reference plus dependency bookkeeping.
#[derive(Debug)]
pub struct ReadyTx<Hash, Ex> {
    /// Reference to the queued transaction.
    pub transaction: TransactionRef<Hash, Ex>,
    /// Hashes of transactions that require a tag provided by this one. `import` appends one
    /// entry per matching required tag, so a hash may appear more than once.
    pub unlocks: Vec<Hash>,
    /// Number of this transaction's required tags that are counted as already satisfied:
    /// tags with no provider in the queue at import time, plus one for every provider that
    /// `prune_tags` has removed since.
    pub requires_offset: usize,
}
90
91impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
92 fn clone(&self) -> Self {
93 Self {
94 transaction: self.transaction.clone(),
95 unlocks: self.unlocks.clone(),
96 requires_offset: self.requires_offset,
97 }
98 }
99}
100
// Invariant proof used as the `expect` message when a hash obtained from `provided_tags`
// is looked up in the `ready` map.
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
Hence every hash retrieved from `provided_tags` is always present in `ready`;
qed
"#;
107
/// Queue of transactions that are ready for inclusion, with the dependency links between them.
#[derive(Debug)]
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
    /// Counter incremented on every `import`; its value becomes the new reference's
    /// `insertion_id`.
    insertion_id: u64,
    /// Maps each provided tag to the hash of the queued transaction that provides it.
    provided_tags: HashMap<Tag, Hash>,
    /// All queued transactions, keyed by hash.
    ready: TrackedMap<Hash, ReadyTx<Hash, Ex>>,
    /// References to queued transactions that have no outstanding requirement on another
    /// queued transaction, ordered by `TransactionRef`'s `Ord` implementation.
    best: BTreeSet<TransactionRef<Hash, Ex>>,
}
122
123impl<Hash, Ex> tracked_map::Size for ReadyTx<Hash, Ex> {
124 fn size(&self) -> usize {
125 self.transaction.transaction.bytes
126 }
127}
128
129impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
130 fn default() -> Self {
131 Self {
132 insertion_id: Default::default(),
133 provided_tags: Default::default(),
134 ready: Default::default(),
135 best: Default::default(),
136 }
137 }
138}
139
140impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
/// Borrows the map from provided tag to the hash of the queued transaction providing it.
pub fn provided_tags(&self) -> &HashMap<Tag, Hash> {
    &self.provided_tags
}
145
146 pub fn get(&self) -> BestIterator<Hash, Ex> {
165 BestIterator {
166 all: self.ready.clone_map(),
167 best: self.best.clone(),
168 awaiting: Default::default(),
169 invalid: Default::default(),
170 }
171 }
172
173 pub fn import(
179 &mut self,
180 tx: WaitingTransaction<Hash, Ex>,
181 ) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
182 assert!(
183 tx.is_ready(),
184 "Only ready transactions can be imported. Missing: {:?}",
185 tx.missing_tags
186 );
187 assert!(
188 !self.ready.read().contains_key(&tx.transaction.hash),
189 "Transaction is already imported."
190 );
191
192 self.insertion_id += 1;
193 let insertion_id = self.insertion_id;
194 let hash = tx.transaction.hash.clone();
195 let transaction = tx.transaction;
196
197 let (replaced, unlocks) = self.replace_previous(&transaction)?;
198
199 let mut goes_to_best = true;
200 let mut ready = self.ready.write();
201 let mut requires_offset = 0;
202 for tag in &transaction.requires {
204 if let Some(other) = self.provided_tags.get(tag) {
206 let tx = ready.get_mut(other).expect(HASH_READY);
207 tx.unlocks.push(hash.clone());
208 goes_to_best = false;
210 } else {
211 requires_offset += 1;
212 }
213 }
214
215 for tag in &transaction.provides {
219 self.provided_tags.insert(tag.clone(), hash.clone());
220 }
221
222 let transaction = TransactionRef { insertion_id, transaction };
223
224 if goes_to_best {
226 self.best.insert(transaction.clone());
227 }
228
229 ready.insert(hash, ReadyTx { transaction, unlocks, requires_offset });
231
232 Ok(replaced)
233 }
234
235 pub fn fold<R, F: FnMut(Option<R>, &ReadyTx<Hash, Ex>) -> Option<R>>(
237 &mut self,
238 f: F,
239 ) -> Option<R> {
240 self.ready.read().values().fold(None, f)
241 }
242
/// Returns `true` when a transaction with the given hash is in the ready map.
pub fn contains(&self, hash: &Hash) -> bool {
    self.ready.read().contains_key(hash)
}
247
248 pub fn by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
250 self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None)
251 }
252
253 pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
255 let ready = self.ready.read();
256 hashes
257 .iter()
258 .map(|hash| ready.get(hash).map(|x| x.transaction.transaction.clone()))
259 .collect()
260 }
261
262 pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
268 let to_remove = hashes.to_vec();
269 self.remove_subtree_with_tag_filter(to_remove, None)
270 }
271
272 fn remove_subtree_with_tag_filter(
278 &mut self,
279 mut to_remove: Vec<Hash>,
280 provides_tag_filter: Option<HashSet<Tag>>,
281 ) -> Vec<Arc<Transaction<Hash, Ex>>> {
282 let mut removed = vec![];
283 let mut ready = self.ready.write();
284 while let Some(hash) = to_remove.pop() {
285 if let Some(mut tx) = ready.remove(&hash) {
286 let invalidated = tx.transaction.transaction.provides.iter().filter(|tag| {
287 provides_tag_filter
288 .as_ref()
289 .map(|filter| !filter.contains(&**tag))
290 .unwrap_or(true)
291 });
292
293 let mut removed_some_tags = false;
294 for tag in invalidated {
296 removed_some_tags = true;
297 self.provided_tags.remove(tag);
298 }
299
300 for tag in &tx.transaction.transaction.requires {
302 if let Some(hash) = self.provided_tags.get(tag) {
303 if let Some(tx) = ready.get_mut(hash) {
304 remove_item(&mut tx.unlocks, hash);
305 }
306 }
307 }
308
309 self.best.remove(&tx.transaction);
311
312 if removed_some_tags {
313 to_remove.append(&mut tx.unlocks);
315 }
316
317 trace!(target: LOG_TARGET, "[{:?}] Removed as part of the subtree.", hash);
319 removed.push(tx.transaction.transaction);
320 }
321 }
322
323 removed
324 }
325
/// Removes the transaction that provides `tag`, treating the tag as satisfied, and returns
/// every transaction removed in the process.
///
/// Transactions unlocked by a removed one get their `requires_offset` bumped and are moved to
/// `best` once the offset equals their number of required tags. Queued transactions that
/// provided a tag required by the removed one lose the corresponding `unlocks` entry; if that
/// leaves their `unlocks` empty, the tags they provide are pruned as well.
///
/// # Panics
///
/// Panics if a tag listed in a removed transaction's `provides` is not mapped to that
/// transaction's hash in `provided_tags` (the tag currently being pruned is expected to be
/// already gone).
pub fn prune_tags(&mut self, tag: Tag) -> Vec<Arc<Transaction<Hash, Ex>>> {
    let mut removed = vec![];
    // Work list of tags still to prune; it grows while predecessors become prunable.
    let mut to_remove = vec![tag];

    while let Some(tag) = to_remove.pop() {
        // Drop the tag -> hash mapping and, if it pointed at a queued transaction, take that
        // transaction out of the ready map.
        let res = self
            .provided_tags
            .remove(&tag)
            .and_then(|hash| self.ready.write().remove(&hash));

        if let Some(tx) = res {
            let unlocks = tx.unlocks;

            // The removed transaction must not be offered by `best` any more.
            self.best.remove(&tx.transaction);

            let tx = tx.transaction.transaction;

            // Handle the transactions this one depended on. The write guard is scoped to this
            // block so that it is released before the map is locked again below.
            {
                let hash = &tx.hash;
                let mut ready = self.ready.write();
                // For the queued provider of `tag` (if any): unlink the removed transaction
                // and, when nothing else depends on that provider, return its provided tags
                // so that they are pruned too.
                let mut find_previous = |tag| -> Option<Vec<Tag>> {
                    let prev_hash = self.provided_tags.get(tag)?;
                    let tx2 = ready.get_mut(prev_hash)?;
                    remove_item(&mut tx2.unlocks, hash);
                    if tx2.unlocks.is_empty() {
                        Some(tx2.transaction.transaction.provides.clone())
                    } else {
                        None
                    }
                };

                for tag in &tx.requires {
                    if let Some(mut tags_to_remove) = find_previous(tag) {
                        to_remove.append(&mut tags_to_remove);
                    }
                }
            }

            // One more requirement is satisfied for every dependant; those with all
            // requirements accounted for become candidates in `best`.
            for hash in unlocks {
                if let Some(tx) = self.ready.write().get_mut(&hash) {
                    tx.requires_offset += 1;
                    if tx.requires_offset == tx.transaction.transaction.requires.len() {
                        self.best.insert(tx.transaction.clone());
                    }
                }
            }

            // Clear the remaining tag -> hash mappings of the removed transaction. The tag
            // being pruned was already removed at the top of the loop, hence the `None`.
            let current_tag = &tag;
            for tag in &tx.provides {
                let removed = self.provided_tags.remove(tag);
                assert_eq!(
                    removed.as_ref(),
                    if current_tag == tag { None } else { Some(&tx.hash) },
                    "The pool contains exactly one transaction providing given tag; the removed transaction
 claims to provide that tag, so it has to be mapped to it's hash; qed"
                );
            }

            removed.push(tx);
        }
    }

    removed
}
412
413 fn replace_previous(
422 &mut self,
423 tx: &Transaction<Hash, Ex>,
424 ) -> error::Result<(Vec<Arc<Transaction<Hash, Ex>>>, Vec<Hash>)> {
425 let (to_remove, unlocks) = {
426 let replace_hashes = tx
428 .provides
429 .iter()
430 .filter_map(|tag| self.provided_tags.get(tag))
431 .collect::<HashSet<_>>();
432
433 if replace_hashes.is_empty() {
435 return Ok((vec![], vec![]))
436 }
437
438 let old_priority = {
440 let ready = self.ready.read();
441 replace_hashes
442 .iter()
443 .filter_map(|hash| ready.get(hash))
444 .fold(0u64, |total, tx| {
445 total.saturating_add(tx.transaction.transaction.priority)
446 })
447 };
448
449 if old_priority >= tx.priority {
451 return Err(error::Error::TooLowPriority { old: old_priority, new: tx.priority })
452 }
453
454 let unlocks = {
456 let ready = self.ready.read();
457 replace_hashes.iter().filter_map(|hash| ready.get(hash)).fold(
458 vec![],
459 |mut list, tx| {
460 list.extend(tx.unlocks.iter().cloned());
461 list
462 },
463 )
464 };
465
466 (replace_hashes.into_iter().cloned().collect::<Vec<_>>(), unlocks)
467 };
468
469 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
470 let removed = self.remove_subtree_with_tag_filter(to_remove, Some(new_provides));
471
472 Ok((removed, unlocks))
473 }
474
/// Returns the length reported by the tracked `ready` map, i.e. the number of queued
/// transactions.
pub fn len(&self) -> usize {
    self.ready.len()
}
479
/// Returns the byte count reported by the tracked `ready` map.
// NOTE(review): presumably the sum of `tracked_map::Size::size` over all entries - confirm
// against `TrackedMap`.
pub fn bytes(&self) -> usize {
    self.ready.bytes()
}
484}
485
/// Iterator over queued transactions; it yields from `best` and feeds newly unlocked
/// transactions back into it as it goes.
pub struct BestIterator<Hash, Ex> {
    /// Copy of the ready map the iterator was created from.
    all: HashMap<Hash, ReadyTx<Hash, Ex>>,
    /// Transactions unlocked at least once during iteration but still missing requirements,
    /// stored with the number of requirements satisfied so far.
    awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
    /// Transactions that can be yielded next; the greatest element is taken first.
    best: BTreeSet<TransactionRef<Hash, Ex>>,
    /// Hashes that must be skipped because a transaction unlocking them was reported invalid.
    invalid: HashSet<Hash>,
}
493
494impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
495 fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef<Hash, Ex>) {
498 if satisfied >= tx_ref.transaction.requires.len() {
499 self.best.insert(tx_ref);
501 } else {
502 self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref));
504 }
505 }
506}
507
// Makes `BestIterator` usable through the pool API's `ReadyTransactions` trait.
impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions
    for BestIterator<Hash, Ex>
{
    /// Forwards to the `report_invalid` method defined directly on `BestIterator`.
    fn report_invalid(&mut self, tx: &Self::Item) {
        // Type-qualified path: inherent associated functions take precedence over trait ones,
        // so this calls the inherent method instead of recursing into this trait method.
        BestIterator::report_invalid(self, tx)
    }
}
515
516impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
517 pub fn report_invalid(&mut self, tx: &Arc<Transaction<Hash, Ex>>) {
523 if let Some(to_report) = self.all.get(&tx.hash) {
524 debug!(
525 target: LOG_TARGET,
526 "[{:?}] Reported as invalid. Will skip sub-chains while iterating.",
527 to_report.transaction.transaction.hash
528 );
529 for hash in &to_report.unlocks {
530 self.invalid.insert(hash.clone());
531 }
532 }
533 }
534}
535
536impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
537 type Item = Arc<Transaction<Hash, Ex>>;
538
539 fn next(&mut self) -> Option<Self::Item> {
540 loop {
541 let best = self.best.iter().next_back()?.clone();
542 let best = self.best.take(&best)?;
543 let hash = &best.transaction.hash;
544
545 if self.invalid.contains(hash) {
547 debug!(
548 target: LOG_TARGET,
549 "[{:?}] Skipping invalid child transaction while iterating.", hash,
550 );
551 continue
552 }
553
554 let ready = match self.all.get(hash).cloned() {
555 Some(ready) => ready,
556 None => continue,
558 };
559
560 for hash in &ready.unlocks {
562 let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
564 satisfied += 1;
565 Some((satisfied, tx_ref))
566 } else {
568 self.all
569 .get(hash)
570 .map(|next| (next.requires_offset + 1, next.transaction.clone()))
571 };
572 if let Some((satisfied, tx_ref)) = res {
573 self.best_or_awaiting(satisfied, tx_ref)
574 }
575 }
576
577 return Some(best.transaction)
578 }
579 }
580}
581
/// Removes the first element equal to `item` from `vec`, if there is one.
///
/// The removal uses `swap_remove`, so the last element takes the freed slot and the order of
/// the remaining elements is not preserved.
fn remove_item<T: PartialEq>(vec: &mut Vec<T>, item: &T) {
    let found = vec
        .iter()
        .enumerate()
        .find_map(|(idx, candidate)| if candidate == item { Some(idx) } else { None });

    match found {
        Some(idx) => {
            vec.swap_remove(idx);
        },
        None => {},
    }
}
588
#[cfg(test)]
mod tests {
    use super::*;
    use sp_runtime::transaction_validity::TransactionSource as Source;

    /// Template transaction: hash and payload come from `id`, it requires tags `[1]` and `[2]`
    /// and provides tags `[3]` and `[4]`.
    fn tx(id: u8) -> Transaction<u64, Vec<u8>> {
        Transaction {
            data: vec![id],
            bytes: 1,
            hash: u64::from(id),
            priority: 1,
            valid_till: 2,
            requires: vec![vec![1], vec![2]],
            provides: vec![vec![3], vec![4]],
            propagate: true,
            source: Source::External,
        }
    }

    /// Template transaction with its `requires` and `provides` lists replaced.
    fn tx_with(id: u8, requires: Vec<Tag>, provides: Vec<Tag>) -> Transaction<u64, Vec<u8>> {
        let mut transaction = tx(id);
        transaction.requires = requires;
        transaction.provides = provides;
        transaction
    }

    /// Wraps `tx` into a `WaitingTransaction` against the queue's current tags and imports it.
    fn import<H: hash::Hash + Eq + Member + Serialize, Ex>(
        ready: &mut ReadyTransactions<H, Ex>,
        tx: Transaction<H, Ex>,
    ) -> error::Result<Vec<Arc<Transaction<H, Ex>>>> {
        let waiting = WaitingTransaction::new(tx, ready.provided_tags(), &[]);
        ready.import(waiting)
    }

    #[test]
    fn should_replace_transaction_that_provides_the_same_tag() {
        let mut ready = ReadyTransactions::default();
        // tx1 provides both `[3]` and `[4]`, so it collides with tx2 and tx3 at once.
        let mut tx1 = tx(1);
        tx1.requires.clear();
        let tx2 = tx_with(2, vec![], vec![vec![3]]);
        let tx3 = tx_with(3, vec![], vec![vec![4]]);

        import(&mut ready, tx2).unwrap();
        import(&mut ready, tx3).unwrap();
        assert_eq!(ready.get().count(), 2);

        // Priority 1 does not exceed the combined priority of tx2 and tx3.
        import(&mut ready, tx1.clone()).unwrap_err();

        tx1.priority = 10;
        import(&mut ready, tx1).unwrap();

        assert_eq!(ready.get().count(), 1);
    }

    #[test]
    fn should_replace_multiple_transactions_correctly() {
        let mut ready = ReadyTransactions::default();
        let tx0 = tx_with(0, vec![], vec![vec![0]]);
        let tx1 = tx_with(1, vec![], vec![vec![1]]);
        let tx2 = tx_with(2, vec![vec![0], vec![1]], vec![vec![2], vec![3]]);
        let tx3 = tx_with(3, vec![vec![2]], vec![vec![4]]);
        let tx4 = tx_with(4, vec![vec![3]], vec![vec![5]]);
        // Higher-priority replacement for tx2 that provides only one of its two tags.
        let mut tx2_2 = tx_with(5, vec![vec![0], vec![1]], vec![vec![2]]);
        tx2_2.priority = 10;

        for tx in vec![tx0, tx1, tx2, tx3, tx4] {
            import(&mut ready, tx).unwrap();
        }
        assert_eq!(ready.get().count(), 5);

        import(&mut ready, tx2_2).unwrap();

        assert_eq!(ready.get().count(), 3);
    }

    /// Fills the queue with seven transactions: tx1 has no requirements; tx2, tx3, tx4 and tx7
    /// require tags of tx1 (tx3 also one of tx2); tx5 requires tx4's tag and tx6 requires tx5's.
    fn populate_pool(ready: &mut ReadyTransactions<u64, Vec<u8>>) {
        let mut tx1 = tx(1);
        tx1.requires.clear();
        let tx2 = tx_with(2, tx1.provides.clone(), vec![vec![106]]);
        let tx3 = tx_with(3, vec![tx1.provides[0].clone(), vec![106]], vec![]);
        let tx4 = tx_with(4, vec![tx1.provides[0].clone()], vec![vec![107]]);
        let tx5 = tx_with(5, vec![tx4.provides[0].clone()], vec![vec![108]]);
        let tx6 = tx_with(6, vec![tx5.provides[0].clone()], vec![]);
        let mut tx7 = tx_with(7, vec![tx1.provides[0].clone()], vec![]);
        tx7.valid_till = u64::MAX;

        for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] {
            import(ready, tx).unwrap();
        }

        assert_eq!(ready.best.len(), 1);
    }

    #[test]
    fn should_return_best_transactions_in_correct_order() {
        let mut ready = ReadyTransactions::default();
        populate_pool(&mut ready);

        let order: Vec<u8> = ready.get().map(|tx| tx.data[0]).collect();

        assert_eq!(order, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[test]
    fn should_order_refs() {
        fn tx_ref(
            id: u8,
            priority: u64,
            valid_till: u64,
            insertion_id: u64,
        ) -> TransactionRef<u64, Vec<u8>> {
            let mut transaction = tx(id);
            transaction.priority = priority;
            transaction.valid_till = valid_till;
            TransactionRef { transaction: Arc::new(transaction), insertion_id }
        }

        // Higher priority wins.
        assert!(tx_ref(2, 3, 3, 1) > tx_ref(3, 2, 3, 2));
        // Equal priority: the smaller `valid_till` wins.
        assert!(tx_ref(4, 3, 2, 1) > tx_ref(5, 3, 3, 2));
        // Equal priority and `valid_till`: the smaller insertion id wins.
        assert!(tx_ref(6, 3, 3, 1) > tx_ref(7, 3, 3, 2));
    }

    #[test]
    fn should_skip_invalid_transactions_while_iterating() {
        let mut ready = ReadyTransactions::default();
        populate_pool(&mut ready);

        let mut it = ready.get();
        let first_byte = |tx: &Arc<Transaction<u64, Vec<u8>>>| tx.data[0];

        for expected in vec![1, 2, 3] {
            assert_eq!(it.next().as_ref().map(first_byte), Some(expected));
        }
        let tx4 = it.next();
        assert_eq!(tx4.as_ref().map(first_byte), Some(4));
        // Reporting tx4 hides the chain hanging off it (tx5, then tx6).
        it.report_invalid(&tx4.unwrap());
        assert_eq!(it.next().as_ref().map(first_byte), Some(7));
        assert_eq!(it.next().as_ref().map(first_byte), None);
    }
}