referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
future.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{
20	collections::{HashMap, HashSet},
21	fmt, hash,
22	sync::Arc,
23};
24
25use sp_core::hexdisplay::HexDisplay;
26use sp_runtime::transaction_validity::TransactionTag as Tag;
27use std::time::Instant;
28
29use super::base_pool::Transaction;
30use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
31
/// Transaction with partially satisfied dependencies.
///
/// Pairs a shared handle to the transaction with the set of required tags that
/// are still unsatisfied, plus the moment the entry was created.
pub struct WaitingTransaction<Hash, Ex> {
	/// Transaction details.
	pub transaction: Arc<Transaction<Hash, Ex>>,
	/// Tags that are required and have not been satisfied yet by other transactions in the pool.
	///
	/// The transaction counts as ready once this set is empty.
	pub missing_tags: HashSet<Tag>,
	/// Time of import to the Future Queue.
	pub imported_at: Instant,
}
41
42impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, Ex> {
43	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
44		write!(fmt, "WaitingTransaction {{ ")?;
45		write!(fmt, "imported_at: {:?}, ", self.imported_at)?;
46		write!(fmt, "transaction: {:?}, ", self.transaction)?;
47		write!(
48			fmt,
49			"missing_tags: {{{}}}",
50			self.missing_tags
51				.iter()
52				.map(|tag| HexDisplay::from(tag).to_string())
53				.collect::<Vec<_>>()
54				.join(", "),
55		)?;
56		write!(fmt, "}}")
57	}
58}
59
60impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
61	fn clone(&self) -> Self {
62		Self {
63			transaction: self.transaction.clone(),
64			missing_tags: self.missing_tags.clone(),
65			imported_at: self.imported_at,
66		}
67	}
68}
69
70impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
71	/// Creates a new `WaitingTransaction`.
72	///
73	/// Computes the set of missing tags based on the requirements and tags that
74	/// are provided by all transactions in the ready queue.
75	pub fn new(
76		transaction: Transaction<Hash, Ex>,
77		provided: &HashMap<Tag, Hash>,
78		recently_pruned: &[HashSet<Tag>],
79	) -> Self {
80		let missing_tags = transaction
81			.requires
82			.iter()
83			.filter(|tag| {
84				// is true if the tag is already satisfied either via transaction in the pool
85				// or one that was recently included.
86				let is_provided = provided.contains_key(&**tag) ||
87					recently_pruned.iter().any(|x| x.contains(&**tag));
88				!is_provided
89			})
90			.cloned()
91			.collect();
92
93		Self { transaction: Arc::new(transaction), missing_tags, imported_at: Instant::now() }
94	}
95
96	/// Marks the tag as satisfied.
97	pub fn satisfy_tag(&mut self, tag: &Tag) {
98		self.missing_tags.remove(tag);
99	}
100
101	/// Returns true if transaction has all requirements satisfied.
102	pub fn is_ready(&self) -> bool {
103		self.missing_tags.is_empty()
104	}
105}
106
/// A pool of transactions that are not yet ready to be included in the block.
///
/// Contains transactions that are still awaiting some other transactions that
/// could provide a tag that they require.
#[derive(Clone, Debug)]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
	/// Tags that are not yet provided by any transaction, each mapped to the hashes
	/// of the waiting transactions that still require it.
	wanted_tags: HashMap<Tag, HashSet<Hash>>,
	/// Waiting transactions, indexed by transaction hash.
	waiting: HashMap<Hash, WaitingTransaction<Hash, Ex>>,
}
118
119impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
120	fn default() -> Self {
121		Self { wanted_tags: Default::default(), waiting: Default::default() }
122	}
123}
124
/// Message passed to `expect` when a hash taken from `wanted_tags` is looked up in
/// `waiting`; it spells out why that lookup is expected to always succeed.
const WAITING_PROOF: &str = r"#
In import we always insert to `waiting` if we push to `wanted_tags`;
when removing from `waiting` we always clear `wanted_tags`;
every hash from `wanted_tags` is always present in `waiting`;
qed
#";
131
132impl<Hash: hash::Hash + Eq + Clone + std::fmt::Debug, Ex: std::fmt::Debug>
133	FutureTransactions<Hash, Ex>
134{
135	/// Import transaction to Future queue.
136	///
137	/// Only transactions that don't have all their tags satisfied should occupy
138	/// the Future queue.
139	/// As soon as required tags are provided by some other transactions that are ready
140	/// we should remove the transactions from here and move them to the Ready queue.
141	pub fn import(&mut self, tx: WaitingTransaction<Hash, Ex>) {
142		assert!(!tx.is_ready(), "Transaction is ready.");
143		assert!(
144			!self.waiting.contains_key(&tx.transaction.hash),
145			"Transaction is already imported."
146		);
147
148		// Add all tags that are missing
149		for tag in &tx.missing_tags {
150			let entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new);
151			entry.insert(tx.transaction.hash.clone());
152		}
153
154		// Add the transaction to a by-hash waiting map
155		self.waiting.insert(tx.transaction.hash.clone(), tx);
156	}
157
158	/// Returns true if given hash is part of the queue.
159	pub fn contains(&self, hash: &Hash) -> bool {
160		self.waiting.contains_key(hash)
161	}
162
163	/// Returns a list of known transactions
164	pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
165		hashes
166			.iter()
167			.map(|h| self.waiting.get(h).map(|x| x.transaction.clone()))
168			.collect()
169	}
170
171	/// Removes transactions that provide any of tags in the given list.
172	///
173	/// Returns list of removed transactions.
174	pub fn prune_tags(&mut self, tags: &Vec<Tag>) -> Vec<Arc<Transaction<Hash, Ex>>> {
175		let pruned = self
176			.waiting
177			.values()
178			.filter_map(|tx| {
179				tx.transaction
180					.provides
181					.iter()
182					.any(|provided_tag| tags.contains(provided_tag))
183					.then(|| tx.transaction.hash.clone())
184			})
185			.collect::<Vec<_>>();
186
187		log_xt_trace!(target: LOG_TARGET, &pruned, "FutureTransactions: removed while pruning tags.");
188		self.remove(&pruned)
189	}
190
191	/// Satisfies provided tags in transactions that are waiting for them.
192	///
193	/// Returns (and removes) transactions that became ready after their last tag got
194	/// satisfied, and now we can remove them from Future and move to Ready queue.
195	pub fn satisfy_tags<T: AsRef<Tag>>(
196		&mut self,
197		tags: impl IntoIterator<Item = T>,
198	) -> Vec<WaitingTransaction<Hash, Ex>> {
199		let mut became_ready = vec![];
200
201		for tag in tags {
202			if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) {
203				for hash in hashes {
204					let is_ready = {
205						let tx = self.waiting.get_mut(&hash).expect(WAITING_PROOF);
206						tx.satisfy_tag(tag.as_ref());
207						tx.is_ready()
208					};
209
210					if is_ready {
211						let tx = self.waiting.remove(&hash).expect(WAITING_PROOF);
212						became_ready.push(tx);
213					}
214				}
215			}
216		}
217
218		became_ready
219	}
220
221	/// Removes transactions for given list of hashes.
222	///
223	/// Returns a list of actually removed transactions.
224	pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
225		let mut removed = vec![];
226		for hash in hashes {
227			if let Some(waiting_tx) = self.waiting.remove(hash) {
228				// remove from wanted_tags as well
229				for tag in waiting_tx.missing_tags {
230					let remove = if let Some(wanted) = self.wanted_tags.get_mut(&tag) {
231						wanted.remove(hash);
232						wanted.is_empty()
233					} else {
234						false
235					};
236					if remove {
237						self.wanted_tags.remove(&tag);
238					}
239				}
240				// add to result
241				removed.push(waiting_tx.transaction)
242			}
243		}
244
245		removed
246	}
247
248	/// Fold a list of future transactions to compute a single value.
249	pub fn fold<R, F: FnMut(Option<R>, &WaitingTransaction<Hash, Ex>) -> Option<R>>(
250		&mut self,
251		f: F,
252	) -> Option<R> {
253		self.waiting.values().fold(None, f)
254	}
255
256	/// Returns iterator over all future transactions
257	pub fn all(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
258		self.waiting.values().map(|waiting| &*waiting.transaction)
259	}
260
261	/// Removes and returns all future transactions.
262	pub fn clear(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
263		self.wanted_tags.clear();
264		self.waiting.drain().map(|(_, tx)| tx.transaction).collect()
265	}
266
267	/// Returns number of transactions in the Future queue.
268	pub fn len(&self) -> usize {
269		self.waiting.len()
270	}
271
272	/// Returns sum of encoding lengths of all transactions in this queue.
273	pub fn bytes(&self) -> usize {
274		self.waiting.values().fold(0, |acc, tx| acc + tx.transaction.bytes)
275	}
276}