use crate::{common::log_xt::log_xt_trace, LOG_TARGET};
use futures::{channel::mpsc::Receiver, Future};
use indexmap::IndexMap;
use sc_transaction_pool_api::error;
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_runtime::{
generic::BlockId,
traits::{self, Block as BlockT, SaturatedConversion},
transaction_validity::{
TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
},
};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use super::{
base_pool as base,
validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
watcher::Watcher,
};
pub type EventStream<H> = Receiver<H>;
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
pub type ValidatedTransactionFor<A> =
ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
pub trait ChainApi: Send + Sync {
type Block: BlockT;
type Error: From<error::Error> + error::IntoPoolError;
type ValidationFuture: Future<Output = Result<TransactionValidity, Self::Error>> + Send + Unpin;
type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>>
+ Unpin
+ Send
+ 'static;
fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture;
fn validate_transaction_blocking(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Result<TransactionValidity, Self::Error>;
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<NumberFor<Self>>, Self::Error>;
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
fn block_body(&self, at: <Self::Block as BlockT>::Hash) -> Self::BodyFuture;
fn block_header(
&self,
at: <Self::Block as BlockT>::Hash,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
fn tree_route(
&self,
from: <Self::Block as BlockT>::Hash,
to: <Self::Block as BlockT>::Hash,
) -> Result<TreeRoute<Self::Block>, Self::Error>;
fn resolve_block_number(
&self,
at: <Self::Block as BlockT>::Hash,
) -> Result<NumberFor<Self>, Self::Error> {
self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
})
}
}
#[derive(Debug, Clone)]
pub struct Options {
pub ready: base::Limit,
pub future: base::Limit,
pub reject_future_transactions: bool,
pub ban_time: Duration,
}
impl Default for Options {
fn default() -> Self {
Self {
ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
reject_future_transactions: false,
ban_time: Duration::from_secs(60 * 30),
}
}
}
#[derive(Copy, Clone)]
enum CheckBannedBeforeVerify {
Yes,
No,
}
pub struct Pool<B: ChainApi> {
validated_pool: Arc<ValidatedPool<B>>,
}
impl<B: ChainApi> Pool<B> {
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
}
pub async fn submit_at(
&self,
at: &HashAndNumber<B::Block>,
xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await;
self.validated_pool.submit(validated_transactions.into_values())
}
pub async fn resubmit_at(
&self,
at: &HashAndNumber<B::Block>,
xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await;
self.validated_pool.submit(validated_transactions.into_values())
}
pub async fn submit_one(
&self,
at: &HashAndNumber<B::Block>,
source: base::TimedTransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<ExtrinsicHash<B>, B::Error> {
let res = self.submit_at(at, std::iter::once((source, xt))).await.pop();
res.expect("One extrinsic passed; one result returned; qed")
}
pub async fn submit_and_watch(
&self,
at: &HashAndNumber<B::Block>,
source: base::TimedTransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
let (_, tx) = self
.verify_one(at.hash, at.number, source, xt, CheckBannedBeforeVerify::Yes)
.await;
self.validated_pool.submit_and_watch(tx)
}
pub fn resubmit(
&self,
revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
) {
let now = Instant::now();
self.validated_pool.resubmit(revalidated_transactions);
log::trace!(
target: LOG_TARGET,
"Resubmitted. Took {} ms. Status: {:?}",
now.elapsed().as_millis(),
self.validated_pool.status()
);
}
pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
let in_pool_tags =
self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
let prune_status = self.validated_pool.prune_tags(in_pool_tags);
let pruned_transactions =
hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
self.validated_pool.fire_pruned(at, pruned_transactions);
}
pub async fn prune(
&self,
at: &HashAndNumber<B::Block>,
parent: <B::Block as BlockT>::Hash,
extrinsics: &[RawExtrinsicFor<B>],
) {
log::debug!(
target: LOG_TARGET,
"Starting pruning of block {:?} (extrinsics: {})",
at,
extrinsics.len()
);
let in_pool_hashes =
extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
let all = extrinsics.iter().zip(in_pool_tags.into_iter());
let mut validated_counter: usize = 0;
let mut future_tags = Vec::new();
for (extrinsic, in_pool_tags) in all {
match in_pool_tags {
Some(tags) => future_tags.extend(tags),
None => {
if !self.validated_pool.status().is_empty() {
validated_counter = validated_counter + 1;
let validity = self
.validated_pool
.api()
.validate_transaction(
parent,
TransactionSource::InBlock,
Arc::from(extrinsic.clone()),
)
.await;
log::trace!(target: LOG_TARGET,"[{:?}] prune::revalidated {:?}", self.validated_pool.api().hash_and_length(&extrinsic.clone()).0, validity);
if let Ok(Ok(validity)) = validity {
future_tags.extend(validity.provides);
}
} else {
log::trace!(
target: LOG_TARGET,
"txpool is empty, skipping validation for block {at:?}",
);
}
},
}
}
log::trace!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}");
self.prune_tags(at, future_tags, in_pool_hashes).await
}
pub async fn prune_tags(
&self,
at: &HashAndNumber<B::Block>,
tags: impl IntoIterator<Item = Tag>,
known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
) {
log::trace!(target: LOG_TARGET, "Pruning at {:?}", at);
let prune_status = self.validated_pool.prune_tags(tags);
self.validated_pool
.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
let pruned_transactions =
prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
let reverified_transactions =
self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await;
let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect();
log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}", &at, reverified_transactions.len());
log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}");
self.validated_pool.resubmit_pruned(
&at,
known_imported_hashes,
pruned_hashes,
reverified_transactions.into_values().collect(),
)
}
pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
self.validated_pool.api().hash_and_length(xt).0
}
async fn verify(
&self,
at: &HashAndNumber<B::Block>,
xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
check: CheckBannedBeforeVerify,
) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
let HashAndNumber { number, hash } = *at;
let res = futures::future::join_all(
xts.into_iter()
.map(|(source, xt)| self.verify_one(hash, number, source, xt, check)),
)
.await
.into_iter()
.collect::<IndexMap<_, _>>();
res
}
async fn verify_one(
&self,
block_hash: <B::Block as BlockT>::Hash,
block_number: NumberFor<B>,
source: base::TimedTransactionSource,
xt: ExtrinsicFor<B>,
check: CheckBannedBeforeVerify,
) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
return (hash, ValidatedTransaction::Invalid(hash, err))
}
let validation_result = self
.validated_pool
.api()
.validate_transaction(block_hash, source.clone().into(), xt.clone())
.await;
let status = match validation_result {
Ok(status) => status,
Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
};
let validity = match status {
Ok(validity) =>
if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash,
source,
xt,
bytes,
validity,
)
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
};
(hash, validity)
}
pub fn validated_pool(&self) -> &ValidatedPool<B> {
&self.validated_pool
}
pub fn clear_recently_pruned(&mut self) {
self.validated_pool.pool.write().clear_recently_pruned();
}
}
impl<B: ChainApi> Pool<B> {
pub fn deep_clone(&self) -> Self {
let other: ValidatedPool<B> = (*self.validated_pool).clone();
Self { validated_pool: Arc::from(other) }
}
}
#[cfg(test)]
mod tests {
use super::{super::base_pool::Limit, *};
use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
use assert_matches::assert_matches;
use base::TimedTransactionSource;
use codec::Encode;
use futures::executor::block_on;
use parking_lot::Mutex;
use sc_transaction_pool_api::TransactionStatus;
use sp_runtime::transaction_validity::TransactionSource;
use std::{collections::HashMap, time::Instant};
use substrate_test_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};
const SOURCE: TimedTransactionSource =
TimedTransactionSource { source: TransactionSource::External, timestamp: None };
#[test]
fn should_validate_and_import_transaction() {
let (pool, api) = pool();
let hash = block_on(
pool.submit_one(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
}
#[test]
fn submit_at_preserves_order() {
sp_tracing::try_init_simple();
let (pool, api) = pool();
let txs = (0..10)
.map(|i| {
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(i)),
amount: 5,
nonce: i,
})
.into()
})
.collect::<Vec<_>>();
let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs));
log::debug!("--> {hashes:#?}");
hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
|(result_hash, initial_hash)| {
assert_eq!(result_hash.unwrap(), initial_hash);
},
);
}
#[test]
fn should_reject_if_temporarily_banned() {
let (pool, api) = pool();
let uxt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
}
#[test]
fn should_reject_unactionable_transactions() {
let api = Arc::new(TestApi::default());
let pool = Pool::new(
Default::default(),
false.into(),
api.clone(),
);
let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
assert_matches!(res.unwrap_err(), error::Error::Unactionable);
}
#[test]
fn should_notify_about_pool_events() {
let (stream, hash0, hash1) = {
let (pool, api) = pool();
let han_of_block0 = api.expect_hash_and_number(0);
let stream = pool.validated_pool().import_notification_stream();
let hash0 = block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
let hash1 = block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
})
.into(),
),
)
.unwrap();
let _hash = block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 3,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
assert_eq!(pool.validated_pool().status().future, 1);
(stream, hash0, hash1)
};
let mut it = futures::executor::block_on_stream(stream);
assert_eq!(it.next(), Some(hash0));
assert_eq!(it.next(), Some(hash1));
assert_eq!(it.next(), None);
}
#[test]
fn should_clear_stale_transactions() {
let (pool, api) = pool();
let han_of_block0 = api.expect_hash_and_number(0);
let hash1 = block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
let hash2 = block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
})
.into(),
),
)
.unwrap();
let hash3 = block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 3,
})
.into(),
),
)
.unwrap();
pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
assert_eq!(pool.validated_pool().ready().count(), 0);
assert_eq!(pool.validated_pool().status().future, 0);
assert_eq!(pool.validated_pool().status().ready, 0);
assert!(pool.validated_pool.is_banned(&hash1));
assert!(pool.validated_pool.is_banned(&hash2));
assert!(pool.validated_pool.is_banned(&hash3));
}
#[test]
fn should_ban_mined_transactions() {
let (pool, api) = pool();
let hash1 = block_on(
pool.submit_one(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
assert!(pool.validated_pool.is_banned(&hash1));
}
#[test]
fn should_limit_futures() {
sp_tracing::try_init_simple();
let xt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
});
let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let hash1 =
block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
assert_eq!(pool.validated_pool().status().future, 1);
let hash2 = block_on(
pool.submit_one(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Bob.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 10,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().status().future, 1);
assert!(pool.validated_pool.is_banned(&hash1));
assert!(!pool.validated_pool.is_banned(&hash2));
}
#[test]
fn should_error_if_reject_immediately() {
let limit = Limit { count: 100, total_bytes: 10 };
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
block_on(
pool.submit_one(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
})
.into(),
),
)
.unwrap_err();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
}
#[test]
fn should_reject_transactions_with_no_provides() {
let (pool, api) = pool();
let err = block_on(
pool.submit_one(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: INVALID_NONCE,
})
.into(),
),
)
.unwrap_err();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
assert_matches!(err, error::Error::NoTagsProvided);
}
mod listener {
use super::*;
#[test]
fn should_trigger_ready_and_finalized() {
let (pool, api) = pool();
let watcher = block_on(
pool.submit_and_watch(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
let han_of_block2 = api.expect_hash_and_number(2);
block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
);
}
#[test]
fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
let (pool, api) = pool();
let watcher = block_on(
pool.submit_and_watch(
&api.expect_hash_and_number(0),
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
let han_of_block2 = api.expect_hash_and_number(2);
block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
);
}
#[test]
fn should_trigger_future_and_ready_after_promoted() {
let (pool, api) = pool();
let han_of_block0 = api.expect_hash_and_number(0);
let watcher = block_on(
pool.submit_and_watch(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 1);
block_on(
pool.submit_one(
&han_of_block0,
SOURCE,
uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
})
.into(),
),
)
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Future));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
}
#[test]
fn should_trigger_invalid_and_ban() {
let (pool, api) = pool();
let uxt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
pool.validated_pool.remove_invalid(&[*watcher.hash()]);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
assert_eq!(stream.next(), None);
}
#[test]
fn should_trigger_broadcasted() {
let (pool, api) = pool();
let uxt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let mut map = HashMap::new();
let peers = vec!["a".into(), "b".into(), "c".into()];
map.insert(*watcher.hash(), peers.clone());
pool.validated_pool().on_broadcasted(map);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
}
#[test]
fn should_trigger_dropped_older() {
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let xt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let xt = uxt(Transfer {
from: Bob.into(),
to: AccountId::from_h256(H256::from_low_u64_be(1)),
amount: 4,
nonce: 1,
});
block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
#[test]
fn should_trigger_dropped_lower_priority() {
{
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let xt = uxt(Transfer {
from: Bob.into(),
to: AccountId::from_h256(H256::from_low_u64_be(1)),
amount: 4,
nonce: 1,
});
let result =
block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
assert!(matches!(
result,
Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
));
}
{
let limit = Limit { count: 2, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let han_of_block0 = api.expect_hash_and_number(0);
let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let xt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
.unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
}
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let api = Arc::new(api);
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let han_of_block0 = api.expect_hash_and_number(0);
let xt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
});
let pool2 = pool.clone();
std::thread::spawn({
let hash_of_block0 = han_of_block0.clone();
move || {
block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
ready.send(()).unwrap();
}
});
let xt = uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 4,
nonce: 0,
});
let provides = vec![0_u8];
block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
assert_eq!(pool.validated_pool().status().ready, 0);
tx.send(()).unwrap();
is_ready.recv().unwrap(); assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
}
}
}