1use std::{
20 cmp,
21 collections::{BTreeSet, HashMap, HashSet},
22 hash,
23 sync::Arc,
24};
25
26use crate::LOG_TARGET;
27use sc_transaction_pool_api::error;
28use serde::Serialize;
29use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag};
30use tracing::trace;
31
32use super::{
33 base_pool::Transaction,
34 future::WaitingTransaction,
35 tracked_map::{self, TrackedMap},
36};
37
/// A shared handle to a pool transaction together with the counter value it was imported with.
///
/// The `Ord` implementation for this type compares `priority` first, then `valid_till`
/// (reversed), then `insertion_id` (reversed).
#[derive(Debug)]
pub struct TransactionRef<Hash, Ex> {
    /// Reference-counted pointer to the underlying transaction.
    pub transaction: Arc<Transaction<Hash, Ex>>,
    /// Value of the insertion counter at import time; the last tie-breaker when ordering.
    pub insertion_id: u64,
}
48
49impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
50 fn clone(&self) -> Self {
51 Self { transaction: self.transaction.clone(), insertion_id: self.insertion_id }
52 }
53}
54
55impl<Hash, Ex> Ord for TransactionRef<Hash, Ex> {
56 fn cmp(&self, other: &Self) -> cmp::Ordering {
57 self.transaction
58 .priority
59 .cmp(&other.transaction.priority)
60 .then_with(|| other.transaction.valid_till.cmp(&self.transaction.valid_till))
61 .then_with(|| other.insertion_id.cmp(&self.insertion_id))
62 }
63}
64
65impl<Hash, Ex> PartialOrd for TransactionRef<Hash, Ex> {
66 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
67 Some(self.cmp(other))
68 }
69}
70
71impl<Hash, Ex> PartialEq for TransactionRef<Hash, Ex> {
72 fn eq(&self, other: &Self) -> bool {
73 self.cmp(other) == cmp::Ordering::Equal
74 }
75}
76impl<Hash, Ex> Eq for TransactionRef<Hash, Ex> {}
77
/// An entry of the ready set: a transaction plus its dependency bookkeeping.
#[derive(Debug)]
pub struct ReadyTx<Hash, Ex> {
    /// Handle to the transaction itself.
    pub transaction: TransactionRef<Hash, Ex>,
    /// Hashes of transactions that require a tag provided by this transaction. A hash is
    /// pushed once per matching required tag, so it may appear more than once.
    pub unlocks: Vec<Hash>,
    /// Number of required tags considered satisfied without another transaction in the set:
    /// counted at import time for tags with no provider, and incremented when a providing
    /// transaction is pruned.
    pub requires_offset: usize,
}
90
91impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
92 fn clone(&self) -> Self {
93 Self {
94 transaction: self.transaction.clone(),
95 unlocks: self.unlocks.clone(),
96 requires_offset: self.requires_offset,
97 }
98 }
99}
100
/// Invariant proof passed to `expect` when a hash obtained from `provided_tags` is looked up in
/// the `ready` map.
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
Hence every hash retrieved from `provided_tags` is always present in `ready`;
qed
"#;
107
/// The set of transactions that are ready, with the tag bookkeeping that links them together.
#[derive(Clone, Debug)]
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
    /// Counter incremented on every `import` call; its value is stored in the imported
    /// transaction's `TransactionRef`.
    insertion_id: u64,
    /// Maps each provided tag to the hash of the transaction currently providing it.
    provided_tags: HashMap<Tag, Hash>,
    /// Every transaction in the set, keyed by hash.
    ready: TrackedMap<Hash, ReadyTx<Hash, Ex>>,
    /// Transactions that do not wait on any other transaction in the set, ordered by
    /// `TransactionRef`'s `Ord` implementation.
    best: BTreeSet<TransactionRef<Hash, Ex>>,
}
122
123impl<Hash, Ex> tracked_map::Size for ReadyTx<Hash, Ex> {
124 fn size(&self) -> usize {
125 self.transaction.transaction.bytes
126 }
127}
128
129impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
130 fn default() -> Self {
131 Self {
132 insertion_id: Default::default(),
133 provided_tags: Default::default(),
134 ready: Default::default(),
135 best: Default::default(),
136 }
137 }
138}
139
140impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
141 pub fn provided_tags(&self) -> &HashMap<Tag, Hash> {
143 &self.provided_tags
144 }
145
146 pub fn get(&self) -> BestIterator<Hash, Ex> {
165 BestIterator {
166 all: self.ready.clone_map(),
167 best: self.best.clone(),
168 awaiting: Default::default(),
169 invalid: Default::default(),
170 }
171 }
172
    /// Imports a ready transaction into the set.
    ///
    /// Transactions already in the set that provide any of the same tags are replaced via
    /// `replace_previous`; the transactions removed that way are returned on success.
    ///
    /// # Errors
    ///
    /// Propagates the error from `replace_previous` (the new transaction's priority is not
    /// higher than the summed priority of the transactions it would replace). Note that the
    /// insertion counter has already been advanced when this happens.
    ///
    /// # Panics
    ///
    /// Panics if `tx` is not ready, or if a transaction with the same hash is already in the set.
    pub fn import(
        &mut self,
        tx: WaitingTransaction<Hash, Ex>,
    ) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
        assert!(
            tx.is_ready(),
            "Only ready transactions can be imported. Missing: {:?}",
            tx.missing_tags
        );
        assert!(
            !self.ready.read().contains_key(&tx.transaction.hash),
            "Transaction is already imported."
        );

        self.insertion_id += 1;
        let insertion_id = self.insertion_id;
        let hash = tx.transaction.hash.clone();
        let transaction = tx.transaction;

        // Evict transactions providing the same tags; the new entry inherits their `unlocks`.
        let (replaced, unlocks) = self.replace_previous(&transaction)?;

        let mut goes_to_best = true;
        let mut ready = self.ready.write();
        let mut requires_offset = 0;
        // Link this transaction to the transactions in the set that provide its required tags.
        for tag in &transaction.requires {
            // Is the tag provided by a transaction that is still in the set?
            if let Some(other) = self.provided_tags.get(tag) {
                let tx = ready.get_mut(other).expect(HASH_READY);
                tx.unlocks.push(hash.clone());
                // It depends on another transaction, so it must not go to `best` directly.
                goes_to_best = false;
            } else {
                // No provider in the set: count the tag as already satisfied.
                requires_offset += 1;
            }
        }

        // Register the tags this transaction provides under its own hash.
        for tag in &transaction.provides {
            self.provided_tags.insert(tag.clone(), hash.clone());
        }

        let transaction = TransactionRef { insertion_id, transaction };

        // Only transactions that wait on nothing else in the set go to `best`.
        if goes_to_best {
            self.best.insert(transaction.clone());
        }

        // Finally store the entry in the ready map.
        ready.insert(hash, ReadyTx { transaction, unlocks, requires_offset });

        Ok(replaced)
    }
234
235 pub fn fold<R, F: FnMut(R, &ReadyTx<Hash, Ex>) -> R>(&self, init: R, f: F) -> R {
238 self.ready.read().values().fold(init, f)
239 }
240
241 pub fn contains(&self, hash: &Hash) -> bool {
243 self.ready.read().contains_key(hash)
244 }
245
246 pub fn by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
248 self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None)
249 }
250
251 pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
253 let ready = self.ready.read();
254 hashes
255 .iter()
256 .map(|hash| ready.get(hash).map(|x| x.transaction.transaction.clone()))
257 .collect()
258 }
259
260 pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
266 let to_remove = hashes.to_vec();
267 self.remove_subtree_with_tag_filter(to_remove, None)
268 }
269
    /// Removes the given transactions and, transitively, the transactions they unlock.
    ///
    /// `to_remove` is used as a work stack. When `provides_tag_filter` is `Some`, tags contained
    /// in the filter are left in `provided_tags`, and the dependents of a removed transaction
    /// are only scheduled for removal if at least one of its tags was actually cleared. With
    /// `None`, every provided tag is cleared.
    ///
    /// Returns the removed transactions in removal order. Hashes that are not in the ready map
    /// are skipped.
    fn remove_subtree_with_tag_filter(
        &mut self,
        mut to_remove: Vec<Hash>,
        provides_tag_filter: Option<HashSet<Tag>>,
    ) -> Vec<Arc<Transaction<Hash, Ex>>> {
        let mut removed = vec![];
        let mut ready = self.ready.write();
        while let Some(tx_hash) = to_remove.pop() {
            if let Some(mut tx) = ready.remove(&tx_hash) {
                // Tags of this transaction that are not protected by the filter.
                let invalidated = tx.transaction.transaction.provides.iter().filter(|tag| {
                    provides_tag_filter
                        .as_ref()
                        .map(|filter| !filter.contains(&**tag))
                        .unwrap_or(true)
                });

                let mut removed_some_tags = false;
                // Drop the unprotected tags from `provided_tags`.
                for tag in invalidated {
                    removed_some_tags = true;
                    self.provided_tags.remove(tag);
                }

                // Unlink this transaction from the `unlocks` list of each of its providers.
                for tag in &tx.transaction.transaction.requires {
                    if let Some(hash) = self.provided_tags.get(tag) {
                        if let Some(tx_unlocking) = ready.get_mut(hash) {
                            remove_item(&mut tx_unlocking.unlocks, &tx_hash);
                        }
                    }
                }

                // It can no longer be a candidate for iteration.
                self.best.remove(&tx.transaction);

                if removed_some_tags {
                    // Some tag disappeared, so schedule the dependents for removal as well.
                    to_remove.append(&mut tx.unlocks);
                }

                trace!(target: LOG_TARGET, ?tx_hash, "Removed as part of the subtree.");
                removed.push(tx.transaction.transaction);
            }
        }

        removed
    }
323
    /// Removes the transaction that provides `tag`, treating the tag as satisfied.
    ///
    /// Transactions that the removed one depended on are pruned as well once nothing else in
    /// the set is unlocked by them. Transactions unlocked by the removed one get their
    /// `requires_offset` incremented and move to `best` once it equals the number of their
    /// required tags.
    ///
    /// Returns every transaction removed in the process.
    ///
    /// # Panics
    ///
    /// Panics if one of the removed transaction's other provided tags is not mapped to its hash
    /// in `provided_tags`.
    pub fn prune_tags(&mut self, tag: Tag) -> Vec<Arc<Transaction<Hash, Ex>>> {
        let mut removed = vec![];
        // Work stack of tags whose providers still have to be pruned.
        let mut to_remove = vec![tag];

        while let Some(tag) = to_remove.pop() {
            // Unregister the tag and take its provider out of the ready map, if there is one.
            let res = self
                .provided_tags
                .remove(&tag)
                .and_then(|hash| self.ready.write().remove(&hash));

            if let Some(tx) = res {
                let unlocks = tx.unlocks;

                // The transaction must not be yielded by iteration any more.
                self.best.remove(&tx.transaction);

                let tx = tx.transaction.transaction;

                // Walk the providers of this transaction's requirements.
                {
                    let hash = &tx.hash;
                    let mut ready = self.ready.write();
                    // Unlinks `hash` from the provider of `tag`; yields the provider's tags
                    // when nothing else is unlocked by it, so that it gets pruned too.
                    let mut find_previous = |tag| -> Option<Vec<Tag>> {
                        let prev_hash = self.provided_tags.get(tag)?;
                        let tx2 = ready.get_mut(prev_hash)?;
                        remove_item(&mut tx2.unlocks, hash);
                        if tx2.unlocks.is_empty() {
                            Some(tx2.transaction.transaction.provides.clone())
                        } else {
                            None
                        }
                    };

                    for tag in &tx.requires {
                        if let Some(mut tags_to_remove) = find_previous(tag) {
                            to_remove.append(&mut tags_to_remove);
                        }
                    }
                }

                // One more requirement is satisfied for each transaction this one unlocked.
                for hash in unlocks {
                    if let Some(tx) = self.ready.write().get_mut(&hash) {
                        tx.requires_offset += 1;
                        // All requirements satisfied: the transaction becomes a candidate.
                        if tx.requires_offset == tx.transaction.transaction.requires.len() {
                            self.best.insert(tx.transaction.clone());
                        }
                    }
                }

                // Clear the remaining tag -> hash mappings of the removed transaction. The tag
                // being pruned was already removed above, hence the `None` expectation for it.
                let current_tag = &tag;
                for tag in &tx.provides {
                    let removed = self.provided_tags.remove(tag);
                    assert_eq!(
                        removed.as_ref(),
                        if current_tag == tag { None } else { Some(&tx.hash) },
                        "The pool contains exactly one transaction providing given tag; the removed transaction
 claims to provide that tag, so it has to be mapped to it's hash; qed"
                    );
                }

                removed.push(tx);
            }
        }

        removed
    }
410
411 fn replace_previous(
420 &mut self,
421 tx: &Transaction<Hash, Ex>,
422 ) -> error::Result<(Vec<Arc<Transaction<Hash, Ex>>>, Vec<Hash>)> {
423 let (to_remove, unlocks) = {
424 let replace_hashes = tx
426 .provides
427 .iter()
428 .filter_map(|tag| self.provided_tags.get(tag))
429 .collect::<HashSet<_>>();
430
431 if replace_hashes.is_empty() {
433 return Ok((vec![], vec![]))
434 }
435
436 let old_priority = {
438 let ready = self.ready.read();
439 replace_hashes
440 .iter()
441 .filter_map(|hash| ready.get(hash))
442 .fold(0u64, |total, tx| {
443 total.saturating_add(tx.transaction.transaction.priority)
444 })
445 };
446
447 if old_priority >= tx.priority {
449 return Err(error::Error::TooLowPriority { old: old_priority, new: tx.priority })
450 }
451
452 let unlocks = {
454 let ready = self.ready.read();
455 replace_hashes.iter().filter_map(|hash| ready.get(hash)).fold(
456 vec![],
457 |mut list, tx| {
458 list.extend(tx.unlocks.iter().cloned());
459 list
460 },
461 )
462 };
463
464 (replace_hashes.into_iter().cloned().collect::<Vec<_>>(), unlocks)
465 };
466
467 let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
468 let removed = self.remove_subtree_with_tag_filter(to_remove, Some(new_provides));
469
470 Ok((removed, unlocks))
471 }
472
473 pub fn len(&self) -> usize {
475 self.ready.len()
476 }
477
478 pub fn bytes(&self) -> usize {
480 self.ready.bytes()
481 }
482}
483
/// Iterator over ready transactions that yields the greatest `TransactionRef` first and only
/// yields a transaction after the transactions that unlock it.
pub struct BestIterator<Hash, Ex> {
    /// The iterator's own copy of the ready map, keyed by transaction hash.
    all: HashMap<Hash, ReadyTx<Hash, Ex>>,
    /// Transactions unlocked by something already yielded but still missing requirements,
    /// stored with the number of requirements satisfied so far.
    awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
    /// Transactions that can be yielded next.
    best: BTreeSet<TransactionRef<Hash, Ex>>,
    /// Hashes of transactions to skip because a transaction unlocking them was reported invalid.
    invalid: HashSet<Hash>,
}
491
492impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
493 fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef<Hash, Ex>) {
496 if satisfied >= tx_ref.transaction.requires.len() {
497 self.best.insert(tx_ref);
499 } else {
500 self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref));
502 }
503 }
504}
505
impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions
    for BestIterator<Hash, Ex>
{
    /// Forwards to `BestIterator::report_invalid`.
    // NOTE(review): the path call is expected to resolve to the inherent method rather than
    // recursing into this trait method — keep the explicit `BestIterator::` path.
    fn report_invalid(&mut self, tx: &Self::Item) {
        BestIterator::report_invalid(self, tx)
    }
}
513
514impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
515 pub fn report_invalid(&mut self, tx: &Arc<Transaction<Hash, Ex>>) {
521 if let Some(to_report) = self.all.get(&tx.hash) {
522 trace!(
523 target: LOG_TARGET,
524 tx_hash = ?to_report.transaction.transaction.hash,
525 "best-iterator: Reported as invalid. Will skip sub-chains while iterating."
526 );
527 for hash in &to_report.unlocks {
528 self.invalid.insert(hash.clone());
529 }
530 }
531 }
532}
533
impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
    type Item = Arc<Transaction<Hash, Ex>>;

    /// Yields the greatest remaining candidate and promotes the transactions it unlocks.
    ///
    /// Candidates whose hash is in `invalid`, or which are missing from `all`, are dropped
    /// without being yielded. Returns `None` once `best` is empty.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Take the greatest element out of `best`.
            let best = self.best.iter().next_back()?.clone();
            let best = self.best.take(&best)?;
            let tx_hash = &best.transaction.hash;

            // Skip transactions unlocked by something that was reported invalid.
            if self.invalid.contains(tx_hash) {
                trace!(
                    target: LOG_TARGET,
                    ?tx_hash,
                    "Skipping invalid child transaction while iterating."
                );
                continue
            }

            let ready = match self.all.get(tx_hash).cloned() {
                Some(ready) => ready,
                // Not present in the iterator's map: nothing to yield for this candidate.
                None => continue,
            };

            // Credit one more satisfied requirement to every transaction this one unlocks.
            for hash in &ready.unlocks {
                // Prefer the running count kept in `awaiting`...
                let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
                    satisfied += 1;
                    Some((satisfied, tx_ref))
                } else {
                    // ...otherwise start from the offset recorded in the map.
                    self.all
                        .get(hash)
                        .map(|next| (next.requires_offset + 1, next.transaction.clone()))
                };
                if let Some((satisfied, tx_ref)) = res {
                    self.best_or_awaiting(satisfied, tx_ref)
                }
            }

            return Some(best.transaction)
        }
    }
}
580
/// Removes the first element equal to `item` from `vec`, if there is one.
///
/// The removal uses `swap_remove`, so the last element takes the removed element's place and
/// the order of the vector is not preserved.
fn remove_item<T: PartialEq>(vec: &mut Vec<T>, item: &T) {
    let first_match = vec.iter().position(|candidate| candidate == item);
    if let Some(idx) = first_match {
        vec.swap_remove(idx);
    }
}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591
    /// Builds a baseline test transaction whose data byte and hash are both taken from `id`.
    ///
    /// It requires tags `[1]` and `[2]` and provides tags `[3]` and `[4]`; tests override the
    /// fields they care about.
    fn tx(id: u8) -> Transaction<u64, Vec<u8>> {
        Transaction {
            data: vec![id],
            bytes: 1,
            hash: id as u64,
            priority: 1,
            valid_till: 2,
            requires: vec![vec![1], vec![2]],
            provides: vec![vec![3], vec![4]],
            propagate: true,
            source: crate::TimedTransactionSource::new_external(false),
        }
    }
605
606 fn import<H: hash::Hash + Eq + Member + Serialize, Ex>(
607 ready: &mut ReadyTransactions<H, Ex>,
608 tx: Transaction<H, Ex>,
609 ) -> error::Result<Vec<Arc<Transaction<H, Ex>>>> {
610 let x = WaitingTransaction::new(tx, ready.provided_tags(), &[]);
611 ready.import(x)
612 }
613
    /// A transaction providing tags already provided by others replaces them, but only when
    /// its priority is higher than their summed priority.
    #[test]
    fn should_replace_transaction_that_provides_the_same_tag() {
        let mut ready = ReadyTransactions::default();
        // tx1 keeps the default `provides` (tags 3 and 4); tx2 and tx3 provide one of them each.
        let mut tx1 = tx(1);
        tx1.requires.clear();
        let mut tx2 = tx(2);
        tx2.requires.clear();
        tx2.provides = vec![vec![3]];
        let mut tx3 = tx(3);
        tx3.requires.clear();
        tx3.provides = vec![vec![4]];

        import(&mut ready, tx2).unwrap();
        import(&mut ready, tx3).unwrap();
        assert_eq!(ready.get().count(), 2);

        // Priority 1 does not beat the combined priority of tx2 and tx3.
        import(&mut ready, tx1.clone()).unwrap_err();

        tx1.priority = 10;
        import(&mut ready, tx1).unwrap();

        // Both tx2 and tx3 were replaced by tx1.
        assert_eq!(ready.get().count(), 1);
    }
641
    /// Replacing a transaction also removes the dependents of the tags that the replacement
    /// does not provide.
    #[test]
    fn should_replace_multiple_transactions_correctly() {
        let mut ready = ReadyTransactions::default();
        // tx0 and tx1 are roots; tx2 needs both; tx3 and tx4 each need one of tx2's tags.
        let mut tx0 = tx(0);
        tx0.requires = vec![];
        tx0.provides = vec![vec![0]];
        let mut tx1 = tx(1);
        tx1.requires = vec![];
        tx1.provides = vec![vec![1]];
        let mut tx2 = tx(2);
        tx2.requires = vec![vec![0], vec![1]];
        tx2.provides = vec![vec![2], vec![3]];
        let mut tx3 = tx(3);
        tx3.requires = vec![vec![2]];
        tx3.provides = vec![vec![4]];
        let mut tx4 = tx(4);
        tx4.requires = vec![vec![3]];
        tx4.provides = vec![vec![5]];
        // Higher-priority replacement for tx2 that provides tag 2 only.
        let mut tx2_2 = tx(5);
        tx2_2.requires = vec![vec![0], vec![1]];
        tx2_2.provides = vec![vec![2]];
        tx2_2.priority = 10;

        for tx in vec![tx0, tx1, tx2, tx3, tx4] {
            import(&mut ready, tx).unwrap();
        }
        assert_eq!(ready.get().count(), 5);

        import(&mut ready, tx2_2).unwrap();

        // Three transactions are left after the replacement.
        assert_eq!(ready.get().count(), 3);
    }
678
    /// Fills `ready` with seven transactions that all depend, directly or transitively, on tx1.
    ///
    /// tx2 requires both of tx1's tags; tx3 requires tx1's first tag and tx2's tag; tx4 and tx7
    /// require tx1's first tag; tx5 requires tx4's tag and tx6 requires tx5's tag.
    fn populate_pool(ready: &mut ReadyTransactions<u64, Vec<u8>>) {
        let mut tx1 = tx(1);
        tx1.requires.clear();
        let mut tx2 = tx(2);
        tx2.requires = tx1.provides.clone();
        tx2.provides = vec![vec![106]];
        let mut tx3 = tx(3);
        tx3.requires = vec![tx1.provides[0].clone(), vec![106]];
        tx3.provides = vec![];
        let mut tx4 = tx(4);
        tx4.requires = vec![tx1.provides[0].clone()];
        tx4.provides = vec![vec![107]];
        let mut tx5 = tx(5);
        tx5.requires = vec![tx4.provides[0].clone()];
        tx5.provides = vec![vec![108]];
        let mut tx6 = tx(6);
        tx6.requires = vec![tx5.provides[0].clone()];
        tx6.provides = vec![];
        let tx7 = Transaction {
            data: vec![7].into(),
            bytes: 1,
            hash: 7,
            priority: 1,
            // Maximal `valid_till`, unlike the other transactions built by `tx`.
            valid_till: u64::MAX,
            requires: vec![tx1.provides[0].clone()],
            provides: vec![],
            propagate: true,
            source: crate::TimedTransactionSource::new_external(false),
        };

        // tx7 is deliberately imported before tx4.
        for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] {
            import(ready, tx).unwrap();
        }

        // Only tx1 has no provider in the set, so it is the only entry in `best`.
        assert_eq!(ready.best.len(), 1);
    }
722
723 #[test]
724 fn should_return_best_transactions_in_correct_order() {
725 let mut ready = ReadyTransactions::default();
727 populate_pool(&mut ready);
728
729 let mut it = ready.get().map(|tx| tx.data[0]);
731
732 assert_eq!(it.next(), Some(1));
734 assert_eq!(it.next(), Some(2));
735 assert_eq!(it.next(), Some(3));
736 assert_eq!(it.next(), Some(4));
737 assert_eq!(it.next(), Some(5));
738 assert_eq!(it.next(), Some(6));
739 assert_eq!(it.next(), Some(7));
740 assert_eq!(it.next(), None);
741 }
742
    /// Checks the three levels of `TransactionRef` ordering: priority, `valid_till`, insertion id.
    #[test]
    fn should_order_refs() {
        let mut id = 1;
        // Builds a transaction with a fresh id and the given priority and `valid_till`.
        let mut with_priority = |priority, longevity| {
            id += 1;
            let mut tx = tx(id);
            tx.priority = priority;
            tx.valid_till = longevity;
            tx
        };
        // Higher priority is greater.
        assert!(
            TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 1 } >
                TransactionRef { transaction: Arc::new(with_priority(2, 3)), insertion_id: 2 }
        );
        // Equal priority: the smaller `valid_till` is greater.
        assert!(
            TransactionRef { transaction: Arc::new(with_priority(3, 2)), insertion_id: 1 } >
                TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
        );
        // Equal priority and `valid_till`: the smaller insertion id is greater.
        assert!(
            TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 1 } >
                TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
        );
    }
769
    /// Reporting a yielded transaction as invalid makes the iterator skip what it unlocks.
    #[test]
    fn should_skip_invalid_transactions_while_iterating() {
        let mut ready = ReadyTransactions::default();
        populate_pool(&mut ready);

        let mut it = ready.get();
        let data = |tx: &Arc<Transaction<u64, Vec<u8>>>| tx.data[0];

        assert_eq!(it.next().as_ref().map(data), Some(1));
        assert_eq!(it.next().as_ref().map(data), Some(2));
        assert_eq!(it.next().as_ref().map(data), Some(3));
        let tx4 = it.next();
        assert_eq!(tx4.as_ref().map(data), Some(4));
        // tx4 unlocks tx5, which in turn unlocks tx6; neither may be yielded from here on.
        it.report_invalid(&tx4.unwrap());
        assert_eq!(it.next().as_ref().map(data), Some(7));
        assert_eq!(it.next().as_ref().map(data), None);
    }
791
    /// Removing a subtree also unlinks the removed transactions from their provider's `unlocks`.
    #[test]
    fn should_remove_tx_from_unlocks_set_of_its_parent() {
        let mut ready = ReadyTransactions::default();
        populate_pool(&mut ready);

        let mut it = ready.get();
        let tx1 = it.next().unwrap();
        let tx2 = it.next().unwrap();
        let tx3 = it.next().unwrap();
        let tx4 = it.next().unwrap();
        let lock = ready.ready.read();
        let tx1_unlocks = &lock.get(&tx1.hash).unwrap().unlocks;

        // tx2 requires both tags provided by tx1, so it is listed twice. Index 3 belongs to
        // tx7, which was imported before tx4.
        assert_eq!(tx1_unlocks[0], tx2.hash);
        assert_eq!(tx1_unlocks[1], tx2.hash);
        assert_eq!(tx1_unlocks[2], tx3.hash);
        assert_eq!(tx1_unlocks[4], tx4.hash);
        drop(lock);

        // Removing tx2 takes its dependent tx3 with it.
        let removed = ready.remove_subtree(&[tx2.hash]);
        assert_eq!(removed.len(), 2);
        assert_eq!(removed[0].hash, tx2.hash);
        assert_eq!(removed[1].hash, tx3.hash);

        let lock = ready.ready.read();
        let tx1_unlocks = &lock.get(&tx1.hash).unwrap().unlocks;
        assert!(!tx1_unlocks.contains(&tx2.hash));
        assert!(!tx1_unlocks.contains(&tx3.hash));
        assert!(tx1_unlocks.contains(&tx4.hash));
    }
827}