1use std::{collections::HashMap, sync::Arc, time::Duration};
20
21use crate::LOG_TARGET;
22use futures::{channel::mpsc::Receiver, Future};
23use sc_transaction_pool_api::error;
24use sp_blockchain::TreeRoute;
25use sp_runtime::{
26 generic::BlockId,
27 traits::{self, Block as BlockT, SaturatedConversion},
28 transaction_validity::{
29 TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
30 },
31};
32use std::time::Instant;
33
34use super::{
35 base_pool as base,
36 validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
37 watcher::Watcher,
38};
39
40pub type EventStream<H> = Receiver<H>;
42
43pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
45pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
47pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
49pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
51pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
53pub type ValidatedTransactionFor<A> =
55 ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
56
57pub trait ChainApi: Send + Sync {
59 type Block: BlockT;
61 type Error: From<error::Error> + error::IntoPoolError;
63 type ValidationFuture: Future<Output = Result<TransactionValidity, Self::Error>> + Send + Unpin;
65 type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>>
67 + Unpin
68 + Send
69 + 'static;
70
71 fn validate_transaction(
73 &self,
74 at: <Self::Block as BlockT>::Hash,
75 source: TransactionSource,
76 uxt: ExtrinsicFor<Self>,
77 ) -> Self::ValidationFuture;
78
79 fn block_id_to_number(
81 &self,
82 at: &BlockId<Self::Block>,
83 ) -> Result<Option<NumberFor<Self>>, Self::Error>;
84
85 fn block_id_to_hash(
87 &self,
88 at: &BlockId<Self::Block>,
89 ) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
90
91 fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
93
94 fn block_body(&self, at: <Self::Block as BlockT>::Hash) -> Self::BodyFuture;
96
97 fn block_header(
99 &self,
100 at: <Self::Block as BlockT>::Hash,
101 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
102
103 fn tree_route(
105 &self,
106 from: <Self::Block as BlockT>::Hash,
107 to: <Self::Block as BlockT>::Hash,
108 ) -> Result<TreeRoute<Self::Block>, Self::Error>;
109}
110
111#[derive(Debug, Clone)]
113pub struct Options {
114 pub ready: base::Limit,
116 pub future: base::Limit,
118 pub reject_future_transactions: bool,
120 pub ban_time: Duration,
122}
123
124impl Default for Options {
125 fn default() -> Self {
126 Self {
127 ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
128 future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
129 reject_future_transactions: false,
130 ban_time: Duration::from_secs(60 * 30),
131 }
132 }
133}
134
135#[derive(Copy, Clone)]
138enum CheckBannedBeforeVerify {
139 Yes,
140 No,
141}
142
143pub struct Pool<B: ChainApi> {
145 validated_pool: Arc<ValidatedPool<B>>,
146}
147
148impl<B: ChainApi> Pool<B> {
149 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
151 Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
152 }
153
154 pub async fn submit_at(
156 &self,
157 at: <B::Block as BlockT>::Hash,
158 source: TransactionSource,
159 xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
160 ) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
161 let xts = xts.into_iter().map(|xt| (source, xt));
162 let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await?;
163 Ok(self.validated_pool.submit(validated_transactions.into_values()))
164 }
165
166 pub async fn resubmit_at(
170 &self,
171 at: <B::Block as BlockT>::Hash,
172 source: TransactionSource,
173 xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
174 ) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
175 let xts = xts.into_iter().map(|xt| (source, xt));
176 let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await?;
177 Ok(self.validated_pool.submit(validated_transactions.into_values()))
178 }
179
180 pub async fn submit_one(
182 &self,
183 at: <B::Block as BlockT>::Hash,
184 source: TransactionSource,
185 xt: ExtrinsicFor<B>,
186 ) -> Result<ExtrinsicHash<B>, B::Error> {
187 let res = self.submit_at(at, source, std::iter::once(xt)).await?.pop();
188 res.expect("One extrinsic passed; one result returned; qed")
189 }
190
191 pub async fn submit_and_watch(
193 &self,
194 at: <B::Block as BlockT>::Hash,
195 source: TransactionSource,
196 xt: ExtrinsicFor<B>,
197 ) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
198 let block_number = self.resolve_block_number(&BlockId::Hash(at))?;
199 let (_, tx) = self
200 .verify_one(at, block_number, source, xt, CheckBannedBeforeVerify::Yes)
201 .await;
202 self.validated_pool.submit_and_watch(tx)
203 }
204
205 pub fn resubmit(
207 &self,
208 revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
209 ) {
210 let now = Instant::now();
211 self.validated_pool.resubmit(revalidated_transactions);
212 log::debug!(
213 target: LOG_TARGET,
214 "Resubmitted. Took {} ms. Status: {:?}",
215 now.elapsed().as_millis(),
216 self.validated_pool.status()
217 );
218 }
219
220 pub fn prune_known(
226 &self,
227 at: &BlockId<B::Block>,
228 hashes: &[ExtrinsicHash<B>],
229 ) -> Result<(), B::Error> {
230 let in_pool_tags =
232 self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
233
234 let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
236 let pruned_transactions =
237 hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
238 self.validated_pool.fire_pruned(at, pruned_transactions)
239 }
240
241 pub async fn prune(
248 &self,
249 at: <B::Block as BlockT>::Hash,
250 parent: <B::Block as BlockT>::Hash,
251 extrinsics: &[ExtrinsicFor<B>],
252 ) -> Result<(), B::Error> {
253 log::debug!(
254 target: LOG_TARGET,
255 "Starting pruning of block {:?} (extrinsics: {})",
256 at,
257 extrinsics.len()
258 );
259 let in_pool_hashes =
261 extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
262 let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
263
264 let all = extrinsics.iter().zip(in_pool_tags.into_iter());
267
268 let mut future_tags = Vec::new();
269 for (extrinsic, in_pool_tags) in all {
270 match in_pool_tags {
271 Some(tags) => future_tags.extend(tags),
273 None => {
276 if !self.validated_pool.status().is_empty() {
278 let validity = self
279 .validated_pool
280 .api()
281 .validate_transaction(
282 parent,
283 TransactionSource::InBlock,
284 extrinsic.clone(),
285 )
286 .await;
287
288 if let Ok(Ok(validity)) = validity {
289 future_tags.extend(validity.provides);
290 }
291 } else {
292 log::trace!(
293 target: LOG_TARGET,
294 "txpool is empty, skipping validation for block {at:?}",
295 );
296 }
297 },
298 }
299 }
300
301 self.prune_tags(at, future_tags, in_pool_hashes).await
302 }
303
304 pub async fn prune_tags(
326 &self,
327 at: <B::Block as BlockT>::Hash,
328 tags: impl IntoIterator<Item = Tag>,
329 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
330 ) -> Result<(), B::Error> {
331 log::debug!(target: LOG_TARGET, "Pruning at {:?}", at);
332 let prune_status = self.validated_pool.prune_tags(tags)?;
334
335 self.validated_pool
339 .ban(&Instant::now(), known_imported_hashes.clone().into_iter());
340
341 let pruned_hashes = prune_status.pruned.iter().map(|tx| tx.hash).collect::<Vec<_>>();
344 let pruned_transactions =
345 prune_status.pruned.into_iter().map(|tx| (tx.source, tx.data.clone()));
346
347 let reverified_transactions =
348 self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await?;
349
350 log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions.", at);
351 self.validated_pool.resubmit_pruned(
354 &BlockId::Hash(at),
355 known_imported_hashes,
356 pruned_hashes,
357 reverified_transactions.into_values().collect(),
358 )
359 }
360
361 pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExtrinsicHash<B> {
363 self.validated_pool.api().hash_and_length(xt).0
364 }
365
366 fn resolve_block_number(&self, at: &BlockId<B::Block>) -> Result<NumberFor<B>, B::Error> {
368 self.validated_pool.api().block_id_to_number(at).and_then(|number| {
369 number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
370 })
371 }
372
373 async fn verify(
375 &self,
376 at: <B::Block as BlockT>::Hash,
377 xts: impl IntoIterator<Item = (TransactionSource, ExtrinsicFor<B>)>,
378 check: CheckBannedBeforeVerify,
379 ) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
380 let block_number = self.resolve_block_number(&BlockId::Hash(at))?;
382
383 let res = futures::future::join_all(
384 xts.into_iter()
385 .map(|(source, xt)| self.verify_one(at, block_number, source, xt, check)),
386 )
387 .await
388 .into_iter()
389 .collect::<HashMap<_, _>>();
390
391 Ok(res)
392 }
393
394 async fn verify_one(
396 &self,
397 block_hash: <B::Block as BlockT>::Hash,
398 block_number: NumberFor<B>,
399 source: TransactionSource,
400 xt: ExtrinsicFor<B>,
401 check: CheckBannedBeforeVerify,
402 ) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
403 let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
404
405 let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
406 if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
407 return (hash, ValidatedTransaction::Invalid(hash, err))
408 }
409
410 let validation_result = self
411 .validated_pool
412 .api()
413 .validate_transaction(block_hash, source, xt.clone())
414 .await;
415
416 let status = match validation_result {
417 Ok(status) => status,
418 Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
419 };
420
421 let validity = match status {
422 Ok(validity) =>
423 if validity.provides.is_empty() {
424 ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
425 } else {
426 ValidatedTransaction::valid_at(
427 block_number.saturated_into::<u64>(),
428 hash,
429 source,
430 xt,
431 bytes,
432 validity,
433 )
434 },
435 Err(TransactionValidityError::Invalid(e)) =>
436 ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
437 Err(TransactionValidityError::Unknown(e)) =>
438 ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
439 };
440
441 (hash, validity)
442 }
443
444 pub fn validated_pool(&self) -> &ValidatedPool<B> {
446 &self.validated_pool
447 }
448}
449
450impl<B: ChainApi> Clone for Pool<B> {
451 fn clone(&self) -> Self {
452 Self { validated_pool: self.validated_pool.clone() }
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::{super::base_pool::Limit, *};
459 use crate::tests::{pool, uxt, TestApi, INVALID_NONCE};
460 use assert_matches::assert_matches;
461 use codec::Encode;
462 use futures::executor::block_on;
463 use parking_lot::Mutex;
464 use sc_transaction_pool_api::TransactionStatus;
465 use sp_runtime::transaction_validity::TransactionSource;
466 use std::{collections::HashMap, time::Instant};
467 use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
468 use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};
469
470 const SOURCE: TransactionSource = TransactionSource::External;
471
472 #[test]
473 fn should_validate_and_import_transaction() {
474 let (pool, api) = pool();
476
477 let hash = block_on(pool.submit_one(
479 api.expect_hash_from_number(0),
480 SOURCE,
481 uxt(Transfer {
482 from: Alice.into(),
483 to: AccountId::from_h256(H256::from_low_u64_be(2)),
484 amount: 5,
485 nonce: 0,
486 }),
487 ))
488 .unwrap();
489
490 assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
492 }
493
494 #[test]
495 fn should_reject_if_temporarily_banned() {
496 let (pool, api) = pool();
498 let uxt = uxt(Transfer {
499 from: Alice.into(),
500 to: AccountId::from_h256(H256::from_low_u64_be(2)),
501 amount: 5,
502 nonce: 0,
503 });
504
505 pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
507 let res = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, uxt));
508 assert_eq!(pool.validated_pool().status().ready, 0);
509 assert_eq!(pool.validated_pool().status().future, 0);
510
511 assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
513 }
514
515 #[test]
516 fn should_reject_unactionable_transactions() {
517 let api = Arc::new(TestApi::default());
519 let pool = Pool::new(
520 Default::default(),
521 false.into(),
523 api.clone(),
524 );
525
526 let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
528
529 let res = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, uxt));
531
532 assert_matches!(res.unwrap_err(), error::Error::Unactionable);
534 }
535
536 #[test]
537 fn should_notify_about_pool_events() {
538 let (stream, hash0, hash1) = {
539 let (pool, api) = pool();
541 let hash_of_block0 = api.expect_hash_from_number(0);
542 let stream = pool.validated_pool().import_notification_stream();
543
544 let hash0 = block_on(pool.submit_one(
546 hash_of_block0,
547 SOURCE,
548 uxt(Transfer {
549 from: Alice.into(),
550 to: AccountId::from_h256(H256::from_low_u64_be(2)),
551 amount: 5,
552 nonce: 0,
553 }),
554 ))
555 .unwrap();
556 let hash1 = block_on(pool.submit_one(
557 hash_of_block0,
558 SOURCE,
559 uxt(Transfer {
560 from: Alice.into(),
561 to: AccountId::from_h256(H256::from_low_u64_be(2)),
562 amount: 5,
563 nonce: 1,
564 }),
565 ))
566 .unwrap();
567 let _hash = block_on(pool.submit_one(
569 hash_of_block0,
570 SOURCE,
571 uxt(Transfer {
572 from: Alice.into(),
573 to: AccountId::from_h256(H256::from_low_u64_be(2)),
574 amount: 5,
575 nonce: 3,
576 }),
577 ))
578 .unwrap();
579
580 assert_eq!(pool.validated_pool().status().ready, 2);
581 assert_eq!(pool.validated_pool().status().future, 1);
582
583 (stream, hash0, hash1)
584 };
585
586 let mut it = futures::executor::block_on_stream(stream);
588 assert_eq!(it.next(), Some(hash0));
589 assert_eq!(it.next(), Some(hash1));
590 assert_eq!(it.next(), None);
591 }
592
593 #[test]
594 fn should_clear_stale_transactions() {
595 let (pool, api) = pool();
597 let hash_of_block0 = api.expect_hash_from_number(0);
598 let hash1 = block_on(pool.submit_one(
599 hash_of_block0,
600 SOURCE,
601 uxt(Transfer {
602 from: Alice.into(),
603 to: AccountId::from_h256(H256::from_low_u64_be(2)),
604 amount: 5,
605 nonce: 0,
606 }),
607 ))
608 .unwrap();
609 let hash2 = block_on(pool.submit_one(
610 hash_of_block0,
611 SOURCE,
612 uxt(Transfer {
613 from: Alice.into(),
614 to: AccountId::from_h256(H256::from_low_u64_be(2)),
615 amount: 5,
616 nonce: 1,
617 }),
618 ))
619 .unwrap();
620 let hash3 = block_on(pool.submit_one(
621 hash_of_block0,
622 SOURCE,
623 uxt(Transfer {
624 from: Alice.into(),
625 to: AccountId::from_h256(H256::from_low_u64_be(2)),
626 amount: 5,
627 nonce: 3,
628 }),
629 ))
630 .unwrap();
631
632 pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap();
634
635 assert_eq!(pool.validated_pool().ready().count(), 0);
637 assert_eq!(pool.validated_pool().status().future, 0);
638 assert_eq!(pool.validated_pool().status().ready, 0);
639 assert!(pool.validated_pool.is_banned(&hash1));
641 assert!(pool.validated_pool.is_banned(&hash2));
642 assert!(pool.validated_pool.is_banned(&hash3));
643 }
644
645 #[test]
646 fn should_ban_mined_transactions() {
647 let (pool, api) = pool();
649 let hash1 = block_on(pool.submit_one(
650 api.expect_hash_from_number(0),
651 SOURCE,
652 uxt(Transfer {
653 from: Alice.into(),
654 to: AccountId::from_h256(H256::from_low_u64_be(2)),
655 amount: 5,
656 nonce: 0,
657 }),
658 ))
659 .unwrap();
660
661 block_on(pool.prune_tags(api.expect_hash_from_number(1), vec![vec![0]], vec![hash1]))
663 .unwrap();
664
665 assert!(pool.validated_pool.is_banned(&hash1));
667 }
668
669 #[test]
670 fn should_limit_futures() {
671 sp_tracing::try_init_simple();
672
673 let xt = uxt(Transfer {
674 from: Alice.into(),
675 to: AccountId::from_h256(H256::from_low_u64_be(2)),
676 amount: 5,
677 nonce: 1,
678 });
679
680 let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
682
683 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
684
685 let api = Arc::new(TestApi::default());
686 let pool = Pool::new(options, true.into(), api.clone());
687
688 let hash1 = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, xt)).unwrap();
689 assert_eq!(pool.validated_pool().status().future, 1);
690
691 let hash2 = block_on(pool.submit_one(
693 api.expect_hash_from_number(0),
694 SOURCE,
695 uxt(Transfer {
696 from: Bob.into(),
697 to: AccountId::from_h256(H256::from_low_u64_be(2)),
698 amount: 5,
699 nonce: 10,
700 }),
701 ))
702 .unwrap();
703
704 assert_eq!(pool.validated_pool().status().future, 1);
706 assert!(pool.validated_pool.is_banned(&hash1));
707 assert!(!pool.validated_pool.is_banned(&hash2));
708 }
709
710 #[test]
711 fn should_error_if_reject_immediately() {
712 let limit = Limit { count: 100, total_bytes: 10 };
714
715 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
716
717 let api = Arc::new(TestApi::default());
718 let pool = Pool::new(options, true.into(), api.clone());
719
720 block_on(pool.submit_one(
722 api.expect_hash_from_number(0),
723 SOURCE,
724 uxt(Transfer {
725 from: Alice.into(),
726 to: AccountId::from_h256(H256::from_low_u64_be(2)),
727 amount: 5,
728 nonce: 1,
729 }),
730 ))
731 .unwrap_err();
732
733 assert_eq!(pool.validated_pool().status().ready, 0);
735 assert_eq!(pool.validated_pool().status().future, 0);
736 }
737
738 #[test]
739 fn should_reject_transactions_with_no_provides() {
740 let (pool, api) = pool();
742
743 let err = block_on(pool.submit_one(
745 api.expect_hash_from_number(0),
746 SOURCE,
747 uxt(Transfer {
748 from: Alice.into(),
749 to: AccountId::from_h256(H256::from_low_u64_be(2)),
750 amount: 5,
751 nonce: INVALID_NONCE,
752 }),
753 ))
754 .unwrap_err();
755
756 assert_eq!(pool.validated_pool().status().ready, 0);
758 assert_eq!(pool.validated_pool().status().future, 0);
759 assert_matches!(err, error::Error::NoTagsProvided);
760 }
761
762 mod listener {
763 use super::*;
764
765 #[test]
766 fn should_trigger_ready_and_finalized() {
767 let (pool, api) = pool();
769 let watcher = block_on(pool.submit_and_watch(
770 api.expect_hash_from_number(0),
771 SOURCE,
772 uxt(Transfer {
773 from: Alice.into(),
774 to: AccountId::from_h256(H256::from_low_u64_be(2)),
775 amount: 5,
776 nonce: 0,
777 }),
778 ))
779 .unwrap();
780 assert_eq!(pool.validated_pool().status().ready, 1);
781 assert_eq!(pool.validated_pool().status().future, 0);
782
783 let hash_of_block2 = api.expect_hash_from_number(2);
784
785 block_on(pool.prune_tags(hash_of_block2, vec![vec![0u8]], vec![])).unwrap();
787 assert_eq!(pool.validated_pool().status().ready, 0);
788 assert_eq!(pool.validated_pool().status().future, 0);
789
790 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
792 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
793 assert_eq!(stream.next(), Some(TransactionStatus::InBlock((hash_of_block2.into(), 0))),);
794 }
795
796 #[test]
797 fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
798 let (pool, api) = pool();
800 let watcher = block_on(pool.submit_and_watch(
801 api.expect_hash_from_number(0),
802 SOURCE,
803 uxt(Transfer {
804 from: Alice.into(),
805 to: AccountId::from_h256(H256::from_low_u64_be(2)),
806 amount: 5,
807 nonce: 0,
808 }),
809 ))
810 .unwrap();
811 assert_eq!(pool.validated_pool().status().ready, 1);
812 assert_eq!(pool.validated_pool().status().future, 0);
813
814 let hash_of_block2 = api.expect_hash_from_number(2);
815
816 block_on(pool.prune_tags(hash_of_block2, vec![vec![0u8]], vec![*watcher.hash()]))
818 .unwrap();
819 assert_eq!(pool.validated_pool().status().ready, 0);
820 assert_eq!(pool.validated_pool().status().future, 0);
821
822 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
824 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
825 assert_eq!(stream.next(), Some(TransactionStatus::InBlock((hash_of_block2.into(), 0))),);
826 }
827
828 #[test]
829 fn should_trigger_future_and_ready_after_promoted() {
830 let (pool, api) = pool();
832 let hash_of_block0 = api.expect_hash_from_number(0);
833
834 let watcher = block_on(pool.submit_and_watch(
835 hash_of_block0,
836 SOURCE,
837 uxt(Transfer {
838 from: Alice.into(),
839 to: AccountId::from_h256(H256::from_low_u64_be(2)),
840 amount: 5,
841 nonce: 1,
842 }),
843 ))
844 .unwrap();
845 assert_eq!(pool.validated_pool().status().ready, 0);
846 assert_eq!(pool.validated_pool().status().future, 1);
847
848 block_on(pool.submit_one(
850 hash_of_block0,
851 SOURCE,
852 uxt(Transfer {
853 from: Alice.into(),
854 to: AccountId::from_h256(H256::from_low_u64_be(2)),
855 amount: 5,
856 nonce: 0,
857 }),
858 ))
859 .unwrap();
860 assert_eq!(pool.validated_pool().status().ready, 2);
861
862 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
864 assert_eq!(stream.next(), Some(TransactionStatus::Future));
865 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
866 }
867
868 #[test]
869 fn should_trigger_invalid_and_ban() {
870 let (pool, api) = pool();
872 let uxt = uxt(Transfer {
873 from: Alice.into(),
874 to: AccountId::from_h256(H256::from_low_u64_be(2)),
875 amount: 5,
876 nonce: 0,
877 });
878 let watcher =
879 block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, uxt))
880 .unwrap();
881 assert_eq!(pool.validated_pool().status().ready, 1);
882
883 pool.validated_pool.remove_invalid(&[*watcher.hash()]);
885
886 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
888 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
889 assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
890 assert_eq!(stream.next(), None);
891 }
892
893 #[test]
894 fn should_trigger_broadcasted() {
895 let (pool, api) = pool();
897 let uxt = uxt(Transfer {
898 from: Alice.into(),
899 to: AccountId::from_h256(H256::from_low_u64_be(2)),
900 amount: 5,
901 nonce: 0,
902 });
903 let watcher =
904 block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, uxt))
905 .unwrap();
906 assert_eq!(pool.validated_pool().status().ready, 1);
907
908 let mut map = HashMap::new();
910 let peers = vec!["a".into(), "b".into(), "c".into()];
911 map.insert(*watcher.hash(), peers.clone());
912 pool.validated_pool().on_broadcasted(map);
913
914 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
916 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
917 assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
918 }
919
920 #[test]
921 fn should_trigger_dropped_older() {
922 let limit = Limit { count: 1, total_bytes: 1000 };
924 let options =
925 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
926
927 let api = Arc::new(TestApi::default());
928 let pool = Pool::new(options, true.into(), api.clone());
929
930 let xt = uxt(Transfer {
931 from: Alice.into(),
932 to: AccountId::from_h256(H256::from_low_u64_be(2)),
933 amount: 5,
934 nonce: 0,
935 });
936 let watcher =
937 block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, xt))
938 .unwrap();
939 assert_eq!(pool.validated_pool().status().ready, 1);
940
941 let xt = uxt(Transfer {
943 from: Bob.into(),
944 to: AccountId::from_h256(H256::from_low_u64_be(1)),
945 amount: 4,
946 nonce: 1,
947 });
948 block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt)).unwrap();
949 assert_eq!(pool.validated_pool().status().ready, 1);
950
951 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
953 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
954 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
955 }
956
957 #[test]
958 fn should_trigger_dropped_lower_priority() {
959 {
960 let limit = Limit { count: 1, total_bytes: 1000 };
962 let options =
963 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
964
965 let api = Arc::new(TestApi::default());
966 let pool = Pool::new(options, true.into(), api.clone());
967
968 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
971 block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, xt)).unwrap();
972 assert_eq!(pool.validated_pool().status().ready, 1);
973
974 let xt = uxt(Transfer {
978 from: Bob.into(),
979 to: AccountId::from_h256(H256::from_low_u64_be(1)),
980 amount: 4,
981 nonce: 1,
982 });
983 let result = block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt));
984 assert!(matches!(
985 result,
986 Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
987 ));
988 }
989 {
990 let limit = Limit { count: 2, total_bytes: 1000 };
992 let options =
993 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
994
995 let api = Arc::new(TestApi::default());
996 let pool = Pool::new(options, true.into(), api.clone());
997
998 let hash_of_block0 = api.expect_hash_from_number(0);
999
1000 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1003 block_on(pool.submit_and_watch(hash_of_block0, SOURCE, xt)).unwrap();
1004 assert_eq!(pool.validated_pool().status().ready, 1);
1005
1006 let xt = uxt(Transfer {
1009 from: Alice.into(),
1010 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1011 amount: 5,
1012 nonce: 0,
1013 });
1014 let watcher = block_on(pool.submit_and_watch(hash_of_block0, SOURCE, xt)).unwrap();
1015 assert_eq!(pool.validated_pool().status().ready, 2);
1016
1017 let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1021 block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt)).unwrap();
1022 assert_eq!(pool.validated_pool().status().ready, 2);
1023
1024 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1026 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1027 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1028 }
1029 }
1030
1031 #[test]
1032 fn should_handle_pruning_in_the_middle_of_import() {
1033 let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1035 let (tx, rx) = std::sync::mpsc::sync_channel(1);
1036 let mut api = TestApi::default();
1037 api.delay = Arc::new(Mutex::new(rx.into()));
1038 let api = Arc::new(api);
1039 let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
1040
1041 let hash_of_block0 = api.expect_hash_from_number(0);
1042
1043 let xt = uxt(Transfer {
1045 from: Alice.into(),
1046 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1047 amount: 5,
1048 nonce: 1,
1049 });
1050
1051 let pool2 = pool.clone();
1053 std::thread::spawn(move || {
1054 block_on(pool2.submit_one(hash_of_block0, SOURCE, xt)).unwrap();
1055 ready.send(()).unwrap();
1056 });
1057
1058 let xt = uxt(Transfer {
1061 from: Alice.into(),
1062 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1063 amount: 4,
1064 nonce: 0,
1065 });
1066 let provides = vec![0_u8];
1068 block_on(pool.submit_one(hash_of_block0, SOURCE, xt)).unwrap();
1069 assert_eq!(pool.validated_pool().status().ready, 1);
1070
1071 block_on(pool.prune_tags(api.expect_hash_from_number(1), vec![provides], vec![]))
1074 .unwrap();
1075 assert_eq!(pool.validated_pool().status().ready, 0);
1076
1077 tx.send(()).unwrap();
1081
1082 is_ready.recv().unwrap(); assert_eq!(pool.validated_pool().status().ready, 1);
1085 assert_eq!(pool.validated_pool().status().future, 0);
1086 }
1087 }
1088}