referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
listener.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{collections::HashMap, fmt::Debug, hash};
20
21use linked_hash_map::LinkedHashMap;
22use tracing::trace;
23
24use super::{watcher, BlockHash, ChainApi, ExtrinsicHash};
25
26static LOG_TARGET: &str = "txpool::watcher";
27
28/// The `EventHandler` trait provides a mechanism for clients to respond to various
29/// transaction-related events. It offers a set of callback methods that are invoked by the
30/// transaction pool's event dispatcher to notify about changes in the status of transactions.
31///
32/// This trait can be implemented by any component that needs to respond to transaction lifecycle
33/// events, enabling custom logic and handling of these events.
34pub trait EventHandler<C: ChainApi> {
35	/// Called when a transaction is broadcasted.
36	fn broadcasted(&self, _hash: ExtrinsicHash<C>, _peers: Vec<String>) {}
37
38	/// Called when a transaction is ready for execution.
39	fn ready(&self, _tx: ExtrinsicHash<C>) {}
40
41	/// Called when a transaction is deemed to be executable in the future.
42	fn future(&self, _tx: ExtrinsicHash<C>) {}
43
44	/// Called when transaction pool limits result in a transaction being affected.
45	fn limits_enforced(&self, _tx: ExtrinsicHash<C>) {}
46
47	/// Called when a transaction is replaced by another.
48	fn usurped(&self, _tx: ExtrinsicHash<C>, _by: ExtrinsicHash<C>) {}
49
50	/// Called when a transaction is dropped from the pool.
51	fn dropped(&self, _tx: ExtrinsicHash<C>) {}
52
53	/// Called when a transaction is found to be invalid.
54	fn invalid(&self, _tx: ExtrinsicHash<C>) {}
55
56	/// Called when a transaction was pruned from the pool due to its presence in imported block.
57	fn pruned(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>, _tx_index: usize) {}
58
59	/// Called when a transaction is retracted from inclusion in a block.
60	fn retracted(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>) {}
61
62	/// Called when a transaction has not been finalized within a timeout period.
63	fn finality_timeout(&self, _tx: ExtrinsicHash<C>, _hash: BlockHash<C>) {}
64
65	/// Called when a transaction is finalized in a block.
66	fn finalized(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>, _tx_index: usize) {}
67}
68
69impl<C: ChainApi> EventHandler<C> for () {}
70
71/// The `EventDispatcher` struct is responsible for dispatching transaction-related events from the
72/// validated pool to interested observers and an optional event handler. It acts as the primary
73/// liaison between the transaction pool and clients that are monitoring transaction statuses.
74pub struct EventDispatcher<H: hash::Hash + Eq, C: ChainApi, L: EventHandler<C>> {
75	/// Map containing per-transaction sinks for emitting transaction status events.
76	watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
77	finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
78
79	/// Optional event handler (listener) that will be notified about all transactions status
80	/// changes from the pool.
81	event_handler: Option<L>,
82}
83
84/// Maximum number of blocks awaiting finality at any time.
85const MAX_FINALITY_WATCHERS: usize = 512;
86
87impl<H: hash::Hash + Eq + Debug, C: ChainApi, L: EventHandler<C>> Default
88	for EventDispatcher<H, C, L>
89{
90	fn default() -> Self {
91		Self {
92			watchers: Default::default(),
93			finality_watchers: Default::default(),
94			event_handler: None,
95		}
96	}
97}
98
99impl<C: ChainApi, L: EventHandler<C>> EventDispatcher<ExtrinsicHash<C>, C, L> {
100	/// Creates a new instance with provided event handler.
101	pub fn new_with_event_handler(event_handler: Option<L>) -> Self {
102		Self { event_handler, ..Default::default() }
103	}
104
105	fn fire<F>(&mut self, hash: &ExtrinsicHash<C>, fun: F)
106	where
107		F: FnOnce(&mut watcher::Sender<ExtrinsicHash<C>, ExtrinsicHash<C>>),
108	{
109		let clean = if let Some(h) = self.watchers.get_mut(hash) {
110			fun(h);
111			h.is_done()
112		} else {
113			false
114		};
115
116		if clean {
117			self.watchers.remove(hash);
118		}
119	}
120
121	/// Creates a new watcher for given verified extrinsic.
122	///
123	/// The watcher can be used to subscribe to life-cycle events of that extrinsic.
124	pub fn create_watcher(
125		&mut self,
126		hash: ExtrinsicHash<C>,
127	) -> watcher::Watcher<ExtrinsicHash<C>, ExtrinsicHash<C>> {
128		let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default);
129		sender.new_watcher(hash)
130	}
131
132	/// Notify the listeners about the extrinsic broadcast.
133	pub fn broadcasted(&mut self, tx_hash: &ExtrinsicHash<C>, peers: Vec<String>) {
134		trace!(
135			target: LOG_TARGET,
136			?tx_hash,
137			"Broadcasted."
138		);
139		self.fire(tx_hash, |watcher| watcher.broadcast(peers.clone()));
140		self.event_handler.as_ref().map(|l| l.broadcasted(*tx_hash, peers));
141	}
142
143	/// New transaction was added to the ready pool or promoted from the future pool.
144	pub fn ready(&mut self, tx: &ExtrinsicHash<C>, old: Option<&ExtrinsicHash<C>>) {
145		trace!(
146			target: LOG_TARGET,
147			tx_hash = ?*tx,
148			replaced_with = ?old,
149			"Ready."
150		);
151		self.fire(tx, |watcher| watcher.ready());
152		if let Some(old) = old {
153			self.fire(old, |watcher| watcher.usurped(*tx));
154		}
155
156		self.event_handler.as_ref().map(|l| l.ready(*tx));
157	}
158
159	/// New transaction was added to the future pool.
160	pub fn future(&mut self, tx_hash: &ExtrinsicHash<C>) {
161		trace!(
162			target: LOG_TARGET,
163			?tx_hash,
164			"Future."
165		);
166		self.fire(tx_hash, |watcher| watcher.future());
167
168		self.event_handler.as_ref().map(|l| l.future(*tx_hash));
169	}
170
171	/// Transaction was dropped from the pool because of enforcing the limit.
172	pub fn limits_enforced(&mut self, tx_hash: &ExtrinsicHash<C>) {
173		trace!(
174			target: LOG_TARGET,
175			?tx_hash,
176			"Dropped (limits enforced)."
177		);
178		self.fire(tx_hash, |watcher| watcher.limit_enforced());
179
180		self.event_handler.as_ref().map(|l| l.limits_enforced(*tx_hash));
181	}
182
183	/// Transaction was replaced with other extrinsic.
184	pub fn usurped(&mut self, tx: &ExtrinsicHash<C>, by: &ExtrinsicHash<C>) {
185		trace!(
186			target: LOG_TARGET,
187			tx_hash = ?tx,
188			?by,
189			"Dropped (replaced)."
190		);
191		self.fire(tx, |watcher| watcher.usurped(*by));
192
193		self.event_handler.as_ref().map(|l| l.usurped(*tx, *by));
194	}
195
196	/// Transaction was dropped from the pool because of the failure during the resubmission of
197	/// revalidate transactions or failure during pruning tags.
198	pub fn dropped(&mut self, tx_hash: &ExtrinsicHash<C>) {
199		trace!(
200			target: LOG_TARGET,
201			   ?tx_hash,
202			"Dropped."
203		);
204		self.fire(tx_hash, |watcher| watcher.dropped());
205		self.event_handler.as_ref().map(|l| l.dropped(*tx_hash));
206	}
207
208	/// Transaction was removed as invalid.
209	pub fn invalid(&mut self, tx_hash: &ExtrinsicHash<C>) {
210		trace!(
211			target: LOG_TARGET,
212			?tx_hash,
213			"Extrinsic invalid."
214		);
215		self.fire(tx_hash, |watcher| watcher.invalid());
216		self.event_handler.as_ref().map(|l| l.invalid(*tx_hash));
217	}
218
219	/// Transaction was pruned from the pool.
220	pub fn pruned(&mut self, block_hash: BlockHash<C>, tx_hash: &ExtrinsicHash<C>) {
221		trace!(
222			target: LOG_TARGET,
223			?tx_hash,
224			?block_hash,
225			"Pruned at."
226		);
227		// Get the transactions included in the given block hash.
228		let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]);
229		txs.push(*tx_hash);
230		// Current transaction is the last one included.
231		let tx_index = txs.len() - 1;
232
233		self.fire(tx_hash, |watcher| watcher.in_block(block_hash, tx_index));
234		self.event_handler.as_ref().map(|l| l.pruned(*tx_hash, block_hash, tx_index));
235
236		while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
237			if let Some((hash, txs)) = self.finality_watchers.pop_front() {
238				for tx in txs {
239					self.fire(&tx, |watcher| watcher.finality_timeout(hash));
240					self.event_handler.as_ref().map(|l| l.finality_timeout(tx, block_hash));
241				}
242			}
243		}
244	}
245
246	/// The block this transaction was included in has been retracted.
247	pub fn retracted(&mut self, block_hash: BlockHash<C>) {
248		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
249			for hash in hashes {
250				self.fire(&hash, |watcher| watcher.retracted(block_hash));
251				self.event_handler.as_ref().map(|l| l.retracted(hash, block_hash));
252			}
253		}
254	}
255
256	/// Notify all watchers that transactions have been finalized
257	pub fn finalized(&mut self, block_hash: BlockHash<C>) {
258		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
259			for (tx_index, tx_hash) in hashes.into_iter().enumerate() {
260				trace!(
261					target: LOG_TARGET,
262					?tx_hash,
263					?block_hash,
264					"Sent finalization event."
265				);
266				self.fire(&tx_hash, |watcher| watcher.finalized(block_hash, tx_index));
267				self.event_handler.as_ref().map(|l| l.finalized(tx_hash, block_hash, tx_index));
268			}
269		}
270	}
271
272	/// Provides hashes of all watched transactions.
273	pub fn watched_transactions(&self) -> impl Iterator<Item = &ExtrinsicHash<C>> {
274		self.watchers.keys()
275	}
276}