referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
base_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! A basic version of the dependency graph.
20//!
21//! For a more full-featured pool, have a look at the `pool` module.
22
23use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc, time::Instant};
24
25use crate::LOG_TARGET;
26use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
27use serde::Serialize;
28use sp_core::hexdisplay::HexDisplay;
29use sp_runtime::{
30	traits::Member,
31	transaction_validity::{
32		TransactionLongevity as Longevity, TransactionPriority as Priority, TransactionSource,
33		TransactionTag as Tag,
34	},
35};
36use tracing::{trace, warn};
37
38use super::{
39	future::{FutureTransactions, WaitingTransaction},
40	ready::{BestIterator, ReadyTransactions, TransactionRef},
41};
42
43/// Successful import result.
44#[derive(Debug, PartialEq, Eq)]
45pub enum Imported<Hash, Ex> {
46	/// Transaction was successfully imported to Ready queue.
47	Ready {
48		/// Hash of transaction that was successfully imported.
49		hash: Hash,
50		/// Transactions that got promoted from the Future queue.
51		promoted: Vec<Hash>,
52		/// Transactions that failed to be promoted from the Future queue and are now discarded.
53		failed: Vec<Hash>,
54		/// Transactions removed from the Ready pool (replaced).
55		removed: Vec<Arc<Transaction<Hash, Ex>>>,
56	},
57	/// Transaction was successfully imported to Future queue.
58	Future {
59		/// Hash of transaction that was successfully imported.
60		hash: Hash,
61	},
62}
63
64impl<Hash, Ex> Imported<Hash, Ex> {
65	/// Returns the hash of imported transaction.
66	pub fn hash(&self) -> &Hash {
67		use self::Imported::*;
68		match *self {
69			Ready { ref hash, .. } => hash,
70			Future { ref hash, .. } => hash,
71		}
72	}
73}
74
/// Status of pruning the queue.
#[derive(Debug)]
pub struct PruneStatus<Hash, Ex> {
	/// Import results of the future transactions that were promoted to the ready queue
	/// because the pruned tags satisfied their requirements.
	pub promoted: Vec<Imported<Hash, Ex>>,
	/// Hashes of transactions that are now discarded: future transactions removed by the
	/// prune itself and transactions that failed to be promoted.
	pub failed: Vec<Hash>,
	/// A list of transactions that got pruned from the ready queue.
	pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
}
85
86/// A transaction source that includes a timestamp indicating when the transaction was submitted.
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct TimedTransactionSource {
89	/// The original source of the transaction.
90	pub source: TransactionSource,
91
92	/// The time at which the transaction was submitted.
93	pub timestamp: Option<Instant>,
94}
95
96impl From<TimedTransactionSource> for TransactionSource {
97	fn from(value: TimedTransactionSource) -> Self {
98		value.source
99	}
100}
101
102impl TimedTransactionSource {
103	/// Creates a new instance with an internal `TransactionSource::InBlock` source and an optional
104	/// timestamp.
105	pub fn new_in_block(with_timestamp: bool) -> Self {
106		Self { source: TransactionSource::InBlock, timestamp: with_timestamp.then(Instant::now) }
107	}
108	/// Creates a new instance with an internal `TransactionSource::External` source and an optional
109	/// timestamp.
110	pub fn new_external(with_timestamp: bool) -> Self {
111		Self { source: TransactionSource::External, timestamp: with_timestamp.then(Instant::now) }
112	}
113	/// Creates a new instance with an internal `TransactionSource::Local` source and an optional
114	/// timestamp.
115	pub fn new_local(with_timestamp: bool) -> Self {
116		Self { source: TransactionSource::Local, timestamp: with_timestamp.then(Instant::now) }
117	}
118	/// Creates a new instance with an given source and an optional timestamp.
119	pub fn from_transaction_source(source: TransactionSource, with_timestamp: bool) -> Self {
120		Self { source, timestamp: with_timestamp.then(Instant::now) }
121	}
122}
123
/// Immutable transaction.
///
/// Bundles the raw extrinsic with the metadata the pool tracks for it (size, priority,
/// validity, dependency tags and source).
#[derive(PartialEq, Eq, Clone)]
pub struct Transaction<Hash, Extrinsic> {
	/// Raw extrinsic representing that transaction.
	pub data: Extrinsic,
	/// Number of bytes the encoding of the transaction requires.
	pub bytes: usize,
	/// Transaction hash (unique).
	pub hash: Hash,
	/// Transaction priority (higher = better).
	pub priority: Priority,
	/// Block at which the transaction becomes invalid.
	pub valid_till: Longevity,
	/// Tags required by the transaction.
	pub requires: Vec<Tag>,
	/// Tags that this transaction provides.
	pub provides: Vec<Tag>,
	/// Whether the transaction should be propagated.
	pub propagate: bool,
	/// Timed source of that transaction.
	pub source: TimedTransactionSource,
}
146
impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
	/// Borrows the raw extrinsic stored in `data`.
	fn as_ref(&self) -> &Extrinsic {
		&self.data
	}
}
152
// Every accessor below simply borrows (or copies) the matching public field.
impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
	type Transaction = Extrinsic;
	type Hash = Hash;

	/// Returns the raw extrinsic (`data`).
	fn data(&self) -> &Extrinsic {
		&self.data
	}

	/// Returns the transaction hash.
	fn hash(&self) -> &Hash {
		&self.hash
	}

	/// Returns the transaction priority.
	fn priority(&self) -> &Priority {
		&self.priority
	}

	/// Returns the `valid_till` field.
	fn longevity(&self) -> &Longevity {
		&self.valid_till
	}

	/// Returns the tags this transaction requires.
	fn requires(&self) -> &[Tag] {
		&self.requires
	}

	/// Returns the tags this transaction provides.
	fn provides(&self) -> &[Tag] {
		&self.provides
	}

	/// Returns the `propagate` flag.
	fn is_propagable(&self) -> bool {
		self.propagate
	}
}
185
186impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
187	/// Explicit transaction clone.
188	///
189	/// Transaction should be cloned only if absolutely necessary && we want
190	/// every reason to be commented. That's why we `Transaction` is not `Clone`,
191	/// but there's explicit `duplicate` method.
192	pub fn duplicate(&self) -> Self {
193		Self {
194			data: self.data.clone(),
195			bytes: self.bytes,
196			hash: self.hash.clone(),
197			priority: self.priority,
198			source: self.source.clone(),
199			valid_till: self.valid_till,
200			requires: self.requires.clone(),
201			provides: self.provides.clone(),
202			propagate: self.propagate,
203		}
204	}
205}
206
207impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
208where
209	Hash: fmt::Debug,
210	Extrinsic: fmt::Debug,
211{
212	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
213		let join_tags = |tags: &[Tag]| {
214			tags.iter()
215				.map(|tag| HexDisplay::from(tag).to_string())
216				.collect::<Vec<_>>()
217				.join(", ")
218		};
219
220		write!(fmt, "Transaction {{ ")?;
221		write!(fmt, "hash: {:?}, ", &self.hash)?;
222		write!(fmt, "priority: {:?}, ", &self.priority)?;
223		write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
224		write!(fmt, "bytes: {:?}, ", &self.bytes)?;
225		write!(fmt, "propagate: {:?}, ", &self.propagate)?;
226		write!(fmt, "source: {:?}, ", &self.source)?;
227		write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
228		write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
229		write!(fmt, "data: {:?}", &self.data)?;
230		write!(fmt, "}}")?;
231		Ok(())
232	}
233}
234
/// Number of most recent `prune_tags` invocations whose tags are remembered.
///
/// This is the length of the `recently_pruned` ring buffer kept by `BasePool`.
const RECENTLY_PRUNED_TAGS: usize = 2;
237
/// Transaction pool.
///
/// Builds a dependency graph for all transactions in the pool and returns
/// the ones that are currently ready to be executed.
///
/// General note:
/// If a function returns some transactions, it usually means that importing them
/// as-is for the second time will fail or produce unwanted results.
/// Most likely it is required to revalidate them and recompute the set of
/// required tags.
#[derive(Clone, Debug)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
	/// When `true`, `import` rejects transactions whose requirements are not yet satisfied
	/// instead of placing them in the future queue.
	reject_future_transactions: bool,
	/// Transactions that still wait for some of their required tags.
	future: FutureTransactions<Hash, Ex>,
	/// Transactions that have all their requirements satisfied.
	ready: ReadyTransactions<Hash, Ex>,
	/// Store recently pruned tags (for last two invocations).
	///
	/// This is used to make sure we don't accidentally put
	/// transactions to future in case they were just stuck in verification.
	recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
	/// Index of the `recently_pruned` slot that the next `prune_tags` call overwrites.
	recently_pruned_index: usize,
}
260
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
	/// Creates an empty pool that accepts future transactions
	/// (`reject_future_transactions` is `false`).
	fn default() -> Self {
		Self::new(false)
	}
}
266
267impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
268	/// Create new pool given reject_future_transactions flag.
269	pub fn new(reject_future_transactions: bool) -> Self {
270		Self {
271			reject_future_transactions,
272			future: Default::default(),
273			ready: Default::default(),
274			recently_pruned: Default::default(),
275			recently_pruned_index: 0,
276		}
277	}
278
279	/// Clears buffer keeping recently pruned transaction.
280	pub fn clear_recently_pruned(&mut self) {
281		self.recently_pruned = Default::default();
282		self.recently_pruned_index = 0;
283	}
284
285	/// Temporary enables future transactions, runs closure and then restores
286	/// `reject_future_transactions` flag back to previous value.
287	///
288	/// The closure accepts the mutable reference to the pool and original value
289	/// of the `reject_future_transactions` flag.
290	pub(crate) fn with_futures_enabled<T>(
291		&mut self,
292		closure: impl FnOnce(&mut Self, bool) -> T,
293	) -> T {
294		let previous = self.reject_future_transactions;
295		self.reject_future_transactions = false;
296		let return_value = closure(self, previous);
297		self.reject_future_transactions = previous;
298		return_value
299	}
300
301	/// Returns if the transaction for the given hash is already imported.
302	pub fn is_imported(&self, tx_hash: &Hash) -> bool {
303		self.future.contains(tx_hash) || self.ready.contains(tx_hash)
304	}
305
306	/// Imports transaction to the pool.
307	///
308	/// The pool consists of two parts: Future and Ready.
309	/// The former contains transactions that require some tags that are not yet provided by
310	/// other transactions in the pool.
311	/// The latter contains transactions that have all the requirements satisfied and are
312	/// ready to be included in the block.
313	pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
314		if self.is_imported(&tx.hash) {
315			return Err(error::Error::AlreadyImported(Box::new(tx.hash)))
316		}
317
318		let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
319		trace!(
320			target: LOG_TARGET,
321			tx_hash = ?tx.transaction.hash,
322			?tx,
323			set = if tx.is_ready() { "ready" } else { "future" },
324			"Importing transaction"
325		);
326
327		// If all tags are not satisfied import to future.
328		if !tx.is_ready() {
329			if self.reject_future_transactions {
330				return Err(error::Error::RejectedFutureTransaction)
331			}
332
333			let hash = tx.transaction.hash.clone();
334			self.future.import(tx);
335			return Ok(Imported::Future { hash })
336		}
337
338		self.import_to_ready(tx)
339	}
340
	/// Imports transaction to ready queue.
	///
	/// Besides `tx` itself, every future transaction unlocked by the tags provided along the
	/// way is imported to the ready queue as well.
	///
	/// Returns `Imported::Ready` listing the promoted, failed and removed transactions. An
	/// error is returned when `tx` itself cannot be imported, or `CycleDetected` when `tx`
	/// ends up among the removed transactions.
	///
	/// NOTE the transaction has to have all requirements satisfied.
	fn import_to_ready(
		&mut self,
		tx: WaitingTransaction<Hash, Ex>,
	) -> error::Result<Imported<Hash, Ex>> {
		let tx_hash = tx.transaction.hash.clone();
		let mut promoted = vec![];
		let mut failed = vec![];
		let mut removed = vec![];

		// `first` is true only while the originally submitted transaction is processed;
		// failures of that transaction are returned as errors, later ones are only recorded.
		let mut first = true;
		let mut to_import = vec![tx];

		// take the next transaction from the work list (`pop` removes the last element)
		while let Some(tx) = to_import.pop() {
			// find transactions in Future that it unlocks
			to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));

			// import this transaction
			let current_hash = tx.transaction.hash.clone();
			let current_tx = tx.transaction.clone();
			match self.ready.import(tx) {
				Ok(mut replaced) => {
					if !first {
						promoted.push(current_hash.clone());
					}
					// If there were conflicting future transactions promoted, remove them from
					// the promoted set.
					promoted.retain(|hash| replaced.iter().all(|tx| *hash != tx.hash));
					// The transactions were removed from the ready pool. We might attempt to
					// re-import them.
					removed.append(&mut replaced);
				},
				Err(error @ error::Error::TooLowPriority { .. }) => {
					trace!(
						target: LOG_TARGET,
						tx_hash = ?current_tx.hash,
						?first,
						%error,
						"Error importing transaction"
					);
					if first {
						return Err(error)
					} else {
						// A promoted transaction lost on priority: report it as removed and
						// make sure it is not listed as promoted.
						removed.push(current_tx);
						promoted.retain(|hash| *hash != current_hash);
					}
				},
				// transaction failed to be imported.
				Err(error) => {
					trace!(
						target: LOG_TARGET,
						tx_hash = ?current_tx.hash,
						?error,
						first,
						"Error importing transaction"
					);
					if first {
						return Err(error)
					} else {
						failed.push(current_tx.hash.clone());
					}
				},
			}
			first = false;
		}

		// An edge case when importing the transaction caused
		// some future transactions to be imported and those
		// future transactions pushed out the current transaction.
		// This means that there is a cycle which we can't resolve, so the import
		// is aborted with `CycleDetected`.
		if removed.iter().any(|tx| tx.hash == tx_hash) {
			// We still need to remove all transactions that we promoted
			// since they depend on each other and will never get to the best iterator.
			self.ready.remove_subtree(&promoted);

			trace!(
				target: LOG_TARGET,
				?tx_hash,
				"Cycle detected, bailing."
			);
			return Err(error::Error::CycleDetected)
		}

		Ok(Imported::Ready { hash: tx_hash, promoted, failed, removed })
	}
430
	/// Returns an iterator over ready transactions in the pool.
	///
	/// Delegates to the ready queue's `get`.
	pub fn ready(&self) -> BestIterator<Hash, Ex> {
		self.ready.get()
	}
435
	/// Returns an iterator over future transactions in the pool.
	///
	/// Delegates to the future queue's `all`.
	pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
		self.future.all()
	}
440
441	/// Returns pool transactions given list of hashes.
442	///
443	/// Includes both ready and future pool. For every hash in the `hashes`
444	/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
445	pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
446		let ready = self.ready.by_hashes(hashes);
447		let future = self.future.by_hashes(hashes);
448
449		ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
450	}
451
	/// Returns the transaction with the given hash from the ready queue, if present.
	///
	/// The future queue is not consulted.
	pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
		self.ready.by_hash(hash)
	}
456
457	/// Makes sure that the transactions in the queues stay within provided limits.
458	///
459	/// Removes and returns worst transactions from the queues and all transactions that depend on
460	/// them. Technically the worst transaction should be evaluated by computing the entire pending
461	/// set. We use a simplified approach to remove transactions with the lowest priority first or
462	/// those that occupy the pool for the longest time in case priority is the same.
463	pub fn enforce_limits(
464		&mut self,
465		ready: &Limit,
466		future: &Limit,
467	) -> Vec<Arc<Transaction<Hash, Ex>>> {
468		let mut removed = vec![];
469
470		while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
471			// find the worst transaction
472			let worst =
473				self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
474					let transaction = &current.transaction;
475					worst
476						.map(|worst| {
477							// Here we don't use `TransactionRef`'s ordering implementation because
478							// while it prefers priority like need here, it also prefers older
479							// transactions for inclusion purposes and limit enforcement needs to
480							// prefer newer transactions instead and drop the older ones.
481							match worst.transaction.priority.cmp(&transaction.transaction.priority)
482							{
483								Ordering::Less => worst,
484								Ordering::Equal =>
485									if worst.insertion_id > transaction.insertion_id {
486										transaction.clone()
487									} else {
488										worst
489									},
490								Ordering::Greater => transaction.clone(),
491							}
492						})
493						.or_else(|| Some(transaction.clone()))
494				});
495
496			if let Some(worst) = worst {
497				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
498			} else {
499				break
500			}
501		}
502
503		while future.is_exceeded(self.future.len(), self.future.bytes()) {
504			// find the worst transaction
505			let worst = self.future.fold(|worst, current| match worst {
506				None => Some(current.clone()),
507				Some(worst) => Some(
508					match (worst.transaction.source.timestamp, current.transaction.source.timestamp)
509					{
510						(Some(worst_timestamp), Some(current_timestamp)) => {
511							if worst_timestamp > current_timestamp {
512								current.clone()
513							} else {
514								worst
515							}
516						},
517						_ =>
518							if worst.imported_at > current.imported_at {
519								current.clone()
520							} else {
521								worst
522							},
523					},
524				),
525			});
526
527			if let Some(worst) = worst {
528				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
529			} else {
530				break
531			}
532		}
533
534		removed
535	}
536
537	/// Removes all transactions represented by the hashes and all other transactions
538	/// that depend on them.
539	///
540	/// Returns a list of actually removed transactions.
541	/// NOTE some transactions might still be valid, but were just removed because
542	/// they were part of a chain, you may attempt to re-import them later.
543	/// NOTE If you want to remove ready transactions that were already used,
544	/// and you don't want them to be stored in the pool use `prune_tags` method.
545	pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
546		let mut removed = self.ready.remove_subtree(hashes);
547		removed.extend(self.future.remove(hashes));
548		removed
549	}
550
	/// Removes and returns all transactions from the future queue.
	///
	/// Delegates to the future queue's `clear`; the ready queue is not touched.
	pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
		self.future.clear()
	}
555
556	/// Prunes transactions that provide given list of tags.
557	///
558	/// This will cause all transactions (both ready and future) that provide these tags to be
559	/// removed from the pool, but unlike `remove_subtree`, dependent transactions are not touched.
560	/// Additional transactions from future queue might be promoted to ready if you satisfy tags
561	/// that the pool didn't previously know about.
562	pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
563		let mut to_import = vec![];
564		let mut pruned = vec![];
565		let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
566		self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
567		recently_pruned.clear();
568
569		let tags = tags.into_iter().collect::<Vec<_>>();
570		let futures_removed = self.future.prune_tags(&tags);
571
572		for tag in tags {
573			// make sure to promote any future transactions that could be unlocked
574			to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
575			// and actually prune transactions in ready queue
576			pruned.append(&mut self.ready.prune_tags(tag.clone()));
577			// store the tags for next submission
578			recently_pruned.insert(tag);
579		}
580
581		let mut promoted = vec![];
582		let mut failed = vec![];
583		for tx in futures_removed {
584			failed.push(tx.hash.clone());
585		}
586
587		for tx in to_import {
588			let tx_hash = tx.transaction.hash.clone();
589			match self.import_to_ready(tx) {
590				Ok(res) => promoted.push(res),
591				Err(error) => {
592					warn!(
593						target: LOG_TARGET,
594						?tx_hash,
595						?error,
596						"Failed to promote during pruning."
597					);
598					failed.push(tx_hash)
599				},
600			}
601		}
602
603		PruneStatus { pruned, failed, promoted }
604	}
605
606	/// Get pool status.
607	pub fn status(&self) -> PoolStatus {
608		PoolStatus {
609			ready: self.ready.len(),
610			ready_bytes: self.ready.bytes(),
611			future: self.future.len(),
612			future_bytes: self.future.bytes(),
613		}
614	}
615}
616
/// Upper bounds for a transaction queue.
#[derive(Debug, Clone)]
pub struct Limit {
	/// Maximal number of transactions in the queue.
	pub count: usize,
	/// Maximal total size of the encodings of all transactions in the queue.
	pub total_bytes: usize,
}

impl Limit {
	/// Returns `true` if `count` or `bytes` is strictly greater than its limit.
	///
	/// Values equal to the limit are still within bounds.
	pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
		let too_many = count > self.count;
		let too_large = bytes > self.total_bytes;
		too_many || too_large
	}
}
632
633#[cfg(test)]
634mod tests {
635	use super::*;
636
637	type Hash = u64;
638
	/// Builds the default (empty) pool used by the tests.
	fn pool() -> BasePool<Hash, Vec<u8>> {
		BasePool::default()
	}
642
	/// Baseline transaction for the tests: hash 1, priority 5, no required or provided tags.
	///
	/// Tests override individual fields through struct update syntax.
	fn default_tx() -> Transaction<Hash, Vec<u8>> {
		Transaction {
			data: vec![],
			bytes: 1,
			hash: 1u64,
			priority: 5u64,
			valid_till: 64u64,
			requires: vec![],
			provides: vec![],
			propagate: true,
			source: TimedTransactionSource::new_external(false),
		}
	}
656
657	#[test]
658	fn prune_for_ready_works() {
659		// given
660		let mut pool = pool();
661
662		// when
663		pool.import(Transaction {
664			data: vec![1u8].into(),
665			provides: vec![vec![2]],
666			..default_tx().clone()
667		})
668		.unwrap();
669
670		// then
671		assert_eq!(pool.ready().count(), 1);
672		assert_eq!(pool.ready.len(), 1);
673
674		let result = pool.prune_tags(vec![vec![2]]);
675		assert_eq!(pool.ready().count(), 0);
676		assert_eq!(pool.ready.len(), 0);
677		assert_eq!(result.pruned.len(), 1);
678		assert_eq!(result.failed.len(), 0);
679		assert_eq!(result.promoted.len(), 0);
680	}
681
682	#[test]
683	fn prune_for_future_works() {
684		// given
685		let mut pool = pool();
686
687		// when
688		pool.import(Transaction {
689			data: vec![1u8].into(),
690			requires: vec![vec![1]],
691			provides: vec![vec![2]],
692			hash: 0xaa,
693			..default_tx().clone()
694		})
695		.unwrap();
696
697		// then
698		assert_eq!(pool.futures().count(), 1);
699		assert_eq!(pool.future.len(), 1);
700
701		let result = pool.prune_tags(vec![vec![2]]);
702		assert_eq!(pool.ready().count(), 0);
703		assert_eq!(pool.ready.len(), 0);
704		assert_eq!(pool.futures().count(), 0);
705		assert_eq!(pool.future.len(), 0);
706
707		assert_eq!(result.pruned.len(), 0);
708		assert_eq!(result.failed.len(), 1);
709		assert_eq!(result.failed[0], 0xaa);
710		assert_eq!(result.promoted.len(), 0);
711	}
712
713	#[test]
714	fn should_import_transaction_to_ready() {
715		// given
716		let mut pool = pool();
717
718		// when
719		pool.import(Transaction {
720			data: vec![1u8].into(),
721			provides: vec![vec![1]],
722			..default_tx().clone()
723		})
724		.unwrap();
725
726		// then
727		assert_eq!(pool.ready().count(), 1);
728		assert_eq!(pool.ready.len(), 1);
729	}
730
731	#[test]
732	fn should_not_import_same_transaction_twice() {
733		// given
734		let mut pool = pool();
735
736		// when
737		pool.import(Transaction {
738			data: vec![1u8].into(),
739			provides: vec![vec![1]],
740			..default_tx().clone()
741		})
742		.unwrap();
743		pool.import(Transaction {
744			data: vec![1u8].into(),
745			provides: vec![vec![1]],
746			..default_tx().clone()
747		})
748		.unwrap_err();
749
750		// then
751		assert_eq!(pool.ready().count(), 1);
752		assert_eq!(pool.ready.len(), 1);
753	}
754
755	#[test]
756	fn should_import_transaction_to_future_and_promote_it_later() {
757		// given
758		let mut pool = pool();
759
760		// when
761		pool.import(Transaction {
762			data: vec![1u8].into(),
763			requires: vec![vec![0]],
764			provides: vec![vec![1]],
765			..default_tx().clone()
766		})
767		.unwrap();
768		assert_eq!(pool.ready().count(), 0);
769		assert_eq!(pool.ready.len(), 0);
770		pool.import(Transaction {
771			data: vec![2u8].into(),
772			hash: 2,
773			provides: vec![vec![0]],
774			..default_tx().clone()
775		})
776		.unwrap();
777
778		// then
779		assert_eq!(pool.ready().count(), 2);
780		assert_eq!(pool.ready.len(), 2);
781	}
782
783	#[test]
784	fn should_promote_a_subgraph() {
785		// given
786		let mut pool = pool();
787
788		// when
789		pool.import(Transaction {
790			data: vec![1u8].into(),
791			requires: vec![vec![0]],
792			provides: vec![vec![1]],
793			..default_tx().clone()
794		})
795		.unwrap();
796		pool.import(Transaction {
797			data: vec![3u8].into(),
798			hash: 3,
799			requires: vec![vec![2]],
800			..default_tx().clone()
801		})
802		.unwrap();
803		pool.import(Transaction {
804			data: vec![2u8].into(),
805			hash: 2,
806			requires: vec![vec![1]],
807			provides: vec![vec![3], vec![2]],
808			..default_tx().clone()
809		})
810		.unwrap();
811		pool.import(Transaction {
812			data: vec![4u8].into(),
813			hash: 4,
814			priority: 1_000u64,
815			requires: vec![vec![3], vec![4]],
816			..default_tx().clone()
817		})
818		.unwrap();
819		assert_eq!(pool.ready().count(), 0);
820		assert_eq!(pool.ready.len(), 0);
821
822		let res = pool
823			.import(Transaction {
824				data: vec![5u8].into(),
825				hash: 5,
826				provides: vec![vec![0], vec![4]],
827				..default_tx().clone()
828			})
829			.unwrap();
830
831		// then
832		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
833
834		assert_eq!(it.next(), Some(5));
835		assert_eq!(it.next(), Some(1));
836		assert_eq!(it.next(), Some(2));
837		assert_eq!(it.next(), Some(4));
838		assert_eq!(it.next(), Some(3));
839		assert_eq!(it.next(), None);
840		assert_eq!(
841			res,
842			Imported::Ready {
843				hash: 5,
844				promoted: vec![1, 2, 3, 4],
845				failed: vec![],
846				removed: vec![],
847			}
848		);
849	}
850
851	#[test]
852	fn should_remove_conflicting_future() {
853		let mut pool = pool();
854		pool.import(Transaction {
855			data: vec![3u8].into(),
856			hash: 3,
857			requires: vec![vec![1]],
858			priority: 50u64,
859			provides: vec![vec![3]],
860			..default_tx().clone()
861		})
862		.unwrap();
863		assert_eq!(pool.ready().count(), 0);
864		assert_eq!(pool.ready.len(), 0);
865
866		let tx2 = Transaction {
867			data: vec![2u8].into(),
868			hash: 2,
869			requires: vec![vec![1]],
870			provides: vec![vec![3]],
871			..default_tx().clone()
872		};
873		pool.import(tx2.clone()).unwrap();
874		assert_eq!(pool.future.len(), 2);
875
876		let res = pool
877			.import(Transaction {
878				data: vec![1u8].into(),
879				hash: 1,
880				provides: vec![vec![1]],
881				..default_tx().clone()
882			})
883			.unwrap();
884
885		assert_eq!(
886			res,
887			Imported::Ready {
888				hash: 1,
889				promoted: vec![3],
890				failed: vec![],
891				removed: vec![tx2.into()]
892			}
893		);
894
895		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
896		assert_eq!(it.next(), Some(1));
897		assert_eq!(it.next(), Some(3));
898		assert_eq!(it.next(), None);
899
900		assert_eq!(pool.future.len(), 0);
901	}
902
903	#[test]
904	fn should_handle_a_cycle() {
905		// given
906		let mut pool = pool();
907		pool.import(Transaction {
908			data: vec![1u8].into(),
909			requires: vec![vec![0]],
910			provides: vec![vec![1]],
911			..default_tx().clone()
912		})
913		.unwrap();
914		pool.import(Transaction {
915			data: vec![3u8].into(),
916			hash: 3,
917			requires: vec![vec![1]],
918			provides: vec![vec![2]],
919			..default_tx().clone()
920		})
921		.unwrap();
922		assert_eq!(pool.ready().count(), 0);
923		assert_eq!(pool.ready.len(), 0);
924
925		// when
926		let tx2 = Transaction {
927			data: vec![2u8].into(),
928			hash: 2,
929			requires: vec![vec![2]],
930			provides: vec![vec![0]],
931			..default_tx().clone()
932		};
933		pool.import(tx2.clone()).unwrap();
934
935		// then
936		{
937			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
938			assert_eq!(it.next(), None);
939		}
940		// all transactions occupy the Future queue - it's fine
941		assert_eq!(pool.future.len(), 3);
942
943		// let's close the cycle with one additional transaction
944		let res = pool
945			.import(Transaction {
946				data: vec![4u8].into(),
947				hash: 4,
948				priority: 50u64,
949				provides: vec![vec![0]],
950				..default_tx().clone()
951			})
952			.unwrap();
953		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
954		assert_eq!(it.next(), Some(4));
955		assert_eq!(it.next(), Some(1));
956		assert_eq!(it.next(), Some(3));
957		assert_eq!(it.next(), None);
958		assert_eq!(
959			res,
960			Imported::Ready {
961				hash: 4,
962				promoted: vec![1, 3],
963				failed: vec![],
964				removed: vec![tx2.into()]
965			}
966		);
967		assert_eq!(pool.future.len(), 0);
968	}
969
970	#[test]
971	fn should_handle_a_cycle_with_low_priority() {
972		// given
973		let mut pool = pool();
974		pool.import(Transaction {
975			data: vec![1u8].into(),
976			requires: vec![vec![0]],
977			provides: vec![vec![1]],
978			..default_tx().clone()
979		})
980		.unwrap();
981		pool.import(Transaction {
982			data: vec![3u8].into(),
983			hash: 3,
984			requires: vec![vec![1]],
985			provides: vec![vec![2]],
986			..default_tx().clone()
987		})
988		.unwrap();
989		assert_eq!(pool.ready().count(), 0);
990		assert_eq!(pool.ready.len(), 0);
991
992		// when
993		pool.import(Transaction {
994			data: vec![2u8].into(),
995			hash: 2,
996			requires: vec![vec![2]],
997			provides: vec![vec![0]],
998			..default_tx().clone()
999		})
1000		.unwrap();
1001
1002		// then
1003		{
1004			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1005			assert_eq!(it.next(), None);
1006		}
1007		// all transactions occupy the Future queue - it's fine
1008		assert_eq!(pool.future.len(), 3);
1009
1010		// let's close the cycle with one additional transaction
1011		let err = pool
1012			.import(Transaction {
1013				data: vec![4u8].into(),
1014				hash: 4,
1015				priority: 1u64, // lower priority than Tx(2)
1016				provides: vec![vec![0]],
1017				..default_tx().clone()
1018			})
1019			.unwrap_err();
1020		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1021		assert_eq!(it.next(), None);
1022		assert_eq!(pool.ready.len(), 0);
1023		assert_eq!(pool.future.len(), 0);
1024		if let error::Error::CycleDetected = err {
1025		} else {
1026			assert!(false, "Invalid error kind: {:?}", err);
1027		}
1028	}
1029
1030	#[test]
1031	fn should_remove_invalid_transactions() {
1032		// given
1033		let mut pool = pool();
1034		pool.import(Transaction {
1035			data: vec![5u8].into(),
1036			hash: 5,
1037			provides: vec![vec![0], vec![4]],
1038			..default_tx().clone()
1039		})
1040		.unwrap();
1041		pool.import(Transaction {
1042			data: vec![1u8].into(),
1043			requires: vec![vec![0]],
1044			provides: vec![vec![1]],
1045			..default_tx().clone()
1046		})
1047		.unwrap();
1048		pool.import(Transaction {
1049			data: vec![3u8].into(),
1050			hash: 3,
1051			requires: vec![vec![2]],
1052			..default_tx().clone()
1053		})
1054		.unwrap();
1055		pool.import(Transaction {
1056			data: vec![2u8].into(),
1057			hash: 2,
1058			requires: vec![vec![1]],
1059			provides: vec![vec![3], vec![2]],
1060			..default_tx().clone()
1061		})
1062		.unwrap();
1063		pool.import(Transaction {
1064			data: vec![4u8].into(),
1065			hash: 4,
1066			priority: 1_000u64,
1067			requires: vec![vec![3], vec![4]],
1068			..default_tx().clone()
1069		})
1070		.unwrap();
1071		// future
1072		pool.import(Transaction {
1073			data: vec![6u8].into(),
1074			hash: 6,
1075			priority: 1_000u64,
1076			requires: vec![vec![11]],
1077			..default_tx().clone()
1078		})
1079		.unwrap();
1080		assert_eq!(pool.ready().count(), 5);
1081		assert_eq!(pool.future.len(), 1);
1082
1083		// when
1084		pool.remove_subtree(&[6, 1]);
1085
1086		// then
1087		assert_eq!(pool.ready().count(), 1);
1088		assert_eq!(pool.future.len(), 0);
1089	}
1090
1091	#[test]
1092	fn should_prune_ready_transactions() {
1093		// given
1094		let mut pool = pool();
1095		// future (waiting for 0)
1096		pool.import(Transaction {
1097			data: vec![5u8].into(),
1098			hash: 5,
1099			requires: vec![vec![0]],
1100			provides: vec![vec![100]],
1101			..default_tx().clone()
1102		})
1103		.unwrap();
1104		// ready
1105		pool.import(Transaction {
1106			data: vec![1u8].into(),
1107			provides: vec![vec![1]],
1108			..default_tx().clone()
1109		})
1110		.unwrap();
1111		pool.import(Transaction {
1112			data: vec![2u8].into(),
1113			hash: 2,
1114			requires: vec![vec![2]],
1115			provides: vec![vec![3]],
1116			..default_tx().clone()
1117		})
1118		.unwrap();
1119		pool.import(Transaction {
1120			data: vec![3u8].into(),
1121			hash: 3,
1122			requires: vec![vec![1]],
1123			provides: vec![vec![2]],
1124			..default_tx().clone()
1125		})
1126		.unwrap();
1127		pool.import(Transaction {
1128			data: vec![4u8].into(),
1129			hash: 4,
1130			priority: 1_000u64,
1131			requires: vec![vec![3], vec![2]],
1132			provides: vec![vec![4]],
1133			..default_tx().clone()
1134		})
1135		.unwrap();
1136
1137		assert_eq!(pool.ready().count(), 4);
1138		assert_eq!(pool.future.len(), 1);
1139
1140		// when
1141		let result = pool.prune_tags(vec![vec![0], vec![2]]);
1142
1143		// then
1144		assert_eq!(result.pruned.len(), 2);
1145		assert_eq!(result.failed.len(), 0);
1146		assert_eq!(
1147			result.promoted[0],
1148			Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
1149		);
1150		assert_eq!(result.promoted.len(), 1);
1151		assert_eq!(pool.future.len(), 0);
1152		assert_eq!(pool.ready.len(), 3);
1153		assert_eq!(pool.ready().count(), 3);
1154	}
1155
1156	#[test]
1157	fn transaction_debug() {
1158		assert_eq!(
1159			format!(
1160				"{:?}",
1161				Transaction {
1162					data: vec![4u8].into(),
1163					hash: 4,
1164					priority: 1_000u64,
1165					requires: vec![vec![3], vec![2]],
1166					provides: vec![vec![4]],
1167					..default_tx().clone()
1168				}
1169			),
1170			"Transaction { \
1171hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1172source: TimedTransactionSource { source: TransactionSource::External, timestamp: None }, requires: [03, 02], provides: [04], data: [4]}"
1173				.to_owned()
1174		);
1175	}
1176
1177	#[test]
1178	fn transaction_propagation() {
1179		assert_eq!(
1180			Transaction {
1181				data: vec![4u8].into(),
1182				hash: 4,
1183				priority: 1_000u64,
1184				requires: vec![vec![3], vec![2]],
1185				provides: vec![vec![4]],
1186				..default_tx().clone()
1187			}
1188			.is_propagable(),
1189			true
1190		);
1191
1192		assert_eq!(
1193			Transaction {
1194				data: vec![4u8].into(),
1195				hash: 4,
1196				priority: 1_000u64,
1197				requires: vec![vec![3], vec![2]],
1198				provides: vec![vec![4]],
1199				propagate: false,
1200				..default_tx().clone()
1201			}
1202			.is_propagable(),
1203			false
1204		);
1205	}
1206
1207	#[test]
1208	fn should_reject_future_transactions() {
1209		// given
1210		let mut pool = pool();
1211
1212		// when
1213		pool.reject_future_transactions = true;
1214
1215		// then
1216		let err = pool.import(Transaction {
1217			data: vec![5u8].into(),
1218			hash: 5,
1219			requires: vec![vec![0]],
1220			..default_tx().clone()
1221		});
1222
1223		if let Err(error::Error::RejectedFutureTransaction) = err {
1224		} else {
1225			assert!(false, "Invalid error kind: {:?}", err);
1226		}
1227	}
1228
1229	#[test]
1230	fn should_clear_future_queue() {
1231		// given
1232		let mut pool = pool();
1233
1234		// when
1235		pool.import(Transaction {
1236			data: vec![5u8].into(),
1237			hash: 5,
1238			requires: vec![vec![0]],
1239			..default_tx().clone()
1240		})
1241		.unwrap();
1242
1243		// then
1244		assert_eq!(pool.future.len(), 1);
1245
1246		// and then when
1247		assert_eq!(pool.clear_future().len(), 1);
1248
1249		// then
1250		assert_eq!(pool.future.len(), 0);
1251	}
1252
1253	#[test]
1254	fn should_accept_future_transactions_when_explicitly_asked_to() {
1255		// given
1256		let mut pool = pool();
1257		pool.reject_future_transactions = true;
1258
1259		// when
1260		let flag_value = pool.with_futures_enabled(|pool, flag| {
1261			pool.import(Transaction {
1262				data: vec![5u8].into(),
1263				hash: 5,
1264				requires: vec![vec![0]],
1265				..default_tx().clone()
1266			})
1267			.unwrap();
1268
1269			flag
1270		});
1271
1272		// then
1273		assert_eq!(flag_value, true);
1274		assert_eq!(pool.reject_future_transactions, true);
1275		assert_eq!(pool.future.len(), 1);
1276	}
1277}