1use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
20use async_trait::async_trait;
21use futures::channel::mpsc::Receiver;
22use indexmap::IndexMap;
23use sc_transaction_pool_api::error;
24use sp_blockchain::{HashAndNumber, TreeRoute};
25use sp_runtime::{
26 generic::BlockId,
27 traits::{self, Block as BlockT, SaturatedConversion},
28 transaction_validity::{
29 TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
30 },
31};
32use std::{
33 collections::HashMap,
34 sync::Arc,
35 time::{Duration, Instant},
36};
37use tracing::{debug, instrument, trace, Level};
38
39use super::{
40 base_pool as base,
41 validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
42 EventHandler, ValidatedPoolSubmitOutcome,
43};
44
45pub type EventStream<H> = Receiver<H>;
47
48pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
50pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
52pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
54pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
56pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
58pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
60pub type ValidatedTransactionFor<A> =
62 ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
63
64#[derive(PartialEq, Copy, Clone)]
66pub enum ValidateTransactionPriority {
67 Submitted,
71 Maintained,
75}
76
77#[async_trait]
79pub trait ChainApi: Send + Sync {
80 type Block: BlockT;
82 type Error: From<error::Error> + error::IntoPoolError + error::IntoMetricsLabel;
84
85 async fn validate_transaction(
87 &self,
88 at: <Self::Block as BlockT>::Hash,
89 source: TransactionSource,
90 uxt: ExtrinsicFor<Self>,
91 validation_priority: ValidateTransactionPriority,
92 ) -> Result<TransactionValidity, Self::Error>;
93
94 fn validate_transaction_blocking(
99 &self,
100 at: <Self::Block as BlockT>::Hash,
101 source: TransactionSource,
102 uxt: ExtrinsicFor<Self>,
103 ) -> Result<TransactionValidity, Self::Error>;
104
105 fn block_id_to_number(
107 &self,
108 at: &BlockId<Self::Block>,
109 ) -> Result<Option<NumberFor<Self>>, Self::Error>;
110
111 fn block_id_to_hash(
113 &self,
114 at: &BlockId<Self::Block>,
115 ) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
116
117 fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
119
120 async fn block_body(
122 &self,
123 at: <Self::Block as BlockT>::Hash,
124 ) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;
125
126 fn block_header(
128 &self,
129 at: <Self::Block as BlockT>::Hash,
130 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
131
132 fn tree_route(
134 &self,
135 from: <Self::Block as BlockT>::Hash,
136 to: <Self::Block as BlockT>::Hash,
137 ) -> Result<TreeRoute<Self::Block>, Self::Error>;
138
139 fn resolve_block_number(
141 &self,
142 at: <Self::Block as BlockT>::Hash,
143 ) -> Result<NumberFor<Self>, Self::Error> {
144 self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
145 number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
146 })
147 }
148}
149
150#[derive(Debug, Clone)]
152pub struct Options {
153 pub ready: base::Limit,
155 pub future: base::Limit,
157 pub reject_future_transactions: bool,
159 pub ban_time: Duration,
161}
162
163impl Default for Options {
164 fn default() -> Self {
165 Self {
166 ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
167 future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
168 reject_future_transactions: false,
169 ban_time: Duration::from_secs(60 * 30),
170 }
171 }
172}
173
174impl Options {
175 pub fn total_count(&self) -> usize {
177 self.ready.count + self.future.count
178 }
179}
180
181#[derive(Copy, Clone)]
184pub(crate) enum CheckBannedBeforeVerify {
185 Yes,
186 No,
187}
188
189pub struct Pool<B: ChainApi, L: EventHandler<B>> {
191 validated_pool: Arc<ValidatedPool<B, L>>,
192}
193
194impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
195 pub fn new_with_staticly_sized_rotator(
197 options: Options,
198 is_validator: IsValidator,
199 api: Arc<B>,
200 ) -> Self {
201 Self {
202 validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
203 options,
204 is_validator,
205 api,
206 )),
207 }
208 }
209
210 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212 Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
213 }
214
215 pub fn new_with_event_handler(
217 options: Options,
218 is_validator: IsValidator,
219 api: Arc<B>,
220 event_handler: L,
221 ) -> Self {
222 Self {
223 validated_pool: Arc::new(ValidatedPool::new_with_event_handler(
224 options,
225 is_validator,
226 api,
227 event_handler,
228 )),
229 }
230 }
231
232 #[instrument(level = Level::TRACE, skip_all, target="txpool", name = "pool::submit_at")]
234 pub async fn submit_at(
235 &self,
236 at: &HashAndNumber<B::Block>,
237 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
238 validation_priority: ValidateTransactionPriority,
239 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
240 let validated_transactions =
241 self.verify(at, xts, CheckBannedBeforeVerify::Yes, validation_priority).await;
242 self.validated_pool.submit(validated_transactions.into_values())
243 }
244
245 pub async fn resubmit_at(
249 &self,
250 at: &HashAndNumber<B::Block>,
251 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
252 validation_priority: ValidateTransactionPriority,
253 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
254 let validated_transactions =
255 self.verify(at, xts, CheckBannedBeforeVerify::No, validation_priority).await;
256 self.validated_pool.submit(validated_transactions.into_values())
257 }
258
259 pub async fn submit_one(
261 &self,
262 at: &HashAndNumber<B::Block>,
263 source: base::TimedTransactionSource,
264 xt: ExtrinsicFor<B>,
265 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
266 let res = self
267 .submit_at(at, std::iter::once((source, xt)), ValidateTransactionPriority::Submitted)
268 .await
269 .pop();
270 res.expect("One extrinsic passed; one result returned; qed")
271 }
272
273 pub async fn submit_and_watch(
275 &self,
276 at: &HashAndNumber<B::Block>,
277 source: base::TimedTransactionSource,
278 xt: ExtrinsicFor<B>,
279 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
280 let (_, tx) = self
281 .verify_one(
282 at.hash,
283 at.number,
284 source,
285 xt,
286 CheckBannedBeforeVerify::Yes,
287 ValidateTransactionPriority::Submitted,
288 )
289 .await;
290 self.validated_pool.submit_and_watch(tx)
291 }
292
293 pub fn resubmit(
295 &self,
296 revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
297 ) {
298 let now = Instant::now();
299 self.validated_pool.resubmit(revalidated_transactions);
300 trace!(
301 target: LOG_TARGET,
302 duration = ?now.elapsed(),
303 status = ?self.validated_pool.status(),
304 "Resubmitted transaction."
305 );
306 }
307
308 pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
314 let in_pool_tags =
316 self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
317
318 let prune_status = self.validated_pool.prune_tags(in_pool_tags);
320 let pruned_transactions =
321 hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
322 self.validated_pool.fire_pruned(at, pruned_transactions);
323 }
324
325 pub async fn prune(
332 &self,
333 at: &HashAndNumber<B::Block>,
334 parent: <B::Block as BlockT>::Hash,
335 extrinsics: &[RawExtrinsicFor<B>],
336 known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<B>, Vec<Tag>>>>,
337 ) {
338 debug!(
339 target: LOG_TARGET,
340 ?at,
341 extrinsics_count = extrinsics.len(),
342 "Starting pruning of block."
343 );
344 let in_pool_hashes =
346 extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
347 let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
348 let mut unknown_txs_count = 0usize;
350 let mut reused_txs_count = 0usize;
351 let tags = in_pool_hashes.iter().zip(in_pool_tags).map(|(tx_hash, tags)| {
352 tags.or_else(|| {
353 unknown_txs_count += 1;
354 known_provides_tags.as_ref().and_then(|inner| {
355 inner.get(&tx_hash).map(|found_tags| {
356 reused_txs_count += 1;
357 found_tags.clone()
358 })
359 })
360 })
361 });
362
363 let all = extrinsics.iter().zip(tags);
366 let mut validated_counter: usize = 0;
367 let mut future_tags = Vec::new();
368 let now = Instant::now();
369 for (extrinsic, in_pool_tags) in all {
370 match in_pool_tags {
371 Some(tags) => future_tags.extend(tags),
374 None => {
377 if !self.validated_pool.status().is_empty() {
379 validated_counter = validated_counter + 1;
380 let validity = self
381 .validated_pool
382 .api()
383 .validate_transaction(
384 parent,
385 TransactionSource::InBlock,
386 Arc::from(extrinsic.clone()),
387 ValidateTransactionPriority::Maintained,
388 )
389 .await;
390
391 trace!(
392 target: LOG_TARGET,
393 tx_hash = ?self.validated_pool.api().hash_and_length(&extrinsic.clone()).0,
394 ?validity,
395 "prune::revalidated"
396 );
397 if let Ok(Ok(validity)) = validity {
398 future_tags.extend(validity.provides);
399 }
400 } else {
401 trace!(
402 target: LOG_TARGET,
403 ?at,
404 "txpool is empty, skipping validation for block",
405 );
406 }
407 },
408 }
409 }
410
411 let known_provides_tags_len = known_provides_tags.map(|inner| inner.len()).unwrap_or(0);
412 debug!(
413 target: LOG_TARGET,
414 validated_counter,
415 known_provides_tags_len,
416 unknown_txs_count,
417 reused_txs_count,
418 duration = ?now.elapsed(),
419 "prune"
420 );
421 self.prune_tags(at, future_tags, in_pool_hashes).await
422 }
423
424 pub async fn prune_tags(
446 &self,
447 at: &HashAndNumber<B::Block>,
448 tags: impl IntoIterator<Item = Tag>,
449 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
450 ) {
451 let now = Instant::now();
452 trace!(target: LOG_TARGET, ?at, "Pruning tags.");
453 let prune_status = self.validated_pool.prune_tags(tags);
455
456 self.validated_pool
460 .ban(&Instant::now(), known_imported_hashes.clone().into_iter());
461
462 let pruned_transactions =
465 prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
466
467 let reverified_transactions = self
468 .verify(
469 at,
470 pruned_transactions,
471 CheckBannedBeforeVerify::Yes,
472 ValidateTransactionPriority::Maintained,
473 )
474 .await;
475
476 let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
477 debug!(
478 target: LOG_TARGET,
479 ?at,
480 reverified_transactions = reverified_transactions.len(),
481 duration = ?now.elapsed(),
482 "Pruned. Resubmitting transactions."
483 );
484 log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}");
485
486 self.validated_pool.resubmit_pruned(
488 &at,
489 known_imported_hashes,
490 pruned_hashes,
491 reverified_transactions.into_values().collect(),
492 )
493 }
494
495 pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
497 self.validated_pool.api().hash_and_length(xt).0
498 }
499
500 #[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify")]
502 async fn verify(
503 &self,
504 at: &HashAndNumber<B::Block>,
505 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
506 check: CheckBannedBeforeVerify,
507 validation_priority: ValidateTransactionPriority,
508 ) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
509 let HashAndNumber { number, hash } = *at;
510
511 let res = futures::future::join_all(xts.into_iter().map(|(source, xt)| {
512 self.verify_one(hash, number, source, xt, check, validation_priority)
513 }))
514 .await
515 .into_iter()
516 .collect::<IndexMap<_, _>>();
517
518 res
519 }
520
521 #[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify_one")]
523 pub(crate) async fn verify_one(
524 &self,
525 block_hash: <B::Block as BlockT>::Hash,
526 block_number: NumberFor<B>,
527 source: base::TimedTransactionSource,
528 xt: ExtrinsicFor<B>,
529 check: CheckBannedBeforeVerify,
530 validation_priority: ValidateTransactionPriority,
531 ) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
532 let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
533
534 let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
535 if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
536 return (hash, ValidatedTransaction::Invalid(hash, err));
537 }
538
539 let validation_result = self
540 .validated_pool
541 .api()
542 .validate_transaction(
543 block_hash,
544 source.clone().into(),
545 xt.clone(),
546 validation_priority,
547 )
548 .await;
549
550 let status = match validation_result {
551 Ok(status) => status,
552 Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
553 };
554
555 let validity = match status {
556 Ok(validity) => {
557 if validity.provides.is_empty() {
558 ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
559 } else {
560 ValidatedTransaction::valid_at(
561 block_number.saturated_into::<u64>(),
562 hash,
563 source,
564 xt,
565 bytes,
566 validity,
567 )
568 }
569 },
570 Err(TransactionValidityError::Invalid(e)) => {
571 ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into())
572 },
573 Err(TransactionValidityError::Unknown(e)) => {
574 ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into())
575 },
576 };
577
578 (hash, validity)
579 }
580
581 pub fn validated_pool(&self) -> &ValidatedPool<B, L> {
583 &self.validated_pool
584 }
585
586 pub fn clear_recently_pruned(&mut self) {
588 self.validated_pool.pool.write().clear_recently_pruned();
589 }
590}
591
592impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
593 pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
597 let other: ValidatedPool<B, L> =
598 self.validated_pool().deep_clone_with_event_handler(event_handler);
599 Self { validated_pool: Arc::from(other) }
600 }
601}
602
603#[cfg(test)]
604mod tests {
605 use super::{super::base_pool::Limit, *};
606 use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
607 use assert_matches::assert_matches;
608 use base::TimedTransactionSource;
609 use codec::Encode;
610 use futures::executor::block_on;
611 use parking_lot::Mutex;
612 use sc_transaction_pool_api::TransactionStatus;
613 use sp_runtime::transaction_validity::TransactionSource;
614 use std::{collections::HashMap, time::Instant};
615 use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
616 use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};
617
618 const SOURCE: TimedTransactionSource =
619 TimedTransactionSource { source: TransactionSource::External, timestamp: None };
620
621 type Pool<Api> = super::Pool<Api, ()>;
622
623 #[test]
624 fn should_validate_and_import_transaction() {
625 let (pool, api) = pool();
627
628 let hash = block_on(
630 pool.submit_one(
631 &api.expect_hash_and_number(0),
632 SOURCE,
633 uxt(Transfer {
634 from: Alice.into(),
635 to: AccountId::from_h256(H256::from_low_u64_be(2)),
636 amount: 5,
637 nonce: 0,
638 })
639 .into(),
640 ),
641 )
642 .map(|outcome| outcome.hash())
643 .unwrap();
644
645 assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
647 }
648
649 #[test]
650 fn submit_at_preserves_order() {
651 sp_tracing::try_init_simple();
652 let (pool, api) = pool();
654
655 let txs = (0..10)
656 .map(|i| {
657 uxt(Transfer {
658 from: Alice.into(),
659 to: AccountId::from_h256(H256::from_low_u64_be(i)),
660 amount: 5,
661 nonce: i,
662 })
663 .into()
664 })
665 .collect::<Vec<_>>();
666
667 let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
668
669 let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
671 let hashes = block_on(pool.submit_at(
672 &api.expect_hash_and_number(0),
673 txs,
674 ValidateTransactionPriority::Submitted,
675 ))
676 .into_iter()
677 .map(|r| r.map(|o| o.hash()))
678 .collect::<Vec<_>>();
679 debug!(hashes = ?hashes, "-->");
680
681 hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
683 |(result_hash, initial_hash)| {
684 assert_eq!(result_hash.unwrap(), initial_hash);
685 },
686 );
687 }
688
689 #[test]
690 fn should_reject_if_temporarily_banned() {
691 let (pool, api) = pool();
693 let uxt = uxt(Transfer {
694 from: Alice.into(),
695 to: AccountId::from_h256(H256::from_low_u64_be(2)),
696 amount: 5,
697 nonce: 0,
698 });
699
700 pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
702 let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
703 .map(|o| o.hash());
704 assert_eq!(pool.validated_pool().status().ready, 0);
705 assert_eq!(pool.validated_pool().status().future, 0);
706
707 assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
709 }
710
711 #[test]
712 fn should_reject_unactionable_transactions() {
713 let api = Arc::new(TestApi::default());
715 let pool = Pool::new_with_staticly_sized_rotator(
716 Default::default(),
717 false.into(),
719 api.clone(),
720 );
721
722 let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
724
725 let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
727 .map(|o| o.hash());
728
729 assert_matches!(res.unwrap_err(), error::Error::Unactionable);
731 }
732
733 #[test]
734 fn should_notify_about_pool_events() {
735 let (stream, hash0, hash1) = {
736 let (pool, api) = pool();
738 let han_of_block0 = api.expect_hash_and_number(0);
739 let stream = pool.validated_pool().import_notification_stream();
740
741 let hash0 = block_on(
743 pool.submit_one(
744 &han_of_block0,
745 SOURCE,
746 uxt(Transfer {
747 from: Alice.into(),
748 to: AccountId::from_h256(H256::from_low_u64_be(2)),
749 amount: 5,
750 nonce: 0,
751 })
752 .into(),
753 ),
754 )
755 .unwrap()
756 .hash();
757 let hash1 = block_on(
758 pool.submit_one(
759 &han_of_block0,
760 SOURCE,
761 uxt(Transfer {
762 from: Alice.into(),
763 to: AccountId::from_h256(H256::from_low_u64_be(2)),
764 amount: 5,
765 nonce: 1,
766 })
767 .into(),
768 ),
769 )
770 .unwrap()
771 .hash();
772 let _hash = block_on(
774 pool.submit_one(
775 &han_of_block0,
776 SOURCE,
777 uxt(Transfer {
778 from: Alice.into(),
779 to: AccountId::from_h256(H256::from_low_u64_be(2)),
780 amount: 5,
781 nonce: 3,
782 })
783 .into(),
784 ),
785 )
786 .unwrap()
787 .hash();
788
789 assert_eq!(pool.validated_pool().status().ready, 2);
790 assert_eq!(pool.validated_pool().status().future, 1);
791
792 (stream, hash0, hash1)
793 };
794
795 let mut it = futures::executor::block_on_stream(stream);
797 assert_eq!(it.next(), Some(hash0));
798 assert_eq!(it.next(), Some(hash1));
799 assert_eq!(it.next(), None);
800 }
801
802 #[test]
803 fn should_clear_stale_transactions() {
804 let (pool, api) = pool();
806 let han_of_block0 = api.expect_hash_and_number(0);
807 let hash1 = block_on(
808 pool.submit_one(
809 &han_of_block0,
810 SOURCE,
811 uxt(Transfer {
812 from: Alice.into(),
813 to: AccountId::from_h256(H256::from_low_u64_be(2)),
814 amount: 5,
815 nonce: 0,
816 })
817 .into(),
818 ),
819 )
820 .unwrap()
821 .hash();
822 let hash2 = block_on(
823 pool.submit_one(
824 &han_of_block0,
825 SOURCE,
826 uxt(Transfer {
827 from: Alice.into(),
828 to: AccountId::from_h256(H256::from_low_u64_be(2)),
829 amount: 5,
830 nonce: 1,
831 })
832 .into(),
833 ),
834 )
835 .unwrap()
836 .hash();
837 let hash3 = block_on(
838 pool.submit_one(
839 &han_of_block0,
840 SOURCE,
841 uxt(Transfer {
842 from: Alice.into(),
843 to: AccountId::from_h256(H256::from_low_u64_be(2)),
844 amount: 5,
845 nonce: 3,
846 })
847 .into(),
848 ),
849 )
850 .unwrap()
851 .hash();
852
853 pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
855
856 assert_eq!(pool.validated_pool().ready().count(), 0);
858 assert_eq!(pool.validated_pool().status().future, 0);
859 assert_eq!(pool.validated_pool().status().ready, 0);
860 assert!(pool.validated_pool.is_banned(&hash1));
862 assert!(pool.validated_pool.is_banned(&hash2));
863 assert!(pool.validated_pool.is_banned(&hash3));
864 }
865
866 #[test]
867 fn should_ban_mined_transactions() {
868 let (pool, api) = pool();
870 let hash1 = block_on(
871 pool.submit_one(
872 &api.expect_hash_and_number(0),
873 SOURCE,
874 uxt(Transfer {
875 from: Alice.into(),
876 to: AccountId::from_h256(H256::from_low_u64_be(2)),
877 amount: 5,
878 nonce: 0,
879 })
880 .into(),
881 ),
882 )
883 .unwrap()
884 .hash();
885
886 block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
888
889 assert!(pool.validated_pool.is_banned(&hash1));
891 }
892
893 #[test]
894 fn should_limit_futures() {
895 sp_tracing::try_init_simple();
896
897 let xt = uxt(Transfer {
898 from: Alice.into(),
899 to: AccountId::from_h256(H256::from_low_u64_be(2)),
900 amount: 5,
901 nonce: 1,
902 });
903
904 let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
906
907 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
908
909 let api = Arc::new(TestApi::default());
910 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
911
912 let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
913 .unwrap()
914 .hash();
915 assert_eq!(pool.validated_pool().status().future, 1);
916
917 let hash2 = block_on(
919 pool.submit_one(
920 &api.expect_hash_and_number(0),
921 SOURCE,
922 uxt(Transfer {
923 from: Bob.into(),
924 to: AccountId::from_h256(H256::from_low_u64_be(2)),
925 amount: 5,
926 nonce: 10,
927 })
928 .into(),
929 ),
930 )
931 .unwrap()
932 .hash();
933
934 assert_eq!(pool.validated_pool().status().future, 1);
936 assert!(pool.validated_pool.is_banned(&hash1));
937 assert!(!pool.validated_pool.is_banned(&hash2));
938 }
939
940 #[test]
941 fn should_error_if_reject_immediately() {
942 let limit = Limit { count: 100, total_bytes: 10 };
944
945 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
946
947 let api = Arc::new(TestApi::default());
948 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
949
950 block_on(
952 pool.submit_one(
953 &api.expect_hash_and_number(0),
954 SOURCE,
955 uxt(Transfer {
956 from: Alice.into(),
957 to: AccountId::from_h256(H256::from_low_u64_be(2)),
958 amount: 5,
959 nonce: 1,
960 })
961 .into(),
962 ),
963 )
964 .map(|o| o.hash())
965 .unwrap_err();
966
967 assert_eq!(pool.validated_pool().status().ready, 0);
969 assert_eq!(pool.validated_pool().status().future, 0);
970 }
971
972 #[test]
973 fn should_reject_transactions_with_no_provides() {
974 let (pool, api) = pool();
976
977 let err = block_on(
979 pool.submit_one(
980 &api.expect_hash_and_number(0),
981 SOURCE,
982 uxt(Transfer {
983 from: Alice.into(),
984 to: AccountId::from_h256(H256::from_low_u64_be(2)),
985 amount: 5,
986 nonce: INVALID_NONCE,
987 })
988 .into(),
989 ),
990 )
991 .map(|o| o.hash())
992 .unwrap_err();
993
994 assert_eq!(pool.validated_pool().status().ready, 0);
996 assert_eq!(pool.validated_pool().status().future, 0);
997 assert_matches!(err, error::Error::NoTagsProvided);
998 }
999
1000 mod listener {
1001 use super::*;
1002
1003 #[test]
1004 fn should_trigger_ready_and_finalized() {
1005 let (pool, api) = pool();
1007 let watcher = block_on(
1008 pool.submit_and_watch(
1009 &api.expect_hash_and_number(0),
1010 SOURCE,
1011 uxt(Transfer {
1012 from: Alice.into(),
1013 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1014 amount: 5,
1015 nonce: 0,
1016 })
1017 .into(),
1018 ),
1019 )
1020 .unwrap()
1021 .expect_watcher();
1022 assert_eq!(pool.validated_pool().status().ready, 1);
1023 assert_eq!(pool.validated_pool().status().future, 0);
1024
1025 let han_of_block2 = api.expect_hash_and_number(2);
1026
1027 block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
1029 assert_eq!(pool.validated_pool().status().ready, 0);
1030 assert_eq!(pool.validated_pool().status().future, 0);
1031
1032 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1034 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1035 assert_eq!(
1036 stream.next(),
1037 Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1038 );
1039 }
1040
1041 #[test]
1042 fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
1043 let (pool, api) = pool();
1045 let watcher = block_on(
1046 pool.submit_and_watch(
1047 &api.expect_hash_and_number(0),
1048 SOURCE,
1049 uxt(Transfer {
1050 from: Alice.into(),
1051 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1052 amount: 5,
1053 nonce: 0,
1054 })
1055 .into(),
1056 ),
1057 )
1058 .unwrap()
1059 .expect_watcher();
1060 assert_eq!(pool.validated_pool().status().ready, 1);
1061 assert_eq!(pool.validated_pool().status().future, 0);
1062
1063 let han_of_block2 = api.expect_hash_and_number(2);
1064
1065 block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
1067 assert_eq!(pool.validated_pool().status().ready, 0);
1068 assert_eq!(pool.validated_pool().status().future, 0);
1069
1070 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1072 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1073 assert_eq!(
1074 stream.next(),
1075 Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1076 );
1077 }
1078
1079 #[test]
1080 fn should_trigger_future_and_ready_after_promoted() {
1081 let (pool, api) = pool();
1083 let han_of_block0 = api.expect_hash_and_number(0);
1084
1085 let watcher = block_on(
1086 pool.submit_and_watch(
1087 &han_of_block0,
1088 SOURCE,
1089 uxt(Transfer {
1090 from: Alice.into(),
1091 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1092 amount: 5,
1093 nonce: 1,
1094 })
1095 .into(),
1096 ),
1097 )
1098 .unwrap()
1099 .expect_watcher();
1100 assert_eq!(pool.validated_pool().status().ready, 0);
1101 assert_eq!(pool.validated_pool().status().future, 1);
1102
1103 block_on(
1105 pool.submit_one(
1106 &han_of_block0,
1107 SOURCE,
1108 uxt(Transfer {
1109 from: Alice.into(),
1110 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1111 amount: 5,
1112 nonce: 0,
1113 })
1114 .into(),
1115 ),
1116 )
1117 .unwrap();
1118 assert_eq!(pool.validated_pool().status().ready, 2);
1119
1120 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1122 assert_eq!(stream.next(), Some(TransactionStatus::Future));
1123 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1124 }
1125
1126 #[test]
1127 fn should_trigger_invalid_and_ban() {
1128 let (pool, api) = pool();
1130 let uxt = uxt(Transfer {
1131 from: Alice.into(),
1132 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1133 amount: 5,
1134 nonce: 0,
1135 });
1136 let watcher =
1137 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1138 .unwrap()
1139 .expect_watcher();
1140 assert_eq!(pool.validated_pool().status().ready, 1);
1141
1142 pool.validated_pool.remove_invalid(&[*watcher.hash()]);
1144
1145 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1147 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1148 assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
1149 assert_eq!(stream.next(), None);
1150 }
1151
1152 #[test]
1153 fn should_trigger_broadcasted() {
1154 let (pool, api) = pool();
1156 let uxt = uxt(Transfer {
1157 from: Alice.into(),
1158 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1159 amount: 5,
1160 nonce: 0,
1161 });
1162 let watcher =
1163 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1164 .unwrap()
1165 .expect_watcher();
1166 assert_eq!(pool.validated_pool().status().ready, 1);
1167
1168 let mut map = HashMap::new();
1170 let peers = vec!["a".into(), "b".into(), "c".into()];
1171 map.insert(*watcher.hash(), peers.clone());
1172 pool.validated_pool().on_broadcasted(map);
1173
1174 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1176 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1177 assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
1178 }
1179
1180 #[test]
1181 fn should_trigger_dropped_older() {
1182 let limit = Limit { count: 1, total_bytes: 1000 };
1184 let options =
1185 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1186
1187 let api = Arc::new(TestApi::default());
1188 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1189
1190 let xt = uxt(Transfer {
1191 from: Alice.into(),
1192 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1193 amount: 5,
1194 nonce: 0,
1195 });
1196 let watcher =
1197 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1198 .unwrap()
1199 .expect_watcher();
1200 assert_eq!(pool.validated_pool().status().ready, 1);
1201
1202 let xt = uxt(Transfer {
1204 from: Bob.into(),
1205 to: AccountId::from_h256(H256::from_low_u64_be(1)),
1206 amount: 4,
1207 nonce: 1,
1208 });
1209 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
1210 assert_eq!(pool.validated_pool().status().ready, 1);
1211
1212 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1214 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1215 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1216 }
1217
1218 #[test]
1219 fn should_trigger_dropped_lower_priority() {
1220 {
1221 let limit = Limit { count: 1, total_bytes: 1000 };
1223 let options =
1224 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1225
1226 let api = Arc::new(TestApi::default());
1227 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1228
1229 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1232 block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1233 .unwrap();
1234 assert_eq!(pool.validated_pool().status().ready, 1);
1235
1236 let xt = uxt(Transfer {
1240 from: Bob.into(),
1241 to: AccountId::from_h256(H256::from_low_u64_be(1)),
1242 amount: 4,
1243 nonce: 1,
1244 });
1245 let result =
1246 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
1247 assert!(matches!(
1248 result,
1249 Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
1250 ));
1251 }
1252 {
1253 let limit = Limit { count: 2, total_bytes: 1000 };
1255 let options =
1256 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1257
1258 let api = Arc::new(TestApi::default());
1259 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1260
1261 let han_of_block0 = api.expect_hash_and_number(0);
1262
1263 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1266 block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1267 .unwrap()
1268 .expect_watcher();
1269 assert_eq!(pool.validated_pool().status().ready, 1);
1270
1271 let xt = uxt(Transfer {
1274 from: Alice.into(),
1275 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1276 amount: 5,
1277 nonce: 0,
1278 });
1279 let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1280 .unwrap()
1281 .expect_watcher();
1282 assert_eq!(pool.validated_pool().status().ready, 2);
1283
1284 let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1288 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
1289 .unwrap();
1290 assert_eq!(pool.validated_pool().status().ready, 2);
1291
1292 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1294 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1295 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1296 }
1297 }
1298
1299 #[test]
1300 fn should_handle_pruning_in_the_middle_of_import() {
1301 let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1303 let (tx, rx) = std::sync::mpsc::sync_channel(1);
1304 let mut api = TestApi::default();
1305 api.delay = Arc::new(Mutex::new(rx.into()));
1306 let api = Arc::new(api);
1307 let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
1308 Default::default(),
1309 true.into(),
1310 api.clone(),
1311 ));
1312
1313 let han_of_block0 = api.expect_hash_and_number(0);
1314
1315 let xt = uxt(Transfer {
1317 from: Alice.into(),
1318 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1319 amount: 5,
1320 nonce: 1,
1321 });
1322
1323 let pool2 = pool.clone();
1325 std::thread::spawn({
1326 let hash_of_block0 = han_of_block0.clone();
1327 move || {
1328 block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
1329 ready.send(()).unwrap();
1330 }
1331 });
1332
1333 let xt = uxt(Transfer {
1336 from: Alice.into(),
1337 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1338 amount: 4,
1339 nonce: 0,
1340 });
1341 let provides = vec![0_u8];
1343 block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
1344 assert_eq!(pool.validated_pool().status().ready, 1);
1345
1346 block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
1349 assert_eq!(pool.validated_pool().status().ready, 0);
1350
1351 tx.send(()).unwrap();
1355
1356 is_ready.recv().unwrap(); assert_eq!(pool.validated_pool().status().ready, 1);
1359 assert_eq!(pool.validated_pool().status().future, 0);
1360 }
1361 }
1362}