use crate::{
common::log_xt::log_xt_trace,
fork_aware_txpool::stream_map_util::next_event,
graph::{self, BlockHash, ExtrinsicHash},
LOG_TARGET,
};
use futures::stream::StreamExt;
use log::{debug, trace};
use sc_transaction_pool_api::TransactionStatus;
use sc_utils::mpsc;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{
hash_map::{Entry, OccupiedEntry},
HashMap, HashSet,
},
fmt::{self, Debug, Formatter},
pin::Pin,
};
use tokio_stream::StreamMap;
#[derive(Debug, PartialEq)]
pub struct DroppedTransaction<Hash> {
pub tx_hash: Hash,
pub reason: DroppedReason<Hash>,
}
impl<Hash> DroppedTransaction<Hash> {
fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
Self { reason: DroppedReason::Usurped(by), tx_hash }
}
fn new_enforced_by_limts(tx_hash: Hash) -> Self {
Self { reason: DroppedReason::LimitsEnforced, tx_hash }
}
}
#[derive(Debug, PartialEq)]
pub enum DroppedReason<Hash> {
Usurped(Hash),
LimitsEnforced,
}
pub type ViewStreamEvent<C> = crate::graph::DroppedByLimitsEvent<ExtrinsicHash<C>, BlockHash<C>>;
type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
pub(crate) type StreamOfDropped<C> =
Pin<Box<dyn futures::Stream<Item = DroppedTransaction<ExtrinsicHash<C>>> + Send>>;
type Controller<T> = mpsc::TracingUnboundedSender<T>;
type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
enum Command<ChainApi>
where
ChainApi: graph::ChainApi,
{
AddView(BlockHash<ChainApi>, ViewStream<ChainApi>),
RemoveView(BlockHash<ChainApi>),
RemoveFinalizedTxs(Vec<ExtrinsicHash<ChainApi>>),
}
impl<ChainApi> Debug for Command<ChainApi>
where
ChainApi: graph::ChainApi,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Command::AddView(..) => write!(f, "AddView"),
Command::RemoveView(..) => write!(f, "RemoveView"),
Command::RemoveFinalizedTxs(..) => write!(f, "RemoveFinalizedTxs"),
}
}
}
struct MultiViewDropWatcherContext<ChainApi>
where
ChainApi: graph::ChainApi,
{
stream_map: StreamMap<BlockHash<ChainApi>, ViewStream<ChainApi>>,
command_receiver: CommandReceiver<Command<ChainApi>>,
ready_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
future_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
pending_dropped_transactions: Vec<ExtrinsicHash<ChainApi>>,
}
impl<C> MultiViewDropWatcherContext<C>
where
C: graph::ChainApi + 'static,
<<C as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
{
fn transaction_views(
&mut self,
tx_hash: ExtrinsicHash<C>,
) -> Option<OccupiedEntry<ExtrinsicHash<C>, HashSet<BlockHash<C>>>> {
if let Entry::Occupied(views_keeping_tx_valid) = self.ready_transaction_views.entry(tx_hash)
{
return Some(views_keeping_tx_valid)
}
if let Entry::Occupied(views_keeping_tx_valid) =
self.future_transaction_views.entry(tx_hash)
{
return Some(views_keeping_tx_valid)
}
None
}
fn handle_command(&mut self, cmd: Command<C>) {
match cmd {
Command::AddView(key, stream) => {
trace!(
target: LOG_TARGET,
"dropped_watcher: Command::AddView {key:?} views:{:?}",
self.stream_map.keys().collect::<Vec<_>>()
);
self.stream_map.insert(key, stream);
},
Command::RemoveView(key) => {
trace!(
target: LOG_TARGET,
"dropped_watcher: Command::RemoveView {key:?} views:{:?}",
self.stream_map.keys().collect::<Vec<_>>()
);
self.stream_map.remove(&key);
self.ready_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
trace!(
target: LOG_TARGET,
"[{:?}] dropped_watcher: Command::RemoveView ready views: {:?}",
tx_hash,
views
);
views.remove(&key);
});
self.future_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
trace!(
target: LOG_TARGET,
"[{:?}] dropped_watcher: Command::RemoveView future views: {:?}",
tx_hash,
views
);
views.remove(&key);
if views.is_empty() {
self.pending_dropped_transactions.push(*tx_hash);
}
});
},
Command::RemoveFinalizedTxs(xts) => {
log_xt_trace!(
target: LOG_TARGET,
xts.clone(),
"[{:?}] dropped_watcher: finalized xt removed"
);
xts.iter().for_each(|xt| {
self.ready_transaction_views.remove(xt);
self.future_transaction_views.remove(xt);
});
},
}
}
fn handle_event(
&mut self,
block_hash: BlockHash<C>,
event: ViewStreamEvent<C>,
) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
trace!(
target: LOG_TARGET,
"dropped_watcher: handle_event: event:{event:?} from:{block_hash:?} future_views:{:?} ready_views:{:?} stream_map views:{:?}, ",
self.future_transaction_views.get(&event.0),
self.ready_transaction_views.get(&event.0),
self.stream_map.keys().collect::<Vec<_>>(),
);
let (tx_hash, status) = event;
match status {
TransactionStatus::Future => {
self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
},
TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
views.insert(block_hash);
self.ready_transaction_views.insert(tx_hash, views);
} else {
self.ready_transaction_views.entry(tx_hash).or_default().insert(block_hash);
}
},
TransactionStatus::Dropped => {
if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
views_keeping_tx_valid.get_mut().remove(&block_hash);
if views_keeping_tx_valid.get().is_empty() {
return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
}
} else {
debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash);
return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
}
},
TransactionStatus::Usurped(by) =>
return Some(DroppedTransaction::new_usurped(tx_hash, by)),
_ => {},
};
None
}
fn get_pending_dropped_transaction(&mut self) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
while let Some(tx_hash) = self.pending_dropped_transactions.pop() {
if self.ready_transaction_views.get(&tx_hash).is_some() {
continue
}
if let Some(views) = self.future_transaction_views.get(&tx_hash) {
if views.is_empty() {
self.future_transaction_views.remove(&tx_hash);
return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
}
}
}
None
}
fn event_stream() -> (StreamOfDropped<C>, Controller<Command<C>>) {
const CHANNEL_SIZE: usize = 64;
let (sender, command_receiver) = sc_utils::mpsc::tracing_unbounded::<Command<C>>(
"tx-pool-dropped-watcher-cmd-stream",
CHANNEL_SIZE,
);
let ctx = Self {
stream_map: StreamMap::new(),
command_receiver,
ready_transaction_views: Default::default(),
future_transaction_views: Default::default(),
pending_dropped_transactions: Default::default(),
};
let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
loop {
if let Some(dropped) = ctx.get_pending_dropped_transaction() {
debug!("dropped_watcher: sending out (pending): {dropped:?}");
return Some((dropped, ctx));
}
tokio::select! {
biased;
Some(event) = next_event(&mut ctx.stream_map) => {
if let Some(dropped) = ctx.handle_event(event.0, event.1) {
debug!("dropped_watcher: sending out: {dropped:?}");
return Some((dropped, ctx));
}
},
cmd = ctx.command_receiver.next() => {
ctx.handle_command(cmd?);
}
}
}
})
.boxed();
(stream_map, sender)
}
}
pub struct MultiViewDroppedWatcherController<ChainApi: graph::ChainApi> {
controller: Controller<Command<ChainApi>>,
}
impl<ChainApi: graph::ChainApi> Clone for MultiViewDroppedWatcherController<ChainApi> {
fn clone(&self) -> Self {
Self { controller: self.controller.clone() }
}
}
impl<ChainApi> MultiViewDroppedWatcherController<ChainApi>
where
ChainApi: graph::ChainApi + 'static,
<<ChainApi as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
{
pub fn new() -> (MultiViewDroppedWatcherController<ChainApi>, StreamOfDropped<ChainApi>) {
let (stream_map, ctrl) = MultiViewDropWatcherContext::<ChainApi>::event_stream();
(Self { controller: ctrl }, stream_map.boxed())
}
pub fn add_view(&self, key: BlockHash<ChainApi>, view: ViewStream<ChainApi>) {
let _ = self.controller.unbounded_send(Command::AddView(key, view)).map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: add_view {key:?} send message failed: {e}");
});
}
pub fn remove_view(&self, key: BlockHash<ChainApi>) {
let _ = self.controller.unbounded_send(Command::RemoveView(key)).map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: remove_view {key:?} send message failed: {e}");
});
}
pub fn remove_finalized_txs(
&self,
xts: impl IntoIterator<Item = ExtrinsicHash<ChainApi>> + Clone,
) {
let _ = self
.controller
.unbounded_send(Command::RemoveFinalizedTxs(xts.into_iter().collect()))
.map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: remove_finalized_txs send message failed: {e}");
});
}
}
#[cfg(test)]
mod dropped_watcher_tests {
use super::*;
use crate::common::tests::TestApi;
use futures::{stream::pending, FutureExt, StreamExt};
use sp_core::H256;
type MultiViewDroppedWatcher = super::MultiViewDroppedWatcherController<TestApi>;
#[tokio::test]
async fn test01() {
sp_tracing::try_init_simple();
let (watcher, output_stream) = MultiViewDroppedWatcher::new();
let block_hash = H256::repeat_byte(0x01);
let tx_hash = H256::repeat_byte(0x0a);
let view_stream = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();
watcher.add_view(block_hash, view_stream);
let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![DroppedTransaction::new_enforced_by_limts(tx_hash)]);
}
#[tokio::test]
async fn test02() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0a);
let view_stream0 = futures::stream::iter(vec![(tx_hash, TransactionStatus::Future)])
.chain(pending())
.boxed();
let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
assert!(output_stream.next().now_or_never().is_none());
watcher.add_view(block_hash1, view_stream1);
assert!(output_stream.next().now_or_never().is_none());
}
#[tokio::test]
async fn test03() {
sp_tracing::try_init_simple();
let (watcher, output_stream) = MultiViewDroppedWatcher::new();
let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash0 = H256::repeat_byte(0x0a);
let tx_hash1 = H256::repeat_byte(0x0b);
let view_stream0 = futures::stream::iter(vec![(tx_hash0, TransactionStatus::Future)])
.chain(pending())
.boxed();
let view_stream1 = futures::stream::iter(vec![
(tx_hash1, TransactionStatus::Ready),
(tx_hash1, TransactionStatus::Dropped),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
watcher.add_view(block_hash1, view_stream1);
let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(
handle.await.unwrap(),
vec![DroppedTransaction::new_enforced_by_limts(tx_hash1)]
);
}
#[tokio::test]
async fn test04() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0b);
let view_stream0 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
])
.boxed();
let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
assert!(output_stream.next().now_or_never().is_none());
watcher.remove_view(block_hash0);
watcher.add_view(block_hash1, view_stream1);
let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![DroppedTransaction::new_enforced_by_limts(tx_hash)]);
}
#[tokio::test]
async fn test05() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
assert!(output_stream.next().now_or_never().is_none());
let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0b);
let view_stream0 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
assert!(output_stream.next().now_or_never().is_none());
let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::InBlock((block_hash0, 0))),
])
.boxed();
watcher.add_view(block_hash1, view_stream1);
assert!(output_stream.next().now_or_never().is_none());
assert!(output_stream.next().now_or_never().is_none());
assert!(output_stream.next().now_or_never().is_none());
assert!(output_stream.next().now_or_never().is_none());
assert!(output_stream.next().now_or_never().is_none());
let tx_hash = H256::repeat_byte(0x0c);
let view_stream2 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();
let block_hash2 = H256::repeat_byte(0x03);
watcher.add_view(block_hash2, view_stream2);
let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![DroppedTransaction::new_enforced_by_limts(tx_hash)]);
}
}