1use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
20use async_trait::async_trait;
21use futures::channel::mpsc::Receiver;
22use indexmap::IndexMap;
23use sc_transaction_pool_api::error;
24use sp_blockchain::{HashAndNumber, TreeRoute};
25use sp_runtime::{
26 generic::BlockId,
27 traits::{self, Block as BlockT, SaturatedConversion},
28 transaction_validity::{
29 TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
30 },
31};
32use std::{
33 collections::HashMap,
34 sync::Arc,
35 time::{Duration, Instant},
36};
37use tracing::{debug, instrument, trace, Level};
38
39use super::{
40 base_pool as base,
41 validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
42 EventHandler, ValidatedPoolSubmitOutcome,
43};
44
45pub type EventStream<H> = Receiver<H>;
47
48pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
50pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
52pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
54pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
56pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
58pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
60pub type ValidatedTransactionFor<A> =
62 ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
63
/// Tells a [`ChainApi`] implementation on whose behalf a validation is requested.
///
/// The value is forwarded unchanged to [`ChainApi::validate_transaction`]; how it is
/// interpreted is up to the implementation.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ValidateTransactionPriority {
    /// Validation of a transaction that is being submitted to the pool.
    Submitted,
    /// Validation performed while maintaining the pool, e.g. when pruning a block or
    /// re-verifying pruned transactions.
    Maintained,
}
76
77#[async_trait]
79pub trait ChainApi: Send + Sync {
80 type Block: BlockT;
82 type Error: From<error::Error> + error::IntoPoolError;
84
85 async fn validate_transaction(
87 &self,
88 at: <Self::Block as BlockT>::Hash,
89 source: TransactionSource,
90 uxt: ExtrinsicFor<Self>,
91 validation_priority: ValidateTransactionPriority,
92 ) -> Result<TransactionValidity, Self::Error>;
93
94 fn validate_transaction_blocking(
99 &self,
100 at: <Self::Block as BlockT>::Hash,
101 source: TransactionSource,
102 uxt: ExtrinsicFor<Self>,
103 ) -> Result<TransactionValidity, Self::Error>;
104
105 fn block_id_to_number(
107 &self,
108 at: &BlockId<Self::Block>,
109 ) -> Result<Option<NumberFor<Self>>, Self::Error>;
110
111 fn block_id_to_hash(
113 &self,
114 at: &BlockId<Self::Block>,
115 ) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
116
117 fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
119
120 async fn block_body(
122 &self,
123 at: <Self::Block as BlockT>::Hash,
124 ) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;
125
126 fn block_header(
128 &self,
129 at: <Self::Block as BlockT>::Hash,
130 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
131
132 fn tree_route(
134 &self,
135 from: <Self::Block as BlockT>::Hash,
136 to: <Self::Block as BlockT>::Hash,
137 ) -> Result<TreeRoute<Self::Block>, Self::Error>;
138
139 fn resolve_block_number(
141 &self,
142 at: <Self::Block as BlockT>::Hash,
143 ) -> Result<NumberFor<Self>, Self::Error> {
144 self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
145 number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
146 })
147 }
148}
149
/// Configuration of the transaction pool.
#[derive(Debug, Clone)]
pub struct Options {
    /// Limit (transaction count and total bytes) applied to ready transactions.
    pub ready: base::Limit,
    /// Limit (transaction count and total bytes) applied to future transactions.
    pub future: base::Limit,
    /// Whether future transactions are rejected. The consumer of this flag is not part of this
    /// file.
    pub reject_future_transactions: bool,
    /// Ban duration. NOTE(review): presumably how long a banned extrinsic hash stays banned —
    /// confirm against the code that consumes it.
    pub ban_time: Duration,
}
162
163impl Default for Options {
164 fn default() -> Self {
165 Self {
166 ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
167 future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
168 reject_future_transactions: false,
169 ban_time: Duration::from_secs(60 * 30),
170 }
171 }
172}
173
174impl Options {
175 pub fn total_count(&self) -> usize {
177 self.ready.count + self.future.count
178 }
179}
180
/// Selects whether `Pool::verify_one` takes bans into account when it calls
/// `ValidatedPool::check_is_known` before validating a transaction.
#[derive(Copy, Clone)]
pub(crate) enum CheckBannedBeforeVerify {
    /// Bans are not ignored (`ignore_banned` is passed as `false`).
    Yes,
    /// Bans are ignored (`ignore_banned` is passed as `true`).
    No,
}
188
/// Transaction pool front-end: verifies extrinsics through the [`ChainApi`] and forwards the
/// results to the wrapped [`ValidatedPool`].
pub struct Pool<B: ChainApi, L: EventHandler<B>> {
    /// Shared pool of already validated transactions that all operations delegate to.
    validated_pool: Arc<ValidatedPool<B, L>>,
}
193
194impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
195 pub fn new_with_staticly_sized_rotator(
197 options: Options,
198 is_validator: IsValidator,
199 api: Arc<B>,
200 ) -> Self {
201 Self {
202 validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
203 options,
204 is_validator,
205 api,
206 )),
207 }
208 }
209
210 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212 Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
213 }
214
215 pub fn new_with_event_handler(
217 options: Options,
218 is_validator: IsValidator,
219 api: Arc<B>,
220 event_handler: L,
221 ) -> Self {
222 Self {
223 validated_pool: Arc::new(ValidatedPool::new_with_event_handler(
224 options,
225 is_validator,
226 api,
227 event_handler,
228 )),
229 }
230 }
231
232 #[instrument(level = Level::TRACE, skip_all, target="txpool", name = "pool::submit_at")]
234 pub async fn submit_at(
235 &self,
236 at: &HashAndNumber<B::Block>,
237 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
238 validation_priority: ValidateTransactionPriority,
239 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
240 let validated_transactions =
241 self.verify(at, xts, CheckBannedBeforeVerify::Yes, validation_priority).await;
242 self.validated_pool.submit(validated_transactions.into_values())
243 }
244
245 pub async fn resubmit_at(
249 &self,
250 at: &HashAndNumber<B::Block>,
251 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
252 validation_priority: ValidateTransactionPriority,
253 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
254 let validated_transactions =
255 self.verify(at, xts, CheckBannedBeforeVerify::No, validation_priority).await;
256 self.validated_pool.submit(validated_transactions.into_values())
257 }
258
259 pub async fn submit_one(
261 &self,
262 at: &HashAndNumber<B::Block>,
263 source: base::TimedTransactionSource,
264 xt: ExtrinsicFor<B>,
265 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
266 let res = self
267 .submit_at(at, std::iter::once((source, xt)), ValidateTransactionPriority::Submitted)
268 .await
269 .pop();
270 res.expect("One extrinsic passed; one result returned; qed")
271 }
272
273 pub async fn submit_and_watch(
275 &self,
276 at: &HashAndNumber<B::Block>,
277 source: base::TimedTransactionSource,
278 xt: ExtrinsicFor<B>,
279 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
280 let (_, tx) = self
281 .verify_one(
282 at.hash,
283 at.number,
284 source,
285 xt,
286 CheckBannedBeforeVerify::Yes,
287 ValidateTransactionPriority::Submitted,
288 )
289 .await;
290 self.validated_pool.submit_and_watch(tx)
291 }
292
293 pub fn resubmit(
295 &self,
296 revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
297 ) {
298 let now = Instant::now();
299 self.validated_pool.resubmit(revalidated_transactions);
300 trace!(
301 target: LOG_TARGET,
302 duration = ?now.elapsed(),
303 status = ?self.validated_pool.status(),
304 "Resubmitted transaction."
305 );
306 }
307
308 pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
314 let in_pool_tags =
316 self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
317
318 let prune_status = self.validated_pool.prune_tags(in_pool_tags);
320 let pruned_transactions =
321 hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
322 self.validated_pool.fire_pruned(at, pruned_transactions);
323 }
324
325 pub async fn prune(
332 &self,
333 at: &HashAndNumber<B::Block>,
334 parent: <B::Block as BlockT>::Hash,
335 extrinsics: &[RawExtrinsicFor<B>],
336 known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<B>, Vec<Tag>>>>,
337 ) {
338 debug!(
339 target: LOG_TARGET,
340 ?at,
341 extrinsics_count = extrinsics.len(),
342 "Starting pruning of block."
343 );
344 let in_pool_hashes =
346 extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
347 let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
348 let mut unknown_txs_count = 0usize;
350 let mut reused_txs_count = 0usize;
351 let tags = in_pool_hashes.iter().zip(in_pool_tags).map(|(tx_hash, tags)| {
352 tags.or_else(|| {
353 unknown_txs_count += 1;
354 known_provides_tags.as_ref().and_then(|inner| {
355 inner.get(&tx_hash).map(|found_tags| {
356 reused_txs_count += 1;
357 found_tags.clone()
358 })
359 })
360 })
361 });
362
363 let all = extrinsics.iter().zip(tags);
366 let mut validated_counter: usize = 0;
367 let mut future_tags = Vec::new();
368 let now = Instant::now();
369 for (extrinsic, in_pool_tags) in all {
370 match in_pool_tags {
371 Some(tags) => future_tags.extend(tags),
374 None => {
377 if !self.validated_pool.status().is_empty() {
379 validated_counter = validated_counter + 1;
380 let validity = self
381 .validated_pool
382 .api()
383 .validate_transaction(
384 parent,
385 TransactionSource::InBlock,
386 Arc::from(extrinsic.clone()),
387 ValidateTransactionPriority::Maintained,
388 )
389 .await;
390
391 trace!(
392 target: LOG_TARGET,
393 tx_hash = ?self.validated_pool.api().hash_and_length(&extrinsic.clone()).0,
394 ?validity,
395 "prune::revalidated"
396 );
397 if let Ok(Ok(validity)) = validity {
398 future_tags.extend(validity.provides);
399 }
400 } else {
401 trace!(
402 target: LOG_TARGET,
403 ?at,
404 "txpool is empty, skipping validation for block",
405 );
406 }
407 },
408 }
409 }
410
411 let known_provides_tags_len = known_provides_tags.map(|inner| inner.len()).unwrap_or(0);
412 debug!(
413 target: LOG_TARGET,
414 validated_counter,
415 known_provides_tags_len,
416 unknown_txs_count,
417 reused_txs_count,
418 duration = ?now.elapsed(),
419 "prune"
420 );
421 self.prune_tags(at, future_tags, in_pool_hashes).await
422 }
423
424 pub async fn prune_tags(
446 &self,
447 at: &HashAndNumber<B::Block>,
448 tags: impl IntoIterator<Item = Tag>,
449 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
450 ) {
451 let now = Instant::now();
452 trace!(target: LOG_TARGET, ?at, "Pruning tags.");
453 let prune_status = self.validated_pool.prune_tags(tags);
455
456 self.validated_pool
460 .ban(&Instant::now(), known_imported_hashes.clone().into_iter());
461
462 let pruned_transactions =
465 prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
466
467 let reverified_transactions = self
468 .verify(
469 at,
470 pruned_transactions,
471 CheckBannedBeforeVerify::Yes,
472 ValidateTransactionPriority::Maintained,
473 )
474 .await;
475
476 let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
477 debug!(
478 target: LOG_TARGET,
479 ?at,
480 reverified_transactions = reverified_transactions.len(),
481 duration = ?now.elapsed(),
482 "Pruned. Resubmitting transactions."
483 );
484 log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}");
485
486 self.validated_pool.resubmit_pruned(
488 &at,
489 known_imported_hashes,
490 pruned_hashes,
491 reverified_transactions.into_values().collect(),
492 )
493 }
494
495 pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
497 self.validated_pool.api().hash_and_length(xt).0
498 }
499
500 #[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify")]
502 async fn verify(
503 &self,
504 at: &HashAndNumber<B::Block>,
505 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
506 check: CheckBannedBeforeVerify,
507 validation_priority: ValidateTransactionPriority,
508 ) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
509 let HashAndNumber { number, hash } = *at;
510
511 let res = futures::future::join_all(xts.into_iter().map(|(source, xt)| {
512 self.verify_one(hash, number, source, xt, check, validation_priority)
513 }))
514 .await
515 .into_iter()
516 .collect::<IndexMap<_, _>>();
517
518 res
519 }
520
521 #[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify_one")]
523 pub(crate) async fn verify_one(
524 &self,
525 block_hash: <B::Block as BlockT>::Hash,
526 block_number: NumberFor<B>,
527 source: base::TimedTransactionSource,
528 xt: ExtrinsicFor<B>,
529 check: CheckBannedBeforeVerify,
530 validation_priority: ValidateTransactionPriority,
531 ) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
532 let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
533
534 let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
535 if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
536 return (hash, ValidatedTransaction::Invalid(hash, err))
537 }
538
539 let validation_result = self
540 .validated_pool
541 .api()
542 .validate_transaction(
543 block_hash,
544 source.clone().into(),
545 xt.clone(),
546 validation_priority,
547 )
548 .await;
549
550 let status = match validation_result {
551 Ok(status) => status,
552 Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
553 };
554
555 let validity = match status {
556 Ok(validity) =>
557 if validity.provides.is_empty() {
558 ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
559 } else {
560 ValidatedTransaction::valid_at(
561 block_number.saturated_into::<u64>(),
562 hash,
563 source,
564 xt,
565 bytes,
566 validity,
567 )
568 },
569 Err(TransactionValidityError::Invalid(e)) =>
570 ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
571 Err(TransactionValidityError::Unknown(e)) =>
572 ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
573 };
574
575 (hash, validity)
576 }
577
578 pub fn validated_pool(&self) -> &ValidatedPool<B, L> {
580 &self.validated_pool
581 }
582
583 pub fn clear_recently_pruned(&mut self) {
585 self.validated_pool.pool.write().clear_recently_pruned();
586 }
587}
588
589impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
590 pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
594 let other: ValidatedPool<B, L> =
595 self.validated_pool().deep_clone_with_event_handler(event_handler);
596 Self { validated_pool: Arc::from(other) }
597 }
598}
599
600#[cfg(test)]
601mod tests {
602 use super::{super::base_pool::Limit, *};
603 use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
604 use assert_matches::assert_matches;
605 use base::TimedTransactionSource;
606 use codec::Encode;
607 use futures::executor::block_on;
608 use parking_lot::Mutex;
609 use sc_transaction_pool_api::TransactionStatus;
610 use sp_runtime::transaction_validity::TransactionSource;
611 use std::{collections::HashMap, time::Instant};
612 use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
613 use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};
614
// Transaction source used by all tests in this module: external origin, no timestamp.
const SOURCE: TimedTransactionSource =
    TimedTransactionSource { source: TransactionSource::External, timestamp: None };

// The pool under test, with `()` as its event handler.
type Pool<Api> = super::Pool<Api, ()>;
619
#[test]
fn should_validate_and_import_transaction() {
    // given
    let (pool, api) = pool();
    let at = api.expect_hash_and_number(0);
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });

    // when
    let hash = block_on(pool.submit_one(&at, SOURCE, xt.into())).map(|outcome| outcome.hash()).unwrap();

    // then: the transaction is the only ready one
    let ready_hashes = pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>();
    assert_eq!(ready_hashes, vec![hash]);
}
645
#[test]
fn submit_at_preserves_order() {
    sp_tracing::try_init_simple();
    // given: ten transfers from Alice with consecutive nonces
    let (pool, api) = pool();
    let at = api.expect_hash_and_number(0);

    let txs = (0..10)
        .map(|i| {
            uxt(Transfer {
                from: Alice.into(),
                to: AccountId::from_h256(H256::from_low_u64_be(i)),
                amount: 5,
                nonce: i,
            })
            .into()
        })
        .collect::<Vec<_>>();
    let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();

    // when: all of them are submitted in one batch
    let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
    let results = block_on(pool.submit_at(&at, txs, ValidateTransactionPriority::Submitted));
    let hashes = results.into_iter().map(|r| r.map(|o| o.hash())).collect::<Vec<_>>();
    debug!(hashes = ?hashes, "-->");

    // then: results come back in submission order
    for (result_hash, initial_hash) in hashes.into_iter().zip(initial_hashes.into_iter()) {
        assert_eq!(result_hash.unwrap(), initial_hash);
    }
}
685
#[test]
fn should_reject_if_temporarily_banned() {
    // given
    let (pool, api) = pool();
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });

    // when: the transaction is banned before it is submitted
    pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&xt)]);
    let at = api.expect_hash_and_number(0);
    let res = block_on(pool.submit_one(&at, SOURCE, xt.into())).map(|o| o.hash());
    let status = pool.validated_pool().status();
    assert_eq!(status.ready, 0);
    assert_eq!(status.future, 0);

    // then
    assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
}
707
#[test]
fn should_reject_unactionable_transactions() {
    // given: a pool created with `false` as its `IsValidator` value
    let api = Arc::new(TestApi::default());
    let is_validator = false.into();
    let pool =
        Pool::new_with_staticly_sized_rotator(Default::default(), is_validator, api.clone());
    let at = api.expect_hash_and_number(0);

    // when: an include-data extrinsic is submitted
    let xt = ExtrinsicBuilder::new_include_data(vec![42]).build();
    let res = block_on(pool.submit_one(&at, SOURCE, xt.into())).map(|o| o.hash());

    // then
    assert_matches!(res.unwrap_err(), error::Error::Unactionable);
}
729
#[test]
fn should_notify_about_pool_events() {
    // The pool lives only inside this scope; the test expects the stream to end after the
    // two notifications once the scope is left.
    let (stream, hash0, hash1) = {
        // given
        let (pool, api) = pool();
        let han_of_block0 = api.expect_hash_and_number(0);
        let stream = pool.validated_pool().import_notification_stream();
        let submit_with_nonce = |nonce| {
            let xt = uxt(Transfer {
                from: Alice.into(),
                to: AccountId::from_h256(H256::from_low_u64_be(2)),
                amount: 5,
                nonce,
            });
            block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap().hash()
        };

        // when
        let hash0 = submit_with_nonce(0);
        let hash1 = submit_with_nonce(1);
        // nonce 2 is skipped, so this one lands in the future queue
        let _hash = submit_with_nonce(3);

        let status = pool.validated_pool().status();
        assert_eq!(status.ready, 2);
        assert_eq!(status.future, 1);

        (stream, hash0, hash1)
    };

    // then: only the two ready transactions were announced
    let mut notifications = futures::executor::block_on_stream(stream);
    assert_eq!(notifications.next(), Some(hash0));
    assert_eq!(notifications.next(), Some(hash1));
    assert_eq!(notifications.next(), None);
}
798
#[test]
fn should_clear_stale_transactions() {
    // given: two ready transactions (nonce 0, 1) and one future transaction (nonce 3)
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let submit_with_nonce = |nonce| {
        let xt = uxt(Transfer {
            from: Alice.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(2)),
            amount: 5,
            nonce,
        });
        block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap().hash()
    };
    let hash1 = submit_with_nonce(0);
    let hash2 = submit_with_nonce(1);
    let hash3 = submit_with_nonce(3);

    // when
    let han_of_block5 = api.expect_hash_and_number(5);
    pool.validated_pool.clear_stale(&han_of_block5);

    // then: the pool is empty ...
    assert_eq!(pool.validated_pool().ready().count(), 0);
    assert_eq!(pool.validated_pool().status().future, 0);
    assert_eq!(pool.validated_pool().status().ready, 0);
    // ... and every removed transaction is banned
    for hash in [hash1, hash2, hash3] {
        assert!(pool.validated_pool.is_banned(&hash));
    }
}
862
#[test]
fn should_ban_mined_transactions() {
    // given
    let (pool, api) = pool();
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });
    let han_of_block0 = api.expect_hash_and_number(0);
    let hash1 = block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap().hash();

    // when: the transaction is reported as imported while pruning its tag
    let han_of_block1 = api.expect_hash_and_number(1);
    block_on(pool.prune_tags(&han_of_block1, vec![vec![0]], vec![hash1]));

    // then
    assert!(pool.validated_pool.is_banned(&hash1));
}
889
#[test]
fn should_limit_futures() {
    sp_tracing::try_init_simple();

    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 1,
    });

    // given: byte limits that fit exactly one such transaction
    let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
    let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

    let api = Arc::new(TestApi::default());
    let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
    let han_of_block0 = api.expect_hash_and_number(0);

    let hash1 = block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap().hash();
    assert_eq!(pool.validated_pool().status().future, 1);

    // when: a second future transaction arrives
    let bob_xt = uxt(Transfer {
        from: Bob.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 10,
    });
    let hash2 = block_on(pool.submit_one(&han_of_block0, SOURCE, bob_xt.into())).unwrap().hash();

    // then: still one future transaction; the first one got banned, the new one did not
    assert_eq!(pool.validated_pool().status().future, 1);
    assert!(pool.validated_pool.is_banned(&hash1));
    assert!(!pool.validated_pool.is_banned(&hash2));
}
936
#[test]
fn should_error_if_reject_immediately() {
    // given: limits of only 10 bytes in total
    let limit = Limit { count: 100, total_bytes: 10 };
    let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

    let api = Arc::new(TestApi::default());
    let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
    let han_of_block0 = api.expect_hash_and_number(0);

    // when
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 1,
    });
    let res = block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).map(|o| o.hash());
    res.unwrap_err();

    // then: nothing was kept
    let status = pool.validated_pool().status();
    assert_eq!(status.ready, 0);
    assert_eq!(status.future, 0);
}
968
#[test]
fn should_reject_transactions_with_no_provides() {
    // given
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: INVALID_NONCE,
    });

    // when
    let res = block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).map(|o| o.hash());
    let err = res.unwrap_err();

    // then
    let status = pool.validated_pool().status();
    assert_eq!(status.ready, 0);
    assert_eq!(status.future, 0);
    assert_matches!(err, error::Error::NoTagsProvided);
}
996
997 mod listener {
998 use super::*;
999
#[test]
fn should_trigger_ready_and_finalized() {
    // given
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });
    let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
        .unwrap()
        .expect_watcher();
    assert_eq!(pool.validated_pool().status().ready, 1);
    assert_eq!(pool.validated_pool().status().future, 0);

    let han_of_block2 = api.expect_hash_and_number(2);

    // when: the tag is pruned without listing any imported hash
    block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
    assert_eq!(pool.validated_pool().status().ready, 0);
    assert_eq!(pool.validated_pool().status().future, 0);

    // then
    let mut events = futures::executor::block_on_stream(watcher.into_stream());
    assert_eq!(events.next(), Some(TransactionStatus::Ready));
    let in_block = TransactionStatus::InBlock((han_of_block2.hash.into(), 0));
    assert_eq!(events.next(), Some(in_block));
}
1037
#[test]
fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
    // given
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });
    let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
        .unwrap()
        .expect_watcher();
    assert_eq!(pool.validated_pool().status().ready, 1);
    assert_eq!(pool.validated_pool().status().future, 0);

    let han_of_block2 = api.expect_hash_and_number(2);

    // when: the watched transaction is also listed as imported
    block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
    assert_eq!(pool.validated_pool().status().ready, 0);
    assert_eq!(pool.validated_pool().status().future, 0);

    // then
    let mut events = futures::executor::block_on_stream(watcher.into_stream());
    assert_eq!(events.next(), Some(TransactionStatus::Ready));
    let in_block = TransactionStatus::InBlock((han_of_block2.hash.into(), 0));
    assert_eq!(events.next(), Some(in_block));
}
1075
#[test]
fn should_trigger_future_and_ready_after_promoted() {
    // given: a watched transaction with nonce 1, which starts out as future
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let transfer_with_nonce = |nonce| {
        uxt(Transfer {
            from: Alice.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(2)),
            amount: 5,
            nonce,
        })
    };

    let watcher =
        block_on(pool.submit_and_watch(&han_of_block0, SOURCE, transfer_with_nonce(1).into()))
            .unwrap()
            .expect_watcher();
    assert_eq!(pool.validated_pool().status().ready, 0);
    assert_eq!(pool.validated_pool().status().future, 1);

    // when: the missing nonce 0 transaction is submitted
    block_on(pool.submit_one(&han_of_block0, SOURCE, transfer_with_nonce(0).into())).unwrap();
    assert_eq!(pool.validated_pool().status().ready, 2);

    // then
    let mut events = futures::executor::block_on_stream(watcher.into_stream());
    assert_eq!(events.next(), Some(TransactionStatus::Future));
    assert_eq!(events.next(), Some(TransactionStatus::Ready));
}
1122
#[test]
fn should_trigger_invalid_and_ban() {
    // given
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });
    let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
        .unwrap()
        .expect_watcher();
    assert_eq!(pool.validated_pool().status().ready, 1);

    // when
    let watched_hash = *watcher.hash();
    pool.validated_pool.remove_invalid(&[watched_hash]);

    // then: the watcher sees `Invalid` and the stream ends
    let mut events = futures::executor::block_on_stream(watcher.into_stream());
    assert_eq!(events.next(), Some(TransactionStatus::Ready));
    assert_eq!(events.next(), Some(TransactionStatus::Invalid));
    assert_eq!(events.next(), None);
}
1148
#[test]
fn should_trigger_broadcasted() {
    // given
    let (pool, api) = pool();
    let han_of_block0 = api.expect_hash_and_number(0);
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });
    let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
        .unwrap()
        .expect_watcher();
    assert_eq!(pool.validated_pool().status().ready, 1);

    // when: the transaction is reported as broadcast to three peers
    let peers = vec!["a".into(), "b".into(), "c".into()];
    let mut broadcasted = HashMap::new();
    broadcasted.insert(*watcher.hash(), peers.clone());
    pool.validated_pool().on_broadcasted(broadcasted);

    // then
    let mut events = futures::executor::block_on_stream(watcher.into_stream());
    assert_eq!(events.next(), Some(TransactionStatus::Ready));
    assert_eq!(events.next(), Some(TransactionStatus::Broadcast(peers)));
}
1176
#[test]
fn should_trigger_dropped_older() {
    // given: a pool that holds a single ready transaction at most
    let limit = Limit { count: 1, total_bytes: 1000 };
    let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

    let api = Arc::new(TestApi::default());
    let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

    let older = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 0,
    });
    let han_of_block0 = api.expect_hash_and_number(0);
    let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, older.into()))
        .unwrap()
        .expect_watcher();
    assert_eq!(pool.validated_pool().status().ready, 1);

    // when: a second transaction is submitted at block 1
    let newer = uxt(Transfer {
        from: Bob.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(1)),
        amount: 4,
        nonce: 1,
    });
    let han_of_block1 = api.expect_hash_and_number(1);
    block_on(pool.submit_one(&han_of_block1, SOURCE, newer.into())).unwrap();
    assert_eq!(pool.validated_pool().status().ready, 1);

    // then: the watched (older) transaction was dropped
    let mut events = futures::executor::block_on_stream(watcher.into_stream());
    assert_eq!(events.next(), Some(TransactionStatus::Ready));
    assert_eq!(events.next(), Some(TransactionStatus::Dropped));
}
1214
#[test]
fn should_trigger_dropped_lower_priority() {
    // Scenario 1: the pool holds one transaction; the transfer submitted afterwards is
    // rejected with `ImmediatelyDropped`.
    {
        // given
        let limit = Limit { count: 1, total_bytes: 1000 };
        let options =
            Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

        let api = Arc::new(TestApi::default());
        let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

        let include_data = ExtrinsicBuilder::new_include_data(Vec::new()).build();
        let han_of_block0 = api.expect_hash_and_number(0);
        block_on(pool.submit_one(&han_of_block0, SOURCE, include_data.into())).unwrap();
        assert_eq!(pool.validated_pool().status().ready, 1);

        // when
        let transfer = uxt(Transfer {
            from: Bob.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(1)),
            amount: 4,
            nonce: 1,
        });
        let han_of_block1 = api.expect_hash_and_number(1);
        let result = block_on(pool.submit_one(&han_of_block1, SOURCE, transfer.into()));

        // then
        assert!(matches!(
            result,
            Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
        ));
    }
    // Scenario 2: the pool holds two transactions; submitting an indexed call makes the
    // watched transfer report `Dropped`.
    {
        // given
        let limit = Limit { count: 2, total_bytes: 1000 };
        let options =
            Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

        let api = Arc::new(TestApi::default());
        let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

        let han_of_block0 = api.expect_hash_and_number(0);

        let include_data = ExtrinsicBuilder::new_include_data(Vec::new()).build();
        block_on(pool.submit_and_watch(&han_of_block0, SOURCE, include_data.into()))
            .unwrap()
            .expect_watcher();
        assert_eq!(pool.validated_pool().status().ready, 1);

        let transfer = uxt(Transfer {
            from: Alice.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(2)),
            amount: 5,
            nonce: 0,
        });
        let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, transfer.into()))
            .unwrap()
            .expect_watcher();
        assert_eq!(pool.validated_pool().status().ready, 2);

        // when
        let indexed_call = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
        let han_of_block1 = api.expect_hash_and_number(1);
        block_on(pool.submit_one(&han_of_block1, SOURCE, indexed_call.into())).unwrap();
        assert_eq!(pool.validated_pool().status().ready, 2);

        // then
        let mut events = futures::executor::block_on_stream(watcher.into_stream());
        assert_eq!(events.next(), Some(TransactionStatus::Ready));
        assert_eq!(events.next(), Some(TransactionStatus::Dropped));
    }
}
1295
// Checks the pool state when a prune happens while another submission is still in flight on
// a background thread.
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
    // given
    // Rendezvous channel (capacity 0): the background thread signals that its submission
    // finished.
    let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
    // Channel whose receiving end is installed as the API's `delay`.
    // NOTE(review): presumably `TestApi` blocks a validation until `tx` sends — confirm.
    let (tx, rx) = std::sync::mpsc::sync_channel(1);
    let mut api = TestApi::default();
    api.delay = Arc::new(Mutex::new(rx.into()));
    let api = Arc::new(api);
    let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
        Default::default(),
        true.into(),
        api.clone(),
    ));

    let han_of_block0 = api.expect_hash_and_number(0);

    // when
    // Transaction with nonce 1, submitted from the background thread.
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 5,
        nonce: 1,
    });

    let pool2 = pool.clone();
    // The join handle is dropped; completion is observed through `ready` / `is_ready`.
    std::thread::spawn({
        let hash_of_block0 = han_of_block0.clone();
        move || {
            block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
            ready.send(()).unwrap();
        }
    });

    // Transaction with nonce 0, submitted from the test thread and pruned right after.
    let xt = uxt(Transfer {
        from: Alice.into(),
        to: AccountId::from_h256(H256::from_low_u64_be(2)),
        amount: 4,
        nonce: 0,
    });
    // Tag passed to `prune_tags` below.
    let provides = vec![0_u8];
    block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
    assert_eq!(pool.validated_pool().status().ready, 1);

    // Prune the tag at block 1; the pool is expected to hold no ready transaction afterwards.
    block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
    assert_eq!(pool.validated_pool().status().ready, 0);

    // Send on the delay channel.
    tx.send(()).unwrap();

    // then
    // Wait for the background submission to finish; it must end up ready, not future.
    is_ready.recv().unwrap(); assert_eq!(pool.validated_pool().status().ready, 1);
    assert_eq!(pool.validated_pool().status().future, 0);
}
1358 }
1359}