use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc};
use crate::LOG_TARGET;
use log::{trace, warn};
use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
use serde::Serialize;
use sp_core::hexdisplay::HexDisplay;
use sp_runtime::{
traits::Member,
transaction_validity::{
TransactionLongevity as Longevity, TransactionPriority as Priority,
TransactionSource as Source, TransactionTag as Tag,
},
};
use super::{
future::{FutureTransactions, WaitingTransaction},
ready::{BestIterator, ReadyTransactions, TransactionRef},
};
#[derive(Debug, PartialEq, Eq)]
pub enum Imported<Hash, Ex> {
Ready {
hash: Hash,
promoted: Vec<Hash>,
failed: Vec<Hash>,
removed: Vec<Arc<Transaction<Hash, Ex>>>,
},
Future {
hash: Hash,
},
}
impl<Hash, Ex> Imported<Hash, Ex> {
pub fn hash(&self) -> &Hash {
use self::Imported::*;
match *self {
Ready { ref hash, .. } => hash,
Future { ref hash, .. } => hash,
}
}
}
#[derive(Debug)]
pub struct PruneStatus<Hash, Ex> {
pub promoted: Vec<Imported<Hash, Ex>>,
pub failed: Vec<Hash>,
pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
}
#[derive(PartialEq, Eq, Clone)]
pub struct Transaction<Hash, Extrinsic> {
pub data: Extrinsic,
pub bytes: usize,
pub hash: Hash,
pub priority: Priority,
pub valid_till: Longevity,
pub requires: Vec<Tag>,
pub provides: Vec<Tag>,
pub propagate: bool,
pub source: Source,
}
impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
fn as_ref(&self) -> &Extrinsic {
&self.data
}
}
impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
type Transaction = Extrinsic;
type Hash = Hash;
fn data(&self) -> &Extrinsic {
&self.data
}
fn hash(&self) -> &Hash {
&self.hash
}
fn priority(&self) -> &Priority {
&self.priority
}
fn longevity(&self) -> &Longevity {
&self.valid_till
}
fn requires(&self) -> &[Tag] {
&self.requires
}
fn provides(&self) -> &[Tag] {
&self.provides
}
fn is_propagable(&self) -> bool {
self.propagate
}
}
impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
pub fn duplicate(&self) -> Self {
Self {
data: self.data.clone(),
bytes: self.bytes,
hash: self.hash.clone(),
priority: self.priority,
source: self.source,
valid_till: self.valid_till,
requires: self.requires.clone(),
provides: self.provides.clone(),
propagate: self.propagate,
}
}
}
impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
where
Hash: fmt::Debug,
Extrinsic: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let join_tags = |tags: &[Tag]| {
tags.iter()
.map(|tag| HexDisplay::from(tag).to_string())
.collect::<Vec<_>>()
.join(", ")
};
write!(fmt, "Transaction {{ ")?;
write!(fmt, "hash: {:?}, ", &self.hash)?;
write!(fmt, "priority: {:?}, ", &self.priority)?;
write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
write!(fmt, "bytes: {:?}, ", &self.bytes)?;
write!(fmt, "propagate: {:?}, ", &self.propagate)?;
write!(fmt, "source: {:?}, ", &self.source)?;
write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
write!(fmt, "data: {:?}", &self.data)?;
write!(fmt, "}}")?;
Ok(())
}
}
const RECENTLY_PRUNED_TAGS: usize = 2;
#[derive(Clone, Debug)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
reject_future_transactions: bool,
future: FutureTransactions<Hash, Ex>,
ready: ReadyTransactions<Hash, Ex>,
recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
recently_pruned_index: usize,
}
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
fn default() -> Self {
Self::new(false)
}
}
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
pub fn new(reject_future_transactions: bool) -> Self {
Self {
reject_future_transactions,
future: Default::default(),
ready: Default::default(),
recently_pruned: Default::default(),
recently_pruned_index: 0,
}
}
pub fn clear_recently_pruned(&mut self) {
self.recently_pruned = Default::default();
self.recently_pruned_index = 0;
}
pub(crate) fn with_futures_enabled<T>(
&mut self,
closure: impl FnOnce(&mut Self, bool) -> T,
) -> T {
let previous = self.reject_future_transactions;
self.reject_future_transactions = false;
let return_value = closure(self, previous);
self.reject_future_transactions = previous;
return_value
}
pub fn is_imported(&self, tx_hash: &Hash) -> bool {
self.future.contains(tx_hash) || self.ready.contains(tx_hash)
}
pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
if self.is_imported(&tx.hash) {
return Err(error::Error::AlreadyImported(Box::new(tx.hash)))
}
let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
trace!(
target: LOG_TARGET,
"[{:?}] Importing {:?} to {}",
tx.transaction.hash,
tx,
if tx.is_ready() { "ready" } else { "future" }
);
if !tx.is_ready() {
if self.reject_future_transactions {
return Err(error::Error::RejectedFutureTransaction)
}
let hash = tx.transaction.hash.clone();
self.future.import(tx);
return Ok(Imported::Future { hash })
}
self.import_to_ready(tx)
}
fn import_to_ready(
&mut self,
tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Imported<Hash, Ex>> {
let hash = tx.transaction.hash.clone();
let mut promoted = vec![];
let mut failed = vec![];
let mut removed = vec![];
let mut first = true;
let mut to_import = vec![tx];
while let Some(tx) = to_import.pop() {
to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
let current_hash = tx.transaction.hash.clone();
match self.ready.import(tx) {
Ok(mut replaced) => {
if !first {
promoted.push(current_hash);
}
removed.append(&mut replaced);
},
Err(e) =>
if first {
trace!(target: LOG_TARGET, "[{:?}] Error importing: {:?}", current_hash, e);
return Err(e)
} else {
failed.push(current_hash);
},
}
first = false;
}
if removed.iter().any(|tx| tx.hash == hash) {
self.ready.remove_subtree(&promoted);
trace!(target: LOG_TARGET, "[{:?}] Cycle detected, bailing.", hash);
return Err(error::Error::CycleDetected)
}
Ok(Imported::Ready { hash, promoted, failed, removed })
}
pub fn ready(&self) -> BestIterator<Hash, Ex> {
self.ready.get()
}
pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
self.future.all()
}
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.by_hashes(hashes);
let future = self.future.by_hashes(hashes);
ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
}
pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
self.ready.by_hash(hash)
}
pub fn enforce_limits(
&mut self,
ready: &Limit,
future: &Limit,
) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
let transaction = ¤t.transaction;
worst
.map(|worst| {
match worst.transaction.priority.cmp(&transaction.transaction.priority) {
Ordering::Less => worst,
Ordering::Equal =>
if worst.insertion_id > transaction.insertion_id {
transaction.clone()
} else {
worst
},
Ordering::Greater => transaction.clone(),
}
})
.or_else(|| Some(transaction.clone()))
});
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
}
while future.is_exceeded(self.future.len(), self.future.bytes()) {
let worst = self.future.fold(|worst, current| match worst {
None => Some(current.clone()),
Some(ref tx) if tx.imported_at > current.imported_at => Some(current.clone()),
other => other,
});
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
}
removed
}
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_subtree(hashes);
removed.extend(self.future.remove(hashes));
removed
}
pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
self.future.clear()
}
pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
let mut to_import = vec![];
let mut pruned = vec![];
let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
recently_pruned.clear();
let tags = tags.into_iter().collect::<Vec<_>>();
let futures_removed = self.future.prune_tags(&tags);
for tag in tags {
to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
pruned.append(&mut self.ready.prune_tags(tag.clone()));
recently_pruned.insert(tag);
}
let mut promoted = vec![];
let mut failed = vec![];
for tx in futures_removed {
failed.push(tx.hash.clone());
}
for tx in to_import {
let hash = tx.transaction.hash.clone();
match self.import_to_ready(tx) {
Ok(res) => promoted.push(res),
Err(e) => {
warn!(
target: LOG_TARGET,
"[{:?}] Failed to promote during pruning: {:?}", hash, e,
);
failed.push(hash)
},
}
}
PruneStatus { pruned, failed, promoted }
}
pub fn status(&self) -> PoolStatus {
PoolStatus {
ready: self.ready.len(),
ready_bytes: self.ready.bytes(),
future: self.future.len(),
future_bytes: self.future.bytes(),
}
}
}
#[derive(Debug, Clone)]
pub struct Limit {
pub count: usize,
pub total_bytes: usize,
}
impl Limit {
pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
self.count < count || self.total_bytes < bytes
}
}
#[cfg(test)]
mod tests {
use super::*;
type Hash = u64;
fn pool() -> BasePool<Hash, Vec<u8>> {
BasePool::default()
}
fn default_tx() -> Transaction<Hash, Vec<u8>> {
Transaction {
data: vec![],
bytes: 1,
hash: 1u64,
priority: 5u64,
valid_till: 64u64,
requires: vec![],
provides: vec![],
propagate: true,
source: Source::External,
}
}
#[test]
fn prune_for_ready_works() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
provides: vec![vec![2]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.ready.len(), 1);
let result = pool.prune_tags(vec![vec![2]]);
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
assert_eq!(result.pruned.len(), 1);
assert_eq!(result.failed.len(), 0);
assert_eq!(result.promoted.len(), 0);
}
#[test]
fn prune_for_future_works() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
requires: vec![vec![1]],
provides: vec![vec![2]],
hash: 0xaa,
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.futures().count(), 1);
assert_eq!(pool.future.len(), 1);
let result = pool.prune_tags(vec![vec![2]]);
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
assert_eq!(pool.futures().count(), 0);
assert_eq!(pool.future.len(), 0);
assert_eq!(result.pruned.len(), 0);
assert_eq!(result.failed.len(), 1);
assert_eq!(result.failed[0], 0xaa);
assert_eq!(result.promoted.len(), 0);
}
#[test]
fn should_import_transaction_to_ready() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.ready.len(), 1);
}
#[test]
fn should_not_import_same_transaction_twice() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![1u8].into(),
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap_err();
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.ready.len(), 1);
}
#[test]
fn should_import_transaction_to_future_and_promote_it_later() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
requires: vec![vec![0]],
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
pool.import(Transaction {
data: vec![2u8].into(),
hash: 2,
provides: vec![vec![0]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 2);
assert_eq!(pool.ready.len(), 2);
}
#[test]
fn should_promote_a_subgraph() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
requires: vec![vec![0]],
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![3u8].into(),
hash: 3,
requires: vec![vec![2]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![2u8].into(),
hash: 2,
requires: vec![vec![1]],
provides: vec![vec![3], vec![2]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1_000u64,
requires: vec![vec![3], vec![4]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
let res = pool
.import(Transaction {
data: vec![5u8].into(),
hash: 5,
provides: vec![vec![0], vec![4]],
..default_tx().clone()
})
.unwrap();
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), None);
assert_eq!(
res,
Imported::Ready {
hash: 5,
promoted: vec![1, 2, 3, 4],
failed: vec![],
removed: vec![],
}
);
}
#[test]
fn should_handle_a_cycle() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
requires: vec![vec![0]],
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![3u8].into(),
hash: 3,
requires: vec![vec![1]],
provides: vec![vec![2]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
pool.import(Transaction {
data: vec![2u8].into(),
hash: 2,
requires: vec![vec![2]],
provides: vec![vec![0]],
..default_tx().clone()
})
.unwrap();
{
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), None);
}
assert_eq!(pool.future.len(), 3);
let res = pool
.import(Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 50u64,
provides: vec![vec![0]],
..default_tx().clone()
})
.unwrap();
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), None);
assert_eq!(
res,
Imported::Ready { hash: 4, promoted: vec![1, 3], failed: vec![2], removed: vec![] }
);
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_handle_a_cycle_with_low_priority() {
let mut pool = pool();
pool.import(Transaction {
data: vec![1u8].into(),
requires: vec![vec![0]],
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![3u8].into(),
hash: 3,
requires: vec![vec![1]],
provides: vec![vec![2]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
pool.import(Transaction {
data: vec![2u8].into(),
hash: 2,
requires: vec![vec![2]],
provides: vec![vec![0]],
..default_tx().clone()
})
.unwrap();
{
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), None);
}
assert_eq!(pool.future.len(), 3);
let err = pool
.import(Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1u64, provides: vec![vec![0]],
..default_tx().clone()
})
.unwrap_err();
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), None);
assert_eq!(pool.ready.len(), 0);
assert_eq!(pool.future.len(), 0);
if let error::Error::CycleDetected = err {
} else {
assert!(false, "Invalid error kind: {:?}", err);
}
}
#[test]
fn should_remove_invalid_transactions() {
let mut pool = pool();
pool.import(Transaction {
data: vec![5u8].into(),
hash: 5,
provides: vec![vec![0], vec![4]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![1u8].into(),
requires: vec![vec![0]],
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![3u8].into(),
hash: 3,
requires: vec![vec![2]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![2u8].into(),
hash: 2,
requires: vec![vec![1]],
provides: vec![vec![3], vec![2]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1_000u64,
requires: vec![vec![3], vec![4]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![6u8].into(),
hash: 6,
priority: 1_000u64,
requires: vec![vec![11]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 5);
assert_eq!(pool.future.len(), 1);
pool.remove_subtree(&[6, 1]);
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_prune_ready_transactions() {
let mut pool = pool();
pool.import(Transaction {
data: vec![5u8].into(),
hash: 5,
requires: vec![vec![0]],
provides: vec![vec![100]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![1u8].into(),
provides: vec![vec![1]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![2u8].into(),
hash: 2,
requires: vec![vec![2]],
provides: vec![vec![3]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![3u8].into(),
hash: 3,
requires: vec![vec![1]],
provides: vec![vec![2]],
..default_tx().clone()
})
.unwrap();
pool.import(Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1_000u64,
requires: vec![vec![3], vec![2]],
provides: vec![vec![4]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.ready().count(), 4);
assert_eq!(pool.future.len(), 1);
let result = pool.prune_tags(vec![vec![0], vec![2]]);
assert_eq!(result.pruned.len(), 2);
assert_eq!(result.failed.len(), 0);
assert_eq!(
result.promoted[0],
Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
);
assert_eq!(result.promoted.len(), 1);
assert_eq!(pool.future.len(), 0);
assert_eq!(pool.ready.len(), 3);
assert_eq!(pool.ready().count(), 3);
}
#[test]
fn transaction_debug() {
assert_eq!(
format!(
"{:?}",
Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1_000u64,
requires: vec![vec![3], vec![2]],
provides: vec![vec![4]],
..default_tx().clone()
}
),
"Transaction { \
hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
source: TransactionSource::External, requires: [03, 02], provides: [04], data: [4]}"
.to_owned()
);
}
#[test]
fn transaction_propagation() {
assert_eq!(
Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1_000u64,
requires: vec![vec![3], vec![2]],
provides: vec![vec![4]],
..default_tx().clone()
}
.is_propagable(),
true
);
assert_eq!(
Transaction {
data: vec![4u8].into(),
hash: 4,
priority: 1_000u64,
requires: vec![vec![3], vec![2]],
provides: vec![vec![4]],
propagate: false,
..default_tx().clone()
}
.is_propagable(),
false
);
}
#[test]
fn should_reject_future_transactions() {
let mut pool = pool();
pool.reject_future_transactions = true;
let err = pool.import(Transaction {
data: vec![5u8].into(),
hash: 5,
requires: vec![vec![0]],
..default_tx().clone()
});
if let Err(error::Error::RejectedFutureTransaction) = err {
} else {
assert!(false, "Invalid error kind: {:?}", err);
}
}
#[test]
fn should_clear_future_queue() {
let mut pool = pool();
pool.import(Transaction {
data: vec![5u8].into(),
hash: 5,
requires: vec![vec![0]],
..default_tx().clone()
})
.unwrap();
assert_eq!(pool.future.len(), 1);
assert_eq!(pool.clear_future().len(), 1);
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_accept_future_transactions_when_explicitly_asked_to() {
let mut pool = pool();
pool.reject_future_transactions = true;
let flag_value = pool.with_futures_enabled(|pool, flag| {
pool.import(Transaction {
data: vec![5u8].into(),
hash: 5,
requires: vec![vec![0]],
..default_tx().clone()
})
.unwrap();
flag
});
assert_eq!(flag_value, true);
assert_eq!(pool.reject_future_transactions, true);
assert_eq!(pool.future.len(), 1);
}
}