referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/graph/
validated_pool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	common::{
21		sliding_stat::SyncDurationSlidingStats, tracing_log_xt::log_xt_trace, STAT_SLIDING_WINDOW,
22	},
23	insert_and_log_throttled_sync, LOG_TARGET,
24};
25use futures::channel::mpsc::{channel, Sender};
26use indexmap::IndexMap;
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
29use sp_blockchain::HashAndNumber;
30use sp_runtime::{
31	traits::SaturatedConversion,
32	transaction_validity::{TransactionTag as Tag, ValidTransaction},
33};
34use std::{
35	collections::{HashMap, HashSet},
36	sync::Arc,
37	time::{Duration, Instant},
38};
39use tracing::{debug, trace, warn, Level};
40
41use super::{
42	base_pool::{self as base, PruneStatus},
43	listener::EventHandler,
44	pool::{
45		BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
46	},
47	rotator::PoolRotator,
48	watcher::Watcher,
49};
50
/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
///
/// `Hash` is the transaction hash type, `Ex` the extrinsic payload and `Error` the error that
/// is handed back to the submitter for the non-valid variants.
#[derive(Debug)]
pub enum ValidatedTransaction<Hash, Ex, Error> {
	/// Transaction that has been validated successfully.
	Valid(base::Transaction<Hash, Ex>),
	/// Transaction that is invalid.
	Invalid(Hash, Error),
	/// Transaction whose validity can't be determined.
	///
	/// Listeners are notified about the failure when an 'unknown' transaction is submitted.
	Unknown(Hash, Error),
}
63
64impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
65	/// Consume validity result, transaction data and produce ValidTransaction.
66	pub fn valid_at(
67		at: u64,
68		hash: Hash,
69		source: base::TimedTransactionSource,
70		data: Ex,
71		bytes: usize,
72		validity: ValidTransaction,
73	) -> Self {
74		Self::Valid(base::Transaction {
75			data,
76			bytes,
77			hash,
78			source,
79			priority: validity.priority,
80			requires: validity.requires,
81			provides: validity.provides,
82			propagate: validity.propagate,
83			valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
84		})
85	}
86
87	/// Returns priority for valid transaction, None if transaction is not valid.
88	pub fn priority(&self) -> Option<TransactionPriority> {
89		match self {
90			ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
91			_ => None,
92		}
93	}
94}
95
/// The validated transaction type stored in the validated pool for a given `ChainApi`:
/// hash, extrinsic and error types are all taken from `B`.
pub type ValidatedTransactionFor<B> =
	ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
99
/// A type alias representing the `ValidatedPool` event dispatcher for a given `ChainApi` type
/// `B` and event handler `L`.
pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>;
102
/// A shareable predicate that returns true if the local node is a validator that can author
/// blocks.
///
/// Cloning only bumps the reference count; all clones call the same underlying closure.
#[derive(Clone)]
pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);

impl From<bool> for IsValidator {
	/// Turns a fixed answer into a predicate that always returns that answer.
	fn from(is_validator: bool) -> Self {
		let constant: Box<dyn Fn() -> bool + Send + Sync> = Box::new(move || is_validator);
		Self::from(constant)
	}
}

impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
	/// Wraps a caller-supplied predicate so that it can be shared between clones.
	fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
		let shared = Arc::new(is_validator);
		Self(shared)
	}
}
118
/// Represents the result of `submit` or `submit_and_watch` operations.
///
/// `B` provides the hash type and `W` is the watcher type handed out to the submitter.
pub struct BaseSubmitOutcome<B: ChainApi, W> {
	/// The hash of the submitted transaction.
	hash: ExtrinsicHash<B>,
	/// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`.
	watcher: Option<W>,

	/// The priority of the transaction. `None` if the priority is unknown.
	priority: Option<TransactionPriority>,
}
129
/// Type alias for the outcome of a submission to `ValidatedPool`; the watcher is keyed by the
/// extrinsic hash type of `B`.
pub type ValidatedPoolSubmitOutcome<B> =
	BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
133
134impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
135	/// Creates a new instance with given hash and priority.
136	pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
137		Self { hash, priority, watcher: None }
138	}
139
140	/// Sets the transaction watcher.
141	pub fn with_watcher(mut self, watcher: W) -> Self {
142		self.watcher = Some(watcher);
143		self
144	}
145
146	/// Provides priority of submitted transaction.
147	pub fn priority(&self) -> Option<TransactionPriority> {
148		self.priority
149	}
150
151	/// Provides hash of submitted transaction.
152	pub fn hash(&self) -> ExtrinsicHash<B> {
153		self.hash
154	}
155
156	/// Provides a watcher. Should only be called on outcomes of `submit_and_watch`. Otherwise will
157	/// panic (that would mean logical error in program).
158	pub fn expect_watcher(&mut self) -> W {
159		self.watcher.take().expect("watcher was set in submit_and_watch. qed")
160	}
161}
162
/// Pool that deals with validated transactions.
///
/// Wraps the base pool and adds banning (via the rotator), limit enforcement and event
/// notifications on top of it.
pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> {
	// Chain API handle; used here to compute transaction hashes.
	api: Arc<B>,
	// Consulted on submission: non-propagable transactions are refused unless this returns true.
	is_validator: IsValidator,
	// Pool configuration (ready/future limits, ban time, future-transaction rejection).
	options: Options,
	// Dispatches transaction status events to the event handler and to watchers.
	event_dispatcher: RwLock<EventDispatcher<B, L>>,
	// The underlying pool holding ready and future transactions.
	pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
	// Senders of the streams handed out by `import_notification_stream`.
	import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
	// Keeps track of temporarily banned transaction hashes.
	rotator: PoolRotator<ExtrinsicHash<B>>,
	// Sliding-window statistics of how long `enforce_limits` takes.
	enforce_limits_stats: SyncDurationSlidingStats,
}
174
175impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> {
176	fn clone(&self) -> Self {
177		Self {
178			api: self.api.clone(),
179			is_validator: self.is_validator.clone(),
180			options: self.options.clone(),
181			event_dispatcher: Default::default(),
182			pool: RwLock::from(self.pool.read().clone()),
183			import_notification_sinks: Default::default(),
184			rotator: self.rotator.clone(),
185			enforce_limits_stats: self.enforce_limits_stats.clone(),
186		}
187	}
188}
189
190impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
191	pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
192		Self {
193			event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some(
194				event_handler,
195			))),
196			..self.clone()
197		}
198	}
199
200	/// Create a new transaction pool with statically sized rotator.
201	pub fn new_with_staticly_sized_rotator(
202		options: Options,
203		is_validator: IsValidator,
204		api: Arc<B>,
205	) -> Self {
206		let ban_time = options.ban_time;
207		Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None)
208	}
209
210	/// Create a new transaction pool.
211	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212		let ban_time = options.ban_time;
213		let total_count = options.total_count();
214		Self::new_with_rotator(
215			options,
216			is_validator,
217			api,
218			PoolRotator::new_with_expected_size(ban_time, total_count),
219			None,
220		)
221	}
222
223	/// Create a new transaction pool with given event handler.
224	pub fn new_with_event_handler(
225		options: Options,
226		is_validator: IsValidator,
227		api: Arc<B>,
228		event_handler: L,
229	) -> Self {
230		let ban_time = options.ban_time;
231		let total_count = options.total_count();
232		Self::new_with_rotator(
233			options,
234			is_validator,
235			api,
236			PoolRotator::new_with_expected_size(ban_time, total_count),
237			Some(event_handler),
238		)
239	}
240
241	fn new_with_rotator(
242		options: Options,
243		is_validator: IsValidator,
244		api: Arc<B>,
245		rotator: PoolRotator<ExtrinsicHash<B>>,
246		event_handler: Option<L>,
247	) -> Self {
248		let base_pool = base::BasePool::new(options.reject_future_transactions);
249		Self {
250			is_validator,
251			options,
252			event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)),
253			api,
254			pool: RwLock::new(base_pool),
255			import_notification_sinks: Default::default(),
256			rotator,
257			enforce_limits_stats: SyncDurationSlidingStats::new(Duration::from_secs(
258				STAT_SLIDING_WINDOW,
259			)),
260		}
261	}
262
263	/// Bans given set of hashes.
264	pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
265		self.rotator.ban(now, hashes)
266	}
267
268	/// Returns true if transaction with given hash is currently banned from the pool.
269	pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
270		self.rotator.is_banned(hash)
271	}
272
273	/// A fast check before doing any further processing of a transaction, like validation.
274	///
275	/// If `ignore_banned` is `true`, it will not check if the transaction is banned.
276	///
277	/// It checks if the transaction is already imported or banned. If so, it returns an error.
278	pub fn check_is_known(
279		&self,
280		tx_hash: &ExtrinsicHash<B>,
281		ignore_banned: bool,
282	) -> Result<(), B::Error> {
283		if !ignore_banned && self.is_banned(tx_hash) {
284			Err(error::Error::TemporarilyBanned.into())
285		} else if self.pool.read().is_imported(tx_hash) {
286			Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
287		} else {
288			Ok(())
289		}
290	}
291
292	/// Imports a bunch of pre-validated transactions to the pool.
293	pub fn submit(
294		&self,
295		txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
296	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
297		let results = txs
298			.into_iter()
299			.map(|validated_tx| self.submit_one(validated_tx))
300			.collect::<Vec<_>>();
301
302		// only enforce limits if there is at least one imported transaction
303		let removed = if results.iter().any(|res| res.is_ok()) {
304			let start = Instant::now();
305			let removed = self.enforce_limits();
306			insert_and_log_throttled_sync!(
307				Level::DEBUG,
308				target:"txpool",
309				prefix:"enforce_limits_stats",
310				self.enforce_limits_stats,
311				start.elapsed().into()
312			);
313			removed
314		} else {
315			Default::default()
316		};
317
318		results
319			.into_iter()
320			.map(|res| match res {
321				Ok(outcome) if removed.contains(&outcome.hash) => {
322					Err(error::Error::ImmediatelyDropped.into())
323				},
324				other => other,
325			})
326			.collect()
327	}
328
329	/// Submit single pre-validated transaction to the pool.
330	fn submit_one(
331		&self,
332		tx: ValidatedTransactionFor<B>,
333	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
334		match tx {
335			ValidatedTransaction::Valid(tx) => {
336				let priority = tx.priority;
337				trace!(
338					target: LOG_TARGET,
339					tx_hash = ?tx.hash,
340					"ValidatedPool::submit_one"
341				);
342				if !tx.propagate && !(self.is_validator.0)() {
343					return Err(error::Error::Unactionable.into());
344				}
345
346				let imported = self.pool.write().import(tx)?;
347
348				if let base::Imported::Ready { ref hash, .. } = imported {
349					let sinks = &mut self.import_notification_sinks.lock();
350					sinks.retain_mut(|sink| match sink.try_send(*hash) {
351						Ok(()) => true,
352						Err(e) => {
353							if e.is_full() {
354								warn!(
355									target: LOG_TARGET,
356									tx_hash = ?hash,
357									"Trying to notify an import but the channel is full"
358								);
359								true
360							} else {
361								false
362							}
363						},
364					});
365				}
366
367				let mut event_dispatcher = self.event_dispatcher.write();
368				fire_events(&mut *event_dispatcher, &imported);
369				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
370			},
371			ValidatedTransaction::Invalid(tx_hash, error) => {
372				trace!(
373					target: LOG_TARGET,
374					?tx_hash,
375					?error,
376					"ValidatedPool::submit_one invalid"
377				);
378				self.rotator.ban(&Instant::now(), std::iter::once(tx_hash));
379				Err(error)
380			},
381			ValidatedTransaction::Unknown(tx_hash, error) => {
382				trace!(
383					target: LOG_TARGET,
384					?tx_hash,
385					?error,
386					"ValidatedPool::submit_one unknown"
387				);
388				self.event_dispatcher.write().invalid(&tx_hash);
389				Err(error)
390			},
391		}
392	}
393
394	fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
395		let status = self.pool.read().status();
396		let ready_limit = &self.options.ready;
397		let future_limit = &self.options.future;
398
399		if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
400			future_limit.is_exceeded(status.future, status.future_bytes)
401		{
402			trace!(
403				target: LOG_TARGET,
404				ready_count = ready_limit.count,
405				ready_kb = ready_limit.total_bytes / 1024,
406				future_count = future_limit.count,
407				future_kb = future_limit.total_bytes / 1024,
408				"Enforcing limits"
409			);
410
411			// clean up the pool
412			let removed = {
413				let mut pool = self.pool.write();
414				let removed = pool
415					.enforce_limits(ready_limit, future_limit)
416					.into_iter()
417					.map(|x| x.hash)
418					.collect::<HashSet<_>>();
419				// ban all removed transactions
420				self.rotator.ban(&Instant::now(), removed.iter().copied());
421				removed
422			};
423			if !removed.is_empty() {
424				trace!(
425					target: LOG_TARGET,
426					dropped_count = removed.len(),
427					"Enforcing limits"
428				);
429			}
430
431			// run notifications
432			let mut event_dispatcher = self.event_dispatcher.write();
433			for h in &removed {
434				event_dispatcher.limits_enforced(h);
435			}
436
437			removed
438		} else {
439			Default::default()
440		}
441	}
442
443	/// Import a single extrinsic and starts to watch their progress in the pool.
444	pub fn submit_and_watch(
445		&self,
446		tx: ValidatedTransactionFor<B>,
447	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
448		match tx {
449			ValidatedTransaction::Valid(tx) => {
450				let hash = self.api.hash_and_length(&tx.data).0;
451				let watcher = self.create_watcher(hash);
452				self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
453					.pop()
454					.expect("One extrinsic passed; one result returned; qed")
455					.map(|outcome| outcome.with_watcher(watcher))
456			},
457			ValidatedTransaction::Invalid(hash, err) => {
458				self.rotator.ban(&Instant::now(), std::iter::once(hash));
459				Err(err)
460			},
461			ValidatedTransaction::Unknown(_, err) => Err(err),
462		}
463	}
464
465	/// Creates a new watcher for given extrinsic.
466	pub fn create_watcher(
467		&self,
468		tx_hash: ExtrinsicHash<B>,
469	) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> {
470		self.event_dispatcher.write().create_watcher(tx_hash)
471	}
472
473	/// Provides a list of hashes for all watched transactions in the pool.
474	pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> {
475		self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect()
476	}
477
	/// Resubmits revalidated transactions back to the pool.
	///
	/// Removes and then submits passed transactions and all dependent transactions.
	/// Transactions that are missing from the pool are not submitted.
	///
	/// Works in three phases: (1) remove every passed transaction together with its subtree,
	/// (2) re-import everything that was removed, using the updated validation result where
	/// one was passed, (3) notify listeners about transactions whose status changed.
	pub fn resubmit(
		&self,
		mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
	) {
		// Status of a transaction before/after the resubmission; used to decide which events
		// have to be fired at the end.
		#[derive(Debug, Clone, Copy, PartialEq)]
		enum Status {
			Future,
			Ready,
			Failed,
			Dropped,
		}

		let (mut initial_statuses, final_statuses) = {
			// the pool stays write-locked for the whole remove + re-import sequence
			let mut pool = self.pool.write();

			// remove all passed transactions from the ready/future queues
			// (this may remove additional transactions as well)
			//
			// for every transaction that has an entry in the `updated_transactions`,
			// we store updated validation result in txs_to_resubmit
			// for every transaction that has no entry in the `updated_transactions`,
			// we store last validation result (i.e. the pool entry) in txs_to_resubmit
			let mut initial_statuses = HashMap::new();
			let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
			while !updated_transactions.is_empty() {
				let hash = updated_transactions
					.keys()
					.next()
					.cloned()
					.expect("transactions is not empty; qed");

				// note we are not considering tx with hash invalid here - we just want
				// to remove it along with dependent transactions and `remove_subtree()`
				// does exactly what we need
				let removed = pool.remove_subtree(&[hash]);
				for removed_tx in removed {
					let removed_hash = removed_tx.hash;
					let updated_transaction = updated_transactions.shift_remove(&removed_hash);
					let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
						updated_tx
					} else {
						// in most cases we'll end up in successful `try_unwrap`, but if not
						// we still need to reinsert transaction back to the pool => duplicate call
						let transaction = match Arc::try_unwrap(removed_tx) {
							Ok(transaction) => transaction,
							Err(transaction) => transaction.duplicate(),
						};
						ValidatedTransaction::Valid(transaction)
					};

					// NOTE(review): every removed transaction is recorded as initially `Ready`,
					// whatever queue it was removed from - confirm this is intended.
					initial_statuses.insert(removed_hash, Status::Ready);
					txs_to_resubmit.push((removed_hash, tx_to_resubmit));
				}
				// make sure to remove the hash even if it's not present in the pool anymore.
				updated_transactions.shift_remove(&hash);
			}

			// if we're rejecting future transactions, then insertion order matters here:
			// if tx1 depends on tx2, then if tx1 is inserted before tx2, then it goes
			// to the future queue and gets rejected immediately
			// => let's temporarily stop rejection and clear the future queue before returning
			pool.with_futures_enabled(|pool, reject_future_transactions| {
				// now resubmit all removed transactions back to the pool
				let mut final_statuses = HashMap::new();
				for (tx_hash, tx_to_resubmit) in txs_to_resubmit {
					match tx_to_resubmit {
						ValidatedTransaction::Valid(tx) => match pool.import(tx) {
							Ok(imported) => match imported {
								base::Imported::Ready { promoted, failed, removed, .. } => {
									final_statuses.insert(tx_hash, Status::Ready);
									for hash in promoted {
										final_statuses.insert(hash, Status::Ready);
									}
									for hash in failed {
										final_statuses.insert(hash, Status::Failed);
									}
									for tx in removed {
										final_statuses.insert(tx.hash, Status::Dropped);
									}
								},
								base::Imported::Future { .. } => {
									final_statuses.insert(tx_hash, Status::Future);
								},
							},
							Err(error) => {
								// we do not want to fail if a single transaction import has
								// failed, nor do we want to propagate this error, because the
								// transaction could be unknown to the caller => let's just notify
								// listeners (and issue a warning)
								warn!(
									target: LOG_TARGET,
									?tx_hash,
									%error,
									"Removing invalid transaction from update"
								);
								final_statuses.insert(tx_hash, Status::Failed);
							},
						},
						// transactions that did not revalidate successfully are not re-imported
						ValidatedTransaction::Invalid(_, _) |
						ValidatedTransaction::Unknown(_, _) => {
							final_statuses.insert(tx_hash, Status::Failed);
						},
					}
				}

				// if the pool is configured to reject future transactions, let's clear the future
				// queue, updating final statuses as required
				if reject_future_transactions {
					for future_tx in pool.clear_future() {
						final_statuses.insert(future_tx.hash, Status::Dropped);
					}
				}

				(initial_statuses, final_statuses)
			})
		};

		// and now let's notify listeners about status changes: an event is fired only when
		// the final status differs from the initial one (or there was no initial status)
		let mut event_dispatcher = self.event_dispatcher.write();
		for (hash, final_status) in final_statuses {
			let initial_status = initial_statuses.remove(&hash);
			if initial_status.is_none() || Some(final_status) != initial_status {
				match final_status {
					Status::Future => event_dispatcher.future(&hash),
					Status::Ready => event_dispatcher.ready(&hash, None),
					Status::Dropped => event_dispatcher.dropped(&hash),
					Status::Failed => event_dispatcher.invalid(&hash),
				}
			}
		}
	}
613
614	/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
615	pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
616		self.pool
617			.read()
618			.by_hashes(hashes)
619			.into_iter()
620			.map(|existing_in_pool| {
621				existing_in_pool.map(|transaction| transaction.provides.to_vec())
622			})
623			.collect()
624	}
625
626	/// Get ready transaction by hash
627	pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
628		self.pool.read().ready_by_hash(hash)
629	}
630
631	/// Prunes ready transactions that provide given list of tags.
632	pub fn prune_tags(
633		&self,
634		tags: impl IntoIterator<Item = Tag>,
635	) -> PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>> {
636		// Perform tag-based pruning in the base pool
637		let status = self.pool.write().prune_tags(tags);
638		// Notify event listeners of all transactions
639		// that were promoted to `Ready` or were dropped.
640		{
641			let mut event_dispatcher = self.event_dispatcher.write();
642			for promoted in &status.promoted {
643				fire_events(&mut *event_dispatcher, promoted);
644			}
645			for f in &status.failed {
646				event_dispatcher.dropped(f);
647			}
648		}
649
650		status
651	}
652
653	/// Resubmit transactions that have been revalidated after prune_tags call.
654	pub fn resubmit_pruned(
655		&self,
656		at: &HashAndNumber<B::Block>,
657		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
658		pruned_hashes: Vec<ExtrinsicHash<B>>,
659		pruned_xts: Vec<ValidatedTransactionFor<B>>,
660	) {
661		debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
662
663		// Resubmit pruned transactions
664		let results = self.submit(pruned_xts);
665
666		// Collect the hashes of transactions that now became invalid (meaning that they are
667		// successfully pruned).
668		let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
669			match r.map_err(error::IntoPoolError::into_pool_error) {
670				Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
671				_ => None,
672			}
673		});
674		// Fire `pruned` notifications for collected hashes and make sure to include
675		// `known_imported_hashes` since they were just imported as part of the block.
676		let hashes = hashes.chain(known_imported_hashes.into_iter());
677		self.fire_pruned(at, hashes);
678
679		// perform regular cleanup of old transactions in the pool
680		// and update temporary bans.
681		self.clear_stale(at);
682	}
683
684	/// Fire notifications for pruned transactions.
685	pub fn fire_pruned(
686		&self,
687		at: &HashAndNumber<B::Block>,
688		hashes: impl Iterator<Item = ExtrinsicHash<B>>,
689	) {
690		let mut event_dispatcher = self.event_dispatcher.write();
691		let mut set = HashSet::with_capacity(hashes.size_hint().0);
692		for h in hashes {
693			// `hashes` has possibly duplicate hashes.
694			// we'd like to send out the `InBlock` notification only once.
695			if !set.contains(&h) {
696				event_dispatcher.pruned(at.hash, &h);
697				set.insert(h);
698			}
699		}
700	}
701
702	/// Removes stale transactions from the pool.
703	///
704	/// Stale transactions are transaction beyond their longevity period.
705	/// Note this function does not remove transactions that are already included in the chain.
706	/// See `prune_tags` if you want this.
707	pub fn clear_stale(&self, at: &HashAndNumber<B::Block>) {
708		let HashAndNumber { number, .. } = *at;
709		let number = number.saturated_into::<u64>();
710		let now = Instant::now();
711		let to_remove = {
712			self.ready()
713				.filter(|tx| self.rotator.ban_if_stale(&now, number, tx))
714				.map(|tx| tx.hash)
715				.collect::<Vec<_>>()
716		};
717		let futures_to_remove: Vec<ExtrinsicHash<B>> = {
718			let p = self.pool.read();
719			let mut hashes = Vec::new();
720			for tx in p.futures() {
721				if self.rotator.ban_if_stale(&now, number, tx) {
722					hashes.push(tx.hash);
723				}
724			}
725			hashes
726		};
727		debug!(
728			target:LOG_TARGET,
729			to_remove_len=to_remove.len(),
730			futures_to_remove_len=futures_to_remove.len(),
731			"clear_stale"
732		);
733		// removing old transactions
734		self.remove_invalid(&to_remove);
735		self.remove_invalid(&futures_to_remove);
736		// clear banned transactions timeouts
737		self.rotator.clear_timeouts(&now);
738	}
739
740	/// Get api reference.
741	pub fn api(&self) -> &B {
742		&self.api
743	}
744
745	/// Return an event stream of notifications for when transactions are imported to the pool.
746	///
747	/// Consumers of this stream should use the `ready` method to actually get the
748	/// pending transactions in the right order.
749	pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
750		const CHANNEL_BUFFER_SIZE: usize = 1024;
751
752		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
753		self.import_notification_sinks.lock().push(sink);
754		stream
755	}
756
757	/// Invoked when extrinsics are broadcasted.
758	pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
759		let mut event_dispatcher = self.event_dispatcher.write();
760		for (hash, peers) in propagated.into_iter() {
761			event_dispatcher.broadcasted(&hash, peers);
762		}
763	}
764
765	/// Remove a subtree of transactions from the pool and mark them invalid.
766	///
767	/// The transactions passed as an argument will be additionally banned
768	/// to prevent them from entering the pool right away.
769	/// Note this is not the case for the dependent transactions - those may
770	/// still be valid so we want to be able to re-import them.
771	///
772	/// For every removed transaction an Invalid event is triggered.
773	///
774	/// Returns the list of actually removed transactions, which may include transactions dependent
775	/// on provided set.
776	pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
777		// early exit in case there is no invalid transactions.
778		if hashes.is_empty() {
779			return vec![];
780		}
781
782		let invalid = self.remove_subtree(hashes, true, |listener, removed_tx_hash| {
783			listener.invalid(&removed_tx_hash);
784		});
785
786		trace!(
787			target: LOG_TARGET,
788			removed_count = hashes.len(),
789			invalid_count = invalid.len(),
790			"Removed invalid transactions"
791		);
792		log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
793
794		invalid
795	}
796
797	/// Get an iterator for ready transactions ordered by priority
798	pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
799		self.pool.read().ready()
800	}
801
802	/// Returns a Vec of hashes and extrinsics in the future pool.
803	pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
804		self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
805	}
806
807	/// Returns pool status.
808	pub fn status(&self) -> PoolStatus {
809		self.pool.read().status()
810	}
811
812	/// Notify all watchers that transactions in the block with hash have been finalized
813	pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
814		trace!(
815			target: LOG_TARGET,
816			?block_hash,
817			"Attempting to notify watchers of finalization"
818		);
819		self.event_dispatcher.write().finalized(block_hash);
820		Ok(())
821	}
822
823	/// Notify the event_dispatcher of retracted blocks
824	pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
825		self.event_dispatcher.write().retracted(block_hash)
826	}
827
828	/// Resends ready and future events for all the ready and future transactions that are already
829	/// in the pool.
830	///
831	/// Intended to be called after cloning the instance of `ValidatedPool`.
832	pub fn retrigger_notifications(&self) {
833		let pool = self.pool.read();
834		let mut event_dispatcher = self.event_dispatcher.write();
835		pool.ready().for_each(|r| {
836			event_dispatcher.ready(&r.hash, None);
837		});
838		pool.futures().for_each(|f| {
839			event_dispatcher.future(&f.hash);
840		});
841	}
842
843	/// Removes a transaction subtree from the pool, starting from the given transaction hash.
844	///
845	/// This function traverses the dependency graph of transactions and removes the specified
846	/// transaction along with all its descendant transactions from the pool.
847	///
848	/// The root transactions will be banned from re-entrering the pool if `ban_transactions` is
849	/// true. Descendant transactions may be re-submitted to the pool if required.
850	///
851	/// A `event_disaptcher_action` callback function is invoked for every transaction that is
852	/// removed, providing a reference to the pool's event dispatcher and the hash of the removed
853	/// transaction. This allows to trigger the required events.
854	///
855	/// Returns a vector containing the hashes of all removed transactions, including the root
856	/// transaction specified by `tx_hash`.
857	pub fn remove_subtree<F>(
858		&self,
859		hashes: &[ExtrinsicHash<B>],
860		ban_transactions: bool,
861		event_dispatcher_action: F,
862	) -> Vec<TransactionFor<B>>
863	where
864		F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
865	{
866		// temporarily ban removed transactions if requested
867		if ban_transactions {
868			self.rotator.ban(&Instant::now(), hashes.iter().cloned());
869		};
870		let removed = self.pool.write().remove_subtree(hashes);
871
872		removed
873			.into_iter()
874			.map(|tx| {
875				let removed_tx_hash = tx.hash;
876				let mut event_dispatcher = self.event_dispatcher.write();
877				event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
878				tx.clone()
879			})
880			.collect::<Vec<_>>()
881	}
882}
883
884fn fire_events<B, L, Ex>(
885	event_dispatcher: &mut EventDispatcher<B, L>,
886	imported: &base::Imported<ExtrinsicHash<B>, Ex>,
887) where
888	B: ChainApi,
889	L: EventHandler<B>,
890{
891	match *imported {
892		base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
893			event_dispatcher.ready(hash, None);
894			failed.iter().for_each(|f| event_dispatcher.invalid(f));
895			removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
896			promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
897		},
898		base::Imported::Future { ref hash } => event_dispatcher.future(hash),
899	}
900}