
// sc_transaction_pool/fork_aware_txpool/fork_aware_txpool.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Substrate fork-aware transaction pool implementation.

use super::{
	dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
	import_notification_sink::MultiViewImportNotificationSink,
	metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
	multi_view_listener::MultiViewListener,
	tx_mem_pool::{InsertionInfo, TxMemPool},
	view::View,
	view_store::ViewStore,
};
use crate::{
	api::FullChainApi,
	common::{
		sliding_stat::DurationSlidingStats,
		tracing_log_xt::{log_xt_debug, log_xt_trace},
		STAT_SLIDING_WINDOW,
	},
	enactment_state::{EnactmentAction, EnactmentState},
	fork_aware_txpool::{
		dropped_watcher::{DroppedReason, DroppedTransaction},
		revalidation_worker,
	},
	graph::{
		self,
		base_pool::{TimedTransactionSource, Transaction},
		BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
	},
	insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
	LOG_TARGET_STAT,
};
use async_trait::async_trait;
use futures::{
	channel::oneshot,
	future::{self},
	prelude::*,
	FutureExt,
};
use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
	TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::{
	generic::BlockId,
	traits::{Block as BlockT, NumberFor},
	transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
	Saturating,
};
use std::{
	collections::{BTreeMap, HashMap, HashSet},
	pin::Pin,
	sync::Arc,
	time::{Duration, Instant},
};
use tokio::select;
use tracing::{debug, info, instrument, trace, warn, Level};
/// The maximum block height difference before considering a view or transaction as timed-out
/// due to a finality stall. When the difference exceeds this threshold, elements are treated
/// as stale and are subject to cleanup.
const FINALITY_TIMEOUT_THRESHOLD: usize = 128;

/// The number of transactions that will be sent from the mempool to the newly created view during
/// the maintain process.
//todo [#8835]: better approach is needed - maybe time-budget approach?
//note: roughly a parachain block size.
const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;

/// Fork aware transaction pool task that needs to be polled.
pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A structure that maintains a collection of pollers associated with specific block hashes
/// (views).
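///
/// A minimal usage sketch (illustrative only; the wiring around view creation is simplified
/// here, and `hash`/`view` are assumed to be in scope):
///
/// ```skip
/// let mut ready_poll = ReadyPoll::new();
/// // A caller interested in the ready set at `hash` registers a poller...
/// let receiver = ready_poll.add(hash);
/// // ...and once the view for `hash` is built, all pollers for it are resolved:
/// ready_poll.trigger(hash, move || Box::new(view.pool.validated_pool().ready()));
/// let ready_iterator = receiver.await.expect("poller was triggered");
/// ```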
struct ReadyPoll<T, Block>
where
	Block: BlockT,
{
	pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
}

impl<T, Block> ReadyPoll<T, Block>
where
	Block: BlockT,
{
	/// Creates a new `ReadyPoll` instance with an empty collection of pollers.
	fn new() -> Self {
		Self { pollers: Default::default() }
	}

	/// Adds a new poller for a specific block hash and returns the `Receiver` end of the created
	/// oneshot channel which will be used to deliver the polled result.
	fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
		let (s, r) = oneshot::channel();
		self.pollers.entry(at).or_default().push(s);
		r
	}

	/// Triggers all pollers associated with a specific block by sending the polled result through
	/// each oneshot channel.
	///
	/// `ready_iterator` is a closure that generates the result data to be sent to the pollers.
	fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
		debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
		let Some(pollers) = self.pollers.remove(&at) else { return };
		pollers.into_iter().for_each(|p| {
			debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
			let _ = p.send(ready_iterator());
		});
	}

	/// Removes pollers that have their oneshot channels cancelled.
	fn remove_cancelled(&mut self) {
		self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
	}
}

/// The fork-aware transaction pool.
///
/// It keeps track of every fork and provides the set of transactions that is valid for every fork.
pub struct ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: graph::ChainApi<Block = Block> + 'static,
{
	/// The reference to the `ChainApi` provided by client/backend.
	api: Arc<ChainApi>,

	/// Intermediate buffer for incoming transactions.
	mempool: Arc<TxMemPool<ChainApi, Block>>,

	/// The store for all the views.
	view_store: Arc<ViewStore<ChainApi, Block>>,

	/// Utility for managing pollers of the `ready_at` future.
	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,

	/// Prometheus metrics endpoint.
	metrics: PrometheusMetrics,

	/// Collector of transaction status updates; reports transaction events metrics.
	events_metrics_collector: EventsMetricsCollector<ChainApi>,

	/// Util tracking best and finalized block.
	enactment_state: Arc<Mutex<EnactmentState<Block>>>,

	/// The channel allowing to send revalidation jobs to the background thread.
	revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,

	/// Util providing an aggregated stream of transactions that were imported to the ready queue
	/// in any view.
	import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,

	/// Externally provided pool options.
	options: Options,

	/// Whether the node is a validator.
	is_validator: IsValidator,

	/// Finality timeout threshold.
	///
	/// Sets the maximum permissible block height difference between the latest block
	/// and the oldest transactions or views in the pool. Beyond this difference,
	/// transactions/views are considered timed out and eligible for cleanup.
	finality_timeout_threshold: usize,

	/// Transactions included in blocks since the most recently finalized block (including this
	/// block).
	///
	/// Holds a mapping of block hash and number to their corresponding transaction hashes.
	///
	/// Intended to be used in the finality stall cleanups and also as a cache for all in-block
	/// transactions.
	included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,

	/// Stats for `submit` call durations.
	submit_stats: DurationSlidingStats,

	/// Stats for `submit_and_watch` call durations.
	submit_and_watch_stats: DurationSlidingStats,
}

impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: graph::ChainApi<Block = Block> + 'static,
	<Block as BlockT>::Hash: Unpin,
{
	// Injects a view for the given block to self.
	//
	// Helper for the pool new methods.
	fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
		if let Some(block_number) =
			self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
		{
			let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
			let tree_route =
				&TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
			let view = self.build_and_plug_view(None, &at_best, &tree_route);
			self.view_store.insert_new_view_sync(view.into(), &tree_route);
			trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
		};
		self
	}

	/// Create new fork aware transaction pool with provided shared instance of `ChainApi` intended
	/// for tests.
	pub fn new_test(
		pool_api: Arc<ChainApi>,
		best_block_hash: Block::Hash,
		finalized_hash: Block::Hash,
		finality_timeout_threshold: Option<usize>,
	) -> (Self, [ForkAwareTxPoolTask; 2]) {
		Self::new_test_with_limits(
			pool_api,
			best_block_hash,
			finalized_hash,
			Options::default().ready,
			Options::default().future,
			usize::MAX,
			finality_timeout_threshold,
		)
	}

	/// Create new fork aware transaction pool with given limits and with provided shared instance
	/// of `ChainApi` intended for tests.
	pub fn new_test_with_limits(
		pool_api: Arc<ChainApi>,
		best_block_hash: Block::Hash,
		finalized_hash: Block::Hash,
		ready_limits: crate::PoolLimit,
		future_limits: crate::PoolLimit,
		mempool_max_transactions_count: usize,
		finality_timeout_threshold: Option<usize>,
	) -> (Self, [ForkAwareTxPoolTask; 2]) {
		let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
		let listener = Arc::new(listener);

		let (import_notification_sink, import_notification_sink_task) =
			MultiViewImportNotificationSink::new_with_worker();

		let (mempool, mempool_task) = TxMemPool::new(
			pool_api.clone(),
			listener.clone(),
			Default::default(),
			mempool_max_transactions_count,
			ready_limits.total_bytes + future_limits.total_bytes,
		);
		let mempool = Arc::from(mempool);

		let (dropped_stream_controller, dropped_stream) =
			MultiViewDroppedWatcherController::<ChainApi>::new();

		let view_store = Arc::new(ViewStore::new(
			pool_api.clone(),
			listener,
			dropped_stream_controller,
			import_notification_sink.clone(),
		));

		let dropped_monitor_task = Self::dropped_monitor_task(
			dropped_stream,
			mempool.clone(),
			view_store.clone(),
			import_notification_sink.clone(),
		);

		let combined_tasks = async move {
			tokio::select! {
				_ = listener_task => {},
				_ = import_notification_sink_task => {},
				_ = dropped_monitor_task => {}
			}
		}
		.boxed();

		let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };

		(
			Self {
				mempool,
				api: pool_api,
				view_store,
				ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
					best_block_hash,
					finalized_hash,
				))),
				revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
				import_notification_sink,
				options,
				is_validator: false.into(),
				metrics: Default::default(),
				events_metrics_collector: EventsMetricsCollector::default(),
				finality_timeout_threshold: finality_timeout_threshold
					.unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
				included_transactions: Default::default(),
				submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
				submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
					STAT_SLIDING_WINDOW,
				)),
			}
			.inject_initial_view(best_block_hash),
			[combined_tasks, mempool_task],
		)
	}

	/// Monitors the stream of dropped transactions and removes them from the mempool and
	/// view_store.
	///
	/// This asynchronous task continuously listens for dropped transaction notifications provided
	/// within `dropped_stream` and ensures that these transactions are removed from the `mempool`
	/// and `import_notification_sink` instances. For `Usurped` events, the usurped transaction is
	/// additionally replaced within the view_store by the new one.
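	///
	/// A simplified sketch of the per-event dispatch performed in the loop below (not the exact
	/// code):
	///
	/// ```skip
	/// match dropped.reason {
	///     // the usurped transaction is replaced by the new one within the view store
	///     DroppedReason::Usurped(new_tx_hash) => view_store.replace_transaction(..),
	///     // the transaction subtree is removed from the views
	///     DroppedReason::LimitsEnforced | DroppedReason::Invalid =>
	///         view_store.remove_transaction_subtree(..),
	/// }
	/// // in every case the transaction is removed from the mempool and listeners are notified
	/// ```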
	async fn dropped_monitor_task(
		mut dropped_stream: StreamOfDropped<ChainApi>,
		mempool: Arc<TxMemPool<ChainApi, Block>>,
		view_store: Arc<ViewStore<ChainApi, Block>>,
		import_notification_sink: MultiViewImportNotificationSink<
			Block::Hash,
			ExtrinsicHash<ChainApi>,
		>,
	) {
		let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
		loop {
			let Some(dropped) = dropped_stream.next().await else {
				debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
				break;
			};
			let start = Instant::now();
			let tx_hash = dropped.tx_hash;
			trace!(
				target: LOG_TARGET,
				?tx_hash,
				reason = ?dropped.reason,
				"fatp::dropped notification, removing"
			);
			match dropped.reason {
				DroppedReason::Usurped(new_tx_hash) => {
					if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
						view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
					} else {
						trace!(
							target: LOG_TARGET,
							tx_hash = ?new_tx_hash,
							"error: dropped_monitor_task: no entry in mempool for new transaction"
						);
					};
				},
				DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
					view_store.remove_transaction_subtree(tx_hash, |_, _| {});
				},
			};

			mempool.remove_transactions(&[tx_hash]).await;
			import_notification_sink.clean_notified_items(&[tx_hash]);
			view_store.listener.transaction_dropped(dropped);
			insert_and_log_throttled!(
				Level::DEBUG,
				target:LOG_TARGET_STAT,
				prefix:"dropped_stats",
				dropped_stats,
				start.elapsed().into()
			);
		}
	}

	/// Creates new fork aware transaction pool with the background revalidation worker.
	///
	/// The txpool essential tasks (including a revalidation worker) are spawned using provided
	/// spawner.
	pub fn new_with_background_worker(
		options: Options,
		is_validator: IsValidator,
		pool_api: Arc<ChainApi>,
		prometheus: Option<&PrometheusRegistry>,
		spawner: impl SpawnEssentialNamed,
		best_block_hash: Block::Hash,
		finalized_hash: Block::Hash,
	) -> Self {
		let metrics = PrometheusMetrics::new(prometheus);
		let (events_metrics_collector, event_metrics_task) =
			EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());

		let (listener, listener_task) =
			MultiViewListener::new_with_worker(events_metrics_collector.clone());
		let listener = Arc::new(listener);

		let (revalidation_queue, revalidation_task) =
			revalidation_worker::RevalidationQueue::new_with_worker();

		let (import_notification_sink, import_notification_sink_task) =
			MultiViewImportNotificationSink::new_with_worker();

		let (mempool, blocking_mempool_task) = TxMemPool::new(
			pool_api.clone(),
			listener.clone(),
			metrics.clone(),
			options.total_count(),
			options.ready.total_bytes + options.future.total_bytes,
		);
		let mempool = Arc::from(mempool);

		let (dropped_stream_controller, dropped_stream) =
			MultiViewDroppedWatcherController::<ChainApi>::new();

		let view_store = Arc::new(ViewStore::new(
			pool_api.clone(),
			listener,
			dropped_stream_controller,
			import_notification_sink.clone(),
		));

		let dropped_monitor_task = Self::dropped_monitor_task(
			dropped_stream,
			mempool.clone(),
			view_store.clone(),
			import_notification_sink.clone(),
		);

		let combined_tasks = async move {
			tokio::select! {
				_ = listener_task => {}
				_ = revalidation_task => {},
				_ = import_notification_sink_task => {},
				_ = dropped_monitor_task => {}
				_ = event_metrics_task => {},
			}
		}
		.boxed();
		spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
		spawner.spawn_essential_blocking(
			"txpool-background",
			Some("transaction-pool"),
			blocking_mempool_task,
		);

		Self {
			mempool,
			api: pool_api,
			view_store,
			ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
				best_block_hash,
				finalized_hash,
			))),
			revalidation_queue: Arc::from(revalidation_queue),
			import_notification_sink,
			options,
			metrics,
			events_metrics_collector,
			is_validator,
			finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
			included_transactions: Default::default(),
			submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
			submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
				STAT_SLIDING_WINDOW,
			)),
		}
		.inject_initial_view(best_block_hash)
	}

	/// Get access to the underlying api
	pub fn api(&self) -> &ChainApi {
		&self.api
	}

	/// Provides a status for all views at the tips of the forks.
	pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
		self.view_store.status()
	}

	/// Provides a number of views at the tips of the forks.
	pub fn active_views_count(&self) -> usize {
		self.view_store.active_views.read().len()
	}

	/// Provides a number of inactive views (views that are no longer at the tips of the forks).
	pub fn inactive_views_count(&self) -> usize {
		self.view_store.inactive_views.read().len()
	}

	/// Provides internal views statistics.
	///
	/// Provides block number, count of ready, count of future transactions for every view. It is
	/// suitable for printing log information.
	fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
		self.view_store
			.active_views
			.read()
			.iter()
			.map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
			.collect()
	}

	/// Checks if there is a view at the tip of the fork with given hash.
	pub fn has_view(&self, hash: &Block::Hash) -> bool {
		self.view_store.active_views.read().contains_key(hash)
	}

	/// Returns a number of unwatched and watched transactions in internal mempool.
	///
	/// Intended for use in unit tests.
	pub async fn mempool_len(&self) -> (usize, usize) {
		self.mempool.unwatched_and_watched_count().await
	}

	/// Returns a set of future transactions for given block hash.
	///
	/// Intended for logging / tests.
	pub fn futures_at(
		&self,
		at: Block::Hash,
	) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
		self.view_store.futures_at(at)
	}

	/// Returns a best-effort set of ready transactions for a given block, without executing the
	/// full maintain process.
	///
	/// The method attempts to build a temporary view and create an iterator of ready transactions
	/// for a specific `at` hash. If a valid view is found, it collects and prunes
	/// transactions already included in the blocks and returns the valid set. If no view is
	/// found, the ready transaction set of the most recent view processed by the fork-aware
	/// txpool is returned. If the block number cannot be resolved for the provided `at` block
	/// hash, an empty transaction set is returned.
	///
	/// Pruning is just rebuilding the underlying transactions graph; no validations are executed,
	/// so this process should be fast.
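	///
	/// A hedged sketch of the intended call pattern (e.g. block authorship asking for a quick,
	/// best-effort ready set; `pool` and `best_hash` are assumed to be in scope):
	///
	/// ```skip
	/// // Returns quickly: prunes a cloned view instead of revalidating transactions.
	/// let ready = pool.ready_at_light(best_hash).await;
	/// for tx in ready {
	///     // feed transactions into the block builder
	/// }
	/// ```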
	pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
		let start = Instant::now();
		let api = self.api.clone();
		debug!(
			target: LOG_TARGET,
			?at,
			"fatp::ready_at_light"
		);

		let at_number = self.api.resolve_block_number(at).ok();
		let finalized_number = self
			.api
			.resolve_block_number(self.enactment_state.lock().recent_finalized_block())
			.ok();

		// Prune all txs from the best view found, considering the extrinsics included in blocks
		// that are more recent than the view itself.
		if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
			let at_hn = HashAndNumber { hash: at, number: at_number };
			finalized_number.and_then(|finalized_number| {
				self.view_store
					.find_view_descendent_up_to_number(&at_hn, finalized_number)
					.map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
			})
		}) {
			let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
			let mut all_extrinsics = vec![];
			for h in enacted_blocks {
				let extrinsics = api
					.block_body(h)
					.await
					.unwrap_or_else(|error| {
						warn!(
							target: LOG_TARGET,
							%error,
							"Compute ready light transactions: error request"
						);
						None
					})
					.unwrap_or_default()
					.into_iter()
					.map(|t| api.hash_and_length(&t).0);
				all_extrinsics.extend(extrinsics);
			}

			let before_count = tmp_view.pool.validated_pool().status().ready;
			let tags = tmp_view
				.pool
				.validated_pool()
				.extrinsics_tags(&all_extrinsics)
				.into_iter()
				.flatten()
				.flatten()
				.collect::<Vec<_>>();
			let _ = tmp_view.pool.validated_pool().prune_tags(tags);

			let after_count = tmp_view.pool.validated_pool().status().ready;
			debug!(
				target: LOG_TARGET,
				?at,
				best_view_hash = ?view.at.hash,
				before_count,
				to_be_removed = all_extrinsics.len(),
				after_count,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> light"
			);
			Box::new(tmp_view.pool.validated_pool().ready())
		} else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
			// Fallback for the case when `at` is not on an already known fork.
			// Falls back to the most recent view, which may include transactions that
			// are invalid or already included in blocks, but can still yield a
			// partially valid ready set, which is better than providing nothing.
			debug!(
				target: LOG_TARGET,
				?at,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> most_recent_view"
			);
			Box::new(most_recent_view.pool.validated_pool().ready())
		} else {
			let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
			debug!(
				target: LOG_TARGET,
				?at,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> empty"
			);
			empty
		}
	}

	/// Waits for the set of ready transactions for a given block up to a specified timeout.
	///
	/// This method combines two futures:
	/// - The `ready_at` future, which waits for the ready transactions resulting from the full
	///   maintenance process to be available.
	/// - The `ready_at_light` future, used as a fallback if the timeout expires before `ready_at`
	///   completes. This provides a best-effort ready set of transactions produced by the light
	///   maintain process.
	///
	/// Returns a future resolving to a ready iterator of transactions.
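	///
	/// Conceptually, the race implemented below looks as follows (simplified sketch, not the
	/// exact code):
	///
	/// ```skip
	/// select! {
	///     // full maintain finished in time
	///     ready = ready_at(at) => ready,
	///     // otherwise fall back to the light path
	///     _ = Delay::new(timeout) => ready_at_light(at).await,
	/// }
	/// ```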
	async fn ready_at_with_timeout_internal(
		&self,
		at: Block::Hash,
		timeout: std::time::Duration,
	) -> ReadyIteratorFor<ChainApi> {
		debug!(
			target: LOG_TARGET,
			?at,
			?timeout,
			"fatp::ready_at_with_timeout"
		);
		let timeout = futures_timer::Delay::new(timeout);
		let (view_already_exists, ready_at) = self.ready_at_internal(at);

		if view_already_exists {
			return ready_at.await;
		}

		let maybe_ready = async move {
			select! {
				ready = ready_at => Some(ready),
				_ = timeout => {
					debug!(
						target: LOG_TARGET,
						?at,
						"Timeout fired waiting for transaction pool at block. Proceeding with production."
					);
					None
				}
			}
		};

		let fall_back_ready = self.ready_at_light(at);
		let (maybe_ready, fall_back_ready) =
			futures::future::join(maybe_ready, fall_back_ready).await;
		maybe_ready.unwrap_or(fall_back_ready)
	}

	fn ready_at_internal(
		&self,
		at: Block::Hash,
	) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
		let mut ready_poll = self.ready_poll.lock();

		if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
			debug!(
				target: LOG_TARGET,
				?at,
				?inactive,
				"fatp::ready_at_internal"
			);
			let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
			return (true, async move { iterator }.boxed());
		}

		let pending = ready_poll
			.add(at)
			.map(|received| {
				received.unwrap_or_else(|error| {
					warn!(
						target: LOG_TARGET,
						%error,
						"Error receiving ready-set iterator"
					);
					Box::new(std::iter::empty())
				})
			})
			.boxed();
		debug!(
			target: LOG_TARGET,
			?at,
			pending_keys = ?ready_poll.pollers.keys(),
			"fatp::ready_at_internal"
		);
		(false, pending)
	}

	/// Refer to [`Self::submit_and_watch`]
	async fn submit_and_watch_inner(
		&self,
		at: Block::Hash,
		source: TransactionSource,
		xt: TransactionFor<Self>,
	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
		let xt = Arc::from(xt);

		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();

		let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
			Ok(result) => result,
			Err(TxPoolApiError::ImmediatelyDropped) =>
				self.attempt_transaction_replacement(source, at_number, true, xt.clone())
					.await?,
			Err(e) => return Err(e.into()),
		};

		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
		self.events_metrics_collector.report_submitted(&insertion);

		match self.view_store.submit_and_watch(at, insertion.source, xt).await {
			Err(e) => {
				self.mempool.remove_transactions(&[insertion.hash]).await;
				Err(e.into())
			},
			Ok(mut outcome) => {
				self.mempool
					.update_transaction_priority(outcome.hash(), outcome.priority())
					.await;
				Ok(outcome.expect_watcher())
			},
		}
	}

	/// Refer to [`Self::submit_at`]
	async fn submit_at_inner(
		&self,
		at: Block::Hash,
		source: TransactionSource,
		xts: Vec<TransactionFor<Self>>,
	) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();
		let view_store = self.view_store.clone();
		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
		let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;

		if view_store.is_empty() {
			return Ok(mempool_results
				.into_iter()
				.map(|r| r.map(|r| r.hash).map_err(Into::into))
				.collect::<Vec<_>>())
		}

		// Submit all the transactions to the mempool
		let retries = mempool_results
			.into_iter()
			.zip(xts.clone())
			.map(|(result, xt)| async move {
				match result {
					Err(TxPoolApiError::ImmediatelyDropped) =>
						self.attempt_transaction_replacement(source, at_number, false, xt).await,
					_ => result,
				}
			})
			.collect::<Vec<_>>();

		let mempool_results = futures::future::join_all(retries).await;

		// Collect transactions that were successfully submitted to the mempool...
		let to_be_submitted = mempool_results
			.iter()
			.zip(xts)
			.filter_map(|(result, xt)| {
				result.as_ref().ok().map(|insertion| {
					self.events_metrics_collector.report_submitted(&insertion);
					(insertion.source.clone(), xt)
				})
			})
			.collect::<Vec<_>>();

		self.metrics
			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

		// ... and submit them to the view_store. Please note that transactions rejected by mempool
		// are not sent here.
		let mempool = self.mempool.clone();
		let results_map = view_store.submit(to_be_submitted.into_iter()).await;
		let mut submission_results = reduce_multiview_result(results_map).into_iter();

		// Note for composing final result:
		//
		// For each failed insertion into the mempool, the mempool result should be placed into
		// the returned vector.
		//
		// For each successful insertion into the mempool, the corresponding
		// view_store submission result needs to be examined (merged_results):
		// - If there is an error during view_store submission, the transaction is removed from
		// the mempool, and the final result recorded in the vector for this transaction is the
		// view_store submission error.
		//
		// - If the view_store submission is successful, the transaction priority is updated in the
		// mempool.
		//
		// Finally, it collects the hashes of updated transactions or submission errors (either
		// from the mempool or view_store) into a returned vector (final_results).
		const RESULTS_ASSUMPTION: &str =
			"The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
		let merged_results = mempool_results.into_iter().map(|result| {
			result.map_err(Into::into).and_then(|insertion| {
				Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
			})
		});

		let mut final_results = vec![];
		for r in merged_results {
			match r {
				Ok((hash, submission_result)) => match submission_result {
					Ok(r) => {
						mempool.update_transaction_priority(r.hash(), r.priority()).await;
						final_results.push(Ok(r.hash()));
					},
					Err(e) => {
						mempool.remove_transactions(&[hash]).await;
						final_results.push(Err(e));
					},
				},
				Err(e) => final_results.push(Err(e)),
			}
		}

		Ok(final_results)
	}

	/// Number of notified items in import_notification_sink.
	///
	/// Internal detail, exposed only for testing.
	pub fn import_notification_sink_len(&self) -> usize {
		self.import_notification_sink.notified_items_len()
	}
}

/// Converts the input view-to-statuses map into the output vector of statuses.
///
/// The result of importing a bunch of transactions into a single view is a vector of statuses.
/// Every item represents a status for a single transaction. The input is a map that associates
/// hash-views with vectors indicating the statuses of transaction imports.
///
/// Importing into multiple views results in a two-dimensional array of statuses, which is
/// provided as the input map.
///
/// This function converts the map into the vec of results, according to the following rules:
/// - for a given transaction, if at least one status is a success, then the output vector
///   contains a success,
/// - if a given transaction's status is an error for every view, then the output vector contains
///   an error.
///
/// The results for transactions are in the same order for every view. An output vector preserves
/// this order.
///
/// ```skip
/// in:
/// view  |   xt0 status | xt1 status | xt2 status
/// h1   -> [ Ok(xth0),    Ok(xth1),    Err       ]
/// h2   -> [ Ok(xth0),    Err,         Err       ]
/// h3   -> [ Ok(xth0),    Ok(xth1),    Err       ]
///
/// out:
/// [ Ok(xth0), Ok(xth1), Err ]
/// ```
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
	let mut values = input.values();
	let Some(first) = values.next() else {
		return Default::default();
	};
	let length = first.len();
	debug_assert!(values.all(|x| length == x.len()));

	input
		.into_values()
		.reduce(|mut agg_results, results| {
			agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
				if agg_r.is_err() {
					*agg_r = r;
				}
			});
			agg_results
		})
		.unwrap_or_default()
}

#[async_trait]
impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: 'static + graph::ChainApi<Block = Block>,
	<Block as BlockT>::Hash: Unpin,
{
	type Block = ChainApi::Block;
	type Hash = ExtrinsicHash<ChainApi>;
	type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
	type Error = ChainApi::Error;

	/// Submits multiple transactions and returns a future resolving to the submission results.
	///
	/// The actual transaction submission process is delegated to the `ViewStore` internal
	/// instance.
	///
	/// The internal limits of the pool are checked. The results of submissions to individual views
	/// are reduced to a single result. Refer to `reduce_multiview_result` for more details.
	async fn submit_at(
		&self,
		at: <Self::Block as BlockT>::Hash,
		source: TransactionSource,
		xts: Vec<TransactionFor<Self>>,
	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
		let start = Instant::now();
		trace!(
			target: LOG_TARGET,
			count = xts.len(),
			active_views_count = self.active_views_count(),
			"fatp::submit_at"
		);
		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
		let result = self.submit_at_inner(at, source, xts).await;
		insert_and_log_throttled!(
			Level::DEBUG,
			target:LOG_TARGET_STAT,
			prefix:"submit_stats",
			self.submit_stats,
			start.elapsed().into()
		);
		result
	}

	/// Submits a single transaction and returns a future resolving to the submission result.
	///
	/// The actual transaction submission process is delegated to the `submit_at` function.
	async fn submit_one(
		&self,
		_at: <Self::Block as BlockT>::Hash,
		source: TransactionSource,
		xt: TransactionFor<Self>,
	) -> Result<TxHash<Self>, Self::Error> {
		trace!(
			target: LOG_TARGET,
			tx_hash = ?self.tx_hash(&xt),
			active_views_count = self.active_views_count(),
			"fatp::submit_one"
		);
		match self.submit_at(_at, source, vec![xt]).await {
			Ok(mut v) =>
				v.pop().expect("There is exactly one element in result of submit_at. qed."),
			Err(e) => Err(e),
		}
	}

	/// Submits a transaction and starts to watch its progress in the pool, returning a stream of
	/// status updates.
	///
	/// The actual transaction submission process is delegated to the `ViewStore` internal
	/// instance.
	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
	async fn submit_and_watch(
		&self,
		at: <Self::Block as BlockT>::Hash,
		source: TransactionSource,
		xt: TransactionFor<Self>,
	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
		let start = Instant::now();
		trace!(
			target: LOG_TARGET,
			tx_hash = ?self.tx_hash(&xt),
			views = self.active_views_count(),
			"fatp::submit_and_watch"
		);
		let result = self.submit_and_watch_inner(at, source, xt).await;
		insert_and_log_throttled!(
			Level::DEBUG,
			target:LOG_TARGET_STAT,
			prefix:"submit_and_watch_stats",
			self.submit_and_watch_stats,
			start.elapsed().into()
		);
		result
	}

	/// Reports invalid transactions to the transaction pool.
	///
	/// This function takes an array of tuples, each consisting of a transaction hash and the
	/// corresponding error that occurred during transaction execution at a given block.
	///
	/// The transaction pool implementation will determine which transactions should be
	/// removed from the pool. Transactions that depend on invalid transactions will also
	/// be removed.
	async fn report_invalid(
		&self,
		at: Option<<Self::Block as BlockT>::Hash>,
		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
	) -> Vec<Arc<Self::InPoolTransaction>> {
		debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
		log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
		self.metrics
			.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));

		let removed = self.view_store.report_invalid(at, invalid_tx_errors);

		let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
		self.mempool.remove_transactions(&removed_hashes).await;
		self.import_notification_sink.clean_notified_items(&removed_hashes);

		self.metrics
			.report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));

		removed
	}

	// todo [#5491]: api change?
	// status(Hash) -> Option<PoolStatus>
	/// Returns the pool status which includes information like the number of ready and future
	/// transactions.
	///
	/// Currently the status for the most recently notified best block is returned (for which
	/// the maintain process was accomplished).
	fn status(&self) -> PoolStatus {
		self.view_store
			.most_recent_view
			.read()
			.as_ref()
			.map(|v| v.status())
			.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
	}

	/// Return an event stream of notifications when transactions are imported to the pool.
	///
	/// Consumers of this stream should use the `ready` method to actually get the
	/// pending transactions in the right order.
	fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
		self.import_notification_sink.event_stream()
	}

	/// Returns the hash of a given transaction.
	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
		self.api().hash_and_length(xt).0
	}

	/// Notifies the pool about the broadcasting status of transactions.
	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
		self.view_store.listener.transactions_broadcasted(propagations);
	}

	/// Return specific ready transaction by hash, if there is one.
	///
	/// Currently the ready transaction is returned if it exists for the most recently notified
	/// best block (for which the maintain process was accomplished).
	// todo [#5491]: api change: we probably should have at here?
	fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
		let most_recent_view_hash =
			self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
		let result = most_recent_view_hash
			.and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
		trace!(
			target: LOG_TARGET,
			?tx_hash,
			is_ready = result.is_some(),
			most_recent_view = ?most_recent_view_hash,
			"ready_transaction"
		);
		result
	}

	/// Returns an iterator for ready transactions at a specific block, ordered by priority.
	async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
		let (_, result) = self.ready_at_internal(at);
		result.await
	}

	/// Returns an iterator for ready transactions, ordered by priority.
	///
	/// Currently the set of ready transactions is returned if it exists for the most recently
	/// notified best block (for which the maintain process was accomplished).
	fn ready(&self) -> ReadyIteratorFor<ChainApi> {
		self.view_store.ready()
	}

	/// Returns a list of future transactions in the pool.
	///
	/// Currently the set of future transactions is returned if it exists for the most recently
	/// notified best block (for which the maintain process was accomplished).
	fn futures(&self) -> Vec<Self::InPoolTransaction> {
		self.view_store.futures()
	}

	/// Returns a set of ready transactions at a given block within the specified timeout.
	///
	/// If the timeout expires before the maintain process is accomplished, a best-effort
	/// set of transactions is returned (refer to `ready_at_light`).
	async fn ready_at_with_timeout(
		&self,
		at: <Self::Block as BlockT>::Hash,
		timeout: std::time::Duration,
	) -> ReadyIteratorFor<ChainApi> {
		self.ready_at_with_timeout_internal(at, timeout).await
	}
}

impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
	for ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: 'static + graph::ChainApi<Block = Block>,
	<Block as BlockT>::Hash: Unpin,
{
	type Block = Block;
	type Hash = ExtrinsicHash<ChainApi>;
	type Error = ChainApi::Error;

	fn submit_local(
		&self,
		at: Block::Hash,
		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
	) -> Result<Self::Hash, Self::Error> {
		trace!(
			target: LOG_TARGET,
			active_views_count = self.active_views_count(),
			"fatp::submit_local"
		);
		let xt = Arc::from(xt);
		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();

		// note: would be nice to get rid of sync methods one day. See: #8912
		let result = self
			.mempool
			.clone()
			.extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
			.remove(0);

		let insertion = match result {
			Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
				TransactionSource::Local,
				false,
				xt.clone(),
			),
			_ => result,
		}?;

		self.view_store
			.submit_local(xt)
			.inspect_err(|_| {
				self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
			})
			.map(|outcome| {
				self.mempool
					.clone()
					.update_transaction_priority_sync(outcome.hash(), outcome.priority());
				outcome.hash()
			})
			.or_else(|_| Ok(insertion.hash))
	}
}

impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: graph::ChainApi<Block = Block> + 'static,
	<Block as BlockT>::Hash: Unpin,
{
	/// Handles a new block notification.
	///
	/// It is responsible for handling a newly notified block. It executes some sanity checks,
	/// finds the best view to clone from and executes the new view build procedure for the
	/// notified block.
	///
	/// If the view is correctly created, `ready_at` pollers for this block will be triggered.
	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
	async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
		let hash_and_number = match tree_route.last() {
			Some(hash_and_number) => hash_and_number,
			None => {
				warn!(
					target: LOG_TARGET,
					?tree_route,
					"Skipping ChainEvent - no last block in tree route"
				);
				return
			},
		};

		if self.has_view(&hash_and_number.hash) {
			debug!(
				target: LOG_TARGET,
				?hash_and_number,
				"view already exists for block"
			);
			return
		}

		let best_view = self.view_store.find_best_view(tree_route);
		let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;

		if let Some(view) = new_view {
			{
				let view = view.clone();
				self.ready_poll.lock().trigger(hash_and_number.hash, move || {
					Box::from(view.pool.validated_pool().ready())
				});
			}

			View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
		}

		self.finality_stall_cleanup(hash_and_number).await;
	}

	/// Cleans up transactions and views outdated by potential finality stalls.
	///
	/// This function removes transactions from the pool that were included in blocks but not
	/// finalized within a pre-defined block height threshold. Transactions not meeting finality
	/// within this threshold are notified with a `FinalityTimeout` event. The threshold is based
	/// on the current block number, `at`.
	///
	/// Additionally, this method triggers the view store to handle and remove stale views caused
	/// by the finality stall.
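	///
	/// For example, with the default `FINALITY_TIMEOUT_THRESHOLD` of 128: if a transaction was
	/// included at block 1000 and finality stalls, then once a best block above height 1128 is
	/// notified, the transaction is reported with a `FinalityTimeout` event and removed from the
	/// pool's internal structures.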
	async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
		let (oldest_block_number, finality_timedout_blocks) = {
			let mut included_transactions = self.included_transactions.lock();

			let Some(oldest_block_number) =
				included_transactions.first_key_value().map(|(k, _)| k.number)
			else {
				return
			};

			if at.number.saturating_sub(oldest_block_number).into() <=
				self.finality_timeout_threshold.into()
			{
				return
			}

			let mut finality_timedout_blocks =
				indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();

			included_transactions.retain(
				|HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
					let diff = at.number.saturating_sub(*view_number);
					if diff.into() > self.finality_timeout_threshold.into() {
						finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
						false
					} else {
						true
					}
				},
			);

			(oldest_block_number, finality_timedout_blocks)
		};

		if !finality_timedout_blocks.is_empty() {
			self.ready_poll.lock().remove_cancelled();
			self.view_store.listener.remove_stale_controllers();
		}

		let finality_timedout_blocks_len = finality_timedout_blocks.len();

		for (block_hash, tx_hashes) in finality_timedout_blocks {
			self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);

			self.mempool.remove_transactions(&tx_hashes).await;
			self.import_notification_sink.clean_notified_items(&tx_hashes);
			self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
		}

		self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);

		debug!(
			target: LOG_TARGET,
			?at,
			included_transactions_len = ?self.included_transactions.lock().len(),
			finality_timedout_blocks_len,
			?oldest_block_number,
			"finality_stall_cleanup"
		);
	}

	/// Builds a new view.
	///
	/// If `origin_view` is provided, the new view will be cloned from it. Otherwise an empty view
	/// will be created.
	///
	/// This method will also update multi-view listeners with the newly created view.
	///
	/// The new view will not be inserted into the view store.
	fn build_and_plug_view(
		&self,
		origin_view: Option<Arc<View<ChainApi>>>,
		at: &HashAndNumber<Block>,
		tree_route: &TreeRoute<Block>,
	) -> View<ChainApi> {
		let enter = Instant::now();
		let (view, view_dropped_stream, view_aggregated_stream) =
			if let Some(origin_view) = origin_view {
				let (mut view, view_dropped_stream, view_aggregated_stream) =
					View::new_from_other(&origin_view, at);
				if !tree_route.retracted().is_empty() {
					view.pool.clear_recently_pruned();
				}
				(view, view_dropped_stream, view_aggregated_stream)
			} else {
				debug!(
					target: LOG_TARGET,
					?at,
					"creating non-cloned view"
				);
				View::new(
					self.api.clone(),
					at.clone(),
					self.options.clone(),
					self.metrics.clone(),
					self.is_validator.clone(),
				)
			};
		debug!(
			target: LOG_TARGET,
			?at,
			duration = ?enter.elapsed(),
			"build_new_view::clone_view"
		);

		// 1. Capture all import notifications from the very beginning, so first register all
		// the listeners.
		self.import_notification_sink.add_view(
			view.at.hash,
			view.pool.validated_pool().import_notification_stream().boxed(),
		);

		self.view_store
			.dropped_stream_controller
			.add_view(view.at.hash, view_dropped_stream.boxed());

		self.view_store
			.listener
			.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());

		view
	}

	/// Builds and updates a new view.
	///
	/// This function uses [`Self::build_and_plug_view`] to create or clone a new view.
	///
	/// The new view will be updated with transactions from the tree_route and the mempool, all
	/// required events will be triggered, and it will be inserted into the view store (respecting
	/// all pre-insertion actions).
	async fn build_and_update_view(
		&self,
		origin_view: Option<Arc<View<ChainApi>>>,
		at: &HashAndNumber<Block>,
		tree_route: &TreeRoute<Block>,
	) -> Option<Arc<View<ChainApi>>> {
		let start = Instant::now();
		debug!(
			target: LOG_TARGET,
			?at,
			origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
			?tree_route,
			"build_new_view"
		);

		let mut view = self.build_and_plug_view(origin_view, at, tree_route);

		// sync the transaction statuses and referencing views in all the listeners with the newly
		// cloned view.
		view.pool.validated_pool().retrigger_notifications();
		debug!(
			target: LOG_TARGET,
			?at,
			duration = ?start.elapsed(),
			"register_listeners"
		);

		// 2. Handle transactions from the tree route. Pruning transactions from the view first
		// will make some space for mempool transactions in case we are at the view's limits.
		let start = Instant::now();
		self.update_view_with_fork(&view, tree_route, at.clone()).await;
		debug!(
			target: LOG_TARGET,
			?at,
			duration = ?start.elapsed(),
			"update_view_with_fork"
		);

		// 3. Finally, submit transactions from the mempool.
		let start = Instant::now();
		self.update_view_with_mempool(&mut view).await;
		debug!(
			target: LOG_TARGET,
			?at,
			duration = ?start.elapsed(),
			"update_view_with_mempool"
		);
		let view = Arc::from(view);
		self.view_store.insert_new_view(view.clone(), tree_route).await;

		debug!(
			target: LOG_TARGET,
			duration = ?start.elapsed(),
			?at,
			"build_new_view"
		);
		Some(view)
	}

	/// Retrieves transaction hashes from the `included_transactions` cache or, if not present,
	/// fetches them from the blockchain API using the block's hash `at`.
	///
	/// Returns a `Vec` of transaction hashes.
	async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
		if let Some(txs) = self.included_transactions.lock().get(at) {
			return txs.clone()
		};

		debug!(
			target: LOG_TARGET,
			?at,
			"fetch_block_transactions from api"
		);

		self.api
			.block_body(at.hash)
			.await
			.unwrap_or_else(|error| {
				warn!(
					target: LOG_TARGET,
					%error,
					"fetch_block_transactions: error request"
				);
				None
			})
			.unwrap_or_default()
			.into_iter()
			.map(|t| self.hash_of(&t))
			.collect::<Vec<_>>()
	}

	/// Returns the list of xts included in all of the block's ancestors, up to the recently
	/// finalized block (or up to the finality timeout threshold), including the block itself.
	///
	/// Example: for the following chain `F<-B1<-B2<-B3` xts from `B1,B2,B3` will be returned.
	async fn txs_included_since_finalized(
		&self,
		at: &HashAndNumber<Block>,
	) -> HashSet<TxHash<Self>> {
		let start = Instant::now();
		let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();

		let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
			return Default::default()
		};

		let mut all_txs = HashSet::new();

		for block in tree_route.enacted().iter() {
			// note: There is no point in fetching the transactions from blocks older than the
			// threshold. All transactions included in these blocks were already removed from the
			// pool with the FinalityTimeout event.
			if at.number.saturating_sub(block.number).into() <=
				self.finality_timeout_threshold.into()
			{
				all_txs.extend(self.fetch_block_transactions(block).await);
			}
		}

		debug!(
			target: LOG_TARGET,
			?at,
			?recent_finalized_block,
			extrinsics_count = all_txs.len(),
			duration = ?start.elapsed(),
			"fatp::txs_included_since_finalized"
		);
		all_txs
	}

	/// Updates the given view with the transactions from the internal mempool.
	///
	/// All transactions from the mempool (excluding those which are either already imported or
	/// already included in blocks since the recently finalized block) are submitted to the
	/// view.
	///
	/// If there are no views and a mempool transaction is reported as invalid for the given view,
	/// the transaction is notified as invalid and removed from the mempool.
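	///
	/// The selection of transactions submitted to the view can be sketched as follows
	/// (pseudo-code over the real, batched implementation below):
	///
	/// ```skip
	/// mempool
	///     .transactions()
	///     .filter(|tx| !view.is_imported(tx) && !included_since_finalized.contains(tx))
	///     .take(MEMPOOL_TO_VIEW_BATCH_SIZE)
	///     .for_each(|tx| view.submit(tx));
	/// ```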
	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
		let xts_count = self.mempool.unwatched_and_watched_count().await;
		debug!(
			target: LOG_TARGET,
			view_at = ?view.at,
			?xts_count,
			active_views_count = self.active_views_count(),
			"update_view_with_mempool"
		);
		let included_xts = self.txs_included_since_finalized(&view.at).await;

		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
			.mempool
			.with_transactions(|iter| {
				iter.filter(|(hash, _)| !view.is_imported(&hash) && !included_xts.contains(&hash))
					.map(|(k, v)| (*k, v.clone()))
					//todo [#8835]: better approach is needed - maybe time-budget approach?
					.take(MEMPOOL_TO_VIEW_BATCH_SIZE)
					.collect::<HashMap<_, _>>()
			})
			.await
			.into_iter()
			.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
			.unzip();

		let results = view
			.submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
			.await
			.into_iter()
			.zip(hashes)
			.map(|(result, tx_hash)| async move {
				if let Ok(outcome) = result {
					Ok(self
						.mempool
						.update_transaction_priority(outcome.hash(), outcome.priority())
						.await)
				} else {
					Err(tx_hash)
				}
			})
			.collect::<Vec<_>>();

		let results = futures::future::join_all(results).await;

		let submitted_count = results.len();

		debug!(
			target: LOG_TARGET,
			view_at_hash = ?view.at.hash,
			submitted_count,
			mempool_len = self.mempool.len(),
			"update_view_with_mempool"
		);

		self.metrics
			.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));

		// if there are no views yet, and a single newly created view is reporting error, just send
		// out the invalid event, and remove transaction.
		if self.view_store.is_empty() {
			for result in results {
				if let Err(tx_hash) = result {
					self.view_store.listener.transactions_invalidated(&[tx_hash]);
					self.mempool.remove_transactions(&[tx_hash]).await;
				}
			}
		}
	}
1608
	/// Attempts to search the view store for the `provides` tags of enacted
	/// transactions associated with the specified `tree_route`.
	///
	/// The `provides` tags of transactions from enacted blocks are searched
	/// in inactive views. Found `provides` tags are intended to serve as a cache,
	/// helping to avoid unnecessary revalidations during pruning.
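	///
	/// Illustration of which blocks are consulted (tips of both forks are skipped, since they
	/// have active views; `C` denotes the common ancestor):
	///
	/// ```text
	///        r1 - r2          <- retracted fork, tip `r2` skipped
	///       /
	///  .. - C - e1 - e2 - e3  <- enacted fork, tip `e3` skipped
	///
	///  inactive views consulted: r1, C, e1, e2
	/// ```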
	async fn collect_provides_tags_from_view_store(
		&self,
		tree_route: &TreeRoute<Block>,
		xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
	) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
		let blocks_hashes = tree_route
			.retracted()
			.iter()
			// Skip the tip of the retracted fork, since it has an active view.
			.skip(1)
			// Also skip the tip of the enacted fork, since it has an active view too.
			.chain(
				std::iter::once(tree_route.common_block())
					.chain(tree_route.enacted().iter().rev().skip(1)),
			)
			.collect::<Vec<&HashAndNumber<Block>>>();

		self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
	}

	/// Build a map from blocks to their extrinsics.
	pub async fn collect_extrinsics(
		&self,
		blocks: &[HashAndNumber<Block>],
	) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
		future::join_all(blocks.iter().map(|hn| async move {
			(
				hn.hash,
				self.api
					.block_body(hn.hash)
					.await
					.unwrap_or_else(|e| {
						warn!(target: LOG_TARGET, %e, "Failed to fetch block body");
						None
					})
					.unwrap_or_default(),
			)
		}))
		.await
		.into_iter()
		.collect()
	}

	/// Updates the view with the transactions from the given tree route.
	///
	/// Transactions from the retracted blocks are resubmitted to the given view. Tags for
	/// transactions included in blocks on the enacted fork are pruned from the provided view.
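	///
	/// In short (an illustrative sketch, not compiled; names mirror the body of this method):
	///
	/// ```ignore
	/// // Enacted blocks: transactions included in them are pruned from the view, reusing
	/// // cached `provides` tags from inactive views where possible.
	/// for block in tree_route.enacted() { /* crate::prune_known_txs_for_block(..) */ }
	/// // Retracted blocks: their transactions (unless just pruned) are resubmitted.
	/// view.pool.resubmit_at(&hash_and_number, resubmit_transactions, ..).await;
	/// ```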
	async fn update_view_with_fork(
		&self,
		view: &View<ChainApi>,
		tree_route: &TreeRoute<Block>,
		hash_and_number: HashAndNumber<Block>,
	) {
		debug!(
			target: LOG_TARGET,
			?tree_route,
			at = ?view.at,
			"update_view_with_fork"
		);
		let api = self.api.clone();

		// Collect extrinsics on the enacted path into a map from block hash to extrinsics.
		let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;

		// Create a map from enacted blocks' extrinsics to their `provides`
		// tags based on inactive views.
		let known_provides_tags = Arc::new(
			self.collect_provides_tags_from_view_store(
				tree_route,
				extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
			)
			.await,
		);

		debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());

		// We keep track of everything we prune so that later we won't add
		// transactions with those hashes from the retracted blocks.
		let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
		future::join_all(tree_route.enacted().iter().map(|hn| {
			let api = api.clone();
			let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
			let known_provides_tags = known_provides_tags.clone();
			async move {
				(
					hn,
					crate::prune_known_txs_for_block(
						hn,
						&*api,
						&view.pool,
						Some(xts),
						Some(known_provides_tags),
					)
					.await,
				)
			}
		}))
		.await
		.into_iter()
		.for_each(|(key, enacted_log)| {
			pruned_log.extend(enacted_log.clone());
			self.included_transactions.lock().insert(key.clone(), enacted_log);
		});

		let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
		self.metrics
			.report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));

		// Resubmit transactions from the retracted blocks.
		{
			let mut resubmit_transactions = Vec::new();

			for retracted in tree_route.retracted() {
				let hash = retracted.hash;

				let block_transactions = api
					.block_body(hash)
					.await
					.unwrap_or_else(|error| {
						warn!(
							target: LOG_TARGET,
							%error,
							"Failed to fetch block body"
						);
						None
					})
					.unwrap_or_default()
					.into_iter();

				let mut resubmitted_to_report = 0;

				let txs = block_transactions.map(|tx| (self.hash_of(&tx), tx)).filter(
					|(tx_hash, _)| {
						let contains = pruned_log.contains(&tx_hash);

						// We need to count all transactions here, not just the filtered ones.
						resubmitted_to_report += 1;

						if !contains {
							trace!(
								target: LOG_TARGET,
								?tx_hash,
								?hash,
								"Resubmitting from retracted block"
							);
						}
						!contains
					},
				);
				let mut result = vec![];
				for (tx_hash, tx) in txs {
					result.push(
						// Reuse the existing `Arc` if the transaction is already known.
						self.mempool
							.get_by_hash(tx_hash)
							.await
							.map(|tx| (tx.source(), tx.tx()))
							.unwrap_or_else(|| {
								// These transactions are coming from retracted blocks; we
								// should simply consider them external.
								(TimedTransactionSource::new_external(true), Arc::from(tx))
							}),
					);
				}
				resubmit_transactions.extend(result);

				self.metrics.report(|metrics| {
					metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
				});
			}

			let _ = view
				.pool
				.resubmit_at(
					&hash_and_number,
					resubmit_transactions,
					ValidateTransactionPriority::Maintained,
				)
				.await;
		}
	}

	/// Executes the maintenance for the finalized event.
	///
	/// Performs the housekeeping required for the finalized event. This includes:
	/// - executing the on-finalized procedure for the view store,
	/// - purging finalized transactions from the mempool and triggering mempool revalidation.
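	///
	/// For instance (illustrative only), the `included_transactions` cache is trimmed so that
	/// only blocks strictly above the finalized height remain:
	///
	/// ```ignore
	/// // After finalizing block #90, cached entries for blocks #1..=#90 are dropped:
	/// included_transactions.retain(|cached_block, _| finalized_number < cached_block.number);
	/// ```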
	async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
		let start = Instant::now();
		let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
		debug!(
			target: LOG_TARGET,
			?finalized_number,
			?tree_route,
			active_views_count = self.active_views_count(),
			"handle_finalized"
		);
		let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;

		self.mempool.purge_finalized_transactions(&finalized_xts).await;
		self.import_notification_sink.clean_notified_items(&finalized_xts);

		self.metrics
			.report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));

		if let Ok(Some(finalized_number)) = finalized_number {
			self.included_transactions
				.lock()
				.retain(|cached_block, _| finalized_number < cached_block.number);
			self.revalidation_queue
				.revalidate_mempool(
					self.mempool.clone(),
					self.view_store.clone(),
					HashAndNumber { hash: finalized_hash, number: finalized_number },
				)
				.await;
		} else {
			debug!(
				target: LOG_TARGET,
				?finalized_number,
				"handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
			);
		}

		self.ready_poll.lock().remove_cancelled();

		debug!(
			target: LOG_TARGET,
			active_views_count = self.active_views_count(),
			included_transactions_len = ?self.included_transactions.lock().len(),
			duration = ?start.elapsed(),
			"handle_finalized after"
		);
	}

	/// Computes a hash of the provided transaction.
	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
		self.api.hash_and_length(xt).0
	}

	/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
	/// one.
	///
	/// This asynchronous function verifies the new transaction against the most recent view. If a
	/// transaction with a lower priority exists in the transaction pool, it is replaced with the
	/// new transaction.
	///
	/// If no lower-priority transaction is found, the function returns an error indicating the
	/// transaction was dropped immediately.
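	///
	/// Sketch of the expected outcomes (not compiled; `pool` stands for the receiver):
	///
	/// ```ignore
	/// match pool.attempt_transaction_replacement(source, at_number, watched, xt).await {
	///     // Lower-priority transactions were evicted; their hashes are in `removed`.
	///     Ok(insertion_info) => debug!("evicted: {:?}", insertion_info.removed),
	///     // No recent view, no priority, or nothing of lower priority to replace.
	///     Err(TxPoolApiError::ImmediatelyDropped) => {},
	///     Err(_other) => { /* validation error */ },
	/// }
	/// ```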
	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
	async fn attempt_transaction_replacement(
		&self,
		source: TransactionSource,
		at_number: u64,
		watched: bool,
		xt: ExtrinsicFor<ChainApi>,
	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
		let best_view = self
			.view_store
			.most_recent_view
			.read()
			.as_ref()
			.ok_or(TxPoolApiError::ImmediatelyDropped)?
			.clone();

		let (xt_hash, validated_tx) = best_view
			.pool
			.verify_one(
				best_view.at.hash,
				best_view.at.number,
				TimedTransactionSource::from_transaction_source(source, false),
				xt.clone(),
				crate::graph::CheckBannedBeforeVerify::Yes,
				ValidateTransactionPriority::Submitted,
			)
			.await;

		let Some(priority) = validated_tx.priority() else {
			return Err(TxPoolApiError::ImmediatelyDropped)
		};

		let insertion_info = self
			.mempool
			.try_insert_with_replacement(xt, priority, source, at_number, watched)
			.await?;
		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
	}

	/// Sync version of [`Self::attempt_transaction_replacement`].
	fn attempt_transaction_replacement_sync(
		&self,
		source: TransactionSource,
		watched: bool,
		xt: ExtrinsicFor<ChainApi>,
	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
		let HashAndNumber { number: at_number, hash: at_hash } = self
			.view_store
			.most_recent_view
			.read()
			.as_ref()
			.ok_or(TxPoolApiError::ImmediatelyDropped)?
			.at;

		let ValidTransaction { priority, .. } = self
			.api
			.validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
			.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
			.map_err(|e| match e {
				TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
				TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
			})?;
		let xt_hash = self.hash_of(&xt);

		let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
			xt,
			priority,
			source,
			at_number.into().as_u64(),
			watched,
		)?;
		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
	}

	fn post_attempt_transaction_replacement(
		&self,
		tx_hash: ExtrinsicHash<ChainApi>,
		insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
		for worst_hash in &insertion_info.removed {
			trace!(
				target: LOG_TARGET,
				tx_hash = ?worst_hash,
				new_tx_hash = ?tx_hash,
				"removed: replaced by"
			);
			self.view_store
				.listener
				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));

			self.view_store
				.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
					listener.limits_enforced(&removed_tx_hash);
				});
		}

		Ok(insertion_info)
	}
}

#[async_trait]
impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: 'static + graph::ChainApi<Block = Block>,
	<Block as BlockT>::Hash: Unpin,
{
	/// Executes the maintenance for the given chain event.
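	///
	/// Typically driven by the node's maintenance task on every best-block/finality
	/// notification, e.g. (illustrative only):
	///
	/// ```ignore
	/// pool.maintain(ChainEvent::NewBestBlock { hash, tree_route: None }).await;
	/// pool.maintain(ChainEvent::Finalized { hash, tree_route: Arc::from(vec![]) }).await;
	/// ```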
	async fn maintain(&self, event: ChainEvent<Self::Block>) {
		let start = Instant::now();
		debug!(
			target: LOG_TARGET,
			?event,
			"processing event"
		);

		self.view_store.finish_background_revalidations().await;

		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();

		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
			match self.api.tree_route(from, to) {
				Ok(tree_route) => Ok(tree_route),
				Err(e) =>
					return Err(format!(
						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
					)),
			}
		};
		let block_id_to_number =
			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));

		let result =
			self.enactment_state
				.lock()
				.update(&event, &compute_tree_route, &block_id_to_number);

		match result {
			Err(error) => {
				debug!(
					target: LOG_TARGET,
					%error,
					"enactment_state::update error"
				);
				self.enactment_state.lock().force_update(&event);
			},
			Ok(EnactmentAction::Skip) => return,
			Ok(EnactmentAction::HandleFinalization) => {
				// todo [#5492]: in some cases handle_new_block is actually needed (new_num >
				// tips_of_forks) let hash = event.hash();
				// if !self.has_view(hash) {
				// 	if let Ok(tree_route) = compute_tree_route(prev_finalized_block, hash) {
				// 		self.handle_new_block(&tree_route).await;
				// 	}
				// }
			},
			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
				self.handle_new_block(&tree_route).await;
			},
		};

		match event {
			ChainEvent::NewBestBlock { .. } => {},
			ChainEvent::Finalized { hash, ref tree_route } => {
				self.handle_finalized(hash, tree_route).await;

				debug!(
					target: LOG_TARGET,
					?tree_route,
					?prev_finalized_block,
					"on-finalized enacted"
				);
			},
		}

		let duration = start.elapsed();
		let mempool_len = self.mempool_len().await;
		info!(
			target: LOG_TARGET,
			txs = ?mempool_len,
			a = self.active_views_count(),
			i = self.inactive_views_count(),
			views = ?self.views_stats(),
			?event,
			?duration,
			"maintain"
		);

		self.metrics.report(|metrics| {
			let (unwatched, watched) = mempool_len;
			let _ = (
				self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
				self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
				watched.try_into().map(|v| metrics.watched_txs.set(v)),
				unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
			);
			metrics.maintain_duration.observe(duration.as_secs_f64());
		});
	}
}

impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
where
	Block: BlockT,
	Client: sp_api::ProvideRuntimeApi<Block>
		+ sc_client_api::BlockBackend<Block>
		+ sc_client_api::blockchain::HeaderBackend<Block>
		+ sp_runtime::traits::BlockIdTo<Block>
		+ sc_client_api::ExecutorProvider<Block>
		+ sc_client_api::UsageProvider<Block>
		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ Send
		+ Sync
		+ 'static,
	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
	<Block as BlockT>::Hash: std::marker::Unpin,
{
	/// Create a new fork-aware transaction pool for a full node with the provided client.
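	///
	/// A minimal construction sketch (not compiled; assumes a full `client`, an essential task
	/// spawner and an optional `prometheus_registry` are at hand):
	///
	/// ```ignore
	/// let pool = ForkAwareTxPool::new_full(
	///     Options::default(),
	///     IsValidator::from(false),
	///     prometheus_registry.as_ref(),
	///     spawner,
	///     client.clone(),
	/// );
	/// ```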
	pub fn new_full(
		options: Options,
		is_validator: IsValidator,
		prometheus: Option<&PrometheusRegistry>,
		spawner: impl SpawnEssentialNamed,
		client: Arc<Client>,
	) -> Self {
		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
		Self::new_with_background_worker(
			options,
			is_validator,
			pool_api,
			prometheus,
			spawner,
			client.usage_info().chain.best_hash,
			client.usage_info().chain.finalized_hash,
		)
	}
}

#[cfg(test)]
mod reduce_multiview_result_tests {
	use super::*;
	use sp_core::H256;

	#[derive(Debug, PartialEq, Clone)]
	enum Error {
		Custom(u8),
	}

	#[test]
	fn empty() {
		sp_tracing::try_init_simple();
		let input = HashMap::default();
		let r = reduce_multiview_result::<H256, H256, Error>(input);
		assert!(r.is_empty());
	}

	#[test]
	fn errors_only() {
		sp_tracing::try_init_simple();
		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
			(
				H256::repeat_byte(0x13),
				vec![
					Err(Error::Custom(10)),
					Err(Error::Custom(11)),
					Err(Error::Custom(12)),
					Err(Error::Custom(13)),
				],
			),
			(
				H256::repeat_byte(0x14),
				vec![
					Err(Error::Custom(20)),
					Err(Error::Custom(21)),
					Err(Error::Custom(22)),
					Err(Error::Custom(23)),
				],
			),
			(
				H256::repeat_byte(0x15),
				vec![
					Err(Error::Custom(30)),
					Err(Error::Custom(31)),
					Err(Error::Custom(32)),
					Err(Error::Custom(33)),
				],
			),
		];
		let input = HashMap::from_iter(v.clone());
		let r = reduce_multiview_result(input);

		// Order in a HashMap is random, so the result shall be one of:
		assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
	}

	#[test]
	#[should_panic]
	#[cfg(debug_assertions)]
	fn invalid_lengths() {
		sp_tracing::try_init_simple();
		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
			(H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
			(H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
		];
		let input = HashMap::from_iter(v);
		let _ = reduce_multiview_result(input);
	}

	#[test]
	fn only_hashes() {
		sp_tracing::try_init_simple();

		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
			(
				H256::repeat_byte(0x13),
				vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
			),
			(
				H256::repeat_byte(0x14),
				vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
			),
		];
		let input = HashMap::from_iter(v);
		let r = reduce_multiview_result(input);

		assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
	}

	#[test]
	fn one_view() {
		sp_tracing::try_init_simple();
		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
			H256::repeat_byte(0x13),
			vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
		)];
		let input = HashMap::from_iter(v);
		let r = reduce_multiview_result(input);

		assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
	}

	#[test]
	fn mix() {
		sp_tracing::try_init_simple();
		let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
			(
				H256::repeat_byte(0x13),
				vec![
					Ok(H256::repeat_byte(0x10)),
					Err(Error::Custom(11)),
					Err(Error::Custom(12)),
					Err(Error::Custom(33)),
				],
			),
			(
				H256::repeat_byte(0x14),
				vec![
					Err(Error::Custom(20)),
					Ok(H256::repeat_byte(0x21)),
					Err(Error::Custom(22)),
					Err(Error::Custom(33)),
				],
			),
			(
				H256::repeat_byte(0x15),
				vec![
					Err(Error::Custom(30)),
					Err(Error::Custom(31)),
					Ok(H256::repeat_byte(0x32)),
					Err(Error::Custom(33)),
				],
			),
		];
		let input = HashMap::from_iter(v);
		let r = reduce_multiview_result(input);

		assert_eq!(
			r,
			vec![
				Ok(H256::repeat_byte(0x10)),
				Ok(H256::repeat_byte(0x21)),
				Ok(H256::repeat_byte(0x32)),
				Err(Error::Custom(33))
			]
		);
	}
}