use std::{collections::HashMap, fmt::Debug, hash};
use linked_hash_map::LinkedHashMap;
use log::trace;
use sc_transaction_pool_api::TransactionStatus;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use serde::Serialize;
use sp_runtime::traits;
use super::{watcher, BlockHash, ChainApi, ExtrinsicHash};
static LOG_TARGET: &str = "txpool::watcher";
pub type DroppedByLimitsEvent<H, BH> = (H, TransactionStatus<H, BH>);
pub type DroppedByLimitsStream<H, BH> = TracingUnboundedReceiver<DroppedByLimitsEvent<H, BH>>;
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
dropped_by_limits_sink: Option<TracingUnboundedSender<DroppedByLimitsEvent<H, BlockHash<C>>>>,
}
const MAX_FINALITY_WATCHERS: usize = 512;
impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
fn default() -> Self {
Self {
watchers: Default::default(),
finality_watchers: Default::default(),
dropped_by_limits_sink: None,
}
}
}
impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H, C> {
fn fire<F>(&mut self, hash: &H, fun: F)
where
F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>),
{
let clean = if let Some(h) = self.watchers.get_mut(hash) {
fun(h);
h.is_done()
} else {
false
};
if clean {
self.watchers.remove(hash);
}
}
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> {
let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
sender.new_watcher(hash)
}
pub fn create_dropped_by_limits_stream(&mut self) -> DroppedByLimitsStream<H, BlockHash<C>> {
let (sender, single_stream) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
self.dropped_by_limits_sink = Some(sender);
single_stream
}
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash);
self.fire(hash, |watcher| watcher.broadcast(peers));
}
pub fn ready(&mut self, tx: &H, old: Option<&H>) {
trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old);
self.fire(tx, |watcher| watcher.ready());
if let Some(old) = old {
self.fire(old, |watcher| watcher.usurped(tx.clone()));
}
if let Some(ref sink) = self.dropped_by_limits_sink {
if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Ready)) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink/ready: send message failed: {:?}", tx, e);
}
}
}
pub fn future(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Future", tx);
self.fire(tx, |watcher| watcher.future());
if let Some(ref sink) = self.dropped_by_limits_sink {
if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Future)) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
}
pub fn limit_enforced(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Dropped (limit enforced)", tx);
self.fire(tx, |watcher| watcher.limit_enforced());
if let Some(ref sink) = self.dropped_by_limits_sink {
if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Dropped)) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
}
pub fn usurped(&mut self, tx: &H, by: &H) {
trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by);
self.fire(tx, |watcher| watcher.usurped(by.clone()));
if let Some(ref sink) = self.dropped_by_limits_sink {
if let Err(e) =
sink.unbounded_send((tx.clone(), TransactionStatus::Usurped(by.clone())))
{
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
}
pub fn dropped(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Dropped", tx);
self.fire(tx, |watcher| watcher.dropped());
}
pub fn invalid(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx);
self.fire(tx, |watcher| watcher.invalid());
}
pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Pruned at {:?}", tx, block_hash);
let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]);
txs.push(tx.clone());
let tx_index = txs.len() - 1;
self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index));
while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
if let Some((hash, txs)) = self.finality_watchers.pop_front() {
for tx in txs {
self.fire(&tx, |watcher| watcher.finality_timeout(hash));
}
}
}
}
pub fn retracted(&mut self, block_hash: BlockHash<C>) {
if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
for hash in hashes {
self.fire(&hash, |watcher| watcher.retracted(block_hash))
}
}
}
pub fn finalized(&mut self, block_hash: BlockHash<C>) {
if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
for (tx_index, hash) in hashes.into_iter().enumerate() {
log::trace!(
target: LOG_TARGET,
"[{:?}] Sent finalization event (block {:?})",
hash,
block_hash,
);
self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index))
}
}
}
pub fn watched_transactions(&self) -> impl Iterator<Item = &H> {
self.watchers.keys()
}
}