sc_transaction_pool/graph/
listener.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{collections::HashMap, fmt::Debug, hash};
20
21use crate::LOG_TARGET;
22use linked_hash_map::LinkedHashMap;
23use log::{debug, trace};
24use serde::Serialize;
25use sp_runtime::traits;
26
27use super::{watcher, BlockHash, ChainApi, ExtrinsicHash};
28
29/// Extrinsic pool default listener.
30pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
31	watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
32	finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
33}
34
35/// Maximum number of blocks awaiting finality at any time.
36const MAX_FINALITY_WATCHERS: usize = 512;
37
38impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
39	fn default() -> Self {
40		Self { watchers: Default::default(), finality_watchers: Default::default() }
41	}
42}
43
44impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
45	fn fire<F>(&mut self, hash: &H, fun: F)
46	where
47		F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>),
48	{
49		let clean = if let Some(h) = self.watchers.get_mut(hash) {
50			fun(h);
51			h.is_done()
52		} else {
53			false
54		};
55
56		if clean {
57			self.watchers.remove(hash);
58		}
59	}
60
61	/// Creates a new watcher for given verified extrinsic.
62	///
63	/// The watcher can be used to subscribe to life-cycle events of that extrinsic.
64	pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> {
65		let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
66		sender.new_watcher(hash)
67	}
68
69	/// Notify the listeners about extrinsic broadcast.
70	pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
71		trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash);
72		self.fire(hash, |watcher| watcher.broadcast(peers));
73	}
74
75	/// New transaction was added to the ready pool or promoted from the future pool.
76	pub fn ready(&mut self, tx: &H, old: Option<&H>) {
77		trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old);
78		self.fire(tx, |watcher| watcher.ready());
79		if let Some(old) = old {
80			self.fire(old, |watcher| watcher.usurped(tx.clone()));
81		}
82	}
83
84	/// New transaction was added to the future pool.
85	pub fn future(&mut self, tx: &H) {
86		trace!(target: LOG_TARGET, "[{:?}] Future", tx);
87		self.fire(tx, |watcher| watcher.future());
88	}
89
90	/// Transaction was dropped from the pool because of the limit.
91	pub fn dropped(&mut self, tx: &H, by: Option<&H>) {
92		trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by);
93		self.fire(tx, |watcher| match by {
94			Some(t) => watcher.usurped(t.clone()),
95			None => watcher.dropped(),
96		})
97	}
98
99	/// Transaction was removed as invalid.
100	pub fn invalid(&mut self, tx: &H) {
101		debug!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx);
102		self.fire(tx, |watcher| watcher.invalid());
103	}
104
105	/// Transaction was pruned from the pool.
106	pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
107		debug!(target: LOG_TARGET, "[{:?}] Pruned at {:?}", tx, block_hash);
108		// Get the transactions included in the given block hash.
109		let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]);
110		txs.push(tx.clone());
111		// Current transaction is the last one included.
112		let tx_index = txs.len() - 1;
113
114		self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index));
115
116		while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
117			if let Some((hash, txs)) = self.finality_watchers.pop_front() {
118				for tx in txs {
119					self.fire(&tx, |watcher| watcher.finality_timeout(hash));
120				}
121			}
122		}
123	}
124
125	/// The block this transaction was included in has been retracted.
126	pub fn retracted(&mut self, block_hash: BlockHash<C>) {
127		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
128			for hash in hashes {
129				self.fire(&hash, |watcher| watcher.retracted(block_hash))
130			}
131		}
132	}
133
134	/// Notify all watchers that transactions have been finalized
135	pub fn finalized(&mut self, block_hash: BlockHash<C>) {
136		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
137			for (tx_index, hash) in hashes.into_iter().enumerate() {
138				log::debug!(
139					target: LOG_TARGET,
140					"[{:?}] Sent finalization event (block {:?})",
141					hash,
142					block_hash,
143				);
144				self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index))
145			}
146		}
147	}
148}