use std::{
collections::{HashMap, HashSet},
fmt, hash,
sync::Arc,
};
use sp_core::hexdisplay::HexDisplay;
use sp_runtime::transaction_validity::TransactionTag as Tag;
use std::time::Instant;
use super::base_pool::Transaction;
use crate::{common::log_xt::log_xt_trace, LOG_TARGET};
/// A transaction bundled with the set of required tags that are not yet available.
///
/// The entry counts as ready once `missing_tags` is empty (see `is_ready`).
pub struct WaitingTransaction<Hash, Ex> {
/// The wrapped transaction, shared behind an `Arc`.
pub transaction: Arc<Transaction<Hash, Ex>>,
/// Tags required by the transaction that have not been satisfied yet.
pub missing_tags: HashSet<Tag>,
/// Instant captured when this entry was constructed by `WaitingTransaction::new`.
pub imported_at: Instant,
}
impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, Ex> {
    /// Renders the entry as `WaitingTransaction { imported_at: .., transaction: ..,
    /// missing_tags: {..}}`, with every missing tag rendered through `HexDisplay`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Render the tags up front so the writes below stay a flat sequence.
        let mut hex_tags = Vec::with_capacity(self.missing_tags.len());
        for tag in &self.missing_tags {
            hex_tags.push(HexDisplay::from(tag).to_string());
        }
        let joined_tags = hex_tags.join(", ");

        write!(f, "WaitingTransaction {{ ")?;
        write!(f, "imported_at: {:?}, ", self.imported_at)?;
        write!(f, "transaction: {:?}, ", self.transaction)?;
        write!(f, "missing_tags: {{{}}}", joined_tags)?;
        write!(f, "}}")
    }
}
impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
missing_tags: self.missing_tags.clone(),
imported_at: self.imported_at,
}
}
}
impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
    /// Creates a new waiting entry for `transaction`.
    ///
    /// A required tag is recorded as missing unless it is a key of `provided` or is
    /// contained in one of the `recently_pruned` sets. The creation time is captured
    /// with `Instant::now()`.
    pub fn new(
        transaction: Transaction<Hash, Ex>,
        provided: &HashMap<Tag, Hash>,
        recently_pruned: &[HashSet<Tag>],
    ) -> Self {
        // A tag is missing when neither the provided map nor any pruned set knows it.
        let is_missing = |tag: &Tag| {
            !provided.contains_key(tag) &&
                !recently_pruned.iter().any(|pruned| pruned.contains(tag))
        };

        let missing_tags =
            transaction.requires.iter().filter(|&tag| is_missing(tag)).cloned().collect();

        Self { transaction: Arc::new(transaction), missing_tags, imported_at: Instant::now() }
    }

    /// Marks `tag` as satisfied by dropping it from the missing set (no-op if absent).
    pub fn satisfy_tag(&mut self, tag: &Tag) {
        self.missing_tags.remove(tag);
    }

    /// Returns `true` once no missing tags remain.
    pub fn is_ready(&self) -> bool {
        self.missing_tags.is_empty()
    }
}
/// Pool of transactions that still wait for at least one required tag.
///
/// Both maps are kept in sync: every hash stored under a tag in `wanted_tags` is also a
/// key of `waiting` (see `WAITING_PROOF`).
#[derive(Clone, Debug)]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
/// For each tag that is still wanted, the hashes of the waiting transactions missing it.
wanted_tags: HashMap<Tag, HashSet<Hash>>,
/// Waiting transactions keyed by their transaction hash.
waiting: HashMap<Hash, WaitingTransaction<Hash, Ex>>,
}
impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
    /// Creates an empty pool: no wanted tags and no waiting transactions.
    fn default() -> Self {
        Self { wanted_tags: HashMap::new(), waiting: HashMap::new() }
    }
}
/// Panic message passed to `expect` when a hash taken from `wanted_tags` is looked up in
/// `waiting`; it states the invariant that makes such a lookup infallible.
const WAITING_PROOF: &str = r"#
In import we always insert to `waiting` if we push to `wanted_tags`;
when removing from `waiting` we always clear `wanted_tags`;
every hash from `wanted_tags` is always present in `waiting`;
qed
#";
impl<Hash: hash::Hash + Eq + Clone + std::fmt::Debug, Ex: std::fmt::Debug>
    FutureTransactions<Hash, Ex>
{
    /// Imports a transaction that still has unsatisfied tags.
    ///
    /// The transaction hash is registered under each of its missing tags in
    /// `wanted_tags`, and the transaction itself is stored in `waiting`.
    ///
    /// # Panics
    ///
    /// Panics if `tx` has no missing tags, or if a transaction with the same hash is
    /// already present.
    pub fn import(&mut self, tx: WaitingTransaction<Hash, Ex>) {
        assert!(!tx.is_ready(), "Transaction is ready.");
        assert!(
            !self.waiting.contains_key(&tx.transaction.hash),
            "Transaction is already imported."
        );

        // Register the hash under every tag the transaction is still missing.
        for tag in &tx.missing_tags {
            self.wanted_tags
                .entry(tag.clone())
                .or_default()
                .insert(tx.transaction.hash.clone());
        }

        // Store the transaction in the by-hash map.
        self.waiting.insert(tx.transaction.hash.clone(), tx);
    }

    /// Returns `true` if a transaction with the given hash is waiting in this pool.
    pub fn contains(&self, hash: &Hash) -> bool {
        self.waiting.contains_key(hash)
    }

    /// Looks up each hash and returns the matching transaction, or `None` for hashes
    /// that are not waiting here. The output has the same length and order as `hashes`.
    pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
        hashes
            .iter()
            .map(|h| self.waiting.get(h).map(|x| x.transaction.clone()))
            .collect()
    }

    /// Removes every waiting transaction that provides at least one of `tags` and
    /// returns the removed transactions.
    ///
    /// Accepts any slice of tags; a `&Vec<Tag>` argument coerces automatically.
    pub fn prune_tags(&mut self, tags: &[Tag]) -> Vec<Arc<Transaction<Hash, Ex>>> {
        // Build the lookup set once instead of scanning `tags` for every provided tag.
        let pruned_tags: HashSet<&Tag> = tags.iter().collect();

        let pruned = self
            .waiting
            .values()
            .filter(|tx| tx.transaction.provides.iter().any(|tag| pruned_tags.contains(tag)))
            .map(|tx| tx.transaction.hash.clone())
            .collect::<Vec<_>>();

        // Hand the selected hashes to `log_xt_trace!` before they are removed.
        log_xt_trace!(target: LOG_TARGET, &pruned, "[{:?}] FutureTransactions: removed while pruning tags.");
        self.remove(&pruned)
    }

    /// Marks the given tags as satisfied and returns the transactions that became ready.
    ///
    /// Each tag is dropped from `wanted_tags`; every transaction that was waiting for it
    /// has the tag cleared, and transactions left with no missing tags are taken out of
    /// the pool and returned.
    pub fn satisfy_tags<T: AsRef<Tag>>(
        &mut self,
        tags: impl IntoIterator<Item = T>,
    ) -> Vec<WaitingTransaction<Hash, Ex>> {
        let mut became_ready = vec![];

        for tag in tags {
            let satisfied = tag.as_ref();
            if let Some(hashes) = self.wanted_tags.remove(satisfied) {
                for hash in hashes {
                    let tx = self.waiting.get_mut(&hash).expect(WAITING_PROOF);
                    tx.satisfy_tag(satisfied);
                    // The mutable borrow of `tx` ends with this check, so the entry can
                    // be removed from `waiting` right away.
                    if tx.is_ready() {
                        let tx = self.waiting.remove(&hash).expect(WAITING_PROOF);
                        became_ready.push(tx);
                    }
                }
            }
        }

        became_ready
    }

    /// Removes the transactions with the given hashes and returns those that were
    /// actually present. Unknown hashes are ignored.
    ///
    /// The `wanted_tags` index is cleaned up as well; tags nobody waits for any more are
    /// dropped entirely.
    pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
        let mut removed = vec![];

        for hash in hashes {
            if let Some(waiting_tx) = self.waiting.remove(hash) {
                // Unregister the hash from every tag it was still waiting for.
                for tag in waiting_tx.missing_tags {
                    if let Some(wanted) = self.wanted_tags.get_mut(&tag) {
                        wanted.remove(hash);
                        if wanted.is_empty() {
                            self.wanted_tags.remove(&tag);
                        }
                    }
                }
                removed.push(waiting_tx.transaction);
            }
        }

        removed
    }

    /// Folds `f` over all waiting transactions, starting from `None`.
    ///
    /// Iteration follows the `HashMap` value order, which is unspecified. Only shared
    /// access is needed, so this takes `&self`.
    pub fn fold<R, F: FnMut(Option<R>, &WaitingTransaction<Hash, Ex>) -> Option<R>>(
        &self,
        f: F,
    ) -> Option<R> {
        self.waiting.values().fold(None, f)
    }

    /// Returns an iterator over references to all waiting transactions.
    pub fn all(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
        self.waiting.values().map(|waiting| &*waiting.transaction)
    }

    /// Empties the pool and returns every transaction that was waiting.
    pub fn clear(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
        self.wanted_tags.clear();
        self.waiting.drain().map(|(_, tx)| tx.transaction).collect()
    }

    /// Returns the number of waiting transactions.
    pub fn len(&self) -> usize {
        self.waiting.len()
    }

    /// Returns the sum of the `bytes` field over all waiting transactions.
    pub fn bytes(&self) -> usize {
        self.waiting.values().map(|tx| tx.transaction.bytes).sum()
    }
}