use std::{
cmp::Ordering,
collections::{HashMap, HashSet},
sync::{
atomic::{self, AtomicU64},
Arc,
},
time::Instant,
};
use futures::FutureExt;
use itertools::Itertools;
use parking_lot::RwLock;
use tracing::{debug, trace};
use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
use sp_blockchain::HashAndNumber;
use sp_runtime::{
traits::Block as BlockT,
transaction_validity::{InvalidTransaction, TransactionValidityError},
};
use crate::{
common::tracing_log_xt::log_xt_trace,
graph,
graph::{base_pool::TimedTransactionSource, tracked_map::Size, ExtrinsicFor, ExtrinsicHash},
LOG_TARGET,
};
use super::{
metrics::MetricsLink as PrometheusMetrics,
multi_view_listener::MultiViewListener,
view_store::{ViewStore, ViewStoreSubmitOutcome},
};
pub(crate) const TXMEMPOOL_REVALIDATION_PERIOD: u64 = 10;
pub(crate) const TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE: usize = 1000;
pub const TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER: usize = 4;
#[derive(Debug)]
pub(crate) struct TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
watched: bool,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
source: TimedTransactionSource,
validated_at: AtomicU64,
priority: RwLock<Option<TransactionPriority>>,
}
impl<ChainApi, Block> TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
pub(crate) fn is_watched(&self) -> bool {
self.watched
}
fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
Self::new(false, source, tx, bytes)
}
fn new_watched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
Self::new(true, source, tx, bytes)
}
fn new(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
) -> Self {
Self::new_with_optional_priority(watched, source, tx, bytes, None)
}
fn new_with_priority(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
priority: TransactionPriority,
) -> Self {
Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority))
}
fn new_with_optional_priority(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
priority: Option<TransactionPriority>,
) -> Self {
Self {
watched,
tx,
source: TimedTransactionSource::from_transaction_source(source, true),
validated_at: AtomicU64::new(0),
bytes,
priority: priority.into(),
}
}
pub(crate) fn tx(&self) -> ExtrinsicFor<ChainApi> {
self.tx.clone()
}
pub(crate) fn source(&self) -> TimedTransactionSource {
self.source.clone()
}
pub(crate) fn priority(&self) -> Option<TransactionPriority> {
*self.priority.read()
}
}
impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
fn size(&self) -> usize {
self.bytes
}
}
type InternalTxMemPoolMap<ChainApi, Block> =
graph::tracked_map::TrackedMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>>;
pub(super) struct TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
transactions: InternalTxMemPoolMap<ChainApi, Block>,
metrics: PrometheusMetrics,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
}
#[derive(Debug)]
pub(super) struct InsertionInfo<Hash> {
pub(super) hash: Hash,
pub(super) source: TimedTransactionSource,
pub(super) removed: Vec<Hash>,
}
impl<Hash> InsertionInfo<Hash> {
fn new(hash: Hash, source: TimedTransactionSource) -> Self {
Self::new_with_removed(hash, source, Default::default())
}
fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
Self { hash, source, removed }
}
}
impl<ChainApi, Block> TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
metrics: PrometheusMetrics,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
) -> Self {
Self {
api,
listener,
transactions: Default::default(),
metrics,
max_transactions_count,
max_transactions_total_bytes,
}
}
#[cfg(test)]
fn new_test(
api: Arc<ChainApi>,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
) -> Self {
Self {
api,
listener: Arc::from(MultiViewListener::new_with_worker(Default::default()).0),
transactions: Default::default(),
metrics: Default::default(),
max_transactions_count,
max_transactions_total_bytes,
}
}
pub(super) fn get_by_hash(
&self,
hash: ExtrinsicHash<ChainApi>,
) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
self.transactions.read().get(&hash).map(Clone::clone)
}
pub fn unwatched_and_watched_count(&self) -> (usize, usize) {
let transactions = self.transactions.read();
let watched_count = transactions.values().filter(|t| t.is_watched()).count();
(transactions.len() - watched_count, watched_count)
}
pub fn len(&self) -> usize {
self.transactions.read().len()
}
#[cfg(test)]
pub fn bytes(&self) -> usize {
return self.transactions.bytes()
}
fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool {
length > self.max_transactions_count ||
current_total_bytes > self.max_transactions_total_bytes
}
fn try_insert(
&self,
tx_hash: ExtrinsicHash<ChainApi>,
tx: TxInMemPool<ChainApi, Block>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let mut transactions = self.transactions.write();
let bytes = self.transactions.bytes();
let result = match (
self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
transactions.contains_key(&tx_hash),
) {
(false, false) => {
let source = tx.source();
transactions.insert(tx_hash, Arc::from(tx));
Ok(InsertionInfo::new(tx_hash, source))
},
(_, true) =>
Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(tx_hash))),
(true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
};
trace!(
target: LOG_TARGET,
?tx_hash,
result_hash = ?result.as_ref().map(|r| r.hash),
"mempool::try_insert"
);
result
}
pub(super) fn try_insert_with_replacement(
&self,
new_tx: ExtrinsicFor<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
watched: bool,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let (hash, length) = self.api.hash_and_length(&new_tx);
let new_tx = TxInMemPool::new_with_priority(watched, source, new_tx, length, priority);
if new_tx.bytes > self.max_transactions_total_bytes {
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
}
let mut transactions = self.transactions.write();
if transactions.contains_key(&hash) {
return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
}
let mut sorted = transactions
.iter()
.filter_map(|(h, v)| v.priority().map(|_| (*h, v.clone())))
.collect::<Vec<_>>();
sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) {
Ordering::Equal => match (a.source.timestamp, b.source.timestamp) {
(Some(a), Some(b)) => b.cmp(&a),
_ => Ordering::Equal,
},
ordering => ordering,
});
let mut total_size_removed = 0usize;
let mut to_be_removed = vec![];
let free_bytes = self.max_transactions_total_bytes - self.transactions.bytes();
loop {
let Some((worst_hash, worst_tx)) = sorted.pop() else {
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
};
if worst_tx.priority() >= new_tx.priority() {
return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
}
total_size_removed += worst_tx.bytes;
to_be_removed.push(worst_hash);
if free_bytes + total_size_removed >= new_tx.bytes {
break;
}
}
let source = new_tx.source();
transactions.insert(hash, Arc::from(new_tx));
for worst_hash in &to_be_removed {
transactions.remove(worst_hash);
}
debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed))
}
pub(super) fn extend_unwatched(
&self,
source: TransactionSource,
xts: &[ExtrinsicFor<ChainApi>],
) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
{
let result = xts
.iter()
.map(|xt| {
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length))
})
.collect::<Vec<_>>();
result
}
pub(super) fn push_watched(
&self,
source: TransactionSource,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
}
pub(super) fn clone_transactions(
&self,
) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>> {
self.transactions.clone_map()
}
pub(super) fn remove_transactions(&self, tx_hashes: &[ExtrinsicHash<ChainApi>]) {
log_xt_trace!(target: LOG_TARGET, tx_hashes, "mempool::remove_transaction");
let mut transactions = self.transactions.write();
for tx_hash in tx_hashes {
transactions.remove(tx_hash);
}
}
async fn revalidate_inner(&self, finalized_block: HashAndNumber<Block>) -> Vec<Block::Hash> {
trace!(
target: LOG_TARGET,
?finalized_block,
"mempool::revalidate_inner"
);
let start = Instant::now();
let (count, input) = {
let transactions = self.transactions.clone_map();
(
transactions.len(),
transactions
.into_iter()
.filter(|xt| {
let finalized_block_number = finalized_block.number.into().as_u64();
xt.1.validated_at.load(atomic::Ordering::Relaxed) +
TXMEMPOOL_REVALIDATION_PERIOD <
finalized_block_number
})
.sorted_by_key(|tx| tx.1.validated_at.load(atomic::Ordering::Relaxed))
.take(TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE),
)
};
let validations_futures = input.into_iter().map(|(xt_hash, xt)| {
self.api
.validate_transaction(finalized_block.hash, xt.source.clone().into(), xt.tx())
.map(move |validation_result| {
xt.validated_at
.store(finalized_block.number.into().as_u64(), atomic::Ordering::Relaxed);
(xt_hash, validation_result)
})
});
let validation_results = futures::future::join_all(validations_futures).await;
let input_len = validation_results.len();
let duration = start.elapsed();
let invalid_hashes = validation_results
.into_iter()
.filter_map(|(tx_hash, validation_result)| match validation_result {
Ok(Ok(_)) |
Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Future))) => None,
Err(_) |
Ok(Err(TransactionValidityError::Unknown(_))) |
Ok(Err(TransactionValidityError::Invalid(_))) => {
trace!(
target: LOG_TARGET,
?tx_hash,
?validation_result,
"mempool::revalidate_inner invalid"
);
Some(tx_hash)
},
})
.collect::<Vec<_>>();
debug!(
target: LOG_TARGET,
?finalized_block,
input_len,
count,
invalid_hashes = invalid_hashes.len(),
?duration,
"mempool::revalidate_inner"
);
invalid_hashes
}
pub(super) async fn purge_finalized_transactions(
&self,
finalized_xts: &Vec<ExtrinsicHash<ChainApi>>,
) {
debug!(
target: LOG_TARGET,
count = finalized_xts.len(),
"purge_finalized_transactions"
);
log_xt_trace!(target: LOG_TARGET, finalized_xts, "purged finalized transactions");
let mut transactions = self.transactions.write();
finalized_xts.iter().for_each(|t| {
transactions.remove(t);
});
}
pub(super) async fn revalidate(
&self,
view_store: Arc<ViewStore<ChainApi, Block>>,
finalized_block: HashAndNumber<Block>,
) {
let revalidated_invalid_hashes = self.revalidate_inner(finalized_block.clone()).await;
let mut invalid_hashes_subtrees =
revalidated_invalid_hashes.clone().into_iter().collect::<HashSet<_>>();
for tx in &revalidated_invalid_hashes {
invalid_hashes_subtrees.extend(
view_store
.remove_transaction_subtree(*tx, |_, _| {})
.into_iter()
.map(|tx| tx.hash),
);
}
{
let mut transactions = self.transactions.write();
invalid_hashes_subtrees.iter().for_each(|tx_hash| {
transactions.remove(&tx_hash);
});
};
self.metrics.report(|metrics| {
metrics
.mempool_revalidation_invalid_txs
.inc_by(invalid_hashes_subtrees.len() as _)
});
let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len();
let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len();
self.listener
.transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::<Vec<_>>());
trace!(
target: LOG_TARGET,
?finalized_block,
revalidated_invalid_hashes_len,
invalid_hashes_subtrees_len,
"mempool::revalidate"
);
}
pub(super) fn update_transaction_priority(&self, outcome: &ViewStoreSubmitOutcome<ChainApi>) {
outcome.priority().map(|priority| {
self.transactions
.write()
.get_mut(&outcome.hash())
.map(|p| *p.priority.write() = Some(priority))
});
}
pub(super) fn count_unknown_transactions<'a>(
&self,
hashes: impl Iterator<Item = &'a ExtrinsicHash<ChainApi>>,
) -> usize {
let transactions = self.transactions.read();
hashes.filter(|tx_hash| !transactions.contains_key(tx_hash)).count()
}
}
#[cfg(test)]
mod tx_mem_pool_tests {
use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256};
use substrate_test_runtime_client::Sr25519Keyring::*;
use crate::{common::tests::TestApi, graph::ChainApi};
use super::*;
fn uxt(nonce: u64) -> Extrinsic {
crate::common::tests::uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
})
}
#[test]
fn extend_unwatched_obeys_limit() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
#[test]
fn extend_unwatched_detects_already_imported() {
sp_tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
xts.push(xts.iter().last().unwrap().clone());
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max - 1).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
}
#[test]
fn push_obeys_limit() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
let xt = Arc::from(uxt(98));
let result = mempool.push_watched(TransactionSource::External, xt);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
let xt = Arc::from(uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
#[test]
fn push_detects_already_imported() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX);
let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let xt0 = xts.iter().last().unwrap().clone();
let xt1 = xts.iter().next().unwrap().clone();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
let result = mempool.push_watched(TransactionSource::External, xt0);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt1]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
}
#[test]
fn count_works() {
let max = 100;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts0);
assert!(results.iter().all(Result::is_ok));
let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
let results = xts1
.into_iter()
.map(|t| mempool.push_watched(TransactionSource::External, t))
.collect::<Vec<_>>();
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.unwatched_and_watched_count(), (10, 5));
}
const LARGE_XT_SIZE: usize = 1129;
fn large_uxt(x: usize) -> Extrinsic {
ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
}
#[test]
fn push_obeys_size_limit() {
sp_tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
let xt = Arc::from(large_uxt(98));
let result = mempool.push_watched(TransactionSource::External, xt);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
let xt = Arc::from(large_uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
#[test]
fn replacing_txs_works_for_same_tx_size() {
sp_tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let low_prio = 0u64;
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
.iter()
.map(|t| {
let h = api.hash_and_length(t).0;
(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
})
.unzip();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
submit_outcomes
.into_iter()
.for_each(|o| mempool.update_transaction_priority(&o));
let xt = Arc::from(large_uxt(98));
let hash = api.hash_and_length(&xt).0;
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
.unwrap();
assert_eq!(result.hash, hash);
assert_eq!(result.removed, hashes[0..1]);
}
#[test]
fn replacing_txs_removes_proper_size_of_txs() {
sp_tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let low_prio = 0u64;
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
.iter()
.map(|t| {
let h = api.hash_and_length(t).0;
(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
})
.unzip();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE);
submit_outcomes
.into_iter()
.for_each(|o| mempool.update_transaction_priority(&o));
let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build());
let (hash, length) = api.hash_and_length(&xt);
assert_eq!(length, 1130);
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
.unwrap();
assert_eq!(result.hash, hash);
assert_eq!(result.removed, hashes[0..2]);
}
#[test]
fn replacing_txs_removes_proper_size_and_prios() {
sp_tracing::try_init_simple();
const COUNT: usize = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
.iter()
.enumerate()
.map(|(prio, t)| {
let h = api.hash_and_length(t).0;
(ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h)
})
.unzip();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
submit_outcomes
.into_iter()
.for_each(|o| mempool.update_transaction_priority(&o));
let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
let (hash, length) = api.hash_and_length(&xt);
assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
.unwrap();
assert_eq!(result.hash, hash);
assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev()));
}
#[test]
fn replacing_txs_skips_lower_prio_tx() {
sp_tracing::try_init_simple();
const COUNT: usize = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let hi_prio = 100u64;
let low_prio = 10u64;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let submit_outcomes = xts
.iter()
.map(|t| {
let h = api.hash_and_length(t).0;
ViewStoreSubmitOutcome::new(h, Some(hi_prio))
})
.collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
submit_outcomes
.into_iter()
.for_each(|o| mempool.update_transaction_priority(&o));
let xt = Arc::from(large_uxt(98));
let result =
mempool.try_insert_with_replacement(xt, low_prio, TransactionSource::External, false);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
#[test]
fn replacing_txs_is_skipped_if_prios_are_not_set() {
sp_tracing::try_init_simple();
const COUNT: usize = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
let length = api.hash_and_length(&xt).1;
assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
let result =
mempool.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
}