referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
fork_aware_txpool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate fork-aware transaction pool implementation.
20
21use super::{
22	dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23	import_notification_sink::MultiViewImportNotificationSink,
24	metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
25	multi_view_listener::MultiViewListener,
26	tx_mem_pool::{InsertionInfo, TxMemPool},
27	view::View,
28	view_store::ViewStore,
29};
30use crate::{
31	api::FullChainApi,
32	common::{
33		sliding_stat::DurationSlidingStats,
34		tracing_log_xt::{log_xt_debug, log_xt_trace},
35		STAT_SLIDING_WINDOW,
36	},
37	enactment_state::{EnactmentAction, EnactmentState},
38	fork_aware_txpool::{
39		dropped_watcher::{DroppedReason, DroppedTransaction},
40		revalidation_worker,
41	},
42	graph::{
43		self,
44		base_pool::{TimedTransactionSource, Transaction},
45		BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
46	},
47	insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
48	LOG_TARGET_STAT,
49};
50use async_trait::async_trait;
51use futures::{
52	channel::oneshot,
53	future::{self},
54	prelude::*,
55	FutureExt,
56};
57use parking_lot::Mutex;
58use prometheus_endpoint::Registry as PrometheusRegistry;
59use sc_transaction_pool_api::{
60	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
61	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
62	TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
63};
64use sp_blockchain::{HashAndNumber, TreeRoute};
65use sp_core::traits::SpawnEssentialNamed;
66use sp_runtime::{
67	generic::BlockId,
68	traits::{Block as BlockT, NumberFor},
69	transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
70	Saturating,
71};
72use std::{
73	collections::{BTreeMap, HashMap, HashSet},
74	pin::Pin,
75	sync::Arc,
76	time::{Duration, Instant},
77};
78use tokio::select;
79use tracing::{debug, instrument, trace, warn, Level};
80
81/// The maximum block height difference before considering a view or transaction as timed-out
82/// due to a finality stall. When the difference exceeds this threshold, elements are treated
83/// as stale and are subject to cleanup.
84const FINALITY_TIMEOUT_THRESHOLD: usize = 128;
85
86/// The number of transactions that will be sent from the mempool to the newly created view during
87/// the maintain process.
88// todo [#8835]: better approach is needed - maybe time-budget approach?
89// note: yap parachain block size.
90const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;
91
92/// Fork aware transaction pool task, that needs to be polled.
93pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
94
95/// A structure that maintains a collection of pollers associated with specific block hashes
96/// (views).
97struct ReadyPoll<T, Block>
98where
99	Block: BlockT,
100{
101	pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
102}
103
104impl<T, Block> ReadyPoll<T, Block>
105where
106	Block: BlockT,
107{
108	/// Creates a new `ReadyPoll` instance with an empty collection of pollers.
109	fn new() -> Self {
110		Self { pollers: Default::default() }
111	}
112
113	/// Adds a new poller for a specific block hash and returns the `Receiver` end of the created
114	/// oneshot channel which will be used to deliver polled result.
115	fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
116		let (s, r) = oneshot::channel();
117		self.pollers.entry(at).or_default().push(s);
118		r
119	}
120
121	/// Triggers all pollers associated with a specific block by sending the polled result through
122	/// each oneshot channel.
123	///
124	/// `ready_iterator` is a closure that generates the result data to be sent to the pollers.
125	fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
126		debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
127		let Some(pollers) = self.pollers.remove(&at) else { return };
128		pollers.into_iter().for_each(|p| {
129			debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
130			let _ = p.send(ready_iterator());
131		});
132	}
133
134	/// Removes pollers that have their oneshot channels cancelled.
135	fn remove_cancelled(&mut self) {
136		self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
137	}
138}
139
/// The fork-aware transaction pool.
///
/// It keeps track of every fork and provides the set of transactions that is valid for every fork.
pub struct ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: graph::ChainApi<Block = Block> + 'static,
{
	/// The reference to the `ChainApi` provided by client/backend.
	api: Arc<ChainApi>,

	/// Intermediate buffer for the incoming transactions.
	mempool: Arc<TxMemPool<ChainApi, Block>>,

	/// The store for all the views.
	view_store: Arc<ViewStore<ChainApi, Block>>,

	/// Utility for managing pollers of `ready_at` future.
	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,

	/// Prometheus's metrics endpoint.
	metrics: PrometheusMetrics,

	/// Collector of transaction statuses updates, reports transaction events metrics.
	events_metrics_collector: EventsMetricsCollector<ChainApi>,

	/// Util tracking best and finalized block.
	enactment_state: Arc<Mutex<EnactmentState<Block>>>,

	/// The channel allowing to send revalidation jobs to the background thread.
	revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,

	/// Util providing an aggregated stream of transactions that were imported to ready queue in
	/// any view.
	import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,

	/// Externally provided pool options.
	options: Options,

	/// Whether the node is a validator.
	is_validator: IsValidator,

	/// Finality timeout threshold.
	///
	/// Sets the maximum permissible block height difference between the latest block
	/// and the oldest transactions or views in the pool. Beyond this difference,
	/// transactions/views are considered timed out and eligible for cleanup.
	finality_timeout_threshold: usize,

	/// Transactions included in blocks since the most recently finalized block (including this
	/// block).
	///
	/// Holds a mapping of block hash and number to their corresponding transaction hashes.
	///
	/// Intended to be used in the finality stall cleanups and also as a cache for all in-block
	/// transactions.
	included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,

	/// Sliding-window statistics of `submit` call durations.
	submit_stats: DurationSlidingStats,

	/// Sliding-window statistics of `submit_and_watch` call durations.
	submit_and_watch_stats: DurationSlidingStats,
}
204
205impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
206where
207	Block: BlockT,
208	ChainApi: graph::ChainApi<Block = Block> + 'static,
209	<Block as BlockT>::Hash: Unpin,
210{
211	// Injects a view for the given block to self.
212	//
213	// Helper for the pool new methods.
214	fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
215		if let Some(block_number) =
216			self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
217		{
218			let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
219			let tree_route =
220				&TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
221			let view = self.build_and_plug_view(None, &at_best, &tree_route);
222			self.view_store.insert_new_view_sync(view.into(), &tree_route);
223			trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
224		};
225		self
226	}
227
228	/// Create new fork aware transaction pool with provided shared instance of `ChainApi` intended
229	/// for tests.
230	pub fn new_test(
231		pool_api: Arc<ChainApi>,
232		best_block_hash: Block::Hash,
233		finalized_hash: Block::Hash,
234		finality_timeout_threshold: Option<usize>,
235	) -> (Self, [ForkAwareTxPoolTask; 2]) {
236		Self::new_test_with_limits(
237			pool_api,
238			best_block_hash,
239			finalized_hash,
240			Options::default().ready,
241			Options::default().future,
242			usize::MAX,
243			finality_timeout_threshold,
244		)
245	}
246
247	/// Create new fork aware transaction pool with given limits and with provided shared instance
248	/// of `ChainApi` intended for tests.
249	pub fn new_test_with_limits(
250		pool_api: Arc<ChainApi>,
251		best_block_hash: Block::Hash,
252		finalized_hash: Block::Hash,
253		ready_limits: crate::PoolLimit,
254		future_limits: crate::PoolLimit,
255		mempool_max_transactions_count: usize,
256		finality_timeout_threshold: Option<usize>,
257	) -> (Self, [ForkAwareTxPoolTask; 2]) {
258		let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
259		let listener = Arc::new(listener);
260
261		let (import_notification_sink, import_notification_sink_task) =
262			MultiViewImportNotificationSink::new_with_worker();
263
264		let (mempool, mempool_task) = TxMemPool::new(
265			pool_api.clone(),
266			listener.clone(),
267			Default::default(),
268			mempool_max_transactions_count,
269			ready_limits.total_bytes + future_limits.total_bytes,
270		);
271		let mempool = Arc::from(mempool);
272
273		let (dropped_stream_controller, dropped_stream) =
274			MultiViewDroppedWatcherController::<ChainApi>::new();
275
276		let view_store = Arc::new(ViewStore::new(
277			pool_api.clone(),
278			listener,
279			dropped_stream_controller,
280			import_notification_sink.clone(),
281		));
282
283		let dropped_monitor_task = Self::dropped_monitor_task(
284			dropped_stream,
285			mempool.clone(),
286			view_store.clone(),
287			import_notification_sink.clone(),
288		);
289
290		let combined_tasks = async move {
291			tokio::select! {
292				_ = listener_task => {},
293				_ = import_notification_sink_task => {},
294				_ = dropped_monitor_task => {}
295			}
296		}
297		.boxed();
298
299		let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
300
301		(
302			Self {
303				mempool,
304				api: pool_api,
305				view_store,
306				ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
307				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
308					best_block_hash,
309					finalized_hash,
310				))),
311				revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
312				import_notification_sink,
313				options,
314				is_validator: false.into(),
315				metrics: Default::default(),
316				events_metrics_collector: EventsMetricsCollector::default(),
317				finality_timeout_threshold: finality_timeout_threshold
318					.unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
319				included_transactions: Default::default(),
320				submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
321				submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
322					STAT_SLIDING_WINDOW,
323				)),
324			}
325			.inject_initial_view(best_block_hash),
326			[combined_tasks, mempool_task],
327		)
328	}
329
330	/// Monitors the stream of dropped transactions and removes them from the mempool and
331	/// view_store.
332	///
333	/// This asynchronous task continuously listens for dropped transaction notifications provided
334	/// within `dropped_stream` and ensures that these transactions are removed from the `mempool`
335	/// and `import_notification_sink` instances. For Usurped events, the transaction is also
336	/// removed from the view_store.
337	async fn dropped_monitor_task(
338		mut dropped_stream: StreamOfDropped<ChainApi>,
339		mempool: Arc<TxMemPool<ChainApi, Block>>,
340		view_store: Arc<ViewStore<ChainApi, Block>>,
341		import_notification_sink: MultiViewImportNotificationSink<
342			Block::Hash,
343			ExtrinsicHash<ChainApi>,
344		>,
345	) {
346		let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
347		loop {
348			let Some(dropped) = dropped_stream.next().await else {
349				debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
350				break;
351			};
352			let start = Instant::now();
353			let tx_hash = dropped.tx_hash;
354			trace!(
355				target: LOG_TARGET,
356				?tx_hash,
357				reason = ?dropped.reason,
358				"fatp::dropped notification, removing"
359			);
360			match dropped.reason {
361				DroppedReason::Usurped(new_tx_hash) => {
362					if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
363						view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
364					} else {
365						trace!(
366							target: LOG_TARGET,
367							tx_hash = ?new_tx_hash,
368							"error: dropped_monitor_task: no entry in mempool for new transaction"
369						);
370					};
371				},
372				DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
373					view_store.remove_transaction_subtree(tx_hash, |_, _| {});
374				},
375			};
376
377			mempool.remove_transactions(&[tx_hash]).await;
378			import_notification_sink.clean_notified_items(&[tx_hash]);
379			view_store.listener.transaction_dropped(dropped);
380			insert_and_log_throttled!(
381				Level::DEBUG,
382				target:LOG_TARGET_STAT,
383				prefix:"dropped_stats",
384				dropped_stats,
385				start.elapsed().into()
386			);
387		}
388	}
389
390	/// Creates new fork aware transaction pool with the background revalidation worker.
391	///
392	/// The txpool essential tasks (including a revalidation worker) are spawned using provided
393	/// spawner.
394	pub fn new_with_background_worker(
395		options: Options,
396		is_validator: IsValidator,
397		pool_api: Arc<ChainApi>,
398		prometheus: Option<&PrometheusRegistry>,
399		spawner: impl SpawnEssentialNamed,
400		best_block_hash: Block::Hash,
401		finalized_hash: Block::Hash,
402	) -> Self {
403		let metrics = PrometheusMetrics::new(prometheus);
404		let (events_metrics_collector, event_metrics_task) =
405			EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());
406
407		let (listener, listener_task) =
408			MultiViewListener::new_with_worker(events_metrics_collector.clone());
409		let listener = Arc::new(listener);
410
411		let (revalidation_queue, revalidation_task) =
412			revalidation_worker::RevalidationQueue::new_with_worker();
413
414		let (import_notification_sink, import_notification_sink_task) =
415			MultiViewImportNotificationSink::new_with_worker();
416
417		let (mempool, blocking_mempool_task) = TxMemPool::new(
418			pool_api.clone(),
419			listener.clone(),
420			metrics.clone(),
421			options.total_count(),
422			options.ready.total_bytes + options.future.total_bytes,
423		);
424		let mempool = Arc::from(mempool);
425
426		let (dropped_stream_controller, dropped_stream) =
427			MultiViewDroppedWatcherController::<ChainApi>::new();
428
429		let view_store = Arc::new(ViewStore::new(
430			pool_api.clone(),
431			listener,
432			dropped_stream_controller,
433			import_notification_sink.clone(),
434		));
435
436		let dropped_monitor_task = Self::dropped_monitor_task(
437			dropped_stream,
438			mempool.clone(),
439			view_store.clone(),
440			import_notification_sink.clone(),
441		);
442
443		let combined_tasks = async move {
444			tokio::select! {
445				_ = listener_task => {}
446				_ = revalidation_task => {},
447				_ = import_notification_sink_task => {},
448				_ = dropped_monitor_task => {}
449				_ = event_metrics_task => {},
450			}
451		}
452		.boxed();
453		spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
454		spawner.spawn_essential_blocking(
455			"txpool-background",
456			Some("transaction-pool"),
457			blocking_mempool_task,
458		);
459
460		Self {
461			mempool,
462			api: pool_api,
463			view_store,
464			ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
465			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
466				best_block_hash,
467				finalized_hash,
468			))),
469			revalidation_queue: Arc::from(revalidation_queue),
470			import_notification_sink,
471			options,
472			metrics,
473			events_metrics_collector,
474			is_validator,
475			finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
476			included_transactions: Default::default(),
477			submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
478			submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
479				STAT_SLIDING_WINDOW,
480			)),
481		}
482		.inject_initial_view(best_block_hash)
483	}
484
485	/// Get access to the underlying api
486	pub fn api(&self) -> &ChainApi {
487		&self.api
488	}
489
490	/// Provides a status for all views at the tips of the forks.
491	pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
492		self.view_store.status()
493	}
494
495	/// Provides a number of views at the tips of the forks.
496	pub fn active_views_count(&self) -> usize {
497		self.view_store.active_views.read().len()
498	}
499
500	/// Provides a number of views at the tips of the forks.
501	pub fn inactive_views_count(&self) -> usize {
502		self.view_store.inactive_views.read().len()
503	}
504
505	/// Provides internal views statistics.
506	///
507	/// Provides block number, count of ready, count of future transactions for every view. It is
508	/// suitable for printing log information.
509	fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
510		self.view_store
511			.active_views
512			.read()
513			.iter()
514			.map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
515			.collect()
516	}
517
518	/// Checks if there is a view at the tip of the fork with given hash.
519	pub fn has_view(&self, hash: &Block::Hash) -> bool {
520		self.view_store.active_views.read().contains_key(hash)
521	}
522
523	/// Returns a number of unwatched and watched transactions in internal mempool.
524	///
525	/// Intended for use in unit tests.
526	pub async fn mempool_len(&self) -> (usize, usize) {
527		self.mempool.unwatched_and_watched_count().await
528	}
529
530	/// Returns a set of future transactions for given block hash.
531	///
532	/// Intended for logging / tests.
533	pub fn futures_at(
534		&self,
535		at: Block::Hash,
536	) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
537		self.view_store.futures_at(at)
538	}
539
	/// Returns a best-effort set of ready transactions for a given block, without executing full
	/// maintain process.
	///
	/// The method resolves the block numbers of `at` and of the recent finalized block, and asks
	/// the view store for a view to start from together with the list of enacted blocks. One of
	/// three outcomes follows:
	/// - If such a view is found, a temporary view is created from it, the tags provided by the
	///   extrinsics found in the bodies of the enacted blocks are pruned from the temporary view,
	///   and its ready set is returned.
	/// - Otherwise (including the case when any of the block numbers could not be resolved), the
	///   ready set of the most recent view processed by the fork-aware txpool is returned, if
	///   there is one.
	/// - If there is no most recent view either, an empty transaction set is returned.
	///
	/// Pruning is just rebuilding the underlying transactions graph, no validations are executed,
	/// so this process shall be fast.
	pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
		let start = Instant::now();
		let api = self.api.clone();
		debug!(
			target: LOG_TARGET,
			?at,
			"fatp::ready_at_light"
		);

		// Resolution errors are turned into `None`, which leads to one of the fallback branches.
		let at_number = self.api.resolve_block_number(at).ok();
		let finalized_number = self
			.api
			.resolve_block_number(self.enactment_state.lock().recent_finalized_block())
			.ok();

		// Prune all txs from the best view found, considering the extrinsics part of the blocks
		// that are more recent than the view itself.
		if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
			let at_hn = HashAndNumber { hash: at, number: at_number };
			finalized_number.and_then(|finalized_number| {
				self.view_store
					.find_view_descendent_up_to_number(&at_hn, finalized_number)
					.map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
			})
		}) {
			let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
			// Hashes of all the extrinsics included in the enacted blocks. A block whose body
			// cannot be fetched (error or no body) contributes nothing.
			let mut all_extrinsics = vec![];
			for h in enacted_blocks {
				let extrinsics = api
					.block_body(h)
					.await
					.unwrap_or_else(|error| {
						warn!(
							target: LOG_TARGET,
							%error,
							"Compute ready light transactions: error request"
						);
						None
					})
					.unwrap_or_default()
					.into_iter()
					.map(|t| api.hash_and_length(&t).0);
				all_extrinsics.extend(extrinsics);
			}

			let before_count = tmp_view.pool.validated_pool().status().ready;
			// Tags of the included extrinsics, as known to the temporary view.
			let tags = tmp_view
				.pool
				.validated_pool()
				.extrinsics_tags(&all_extrinsics)
				.into_iter()
				.flatten()
				.flatten()
				.collect::<Vec<_>>();
			// The outcome of pruning is intentionally ignored, only the resulting ready set is
			// of interest here.
			let _ = tmp_view.pool.validated_pool().prune_tags(tags);

			let after_count = tmp_view.pool.validated_pool().status().ready;
			debug!(
				target: LOG_TARGET,
				?at,
				best_view_hash = ?view.at.hash,
				before_count,
				to_be_removed = all_extrinsics.len(),
				after_count,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> light"
			);
			Box::new(tmp_view.pool.validated_pool().ready())
		} else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
			// Fallback for the case when `at` is not on the already known fork.
			// Falls back to the most recent view, which may include txs which
			// are invalid or already included in the blocks but can still yield a
			// partially valid ready set, which is still better than including nothing.
			debug!(
				target: LOG_TARGET,
				?at,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> most_recent_view"
			);
			Box::new(most_recent_view.pool.validated_pool().ready())
		} else {
			// No view to start from at all: nothing can be provided.
			let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
			debug!(
				target: LOG_TARGET,
				?at,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> empty"
			);
			empty
		}
	}
643
644	/// Waits for the set of ready transactions for a given block up to a specified timeout.
645	///
646	/// This method combines two futures:
647	/// - The `ready_at` future, which waits for the ready transactions resulting from the full
648	/// maintenance process to be available.
649	/// - The `ready_at_light` future, used as a fallback if the timeout expires before `ready_at`
650	/// completes. This provides a best-effort, ready set of transactions as a result light
651	/// maintain.
652	///
653	/// Returns a future resolving to a ready iterator of transactions.
654	async fn ready_at_with_timeout_internal(
655		&self,
656		at: Block::Hash,
657		timeout: std::time::Duration,
658	) -> ReadyIteratorFor<ChainApi> {
659		debug!(
660			target: LOG_TARGET,
661			?at,
662			?timeout,
663			"fatp::ready_at_with_timeout"
664		);
665		let timeout = futures_timer::Delay::new(timeout);
666		let (view_already_exists, ready_at) = self.ready_at_internal(at);
667
668		if view_already_exists {
669			return ready_at.await;
670		}
671
672		let maybe_ready = async move {
673			select! {
674				ready = ready_at => Some(ready),
675				_ = timeout => {
676					debug!(
677						target: LOG_TARGET,
678						?at,
679						"Timeout fired waiting for transaction pool at block. Proceeding with production."
680					);
681					None
682				}
683			}
684		};
685
686		let fall_back_ready = self.ready_at_light(at);
687		let (maybe_ready, fall_back_ready) =
688			futures::future::join(maybe_ready, fall_back_ready).await;
689		maybe_ready.unwrap_or(fall_back_ready)
690	}
691
692	fn ready_at_internal(
693		&self,
694		at: Block::Hash,
695	) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
696		let mut ready_poll = self.ready_poll.lock();
697
698		if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
699			debug!(
700				target: LOG_TARGET,
701				?at,
702				?inactive,
703				"fatp::ready_at_internal"
704			);
705			let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
706			return (true, async move { iterator }.boxed());
707		}
708
709		let pending = ready_poll
710			.add(at)
711			.map(|received| {
712				received.unwrap_or_else(|error| {
713					warn!(
714						target: LOG_TARGET,
715						%error,
716						"Error receiving ready-set iterator"
717					);
718					Box::new(std::iter::empty())
719				})
720			})
721			.boxed();
722		debug!(
723			target: LOG_TARGET,
724			?at,
725			pending_keys = ?ready_poll.pollers.keys(),
726			"fatp::ready_at_internal"
727		);
728		(false, pending)
729	}
730
731	/// Refer to [`Self::submit_and_watch`]
732	async fn submit_and_watch_inner(
733		&self,
734		at: Block::Hash,
735		source: TransactionSource,
736		xt: TransactionFor<Self>,
737	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
738		let xt = Arc::from(xt);
739
740		let at_number = self
741			.api
742			.block_id_to_number(&BlockId::Hash(at))
743			.ok()
744			.flatten()
745			.unwrap_or_default()
746			.into()
747			.as_u64();
748
749		let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
750			Ok(result) => result,
751			Err(TxPoolApiError::ImmediatelyDropped) => {
752				self.attempt_transaction_replacement(source, at_number, true, xt.clone())
753					.await?
754			},
755			Err(e) => return Err(e.into()),
756		};
757
758		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
759		self.events_metrics_collector.report_submitted(&insertion);
760
761		match self.view_store.submit_and_watch(at, insertion.source, xt).await {
762			Err(e) => {
763				self.mempool.remove_transactions(&[insertion.hash]).await;
764				Err(e.into())
765			},
766			Ok(mut outcome) => {
767				self.mempool
768					.update_transaction_priority(outcome.hash(), outcome.priority())
769					.await;
770				Ok(outcome.expect_watcher())
771			},
772		}
773	}
774
	/// Refer to [`Self::submit_at`]
	///
	/// The transactions are first inserted into the mempool as unwatched ones. If the view store
	/// is empty, the mempool results are returned right away. Otherwise insertions that were
	/// immediately dropped by the mempool are retried as transaction replacements, and all the
	/// transactions accepted by the mempool are submitted to the view store. The returned vector
	/// contains one entry per input transaction, in the input order: either the transaction hash
	/// or the error coming from the mempool or from the view store.
	async fn submit_at_inner(
		&self,
		at: Block::Hash,
		source: TransactionSource,
		xts: Vec<TransactionFor<Self>>,
	) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
		// Falls back to the default block number if `at` cannot be resolved.
		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();
		let view_store = self.view_store.clone();
		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
		let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;

		// No views yet: the mempool insertion results are the final results.
		if view_store.is_empty() {
			return Ok(mempool_results
				.into_iter()
				.map(|r| r.map(|r| r.hash).map_err(Into::into))
				.collect::<Vec<_>>());
		}

		// Retry the insertions that were immediately dropped by the mempool, this time as an
		// attempt to replace some other transaction. All other results are kept as they are.
		let retries = mempool_results
			.into_iter()
			.zip(xts.clone())
			.map(|(result, xt)| async move {
				match result {
					Err(TxPoolApiError::ImmediatelyDropped) => {
						self.attempt_transaction_replacement(source, at_number, false, xt).await
					},
					_ => result,
				}
			})
			.collect::<Vec<_>>();

		let mempool_results = futures::future::join_all(retries).await;

		// Collect transactions that were successfully submitted to the mempool...
		let to_be_submitted = mempool_results
			.iter()
			.zip(xts)
			.filter_map(|(result, xt)| {
				result.as_ref().ok().map(|insertion| {
					self.events_metrics_collector.report_submitted(&insertion);
					(insertion.source.clone(), xt)
				})
			})
			.collect::<Vec<_>>();

		self.metrics
			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

		// ... and submit them to the view_store. Please note that transactions rejected by mempool
		// are not sent here.
		let mempool = self.mempool.clone();
		let results_map = view_store.submit(to_be_submitted.into_iter()).await;
		let mut submission_results = reduce_multiview_result(results_map).into_iter();

		// Note for composing final result:
		//
		// For each failed insertion into the mempool, the mempool result should be placed into
		// the returned vector.
		//
		// For each successful insertion into the mempool, the corresponding
		// view_store submission result needs to be examined (merged_results):
		// - If there is an error during view_store submission, the transaction is removed from
		// the mempool, and the final result recorded in the vector for this transaction is the
		// view_store submission error.
		//
		// - If the view_store submission is successful, the transaction priority is updated in the
		// mempool.
		//
		// Finally, it collects the hashes of updated transactions or submission errors (either
		// from the mempool or view_store) into a returned vector (final_results).
		const RESULTS_ASSUMPTION : &str =
			"The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
		// The iterator is lazy: the next view_store result is consumed only when a successful
		// mempool insertion is encountered, which keeps both sequences aligned.
		let merged_results = mempool_results.into_iter().map(|result| {
			result.map_err(Into::into).and_then(|insertion| {
				Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
			})
		});

		let mut final_results = vec![];
		for r in merged_results {
			match r {
				Ok((hash, submission_result)) => match submission_result {
					Ok(r) => {
						mempool.update_transaction_priority(r.hash(), r.priority()).await;
						final_results.push(Ok(r.hash()));
					},
					Err(e) => {
						mempool.remove_transactions(&[hash]).await;
						final_results.push(Err(e));
					},
				},
				Err(e) => final_results.push(Err(e)),
			}
		}

		Ok(final_results)
	}
881
882	/// Number of notified items in import_notification_sink.
883	///
884	/// Internal detail, exposed only for testing.
885	pub fn import_notification_sink_len(&self) -> usize {
886		self.import_notification_sink.notified_items_len()
887	}
888}
889
/// Reduces the per-view import statuses into a single vector of statuses.
///
/// Importing a batch of transactions into a single view yields a vector of statuses, one item
/// per transaction. The input map associates view hashes with such vectors, so importing into
/// multiple views results in a two-dimensional array of statuses.
///
/// The map is converted into a single vector according to the following rules:
/// - if at least one status of a given transaction is a success, the output contains a success
///   for it,
/// - if the status of a given transaction is an error in every view, the output contains an
///   error for it.
///
/// The results for transactions are in the same order for every view, and the output vector
/// preserves this order. An empty input map results in an empty vector.
///
/// ```skip
/// in:
/// view  |   xt0 status | xt1 status | xt2 status
/// h1   -> [ Ok(xth0),    Ok(xth1),    Err       ]
/// h2   -> [ Ok(xth0),    Err,         Err       ]
/// h3   -> [ Ok(xth0),    Ok(xth1),    Err       ]
///
/// out:
/// [ Ok(xth0), Ok(xth1), Err ]
/// ```
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
	// Every view is expected to report the same number of statuses.
	debug_assert!({
		let mut lengths = input.values().map(Vec::len);
		match lengths.next() {
			Some(expected) => lengths.all(|len| len == expected),
			None => true,
		}
	});

	let mut per_view = input.into_values();
	let Some(mut reduced) = per_view.next() else {
		return Vec::new();
	};

	// An error recorded so far is overwritten by whatever the next view reports for the same
	// transaction; a success is never overwritten.
	for view_results in per_view {
		for (slot, candidate) in reduced.iter_mut().zip(view_results) {
			if slot.is_err() {
				*slot = candidate;
			}
		}
	}

	reduced
}
936
937#[async_trait]
938impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
939where
940	Block: BlockT,
941	ChainApi: 'static + graph::ChainApi<Block = Block>,
942	<Block as BlockT>::Hash: Unpin,
943{
944	type Block = ChainApi::Block;
945	type Hash = ExtrinsicHash<ChainApi>;
946	type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
947	type Error = ChainApi::Error;
948
949	/// Submits multiple transactions and returns a future resolving to the submission results.
950	///
951	/// Actual transactions submission process is delegated to the `ViewStore` internal instance.
952	///
953	/// The internal limits of the pool are checked. The results of submissions to individual views
954	/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
955	async fn submit_at(
956		&self,
957		at: <Self::Block as BlockT>::Hash,
958		source: TransactionSource,
959		xts: Vec<TransactionFor<Self>>,
960	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
961		let start = Instant::now();
962		trace!(
963			target: LOG_TARGET,
964			count = xts.len(),
965			active_views_count = self.active_views_count(),
966			"fatp::submit_at"
967		);
968		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
969		let result = self.submit_at_inner(at, source, xts).await;
970		insert_and_log_throttled!(
971			Level::DEBUG,
972			target:LOG_TARGET_STAT,
973			prefix:"submit_stats",
974			self.submit_stats,
975			start.elapsed().into()
976		);
977		result
978	}
979
980	/// Submits a single transaction and returns a future resolving to the submission results.
981	///
982	/// Actual transaction submission process is delegated to the `submit_at` function.
983	async fn submit_one(
984		&self,
985		_at: <Self::Block as BlockT>::Hash,
986		source: TransactionSource,
987		xt: TransactionFor<Self>,
988	) -> Result<TxHash<Self>, Self::Error> {
989		trace!(
990			target: LOG_TARGET,
991			tx_hash = ?self.tx_hash(&xt),
992			active_views_count = self.active_views_count(),
993			"fatp::submit_one"
994		);
995		match self.submit_at(_at, source, vec![xt]).await {
996			Ok(mut v) => {
997				v.pop().expect("There is exactly one element in result of submit_at. qed.")
998			},
999			Err(e) => Err(e),
1000		}
1001	}
1002
1003	/// Submits a transaction and starts to watch its progress in the pool, returning a stream of
1004	/// status updates.
1005	///
1006	/// Actual transaction submission process is delegated to the `ViewStore` internal instance.
1007	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
1008	async fn submit_and_watch(
1009		&self,
1010		at: <Self::Block as BlockT>::Hash,
1011		source: TransactionSource,
1012		xt: TransactionFor<Self>,
1013	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
1014		let start = Instant::now();
1015		trace!(
1016			target: LOG_TARGET,
1017			tx_hash = ?self.tx_hash(&xt),
1018			views = self.active_views_count(),
1019			"fatp::submit_and_watch"
1020		);
1021		let result = self.submit_and_watch_inner(at, source, xt).await;
1022		insert_and_log_throttled!(
1023			Level::DEBUG,
1024			target:LOG_TARGET_STAT,
1025			prefix:"submit_and_watch_stats",
1026			self.submit_and_watch_stats,
1027			start.elapsed().into()
1028		);
1029		result
1030	}
1031
1032	/// Reports invalid transactions to the transaction pool.
1033	///
1034	/// This function takes an array of tuples, each consisting of a transaction hash and the
1035	/// corresponding error that occurred during transaction execution at given block.
1036	///
1037	/// The transaction pool implementation will determine which transactions should be
1038	/// removed from the pool. Transactions that depend on invalid transactions will also
1039	/// be removed.
1040	async fn report_invalid(
1041		&self,
1042		at: Option<<Self::Block as BlockT>::Hash>,
1043		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
1044	) -> Vec<Arc<Self::InPoolTransaction>> {
1045		debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
1046		log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
1047		self.metrics
1048			.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
1049
1050		let removed = self.view_store.report_invalid(at, invalid_tx_errors);
1051
1052		let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
1053		self.mempool.remove_transactions(&removed_hashes).await;
1054		self.import_notification_sink.clean_notified_items(&removed_hashes);
1055
1056		self.metrics
1057			.report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
1058
1059		removed
1060	}
1061
1062	// todo [#5491]: api change?
1063	// status(Hash) -> Option<PoolStatus>
1064	/// Returns the pool status which includes information like the number of ready and future
1065	/// transactions.
1066	///
1067	/// Currently the status for the most recently notified best block is returned (for which
1068	/// maintain process was accomplished).
1069	fn status(&self) -> PoolStatus {
1070		self.view_store
1071			.most_recent_view
1072			.read()
1073			.as_ref()
1074			.map(|v| v.status())
1075			.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
1076	}
1077
1078	/// Return an event stream of notifications when transactions are imported to the pool.
1079	///
1080	/// Consumers of this stream should use the `ready` method to actually get the
1081	/// pending transactions in the right order.
1082	fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
1083		self.import_notification_sink.event_stream()
1084	}
1085
1086	/// Returns the hash of a given transaction.
1087	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1088		self.api().hash_and_length(xt).0
1089	}
1090
1091	/// Notifies the pool about the broadcasting status of transactions.
1092	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
1093		self.view_store.listener.transactions_broadcasted(propagations);
1094	}
1095
1096	/// Return specific ready transaction by hash, if there is one.
1097	///
1098	/// Currently the ready transaction is returned if it exists for the most recently notified best
1099	/// block (for which maintain process was accomplished).
1100	// todo [#5491]: api change: we probably should have at here?
1101	fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
1102		let most_recent_view_hash =
1103			self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
1104		let result = most_recent_view_hash
1105			.and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
1106		trace!(
1107			target: LOG_TARGET,
1108			?tx_hash,
1109			is_ready = result.is_some(),
1110			most_recent_view = ?most_recent_view_hash,
1111			"ready_transaction"
1112		);
1113		result
1114	}
1115
1116	/// Returns an iterator for ready transactions at a specific block, ordered by priority.
1117	async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
1118		let (_, result) = self.ready_at_internal(at);
1119		result.await
1120	}
1121
1122	/// Returns an iterator for ready transactions, ordered by priority.
1123	///
1124	/// Currently the set of ready transactions is returned if it exists for the most recently
1125	/// notified best block (for which maintain process was accomplished).
1126	fn ready(&self) -> ReadyIteratorFor<ChainApi> {
1127		self.view_store.ready()
1128	}
1129
1130	/// Returns a list of future transactions in the pool.
1131	///
1132	/// Currently the set of future transactions is returned if it exists for the most recently
1133	/// notified best block (for which maintain process was accomplished).
1134	fn futures(&self) -> Vec<Self::InPoolTransaction> {
1135		self.view_store.futures()
1136	}
1137
1138	/// Returns a set of ready transactions at a given block within the specified timeout.
1139	///
1140	/// If the timeout expires before the maintain process is accomplished, a best-effort
1141	/// set of transactions is returned (refer to `ready_at_light`).
1142	async fn ready_at_with_timeout(
1143		&self,
1144		at: <Self::Block as BlockT>::Hash,
1145		timeout: std::time::Duration,
1146	) -> ReadyIteratorFor<ChainApi> {
1147		self.ready_at_with_timeout_internal(at, timeout).await
1148	}
1149}
1150
1151impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
1152	for ForkAwareTxPool<ChainApi, Block>
1153where
1154	Block: BlockT,
1155	ChainApi: 'static + graph::ChainApi<Block = Block>,
1156	<Block as BlockT>::Hash: Unpin,
1157{
1158	type Block = Block;
1159	type Hash = ExtrinsicHash<ChainApi>;
1160	type Error = ChainApi::Error;
1161
1162	fn submit_local(
1163		&self,
1164		at: Block::Hash,
1165		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
1166	) -> Result<Self::Hash, Self::Error> {
1167		trace!(
1168			target: LOG_TARGET,
1169			active_views_count = self.active_views_count(),
1170			"fatp::submit_local"
1171		);
1172		let xt = Arc::from(xt);
1173		let at_number = self
1174			.api
1175			.block_id_to_number(&BlockId::Hash(at))
1176			.ok()
1177			.flatten()
1178			.unwrap_or_default()
1179			.into()
1180			.as_u64();
1181
1182		// note: would be nice to get rid of sync methods one day. See: #8912
1183		let result = self
1184			.mempool
1185			.clone()
1186			.extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
1187			.remove(0);
1188
1189		let insertion = match result {
1190			Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
1191				TransactionSource::Local,
1192				false,
1193				xt.clone(),
1194			),
1195			_ => result,
1196		}?;
1197
1198		self.view_store
1199			.submit_local(xt)
1200			.inspect_err(|_| {
1201				self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
1202			})
1203			.map(|outcome| {
1204				self.mempool
1205					.clone()
1206					.update_transaction_priority_sync(outcome.hash(), outcome.priority());
1207				outcome.hash()
1208			})
1209			.or_else(|_| Ok(insertion.hash))
1210	}
1211}
1212
1213impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1214where
1215	Block: BlockT,
1216	ChainApi: graph::ChainApi<Block = Block> + 'static,
1217	<Block as BlockT>::Hash: Unpin,
1218{
1219	/// Handles a new block notification.
1220	///
1221	/// It is responsible for handling a newly notified block. It executes some sanity checks, find
1222	/// the best view to clone from and executes the new view build procedure for the notified
1223	/// block.
1224	///
1225	/// If the view is correctly created, `ready_at` pollers for this block will be triggered.
1226	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
1227	async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
1228		let hash_and_number = match tree_route.last() {
1229			Some(hash_and_number) => hash_and_number,
1230			None => {
1231				warn!(
1232					target: LOG_TARGET,
1233					?tree_route,
1234					"Skipping ChainEvent - no last block in tree route"
1235				);
1236				return;
1237			},
1238		};
1239
1240		if self.has_view(&hash_and_number.hash) {
1241			debug!(
1242				target: LOG_TARGET,
1243				?hash_and_number,
1244				"view already exists for block"
1245			);
1246			return;
1247		}
1248
1249		let best_view = self.view_store.find_best_view(tree_route);
1250		let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;
1251
1252		if let Some(view) = new_view {
1253			{
1254				let view = view.clone();
1255				self.ready_poll.lock().trigger(hash_and_number.hash, move || {
1256					Box::from(view.pool.validated_pool().ready())
1257				});
1258			}
1259
1260			View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
1261		}
1262
1263		self.finality_stall_cleanup(hash_and_number).await;
1264	}
1265
1266	/// Cleans up transactions and views outdated by potential finality stalls.
1267	///
1268	/// This function removes transactions from the pool that were included in blocks but not
1269	/// finalized within a pre-defined block height threshold. Transactions not meeting finality
1270	/// within this threshold are notified with finality timed out event. The threshold is based on
1271	/// the current block number, 'at'.
1272	///
1273	/// Additionally, this method triggers the view store to handle and remove stale views caused by
1274	/// the finality stall.
1275	async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
1276		let (oldest_block_number, finality_timedout_blocks) = {
1277			let mut included_transactions = self.included_transactions.lock();
1278
1279			let Some(oldest_block_number) =
1280				included_transactions.first_key_value().map(|(k, _)| k.number)
1281			else {
1282				return;
1283			};
1284
1285			if at.number.saturating_sub(oldest_block_number).into() <=
1286				self.finality_timeout_threshold.into()
1287			{
1288				return;
1289			}
1290
1291			let mut finality_timedout_blocks =
1292				indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
1293
1294			included_transactions.retain(
1295				|HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
1296					let diff = at.number.saturating_sub(*view_number);
1297					if diff.into() > self.finality_timeout_threshold.into() {
1298						finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
1299						false
1300					} else {
1301						true
1302					}
1303				},
1304			);
1305
1306			(oldest_block_number, finality_timedout_blocks)
1307		};
1308
1309		if !finality_timedout_blocks.is_empty() {
1310			self.ready_poll.lock().remove_cancelled();
1311			self.view_store.listener.remove_stale_controllers();
1312		}
1313
1314		let finality_timedout_blocks_len = finality_timedout_blocks.len();
1315
1316		for (block_hash, tx_hashes) in finality_timedout_blocks {
1317			self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
1318
1319			self.mempool.remove_transactions(&tx_hashes).await;
1320			self.import_notification_sink.clean_notified_items(&tx_hashes);
1321			self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
1322		}
1323
1324		self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
1325
1326		debug!(
1327			target: LOG_TARGET,
1328			?at,
1329			included_transactions_len = ?self.included_transactions.lock().len(),
1330			finality_timedout_blocks_len,
1331			?oldest_block_number,
1332			"finality_stall_cleanup"
1333		);
1334	}
1335
1336	/// Builds a new view.
1337	///
1338	/// If `origin_view` is provided, the new view will be cloned from it. Otherwise an empty view
1339	/// will be created.
1340	///
1341	/// This method will also update multi-view listeners with newly created view.
1342	///
1343	/// The new view will not be inserted into the view store.
1344	fn build_and_plug_view(
1345		&self,
1346		origin_view: Option<Arc<View<ChainApi>>>,
1347		at: &HashAndNumber<Block>,
1348		tree_route: &TreeRoute<Block>,
1349	) -> View<ChainApi> {
1350		let enter = Instant::now();
1351		let (view, view_dropped_stream, view_aggregated_stream) =
1352			if let Some(origin_view) = origin_view {
1353				let (mut view, view_dropped_stream, view_aggragated_stream) =
1354					View::new_from_other(&origin_view, at);
1355				if !tree_route.retracted().is_empty() {
1356					view.pool.clear_recently_pruned();
1357				}
1358				(view, view_dropped_stream, view_aggragated_stream)
1359			} else {
1360				debug!(
1361					target: LOG_TARGET,
1362					?at,
1363					"creating non-cloned view"
1364				);
1365				View::new(
1366					self.api.clone(),
1367					at.clone(),
1368					self.options.clone(),
1369					self.metrics.clone(),
1370					self.is_validator.clone(),
1371				)
1372			};
1373		debug!(
1374			target: LOG_TARGET,
1375			?at,
1376			duration = ?enter.elapsed(),
1377			"build_new_view::clone_view"
1378		);
1379
1380		// 1. Capture all import notification from the very beginning, so first register all
1381		// the listeners.
1382		self.import_notification_sink.add_view(
1383			view.at.hash,
1384			view.pool.validated_pool().import_notification_stream().boxed(),
1385		);
1386
1387		self.view_store
1388			.dropped_stream_controller
1389			.add_view(view.at.hash, view_dropped_stream.boxed());
1390
1391		self.view_store
1392			.listener
1393			.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1394
1395		view
1396	}
1397
1398	/// Builds and updates a new view.
1399	///
1400	/// This functio uses [`Self::build_new_view`] to create or clone new view.
1401	///
1402	/// The new view will be updated with transactions from the tree_route and the mempool, all
1403	/// required events will be triggered, it will be inserted to the view store (respecting all
1404	/// pre-insertion actions).
1405	async fn build_and_update_view(
1406		&self,
1407		origin_view: Option<Arc<View<ChainApi>>>,
1408		at: &HashAndNumber<Block>,
1409		tree_route: &TreeRoute<Block>,
1410	) -> Option<Arc<View<ChainApi>>> {
1411		let start = Instant::now();
1412		debug!(
1413			target: LOG_TARGET,
1414			?at,
1415			origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
1416			?tree_route,
1417			"build_new_view"
1418		);
1419
1420		let mut view = self.build_and_plug_view(origin_view, at, tree_route);
1421
1422		// sync the transactions statuses and referencing views in all the listeners with newly
1423		// cloned view.
1424		view.pool.validated_pool().retrigger_notifications();
1425		debug!(
1426			target: LOG_TARGET,
1427			?at,
1428			duration = ?start.elapsed(),
1429			"register_listeners"
1430		);
1431
1432		// 2. Handle transactions from the tree route. Pruning transactions from the view first
1433		// will make some space for mempool transactions in case we are at the view's limits.
1434		let start = Instant::now();
1435		self.update_view_with_fork(&view, tree_route, at.clone()).await;
1436		debug!(
1437			target: LOG_TARGET,
1438			?at,
1439			duration = ?start.elapsed(),
1440			"update_view_with_fork"
1441		);
1442
1443		// 3. Finally, submit transactions from the mempool.
1444		let start = Instant::now();
1445		self.update_view_with_mempool(&mut view).await;
1446		debug!(
1447			target: LOG_TARGET,
1448			?at,
1449			duration= ?start.elapsed(),
1450			"update_view_with_mempool"
1451		);
1452		let view = Arc::from(view);
1453		self.view_store.insert_new_view(view.clone(), tree_route).await;
1454
1455		debug!(
1456			target: LOG_TARGET,
1457			duration = ?start.elapsed(),
1458			?at,
1459			"build_new_view"
1460		);
1461		Some(view)
1462	}
1463
1464	/// Retrieves transactions hashes from a `included_transactions` cache or, if not present,
1465	/// fetches them from the blockchain API using the block's hash `at`.
1466	///
1467	/// Returns a `Vec` of transactions hashes
1468	async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
1469		if let Some(txs) = self.included_transactions.lock().get(at) {
1470			return txs.clone();
1471		};
1472
1473		debug!(
1474			target: LOG_TARGET,
1475			?at,
1476			"fetch_block_transactions from api"
1477		);
1478
1479		self.api
1480			.block_body(at.hash)
1481			.await
1482			.unwrap_or_else(|error| {
1483				warn!(
1484					target: LOG_TARGET,
1485					%error,
1486					"fetch_block_transactions: error request"
1487				);
1488				None
1489			})
1490			.unwrap_or_default()
1491			.into_iter()
1492			.map(|t| self.hash_of(&t))
1493			.collect::<Vec<_>>()
1494	}
1495
1496	/// Returns the list of xts included in all block's ancestors up to recently finalized block (or
1497	/// up finality timeout threshold), including the block itself.
1498	///
1499	/// Example: for the following chain `F<-B1<-B2<-B3` xts from `B1,B2,B3` will be returned.
1500	async fn txs_included_since_finalized(
1501		&self,
1502		at: &HashAndNumber<Block>,
1503	) -> HashSet<TxHash<Self>> {
1504		let start = Instant::now();
1505		let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
1506
1507		let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
1508			return Default::default();
1509		};
1510
1511		let mut all_txs = HashSet::new();
1512
1513		for block in tree_route.enacted().iter() {
1514			// note: There is no point to fetch the transactions from blocks older than threshold.
1515			// All transactions included in these blocks, were already removed from pool
1516			// with FinalityTimeout event.
1517			if at.number.saturating_sub(block.number).into() <=
1518				self.finality_timeout_threshold.into()
1519			{
1520				all_txs.extend(self.fetch_block_transactions(block).await);
1521			}
1522		}
1523
1524		debug!(
1525			target: LOG_TARGET,
1526			?at,
1527			?recent_finalized_block,
1528			extrinsics_count = all_txs.len(),
1529			duration = ?start.elapsed(),
1530			"fatp::txs_included_since_finalized"
1531		);
1532		all_txs
1533	}
1534
1535	/// Updates the given view with the transactions from the internal mempol.
1536	///
1537	/// All transactions from the mempool (excluding those which are either already imported or
1538	/// already included in blocks since recently finalized block) are submitted to the
1539	/// view.
1540	///
1541	/// If there are no views, and mempool transaction is reported as invalid for the given view,
1542	/// the transaction is notified as invalid and removed from the mempool.
1543	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
1544		let xts_count = self.mempool.unwatched_and_watched_count().await;
1545		debug!(
1546			target: LOG_TARGET,
1547			view_at = ?view.at,
1548			?xts_count,
1549			active_views_count = self.active_views_count(),
1550			"update_view_with_mempool"
1551		);
1552		let included_xts = self.txs_included_since_finalized(&view.at).await;
1553
1554		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
1555			.mempool
1556			.with_transactions(|iter| {
1557				iter.filter(|(hash, _)| !view.is_imported(&hash) && !included_xts.contains(&hash))
1558					.map(|(k, v)| (*k, v.clone()))
1559					// todo [#8835]: better approach is needed - maybe time-budget approach?
1560					.take(MEMPOOL_TO_VIEW_BATCH_SIZE)
1561					.collect::<HashMap<_, _>>()
1562			})
1563			.await
1564			.into_iter()
1565			.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
1566			.unzip();
1567
1568		let results = view
1569			.submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
1570			.await
1571			.into_iter()
1572			.zip(hashes)
1573			.map(|(result, tx_hash)| async move {
1574				if let Ok(outcome) = result {
1575					Ok(self
1576						.mempool
1577						.update_transaction_priority(outcome.hash(), outcome.priority())
1578						.await)
1579				} else {
1580					Err(tx_hash)
1581				}
1582			})
1583			.collect::<Vec<_>>();
1584
1585		let results = futures::future::join_all(results).await;
1586
1587		let submitted_count = results.len();
1588
1589		debug!(
1590			target: LOG_TARGET,
1591			view_at_hash = ?view.at.hash,
1592			submitted_count,
1593			mempool_len = self.mempool.len(),
1594			"update_view_with_mempool"
1595		);
1596
1597		self.metrics
1598			.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
1599
1600		// if there are no views yet, and a single newly created view is reporting error, just send
1601		// out the invalid event, and remove transaction.
1602		if self.view_store.is_empty() {
1603			for result in results {
1604				if let Err(tx_hash) = result {
1605					self.view_store.listener.transactions_invalidated(&[tx_hash]);
1606					self.mempool.remove_transactions(&[tx_hash]).await;
1607				}
1608			}
1609		}
1610	}
1611
1612	/// Attempts to search the view store for the `provides` tags of enacted
1613	/// transactions associated with the specified `tree_route`.
1614	///
1615	/// The 'provides' tags of transactions from enacted blocks are searched
1616	/// in inactive views. Found `provide` tags are intended to serve as cache,
1617	/// helping to avoid unnecessary revalidations during pruning.
1618	async fn collect_provides_tags_from_view_store(
1619		&self,
1620		tree_route: &TreeRoute<Block>,
1621		xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
1622	) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
1623		let blocks_hashes = tree_route
1624			.retracted()
1625			.iter()
1626			// Skip the tip of the retracted fork, since it has an active view.
1627			.skip(1)
1628			// Skip also the tip of the enacted fork, since it has an active view too.
1629			.chain(
1630				std::iter::once(tree_route.common_block())
1631					.chain(tree_route.enacted().iter().rev().skip(1)),
1632			)
1633			.collect::<Vec<&HashAndNumber<Block>>>();
1634
1635		self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
1636	}
1637
1638	/// Build a map from blocks to their extrinsics.
1639	pub async fn collect_extrinsics(
1640		&self,
1641		blocks: &[HashAndNumber<Block>],
1642	) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
1643		future::join_all(blocks.iter().map(|hn| async move {
1644			(
1645				hn.hash,
1646				self.api
1647					.block_body(hn.hash)
1648					.await
1649					.unwrap_or_else(|e| {
1650						warn!(target: LOG_TARGET, %e, ": block_body error request");
1651						None
1652					})
1653					.unwrap_or_default(),
1654			)
1655		}))
1656		.await
1657		.into_iter()
1658		.collect()
1659	}
1660
1661	/// Updates the view with the transactions from the given tree route.
1662	///
1663	/// Transactions from the retracted blocks are resubmitted to the given view. Tags for
1664	/// transactions included in blocks on enacted fork are pruned from the provided view.
1665	async fn update_view_with_fork(
1666		&self,
1667		view: &View<ChainApi>,
1668		tree_route: &TreeRoute<Block>,
1669		hash_and_number: HashAndNumber<Block>,
1670	) {
1671		debug!(
1672			target: LOG_TARGET,
1673			?tree_route,
1674			at = ?view.at,
1675			"update_view_with_fork"
1676		);
1677		let api = self.api.clone();
1678
1679		// Collect extrinsics on the enacted path in a map from block hn -> extrinsics.
1680		let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;
1681
1682		// Create a map from enacted blocks' extrinsics to their `provides`
1683		// tags based on inactive views.
1684		let known_provides_tags = Arc::new(
1685			self.collect_provides_tags_from_view_store(
1686				tree_route,
1687				extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
1688			)
1689			.await,
1690		);
1691
1692		debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());
1693
1694		// We keep track of everything we prune so that later we won't add
1695		// transactions with those hashes from the retracted blocks.
1696		let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1697		future::join_all(tree_route.enacted().iter().map(|hn| {
1698			let api = api.clone();
1699			let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
1700			let known_provides_tags = known_provides_tags.clone();
1701			async move {
1702				(
1703					hn,
1704					crate::prune_known_txs_for_block(
1705						hn,
1706						&*api,
1707						&view.pool,
1708						Some(xts),
1709						Some(known_provides_tags),
1710					)
1711					.await,
1712				)
1713			}
1714		}))
1715		.await
1716		.into_iter()
1717		.for_each(|(key, enacted_log)| {
1718			pruned_log.extend(enacted_log.clone());
1719			self.included_transactions.lock().insert(key.clone(), enacted_log);
1720		});
1721
1722		let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
1723		self.metrics
1724			.report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));
1725
1726		// resubmit
1727		{
1728			let mut resubmit_transactions = Vec::new();
1729
1730			for retracted in tree_route.retracted() {
1731				let hash = retracted.hash;
1732
1733				let block_transactions = api
1734					.block_body(hash)
1735					.await
1736					.unwrap_or_else(|error| {
1737						warn!(
1738							target: LOG_TARGET,
1739							%error,
1740							"Failed to fetch block body"
1741						);
1742						None
1743					})
1744					.unwrap_or_default()
1745					.into_iter();
1746
1747				let mut resubmitted_to_report = 0;
1748
1749				let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
1750					|(tx_hash, _)| {
1751						let contains = pruned_log.contains(&tx_hash);
1752
1753						// need to count all transactions, not just filtered, here
1754						resubmitted_to_report += 1;
1755
1756						if !contains {
1757							trace!(
1758								target: LOG_TARGET,
1759								?tx_hash,
1760								?hash,
1761								"Resubmitting from retracted block"
1762							);
1763						}
1764						!contains
1765					},
1766				);
1767				let mut result = vec![];
1768				for (tx_hash, tx) in txs {
1769					result.push(
1770						// find arc if tx is known
1771						self.mempool
1772							.get_by_hash(tx_hash)
1773							.await
1774							.map(|tx| (tx.source(), tx.tx()))
1775							.unwrap_or_else(|| {
1776								// These transactions are coming from retracted blocks, we
1777								// should simply consider them external.
1778								(TimedTransactionSource::new_external(true), Arc::from(tx))
1779							}),
1780					);
1781				}
1782				resubmit_transactions.extend(result);
1783
1784				self.metrics.report(|metrics| {
1785					metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1786				});
1787			}
1788
1789			let _ = view
1790				.pool
1791				.resubmit_at(
1792					&hash_and_number,
1793					resubmit_transactions,
1794					ValidateTransactionPriority::Maintained,
1795				)
1796				.await;
1797		}
1798	}
1799
1800	/// Executes the maintainance for the finalized event.
1801	///
1802	/// Performs a house-keeping required for finalized event. This includes:
1803	/// - executing the on finalized procedure for the view store,
1804	/// - purging finalized transactions from the mempool and triggering mempool revalidation,
1805	async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1806		let start = Instant::now();
1807		let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1808		debug!(
1809			target: LOG_TARGET,
1810			?finalized_number,
1811			?tree_route,
1812			active_views_count = self.active_views_count(),
1813			"handle_finalized"
1814		);
1815		let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1816
1817		self.mempool.purge_finalized_transactions(&finalized_xts).await;
1818		self.import_notification_sink.clean_notified_items(&finalized_xts);
1819
1820		self.metrics
1821			.report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1822
1823		if let Ok(Some(finalized_number)) = finalized_number {
1824			self.included_transactions
1825				.lock()
1826				.retain(|cached_block, _| finalized_number < cached_block.number);
1827			self.revalidation_queue
1828				.revalidate_mempool(
1829					self.mempool.clone(),
1830					self.view_store.clone(),
1831					HashAndNumber { hash: finalized_hash, number: finalized_number },
1832				)
1833				.await;
1834		} else {
1835			debug!(
1836				target: LOG_TARGET,
1837				?finalized_number,
1838				"handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
1839			);
1840		}
1841
1842		self.ready_poll.lock().remove_cancelled();
1843
1844		debug!(
1845			target: LOG_TARGET,
1846			active_views_count = self.active_views_count(),
1847			included_transactions_len = ?self.included_transactions.lock().len(),
1848			duration = ?start.elapsed(),
1849			"handle_finalized after"
1850		);
1851	}
1852
1853	/// Computes a hash of the provided transaction
1854	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1855		self.api.hash_and_length(xt).0
1856	}
1857
1858	/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
1859	/// one.
1860	///
1861	/// This asynchronous function verifies the new transaction against the most recent view. If a
1862	/// transaction with a lower priority exists in the transaction pool, it is replaced with the
1863	/// new transaction.
1864	///
1865	/// If no lower-priority transaction is found, the function returns an error indicating the
1866	/// transaction was dropped immediately.
1867	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1868	async fn attempt_transaction_replacement(
1869		&self,
1870		source: TransactionSource,
1871		at_number: u64,
1872		watched: bool,
1873		xt: ExtrinsicFor<ChainApi>,
1874	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1875		let best_view = self
1876			.view_store
1877			.most_recent_view
1878			.read()
1879			.as_ref()
1880			.ok_or(TxPoolApiError::ImmediatelyDropped)?
1881			.clone();
1882
1883		let (xt_hash, validated_tx) = best_view
1884			.pool
1885			.verify_one(
1886				best_view.at.hash,
1887				best_view.at.number,
1888				TimedTransactionSource::from_transaction_source(source, false),
1889				xt.clone(),
1890				crate::graph::CheckBannedBeforeVerify::Yes,
1891				ValidateTransactionPriority::Submitted,
1892			)
1893			.await;
1894
1895		let Some(priority) = validated_tx.priority() else {
1896			return Err(TxPoolApiError::ImmediatelyDropped);
1897		};
1898
1899		let insertion_info = self
1900			.mempool
1901			.try_insert_with_replacement(xt, priority, source, at_number, watched)
1902			.await?;
1903		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1904	}
1905
1906	/// Sync version of [`Self::attempt_transaction_replacement`].
1907	fn attempt_transaction_replacement_sync(
1908		&self,
1909		source: TransactionSource,
1910		watched: bool,
1911		xt: ExtrinsicFor<ChainApi>,
1912	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1913		let HashAndNumber { number: at_number, hash: at_hash } = self
1914			.view_store
1915			.most_recent_view
1916			.read()
1917			.as_ref()
1918			.ok_or(TxPoolApiError::ImmediatelyDropped)?
1919			.at;
1920
1921		let ValidTransaction { priority, .. } = self
1922			.api
1923			.validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
1924			.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
1925			.map_err(|e| match e {
1926				TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
1927				TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
1928			})?;
1929		let xt_hash = self.hash_of(&xt);
1930
1931		let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
1932			xt,
1933			priority,
1934			source,
1935			at_number.into().as_u64(),
1936			watched,
1937		)?;
1938		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1939	}
1940
1941	fn post_attempt_transaction_replacement(
1942		&self,
1943		tx_hash: ExtrinsicHash<ChainApi>,
1944		insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1945	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1946		for worst_hash in &insertion_info.removed {
1947			trace!(
1948				target: LOG_TARGET,
1949				tx_hash = ?worst_hash,
1950				new_tx_hash = ?tx_hash,
1951				"removed: replaced by"
1952			);
1953			self.view_store
1954				.listener
1955				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1956
1957			self.view_store
1958				.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1959					listener.limits_enforced(&removed_tx_hash);
1960				});
1961		}
1962
1963		return Ok(insertion_info);
1964	}
1965}
1966
1967#[async_trait]
1968impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
1969where
1970	Block: BlockT,
1971	ChainApi: 'static + graph::ChainApi<Block = Block>,
1972	<Block as BlockT>::Hash: Unpin,
1973{
1974	/// Executes the maintainance for the given chain event.
1975	async fn maintain(&self, event: ChainEvent<Self::Block>) {
1976		let start = Instant::now();
1977		debug!(
1978			target: LOG_TARGET,
1979			?event,
1980			"processing event"
1981		);
1982
1983		self.view_store.finish_background_revalidations().await;
1984
1985		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
1986
1987		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
1988			match self.api.tree_route(from, to) {
1989				Ok(tree_route) => Ok(tree_route),
1990				Err(e) => {
1991					return Err(format!(
1992						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
1993					))
1994				},
1995			}
1996		};
1997		let block_id_to_number =
1998			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
1999
2000		let result =
2001			self.enactment_state
2002				.lock()
2003				.update(&event, &compute_tree_route, &block_id_to_number);
2004
2005		match result {
2006			Err(error) => {
2007				debug!(
2008					target: LOG_TARGET,
2009					%error,
2010					"enactment_state::update error"
2011				);
2012				self.enactment_state.lock().force_update(&event);
2013			},
2014			Ok(EnactmentAction::Skip) => return,
2015			Ok(EnactmentAction::HandleFinalization) => {
2016				// todo [#5492]: in some cases handle_new_block is actually needed (new_num >
2017				// tips_of_forks) let hash = event.hash();
2018				// if !self.has_view(hash) {
2019				// 	if let Ok(tree_route) = compute_tree_route(prev_finalized_block, hash) {
2020				// 		self.handle_new_block(&tree_route).await;
2021				// 	}
2022				// }
2023			},
2024			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
2025				self.handle_new_block(&tree_route).await;
2026			},
2027		};
2028
2029		match event {
2030			ChainEvent::NewBestBlock { .. } => {},
2031			ChainEvent::Finalized { hash, ref tree_route } => {
2032				self.handle_finalized(hash, tree_route).await;
2033
2034				debug!(
2035					target: LOG_TARGET,
2036					?tree_route,
2037					?prev_finalized_block,
2038					"on-finalized enacted"
2039				);
2040			},
2041		}
2042
2043		let duration = start.elapsed();
2044		let mempool_len = self.mempool_len().await;
2045		debug!(
2046			target: LOG_TARGET,
2047			txs = ?mempool_len,
2048			a = self.active_views_count(),
2049			i = self.inactive_views_count(),
2050			views = ?self.views_stats(),
2051			?event,
2052			?duration,
2053			"maintain"
2054		);
2055
2056		self.metrics.report(|metrics| {
2057			let (unwatched, watched) = mempool_len;
2058			let _ = (
2059				self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
2060				self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
2061				watched.try_into().map(|v| metrics.watched_txs.set(v)),
2062				unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
2063			);
2064			metrics.maintain_duration.observe(duration.as_secs_f64());
2065		});
2066	}
2067}
2068
2069impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2070where
2071	Block: BlockT,
2072	Client: sp_api::ProvideRuntimeApi<Block>
2073		+ sc_client_api::BlockBackend<Block>
2074		+ sc_client_api::blockchain::HeaderBackend<Block>
2075		+ sp_runtime::traits::BlockIdTo<Block>
2076		+ sc_client_api::ExecutorProvider<Block>
2077		+ sc_client_api::UsageProvider<Block>
2078		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
2079		+ Send
2080		+ Sync
2081		+ 'static,
2082	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
2083	<Block as BlockT>::Hash: std::marker::Unpin,
2084{
2085	/// Create new fork aware transaction pool for a full node with the provided api.
2086	pub fn new_full(
2087		options: Options,
2088		is_validator: IsValidator,
2089		prometheus: Option<&PrometheusRegistry>,
2090		spawner: impl SpawnEssentialNamed,
2091		client: Arc<Client>,
2092	) -> Self {
2093		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2094		let pool = Self::new_with_background_worker(
2095			options,
2096			is_validator,
2097			pool_api,
2098			prometheus,
2099			spawner,
2100			client.usage_info().chain.best_hash,
2101			client.usage_info().chain.finalized_hash,
2102		);
2103
2104		pool
2105	}
2106}
2107
#[cfg(test)]
mod reduce_multiview_result_tests {
	use super::*;
	use sp_core::H256;

	/// Minimal error type used as the failure payload in these tests.
	#[derive(Debug, PartialEq, Clone)]
	enum Error {
		Custom(u8),
	}

	/// A single entry of a results vector.
	type Outcome = Result<H256, Error>;

	/// Rows fed into `reduce_multiview_result`: a key paired with its vector of results.
	type Rows = Vec<(H256, Vec<Outcome>)>;

	/// Shorthand for a hash made of one repeated byte.
	fn hash(byte: u8) -> H256 {
		H256::repeat_byte(byte)
	}

	/// Successful outcome carrying the hash made of `byte`.
	fn ok(byte: u8) -> Outcome {
		Ok(hash(byte))
	}

	/// Failed outcome carrying the custom error `code`.
	fn failed(code: u8) -> Outcome {
		Err(Error::Custom(code))
	}

	#[test]
	fn empty() {
		sp_tracing::try_init_simple();
		let r = reduce_multiview_result::<H256, H256, Error>(HashMap::default());
		assert!(r.is_empty());
	}

	#[test]
	fn errors_only() {
		sp_tracing::try_init_simple();
		let v: Rows = vec![
			(hash(0x13), (10..=13).map(failed).collect()),
			(hash(0x14), (20..=23).map(failed).collect()),
			(hash(0x15), (30..=33).map(failed).collect()),
		];
		let r = reduce_multiview_result(HashMap::from_iter(v.clone()));

		// order in HashMap is random, the result shall be one of the input vectors:
		assert!(v.iter().any(|(_, candidate)| r == *candidate));
	}

	#[test]
	#[should_panic]
	#[cfg(debug_assertions)]
	fn invalid_lengths() {
		sp_tracing::try_init_simple();
		let v: Rows = vec![
			(hash(0x13), vec![failed(12), failed(13)]),
			(hash(0x14), vec![failed(23)]),
		];
		let _ = reduce_multiview_result(HashMap::from_iter(v));
	}

	#[test]
	fn only_hashes() {
		sp_tracing::try_init_simple();

		let all_ok = vec![ok(0x13), ok(0x14)];
		let v: Rows = vec![(hash(0x13), all_ok.clone()), (hash(0x14), all_ok.clone())];
		let r = reduce_multiview_result(HashMap::from_iter(v));

		assert_eq!(r, all_ok);
	}

	#[test]
	fn one_view() {
		sp_tracing::try_init_simple();
		let single = vec![ok(0x10), failed(11)];
		let v: Rows = vec![(hash(0x13), single.clone())];
		let r = reduce_multiview_result(HashMap::from_iter(v));

		assert_eq!(r, single);
	}

	#[test]
	fn mix() {
		sp_tracing::try_init_simple();
		let v: Rows = vec![
			(hash(0x13), vec![ok(0x10), failed(11), failed(12), failed(33)]),
			(hash(0x14), vec![failed(20), ok(0x21), failed(22), failed(33)]),
			(hash(0x15), vec![failed(30), failed(31), ok(0x32), failed(33)]),
		];
		let r = reduce_multiview_result(HashMap::from_iter(v));

		assert_eq!(r, vec![ok(0x10), ok(0x21), ok(0x32), failed(33)]);
	}
}