sc_transaction_pool/graph/
future.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{
20	collections::{HashMap, HashSet},
21	fmt, hash,
22	sync::Arc,
23};
24
25use sp_core::hexdisplay::HexDisplay;
26use sp_runtime::transaction_validity::TransactionTag as Tag;
27use std::time::Instant;
28
29use super::base_pool::Transaction;
30
31/// Transaction with partially satisfied dependencies.
32pub struct WaitingTransaction<Hash, Ex> {
33	/// Transaction details.
34	pub transaction: Arc<Transaction<Hash, Ex>>,
35	/// Tags that are required and have not been satisfied yet by other transactions in the pool.
36	pub missing_tags: HashSet<Tag>,
37	/// Time of import to the Future Queue.
38	pub imported_at: Instant,
39}
40
41impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, Ex> {
42	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43		write!(fmt, "WaitingTransaction {{ ")?;
44		write!(fmt, "imported_at: {:?}, ", self.imported_at)?;
45		write!(fmt, "transaction: {:?}, ", self.transaction)?;
46		write!(
47			fmt,
48			"missing_tags: {{{}}}",
49			self.missing_tags
50				.iter()
51				.map(|tag| HexDisplay::from(tag).to_string())
52				.collect::<Vec<_>>()
53				.join(", "),
54		)?;
55		write!(fmt, "}}")
56	}
57}
58
59impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
60	fn clone(&self) -> Self {
61		Self {
62			transaction: self.transaction.clone(),
63			missing_tags: self.missing_tags.clone(),
64			imported_at: self.imported_at,
65		}
66	}
67}
68
69impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
70	/// Creates a new `WaitingTransaction`.
71	///
72	/// Computes the set of missing tags based on the requirements and tags that
73	/// are provided by all transactions in the ready queue.
74	pub fn new(
75		transaction: Transaction<Hash, Ex>,
76		provided: &HashMap<Tag, Hash>,
77		recently_pruned: &[HashSet<Tag>],
78	) -> Self {
79		let missing_tags = transaction
80			.requires
81			.iter()
82			.filter(|tag| {
83				// is true if the tag is already satisfied either via transaction in the pool
84				// or one that was recently included.
85				let is_provided = provided.contains_key(&**tag) ||
86					recently_pruned.iter().any(|x| x.contains(&**tag));
87				!is_provided
88			})
89			.cloned()
90			.collect();
91
92		Self { transaction: Arc::new(transaction), missing_tags, imported_at: Instant::now() }
93	}
94
95	/// Marks the tag as satisfied.
96	pub fn satisfy_tag(&mut self, tag: &Tag) {
97		self.missing_tags.remove(tag);
98	}
99
100	/// Returns true if transaction has all requirements satisfied.
101	pub fn is_ready(&self) -> bool {
102		self.missing_tags.is_empty()
103	}
104}
105
106/// A pool of transactions that are not yet ready to be included in the block.
107///
108/// Contains transactions that are still awaiting for some other transactions that
109/// could provide a tag that they require.
110#[derive(Debug)]
111pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
112	/// tags that are not yet provided by any transaction and we await for them
113	wanted_tags: HashMap<Tag, HashSet<Hash>>,
114	/// Transactions waiting for a particular other transaction
115	waiting: HashMap<Hash, WaitingTransaction<Hash, Ex>>,
116}
117
118impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
119	fn default() -> Self {
120		Self { wanted_tags: Default::default(), waiting: Default::default() }
121	}
122}
123
124const WAITING_PROOF: &str = r"#
125In import we always insert to `waiting` if we push to `wanted_tags`;
126when removing from `waiting` we always clear `wanted_tags`;
127every hash from `wanted_tags` is always present in `waiting`;
128qed
129#";
130
131impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
132	/// Import transaction to Future queue.
133	///
134	/// Only transactions that don't have all their tags satisfied should occupy
135	/// the Future queue.
136	/// As soon as required tags are provided by some other transactions that are ready
137	/// we should remove the transactions from here and move them to the Ready queue.
138	pub fn import(&mut self, tx: WaitingTransaction<Hash, Ex>) {
139		assert!(!tx.is_ready(), "Transaction is ready.");
140		assert!(
141			!self.waiting.contains_key(&tx.transaction.hash),
142			"Transaction is already imported."
143		);
144
145		// Add all tags that are missing
146		for tag in &tx.missing_tags {
147			let entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new);
148			entry.insert(tx.transaction.hash.clone());
149		}
150
151		// Add the transaction to a by-hash waiting map
152		self.waiting.insert(tx.transaction.hash.clone(), tx);
153	}
154
155	/// Returns true if given hash is part of the queue.
156	pub fn contains(&self, hash: &Hash) -> bool {
157		self.waiting.contains_key(hash)
158	}
159
160	/// Returns a list of known transactions
161	pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
162		hashes
163			.iter()
164			.map(|h| self.waiting.get(h).map(|x| x.transaction.clone()))
165			.collect()
166	}
167
168	/// Satisfies provided tags in transactions that are waiting for them.
169	///
170	/// Returns (and removes) transactions that became ready after their last tag got
171	/// satisfied and now we can remove them from Future and move to Ready queue.
172	pub fn satisfy_tags<T: AsRef<Tag>>(
173		&mut self,
174		tags: impl IntoIterator<Item = T>,
175	) -> Vec<WaitingTransaction<Hash, Ex>> {
176		let mut became_ready = vec![];
177
178		for tag in tags {
179			if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) {
180				for hash in hashes {
181					let is_ready = {
182						let tx = self.waiting.get_mut(&hash).expect(WAITING_PROOF);
183						tx.satisfy_tag(tag.as_ref());
184						tx.is_ready()
185					};
186
187					if is_ready {
188						let tx = self.waiting.remove(&hash).expect(WAITING_PROOF);
189						became_ready.push(tx);
190					}
191				}
192			}
193		}
194
195		became_ready
196	}
197
198	/// Removes transactions for given list of hashes.
199	///
200	/// Returns a list of actually removed transactions.
201	pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
202		let mut removed = vec![];
203		for hash in hashes {
204			if let Some(waiting_tx) = self.waiting.remove(hash) {
205				// remove from wanted_tags as well
206				for tag in waiting_tx.missing_tags {
207					let remove = if let Some(wanted) = self.wanted_tags.get_mut(&tag) {
208						wanted.remove(hash);
209						wanted.is_empty()
210					} else {
211						false
212					};
213					if remove {
214						self.wanted_tags.remove(&tag);
215					}
216				}
217				// add to result
218				removed.push(waiting_tx.transaction)
219			}
220		}
221		removed
222	}
223
224	/// Fold a list of future transactions to compute a single value.
225	pub fn fold<R, F: FnMut(Option<R>, &WaitingTransaction<Hash, Ex>) -> Option<R>>(
226		&mut self,
227		f: F,
228	) -> Option<R> {
229		self.waiting.values().fold(None, f)
230	}
231
232	/// Returns iterator over all future transactions
233	pub fn all(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
234		self.waiting.values().map(|waiting| &*waiting.transaction)
235	}
236
237	/// Removes and returns all future transactions.
238	pub fn clear(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
239		self.wanted_tags.clear();
240		self.waiting.drain().map(|(_, tx)| tx.transaction).collect()
241	}
242
243	/// Returns number of transactions in the Future queue.
244	pub fn len(&self) -> usize {
245		self.waiting.len()
246	}
247
248	/// Returns sum of encoding lengths of all transactions in this queue.
249	pub fn bytes(&self) -> usize {
250		self.waiting.values().fold(0, |acc, tx| acc + tx.transaction.bytes)
251	}
252}