sc_transaction_pool/graph/
validated_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{
20	collections::{HashMap, HashSet},
21	hash,
22	sync::Arc,
23};
24
25use crate::LOG_TARGET;
26use futures::channel::mpsc::{channel, Sender};
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
29use serde::Serialize;
30use sp_runtime::{
31	generic::BlockId,
32	traits::{self, SaturatedConversion},
33	transaction_validity::{TransactionSource, TransactionTag as Tag, ValidTransaction},
34};
35use std::time::Instant;
36
37use super::{
38	base_pool::{self as base, PruneStatus},
39	listener::Listener,
40	pool::{
41		BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
42	},
43	rotator::PoolRotator,
44	watcher::Watcher,
45};
46
47/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
48#[derive(Debug)]
49pub enum ValidatedTransaction<Hash, Ex, Error> {
50	/// Transaction that has been validated successfully.
51	Valid(base::Transaction<Hash, Ex>),
52	/// Transaction that is invalid.
53	Invalid(Hash, Error),
54	/// Transaction which validity can't be determined.
55	///
56	/// We're notifying watchers about failure, if 'unknown' transaction is submitted.
57	Unknown(Hash, Error),
58}
59
60impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
61	/// Consume validity result, transaction data and produce ValidTransaction.
62	pub fn valid_at(
63		at: u64,
64		hash: Hash,
65		source: TransactionSource,
66		data: Ex,
67		bytes: usize,
68		validity: ValidTransaction,
69	) -> Self {
70		Self::Valid(base::Transaction {
71			data,
72			bytes,
73			hash,
74			source,
75			priority: validity.priority,
76			requires: validity.requires,
77			provides: validity.provides,
78			propagate: validity.propagate,
79			valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
80		})
81	}
82}
83
84/// A type of validated transaction stored in the pool.
85pub type ValidatedTransactionFor<B> =
86	ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
87
88/// A closure that returns true if the local node is a validator that can author blocks.
89pub struct IsValidator(Box<dyn Fn() -> bool + Send + Sync>);
90
91impl From<bool> for IsValidator {
92	fn from(is_validator: bool) -> Self {
93		Self(Box::new(move || is_validator))
94	}
95}
96
97impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
98	fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
99		Self(is_validator)
100	}
101}
102
103/// Pool that deals with validated transactions.
104pub struct ValidatedPool<B: ChainApi> {
105	api: Arc<B>,
106	is_validator: IsValidator,
107	options: Options,
108	listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
109	pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
110	import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
111	rotator: PoolRotator<ExtrinsicHash<B>>,
112}
113
114impl<B: ChainApi> ValidatedPool<B> {
115	/// Create a new transaction pool.
116	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
117		let base_pool = base::BasePool::new(options.reject_future_transactions);
118		let ban_time = options.ban_time;
119		Self {
120			is_validator,
121			options,
122			listener: Default::default(),
123			api,
124			pool: RwLock::new(base_pool),
125			import_notification_sinks: Default::default(),
126			rotator: PoolRotator::new(ban_time),
127		}
128	}
129
130	/// Bans given set of hashes.
131	pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
132		self.rotator.ban(now, hashes)
133	}
134
135	/// Returns true if transaction with given hash is currently banned from the pool.
136	pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
137		self.rotator.is_banned(hash)
138	}
139
140	/// A fast check before doing any further processing of a transaction, like validation.
141	///
142	/// If `ignore_banned` is `true`, it will not check if the transaction is banned.
143	///
144	/// It checks if the transaction is already imported or banned. If so, it returns an error.
145	pub fn check_is_known(
146		&self,
147		tx_hash: &ExtrinsicHash<B>,
148		ignore_banned: bool,
149	) -> Result<(), B::Error> {
150		if !ignore_banned && self.is_banned(tx_hash) {
151			Err(error::Error::TemporarilyBanned.into())
152		} else if self.pool.read().is_imported(tx_hash) {
153			Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
154		} else {
155			Ok(())
156		}
157	}
158
159	/// Imports a bunch of pre-validated transactions to the pool.
160	pub fn submit(
161		&self,
162		txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
163	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
164		let results = txs
165			.into_iter()
166			.map(|validated_tx| self.submit_one(validated_tx))
167			.collect::<Vec<_>>();
168
169		// only enforce limits if there is at least one imported transaction
170		let removed = if results.iter().any(|res| res.is_ok()) {
171			self.enforce_limits()
172		} else {
173			Default::default()
174		};
175
176		results
177			.into_iter()
178			.map(|res| match res {
179				Ok(ref hash) if removed.contains(hash) =>
180					Err(error::Error::ImmediatelyDropped.into()),
181				other => other,
182			})
183			.collect()
184	}
185
186	/// Submit single pre-validated transaction to the pool.
187	fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
188		match tx {
189			ValidatedTransaction::Valid(tx) => {
190				if !tx.propagate && !(self.is_validator.0)() {
191					return Err(error::Error::Unactionable.into())
192				}
193
194				let imported = self.pool.write().import(tx)?;
195
196				if let base::Imported::Ready { ref hash, .. } = imported {
197					let sinks = &mut self.import_notification_sinks.lock();
198					sinks.retain_mut(|sink| match sink.try_send(*hash) {
199						Ok(()) => true,
200						Err(e) =>
201							if e.is_full() {
202								log::warn!(
203									target: LOG_TARGET,
204									"[{:?}] Trying to notify an import but the channel is full",
205									hash,
206								);
207								true
208							} else {
209								false
210							},
211					});
212				}
213
214				let mut listener = self.listener.write();
215				fire_events(&mut *listener, &imported);
216				Ok(*imported.hash())
217			},
218			ValidatedTransaction::Invalid(hash, err) => {
219				self.rotator.ban(&Instant::now(), std::iter::once(hash));
220				Err(err)
221			},
222			ValidatedTransaction::Unknown(hash, err) => {
223				self.listener.write().invalid(&hash);
224				Err(err)
225			},
226		}
227	}
228
229	fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
230		let status = self.pool.read().status();
231		let ready_limit = &self.options.ready;
232		let future_limit = &self.options.future;
233
234		log::debug!(target: LOG_TARGET, "Pool Status: {:?}", status);
235		if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
236			future_limit.is_exceeded(status.future, status.future_bytes)
237		{
238			log::debug!(
239				target: LOG_TARGET,
240				"Enforcing limits ({}/{}kB ready, {}/{}kB future",
241				ready_limit.count,
242				ready_limit.total_bytes / 1024,
243				future_limit.count,
244				future_limit.total_bytes / 1024,
245			);
246
247			// clean up the pool
248			let removed = {
249				let mut pool = self.pool.write();
250				let removed = pool
251					.enforce_limits(ready_limit, future_limit)
252					.into_iter()
253					.map(|x| x.hash)
254					.collect::<HashSet<_>>();
255				// ban all removed transactions
256				self.rotator.ban(&Instant::now(), removed.iter().copied());
257				removed
258			};
259			if !removed.is_empty() {
260				log::debug!(target: LOG_TARGET, "Enforcing limits: {} dropped", removed.len());
261			}
262
263			// run notifications
264			let mut listener = self.listener.write();
265			for h in &removed {
266				listener.dropped(h, None);
267			}
268
269			removed
270		} else {
271			Default::default()
272		}
273	}
274
275	/// Import a single extrinsic and starts to watch their progress in the pool.
276	pub fn submit_and_watch(
277		&self,
278		tx: ValidatedTransactionFor<B>,
279	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
280		match tx {
281			ValidatedTransaction::Valid(tx) => {
282				let hash = self.api.hash_and_length(&tx.data).0;
283				let watcher = self.listener.write().create_watcher(hash);
284				self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
285					.pop()
286					.expect("One extrinsic passed; one result returned; qed")
287					.map(|_| watcher)
288			},
289			ValidatedTransaction::Invalid(hash, err) => {
290				self.rotator.ban(&Instant::now(), std::iter::once(hash));
291				Err(err)
292			},
293			ValidatedTransaction::Unknown(_, err) => Err(err),
294		}
295	}
296
297	/// Resubmits revalidated transactions back to the pool.
298	///
299	/// Removes and then submits passed transactions and all dependent transactions.
300	/// Transactions that are missing from the pool are not submitted.
301	pub fn resubmit(
302		&self,
303		mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
304	) {
305		#[derive(Debug, Clone, Copy, PartialEq)]
306		enum Status {
307			Future,
308			Ready,
309			Failed,
310			Dropped,
311		}
312
313		let (mut initial_statuses, final_statuses) = {
314			let mut pool = self.pool.write();
315
316			// remove all passed transactions from the ready/future queues
317			// (this may remove additional transactions as well)
318			//
319			// for every transaction that has an entry in the `updated_transactions`,
320			// we store updated validation result in txs_to_resubmit
321			// for every transaction that has no entry in the `updated_transactions`,
322			// we store last validation result (i.e. the pool entry) in txs_to_resubmit
323			let mut initial_statuses = HashMap::new();
324			let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
325			while !updated_transactions.is_empty() {
326				let hash = updated_transactions
327					.keys()
328					.next()
329					.cloned()
330					.expect("transactions is not empty; qed");
331
332				// note we are not considering tx with hash invalid here - we just want
333				// to remove it along with dependent transactions and `remove_subtree()`
334				// does exactly what we need
335				let removed = pool.remove_subtree(&[hash]);
336				for removed_tx in removed {
337					let removed_hash = removed_tx.hash;
338					let updated_transaction = updated_transactions.remove(&removed_hash);
339					let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
340						updated_tx
341					} else {
342						// in most cases we'll end up in successful `try_unwrap`, but if not
343						// we still need to reinsert transaction back to the pool => duplicate call
344						let transaction = match Arc::try_unwrap(removed_tx) {
345							Ok(transaction) => transaction,
346							Err(transaction) => transaction.duplicate(),
347						};
348						ValidatedTransaction::Valid(transaction)
349					};
350
351					initial_statuses.insert(removed_hash, Status::Ready);
352					txs_to_resubmit.push((removed_hash, tx_to_resubmit));
353				}
354				// make sure to remove the hash even if it's not present in the pool any more.
355				updated_transactions.remove(&hash);
356			}
357
358			// if we're rejecting future transactions, then insertion order matters here:
359			// if tx1 depends on tx2, then if tx1 is inserted before tx2, then it goes
360			// to the future queue and gets rejected immediately
361			// => let's temporary stop rejection and clear future queue before return
362			pool.with_futures_enabled(|pool, reject_future_transactions| {
363				// now resubmit all removed transactions back to the pool
364				let mut final_statuses = HashMap::new();
365				for (hash, tx_to_resubmit) in txs_to_resubmit {
366					match tx_to_resubmit {
367						ValidatedTransaction::Valid(tx) => match pool.import(tx) {
368							Ok(imported) => match imported {
369								base::Imported::Ready { promoted, failed, removed, .. } => {
370									final_statuses.insert(hash, Status::Ready);
371									for hash in promoted {
372										final_statuses.insert(hash, Status::Ready);
373									}
374									for hash in failed {
375										final_statuses.insert(hash, Status::Failed);
376									}
377									for tx in removed {
378										final_statuses.insert(tx.hash, Status::Dropped);
379									}
380								},
381								base::Imported::Future { .. } => {
382									final_statuses.insert(hash, Status::Future);
383								},
384							},
385							Err(err) => {
386								// we do not want to fail if single transaction import has failed
387								// nor we do want to propagate this error, because it could tx
388								// unknown to caller => let's just notify listeners (and issue debug
389								// message)
390								log::warn!(
391									target: LOG_TARGET,
392									"[{:?}] Removing invalid transaction from update: {}",
393									hash,
394									err,
395								);
396								final_statuses.insert(hash, Status::Failed);
397							},
398						},
399						ValidatedTransaction::Invalid(_, _) |
400						ValidatedTransaction::Unknown(_, _) => {
401							final_statuses.insert(hash, Status::Failed);
402						},
403					}
404				}
405
406				// if the pool is configured to reject future transactions, let's clear the future
407				// queue, updating final statuses as required
408				if reject_future_transactions {
409					for future_tx in pool.clear_future() {
410						final_statuses.insert(future_tx.hash, Status::Dropped);
411					}
412				}
413
414				(initial_statuses, final_statuses)
415			})
416		};
417
418		// and now let's notify listeners about status changes
419		let mut listener = self.listener.write();
420		for (hash, final_status) in final_statuses {
421			let initial_status = initial_statuses.remove(&hash);
422			if initial_status.is_none() || Some(final_status) != initial_status {
423				match final_status {
424					Status::Future => listener.future(&hash),
425					Status::Ready => listener.ready(&hash, None),
426					Status::Dropped => listener.dropped(&hash, None),
427					Status::Failed => listener.invalid(&hash),
428				}
429			}
430		}
431	}
432
433	/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
434	pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
435		self.pool
436			.read()
437			.by_hashes(hashes)
438			.into_iter()
439			.map(|existing_in_pool| {
440				existing_in_pool.map(|transaction| transaction.provides.to_vec())
441			})
442			.collect()
443	}
444
445	/// Get ready transaction by hash
446	pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
447		self.pool.read().ready_by_hash(hash)
448	}
449
450	/// Prunes ready transactions that provide given list of tags.
451	pub fn prune_tags(
452		&self,
453		tags: impl IntoIterator<Item = Tag>,
454	) -> Result<PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>>, B::Error> {
455		// Perform tag-based pruning in the base pool
456		let status = self.pool.write().prune_tags(tags);
457		// Notify event listeners of all transactions
458		// that were promoted to `Ready` or were dropped.
459		{
460			let mut listener = self.listener.write();
461			for promoted in &status.promoted {
462				fire_events(&mut *listener, promoted);
463			}
464			for f in &status.failed {
465				listener.dropped(f, None);
466			}
467		}
468
469		Ok(status)
470	}
471
472	/// Resubmit transactions that have been revalidated after prune_tags call.
473	pub fn resubmit_pruned(
474		&self,
475		at: &BlockId<B::Block>,
476		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
477		pruned_hashes: Vec<ExtrinsicHash<B>>,
478		pruned_xts: Vec<ValidatedTransactionFor<B>>,
479	) -> Result<(), B::Error> {
480		debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
481
482		// Resubmit pruned transactions
483		let results = self.submit(pruned_xts);
484
485		// Collect the hashes of transactions that now became invalid (meaning that they are
486		// successfully pruned).
487		let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
488			match r.map_err(error::IntoPoolError::into_pool_error) {
489				Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
490				_ => None,
491			}
492		});
493		// Fire `pruned` notifications for collected hashes and make sure to include
494		// `known_imported_hashes` since they were just imported as part of the block.
495		let hashes = hashes.chain(known_imported_hashes.into_iter());
496		self.fire_pruned(at, hashes)?;
497
498		// perform regular cleanup of old transactions in the pool
499		// and update temporary bans.
500		self.clear_stale(at)?;
501		Ok(())
502	}
503
504	/// Fire notifications for pruned transactions.
505	pub fn fire_pruned(
506		&self,
507		at: &BlockId<B::Block>,
508		hashes: impl Iterator<Item = ExtrinsicHash<B>>,
509	) -> Result<(), B::Error> {
510		let header_hash = self
511			.api
512			.block_id_to_hash(at)?
513			.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?;
514		let mut listener = self.listener.write();
515		let mut set = HashSet::with_capacity(hashes.size_hint().0);
516		for h in hashes {
517			// `hashes` has possibly duplicate hashes.
518			// we'd like to send out the `InBlock` notification only once.
519			if !set.contains(&h) {
520				listener.pruned(header_hash, &h);
521				set.insert(h);
522			}
523		}
524		Ok(())
525	}
526
527	/// Removes stale transactions from the pool.
528	///
529	/// Stale transactions are transaction beyond their longevity period.
530	/// Note this function does not remove transactions that are already included in the chain.
531	/// See `prune_tags` if you want this.
532	pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
533		let block_number = self
534			.api
535			.block_id_to_number(at)?
536			.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?
537			.saturated_into::<u64>();
538		let now = Instant::now();
539		let to_remove = {
540			self.ready()
541				.filter(|tx| self.rotator.ban_if_stale(&now, block_number, tx))
542				.map(|tx| tx.hash)
543				.collect::<Vec<_>>()
544		};
545		let futures_to_remove: Vec<ExtrinsicHash<B>> = {
546			let p = self.pool.read();
547			let mut hashes = Vec::new();
548			for tx in p.futures() {
549				if self.rotator.ban_if_stale(&now, block_number, tx) {
550					hashes.push(tx.hash);
551				}
552			}
553			hashes
554		};
555		// removing old transactions
556		self.remove_invalid(&to_remove);
557		self.remove_invalid(&futures_to_remove);
558		// clear banned transactions timeouts
559		self.rotator.clear_timeouts(&now);
560
561		Ok(())
562	}
563
564	/// Get api reference.
565	pub fn api(&self) -> &B {
566		&self.api
567	}
568
569	/// Return an event stream of notifications for when transactions are imported to the pool.
570	///
571	/// Consumers of this stream should use the `ready` method to actually get the
572	/// pending transactions in the right order.
573	pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
574		const CHANNEL_BUFFER_SIZE: usize = 1024;
575
576		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
577		self.import_notification_sinks.lock().push(sink);
578		stream
579	}
580
581	/// Invoked when extrinsics are broadcasted.
582	pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
583		let mut listener = self.listener.write();
584		for (hash, peers) in propagated.into_iter() {
585			listener.broadcasted(&hash, peers);
586		}
587	}
588
589	/// Remove a subtree of transactions from the pool and mark them invalid.
590	///
591	/// The transactions passed as an argument will be additionally banned
592	/// to prevent them from entering the pool right away.
593	/// Note this is not the case for the dependent transactions - those may
594	/// still be valid so we want to be able to re-import them.
595	pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
596		// early exit in case there is no invalid transactions.
597		if hashes.is_empty() {
598			return vec![]
599		}
600
601		log::debug!(target: LOG_TARGET, "Removing invalid transactions: {:?}", hashes);
602
603		// temporarily ban invalid transactions
604		self.rotator.ban(&Instant::now(), hashes.iter().cloned());
605
606		let invalid = self.pool.write().remove_subtree(hashes);
607
608		log::debug!(target: LOG_TARGET, "Removed invalid transactions: {:?}", invalid);
609
610		let mut listener = self.listener.write();
611		for tx in &invalid {
612			listener.invalid(&tx.hash);
613		}
614
615		invalid
616	}
617
618	/// Get an iterator for ready transactions ordered by priority
619	pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
620		self.pool.read().ready()
621	}
622
623	/// Returns a Vec of hashes and extrinsics in the future pool.
624	pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
625		self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
626	}
627
628	/// Returns pool status.
629	pub fn status(&self) -> PoolStatus {
630		self.pool.read().status()
631	}
632
633	/// Notify all watchers that transactions in the block with hash have been finalized
634	pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
635		log::trace!(
636			target: LOG_TARGET,
637			"Attempting to notify watchers of finalization for {}",
638			block_hash,
639		);
640		self.listener.write().finalized(block_hash);
641		Ok(())
642	}
643
644	/// Notify the listener of retracted blocks
645	pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
646		self.listener.write().retracted(block_hash)
647	}
648}
649
650fn fire_events<H, B, Ex>(listener: &mut Listener<H, B>, imported: &base::Imported<H, Ex>)
651where
652	H: hash::Hash + Eq + traits::Member + Serialize,
653	B: ChainApi,
654{
655	match *imported {
656		base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
657			listener.ready(hash, None);
658			failed.iter().for_each(|f| listener.invalid(f));
659			removed.iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
660			promoted.iter().for_each(|p| listener.ready(p, None));
661		},
662		base::Imported::Future { ref hash } => listener.future(hash),
663	}
664}