1use crate::{
25 common::tracing_log_xt::log_xt_trace,
26 fork_aware_txpool::stream_map_util::next_event,
27 graph::{self, BlockHash, ExtrinsicHash},
28 LOG_TARGET,
29};
30use futures::stream::StreamExt;
31use sc_transaction_pool_api::TransactionStatus;
32use sc_utils::mpsc;
33use sp_runtime::traits::Block as BlockT;
34use std::{
35 collections::{
36 hash_map::{Entry, OccupiedEntry},
37 HashMap, HashSet,
38 },
39 fmt::{self, Debug, Formatter},
40 pin::Pin,
41};
42use tokio_stream::StreamMap;
43use tracing::{debug, trace};
44
/// Notification that a transaction was dropped, together with the cause.
#[derive(Debug, PartialEq)]
pub struct DroppedTransaction<Hash> {
    /// Hash of the transaction that was dropped.
    pub tx_hash: Hash,
    /// Why the transaction was dropped.
    pub reason: DroppedReason<Hash>,
}

impl<Hash> DroppedTransaction<Hash> {
    /// Builds a notification for `tx_hash` being replaced by the transaction `by`.
    pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
        let reason = DroppedReason::Usurped(by);
        Self { tx_hash, reason }
    }

    /// Builds a notification for `tx_hash` with the [`DroppedReason::LimitsEnforced`] reason.
    ///
    /// Note: the misspelling in the name (`limts`) is kept on purpose, the name is part of the
    /// public interface.
    pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
        let reason = DroppedReason::LimitsEnforced;
        Self { tx_hash, reason }
    }

    /// Builds a notification for `tx_hash` with the [`DroppedReason::Invalid`] reason.
    pub fn new_invalid(tx_hash: Hash) -> Self {
        let reason = DroppedReason::Invalid;
        Self { tx_hash, reason }
    }
}

/// The cause attached to a [`DroppedTransaction`].
#[derive(Debug, PartialEq)]
pub enum DroppedReason<Hash> {
    /// The transaction was replaced by another one, identified by the carried hash.
    Usurped(Hash),
    /// The transaction was dropped because of enforced limits.
    LimitsEnforced,
    /// The transaction was reported as invalid.
    Invalid,
}
82
83pub type ViewStreamEvent<C> =
85 crate::fork_aware_txpool::view::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
86
87type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
89
90pub(crate) type StreamOfDropped<C> =
93 Pin<Box<dyn futures::Stream<Item = DroppedTransaction<ExtrinsicHash<C>>> + Send>>;
94
95type Controller<T> = mpsc::TracingUnboundedSender<T>;
99
100type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
103
104enum Command<ChainApi>
106where
107 ChainApi: graph::ChainApi,
108{
109 AddView(BlockHash<ChainApi>, ViewStream<ChainApi>),
112 RemoveView(BlockHash<ChainApi>),
114 RemoveTransactions(Vec<ExtrinsicHash<ChainApi>>),
118}
119
120impl<ChainApi> Debug for Command<ChainApi>
121where
122 ChainApi: graph::ChainApi,
123{
124 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
125 match self {
126 Command::AddView(..) => write!(f, "AddView"),
127 Command::RemoveView(..) => write!(f, "RemoveView"),
128 Command::RemoveTransactions(..) => write!(f, "RemoveTransactions"),
129 }
130 }
131}
132
133struct MultiViewDropWatcherContext<ChainApi>
139where
140 ChainApi: graph::ChainApi,
141{
142 stream_map: StreamMap<BlockHash<ChainApi>, ViewStream<ChainApi>>,
146 command_receiver: CommandReceiver<Command<ChainApi>>,
149 ready_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
157 future_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
164
165 pending_dropped_transactions: Vec<ExtrinsicHash<ChainApi>>,
167}
168
169impl<C> MultiViewDropWatcherContext<C>
170where
171 C: graph::ChainApi + 'static,
172 <<C as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
173{
174 fn transaction_views(
176 &mut self,
177 tx_hash: ExtrinsicHash<C>,
178 ) -> Option<OccupiedEntry<ExtrinsicHash<C>, HashSet<BlockHash<C>>>> {
179 if let Entry::Occupied(views_keeping_tx_valid) = self.ready_transaction_views.entry(tx_hash)
180 {
181 return Some(views_keeping_tx_valid)
182 }
183 if let Entry::Occupied(views_keeping_tx_valid) =
184 self.future_transaction_views.entry(tx_hash)
185 {
186 return Some(views_keeping_tx_valid)
187 }
188 None
189 }
190
191 fn handle_command(&mut self, cmd: Command<C>) {
193 match cmd {
194 Command::AddView(key, stream) => {
195 trace!(
196 target: LOG_TARGET,
197 "dropped_watcher: Command::AddView {key:?} views:{:?}",
198 self.stream_map.keys().collect::<Vec<_>>()
199 );
200 self.stream_map.insert(key, stream);
201 },
202 Command::RemoveView(key) => {
203 trace!(
204 target: LOG_TARGET,
205 "dropped_watcher: Command::RemoveView {key:?} views:{:?}",
206 self.stream_map.keys().collect::<Vec<_>>()
207 );
208 self.stream_map.remove(&key);
209 self.ready_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
210 trace!(
211 target: LOG_TARGET,
212 "[{:?}] dropped_watcher: Command::RemoveView ready views: {:?}",
213 tx_hash,
214 views
215 );
216 views.remove(&key);
217 });
218
219 self.future_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
220 trace!(
221 target: LOG_TARGET,
222 "[{:?}] dropped_watcher: Command::RemoveView future views: {:?}",
223 tx_hash,
224 views
225 );
226 views.remove(&key);
227 if views.is_empty() {
228 self.pending_dropped_transactions.push(*tx_hash);
229 }
230 });
231 },
232 Command::RemoveTransactions(xts) => {
233 log_xt_trace!(
234 target: LOG_TARGET,
235 xts.clone(),
236 "dropped_watcher: finalized xt removed"
237 );
238 xts.iter().for_each(|xt| {
239 self.ready_transaction_views.remove(xt);
240 self.future_transaction_views.remove(xt);
241 });
242 },
243 }
244 }
245
246 fn handle_event(
252 &mut self,
253 block_hash: BlockHash<C>,
254 event: ViewStreamEvent<C>,
255 ) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
256 trace!(
257 target: LOG_TARGET,
258 "dropped_watcher: handle_event: event:{event:?} from:{block_hash:?} future_views:{:?} ready_views:{:?} stream_map views:{:?}, ",
259 self.future_transaction_views.get(&event.0),
260 self.ready_transaction_views.get(&event.0),
261 self.stream_map.keys().collect::<Vec<_>>(),
262 );
263 let (tx_hash, status) = event;
264 match status {
265 TransactionStatus::Future => {
266 if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
268 views_keeping_tx_valid.get_mut().insert(block_hash);
269 } else {
270 self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
271 }
272 },
273 TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
274 if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
282 views.insert(block_hash);
283 self.ready_transaction_views.insert(tx_hash, views);
284 } else {
285 self.ready_transaction_views.entry(tx_hash).or_default().insert(block_hash);
286 }
287 },
288 TransactionStatus::Dropped => {
289 if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
290 views_keeping_tx_valid.get_mut().remove(&block_hash);
291 if views_keeping_tx_valid.get().is_empty() {
292 return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
293 }
294 } else {
295 debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked dropped) tx");
296 return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
297 }
298 },
299 TransactionStatus::Usurped(by) =>
300 return Some(DroppedTransaction::new_usurped(tx_hash, by)),
301 TransactionStatus::Invalid => {
302 if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
303 views_keeping_tx_valid.get_mut().remove(&block_hash);
304 if views_keeping_tx_valid.get().is_empty() {
305 return Some(DroppedTransaction::new_invalid(tx_hash))
306 }
307 } else {
308 debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked invalid) tx");
309 return Some(DroppedTransaction::new_invalid(tx_hash))
310 }
311 },
312 _ => {},
313 };
314 None
315 }
316
317 fn get_pending_dropped_transaction(&mut self) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
319 while let Some(tx_hash) = self.pending_dropped_transactions.pop() {
320 if self.ready_transaction_views.get(&tx_hash).is_some() {
323 continue
324 }
325
326 if let Some(views) = self.future_transaction_views.get(&tx_hash) {
327 if views.is_empty() {
328 self.future_transaction_views.remove(&tx_hash);
329 return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
330 }
331 }
332 }
333 None
334 }
335
336 fn event_stream() -> (StreamOfDropped<C>, Controller<Command<C>>) {
342 const CHANNEL_SIZE: usize = 64;
344 let (sender, command_receiver) = sc_utils::mpsc::tracing_unbounded::<Command<C>>(
345 "tx-pool-dropped-watcher-cmd-stream",
346 CHANNEL_SIZE,
347 );
348
349 let ctx = Self {
350 stream_map: StreamMap::new(),
351 command_receiver,
352 ready_transaction_views: Default::default(),
353 future_transaction_views: Default::default(),
354 pending_dropped_transactions: Default::default(),
355 };
356
357 let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
358 loop {
359 if let Some(dropped) = ctx.get_pending_dropped_transaction() {
360 trace!("dropped_watcher: sending out (pending): {dropped:?}");
361 return Some((dropped, ctx));
362 }
363 tokio::select! {
364 biased;
365 Some(event) = next_event(&mut ctx.stream_map) => {
366 if let Some(dropped) = ctx.handle_event(event.0, event.1) {
367 trace!("dropped_watcher: sending out: {dropped:?}");
368 return Some((dropped, ctx));
369 }
370 },
371 cmd = ctx.command_receiver.next() => {
372 ctx.handle_command(cmd?);
373 }
374
375 }
376 }
377 })
378 .boxed();
379
380 (stream_map, sender)
381 }
382}
383
384pub struct MultiViewDroppedWatcherController<ChainApi: graph::ChainApi> {
389 controller: Controller<Command<ChainApi>>,
391}
392
393impl<ChainApi: graph::ChainApi> Clone for MultiViewDroppedWatcherController<ChainApi> {
394 fn clone(&self) -> Self {
395 Self { controller: self.controller.clone() }
396 }
397}
398
399impl<ChainApi> MultiViewDroppedWatcherController<ChainApi>
400where
401 ChainApi: graph::ChainApi + 'static,
402 <<ChainApi as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
403{
404 pub fn new() -> (MultiViewDroppedWatcherController<ChainApi>, StreamOfDropped<ChainApi>) {
406 let (stream_map, ctrl) = MultiViewDropWatcherContext::<ChainApi>::event_stream();
407 (Self { controller: ctrl }, stream_map.boxed())
408 }
409
410 pub fn add_view(&self, key: BlockHash<ChainApi>, view: ViewStream<ChainApi>) {
412 let _ = self.controller.unbounded_send(Command::AddView(key, view)).map_err(|e| {
413 trace!(target: LOG_TARGET, "dropped_watcher: add_view {key:?} send message failed: {e}");
414 });
415 }
416
417 pub fn remove_view(&self, key: BlockHash<ChainApi>) {
420 let _ = self.controller.unbounded_send(Command::RemoveView(key)).map_err(|e| {
421 trace!(target: LOG_TARGET, "dropped_watcher: remove_view {key:?} send message failed: {e}");
422 });
423 }
424
425 pub fn remove_transactions(
427 &self,
428 xts: impl IntoIterator<Item = ExtrinsicHash<ChainApi>> + Clone,
429 ) {
430 let _ = self
431 .controller
432 .unbounded_send(Command::RemoveTransactions(xts.into_iter().collect()))
433 .map_err(|e| {
434 trace!(target: LOG_TARGET, "dropped_watcher: remove_transactions send message failed: {e}");
435 });
436 }
437}
438
#[cfg(test)]
mod dropped_watcher_tests {
    use super::*;
    use crate::common::tests::TestApi;
    use futures::{stream::pending, FutureExt, StreamExt};
    use sp_core::H256;

    type MultiViewDroppedWatcher = super::MultiViewDroppedWatcherController<TestApi>;

    /// One view sees the transaction as ready and then drops it: the drop is reported.
    #[tokio::test]
    async fn test01() {
        sp_tracing::try_init_simple();
        let (watcher, output_stream) = MultiViewDroppedWatcher::new();

        let view_at = H256::repeat_byte(0x01);
        let xt = H256::repeat_byte(0x0a);

        let events = futures::stream::iter(vec![
            (xt, TransactionStatus::Ready),
            (xt, TransactionStatus::Dropped),
        ])
        .boxed();
        watcher.add_view(view_at, events);

        let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
        let reported = handle.await.unwrap();
        assert_eq!(reported, vec![DroppedTransaction::new_enforced_by_limts(xt)]);
    }

    /// The transaction dropped by the second view is still referenced by the first one:
    /// nothing is reported.
    #[tokio::test]
    async fn test02() {
        sp_tracing::try_init_simple();
        let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();

        let first_at = H256::repeat_byte(0x01);
        let second_at = H256::repeat_byte(0x02);
        let xt = H256::repeat_byte(0x0a);

        // The first view never terminates, it keeps referencing the transaction.
        let first_events = futures::stream::iter(vec![(xt, TransactionStatus::Future)])
            .chain(pending())
            .boxed();
        let second_events = futures::stream::iter(vec![
            (xt, TransactionStatus::Ready),
            (xt, TransactionStatus::Dropped),
        ])
        .boxed();

        watcher.add_view(first_at, first_events);
        assert!(output_stream.next().now_or_never().is_none());

        watcher.add_view(second_at, second_events);
        assert!(output_stream.next().now_or_never().is_none());
    }

    /// Two views track different transactions: the drop of the second transaction is reported
    /// regardless of the first view.
    #[tokio::test]
    async fn test03() {
        sp_tracing::try_init_simple();
        let (watcher, output_stream) = MultiViewDroppedWatcher::new();

        let first_at = H256::repeat_byte(0x01);
        let second_at = H256::repeat_byte(0x02);
        let kept_xt = H256::repeat_byte(0x0a);
        let dropped_xt = H256::repeat_byte(0x0b);

        let first_events = futures::stream::iter(vec![(kept_xt, TransactionStatus::Future)])
            .chain(pending())
            .boxed();
        let second_events = futures::stream::iter(vec![
            (dropped_xt, TransactionStatus::Ready),
            (dropped_xt, TransactionStatus::Dropped),
        ])
        .boxed();

        watcher.add_view(first_at, first_events);
        watcher.add_view(second_at, second_events);

        let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
        let reported = handle.await.unwrap();
        assert_eq!(reported, vec![DroppedTransaction::new_enforced_by_limts(dropped_xt)]);
    }

    /// The first view is removed before the second view drops the transaction: the drop is
    /// reported.
    #[tokio::test]
    async fn test04() {
        sp_tracing::try_init_simple();
        let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();

        let first_at = H256::repeat_byte(0x01);
        let second_at = H256::repeat_byte(0x02);
        let xt = H256::repeat_byte(0x0b);

        let first_events = futures::stream::iter(vec![
            (xt, TransactionStatus::Future),
            (xt, TransactionStatus::InBlock((second_at, 0))),
        ])
        .boxed();
        let second_events = futures::stream::iter(vec![
            (xt, TransactionStatus::Ready),
            (xt, TransactionStatus::Dropped),
        ])
        .boxed();

        watcher.add_view(first_at, first_events);
        assert!(output_stream.next().now_or_never().is_none());
        watcher.remove_view(first_at);

        watcher.add_view(second_at, second_events);
        let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
        let reported = handle.await.unwrap();
        assert_eq!(reported, vec![DroppedTransaction::new_enforced_by_limts(xt)]);
    }

    /// Transactions that only get included in blocks produce no output; a future transaction
    /// dropped by its only view is reported.
    #[tokio::test]
    async fn test05() {
        sp_tracing::try_init_simple();
        let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
        assert!(output_stream.next().now_or_never().is_none());

        let first_at = H256::repeat_byte(0x01);
        let second_at = H256::repeat_byte(0x02);
        let included_xt = H256::repeat_byte(0x0b);

        let first_events = futures::stream::iter(vec![
            (included_xt, TransactionStatus::Future),
            (included_xt, TransactionStatus::InBlock((second_at, 0))),
        ])
        .boxed();
        watcher.add_view(first_at, first_events);
        assert!(output_stream.next().now_or_never().is_none());

        let second_events = futures::stream::iter(vec![
            (included_xt, TransactionStatus::Ready),
            (included_xt, TransactionStatus::InBlock((first_at, 0))),
        ])
        .boxed();
        watcher.add_view(second_at, second_events);

        // Poll repeatedly: none of the events above may produce a notification.
        for _ in 0..5 {
            assert!(output_stream.next().now_or_never().is_none());
        }

        let dropped_xt = H256::repeat_byte(0x0c);
        let third_events = futures::stream::iter(vec![
            (dropped_xt, TransactionStatus::Future),
            (dropped_xt, TransactionStatus::Dropped),
        ])
        .boxed();
        let third_at = H256::repeat_byte(0x03);
        watcher.add_view(third_at, third_events);

        let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
        let reported = handle.await.unwrap();
        assert_eq!(reported, vec![DroppedTransaction::new_enforced_by_limts(dropped_xt)]);
    }
}