referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
fork_aware_txpool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate fork-aware transaction pool implementation.
20
21use super::{
22	dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23	import_notification_sink::MultiViewImportNotificationSink,
24	metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
25	multi_view_listener::MultiViewListener,
26	tx_mem_pool::{InsertionInfo, TxMemPool},
27	view::{ImportedStatus, View},
28	view_store::ViewStore,
29};
30use crate::{
31	api::FullChainApi,
32	common::{
33		sliding_stat::DurationSlidingStats,
34		tracing_log_xt::{log_xt_debug, log_xt_trace},
35		STAT_SLIDING_WINDOW,
36	},
37	enactment_state::{EnactmentAction, EnactmentState},
38	fork_aware_txpool::{
39		dropped_watcher::{DroppedReason, DroppedTransaction},
40		revalidation_worker,
41	},
42	graph::{
43		self,
44		base_pool::{TimedTransactionSource, Transaction},
45		BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
46	},
47	insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
48	LOG_TARGET_STAT,
49};
50use async_trait::async_trait;
51use futures::{
52	channel::oneshot,
53	future::{self},
54	prelude::*,
55	FutureExt,
56};
57use parking_lot::Mutex;
58use prometheus_endpoint::Registry as PrometheusRegistry;
59use sc_transaction_pool_api::{
60	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
61	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
62	TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
63};
64use sp_blockchain::{HashAndNumber, TreeRoute};
65use sp_core::traits::SpawnEssentialNamed;
66use sp_runtime::{
67	generic::BlockId,
68	traits::{Block as BlockT, NumberFor},
69	transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
70	Saturating,
71};
72use std::{
73	collections::{BTreeMap, HashMap, HashSet},
74	pin::Pin,
75	sync::Arc,
76	time::{Duration, Instant},
77};
78use tokio::select;
79use tracing::{debug, instrument, trace, warn, Level};
80
81/// The maximum block height difference before considering a view or transaction as timed-out
82/// due to a finality stall. When the difference exceeds this threshold, elements are treated
83/// as stale and are subject to cleanup.
84const FINALITY_TIMEOUT_THRESHOLD: usize = 128;
85
86/// The number of transactions that will be sent from the mempool to the newly created view during
87/// the maintain process.
88// todo [#8835]: better approach is needed - maybe time-budget approach?
89// note: yap parachain block size.
90const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;
91
92/// Fork aware transaction pool task, that needs to be polled.
93pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
94
95/// A structure that maintains a collection of pollers associated with specific block hashes
96/// (views).
97struct ReadyPoll<T, Block>
98where
99	Block: BlockT,
100{
101	pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
102}
103
104impl<T, Block> ReadyPoll<T, Block>
105where
106	Block: BlockT,
107{
108	/// Creates a new `ReadyPoll` instance with an empty collection of pollers.
109	fn new() -> Self {
110		Self { pollers: Default::default() }
111	}
112
113	/// Adds a new poller for a specific block hash and returns the `Receiver` end of the created
114	/// oneshot channel which will be used to deliver polled result.
115	fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
116		let (s, r) = oneshot::channel();
117		self.pollers.entry(at).or_default().push(s);
118		r
119	}
120
121	/// Triggers all pollers associated with a specific block by sending the polled result through
122	/// each oneshot channel.
123	///
124	/// `ready_iterator` is a closure that generates the result data to be sent to the pollers.
125	fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
126		debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
127		let Some(pollers) = self.pollers.remove(&at) else { return };
128		pollers.into_iter().for_each(|p| {
129			debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
130			let _ = p.send(ready_iterator());
131		});
132	}
133
134	/// Removes pollers that have their oneshot channels cancelled.
135	fn remove_cancelled(&mut self) {
136		self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
137	}
138}
139
140/// The fork-aware transaction pool.
141///
142/// It keeps track of every fork and provides the set of transactions that is valid for every fork.
143pub struct ForkAwareTxPool<ChainApi, Block>
144where
145	Block: BlockT,
146	ChainApi: graph::ChainApi<Block = Block> + 'static,
147{
148	/// The reference to the `ChainApi` provided by client/backend.
149	api: Arc<ChainApi>,
150
151	/// Intermediate buffer for the incoming transaction.
152	mempool: Arc<TxMemPool<ChainApi, Block>>,
153
154	/// The store for all the views.
155	view_store: Arc<ViewStore<ChainApi, Block>>,
156
157	/// Utility for managing pollers of `ready_at` future.
158	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,
159
160	/// Prometheus's metrics endpoint.
161	metrics: PrometheusMetrics,
162
163	/// Collector of transaction statuses updates, reports transaction events metrics.
164	events_metrics_collector: EventsMetricsCollector<ChainApi>,
165
166	/// Util tracking best and finalized block.
167	enactment_state: Arc<Mutex<EnactmentState<Block>>>,
168
169	/// The channel allowing to send revalidation jobs to the background thread.
170	revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,
171
172	/// Util providing an aggregated stream of transactions that were imported to ready queue in
173	/// any view.
174	import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
175
176	/// Externally provided pool options.
177	options: Options,
178
179	/// Is node the validator.
180	is_validator: IsValidator,
181
182	/// Finality timeout threshold.
183	///
184	/// Sets the maximum permissible block height difference between the latest block
185	/// and the oldest transactions or views in the pool. Beyond this difference,
186	/// transactions/views are considered timed out and eligible for cleanup.
187	finality_timeout_threshold: usize,
188
189	/// Transactions included in blocks since the most recently finalized block (including this
190	/// block).
191	///
192	/// Holds a mapping of block hash and number to their corresponding transaction hashes.
193	///
194	/// Intended to be used in the finality stall cleanups and also as a cache for all in-block
195	/// transactions.
196	included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,
197
198	/// Stats for submit call durations
199	submit_stats: DurationSlidingStats,
200
201	/// Stats for submit_and_watch call durations
202	submit_and_watch_stats: DurationSlidingStats,
203}
204
205impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
206where
207	Block: BlockT,
208	ChainApi: graph::ChainApi<Block = Block> + 'static,
209	<Block as BlockT>::Hash: Unpin,
210{
211	// Injects a view for the given block to self.
212	//
213	// Helper for the pool new methods.
214	fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
215		if let Some(block_number) =
216			self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
217		{
218			let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
219			let tree_route =
220				&TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
221			let view = self.build_and_plug_view(None, &at_best, &tree_route);
222			self.view_store.insert_new_view_sync(view.into(), &tree_route);
223			trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
224		};
225		self
226	}
227
228	/// Create new fork aware transaction pool with provided shared instance of `ChainApi` intended
229	/// for tests.
230	pub fn new_test(
231		pool_api: Arc<ChainApi>,
232		best_block_hash: Block::Hash,
233		finalized_hash: Block::Hash,
234		finality_timeout_threshold: Option<usize>,
235	) -> (Self, [ForkAwareTxPoolTask; 2]) {
236		Self::new_test_with_limits(
237			pool_api,
238			best_block_hash,
239			finalized_hash,
240			Options::default().ready,
241			Options::default().future,
242			usize::MAX,
243			finality_timeout_threshold,
244		)
245	}
246
247	/// Create new fork aware transaction pool with given limits and with provided shared instance
248	/// of `ChainApi` intended for tests.
249	pub fn new_test_with_limits(
250		pool_api: Arc<ChainApi>,
251		best_block_hash: Block::Hash,
252		finalized_hash: Block::Hash,
253		ready_limits: crate::PoolLimit,
254		future_limits: crate::PoolLimit,
255		mempool_max_transactions_count: usize,
256		finality_timeout_threshold: Option<usize>,
257	) -> (Self, [ForkAwareTxPoolTask; 2]) {
258		let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
259		let listener = Arc::new(listener);
260
261		let (import_notification_sink, import_notification_sink_task) =
262			MultiViewImportNotificationSink::new_with_worker();
263
264		let (mempool, mempool_task) = TxMemPool::new(
265			pool_api.clone(),
266			listener.clone(),
267			Default::default(),
268			mempool_max_transactions_count,
269			ready_limits.total_bytes + future_limits.total_bytes,
270		);
271		let mempool = Arc::from(mempool);
272
273		let (dropped_stream_controller, dropped_stream) =
274			MultiViewDroppedWatcherController::<ChainApi>::new();
275
276		let view_store = Arc::new(ViewStore::new(
277			pool_api.clone(),
278			listener,
279			dropped_stream_controller,
280			import_notification_sink.clone(),
281		));
282
283		let dropped_monitor_task = Self::dropped_monitor_task(
284			dropped_stream,
285			mempool.clone(),
286			view_store.clone(),
287			import_notification_sink.clone(),
288		);
289
290		let combined_tasks = async move {
291			tokio::select! {
292				_ = listener_task => {},
293				_ = import_notification_sink_task => {},
294				_ = dropped_monitor_task => {}
295			}
296		}
297		.boxed();
298
299		let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
300
301		(
302			Self {
303				mempool,
304				api: pool_api,
305				view_store,
306				ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
307				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
308					best_block_hash,
309					finalized_hash,
310				))),
311				revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
312				import_notification_sink,
313				options,
314				is_validator: false.into(),
315				metrics: Default::default(),
316				events_metrics_collector: EventsMetricsCollector::default(),
317				finality_timeout_threshold: finality_timeout_threshold
318					.unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
319				included_transactions: Default::default(),
320				submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
321				submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
322					STAT_SLIDING_WINDOW,
323				)),
324			}
325			.inject_initial_view(best_block_hash),
326			[combined_tasks, mempool_task],
327		)
328	}
329
330	/// Monitors the stream of dropped transactions and removes them from the mempool and
331	/// view_store.
332	///
333	/// This asynchronous task continuously listens for dropped transaction notifications provided
334	/// within `dropped_stream` and ensures that these transactions are removed from the `mempool`
335	/// and `import_notification_sink` instances. For Usurped events, the transaction is also
336	/// removed from the view_store.
337	async fn dropped_monitor_task(
338		mut dropped_stream: StreamOfDropped<ChainApi>,
339		mempool: Arc<TxMemPool<ChainApi, Block>>,
340		view_store: Arc<ViewStore<ChainApi, Block>>,
341		import_notification_sink: MultiViewImportNotificationSink<
342			Block::Hash,
343			ExtrinsicHash<ChainApi>,
344		>,
345	) {
346		let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
347		loop {
348			let Some(dropped) = dropped_stream.next().await else {
349				debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
350				break;
351			};
352			let start = Instant::now();
353			let tx_hash = dropped.tx_hash;
354			trace!(
355				target: LOG_TARGET,
356				?tx_hash,
357				reason = ?dropped.reason,
358				"fatp::dropped notification, removing"
359			);
360			match dropped.reason {
361				DroppedReason::Usurped(new_tx_hash) => {
362					if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
363						view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
364					} else {
365						trace!(
366							target: LOG_TARGET,
367							tx_hash = ?new_tx_hash,
368							"error: dropped_monitor_task: no entry in mempool for new transaction"
369						);
370					};
371				},
372				DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
373					view_store.remove_transaction_subtree(tx_hash, |_, _| {});
374				},
375				DroppedReason::Viewless => {
376					if let Some(tx) = mempool.get_by_hash(tx_hash).await {
377						trace!(
378							target: LOG_TARGET,
379							?tx_hash,
380							"dropped_monitor_task: transaction became viewless, marking for unban"
381						);
382						tx.set_needs_unban();
383					}
384					continue;
385				},
386			};
387
388			mempool.remove_transactions(&[tx_hash]).await;
389			import_notification_sink.clean_notified_items(&[tx_hash]);
390			view_store.listener.transaction_dropped(dropped);
391			insert_and_log_throttled!(
392				Level::DEBUG,
393				target:LOG_TARGET_STAT,
394				prefix:"dropped_stats",
395				dropped_stats,
396				start.elapsed().into()
397			);
398		}
399	}
400
401	/// Creates new fork aware transaction pool with the background revalidation worker.
402	///
403	/// The txpool essential tasks (including a revalidation worker) are spawned using provided
404	/// spawner.
405	pub fn new_with_background_worker(
406		options: Options,
407		is_validator: IsValidator,
408		pool_api: Arc<ChainApi>,
409		prometheus: Option<&PrometheusRegistry>,
410		spawner: impl SpawnEssentialNamed,
411		best_block_hash: Block::Hash,
412		finalized_hash: Block::Hash,
413	) -> Self {
414		let metrics = PrometheusMetrics::new(prometheus);
415		let (events_metrics_collector, event_metrics_task) =
416			EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());
417
418		let (listener, listener_task) =
419			MultiViewListener::new_with_worker(events_metrics_collector.clone());
420		let listener = Arc::new(listener);
421
422		let (revalidation_queue, revalidation_task) =
423			revalidation_worker::RevalidationQueue::new_with_worker();
424
425		let (import_notification_sink, import_notification_sink_task) =
426			MultiViewImportNotificationSink::new_with_worker();
427
428		let (mempool, blocking_mempool_task) = TxMemPool::new(
429			pool_api.clone(),
430			listener.clone(),
431			metrics.clone(),
432			options.total_count(),
433			options.ready.total_bytes + options.future.total_bytes,
434		);
435		let mempool = Arc::from(mempool);
436
437		let (dropped_stream_controller, dropped_stream) =
438			MultiViewDroppedWatcherController::<ChainApi>::new();
439
440		let view_store = Arc::new(ViewStore::new(
441			pool_api.clone(),
442			listener,
443			dropped_stream_controller,
444			import_notification_sink.clone(),
445		));
446
447		let dropped_monitor_task = Self::dropped_monitor_task(
448			dropped_stream,
449			mempool.clone(),
450			view_store.clone(),
451			import_notification_sink.clone(),
452		);
453
454		let combined_tasks = async move {
455			tokio::select! {
456				_ = listener_task => {}
457				_ = revalidation_task => {},
458				_ = import_notification_sink_task => {},
459				_ = dropped_monitor_task => {}
460				_ = event_metrics_task => {},
461			}
462		}
463		.boxed();
464		spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
465		spawner.spawn_essential_blocking(
466			"txpool-background",
467			Some("transaction-pool"),
468			blocking_mempool_task,
469		);
470
471		Self {
472			mempool,
473			api: pool_api,
474			view_store,
475			ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
476			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
477				best_block_hash,
478				finalized_hash,
479			))),
480			revalidation_queue: Arc::from(revalidation_queue),
481			import_notification_sink,
482			options,
483			metrics,
484			events_metrics_collector,
485			is_validator,
486			finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
487			included_transactions: Default::default(),
488			submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
489			submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
490				STAT_SLIDING_WINDOW,
491			)),
492		}
493		.inject_initial_view(best_block_hash)
494	}
495
496	/// Get access to the underlying api
497	pub fn api(&self) -> &ChainApi {
498		&self.api
499	}
500
501	/// Provides a status for all views at the tips of the forks.
502	pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
503		self.view_store.status()
504	}
505
506	/// Provides a number of views at the tips of the forks.
507	pub fn active_views_count(&self) -> usize {
508		self.view_store.active_views.read().len()
509	}
510
511	/// Provides a number of views at the tips of the forks.
512	pub fn inactive_views_count(&self) -> usize {
513		self.view_store.inactive_views.read().len()
514	}
515
516	/// Provides internal views statistics.
517	///
518	/// Provides block number, count of ready, count of future transactions for every view. It is
519	/// suitable for printing log information.
520	fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
521		self.view_store
522			.active_views
523			.read()
524			.iter()
525			.map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
526			.collect()
527	}
528
529	/// Checks if there is a view at the tip of the fork with given hash.
530	pub fn has_view(&self, hash: &Block::Hash) -> bool {
531		self.view_store.active_views.read().contains_key(hash)
532	}
533
534	/// Returns a number of unwatched and watched transactions in internal mempool.
535	///
536	/// Intended for use in unit tests.
537	pub async fn mempool_len(&self) -> (usize, usize) {
538		self.mempool.unwatched_and_watched_count().await
539	}
540
541	/// Returns a set of future transactions for given block hash.
542	///
543	/// Intended for logging / tests.
544	pub fn futures_at(
545		&self,
546		at: Block::Hash,
547	) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
548		self.view_store.futures_at(at)
549	}
550
551	/// Returns a best-effort set of ready transactions for a given block, without executing full
552	/// maintain process.
553	///
554	/// The method attempts to build a temporary view and create an iterator of ready transactions
555	/// for a specific `at` hash. If a valid view is found, it collects and prunes
556	/// transactions already included in the blocks and returns the valid set. Not finding a view
557	/// returns with the ready transaction set found in the most recent view processed by the
558	/// fork-aware txpool. Not being able to query for block number for the provided `at` block hash
559	/// results in returning an empty transaction set.
560	///
561	/// Pruning is just rebuilding the underlying transactions graph, no validations are executed,
562	/// so this process shall be fast.
563	pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
564		let start = Instant::now();
565		let api = self.api.clone();
566		debug!(
567			target: LOG_TARGET,
568			?at,
569			"fatp::ready_at_light"
570		);
571
572		let at_number = self.api.resolve_block_number(at).ok();
573		let finalized_number = self
574			.api
575			.resolve_block_number(self.enactment_state.lock().recent_finalized_block())
576			.ok();
577
578		// Prune all txs from the best view found, considering the extrinsics part of the blocks
579		// that are more recent than the view itself.
580		if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
581			let at_hn = HashAndNumber { hash: at, number: at_number };
582			finalized_number.and_then(|finalized_number| {
583				self.view_store
584					.find_view_descendent_up_to_number(&at_hn, finalized_number)
585					.map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
586			})
587		}) {
588			let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
589			let mut all_extrinsics = vec![];
590			for h in enacted_blocks {
591				let extrinsics = api
592					.block_body(h)
593					.await
594					.unwrap_or_else(|error| {
595						warn!(
596							target: LOG_TARGET,
597							%error,
598							"Compute ready light transactions: error request"
599						);
600						None
601					})
602					.unwrap_or_default()
603					.into_iter()
604					.map(|t| api.hash_and_length(&t).0);
605				all_extrinsics.extend(extrinsics);
606			}
607
608			let before_count = tmp_view.pool.validated_pool().status().ready;
609			let tags = tmp_view
610				.pool
611				.validated_pool()
612				.extrinsics_tags(&all_extrinsics)
613				.into_iter()
614				.flatten()
615				.flatten()
616				.collect::<Vec<_>>();
617			let _ = tmp_view.pool.validated_pool().prune_tags(tags);
618
619			let after_count = tmp_view.pool.validated_pool().status().ready;
620			debug!(
621				target: LOG_TARGET,
622				?at,
623				best_view_hash = ?view.at.hash,
624				before_count,
625				to_be_removed = all_extrinsics.len(),
626				after_count,
627				duration = ?start.elapsed(),
628				"fatp::ready_at_light -> light"
629			);
630			Box::new(tmp_view.pool.validated_pool().ready())
631		} else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
632			// Fallback for the case when `at` is not on the already known fork.
633			// Falls back to the most recent view, which may include txs which
634			// are invalid or already included in the blocks but can still yield a
635			// partially valid ready set, which is still better than including nothing.
636			debug!(
637				target: LOG_TARGET,
638				?at,
639				duration = ?start.elapsed(),
640				"fatp::ready_at_light -> most_recent_view"
641			);
642			Box::new(most_recent_view.pool.validated_pool().ready())
643		} else {
644			let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
645			debug!(
646				target: LOG_TARGET,
647				?at,
648				duration = ?start.elapsed(),
649				"fatp::ready_at_light -> empty"
650			);
651			empty
652		}
653	}
654
655	/// Waits for the set of ready transactions for a given block up to a specified timeout.
656	///
657	/// This method combines two futures:
658	/// - The `ready_at` future, which waits for the ready transactions resulting from the full
659	/// maintenance process to be available.
660	/// - The `ready_at_light` future, used as a fallback if the timeout expires before `ready_at`
661	/// completes. This provides a best-effort, ready set of transactions as a result light
662	/// maintain.
663	///
664	/// Returns a future resolving to a ready iterator of transactions.
665	async fn ready_at_with_timeout_internal(
666		&self,
667		at: Block::Hash,
668		timeout: std::time::Duration,
669	) -> ReadyIteratorFor<ChainApi> {
670		debug!(
671			target: LOG_TARGET,
672			?at,
673			?timeout,
674			"fatp::ready_at_with_timeout"
675		);
676		let timeout = futures_timer::Delay::new(timeout);
677		let (view_already_exists, ready_at) = self.ready_at_internal(at);
678
679		if view_already_exists {
680			return ready_at.await;
681		}
682
683		let maybe_ready = async move {
684			select! {
685				ready = ready_at => Some(ready),
686				_ = timeout => {
687					debug!(
688						target: LOG_TARGET,
689						?at,
690						"Timeout fired waiting for transaction pool at block. Proceeding with production."
691					);
692					None
693				}
694			}
695		};
696
697		let fall_back_ready = self.ready_at_light(at);
698		let (maybe_ready, fall_back_ready) =
699			futures::future::join(maybe_ready, fall_back_ready).await;
700		maybe_ready.unwrap_or(fall_back_ready)
701	}
702
703	fn ready_at_internal(
704		&self,
705		at: Block::Hash,
706	) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
707		let mut ready_poll = self.ready_poll.lock();
708
709		if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
710			debug!(
711				target: LOG_TARGET,
712				?at,
713				?inactive,
714				"fatp::ready_at_internal"
715			);
716			let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
717			return (true, async move { iterator }.boxed());
718		}
719
720		let pending = ready_poll
721			.add(at)
722			.map(|received| {
723				received.unwrap_or_else(|error| {
724					warn!(
725						target: LOG_TARGET,
726						%error,
727						"Error receiving ready-set iterator"
728					);
729					Box::new(std::iter::empty())
730				})
731			})
732			.boxed();
733		debug!(
734			target: LOG_TARGET,
735			?at,
736			pending_keys = ?ready_poll.pollers.keys(),
737			"fatp::ready_at_internal"
738		);
739		(false, pending)
740	}
741
742	/// Refer to [`Self::submit_and_watch`]
743	async fn submit_and_watch_inner(
744		&self,
745		at: Block::Hash,
746		source: TransactionSource,
747		xt: TransactionFor<Self>,
748	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
749		let xt = Arc::from(xt);
750
751		let at_number = self
752			.api
753			.block_id_to_number(&BlockId::Hash(at))
754			.ok()
755			.flatten()
756			.unwrap_or_default()
757			.into()
758			.as_u64();
759
760		let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
761			Ok(result) => result,
762			Err(TxPoolApiError::ImmediatelyDropped) => {
763				self.attempt_transaction_replacement(source, at_number, true, xt.clone())
764					.await?
765			},
766			Err(e) => return Err(e.into()),
767		};
768
769		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
770		self.events_metrics_collector.report_submitted(&insertion);
771
772		match self.view_store.submit_and_watch(at, insertion.source, xt).await {
773			Err(e) => {
774				self.mempool.remove_transactions(&[insertion.hash]).await;
775				Err(e.into())
776			},
777			Ok(mut outcome) => {
778				self.mempool
779					.update_transaction_priority(outcome.hash(), outcome.priority())
780					.await;
781				Ok(outcome.expect_watcher())
782			},
783		}
784	}
785
786	/// Refer to [`Self::submit_at`]
787	async fn submit_at_inner(
788		&self,
789		at: Block::Hash,
790		source: TransactionSource,
791		xts: Vec<TransactionFor<Self>>,
792	) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
793		let at_number = self
794			.api
795			.block_id_to_number(&BlockId::Hash(at))
796			.ok()
797			.flatten()
798			.unwrap_or_default()
799			.into()
800			.as_u64();
801		let view_store = self.view_store.clone();
802		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
803		let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;
804
805		if view_store.is_empty() {
806			return Ok(mempool_results
807				.into_iter()
808				.map(|r| r.map(|r| r.hash).map_err(Into::into))
809				.collect::<Vec<_>>());
810		}
811
812		// Submit all the transactions to the mempool
813		let retries = mempool_results
814			.into_iter()
815			.zip(xts.clone())
816			.map(|(result, xt)| async move {
817				match result {
818					Err(TxPoolApiError::ImmediatelyDropped) => {
819						self.attempt_transaction_replacement(source, at_number, false, xt).await
820					},
821					_ => result,
822				}
823			})
824			.collect::<Vec<_>>();
825
826		let mempool_results = futures::future::join_all(retries).await;
827
828		// Collect transactions that were successfully submitted to the mempool...
829		let to_be_submitted = mempool_results
830			.iter()
831			.zip(xts)
832			.filter_map(|(result, xt)| {
833				result.as_ref().ok().map(|insertion| {
834					self.events_metrics_collector.report_submitted(&insertion);
835					(insertion.source.clone(), xt)
836				})
837			})
838			.collect::<Vec<_>>();
839
840		self.metrics
841			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
842
843		// ... and submit them to the view_store. Please note that transactions rejected by mempool
844		// are not sent here.
845		let mempool = self.mempool.clone();
846		let results_map = view_store.submit(to_be_submitted.into_iter()).await;
847		let mut submission_results = reduce_multiview_result(results_map).into_iter();
848
849		// Note for composing final result:
850		//
851		// For each failed insertion into the mempool, the mempool result should be placed into
852		// the returned vector.
853		//
854		// For each successful insertion into the mempool, the corresponding
855		// view_store submission result needs to be examined (merged_results):
856		// - If there is an error during view_store submission, the transaction is removed from
857		// the mempool, and the final result recorded in the vector for this transaction is the
858		// view_store submission error.
859		//
860		// - If the view_store submission is successful, the transaction priority is updated in the
861		// mempool.
862		//
863		// Finally, it collects the hashes of updated transactions or submission errors (either
864		// from the mempool or view_store) into a returned vector (final_results).
865		const RESULTS_ASSUMPTION : &str =
866			"The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
867		let merged_results = mempool_results.into_iter().map(|result| {
868			result.map_err(Into::into).and_then(|insertion| {
869				Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
870			})
871		});
872
873		let mut final_results = vec![];
874		for r in merged_results {
875			match r {
876				Ok((hash, submission_result)) => match submission_result {
877					Ok(r) => {
878						mempool.update_transaction_priority(r.hash(), r.priority()).await;
879						final_results.push(Ok(r.hash()));
880					},
881					Err(e) => {
882						mempool.remove_transactions(&[hash]).await;
883						final_results.push(Err(e));
884					},
885				},
886				Err(e) => final_results.push(Err(e)),
887			}
888		}
889
890		Ok(final_results)
891	}
892
893	/// Number of notified items in import_notification_sink.
894	///
895	/// Internal detail, exposed only for testing.
896	pub fn import_notification_sink_len(&self) -> usize {
897		self.import_notification_sink.notified_items_len()
898	}
899}
900
901/// Converts the input view-to-statuses map into the output vector of statuses.
902///
903/// The result of importing a bunch of transactions into a single view is the vector of statuses.
904/// Every item represents a status for single transaction. The input is the map that associates
905/// hash-views with vectors indicating the statuses of transactions imports.
906///
907/// Import to multiple views result in two-dimensional array of statuses, which is provided as
908/// input map.
909///
910/// This function converts the map into the vec of results, according to the following rules:
911/// - for given transaction if at least one status is success, then output vector contains success,
912/// - if given transaction status is error for every view, then output vector contains error.
913///
914/// The results for transactions are in the same order for every view. An output vector preserves
915/// this order.
916///
917/// ```skip
918/// in:
919/// view  |   xt0 status | xt1 status | xt2 status
920/// h1   -> [ Ok(xth0),    Ok(xth1),    Err       ]
921/// h2   -> [ Ok(xth0),    Err,         Err       ]
922/// h3   -> [ Ok(xth0),    Ok(xth1),    Err       ]
923///
924/// out:
925/// [ Ok(xth0), Ok(xth1), Err ]
926/// ```
927fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
928	let mut values = input.values();
929	let Some(first) = values.next() else {
930		return Default::default();
931	};
932	let length = first.len();
933	debug_assert!(values.all(|x| length == x.len()));
934
935	input
936		.into_values()
937		.reduce(|mut agg_results, results| {
938			agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
939				if agg_r.is_err() {
940					*agg_r = r;
941				}
942			});
943			agg_results
944		})
945		.unwrap_or_default()
946}
947
948#[async_trait]
949impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
950where
951	Block: BlockT,
952	ChainApi: 'static + graph::ChainApi<Block = Block>,
953	<Block as BlockT>::Hash: Unpin,
954{
955	type Block = ChainApi::Block;
956	type Hash = ExtrinsicHash<ChainApi>;
957	type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
958	type Error = ChainApi::Error;
959
960	/// Submits multiple transactions and returns a future resolving to the submission results.
961	///
962	/// Actual transactions submission process is delegated to the `ViewStore` internal instance.
963	///
964	/// The internal limits of the pool are checked. The results of submissions to individual views
965	/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
966	async fn submit_at(
967		&self,
968		at: <Self::Block as BlockT>::Hash,
969		source: TransactionSource,
970		xts: Vec<TransactionFor<Self>>,
971	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
972		let start = Instant::now();
973		trace!(
974			target: LOG_TARGET,
975			count = xts.len(),
976			active_views_count = self.active_views_count(),
977			"fatp::submit_at"
978		);
979		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
980		let result = self.submit_at_inner(at, source, xts).await;
981		insert_and_log_throttled!(
982			Level::DEBUG,
983			target:LOG_TARGET_STAT,
984			prefix:"submit_stats",
985			self.submit_stats,
986			start.elapsed().into()
987		);
988		result
989	}
990
991	/// Submits a single transaction and returns a future resolving to the submission results.
992	///
993	/// Actual transaction submission process is delegated to the `submit_at` function.
994	async fn submit_one(
995		&self,
996		_at: <Self::Block as BlockT>::Hash,
997		source: TransactionSource,
998		xt: TransactionFor<Self>,
999	) -> Result<TxHash<Self>, Self::Error> {
1000		trace!(
1001			target: LOG_TARGET,
1002			tx_hash = ?self.tx_hash(&xt),
1003			active_views_count = self.active_views_count(),
1004			"fatp::submit_one"
1005		);
1006		match self.submit_at(_at, source, vec![xt]).await {
1007			Ok(mut v) => {
1008				v.pop().expect("There is exactly one element in result of submit_at. qed.")
1009			},
1010			Err(e) => Err(e),
1011		}
1012	}
1013
1014	/// Submits a transaction and starts to watch its progress in the pool, returning a stream of
1015	/// status updates.
1016	///
1017	/// Actual transaction submission process is delegated to the `ViewStore` internal instance.
1018	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
1019	async fn submit_and_watch(
1020		&self,
1021		at: <Self::Block as BlockT>::Hash,
1022		source: TransactionSource,
1023		xt: TransactionFor<Self>,
1024	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
1025		let start = Instant::now();
1026		trace!(
1027			target: LOG_TARGET,
1028			tx_hash = ?self.tx_hash(&xt),
1029			views = self.active_views_count(),
1030			"fatp::submit_and_watch"
1031		);
1032		let result = self.submit_and_watch_inner(at, source, xt).await;
1033		insert_and_log_throttled!(
1034			Level::DEBUG,
1035			target:LOG_TARGET_STAT,
1036			prefix:"submit_and_watch_stats",
1037			self.submit_and_watch_stats,
1038			start.elapsed().into()
1039		);
1040		result
1041	}
1042
1043	/// Reports invalid transactions to the transaction pool.
1044	///
1045	/// This function takes an array of tuples, each consisting of a transaction hash and the
1046	/// corresponding error that occurred during transaction execution at given block.
1047	///
1048	/// The transaction pool implementation will determine which transactions should be
1049	/// removed from the pool. Transactions that depend on invalid transactions will also
1050	/// be removed.
1051	async fn report_invalid(
1052		&self,
1053		at: Option<<Self::Block as BlockT>::Hash>,
1054		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
1055	) -> Vec<Arc<Self::InPoolTransaction>> {
1056		debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
1057		log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
1058		self.metrics
1059			.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
1060
1061		let removed = self.view_store.report_invalid(at, invalid_tx_errors);
1062
1063		let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
1064		self.mempool.remove_transactions(&removed_hashes).await;
1065		self.import_notification_sink.clean_notified_items(&removed_hashes);
1066
1067		self.metrics
1068			.report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
1069
1070		removed
1071	}
1072
1073	// todo [#5491]: api change?
1074	// status(Hash) -> Option<PoolStatus>
1075	/// Returns the pool status which includes information like the number of ready and future
1076	/// transactions.
1077	///
1078	/// Currently the status for the most recently notified best block is returned (for which
1079	/// maintain process was accomplished).
1080	fn status(&self) -> PoolStatus {
1081		self.view_store
1082			.most_recent_view
1083			.read()
1084			.as_ref()
1085			.map(|v| v.status())
1086			.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
1087	}
1088
1089	/// Return an event stream of notifications when transactions are imported to the pool.
1090	///
1091	/// Consumers of this stream should use the `ready` method to actually get the
1092	/// pending transactions in the right order.
1093	fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
1094		self.import_notification_sink.event_stream()
1095	}
1096
1097	/// Returns the hash of a given transaction.
1098	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1099		self.api().hash_and_length(xt).0
1100	}
1101
1102	/// Notifies the pool about the broadcasting status of transactions.
1103	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
1104		self.view_store.listener.transactions_broadcasted(propagations);
1105	}
1106
1107	/// Return specific ready transaction by hash, if there is one.
1108	///
1109	/// Currently the ready transaction is returned if it exists for the most recently notified best
1110	/// block (for which maintain process was accomplished).
1111	// todo [#5491]: api change: we probably should have at here?
1112	fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
1113		let most_recent_view_hash =
1114			self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
1115		let result = most_recent_view_hash
1116			.and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
1117		trace!(
1118			target: LOG_TARGET,
1119			?tx_hash,
1120			is_ready = result.is_some(),
1121			most_recent_view = ?most_recent_view_hash,
1122			"ready_transaction"
1123		);
1124		result
1125	}
1126
1127	/// Returns an iterator for ready transactions at a specific block, ordered by priority.
1128	async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
1129		let (_, result) = self.ready_at_internal(at);
1130		result.await
1131	}
1132
1133	/// Returns an iterator for ready transactions, ordered by priority.
1134	///
1135	/// Currently the set of ready transactions is returned if it exists for the most recently
1136	/// notified best block (for which maintain process was accomplished).
1137	fn ready(&self) -> ReadyIteratorFor<ChainApi> {
1138		self.view_store.ready()
1139	}
1140
1141	/// Returns a list of future transactions in the pool.
1142	///
1143	/// Currently the set of future transactions is returned if it exists for the most recently
1144	/// notified best block (for which maintain process was accomplished).
1145	fn futures(&self) -> Vec<Self::InPoolTransaction> {
1146		self.view_store.futures()
1147	}
1148
1149	/// Returns a set of ready transactions at a given block within the specified timeout.
1150	///
1151	/// If the timeout expires before the maintain process is accomplished, a best-effort
1152	/// set of transactions is returned (refer to `ready_at_light`).
1153	async fn ready_at_with_timeout(
1154		&self,
1155		at: <Self::Block as BlockT>::Hash,
1156		timeout: std::time::Duration,
1157	) -> ReadyIteratorFor<ChainApi> {
1158		self.ready_at_with_timeout_internal(at, timeout).await
1159	}
1160}
1161
1162impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
1163	for ForkAwareTxPool<ChainApi, Block>
1164where
1165	Block: BlockT,
1166	ChainApi: 'static + graph::ChainApi<Block = Block>,
1167	<Block as BlockT>::Hash: Unpin,
1168{
1169	type Block = Block;
1170	type Hash = ExtrinsicHash<ChainApi>;
1171	type Error = ChainApi::Error;
1172
1173	fn submit_local(
1174		&self,
1175		at: Block::Hash,
1176		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
1177	) -> Result<Self::Hash, Self::Error> {
1178		trace!(
1179			target: LOG_TARGET,
1180			active_views_count = self.active_views_count(),
1181			"fatp::submit_local"
1182		);
1183		let xt = Arc::from(xt);
1184		let at_number = self
1185			.api
1186			.block_id_to_number(&BlockId::Hash(at))
1187			.ok()
1188			.flatten()
1189			.unwrap_or_default()
1190			.into()
1191			.as_u64();
1192
1193		// note: would be nice to get rid of sync methods one day. See: #8912
1194		let result = self
1195			.mempool
1196			.clone()
1197			.extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
1198			.remove(0);
1199
1200		let insertion = match result {
1201			Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
1202				TransactionSource::Local,
1203				false,
1204				xt.clone(),
1205			),
1206			_ => result,
1207		}?;
1208
1209		self.view_store
1210			.submit_local(xt)
1211			.inspect_err(|_| {
1212				self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
1213			})
1214			.map(|outcome| {
1215				self.mempool
1216					.clone()
1217					.update_transaction_priority_sync(outcome.hash(), outcome.priority());
1218				outcome.hash()
1219			})
1220			.or_else(|_| Ok(insertion.hash))
1221	}
1222}
1223
1224impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1225where
1226	Block: BlockT,
1227	ChainApi: graph::ChainApi<Block = Block> + 'static,
1228	<Block as BlockT>::Hash: Unpin,
1229{
1230	/// Handles a new block notification.
1231	///
1232	/// It is responsible for handling a newly notified block. It executes some sanity checks, find
1233	/// the best view to clone from and executes the new view build procedure for the notified
1234	/// block.
1235	///
1236	/// If the view is correctly created, `ready_at` pollers for this block will be triggered.
1237	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
1238	async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
1239		let hash_and_number = match tree_route.last() {
1240			Some(hash_and_number) => hash_and_number,
1241			None => {
1242				warn!(
1243					target: LOG_TARGET,
1244					?tree_route,
1245					"Skipping ChainEvent - no last block in tree route"
1246				);
1247				return;
1248			},
1249		};
1250
1251		if self.has_view(&hash_and_number.hash) {
1252			debug!(
1253				target: LOG_TARGET,
1254				?hash_and_number,
1255				"view already exists for block"
1256			);
1257			return;
1258		}
1259
1260		let best_view = self.view_store.find_best_view(tree_route);
1261		let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;
1262
1263		if let Some(view) = new_view {
1264			{
1265				let view = view.clone();
1266				self.ready_poll.lock().trigger(hash_and_number.hash, move || {
1267					Box::from(view.pool.validated_pool().ready())
1268				});
1269			}
1270
1271			View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
1272		}
1273
1274		self.finality_stall_cleanup(hash_and_number).await;
1275	}
1276
1277	/// Cleans up transactions and views outdated by potential finality stalls.
1278	///
1279	/// This function removes transactions from the pool that were included in blocks but not
1280	/// finalized within a pre-defined block height threshold. Transactions not meeting finality
1281	/// within this threshold are notified with finality timed out event. The threshold is based on
1282	/// the current block number, 'at'.
1283	///
1284	/// Additionally, this method triggers the view store to handle and remove stale views caused by
1285	/// the finality stall.
1286	async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
1287		let (oldest_block_number, finality_timedout_blocks) = {
1288			let mut included_transactions = self.included_transactions.lock();
1289
1290			let Some(oldest_block_number) =
1291				included_transactions.first_key_value().map(|(k, _)| k.number)
1292			else {
1293				return;
1294			};
1295
1296			if at.number.saturating_sub(oldest_block_number).into() <=
1297				self.finality_timeout_threshold.into()
1298			{
1299				return;
1300			}
1301
1302			let mut finality_timedout_blocks =
1303				indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
1304
1305			included_transactions.retain(
1306				|HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
1307					let diff = at.number.saturating_sub(*view_number);
1308					if diff.into() > self.finality_timeout_threshold.into() {
1309						finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
1310						false
1311					} else {
1312						true
1313					}
1314				},
1315			);
1316
1317			(oldest_block_number, finality_timedout_blocks)
1318		};
1319
1320		if !finality_timedout_blocks.is_empty() {
1321			self.ready_poll.lock().remove_cancelled();
1322			self.view_store.listener.remove_stale_controllers();
1323		}
1324
1325		let finality_timedout_blocks_len = finality_timedout_blocks.len();
1326
1327		for (block_hash, tx_hashes) in finality_timedout_blocks {
1328			self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
1329
1330			self.mempool.remove_transactions(&tx_hashes).await;
1331			self.import_notification_sink.clean_notified_items(&tx_hashes);
1332			self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
1333		}
1334
1335		self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
1336
1337		debug!(
1338			target: LOG_TARGET,
1339			?at,
1340			included_transactions_len = ?self.included_transactions.lock().len(),
1341			finality_timedout_blocks_len,
1342			?oldest_block_number,
1343			"finality_stall_cleanup"
1344		);
1345	}
1346
1347	/// Builds a new view.
1348	///
1349	/// If `origin_view` is provided, the new view will be cloned from it. Otherwise an empty view
1350	/// will be created.
1351	///
1352	/// This method will also update multi-view listeners with newly created view.
1353	///
1354	/// The new view will not be inserted into the view store.
1355	fn build_and_plug_view(
1356		&self,
1357		origin_view: Option<Arc<View<ChainApi>>>,
1358		at: &HashAndNumber<Block>,
1359		tree_route: &TreeRoute<Block>,
1360	) -> View<ChainApi> {
1361		let enter = Instant::now();
1362		let (view, view_dropped_stream, view_aggregated_stream) =
1363			if let Some(origin_view) = origin_view {
1364				let (mut view, view_dropped_stream, view_aggragated_stream) =
1365					View::new_from_other(&origin_view, at);
1366				if !tree_route.retracted().is_empty() {
1367					view.pool.clear_recently_pruned();
1368				}
1369				(view, view_dropped_stream, view_aggragated_stream)
1370			} else {
1371				debug!(
1372					target: LOG_TARGET,
1373					?at,
1374					"creating non-cloned view"
1375				);
1376				View::new(
1377					self.api.clone(),
1378					at.clone(),
1379					self.options.clone(),
1380					self.metrics.clone(),
1381					self.is_validator.clone(),
1382				)
1383			};
1384		debug!(
1385			target: LOG_TARGET,
1386			?at,
1387			duration = ?enter.elapsed(),
1388			"build_new_view::clone_view"
1389		);
1390
1391		// 1. Capture all import notification from the very beginning, so first register all
1392		// the listeners.
1393		self.import_notification_sink.add_view(
1394			view.at.hash,
1395			view.pool.validated_pool().import_notification_stream().boxed(),
1396		);
1397
1398		self.view_store
1399			.dropped_stream_controller
1400			.add_view(view.at.hash, view_dropped_stream.boxed());
1401
1402		self.view_store
1403			.listener
1404			.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1405
1406		view
1407	}
1408
1409	/// Builds and updates a new view.
1410	///
1411	/// This functio uses [`Self::build_new_view`] to create or clone new view.
1412	///
1413	/// The new view will be updated with transactions from the tree_route and the mempool, all
1414	/// required events will be triggered, it will be inserted to the view store (respecting all
1415	/// pre-insertion actions).
1416	async fn build_and_update_view(
1417		&self,
1418		origin_view: Option<Arc<View<ChainApi>>>,
1419		at: &HashAndNumber<Block>,
1420		tree_route: &TreeRoute<Block>,
1421	) -> Option<Arc<View<ChainApi>>> {
1422		let start = Instant::now();
1423		debug!(
1424			target: LOG_TARGET,
1425			?at,
1426			origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
1427			?tree_route,
1428			"build_new_view"
1429		);
1430
1431		let mut view = self.build_and_plug_view(origin_view, at, tree_route);
1432
1433		// sync the transactions statuses and referencing views in all the listeners with newly
1434		// cloned view.
1435		view.pool.validated_pool().retrigger_notifications();
1436		debug!(
1437			target: LOG_TARGET,
1438			?at,
1439			duration = ?start.elapsed(),
1440			"register_listeners"
1441		);
1442
1443		// 2. Handle transactions from the tree route. Pruning transactions from the view first
1444		// will make some space for mempool transactions in case we are at the view's limits.
1445		let start = Instant::now();
1446		self.update_view_with_fork(&view, tree_route, at.clone()).await;
1447		debug!(
1448			target: LOG_TARGET,
1449			?at,
1450			duration = ?start.elapsed(),
1451			"update_view_with_fork"
1452		);
1453
1454		// 3. Finally, submit transactions from the mempool.
1455		let start = Instant::now();
1456		self.update_view_with_mempool(&mut view).await;
1457		debug!(
1458			target: LOG_TARGET,
1459			?at,
1460			duration= ?start.elapsed(),
1461			"update_view_with_mempool"
1462		);
1463		let view = Arc::from(view);
1464		self.view_store.insert_new_view(view.clone(), tree_route).await;
1465
1466		debug!(
1467			target: LOG_TARGET,
1468			duration = ?start.elapsed(),
1469			?at,
1470			"build_new_view"
1471		);
1472		Some(view)
1473	}
1474
1475	/// Retrieves transactions hashes from a `included_transactions` cache or, if not present,
1476	/// fetches them from the blockchain API using the block's hash `at`.
1477	///
1478	/// Returns a `Vec` of transactions hashes
1479	async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
1480		if let Some(txs) = self.included_transactions.lock().get(at) {
1481			return txs.clone();
1482		};
1483
1484		debug!(
1485			target: LOG_TARGET,
1486			?at,
1487			"fetch_block_transactions from api"
1488		);
1489
1490		self.api
1491			.block_body(at.hash)
1492			.await
1493			.unwrap_or_else(|error| {
1494				warn!(
1495					target: LOG_TARGET,
1496					%error,
1497					"fetch_block_transactions: error request"
1498				);
1499				None
1500			})
1501			.unwrap_or_default()
1502			.into_iter()
1503			.map(|t| self.hash_of(&t))
1504			.collect::<Vec<_>>()
1505	}
1506
1507	/// Returns the list of xts included in all block's ancestors up to recently finalized block (or
1508	/// up finality timeout threshold), including the block itself.
1509	///
1510	/// Example: for the following chain `F<-B1<-B2<-B3` xts from `B1,B2,B3` will be returned.
1511	async fn txs_included_since_finalized(
1512		&self,
1513		at: &HashAndNumber<Block>,
1514	) -> HashSet<TxHash<Self>> {
1515		let start = Instant::now();
1516		let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
1517
1518		let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
1519			return Default::default();
1520		};
1521
1522		let mut all_txs = HashSet::new();
1523
1524		for block in tree_route.enacted().iter() {
1525			// note: There is no point to fetch the transactions from blocks older than threshold.
1526			// All transactions included in these blocks, were already removed from pool
1527			// with FinalityTimeout event.
1528			if at.number.saturating_sub(block.number).into() <=
1529				self.finality_timeout_threshold.into()
1530			{
1531				all_txs.extend(self.fetch_block_transactions(block).await);
1532			}
1533		}
1534
1535		debug!(
1536			target: LOG_TARGET,
1537			?at,
1538			?recent_finalized_block,
1539			extrinsics_count = all_txs.len(),
1540			duration = ?start.elapsed(),
1541			"fatp::txs_included_since_finalized"
1542		);
1543		all_txs
1544	}
1545
1546	/// Updates the given view with the transactions from the internal mempol.
1547	///
1548	/// All transactions from the mempool (excluding those which are either already imported or
1549	/// already included in blocks since recently finalized block) are submitted to the
1550	/// view.
1551	///
1552	/// If there are no views, and mempool transaction is reported as invalid for the given view,
1553	/// the transaction is notified as invalid and removed from the mempool.
1554	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
1555		let xts_count = self.mempool.unwatched_and_watched_count().await;
1556		debug!(
1557			target: LOG_TARGET,
1558			view_at = ?view.at,
1559			?xts_count,
1560			active_views_count = self.active_views_count(),
1561			"update_view_with_mempool"
1562		);
1563		let included_xts = self.txs_included_since_finalized(&view.at).await;
1564
1565		let view_hash = view.at.hash;
1566		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
1567			.mempool
1568			.with_transactions(|iter| {
1569				iter.filter(|(hash, _)| {
1570					match view.imported_status(hash) {
1571						ImportedStatus::Banned => {
1572							trace!(
1573								target: LOG_TARGET,
1574								?hash,
1575								?view_hash,
1576								"update_view_with_mempool: skipped (temporarily banned)"
1577							);
1578							return false;
1579						},
1580						ImportedStatus::Imported => return false,
1581						ImportedStatus::NotImported => {},
1582					}
1583					!included_xts.contains(hash)
1584				})
1585				.map(|(k, v)| (*k, v.clone()))
1586				// todo [#8835]: better approach is needed - maybe time-budget approach?
1587				.take(MEMPOOL_TO_VIEW_BATCH_SIZE)
1588				.collect::<HashMap<_, _>>()
1589			})
1590			.await
1591			.into_iter()
1592			.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
1593			.unzip();
1594
1595		let results = view
1596			.submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
1597			.await
1598			.into_iter()
1599			.zip(hashes)
1600			.map(|(result, tx_hash)| async move {
1601				match result {
1602					Ok(outcome) => Ok(self
1603						.mempool
1604						.update_transaction_priority(outcome.hash(), outcome.priority())
1605						.await),
1606					Err(error) => {
1607						trace!(
1608							target: LOG_TARGET,
1609							?tx_hash,
1610							?error,
1611							"update_view_with_mempool: tx rejected from view"
1612						);
1613						Err(tx_hash)
1614					},
1615				}
1616			})
1617			.collect::<Vec<_>>();
1618
1619		let results = futures::future::join_all(results).await;
1620
1621		let submitted_count = results.len();
1622
1623		debug!(
1624			target: LOG_TARGET,
1625			view_at_hash = ?view.at.hash,
1626			submitted_count,
1627			mempool_len = self.mempool.len(),
1628			"update_view_with_mempool"
1629		);
1630
1631		self.metrics
1632			.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
1633
1634		// if there are no views yet, and a single newly created view is reporting error, just send
1635		// out the invalid event, and remove transaction.
1636		if self.view_store.is_empty() {
1637			for result in results {
1638				if let Err(tx_hash) = result {
1639					self.view_store.listener.transactions_invalidated(&[tx_hash]);
1640					self.mempool.remove_transactions(&[tx_hash]).await;
1641				}
1642			}
1643		}
1644	}
1645
1646	/// Attempts to search the view store for the `provides` tags of enacted
1647	/// transactions associated with the specified `tree_route`.
1648	///
1649	/// The 'provides' tags of transactions from enacted blocks are searched
1650	/// in inactive views. Found `provide` tags are intended to serve as cache,
1651	/// helping to avoid unnecessary revalidations during pruning.
1652	async fn collect_provides_tags_from_view_store(
1653		&self,
1654		tree_route: &TreeRoute<Block>,
1655		xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
1656	) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
1657		let blocks_hashes = tree_route
1658			.retracted()
1659			.iter()
1660			// Skip the tip of the retracted fork, since it has an active view.
1661			.skip(1)
1662			// Skip also the tip of the enacted fork, since it has an active view too.
1663			.chain(
1664				std::iter::once(tree_route.common_block())
1665					.chain(tree_route.enacted().iter().rev().skip(1)),
1666			)
1667			.collect::<Vec<&HashAndNumber<Block>>>();
1668
1669		self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
1670	}
1671
1672	/// Build a map from blocks to their extrinsics.
1673	pub async fn collect_extrinsics(
1674		&self,
1675		blocks: &[HashAndNumber<Block>],
1676	) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
1677		future::join_all(blocks.iter().map(|hn| async move {
1678			(
1679				hn.hash,
1680				self.api
1681					.block_body(hn.hash)
1682					.await
1683					.unwrap_or_else(|e| {
1684						warn!(target: LOG_TARGET, %e, ": block_body error request");
1685						None
1686					})
1687					.unwrap_or_default(),
1688			)
1689		}))
1690		.await
1691		.into_iter()
1692		.collect()
1693	}
1694
1695	/// Updates the view with the transactions from the given tree route.
1696	///
1697	/// Transactions from the retracted blocks are resubmitted to the given view. Tags for
1698	/// transactions included in blocks on enacted fork are pruned from the provided view.
1699	async fn update_view_with_fork(
1700		&self,
1701		view: &View<ChainApi>,
1702		tree_route: &TreeRoute<Block>,
1703		hash_and_number: HashAndNumber<Block>,
1704	) {
1705		debug!(
1706			target: LOG_TARGET,
1707			?tree_route,
1708			at = ?view.at,
1709			"update_view_with_fork"
1710		);
1711		let api = self.api.clone();
1712
1713		// Collect extrinsics on the enacted path in a map from block hn -> extrinsics.
1714		let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;
1715
1716		// Create a map from enacted blocks' extrinsics to their `provides`
1717		// tags based on inactive views.
1718		let known_provides_tags = Arc::new(
1719			self.collect_provides_tags_from_view_store(
1720				tree_route,
1721				extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
1722			)
1723			.await,
1724		);
1725
1726		debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());
1727
1728		// We keep track of everything we prune so that later we won't add
1729		// transactions with those hashes from the retracted blocks.
1730		let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1731		future::join_all(tree_route.enacted().iter().map(|hn| {
1732			let api = api.clone();
1733			let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
1734			let known_provides_tags = known_provides_tags.clone();
1735			async move {
1736				(
1737					hn,
1738					crate::prune_known_txs_for_block(
1739						hn,
1740						&*api,
1741						&view.pool,
1742						Some(xts),
1743						Some(known_provides_tags),
1744					)
1745					.await,
1746				)
1747			}
1748		}))
1749		.await
1750		.into_iter()
1751		.for_each(|(key, enacted_log)| {
1752			pruned_log.extend(enacted_log.clone());
1753			self.included_transactions.lock().insert(key.clone(), enacted_log);
1754		});
1755
1756		let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
1757		self.metrics
1758			.report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));
1759
1760		// resubmit
1761		{
1762			let mut resubmit_transactions = Vec::new();
1763
1764			for retracted in tree_route.retracted().iter().rev() {
1765				let hash = retracted.hash;
1766
1767				let block_transactions = api
1768					.block_body(hash)
1769					.await
1770					.unwrap_or_else(|error| {
1771						warn!(
1772							target: LOG_TARGET,
1773							%error,
1774							"Failed to fetch block body"
1775						);
1776						None
1777					})
1778					.unwrap_or_default()
1779					.into_iter();
1780
1781				let mut resubmitted_to_report = 0;
1782
1783				let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
1784					|(tx_hash, _)| {
1785						let contains = pruned_log.contains(&tx_hash);
1786
1787						// need to count all transactions, not just filtered, here
1788						resubmitted_to_report += 1;
1789
1790						if !contains {
1791							trace!(
1792								target: LOG_TARGET,
1793								?tx_hash,
1794								?hash,
1795								"Resubmitting from retracted block"
1796							);
1797						}
1798						!contains
1799					},
1800				);
1801				let mut result = vec![];
1802				for (tx_hash, tx) in txs {
1803					result.push(
1804						// find arc if tx is known
1805						self.mempool
1806							.get_by_hash(tx_hash)
1807							.await
1808							.map(|tx| (tx.source(), tx.tx()))
1809							.unwrap_or_else(|| {
1810								// These transactions are coming from retracted blocks, we
1811								// should simply consider them external.
1812								(TimedTransactionSource::new_external(true), Arc::from(tx))
1813							}),
1814					);
1815				}
1816				resubmit_transactions.extend(result);
1817
1818				self.metrics.report(|metrics| {
1819					metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1820				});
1821			}
1822
1823			let _ = view
1824				.pool
1825				.resubmit_at(
1826					&hash_and_number,
1827					resubmit_transactions,
1828					ValidateTransactionPriority::Maintained,
1829				)
1830				.await;
1831		}
1832	}
1833
1834	/// Executes the maintainance for the finalized event.
1835	///
1836	/// Performs a house-keeping required for finalized event. This includes:
1837	/// - executing the on finalized procedure for the view store,
1838	/// - purging finalized transactions from the mempool and triggering mempool revalidation,
1839	async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1840		let start = Instant::now();
1841		let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1842		debug!(
1843			target: LOG_TARGET,
1844			?finalized_number,
1845			?tree_route,
1846			active_views_count = self.active_views_count(),
1847			"handle_finalized"
1848		);
1849		let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1850
1851		self.mempool.purge_finalized_transactions(&finalized_xts).await;
1852		self.import_notification_sink.clean_notified_items(&finalized_xts);
1853
1854		self.metrics
1855			.report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1856
1857		if let Ok(Some(finalized_number)) = finalized_number {
1858			self.included_transactions
1859				.lock()
1860				.retain(|cached_block, _| finalized_number < cached_block.number);
1861			self.revalidation_queue
1862				.revalidate_mempool(
1863					self.mempool.clone(),
1864					self.view_store.clone(),
1865					HashAndNumber { hash: finalized_hash, number: finalized_number },
1866				)
1867				.await;
1868		} else {
1869			debug!(
1870				target: LOG_TARGET,
1871				?finalized_number,
1872				"handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
1873			);
1874		}
1875
1876		self.ready_poll.lock().remove_cancelled();
1877
1878		debug!(
1879			target: LOG_TARGET,
1880			active_views_count = self.active_views_count(),
1881			included_transactions_len = ?self.included_transactions.lock().len(),
1882			duration = ?start.elapsed(),
1883			"handle_finalized after"
1884		);
1885	}
1886
1887	/// Computes a hash of the provided transaction
1888	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1889		self.api.hash_and_length(xt).0
1890	}
1891
1892	/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
1893	/// one.
1894	///
1895	/// This asynchronous function verifies the new transaction against the most recent view. If a
1896	/// transaction with a lower priority exists in the transaction pool, it is replaced with the
1897	/// new transaction.
1898	///
1899	/// If no lower-priority transaction is found, the function returns an error indicating the
1900	/// transaction was dropped immediately.
1901	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1902	async fn attempt_transaction_replacement(
1903		&self,
1904		source: TransactionSource,
1905		at_number: u64,
1906		watched: bool,
1907		xt: ExtrinsicFor<ChainApi>,
1908	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1909		let best_view = self
1910			.view_store
1911			.most_recent_view
1912			.read()
1913			.as_ref()
1914			.ok_or(TxPoolApiError::ImmediatelyDropped)?
1915			.clone();
1916
1917		let (xt_hash, validated_tx) = best_view
1918			.pool
1919			.verify_one(
1920				best_view.at.hash,
1921				best_view.at.number,
1922				TimedTransactionSource::from_transaction_source(source, false),
1923				xt.clone(),
1924				crate::graph::CheckBannedBeforeVerify::Yes,
1925				ValidateTransactionPriority::Submitted,
1926			)
1927			.await;
1928
1929		let Some(priority) = validated_tx.priority() else {
1930			return Err(TxPoolApiError::ImmediatelyDropped);
1931		};
1932
1933		let insertion_info = self
1934			.mempool
1935			.try_insert_with_replacement(xt, priority, source, at_number, watched)
1936			.await?;
1937		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1938	}
1939
1940	/// Sync version of [`Self::attempt_transaction_replacement`].
1941	fn attempt_transaction_replacement_sync(
1942		&self,
1943		source: TransactionSource,
1944		watched: bool,
1945		xt: ExtrinsicFor<ChainApi>,
1946	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1947		let HashAndNumber { number: at_number, hash: at_hash } = self
1948			.view_store
1949			.most_recent_view
1950			.read()
1951			.as_ref()
1952			.ok_or(TxPoolApiError::ImmediatelyDropped)?
1953			.at;
1954
1955		let ValidTransaction { priority, .. } = self
1956			.api
1957			.validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
1958			.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
1959			.map_err(|e| match e {
1960				TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
1961				TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
1962			})?;
1963		let xt_hash = self.hash_of(&xt);
1964
1965		let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
1966			xt,
1967			priority,
1968			source,
1969			at_number.into().as_u64(),
1970			watched,
1971		)?;
1972		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1973	}
1974
1975	fn post_attempt_transaction_replacement(
1976		&self,
1977		tx_hash: ExtrinsicHash<ChainApi>,
1978		insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1979	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1980		for worst_hash in &insertion_info.removed {
1981			trace!(
1982				target: LOG_TARGET,
1983				tx_hash = ?worst_hash,
1984				new_tx_hash = ?tx_hash,
1985				"removed: replaced by"
1986			);
1987			self.view_store
1988				.listener
1989				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1990
1991			self.view_store
1992				.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1993					listener.limits_enforced(&removed_tx_hash);
1994				});
1995		}
1996
1997		return Ok(insertion_info);
1998	}
1999}
2000
2001#[async_trait]
2002impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
2003where
2004	Block: BlockT,
2005	ChainApi: 'static + graph::ChainApi<Block = Block>,
2006	<Block as BlockT>::Hash: Unpin,
2007{
2008	/// Executes the maintainance for the given chain event.
2009	async fn maintain(&self, event: ChainEvent<Self::Block>) {
2010		let start = Instant::now();
2011		debug!(
2012			target: LOG_TARGET,
2013			?event,
2014			"processing event"
2015		);
2016
2017		self.view_store.finish_background_revalidations().await;
2018
2019		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
2020
2021		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
2022			match self.api.tree_route(from, to) {
2023				Ok(tree_route) => Ok(tree_route),
2024				Err(e) => {
2025					return Err(format!(
2026						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
2027					))
2028				},
2029			}
2030		};
2031		let block_id_to_number =
2032			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
2033
2034		let result =
2035			self.enactment_state
2036				.lock()
2037				.update(&event, &compute_tree_route, &block_id_to_number);
2038
2039		match result {
2040			Err(error) => {
2041				debug!(
2042					target: LOG_TARGET,
2043					%error,
2044					"enactment_state::update error"
2045				);
2046				self.enactment_state.lock().force_update(&event);
2047			},
2048			Ok(EnactmentAction::Skip) => return,
2049			Ok(EnactmentAction::HandleFinalization) => {
2050				// todo [#5492]: in some cases handle_new_block is actually needed (new_num >
2051				// tips_of_forks) let hash = event.hash();
2052				// if !self.has_view(hash) {
2053				// 	if let Ok(tree_route) = compute_tree_route(prev_finalized_block, hash) {
2054				// 		self.handle_new_block(&tree_route).await;
2055				// 	}
2056				// }
2057			},
2058			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
2059				self.handle_new_block(&tree_route).await;
2060			},
2061		};
2062
2063		match event {
2064			ChainEvent::NewBestBlock { .. } => {},
2065			ChainEvent::Finalized { hash, ref tree_route } => {
2066				self.handle_finalized(hash, tree_route).await;
2067
2068				debug!(
2069					target: LOG_TARGET,
2070					?tree_route,
2071					?prev_finalized_block,
2072					"on-finalized enacted"
2073				);
2074			},
2075		}
2076
2077		let duration = start.elapsed();
2078		let mempool_len = self.mempool_len().await;
2079		debug!(
2080			target: LOG_TARGET,
2081			txs = ?mempool_len,
2082			a = self.active_views_count(),
2083			i = self.inactive_views_count(),
2084			views = ?self.views_stats(),
2085			?event,
2086			?duration,
2087			"maintain"
2088		);
2089
2090		self.metrics.report(|metrics| {
2091			let (unwatched, watched) = mempool_len;
2092			let _ = (
2093				self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
2094				self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
2095				watched.try_into().map(|v| metrics.watched_txs.set(v)),
2096				unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
2097			);
2098			metrics.maintain_duration.observe(duration.as_secs_f64());
2099		});
2100	}
2101}
2102
2103impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2104where
2105	Block: BlockT,
2106	Client: sp_api::ProvideRuntimeApi<Block>
2107		+ sc_client_api::BlockBackend<Block>
2108		+ sc_client_api::blockchain::HeaderBackend<Block>
2109		+ sp_runtime::traits::BlockIdTo<Block>
2110		+ sc_client_api::ExecutorProvider<Block>
2111		+ sc_client_api::UsageProvider<Block>
2112		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
2113		+ Send
2114		+ Sync
2115		+ 'static,
2116	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
2117	<Block as BlockT>::Hash: std::marker::Unpin,
2118{
2119	/// Create new fork aware transaction pool for a full node with the provided api.
2120	pub fn new_full(
2121		options: Options,
2122		is_validator: IsValidator,
2123		prometheus: Option<&PrometheusRegistry>,
2124		spawner: impl SpawnEssentialNamed,
2125		client: Arc<Client>,
2126	) -> Self {
2127		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2128		let pool = Self::new_with_background_worker(
2129			options,
2130			is_validator,
2131			pool_api,
2132			prometheus,
2133			spawner,
2134			client.usage_info().chain.best_hash,
2135			client.usage_info().chain.finalized_hash,
2136		);
2137
2138		pool
2139	}
2140}
2141
2142#[cfg(test)]
2143mod reduce_multiview_result_tests {
2144	use super::*;
2145	use sp_core::H256;
2146	#[derive(Debug, PartialEq, Clone)]
2147	enum Error {
2148		Custom(u8),
2149	}
2150
2151	#[test]
2152	fn empty() {
2153		sp_tracing::try_init_simple();
2154		let input = HashMap::default();
2155		let r = reduce_multiview_result::<H256, H256, Error>(input);
2156		assert!(r.is_empty());
2157	}
2158
2159	#[test]
2160	fn errors_only() {
2161		sp_tracing::try_init_simple();
2162		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2163			(
2164				H256::repeat_byte(0x13),
2165				vec![
2166					Err(Error::Custom(10)),
2167					Err(Error::Custom(11)),
2168					Err(Error::Custom(12)),
2169					Err(Error::Custom(13)),
2170				],
2171			),
2172			(
2173				H256::repeat_byte(0x14),
2174				vec![
2175					Err(Error::Custom(20)),
2176					Err(Error::Custom(21)),
2177					Err(Error::Custom(22)),
2178					Err(Error::Custom(23)),
2179				],
2180			),
2181			(
2182				H256::repeat_byte(0x15),
2183				vec![
2184					Err(Error::Custom(30)),
2185					Err(Error::Custom(31)),
2186					Err(Error::Custom(32)),
2187					Err(Error::Custom(33)),
2188				],
2189			),
2190		];
2191		let input = HashMap::from_iter(v.clone());
2192		let r = reduce_multiview_result(input);
2193
2194		// order in HashMap is random, the result shall be one of:
2195		assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
2196	}
2197
2198	#[test]
2199	#[should_panic]
2200	#[cfg(debug_assertions)]
2201	fn invalid_lengths() {
2202		sp_tracing::try_init_simple();
2203		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2204			(H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
2205			(H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
2206		];
2207		let input = HashMap::from_iter(v);
2208		let _ = reduce_multiview_result(input);
2209	}
2210
2211	#[test]
2212	fn only_hashes() {
2213		sp_tracing::try_init_simple();
2214
2215		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2216			(
2217				H256::repeat_byte(0x13),
2218				vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
2219			),
2220			(
2221				H256::repeat_byte(0x14),
2222				vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
2223			),
2224		];
2225		let input = HashMap::from_iter(v);
2226		let r = reduce_multiview_result(input);
2227
2228		assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
2229	}
2230
2231	#[test]
2232	fn one_view() {
2233		sp_tracing::try_init_simple();
2234		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
2235			H256::repeat_byte(0x13),
2236			vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
2237		)];
2238		let input = HashMap::from_iter(v);
2239		let r = reduce_multiview_result(input);
2240
2241		assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
2242	}
2243
2244	#[test]
2245	fn mix() {
2246		sp_tracing::try_init_simple();
2247		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2248			(
2249				H256::repeat_byte(0x13),
2250				vec![
2251					Ok(H256::repeat_byte(0x10)),
2252					Err(Error::Custom(11)),
2253					Err(Error::Custom(12)),
2254					Err(Error::Custom(33)),
2255				],
2256			),
2257			(
2258				H256::repeat_byte(0x14),
2259				vec![
2260					Err(Error::Custom(20)),
2261					Ok(H256::repeat_byte(0x21)),
2262					Err(Error::Custom(22)),
2263					Err(Error::Custom(33)),
2264				],
2265			),
2266			(
2267				H256::repeat_byte(0x15),
2268				vec![
2269					Err(Error::Custom(30)),
2270					Err(Error::Custom(31)),
2271					Ok(H256::repeat_byte(0x32)),
2272					Err(Error::Custom(33)),
2273				],
2274			),
2275		];
2276		let input = HashMap::from_iter(v);
2277		let r = reduce_multiview_result(input);
2278
2279		assert_eq!(
2280			r,
2281			vec![
2282				Ok(H256::repeat_byte(0x10)),
2283				Ok(H256::repeat_byte(0x21)),
2284				Ok(H256::repeat_byte(0x32)),
2285				Err(Error::Custom(33))
2286			]
2287		);
2288	}
2289}