use super::{metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener};
use crate::{
common::log_xt::log_xt_trace,
graph,
graph::{base_pool::TimedTransactionSource, tracked_map::Size, ExtrinsicFor, ExtrinsicHash},
LOG_TARGET,
};
use futures::FutureExt;
use itertools::Itertools;
use sc_transaction_pool_api::TransactionSource;
use sp_blockchain::HashAndNumber;
use sp_runtime::{
traits::Block as BlockT,
transaction_validity::{InvalidTransaction, TransactionValidityError},
};
use std::{
collections::HashMap,
sync::{atomic, atomic::AtomicU64, Arc},
time::Instant,
};
pub(crate) const TXMEMPOOL_REVALIDATION_PERIOD: u64 = 10;
pub(crate) const TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE: usize = 1000;
pub const TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER: usize = 4;
#[derive(Debug)]
pub(crate) struct TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
watched: bool,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
source: TimedTransactionSource,
validated_at: AtomicU64,
}
impl<ChainApi, Block> TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
pub(crate) fn is_watched(&self) -> bool {
self.watched
}
fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
Self {
watched: false,
tx,
source: TimedTransactionSource::from_transaction_source(source, true),
validated_at: AtomicU64::new(0),
bytes,
}
}
fn new_watched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
Self {
watched: true,
tx,
source: TimedTransactionSource::from_transaction_source(source, true),
validated_at: AtomicU64::new(0),
bytes,
}
}
pub(crate) fn tx(&self) -> ExtrinsicFor<ChainApi> {
self.tx.clone()
}
pub(crate) fn source(&self) -> TimedTransactionSource {
self.source.clone()
}
}
impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
fn size(&self) -> usize {
self.bytes
}
}
type InternalTxMemPoolMap<ChainApi, Block> =
graph::tracked_map::TrackedMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>>;
pub(super) struct TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
transactions: InternalTxMemPoolMap<ChainApi, Block>,
metrics: PrometheusMetrics,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
}
#[derive(Debug)]
pub(super) struct InsertionInfo<Hash> {
pub(super) hash: Hash,
pub(super) source: TimedTransactionSource,
}
impl<Hash> InsertionInfo<Hash> {
fn new(hash: Hash, source: TimedTransactionSource) -> Self {
Self { hash, source }
}
}
impl<ChainApi, Block> TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
metrics: PrometheusMetrics,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
) -> Self {
Self {
api,
listener,
transactions: Default::default(),
metrics,
max_transactions_count,
max_transactions_total_bytes,
}
}
#[allow(dead_code)]
fn new_test(
api: Arc<ChainApi>,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
) -> Self {
Self {
api,
listener: Arc::from(MultiViewListener::new()),
transactions: Default::default(),
metrics: Default::default(),
max_transactions_count,
max_transactions_total_bytes,
}
}
pub(super) fn get_by_hash(
&self,
hash: ExtrinsicHash<ChainApi>,
) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
self.transactions.read().get(&hash).map(Clone::clone)
}
pub fn unwatched_and_watched_count(&self) -> (usize, usize) {
let transactions = self.transactions.read();
let watched_count = transactions.values().filter(|t| t.is_watched()).count();
(transactions.len() - watched_count, watched_count)
}
pub fn len(&self) -> usize {
self.transactions.read().len()
}
#[cfg(test)]
pub fn bytes(&self) -> usize {
return self.transactions.bytes()
}
fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool {
length > self.max_transactions_count ||
current_total_bytes > self.max_transactions_total_bytes
}
fn try_insert(
&self,
hash: ExtrinsicHash<ChainApi>,
tx: TxInMemPool<ChainApi, Block>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error> {
let bytes = self.transactions.bytes();
let mut transactions = self.transactions.write();
let result = match (
!self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
transactions.contains_key(&hash),
) {
(true, false) => {
let source = tx.source();
transactions.insert(hash, Arc::from(tx));
Ok(InsertionInfo::new(hash, source))
},
(_, true) =>
Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)).into()),
(false, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped.into()),
};
log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash, result.as_ref().map(|r| r.hash));
result
}
pub(super) fn extend_unwatched(
&self,
source: TransactionSource,
xts: &[ExtrinsicFor<ChainApi>],
) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error>> {
let result = xts
.iter()
.map(|xt| {
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length))
})
.collect::<Vec<_>>();
result
}
pub(super) fn push_watched(
&self,
source: TransactionSource,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error> {
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
}
pub(super) async fn remove_dropped_transaction(
&self,
dropped: &ExtrinsicHash<ChainApi>,
) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
log::debug!(target: LOG_TARGET, "[{:?}] mempool::remove_dropped_transaction", dropped);
self.transactions.write().remove(dropped)
}
pub(super) fn clone_unwatched(
&self,
) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>> {
self.transactions
.read()
.iter()
.filter_map(|(hash, tx)| (!tx.is_watched()).then(|| (*hash, tx.clone())))
.collect::<HashMap<_, _>>()
}
pub(super) fn clone_watched(
&self,
) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>> {
self.transactions
.read()
.iter()
.filter_map(|(hash, tx)| (tx.is_watched()).then(|| (*hash, tx.clone())))
.collect::<HashMap<_, _>>()
}
pub(super) fn remove(&self, hash: ExtrinsicHash<ChainApi>) {
let _ = self.transactions.write().remove(&hash);
}
async fn revalidate_inner(&self, finalized_block: HashAndNumber<Block>) -> Vec<Block::Hash> {
log::trace!(target: LOG_TARGET, "mempool::revalidate at:{finalized_block:?}");
let start = Instant::now();
let (count, input) = {
let transactions = self.transactions.clone_map();
(
transactions.len(),
transactions
.into_iter()
.filter(|xt| {
let finalized_block_number = finalized_block.number.into().as_u64();
xt.1.validated_at.load(atomic::Ordering::Relaxed) +
TXMEMPOOL_REVALIDATION_PERIOD <
finalized_block_number
})
.sorted_by_key(|tx| tx.1.validated_at.load(atomic::Ordering::Relaxed))
.take(TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE),
)
};
let validations_futures = input.into_iter().map(|(xt_hash, xt)| {
self.api
.validate_transaction(finalized_block.hash, xt.source.clone().into(), xt.tx())
.map(move |validation_result| {
xt.validated_at
.store(finalized_block.number.into().as_u64(), atomic::Ordering::Relaxed);
(xt_hash, validation_result)
})
});
let validation_results = futures::future::join_all(validations_futures).await;
let input_len = validation_results.len();
let duration = start.elapsed();
let invalid_hashes = validation_results
.into_iter()
.filter_map(|(xt_hash, validation_result)| match validation_result {
Ok(Ok(_)) |
Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Future))) => None,
Err(_) |
Ok(Err(TransactionValidityError::Unknown(_))) |
Ok(Err(TransactionValidityError::Invalid(_))) => {
log::trace!(
target: LOG_TARGET,
"[{:?}]: Purging: invalid: {:?}",
xt_hash,
validation_result,
);
Some(xt_hash)
},
})
.collect::<Vec<_>>();
log::debug!(
target: LOG_TARGET,
"mempool::revalidate: at {finalized_block:?} count:{input_len}/{count} invalid_hashes:{} took {duration:?}", invalid_hashes.len(),
);
invalid_hashes
}
pub(super) async fn purge_finalized_transactions(
&self,
finalized_xts: &Vec<ExtrinsicHash<ChainApi>>,
) {
log::debug!(target: LOG_TARGET, "purge_finalized_transactions count:{:?}", finalized_xts.len());
log_xt_trace!(target: LOG_TARGET, finalized_xts, "[{:?}] purged finalized transactions");
let mut transactions = self.transactions.write();
finalized_xts.iter().for_each(|t| {
transactions.remove(t);
});
}
pub(super) async fn revalidate(&self, finalized_block: HashAndNumber<Block>) {
log::trace!(target: LOG_TARGET, "purge_transactions at:{:?}", finalized_block);
let invalid_hashes = self.revalidate_inner(finalized_block.clone()).await;
self.metrics.report(|metrics| {
metrics.mempool_revalidation_invalid_txs.inc_by(invalid_hashes.len() as _)
});
let mut transactions = self.transactions.write();
invalid_hashes.iter().for_each(|i| {
transactions.remove(i);
});
self.listener.invalidate_transactions(&invalid_hashes);
}
}
#[cfg(test)]
mod tx_mem_pool_tests {
use super::*;
use crate::{common::tests::TestApi, graph::ChainApi};
use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256};
use substrate_test_runtime_client::Sr25519Keyring::*;
fn uxt(nonce: u64) -> Extrinsic {
crate::common::tests::uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
})
}
#[test]
fn extend_unwatched_obeys_limit() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
#[test]
fn extend_unwatched_detects_already_imported() {
sp_tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
xts.push(xts.iter().last().unwrap().clone());
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max - 1).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
}
#[test]
fn push_obeys_limit() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
let xt = Arc::from(uxt(98));
let result = mempool.push_watched(TransactionSource::External, xt);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
let xt = Arc::from(uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
#[test]
fn push_detects_already_imported() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX);
let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let xt0 = xts.iter().last().unwrap().clone();
let xt1 = xts.iter().next().unwrap().clone();
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
let result = mempool.push_watched(TransactionSource::External, xt0);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt1]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
}
#[test]
fn count_works() {
let max = 100;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, &xts0);
assert!(results.iter().all(Result::is_ok));
let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
let results = xts1
.into_iter()
.map(|t| mempool.push_watched(TransactionSource::External, t))
.collect::<Vec<_>>();
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.unwatched_and_watched_count(), (10, 5));
}
fn large_uxt(x: usize) -> Extrinsic {
ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
}
#[test]
fn push_obeys_size_limit() {
sp_tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * 1129);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
let xt = Arc::from(large_uxt(98));
let result = mempool.push_watched(TransactionSource::External, xt);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
let xt = Arc::from(large_uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
}
}