1use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc, time::Instant};
24
25use crate::LOG_TARGET;
26use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
27use serde::Serialize;
28use sp_core::hexdisplay::HexDisplay;
29use sp_runtime::{
30 traits::Member,
31 transaction_validity::{
32 TransactionLongevity as Longevity, TransactionPriority as Priority, TransactionSource,
33 TransactionTag as Tag,
34 },
35};
36use tracing::{trace, warn};
37
38use super::{
39 future::{FutureTransactions, WaitingTransaction},
40 ready::{BestIterator, ReadyTransactions, TransactionRef},
41};
42
43#[derive(Debug, PartialEq, Eq)]
45pub enum Imported<Hash, Ex> {
46 Ready {
48 hash: Hash,
50 promoted: Vec<Hash>,
52 failed: Vec<Hash>,
54 removed: Vec<Arc<Transaction<Hash, Ex>>>,
56 },
57 Future {
59 hash: Hash,
61 },
62}
63
64impl<Hash, Ex> Imported<Hash, Ex> {
65 pub fn hash(&self) -> &Hash {
67 use self::Imported::*;
68 match *self {
69 Ready { ref hash, .. } => hash,
70 Future { ref hash, .. } => hash,
71 }
72 }
73}
74
75#[derive(Debug)]
77pub struct PruneStatus<Hash, Ex> {
78 pub promoted: Vec<Imported<Hash, Ex>>,
80 pub failed: Vec<Hash>,
82 pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct TimedTransactionSource {
89 pub source: TransactionSource,
91
92 pub timestamp: Option<Instant>,
94}
95
96impl From<TimedTransactionSource> for TransactionSource {
97 fn from(value: TimedTransactionSource) -> Self {
98 value.source
99 }
100}
101
102impl TimedTransactionSource {
103 pub fn new_in_block(with_timestamp: bool) -> Self {
106 Self { source: TransactionSource::InBlock, timestamp: with_timestamp.then(Instant::now) }
107 }
108 pub fn new_external(with_timestamp: bool) -> Self {
111 Self { source: TransactionSource::External, timestamp: with_timestamp.then(Instant::now) }
112 }
113 pub fn new_local(with_timestamp: bool) -> Self {
116 Self { source: TransactionSource::Local, timestamp: with_timestamp.then(Instant::now) }
117 }
118 pub fn from_transaction_source(source: TransactionSource, with_timestamp: bool) -> Self {
120 Self { source, timestamp: with_timestamp.then(Instant::now) }
121 }
122}
123
124#[derive(PartialEq, Eq, Clone)]
126pub struct Transaction<Hash, Extrinsic> {
127 pub data: Extrinsic,
129 pub bytes: usize,
131 pub hash: Hash,
133 pub priority: Priority,
135 pub valid_till: Longevity,
137 pub requires: Vec<Tag>,
139 pub provides: Vec<Tag>,
141 pub propagate: bool,
143 pub source: TimedTransactionSource,
145}
146
147impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
148 fn as_ref(&self) -> &Extrinsic {
149 &self.data
150 }
151}
152
153impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
154 type Transaction = Extrinsic;
155 type Hash = Hash;
156
157 fn data(&self) -> &Extrinsic {
158 &self.data
159 }
160
161 fn hash(&self) -> &Hash {
162 &self.hash
163 }
164
165 fn priority(&self) -> &Priority {
166 &self.priority
167 }
168
169 fn longevity(&self) -> &Longevity {
170 &self.valid_till
171 }
172
173 fn requires(&self) -> &[Tag] {
174 &self.requires
175 }
176
177 fn provides(&self) -> &[Tag] {
178 &self.provides
179 }
180
181 fn is_propagable(&self) -> bool {
182 self.propagate
183 }
184}
185
186impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
187 pub fn duplicate(&self) -> Self {
193 Self {
194 data: self.data.clone(),
195 bytes: self.bytes,
196 hash: self.hash.clone(),
197 priority: self.priority,
198 source: self.source.clone(),
199 valid_till: self.valid_till,
200 requires: self.requires.clone(),
201 provides: self.provides.clone(),
202 propagate: self.propagate,
203 }
204 }
205}
206
207impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
208where
209 Hash: fmt::Debug,
210 Extrinsic: fmt::Debug,
211{
212 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
213 let join_tags = |tags: &[Tag]| {
214 tags.iter()
215 .map(|tag| HexDisplay::from(tag).to_string())
216 .collect::<Vec<_>>()
217 .join(", ")
218 };
219
220 write!(fmt, "Transaction {{ ")?;
221 write!(fmt, "hash: {:?}, ", &self.hash)?;
222 write!(fmt, "priority: {:?}, ", &self.priority)?;
223 write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
224 write!(fmt, "bytes: {:?}, ", &self.bytes)?;
225 write!(fmt, "propagate: {:?}, ", &self.propagate)?;
226 write!(fmt, "source: {:?}, ", &self.source)?;
227 write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
228 write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
229 write!(fmt, "data: {:?}", &self.data)?;
230 write!(fmt, "}}")?;
231 Ok(())
232 }
233}
234
/// Number of tag sets kept in the `recently_pruned` rotation of [`BasePool`].
const RECENTLY_PRUNED_TAGS: usize = 2;
237
238#[derive(Clone, Debug)]
249pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
250 reject_future_transactions: bool,
251 future: FutureTransactions<Hash, Ex>,
252 ready: ReadyTransactions<Hash, Ex>,
253 recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
258 recently_pruned_index: usize,
259}
260
261impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
262 fn default() -> Self {
263 Self::new(false)
264 }
265}
266
267impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
268 pub fn new(reject_future_transactions: bool) -> Self {
270 Self {
271 reject_future_transactions,
272 future: Default::default(),
273 ready: Default::default(),
274 recently_pruned: Default::default(),
275 recently_pruned_index: 0,
276 }
277 }
278
279 pub fn clear_recently_pruned(&mut self) {
281 self.recently_pruned = Default::default();
282 self.recently_pruned_index = 0;
283 }
284
285 pub(crate) fn with_futures_enabled<T>(
291 &mut self,
292 closure: impl FnOnce(&mut Self, bool) -> T,
293 ) -> T {
294 let previous = self.reject_future_transactions;
295 self.reject_future_transactions = false;
296 let return_value = closure(self, previous);
297 self.reject_future_transactions = previous;
298 return_value
299 }
300
301 pub fn is_imported(&self, tx_hash: &Hash) -> bool {
303 self.future.contains(tx_hash) || self.ready.contains(tx_hash)
304 }
305
306 pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
314 if self.is_imported(&tx.hash) {
315 return Err(error::Error::AlreadyImported(Box::new(tx.hash)))
316 }
317
318 let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
319 trace!(
320 target: LOG_TARGET,
321 tx_hash = ?tx.transaction.hash,
322 ?tx,
323 set = if tx.is_ready() { "ready" } else { "future" },
324 "Importing transaction"
325 );
326
327 if !tx.is_ready() {
329 if self.reject_future_transactions {
330 return Err(error::Error::RejectedFutureTransaction)
331 }
332
333 let hash = tx.transaction.hash.clone();
334 self.future.import(tx);
335 return Ok(Imported::Future { hash })
336 }
337
338 self.import_to_ready(tx)
339 }
340
341 fn import_to_ready(
345 &mut self,
346 tx: WaitingTransaction<Hash, Ex>,
347 ) -> error::Result<Imported<Hash, Ex>> {
348 let tx_hash = tx.transaction.hash.clone();
349 let mut promoted = vec![];
350 let mut failed = vec![];
351 let mut removed = vec![];
352
353 let mut first = true;
354 let mut to_import = vec![tx];
355
356 while let Some(tx) = to_import.pop() {
358 to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
360
361 let current_hash = tx.transaction.hash.clone();
363 let current_tx = tx.transaction.clone();
364 match self.ready.import(tx) {
365 Ok(mut replaced) => {
366 if !first {
367 promoted.push(current_hash.clone());
368 }
369 promoted.retain(|hash| replaced.iter().all(|tx| *hash != tx.hash));
372 removed.append(&mut replaced);
375 },
376 Err(error @ error::Error::TooLowPriority { .. }) => {
377 trace!(
378 target: LOG_TARGET,
379 tx_hash = ?current_tx.hash,
380 ?first,
381 %error,
382 "Error importing transaction"
383 );
384 if first {
385 return Err(error)
386 } else {
387 removed.push(current_tx);
388 promoted.retain(|hash| *hash != current_hash);
389 }
390 },
391 Err(error) => {
393 trace!(
394 target: LOG_TARGET,
395 tx_hash = ?current_tx.hash,
396 ?error,
397 first,
398 "Error importing transaction"
399 );
400 if first {
401 return Err(error)
402 } else {
403 failed.push(current_tx.hash.clone());
404 }
405 },
406 }
407 first = false;
408 }
409
410 if removed.iter().any(|tx| tx.hash == tx_hash) {
416 self.ready.remove_subtree(&promoted);
419
420 trace!(
421 target: LOG_TARGET,
422 ?tx_hash,
423 "Cycle detected, bailing."
424 );
425 return Err(error::Error::CycleDetected)
426 }
427
428 Ok(Imported::Ready { hash: tx_hash, promoted, failed, removed })
429 }
430
431 pub fn ready(&self) -> BestIterator<Hash, Ex> {
433 self.ready.get()
434 }
435
436 pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
438 self.future.all()
439 }
440
441 pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
446 let ready = self.ready.by_hashes(hashes);
447 let future = self.future.by_hashes(hashes);
448
449 ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
450 }
451
452 pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
454 self.ready.by_hash(hash)
455 }
456
457 pub fn enforce_limits(
464 &mut self,
465 ready: &Limit,
466 future: &Limit,
467 ) -> Vec<Arc<Transaction<Hash, Ex>>> {
468 let mut removed = vec![];
469
470 while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
471 let worst =
473 self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
474 let transaction = ¤t.transaction;
475 worst
476 .map(|worst| {
477 match worst.transaction.priority.cmp(&transaction.transaction.priority)
482 {
483 Ordering::Less => worst,
484 Ordering::Equal =>
485 if worst.insertion_id > transaction.insertion_id {
486 transaction.clone()
487 } else {
488 worst
489 },
490 Ordering::Greater => transaction.clone(),
491 }
492 })
493 .or_else(|| Some(transaction.clone()))
494 });
495
496 if let Some(worst) = worst {
497 removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
498 } else {
499 break
500 }
501 }
502
503 while future.is_exceeded(self.future.len(), self.future.bytes()) {
504 let worst = self.future.fold(|worst, current| match worst {
506 None => Some(current.clone()),
507 Some(worst) => Some(
508 match (worst.transaction.source.timestamp, current.transaction.source.timestamp)
509 {
510 (Some(worst_timestamp), Some(current_timestamp)) => {
511 if worst_timestamp > current_timestamp {
512 current.clone()
513 } else {
514 worst
515 }
516 },
517 _ =>
518 if worst.imported_at > current.imported_at {
519 current.clone()
520 } else {
521 worst
522 },
523 },
524 ),
525 });
526
527 if let Some(worst) = worst {
528 removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
529 } else {
530 break
531 }
532 }
533
534 removed
535 }
536
537 pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
546 let mut removed = self.ready.remove_subtree(hashes);
547 removed.extend(self.future.remove(hashes));
548 removed
549 }
550
551 pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
553 self.future.clear()
554 }
555
556 pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
563 let mut to_import = vec![];
564 let mut pruned = vec![];
565 let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
566 self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
567 recently_pruned.clear();
568
569 let tags = tags.into_iter().collect::<Vec<_>>();
570 let futures_removed = self.future.prune_tags(&tags);
571
572 for tag in tags {
573 to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
575 pruned.append(&mut self.ready.prune_tags(tag.clone()));
577 recently_pruned.insert(tag);
579 }
580
581 let mut promoted = vec![];
582 let mut failed = vec![];
583 for tx in futures_removed {
584 failed.push(tx.hash.clone());
585 }
586
587 for tx in to_import {
588 let tx_hash = tx.transaction.hash.clone();
589 match self.import_to_ready(tx) {
590 Ok(res) => promoted.push(res),
591 Err(error) => {
592 warn!(
593 target: LOG_TARGET,
594 ?tx_hash,
595 ?error,
596 "Failed to promote during pruning."
597 );
598 failed.push(tx_hash)
599 },
600 }
601 }
602
603 PruneStatus { pruned, failed, promoted }
604 }
605
606 pub fn status(&self) -> PoolStatus {
608 PoolStatus {
609 ready: self.ready.len(),
610 ready_bytes: self.ready.bytes(),
611 future: self.future.len(),
612 future_bytes: self.future.bytes(),
613 }
614 }
615}
616
/// Upper bounds for a queue, used by [`BasePool::enforce_limits`].
#[derive(Debug, Clone)]
pub struct Limit {
    /// Maximal number of transactions.
    pub count: usize,
    /// Maximal total number of bytes.
    pub total_bytes: usize,
}

impl Limit {
    /// Returns `true` when `count` or `bytes` is strictly greater than the respective bound.
    pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
        let too_many = count > self.count;
        let too_large = bytes > self.total_bytes;
        too_many || too_large
    }
}
632
633#[cfg(test)]
634mod tests {
635 use super::*;
636
637 type Hash = u64;
638
639 fn pool() -> BasePool<Hash, Vec<u8>> {
640 BasePool::default()
641 }
642
643 fn default_tx() -> Transaction<Hash, Vec<u8>> {
644 Transaction {
645 data: vec![],
646 bytes: 1,
647 hash: 1u64,
648 priority: 5u64,
649 valid_till: 64u64,
650 requires: vec![],
651 provides: vec![],
652 propagate: true,
653 source: TimedTransactionSource::new_external(false),
654 }
655 }
656
657 #[test]
658 fn prune_for_ready_works() {
659 let mut pool = pool();
661
662 pool.import(Transaction {
664 data: vec![1u8].into(),
665 provides: vec![vec![2]],
666 ..default_tx().clone()
667 })
668 .unwrap();
669
670 assert_eq!(pool.ready().count(), 1);
672 assert_eq!(pool.ready.len(), 1);
673
674 let result = pool.prune_tags(vec![vec![2]]);
675 assert_eq!(pool.ready().count(), 0);
676 assert_eq!(pool.ready.len(), 0);
677 assert_eq!(result.pruned.len(), 1);
678 assert_eq!(result.failed.len(), 0);
679 assert_eq!(result.promoted.len(), 0);
680 }
681
682 #[test]
683 fn prune_for_future_works() {
684 let mut pool = pool();
686
687 pool.import(Transaction {
689 data: vec![1u8].into(),
690 requires: vec![vec![1]],
691 provides: vec![vec![2]],
692 hash: 0xaa,
693 ..default_tx().clone()
694 })
695 .unwrap();
696
697 assert_eq!(pool.futures().count(), 1);
699 assert_eq!(pool.future.len(), 1);
700
701 let result = pool.prune_tags(vec![vec![2]]);
702 assert_eq!(pool.ready().count(), 0);
703 assert_eq!(pool.ready.len(), 0);
704 assert_eq!(pool.futures().count(), 0);
705 assert_eq!(pool.future.len(), 0);
706
707 assert_eq!(result.pruned.len(), 0);
708 assert_eq!(result.failed.len(), 1);
709 assert_eq!(result.failed[0], 0xaa);
710 assert_eq!(result.promoted.len(), 0);
711 }
712
713 #[test]
714 fn should_import_transaction_to_ready() {
715 let mut pool = pool();
717
718 pool.import(Transaction {
720 data: vec![1u8].into(),
721 provides: vec![vec![1]],
722 ..default_tx().clone()
723 })
724 .unwrap();
725
726 assert_eq!(pool.ready().count(), 1);
728 assert_eq!(pool.ready.len(), 1);
729 }
730
731 #[test]
732 fn should_not_import_same_transaction_twice() {
733 let mut pool = pool();
735
736 pool.import(Transaction {
738 data: vec![1u8].into(),
739 provides: vec![vec![1]],
740 ..default_tx().clone()
741 })
742 .unwrap();
743 pool.import(Transaction {
744 data: vec![1u8].into(),
745 provides: vec![vec![1]],
746 ..default_tx().clone()
747 })
748 .unwrap_err();
749
750 assert_eq!(pool.ready().count(), 1);
752 assert_eq!(pool.ready.len(), 1);
753 }
754
755 #[test]
756 fn should_import_transaction_to_future_and_promote_it_later() {
757 let mut pool = pool();
759
760 pool.import(Transaction {
762 data: vec![1u8].into(),
763 requires: vec![vec![0]],
764 provides: vec![vec![1]],
765 ..default_tx().clone()
766 })
767 .unwrap();
768 assert_eq!(pool.ready().count(), 0);
769 assert_eq!(pool.ready.len(), 0);
770 pool.import(Transaction {
771 data: vec![2u8].into(),
772 hash: 2,
773 provides: vec![vec![0]],
774 ..default_tx().clone()
775 })
776 .unwrap();
777
778 assert_eq!(pool.ready().count(), 2);
780 assert_eq!(pool.ready.len(), 2);
781 }
782
783 #[test]
784 fn should_promote_a_subgraph() {
785 let mut pool = pool();
787
788 pool.import(Transaction {
790 data: vec![1u8].into(),
791 requires: vec![vec![0]],
792 provides: vec![vec![1]],
793 ..default_tx().clone()
794 })
795 .unwrap();
796 pool.import(Transaction {
797 data: vec![3u8].into(),
798 hash: 3,
799 requires: vec![vec![2]],
800 ..default_tx().clone()
801 })
802 .unwrap();
803 pool.import(Transaction {
804 data: vec![2u8].into(),
805 hash: 2,
806 requires: vec![vec![1]],
807 provides: vec![vec![3], vec![2]],
808 ..default_tx().clone()
809 })
810 .unwrap();
811 pool.import(Transaction {
812 data: vec![4u8].into(),
813 hash: 4,
814 priority: 1_000u64,
815 requires: vec![vec![3], vec![4]],
816 ..default_tx().clone()
817 })
818 .unwrap();
819 assert_eq!(pool.ready().count(), 0);
820 assert_eq!(pool.ready.len(), 0);
821
822 let res = pool
823 .import(Transaction {
824 data: vec![5u8].into(),
825 hash: 5,
826 provides: vec![vec![0], vec![4]],
827 ..default_tx().clone()
828 })
829 .unwrap();
830
831 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
833
834 assert_eq!(it.next(), Some(5));
835 assert_eq!(it.next(), Some(1));
836 assert_eq!(it.next(), Some(2));
837 assert_eq!(it.next(), Some(4));
838 assert_eq!(it.next(), Some(3));
839 assert_eq!(it.next(), None);
840 assert_eq!(
841 res,
842 Imported::Ready {
843 hash: 5,
844 promoted: vec![1, 2, 3, 4],
845 failed: vec![],
846 removed: vec![],
847 }
848 );
849 }
850
851 #[test]
852 fn should_remove_conflicting_future() {
853 let mut pool = pool();
854 pool.import(Transaction {
855 data: vec![3u8].into(),
856 hash: 3,
857 requires: vec![vec![1]],
858 priority: 50u64,
859 provides: vec![vec![3]],
860 ..default_tx().clone()
861 })
862 .unwrap();
863 assert_eq!(pool.ready().count(), 0);
864 assert_eq!(pool.ready.len(), 0);
865
866 let tx2 = Transaction {
867 data: vec![2u8].into(),
868 hash: 2,
869 requires: vec![vec![1]],
870 provides: vec![vec![3]],
871 ..default_tx().clone()
872 };
873 pool.import(tx2.clone()).unwrap();
874 assert_eq!(pool.future.len(), 2);
875
876 let res = pool
877 .import(Transaction {
878 data: vec![1u8].into(),
879 hash: 1,
880 provides: vec![vec![1]],
881 ..default_tx().clone()
882 })
883 .unwrap();
884
885 assert_eq!(
886 res,
887 Imported::Ready {
888 hash: 1,
889 promoted: vec![3],
890 failed: vec![],
891 removed: vec![tx2.into()]
892 }
893 );
894
895 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
896 assert_eq!(it.next(), Some(1));
897 assert_eq!(it.next(), Some(3));
898 assert_eq!(it.next(), None);
899
900 assert_eq!(pool.future.len(), 0);
901 }
902
903 #[test]
904 fn should_handle_a_cycle() {
905 let mut pool = pool();
907 pool.import(Transaction {
908 data: vec![1u8].into(),
909 requires: vec![vec![0]],
910 provides: vec![vec![1]],
911 ..default_tx().clone()
912 })
913 .unwrap();
914 pool.import(Transaction {
915 data: vec![3u8].into(),
916 hash: 3,
917 requires: vec![vec![1]],
918 provides: vec![vec![2]],
919 ..default_tx().clone()
920 })
921 .unwrap();
922 assert_eq!(pool.ready().count(), 0);
923 assert_eq!(pool.ready.len(), 0);
924
925 let tx2 = Transaction {
927 data: vec![2u8].into(),
928 hash: 2,
929 requires: vec![vec![2]],
930 provides: vec![vec![0]],
931 ..default_tx().clone()
932 };
933 pool.import(tx2.clone()).unwrap();
934
935 {
937 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
938 assert_eq!(it.next(), None);
939 }
940 assert_eq!(pool.future.len(), 3);
942
943 let res = pool
945 .import(Transaction {
946 data: vec![4u8].into(),
947 hash: 4,
948 priority: 50u64,
949 provides: vec![vec![0]],
950 ..default_tx().clone()
951 })
952 .unwrap();
953 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
954 assert_eq!(it.next(), Some(4));
955 assert_eq!(it.next(), Some(1));
956 assert_eq!(it.next(), Some(3));
957 assert_eq!(it.next(), None);
958 assert_eq!(
959 res,
960 Imported::Ready {
961 hash: 4,
962 promoted: vec![1, 3],
963 failed: vec![],
964 removed: vec![tx2.into()]
965 }
966 );
967 assert_eq!(pool.future.len(), 0);
968 }
969
970 #[test]
971 fn should_handle_a_cycle_with_low_priority() {
972 let mut pool = pool();
974 pool.import(Transaction {
975 data: vec![1u8].into(),
976 requires: vec![vec![0]],
977 provides: vec![vec![1]],
978 ..default_tx().clone()
979 })
980 .unwrap();
981 pool.import(Transaction {
982 data: vec![3u8].into(),
983 hash: 3,
984 requires: vec![vec![1]],
985 provides: vec![vec![2]],
986 ..default_tx().clone()
987 })
988 .unwrap();
989 assert_eq!(pool.ready().count(), 0);
990 assert_eq!(pool.ready.len(), 0);
991
992 pool.import(Transaction {
994 data: vec![2u8].into(),
995 hash: 2,
996 requires: vec![vec![2]],
997 provides: vec![vec![0]],
998 ..default_tx().clone()
999 })
1000 .unwrap();
1001
1002 {
1004 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1005 assert_eq!(it.next(), None);
1006 }
1007 assert_eq!(pool.future.len(), 3);
1009
1010 let err = pool
1012 .import(Transaction {
1013 data: vec![4u8].into(),
1014 hash: 4,
1015 priority: 1u64, provides: vec![vec![0]],
1017 ..default_tx().clone()
1018 })
1019 .unwrap_err();
1020 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1021 assert_eq!(it.next(), None);
1022 assert_eq!(pool.ready.len(), 0);
1023 assert_eq!(pool.future.len(), 0);
1024 if let error::Error::CycleDetected = err {
1025 } else {
1026 assert!(false, "Invalid error kind: {:?}", err);
1027 }
1028 }
1029
1030 #[test]
1031 fn should_remove_invalid_transactions() {
1032 let mut pool = pool();
1034 pool.import(Transaction {
1035 data: vec![5u8].into(),
1036 hash: 5,
1037 provides: vec![vec![0], vec![4]],
1038 ..default_tx().clone()
1039 })
1040 .unwrap();
1041 pool.import(Transaction {
1042 data: vec![1u8].into(),
1043 requires: vec![vec![0]],
1044 provides: vec![vec![1]],
1045 ..default_tx().clone()
1046 })
1047 .unwrap();
1048 pool.import(Transaction {
1049 data: vec![3u8].into(),
1050 hash: 3,
1051 requires: vec![vec![2]],
1052 ..default_tx().clone()
1053 })
1054 .unwrap();
1055 pool.import(Transaction {
1056 data: vec![2u8].into(),
1057 hash: 2,
1058 requires: vec![vec![1]],
1059 provides: vec![vec![3], vec![2]],
1060 ..default_tx().clone()
1061 })
1062 .unwrap();
1063 pool.import(Transaction {
1064 data: vec![4u8].into(),
1065 hash: 4,
1066 priority: 1_000u64,
1067 requires: vec![vec![3], vec![4]],
1068 ..default_tx().clone()
1069 })
1070 .unwrap();
1071 pool.import(Transaction {
1073 data: vec![6u8].into(),
1074 hash: 6,
1075 priority: 1_000u64,
1076 requires: vec![vec![11]],
1077 ..default_tx().clone()
1078 })
1079 .unwrap();
1080 assert_eq!(pool.ready().count(), 5);
1081 assert_eq!(pool.future.len(), 1);
1082
1083 pool.remove_subtree(&[6, 1]);
1085
1086 assert_eq!(pool.ready().count(), 1);
1088 assert_eq!(pool.future.len(), 0);
1089 }
1090
1091 #[test]
1092 fn should_prune_ready_transactions() {
1093 let mut pool = pool();
1095 pool.import(Transaction {
1097 data: vec![5u8].into(),
1098 hash: 5,
1099 requires: vec![vec![0]],
1100 provides: vec![vec![100]],
1101 ..default_tx().clone()
1102 })
1103 .unwrap();
1104 pool.import(Transaction {
1106 data: vec![1u8].into(),
1107 provides: vec![vec![1]],
1108 ..default_tx().clone()
1109 })
1110 .unwrap();
1111 pool.import(Transaction {
1112 data: vec![2u8].into(),
1113 hash: 2,
1114 requires: vec![vec![2]],
1115 provides: vec![vec![3]],
1116 ..default_tx().clone()
1117 })
1118 .unwrap();
1119 pool.import(Transaction {
1120 data: vec![3u8].into(),
1121 hash: 3,
1122 requires: vec![vec![1]],
1123 provides: vec![vec![2]],
1124 ..default_tx().clone()
1125 })
1126 .unwrap();
1127 pool.import(Transaction {
1128 data: vec![4u8].into(),
1129 hash: 4,
1130 priority: 1_000u64,
1131 requires: vec![vec![3], vec![2]],
1132 provides: vec![vec![4]],
1133 ..default_tx().clone()
1134 })
1135 .unwrap();
1136
1137 assert_eq!(pool.ready().count(), 4);
1138 assert_eq!(pool.future.len(), 1);
1139
1140 let result = pool.prune_tags(vec![vec![0], vec![2]]);
1142
1143 assert_eq!(result.pruned.len(), 2);
1145 assert_eq!(result.failed.len(), 0);
1146 assert_eq!(
1147 result.promoted[0],
1148 Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
1149 );
1150 assert_eq!(result.promoted.len(), 1);
1151 assert_eq!(pool.future.len(), 0);
1152 assert_eq!(pool.ready.len(), 3);
1153 assert_eq!(pool.ready().count(), 3);
1154 }
1155
1156 #[test]
1157 fn transaction_debug() {
1158 assert_eq!(
1159 format!(
1160 "{:?}",
1161 Transaction {
1162 data: vec![4u8].into(),
1163 hash: 4,
1164 priority: 1_000u64,
1165 requires: vec![vec![3], vec![2]],
1166 provides: vec![vec![4]],
1167 ..default_tx().clone()
1168 }
1169 ),
1170 "Transaction { \
1171hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1172source: TimedTransactionSource { source: TransactionSource::External, timestamp: None }, requires: [03, 02], provides: [04], data: [4]}"
1173 .to_owned()
1174 );
1175 }
1176
1177 #[test]
1178 fn transaction_propagation() {
1179 assert_eq!(
1180 Transaction {
1181 data: vec![4u8].into(),
1182 hash: 4,
1183 priority: 1_000u64,
1184 requires: vec![vec![3], vec![2]],
1185 provides: vec![vec![4]],
1186 ..default_tx().clone()
1187 }
1188 .is_propagable(),
1189 true
1190 );
1191
1192 assert_eq!(
1193 Transaction {
1194 data: vec![4u8].into(),
1195 hash: 4,
1196 priority: 1_000u64,
1197 requires: vec![vec![3], vec![2]],
1198 provides: vec![vec![4]],
1199 propagate: false,
1200 ..default_tx().clone()
1201 }
1202 .is_propagable(),
1203 false
1204 );
1205 }
1206
1207 #[test]
1208 fn should_reject_future_transactions() {
1209 let mut pool = pool();
1211
1212 pool.reject_future_transactions = true;
1214
1215 let err = pool.import(Transaction {
1217 data: vec![5u8].into(),
1218 hash: 5,
1219 requires: vec![vec![0]],
1220 ..default_tx().clone()
1221 });
1222
1223 if let Err(error::Error::RejectedFutureTransaction) = err {
1224 } else {
1225 assert!(false, "Invalid error kind: {:?}", err);
1226 }
1227 }
1228
1229 #[test]
1230 fn should_clear_future_queue() {
1231 let mut pool = pool();
1233
1234 pool.import(Transaction {
1236 data: vec![5u8].into(),
1237 hash: 5,
1238 requires: vec![vec![0]],
1239 ..default_tx().clone()
1240 })
1241 .unwrap();
1242
1243 assert_eq!(pool.future.len(), 1);
1245
1246 assert_eq!(pool.clear_future().len(), 1);
1248
1249 assert_eq!(pool.future.len(), 0);
1251 }
1252
1253 #[test]
1254 fn should_accept_future_transactions_when_explicitly_asked_to() {
1255 let mut pool = pool();
1257 pool.reject_future_transactions = true;
1258
1259 let flag_value = pool.with_futures_enabled(|pool, flag| {
1261 pool.import(Transaction {
1262 data: vec![5u8].into(),
1263 hash: 5,
1264 requires: vec![vec![0]],
1265 ..default_tx().clone()
1266 })
1267 .unwrap();
1268
1269 flag
1270 });
1271
1272 assert_eq!(flag_value, true);
1274 assert_eq!(pool.reject_future_transactions, true);
1275 assert_eq!(pool.future.len(), 1);
1276 }
1277}