referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
view_store.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction pool view store. Basically block hash to view map with some utility methods.
20
21use super::{
22	import_notification_sink::MultiViewImportNotificationSink,
23	multi_view_listener::{MultiViewListener, TxStatusStream},
24	view::{View, ViewPoolObserver},
25};
26use crate::{
27	fork_aware_txpool::dropped_watcher::MultiViewDroppedWatcherController,
28	graph::{
29		self,
30		base_pool::{TimedTransactionSource, Transaction},
31		BaseSubmitOutcome, BlockHash, ExtrinsicFor, ExtrinsicHash, TransactionFor,
32		ValidatedPoolSubmitOutcome,
33	},
34	ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
35};
36use itertools::Itertools;
37use parking_lot::RwLock;
38use sc_transaction_pool_api::{
39	error::Error as PoolError, PoolStatus, TransactionTag as Tag, TxInvalidityReportMap,
40};
41use sp_blockchain::{HashAndNumber, TreeRoute};
42use sp_runtime::{
43	generic::BlockId,
44	traits::{Block as BlockT, Header, One, Saturating},
45	transaction_validity::{InvalidTransaction, TransactionValidityError},
46};
47use std::{
48	collections::{hash_map::Entry, HashMap, HashSet},
49	sync::Arc,
50	time::Instant,
51};
52use tracing::{debug, instrument, trace, warn, Level};
53
54/// Helper struct to maintain the context for pending transaction submission, executed for
55/// newly inserted views.
56#[derive(Clone)]
57struct PendingTxSubmission<ChainApi>
58where
59	ChainApi: graph::ChainApi,
60{
61	/// New transaction replacing the old one.
62	xt: ExtrinsicFor<ChainApi>,
63	/// Source of the transaction.
64	source: TimedTransactionSource,
65}
66
67/// Helper type representing the callback allowing to trigger per-transaction events on
68/// `ValidatedPool`'s listener.
69type RemovalCallback<ChainApi> = Arc<
70	dyn Fn(
71			&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
72			ExtrinsicHash<ChainApi>,
73		) + Send
74		+ Sync,
75>;
76
77/// Helper struct to maintain the context for pending transaction removal, executed for
78/// newly inserted views.
79struct PendingTxRemoval<ChainApi>
80where
81	ChainApi: graph::ChainApi,
82{
83	/// Hash of the transaction that will be removed,
84	xt_hash: ExtrinsicHash<ChainApi>,
85	/// Action that shall be executed on underlying `ValidatedPool`'s listener.
86	listener_action: RemovalCallback<ChainApi>,
87}
88
89/// This enum represents an action that should be executed on the newly built
90/// view before this view is inserted into the view store.
91enum PreInsertAction<ChainApi>
92where
93	ChainApi: graph::ChainApi,
94{
95	/// Represents the action of submitting a new transaction. Intended to use to handle usurped
96	/// transactions.
97	SubmitTx(PendingTxSubmission<ChainApi>),
98
99	/// Represents the action of removing a subtree of transactions.
100	RemoveSubtree(PendingTxRemoval<ChainApi>),
101}
102
103/// Represents a task awaiting execution, to be performed immediately prior to the view insertion
104/// into the view store.
105struct PendingPreInsertTask<ChainApi>
106where
107	ChainApi: graph::ChainApi,
108{
109	/// The action to be applied when inserting a new view.
110	action: PreInsertAction<ChainApi>,
111	/// Indicates if the action was already applied to all the views in the view_store.
112	/// If true, it can be removed after inserting any new view.
113	processed: bool,
114}
115
116impl<ChainApi> PendingPreInsertTask<ChainApi>
117where
118	ChainApi: graph::ChainApi,
119{
120	/// Creates new unprocessed instance of pending transaction submission.
121	fn new_submission_action(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource) -> Self {
122		Self {
123			processed: false,
124			action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source }),
125		}
126	}
127
128	/// Creates new unprocessed instance of pending transaction removal.
129	fn new_removal_action(
130		xt_hash: ExtrinsicHash<ChainApi>,
131		listener: RemovalCallback<ChainApi>,
132	) -> Self {
133		Self {
134			processed: false,
135			action: PreInsertAction::RemoveSubtree(PendingTxRemoval {
136				xt_hash,
137				listener_action: listener,
138			}),
139		}
140	}
141
142	/// Marks a task as done for every view present in view store. Basically means that can be
143	/// removed on new view insertion.
144	fn mark_processed(&mut self) {
145		self.processed = true;
146	}
147}
148
149/// The helper structure encapsulates all the views.
150pub(super) struct ViewStore<ChainApi, Block>
151where
152	Block: BlockT,
153	ChainApi: graph::ChainApi<Block = Block>,
154{
155	/// The blockchain api.
156	pub(super) api: Arc<ChainApi>,
157	/// Active views at tips of the forks.
158	///
159	/// Active views are updated with incoming transactions.
160	pub(super) active_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
161	/// Inactive views at intermediary blocks that are no longer tips of the forks.
162	///
163	/// Inactive views are not updated with incoming transactions, while they can still be used to
164	/// build new blocks upon them.
165	pub(super) inactive_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
166	/// Listener for controlling external watchers of transactions.
167	///
168	/// Provides a side-channel allowing to send per-transaction state changes notification.
169	pub(super) listener: Arc<MultiViewListener<ChainApi>>,
170	/// Most recent view processed by tx-pool. Used in the API functions that were not changed to
171	/// add `at` parameter.
172	pub(super) most_recent_view: RwLock<Option<Arc<View<ChainApi>>>>,
173	/// The controller of multi view dropped stream.
174	pub(super) dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
175	/// Util providing an aggregated stream of transactions that were imported to ready queue in
176	/// any view. Reference kept here for clean up purposes.
177	pub(super) import_notification_sink:
178		MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
179	/// The map used to synchronize replacement of transactions between maintain and dropped
180	/// notifcication threads. It is meant to assure that replaced transaction is also removed from
181	/// newly built views in maintain process.
182	///
183	/// The map's key is hash of actionable extrinsic (to avoid duplicated entries).
184	pending_txs_tasks: RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingPreInsertTask<ChainApi>>>,
185}
186
187/// Type alias to outcome of submission to `ViewStore`.
188pub(super) type ViewStoreSubmitOutcome<ChainApi> =
189	BaseSubmitOutcome<ChainApi, TxStatusStream<ChainApi>>;
190
191impl<ChainApi: graph::ChainApi> From<ValidatedPoolSubmitOutcome<ChainApi>>
192	for ViewStoreSubmitOutcome<ChainApi>
193{
194	fn from(value: ValidatedPoolSubmitOutcome<ChainApi>) -> Self {
195		Self::new(value.hash(), value.priority())
196	}
197}
198
199impl<ChainApi, Block> ViewStore<ChainApi, Block>
200where
201	Block: BlockT,
202	ChainApi: graph::ChainApi<Block = Block> + 'static,
203	<Block as BlockT>::Hash: Unpin,
204{
205	/// Creates a new empty view store.
206	pub(super) fn new(
207		api: Arc<ChainApi>,
208		listener: Arc<MultiViewListener<ChainApi>>,
209		dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
210		import_notification_sink: MultiViewImportNotificationSink<
211			Block::Hash,
212			ExtrinsicHash<ChainApi>,
213		>,
214	) -> Self {
215		Self {
216			api,
217			active_views: Default::default(),
218			inactive_views: Default::default(),
219			listener,
220			most_recent_view: RwLock::from(None),
221			dropped_stream_controller,
222			import_notification_sink,
223			pending_txs_tasks: Default::default(),
224		}
225	}
226
227	/// Imports a bunch of unverified extrinsics to every active view.
228	pub(super) async fn submit(
229		&self,
230		xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)> + Clone,
231	) -> HashMap<Block::Hash, Vec<Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error>>> {
232		let submit_futures = {
233			let active_views = self.active_views.read();
234			active_views
235				.values()
236				.map(|view| {
237					let view = view.clone();
238					let xts = xts.clone();
239					async move {
240						(
241							view.at.hash,
242							view.submit_many(xts, ValidateTransactionPriority::Submitted)
243								.await
244								.into_iter()
245								.map(|r| r.map(Into::into))
246								.collect::<Vec<_>>(),
247						)
248					}
249				})
250				.collect::<Vec<_>>()
251		};
252		let results = futures::future::join_all(submit_futures).await;
253
254		HashMap::<_, _>::from_iter(results.into_iter())
255	}
256
257	/// Synchronously imports single unverified extrinsics into every active view.
258	pub(super) fn submit_local(
259		&self,
260		xt: ExtrinsicFor<ChainApi>,
261	) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
262		let active_views = self.active_views.read().values().cloned().collect::<Vec<_>>();
263
264		let tx_hash = self.api.hash_and_length(&xt).0;
265
266		let result = active_views
267			.iter()
268			.map(|view| view.submit_local(xt.clone()))
269			.find_or_first(Result::is_ok);
270
271		match result {
272			Some(Err(error)) => {
273				trace!(
274					target: LOG_TARGET,
275					?tx_hash,
276					%error,
277					"submit_local failed"
278				);
279				Err(error)
280			},
281			None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
282			Some(Ok(r)) => Ok(r.into()),
283		}
284	}
285
286	/// Import a single extrinsic and starts to watch its progress in the pool.
287	///
288	/// The extrinsic is imported to every view, and the individual streams providing the progress
289	/// of this transaction within every view are added to the multi view listener.
290	///
291	/// The external stream of aggregated/processed events provided by the `MultiViewListener`
292	/// instance is returned.
293	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::sumbit_and_watch")]
294	pub(super) async fn submit_and_watch(
295		&self,
296		_at: Block::Hash,
297		source: TimedTransactionSource,
298		xt: ExtrinsicFor<ChainApi>,
299	) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
300		let tx_hash = self.api.hash_and_length(&xt).0;
301		let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
302			return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
303		};
304		let submit_futures = {
305			let active_views = self.active_views.read();
306			active_views
307				.values()
308				.map(|view| {
309					let view = view.clone();
310					let xt = xt.clone();
311					let source = source.clone();
312					async move {
313						view.submit_one(source, xt, ValidateTransactionPriority::Submitted).await
314					}
315				})
316				.collect::<Vec<_>>()
317		};
318		let result = futures::future::join_all(submit_futures)
319			.await
320			.into_iter()
321			.find_or_first(Result::is_ok);
322
323		match result {
324			Some(Err(error)) => {
325				trace!(
326					target: LOG_TARGET,
327					?tx_hash,
328					%error,
329					"submit_and_watch failed"
330				);
331				return Err(error);
332			},
333			Some(Ok(result)) =>
334				Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)),
335			None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
336		}
337	}
338
339	/// Returns the pool status for every active view.
340	pub(super) fn status(&self) -> HashMap<Block::Hash, PoolStatus> {
341		self.active_views.read().iter().map(|(h, v)| (*h, v.status())).collect()
342	}
343
344	/// Returns true if there are no active views.
345	pub(super) fn is_empty(&self) -> bool {
346		self.active_views.read().is_empty() && self.inactive_views.read().is_empty()
347	}
348
349	/// Searches in the view store for the first descendant view by iterating through the fork of
350	/// the `at` block, up to the provided `block_number`.
351	///
352	/// Returns with a maybe pair of a view and a set of enacted blocks when the first view is
353	/// found.
354	pub(super) fn find_view_descendent_up_to_number(
355		&self,
356		at: &HashAndNumber<Block>,
357		up_to: <<Block as BlockT>::Header as Header>::Number,
358	) -> Option<(Arc<View<ChainApi>>, Vec<Block::Hash>)> {
359		let mut enacted_blocks = Vec::new();
360		let mut at_hash = at.hash;
361		let mut at_number = at.number;
362
363		// Search for a view that can be used to get and return an approximate ready
364		// transaction set.
365		while at_number >= up_to {
366			// Found a view, stop searching.
367			if let Some((view, _)) = self.get_view_at(at_hash, true) {
368				return Some((view, enacted_blocks));
369			}
370
371			enacted_blocks.push(at_hash);
372
373			// Move up into the fork.
374			let header = self.api.block_header(at_hash).ok().flatten()?;
375			at_hash = *header.parent_hash();
376			at_number = at_number.saturating_sub(One::one());
377		}
378
379		None
380	}
381
382	/// Finds the best existing active view to clone from along the path.
383	///
384	/// ```text
385	/// Tree route from R1 to E2.
386	///   <- R3 <- R2 <- R1
387	///  /
388	/// C
389	///  \-> E1 -> E2
390	/// ```
391	/// ```text
392	/// Search path is:
393	/// [E1, C, R3, R2, R1]
394	/// ```
395	pub(super) fn find_best_view(
396		&self,
397		tree_route: &TreeRoute<Block>,
398	) -> Option<Arc<View<ChainApi>>> {
399		let active_views = self.active_views.read();
400		let best_view = {
401			tree_route
402				.retracted()
403				.iter()
404				.chain(std::iter::once(tree_route.common_block()))
405				.chain(tree_route.enacted().iter())
406				.rev()
407				.find(|block| active_views.contains_key(&block.hash))
408		};
409		best_view.map(|h| {
410			active_views
411				.get(&h.hash)
412				.expect("hash was just found in the map's keys. qed")
413				.clone()
414		})
415	}
416
417	/// Returns an iterator for ready transactions for the most recently notified best block.
418	///
419	/// The iterator for future transactions is returned if the most recently notified best block,
420	/// for which maintain process was accomplished, exists.
421	pub(super) fn ready(&self) -> ReadyIteratorFor<ChainApi> {
422		let ready_iterator =
423			self.most_recent_view.read().as_ref().map(|v| v.pool.validated_pool().ready());
424
425		if let Some(ready_iterator) = ready_iterator {
426			return Box::new(ready_iterator)
427		} else {
428			return Box::new(std::iter::empty())
429		}
430	}
431
432	/// Returns a list of future transactions for the most recently notified best block.
433	///
434	/// The set of future transactions is returned if the most recently notified best block, for
435	/// which maintain process was accomplished, exists.
436	pub(super) fn futures(
437		&self,
438	) -> Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>> {
439		self.most_recent_view
440			.read()
441			.as_ref()
442			.and_then(|view| self.futures_at(view.at.hash))
443			.unwrap_or_default()
444	}
445
446	/// Returns a list of future transactions in the view at given block hash.
447	pub(super) fn futures_at(
448		&self,
449		at: Block::Hash,
450	) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
451		self.get_view_at(at, true)
452			.map(|(v, _)| v.pool.validated_pool().pool.read().futures().cloned().collect())
453	}
454
455	/// Collects all the transactions included in the blocks on the provided `tree_route` and
456	/// triggers finalization event for them.
457	///
458	/// The finalization event is sent using side-channel of the multi view `listener`.
459	///
460	/// Returns the list of finalized transactions hashes.
461	pub(super) async fn finalize_route(
462		&self,
463		finalized_hash: Block::Hash,
464		tree_route: &[Block::Hash],
465	) -> Vec<ExtrinsicHash<ChainApi>> {
466		debug!(
467			target: LOG_TARGET,
468			?finalized_hash,
469			?tree_route,
470			"finalize_route"
471		);
472		let mut finalized_transactions = Vec::new();
473
474		for block in tree_route.iter().chain(std::iter::once(&finalized_hash)) {
475			let extrinsics = self
476				.api
477				.block_body(*block)
478				.await
479				.unwrap_or_else(|error| {
480					warn!(
481						target: LOG_TARGET,
482						%error,
483						"Finalize route: error request"
484					);
485					None
486				})
487				.unwrap_or_default()
488				.iter()
489				.map(|e| self.api.hash_and_length(&e).0)
490				.collect::<Vec<_>>();
491
492			extrinsics
493				.iter()
494				.enumerate()
495				.for_each(|(i, tx_hash)| self.listener.transaction_finalized(*tx_hash, *block, i));
496
497			finalized_transactions.extend(extrinsics);
498		}
499
500		debug!(
501			target: LOG_TARGET,
502			"finalize_route: done"
503		);
504		finalized_transactions
505	}
506
507	/// Return specific ready transaction by hash, if there is one.
508	///
509	/// Currently the ready transaction is returned if it exists for the most recently notified best
510	/// block (for which maintain process was accomplished).
511	pub(super) fn ready_transaction(
512		&self,
513		at: Block::Hash,
514		tx_hash: &ExtrinsicHash<ChainApi>,
515	) -> Option<TransactionFor<ChainApi>> {
516		self.active_views
517			.read()
518			.get(&at)
519			.and_then(|v| v.pool.validated_pool().ready_by_hash(tx_hash))
520	}
521
522	/// Inserts new view into the view store.
523	///
524	/// Refer to [`Self::insert_new_view_sync`] more details.
525	/// If there are any pending tx replacments, they are applied to the new view.
526	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::insert_new_view")]
527	pub(super) async fn insert_new_view(
528		&self,
529		view: Arc<View<ChainApi>>,
530		tree_route: &TreeRoute<Block>,
531	) {
532		self.apply_pending_tx_replacements(view.clone()).await;
533
534		let start = Instant::now();
535		self.insert_new_view_sync(view, tree_route);
536
537		debug!(
538			target: LOG_TARGET,
539			inactive_views = ?self.inactive_views.read().keys(),
540			duration = ?start.elapsed(),
541			"insert_new_view"
542		);
543	}
544
545	/// Inserts new view into the view store.
546	///
547	/// All the views associated with the blocks which are on enacted path (including common
548	/// ancestor) will be:
549	/// - moved to the inactive views set (`inactive_views`),
550	/// - removed from the multi view listeners.
551	///
552	/// The `most_recent_view` is updated with the reference to the newly inserted view.
553	pub(super) fn insert_new_view_sync(
554		&self,
555		view: Arc<View<ChainApi>>,
556		tree_route: &TreeRoute<Block>,
557	) {
558		//note: most_recent_view must be synced with changes in in/active_views.
559		{
560			let mut most_recent_view_lock = self.most_recent_view.write();
561			let mut active_views = self.active_views.write();
562			let mut inactive_views = self.inactive_views.write();
563
564			std::iter::once(tree_route.common_block())
565				.chain(tree_route.enacted().iter())
566				.map(|block| block.hash)
567				.for_each(|hash| {
568					active_views.remove(&hash).map(|view| {
569						inactive_views.insert(hash, view);
570					});
571				});
572			active_views.insert(view.at.hash, view.clone());
573			most_recent_view_lock.replace(view.clone());
574		};
575	}
576
577	/// Returns an optional reference to the view at given hash.
578	///
579	/// If `allow_retracted` flag is set, inactive views are also searched.
580	///
581	/// If the view at provided hash does not exist `None` is returned.
582	pub(super) fn get_view_at(
583		&self,
584		at: Block::Hash,
585		allow_inactive: bool,
586	) -> Option<(Arc<View<ChainApi>>, bool)> {
587		if let Some(view) = self.active_views.read().get(&at) {
588			return Some((view.clone(), false));
589		}
590		if allow_inactive {
591			if let Some(view) = self.inactive_views.read().get(&at) {
592				return Some((view.clone(), true))
593			}
594		};
595		None
596	}
597
598	/// The finalization event handle for the view store.
599	///
600	/// Views that have associated block number less than finalized block number are removed from
601	/// both active and inactive set.
602	///
603	/// Note: the views with the associated number greater than finalized block number on the forks
604	/// that are not finalized will stay in the view store. They will be removed in the future, once
605	/// new finalized blocks will be notified. This is to avoid scanning for common ancestors.
606	///
607	/// All watched transactions in the blocks from the tree_route will be notified with `Finalized`
608	/// event.
609	///
610	/// Returns the list of hashes of all finalized transactions along the provided `tree_route`.
611	pub(crate) async fn handle_finalized(
612		&self,
613		finalized_hash: Block::Hash,
614		tree_route: &[Block::Hash],
615	) -> Vec<ExtrinsicHash<ChainApi>> {
616		let finalized_xts = self.finalize_route(finalized_hash, tree_route).await;
617		let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
618
619		let mut dropped_views = vec![];
620		//clean up older then finalized
621		{
622			let mut active_views = self.active_views.write();
623			let mut inactive_views = self.inactive_views.write();
624			active_views.retain(|hash, v| {
625				let retain = match finalized_number {
626					Err(_) | Ok(None) => *hash == finalized_hash,
627					Ok(Some(n)) if v.at.number == n => *hash == finalized_hash,
628					Ok(Some(n)) => v.at.number > n,
629				};
630				if !retain {
631					dropped_views.push(*hash);
632				}
633				retain
634			});
635
636			inactive_views.retain(|hash, v| {
637				let retain = match finalized_number {
638					Err(_) | Ok(None) => false,
639					Ok(Some(n)) => v.at.number >= n,
640				};
641				if !retain {
642					dropped_views.push(*hash);
643				}
644				retain
645			});
646
647			debug!(
648				target: LOG_TARGET,
649				inactive_views = ?inactive_views.keys(),
650				?dropped_views,
651				"handle_finalized"
652			);
653		}
654
655		self.listener.remove_stale_controllers();
656		self.dropped_stream_controller.remove_transactions(finalized_xts.clone());
657
658		self.listener.remove_view(finalized_hash);
659		for view in dropped_views {
660			self.listener.remove_view(view);
661			self.dropped_stream_controller.remove_view(view);
662		}
663
664		finalized_xts
665	}
666
667	/// Terminates all the ongoing background views revalidations triggered at the end of maintain
668	/// process.
669	///
670	/// Refer to [*View revalidation*](../index.html#view-revalidation) for more details.
671	pub(crate) async fn finish_background_revalidations(&self) {
672		let start = Instant::now();
673		let finish_revalidation_futures = {
674			let active_views = self.active_views.read();
675			active_views
676				.values()
677				.map(|view| {
678					let view = view.clone();
679					async move { view.finish_revalidation().await }
680				})
681				.collect::<Vec<_>>()
682		};
683		debug!(
684			target: LOG_TARGET,
685			duration = ?start.elapsed(),
686			"finish_background_revalidations before"
687		);
688		futures::future::join_all(finish_revalidation_futures).await;
689		debug!(
690			target: LOG_TARGET,
691			duration = ?start.elapsed(),
692			"finish_background_revalidations after"
693		);
694	}
695
696	/// Reports invalid transactions to the view store.
697	///
698	/// This function accepts an array of tuples, each containing a transaction hash and an
699	/// optional error encountered during the transaction execution at a specific (also optional)
700	/// block.
701	///
702	/// Removal operation applies to provided transactions. Their descendants can be removed from
703	/// the view, but will not be invalidated or banned.
704	///
705	/// Invalid future and stale transaction will be removed only from given `at` view, and will be
706	/// kept in the view_store. Such transaction will not be reported in returned vector. They
707	/// also will not be banned from re-entering the pool. No event will be triggered.
708	///
709	/// For other errors, the transaction will be removed from the view_store, and it will be
710	/// included in the returned vector. Additionally, transactions provided as input will be banned
711	/// from re-entering the pool.
712	///
713	/// If the tuple's error is None, the transaction will be forcibly removed from the view_store,
714	/// banned and included into the returned vector.
715	///
716	/// For every transaction removed from the view_store (excluding descendants) an Invalid event
717	/// is triggered.
718	///
719	/// Returns the list of actually removed transactions from the mempool, which were included in
720	/// the provided input list.
721	pub(crate) fn report_invalid(
722		&self,
723		at: Option<Block::Hash>,
724		invalid_tx_errors: TxInvalidityReportMap<ExtrinsicHash<ChainApi>>,
725	) -> Vec<TransactionFor<ChainApi>> {
726		let mut remove_from_view = vec![];
727		let mut remove_from_pool = vec![];
728
729		invalid_tx_errors.into_iter().for_each(|(hash, e)| match e {
730			Some(TransactionValidityError::Invalid(
731				InvalidTransaction::Future | InvalidTransaction::Stale,
732			)) => {
733				remove_from_view.push(hash);
734			},
735			_ => {
736				remove_from_pool.push(hash);
737			},
738		});
739
740		// transaction removed from view, won't be included into the final result, as they may still
741		// be in the pool.
742		at.map(|at| {
743			self.get_view_at(at, true)
744				.map(|(view, _)| view.remove_subtree(&remove_from_view, false, |_, _| {}))
745		});
746
747		let mut removed = vec![];
748		for tx_hash in &remove_from_pool {
749			let removed_from_pool = self.remove_transaction_subtree(*tx_hash, |_, _| {});
750			removed_from_pool
751				.iter()
752				.find(|tx| tx.hash == *tx_hash)
753				.map(|tx| removed.push(tx.clone()));
754		}
755
756		self.listener.transactions_invalidated(&remove_from_pool);
757
758		removed
759	}
760
761	/// Replaces an existing transaction in the view_store with a new one.
762	///
763	/// Attempts to replace a transaction identified by `replaced` with a new transaction `xt`.
764	///
765	/// Before submitting a transaction to the views, the new *unprocessed* transaction replacement
766	/// record will be inserted into a pending replacement map. Once the submission to all the views
767	/// is accomplished, the record is marked as *processed*.
768	///
769	/// This map is later applied in `insert_new_view` method executed from different thread.
770	///
771	/// If the transaction is already being replaced, it will simply return without making
772	/// changes.
773	pub(super) async fn replace_transaction(
774		&self,
775		source: TimedTransactionSource,
776		xt: ExtrinsicFor<ChainApi>,
777		replaced: ExtrinsicHash<ChainApi>,
778	) {
779		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
780			entry.insert(PendingPreInsertTask::new_submission_action(xt.clone(), source.clone()));
781		} else {
782			return
783		};
784
785		let tx_hash = self.api.hash_and_length(&xt).0;
786		trace!(
787			target: LOG_TARGET,
788			?replaced,
789			?tx_hash,
790			"replace_transaction"
791		);
792		self.replace_transaction_in_views(source, xt, tx_hash, replaced).await;
793
794		if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
795			replacement.mark_processed();
796		}
797	}
798
799	/// Applies pending transaction replacements to the specified view.
800	///
801	/// After application, all already processed replacements are removed.
802	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::apply_pending_tx_replacements")]
803	async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
804		let start = Instant::now();
805		let mut futures = vec![];
806		let mut replace_count = 0;
807		let mut remove_count = 0;
808
809		for replacement in self.pending_txs_tasks.read().values() {
810			match replacement.action {
811				PreInsertAction::SubmitTx(ref submission) => {
812					replace_count += 1;
813					let xt_hash = self.api.hash_and_length(&submission.xt).0;
814					futures.push(self.replace_transaction_in_view(
815						view.clone(),
816						submission.source.clone(),
817						submission.xt.clone(),
818						xt_hash,
819					));
820				},
821				PreInsertAction::RemoveSubtree(ref removal) => {
822					remove_count += 1;
823					view.remove_subtree(&[removal.xt_hash], true, &*removal.listener_action);
824				},
825			}
826		}
827		let _ = futures::future::join_all(futures).await;
828		self.pending_txs_tasks.write().retain(|_, r| !r.processed);
829		debug!(
830			target: LOG_TARGET,
831			at_hash = ?view.at.hash,
832			replace_count,
833			remove_count,
834			duration = ?start.elapsed(),
835			count = ?self.pending_txs_tasks.read().len(),
836			"apply_pending_tx_replacements"
837		);
838	}
839
840	/// Submits `xt` to the given view.
841	///
842	/// For watched transaction stream is added to the listener.
843	async fn replace_transaction_in_view(
844		&self,
845		view: Arc<View<ChainApi>>,
846		source: TimedTransactionSource,
847		xt: ExtrinsicFor<ChainApi>,
848		tx_hash: ExtrinsicHash<ChainApi>,
849	) {
850		if let Err(error) =
851			view.submit_one(source, xt, ValidateTransactionPriority::Maintained).await
852		{
853			trace!(
854				target: LOG_TARGET,
855				?tx_hash,
856				at_hash = ?view.at.hash,
857				%error,
858				"replace_transaction: submit failed"
859			);
860		}
861	}
862
863	/// Sends `xt` to every view (both active and inactive) containing `replaced` extrinsics.
864	///
865	/// It is assumed that transaction is already known by the pool. Intended to ba called when `xt`
866	/// is replacing `replaced` extrinsic.
867	async fn replace_transaction_in_views(
868		&self,
869		source: TimedTransactionSource,
870		xt: ExtrinsicFor<ChainApi>,
871		tx_hash: ExtrinsicHash<ChainApi>,
872		replaced: ExtrinsicHash<ChainApi>,
873	) {
874		let submit_futures = {
875			let active_views = self.active_views.read();
876			let inactive_views = self.inactive_views.read();
877			active_views
878				.iter()
879				.chain(inactive_views.iter())
880				.filter(|(_, view)| view.is_imported(&replaced))
881				.map(|(_, view)| {
882					self.replace_transaction_in_view(
883						view.clone(),
884						source.clone(),
885						xt.clone(),
886						tx_hash,
887					)
888				})
889				.collect::<Vec<_>>()
890		};
891		let _results = futures::future::join_all(submit_futures).await;
892	}
893
894	/// Removes a transaction subtree from every view in the view_store, starting from the given
895	/// transaction hash.
896	///
897	/// This function traverses the dependency graph of transactions and removes the specified
898	/// transaction along with all its descendant transactions from every view.
899	///
900	/// A `listener_action` callback function is invoked for every transaction that is removed,
901	/// providing a reference to the pool's listener and the hash of the removed transaction. This
902	/// allows to trigger the required events. Note that listener may be called multiple times for
903	/// the same hash.
904	///
905	/// Function will also schedule view pre-insertion actions to ensure that transactions will be
906	/// removed from newly created view.
907	///
908	/// Returns a vector containing the hashes of all removed transactions, including the root
909	/// transaction specified by `tx_hash`. Vector contains only unique hashes.
910	pub(super) fn remove_transaction_subtree<F>(
911		&self,
912		xt_hash: ExtrinsicHash<ChainApi>,
913		listener_action: F,
914	) -> Vec<TransactionFor<ChainApi>>
915	where
916		F: Fn(
917				&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
918				ExtrinsicHash<ChainApi>,
919			) + Clone
920			+ Send
921			+ Sync
922			+ 'static,
923	{
924		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) {
925			entry.insert(PendingPreInsertTask::new_removal_action(
926				xt_hash,
927				Arc::from(listener_action.clone()),
928			));
929		};
930
931		let mut seen = HashSet::new();
932
933		let removed = self
934			.active_views
935			.read()
936			.iter()
937			.chain(self.inactive_views.read().iter())
938			.filter(|(_, view)| view.is_imported(&xt_hash))
939			.flat_map(|(_, view)| view.remove_subtree(&[xt_hash], true, &listener_action))
940			.filter_map(|xt| seen.insert(xt.hash).then(|| xt.clone()))
941			.collect();
942
943		if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
944			removal_action.mark_processed();
945		}
946
947		removed
948	}
949
950	/// Clears stale views when blockchain finality stalls.
951	///
952	/// This function removes outdated active and inactive views based on the block height
953	/// difference compared to the current block's height. Views are considered stale and
954	/// purged from the `ViewStore` if their height difference from the current block `at`
955	/// exceeds the specified `threshold`.
956	///
957	/// If any views are removed, corresponding cleanup operations are performed on multi-view
958	/// stream controllers to ensure views are also removed there.
959	pub(crate) fn finality_stall_view_cleanup(&self, at: &HashAndNumber<Block>, threshold: usize) {
960		let mut dropped_views = vec![];
961		{
962			let mut active_views = self.active_views.write();
963			let mut inactive_views = self.inactive_views.write();
964			let mut f = |hash: &BlockHash<ChainApi>, v: &View<ChainApi>| -> bool {
965				let diff = at.number.saturating_sub(v.at.number);
966				if diff.into() > threshold.into() {
967					dropped_views.push(*hash);
968					false
969				} else {
970					true
971				}
972			};
973
974			active_views.retain(|h, v| f(h, v));
975			inactive_views.retain(|h, v| f(h, v));
976		}
977
978		if !dropped_views.is_empty() {
979			for view in dropped_views {
980				self.listener.remove_view(view);
981				self.dropped_stream_controller.remove_view(view);
982			}
983		}
984	}
985
986	/// Returns provides tags of given transactions in the views associated to the given set of
987	/// blocks.
988	pub(crate) fn provides_tags_from_inactive_views(
989		&self,
990		block_hashes: Vec<&HashAndNumber<Block>>,
991		mut xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
992	) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
993		let mut provides_tags_map = HashMap::new();
994
995		block_hashes.into_iter().for_each(|hn| {
996			// Get tx provides tags from given view's pool.
997			if let Some((view, _)) = self.get_view_at(hn.hash, true) {
998				let provides_tags = view.pool.validated_pool().extrinsics_tags(&xts_hashes);
999				let xts_provides_tags = xts_hashes
1000					.iter()
1001					.zip(provides_tags.into_iter())
1002					.filter_map(|(hash, maybe_tags)| maybe_tags.map(|tags| (*hash, tags)))
1003					.collect::<HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>>>();
1004
1005				// Remove txs that have been resolved.
1006				xts_hashes.retain(|xth| !xts_provides_tags.contains_key(xth));
1007
1008				// Collect the (extrinsic hash, tags) pairs in a map.
1009				provides_tags_map.extend(xts_provides_tags);
1010			}
1011		});
1012
1013		provides_tags_map
1014	}
1015}