referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
dropped_watcher.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Multi-view pool dropped events listener provides means to combine streams from multiple pool
20//! views into a single event stream. It allows management of dropped transaction events, adding new
21//! views, and removing views as needed, ensuring that transactions which are no longer referenced
22//! by any view are detected and properly notified.
23
24use crate::{
25	common::tracing_log_xt::log_xt_trace,
26	fork_aware_txpool::stream_map_util::next_event,
27	graph::{self, BlockHash, ExtrinsicHash},
28	LOG_TARGET,
29};
30use futures::stream::StreamExt;
31use sc_transaction_pool_api::TransactionStatus;
32use sc_utils::mpsc;
33use sp_runtime::traits::Block as BlockT;
34use std::{
35	collections::{
36		hash_map::{Entry, OccupiedEntry},
37		HashMap, HashSet,
38	},
39	fmt::{self, Debug, Formatter},
40	pin::Pin,
41};
42use tokio_stream::StreamMap;
43use tracing::{debug, trace};
44
45/// Represents a transaction that was removed from the transaction pool, including the reason of its
46/// removal.
47#[derive(Debug, PartialEq)]
48pub struct DroppedTransaction<Hash> {
49	/// Hash of the dropped extrinsic.
50	pub tx_hash: Hash,
51	/// Reason of the transaction being dropped.
52	pub reason: DroppedReason<Hash>,
53}
54
55impl<Hash> DroppedTransaction<Hash> {
56	/// Creates a new instance with reason set to `DroppedReason::Usurped(by)`.
57	pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
58		Self { reason: DroppedReason::Usurped(by), tx_hash }
59	}
60
61	/// Creates a new instance with reason set to `DroppedReason::LimitsEnforced`.
62	pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
63		Self { reason: DroppedReason::LimitsEnforced, tx_hash }
64	}
65
66	/// Creates a new instance with reason set to `DroppedReason::Invalid`.
67	pub fn new_invalid(tx_hash: Hash) -> Self {
68		Self { reason: DroppedReason::Invalid, tx_hash }
69	}
70}
71
72/// Provides reason of why transactions was dropped.
73#[derive(Debug, PartialEq)]
74pub enum DroppedReason<Hash> {
75	/// Transaction was replaced by other transaction (e.g. because of higher priority).
76	Usurped(Hash),
77	/// Transaction was dropped because of internal pool limits being enforced.
78	LimitsEnforced,
79	/// Transaction was dropped because of being invalid.
80	Invalid,
81}
82
83/// Dropped-logic related event from the single view.
84pub type ViewStreamEvent<C> =
85	crate::fork_aware_txpool::view::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
86
87/// Dropped-logic stream of events coming from the single view.
88type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
89
90/// Stream of extrinsic hashes that were dropped by the views and have no references by existing
91/// views.
92pub(crate) type StreamOfDropped<C> =
93	Pin<Box<dyn futures::Stream<Item = DroppedTransaction<ExtrinsicHash<C>>> + Send>>;
94
95/// A type alias for a sender used as the controller of the [`MultiViewDropWatcherContext`].
96/// Used to send control commands from the [`MultiViewDroppedWatcherController`] to
97/// [`MultiViewDropWatcherContext`].
98type Controller<T> = mpsc::TracingUnboundedSender<T>;
99
100/// A type alias for a receiver used as the commands receiver in the
101/// [`MultiViewDropWatcherContext`].
102type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
103
104/// Commands to control the instance of dropped transactions stream [`StreamOfDropped`].
105enum Command<ChainApi>
106where
107	ChainApi: graph::ChainApi,
108{
109	/// Adds a new stream of dropped-related events originating in a view with a specific block
110	/// hash
111	AddView(BlockHash<ChainApi>, ViewStream<ChainApi>),
112	/// Removes an existing view's stream associated with a specific block hash.
113	RemoveView(BlockHash<ChainApi>),
114	/// Removes referencing views for given extrinsic hashes.
115	///
116	/// Intended to ba called when transactions were finalized or their finality timed out.
117	RemoveTransactions(Vec<ExtrinsicHash<ChainApi>>),
118}
119
120impl<ChainApi> Debug for Command<ChainApi>
121where
122	ChainApi: graph::ChainApi,
123{
124	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
125		match self {
126			Command::AddView(..) => write!(f, "AddView"),
127			Command::RemoveView(..) => write!(f, "RemoveView"),
128			Command::RemoveTransactions(..) => write!(f, "RemoveTransactions"),
129		}
130	}
131}
132
133/// Manages the state and logic for handling events related to dropped transactions across multiple
134/// views.
135///
136/// This struct maintains a mapping of active views and their corresponding streams, as well as the
137/// state of each transaction with respect to these views.
138struct MultiViewDropWatcherContext<ChainApi>
139where
140	ChainApi: graph::ChainApi,
141{
142	/// A map that associates the views identified by corresponding block hashes with their streams
143	/// of dropped-related events. This map is used to keep track of active views and their event
144	/// streams.
145	stream_map: StreamMap<BlockHash<ChainApi>, ViewStream<ChainApi>>,
146	/// A receiver for commands to control the state of the stream, allowing the addition and
147	/// removal of views. This is used to dynamically update which views are being tracked.
148	command_receiver: CommandReceiver<Command<ChainApi>>,
149	/// For each transaction hash we keep the set of hashes representing the views that see this
150	/// transaction as ready or in_block.
151	///
152	/// Even if all views referencing a ready transactions are removed, we still want to keep
153	/// transaction, there can be a fork which sees the transaction as ready.
154	///
155	/// Once transaction is dropped, dropping view is removed from the set.
156	ready_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
157	/// For each transaction hash we keep the set of hashes representing the views that see this
158	/// transaction as future.
159	///
160	/// Once all views referencing a future transactions are removed, the future can be dropped.
161	///
162	/// Once transaction is dropped, dropping view is removed from the set.
163	future_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
164
165	/// Transactions that need to be notified as dropped.
166	pending_dropped_transactions: Vec<ExtrinsicHash<ChainApi>>,
167}
168
169impl<C> MultiViewDropWatcherContext<C>
170where
171	C: graph::ChainApi + 'static,
172	<<C as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
173{
174	/// Provides the ready or future `HashSet` containing views referencing given transaction.
175	fn transaction_views(
176		&mut self,
177		tx_hash: ExtrinsicHash<C>,
178	) -> Option<OccupiedEntry<ExtrinsicHash<C>, HashSet<BlockHash<C>>>> {
179		if let Entry::Occupied(views_keeping_tx_valid) = self.ready_transaction_views.entry(tx_hash)
180		{
181			return Some(views_keeping_tx_valid)
182		}
183		if let Entry::Occupied(views_keeping_tx_valid) =
184			self.future_transaction_views.entry(tx_hash)
185		{
186			return Some(views_keeping_tx_valid)
187		}
188		None
189	}
190
191	/// Processes the command and updates internal state accordingly.
192	fn handle_command(&mut self, cmd: Command<C>) {
193		match cmd {
194			Command::AddView(key, stream) => {
195				trace!(
196					target: LOG_TARGET,
197					"dropped_watcher: Command::AddView {key:?} views:{:?}",
198					self.stream_map.keys().collect::<Vec<_>>()
199				);
200				self.stream_map.insert(key, stream);
201			},
202			Command::RemoveView(key) => {
203				trace!(
204					target: LOG_TARGET,
205					"dropped_watcher: Command::RemoveView {key:?} views:{:?}",
206					self.stream_map.keys().collect::<Vec<_>>()
207				);
208				self.stream_map.remove(&key);
209				self.ready_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
210					trace!(
211						target: LOG_TARGET,
212						"[{:?}] dropped_watcher: Command::RemoveView ready views: {:?}",
213						tx_hash,
214						views
215					);
216					views.remove(&key);
217				});
218
219				self.future_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
220					trace!(
221						target: LOG_TARGET,
222						"[{:?}] dropped_watcher: Command::RemoveView future views: {:?}",
223						tx_hash,
224						views
225					);
226					views.remove(&key);
227					if views.is_empty() {
228						self.pending_dropped_transactions.push(*tx_hash);
229					}
230				});
231			},
232			Command::RemoveTransactions(xts) => {
233				log_xt_trace!(
234					target: LOG_TARGET,
235					xts.clone(),
236					"dropped_watcher: finalized xt removed"
237				);
238				xts.iter().for_each(|xt| {
239					self.ready_transaction_views.remove(xt);
240					self.future_transaction_views.remove(xt);
241				});
242			},
243		}
244	}
245
246	/// Processes a `ViewStreamEvent` from a specific view and updates the internal state
247	/// accordingly.
248	///
249	/// If the event indicates that a transaction has been dropped and is no longer referenced by
250	/// any active views, the transaction hash is returned. Otherwise `None` is returned.
251	fn handle_event(
252		&mut self,
253		block_hash: BlockHash<C>,
254		event: ViewStreamEvent<C>,
255	) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
256		trace!(
257			target: LOG_TARGET,
258			"dropped_watcher: handle_event: event:{event:?} from:{block_hash:?} future_views:{:?} ready_views:{:?} stream_map views:{:?}, ",
259			self.future_transaction_views.get(&event.0),
260			self.ready_transaction_views.get(&event.0),
261			self.stream_map.keys().collect::<Vec<_>>(),
262		);
263		let (tx_hash, status) = event;
264		match status {
265			TransactionStatus::Future => {
266				// see note below:
267				if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
268					views_keeping_tx_valid.get_mut().insert(block_hash);
269				} else {
270					self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
271				}
272			},
273			TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
274				// note: if future transaction was once seen as the ready we may want to treat it
275				// as ready transaction. The rationale behind this is as follows: we want to remove
276				// unreferenced future transactions when the last referencing view is removed (to
277				// avoid clogging mempool). For ready transactions we prefer to keep them in mempool
278				// even if no view is currently referencing them. Future transcaction once seen as
279				// ready is likely quite close to be included in some future fork (it is close to be
280				// ready, so we make exception and treat such transaction as ready).
281				if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
282					views.insert(block_hash);
283					self.ready_transaction_views.insert(tx_hash, views);
284				} else {
285					self.ready_transaction_views.entry(tx_hash).or_default().insert(block_hash);
286				}
287			},
288			TransactionStatus::Dropped => {
289				if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
290					views_keeping_tx_valid.get_mut().remove(&block_hash);
291					if views_keeping_tx_valid.get().is_empty() {
292						return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
293					}
294				} else {
295					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked dropped) tx");
296					return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
297				}
298			},
299			TransactionStatus::Usurped(by) =>
300				return Some(DroppedTransaction::new_usurped(tx_hash, by)),
301			TransactionStatus::Invalid => {
302				if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
303					views_keeping_tx_valid.get_mut().remove(&block_hash);
304					if views_keeping_tx_valid.get().is_empty() {
305						return Some(DroppedTransaction::new_invalid(tx_hash))
306					}
307				} else {
308					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked invalid) tx");
309					return Some(DroppedTransaction::new_invalid(tx_hash))
310				}
311			},
312			_ => {},
313		};
314		None
315	}
316
317	/// Gets pending dropped transactions if any.
318	fn get_pending_dropped_transaction(&mut self) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
319		while let Some(tx_hash) = self.pending_dropped_transactions.pop() {
320			// never drop transaction that was seen as ready. It may not have a referencing
321			// view now, but such fork can appear.
322			if self.ready_transaction_views.get(&tx_hash).is_some() {
323				continue
324			}
325
326			if let Some(views) = self.future_transaction_views.get(&tx_hash) {
327				if views.is_empty() {
328					self.future_transaction_views.remove(&tx_hash);
329					return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
330				}
331			}
332		}
333		None
334	}
335
336	/// Creates a new `StreamOfDropped` and its associated event stream controller.
337	///
338	/// This method initializes the internal structures and unfolds the stream of dropped
339	/// transactions. Returns a tuple containing this stream and the controller for managing
340	/// this stream.
341	fn event_stream() -> (StreamOfDropped<C>, Controller<Command<C>>) {
342		//note: 64 allows to avoid warning messages during execution of unit tests.
343		const CHANNEL_SIZE: usize = 64;
344		let (sender, command_receiver) = sc_utils::mpsc::tracing_unbounded::<Command<C>>(
345			"tx-pool-dropped-watcher-cmd-stream",
346			CHANNEL_SIZE,
347		);
348
349		let ctx = Self {
350			stream_map: StreamMap::new(),
351			command_receiver,
352			ready_transaction_views: Default::default(),
353			future_transaction_views: Default::default(),
354			pending_dropped_transactions: Default::default(),
355		};
356
357		let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
358			loop {
359				if let Some(dropped) = ctx.get_pending_dropped_transaction() {
360					trace!("dropped_watcher: sending out (pending): {dropped:?}");
361					return Some((dropped, ctx));
362				}
363				tokio::select! {
364					biased;
365					Some(event) = next_event(&mut ctx.stream_map) => {
366						if let Some(dropped) = ctx.handle_event(event.0, event.1) {
367							trace!("dropped_watcher: sending out: {dropped:?}");
368							return Some((dropped, ctx));
369						}
370					},
371					cmd = ctx.command_receiver.next() => {
372						ctx.handle_command(cmd?);
373					}
374
375				}
376			}
377		})
378		.boxed();
379
380		(stream_map, sender)
381	}
382}
383
384/// The controller for manipulating the state of the [`StreamOfDropped`].
385///
386/// This struct provides methods to add and remove streams associated with views to and from the
387/// stream.
388pub struct MultiViewDroppedWatcherController<ChainApi: graph::ChainApi> {
389	/// A controller allowing to update the state of the associated [`StreamOfDropped`].
390	controller: Controller<Command<ChainApi>>,
391}
392
393impl<ChainApi: graph::ChainApi> Clone for MultiViewDroppedWatcherController<ChainApi> {
394	fn clone(&self) -> Self {
395		Self { controller: self.controller.clone() }
396	}
397}
398
399impl<ChainApi> MultiViewDroppedWatcherController<ChainApi>
400where
401	ChainApi: graph::ChainApi + 'static,
402	<<ChainApi as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
403{
404	/// Creates new [`StreamOfDropped`] and its controller.
405	pub fn new() -> (MultiViewDroppedWatcherController<ChainApi>, StreamOfDropped<ChainApi>) {
406		let (stream_map, ctrl) = MultiViewDropWatcherContext::<ChainApi>::event_stream();
407		(Self { controller: ctrl }, stream_map.boxed())
408	}
409
410	/// Notifies the [`StreamOfDropped`] that new view was created.
411	pub fn add_view(&self, key: BlockHash<ChainApi>, view: ViewStream<ChainApi>) {
412		let _ = self.controller.unbounded_send(Command::AddView(key, view)).map_err(|e| {
413			trace!(target: LOG_TARGET, "dropped_watcher: add_view {key:?} send message failed: {e}");
414		});
415	}
416
417	/// Notifies the [`StreamOfDropped`] that the view was destroyed and shall be removed the
418	/// stream map.
419	pub fn remove_view(&self, key: BlockHash<ChainApi>) {
420		let _ = self.controller.unbounded_send(Command::RemoveView(key)).map_err(|e| {
421			trace!(target: LOG_TARGET, "dropped_watcher: remove_view {key:?} send message failed: {e}");
422		});
423	}
424
425	/// Removes status info for transactions.
426	pub fn remove_transactions(
427		&self,
428		xts: impl IntoIterator<Item = ExtrinsicHash<ChainApi>> + Clone,
429	) {
430		let _ = self
431			.controller
432			.unbounded_send(Command::RemoveTransactions(xts.into_iter().collect()))
433			.map_err(|e| {
434				trace!(target: LOG_TARGET, "dropped_watcher: remove_transactions send message failed: {e}");
435			});
436	}
437}
438
439#[cfg(test)]
440mod dropped_watcher_tests {
441	use super::*;
442	use crate::common::tests::TestApi;
443	use futures::{stream::pending, FutureExt, StreamExt};
444	use sp_core::H256;
445
446	type MultiViewDroppedWatcher = super::MultiViewDroppedWatcherController<TestApi>;
447
448	#[tokio::test]
449	async fn test01() {
450		sp_tracing::try_init_simple();
451		let (watcher, output_stream) = MultiViewDroppedWatcher::new();
452
453		let block_hash = H256::repeat_byte(0x01);
454		let tx_hash = H256::repeat_byte(0x0a);
455
456		let view_stream = futures::stream::iter(vec![
457			(tx_hash, TransactionStatus::Ready),
458			(tx_hash, TransactionStatus::Dropped),
459		])
460		.boxed();
461
462		watcher.add_view(block_hash, view_stream);
463		let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
464		assert_eq!(handle.await.unwrap(), vec![DroppedTransaction::new_enforced_by_limts(tx_hash)]);
465	}
466
467	#[tokio::test]
468	async fn test02() {
469		sp_tracing::try_init_simple();
470		let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
471
472		let block_hash0 = H256::repeat_byte(0x01);
473		let block_hash1 = H256::repeat_byte(0x02);
474		let tx_hash = H256::repeat_byte(0x0a);
475
476		let view_stream0 = futures::stream::iter(vec![(tx_hash, TransactionStatus::Future)])
477			.chain(pending())
478			.boxed();
479		let view_stream1 = futures::stream::iter(vec![
480			(tx_hash, TransactionStatus::Ready),
481			(tx_hash, TransactionStatus::Dropped),
482		])
483		.boxed();
484
485		watcher.add_view(block_hash0, view_stream0);
486
487		assert!(output_stream.next().now_or_never().is_none());
488		watcher.add_view(block_hash1, view_stream1);
489		assert!(output_stream.next().now_or_never().is_none());
490	}
491
492	#[tokio::test]
493	async fn test03() {
494		sp_tracing::try_init_simple();
495		let (watcher, output_stream) = MultiViewDroppedWatcher::new();
496
497		let block_hash0 = H256::repeat_byte(0x01);
498		let block_hash1 = H256::repeat_byte(0x02);
499		let tx_hash0 = H256::repeat_byte(0x0a);
500		let tx_hash1 = H256::repeat_byte(0x0b);
501
502		let view_stream0 = futures::stream::iter(vec![(tx_hash0, TransactionStatus::Future)])
503			.chain(pending())
504			.boxed();
505		let view_stream1 = futures::stream::iter(vec![
506			(tx_hash1, TransactionStatus::Ready),
507			(tx_hash1, TransactionStatus::Dropped),
508		])
509		.boxed();
510
511		watcher.add_view(block_hash0, view_stream0);
512		watcher.add_view(block_hash1, view_stream1);
513		let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
514		assert_eq!(
515			handle.await.unwrap(),
516			vec![DroppedTransaction::new_enforced_by_limts(tx_hash1)]
517		);
518	}
519
520	#[tokio::test]
521	async fn test04() {
522		sp_tracing::try_init_simple();
523		let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
524
525		let block_hash0 = H256::repeat_byte(0x01);
526		let block_hash1 = H256::repeat_byte(0x02);
527		let tx_hash = H256::repeat_byte(0x0b);
528
529		let view_stream0 = futures::stream::iter(vec![
530			(tx_hash, TransactionStatus::Future),
531			(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
532		])
533		.boxed();
534		let view_stream1 = futures::stream::iter(vec![
535			(tx_hash, TransactionStatus::Ready),
536			(tx_hash, TransactionStatus::Dropped),
537		])
538		.boxed();
539
540		watcher.add_view(block_hash0, view_stream0);
541		assert!(output_stream.next().now_or_never().is_none());
542		watcher.remove_view(block_hash0);
543
544		watcher.add_view(block_hash1, view_stream1);
545		let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
546		assert_eq!(handle.await.unwrap(), vec![DroppedTransaction::new_enforced_by_limts(tx_hash)]);
547	}
548
549	#[tokio::test]
550	async fn test05() {
551		sp_tracing::try_init_simple();
552		let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
553		assert!(output_stream.next().now_or_never().is_none());
554
555		let block_hash0 = H256::repeat_byte(0x01);
556		let block_hash1 = H256::repeat_byte(0x02);
557		let tx_hash = H256::repeat_byte(0x0b);
558
559		let view_stream0 = futures::stream::iter(vec![
560			(tx_hash, TransactionStatus::Future),
561			(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
562		])
563		.boxed();
564		watcher.add_view(block_hash0, view_stream0);
565		assert!(output_stream.next().now_or_never().is_none());
566
567		let view_stream1 = futures::stream::iter(vec![
568			(tx_hash, TransactionStatus::Ready),
569			(tx_hash, TransactionStatus::InBlock((block_hash0, 0))),
570		])
571		.boxed();
572
573		watcher.add_view(block_hash1, view_stream1);
574		assert!(output_stream.next().now_or_never().is_none());
575		assert!(output_stream.next().now_or_never().is_none());
576		assert!(output_stream.next().now_or_never().is_none());
577		assert!(output_stream.next().now_or_never().is_none());
578		assert!(output_stream.next().now_or_never().is_none());
579
580		let tx_hash = H256::repeat_byte(0x0c);
581		let view_stream2 = futures::stream::iter(vec![
582			(tx_hash, TransactionStatus::Future),
583			(tx_hash, TransactionStatus::Dropped),
584		])
585		.boxed();
586		let block_hash2 = H256::repeat_byte(0x03);
587		watcher.add_view(block_hash2, view_stream2);
588		let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
589		assert_eq!(handle.await.unwrap(), vec![DroppedTransaction::new_enforced_by_limts(tx_hash)]);
590	}
591}