referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
validated_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	common::{
21		sliding_stat::SyncDurationSlidingStats, tracing_log_xt::log_xt_trace, STAT_SLIDING_WINDOW,
22	},
23	insert_and_log_throttled_sync, LOG_TARGET,
24};
25use futures::channel::mpsc::{channel, Sender};
26use indexmap::IndexMap;
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
29use sp_blockchain::HashAndNumber;
30use sp_runtime::{
31	traits::SaturatedConversion,
32	transaction_validity::{TransactionTag as Tag, ValidTransaction},
33};
34use std::{
35	collections::{HashMap, HashSet},
36	sync::Arc,
37	time::{Duration, Instant},
38};
39use tracing::{debug, trace, warn, Level};
40
41use super::{
42	base_pool::{self as base, PruneStatus},
43	listener::EventHandler,
44	pool::{
45		BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
46	},
47	rotator::PoolRotator,
48	watcher::Watcher,
49};
50
51/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
52#[derive(Debug)]
53pub enum ValidatedTransaction<Hash, Ex, Error> {
54	/// Transaction that has been validated successfully.
55	Valid(base::Transaction<Hash, Ex>),
56	/// Transaction that is invalid.
57	Invalid(Hash, Error),
58	/// Transaction which validity can't be determined.
59	///
60	/// We're notifying watchers about failure, if 'unknown' transaction is submitted.
61	Unknown(Hash, Error),
62}
63
64impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
65	/// Consume validity result, transaction data and produce ValidTransaction.
66	pub fn valid_at(
67		at: u64,
68		hash: Hash,
69		source: base::TimedTransactionSource,
70		data: Ex,
71		bytes: usize,
72		validity: ValidTransaction,
73	) -> Self {
74		Self::Valid(base::Transaction {
75			data,
76			bytes,
77			hash,
78			source,
79			priority: validity.priority,
80			requires: validity.requires,
81			provides: validity.provides,
82			propagate: validity.propagate,
83			valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
84		})
85	}
86
87	/// Returns priority for valid transaction, None if transaction is not valid.
88	pub fn priority(&self) -> Option<TransactionPriority> {
89		match self {
90			ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
91			_ => None,
92		}
93	}
94}
95
/// A type of validated transaction stored in the validated pool.
///
/// Specializes [`ValidatedTransaction`] with the hash, extrinsic and error types of the given
/// `ChainApi` implementation `B`.
pub type ValidatedTransactionFor<B> =
	ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;

/// A type alias representing ValidatedPool event dispatcher for given ChainApi type.
///
/// `L` is the type of the event handler plugged into the dispatcher.
pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>;
102
/// A cheaply clonable closure that returns true if the local node is a validator that can
/// author blocks.
#[derive(Clone)]
pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);

impl From<bool> for IsValidator {
	/// Wraps a fixed answer in a closure that always returns it.
	fn from(is_validator: bool) -> Self {
		let constant: Box<dyn Fn() -> bool + Send + Sync> = Box::new(move || is_validator);
		IsValidator(Arc::new(constant))
	}
}

impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
	/// Shares an already boxed closure behind an `Arc`.
	fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
		IsValidator(Arc::new(is_validator))
	}
}
118
119/// Represents the result of `submit` or `submit_and_watch` operations.
120pub struct BaseSubmitOutcome<B: ChainApi, W> {
121	/// The hash of the submitted transaction.
122	hash: ExtrinsicHash<B>,
123	/// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`.
124	watcher: Option<W>,
125
126	/// The priority of the transaction. Defaults to None if unknown.
127	priority: Option<TransactionPriority>,
128}
129
130/// Type alias to outcome of submission to `ValidatedPool`.
131pub type ValidatedPoolSubmitOutcome<B> =
132	BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
133
134impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
135	/// Creates a new instance with given hash and priority.
136	pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
137		Self { hash, priority, watcher: None }
138	}
139
140	/// Sets the transaction watcher.
141	pub fn with_watcher(mut self, watcher: W) -> Self {
142		self.watcher = Some(watcher);
143		self
144	}
145
146	/// Provides priority of submitted transaction.
147	pub fn priority(&self) -> Option<TransactionPriority> {
148		self.priority
149	}
150
151	/// Provides hash of submitted transaction.
152	pub fn hash(&self) -> ExtrinsicHash<B> {
153		self.hash
154	}
155
156	/// Provides a watcher. Should only be called on outcomes of `submit_and_watch`. Otherwise will
157	/// panic (that would mean logical error in program).
158	pub fn expect_watcher(&mut self) -> W {
159		self.watcher.take().expect("watcher was set in submit_and_watch. qed")
160	}
161}
162
163/// Pool that deals with validated transactions.
164pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> {
165	api: Arc<B>,
166	is_validator: IsValidator,
167	options: Options,
168	event_dispatcher: RwLock<EventDispatcher<B, L>>,
169	pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
170	import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
171	rotator: PoolRotator<ExtrinsicHash<B>>,
172	enforce_limits_stats: SyncDurationSlidingStats,
173}
174
175impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> {
176	fn clone(&self) -> Self {
177		Self {
178			api: self.api.clone(),
179			is_validator: self.is_validator.clone(),
180			options: self.options.clone(),
181			event_dispatcher: Default::default(),
182			pool: RwLock::from(self.pool.read().clone()),
183			import_notification_sinks: Default::default(),
184			rotator: self.rotator.clone(),
185			enforce_limits_stats: self.enforce_limits_stats.clone(),
186		}
187	}
188}
189
190impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
191	pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
192		Self {
193			event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some(
194				event_handler,
195			))),
196			..self.clone()
197		}
198	}
199
200	/// Create a new transaction pool with statically sized rotator.
201	pub fn new_with_staticly_sized_rotator(
202		options: Options,
203		is_validator: IsValidator,
204		api: Arc<B>,
205	) -> Self {
206		let ban_time = options.ban_time;
207		Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None)
208	}
209
210	/// Create a new transaction pool.
211	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212		let ban_time = options.ban_time;
213		let total_count = options.total_count();
214		Self::new_with_rotator(
215			options,
216			is_validator,
217			api,
218			PoolRotator::new_with_expected_size(ban_time, total_count),
219			None,
220		)
221	}
222
223	/// Create a new transaction pool with given event handler.
224	pub fn new_with_event_handler(
225		options: Options,
226		is_validator: IsValidator,
227		api: Arc<B>,
228		event_handler: L,
229	) -> Self {
230		let ban_time = options.ban_time;
231		let total_count = options.total_count();
232		Self::new_with_rotator(
233			options,
234			is_validator,
235			api,
236			PoolRotator::new_with_expected_size(ban_time, total_count),
237			Some(event_handler),
238		)
239	}
240
241	fn new_with_rotator(
242		options: Options,
243		is_validator: IsValidator,
244		api: Arc<B>,
245		rotator: PoolRotator<ExtrinsicHash<B>>,
246		event_handler: Option<L>,
247	) -> Self {
248		let base_pool = base::BasePool::new(options.reject_future_transactions);
249		Self {
250			is_validator,
251			options,
252			event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)),
253			api,
254			pool: RwLock::new(base_pool),
255			import_notification_sinks: Default::default(),
256			rotator,
257			enforce_limits_stats: SyncDurationSlidingStats::new(Duration::from_secs(
258				STAT_SLIDING_WINDOW,
259			)),
260		}
261	}
262
263	/// Bans given set of hashes.
264	pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
265		self.rotator.ban(now, hashes)
266	}
267
268	/// Returns true if transaction with given hash is currently banned from the pool.
269	pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
270		self.rotator.is_banned(hash)
271	}
272
273	/// A fast check before doing any further processing of a transaction, like validation.
274	///
275	/// If `ignore_banned` is `true`, it will not check if the transaction is banned.
276	///
277	/// It checks if the transaction is already imported or banned. If so, it returns an error.
278	pub fn check_is_known(
279		&self,
280		tx_hash: &ExtrinsicHash<B>,
281		ignore_banned: bool,
282	) -> Result<(), B::Error> {
283		if !ignore_banned && self.is_banned(tx_hash) {
284			Err(error::Error::TemporarilyBanned.into())
285		} else if self.pool.read().is_imported(tx_hash) {
286			Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
287		} else {
288			Ok(())
289		}
290	}
291
292	/// Imports a bunch of pre-validated transactions to the pool.
293	pub fn submit(
294		&self,
295		txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
296	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
297		let results = txs
298			.into_iter()
299			.map(|validated_tx| self.submit_one(validated_tx))
300			.collect::<Vec<_>>();
301
302		// only enforce limits if there is at least one imported transaction
303		let removed = if results.iter().any(|res| res.is_ok()) {
304			let start = Instant::now();
305			let removed = self.enforce_limits();
306			insert_and_log_throttled_sync!(
307				Level::DEBUG,
308				target:"txpool",
309				prefix:"enforce_limits_stats",
310				self.enforce_limits_stats,
311				start.elapsed().into()
312			);
313			removed
314		} else {
315			Default::default()
316		};
317
318		results
319			.into_iter()
320			.map(|res| match res {
321				Ok(outcome) if removed.contains(&outcome.hash) =>
322					Err(error::Error::ImmediatelyDropped.into()),
323				other => other,
324			})
325			.collect()
326	}
327
328	/// Submit single pre-validated transaction to the pool.
329	fn submit_one(
330		&self,
331		tx: ValidatedTransactionFor<B>,
332	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
333		match tx {
334			ValidatedTransaction::Valid(tx) => {
335				let priority = tx.priority;
336				trace!(
337					target: LOG_TARGET,
338					tx_hash = ?tx.hash,
339					"ValidatedPool::submit_one"
340				);
341				if !tx.propagate && !(self.is_validator.0)() {
342					return Err(error::Error::Unactionable.into())
343				}
344
345				let imported = self.pool.write().import(tx)?;
346
347				if let base::Imported::Ready { ref hash, .. } = imported {
348					let sinks = &mut self.import_notification_sinks.lock();
349					sinks.retain_mut(|sink| match sink.try_send(*hash) {
350						Ok(()) => true,
351						Err(e) =>
352							if e.is_full() {
353								warn!(
354									target: LOG_TARGET,
355									tx_hash = ?hash,
356									"Trying to notify an import but the channel is full"
357								);
358								true
359							} else {
360								false
361							},
362					});
363				}
364
365				let mut event_dispatcher = self.event_dispatcher.write();
366				fire_events(&mut *event_dispatcher, &imported);
367				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
368			},
369			ValidatedTransaction::Invalid(tx_hash, error) => {
370				trace!(
371					target: LOG_TARGET,
372					?tx_hash,
373					?error,
374					"ValidatedPool::submit_one invalid"
375				);
376				self.rotator.ban(&Instant::now(), std::iter::once(tx_hash));
377				Err(error)
378			},
379			ValidatedTransaction::Unknown(tx_hash, error) => {
380				trace!(
381					target: LOG_TARGET,
382					?tx_hash,
383					?error,
384					"ValidatedPool::submit_one unknown"
385				);
386				self.event_dispatcher.write().invalid(&tx_hash);
387				Err(error)
388			},
389		}
390	}
391
392	fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
393		let status = self.pool.read().status();
394		let ready_limit = &self.options.ready;
395		let future_limit = &self.options.future;
396
397		if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
398			future_limit.is_exceeded(status.future, status.future_bytes)
399		{
400			trace!(
401				target: LOG_TARGET,
402				ready_count = ready_limit.count,
403				ready_kb = ready_limit.total_bytes / 1024,
404				future_count = future_limit.count,
405				future_kb = future_limit.total_bytes / 1024,
406				"Enforcing limits"
407			);
408
409			// clean up the pool
410			let removed = {
411				let mut pool = self.pool.write();
412				let removed = pool
413					.enforce_limits(ready_limit, future_limit)
414					.into_iter()
415					.map(|x| x.hash)
416					.collect::<HashSet<_>>();
417				// ban all removed transactions
418				self.rotator.ban(&Instant::now(), removed.iter().copied());
419				removed
420			};
421			if !removed.is_empty() {
422				trace!(
423					target: LOG_TARGET,
424					dropped_count = removed.len(),
425					"Enforcing limits"
426				);
427			}
428
429			// run notifications
430			let mut event_dispatcher = self.event_dispatcher.write();
431			for h in &removed {
432				event_dispatcher.limits_enforced(h);
433			}
434
435			removed
436		} else {
437			Default::default()
438		}
439	}
440
441	/// Import a single extrinsic and starts to watch their progress in the pool.
442	pub fn submit_and_watch(
443		&self,
444		tx: ValidatedTransactionFor<B>,
445	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
446		match tx {
447			ValidatedTransaction::Valid(tx) => {
448				let hash = self.api.hash_and_length(&tx.data).0;
449				let watcher = self.create_watcher(hash);
450				self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
451					.pop()
452					.expect("One extrinsic passed; one result returned; qed")
453					.map(|outcome| outcome.with_watcher(watcher))
454			},
455			ValidatedTransaction::Invalid(hash, err) => {
456				self.rotator.ban(&Instant::now(), std::iter::once(hash));
457				Err(err)
458			},
459			ValidatedTransaction::Unknown(_, err) => Err(err),
460		}
461	}
462
463	/// Creates a new watcher for given extrinsic.
464	pub fn create_watcher(
465		&self,
466		tx_hash: ExtrinsicHash<B>,
467	) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> {
468		self.event_dispatcher.write().create_watcher(tx_hash)
469	}
470
471	/// Provides a list of hashes for all watched transactions in the pool.
472	pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> {
473		self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect()
474	}
475
476	/// Resubmits revalidated transactions back to the pool.
477	///
478	/// Removes and then submits passed transactions and all dependent transactions.
479	/// Transactions that are missing from the pool are not submitted.
480	pub fn resubmit(
481		&self,
482		mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
483	) {
484		#[derive(Debug, Clone, Copy, PartialEq)]
485		enum Status {
486			Future,
487			Ready,
488			Failed,
489			Dropped,
490		}
491
492		let (mut initial_statuses, final_statuses) = {
493			let mut pool = self.pool.write();
494
495			// remove all passed transactions from the ready/future queues
496			// (this may remove additional transactions as well)
497			//
498			// for every transaction that has an entry in the `updated_transactions`,
499			// we store updated validation result in txs_to_resubmit
500			// for every transaction that has no entry in the `updated_transactions`,
501			// we store last validation result (i.e. the pool entry) in txs_to_resubmit
502			let mut initial_statuses = HashMap::new();
503			let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
504			while !updated_transactions.is_empty() {
505				let hash = updated_transactions
506					.keys()
507					.next()
508					.cloned()
509					.expect("transactions is not empty; qed");
510
511				// note we are not considering tx with hash invalid here - we just want
512				// to remove it along with dependent transactions and `remove_subtree()`
513				// does exactly what we need
514				let removed = pool.remove_subtree(&[hash]);
515				for removed_tx in removed {
516					let removed_hash = removed_tx.hash;
517					let updated_transaction = updated_transactions.shift_remove(&removed_hash);
518					let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
519						updated_tx
520					} else {
521						// in most cases we'll end up in successful `try_unwrap`, but if not
522						// we still need to reinsert transaction back to the pool => duplicate call
523						let transaction = match Arc::try_unwrap(removed_tx) {
524							Ok(transaction) => transaction,
525							Err(transaction) => transaction.duplicate(),
526						};
527						ValidatedTransaction::Valid(transaction)
528					};
529
530					initial_statuses.insert(removed_hash, Status::Ready);
531					txs_to_resubmit.push((removed_hash, tx_to_resubmit));
532				}
533				// make sure to remove the hash even if it's not present in the pool anymore.
534				updated_transactions.shift_remove(&hash);
535			}
536
537			// if we're rejecting future transactions, then insertion order matters here:
538			// if tx1 depends on tx2, then if tx1 is inserted before tx2, then it goes
539			// to the future queue and gets rejected immediately
540			// => let's temporary stop rejection and clear future queue before return
541			pool.with_futures_enabled(|pool, reject_future_transactions| {
542				// now resubmit all removed transactions back to the pool
543				let mut final_statuses = HashMap::new();
544				for (tx_hash, tx_to_resubmit) in txs_to_resubmit {
545					match tx_to_resubmit {
546						ValidatedTransaction::Valid(tx) => match pool.import(tx) {
547							Ok(imported) => match imported {
548								base::Imported::Ready { promoted, failed, removed, .. } => {
549									final_statuses.insert(tx_hash, Status::Ready);
550									for hash in promoted {
551										final_statuses.insert(hash, Status::Ready);
552									}
553									for hash in failed {
554										final_statuses.insert(hash, Status::Failed);
555									}
556									for tx in removed {
557										final_statuses.insert(tx.hash, Status::Dropped);
558									}
559								},
560								base::Imported::Future { .. } => {
561									final_statuses.insert(tx_hash, Status::Future);
562								},
563							},
564							Err(error) => {
565								// we do not want to fail if single transaction import has failed
566								// nor we do want to propagate this error, because it could tx
567								// unknown to caller => let's just notify listeners (and issue debug
568								// message)
569								warn!(
570									target: LOG_TARGET,
571									?tx_hash,
572									%error,
573									"Removing invalid transaction from update"
574								);
575								final_statuses.insert(tx_hash, Status::Failed);
576							},
577						},
578						ValidatedTransaction::Invalid(_, _) |
579						ValidatedTransaction::Unknown(_, _) => {
580							final_statuses.insert(tx_hash, Status::Failed);
581						},
582					}
583				}
584
585				// if the pool is configured to reject future transactions, let's clear the future
586				// queue, updating final statuses as required
587				if reject_future_transactions {
588					for future_tx in pool.clear_future() {
589						final_statuses.insert(future_tx.hash, Status::Dropped);
590					}
591				}
592
593				(initial_statuses, final_statuses)
594			})
595		};
596
597		// and now let's notify listeners about status changes
598		let mut event_dispatcher = self.event_dispatcher.write();
599		for (hash, final_status) in final_statuses {
600			let initial_status = initial_statuses.remove(&hash);
601			if initial_status.is_none() || Some(final_status) != initial_status {
602				match final_status {
603					Status::Future => event_dispatcher.future(&hash),
604					Status::Ready => event_dispatcher.ready(&hash, None),
605					Status::Dropped => event_dispatcher.dropped(&hash),
606					Status::Failed => event_dispatcher.invalid(&hash),
607				}
608			}
609		}
610	}
611
612	/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
613	pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
614		self.pool
615			.read()
616			.by_hashes(hashes)
617			.into_iter()
618			.map(|existing_in_pool| {
619				existing_in_pool.map(|transaction| transaction.provides.to_vec())
620			})
621			.collect()
622	}
623
624	/// Get ready transaction by hash
625	pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
626		self.pool.read().ready_by_hash(hash)
627	}
628
629	/// Prunes ready transactions that provide given list of tags.
630	pub fn prune_tags(
631		&self,
632		tags: impl IntoIterator<Item = Tag>,
633	) -> PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>> {
634		// Perform tag-based pruning in the base pool
635		let status = self.pool.write().prune_tags(tags);
636		// Notify event listeners of all transactions
637		// that were promoted to `Ready` or were dropped.
638		{
639			let mut event_dispatcher = self.event_dispatcher.write();
640			for promoted in &status.promoted {
641				fire_events(&mut *event_dispatcher, promoted);
642			}
643			for f in &status.failed {
644				event_dispatcher.dropped(f);
645			}
646		}
647
648		status
649	}
650
651	/// Resubmit transactions that have been revalidated after prune_tags call.
652	pub fn resubmit_pruned(
653		&self,
654		at: &HashAndNumber<B::Block>,
655		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
656		pruned_hashes: Vec<ExtrinsicHash<B>>,
657		pruned_xts: Vec<ValidatedTransactionFor<B>>,
658	) {
659		debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
660
661		// Resubmit pruned transactions
662		let results = self.submit(pruned_xts);
663
664		// Collect the hashes of transactions that now became invalid (meaning that they are
665		// successfully pruned).
666		let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
667			match r.map_err(error::IntoPoolError::into_pool_error) {
668				Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
669				_ => None,
670			}
671		});
672		// Fire `pruned` notifications for collected hashes and make sure to include
673		// `known_imported_hashes` since they were just imported as part of the block.
674		let hashes = hashes.chain(known_imported_hashes.into_iter());
675		self.fire_pruned(at, hashes);
676
677		// perform regular cleanup of old transactions in the pool
678		// and update temporary bans.
679		self.clear_stale(at);
680	}
681
682	/// Fire notifications for pruned transactions.
683	pub fn fire_pruned(
684		&self,
685		at: &HashAndNumber<B::Block>,
686		hashes: impl Iterator<Item = ExtrinsicHash<B>>,
687	) {
688		let mut event_dispatcher = self.event_dispatcher.write();
689		let mut set = HashSet::with_capacity(hashes.size_hint().0);
690		for h in hashes {
691			// `hashes` has possibly duplicate hashes.
692			// we'd like to send out the `InBlock` notification only once.
693			if !set.contains(&h) {
694				event_dispatcher.pruned(at.hash, &h);
695				set.insert(h);
696			}
697		}
698	}
699
700	/// Removes stale transactions from the pool.
701	///
702	/// Stale transactions are transaction beyond their longevity period.
703	/// Note this function does not remove transactions that are already included in the chain.
704	/// See `prune_tags` if you want this.
705	pub fn clear_stale(&self, at: &HashAndNumber<B::Block>) {
706		let HashAndNumber { number, .. } = *at;
707		let number = number.saturated_into::<u64>();
708		let now = Instant::now();
709		let to_remove = {
710			self.ready()
711				.filter(|tx| self.rotator.ban_if_stale(&now, number, tx))
712				.map(|tx| tx.hash)
713				.collect::<Vec<_>>()
714		};
715		let futures_to_remove: Vec<ExtrinsicHash<B>> = {
716			let p = self.pool.read();
717			let mut hashes = Vec::new();
718			for tx in p.futures() {
719				if self.rotator.ban_if_stale(&now, number, tx) {
720					hashes.push(tx.hash);
721				}
722			}
723			hashes
724		};
725		debug!(
726			target:LOG_TARGET,
727			to_remove_len=to_remove.len(),
728			futures_to_remove_len=futures_to_remove.len(),
729			"clear_stale"
730		);
731		// removing old transactions
732		self.remove_invalid(&to_remove);
733		self.remove_invalid(&futures_to_remove);
734		// clear banned transactions timeouts
735		self.rotator.clear_timeouts(&now);
736	}
737
738	/// Get api reference.
739	pub fn api(&self) -> &B {
740		&self.api
741	}
742
743	/// Return an event stream of notifications for when transactions are imported to the pool.
744	///
745	/// Consumers of this stream should use the `ready` method to actually get the
746	/// pending transactions in the right order.
747	pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
748		const CHANNEL_BUFFER_SIZE: usize = 1024;
749
750		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
751		self.import_notification_sinks.lock().push(sink);
752		stream
753	}
754
755	/// Invoked when extrinsics are broadcasted.
756	pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
757		let mut event_dispatcher = self.event_dispatcher.write();
758		for (hash, peers) in propagated.into_iter() {
759			event_dispatcher.broadcasted(&hash, peers);
760		}
761	}
762
763	/// Remove a subtree of transactions from the pool and mark them invalid.
764	///
765	/// The transactions passed as an argument will be additionally banned
766	/// to prevent them from entering the pool right away.
767	/// Note this is not the case for the dependent transactions - those may
768	/// still be valid so we want to be able to re-import them.
769	///
770	/// For every removed transaction an Invalid event is triggered.
771	///
772	/// Returns the list of actually removed transactions, which may include transactions dependent
773	/// on provided set.
774	pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
775		// early exit in case there is no invalid transactions.
776		if hashes.is_empty() {
777			return vec![]
778		}
779
780		let invalid = self.remove_subtree(hashes, true, |listener, removed_tx_hash| {
781			listener.invalid(&removed_tx_hash);
782		});
783
784		trace!(
785			target: LOG_TARGET,
786			removed_count = hashes.len(),
787			invalid_count = invalid.len(),
788			"Removed invalid transactions"
789		);
790		log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
791
792		invalid
793	}
794
795	/// Get an iterator for ready transactions ordered by priority
796	pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
797		self.pool.read().ready()
798	}
799
800	/// Returns a Vec of hashes and extrinsics in the future pool.
801	pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
802		self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
803	}
804
805	/// Returns pool status.
806	pub fn status(&self) -> PoolStatus {
807		self.pool.read().status()
808	}
809
810	/// Notify all watchers that transactions in the block with hash have been finalized
811	pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
812		trace!(
813			target: LOG_TARGET,
814			?block_hash,
815			"Attempting to notify watchers of finalization"
816		);
817		self.event_dispatcher.write().finalized(block_hash);
818		Ok(())
819	}
820
821	/// Notify the event_dispatcher of retracted blocks
822	pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
823		self.event_dispatcher.write().retracted(block_hash)
824	}
825
826	/// Resends ready and future events for all the ready and future transactions that are already
827	/// in the pool.
828	///
829	/// Intended to be called after cloning the instance of `ValidatedPool`.
830	pub fn retrigger_notifications(&self) {
831		let pool = self.pool.read();
832		let mut event_dispatcher = self.event_dispatcher.write();
833		pool.ready().for_each(|r| {
834			event_dispatcher.ready(&r.hash, None);
835		});
836		pool.futures().for_each(|f| {
837			event_dispatcher.future(&f.hash);
838		});
839	}
840
841	/// Removes a transaction subtree from the pool, starting from the given transaction hash.
842	///
843	/// This function traverses the dependency graph of transactions and removes the specified
844	/// transaction along with all its descendant transactions from the pool.
845	///
846	/// The root transactions will be banned from re-entrering the pool if `ban_transactions` is
847	/// true. Descendant transactions may be re-submitted to the pool if required.
848	///
849	/// A `event_disaptcher_action` callback function is invoked for every transaction that is
850	/// removed, providing a reference to the pool's event dispatcher and the hash of the removed
851	/// transaction. This allows to trigger the required events.
852	///
853	/// Returns a vector containing the hashes of all removed transactions, including the root
854	/// transaction specified by `tx_hash`.
855	pub fn remove_subtree<F>(
856		&self,
857		hashes: &[ExtrinsicHash<B>],
858		ban_transactions: bool,
859		event_dispatcher_action: F,
860	) -> Vec<TransactionFor<B>>
861	where
862		F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
863	{
864		// temporarily ban removed transactions if requested
865		if ban_transactions {
866			self.rotator.ban(&Instant::now(), hashes.iter().cloned());
867		};
868		let removed = self.pool.write().remove_subtree(hashes);
869
870		removed
871			.into_iter()
872			.map(|tx| {
873				let removed_tx_hash = tx.hash;
874				let mut event_dispatcher = self.event_dispatcher.write();
875				event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
876				tx.clone()
877			})
878			.collect::<Vec<_>>()
879	}
880}
881
882fn fire_events<B, L, Ex>(
883	event_dispatcher: &mut EventDispatcher<B, L>,
884	imported: &base::Imported<ExtrinsicHash<B>, Ex>,
885) where
886	B: ChainApi,
887	L: EventHandler<B>,
888{
889	match *imported {
890		base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
891			event_dispatcher.ready(hash, None);
892			failed.iter().for_each(|f| event_dispatcher.invalid(f));
893			removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
894			promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
895		},
896		base::Imported::Future { ref hash } => event_dispatcher.future(hash),
897	}
898}