use std::{
collections::{HashMap, HashSet},
hash,
sync::Arc,
};
use crate::LOG_TARGET;
use futures::channel::mpsc::{channel, Sender};
use parking_lot::{Mutex, RwLock};
use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
use serde::Serialize;
use sp_runtime::{
generic::BlockId,
traits::{self, SaturatedConversion},
transaction_validity::{TransactionSource, TransactionTag as Tag, ValidTransaction},
};
use std::time::Instant;
use super::{
base_pool::{self as base, PruneStatus},
listener::Listener,
pool::{
BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
},
rotator::PoolRotator,
watcher::Watcher,
};
#[derive(Debug)]
pub enum ValidatedTransaction<Hash, Ex, Error> {
Valid(base::Transaction<Hash, Ex>),
Invalid(Hash, Error),
Unknown(Hash, Error),
}
impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
pub fn valid_at(
at: u64,
hash: Hash,
source: TransactionSource,
data: Ex,
bytes: usize,
validity: ValidTransaction,
) -> Self {
Self::Valid(base::Transaction {
data,
bytes,
hash,
source,
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
})
}
}
pub type ValidatedTransactionFor<B> =
ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
pub struct IsValidator(Box<dyn Fn() -> bool + Send + Sync>);
impl From<bool> for IsValidator {
fn from(is_validator: bool) -> Self {
Self(Box::new(move || is_validator))
}
}
impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
Self(is_validator)
}
}
pub struct ValidatedPool<B: ChainApi> {
api: Arc<B>,
is_validator: IsValidator,
options: Options,
listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
rotator: PoolRotator<ExtrinsicHash<B>>,
}
impl<B: ChainApi> ValidatedPool<B> {
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
let ban_time = options.ban_time;
Self {
is_validator,
options,
listener: Default::default(),
api,
pool: RwLock::new(base_pool),
import_notification_sinks: Default::default(),
rotator: PoolRotator::new(ban_time),
}
}
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
self.rotator.ban(now, hashes)
}
pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
self.rotator.is_banned(hash)
}
pub fn check_is_known(
&self,
tx_hash: &ExtrinsicHash<B>,
ignore_banned: bool,
) -> Result<(), B::Error> {
if !ignore_banned && self.is_banned(tx_hash) {
Err(error::Error::TemporarilyBanned.into())
} else if self.pool.read().is_imported(tx_hash) {
Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
} else {
Ok(())
}
}
pub fn submit(
&self,
txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
let results = txs
.into_iter()
.map(|validated_tx| self.submit_one(validated_tx))
.collect::<Vec<_>>();
let removed = if results.iter().any(|res| res.is_ok()) {
self.enforce_limits()
} else {
Default::default()
};
results
.into_iter()
.map(|res| match res {
Ok(ref hash) if removed.contains(hash) =>
Err(error::Error::ImmediatelyDropped.into()),
other => other,
})
.collect()
}
fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
if !tx.propagate && !(self.is_validator.0)() {
return Err(error::Error::Unactionable.into())
}
let imported = self.pool.write().import(tx)?;
if let base::Imported::Ready { ref hash, .. } = imported {
let sinks = &mut self.import_notification_sinks.lock();
sinks.retain_mut(|sink| match sink.try_send(*hash) {
Ok(()) => true,
Err(e) =>
if e.is_full() {
log::warn!(
target: LOG_TARGET,
"[{:?}] Trying to notify an import but the channel is full",
hash,
);
true
} else {
false
},
});
}
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
Ok(*imported.hash())
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err)
},
ValidatedTransaction::Unknown(hash, err) => {
self.listener.write().invalid(&hash);
Err(err)
},
}
}
fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
let status = self.pool.read().status();
let ready_limit = &self.options.ready;
let future_limit = &self.options.future;
log::debug!(target: LOG_TARGET, "Pool Status: {:?}", status);
if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
future_limit.is_exceeded(status.future, status.future_bytes)
{
log::debug!(
target: LOG_TARGET,
"Enforcing limits ({}/{}kB ready, {}/{}kB future",
ready_limit.count,
ready_limit.total_bytes / 1024,
future_limit.count,
future_limit.total_bytes / 1024,
);
let removed = {
let mut pool = self.pool.write();
let removed = pool
.enforce_limits(ready_limit, future_limit)
.into_iter()
.map(|x| x.hash)
.collect::<HashSet<_>>();
self.rotator.ban(&Instant::now(), removed.iter().copied());
removed
};
if !removed.is_empty() {
log::debug!(target: LOG_TARGET, "Enforcing limits: {} dropped", removed.len());
}
let mut listener = self.listener.write();
for h in &removed {
listener.dropped(h, None);
}
removed
} else {
Default::default()
}
}
pub fn submit_and_watch(
&self,
tx: ValidatedTransactionFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
let hash = self.api.hash_and_length(&tx.data).0;
let watcher = self.listener.write().create_watcher(hash);
self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
.pop()
.expect("One extrinsic passed; one result returned; qed")
.map(|_| watcher)
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err)
},
ValidatedTransaction::Unknown(_, err) => Err(err),
}
}
pub fn resubmit(
&self,
mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
) {
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
Future,
Ready,
Failed,
Dropped,
}
let (mut initial_statuses, final_statuses) = {
let mut pool = self.pool.write();
let mut initial_statuses = HashMap::new();
let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
while !updated_transactions.is_empty() {
let hash = updated_transactions
.keys()
.next()
.cloned()
.expect("transactions is not empty; qed");
let removed = pool.remove_subtree(&[hash]);
for removed_tx in removed {
let removed_hash = removed_tx.hash;
let updated_transaction = updated_transactions.remove(&removed_hash);
let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
updated_tx
} else {
let transaction = match Arc::try_unwrap(removed_tx) {
Ok(transaction) => transaction,
Err(transaction) => transaction.duplicate(),
};
ValidatedTransaction::Valid(transaction)
};
initial_statuses.insert(removed_hash, Status::Ready);
txs_to_resubmit.push((removed_hash, tx_to_resubmit));
}
updated_transactions.remove(&hash);
}
pool.with_futures_enabled(|pool, reject_future_transactions| {
let mut final_statuses = HashMap::new();
for (hash, tx_to_resubmit) in txs_to_resubmit {
match tx_to_resubmit {
ValidatedTransaction::Valid(tx) => match pool.import(tx) {
Ok(imported) => match imported {
base::Imported::Ready { promoted, failed, removed, .. } => {
final_statuses.insert(hash, Status::Ready);
for hash in promoted {
final_statuses.insert(hash, Status::Ready);
}
for hash in failed {
final_statuses.insert(hash, Status::Failed);
}
for tx in removed {
final_statuses.insert(tx.hash, Status::Dropped);
}
},
base::Imported::Future { .. } => {
final_statuses.insert(hash, Status::Future);
},
},
Err(err) => {
log::warn!(
target: LOG_TARGET,
"[{:?}] Removing invalid transaction from update: {}",
hash,
err,
);
final_statuses.insert(hash, Status::Failed);
},
},
ValidatedTransaction::Invalid(_, _) |
ValidatedTransaction::Unknown(_, _) => {
final_statuses.insert(hash, Status::Failed);
},
}
}
if reject_future_transactions {
for future_tx in pool.clear_future() {
final_statuses.insert(future_tx.hash, Status::Dropped);
}
}
(initial_statuses, final_statuses)
})
};
let mut listener = self.listener.write();
for (hash, final_status) in final_statuses {
let initial_status = initial_statuses.remove(&hash);
if initial_status.is_none() || Some(final_status) != initial_status {
match final_status {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Dropped => listener.dropped(&hash, None),
Status::Failed => listener.invalid(&hash),
}
}
}
}
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool
.read()
.by_hashes(hashes)
.into_iter()
.map(|existing_in_pool| {
existing_in_pool.map(|transaction| transaction.provides.to_vec())
})
.collect()
}
pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
self.pool.read().ready_by_hash(hash)
}
pub fn prune_tags(
&self,
tags: impl IntoIterator<Item = Tag>,
) -> Result<PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>>, B::Error> {
let status = self.pool.write().prune_tags(tags);
{
let mut listener = self.listener.write();
for promoted in &status.promoted {
fire_events(&mut *listener, promoted);
}
for f in &status.failed {
listener.dropped(f, None);
}
}
Ok(status)
}
pub fn resubmit_pruned(
&self,
at: &BlockId<B::Block>,
known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
pruned_hashes: Vec<ExtrinsicHash<B>>,
pruned_xts: Vec<ValidatedTransactionFor<B>>,
) -> Result<(), B::Error> {
debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
let results = self.submit(pruned_xts);
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
_ => None,
}
});
let hashes = hashes.chain(known_imported_hashes.into_iter());
self.fire_pruned(at, hashes)?;
self.clear_stale(at)?;
Ok(())
}
pub fn fire_pruned(
&self,
at: &BlockId<B::Block>,
hashes: impl Iterator<Item = ExtrinsicHash<B>>,
) -> Result<(), B::Error> {
let header_hash = self
.api
.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?;
let mut listener = self.listener.write();
let mut set = HashSet::with_capacity(hashes.size_hint().0);
for h in hashes {
if !set.contains(&h) {
listener.pruned(header_hash, &h);
set.insert(h);
}
}
Ok(())
}
pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
let block_number = self
.api
.block_id_to_number(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?
.saturated_into::<u64>();
let now = Instant::now();
let to_remove = {
self.ready()
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, tx))
.map(|tx| tx.hash)
.collect::<Vec<_>>()
};
let futures_to_remove: Vec<ExtrinsicHash<B>> = {
let p = self.pool.read();
let mut hashes = Vec::new();
for tx in p.futures() {
if self.rotator.ban_if_stale(&now, block_number, tx) {
hashes.push(tx.hash);
}
}
hashes
};
self.remove_invalid(&to_remove);
self.remove_invalid(&futures_to_remove);
self.rotator.clear_timeouts(&now);
Ok(())
}
pub fn api(&self) -> &B {
&self.api
}
pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
const CHANNEL_BUFFER_SIZE: usize = 1024;
let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.import_notification_sinks.lock().push(sink);
stream
}
pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
let mut listener = self.listener.write();
for (hash, peers) in propagated.into_iter() {
listener.broadcasted(&hash, peers);
}
}
pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
if hashes.is_empty() {
return vec![]
}
log::debug!(target: LOG_TARGET, "Removing invalid transactions: {:?}", hashes);
self.rotator.ban(&Instant::now(), hashes.iter().cloned());
let invalid = self.pool.write().remove_subtree(hashes);
log::debug!(target: LOG_TARGET, "Removed invalid transactions: {:?}", invalid);
let mut listener = self.listener.write();
for tx in &invalid {
listener.invalid(&tx.hash);
}
invalid
}
pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
self.pool.read().ready()
}
pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
}
pub fn status(&self) -> PoolStatus {
self.pool.read().status()
}
pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
log::trace!(
target: LOG_TARGET,
"Attempting to notify watchers of finalization for {}",
block_hash,
);
self.listener.write().finalized(block_hash);
Ok(())
}
pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
self.listener.write().retracted(block_hash)
}
}
fn fire_events<H, B, Ex>(listener: &mut Listener<H, B>, imported: &base::Imported<H, Ex>)
where
H: hash::Hash + Eq + traits::Member + Serialize,
B: ChainApi,
{
match *imported {
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
failed.iter().for_each(|f| listener.invalid(f));
removed.iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
promoted.iter().for_each(|p| listener.ready(p, None));
},
base::Imported::Future { ref hash } => listener.future(hash),
}
}