sc_transaction_pool/graph/
pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{collections::HashMap, sync::Arc, time::Duration};
20
21use crate::LOG_TARGET;
22use futures::{channel::mpsc::Receiver, Future};
23use sc_transaction_pool_api::error;
24use sp_blockchain::TreeRoute;
25use sp_runtime::{
26	generic::BlockId,
27	traits::{self, Block as BlockT, SaturatedConversion},
28	transaction_validity::{
29		TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
30	},
31};
32use std::time::Instant;
33
34use super::{
35	base_pool as base,
36	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
37	watcher::Watcher,
38};
39
/// Modification notification event stream type.
pub type EventStream<H> = Receiver<H>;

/// Block hash type for a pool.
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Extrinsic hash type for a pool.
///
/// This alias resolves to the block's `Hash` type, i.e. the very same type as [`BlockHash`].
pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Extrinsic type for a pool.
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
/// Block number type for the [`ChainApi`].
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
/// A type of transaction stored in the pool (shared behind an [`Arc`]).
pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
/// A type of validated transaction stored in the pool.
pub type ValidatedTransactionFor<A> =
	ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
56
57/// Concrete extrinsic validation and query logic.
58pub trait ChainApi: Send + Sync {
59	/// Block type.
60	type Block: BlockT;
61	/// Error type.
62	type Error: From<error::Error> + error::IntoPoolError;
63	/// Validate transaction future.
64	type ValidationFuture: Future<Output = Result<TransactionValidity, Self::Error>> + Send + Unpin;
65	/// Body future (since block body might be remote)
66	type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>>
67		+ Unpin
68		+ Send
69		+ 'static;
70
71	/// Verify extrinsic at given block.
72	fn validate_transaction(
73		&self,
74		at: <Self::Block as BlockT>::Hash,
75		source: TransactionSource,
76		uxt: ExtrinsicFor<Self>,
77	) -> Self::ValidationFuture;
78
79	/// Returns a block number given the block id.
80	fn block_id_to_number(
81		&self,
82		at: &BlockId<Self::Block>,
83	) -> Result<Option<NumberFor<Self>>, Self::Error>;
84
85	/// Returns a block hash given the block id.
86	fn block_id_to_hash(
87		&self,
88		at: &BlockId<Self::Block>,
89	) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
90
91	/// Returns hash and encoding length of the extrinsic.
92	fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
93
94	/// Returns a block body given the block.
95	fn block_body(&self, at: <Self::Block as BlockT>::Hash) -> Self::BodyFuture;
96
97	/// Returns a block header given the block id.
98	fn block_header(
99		&self,
100		at: <Self::Block as BlockT>::Hash,
101	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
102
103	/// Compute a tree-route between two blocks. See [`TreeRoute`] for more details.
104	fn tree_route(
105		&self,
106		from: <Self::Block as BlockT>::Hash,
107		to: <Self::Block as BlockT>::Hash,
108	) -> Result<TreeRoute<Self::Block>, Self::Error>;
109}
110
/// Pool configuration options.
///
/// The `Default` implementation of this type supplies the values used when nothing is
/// overridden.
#[derive(Debug, Clone)]
pub struct Options {
	/// Ready queue limits.
	pub ready: base::Limit,
	/// Future queue limits.
	pub future: base::Limit,
	/// Reject future transactions.
	pub reject_future_transactions: bool,
	/// How long the extrinsic is banned for.
	pub ban_time: Duration,
}
123
124impl Default for Options {
125	fn default() -> Self {
126		Self {
127			ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
128			future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
129			reject_future_transactions: false,
130			ban_time: Duration::from_secs(60 * 30),
131		}
132	}
133}
134
/// Should we check that the transaction is banned
/// in the pool, before we verify it?
#[derive(Copy, Clone)]
enum CheckBannedBeforeVerify {
	/// Do not ignore bans when checking whether the transaction is already known.
	Yes,
	/// Ignore bans when checking whether the transaction is already known.
	No,
}
142
/// Extrinsics pool that performs validation.
///
/// Extrinsics are verified through the [`ChainApi`] and then handed over to the wrapped
/// [`ValidatedPool`].
pub struct Pool<B: ChainApi> {
	/// Shared handle to the pool holding the already validated transactions.
	validated_pool: Arc<ValidatedPool<B>>,
}
147
148impl<B: ChainApi> Pool<B> {
149	/// Create a new transaction pool.
150	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
151		Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
152	}
153
154	/// Imports a bunch of unverified extrinsics to the pool
155	pub async fn submit_at(
156		&self,
157		at: <B::Block as BlockT>::Hash,
158		source: TransactionSource,
159		xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
160	) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
161		let xts = xts.into_iter().map(|xt| (source, xt));
162		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await?;
163		Ok(self.validated_pool.submit(validated_transactions.into_values()))
164	}
165
166	/// Resubmit the given extrinsics to the pool.
167	///
168	/// This does not check if a transaction is banned, before we verify it again.
169	pub async fn resubmit_at(
170		&self,
171		at: <B::Block as BlockT>::Hash,
172		source: TransactionSource,
173		xts: impl IntoIterator<Item = ExtrinsicFor<B>>,
174	) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
175		let xts = xts.into_iter().map(|xt| (source, xt));
176		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await?;
177		Ok(self.validated_pool.submit(validated_transactions.into_values()))
178	}
179
180	/// Imports one unverified extrinsic to the pool
181	pub async fn submit_one(
182		&self,
183		at: <B::Block as BlockT>::Hash,
184		source: TransactionSource,
185		xt: ExtrinsicFor<B>,
186	) -> Result<ExtrinsicHash<B>, B::Error> {
187		let res = self.submit_at(at, source, std::iter::once(xt)).await?.pop();
188		res.expect("One extrinsic passed; one result returned; qed")
189	}
190
191	/// Import a single extrinsic and starts to watch its progress in the pool.
192	pub async fn submit_and_watch(
193		&self,
194		at: <B::Block as BlockT>::Hash,
195		source: TransactionSource,
196		xt: ExtrinsicFor<B>,
197	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
198		let block_number = self.resolve_block_number(&BlockId::Hash(at))?;
199		let (_, tx) = self
200			.verify_one(at, block_number, source, xt, CheckBannedBeforeVerify::Yes)
201			.await;
202		self.validated_pool.submit_and_watch(tx)
203	}
204
205	/// Resubmit some transaction that were validated elsewhere.
206	pub fn resubmit(
207		&self,
208		revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
209	) {
210		let now = Instant::now();
211		self.validated_pool.resubmit(revalidated_transactions);
212		log::debug!(
213			target: LOG_TARGET,
214			"Resubmitted. Took {} ms. Status: {:?}",
215			now.elapsed().as_millis(),
216			self.validated_pool.status()
217		);
218	}
219
220	/// Prunes known ready transactions.
221	///
222	/// Used to clear the pool from transactions that were part of recently imported block.
223	/// The main difference from the `prune` is that we do not revalidate any transactions
224	/// and ignore unknown passed hashes.
225	pub fn prune_known(
226		&self,
227		at: &BlockId<B::Block>,
228		hashes: &[ExtrinsicHash<B>],
229	) -> Result<(), B::Error> {
230		// Get details of all extrinsics that are already in the pool
231		let in_pool_tags =
232			self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
233
234		// Prune all transactions that provide given tags
235		let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
236		let pruned_transactions =
237			hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
238		self.validated_pool.fire_pruned(at, pruned_transactions)
239	}
240
241	/// Prunes ready transactions.
242	///
243	/// Used to clear the pool from transactions that were part of recently imported block.
244	/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
245	/// into runtime too often we first lookup all extrinsics that are in the pool and get
246	/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
247	pub async fn prune(
248		&self,
249		at: <B::Block as BlockT>::Hash,
250		parent: <B::Block as BlockT>::Hash,
251		extrinsics: &[ExtrinsicFor<B>],
252	) -> Result<(), B::Error> {
253		log::debug!(
254			target: LOG_TARGET,
255			"Starting pruning of block {:?} (extrinsics: {})",
256			at,
257			extrinsics.len()
258		);
259		// Get details of all extrinsics that are already in the pool
260		let in_pool_hashes =
261			extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
262		let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
263
264		// Zip the ones from the pool with the full list (we get pairs `(Extrinsic,
265		// Option<Vec<Tag>>)`)
266		let all = extrinsics.iter().zip(in_pool_tags.into_iter());
267
268		let mut future_tags = Vec::new();
269		for (extrinsic, in_pool_tags) in all {
270			match in_pool_tags {
271				// reuse the tags for extrinsics that were found in the pool
272				Some(tags) => future_tags.extend(tags),
273				// if it's not found in the pool query the runtime at parent block
274				// to get validity info and tags that the extrinsic provides.
275				None => {
276					// Avoid validating block txs if the pool is empty
277					if !self.validated_pool.status().is_empty() {
278						let validity = self
279							.validated_pool
280							.api()
281							.validate_transaction(
282								parent,
283								TransactionSource::InBlock,
284								extrinsic.clone(),
285							)
286							.await;
287
288						if let Ok(Ok(validity)) = validity {
289							future_tags.extend(validity.provides);
290						}
291					} else {
292						log::trace!(
293							target: LOG_TARGET,
294							"txpool is empty, skipping validation for block {at:?}",
295						);
296					}
297				},
298			}
299		}
300
301		self.prune_tags(at, future_tags, in_pool_hashes).await
302	}
303
304	/// Prunes ready transactions that provide given list of tags.
305	///
306	/// Given tags are assumed to be always provided now, so all transactions
307	/// in the Future Queue that require that particular tag (and have other
308	/// requirements satisfied) are promoted to Ready Queue.
309	///
310	/// Moreover for each provided tag we remove transactions in the pool that:
311	/// 1. Provide that tag directly
312	/// 2. Are a dependency of pruned transaction.
313	///
314	/// Returns transactions that have been removed from the pool and must be reverified
315	/// before reinserting to the pool.
316	///
317	/// By removing predecessor transactions as well we might actually end up
318	/// pruning too much, so all removed transactions are reverified against
319	/// the runtime (`validate_transaction`) to make sure they are invalid.
320	///
321	/// However we avoid revalidating transactions that are contained within
322	/// the second parameter of `known_imported_hashes`. These transactions
323	/// (if pruned) are not revalidated and become temporarily banned to
324	/// prevent importing them in the (near) future.
325	pub async fn prune_tags(
326		&self,
327		at: <B::Block as BlockT>::Hash,
328		tags: impl IntoIterator<Item = Tag>,
329		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
330	) -> Result<(), B::Error> {
331		log::debug!(target: LOG_TARGET, "Pruning at {:?}", at);
332		// Prune all transactions that provide given tags
333		let prune_status = self.validated_pool.prune_tags(tags)?;
334
335		// Make sure that we don't revalidate extrinsics that were part of the recently
336		// imported block. This is especially important for UTXO-like chains cause the
337		// inputs are pruned so such transaction would go to future again.
338		self.validated_pool
339			.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
340
341		// Try to re-validate pruned transactions since some of them might be still valid.
342		// note that `known_imported_hashes` will be rejected here due to temporary ban.
343		let pruned_hashes = prune_status.pruned.iter().map(|tx| tx.hash).collect::<Vec<_>>();
344		let pruned_transactions =
345			prune_status.pruned.into_iter().map(|tx| (tx.source, tx.data.clone()));
346
347		let reverified_transactions =
348			self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await?;
349
350		log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions.", at);
351		// And finally - submit reverified transactions back to the pool
352
353		self.validated_pool.resubmit_pruned(
354			&BlockId::Hash(at),
355			known_imported_hashes,
356			pruned_hashes,
357			reverified_transactions.into_values().collect(),
358		)
359	}
360
361	/// Returns transaction hash
362	pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExtrinsicHash<B> {
363		self.validated_pool.api().hash_and_length(xt).0
364	}
365
366	/// Resolves block number by id.
367	fn resolve_block_number(&self, at: &BlockId<B::Block>) -> Result<NumberFor<B>, B::Error> {
368		self.validated_pool.api().block_id_to_number(at).and_then(|number| {
369			number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
370		})
371	}
372
373	/// Returns future that validates a bunch of transactions at given block.
374	async fn verify(
375		&self,
376		at: <B::Block as BlockT>::Hash,
377		xts: impl IntoIterator<Item = (TransactionSource, ExtrinsicFor<B>)>,
378		check: CheckBannedBeforeVerify,
379	) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
380		// we need a block number to compute tx validity
381		let block_number = self.resolve_block_number(&BlockId::Hash(at))?;
382
383		let res = futures::future::join_all(
384			xts.into_iter()
385				.map(|(source, xt)| self.verify_one(at, block_number, source, xt, check)),
386		)
387		.await
388		.into_iter()
389		.collect::<HashMap<_, _>>();
390
391		Ok(res)
392	}
393
394	/// Returns future that validates single transaction at given block.
395	async fn verify_one(
396		&self,
397		block_hash: <B::Block as BlockT>::Hash,
398		block_number: NumberFor<B>,
399		source: TransactionSource,
400		xt: ExtrinsicFor<B>,
401		check: CheckBannedBeforeVerify,
402	) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
403		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
404
405		let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
406		if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
407			return (hash, ValidatedTransaction::Invalid(hash, err))
408		}
409
410		let validation_result = self
411			.validated_pool
412			.api()
413			.validate_transaction(block_hash, source, xt.clone())
414			.await;
415
416		let status = match validation_result {
417			Ok(status) => status,
418			Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
419		};
420
421		let validity = match status {
422			Ok(validity) =>
423				if validity.provides.is_empty() {
424					ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
425				} else {
426					ValidatedTransaction::valid_at(
427						block_number.saturated_into::<u64>(),
428						hash,
429						source,
430						xt,
431						bytes,
432						validity,
433					)
434				},
435			Err(TransactionValidityError::Invalid(e)) =>
436				ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
437			Err(TransactionValidityError::Unknown(e)) =>
438				ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
439		};
440
441		(hash, validity)
442	}
443
444	/// get a reference to the underlying validated pool.
445	pub fn validated_pool(&self) -> &ValidatedPool<B> {
446		&self.validated_pool
447	}
448}
449
450impl<B: ChainApi> Clone for Pool<B> {
451	fn clone(&self) -> Self {
452		Self { validated_pool: self.validated_pool.clone() }
453	}
454}
455
456#[cfg(test)]
457mod tests {
458	use super::{super::base_pool::Limit, *};
459	use crate::tests::{pool, uxt, TestApi, INVALID_NONCE};
460	use assert_matches::assert_matches;
461	use codec::Encode;
462	use futures::executor::block_on;
463	use parking_lot::Mutex;
464	use sc_transaction_pool_api::TransactionStatus;
465	use sp_runtime::transaction_validity::TransactionSource;
466	use std::{collections::HashMap, time::Instant};
467	use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
468	use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};
469
	/// Transaction source passed to the pool by the submissions in the tests below.
	const SOURCE: TransactionSource = TransactionSource::External;
471
472	#[test]
473	fn should_validate_and_import_transaction() {
474		// given
475		let (pool, api) = pool();
476
477		// when
478		let hash = block_on(pool.submit_one(
479			api.expect_hash_from_number(0),
480			SOURCE,
481			uxt(Transfer {
482				from: Alice.into(),
483				to: AccountId::from_h256(H256::from_low_u64_be(2)),
484				amount: 5,
485				nonce: 0,
486			}),
487		))
488		.unwrap();
489
490		// then
491		assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
492	}
493
494	#[test]
495	fn should_reject_if_temporarily_banned() {
496		// given
497		let (pool, api) = pool();
498		let uxt = uxt(Transfer {
499			from: Alice.into(),
500			to: AccountId::from_h256(H256::from_low_u64_be(2)),
501			amount: 5,
502			nonce: 0,
503		});
504
505		// when
506		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
507		let res = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, uxt));
508		assert_eq!(pool.validated_pool().status().ready, 0);
509		assert_eq!(pool.validated_pool().status().future, 0);
510
511		// then
512		assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
513	}
514
515	#[test]
516	fn should_reject_unactionable_transactions() {
517		// given
518		let api = Arc::new(TestApi::default());
519		let pool = Pool::new(
520			Default::default(),
521			// the node does not author blocks
522			false.into(),
523			api.clone(),
524		);
525
526		// after validation `IncludeData` will be set to non-propagable (validate_transaction mock)
527		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
528
529		// when
530		let res = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, uxt));
531
532		// then
533		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
534	}
535
536	#[test]
537	fn should_notify_about_pool_events() {
538		let (stream, hash0, hash1) = {
539			// given
540			let (pool, api) = pool();
541			let hash_of_block0 = api.expect_hash_from_number(0);
542			let stream = pool.validated_pool().import_notification_stream();
543
544			// when
545			let hash0 = block_on(pool.submit_one(
546				hash_of_block0,
547				SOURCE,
548				uxt(Transfer {
549					from: Alice.into(),
550					to: AccountId::from_h256(H256::from_low_u64_be(2)),
551					amount: 5,
552					nonce: 0,
553				}),
554			))
555			.unwrap();
556			let hash1 = block_on(pool.submit_one(
557				hash_of_block0,
558				SOURCE,
559				uxt(Transfer {
560					from: Alice.into(),
561					to: AccountId::from_h256(H256::from_low_u64_be(2)),
562					amount: 5,
563					nonce: 1,
564				}),
565			))
566			.unwrap();
567			// future doesn't count
568			let _hash = block_on(pool.submit_one(
569				hash_of_block0,
570				SOURCE,
571				uxt(Transfer {
572					from: Alice.into(),
573					to: AccountId::from_h256(H256::from_low_u64_be(2)),
574					amount: 5,
575					nonce: 3,
576				}),
577			))
578			.unwrap();
579
580			assert_eq!(pool.validated_pool().status().ready, 2);
581			assert_eq!(pool.validated_pool().status().future, 1);
582
583			(stream, hash0, hash1)
584		};
585
586		// then
587		let mut it = futures::executor::block_on_stream(stream);
588		assert_eq!(it.next(), Some(hash0));
589		assert_eq!(it.next(), Some(hash1));
590		assert_eq!(it.next(), None);
591	}
592
593	#[test]
594	fn should_clear_stale_transactions() {
595		// given
596		let (pool, api) = pool();
597		let hash_of_block0 = api.expect_hash_from_number(0);
598		let hash1 = block_on(pool.submit_one(
599			hash_of_block0,
600			SOURCE,
601			uxt(Transfer {
602				from: Alice.into(),
603				to: AccountId::from_h256(H256::from_low_u64_be(2)),
604				amount: 5,
605				nonce: 0,
606			}),
607		))
608		.unwrap();
609		let hash2 = block_on(pool.submit_one(
610			hash_of_block0,
611			SOURCE,
612			uxt(Transfer {
613				from: Alice.into(),
614				to: AccountId::from_h256(H256::from_low_u64_be(2)),
615				amount: 5,
616				nonce: 1,
617			}),
618		))
619		.unwrap();
620		let hash3 = block_on(pool.submit_one(
621			hash_of_block0,
622			SOURCE,
623			uxt(Transfer {
624				from: Alice.into(),
625				to: AccountId::from_h256(H256::from_low_u64_be(2)),
626				amount: 5,
627				nonce: 3,
628			}),
629		))
630		.unwrap();
631
632		// when
633		pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap();
634
635		// then
636		assert_eq!(pool.validated_pool().ready().count(), 0);
637		assert_eq!(pool.validated_pool().status().future, 0);
638		assert_eq!(pool.validated_pool().status().ready, 0);
639		// make sure they are temporarily banned as well
640		assert!(pool.validated_pool.is_banned(&hash1));
641		assert!(pool.validated_pool.is_banned(&hash2));
642		assert!(pool.validated_pool.is_banned(&hash3));
643	}
644
645	#[test]
646	fn should_ban_mined_transactions() {
647		// given
648		let (pool, api) = pool();
649		let hash1 = block_on(pool.submit_one(
650			api.expect_hash_from_number(0),
651			SOURCE,
652			uxt(Transfer {
653				from: Alice.into(),
654				to: AccountId::from_h256(H256::from_low_u64_be(2)),
655				amount: 5,
656				nonce: 0,
657			}),
658		))
659		.unwrap();
660
661		// when
662		block_on(pool.prune_tags(api.expect_hash_from_number(1), vec![vec![0]], vec![hash1]))
663			.unwrap();
664
665		// then
666		assert!(pool.validated_pool.is_banned(&hash1));
667	}
668
669	#[test]
670	fn should_limit_futures() {
671		sp_tracing::try_init_simple();
672
673		let xt = uxt(Transfer {
674			from: Alice.into(),
675			to: AccountId::from_h256(H256::from_low_u64_be(2)),
676			amount: 5,
677			nonce: 1,
678		});
679
680		// given
681		let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
682
683		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
684
685		let api = Arc::new(TestApi::default());
686		let pool = Pool::new(options, true.into(), api.clone());
687
688		let hash1 = block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, xt)).unwrap();
689		assert_eq!(pool.validated_pool().status().future, 1);
690
691		// when
692		let hash2 = block_on(pool.submit_one(
693			api.expect_hash_from_number(0),
694			SOURCE,
695			uxt(Transfer {
696				from: Bob.into(),
697				to: AccountId::from_h256(H256::from_low_u64_be(2)),
698				amount: 5,
699				nonce: 10,
700			}),
701		))
702		.unwrap();
703
704		// then
705		assert_eq!(pool.validated_pool().status().future, 1);
706		assert!(pool.validated_pool.is_banned(&hash1));
707		assert!(!pool.validated_pool.is_banned(&hash2));
708	}
709
710	#[test]
711	fn should_error_if_reject_immediately() {
712		// given
713		let limit = Limit { count: 100, total_bytes: 10 };
714
715		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
716
717		let api = Arc::new(TestApi::default());
718		let pool = Pool::new(options, true.into(), api.clone());
719
720		// when
721		block_on(pool.submit_one(
722			api.expect_hash_from_number(0),
723			SOURCE,
724			uxt(Transfer {
725				from: Alice.into(),
726				to: AccountId::from_h256(H256::from_low_u64_be(2)),
727				amount: 5,
728				nonce: 1,
729			}),
730		))
731		.unwrap_err();
732
733		// then
734		assert_eq!(pool.validated_pool().status().ready, 0);
735		assert_eq!(pool.validated_pool().status().future, 0);
736	}
737
738	#[test]
739	fn should_reject_transactions_with_no_provides() {
740		// given
741		let (pool, api) = pool();
742
743		// when
744		let err = block_on(pool.submit_one(
745			api.expect_hash_from_number(0),
746			SOURCE,
747			uxt(Transfer {
748				from: Alice.into(),
749				to: AccountId::from_h256(H256::from_low_u64_be(2)),
750				amount: 5,
751				nonce: INVALID_NONCE,
752			}),
753		))
754		.unwrap_err();
755
756		// then
757		assert_eq!(pool.validated_pool().status().ready, 0);
758		assert_eq!(pool.validated_pool().status().future, 0);
759		assert_matches!(err, error::Error::NoTagsProvided);
760	}
761
762	mod listener {
763		use super::*;
764
765		#[test]
766		fn should_trigger_ready_and_finalized() {
767			// given
768			let (pool, api) = pool();
769			let watcher = block_on(pool.submit_and_watch(
770				api.expect_hash_from_number(0),
771				SOURCE,
772				uxt(Transfer {
773					from: Alice.into(),
774					to: AccountId::from_h256(H256::from_low_u64_be(2)),
775					amount: 5,
776					nonce: 0,
777				}),
778			))
779			.unwrap();
780			assert_eq!(pool.validated_pool().status().ready, 1);
781			assert_eq!(pool.validated_pool().status().future, 0);
782
783			let hash_of_block2 = api.expect_hash_from_number(2);
784
785			// when
786			block_on(pool.prune_tags(hash_of_block2, vec![vec![0u8]], vec![])).unwrap();
787			assert_eq!(pool.validated_pool().status().ready, 0);
788			assert_eq!(pool.validated_pool().status().future, 0);
789
790			// then
791			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
792			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
793			assert_eq!(stream.next(), Some(TransactionStatus::InBlock((hash_of_block2.into(), 0))),);
794		}
795
796		#[test]
797		fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
798			// given
799			let (pool, api) = pool();
800			let watcher = block_on(pool.submit_and_watch(
801				api.expect_hash_from_number(0),
802				SOURCE,
803				uxt(Transfer {
804					from: Alice.into(),
805					to: AccountId::from_h256(H256::from_low_u64_be(2)),
806					amount: 5,
807					nonce: 0,
808				}),
809			))
810			.unwrap();
811			assert_eq!(pool.validated_pool().status().ready, 1);
812			assert_eq!(pool.validated_pool().status().future, 0);
813
814			let hash_of_block2 = api.expect_hash_from_number(2);
815
816			// when
817			block_on(pool.prune_tags(hash_of_block2, vec![vec![0u8]], vec![*watcher.hash()]))
818				.unwrap();
819			assert_eq!(pool.validated_pool().status().ready, 0);
820			assert_eq!(pool.validated_pool().status().future, 0);
821
822			// then
823			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
824			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
825			assert_eq!(stream.next(), Some(TransactionStatus::InBlock((hash_of_block2.into(), 0))),);
826		}
827
828		#[test]
829		fn should_trigger_future_and_ready_after_promoted() {
830			// given
831			let (pool, api) = pool();
832			let hash_of_block0 = api.expect_hash_from_number(0);
833
834			let watcher = block_on(pool.submit_and_watch(
835				hash_of_block0,
836				SOURCE,
837				uxt(Transfer {
838					from: Alice.into(),
839					to: AccountId::from_h256(H256::from_low_u64_be(2)),
840					amount: 5,
841					nonce: 1,
842				}),
843			))
844			.unwrap();
845			assert_eq!(pool.validated_pool().status().ready, 0);
846			assert_eq!(pool.validated_pool().status().future, 1);
847
848			// when
849			block_on(pool.submit_one(
850				hash_of_block0,
851				SOURCE,
852				uxt(Transfer {
853					from: Alice.into(),
854					to: AccountId::from_h256(H256::from_low_u64_be(2)),
855					amount: 5,
856					nonce: 0,
857				}),
858			))
859			.unwrap();
860			assert_eq!(pool.validated_pool().status().ready, 2);
861
862			// then
863			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
864			assert_eq!(stream.next(), Some(TransactionStatus::Future));
865			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
866		}
867
868		#[test]
869		fn should_trigger_invalid_and_ban() {
870			// given
871			let (pool, api) = pool();
872			let uxt = uxt(Transfer {
873				from: Alice.into(),
874				to: AccountId::from_h256(H256::from_low_u64_be(2)),
875				amount: 5,
876				nonce: 0,
877			});
878			let watcher =
879				block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, uxt))
880					.unwrap();
881			assert_eq!(pool.validated_pool().status().ready, 1);
882
883			// when
884			pool.validated_pool.remove_invalid(&[*watcher.hash()]);
885
886			// then
887			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
888			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
889			assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
890			assert_eq!(stream.next(), None);
891		}
892
893		#[test]
894		fn should_trigger_broadcasted() {
895			// given
896			let (pool, api) = pool();
897			let uxt = uxt(Transfer {
898				from: Alice.into(),
899				to: AccountId::from_h256(H256::from_low_u64_be(2)),
900				amount: 5,
901				nonce: 0,
902			});
903			let watcher =
904				block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, uxt))
905					.unwrap();
906			assert_eq!(pool.validated_pool().status().ready, 1);
907
908			// when
909			let mut map = HashMap::new();
910			let peers = vec!["a".into(), "b".into(), "c".into()];
911			map.insert(*watcher.hash(), peers.clone());
912			pool.validated_pool().on_broadcasted(map);
913
914			// then
915			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
916			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
917			assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
918		}
919
920		#[test]
921		fn should_trigger_dropped_older() {
922			// given
923			let limit = Limit { count: 1, total_bytes: 1000 };
924			let options =
925				Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
926
927			let api = Arc::new(TestApi::default());
928			let pool = Pool::new(options, true.into(), api.clone());
929
930			let xt = uxt(Transfer {
931				from: Alice.into(),
932				to: AccountId::from_h256(H256::from_low_u64_be(2)),
933				amount: 5,
934				nonce: 0,
935			});
936			let watcher =
937				block_on(pool.submit_and_watch(api.expect_hash_from_number(0), SOURCE, xt))
938					.unwrap();
939			assert_eq!(pool.validated_pool().status().ready, 1);
940
941			// when
942			let xt = uxt(Transfer {
943				from: Bob.into(),
944				to: AccountId::from_h256(H256::from_low_u64_be(1)),
945				amount: 4,
946				nonce: 1,
947			});
948			block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt)).unwrap();
949			assert_eq!(pool.validated_pool().status().ready, 1);
950
951			// then
952			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
953			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
954			assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
955		}
956
957		#[test]
958		fn should_trigger_dropped_lower_priority() {
959			{
960				// given
961				let limit = Limit { count: 1, total_bytes: 1000 };
962				let options =
963					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
964
965				let api = Arc::new(TestApi::default());
966				let pool = Pool::new(options, true.into(), api.clone());
967
968				// after validation `IncludeData` will have priority set to 9001
969				// (validate_transaction mock)
970				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
971				block_on(pool.submit_one(api.expect_hash_from_number(0), SOURCE, xt)).unwrap();
972				assert_eq!(pool.validated_pool().status().ready, 1);
973
974				// then
975				// after validation `Transfer` will have priority set to 4 (validate_transaction
976				// mock)
977				let xt = uxt(Transfer {
978					from: Bob.into(),
979					to: AccountId::from_h256(H256::from_low_u64_be(1)),
980					amount: 4,
981					nonce: 1,
982				});
983				let result = block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt));
984				assert!(matches!(
985					result,
986					Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
987				));
988			}
989			{
990				// given
991				let limit = Limit { count: 2, total_bytes: 1000 };
992				let options =
993					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
994
995				let api = Arc::new(TestApi::default());
996				let pool = Pool::new(options, true.into(), api.clone());
997
998				let hash_of_block0 = api.expect_hash_from_number(0);
999
1000				// after validation `IncludeData` will have priority set to 9001
1001				// (validate_transaction mock)
1002				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1003				block_on(pool.submit_and_watch(hash_of_block0, SOURCE, xt)).unwrap();
1004				assert_eq!(pool.validated_pool().status().ready, 1);
1005
1006				// after validation `Transfer` will have priority set to 4 (validate_transaction
1007				// mock)
1008				let xt = uxt(Transfer {
1009					from: Alice.into(),
1010					to: AccountId::from_h256(H256::from_low_u64_be(2)),
1011					amount: 5,
1012					nonce: 0,
1013				});
1014				let watcher = block_on(pool.submit_and_watch(hash_of_block0, SOURCE, xt)).unwrap();
1015				assert_eq!(pool.validated_pool().status().ready, 2);
1016
1017				// when
1018				// after validation `Store` will have priority set to 9001 (validate_transaction
1019				// mock)
1020				let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1021				block_on(pool.submit_one(api.expect_hash_from_number(1), SOURCE, xt)).unwrap();
1022				assert_eq!(pool.validated_pool().status().ready, 2);
1023
1024				// then
1025				let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1026				assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1027				assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1028			}
1029		}
1030
1031		#[test]
1032		fn should_handle_pruning_in_the_middle_of_import() {
1033			// given
1034			let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1035			let (tx, rx) = std::sync::mpsc::sync_channel(1);
1036			let mut api = TestApi::default();
1037			api.delay = Arc::new(Mutex::new(rx.into()));
1038			let api = Arc::new(api);
1039			let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
1040
1041			let hash_of_block0 = api.expect_hash_from_number(0);
1042
1043			// when
1044			let xt = uxt(Transfer {
1045				from: Alice.into(),
1046				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1047				amount: 5,
1048				nonce: 1,
1049			});
1050
1051			// This transaction should go to future, since we use `nonce: 1`
1052			let pool2 = pool.clone();
1053			std::thread::spawn(move || {
1054				block_on(pool2.submit_one(hash_of_block0, SOURCE, xt)).unwrap();
1055				ready.send(()).unwrap();
1056			});
1057
1058			// But now before the previous one is imported we import
1059			// the one that it depends on.
1060			let xt = uxt(Transfer {
1061				from: Alice.into(),
1062				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1063				amount: 4,
1064				nonce: 0,
1065			});
1066			// The tag the above transaction provides (TestApi is using just nonce as u8)
1067			let provides = vec![0_u8];
1068			block_on(pool.submit_one(hash_of_block0, SOURCE, xt)).unwrap();
1069			assert_eq!(pool.validated_pool().status().ready, 1);
1070
1071			// Now block import happens before the second transaction is able to finish
1072			// verification.
1073			block_on(pool.prune_tags(api.expect_hash_from_number(1), vec![provides], vec![]))
1074				.unwrap();
1075			assert_eq!(pool.validated_pool().status().ready, 0);
1076
1077			// so when we release the verification of the previous one it will have
1078			// something in `requires`, but should go to ready directly, since the previous
1079			// transaction was imported correctly.
1080			tx.send(()).unwrap();
1081
1082			// then
1083			is_ready.recv().unwrap(); // wait for finish
1084			assert_eq!(pool.validated_pool().status().ready, 1);
1085			assert_eq!(pool.validated_pool().status().future, 0);
1086		}
1087	}
1088}