referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
validated_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	common::{
21		sliding_stat::SyncDurationSlidingStats, tracing_log_xt::log_xt_trace, STAT_SLIDING_WINDOW,
22	},
23	insert_and_log_throttled_sync, LOG_TARGET,
24};
25use futures::channel::mpsc::{channel, Sender};
26use indexmap::IndexMap;
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
29use sp_blockchain::HashAndNumber;
30use sp_runtime::{
31	traits::SaturatedConversion,
32	transaction_validity::{TransactionTag as Tag, ValidTransaction},
33};
34use std::{
35	collections::{HashMap, HashSet},
36	sync::Arc,
37	time::{Duration, Instant},
38};
39use tracing::{debug, trace, warn, Level};
40
41use super::{
42	base_pool::{self as base, PruneStatus},
43	listener::EventHandler,
44	pool::{
45		BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
46	},
47	rotator::{BanReason, PoolRotator},
48	watcher::Watcher,
49};
50
51/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
52#[derive(Debug)]
53pub enum ValidatedTransaction<Hash, Ex, Error> {
54	/// Transaction that has been validated successfully.
55	Valid(base::Transaction<Hash, Ex>),
56	/// Transaction that is invalid.
57	Invalid(Hash, Error),
58	/// Transaction which validity can't be determined.
59	///
60	/// We're notifying watchers about failure, if 'unknown' transaction is submitted.
61	Unknown(Hash, Error),
62}
63
64impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
65	/// Consume validity result, transaction data and produce ValidTransaction.
66	pub fn valid_at(
67		at: u64,
68		hash: Hash,
69		source: base::TimedTransactionSource,
70		data: Ex,
71		bytes: usize,
72		validity: ValidTransaction,
73	) -> Self {
74		Self::Valid(base::Transaction {
75			data,
76			bytes,
77			hash,
78			source,
79			priority: validity.priority,
80			requires: validity.requires,
81			provides: validity.provides,
82			propagate: validity.propagate,
83			valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
84		})
85	}
86
87	/// Returns priority for valid transaction, None if transaction is not valid.
88	pub fn priority(&self) -> Option<TransactionPriority> {
89		match self {
90			ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
91			_ => None,
92		}
93	}
94}
95
/// The [`ValidatedTransaction`] type stored in the validated pool for a given chain API `B`.
///
/// Hash, extrinsic and error types are all taken from `B`.
pub type ValidatedTransactionFor<B> =
	ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
99
/// A type alias representing the `ValidatedPool` event dispatcher for a given `ChainApi` type `B`
/// and event handler type `L`.
pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>;
102
/// A closure that returns true if the local node is a validator that can author blocks.
///
/// The closure is stored directly behind the `Arc` (as a trait object), so cloning is a
/// reference-count bump and calling it goes through a single pointer indirection.
#[derive(Clone)]
pub struct IsValidator(Arc<dyn Fn() -> bool + Send + Sync>);

impl From<bool> for IsValidator {
	/// Wraps a fixed answer into a closure that always returns it.
	fn from(is_validator: bool) -> Self {
		Self(Arc::new(move || is_validator))
	}
}

impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
	/// Converts the boxed closure into a shared one.
	fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
		// `Arc: From<Box<T>>` moves the closure into the `Arc` allocation instead of keeping
		// the `Box` as an extra layer inside the `Arc`.
		Self(Arc::from(is_validator))
	}
}
118
119/// Represents the result of `submit` or `submit_and_watch` operations.
120pub struct BaseSubmitOutcome<B: ChainApi, W> {
121	/// The hash of the submitted transaction.
122	hash: ExtrinsicHash<B>,
123	/// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`.
124	watcher: Option<W>,
125
126	/// The priority of the transaction. Defaults to None if unknown.
127	priority: Option<TransactionPriority>,
128}
129
130/// Type alias to outcome of submission to `ValidatedPool`.
131pub type ValidatedPoolSubmitOutcome<B> =
132	BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
133
134impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
135	/// Creates a new instance with given hash and priority.
136	pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
137		Self { hash, priority, watcher: None }
138	}
139
140	/// Sets the transaction watcher.
141	pub fn with_watcher(mut self, watcher: W) -> Self {
142		self.watcher = Some(watcher);
143		self
144	}
145
146	/// Provides priority of submitted transaction.
147	pub fn priority(&self) -> Option<TransactionPriority> {
148		self.priority
149	}
150
151	/// Provides hash of submitted transaction.
152	pub fn hash(&self) -> ExtrinsicHash<B> {
153		self.hash
154	}
155
156	/// Provides a watcher. Should only be called on outcomes of `submit_and_watch`. Otherwise will
157	/// panic (that would mean logical error in program).
158	pub fn expect_watcher(&mut self) -> W {
159		self.watcher.take().expect("watcher was set in submit_and_watch. qed")
160	}
161}
162
163/// Pool that deals with validated transactions.
164pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> {
165	api: Arc<B>,
166	is_validator: IsValidator,
167	options: Options,
168	event_dispatcher: RwLock<EventDispatcher<B, L>>,
169	pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
170	import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
171	rotator: PoolRotator<ExtrinsicHash<B>>,
172	enforce_limits_stats: SyncDurationSlidingStats,
173}
174
175impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> {
176	fn clone(&self) -> Self {
177		Self {
178			api: self.api.clone(),
179			is_validator: self.is_validator.clone(),
180			options: self.options.clone(),
181			event_dispatcher: Default::default(),
182			pool: RwLock::from(self.pool.read().clone()),
183			import_notification_sinks: Default::default(),
184			rotator: self.rotator.clone(),
185			enforce_limits_stats: self.enforce_limits_stats.clone(),
186		}
187	}
188}
189
190impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
191	pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
192		Self {
193			event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some(
194				event_handler,
195			))),
196			..self.clone()
197		}
198	}
199
200	/// Create a new transaction pool with statically sized rotator.
201	pub fn new_with_staticly_sized_rotator(
202		options: Options,
203		is_validator: IsValidator,
204		api: Arc<B>,
205	) -> Self {
206		let ban_time = options.ban_time;
207		Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None)
208	}
209
210	/// Create a new transaction pool.
211	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212		let ban_time = options.ban_time;
213		let total_count = options.total_count();
214		Self::new_with_rotator(
215			options,
216			is_validator,
217			api,
218			PoolRotator::new_with_expected_size(ban_time, total_count),
219			None,
220		)
221	}
222
223	/// Create a new transaction pool with given event handler.
224	pub fn new_with_event_handler(
225		options: Options,
226		is_validator: IsValidator,
227		api: Arc<B>,
228		event_handler: L,
229	) -> Self {
230		let ban_time = options.ban_time;
231		let total_count = options.total_count();
232		Self::new_with_rotator(
233			options,
234			is_validator,
235			api,
236			PoolRotator::new_with_expected_size(ban_time, total_count),
237			Some(event_handler),
238		)
239	}
240
241	fn new_with_rotator(
242		options: Options,
243		is_validator: IsValidator,
244		api: Arc<B>,
245		rotator: PoolRotator<ExtrinsicHash<B>>,
246		event_handler: Option<L>,
247	) -> Self {
248		let base_pool = base::BasePool::new(options.reject_future_transactions);
249		Self {
250			is_validator,
251			options,
252			event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)),
253			api,
254			pool: RwLock::new(base_pool),
255			import_notification_sinks: Default::default(),
256			rotator,
257			enforce_limits_stats: SyncDurationSlidingStats::new(Duration::from_secs(
258				STAT_SLIDING_WINDOW,
259			)),
260		}
261	}
262
	/// Bans given set of hashes with the specified reason.
	///
	/// Thin wrapper: `now`, `hashes` and `reason` are forwarded unchanged to the rotator's `ban`.
	pub fn ban(
		&self,
		now: &Instant,
		hashes: impl IntoIterator<Item = ExtrinsicHash<B>>,
		reason: BanReason,
	) {
		self.rotator.ban(now, hashes, reason)
	}
272
	/// Returns true if transaction with given hash is currently banned from the pool.
	///
	/// The answer comes from the rotator's `is_banned`.
	pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
		self.rotator.is_banned(hash)
	}
277
	/// Removes the ban for a transaction, but only if it was banned for
	/// [`BanReason::Validation`].
	///
	/// Thin wrapper over the rotator's `unban_if_validation`.
	///
	/// Returns `true` if the ban was removed.
	pub fn unban_if_validation(&self, hash: &ExtrinsicHash<B>) -> bool {
		self.rotator.unban_if_validation(hash)
	}
285
286	/// A fast check before doing any further processing of a transaction, like validation.
287	///
288	/// If `ignore_banned` is `true`, it will not check if the transaction is banned.
289	///
290	/// It checks if the transaction is already imported or banned. If so, it returns an error.
291	pub fn check_is_known(
292		&self,
293		tx_hash: &ExtrinsicHash<B>,
294		ignore_banned: bool,
295	) -> Result<(), B::Error> {
296		if !ignore_banned && self.is_banned(tx_hash) {
297			Err(error::Error::TemporarilyBanned.into())
298		} else if self.pool.read().is_imported(tx_hash) {
299			Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
300		} else {
301			Ok(())
302		}
303	}
304
305	/// Imports a bunch of pre-validated transactions to the pool.
306	pub fn submit(
307		&self,
308		txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
309	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
310		let results = txs
311			.into_iter()
312			.map(|validated_tx| self.submit_one(validated_tx))
313			.collect::<Vec<_>>();
314
315		// only enforce limits if there is at least one imported transaction
316		let removed = if results.iter().any(|res| res.is_ok()) {
317			let start = Instant::now();
318			let removed = self.enforce_limits();
319			insert_and_log_throttled_sync!(
320				Level::DEBUG,
321				target:"txpool",
322				prefix:"enforce_limits_stats",
323				self.enforce_limits_stats,
324				start.elapsed().into()
325			);
326			removed
327		} else {
328			Default::default()
329		};
330
331		results
332			.into_iter()
333			.map(|res| match res {
334				Ok(outcome) if removed.contains(&outcome.hash) => {
335					Err(error::Error::ImmediatelyDropped.into())
336				},
337				other => other,
338			})
339			.collect()
340	}
341
342	/// Submit single pre-validated transaction to the pool.
343	fn submit_one(
344		&self,
345		tx: ValidatedTransactionFor<B>,
346	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
347		match tx {
348			ValidatedTransaction::Valid(tx) => {
349				let priority = tx.priority;
350				trace!(
351					target: LOG_TARGET,
352					tx_hash = ?tx.hash,
353					"ValidatedPool::submit_one"
354				);
355				if !tx.propagate && !(self.is_validator.0)() {
356					return Err(error::Error::Unactionable.into());
357				}
358
359				let imported = self.pool.write().import(tx)?;
360
361				if let base::Imported::Ready { ref hash, .. } = imported {
362					let sinks = &mut self.import_notification_sinks.lock();
363					sinks.retain_mut(|sink| match sink.try_send(*hash) {
364						Ok(()) => true,
365						Err(e) => {
366							if e.is_full() {
367								warn!(
368									target: LOG_TARGET,
369									tx_hash = ?hash,
370									"Trying to notify an import but the channel is full"
371								);
372								true
373							} else {
374								false
375							}
376						},
377					});
378				}
379
380				let mut event_dispatcher = self.event_dispatcher.write();
381				fire_events(&mut *event_dispatcher, &imported);
382				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
383			},
384			ValidatedTransaction::Invalid(tx_hash, error) => {
385				trace!(
386					target: LOG_TARGET,
387					?tx_hash,
388					?error,
389					"ValidatedPool::submit_one invalid"
390				);
391				self.rotator
392					.ban(&Instant::now(), std::iter::once(tx_hash), BanReason::Validation);
393				Err(error)
394			},
395			ValidatedTransaction::Unknown(tx_hash, error) => {
396				trace!(
397					target: LOG_TARGET,
398					?tx_hash,
399					?error,
400					"ValidatedPool::submit_one unknown"
401				);
402				self.event_dispatcher.write().invalid(&tx_hash);
403				Err(error)
404			},
405		}
406	}
407
408	fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
409		let status = self.pool.read().status();
410		let ready_limit = &self.options.ready;
411		let future_limit = &self.options.future;
412
413		if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
414			future_limit.is_exceeded(status.future, status.future_bytes)
415		{
416			trace!(
417				target: LOG_TARGET,
418				ready_count = ready_limit.count,
419				ready_kb = ready_limit.total_bytes / 1024,
420				future_count = future_limit.count,
421				future_kb = future_limit.total_bytes / 1024,
422				"Enforcing limits"
423			);
424
425			// clean up the pool
426			let removed = {
427				let mut pool = self.pool.write();
428				let removed = pool
429					.enforce_limits(ready_limit, future_limit)
430					.into_iter()
431					.map(|x| x.hash)
432					.collect::<HashSet<_>>();
433				// ban all removed transactions
434				self.rotator.ban(
435					&Instant::now(),
436					removed.iter().copied(),
437					BanReason::LimitsEnforced,
438				);
439				removed
440			};
441			if !removed.is_empty() {
442				trace!(
443					target: LOG_TARGET,
444					dropped_count = removed.len(),
445					"Enforcing limits"
446				);
447			}
448
449			// run notifications
450			let mut event_dispatcher = self.event_dispatcher.write();
451			for h in &removed {
452				event_dispatcher.limits_enforced(h);
453			}
454
455			removed
456		} else {
457			Default::default()
458		}
459	}
460
461	/// Import a single extrinsic and starts to watch their progress in the pool.
462	pub fn submit_and_watch(
463		&self,
464		tx: ValidatedTransactionFor<B>,
465	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
466		match tx {
467			ValidatedTransaction::Valid(tx) => {
468				let hash = self.api.hash_and_length(&tx.data).0;
469				let watcher = self.create_watcher(hash);
470				self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
471					.pop()
472					.expect("One extrinsic passed; one result returned; qed")
473					.map(|outcome| outcome.with_watcher(watcher))
474			},
475			ValidatedTransaction::Invalid(hash, err) => {
476				self.rotator.ban(&Instant::now(), std::iter::once(hash), BanReason::Validation);
477				Err(err)
478			},
479			ValidatedTransaction::Unknown(_, err) => Err(err),
480		}
481	}
482
	/// Creates a new watcher for given extrinsic.
	///
	/// The watcher is created by the event dispatcher, which is write-locked for the duration of
	/// the call.
	pub fn create_watcher(
		&self,
		tx_hash: ExtrinsicHash<B>,
	) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> {
		self.event_dispatcher.write().create_watcher(tx_hash)
	}
490
491	/// Provides a list of hashes for all watched transactions in the pool.
492	pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> {
493		self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect()
494	}
495
496	/// Resubmits revalidated transactions back to the pool.
497	///
498	/// Removes and then submits passed transactions and all dependent transactions.
499	/// Transactions that are missing from the pool are not submitted.
500	pub fn resubmit(
501		&self,
502		mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
503	) {
504		#[derive(Debug, Clone, Copy, PartialEq)]
505		enum Status {
506			Future,
507			Ready,
508			Failed,
509			Dropped,
510		}
511
512		let (mut initial_statuses, final_statuses) = {
513			let mut pool = self.pool.write();
514
515			// remove all passed transactions from the ready/future queues
516			// (this may remove additional transactions as well)
517			//
518			// for every transaction that has an entry in the `updated_transactions`,
519			// we store updated validation result in txs_to_resubmit
520			// for every transaction that has no entry in the `updated_transactions`,
521			// we store last validation result (i.e. the pool entry) in txs_to_resubmit
522			let mut initial_statuses = HashMap::new();
523			let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
524			while !updated_transactions.is_empty() {
525				let hash = updated_transactions
526					.keys()
527					.next()
528					.cloned()
529					.expect("transactions is not empty; qed");
530
531				// note we are not considering tx with hash invalid here - we just want
532				// to remove it along with dependent transactions and `remove_subtree()`
533				// does exactly what we need
534				let removed = pool.remove_subtree(&[hash]);
535				for removed_tx in removed {
536					let removed_hash = removed_tx.hash;
537					let updated_transaction = updated_transactions.shift_remove(&removed_hash);
538					let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
539						updated_tx
540					} else {
541						// in most cases we'll end up in successful `try_unwrap`, but if not
542						// we still need to reinsert transaction back to the pool => duplicate call
543						let transaction = match Arc::try_unwrap(removed_tx) {
544							Ok(transaction) => transaction,
545							Err(transaction) => transaction.duplicate(),
546						};
547						ValidatedTransaction::Valid(transaction)
548					};
549
550					initial_statuses.insert(removed_hash, Status::Ready);
551					txs_to_resubmit.push((removed_hash, tx_to_resubmit));
552				}
553				// make sure to remove the hash even if it's not present in the pool anymore.
554				updated_transactions.shift_remove(&hash);
555			}
556
557			// if we're rejecting future transactions, then insertion order matters here:
558			// if tx1 depends on tx2, then if tx1 is inserted before tx2, then it goes
559			// to the future queue and gets rejected immediately
560			// => let's temporary stop rejection and clear future queue before return
561			pool.with_futures_enabled(|pool, reject_future_transactions| {
562				// now resubmit all removed transactions back to the pool
563				let mut final_statuses = HashMap::new();
564				for (tx_hash, tx_to_resubmit) in txs_to_resubmit {
565					match tx_to_resubmit {
566						ValidatedTransaction::Valid(tx) => match pool.import(tx) {
567							Ok(imported) => match imported {
568								base::Imported::Ready { promoted, failed, removed, .. } => {
569									final_statuses.insert(tx_hash, Status::Ready);
570									for hash in promoted {
571										final_statuses.insert(hash, Status::Ready);
572									}
573									for hash in failed {
574										final_statuses.insert(hash, Status::Failed);
575									}
576									for tx in removed {
577										final_statuses.insert(tx.hash, Status::Dropped);
578									}
579								},
580								base::Imported::Future { .. } => {
581									final_statuses.insert(tx_hash, Status::Future);
582								},
583							},
584							Err(error) => {
585								// we do not want to fail if single transaction import has failed
586								// nor we do want to propagate this error, because it could tx
587								// unknown to caller => let's just notify listeners (and issue debug
588								// message)
589								warn!(
590									target: LOG_TARGET,
591									?tx_hash,
592									%error,
593									"Removing invalid transaction from update"
594								);
595								final_statuses.insert(tx_hash, Status::Failed);
596							},
597						},
598						ValidatedTransaction::Invalid(_, _) |
599						ValidatedTransaction::Unknown(_, _) => {
600							final_statuses.insert(tx_hash, Status::Failed);
601						},
602					}
603				}
604
605				// if the pool is configured to reject future transactions, let's clear the future
606				// queue, updating final statuses as required
607				if reject_future_transactions {
608					for future_tx in pool.clear_future() {
609						final_statuses.insert(future_tx.hash, Status::Dropped);
610					}
611				}
612
613				(initial_statuses, final_statuses)
614			})
615		};
616
617		// and now let's notify listeners about status changes
618		let mut event_dispatcher = self.event_dispatcher.write();
619		for (hash, final_status) in final_statuses {
620			let initial_status = initial_statuses.remove(&hash);
621			if initial_status.is_none() || Some(final_status) != initial_status {
622				match final_status {
623					Status::Future => event_dispatcher.future(&hash),
624					Status::Ready => event_dispatcher.ready(&hash, None),
625					Status::Dropped => event_dispatcher.dropped(&hash),
626					Status::Failed => event_dispatcher.invalid(&hash),
627				}
628			}
629		}
630	}
631
632	/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
633	pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
634		self.pool
635			.read()
636			.by_hashes(hashes)
637			.into_iter()
638			.map(|existing_in_pool| {
639				existing_in_pool.map(|transaction| transaction.provides.to_vec())
640			})
641			.collect()
642	}
643
	/// Get ready transaction by hash.
	///
	/// Looks the hash up in the base pool under a read lock and returns whatever the base
	/// pool's `ready_by_hash` returns.
	pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
		self.pool.read().ready_by_hash(hash)
	}
648
649	/// Prunes ready transactions that provide given list of tags.
650	pub fn prune_tags(
651		&self,
652		tags: impl IntoIterator<Item = Tag>,
653	) -> PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>> {
654		// Perform tag-based pruning in the base pool
655		let status = self.pool.write().prune_tags(tags);
656		// Notify event listeners of all transactions
657		// that were promoted to `Ready` or were dropped.
658		{
659			let mut event_dispatcher = self.event_dispatcher.write();
660			for promoted in &status.promoted {
661				fire_events(&mut *event_dispatcher, promoted);
662			}
663			for f in &status.failed {
664				event_dispatcher.dropped(f);
665			}
666		}
667
668		status
669	}
670
	/// Resubmit transactions that have been revalidated after prune_tags call.
	///
	/// Performs three steps, in order: submits `pruned_xts` back to the pool, fires `pruned`
	/// notifications for `known_imported_hashes`, and clears stale transactions at `at`.
	///
	/// `pruned_hashes` is only used for the debug-build length check against `pruned_xts`.
	pub fn resubmit_pruned(
		&self,
		at: &HashAndNumber<B::Block>,
		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
		pruned_hashes: Vec<ExtrinsicHash<B>>,
		pruned_xts: Vec<ValidatedTransactionFor<B>>,
	) {
		debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());

		// Resubmit pruned transactions back to the pool. Tag-based pruning may over-prune
		// (removing dependents in the subtree), so still-valid collateral txs need to be
		// re-added. In the fork-aware pool this is likely redundant — `update_view_with_mempool`
		// resubmits everything from the mempool right after pruning — and could be removed in
		// the future.
		// The per-transaction results of the submission are intentionally not inspected here.
		self.submit(pruned_xts);

		// Fire `pruned` (InBlock) notifications only for `known_imported_hashes` — the
		// hashes of extrinsics actually present in the imported block body.
		self.fire_pruned(at, known_imported_hashes.into_iter());

		// perform regular cleanup of old transactions in the pool
		// and update temporary bans.
		self.clear_stale(at);
	}
696
697	/// Fire notifications for pruned transactions.
698	pub fn fire_pruned(
699		&self,
700		at: &HashAndNumber<B::Block>,
701		hashes: impl Iterator<Item = ExtrinsicHash<B>>,
702	) {
703		let mut event_dispatcher = self.event_dispatcher.write();
704		let mut set = HashSet::with_capacity(hashes.size_hint().0);
705		for h in hashes {
706			// `hashes` has possibly duplicate hashes.
707			// we'd like to send out the `InBlock` notification only once.
708			if !set.contains(&h) {
709				event_dispatcher.pruned(at.hash, &h);
710				set.insert(h);
711			}
712		}
713	}
714
715	/// Removes stale transactions from the pool.
716	///
717	/// Stale transactions are transaction beyond their longevity period.
718	/// Note this function does not remove transactions that are already included in the chain.
719	/// See `prune_tags` if you want this.
720	pub fn clear_stale(&self, at: &HashAndNumber<B::Block>) {
721		let HashAndNumber { number, .. } = *at;
722		let number = number.saturated_into::<u64>();
723		let now = Instant::now();
724		let to_remove = {
725			self.ready()
726				.filter(|tx| self.rotator.ban_if_stale(&now, number, tx))
727				.map(|tx| tx.hash)
728				.collect::<Vec<_>>()
729		};
730		let futures_to_remove: Vec<ExtrinsicHash<B>> = {
731			let p = self.pool.read();
732			let mut hashes = Vec::new();
733			for tx in p.futures() {
734				if self.rotator.ban_if_stale(&now, number, tx) {
735					hashes.push(tx.hash);
736				}
737			}
738			hashes
739		};
740		debug!(
741			target:LOG_TARGET,
742			to_remove_len=to_remove.len(),
743			futures_to_remove_len=futures_to_remove.len(),
744			"clear_stale"
745		);
746		// removing old transactions
747		self.remove_invalid(&to_remove);
748		self.remove_invalid(&futures_to_remove);
749		// clear banned transactions timeouts
750		self.rotator.clear_timeouts(&now);
751	}
752
	/// Get api reference.
	///
	/// The reference borrows from the `Arc` held by the pool, so it lives as long as `self`.
	pub fn api(&self) -> &B {
		&self.api
	}
757
	/// Return an event stream of notifications for when transactions are imported to the pool.
	///
	/// Each call creates a new channel: the sending half is added to the pool's list of import
	/// notification sinks, the receiving half is returned.
	///
	/// Consumers of this stream should use the `ready` method to actually get the
	/// pending transactions in the right order.
	pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
		// buffer size passed to `channel` when creating the notification channel
		const CHANNEL_BUFFER_SIZE: usize = 1024;

		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
		self.import_notification_sinks.lock().push(sink);
		stream
	}
769
770	/// Invoked when extrinsics are broadcasted.
771	pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
772		let mut event_dispatcher = self.event_dispatcher.write();
773		for (hash, peers) in propagated.into_iter() {
774			event_dispatcher.broadcasted(&hash, peers);
775		}
776	}
777
778	/// Remove a subtree of transactions from the pool and mark them invalid.
779	///
780	/// The transactions passed as an argument will be additionally banned
781	/// to prevent them from entering the pool right away.
782	/// Note this is not the case for the dependent transactions - those may
783	/// still be valid so we want to be able to re-import them.
784	///
785	/// For every removed transaction an Invalid event is triggered.
786	///
787	/// Returns the list of actually removed transactions, which may include transactions dependent
788	/// on provided set.
789	pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
790		// early exit in case there is no invalid transactions.
791		if hashes.is_empty() {
792			return vec![];
793		}
794
795		let invalid = self.remove_subtree(hashes, true, |listener, removed_tx_hash| {
796			listener.invalid(&removed_tx_hash);
797		});
798
799		trace!(
800			target: LOG_TARGET,
801			removed_count = hashes.len(),
802			invalid_count = invalid.len(),
803			"Removed invalid transactions"
804		);
805		log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
806
807		invalid
808	}
809
	/// Get an iterator for ready transactions ordered by priority.
	///
	/// The iterator is obtained from the base pool under a read lock; the lock guard is a
	/// temporary that is released when this method returns.
	pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
		self.pool.read().ready()
	}
814
	/// Returns a Vec of hashes and extrinsics in the future pool.
	///
	/// The extrinsic data of every future transaction is cloned into the result.
	pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
		self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
	}
819
	/// Returns pool status.
	///
	/// The status is read from the base pool under a read lock.
	pub fn status(&self) -> PoolStatus {
		self.pool.read().status()
	}
824
	/// Notify all watchers that transactions in the block with hash have been finalized.
	///
	/// Forwards `block_hash` to the event dispatcher's `finalized`. As written, this function
	/// contains no await point and always returns `Ok(())`.
	pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
		trace!(
			target: LOG_TARGET,
			?block_hash,
			"Attempting to notify watchers of finalization"
		);
		self.event_dispatcher.write().finalized(block_hash);
		Ok(())
	}
835
	/// Notify the event_dispatcher of retracted blocks.
	///
	/// Forwards `block_hash` to the event dispatcher's `retracted`.
	pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
		self.event_dispatcher.write().retracted(block_hash)
	}
840
841	/// Resends ready and future events for all the ready and future transactions that are already
842	/// in the pool.
843	///
844	/// Intended to be called after cloning the instance of `ValidatedPool`.
845	pub fn retrigger_notifications(&self) {
846		let pool = self.pool.read();
847		let mut event_dispatcher = self.event_dispatcher.write();
848		pool.ready().for_each(|r| {
849			event_dispatcher.ready(&r.hash, None);
850		});
851		pool.futures().for_each(|f| {
852			event_dispatcher.future(&f.hash);
853		});
854	}
855
856	/// Removes a transaction subtree from the pool, starting from the given transaction hash.
857	///
858	/// This function traverses the dependency graph of transactions and removes the specified
859	/// transaction along with all its descendant transactions from the pool.
860	///
861	/// The root transactions will be banned from re-entrering the pool if `ban_transactions` is
862	/// true. Descendant transactions may be re-submitted to the pool if required.
863	///
864	/// A `event_disaptcher_action` callback function is invoked for every transaction that is
865	/// removed, providing a reference to the pool's event dispatcher and the hash of the removed
866	/// transaction. This allows to trigger the required events.
867	///
868	/// Returns a vector containing the hashes of all removed transactions, including the root
869	/// transaction specified by `tx_hash`.
870	pub fn remove_subtree<F>(
871		&self,
872		hashes: &[ExtrinsicHash<B>],
873		ban_transactions: bool,
874		event_dispatcher_action: F,
875	) -> Vec<TransactionFor<B>>
876	where
877		F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
878	{
879		// temporarily ban removed transactions if requested
880		if ban_transactions {
881			self.rotator.ban(&Instant::now(), hashes.iter().cloned(), BanReason::Validation);
882		};
883		let removed = self.pool.write().remove_subtree(hashes);
884
885		removed
886			.into_iter()
887			.map(|tx| {
888				let removed_tx_hash = tx.hash;
889				let mut event_dispatcher = self.event_dispatcher.write();
890				event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
891				tx.clone()
892			})
893			.collect::<Vec<_>>()
894	}
895}
896
897fn fire_events<B, L, Ex>(
898	event_dispatcher: &mut EventDispatcher<B, L>,
899	imported: &base::Imported<ExtrinsicHash<B>, Ex>,
900) where
901	B: ChainApi,
902	L: EventHandler<B>,
903{
904	match *imported {
905		base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
906			event_dispatcher.ready(hash, None);
907			failed.iter().for_each(|f| event_dispatcher.invalid(f));
908			removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
909			promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
910		},
911		base::Imported::Future { ref hash } => event_dispatcher.future(hash),
912	}
913}