referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
base_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! A basic version of the dependency graph.
20//!
21//! For a more full-featured pool, have a look at the `pool` module.
22
23use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc, time::Instant};
24
25use crate::LOG_TARGET;
26use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
27use serde::Serialize;
28use sp_core::hexdisplay::HexDisplay;
29use sp_runtime::{
30	traits::Member,
31	transaction_validity::{
32		TransactionLongevity as Longevity, TransactionPriority as Priority, TransactionSource,
33		TransactionTag as Tag,
34	},
35};
36use tracing::{trace, warn};
37
38use super::{
39	future::{FutureTransactions, WaitingTransaction},
40	ready::{BestIterator, ReadyTransactions, TransactionRef},
41};
42
43/// Successful import result.
44#[derive(Debug, PartialEq, Eq)]
45pub enum Imported<Hash, Ex> {
46	/// Transaction was successfully imported to Ready queue.
47	Ready {
48		/// Hash of transaction that was successfully imported.
49		hash: Hash,
50		/// Transactions that got promoted from the Future queue.
51		promoted: Vec<Hash>,
52		/// Transactions that failed to be promoted from the Future queue and are now discarded.
53		failed: Vec<Hash>,
54		/// Transactions removed from the Ready pool (replaced).
55		removed: Vec<Arc<Transaction<Hash, Ex>>>,
56	},
57	/// Transaction was successfully imported to Future queue.
58	Future {
59		/// Hash of transaction that was successfully imported.
60		hash: Hash,
61	},
62}
63
64impl<Hash, Ex> Imported<Hash, Ex> {
65	/// Returns the hash of imported transaction.
66	pub fn hash(&self) -> &Hash {
67		use self::Imported::*;
68		match *self {
69			Ready { ref hash, .. } => hash,
70			Future { ref hash, .. } => hash,
71		}
72	}
73}
74
75/// Status of pruning the queue.
76#[derive(Debug)]
77pub struct PruneStatus<Hash, Ex> {
78	/// A list of imports that satisfying the tag triggered.
79	pub promoted: Vec<Imported<Hash, Ex>>,
80	/// A list of transactions that failed to be promoted and now are discarded.
81	pub failed: Vec<Hash>,
82	/// A list of transactions that got pruned from the ready queue.
83	pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
84}
85
86/// A transaction source that includes a timestamp indicating when the transaction was submitted.
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct TimedTransactionSource {
89	/// The original source of the transaction.
90	pub source: TransactionSource,
91
92	/// The time at which the transaction was submitted.
93	pub timestamp: Option<Instant>,
94}
95
96impl From<TimedTransactionSource> for TransactionSource {
97	fn from(value: TimedTransactionSource) -> Self {
98		value.source
99	}
100}
101
102impl TimedTransactionSource {
103	/// Creates a new instance with an internal `TransactionSource::InBlock` source and an optional
104	/// timestamp.
105	pub fn new_in_block(with_timestamp: bool) -> Self {
106		Self { source: TransactionSource::InBlock, timestamp: with_timestamp.then(Instant::now) }
107	}
108	/// Creates a new instance with an internal `TransactionSource::External` source and an optional
109	/// timestamp.
110	pub fn new_external(with_timestamp: bool) -> Self {
111		Self { source: TransactionSource::External, timestamp: with_timestamp.then(Instant::now) }
112	}
113	/// Creates a new instance with an internal `TransactionSource::Local` source and an optional
114	/// timestamp.
115	pub fn new_local(with_timestamp: bool) -> Self {
116		Self { source: TransactionSource::Local, timestamp: with_timestamp.then(Instant::now) }
117	}
118	/// Creates a new instance with an given source and an optional timestamp.
119	pub fn from_transaction_source(source: TransactionSource, with_timestamp: bool) -> Self {
120		Self { source, timestamp: with_timestamp.then(Instant::now) }
121	}
122}
123
124/// Immutable transaction
125#[derive(PartialEq, Eq, Clone)]
126pub struct Transaction<Hash, Extrinsic> {
127	/// Raw extrinsic representing that transaction.
128	pub data: Extrinsic,
129	/// Number of bytes encoding of the transaction requires.
130	pub bytes: usize,
131	/// Transaction hash (unique)
132	pub hash: Hash,
133	/// Transaction priority (higher = better)
134	pub priority: Priority,
135	/// At which block the transaction becomes invalid?
136	pub valid_till: Longevity,
137	/// Tags required by the transaction.
138	pub requires: Vec<Tag>,
139	/// Tags that this transaction provides.
140	pub provides: Vec<Tag>,
141	/// Should that transaction be propagated.
142	pub propagate: bool,
143	/// Timed source of that transaction.
144	pub source: TimedTransactionSource,
145}
146
147impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
148	fn as_ref(&self) -> &Extrinsic {
149		&self.data
150	}
151}
152
153impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
154	type Transaction = Extrinsic;
155	type Hash = Hash;
156
157	fn data(&self) -> &Extrinsic {
158		&self.data
159	}
160
161	fn hash(&self) -> &Hash {
162		&self.hash
163	}
164
165	fn priority(&self) -> &Priority {
166		&self.priority
167	}
168
169	fn longevity(&self) -> &Longevity {
170		&self.valid_till
171	}
172
173	fn requires(&self) -> &[Tag] {
174		&self.requires
175	}
176
177	fn provides(&self) -> &[Tag] {
178		&self.provides
179	}
180
181	fn is_propagable(&self) -> bool {
182		self.propagate
183	}
184}
185
186impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
187	/// Explicit transaction clone.
188	///
189	/// Transaction should be cloned only if absolutely necessary && we want
190	/// every reason to be commented. That's why we `Transaction` is not `Clone`,
191	/// but there's explicit `duplicate` method.
192	pub fn duplicate(&self) -> Self {
193		Self {
194			data: self.data.clone(),
195			bytes: self.bytes,
196			hash: self.hash.clone(),
197			priority: self.priority,
198			source: self.source.clone(),
199			valid_till: self.valid_till,
200			requires: self.requires.clone(),
201			provides: self.provides.clone(),
202			propagate: self.propagate,
203		}
204	}
205}
206
207impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
208where
209	Hash: fmt::Debug,
210	Extrinsic: fmt::Debug,
211{
212	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
213		let join_tags = |tags: &[Tag]| {
214			tags.iter()
215				.map(|tag| HexDisplay::from(tag).to_string())
216				.collect::<Vec<_>>()
217				.join(", ")
218		};
219
220		write!(fmt, "Transaction {{ ")?;
221		write!(fmt, "hash: {:?}, ", &self.hash)?;
222		write!(fmt, "priority: {:?}, ", &self.priority)?;
223		write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
224		write!(fmt, "bytes: {:?}, ", &self.bytes)?;
225		write!(fmt, "propagate: {:?}, ", &self.propagate)?;
226		write!(fmt, "source: {:?}, ", &self.source)?;
227		write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
228		write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
229		write!(fmt, "data: {:?}", &self.data)?;
230		write!(fmt, "}}")?;
231		Ok(())
232	}
233}
234
235/// Store last pruned tags for given number of invocations.
236const RECENTLY_PRUNED_TAGS: usize = 2;
237
238/// Transaction pool.
239///
240/// Builds a dependency graph for all transactions in the pool and returns
241/// the ones that are currently ready to be executed.
242///
243/// General note:
244/// If function returns some transactions it usually means that importing them
245/// as-is for the second time will fail or produce unwanted results.
246/// Most likely it is required to revalidate them and recompute set of
247/// required tags.
248#[derive(Clone, Debug)]
249pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
250	reject_future_transactions: bool,
251	future: FutureTransactions<Hash, Ex>,
252	ready: ReadyTransactions<Hash, Ex>,
253	/// Store recently pruned tags (for last two invocations).
254	///
255	/// This is used to make sure we don't accidentally put
256	/// transactions to future in case they were just stuck in verification.
257	recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
258	recently_pruned_index: usize,
259}
260
261impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
262	fn default() -> Self {
263		Self::new(false)
264	}
265}
266
267impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
268	/// Create new pool given reject_future_transactions flag.
269	pub fn new(reject_future_transactions: bool) -> Self {
270		Self {
271			reject_future_transactions,
272			future: Default::default(),
273			ready: Default::default(),
274			recently_pruned: Default::default(),
275			recently_pruned_index: 0,
276		}
277	}
278
279	/// Clears buffer keeping recently pruned transaction.
280	pub fn clear_recently_pruned(&mut self) {
281		self.recently_pruned = Default::default();
282		self.recently_pruned_index = 0;
283	}
284
285	/// Temporary enables future transactions, runs closure and then restores
286	/// `reject_future_transactions` flag back to previous value.
287	///
288	/// The closure accepts the mutable reference to the pool and original value
289	/// of the `reject_future_transactions` flag.
290	pub(crate) fn with_futures_enabled<T>(
291		&mut self,
292		closure: impl FnOnce(&mut Self, bool) -> T,
293	) -> T {
294		let previous = self.reject_future_transactions;
295		self.reject_future_transactions = false;
296		let return_value = closure(self, previous);
297		self.reject_future_transactions = previous;
298		return_value
299	}
300
301	/// Returns if the transaction for the given hash is already imported.
302	pub fn is_imported(&self, tx_hash: &Hash) -> bool {
303		self.future.contains(tx_hash) || self.ready.contains(tx_hash)
304	}
305
306	/// Imports transaction to the pool.
307	///
308	/// The pool consists of two parts: Future and Ready.
309	/// The former contains transactions that require some tags that are not yet provided by
310	/// other transactions in the pool.
311	/// The latter contains transactions that have all the requirements satisfied and are
312	/// ready to be included in the block.
313	pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
314		if self.is_imported(&tx.hash) {
315			return Err(error::Error::AlreadyImported(Box::new(tx.hash)));
316		}
317
318		let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
319		trace!(
320			target: LOG_TARGET,
321			tx_hash = ?tx.transaction.hash,
322			?tx,
323			set = if tx.is_ready() { "ready" } else { "future" },
324			"Importing transaction"
325		);
326
327		// If all tags are not satisfied import to future.
328		if !tx.is_ready() {
329			if self.reject_future_transactions {
330				return Err(error::Error::RejectedFutureTransaction);
331			}
332
333			let hash = tx.transaction.hash.clone();
334			self.future.import(tx);
335			return Ok(Imported::Future { hash });
336		}
337
338		self.import_to_ready(tx)
339	}
340
341	/// Imports transaction to ready queue.
342	///
343	/// NOTE the transaction has to have all requirements satisfied.
344	fn import_to_ready(
345		&mut self,
346		tx: WaitingTransaction<Hash, Ex>,
347	) -> error::Result<Imported<Hash, Ex>> {
348		let tx_hash = tx.transaction.hash.clone();
349		let mut promoted = vec![];
350		let mut failed = vec![];
351		let mut removed = vec![];
352
353		let mut first = true;
354		let mut to_import = vec![tx];
355
356		// take first transaction from the list
357		while let Some(tx) = to_import.pop() {
358			// find transactions in Future that it unlocks
359			to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
360
361			// import this transaction
362			let current_hash = tx.transaction.hash.clone();
363			let current_tx = tx.transaction.clone();
364			match self.ready.import(tx) {
365				Ok(mut replaced) => {
366					if !first {
367						promoted.push(current_hash.clone());
368					}
369					// If there were conflicting future transactions promoted, removed them from
370					// promoted set.
371					promoted.retain(|hash| replaced.iter().all(|tx| *hash != tx.hash));
372					// The transactions were removed from the ready pool. We might attempt to
373					// re-import them.
374					removed.append(&mut replaced);
375				},
376				Err(error @ error::Error::TooLowPriority { .. }) => {
377					trace!(
378						target: LOG_TARGET,
379						tx_hash = ?current_tx.hash,
380						?first,
381						%error,
382						"Error importing transaction"
383					);
384					if first {
385						return Err(error);
386					} else {
387						removed.push(current_tx);
388						promoted.retain(|hash| *hash != current_hash);
389					}
390				},
391				// transaction failed to be imported.
392				Err(error) => {
393					trace!(
394						target: LOG_TARGET,
395						tx_hash = ?current_tx.hash,
396						?error,
397						first,
398						"Error importing transaction"
399					);
400					if first {
401						return Err(error);
402					} else {
403						failed.push(current_tx.hash.clone());
404					}
405				},
406			}
407			first = false;
408		}
409
410		// An edge case when importing transaction caused
411		// some future transactions to be imported and that
412		// future transactions pushed out current transaction.
413		// This means that there is a cycle and the transactions should
414		// be moved back to future, since we can't resolve it.
415		if removed.iter().any(|tx| tx.hash == tx_hash) {
416			// We still need to remove all transactions that we promoted
417			// since they depend on each other and will never get to the best iterator.
418			self.ready.remove_subtree(&promoted);
419
420			trace!(
421				target: LOG_TARGET,
422				?tx_hash,
423				"Cycle detected, bailing."
424			);
425			return Err(error::Error::CycleDetected);
426		}
427
428		Ok(Imported::Ready { hash: tx_hash, promoted, failed, removed })
429	}
430
431	/// Returns an iterator over ready transactions in the pool.
432	pub fn ready(&self) -> BestIterator<Hash, Ex> {
433		self.ready.get()
434	}
435
436	/// Returns an iterator over future transactions in the pool.
437	pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
438		self.future.all()
439	}
440
441	/// Returns pool transactions given list of hashes.
442	///
443	/// Includes both ready and future pool. For every hash in the `hashes`
444	/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
445	pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
446		let ready = self.ready.by_hashes(hashes);
447		let future = self.future.by_hashes(hashes);
448
449		ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
450	}
451
452	/// Returns pool transaction by hash.
453	pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
454		self.ready.by_hash(hash)
455	}
456
457	/// Makes sure that the transactions in the queues stay within provided limits.
458	///
459	/// Removes and returns worst transactions from the queues and all transactions that depend on
460	/// them. Technically the worst transaction should be evaluated by computing the entire pending
461	/// set. We use a simplified approach to remove transactions with the lowest priority first or
462	/// those that occupy the pool for the longest time in case priority is the same.
463	pub fn enforce_limits(
464		&mut self,
465		ready: &Limit,
466		future: &Limit,
467	) -> Vec<Arc<Transaction<Hash, Ex>>> {
468		let mut removed = vec![];
469
470		while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
471			// find the worst transaction
472			let worst =
473				self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
474					let transaction = &current.transaction;
475					worst
476						.map(|worst| {
477							// Here we don't use `TransactionRef`'s ordering implementation because
478							// while it prefers priority like need here, it also prefers older
479							// transactions for inclusion purposes and limit enforcement needs to
480							// prefer newer transactions instead and drop the older ones.
481							match worst.transaction.priority.cmp(&transaction.transaction.priority)
482							{
483								Ordering::Less => worst,
484								Ordering::Equal => {
485									if worst.insertion_id > transaction.insertion_id {
486										transaction.clone()
487									} else {
488										worst
489									}
490								},
491								Ordering::Greater => transaction.clone(),
492							}
493						})
494						.or_else(|| Some(transaction.clone()))
495				});
496
497			if let Some(worst) = worst {
498				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
499			} else {
500				break;
501			}
502		}
503
504		while future.is_exceeded(self.future.len(), self.future.bytes()) {
505			// find the worst transaction
506			let worst = self.future.fold(|worst, current| match worst {
507				None => Some(current.clone()),
508				Some(worst) => Some(
509					match (worst.transaction.source.timestamp, current.transaction.source.timestamp)
510					{
511						(Some(worst_timestamp), Some(current_timestamp)) => {
512							if worst_timestamp > current_timestamp {
513								current.clone()
514							} else {
515								worst
516							}
517						},
518						_ => {
519							if worst.imported_at > current.imported_at {
520								current.clone()
521							} else {
522								worst
523							}
524						},
525					},
526				),
527			});
528
529			if let Some(worst) = worst {
530				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
531			} else {
532				break;
533			}
534		}
535
536		removed
537	}
538
539	/// Removes all transactions represented by the hashes and all other transactions
540	/// that depend on them.
541	///
542	/// Returns a list of actually removed transactions.
543	/// NOTE some transactions might still be valid, but were just removed because
544	/// they were part of a chain, you may attempt to re-import them later.
545	/// NOTE If you want to remove ready transactions that were already used,
546	/// and you don't want them to be stored in the pool use `prune_tags` method.
547	pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
548		let mut removed = self.ready.remove_subtree(hashes);
549		removed.extend(self.future.remove(hashes));
550		removed
551	}
552
553	/// Removes and returns all transactions from the future queue.
554	pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
555		self.future.clear()
556	}
557
558	/// Prunes transactions that provide given list of tags.
559	///
560	/// This will cause all transactions (both ready and future) that provide these tags to be
561	/// removed from the pool, but unlike `remove_subtree`, dependent transactions are not touched.
562	/// Additional transactions from future queue might be promoted to ready if you satisfy tags
563	/// that the pool didn't previously know about.
564	pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
565		let mut to_import = vec![];
566		let mut pruned = vec![];
567		let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
568		self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
569		recently_pruned.clear();
570
571		let tags = tags.into_iter().collect::<Vec<_>>();
572		let futures_removed = self.future.prune_tags(&tags);
573
574		for tag in tags {
575			// make sure to promote any future transactions that could be unlocked
576			to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
577			// and actually prune transactions in ready queue
578			pruned.append(&mut self.ready.prune_tags(tag.clone()));
579			// store the tags for next submission
580			recently_pruned.insert(tag);
581		}
582
583		let mut promoted = vec![];
584		let mut failed = vec![];
585		for tx in futures_removed {
586			failed.push(tx.hash.clone());
587		}
588
589		for tx in to_import {
590			let tx_hash = tx.transaction.hash.clone();
591			match self.import_to_ready(tx) {
592				Ok(res) => promoted.push(res),
593				Err(error) => {
594					warn!(
595						target: LOG_TARGET,
596						?tx_hash,
597						?error,
598						"Failed to promote during pruning."
599					);
600					failed.push(tx_hash)
601				},
602			}
603		}
604
605		PruneStatus { pruned, failed, promoted }
606	}
607
608	/// Get pool status.
609	pub fn status(&self) -> PoolStatus {
610		PoolStatus {
611			ready: self.ready.len(),
612			ready_bytes: self.ready.bytes(),
613			future: self.future.len(),
614			future_bytes: self.future.bytes(),
615		}
616	}
617}
618
619/// Queue limits
620#[derive(Debug, Clone)]
621pub struct Limit {
622	/// Maximal number of transactions in the queue.
623	pub count: usize,
624	/// Maximal size of encodings of all transactions in the queue.
625	pub total_bytes: usize,
626}
627
628impl Limit {
629	/// Returns true if any of the provided values exceeds the limit.
630	pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
631		self.count < count || self.total_bytes < bytes
632	}
633}
634
635#[cfg(test)]
636mod tests {
637	use super::*;
638
639	type Hash = u64;
640
641	fn pool() -> BasePool<Hash, Vec<u8>> {
642		BasePool::default()
643	}
644
645	fn default_tx() -> Transaction<Hash, Vec<u8>> {
646		Transaction {
647			data: vec![],
648			bytes: 1,
649			hash: 1u64,
650			priority: 5u64,
651			valid_till: 64u64,
652			requires: vec![],
653			provides: vec![],
654			propagate: true,
655			source: TimedTransactionSource::new_external(false),
656		}
657	}
658
659	#[test]
660	fn prune_for_ready_works() {
661		// given
662		let mut pool = pool();
663
664		// when
665		pool.import(Transaction {
666			data: vec![1u8].into(),
667			provides: vec![vec![2]],
668			..default_tx().clone()
669		})
670		.unwrap();
671
672		// then
673		assert_eq!(pool.ready().count(), 1);
674		assert_eq!(pool.ready.len(), 1);
675
676		let result = pool.prune_tags(vec![vec![2]]);
677		assert_eq!(pool.ready().count(), 0);
678		assert_eq!(pool.ready.len(), 0);
679		assert_eq!(result.pruned.len(), 1);
680		assert_eq!(result.failed.len(), 0);
681		assert_eq!(result.promoted.len(), 0);
682	}
683
684	#[test]
685	fn prune_for_future_works() {
686		// given
687		let mut pool = pool();
688
689		// when
690		pool.import(Transaction {
691			data: vec![1u8].into(),
692			requires: vec![vec![1]],
693			provides: vec![vec![2]],
694			hash: 0xaa,
695			..default_tx().clone()
696		})
697		.unwrap();
698
699		// then
700		assert_eq!(pool.futures().count(), 1);
701		assert_eq!(pool.future.len(), 1);
702
703		let result = pool.prune_tags(vec![vec![2]]);
704		assert_eq!(pool.ready().count(), 0);
705		assert_eq!(pool.ready.len(), 0);
706		assert_eq!(pool.futures().count(), 0);
707		assert_eq!(pool.future.len(), 0);
708
709		assert_eq!(result.pruned.len(), 0);
710		assert_eq!(result.failed.len(), 1);
711		assert_eq!(result.failed[0], 0xaa);
712		assert_eq!(result.promoted.len(), 0);
713	}
714
715	#[test]
716	fn should_import_transaction_to_ready() {
717		// given
718		let mut pool = pool();
719
720		// when
721		pool.import(Transaction {
722			data: vec![1u8].into(),
723			provides: vec![vec![1]],
724			..default_tx().clone()
725		})
726		.unwrap();
727
728		// then
729		assert_eq!(pool.ready().count(), 1);
730		assert_eq!(pool.ready.len(), 1);
731	}
732
733	#[test]
734	fn should_not_import_same_transaction_twice() {
735		// given
736		let mut pool = pool();
737
738		// when
739		pool.import(Transaction {
740			data: vec![1u8].into(),
741			provides: vec![vec![1]],
742			..default_tx().clone()
743		})
744		.unwrap();
745		pool.import(Transaction {
746			data: vec![1u8].into(),
747			provides: vec![vec![1]],
748			..default_tx().clone()
749		})
750		.unwrap_err();
751
752		// then
753		assert_eq!(pool.ready().count(), 1);
754		assert_eq!(pool.ready.len(), 1);
755	}
756
757	#[test]
758	fn should_import_transaction_to_future_and_promote_it_later() {
759		// given
760		let mut pool = pool();
761
762		// when
763		pool.import(Transaction {
764			data: vec![1u8].into(),
765			requires: vec![vec![0]],
766			provides: vec![vec![1]],
767			..default_tx().clone()
768		})
769		.unwrap();
770		assert_eq!(pool.ready().count(), 0);
771		assert_eq!(pool.ready.len(), 0);
772		pool.import(Transaction {
773			data: vec![2u8].into(),
774			hash: 2,
775			provides: vec![vec![0]],
776			..default_tx().clone()
777		})
778		.unwrap();
779
780		// then
781		assert_eq!(pool.ready().count(), 2);
782		assert_eq!(pool.ready.len(), 2);
783	}
784
785	#[test]
786	fn should_promote_a_subgraph() {
787		// given
788		let mut pool = pool();
789
790		// when
791		pool.import(Transaction {
792			data: vec![1u8].into(),
793			requires: vec![vec![0]],
794			provides: vec![vec![1]],
795			..default_tx().clone()
796		})
797		.unwrap();
798		pool.import(Transaction {
799			data: vec![3u8].into(),
800			hash: 3,
801			requires: vec![vec![2]],
802			..default_tx().clone()
803		})
804		.unwrap();
805		pool.import(Transaction {
806			data: vec![2u8].into(),
807			hash: 2,
808			requires: vec![vec![1]],
809			provides: vec![vec![3], vec![2]],
810			..default_tx().clone()
811		})
812		.unwrap();
813		pool.import(Transaction {
814			data: vec![4u8].into(),
815			hash: 4,
816			priority: 1_000u64,
817			requires: vec![vec![3], vec![4]],
818			..default_tx().clone()
819		})
820		.unwrap();
821		assert_eq!(pool.ready().count(), 0);
822		assert_eq!(pool.ready.len(), 0);
823
824		let res = pool
825			.import(Transaction {
826				data: vec![5u8].into(),
827				hash: 5,
828				provides: vec![vec![0], vec![4]],
829				..default_tx().clone()
830			})
831			.unwrap();
832
833		// then
834		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
835
836		assert_eq!(it.next(), Some(5));
837		assert_eq!(it.next(), Some(1));
838		assert_eq!(it.next(), Some(2));
839		assert_eq!(it.next(), Some(4));
840		assert_eq!(it.next(), Some(3));
841		assert_eq!(it.next(), None);
842		assert_eq!(
843			res,
844			Imported::Ready {
845				hash: 5,
846				promoted: vec![1, 2, 3, 4],
847				failed: vec![],
848				removed: vec![],
849			}
850		);
851	}
852
853	#[test]
854	fn should_remove_conflicting_future() {
855		let mut pool = pool();
856		pool.import(Transaction {
857			data: vec![3u8].into(),
858			hash: 3,
859			requires: vec![vec![1]],
860			priority: 50u64,
861			provides: vec![vec![3]],
862			..default_tx().clone()
863		})
864		.unwrap();
865		assert_eq!(pool.ready().count(), 0);
866		assert_eq!(pool.ready.len(), 0);
867
868		let tx2 = Transaction {
869			data: vec![2u8].into(),
870			hash: 2,
871			requires: vec![vec![1]],
872			provides: vec![vec![3]],
873			..default_tx().clone()
874		};
875		pool.import(tx2.clone()).unwrap();
876		assert_eq!(pool.future.len(), 2);
877
878		let res = pool
879			.import(Transaction {
880				data: vec![1u8].into(),
881				hash: 1,
882				provides: vec![vec![1]],
883				..default_tx().clone()
884			})
885			.unwrap();
886
887		assert_eq!(
888			res,
889			Imported::Ready {
890				hash: 1,
891				promoted: vec![3],
892				failed: vec![],
893				removed: vec![tx2.into()]
894			}
895		);
896
897		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
898		assert_eq!(it.next(), Some(1));
899		assert_eq!(it.next(), Some(3));
900		assert_eq!(it.next(), None);
901
902		assert_eq!(pool.future.len(), 0);
903	}
904
905	#[test]
906	fn should_handle_a_cycle() {
907		// given
908		let mut pool = pool();
909		pool.import(Transaction {
910			data: vec![1u8].into(),
911			requires: vec![vec![0]],
912			provides: vec![vec![1]],
913			..default_tx().clone()
914		})
915		.unwrap();
916		pool.import(Transaction {
917			data: vec![3u8].into(),
918			hash: 3,
919			requires: vec![vec![1]],
920			provides: vec![vec![2]],
921			..default_tx().clone()
922		})
923		.unwrap();
924		assert_eq!(pool.ready().count(), 0);
925		assert_eq!(pool.ready.len(), 0);
926
927		// when
928		let tx2 = Transaction {
929			data: vec![2u8].into(),
930			hash: 2,
931			requires: vec![vec![2]],
932			provides: vec![vec![0]],
933			..default_tx().clone()
934		};
935		pool.import(tx2.clone()).unwrap();
936
937		// then
938		{
939			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
940			assert_eq!(it.next(), None);
941		}
942		// all transactions occupy the Future queue - it's fine
943		assert_eq!(pool.future.len(), 3);
944
945		// let's close the cycle with one additional transaction
946		let res = pool
947			.import(Transaction {
948				data: vec![4u8].into(),
949				hash: 4,
950				priority: 50u64,
951				provides: vec![vec![0]],
952				..default_tx().clone()
953			})
954			.unwrap();
955		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
956		assert_eq!(it.next(), Some(4));
957		assert_eq!(it.next(), Some(1));
958		assert_eq!(it.next(), Some(3));
959		assert_eq!(it.next(), None);
960		assert_eq!(
961			res,
962			Imported::Ready {
963				hash: 4,
964				promoted: vec![1, 3],
965				failed: vec![],
966				removed: vec![tx2.into()]
967			}
968		);
969		assert_eq!(pool.future.len(), 0);
970	}
971
972	#[test]
973	fn should_handle_a_cycle_with_low_priority() {
974		// given
975		let mut pool = pool();
976		pool.import(Transaction {
977			data: vec![1u8].into(),
978			requires: vec![vec![0]],
979			provides: vec![vec![1]],
980			..default_tx().clone()
981		})
982		.unwrap();
983		pool.import(Transaction {
984			data: vec![3u8].into(),
985			hash: 3,
986			requires: vec![vec![1]],
987			provides: vec![vec![2]],
988			..default_tx().clone()
989		})
990		.unwrap();
991		assert_eq!(pool.ready().count(), 0);
992		assert_eq!(pool.ready.len(), 0);
993
994		// when
995		pool.import(Transaction {
996			data: vec![2u8].into(),
997			hash: 2,
998			requires: vec![vec![2]],
999			provides: vec![vec![0]],
1000			..default_tx().clone()
1001		})
1002		.unwrap();
1003
1004		// then
1005		{
1006			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1007			assert_eq!(it.next(), None);
1008		}
1009		// all transactions occupy the Future queue - it's fine
1010		assert_eq!(pool.future.len(), 3);
1011
1012		// let's close the cycle with one additional transaction
1013		let err = pool
1014			.import(Transaction {
1015				data: vec![4u8].into(),
1016				hash: 4,
1017				priority: 1u64, // lower priority than Tx(2)
1018				provides: vec![vec![0]],
1019				..default_tx().clone()
1020			})
1021			.unwrap_err();
1022		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1023		assert_eq!(it.next(), None);
1024		assert_eq!(pool.ready.len(), 0);
1025		assert_eq!(pool.future.len(), 0);
1026		if let error::Error::CycleDetected = err {
1027		} else {
1028			assert!(false, "Invalid error kind: {:?}", err);
1029		}
1030	}
1031
1032	#[test]
1033	fn should_remove_invalid_transactions() {
1034		// given
1035		let mut pool = pool();
1036		pool.import(Transaction {
1037			data: vec![5u8].into(),
1038			hash: 5,
1039			provides: vec![vec![0], vec![4]],
1040			..default_tx().clone()
1041		})
1042		.unwrap();
1043		pool.import(Transaction {
1044			data: vec![1u8].into(),
1045			requires: vec![vec![0]],
1046			provides: vec![vec![1]],
1047			..default_tx().clone()
1048		})
1049		.unwrap();
1050		pool.import(Transaction {
1051			data: vec![3u8].into(),
1052			hash: 3,
1053			requires: vec![vec![2]],
1054			..default_tx().clone()
1055		})
1056		.unwrap();
1057		pool.import(Transaction {
1058			data: vec![2u8].into(),
1059			hash: 2,
1060			requires: vec![vec![1]],
1061			provides: vec![vec![3], vec![2]],
1062			..default_tx().clone()
1063		})
1064		.unwrap();
1065		pool.import(Transaction {
1066			data: vec![4u8].into(),
1067			hash: 4,
1068			priority: 1_000u64,
1069			requires: vec![vec![3], vec![4]],
1070			..default_tx().clone()
1071		})
1072		.unwrap();
1073		// future
1074		pool.import(Transaction {
1075			data: vec![6u8].into(),
1076			hash: 6,
1077			priority: 1_000u64,
1078			requires: vec![vec![11]],
1079			..default_tx().clone()
1080		})
1081		.unwrap();
1082		assert_eq!(pool.ready().count(), 5);
1083		assert_eq!(pool.future.len(), 1);
1084
1085		// when
1086		pool.remove_subtree(&[6, 1]);
1087
1088		// then
1089		assert_eq!(pool.ready().count(), 1);
1090		assert_eq!(pool.future.len(), 0);
1091	}
1092
1093	#[test]
1094	fn should_prune_ready_transactions() {
1095		// given
1096		let mut pool = pool();
1097		// future (waiting for 0)
1098		pool.import(Transaction {
1099			data: vec![5u8].into(),
1100			hash: 5,
1101			requires: vec![vec![0]],
1102			provides: vec![vec![100]],
1103			..default_tx().clone()
1104		})
1105		.unwrap();
1106		// ready
1107		pool.import(Transaction {
1108			data: vec![1u8].into(),
1109			provides: vec![vec![1]],
1110			..default_tx().clone()
1111		})
1112		.unwrap();
1113		pool.import(Transaction {
1114			data: vec![2u8].into(),
1115			hash: 2,
1116			requires: vec![vec![2]],
1117			provides: vec![vec![3]],
1118			..default_tx().clone()
1119		})
1120		.unwrap();
1121		pool.import(Transaction {
1122			data: vec![3u8].into(),
1123			hash: 3,
1124			requires: vec![vec![1]],
1125			provides: vec![vec![2]],
1126			..default_tx().clone()
1127		})
1128		.unwrap();
1129		pool.import(Transaction {
1130			data: vec![4u8].into(),
1131			hash: 4,
1132			priority: 1_000u64,
1133			requires: vec![vec![3], vec![2]],
1134			provides: vec![vec![4]],
1135			..default_tx().clone()
1136		})
1137		.unwrap();
1138
1139		assert_eq!(pool.ready().count(), 4);
1140		assert_eq!(pool.future.len(), 1);
1141
1142		// when
1143		let result = pool.prune_tags(vec![vec![0], vec![2]]);
1144
1145		// then
1146		assert_eq!(result.pruned.len(), 2);
1147		assert_eq!(result.failed.len(), 0);
1148		assert_eq!(
1149			result.promoted[0],
1150			Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
1151		);
1152		assert_eq!(result.promoted.len(), 1);
1153		assert_eq!(pool.future.len(), 0);
1154		assert_eq!(pool.ready.len(), 3);
1155		assert_eq!(pool.ready().count(), 3);
1156	}
1157
1158	#[test]
1159	fn transaction_debug() {
1160		assert_eq!(
1161			format!(
1162				"{:?}",
1163				Transaction {
1164					data: vec![4u8].into(),
1165					hash: 4,
1166					priority: 1_000u64,
1167					requires: vec![vec![3], vec![2]],
1168					provides: vec![vec![4]],
1169					..default_tx().clone()
1170				}
1171			),
1172			"Transaction { \
1173hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1174source: TimedTransactionSource { source: External, timestamp: None }, requires: [03, 02], provides: [04], data: [4]}"
1175				.to_owned()
1176		);
1177	}
1178
1179	#[test]
1180	fn transaction_propagation() {
1181		assert_eq!(
1182			Transaction {
1183				data: vec![4u8].into(),
1184				hash: 4,
1185				priority: 1_000u64,
1186				requires: vec![vec![3], vec![2]],
1187				provides: vec![vec![4]],
1188				..default_tx().clone()
1189			}
1190			.is_propagable(),
1191			true
1192		);
1193
1194		assert_eq!(
1195			Transaction {
1196				data: vec![4u8].into(),
1197				hash: 4,
1198				priority: 1_000u64,
1199				requires: vec![vec![3], vec![2]],
1200				provides: vec![vec![4]],
1201				propagate: false,
1202				..default_tx().clone()
1203			}
1204			.is_propagable(),
1205			false
1206		);
1207	}
1208
1209	#[test]
1210	fn should_reject_future_transactions() {
1211		// given
1212		let mut pool = pool();
1213
1214		// when
1215		pool.reject_future_transactions = true;
1216
1217		// then
1218		let err = pool.import(Transaction {
1219			data: vec![5u8].into(),
1220			hash: 5,
1221			requires: vec![vec![0]],
1222			..default_tx().clone()
1223		});
1224
1225		if let Err(error::Error::RejectedFutureTransaction) = err {
1226		} else {
1227			assert!(false, "Invalid error kind: {:?}", err);
1228		}
1229	}
1230
1231	#[test]
1232	fn should_clear_future_queue() {
1233		// given
1234		let mut pool = pool();
1235
1236		// when
1237		pool.import(Transaction {
1238			data: vec![5u8].into(),
1239			hash: 5,
1240			requires: vec![vec![0]],
1241			..default_tx().clone()
1242		})
1243		.unwrap();
1244
1245		// then
1246		assert_eq!(pool.future.len(), 1);
1247
1248		// and then when
1249		assert_eq!(pool.clear_future().len(), 1);
1250
1251		// then
1252		assert_eq!(pool.future.len(), 0);
1253	}
1254
1255	#[test]
1256	fn should_accept_future_transactions_when_explicitly_asked_to() {
1257		// given
1258		let mut pool = pool();
1259		pool.reject_future_transactions = true;
1260
1261		// when
1262		let flag_value = pool.with_futures_enabled(|pool, flag| {
1263			pool.import(Transaction {
1264				data: vec![5u8].into(),
1265				hash: 5,
1266				requires: vec![vec![0]],
1267				..default_tx().clone()
1268			})
1269			.unwrap();
1270
1271			flag
1272		});
1273
1274		// then
1275		assert_eq!(flag_value, true);
1276		assert_eq!(pool.reject_future_transactions, true);
1277		assert_eq!(pool.future.len(), 1);
1278	}
1279}