1use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc, time::Instant};
24
25use crate::LOG_TARGET;
26use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
27use serde::Serialize;
28use sp_core::hexdisplay::HexDisplay;
29use sp_runtime::{
30 traits::Member,
31 transaction_validity::{
32 TransactionLongevity as Longevity, TransactionPriority as Priority, TransactionSource,
33 TransactionTag as Tag,
34 },
35};
36use tracing::{trace, warn};
37
38use super::{
39 future::{FutureTransactions, WaitingTransaction},
40 ready::{BestIterator, ReadyTransactions, TransactionRef},
41};
42
43#[derive(Debug, PartialEq, Eq)]
45pub enum Imported<Hash, Ex> {
46 Ready {
48 hash: Hash,
50 promoted: Vec<Hash>,
52 failed: Vec<Hash>,
54 removed: Vec<Arc<Transaction<Hash, Ex>>>,
56 },
57 Future {
59 hash: Hash,
61 },
62}
63
64impl<Hash, Ex> Imported<Hash, Ex> {
65 pub fn hash(&self) -> &Hash {
67 use self::Imported::*;
68 match *self {
69 Ready { ref hash, .. } => hash,
70 Future { ref hash, .. } => hash,
71 }
72 }
73}
74
75#[derive(Debug)]
77pub struct PruneStatus<Hash, Ex> {
78 pub promoted: Vec<Imported<Hash, Ex>>,
80 pub failed: Vec<Hash>,
82 pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct TimedTransactionSource {
89 pub source: TransactionSource,
91
92 pub timestamp: Option<Instant>,
94}
95
96impl From<TimedTransactionSource> for TransactionSource {
97 fn from(value: TimedTransactionSource) -> Self {
98 value.source
99 }
100}
101
102impl TimedTransactionSource {
103 pub fn new_in_block(with_timestamp: bool) -> Self {
106 Self { source: TransactionSource::InBlock, timestamp: with_timestamp.then(Instant::now) }
107 }
108 pub fn new_external(with_timestamp: bool) -> Self {
111 Self { source: TransactionSource::External, timestamp: with_timestamp.then(Instant::now) }
112 }
113 pub fn new_local(with_timestamp: bool) -> Self {
116 Self { source: TransactionSource::Local, timestamp: with_timestamp.then(Instant::now) }
117 }
118 pub fn from_transaction_source(source: TransactionSource, with_timestamp: bool) -> Self {
120 Self { source, timestamp: with_timestamp.then(Instant::now) }
121 }
122}
123
124#[derive(PartialEq, Eq, Clone)]
126pub struct Transaction<Hash, Extrinsic> {
127 pub data: Extrinsic,
129 pub bytes: usize,
131 pub hash: Hash,
133 pub priority: Priority,
135 pub valid_till: Longevity,
137 pub requires: Vec<Tag>,
139 pub provides: Vec<Tag>,
141 pub propagate: bool,
143 pub source: TimedTransactionSource,
145}
146
147impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
148 fn as_ref(&self) -> &Extrinsic {
149 &self.data
150 }
151}
152
153impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
154 type Transaction = Extrinsic;
155 type Hash = Hash;
156
157 fn data(&self) -> &Extrinsic {
158 &self.data
159 }
160
161 fn hash(&self) -> &Hash {
162 &self.hash
163 }
164
165 fn priority(&self) -> &Priority {
166 &self.priority
167 }
168
169 fn longevity(&self) -> &Longevity {
170 &self.valid_till
171 }
172
173 fn requires(&self) -> &[Tag] {
174 &self.requires
175 }
176
177 fn provides(&self) -> &[Tag] {
178 &self.provides
179 }
180
181 fn is_propagable(&self) -> bool {
182 self.propagate
183 }
184}
185
186impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
187 pub fn duplicate(&self) -> Self {
193 Self {
194 data: self.data.clone(),
195 bytes: self.bytes,
196 hash: self.hash.clone(),
197 priority: self.priority,
198 source: self.source.clone(),
199 valid_till: self.valid_till,
200 requires: self.requires.clone(),
201 provides: self.provides.clone(),
202 propagate: self.propagate,
203 }
204 }
205}
206
207impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
208where
209 Hash: fmt::Debug,
210 Extrinsic: fmt::Debug,
211{
212 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
213 let join_tags = |tags: &[Tag]| {
214 tags.iter()
215 .map(|tag| HexDisplay::from(tag).to_string())
216 .collect::<Vec<_>>()
217 .join(", ")
218 };
219
220 write!(fmt, "Transaction {{ ")?;
221 write!(fmt, "hash: {:?}, ", &self.hash)?;
222 write!(fmt, "priority: {:?}, ", &self.priority)?;
223 write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
224 write!(fmt, "bytes: {:?}, ", &self.bytes)?;
225 write!(fmt, "propagate: {:?}, ", &self.propagate)?;
226 write!(fmt, "source: {:?}, ", &self.source)?;
227 write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
228 write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
229 write!(fmt, "data: {:?}", &self.data)?;
230 write!(fmt, "}}")?;
231 Ok(())
232 }
233}
234
235const RECENTLY_PRUNED_TAGS: usize = 2;
237
238#[derive(Clone, Debug)]
249pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
250 reject_future_transactions: bool,
251 future: FutureTransactions<Hash, Ex>,
252 ready: ReadyTransactions<Hash, Ex>,
253 recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
258 recently_pruned_index: usize,
259}
260
261impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
262 fn default() -> Self {
263 Self::new(false)
264 }
265}
266
267impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
268 pub fn new(reject_future_transactions: bool) -> Self {
270 Self {
271 reject_future_transactions,
272 future: Default::default(),
273 ready: Default::default(),
274 recently_pruned: Default::default(),
275 recently_pruned_index: 0,
276 }
277 }
278
279 pub fn clear_recently_pruned(&mut self) {
281 self.recently_pruned = Default::default();
282 self.recently_pruned_index = 0;
283 }
284
285 pub(crate) fn with_futures_enabled<T>(
291 &mut self,
292 closure: impl FnOnce(&mut Self, bool) -> T,
293 ) -> T {
294 let previous = self.reject_future_transactions;
295 self.reject_future_transactions = false;
296 let return_value = closure(self, previous);
297 self.reject_future_transactions = previous;
298 return_value
299 }
300
301 pub fn is_imported(&self, tx_hash: &Hash) -> bool {
303 self.future.contains(tx_hash) || self.ready.contains(tx_hash)
304 }
305
306 pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
314 if self.is_imported(&tx.hash) {
315 return Err(error::Error::AlreadyImported(Box::new(tx.hash)));
316 }
317
318 let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
319 trace!(
320 target: LOG_TARGET,
321 tx_hash = ?tx.transaction.hash,
322 ?tx,
323 set = if tx.is_ready() { "ready" } else { "future" },
324 "Importing transaction"
325 );
326
327 if !tx.is_ready() {
329 if self.reject_future_transactions {
330 return Err(error::Error::RejectedFutureTransaction);
331 }
332
333 let hash = tx.transaction.hash.clone();
334 self.future.import(tx);
335 return Ok(Imported::Future { hash });
336 }
337
338 self.import_to_ready(tx)
339 }
340
341 fn import_to_ready(
345 &mut self,
346 tx: WaitingTransaction<Hash, Ex>,
347 ) -> error::Result<Imported<Hash, Ex>> {
348 let tx_hash = tx.transaction.hash.clone();
349 let mut promoted = vec![];
350 let mut failed = vec![];
351 let mut removed = vec![];
352
353 let mut first = true;
354 let mut to_import = vec![tx];
355
356 while let Some(tx) = to_import.pop() {
358 to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
360
361 let current_hash = tx.transaction.hash.clone();
363 let current_tx = tx.transaction.clone();
364 match self.ready.import(tx) {
365 Ok(mut replaced) => {
366 if !first {
367 promoted.push(current_hash.clone());
368 }
369 promoted.retain(|hash| replaced.iter().all(|tx| *hash != tx.hash));
372 removed.append(&mut replaced);
375 },
376 Err(error @ error::Error::TooLowPriority { .. }) => {
377 trace!(
378 target: LOG_TARGET,
379 tx_hash = ?current_tx.hash,
380 ?first,
381 %error,
382 "Error importing transaction"
383 );
384 if first {
385 return Err(error);
386 } else {
387 removed.push(current_tx);
388 promoted.retain(|hash| *hash != current_hash);
389 }
390 },
391 Err(error) => {
393 trace!(
394 target: LOG_TARGET,
395 tx_hash = ?current_tx.hash,
396 ?error,
397 first,
398 "Error importing transaction"
399 );
400 if first {
401 return Err(error);
402 } else {
403 failed.push(current_tx.hash.clone());
404 }
405 },
406 }
407 first = false;
408 }
409
410 if removed.iter().any(|tx| tx.hash == tx_hash) {
416 self.ready.remove_subtree(&promoted);
419
420 trace!(
421 target: LOG_TARGET,
422 ?tx_hash,
423 "Cycle detected, bailing."
424 );
425 return Err(error::Error::CycleDetected);
426 }
427
428 Ok(Imported::Ready { hash: tx_hash, promoted, failed, removed })
429 }
430
431 pub fn ready(&self) -> BestIterator<Hash, Ex> {
433 self.ready.get()
434 }
435
436 pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
438 self.future.all()
439 }
440
441 pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
446 let ready = self.ready.by_hashes(hashes);
447 let future = self.future.by_hashes(hashes);
448
449 ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
450 }
451
452 pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
454 self.ready.by_hash(hash)
455 }
456
457 pub fn enforce_limits(
464 &mut self,
465 ready: &Limit,
466 future: &Limit,
467 ) -> Vec<Arc<Transaction<Hash, Ex>>> {
468 let mut removed = vec![];
469
470 while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
471 let worst =
473 self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
474 let transaction = ¤t.transaction;
475 worst
476 .map(|worst| {
477 match worst.transaction.priority.cmp(&transaction.transaction.priority)
482 {
483 Ordering::Less => worst,
484 Ordering::Equal => {
485 if worst.insertion_id > transaction.insertion_id {
486 transaction.clone()
487 } else {
488 worst
489 }
490 },
491 Ordering::Greater => transaction.clone(),
492 }
493 })
494 .or_else(|| Some(transaction.clone()))
495 });
496
497 if let Some(worst) = worst {
498 removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
499 } else {
500 break;
501 }
502 }
503
504 while future.is_exceeded(self.future.len(), self.future.bytes()) {
505 let worst = self.future.fold(|worst, current| match worst {
507 None => Some(current.clone()),
508 Some(worst) => Some(
509 match (worst.transaction.source.timestamp, current.transaction.source.timestamp)
510 {
511 (Some(worst_timestamp), Some(current_timestamp)) => {
512 if worst_timestamp > current_timestamp {
513 current.clone()
514 } else {
515 worst
516 }
517 },
518 _ => {
519 if worst.imported_at > current.imported_at {
520 current.clone()
521 } else {
522 worst
523 }
524 },
525 },
526 ),
527 });
528
529 if let Some(worst) = worst {
530 removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
531 } else {
532 break;
533 }
534 }
535
536 removed
537 }
538
539 pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
548 let mut removed = self.ready.remove_subtree(hashes);
549 removed.extend(self.future.remove(hashes));
550 removed
551 }
552
553 pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
555 self.future.clear()
556 }
557
558 pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
565 let mut to_import = vec![];
566 let mut pruned = vec![];
567 let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
568 self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
569 recently_pruned.clear();
570
571 let tags = tags.into_iter().collect::<Vec<_>>();
572 let futures_removed = self.future.prune_tags(&tags);
573
574 for tag in tags {
575 to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
577 pruned.append(&mut self.ready.prune_tags(tag.clone()));
579 recently_pruned.insert(tag);
581 }
582
583 let mut promoted = vec![];
584 let mut failed = vec![];
585 for tx in futures_removed {
586 failed.push(tx.hash.clone());
587 }
588
589 for tx in to_import {
590 let tx_hash = tx.transaction.hash.clone();
591 match self.import_to_ready(tx) {
592 Ok(res) => promoted.push(res),
593 Err(error) => {
594 warn!(
595 target: LOG_TARGET,
596 ?tx_hash,
597 ?error,
598 "Failed to promote during pruning."
599 );
600 failed.push(tx_hash)
601 },
602 }
603 }
604
605 PruneStatus { pruned, failed, promoted }
606 }
607
608 pub fn status(&self) -> PoolStatus {
610 PoolStatus {
611 ready: self.ready.len(),
612 ready_bytes: self.ready.bytes(),
613 future: self.future.len(),
614 future_bytes: self.future.bytes(),
615 }
616 }
617}
618
619#[derive(Debug, Clone)]
621pub struct Limit {
622 pub count: usize,
624 pub total_bytes: usize,
626}
627
628impl Limit {
629 pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
631 self.count < count || self.total_bytes < bytes
632 }
633}
634
635#[cfg(test)]
636mod tests {
637 use super::*;
638
639 type Hash = u64;
640
641 fn pool() -> BasePool<Hash, Vec<u8>> {
642 BasePool::default()
643 }
644
645 fn default_tx() -> Transaction<Hash, Vec<u8>> {
646 Transaction {
647 data: vec![],
648 bytes: 1,
649 hash: 1u64,
650 priority: 5u64,
651 valid_till: 64u64,
652 requires: vec![],
653 provides: vec![],
654 propagate: true,
655 source: TimedTransactionSource::new_external(false),
656 }
657 }
658
659 #[test]
660 fn prune_for_ready_works() {
661 let mut pool = pool();
663
664 pool.import(Transaction {
666 data: vec![1u8].into(),
667 provides: vec![vec![2]],
668 ..default_tx().clone()
669 })
670 .unwrap();
671
672 assert_eq!(pool.ready().count(), 1);
674 assert_eq!(pool.ready.len(), 1);
675
676 let result = pool.prune_tags(vec![vec![2]]);
677 assert_eq!(pool.ready().count(), 0);
678 assert_eq!(pool.ready.len(), 0);
679 assert_eq!(result.pruned.len(), 1);
680 assert_eq!(result.failed.len(), 0);
681 assert_eq!(result.promoted.len(), 0);
682 }
683
684 #[test]
685 fn prune_for_future_works() {
686 let mut pool = pool();
688
689 pool.import(Transaction {
691 data: vec![1u8].into(),
692 requires: vec![vec![1]],
693 provides: vec![vec![2]],
694 hash: 0xaa,
695 ..default_tx().clone()
696 })
697 .unwrap();
698
699 assert_eq!(pool.futures().count(), 1);
701 assert_eq!(pool.future.len(), 1);
702
703 let result = pool.prune_tags(vec![vec![2]]);
704 assert_eq!(pool.ready().count(), 0);
705 assert_eq!(pool.ready.len(), 0);
706 assert_eq!(pool.futures().count(), 0);
707 assert_eq!(pool.future.len(), 0);
708
709 assert_eq!(result.pruned.len(), 0);
710 assert_eq!(result.failed.len(), 1);
711 assert_eq!(result.failed[0], 0xaa);
712 assert_eq!(result.promoted.len(), 0);
713 }
714
715 #[test]
716 fn should_import_transaction_to_ready() {
717 let mut pool = pool();
719
720 pool.import(Transaction {
722 data: vec![1u8].into(),
723 provides: vec![vec![1]],
724 ..default_tx().clone()
725 })
726 .unwrap();
727
728 assert_eq!(pool.ready().count(), 1);
730 assert_eq!(pool.ready.len(), 1);
731 }
732
733 #[test]
734 fn should_not_import_same_transaction_twice() {
735 let mut pool = pool();
737
738 pool.import(Transaction {
740 data: vec![1u8].into(),
741 provides: vec![vec![1]],
742 ..default_tx().clone()
743 })
744 .unwrap();
745 pool.import(Transaction {
746 data: vec![1u8].into(),
747 provides: vec![vec![1]],
748 ..default_tx().clone()
749 })
750 .unwrap_err();
751
752 assert_eq!(pool.ready().count(), 1);
754 assert_eq!(pool.ready.len(), 1);
755 }
756
757 #[test]
758 fn should_import_transaction_to_future_and_promote_it_later() {
759 let mut pool = pool();
761
762 pool.import(Transaction {
764 data: vec![1u8].into(),
765 requires: vec![vec![0]],
766 provides: vec![vec![1]],
767 ..default_tx().clone()
768 })
769 .unwrap();
770 assert_eq!(pool.ready().count(), 0);
771 assert_eq!(pool.ready.len(), 0);
772 pool.import(Transaction {
773 data: vec![2u8].into(),
774 hash: 2,
775 provides: vec![vec![0]],
776 ..default_tx().clone()
777 })
778 .unwrap();
779
780 assert_eq!(pool.ready().count(), 2);
782 assert_eq!(pool.ready.len(), 2);
783 }
784
785 #[test]
786 fn should_promote_a_subgraph() {
787 let mut pool = pool();
789
790 pool.import(Transaction {
792 data: vec![1u8].into(),
793 requires: vec![vec![0]],
794 provides: vec![vec![1]],
795 ..default_tx().clone()
796 })
797 .unwrap();
798 pool.import(Transaction {
799 data: vec![3u8].into(),
800 hash: 3,
801 requires: vec![vec![2]],
802 ..default_tx().clone()
803 })
804 .unwrap();
805 pool.import(Transaction {
806 data: vec![2u8].into(),
807 hash: 2,
808 requires: vec![vec![1]],
809 provides: vec![vec![3], vec![2]],
810 ..default_tx().clone()
811 })
812 .unwrap();
813 pool.import(Transaction {
814 data: vec![4u8].into(),
815 hash: 4,
816 priority: 1_000u64,
817 requires: vec![vec![3], vec![4]],
818 ..default_tx().clone()
819 })
820 .unwrap();
821 assert_eq!(pool.ready().count(), 0);
822 assert_eq!(pool.ready.len(), 0);
823
824 let res = pool
825 .import(Transaction {
826 data: vec![5u8].into(),
827 hash: 5,
828 provides: vec![vec![0], vec![4]],
829 ..default_tx().clone()
830 })
831 .unwrap();
832
833 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
835
836 assert_eq!(it.next(), Some(5));
837 assert_eq!(it.next(), Some(1));
838 assert_eq!(it.next(), Some(2));
839 assert_eq!(it.next(), Some(4));
840 assert_eq!(it.next(), Some(3));
841 assert_eq!(it.next(), None);
842 assert_eq!(
843 res,
844 Imported::Ready {
845 hash: 5,
846 promoted: vec![1, 2, 3, 4],
847 failed: vec![],
848 removed: vec![],
849 }
850 );
851 }
852
853 #[test]
854 fn should_remove_conflicting_future() {
855 let mut pool = pool();
856 pool.import(Transaction {
857 data: vec![3u8].into(),
858 hash: 3,
859 requires: vec![vec![1]],
860 priority: 50u64,
861 provides: vec![vec![3]],
862 ..default_tx().clone()
863 })
864 .unwrap();
865 assert_eq!(pool.ready().count(), 0);
866 assert_eq!(pool.ready.len(), 0);
867
868 let tx2 = Transaction {
869 data: vec![2u8].into(),
870 hash: 2,
871 requires: vec![vec![1]],
872 provides: vec![vec![3]],
873 ..default_tx().clone()
874 };
875 pool.import(tx2.clone()).unwrap();
876 assert_eq!(pool.future.len(), 2);
877
878 let res = pool
879 .import(Transaction {
880 data: vec![1u8].into(),
881 hash: 1,
882 provides: vec![vec![1]],
883 ..default_tx().clone()
884 })
885 .unwrap();
886
887 assert_eq!(
888 res,
889 Imported::Ready {
890 hash: 1,
891 promoted: vec![3],
892 failed: vec![],
893 removed: vec![tx2.into()]
894 }
895 );
896
897 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
898 assert_eq!(it.next(), Some(1));
899 assert_eq!(it.next(), Some(3));
900 assert_eq!(it.next(), None);
901
902 assert_eq!(pool.future.len(), 0);
903 }
904
905 #[test]
906 fn should_handle_a_cycle() {
907 let mut pool = pool();
909 pool.import(Transaction {
910 data: vec![1u8].into(),
911 requires: vec![vec![0]],
912 provides: vec![vec![1]],
913 ..default_tx().clone()
914 })
915 .unwrap();
916 pool.import(Transaction {
917 data: vec![3u8].into(),
918 hash: 3,
919 requires: vec![vec![1]],
920 provides: vec![vec![2]],
921 ..default_tx().clone()
922 })
923 .unwrap();
924 assert_eq!(pool.ready().count(), 0);
925 assert_eq!(pool.ready.len(), 0);
926
927 let tx2 = Transaction {
929 data: vec![2u8].into(),
930 hash: 2,
931 requires: vec![vec![2]],
932 provides: vec![vec![0]],
933 ..default_tx().clone()
934 };
935 pool.import(tx2.clone()).unwrap();
936
937 {
939 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
940 assert_eq!(it.next(), None);
941 }
942 assert_eq!(pool.future.len(), 3);
944
945 let res = pool
947 .import(Transaction {
948 data: vec![4u8].into(),
949 hash: 4,
950 priority: 50u64,
951 provides: vec![vec![0]],
952 ..default_tx().clone()
953 })
954 .unwrap();
955 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
956 assert_eq!(it.next(), Some(4));
957 assert_eq!(it.next(), Some(1));
958 assert_eq!(it.next(), Some(3));
959 assert_eq!(it.next(), None);
960 assert_eq!(
961 res,
962 Imported::Ready {
963 hash: 4,
964 promoted: vec![1, 3],
965 failed: vec![],
966 removed: vec![tx2.into()]
967 }
968 );
969 assert_eq!(pool.future.len(), 0);
970 }
971
972 #[test]
973 fn should_handle_a_cycle_with_low_priority() {
974 let mut pool = pool();
976 pool.import(Transaction {
977 data: vec![1u8].into(),
978 requires: vec![vec![0]],
979 provides: vec![vec![1]],
980 ..default_tx().clone()
981 })
982 .unwrap();
983 pool.import(Transaction {
984 data: vec![3u8].into(),
985 hash: 3,
986 requires: vec![vec![1]],
987 provides: vec![vec![2]],
988 ..default_tx().clone()
989 })
990 .unwrap();
991 assert_eq!(pool.ready().count(), 0);
992 assert_eq!(pool.ready.len(), 0);
993
994 pool.import(Transaction {
996 data: vec![2u8].into(),
997 hash: 2,
998 requires: vec![vec![2]],
999 provides: vec![vec![0]],
1000 ..default_tx().clone()
1001 })
1002 .unwrap();
1003
1004 {
1006 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1007 assert_eq!(it.next(), None);
1008 }
1009 assert_eq!(pool.future.len(), 3);
1011
1012 let err = pool
1014 .import(Transaction {
1015 data: vec![4u8].into(),
1016 hash: 4,
1017 priority: 1u64, provides: vec![vec![0]],
1019 ..default_tx().clone()
1020 })
1021 .unwrap_err();
1022 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1023 assert_eq!(it.next(), None);
1024 assert_eq!(pool.ready.len(), 0);
1025 assert_eq!(pool.future.len(), 0);
1026 if let error::Error::CycleDetected = err {
1027 } else {
1028 assert!(false, "Invalid error kind: {:?}", err);
1029 }
1030 }
1031
1032 #[test]
1033 fn should_remove_invalid_transactions() {
1034 let mut pool = pool();
1036 pool.import(Transaction {
1037 data: vec![5u8].into(),
1038 hash: 5,
1039 provides: vec![vec![0], vec![4]],
1040 ..default_tx().clone()
1041 })
1042 .unwrap();
1043 pool.import(Transaction {
1044 data: vec![1u8].into(),
1045 requires: vec![vec![0]],
1046 provides: vec![vec![1]],
1047 ..default_tx().clone()
1048 })
1049 .unwrap();
1050 pool.import(Transaction {
1051 data: vec![3u8].into(),
1052 hash: 3,
1053 requires: vec![vec![2]],
1054 ..default_tx().clone()
1055 })
1056 .unwrap();
1057 pool.import(Transaction {
1058 data: vec![2u8].into(),
1059 hash: 2,
1060 requires: vec![vec![1]],
1061 provides: vec![vec![3], vec![2]],
1062 ..default_tx().clone()
1063 })
1064 .unwrap();
1065 pool.import(Transaction {
1066 data: vec![4u8].into(),
1067 hash: 4,
1068 priority: 1_000u64,
1069 requires: vec![vec![3], vec![4]],
1070 ..default_tx().clone()
1071 })
1072 .unwrap();
1073 pool.import(Transaction {
1075 data: vec![6u8].into(),
1076 hash: 6,
1077 priority: 1_000u64,
1078 requires: vec![vec![11]],
1079 ..default_tx().clone()
1080 })
1081 .unwrap();
1082 assert_eq!(pool.ready().count(), 5);
1083 assert_eq!(pool.future.len(), 1);
1084
1085 pool.remove_subtree(&[6, 1]);
1087
1088 assert_eq!(pool.ready().count(), 1);
1090 assert_eq!(pool.future.len(), 0);
1091 }
1092
1093 #[test]
1094 fn should_prune_ready_transactions() {
1095 let mut pool = pool();
1097 pool.import(Transaction {
1099 data: vec![5u8].into(),
1100 hash: 5,
1101 requires: vec![vec![0]],
1102 provides: vec![vec![100]],
1103 ..default_tx().clone()
1104 })
1105 .unwrap();
1106 pool.import(Transaction {
1108 data: vec![1u8].into(),
1109 provides: vec![vec![1]],
1110 ..default_tx().clone()
1111 })
1112 .unwrap();
1113 pool.import(Transaction {
1114 data: vec![2u8].into(),
1115 hash: 2,
1116 requires: vec![vec![2]],
1117 provides: vec![vec![3]],
1118 ..default_tx().clone()
1119 })
1120 .unwrap();
1121 pool.import(Transaction {
1122 data: vec![3u8].into(),
1123 hash: 3,
1124 requires: vec![vec![1]],
1125 provides: vec![vec![2]],
1126 ..default_tx().clone()
1127 })
1128 .unwrap();
1129 pool.import(Transaction {
1130 data: vec![4u8].into(),
1131 hash: 4,
1132 priority: 1_000u64,
1133 requires: vec![vec![3], vec![2]],
1134 provides: vec![vec![4]],
1135 ..default_tx().clone()
1136 })
1137 .unwrap();
1138
1139 assert_eq!(pool.ready().count(), 4);
1140 assert_eq!(pool.future.len(), 1);
1141
1142 let result = pool.prune_tags(vec![vec![0], vec![2]]);
1144
1145 assert_eq!(result.pruned.len(), 2);
1147 assert_eq!(result.failed.len(), 0);
1148 assert_eq!(
1149 result.promoted[0],
1150 Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
1151 );
1152 assert_eq!(result.promoted.len(), 1);
1153 assert_eq!(pool.future.len(), 0);
1154 assert_eq!(pool.ready.len(), 3);
1155 assert_eq!(pool.ready().count(), 3);
1156 }
1157
1158 #[test]
1159 fn transaction_debug() {
1160 assert_eq!(
1161 format!(
1162 "{:?}",
1163 Transaction {
1164 data: vec![4u8].into(),
1165 hash: 4,
1166 priority: 1_000u64,
1167 requires: vec![vec![3], vec![2]],
1168 provides: vec![vec![4]],
1169 ..default_tx().clone()
1170 }
1171 ),
1172 "Transaction { \
1173hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1174source: TimedTransactionSource { source: External, timestamp: None }, requires: [03, 02], provides: [04], data: [4]}"
1175 .to_owned()
1176 );
1177 }
1178
1179 #[test]
1180 fn transaction_propagation() {
1181 assert_eq!(
1182 Transaction {
1183 data: vec![4u8].into(),
1184 hash: 4,
1185 priority: 1_000u64,
1186 requires: vec![vec![3], vec![2]],
1187 provides: vec![vec![4]],
1188 ..default_tx().clone()
1189 }
1190 .is_propagable(),
1191 true
1192 );
1193
1194 assert_eq!(
1195 Transaction {
1196 data: vec![4u8].into(),
1197 hash: 4,
1198 priority: 1_000u64,
1199 requires: vec![vec![3], vec![2]],
1200 provides: vec![vec![4]],
1201 propagate: false,
1202 ..default_tx().clone()
1203 }
1204 .is_propagable(),
1205 false
1206 );
1207 }
1208
1209 #[test]
1210 fn should_reject_future_transactions() {
1211 let mut pool = pool();
1213
1214 pool.reject_future_transactions = true;
1216
1217 let err = pool.import(Transaction {
1219 data: vec![5u8].into(),
1220 hash: 5,
1221 requires: vec![vec![0]],
1222 ..default_tx().clone()
1223 });
1224
1225 if let Err(error::Error::RejectedFutureTransaction) = err {
1226 } else {
1227 assert!(false, "Invalid error kind: {:?}", err);
1228 }
1229 }
1230
1231 #[test]
1232 fn should_clear_future_queue() {
1233 let mut pool = pool();
1235
1236 pool.import(Transaction {
1238 data: vec![5u8].into(),
1239 hash: 5,
1240 requires: vec![vec![0]],
1241 ..default_tx().clone()
1242 })
1243 .unwrap();
1244
1245 assert_eq!(pool.future.len(), 1);
1247
1248 assert_eq!(pool.clear_future().len(), 1);
1250
1251 assert_eq!(pool.future.len(), 0);
1253 }
1254
1255 #[test]
1256 fn should_accept_future_transactions_when_explicitly_asked_to() {
1257 let mut pool = pool();
1259 pool.reject_future_transactions = true;
1260
1261 let flag_value = pool.with_futures_enabled(|pool, flag| {
1263 pool.import(Transaction {
1264 data: vec![5u8].into(),
1265 hash: 5,
1266 requires: vec![vec![0]],
1267 ..default_tx().clone()
1268 })
1269 .unwrap();
1270
1271 flag
1272 });
1273
1274 assert_eq!(flag_value, true);
1276 assert_eq!(pool.reject_future_transactions, true);
1277 assert_eq!(pool.future.len(), 1);
1278 }
1279}