referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
20use async_trait::async_trait;
21use futures::channel::mpsc::Receiver;
22use indexmap::IndexMap;
23use sc_transaction_pool_api::error;
24use sp_blockchain::{HashAndNumber, TreeRoute};
25use sp_runtime::{
26	generic::BlockId,
27	traits::{self, Block as BlockT, SaturatedConversion},
28	transaction_validity::{
29		TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
30	},
31};
32use std::{
33	collections::HashMap,
34	sync::Arc,
35	time::{Duration, Instant},
36};
37use tracing::{debug, instrument, trace, Level};
38
39use super::{
40	base_pool as base,
41	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
42	EventHandler, ValidatedPoolSubmitOutcome,
43};
44
45/// Modification notification event stream type;
46pub type EventStream<H> = Receiver<H>;
47
48/// Block hash type for a pool.
49pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
50/// Extrinsic hash type for a pool.
51pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
52/// Extrinsic type for a pool (reference counted).
53pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
54/// Extrinsic type for a pool (raw data).
55pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
56/// Block number type for the ChainApi
57pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
58/// A type of transaction stored in the pool
59pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
60/// A type of validated transaction stored in the pool.
61pub type ValidatedTransactionFor<A> =
62	ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
63
/// The priority of a request to validate a transaction.
///
/// Passed to [`ChainApi::validate_transaction`] so that the implementation can tell
/// freshly submitted transactions apart from those validated during maintenance.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ValidateTransactionPriority {
	/// Validate the newly submitted transactions.
	///
	/// Validation will be done with lower priority.
	Submitted,
	/// Validate the transaction during the maintenance process.
	///
	/// Validation will be performed with higher priority.
	Maintained,
}
76
77/// Concrete extrinsic validation and query logic.
78#[async_trait]
79pub trait ChainApi: Send + Sync {
80	/// Block type.
81	type Block: BlockT;
82	/// Error type.
83	type Error: From<error::Error> + error::IntoPoolError;
84
85	/// Asynchronously verify extrinsic at given block.
86	async fn validate_transaction(
87		&self,
88		at: <Self::Block as BlockT>::Hash,
89		source: TransactionSource,
90		uxt: ExtrinsicFor<Self>,
91		validation_priority: ValidateTransactionPriority,
92	) -> Result<TransactionValidity, Self::Error>;
93
94	/// Synchronously verify given extrinsic at given block.
95	///
96	/// Validates a transaction by calling into the runtime. Same as `validate_transaction` but
97	/// blocks the current thread when performing validation.
98	fn validate_transaction_blocking(
99		&self,
100		at: <Self::Block as BlockT>::Hash,
101		source: TransactionSource,
102		uxt: ExtrinsicFor<Self>,
103	) -> Result<TransactionValidity, Self::Error>;
104
105	/// Returns a block number given the block id.
106	fn block_id_to_number(
107		&self,
108		at: &BlockId<Self::Block>,
109	) -> Result<Option<NumberFor<Self>>, Self::Error>;
110
111	/// Returns a block hash given the block id.
112	fn block_id_to_hash(
113		&self,
114		at: &BlockId<Self::Block>,
115	) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
116
117	/// Returns hash and encoding length of the extrinsic.
118	fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
119
120	/// Returns a block body given the block.
121	async fn block_body(
122		&self,
123		at: <Self::Block as BlockT>::Hash,
124	) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;
125
126	/// Returns a block header given the block id.
127	fn block_header(
128		&self,
129		at: <Self::Block as BlockT>::Hash,
130	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
131
132	/// Compute a tree-route between two blocks. See [`TreeRoute`] for more details.
133	fn tree_route(
134		&self,
135		from: <Self::Block as BlockT>::Hash,
136		to: <Self::Block as BlockT>::Hash,
137	) -> Result<TreeRoute<Self::Block>, Self::Error>;
138
139	/// Resolves block number by id.
140	fn resolve_block_number(
141		&self,
142		at: <Self::Block as BlockT>::Hash,
143	) -> Result<NumberFor<Self>, Self::Error> {
144		self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
145			number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
146		})
147	}
148}
149
150/// Pool configuration options.
151#[derive(Debug, Clone)]
152pub struct Options {
153	/// Ready queue limits.
154	pub ready: base::Limit,
155	/// Future queue limits.
156	pub future: base::Limit,
157	/// Reject future transactions.
158	pub reject_future_transactions: bool,
159	/// How long the extrinsic is banned for.
160	pub ban_time: Duration,
161}
162
163impl Default for Options {
164	fn default() -> Self {
165		Self {
166			ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
167			future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
168			reject_future_transactions: false,
169			ban_time: Duration::from_secs(60 * 30),
170		}
171	}
172}
173
174impl Options {
175	/// Total (ready+future) maximal number of transactions in the pool.
176	pub fn total_count(&self) -> usize {
177		self.ready.count + self.future.count
178	}
179}
180
/// Tells whether the pool should check that a transaction is banned
/// before the transaction gets verified.
#[derive(Clone, Copy)]
pub(crate) enum CheckBannedBeforeVerify {
	/// Check the ban status before verifying.
	Yes,
	/// Skip the ban check and verify right away.
	No,
}
188
189/// Extrinsics pool that performs validation.
190pub struct Pool<B: ChainApi, L: EventHandler<B>> {
191	validated_pool: Arc<ValidatedPool<B, L>>,
192}
193
194impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
195	/// Create a new transaction pool with statically sized rotator.
196	pub fn new_with_staticly_sized_rotator(
197		options: Options,
198		is_validator: IsValidator,
199		api: Arc<B>,
200	) -> Self {
201		Self {
202			validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
203				options,
204				is_validator,
205				api,
206			)),
207		}
208	}
209
210	/// Create a new transaction pool.
211	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212		Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
213	}
214
215	/// Create a new transaction pool.
216	pub fn new_with_event_handler(
217		options: Options,
218		is_validator: IsValidator,
219		api: Arc<B>,
220		event_handler: L,
221	) -> Self {
222		Self {
223			validated_pool: Arc::new(ValidatedPool::new_with_event_handler(
224				options,
225				is_validator,
226				api,
227				event_handler,
228			)),
229		}
230	}
231
232	/// Imports a bunch of unverified extrinsics to the pool
233	#[instrument(level = Level::TRACE, skip_all, target="txpool", name = "pool::submit_at")]
234	pub async fn submit_at(
235		&self,
236		at: &HashAndNumber<B::Block>,
237		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
238		validation_priority: ValidateTransactionPriority,
239	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
240		let validated_transactions =
241			self.verify(at, xts, CheckBannedBeforeVerify::Yes, validation_priority).await;
242		self.validated_pool.submit(validated_transactions.into_values())
243	}
244
245	/// Resubmit the given extrinsics to the pool.
246	///
247	/// This does not check if a transaction is banned, before we verify it again.
248	pub async fn resubmit_at(
249		&self,
250		at: &HashAndNumber<B::Block>,
251		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
252		validation_priority: ValidateTransactionPriority,
253	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
254		let validated_transactions =
255			self.verify(at, xts, CheckBannedBeforeVerify::No, validation_priority).await;
256		self.validated_pool.submit(validated_transactions.into_values())
257	}
258
259	/// Imports one unverified extrinsic to the pool
260	pub async fn submit_one(
261		&self,
262		at: &HashAndNumber<B::Block>,
263		source: base::TimedTransactionSource,
264		xt: ExtrinsicFor<B>,
265	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
266		let res = self
267			.submit_at(at, std::iter::once((source, xt)), ValidateTransactionPriority::Submitted)
268			.await
269			.pop();
270		res.expect("One extrinsic passed; one result returned; qed")
271	}
272
273	/// Import a single extrinsic and starts to watch its progress in the pool.
274	pub async fn submit_and_watch(
275		&self,
276		at: &HashAndNumber<B::Block>,
277		source: base::TimedTransactionSource,
278		xt: ExtrinsicFor<B>,
279	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
280		let (_, tx) = self
281			.verify_one(
282				at.hash,
283				at.number,
284				source,
285				xt,
286				CheckBannedBeforeVerify::Yes,
287				ValidateTransactionPriority::Submitted,
288			)
289			.await;
290		self.validated_pool.submit_and_watch(tx)
291	}
292
293	/// Resubmit some transaction that were validated elsewhere.
294	pub fn resubmit(
295		&self,
296		revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
297	) {
298		let now = Instant::now();
299		self.validated_pool.resubmit(revalidated_transactions);
300		trace!(
301			target: LOG_TARGET,
302			duration = ?now.elapsed(),
303			status = ?self.validated_pool.status(),
304			"Resubmitted transaction."
305		);
306	}
307
308	/// Prunes known ready transactions.
309	///
310	/// Used to clear the pool from transactions that were part of recently imported block.
311	/// The main difference from the `prune` is that we do not revalidate any transactions
312	/// and ignore unknown passed hashes.
313	pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
314		// Get details of all extrinsics that are already in the pool
315		let in_pool_tags =
316			self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
317
318		// Prune all transactions that provide given tags
319		let prune_status = self.validated_pool.prune_tags(in_pool_tags);
320		let pruned_transactions =
321			hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
322		self.validated_pool.fire_pruned(at, pruned_transactions);
323	}
324
325	/// Prunes ready transactions.
326	///
327	/// Used to clear the pool from transactions that were part of recently imported block.
328	/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
329	/// into runtime too often we first look up all extrinsics that are in the pool and get
330	/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
331	pub async fn prune(
332		&self,
333		at: &HashAndNumber<B::Block>,
334		parent: <B::Block as BlockT>::Hash,
335		extrinsics: &[RawExtrinsicFor<B>],
336		known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<B>, Vec<Tag>>>>,
337	) {
338		debug!(
339			target: LOG_TARGET,
340			?at,
341			extrinsics_count = extrinsics.len(),
342			"Starting pruning of block."
343		);
344		// Get details of all extrinsics that are already in the pool
345		let in_pool_hashes =
346			extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
347		let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
348		// Fill unknown tags based on the known tags given in `known_provides_tags`.
349		let mut unknown_txs_count = 0usize;
350		let mut reused_txs_count = 0usize;
351		let tags = in_pool_hashes.iter().zip(in_pool_tags).map(|(tx_hash, tags)| {
352			tags.or_else(|| {
353				unknown_txs_count += 1;
354				known_provides_tags.as_ref().and_then(|inner| {
355					inner.get(&tx_hash).map(|found_tags| {
356						reused_txs_count += 1;
357						found_tags.clone()
358					})
359				})
360			})
361		});
362
363		// Zip the ones from the pool with the full list (we get pairs `(Extrinsic,
364		// Option<Vec<Tag>>)`)
365		let all = extrinsics.iter().zip(tags);
366		let mut validated_counter: usize = 0;
367		let mut future_tags = Vec::new();
368		let now = Instant::now();
369		for (extrinsic, in_pool_tags) in all {
370			match in_pool_tags {
371				// reuse the tags for extrinsics that were found in the pool or given in
372				// `known_provides_tags` cache.
373				Some(tags) => future_tags.extend(tags),
374				// if it's not found in the pool query the runtime at parent block
375				// to get validity info and tags that the extrinsic provides.
376				None => {
377					// Avoid validating block txs if the pool is empty
378					if !self.validated_pool.status().is_empty() {
379						validated_counter = validated_counter + 1;
380						let validity = self
381							.validated_pool
382							.api()
383							.validate_transaction(
384								parent,
385								TransactionSource::InBlock,
386								Arc::from(extrinsic.clone()),
387								ValidateTransactionPriority::Maintained,
388							)
389							.await;
390
391						trace!(
392							target: LOG_TARGET,
393							tx_hash = ?self.validated_pool.api().hash_and_length(&extrinsic.clone()).0,
394							?validity,
395							"prune::revalidated"
396						);
397						if let Ok(Ok(validity)) = validity {
398							future_tags.extend(validity.provides);
399						}
400					} else {
401						trace!(
402							target: LOG_TARGET,
403							?at,
404							"txpool is empty, skipping validation for block",
405						);
406					}
407				},
408			}
409		}
410
411		let known_provides_tags_len = known_provides_tags.map(|inner| inner.len()).unwrap_or(0);
412		debug!(
413			target: LOG_TARGET,
414			validated_counter,
415			known_provides_tags_len,
416			unknown_txs_count,
417			reused_txs_count,
418			duration = ?now.elapsed(),
419			"prune"
420		);
421		self.prune_tags(at, future_tags, in_pool_hashes).await
422	}
423
424	/// Prunes ready transactions that provide given list of tags.
425	///
426	/// Given tags are assumed to be always provided now, so all transactions
427	/// in the Future Queue that require that particular tag (and have other
428	/// requirements satisfied) are promoted to Ready Queue.
429	///
430	/// Moreover for each provided tag we remove transactions in the pool that:
431	/// 1. Provide that tag directly
432	/// 2. Are a dependency of pruned transaction.
433	///
434	/// Returns transactions that have been removed from the pool and must be reverified
435	/// before reinserting to the pool.
436	///
437	/// By removing predecessor transactions as well we might actually end up
438	/// pruning too much, so all removed transactions are reverified against
439	/// the runtime (`validate_transaction`) to make sure they are invalid.
440	///
441	/// However we avoid revalidating transactions that are contained within
442	/// the second parameter of `known_imported_hashes`. These transactions
443	/// (if pruned) are not revalidated and become temporarily banned to
444	/// prevent importing them in the (near) future.
445	pub async fn prune_tags(
446		&self,
447		at: &HashAndNumber<B::Block>,
448		tags: impl IntoIterator<Item = Tag>,
449		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
450	) {
451		let now = Instant::now();
452		trace!(target: LOG_TARGET, ?at, "Pruning tags.");
453		// Prune all transactions that provide given tags
454		let prune_status = self.validated_pool.prune_tags(tags);
455
456		// Make sure that we don't revalidate extrinsics that were part of the recently
457		// imported block. This is especially important for UTXO-like chains cause the
458		// inputs are pruned so such transaction would go to future again.
459		self.validated_pool
460			.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
461
462		// Try to re-validate pruned transactions since some of them might be still valid.
463		// note that `known_imported_hashes` will be rejected here due to temporary ban.
464		let pruned_transactions =
465			prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
466
467		let reverified_transactions = self
468			.verify(
469				at,
470				pruned_transactions,
471				CheckBannedBeforeVerify::Yes,
472				ValidateTransactionPriority::Maintained,
473			)
474			.await;
475
476		let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
477		debug!(
478			target: LOG_TARGET,
479			?at,
480			reverified_transactions = reverified_transactions.len(),
481			duration = ?now.elapsed(),
482			"Pruned. Resubmitting transactions."
483		);
484		log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}");
485
486		// And finally - submit reverified transactions back to the pool
487		self.validated_pool.resubmit_pruned(
488			&at,
489			known_imported_hashes,
490			pruned_hashes,
491			reverified_transactions.into_values().collect(),
492		)
493	}
494
495	/// Returns transaction hash
496	pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
497		self.validated_pool.api().hash_and_length(xt).0
498	}
499
500	/// Returns future that validates a bunch of transactions at given block.
501	#[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify")]
502	async fn verify(
503		&self,
504		at: &HashAndNumber<B::Block>,
505		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
506		check: CheckBannedBeforeVerify,
507		validation_priority: ValidateTransactionPriority,
508	) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
509		let HashAndNumber { number, hash } = *at;
510
511		let res = futures::future::join_all(xts.into_iter().map(|(source, xt)| {
512			self.verify_one(hash, number, source, xt, check, validation_priority)
513		}))
514		.await
515		.into_iter()
516		.collect::<IndexMap<_, _>>();
517
518		res
519	}
520
521	/// Returns future that validates single transaction at given block.
522	#[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify_one")]
523	pub(crate) async fn verify_one(
524		&self,
525		block_hash: <B::Block as BlockT>::Hash,
526		block_number: NumberFor<B>,
527		source: base::TimedTransactionSource,
528		xt: ExtrinsicFor<B>,
529		check: CheckBannedBeforeVerify,
530		validation_priority: ValidateTransactionPriority,
531	) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
532		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
533
534		let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
535		if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
536			return (hash, ValidatedTransaction::Invalid(hash, err))
537		}
538
539		let validation_result = self
540			.validated_pool
541			.api()
542			.validate_transaction(
543				block_hash,
544				source.clone().into(),
545				xt.clone(),
546				validation_priority,
547			)
548			.await;
549
550		let status = match validation_result {
551			Ok(status) => status,
552			Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
553		};
554
555		let validity = match status {
556			Ok(validity) =>
557				if validity.provides.is_empty() {
558					ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
559				} else {
560					ValidatedTransaction::valid_at(
561						block_number.saturated_into::<u64>(),
562						hash,
563						source,
564						xt,
565						bytes,
566						validity,
567					)
568				},
569			Err(TransactionValidityError::Invalid(e)) =>
570				ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
571			Err(TransactionValidityError::Unknown(e)) =>
572				ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
573		};
574
575		(hash, validity)
576	}
577
578	/// Get a reference to the underlying validated pool.
579	pub fn validated_pool(&self) -> &ValidatedPool<B, L> {
580		&self.validated_pool
581	}
582
583	/// Clears the recently pruned transactions in validated pool.
584	pub fn clear_recently_pruned(&mut self) {
585		self.validated_pool.pool.write().clear_recently_pruned();
586	}
587}
588
589impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
590	/// Deep clones the pool.
591	///
592	/// Must be called on purpose: it duplicates all the internal structures.
593	pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
594		let other: ValidatedPool<B, L> =
595			self.validated_pool().deep_clone_with_event_handler(event_handler);
596		Self { validated_pool: Arc::from(other) }
597	}
598}
599
600#[cfg(test)]
601mod tests {
602	use super::{super::base_pool::Limit, *};
603	use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
604	use assert_matches::assert_matches;
605	use base::TimedTransactionSource;
606	use codec::Encode;
607	use futures::executor::block_on;
608	use parking_lot::Mutex;
609	use sc_transaction_pool_api::TransactionStatus;
610	use sp_runtime::transaction_validity::TransactionSource;
611	use std::{collections::HashMap, time::Instant};
612	use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
613	use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};
614
615	const SOURCE: TimedTransactionSource =
616		TimedTransactionSource { source: TransactionSource::External, timestamp: None };
617
618	type Pool<Api> = super::Pool<Api, ()>;
619
620	#[test]
621	fn should_validate_and_import_transaction() {
622		// given
623		let (pool, api) = pool();
624
625		// when
626		let hash = block_on(
627			pool.submit_one(
628				&api.expect_hash_and_number(0),
629				SOURCE,
630				uxt(Transfer {
631					from: Alice.into(),
632					to: AccountId::from_h256(H256::from_low_u64_be(2)),
633					amount: 5,
634					nonce: 0,
635				})
636				.into(),
637			),
638		)
639		.map(|outcome| outcome.hash())
640		.unwrap();
641
642		// then
643		assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
644	}
645
646	#[test]
647	fn submit_at_preserves_order() {
648		sp_tracing::try_init_simple();
649		// given
650		let (pool, api) = pool();
651
652		let txs = (0..10)
653			.map(|i| {
654				uxt(Transfer {
655					from: Alice.into(),
656					to: AccountId::from_h256(H256::from_low_u64_be(i)),
657					amount: 5,
658					nonce: i,
659				})
660				.into()
661			})
662			.collect::<Vec<_>>();
663
664		let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
665
666		// when
667		let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
668		let hashes = block_on(pool.submit_at(
669			&api.expect_hash_and_number(0),
670			txs,
671			ValidateTransactionPriority::Submitted,
672		))
673		.into_iter()
674		.map(|r| r.map(|o| o.hash()))
675		.collect::<Vec<_>>();
676		debug!(hashes = ?hashes, "-->");
677
678		// then
679		hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
680			|(result_hash, initial_hash)| {
681				assert_eq!(result_hash.unwrap(), initial_hash);
682			},
683		);
684	}
685
686	#[test]
687	fn should_reject_if_temporarily_banned() {
688		// given
689		let (pool, api) = pool();
690		let uxt = uxt(Transfer {
691			from: Alice.into(),
692			to: AccountId::from_h256(H256::from_low_u64_be(2)),
693			amount: 5,
694			nonce: 0,
695		});
696
697		// when
698		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
699		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
700			.map(|o| o.hash());
701		assert_eq!(pool.validated_pool().status().ready, 0);
702		assert_eq!(pool.validated_pool().status().future, 0);
703
704		// then
705		assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
706	}
707
708	#[test]
709	fn should_reject_unactionable_transactions() {
710		// given
711		let api = Arc::new(TestApi::default());
712		let pool = Pool::new_with_staticly_sized_rotator(
713			Default::default(),
714			// the node does not author blocks
715			false.into(),
716			api.clone(),
717		);
718
719		// after validation `IncludeData` will be set to non-propagable (validate_transaction mock)
720		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
721
722		// when
723		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
724			.map(|o| o.hash());
725
726		// then
727		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
728	}
729
730	#[test]
731	fn should_notify_about_pool_events() {
732		let (stream, hash0, hash1) = {
733			// given
734			let (pool, api) = pool();
735			let han_of_block0 = api.expect_hash_and_number(0);
736			let stream = pool.validated_pool().import_notification_stream();
737
738			// when
739			let hash0 = block_on(
740				pool.submit_one(
741					&han_of_block0,
742					SOURCE,
743					uxt(Transfer {
744						from: Alice.into(),
745						to: AccountId::from_h256(H256::from_low_u64_be(2)),
746						amount: 5,
747						nonce: 0,
748					})
749					.into(),
750				),
751			)
752			.unwrap()
753			.hash();
754			let hash1 = block_on(
755				pool.submit_one(
756					&han_of_block0,
757					SOURCE,
758					uxt(Transfer {
759						from: Alice.into(),
760						to: AccountId::from_h256(H256::from_low_u64_be(2)),
761						amount: 5,
762						nonce: 1,
763					})
764					.into(),
765				),
766			)
767			.unwrap()
768			.hash();
769			// future doesn't count
770			let _hash = block_on(
771				pool.submit_one(
772					&han_of_block0,
773					SOURCE,
774					uxt(Transfer {
775						from: Alice.into(),
776						to: AccountId::from_h256(H256::from_low_u64_be(2)),
777						amount: 5,
778						nonce: 3,
779					})
780					.into(),
781				),
782			)
783			.unwrap()
784			.hash();
785
786			assert_eq!(pool.validated_pool().status().ready, 2);
787			assert_eq!(pool.validated_pool().status().future, 1);
788
789			(stream, hash0, hash1)
790		};
791
792		// then
793		let mut it = futures::executor::block_on_stream(stream);
794		assert_eq!(it.next(), Some(hash0));
795		assert_eq!(it.next(), Some(hash1));
796		assert_eq!(it.next(), None);
797	}
798
799	#[test]
800	fn should_clear_stale_transactions() {
801		// given
802		let (pool, api) = pool();
803		let han_of_block0 = api.expect_hash_and_number(0);
804		let hash1 = block_on(
805			pool.submit_one(
806				&han_of_block0,
807				SOURCE,
808				uxt(Transfer {
809					from: Alice.into(),
810					to: AccountId::from_h256(H256::from_low_u64_be(2)),
811					amount: 5,
812					nonce: 0,
813				})
814				.into(),
815			),
816		)
817		.unwrap()
818		.hash();
819		let hash2 = block_on(
820			pool.submit_one(
821				&han_of_block0,
822				SOURCE,
823				uxt(Transfer {
824					from: Alice.into(),
825					to: AccountId::from_h256(H256::from_low_u64_be(2)),
826					amount: 5,
827					nonce: 1,
828				})
829				.into(),
830			),
831		)
832		.unwrap()
833		.hash();
834		let hash3 = block_on(
835			pool.submit_one(
836				&han_of_block0,
837				SOURCE,
838				uxt(Transfer {
839					from: Alice.into(),
840					to: AccountId::from_h256(H256::from_low_u64_be(2)),
841					amount: 5,
842					nonce: 3,
843				})
844				.into(),
845			),
846		)
847		.unwrap()
848		.hash();
849
850		// when
851		pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
852
853		// then
854		assert_eq!(pool.validated_pool().ready().count(), 0);
855		assert_eq!(pool.validated_pool().status().future, 0);
856		assert_eq!(pool.validated_pool().status().ready, 0);
857		// make sure they are temporarily banned as well
858		assert!(pool.validated_pool.is_banned(&hash1));
859		assert!(pool.validated_pool.is_banned(&hash2));
860		assert!(pool.validated_pool.is_banned(&hash3));
861	}
862
863	#[test]
864	fn should_ban_mined_transactions() {
865		// given
866		let (pool, api) = pool();
867		let hash1 = block_on(
868			pool.submit_one(
869				&api.expect_hash_and_number(0),
870				SOURCE,
871				uxt(Transfer {
872					from: Alice.into(),
873					to: AccountId::from_h256(H256::from_low_u64_be(2)),
874					amount: 5,
875					nonce: 0,
876				})
877				.into(),
878			),
879		)
880		.unwrap()
881		.hash();
882
883		// when
884		block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
885
886		// then
887		assert!(pool.validated_pool.is_banned(&hash1));
888	}
889
890	#[test]
891	fn should_limit_futures() {
892		sp_tracing::try_init_simple();
893
894		let xt = uxt(Transfer {
895			from: Alice.into(),
896			to: AccountId::from_h256(H256::from_low_u64_be(2)),
897			amount: 5,
898			nonce: 1,
899		});
900
901		// given
902		let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
903
904		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
905
906		let api = Arc::new(TestApi::default());
907		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
908
909		let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
910			.unwrap()
911			.hash();
912		assert_eq!(pool.validated_pool().status().future, 1);
913
914		// when
915		let hash2 = block_on(
916			pool.submit_one(
917				&api.expect_hash_and_number(0),
918				SOURCE,
919				uxt(Transfer {
920					from: Bob.into(),
921					to: AccountId::from_h256(H256::from_low_u64_be(2)),
922					amount: 5,
923					nonce: 10,
924				})
925				.into(),
926			),
927		)
928		.unwrap()
929		.hash();
930
931		// then
932		assert_eq!(pool.validated_pool().status().future, 1);
933		assert!(pool.validated_pool.is_banned(&hash1));
934		assert!(!pool.validated_pool.is_banned(&hash2));
935	}
936
937	#[test]
938	fn should_error_if_reject_immediately() {
939		// given
940		let limit = Limit { count: 100, total_bytes: 10 };
941
942		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
943
944		let api = Arc::new(TestApi::default());
945		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
946
947		// when
948		block_on(
949			pool.submit_one(
950				&api.expect_hash_and_number(0),
951				SOURCE,
952				uxt(Transfer {
953					from: Alice.into(),
954					to: AccountId::from_h256(H256::from_low_u64_be(2)),
955					amount: 5,
956					nonce: 1,
957				})
958				.into(),
959			),
960		)
961		.map(|o| o.hash())
962		.unwrap_err();
963
964		// then
965		assert_eq!(pool.validated_pool().status().ready, 0);
966		assert_eq!(pool.validated_pool().status().future, 0);
967	}
968
969	#[test]
970	fn should_reject_transactions_with_no_provides() {
971		// given
972		let (pool, api) = pool();
973
974		// when
975		let err = block_on(
976			pool.submit_one(
977				&api.expect_hash_and_number(0),
978				SOURCE,
979				uxt(Transfer {
980					from: Alice.into(),
981					to: AccountId::from_h256(H256::from_low_u64_be(2)),
982					amount: 5,
983					nonce: INVALID_NONCE,
984				})
985				.into(),
986			),
987		)
988		.map(|o| o.hash())
989		.unwrap_err();
990
991		// then
992		assert_eq!(pool.validated_pool().status().ready, 0);
993		assert_eq!(pool.validated_pool().status().future, 0);
994		assert_matches!(err, error::Error::NoTagsProvided);
995	}
996
997	mod listener {
998		use super::*;
999
1000		#[test]
1001		fn should_trigger_ready_and_finalized() {
1002			// given
1003			let (pool, api) = pool();
1004			let watcher = block_on(
1005				pool.submit_and_watch(
1006					&api.expect_hash_and_number(0),
1007					SOURCE,
1008					uxt(Transfer {
1009						from: Alice.into(),
1010						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1011						amount: 5,
1012						nonce: 0,
1013					})
1014					.into(),
1015				),
1016			)
1017			.unwrap()
1018			.expect_watcher();
1019			assert_eq!(pool.validated_pool().status().ready, 1);
1020			assert_eq!(pool.validated_pool().status().future, 0);
1021
1022			let han_of_block2 = api.expect_hash_and_number(2);
1023
1024			// when
1025			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
1026			assert_eq!(pool.validated_pool().status().ready, 0);
1027			assert_eq!(pool.validated_pool().status().future, 0);
1028
1029			// then
1030			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1031			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1032			assert_eq!(
1033				stream.next(),
1034				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1035			);
1036		}
1037
1038		#[test]
1039		fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
1040			// given
1041			let (pool, api) = pool();
1042			let watcher = block_on(
1043				pool.submit_and_watch(
1044					&api.expect_hash_and_number(0),
1045					SOURCE,
1046					uxt(Transfer {
1047						from: Alice.into(),
1048						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1049						amount: 5,
1050						nonce: 0,
1051					})
1052					.into(),
1053				),
1054			)
1055			.unwrap()
1056			.expect_watcher();
1057			assert_eq!(pool.validated_pool().status().ready, 1);
1058			assert_eq!(pool.validated_pool().status().future, 0);
1059
1060			let han_of_block2 = api.expect_hash_and_number(2);
1061
1062			// when
1063			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
1064			assert_eq!(pool.validated_pool().status().ready, 0);
1065			assert_eq!(pool.validated_pool().status().future, 0);
1066
1067			// then
1068			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1069			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1070			assert_eq!(
1071				stream.next(),
1072				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1073			);
1074		}
1075
1076		#[test]
1077		fn should_trigger_future_and_ready_after_promoted() {
1078			// given
1079			let (pool, api) = pool();
1080			let han_of_block0 = api.expect_hash_and_number(0);
1081
1082			let watcher = block_on(
1083				pool.submit_and_watch(
1084					&han_of_block0,
1085					SOURCE,
1086					uxt(Transfer {
1087						from: Alice.into(),
1088						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1089						amount: 5,
1090						nonce: 1,
1091					})
1092					.into(),
1093				),
1094			)
1095			.unwrap()
1096			.expect_watcher();
1097			assert_eq!(pool.validated_pool().status().ready, 0);
1098			assert_eq!(pool.validated_pool().status().future, 1);
1099
1100			// when
1101			block_on(
1102				pool.submit_one(
1103					&han_of_block0,
1104					SOURCE,
1105					uxt(Transfer {
1106						from: Alice.into(),
1107						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1108						amount: 5,
1109						nonce: 0,
1110					})
1111					.into(),
1112				),
1113			)
1114			.unwrap();
1115			assert_eq!(pool.validated_pool().status().ready, 2);
1116
1117			// then
1118			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1119			assert_eq!(stream.next(), Some(TransactionStatus::Future));
1120			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1121		}
1122
1123		#[test]
1124		fn should_trigger_invalid_and_ban() {
1125			// given
1126			let (pool, api) = pool();
1127			let uxt = uxt(Transfer {
1128				from: Alice.into(),
1129				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1130				amount: 5,
1131				nonce: 0,
1132			});
1133			let watcher =
1134				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1135					.unwrap()
1136					.expect_watcher();
1137			assert_eq!(pool.validated_pool().status().ready, 1);
1138
1139			// when
1140			pool.validated_pool.remove_invalid(&[*watcher.hash()]);
1141
1142			// then
1143			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1144			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1145			assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
1146			assert_eq!(stream.next(), None);
1147		}
1148
1149		#[test]
1150		fn should_trigger_broadcasted() {
1151			// given
1152			let (pool, api) = pool();
1153			let uxt = uxt(Transfer {
1154				from: Alice.into(),
1155				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1156				amount: 5,
1157				nonce: 0,
1158			});
1159			let watcher =
1160				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1161					.unwrap()
1162					.expect_watcher();
1163			assert_eq!(pool.validated_pool().status().ready, 1);
1164
1165			// when
1166			let mut map = HashMap::new();
1167			let peers = vec!["a".into(), "b".into(), "c".into()];
1168			map.insert(*watcher.hash(), peers.clone());
1169			pool.validated_pool().on_broadcasted(map);
1170
1171			// then
1172			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1173			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1174			assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
1175		}
1176
1177		#[test]
1178		fn should_trigger_dropped_older() {
1179			// given
1180			let limit = Limit { count: 1, total_bytes: 1000 };
1181			let options =
1182				Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1183
1184			let api = Arc::new(TestApi::default());
1185			let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1186
1187			let xt = uxt(Transfer {
1188				from: Alice.into(),
1189				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1190				amount: 5,
1191				nonce: 0,
1192			});
1193			let watcher =
1194				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1195					.unwrap()
1196					.expect_watcher();
1197			assert_eq!(pool.validated_pool().status().ready, 1);
1198
1199			// when
1200			let xt = uxt(Transfer {
1201				from: Bob.into(),
1202				to: AccountId::from_h256(H256::from_low_u64_be(1)),
1203				amount: 4,
1204				nonce: 1,
1205			});
1206			block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
1207			assert_eq!(pool.validated_pool().status().ready, 1);
1208
1209			// then
1210			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1211			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1212			assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1213		}
1214
1215		#[test]
1216		fn should_trigger_dropped_lower_priority() {
1217			{
1218				// given
1219				let limit = Limit { count: 1, total_bytes: 1000 };
1220				let options =
1221					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1222
1223				let api = Arc::new(TestApi::default());
1224				let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1225
1226				// after validation `IncludeData` will have priority set to 9001
1227				// (validate_transaction mock)
1228				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1229				block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1230					.unwrap();
1231				assert_eq!(pool.validated_pool().status().ready, 1);
1232
1233				// then
1234				// after validation `Transfer` will have priority set to 4 (validate_transaction
1235				// mock)
1236				let xt = uxt(Transfer {
1237					from: Bob.into(),
1238					to: AccountId::from_h256(H256::from_low_u64_be(1)),
1239					amount: 4,
1240					nonce: 1,
1241				});
1242				let result =
1243					block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
1244				assert!(matches!(
1245					result,
1246					Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
1247				));
1248			}
1249			{
1250				// given
1251				let limit = Limit { count: 2, total_bytes: 1000 };
1252				let options =
1253					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1254
1255				let api = Arc::new(TestApi::default());
1256				let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1257
1258				let han_of_block0 = api.expect_hash_and_number(0);
1259
1260				// after validation `IncludeData` will have priority set to 9001
1261				// (validate_transaction mock)
1262				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1263				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1264					.unwrap()
1265					.expect_watcher();
1266				assert_eq!(pool.validated_pool().status().ready, 1);
1267
1268				// after validation `Transfer` will have priority set to 4 (validate_transaction
1269				// mock)
1270				let xt = uxt(Transfer {
1271					from: Alice.into(),
1272					to: AccountId::from_h256(H256::from_low_u64_be(2)),
1273					amount: 5,
1274					nonce: 0,
1275				});
1276				let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1277					.unwrap()
1278					.expect_watcher();
1279				assert_eq!(pool.validated_pool().status().ready, 2);
1280
1281				// when
1282				// after validation `Store` will have priority set to 9001 (validate_transaction
1283				// mock)
1284				let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1285				block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
1286					.unwrap();
1287				assert_eq!(pool.validated_pool().status().ready, 2);
1288
1289				// then
1290				let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1291				assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1292				assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1293			}
1294		}
1295
1296		#[test]
1297		fn should_handle_pruning_in_the_middle_of_import() {
1298			// given
1299			let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1300			let (tx, rx) = std::sync::mpsc::sync_channel(1);
1301			let mut api = TestApi::default();
1302			api.delay = Arc::new(Mutex::new(rx.into()));
1303			let api = Arc::new(api);
1304			let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
1305				Default::default(),
1306				true.into(),
1307				api.clone(),
1308			));
1309
1310			let han_of_block0 = api.expect_hash_and_number(0);
1311
1312			// when
1313			let xt = uxt(Transfer {
1314				from: Alice.into(),
1315				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1316				amount: 5,
1317				nonce: 1,
1318			});
1319
1320			// This transaction should go to future, since we use `nonce: 1`
1321			let pool2 = pool.clone();
1322			std::thread::spawn({
1323				let hash_of_block0 = han_of_block0.clone();
1324				move || {
1325					block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
1326					ready.send(()).unwrap();
1327				}
1328			});
1329
1330			// But now before the previous one is imported we import
1331			// the one that it depends on.
1332			let xt = uxt(Transfer {
1333				from: Alice.into(),
1334				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1335				amount: 4,
1336				nonce: 0,
1337			});
1338			// The tag the above transaction provides (TestApi is using just nonce as u8)
1339			let provides = vec![0_u8];
1340			block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
1341			assert_eq!(pool.validated_pool().status().ready, 1);
1342
1343			// Now block import happens before the second transaction is able to finish
1344			// verification.
1345			block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
1346			assert_eq!(pool.validated_pool().status().ready, 0);
1347
1348			// so when we release the verification of the previous one it will have
1349			// something in `requires`, but should go to ready directly, since the previous
1350			// transaction was imported correctly.
1351			tx.send(()).unwrap();
1352
1353			// then
1354			is_ready.recv().unwrap(); // wait for finish
1355			assert_eq!(pool.validated_pool().status().ready, 1);
1356			assert_eq!(pool.validated_pool().status().future, 0);
1357		}
1358	}
1359}