use std::{
cmp,
collections::{BTreeSet, HashMap, HashSet},
hash,
sync::Arc,
};
use crate::LOG_TARGET;
use log::{debug, trace};
use sc_transaction_pool_api::error;
use serde::Serialize;
use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag};
use super::{
base_pool::Transaction,
future::WaitingTransaction,
tracked_map::{self, TrackedMap},
};
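/// A reference to a transaction in the ready queue.
///
/// The transaction itself is shared behind an `Arc`, so cloning the reference is cheap.
/// `insertion_id` records the order in which transactions were imported and is used to
/// break ties when ordering transactions of equal priority and longevity.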
#[derive(Debug)]
pub struct TransactionRef<Hash, Ex> {
pub transaction: Arc<Transaction<Hash, Ex>>,
pub insertion_id: u64,
}
impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
fn clone(&self) -> Self {
Self { transaction: self.transaction.clone(), insertion_id: self.insertion_id }
}
}
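// Transactions are ordered by priority (higher first); ties are broken by preferring the
// transaction with the smaller `valid_till` (shorter remaining lifetime) and then the one
// with the smaller `insertion_id` (imported earlier).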
impl<Hash, Ex> Ord for TransactionRef<Hash, Ex> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.transaction
.priority
.cmp(&other.transaction.priority)
.then_with(|| other.transaction.valid_till.cmp(&self.transaction.valid_till))
.then_with(|| other.insertion_id.cmp(&self.insertion_id))
}
}
impl<Hash, Ex> PartialOrd for TransactionRef<Hash, Ex> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<Hash, Ex> PartialEq for TransactionRef<Hash, Ex> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == cmp::Ordering::Equal
}
}
impl<Hash, Ex> Eq for TransactionRef<Hash, Ex> {}
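/// A transaction that is ready to be included in a block.
///
/// `unlocks` lists the hashes of transactions that require tags provided by this one, and
/// `requires_offset` counts how many of this transaction's required tags are considered
/// satisfied without another queued transaction providing them (e.g. the provider was
/// already pruned).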
#[derive(Debug)]
pub struct ReadyTx<Hash, Ex> {
pub transaction: TransactionRef<Hash, Ex>,
pub unlocks: Vec<Hash>,
pub requires_offset: usize,
}
impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
}
}
}
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
Hence every hash retrieved from `provided_tags` is always present in `ready`;
qed
"#;
#[derive(Debug)]
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
insertion_id: u64,
provided_tags: HashMap<Tag, Hash>,
ready: TrackedMap<Hash, ReadyTx<Hash, Ex>>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
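// The size tracked for a `ReadyTx` is the encoded size (in bytes) of the underlying transaction.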
impl<Hash, Ex> tracked_map::Size for ReadyTx<Hash, Ex> {
fn size(&self) -> usize {
self.transaction.transaction.bytes
}
}
impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
fn default() -> Self {
Self {
insertion_id: Default::default(),
provided_tags: Default::default(),
ready: Default::default(),
best: Default::default(),
}
}
}
impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
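/// Borrows the map of tags that are provided by transactions in this queue.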
pub fn provided_tags(&self) -> &HashMap<Tag, Hash> {
&self.provided_tags
}
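/// Returns an iterator over ready transactions in the order they should be pushed into a block.
///
/// A transaction is only returned after all transactions providing its required tags have
/// been returned; among transactions whose requirements are satisfied, the one with the
/// highest priority comes first, then the one with the shorter remaining lifetime, then the
/// one that was imported earlier.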
pub fn get(&self) -> BestIterator<Hash, Ex> {
BestIterator {
all: self.ready.clone_map(),
best: self.best.clone(),
awaiting: Default::default(),
invalid: Default::default(),
}
}
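/// Imports a transaction whose requirements are all satisfied into the ready queue.
///
/// If the transaction provides tags already provided by queued transactions, those
/// transactions are replaced (along with any dependants whose required tags are no longer
/// provided), but only if the new transaction's priority is strictly higher than the
/// combined priority of the replaced ones; otherwise an error is returned. The replaced
/// transactions are returned on success.
///
/// Panics if the transaction is not ready or has already been imported.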
pub fn import(
&mut self,
tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
assert!(
tx.is_ready(),
"Only ready transactions can be imported. Missing: {:?}",
tx.missing_tags
);
assert!(
!self.ready.read().contains_key(&tx.transaction.hash),
"Transaction is already imported."
);
self.insertion_id += 1;
let insertion_id = self.insertion_id;
let hash = tx.transaction.hash.clone();
let transaction = tx.transaction;
let (replaced, unlocks) = self.replace_previous(&transaction)?;
let mut goes_to_best = true;
let mut ready = self.ready.write();
let mut requires_offset = 0;
for tag in &transaction.requires {
if let Some(other) = self.provided_tags.get(tag) {
let tx = ready.get_mut(other).expect(HASH_READY);
tx.unlocks.push(hash.clone());
goes_to_best = false;
} else {
requires_offset += 1;
}
}
for tag in &transaction.provides {
self.provided_tags.insert(tag.clone(), hash.clone());
}
let transaction = TransactionRef { insertion_id, transaction };
if goes_to_best {
self.best.insert(transaction.clone());
}
ready.insert(hash, ReadyTx { transaction, unlocks, requires_offset });
Ok(replaced)
}
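/// Folds the given function over every transaction in the ready queue.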
pub fn fold<R, F: FnMut(Option<R>, &ReadyTx<Hash, Ex>) -> Option<R>>(
&mut self,
f: F,
) -> Option<R> {
self.ready.read().values().fold(None, f)
}
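/// Returns true if the transaction with the given hash is in the ready queue.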
pub fn contains(&self, hash: &Hash) -> bool {
self.ready.read().contains_key(hash)
}
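/// Retrieves a single ready transaction by hash, if it exists in the queue.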
pub fn by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None)
}
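/// Retrieves ready transactions by their hashes, returning `None` for hashes that are not
/// in the queue.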
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.read();
hashes
.iter()
.map(|hash| ready.get(hash).map(|x| x.transaction.transaction.clone()))
.collect()
}
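/// Removes the given transactions and the entire subtree of transactions that depend on
/// them, returning all removed transactions.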
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let to_remove = hashes.to_vec();
self.remove_subtree_with_tag_filter(to_remove, None)
}
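/// Removes the given transactions and their dependent subtrees.
///
/// Dependants are only queued for removal when a removed transaction actually invalidates
/// one of its provided tags, i.e. when the tag is not contained in `provides_tag_filter`
/// (the set of tags that a replacement transaction will keep providing).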
fn remove_subtree_with_tag_filter(
&mut self,
mut to_remove: Vec<Hash>,
provides_tag_filter: Option<HashSet<Tag>>,
) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut ready = self.ready.write();
while let Some(hash) = to_remove.pop() {
if let Some(mut tx) = ready.remove(&hash) {
let invalidated = tx.transaction.transaction.provides.iter().filter(|tag| {
provides_tag_filter
.as_ref()
.map(|filter| !filter.contains(&**tag))
.unwrap_or(true)
});
let mut removed_some_tags = false;
for tag in invalidated {
removed_some_tags = true;
self.provided_tags.remove(tag);
}
for tag in &tx.transaction.transaction.requires {
if let Some(hash) = self.provided_tags.get(tag) {
if let Some(tx) = ready.get_mut(hash) {
remove_item(&mut tx.unlocks, hash);
}
}
}
self.best.remove(&tx.transaction);
if removed_some_tags {
to_remove.append(&mut tx.unlocks);
}
trace!(target: LOG_TARGET, "[{:?}] Removed as part of the subtree.", hash);
removed.push(tx.transaction.transaction);
}
}
removed
}
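/// Removes the transaction providing the given tag, treating the tag as fulfilled.
///
/// Transactions providing the required tags of the pruned transaction are pruned too once
/// they no longer unlock anything, transactions that were waiting for the pruned one may be
/// promoted to `best`, and all pruned transactions are returned. Other subgraphs are left
/// untouched and remain valid and ready.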
pub fn prune_tags(&mut self, tag: Tag) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut to_remove = vec![tag];
while let Some(tag) = to_remove.pop() {
let res = self
.provided_tags
.remove(&tag)
.and_then(|hash| self.ready.write().remove(&hash));
if let Some(tx) = res {
let unlocks = tx.unlocks;
self.best.remove(&tx.transaction);
let tx = tx.transaction.transaction;
{
let hash = &tx.hash;
let mut ready = self.ready.write();
let mut find_previous = |tag| -> Option<Vec<Tag>> {
let prev_hash = self.provided_tags.get(tag)?;
let tx2 = ready.get_mut(prev_hash)?;
remove_item(&mut tx2.unlocks, hash);
if tx2.unlocks.is_empty() {
Some(tx2.transaction.transaction.provides.clone())
} else {
None
}
};
for tag in &tx.requires {
if let Some(mut tags_to_remove) = find_previous(tag) {
to_remove.append(&mut tags_to_remove);
}
}
}
for hash in unlocks {
if let Some(tx) = self.ready.write().get_mut(&hash) {
tx.requires_offset += 1;
if tx.requires_offset == tx.transaction.transaction.requires.len() {
self.best.insert(tx.transaction.clone());
}
}
}
let current_tag = &tag;
for tag in &tx.provides {
let removed = self.provided_tags.remove(tag);
assert_eq!(
removed.as_ref(),
if current_tag == tag { None } else { Some(&tx.hash) },
"The pool contains exactly one transaction providing given tag; the removed transaction
claims to provide that tag, so it has to be mapped to it's hash; qed"
);
}
removed.push(tx);
}
}
removed
}
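/// Removes transactions that the given transaction replaces (i.e. queued transactions
/// providing one of the same tags), together with any dependants whose tags are no longer
/// provided.
///
/// Returns the removed transactions and the hashes they used to unlock, or an error if the
/// combined priority of the replaced transactions is greater than or equal to the new
/// transaction's priority.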
fn replace_previous(
&mut self,
tx: &Transaction<Hash, Ex>,
) -> error::Result<(Vec<Arc<Transaction<Hash, Ex>>>, Vec<Hash>)> {
let (to_remove, unlocks) = {
let replace_hashes = tx
.provides
.iter()
.filter_map(|tag| self.provided_tags.get(tag))
.collect::<HashSet<_>>();
if replace_hashes.is_empty() {
return Ok((vec![], vec![]))
}
let old_priority = {
let ready = self.ready.read();
replace_hashes
.iter()
.filter_map(|hash| ready.get(hash))
.fold(0u64, |total, tx| {
total.saturating_add(tx.transaction.transaction.priority)
})
};
if old_priority >= tx.priority {
return Err(error::Error::TooLowPriority { old: old_priority, new: tx.priority })
}
let unlocks = {
let ready = self.ready.read();
replace_hashes.iter().filter_map(|hash| ready.get(hash)).fold(
vec![],
|mut list, tx| {
list.extend(tx.unlocks.iter().cloned());
list
},
)
};
(replace_hashes.into_iter().cloned().collect::<Vec<_>>(), unlocks)
};
let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
let removed = self.remove_subtree_with_tag_filter(to_remove, Some(new_provides));
Ok((removed, unlocks))
}
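/// Returns the number of transactions in the ready queue.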
pub fn len(&self) -> usize {
self.ready.len()
}
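/// Returns the sum of encoded sizes of all transactions in the ready queue.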
pub fn bytes(&self) -> usize {
self.ready.bytes()
}
}
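/// An iterator over ready transactions in the order in which they should be included in a
/// block; see `ReadyTransactions::get`.
///
/// `best` holds candidates whose requirements are satisfied, `awaiting` holds transactions
/// together with the number of their requirements satisfied so far, and `invalid` collects
/// hashes whose subtrees should be skipped.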
pub struct BestIterator<Hash, Ex> {
all: HashMap<Hash, ReadyTx<Hash, Ex>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
invalid: HashSet<Hash>,
}
impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
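/// Either promotes the transaction to `best` (if all of its required tags are satisfied) or
/// parks it in `awaiting`, keyed by hash, together with the number of satisfied requirements.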
fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef<Hash, Ex>) {
if satisfied >= tx_ref.transaction.requires.len() {
self.best.insert(tx_ref);
} else {
self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref));
}
}
}
impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions
for BestIterator<Hash, Ex>
{
fn report_invalid(&mut self, tx: &Self::Item) {
BestIterator::report_invalid(self, tx)
}
}
impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
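/// Reports the given transaction as invalid.
///
/// The transactions it unlocks (and, transitively, their subtrees) will be skipped while
/// iterating. Has no effect if the transaction is not part of this iterator's snapshot.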
pub fn report_invalid(&mut self, tx: &Arc<Transaction<Hash, Ex>>) {
if let Some(to_report) = self.all.get(&tx.hash) {
debug!(
target: LOG_TARGET,
"[{:?}] Reported as invalid. Will skip sub-chains while iterating.",
to_report.transaction.transaction.hash
);
for hash in &to_report.unlocks {
self.invalid.insert(hash.clone());
}
}
}
}
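// Yields the highest-ranked transaction from `best`, then promotes the transactions it
// unlocks either to `best` or to `awaiting`, skipping any transaction whose parent was
// reported invalid.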
impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
type Item = Arc<Transaction<Hash, Ex>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
let hash = &best.transaction.hash;
if self.invalid.contains(hash) {
debug!(
target: LOG_TARGET,
"[{:?}] Skipping invalid child transaction while iterating.", hash,
);
continue
}
let ready = match self.all.get(hash).cloned() {
Some(ready) => ready,
None => continue,
};
for hash in &ready.unlocks {
let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
Some((satisfied, tx_ref))
} else {
self.all
.get(hash)
.map(|next| (next.requires_offset + 1, next.transaction.clone()))
};
if let Some((satisfied, tx_ref)) = res {
self.best_or_awaiting(satisfied, tx_ref)
}
}
return Some(best.transaction)
}
}
}
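/// Removes the first occurrence of `item` from `vec` using `swap_remove`, so the order of
/// the remaining elements is not preserved.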
fn remove_item<T: PartialEq>(vec: &mut Vec<T>, item: &T) {
if let Some(idx) = vec.iter().position(|i| i == item) {
vec.swap_remove(idx);
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::transaction_validity::TransactionSource as Source;
fn tx(id: u8) -> Transaction<u64, Vec<u8>> {
Transaction {
data: vec![id],
bytes: 1,
hash: id as u64,
priority: 1,
valid_till: 2,
requires: vec![vec![1], vec![2]],
provides: vec![vec![3], vec![4]],
propagate: true,
source: Source::External,
}
}
fn import<H: hash::Hash + Eq + Member + Serialize, Ex>(
ready: &mut ReadyTransactions<H, Ex>,
tx: Transaction<H, Ex>,
) -> error::Result<Vec<Arc<Transaction<H, Ex>>>> {
let x = WaitingTransaction::new(tx, ready.provided_tags(), &[]);
ready.import(x)
}
#[test]
fn should_replace_transaction_that_provides_the_same_tag() {
let mut ready = ReadyTransactions::default();
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
tx2.requires.clear();
tx2.provides = vec![vec![3]];
let mut tx3 = tx(3);
tx3.requires.clear();
tx3.provides = vec![vec![4]];
import(&mut ready, tx2).unwrap();
import(&mut ready, tx3).unwrap();
assert_eq!(ready.get().count(), 2);
import(&mut ready, tx1.clone()).unwrap_err();
tx1.priority = 10;
import(&mut ready, tx1).unwrap();
assert_eq!(ready.get().count(), 1);
}
#[test]
fn should_replace_multiple_transactions_correctly() {
let mut ready = ReadyTransactions::default();
let mut tx0 = tx(0);
tx0.requires = vec![];
tx0.provides = vec![vec![0]];
let mut tx1 = tx(1);
tx1.requires = vec![];
tx1.provides = vec![vec![1]];
let mut tx2 = tx(2);
tx2.requires = vec![vec![0], vec![1]];
tx2.provides = vec![vec![2], vec![3]];
let mut tx3 = tx(3);
tx3.requires = vec![vec![2]];
tx3.provides = vec![vec![4]];
let mut tx4 = tx(4);
tx4.requires = vec![vec![3]];
tx4.provides = vec![vec![5]];
let mut tx2_2 = tx(5);
tx2_2.requires = vec![vec![0], vec![1]];
tx2_2.provides = vec![vec![2]];
tx2_2.priority = 10;
for tx in vec![tx0, tx1, tx2, tx3, tx4] {
import(&mut ready, tx).unwrap();
}
assert_eq!(ready.get().count(), 5);
import(&mut ready, tx2_2).unwrap();
assert_eq!(ready.get().count(), 3);
}
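// Builds a pool in which tx1 is the only root: tx2, tx3, tx4 and tx7 require tags provided
// by tx1 (tx3 additionally requires a tag provided by tx2), tx5 requires a tag provided by
// tx4 and tx6 one provided by tx5.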
fn populate_pool(ready: &mut ReadyTransactions<u64, Vec<u8>>) {
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
tx2.requires = tx1.provides.clone();
tx2.provides = vec![vec![106]];
let mut tx3 = tx(3);
tx3.requires = vec![tx1.provides[0].clone(), vec![106]];
tx3.provides = vec![];
let mut tx4 = tx(4);
tx4.requires = vec![tx1.provides[0].clone()];
tx4.provides = vec![vec![107]];
let mut tx5 = tx(5);
tx5.requires = vec![tx4.provides[0].clone()];
tx5.provides = vec![vec![108]];
let mut tx6 = tx(6);
tx6.requires = vec![tx5.provides[0].clone()];
tx6.provides = vec![];
let tx7 = Transaction {
data: vec![7],
bytes: 1,
hash: 7,
priority: 1,
valid_till: u64::MAX,
requires: vec![tx1.provides[0].clone()],
provides: vec![],
propagate: true,
source: Source::External,
};
for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] {
import(ready, tx).unwrap();
}
assert_eq!(ready.best.len(), 1);
}
#[test]
fn should_return_best_transactions_in_correct_order() {
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);
let mut it = ready.get().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), Some(6));
assert_eq!(it.next(), Some(7));
assert_eq!(it.next(), None);
}
#[test]
fn should_order_refs() {
let mut id = 1;
let mut with_priority = |priority, longevity| {
id += 1;
let mut tx = tx(id);
tx.priority = priority;
tx.valid_till = longevity;
tx
};
assert!(
TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 1 } >
TransactionRef { transaction: Arc::new(with_priority(2, 3)), insertion_id: 2 }
);
assert!(
TransactionRef { transaction: Arc::new(with_priority(3, 2)), insertion_id: 1 } >
TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
);
assert!(
TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 1 } >
TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
);
}
#[test]
fn should_skip_invalid_transactions_while_iterating() {
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);
let mut it = ready.get();
let data = |tx: &Arc<Transaction<u64, Vec<u8>>>| tx.data[0];
assert_eq!(it.next().as_ref().map(data), Some(1));
assert_eq!(it.next().as_ref().map(data), Some(2));
assert_eq!(it.next().as_ref().map(data), Some(3));
let tx4 = it.next();
assert_eq!(tx4.as_ref().map(data), Some(4));
it.report_invalid(&tx4.unwrap());
assert_eq!(it.next().as_ref().map(data), Some(7));
assert_eq!(it.next().as_ref().map(data), None);
}
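// A minimal sketch of `prune_tags` behaviour, assuming the dependency graph built by
// `populate_pool` above: pruning the tag provided by tx1 should remove only tx1, while its
// dependants stay in the queue and become returnable in order.
#[test]
fn should_prune_transaction_providing_given_tag() {
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);
// Tag `vec![3]` is provided by tx1 (see the defaults in `tx`), so pruning it removes tx1 only.
let pruned = ready.prune_tags(vec![3]);
assert_eq!(pruned.len(), 1);
assert_eq!(pruned[0].hash, 1);
// tx1 is gone, but its dependants remain and are now considered ready, since the pruned
// tag is treated as fulfilled.
assert!(!ready.contains(&1));
assert_eq!(ready.len(), 6);
assert_eq!(ready.get().map(|tx| tx.data[0]).collect::<Vec<_>>(), vec![2, 3, 4, 5, 6, 7]);
}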
}