referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
tx_mem_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction memory pool, container for watched and unwatched transactions.
20//! Acts as a buffer which collect transactions before importing them to the views. Following are
21//! the crucial use cases when it is needed:
22//! - empty pool (no views yet)
23//! - potential races between creation of view and submitting transaction (w/o intermediary buffer
24//!   some transactions could be lost)
25//! - the transaction can be invalid on some forks (and thus the associated views may not contain
26//!   it), while on other forks tx can be valid. Depending on which view is chosen to be cloned,
27//!   such transaction could not be present in the newly created view.
28//!
29//! Sync methods (with `_sync` suffix) are also exposed, and it should be safe to call them from
30//! sync or non-tokio contenxt. These methods are required for implementing some non-async methods.
31//! See <https://github.com/paritytech/polkadot-sdk/issues/8912> for some more information. The implementation of the
32//! bridging is based on passing messages from sync context to tokio thread.
33
34use futures::{future::join_all, FutureExt};
35use itertools::Itertools;
36use parking_lot::RwLock;
37use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
38use sp_blockchain::HashAndNumber;
39use sp_runtime::{
40	traits::Block as BlockT,
41	transaction_validity::{InvalidTransaction, TransactionValidityError},
42};
43use std::{
44	collections::HashSet,
45	future::Future,
46	pin::Pin,
47	sync::{
48		atomic::{self, AtomicU64},
49		mpsc::{
50			channel as sync_bridge_channel, Receiver as SyncBridgeReceiver,
51			Sender as SyncBridgeSender,
52		},
53		Arc,
54	},
55	time::Instant,
56};
57use tracing::{debug, trace};
58
59use crate::{
60	common::tracing_log_xt::log_xt_trace,
61	graph,
62	graph::{base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash},
63	ValidateTransactionPriority, LOG_TARGET,
64};
65
66use super::{
67	metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener,
68	view_store::ViewStore,
69};
70
71mod tx_mem_pool_map;
72
73/// The minimum interval between single transaction revalidations. Given in blocks.
74pub(crate) const TXMEMPOOL_REVALIDATION_PERIOD: u64 = 10;
75
76/// The number of transactions revalidated in single revalidation batch.
77pub(crate) const TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE: usize = 1000;
78
79const SYNC_BRIDGE_EXPECT: &str = "The mempool blocking task shall not be terminated. qed.";
80
81/// Represents the transaction in the intermediary buffer.
82pub(crate) struct TxInMemPool<ChainApi, Block>
83where
84	Block: BlockT,
85	ChainApi: graph::ChainApi<Block = Block> + 'static,
86{
87	/// Is the progress of transaction watched.
88	///
89	/// Indicates if transaction was sent with `submit_and_watch`. Serves only stats/testing
90	/// purposes.
91	watched: bool,
92	/// Extrinsic actual body.
93	tx: ExtrinsicFor<ChainApi>,
94	/// Size of the extrinsics actual body.
95	bytes: usize,
96	/// Transaction source.
97	source: TimedTransactionSource,
98	/// When the transaction was revalidated, used to periodically revalidate the mem pool buffer.
99	validated_at: AtomicU64,
100	/// Priority of transaction at some block. It is assumed it will not be changed often. None if
101	/// not known.
102	priority: RwLock<Option<TransactionPriority>>,
103}
104
105impl<ChainApi, Block> TxInMemPool<ChainApi, Block>
106where
107	Block: BlockT,
108	ChainApi: graph::ChainApi<Block = Block> + 'static,
109{
110	/// Shall the progress of transaction be watched.
111	///
112	/// Was transaction sent with `submit_and_watch`.
113	pub(crate) fn is_watched(&self) -> bool {
114		self.watched
115	}
116
117	/// Creates a new instance of wrapper for unwatched transaction.
118	fn new_unwatched(
119		source: TransactionSource,
120		tx: ExtrinsicFor<ChainApi>,
121		bytes: usize,
122		validated_at: u64,
123	) -> Self {
124		Self::new(false, source, tx, bytes, validated_at)
125	}
126
127	/// Creates a new instance of wrapper for watched transaction.
128	fn new_watched(
129		source: TransactionSource,
130		tx: ExtrinsicFor<ChainApi>,
131		bytes: usize,
132		validated_at: u64,
133	) -> Self {
134		Self::new(true, source, tx, bytes, validated_at)
135	}
136
137	/// Creates a new instance of wrapper for a transaction with no priority.
138	fn new(
139		watched: bool,
140		source: TransactionSource,
141		tx: ExtrinsicFor<ChainApi>,
142		bytes: usize,
143		validated_at: u64,
144	) -> Self {
145		Self::new_with_optional_priority(watched, source, tx, bytes, None, validated_at)
146	}
147
148	/// Creates a new instance of wrapper for a transaction with given priority.
149	fn new_with_priority(
150		watched: bool,
151		source: TransactionSource,
152		tx: ExtrinsicFor<ChainApi>,
153		bytes: usize,
154		priority: TransactionPriority,
155		validated_at: u64,
156	) -> Self {
157		Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority), validated_at)
158	}
159
160	/// Creates a new instance of wrapper for a transaction with optional priority.
161	fn new_with_optional_priority(
162		watched: bool,
163		source: TransactionSource,
164		tx: ExtrinsicFor<ChainApi>,
165		bytes: usize,
166		priority: Option<TransactionPriority>,
167		validated_at: u64,
168	) -> Self {
169		Self {
170			watched,
171			tx,
172			source: TimedTransactionSource::from_transaction_source(source, true),
173			validated_at: AtomicU64::new(validated_at),
174			bytes,
175			priority: priority.into(),
176		}
177	}
178
179	/// Provides a clone of actual transaction body.
180	///
181	/// Operation is cheap, as the body is `Arc`.
182	pub(crate) fn tx(&self) -> ExtrinsicFor<ChainApi> {
183		self.tx.clone()
184	}
185
186	/// Returns the source of the transaction.
187	pub(crate) fn source(&self) -> TimedTransactionSource {
188		self.source.clone()
189	}
190
191	/// Returns the priority of the transaction.
192	pub(crate) fn priority(&self) -> Option<TransactionPriority> {
193		*self.priority.read()
194	}
195}
196
197impl<ChainApi, Block> std::fmt::Debug for TxInMemPool<ChainApi, Block>
198where
199	Block: BlockT,
200	ChainApi: graph::ChainApi<Block = Block> + 'static,
201{
202	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203		f.debug_struct("TxInMemPool")
204			.field("watched", &self.watched)
205			.field("tx", &"...")
206			.field("bytes", &self.bytes)
207			.field("source", &self.source)
208			.field("validated_at", &self.validated_at)
209			.field("priority", &self.priority)
210			.finish()
211	}
212}
213
214impl<ChainApi, Block> std::cmp::PartialEq for TxInMemPool<ChainApi, Block>
215where
216	Block: BlockT,
217	ChainApi: graph::ChainApi<Block = Block> + 'static,
218{
219	fn eq(&self, other: &Self) -> bool {
220		self.watched == other.watched &&
221			self.tx == other.tx &&
222			self.bytes == other.bytes &&
223			self.source == other.source &&
224			*self.priority.read() == *other.priority.read() &&
225			self.validated_at.load(atomic::Ordering::Relaxed) ==
226				other.validated_at.load(atomic::Ordering::Relaxed)
227	}
228}
229
230#[derive(Debug, Clone, Copy, Eq, PartialEq)]
231struct MempoolTxPriority(pub Option<TransactionPriority>);
232
233impl Ord for MempoolTxPriority {
234	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
235		match (&self.0, &other.0) {
236			(Some(a), Some(b)) => a.cmp(b),
237			(Some(_), None) => std::cmp::Ordering::Less,
238			(None, Some(_)) => std::cmp::Ordering::Greater,
239			(None, None) => std::cmp::Ordering::Equal,
240		}
241	}
242}
243impl From<Option<TransactionPriority>> for MempoolTxPriority {
244	fn from(value: Option<TransactionPriority>) -> Self {
245		MempoolTxPriority(value)
246	}
247}
248
249impl PartialOrd for MempoolTxPriority {
250	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
251		Some(self.cmp(other))
252	}
253}
254
255impl<ChainApi, Block> tx_mem_pool_map::Size for Arc<TxInMemPool<ChainApi, Block>>
256where
257	Block: BlockT,
258	ChainApi: graph::ChainApi<Block = Block> + 'static,
259{
260	fn size(&self) -> usize {
261		self.bytes
262	}
263}
264
265impl<ChainApi, Block> tx_mem_pool_map::PriorityAndTimestamp for Arc<TxInMemPool<ChainApi, Block>>
266where
267	Block: BlockT,
268	ChainApi: graph::ChainApi<Block = Block> + 'static,
269{
270	type Priority = MempoolTxPriority;
271	type Timestamp = Option<Instant>;
272
273	fn priority(&self) -> Self::Priority {
274		TxInMemPool::priority(self).into()
275	}
276
277	fn timestamp(&self) -> Self::Timestamp {
278		self.source().timestamp
279	}
280}
281
282type InternalTxMemPoolMap<ChainApi, Block> = tx_mem_pool_map::SizeTrackedStore<
283	ExtrinsicHash<ChainApi>,
284	tx_mem_pool_map::PriorityKey<MempoolTxPriority, Option<Instant>>,
285	Arc<TxInMemPool<ChainApi, Block>>,
286>;
287
288/// Internal (blocking) task for bridging sync and async code.
289///
290/// Should be polled in blocking task.
291pub type TxMemPoolBlockingTask = Pin<Box<dyn Future<Output = ()> + Send>>;
292
293/// An intermediary transactions buffer.
294///
295/// Keeps all the transaction which are potentially valid. Transactions that were finalized or
296/// transactions that are invalid at finalized blocks are removed, either while handling the
297/// `Finalized` event, or during revalidation process.
298///
299/// All transactions from  a`TxMemPool` are submitted to the newly created views.
300///
301/// All newly submitted transactions goes into the `TxMemPool`.
302pub(super) struct TxMemPool<ChainApi, Block>
303where
304	Block: BlockT,
305	ChainApi: graph::ChainApi<Block = Block> + 'static,
306{
307	/// A shared API instance necessary for blockchain related operations.
308	api: Arc<ChainApi>,
309
310	/// A shared instance of the `MultiViewListener`.
311	///
312	/// Provides a side-channel allowing to send per-transaction state changes notification.
313	listener: Arc<MultiViewListener<ChainApi>>,
314
315	/// Channel used to send the requests from the sync code.
316	sync_channel: SyncBridgeSender<TxMemPoolSyncRequest<ChainApi, Block>>,
317
318	///  A map that stores the transactions currently in the memory pool.
319	///
320	///  The key is the hash of the transaction, and the value is a wrapper
321	///  structure, which contains the mempool specific details of the transaction.
322	transactions: InternalTxMemPoolMap<ChainApi, Block>,
323
324	/// Prometheus's metrics endpoint.
325	metrics: PrometheusMetrics,
326
327	/// Indicates the maximum number of transactions that can be maintained in the memory pool.
328	max_transactions_count: usize,
329
330	/// Maximal size of encodings of all transactions in the memory pool.
331	max_transactions_total_bytes: usize,
332}
333
334/// Helper structure to encapsulate a result of [`TxMemPool::try_insert`].
335#[derive(Debug)]
336pub(super) struct InsertionInfo<Hash> {
337	pub(super) hash: Hash,
338	pub(super) source: TimedTransactionSource,
339	pub(super) removed: Vec<Hash>,
340}
341
342impl<Hash> InsertionInfo<Hash> {
343	fn new(hash: Hash, source: TimedTransactionSource) -> Self {
344		Self::new_with_removed(hash, source, Default::default())
345	}
346	fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
347		Self { hash, source, removed }
348	}
349}
350
351impl<ChainApi, Block> TxMemPool<ChainApi, Block>
352where
353	Block: BlockT,
354	ChainApi: graph::ChainApi<Block = Block> + 'static,
355	<Block as BlockT>::Hash: Unpin,
356{
357	/// Creates a new `TxMemPool` instance with the given API, listener, metrics,
358	/// and max transaction count.
359	pub(super) fn new(
360		api: Arc<ChainApi>,
361		listener: Arc<MultiViewListener<ChainApi>>,
362		metrics: PrometheusMetrics,
363		max_transactions_count: usize,
364		max_transactions_total_bytes: usize,
365	) -> (Self, TxMemPoolBlockingTask) {
366		let (sync_channel, rx) = sync_bridge_channel();
367		let task = Self::sync_bridge_task(rx);
368		(
369			Self {
370				api,
371				listener,
372				sync_channel,
373				transactions: Default::default(),
374				metrics,
375				max_transactions_count,
376				max_transactions_total_bytes,
377			},
378			task.boxed(),
379		)
380	}
381
382	/// Creates a new `TxMemPool` instance for testing purposes.
383	#[cfg(test)]
384	fn new_test(
385		api: Arc<ChainApi>,
386		max_transactions_count: usize,
387		max_transactions_total_bytes: usize,
388	) -> Self {
389		let (sync_channel, rx) = sync_bridge_channel();
390		tokio::task::spawn_blocking(move || Self::sync_bridge_task(rx));
391		Self {
392			api,
393			listener: Arc::from(MultiViewListener::new_with_worker(Default::default()).0),
394			transactions: Default::default(),
395			metrics: Default::default(),
396			sync_channel,
397			max_transactions_count,
398			max_transactions_total_bytes,
399		}
400	}
401
402	/// Retrieves a transaction by its hash if it exists in the memory pool.
403	pub(super) async fn get_by_hash(
404		&self,
405		hash: ExtrinsicHash<ChainApi>,
406	) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
407		self.transactions.read().await.get(&hash).map(Clone::clone)
408	}
409
410	/// Returns a tuple with the count of unwatched and watched transactions in the memory pool.
411	pub async fn unwatched_and_watched_count(&self) -> (usize, usize) {
412		let transactions = self.transactions.read().await;
413		let watched_count = transactions.values().filter(|t| t.is_watched()).count();
414		(transactions.len() - watched_count, watched_count)
415	}
416
417	/// Returns a total number of transactions kept within mempool.
418	pub fn len(&self) -> usize {
419		self.transactions.len()
420	}
421
422	/// Returns the number of bytes used by all extrinsics in the the pool.
423	#[cfg(test)]
424	pub fn bytes(&self) -> usize {
425		return self.transactions.bytes()
426	}
427
428	/// Returns true if provided values would exceed defined limits.
429	fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool {
430		length > self.max_transactions_count ||
431			current_total_bytes > self.max_transactions_total_bytes
432	}
433
434	/// Attempts to insert a transaction into the memory pool, ensuring it does not
435	/// exceed the maximum allowed transaction count.
436	async fn try_insert(
437		&self,
438		tx_hash: ExtrinsicHash<ChainApi>,
439		tx: TxInMemPool<ChainApi, Block>,
440	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
441		let mut transactions = self.transactions.write().await;
442
443		let bytes = self.transactions.bytes();
444
445		let result = match (
446			self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
447			transactions.contains_key(&tx_hash),
448		) {
449			(false, false) => {
450				let source = tx.source();
451				transactions.insert(tx_hash, Arc::from(tx));
452				Ok(InsertionInfo::new(tx_hash, source))
453			},
454			(_, true) =>
455				Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(tx_hash))),
456			(true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
457		};
458		trace!(
459			target: LOG_TARGET,
460			?tx_hash,
461			result_hash = ?result.as_ref().map(|r| r.hash),
462			"mempool::try_insert"
463		);
464		result
465	}
466
467	/// Attempts to insert a new transaction in the memory pool and drop some worse existing
468	/// transactions.
469	///
470	/// A "worse" transaction means transaction with lower priority, or older transaction with the
471	/// same prio.
472	///
473	/// This operation will not overflow the limit of the mempool. It means that cumulative
474	/// size of removed transactions will be equal (or greated) then size of newly inserted
475	/// transaction.
476	///
477	/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
478	/// inserted; otherwise, returns an appropriate error indicating the failure.
479	pub(super) async fn try_insert_with_replacement(
480		&self,
481		new_tx: ExtrinsicFor<ChainApi>,
482		priority: TransactionPriority,
483		source: TransactionSource,
484		validated_at: u64,
485		watched: bool,
486	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
487		let (hash, length) = self.api.hash_and_length(&new_tx);
488		let new_tx =
489			TxInMemPool::new_with_priority(watched, source, new_tx, length, priority, validated_at);
490		if new_tx.bytes > self.max_transactions_total_bytes {
491			return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
492		}
493
494		let mut transactions = self.transactions.write().await;
495
496		if transactions.contains_key(&hash) {
497			return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
498		}
499
500		// When pushing higher prio transaction, we need to find a number of lower prio txs, such
501		// that the sum of their bytes is ge then size of new tx. Otherwise we could overflow size
502		// limits. Naive way to do it - rev-sort by priority and eat the tail.
503
504		// reverse (oldest, lowest prio last)
505		let source = new_tx.source();
506		let new_tx = Arc::new(new_tx);
507		let insertion_result = transactions.try_insert_with_replacement(
508			self.max_transactions_total_bytes,
509			hash,
510			new_tx,
511		);
512		debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
513		match insertion_result {
514			None => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
515			Some(to_be_removed) => Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed)),
516		}
517	}
518
519	/// Adds a new unwatched transactions to the internal buffer not exceeding the limit.
520	///
521	/// Returns the vector of results for each transaction, the order corresponds to the input
522	/// vector.
523	pub(super) async fn extend_unwatched(
524		&self,
525		source: TransactionSource,
526		validated_at: u64,
527		xts: &[ExtrinsicFor<ChainApi>],
528	) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
529	{
530		let insert_futures = xts.into_iter().map(|xt| {
531			let api = self.api.clone();
532			let xt = xt.clone();
533			async move {
534				let (hash, length) = api.hash_and_length(&xt);
535				self.try_insert(hash, TxInMemPool::new_unwatched(source, xt, length, validated_at))
536					.await
537			}
538		});
539
540		join_all(insert_futures).await
541	}
542
543	/// Adds a new watched transaction to the memory pool if it does not exceed the maximum allowed
544	/// transaction count.
545	pub(super) async fn push_watched(
546		&self,
547		source: TransactionSource,
548		validated_at: u64,
549		xt: ExtrinsicFor<ChainApi>,
550	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
551		let (hash, length) = self.api.hash_and_length(&xt);
552		self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length, validated_at))
553			.await
554	}
555
556	/// Provides read-only access to all transctions via an iterator.
557	///
558	/// This function allows to iterate over all stored transaction without cloning.
559	/// The provided closure receives an iterator over references to keys and values.
560	///
561	/// Note: Typically some filtering should be applied and required items can be cloned and return
562	/// outside the closure if required. Transacaction are `Arc` so clone shall be cheap.
563	pub(super) async fn with_transactions<F, R>(&self, f: F) -> R
564	where
565		F: Fn(
566			std::collections::hash_map::Iter<
567				ExtrinsicHash<ChainApi>,
568				Arc<TxInMemPool<ChainApi, Block>>,
569			>,
570		) -> R,
571	{
572		self.transactions.read().await.with_items(f)
573	}
574
575	/// Removes transactions with given hashes from the memory pool.
576	pub(super) async fn remove_transactions(&self, tx_hashes: &[ExtrinsicHash<ChainApi>]) {
577		log_xt_trace!(target: LOG_TARGET, tx_hashes, "mempool::remove_transaction");
578		let mut transactions = self.transactions.write().await;
579		for tx_hash in tx_hashes {
580			transactions.remove(tx_hash);
581		}
582	}
583
584	/// Revalidates a batch of transactions against the provided finalized block.
585	///
586	/// Returns a vector of invalid transaction hashes.
587	async fn revalidate_inner(&self, finalized_block: HashAndNumber<Block>) -> Vec<Block::Hash> {
588		trace!(
589			target: LOG_TARGET,
590			?finalized_block,
591			"mempool::revalidate_inner"
592		);
593		let start = Instant::now();
594
595		let (total_count, to_be_validated) = {
596			(
597				self.transactions.len(),
598				self.with_transactions(|iter| {
599					iter.filter(|(_, xt)| {
600						let finalized_block_number = finalized_block.number.into().as_u64();
601						xt.validated_at.load(atomic::Ordering::Relaxed) +
602							TXMEMPOOL_REVALIDATION_PERIOD <
603							finalized_block_number
604					})
605					.sorted_by_key(|(_, tx)| tx.validated_at.load(atomic::Ordering::Relaxed))
606					.take(TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE)
607					.map(|(k, v)| (*k, v.clone()))
608					.collect::<Vec<_>>()
609				})
610				.await,
611			)
612		};
613
614		let validations_futures = to_be_validated.into_iter().map(|(xt_hash, xt)| {
615			self.api
616				.validate_transaction(
617					finalized_block.hash,
618					xt.source.clone().into(),
619					xt.tx(),
620					ValidateTransactionPriority::Maintained,
621				)
622				.map(move |validation_result| {
623					xt.validated_at
624						.store(finalized_block.number.into().as_u64(), atomic::Ordering::Relaxed);
625					(xt_hash, validation_result)
626				})
627		});
628		let validation_results = futures::future::join_all(validations_futures).await;
629		let validated_count = validation_results.len();
630
631		let duration = start.elapsed();
632
633		let invalid_hashes = validation_results
634			.into_iter()
635			.filter_map(|(tx_hash, validation_result)| match validation_result {
636				Ok(Ok(_)) |
637				Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Future))) => None,
638				Err(_) |
639				Ok(Err(TransactionValidityError::Unknown(_))) |
640				Ok(Err(TransactionValidityError::Invalid(_))) => {
641					trace!(
642						target: LOG_TARGET,
643						?tx_hash,
644						?validation_result,
645						"mempool::revalidate_inner invalid"
646					);
647					Some(tx_hash)
648				},
649			})
650			.collect::<Vec<_>>();
651
652		debug!(
653			target: LOG_TARGET,
654			?finalized_block,
655			validated_count,
656			total_count,
657			invalid_hashes = invalid_hashes.len(),
658			?duration,
659			"mempool::revalidate_inner"
660		);
661
662		invalid_hashes
663	}
664
665	/// Removes the finalized transactions from the memory pool, using a provided list of hashes.
666	pub(super) async fn purge_finalized_transactions(
667		&self,
668		finalized_xts: &Vec<ExtrinsicHash<ChainApi>>,
669	) {
670		debug!(
671			target: LOG_TARGET,
672			count = finalized_xts.len(),
673			"purge_finalized_transactions"
674		);
675		log_xt_trace!(target: LOG_TARGET, finalized_xts, "purged finalized transactions");
676		let mut transactions = self.transactions.write().await;
677		finalized_xts.iter().for_each(|t| {
678			transactions.remove(t);
679		});
680	}
681
682	/// Revalidates transactions in the memory pool against a given finalized block and removes
683	/// invalid ones.
684	pub(super) async fn revalidate(
685		&self,
686		view_store: Arc<ViewStore<ChainApi, Block>>,
687		finalized_block: HashAndNumber<Block>,
688	) {
689		let revalidated_invalid_hashes = self.revalidate_inner(finalized_block.clone()).await;
690
691		let mut invalid_hashes_subtrees =
692			revalidated_invalid_hashes.clone().into_iter().collect::<HashSet<_>>();
693		for tx in &revalidated_invalid_hashes {
694			invalid_hashes_subtrees.extend(
695				view_store
696					.remove_transaction_subtree(*tx, |_, _| {})
697					.into_iter()
698					.map(|tx| tx.hash),
699			);
700		}
701
702		{
703			let mut transactions = self.transactions.write().await;
704			invalid_hashes_subtrees.iter().for_each(|tx_hash| {
705				transactions.remove(&tx_hash);
706			});
707		};
708
709		self.metrics.report(|metrics| {
710			metrics
711				.mempool_revalidation_invalid_txs
712				.inc_by(invalid_hashes_subtrees.len() as _)
713		});
714
715		let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len();
716		let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len();
717
718		let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::<Vec<_>>();
719
720		//note: here the consistency is assumed: it is expected that transaction will be
721		// actually removed from the listener with Invalid event. This means assumption that no view
722		// is referencing tx as ready.
723		self.listener.transactions_invalidated(&invalid_hashes_subtrees);
724		view_store
725			.import_notification_sink
726			.clean_notified_items(&invalid_hashes_subtrees);
727		view_store
728			.dropped_stream_controller
729			.remove_transactions(invalid_hashes_subtrees);
730
731		trace!(
732			target: LOG_TARGET,
733			?finalized_block,
734			revalidated_invalid_hashes_len,
735			invalid_hashes_subtrees_len,
736			"mempool::revalidate"
737		);
738	}
739
740	/// Updates the priority of transaction stored in mempool using provided priority.
741	pub(super) async fn update_transaction_priority(
742		&self,
743		hash: ExtrinsicHash<ChainApi>,
744		prio: Option<TransactionPriority>,
745	) {
746		if let Some(priority) = prio {
747			let mut transactions = self.transactions.write().await;
748
749			transactions.update_item(&hash, |t| {
750				*t.priority.write() = Some(priority);
751			});
752		}
753	}
754
755	/// Counts the number of transactions in the provided iterator of hashes
756	/// that are not known to the pool.
757	pub(super) async fn count_unknown_transactions<'a>(
758		&self,
759		hashes: impl Iterator<Item = &'a ExtrinsicHash<ChainApi>>,
760	) -> usize {
761		let transactions = self.transactions.read().await;
762		hashes.filter(|tx_hash| !transactions.contains_key(tx_hash)).count()
763	}
764}
765
766/// Convenient return type of extend_unwatched
767type ExtendUnwatchedResult<ChainApi> =
768	Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>;
769
770/// Convenient return type of try_insert_with_replacement
771type TryInsertWithReplacementResult<ChainApi> =
772	Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>;
773
774/// Helper enum defining what requests can be made from sync code.
775enum TxMemPoolSyncRequest<ChainApi, Block>
776where
777	Block: BlockT,
778	ChainApi: graph::ChainApi<Block = Block> + 'static,
779{
780	RemoveTransactions(
781		Arc<TxMemPool<ChainApi, Block>>,
782		Vec<ExtrinsicHash<ChainApi>>,
783		SyncBridgeSender<()>,
784	),
785	ExtendUnwatched(
786		Arc<TxMemPool<ChainApi, Block>>,
787		TransactionSource,
788		u64,
789		Vec<ExtrinsicFor<ChainApi>>,
790		SyncBridgeSender<ExtendUnwatchedResult<ChainApi>>,
791	),
792	UpdateTransactionPriority(
793		Arc<TxMemPool<ChainApi, Block>>,
794		ExtrinsicHash<ChainApi>,
795		Option<TransactionPriority>,
796		SyncBridgeSender<()>,
797	),
798	TryInsertWithReplacement(
799		Arc<TxMemPool<ChainApi, Block>>,
800		ExtrinsicFor<ChainApi>,
801		TransactionPriority,
802		TransactionSource,
803		u64,
804		bool,
805		SyncBridgeSender<TryInsertWithReplacementResult<ChainApi>>,
806	),
807}
808
809impl<ChainApi, Block> TxMemPoolSyncRequest<ChainApi, Block>
810where
811	Block: BlockT,
812	ChainApi: graph::ChainApi<Block = Block> + 'static,
813{
814	fn remove_transactions(
815		mempool: Arc<TxMemPool<ChainApi, Block>>,
816		hashes: Vec<ExtrinsicHash<ChainApi>>,
817	) -> (SyncBridgeReceiver<()>, Self) {
818		let (tx, rx) = sync_bridge_channel();
819		(rx, Self::RemoveTransactions(mempool, hashes, tx))
820	}
821
822	fn extend_unwatched(
823		mempool: Arc<TxMemPool<ChainApi, Block>>,
824		source: TransactionSource,
825		validated_at: u64,
826		xts: Vec<ExtrinsicFor<ChainApi>>,
827	) -> (SyncBridgeReceiver<ExtendUnwatchedResult<ChainApi>>, Self) {
828		let (tx, rx) = sync_bridge_channel();
829		(rx, Self::ExtendUnwatched(mempool, source, validated_at, xts, tx))
830	}
831
832	fn update_transaction_priority(
833		mempool: Arc<TxMemPool<ChainApi, Block>>,
834		hash: ExtrinsicHash<ChainApi>,
835		prio: Option<TransactionPriority>,
836	) -> (SyncBridgeReceiver<()>, Self) {
837		let (tx, rx) = sync_bridge_channel();
838		(rx, Self::UpdateTransactionPriority(mempool, hash, prio, tx))
839	}
840
841	fn try_insert_with_replacement(
842		mempool: Arc<TxMemPool<ChainApi, Block>>,
843		new_tx: ExtrinsicFor<ChainApi>,
844		priority: TransactionPriority,
845		source: TransactionSource,
846		validated_at: u64,
847		watched: bool,
848	) -> (SyncBridgeReceiver<TryInsertWithReplacementResult<ChainApi>>, Self) {
849		let (tx, rx) = sync_bridge_channel();
850		(
851			rx,
852			Self::TryInsertWithReplacement(
853				mempool,
854				new_tx,
855				priority,
856				source,
857				validated_at,
858				watched,
859				tx,
860			),
861		)
862	}
863}
864
865impl<ChainApi, Block> TxMemPool<ChainApi, Block>
866where
867	Block: BlockT,
868	ChainApi: graph::ChainApi<Block = Block> + 'static,
869	<Block as BlockT>::Hash: Unpin,
870{
871	async fn sync_bridge_task(rx: SyncBridgeReceiver<TxMemPoolSyncRequest<ChainApi, Block>>) {
872		for request in rx {
873			Self::handle_request(request).await;
874		}
875	}
876
877	async fn handle_request(request: TxMemPoolSyncRequest<ChainApi, Block>) {
878		match request {
879			TxMemPoolSyncRequest::RemoveTransactions(mempool, hashes, tx) => {
880				mempool.remove_transactions(&hashes).await;
881				if let Err(error) = tx.send(()) {
882					debug!(target: LOG_TARGET, ?error, "RemoveTransaction: sending response failed");
883				}
884			},
885			TxMemPoolSyncRequest::ExtendUnwatched(mempool, source, validated_at, txs, tx) => {
886				let result = mempool.extend_unwatched(source, validated_at, &txs).await;
887				if let Err(error) = tx.send(result) {
888					debug!(target: LOG_TARGET, ?error, "ExtendUnwatched: sending response failed");
889				}
890			},
891			TxMemPoolSyncRequest::UpdateTransactionPriority(mempool, hash, prio, tx) => {
892				let result = mempool.update_transaction_priority(hash, prio).await;
893				if let Err(error) = tx.send(result) {
894					debug!(target: LOG_TARGET, ?error, "UpdateTransactionPriority2: sending response failed");
895				}
896			},
897			TxMemPoolSyncRequest::TryInsertWithReplacement(
898				mempool,
899				new_tx,
900				priority,
901				source,
902				validated_at,
903				watched,
904				tx,
905			) => {
906				let result = mempool
907					.try_insert_with_replacement(new_tx, priority, source, validated_at, watched)
908					.await;
909				if let Err(error) = tx.send(result) {
910					debug!(target: LOG_TARGET, ?error, "TryInsertWithReplacementSync: sending response failed");
911				}
912			},
913		}
914	}
915
916	pub(super) fn try_insert_with_replacement_sync(
917		self: Arc<Self>,
918		new_tx: ExtrinsicFor<ChainApi>,
919		priority: TransactionPriority,
920		source: TransactionSource,
921		validated_at: u64,
922		watched: bool,
923	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
924		let (response, request) = TxMemPoolSyncRequest::try_insert_with_replacement(
925			self.clone(),
926			new_tx,
927			priority,
928			source,
929			validated_at,
930			watched,
931		);
932		let _ = self.sync_channel.send(request);
933		response.recv().expect(SYNC_BRIDGE_EXPECT)
934	}
935
936	pub(super) fn extend_unwatched_sync(
937		self: Arc<Self>,
938		source: TransactionSource,
939		validated_at: u64,
940		xts: Vec<ExtrinsicFor<ChainApi>>,
941	) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
942	{
943		let (response, request) =
944			TxMemPoolSyncRequest::extend_unwatched(self.clone(), source, validated_at, xts);
945		let _ = self.sync_channel.send(request);
946		response.recv().expect(SYNC_BRIDGE_EXPECT)
947	}
948
949	pub(super) fn remove_transactions_sync(
950		self: Arc<Self>,
951		tx_hashes: Vec<ExtrinsicHash<ChainApi>>,
952	) {
953		let (response, request) =
954			TxMemPoolSyncRequest::remove_transactions(self.clone(), tx_hashes);
955		let _ = self.sync_channel.send(request);
956		response.recv().expect(SYNC_BRIDGE_EXPECT)
957	}
958
959	pub(super) fn update_transaction_priority_sync(
960		self: Arc<Self>,
961		hash: ExtrinsicHash<ChainApi>,
962		prio: Option<TransactionPriority>,
963	) {
964		let (response, request) =
965			TxMemPoolSyncRequest::update_transaction_priority(self.clone(), hash, prio);
966		let _ = self.sync_channel.send(request);
967		response.recv().expect(SYNC_BRIDGE_EXPECT)
968	}
969}
970
971#[cfg(test)]
972mod tx_mem_pool_tests {
973	use futures::future::join_all;
974	use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256};
975	use substrate_test_runtime_client::Sr25519Keyring::*;
976
977	use crate::{
978		common::tests::TestApi, fork_aware_txpool::view_store::ViewStoreSubmitOutcome,
979		graph::ChainApi,
980	};
981
982	use super::*;
983
984	fn uxt(nonce: u64) -> Extrinsic {
985		crate::common::tests::uxt(Transfer {
986			from: Alice.into(),
987			to: AccountId::from_h256(H256::from_low_u64_be(2)),
988			amount: 5,
989			nonce,
990		})
991	}
992
993	#[tokio::test]
994	async fn extend_unwatched_obeys_limit() {
995		let max = 10;
996		let api = Arc::from(TestApi::default());
997		let mempool = TxMemPool::new_test(api, max, usize::MAX);
998
999		let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1000
1001		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1002		assert!(results.iter().take(max).all(Result::is_ok));
1003		assert!(matches!(
1004			results.into_iter().last().unwrap().unwrap_err(),
1005			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1006		));
1007	}
1008
1009	#[tokio::test]
1010	async fn extend_unwatched_detects_already_imported() {
1011		sp_tracing::try_init_simple();
1012		let max = 10;
1013		let api = Arc::from(TestApi::default());
1014		let mempool = TxMemPool::new_test(api, max, usize::MAX);
1015
1016		let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1017		xts.push(xts.iter().last().unwrap().clone());
1018
1019		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1020		assert!(results.iter().take(max - 1).all(Result::is_ok));
1021		assert!(matches!(
1022			results.into_iter().last().unwrap().unwrap_err(),
1023			sc_transaction_pool_api::error::Error::AlreadyImported(_)
1024		));
1025	}
1026
1027	#[tokio::test]
1028	async fn push_obeys_limit() {
1029		let max = 10;
1030		let api = Arc::from(TestApi::default());
1031		let mempool = TxMemPool::new_test(api, max, usize::MAX);
1032
1033		let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1034
1035		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1036		assert!(results.iter().all(Result::is_ok));
1037
1038		let xt = Arc::from(uxt(98));
1039		let result = mempool.push_watched(TransactionSource::External, 0, xt).await;
1040		assert!(matches!(
1041			result.unwrap_err(),
1042			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1043		));
1044		let xt = Arc::from(uxt(99));
1045		let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt]).await;
1046		assert!(matches!(
1047			result.pop().unwrap().unwrap_err(),
1048			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1049		));
1050	}
1051
1052	#[tokio::test]
1053	async fn push_detects_already_imported() {
1054		let max = 10;
1055		let api = Arc::from(TestApi::default());
1056		let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX);
1057
1058		let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1059		let xt0 = xts.iter().last().unwrap().clone();
1060		let xt1 = xts.iter().next().unwrap().clone();
1061
1062		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1063		assert!(results.iter().all(Result::is_ok));
1064
1065		let result = mempool.push_watched(TransactionSource::External, 0, xt0).await;
1066		assert!(matches!(
1067			result.unwrap_err(),
1068			sc_transaction_pool_api::error::Error::AlreadyImported(_)
1069		));
1070		let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt1]).await;
1071		assert!(matches!(
1072			result.pop().unwrap().unwrap_err(),
1073			sc_transaction_pool_api::error::Error::AlreadyImported(_)
1074		));
1075	}
1076
1077	#[tokio::test]
1078	async fn count_works() {
1079		sp_tracing::try_init_simple();
1080		trace!(target:LOG_TARGET,line=line!(),"xxx");
1081
1082		let max = 100;
1083		let api = Arc::from(TestApi::default());
1084		let mempool = TxMemPool::new_test(api, max, usize::MAX);
1085		trace!(target:LOG_TARGET,line=line!(),"xxx");
1086
1087		let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1088		trace!(target:LOG_TARGET,line=line!(),"xxx");
1089
1090		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts0).await;
1091		trace!(target:LOG_TARGET,line=line!(),"xxx");
1092		assert!(results.iter().all(Result::is_ok));
1093		trace!(target:LOG_TARGET,line=line!(),"xxx");
1094
1095		let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
1096		trace!(target:LOG_TARGET,line=line!(),"xxx");
1097		let results = xts1
1098			.into_iter()
1099			.map(|t| mempool.push_watched(TransactionSource::External, 0, t));
1100		trace!(target:LOG_TARGET,line=line!(),"xxx");
1101		let results = join_all(results).await;
1102		trace!(target:LOG_TARGET,line=line!(),"xxx");
1103		assert!(results.iter().all(Result::is_ok));
1104		assert_eq!(mempool.unwatched_and_watched_count().await, (10, 5));
1105	}
1106
1107	/// size of large extrinsic
1108	const LARGE_XT_SIZE: usize = 1129;
1109
1110	fn large_uxt(x: usize) -> Extrinsic {
1111		ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
1112	}
1113
1114	#[tokio::test]
1115	async fn push_obeys_size_limit() {
1116		sp_tracing::try_init_simple();
1117		let max = 10;
1118		let api = Arc::from(TestApi::default());
1119		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
1120
1121		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1122
1123		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1124
1125		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1126		assert!(results.iter().all(Result::is_ok));
1127		assert_eq!(mempool.bytes(), total_xts_bytes);
1128
1129		let xt = Arc::from(large_uxt(98));
1130		let result = mempool.push_watched(TransactionSource::External, 0, xt).await;
1131		assert!(matches!(
1132			result.unwrap_err(),
1133			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1134		));
1135
1136		let xt = Arc::from(large_uxt(99));
1137		let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt]).await;
1138		assert!(matches!(
1139			result.pop().unwrap().unwrap_err(),
1140			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1141		));
1142	}
1143
1144	#[tokio::test]
1145	async fn replacing_txs_works_for_same_tx_size() {
1146		sp_tracing::try_init_simple();
1147		let max = 10;
1148		let api = Arc::from(TestApi::default());
1149		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
1150
1151		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1152
1153		let low_prio = 0u64;
1154		let hi_prio = u64::MAX;
1155
1156		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1157		let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
1158			.iter()
1159			.map(|t| {
1160				let h = api.hash_and_length(t).0;
1161				(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
1162			})
1163			.unzip();
1164
1165		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1166		assert!(results.iter().all(Result::is_ok));
1167		assert_eq!(mempool.bytes(), total_xts_bytes);
1168
1169		for o in submit_outcomes {
1170			mempool.update_transaction_priority(o.hash(), o.priority()).await;
1171		}
1172
1173		let xt = Arc::from(large_uxt(98));
1174		let hash = api.hash_and_length(&xt).0;
1175		let result = mempool
1176			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1177			.await
1178			.unwrap();
1179
1180		assert_eq!(result.hash, hash);
1181		assert_eq!(result.removed, hashes[0..1]);
1182	}
1183
1184	#[tokio::test]
1185	async fn replacing_txs_removes_proper_size_of_txs() {
1186		sp_tracing::try_init_simple();
1187		let max = 10;
1188		let api = Arc::from(TestApi::default());
1189		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
1190
1191		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1192
1193		let low_prio = 0u64;
1194		let hi_prio = u64::MAX;
1195
1196		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1197		let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
1198			.iter()
1199			.map(|t| {
1200				let h = api.hash_and_length(t).0;
1201				(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
1202			})
1203			.unzip();
1204
1205		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1206		assert!(results.iter().all(Result::is_ok));
1207		assert_eq!(mempool.bytes(), total_xts_bytes);
1208		assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE);
1209
1210		for o in submit_outcomes {
1211			mempool.update_transaction_priority(o.hash(), o.priority()).await;
1212		}
1213
1214		//this one should drop 2 xts (size: 1130):
1215		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build());
1216		let (hash, length) = api.hash_and_length(&xt);
1217		assert_eq!(length, 1130);
1218		let result = mempool
1219			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1220			.await
1221			.unwrap();
1222
1223		assert_eq!(result.hash, hash);
1224		assert_eq!(result.removed, hashes[0..2]);
1225	}
1226
1227	#[tokio::test]
1228	async fn replacing_txs_removes_proper_size_and_prios() {
1229		sp_tracing::try_init_simple();
1230		const COUNT: usize = 10;
1231		let api = Arc::from(TestApi::default());
1232		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
1233
1234		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1235
1236		let hi_prio = u64::MAX;
1237
1238		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1239		let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
1240			.iter()
1241			.enumerate()
1242			.map(|(prio, t)| {
1243				let h = api.hash_and_length(t).0;
1244				(ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h)
1245			})
1246			.unzip();
1247
1248		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1249		assert!(results.iter().all(Result::is_ok));
1250		assert_eq!(mempool.bytes(), total_xts_bytes);
1251
1252		for o in submit_outcomes {
1253			mempool.update_transaction_priority(o.hash(), o.priority()).await;
1254		}
1255
1256		//this one should drop 3 xts (each of size 1129)
1257		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
1258		let (hash, length) = api.hash_and_length(&xt);
1259		// overhead is 105, thus length: 105 + 2154
1260		assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
1261		let result = mempool
1262			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1263			.await
1264			.unwrap();
1265
1266		assert_eq!(result.hash, hash);
1267		assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev()));
1268	}
1269
1270	#[tokio::test]
1271	async fn replacing_txs_skips_lower_prio_tx() {
1272		sp_tracing::try_init_simple();
1273		const COUNT: usize = 10;
1274		let api = Arc::from(TestApi::default());
1275		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
1276
1277		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1278
1279		let hi_prio = 100u64;
1280		let low_prio = 10u64;
1281
1282		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1283		let submit_outcomes: Vec<ViewStoreSubmitOutcome<TestApi>> = xts
1284			.iter()
1285			.map(|t| {
1286				let h = api.hash_and_length(t).0;
1287				ViewStoreSubmitOutcome::new(h, Some(hi_prio))
1288			})
1289			.collect::<Vec<_>>();
1290
1291		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1292		assert!(results.iter().all(Result::is_ok));
1293		assert_eq!(mempool.bytes(), total_xts_bytes);
1294
1295		for o in submit_outcomes {
1296			mempool.update_transaction_priority(o.hash(), o.priority()).await;
1297		}
1298
1299		let xt = Arc::from(large_uxt(98));
1300		let result = mempool
1301			.try_insert_with_replacement(xt, low_prio, TransactionSource::External, 0, false)
1302			.await;
1303
1304		// lower prio tx is rejected immediately
1305		assert!(matches!(
1306			result.unwrap_err(),
1307			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1308		));
1309	}
1310
1311	#[tokio::test]
1312	async fn replacing_txs_is_skipped_if_prios_are_not_set() {
1313		sp_tracing::try_init_simple();
1314		const COUNT: usize = 10;
1315		let api = Arc::from(TestApi::default());
1316		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
1317
1318		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1319
1320		let hi_prio = u64::MAX;
1321
1322		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1323
1324		let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1325		assert!(results.iter().all(Result::is_ok));
1326		assert_eq!(mempool.bytes(), total_xts_bytes);
1327
1328		//this one could drop 3 xts (each of size 1129)
1329		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
1330		let length = api.hash_and_length(&xt).1;
1331		// overhead is 105, thus length: 105 + 2154
1332		assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
1333
1334		let result = mempool
1335			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1336			.await;
1337
1338		// we did not update priorities (update_transaction_priority was not called):
1339		assert!(matches!(
1340			result.unwrap_err(),
1341			sc_transaction_pool_api::error::Error::ImmediatelyDropped
1342		));
1343	}
1344}