referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
20use async_trait::async_trait;
21use futures::channel::mpsc::Receiver;
22use indexmap::IndexMap;
23use sc_transaction_pool_api::error;
24use sp_blockchain::{HashAndNumber, TreeRoute};
25use sp_runtime::{
26	generic::BlockId,
27	traits::{self, Block as BlockT, SaturatedConversion},
28	transaction_validity::{
29		TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
30	},
31};
32use std::{
33	collections::HashMap,
34	sync::Arc,
35	time::{Duration, Instant},
36};
37use tracing::{debug, instrument, trace, Level};
38
39use super::{
40	base_pool as base,
41	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
42	EventHandler, ValidatedPoolSubmitOutcome,
43};
44
45/// Modification notification event stream type;
46pub type EventStream<H> = Receiver<H>;
47
48/// Block hash type for a pool.
49pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
50/// Extrinsic hash type for a pool.
51pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
52/// Extrinsic type for a pool (reference counted).
53pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
54/// Extrinsic type for a pool (raw data).
55pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
56/// Block number type for the ChainApi
57pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
58/// A type of transaction stored in the pool
59pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
60/// A type of validated transaction stored in the pool.
61pub type ValidatedTransactionFor<A> =
62	ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
63
64/// The priority of request to validate the transaction.
65#[derive(PartialEq, Copy, Clone)]
66pub enum ValidateTransactionPriority {
67	/// Validate the newly submitted transactions
68	///
69	/// Validation will be done with lower priority.
70	Submitted,
71	/// Validate the transaction during maintainance process,
72	///
73	/// Validation will be performed with higher priority.
74	Maintained,
75}
76
77/// Concrete extrinsic validation and query logic.
78#[async_trait]
79pub trait ChainApi: Send + Sync {
80	/// Block type.
81	type Block: BlockT;
82	/// Error type.
83	type Error: From<error::Error> + error::IntoPoolError + error::IntoMetricsLabel;
84
85	/// Asynchronously verify extrinsic at given block.
86	async fn validate_transaction(
87		&self,
88		at: <Self::Block as BlockT>::Hash,
89		source: TransactionSource,
90		uxt: ExtrinsicFor<Self>,
91		validation_priority: ValidateTransactionPriority,
92	) -> Result<TransactionValidity, Self::Error>;
93
94	/// Synchronously verify given extrinsic at given block.
95	///
96	/// Validates a transaction by calling into the runtime. Same as `validate_transaction` but
97	/// blocks the current thread when performing validation.
98	fn validate_transaction_blocking(
99		&self,
100		at: <Self::Block as BlockT>::Hash,
101		source: TransactionSource,
102		uxt: ExtrinsicFor<Self>,
103	) -> Result<TransactionValidity, Self::Error>;
104
105	/// Returns a block number given the block id.
106	fn block_id_to_number(
107		&self,
108		at: &BlockId<Self::Block>,
109	) -> Result<Option<NumberFor<Self>>, Self::Error>;
110
111	/// Returns a block hash given the block id.
112	fn block_id_to_hash(
113		&self,
114		at: &BlockId<Self::Block>,
115	) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
116
117	/// Returns hash and encoding length of the extrinsic.
118	fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
119
120	/// Returns a block body given the block.
121	async fn block_body(
122		&self,
123		at: <Self::Block as BlockT>::Hash,
124	) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;
125
126	/// Returns a block header given the block id.
127	fn block_header(
128		&self,
129		at: <Self::Block as BlockT>::Hash,
130	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
131
132	/// Compute a tree-route between two blocks. See [`TreeRoute`] for more details.
133	fn tree_route(
134		&self,
135		from: <Self::Block as BlockT>::Hash,
136		to: <Self::Block as BlockT>::Hash,
137	) -> Result<TreeRoute<Self::Block>, Self::Error>;
138
139	/// Resolves block number by id.
140	fn resolve_block_number(
141		&self,
142		at: <Self::Block as BlockT>::Hash,
143	) -> Result<NumberFor<Self>, Self::Error> {
144		self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
145			number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
146		})
147	}
148}
149
150/// Pool configuration options.
151#[derive(Debug, Clone)]
152pub struct Options {
153	/// Ready queue limits.
154	pub ready: base::Limit,
155	/// Future queue limits.
156	pub future: base::Limit,
157	/// Reject future transactions.
158	pub reject_future_transactions: bool,
159	/// How long the extrinsic is banned for.
160	pub ban_time: Duration,
161}
162
163impl Default for Options {
164	fn default() -> Self {
165		Self {
166			ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
167			future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
168			reject_future_transactions: false,
169			ban_time: Duration::from_secs(60 * 30),
170		}
171	}
172}
173
174impl Options {
175	/// Total (ready+future) maximal number of transactions in the pool.
176	pub fn total_count(&self) -> usize {
177		self.ready.count + self.future.count
178	}
179}
180
181/// Should we check that the transaction is banned
182/// in the pool, before we verify it?
183#[derive(Copy, Clone)]
184pub(crate) enum CheckBannedBeforeVerify {
185	Yes,
186	No,
187}
188
189/// Extrinsics pool that performs validation.
190pub struct Pool<B: ChainApi, L: EventHandler<B>> {
191	validated_pool: Arc<ValidatedPool<B, L>>,
192}
193
194impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
195	/// Create a new transaction pool with statically sized rotator.
196	pub fn new_with_staticly_sized_rotator(
197		options: Options,
198		is_validator: IsValidator,
199		api: Arc<B>,
200	) -> Self {
201		Self {
202			validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
203				options,
204				is_validator,
205				api,
206			)),
207		}
208	}
209
210	/// Create a new transaction pool.
211	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212		Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
213	}
214
215	/// Create a new transaction pool.
216	pub fn new_with_event_handler(
217		options: Options,
218		is_validator: IsValidator,
219		api: Arc<B>,
220		event_handler: L,
221	) -> Self {
222		Self {
223			validated_pool: Arc::new(ValidatedPool::new_with_event_handler(
224				options,
225				is_validator,
226				api,
227				event_handler,
228			)),
229		}
230	}
231
232	/// Imports a bunch of unverified extrinsics to the pool
233	#[instrument(level = Level::TRACE, skip_all, target="txpool", name = "pool::submit_at")]
234	pub async fn submit_at(
235		&self,
236		at: &HashAndNumber<B::Block>,
237		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
238		validation_priority: ValidateTransactionPriority,
239	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
240		let validated_transactions =
241			self.verify(at, xts, CheckBannedBeforeVerify::Yes, validation_priority).await;
242		self.validated_pool.submit(validated_transactions.into_values())
243	}
244
245	/// Resubmit the given extrinsics to the pool.
246	///
247	/// This does not check if a transaction is banned, before we verify it again.
248	pub async fn resubmit_at(
249		&self,
250		at: &HashAndNumber<B::Block>,
251		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
252		validation_priority: ValidateTransactionPriority,
253	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
254		let validated_transactions =
255			self.verify(at, xts, CheckBannedBeforeVerify::No, validation_priority).await;
256		self.validated_pool.submit(validated_transactions.into_values())
257	}
258
259	/// Imports one unverified extrinsic to the pool
260	pub async fn submit_one(
261		&self,
262		at: &HashAndNumber<B::Block>,
263		source: base::TimedTransactionSource,
264		xt: ExtrinsicFor<B>,
265	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
266		let res = self
267			.submit_at(at, std::iter::once((source, xt)), ValidateTransactionPriority::Submitted)
268			.await
269			.pop();
270		res.expect("One extrinsic passed; one result returned; qed")
271	}
272
273	/// Import a single extrinsic and starts to watch its progress in the pool.
274	pub async fn submit_and_watch(
275		&self,
276		at: &HashAndNumber<B::Block>,
277		source: base::TimedTransactionSource,
278		xt: ExtrinsicFor<B>,
279	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
280		let (_, tx) = self
281			.verify_one(
282				at.hash,
283				at.number,
284				source,
285				xt,
286				CheckBannedBeforeVerify::Yes,
287				ValidateTransactionPriority::Submitted,
288			)
289			.await;
290		self.validated_pool.submit_and_watch(tx)
291	}
292
293	/// Resubmit some transaction that were validated elsewhere.
294	pub fn resubmit(
295		&self,
296		revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
297	) {
298		let now = Instant::now();
299		self.validated_pool.resubmit(revalidated_transactions);
300		trace!(
301			target: LOG_TARGET,
302			duration = ?now.elapsed(),
303			status = ?self.validated_pool.status(),
304			"Resubmitted transaction."
305		);
306	}
307
308	/// Prunes known ready transactions.
309	///
310	/// Used to clear the pool from transactions that were part of recently imported block.
311	/// The main difference from the `prune` is that we do not revalidate any transactions
312	/// and ignore unknown passed hashes.
313	pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
314		// Get details of all extrinsics that are already in the pool
315		let in_pool_tags =
316			self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
317
318		// Prune all transactions that provide given tags
319		let prune_status = self.validated_pool.prune_tags(in_pool_tags);
320		let pruned_transactions =
321			hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
322		self.validated_pool.fire_pruned(at, pruned_transactions);
323	}
324
325	/// Prunes ready transactions.
326	///
327	/// Used to clear the pool from transactions that were part of recently imported block.
328	/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
329	/// into runtime too often we first look up all extrinsics that are in the pool and get
330	/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
331	pub async fn prune(
332		&self,
333		at: &HashAndNumber<B::Block>,
334		parent: <B::Block as BlockT>::Hash,
335		extrinsics: &[RawExtrinsicFor<B>],
336		known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<B>, Vec<Tag>>>>,
337	) {
338		debug!(
339			target: LOG_TARGET,
340			?at,
341			extrinsics_count = extrinsics.len(),
342			"Starting pruning of block."
343		);
344		// Get details of all extrinsics that are already in the pool
345		let in_pool_hashes =
346			extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
347		let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
348		// Fill unknown tags based on the known tags given in `known_provides_tags`.
349		let mut unknown_txs_count = 0usize;
350		let mut reused_txs_count = 0usize;
351		let tags = in_pool_hashes.iter().zip(in_pool_tags).map(|(tx_hash, tags)| {
352			tags.or_else(|| {
353				unknown_txs_count += 1;
354				known_provides_tags.as_ref().and_then(|inner| {
355					inner.get(&tx_hash).map(|found_tags| {
356						reused_txs_count += 1;
357						found_tags.clone()
358					})
359				})
360			})
361		});
362
363		// Zip the ones from the pool with the full list (we get pairs `(Extrinsic,
364		// Option<Vec<Tag>>)`)
365		let all = extrinsics.iter().zip(tags);
366		let mut validated_counter: usize = 0;
367		let mut future_tags = Vec::new();
368		let now = Instant::now();
369		for (extrinsic, in_pool_tags) in all {
370			match in_pool_tags {
371				// reuse the tags for extrinsics that were found in the pool or given in
372				// `known_provides_tags` cache.
373				Some(tags) => future_tags.extend(tags),
374				// if it's not found in the pool query the runtime at parent block
375				// to get validity info and tags that the extrinsic provides.
376				None => {
377					// Avoid validating block txs if the pool is empty
378					if !self.validated_pool.status().is_empty() {
379						validated_counter = validated_counter + 1;
380						let validity = self
381							.validated_pool
382							.api()
383							.validate_transaction(
384								parent,
385								TransactionSource::InBlock,
386								Arc::from(extrinsic.clone()),
387								ValidateTransactionPriority::Maintained,
388							)
389							.await;
390
391						trace!(
392							target: LOG_TARGET,
393							tx_hash = ?self.validated_pool.api().hash_and_length(&extrinsic.clone()).0,
394							?validity,
395							"prune::revalidated"
396						);
397						if let Ok(Ok(validity)) = validity {
398							future_tags.extend(validity.provides);
399						}
400					} else {
401						trace!(
402							target: LOG_TARGET,
403							?at,
404							"txpool is empty, skipping validation for block",
405						);
406					}
407				},
408			}
409		}
410
411		let known_provides_tags_len = known_provides_tags.map(|inner| inner.len()).unwrap_or(0);
412		debug!(
413			target: LOG_TARGET,
414			validated_counter,
415			known_provides_tags_len,
416			unknown_txs_count,
417			reused_txs_count,
418			duration = ?now.elapsed(),
419			"prune"
420		);
421		self.prune_tags(at, future_tags, in_pool_hashes).await
422	}
423
424	/// Prunes ready transactions that provide given list of tags.
425	///
426	/// Given tags are assumed to be always provided now, so all transactions
427	/// in the Future Queue that require that particular tag (and have other
428	/// requirements satisfied) are promoted to Ready Queue.
429	///
430	/// Moreover for each provided tag we remove transactions in the pool that:
431	/// 1. Provide that tag directly
432	/// 2. Are a dependency of pruned transaction.
433	///
434	/// Returns transactions that have been removed from the pool and must be reverified
435	/// before reinserting to the pool.
436	///
437	/// By removing predecessor transactions as well we might actually end up
438	/// pruning too much, so all removed transactions are reverified against
439	/// the runtime (`validate_transaction`) to make sure they are invalid.
440	///
441	/// However we avoid revalidating transactions that are contained within
442	/// the second parameter of `known_imported_hashes`. These transactions
443	/// (if pruned) are not revalidated and become temporarily banned to
444	/// prevent importing them in the (near) future.
445	pub async fn prune_tags(
446		&self,
447		at: &HashAndNumber<B::Block>,
448		tags: impl IntoIterator<Item = Tag>,
449		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
450	) {
451		let now = Instant::now();
452		trace!(target: LOG_TARGET, ?at, "Pruning tags.");
453		// Prune all transactions that provide given tags
454		let prune_status = self.validated_pool.prune_tags(tags);
455
456		// Make sure that we don't revalidate extrinsics that were part of the recently
457		// imported block. This is especially important for UTXO-like chains cause the
458		// inputs are pruned so such transaction would go to future again.
459		self.validated_pool
460			.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
461
462		// Try to re-validate pruned transactions since some of them might be still valid.
463		// note that `known_imported_hashes` will be rejected here due to temporary ban.
464		let pruned_transactions =
465			prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
466
467		let reverified_transactions = self
468			.verify(
469				at,
470				pruned_transactions,
471				CheckBannedBeforeVerify::Yes,
472				ValidateTransactionPriority::Maintained,
473			)
474			.await;
475
476		let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
477		debug!(
478			target: LOG_TARGET,
479			?at,
480			reverified_transactions = reverified_transactions.len(),
481			duration = ?now.elapsed(),
482			"Pruned. Resubmitting transactions."
483		);
484		log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}");
485
486		// And finally - submit reverified transactions back to the pool
487		self.validated_pool.resubmit_pruned(
488			&at,
489			known_imported_hashes,
490			pruned_hashes,
491			reverified_transactions.into_values().collect(),
492		)
493	}
494
495	/// Returns transaction hash
496	pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
497		self.validated_pool.api().hash_and_length(xt).0
498	}
499
500	/// Returns future that validates a bunch of transactions at given block.
501	#[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify")]
502	async fn verify(
503		&self,
504		at: &HashAndNumber<B::Block>,
505		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
506		check: CheckBannedBeforeVerify,
507		validation_priority: ValidateTransactionPriority,
508	) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
509		let HashAndNumber { number, hash } = *at;
510
511		let res = futures::future::join_all(xts.into_iter().map(|(source, xt)| {
512			self.verify_one(hash, number, source, xt, check, validation_priority)
513		}))
514		.await
515		.into_iter()
516		.collect::<IndexMap<_, _>>();
517
518		res
519	}
520
521	/// Returns future that validates single transaction at given block.
522	#[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify_one")]
523	pub(crate) async fn verify_one(
524		&self,
525		block_hash: <B::Block as BlockT>::Hash,
526		block_number: NumberFor<B>,
527		source: base::TimedTransactionSource,
528		xt: ExtrinsicFor<B>,
529		check: CheckBannedBeforeVerify,
530		validation_priority: ValidateTransactionPriority,
531	) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
532		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
533
534		let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
535		if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
536			return (hash, ValidatedTransaction::Invalid(hash, err));
537		}
538
539		let validation_result = self
540			.validated_pool
541			.api()
542			.validate_transaction(
543				block_hash,
544				source.clone().into(),
545				xt.clone(),
546				validation_priority,
547			)
548			.await;
549
550		let status = match validation_result {
551			Ok(status) => status,
552			Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
553		};
554
555		let validity = match status {
556			Ok(validity) => {
557				if validity.provides.is_empty() {
558					ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
559				} else {
560					ValidatedTransaction::valid_at(
561						block_number.saturated_into::<u64>(),
562						hash,
563						source,
564						xt,
565						bytes,
566						validity,
567					)
568				}
569			},
570			Err(TransactionValidityError::Invalid(e)) => {
571				ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into())
572			},
573			Err(TransactionValidityError::Unknown(e)) => {
574				ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into())
575			},
576		};
577
578		(hash, validity)
579	}
580
581	/// Get a reference to the underlying validated pool.
582	pub fn validated_pool(&self) -> &ValidatedPool<B, L> {
583		&self.validated_pool
584	}
585
586	/// Clears the recently pruned transactions in validated pool.
587	pub fn clear_recently_pruned(&mut self) {
588		self.validated_pool.pool.write().clear_recently_pruned();
589	}
590}
591
592impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
593	/// Deep clones the pool.
594	///
595	/// Must be called on purpose: it duplicates all the internal structures.
596	pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
597		let other: ValidatedPool<B, L> =
598			self.validated_pool().deep_clone_with_event_handler(event_handler);
599		Self { validated_pool: Arc::from(other) }
600	}
601}
602
603#[cfg(test)]
604mod tests {
605	use super::{super::base_pool::Limit, *};
606	use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
607	use assert_matches::assert_matches;
608	use base::TimedTransactionSource;
609	use codec::Encode;
610	use futures::executor::block_on;
611	use parking_lot::Mutex;
612	use sc_transaction_pool_api::TransactionStatus;
613	use sp_runtime::transaction_validity::TransactionSource;
614	use std::{collections::HashMap, time::Instant};
615	use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
616	use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};
617
618	const SOURCE: TimedTransactionSource =
619		TimedTransactionSource { source: TransactionSource::External, timestamp: None };
620
621	type Pool<Api> = super::Pool<Api, ()>;
622
623	#[test]
624	fn should_validate_and_import_transaction() {
625		// given
626		let (pool, api) = pool();
627
628		// when
629		let hash = block_on(
630			pool.submit_one(
631				&api.expect_hash_and_number(0),
632				SOURCE,
633				uxt(Transfer {
634					from: Alice.into(),
635					to: AccountId::from_h256(H256::from_low_u64_be(2)),
636					amount: 5,
637					nonce: 0,
638				})
639				.into(),
640			),
641		)
642		.map(|outcome| outcome.hash())
643		.unwrap();
644
645		// then
646		assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
647	}
648
649	#[test]
650	fn submit_at_preserves_order() {
651		sp_tracing::try_init_simple();
652		// given
653		let (pool, api) = pool();
654
655		let txs = (0..10)
656			.map(|i| {
657				uxt(Transfer {
658					from: Alice.into(),
659					to: AccountId::from_h256(H256::from_low_u64_be(i)),
660					amount: 5,
661					nonce: i,
662				})
663				.into()
664			})
665			.collect::<Vec<_>>();
666
667		let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
668
669		// when
670		let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
671		let hashes = block_on(pool.submit_at(
672			&api.expect_hash_and_number(0),
673			txs,
674			ValidateTransactionPriority::Submitted,
675		))
676		.into_iter()
677		.map(|r| r.map(|o| o.hash()))
678		.collect::<Vec<_>>();
679		debug!(hashes = ?hashes, "-->");
680
681		// then
682		hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
683			|(result_hash, initial_hash)| {
684				assert_eq!(result_hash.unwrap(), initial_hash);
685			},
686		);
687	}
688
689	#[test]
690	fn should_reject_if_temporarily_banned() {
691		// given
692		let (pool, api) = pool();
693		let uxt = uxt(Transfer {
694			from: Alice.into(),
695			to: AccountId::from_h256(H256::from_low_u64_be(2)),
696			amount: 5,
697			nonce: 0,
698		});
699
700		// when
701		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
702		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
703			.map(|o| o.hash());
704		assert_eq!(pool.validated_pool().status().ready, 0);
705		assert_eq!(pool.validated_pool().status().future, 0);
706
707		// then
708		assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
709	}
710
711	#[test]
712	fn should_reject_unactionable_transactions() {
713		// given
714		let api = Arc::new(TestApi::default());
715		let pool = Pool::new_with_staticly_sized_rotator(
716			Default::default(),
717			// the node does not author blocks
718			false.into(),
719			api.clone(),
720		);
721
722		// after validation `IncludeData` will be set to non-propagable (validate_transaction mock)
723		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
724
725		// when
726		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
727			.map(|o| o.hash());
728
729		// then
730		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
731	}
732
733	#[test]
734	fn should_notify_about_pool_events() {
735		let (stream, hash0, hash1) = {
736			// given
737			let (pool, api) = pool();
738			let han_of_block0 = api.expect_hash_and_number(0);
739			let stream = pool.validated_pool().import_notification_stream();
740
741			// when
742			let hash0 = block_on(
743				pool.submit_one(
744					&han_of_block0,
745					SOURCE,
746					uxt(Transfer {
747						from: Alice.into(),
748						to: AccountId::from_h256(H256::from_low_u64_be(2)),
749						amount: 5,
750						nonce: 0,
751					})
752					.into(),
753				),
754			)
755			.unwrap()
756			.hash();
757			let hash1 = block_on(
758				pool.submit_one(
759					&han_of_block0,
760					SOURCE,
761					uxt(Transfer {
762						from: Alice.into(),
763						to: AccountId::from_h256(H256::from_low_u64_be(2)),
764						amount: 5,
765						nonce: 1,
766					})
767					.into(),
768				),
769			)
770			.unwrap()
771			.hash();
772			// future doesn't count
773			let _hash = block_on(
774				pool.submit_one(
775					&han_of_block0,
776					SOURCE,
777					uxt(Transfer {
778						from: Alice.into(),
779						to: AccountId::from_h256(H256::from_low_u64_be(2)),
780						amount: 5,
781						nonce: 3,
782					})
783					.into(),
784				),
785			)
786			.unwrap()
787			.hash();
788
789			assert_eq!(pool.validated_pool().status().ready, 2);
790			assert_eq!(pool.validated_pool().status().future, 1);
791
792			(stream, hash0, hash1)
793		};
794
795		// then
796		let mut it = futures::executor::block_on_stream(stream);
797		assert_eq!(it.next(), Some(hash0));
798		assert_eq!(it.next(), Some(hash1));
799		assert_eq!(it.next(), None);
800	}
801
802	#[test]
803	fn should_clear_stale_transactions() {
804		// given
805		let (pool, api) = pool();
806		let han_of_block0 = api.expect_hash_and_number(0);
807		let hash1 = block_on(
808			pool.submit_one(
809				&han_of_block0,
810				SOURCE,
811				uxt(Transfer {
812					from: Alice.into(),
813					to: AccountId::from_h256(H256::from_low_u64_be(2)),
814					amount: 5,
815					nonce: 0,
816				})
817				.into(),
818			),
819		)
820		.unwrap()
821		.hash();
822		let hash2 = block_on(
823			pool.submit_one(
824				&han_of_block0,
825				SOURCE,
826				uxt(Transfer {
827					from: Alice.into(),
828					to: AccountId::from_h256(H256::from_low_u64_be(2)),
829					amount: 5,
830					nonce: 1,
831				})
832				.into(),
833			),
834		)
835		.unwrap()
836		.hash();
837		let hash3 = block_on(
838			pool.submit_one(
839				&han_of_block0,
840				SOURCE,
841				uxt(Transfer {
842					from: Alice.into(),
843					to: AccountId::from_h256(H256::from_low_u64_be(2)),
844					amount: 5,
845					nonce: 3,
846				})
847				.into(),
848			),
849		)
850		.unwrap()
851		.hash();
852
853		// when
854		pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
855
856		// then
857		assert_eq!(pool.validated_pool().ready().count(), 0);
858		assert_eq!(pool.validated_pool().status().future, 0);
859		assert_eq!(pool.validated_pool().status().ready, 0);
860		// make sure they are temporarily banned as well
861		assert!(pool.validated_pool.is_banned(&hash1));
862		assert!(pool.validated_pool.is_banned(&hash2));
863		assert!(pool.validated_pool.is_banned(&hash3));
864	}
865
866	#[test]
867	fn should_ban_mined_transactions() {
868		// given
869		let (pool, api) = pool();
870		let hash1 = block_on(
871			pool.submit_one(
872				&api.expect_hash_and_number(0),
873				SOURCE,
874				uxt(Transfer {
875					from: Alice.into(),
876					to: AccountId::from_h256(H256::from_low_u64_be(2)),
877					amount: 5,
878					nonce: 0,
879				})
880				.into(),
881			),
882		)
883		.unwrap()
884		.hash();
885
886		// when
887		block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
888
889		// then
890		assert!(pool.validated_pool.is_banned(&hash1));
891	}
892
893	#[test]
894	fn should_limit_futures() {
895		sp_tracing::try_init_simple();
896
897		let xt = uxt(Transfer {
898			from: Alice.into(),
899			to: AccountId::from_h256(H256::from_low_u64_be(2)),
900			amount: 5,
901			nonce: 1,
902		});
903
904		// given
905		let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
906
907		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
908
909		let api = Arc::new(TestApi::default());
910		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
911
912		let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
913			.unwrap()
914			.hash();
915		assert_eq!(pool.validated_pool().status().future, 1);
916
917		// when
918		let hash2 = block_on(
919			pool.submit_one(
920				&api.expect_hash_and_number(0),
921				SOURCE,
922				uxt(Transfer {
923					from: Bob.into(),
924					to: AccountId::from_h256(H256::from_low_u64_be(2)),
925					amount: 5,
926					nonce: 10,
927				})
928				.into(),
929			),
930		)
931		.unwrap()
932		.hash();
933
934		// then
935		assert_eq!(pool.validated_pool().status().future, 1);
936		assert!(pool.validated_pool.is_banned(&hash1));
937		assert!(!pool.validated_pool.is_banned(&hash2));
938	}
939
940	#[test]
941	fn should_error_if_reject_immediately() {
942		// given
943		let limit = Limit { count: 100, total_bytes: 10 };
944
945		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
946
947		let api = Arc::new(TestApi::default());
948		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
949
950		// when
951		block_on(
952			pool.submit_one(
953				&api.expect_hash_and_number(0),
954				SOURCE,
955				uxt(Transfer {
956					from: Alice.into(),
957					to: AccountId::from_h256(H256::from_low_u64_be(2)),
958					amount: 5,
959					nonce: 1,
960				})
961				.into(),
962			),
963		)
964		.map(|o| o.hash())
965		.unwrap_err();
966
967		// then
968		assert_eq!(pool.validated_pool().status().ready, 0);
969		assert_eq!(pool.validated_pool().status().future, 0);
970	}
971
972	#[test]
973	fn should_reject_transactions_with_no_provides() {
974		// given
975		let (pool, api) = pool();
976
977		// when
978		let err = block_on(
979			pool.submit_one(
980				&api.expect_hash_and_number(0),
981				SOURCE,
982				uxt(Transfer {
983					from: Alice.into(),
984					to: AccountId::from_h256(H256::from_low_u64_be(2)),
985					amount: 5,
986					nonce: INVALID_NONCE,
987				})
988				.into(),
989			),
990		)
991		.map(|o| o.hash())
992		.unwrap_err();
993
994		// then
995		assert_eq!(pool.validated_pool().status().ready, 0);
996		assert_eq!(pool.validated_pool().status().future, 0);
997		assert_matches!(err, error::Error::NoTagsProvided);
998	}
999
1000	mod listener {
1001		use super::*;
1002
1003		#[test]
1004		fn should_trigger_ready_and_finalized() {
1005			// given
1006			let (pool, api) = pool();
1007			let watcher = block_on(
1008				pool.submit_and_watch(
1009					&api.expect_hash_and_number(0),
1010					SOURCE,
1011					uxt(Transfer {
1012						from: Alice.into(),
1013						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1014						amount: 5,
1015						nonce: 0,
1016					})
1017					.into(),
1018				),
1019			)
1020			.unwrap()
1021			.expect_watcher();
1022			assert_eq!(pool.validated_pool().status().ready, 1);
1023			assert_eq!(pool.validated_pool().status().future, 0);
1024
1025			let han_of_block2 = api.expect_hash_and_number(2);
1026
1027			// when
1028			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
1029			assert_eq!(pool.validated_pool().status().ready, 0);
1030			assert_eq!(pool.validated_pool().status().future, 0);
1031
1032			// then
1033			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1034			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1035			assert_eq!(
1036				stream.next(),
1037				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1038			);
1039		}
1040
1041		#[test]
1042		fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
1043			// given
1044			let (pool, api) = pool();
1045			let watcher = block_on(
1046				pool.submit_and_watch(
1047					&api.expect_hash_and_number(0),
1048					SOURCE,
1049					uxt(Transfer {
1050						from: Alice.into(),
1051						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1052						amount: 5,
1053						nonce: 0,
1054					})
1055					.into(),
1056				),
1057			)
1058			.unwrap()
1059			.expect_watcher();
1060			assert_eq!(pool.validated_pool().status().ready, 1);
1061			assert_eq!(pool.validated_pool().status().future, 0);
1062
1063			let han_of_block2 = api.expect_hash_and_number(2);
1064
1065			// when
1066			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
1067			assert_eq!(pool.validated_pool().status().ready, 0);
1068			assert_eq!(pool.validated_pool().status().future, 0);
1069
1070			// then
1071			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1072			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1073			assert_eq!(
1074				stream.next(),
1075				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1076			);
1077		}
1078
1079		#[test]
1080		fn should_trigger_future_and_ready_after_promoted() {
1081			// given
1082			let (pool, api) = pool();
1083			let han_of_block0 = api.expect_hash_and_number(0);
1084
1085			let watcher = block_on(
1086				pool.submit_and_watch(
1087					&han_of_block0,
1088					SOURCE,
1089					uxt(Transfer {
1090						from: Alice.into(),
1091						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1092						amount: 5,
1093						nonce: 1,
1094					})
1095					.into(),
1096				),
1097			)
1098			.unwrap()
1099			.expect_watcher();
1100			assert_eq!(pool.validated_pool().status().ready, 0);
1101			assert_eq!(pool.validated_pool().status().future, 1);
1102
1103			// when
1104			block_on(
1105				pool.submit_one(
1106					&han_of_block0,
1107					SOURCE,
1108					uxt(Transfer {
1109						from: Alice.into(),
1110						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1111						amount: 5,
1112						nonce: 0,
1113					})
1114					.into(),
1115				),
1116			)
1117			.unwrap();
1118			assert_eq!(pool.validated_pool().status().ready, 2);
1119
1120			// then
1121			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1122			assert_eq!(stream.next(), Some(TransactionStatus::Future));
1123			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1124		}
1125
1126		#[test]
1127		fn should_trigger_invalid_and_ban() {
1128			// given
1129			let (pool, api) = pool();
1130			let uxt = uxt(Transfer {
1131				from: Alice.into(),
1132				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1133				amount: 5,
1134				nonce: 0,
1135			});
1136			let watcher =
1137				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1138					.unwrap()
1139					.expect_watcher();
1140			assert_eq!(pool.validated_pool().status().ready, 1);
1141
1142			// when
1143			pool.validated_pool.remove_invalid(&[*watcher.hash()]);
1144
1145			// then
1146			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1147			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1148			assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
1149			assert_eq!(stream.next(), None);
1150		}
1151
1152		#[test]
1153		fn should_trigger_broadcasted() {
1154			// given
1155			let (pool, api) = pool();
1156			let uxt = uxt(Transfer {
1157				from: Alice.into(),
1158				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1159				amount: 5,
1160				nonce: 0,
1161			});
1162			let watcher =
1163				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1164					.unwrap()
1165					.expect_watcher();
1166			assert_eq!(pool.validated_pool().status().ready, 1);
1167
1168			// when
1169			let mut map = HashMap::new();
1170			let peers = vec!["a".into(), "b".into(), "c".into()];
1171			map.insert(*watcher.hash(), peers.clone());
1172			pool.validated_pool().on_broadcasted(map);
1173
1174			// then
1175			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1176			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1177			assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
1178		}
1179
1180		#[test]
1181		fn should_trigger_dropped_older() {
1182			// given
1183			let limit = Limit { count: 1, total_bytes: 1000 };
1184			let options =
1185				Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1186
1187			let api = Arc::new(TestApi::default());
1188			let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1189
1190			let xt = uxt(Transfer {
1191				from: Alice.into(),
1192				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1193				amount: 5,
1194				nonce: 0,
1195			});
1196			let watcher =
1197				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1198					.unwrap()
1199					.expect_watcher();
1200			assert_eq!(pool.validated_pool().status().ready, 1);
1201
1202			// when
1203			let xt = uxt(Transfer {
1204				from: Bob.into(),
1205				to: AccountId::from_h256(H256::from_low_u64_be(1)),
1206				amount: 4,
1207				nonce: 1,
1208			});
1209			block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
1210			assert_eq!(pool.validated_pool().status().ready, 1);
1211
1212			// then
1213			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1214			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1215			assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1216		}
1217
1218		#[test]
1219		fn should_trigger_dropped_lower_priority() {
1220			{
1221				// given
1222				let limit = Limit { count: 1, total_bytes: 1000 };
1223				let options =
1224					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1225
1226				let api = Arc::new(TestApi::default());
1227				let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1228
1229				// after validation `IncludeData` will have priority set to 9001
1230				// (validate_transaction mock)
1231				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1232				block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1233					.unwrap();
1234				assert_eq!(pool.validated_pool().status().ready, 1);
1235
1236				// then
1237				// after validation `Transfer` will have priority set to 4 (validate_transaction
1238				// mock)
1239				let xt = uxt(Transfer {
1240					from: Bob.into(),
1241					to: AccountId::from_h256(H256::from_low_u64_be(1)),
1242					amount: 4,
1243					nonce: 1,
1244				});
1245				let result =
1246					block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
1247				assert!(matches!(
1248					result,
1249					Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
1250				));
1251			}
1252			{
1253				// given
1254				let limit = Limit { count: 2, total_bytes: 1000 };
1255				let options =
1256					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1257
1258				let api = Arc::new(TestApi::default());
1259				let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1260
1261				let han_of_block0 = api.expect_hash_and_number(0);
1262
1263				// after validation `IncludeData` will have priority set to 9001
1264				// (validate_transaction mock)
1265				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1266				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1267					.unwrap()
1268					.expect_watcher();
1269				assert_eq!(pool.validated_pool().status().ready, 1);
1270
1271				// after validation `Transfer` will have priority set to 4 (validate_transaction
1272				// mock)
1273				let xt = uxt(Transfer {
1274					from: Alice.into(),
1275					to: AccountId::from_h256(H256::from_low_u64_be(2)),
1276					amount: 5,
1277					nonce: 0,
1278				});
1279				let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1280					.unwrap()
1281					.expect_watcher();
1282				assert_eq!(pool.validated_pool().status().ready, 2);
1283
1284				// when
1285				// after validation `Store` will have priority set to 9001 (validate_transaction
1286				// mock)
1287				let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1288				block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
1289					.unwrap();
1290				assert_eq!(pool.validated_pool().status().ready, 2);
1291
1292				// then
1293				let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1294				assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1295				assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1296			}
1297		}
1298
1299		#[test]
1300		fn should_handle_pruning_in_the_middle_of_import() {
1301			// given
1302			let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1303			let (tx, rx) = std::sync::mpsc::sync_channel(1);
1304			let mut api = TestApi::default();
1305			api.delay = Arc::new(Mutex::new(rx.into()));
1306			let api = Arc::new(api);
1307			let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
1308				Default::default(),
1309				true.into(),
1310				api.clone(),
1311			));
1312
1313			let han_of_block0 = api.expect_hash_and_number(0);
1314
1315			// when
1316			let xt = uxt(Transfer {
1317				from: Alice.into(),
1318				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1319				amount: 5,
1320				nonce: 1,
1321			});
1322
1323			// This transaction should go to future, since we use `nonce: 1`
1324			let pool2 = pool.clone();
1325			std::thread::spawn({
1326				let hash_of_block0 = han_of_block0.clone();
1327				move || {
1328					block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
1329					ready.send(()).unwrap();
1330				}
1331			});
1332
1333			// But now before the previous one is imported we import
1334			// the one that it depends on.
1335			let xt = uxt(Transfer {
1336				from: Alice.into(),
1337				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1338				amount: 4,
1339				nonce: 0,
1340			});
1341			// The tag the above transaction provides (TestApi is using just nonce as u8)
1342			let provides = vec![0_u8];
1343			block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
1344			assert_eq!(pool.validated_pool().status().ready, 1);
1345
1346			// Now block import happens before the second transaction is able to finish
1347			// verification.
1348			block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
1349			assert_eq!(pool.validated_pool().status().ready, 0);
1350
1351			// so when we release the verification of the previous one it will have
1352			// something in `requires`, but should go to ready directly, since the previous
1353			// transaction was imported correctly.
1354			tx.send(()).unwrap();
1355
1356			// then
1357			is_ready.recv().unwrap(); // wait for finish
1358			assert_eq!(pool.validated_pool().status().ready, 1);
1359			assert_eq!(pool.validated_pool().status().future, 0);
1360		}
1361	}
1362}