sc_transaction_pool/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate transaction pool implementation.
20
21#![recursion_limit = "256"]
22#![warn(missing_docs)]
23#![warn(unused_extern_crates)]
24
25mod api;
26mod enactment_state;
27pub mod error;
28mod graph;
29mod metrics;
30mod revalidation;
31#[cfg(test)]
32mod tests;
33
34pub use crate::api::FullChainApi;
35use async_trait::async_trait;
36use enactment_state::{EnactmentAction, EnactmentState};
37use futures::{
38	channel::oneshot,
39	future::{self, ready},
40	prelude::*,
41};
42pub use graph::{
43	base_pool::Limit as PoolLimit, ChainApi, Options, Pool, Transaction, ValidatedTransaction,
44};
45use parking_lot::Mutex;
46use std::{
47	collections::{HashMap, HashSet},
48	pin::Pin,
49	sync::Arc,
50};
51
52use graph::{ExtrinsicHash, IsValidator};
53use sc_transaction_pool_api::{
54	error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
55	PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, TransactionSource,
56	TransactionStatusStreamFor, TxHash,
57};
58use sp_core::traits::SpawnEssentialNamed;
59use sp_runtime::{
60	generic::BlockId,
61	traits::{AtLeast32Bit, Block as BlockT, Extrinsic, Header as HeaderT, NumberFor, Zero},
62};
63use std::time::Instant;
64
65use crate::metrics::MetricsLink as PrometheusMetrics;
66use prometheus_endpoint::Registry as PrometheusRegistry;
67
68use sp_blockchain::{HashAndNumber, TreeRoute};
69
70pub(crate) const LOG_TARGET: &str = "txpool";
71
72type BoxedReadyIterator<Hash, Data> =
73	Box<dyn ReadyTransactions<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
74
75type ReadyIteratorFor<PoolApi> =
76	BoxedReadyIterator<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
77
78type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output = ReadyIteratorFor<PoolApi>> + Send>>;
79
80/// A transaction pool for a full node.
81pub type FullPool<Block, Client> = BasicPool<FullChainApi<Client, Block>, Block>;
82
83/// Basic implementation of transaction pool that can be customized by providing PoolApi.
84pub struct BasicPool<PoolApi, Block>
85where
86	Block: BlockT,
87	PoolApi: graph::ChainApi<Block = Block>,
88{
89	pool: Arc<graph::Pool<PoolApi>>,
90	api: Arc<PoolApi>,
91	revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
92	revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
93	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
94	metrics: PrometheusMetrics,
95	enactment_state: Arc<Mutex<EnactmentState<Block>>>,
96}
97
98struct ReadyPoll<T, Block: BlockT> {
99	updated_at: NumberFor<Block>,
100	pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
101}
102
103impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
104	fn default() -> Self {
105		Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
106	}
107}
108
109impl<T, Block: BlockT> ReadyPoll<T, Block> {
110	fn new(best_block_number: NumberFor<Block>) -> Self {
111		Self { updated_at: best_block_number, pollers: Default::default() }
112	}
113
114	fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
115		self.updated_at = number;
116
117		let mut idx = 0;
118		while idx < self.pollers.len() {
119			if self.pollers[idx].0 <= number {
120				let poller_sender = self.pollers.swap_remove(idx);
121				log::debug!(target: LOG_TARGET, "Sending ready signal at block {}", number);
122				let _ = poller_sender.1.send(iterator_factory());
123			} else {
124				idx += 1;
125			}
126		}
127	}
128
129	fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
130		let (sender, receiver) = oneshot::channel();
131		self.pollers.push((number, sender));
132		receiver
133	}
134
135	fn updated_at(&self) -> NumberFor<Block> {
136		self.updated_at
137	}
138}
139
140/// Type of revalidation.
141pub enum RevalidationType {
142	/// Light revalidation type.
143	///
144	/// During maintenance, transaction pool makes periodic revalidation
145	/// of all transactions depending on number of blocks or time passed.
146	/// Also this kind of revalidation does not resubmit transactions from
147	/// retracted blocks, since it is too expensive.
148	Light,
149
150	/// Full revalidation type.
151	///
152	/// During maintenance, transaction pool revalidates some fixed amount of
153	/// transactions from the pool of valid transactions.
154	Full,
155}
156
157impl<PoolApi, Block> BasicPool<PoolApi, Block>
158where
159	Block: BlockT,
160	PoolApi: graph::ChainApi<Block = Block> + 'static,
161{
162	/// Create new basic transaction pool with provided api, for tests.
163	pub fn new_test(
164		pool_api: Arc<PoolApi>,
165		best_block_hash: Block::Hash,
166		finalized_hash: Block::Hash,
167		options: graph::Options,
168	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
169		let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone()));
170		let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
171			pool_api.clone(),
172			pool.clone(),
173			finalized_hash,
174		);
175		(
176			Self {
177				api: pool_api,
178				pool,
179				revalidation_queue: Arc::new(revalidation_queue),
180				revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
181				ready_poll: Default::default(),
182				metrics: Default::default(),
183				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
184					best_block_hash,
185					finalized_hash,
186				))),
187			},
188			background_task,
189		)
190	}
191
192	/// Create new basic transaction pool with provided api and custom
193	/// revalidation type.
194	pub fn with_revalidation_type(
195		options: graph::Options,
196		is_validator: IsValidator,
197		pool_api: Arc<PoolApi>,
198		prometheus: Option<&PrometheusRegistry>,
199		revalidation_type: RevalidationType,
200		spawner: impl SpawnEssentialNamed,
201		best_block_number: NumberFor<Block>,
202		best_block_hash: Block::Hash,
203		finalized_hash: Block::Hash,
204	) -> Self {
205		let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
206		let (revalidation_queue, background_task) = match revalidation_type {
207			RevalidationType::Light =>
208				(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
209			RevalidationType::Full => {
210				let (queue, background) = revalidation::RevalidationQueue::new_background(
211					pool_api.clone(),
212					pool.clone(),
213					finalized_hash,
214				);
215				(queue, Some(background))
216			},
217		};
218
219		if let Some(background_task) = background_task {
220			spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
221		}
222
223		Self {
224			api: pool_api,
225			pool,
226			revalidation_queue: Arc::new(revalidation_queue),
227			revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
228				RevalidationType::Light =>
229					RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
230				RevalidationType::Full => RevalidationStrategy::Always,
231			})),
232			ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
233			metrics: PrometheusMetrics::new(prometheus),
234			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
235				best_block_hash,
236				finalized_hash,
237			))),
238		}
239	}
240
241	/// Gets shared reference to the underlying pool.
242	pub fn pool(&self) -> &Arc<graph::Pool<PoolApi>> {
243		&self.pool
244	}
245
246	/// Get access to the underlying api
247	pub fn api(&self) -> &PoolApi {
248		&self.api
249	}
250}
251
252impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
253where
254	Block: BlockT,
255	PoolApi: 'static + graph::ChainApi<Block = Block>,
256{
257	type Block = PoolApi::Block;
258	type Hash = graph::ExtrinsicHash<PoolApi>;
259	type InPoolTransaction = graph::base_pool::Transaction<TxHash<Self>, TransactionFor<Self>>;
260	type Error = PoolApi::Error;
261
262	fn submit_at(
263		&self,
264		at: <Self::Block as BlockT>::Hash,
265		source: TransactionSource,
266		xts: Vec<TransactionFor<Self>>,
267	) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
268		let pool = self.pool.clone();
269
270		self.metrics
271			.report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
272
273		async move { pool.submit_at(at, source, xts).await }.boxed()
274	}
275
276	fn submit_one(
277		&self,
278		at: <Self::Block as BlockT>::Hash,
279		source: TransactionSource,
280		xt: TransactionFor<Self>,
281	) -> PoolFuture<TxHash<Self>, Self::Error> {
282		let pool = self.pool.clone();
283
284		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
285
286		async move { pool.submit_one(at, source, xt).await }.boxed()
287	}
288
289	fn submit_and_watch(
290		&self,
291		at: <Self::Block as BlockT>::Hash,
292		source: TransactionSource,
293		xt: TransactionFor<Self>,
294	) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
295		let pool = self.pool.clone();
296
297		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
298
299		async move {
300			let watcher = pool.submit_and_watch(at, source, xt).await?;
301
302			Ok(watcher.into_stream().boxed())
303		}
304		.boxed()
305	}
306
307	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
308		let removed = self.pool.validated_pool().remove_invalid(hashes);
309		self.metrics
310			.report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
311		removed
312	}
313
314	fn status(&self) -> PoolStatus {
315		self.pool.validated_pool().status()
316	}
317
318	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
319		self.pool.validated_pool().import_notification_stream()
320	}
321
322	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
323		self.pool.hash_of(xt)
324	}
325
326	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
327		self.pool.validated_pool().on_broadcasted(propagations)
328	}
329
330	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
331		self.pool.validated_pool().ready_by_hash(hash)
332	}
333
334	fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
335		let status = self.status();
336		// If there are no transactions in the pool, it is fine to return early.
337		//
338		// There could be transaction being added because of some re-org happening at the relevant
339		// block, but this is relative unlikely.
340		if status.ready == 0 && status.future == 0 {
341			return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
342		}
343
344		if self.ready_poll.lock().updated_at() >= at {
345			log::trace!(target: LOG_TARGET, "Transaction pool already processed block  #{}", at);
346			let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
347			return async move { iterator }.boxed()
348		}
349
350		self.ready_poll
351			.lock()
352			.add(at)
353			.map(|received| {
354				received.unwrap_or_else(|e| {
355					log::warn!("Error receiving pending set: {:?}", e);
356					Box::new(std::iter::empty())
357				})
358			})
359			.boxed()
360	}
361
362	fn ready(&self) -> ReadyIteratorFor<PoolApi> {
363		Box::new(self.pool.validated_pool().ready())
364	}
365
366	fn futures(&self) -> Vec<Self::InPoolTransaction> {
367		let pool = self.pool.validated_pool().pool.read();
368
369		pool.futures().cloned().collect::<Vec<_>>()
370	}
371}
372
373impl<Block, Client> FullPool<Block, Client>
374where
375	Block: BlockT,
376	Client: sp_api::ProvideRuntimeApi<Block>
377		+ sc_client_api::BlockBackend<Block>
378		+ sc_client_api::blockchain::HeaderBackend<Block>
379		+ sp_runtime::traits::BlockIdTo<Block>
380		+ sc_client_api::ExecutorProvider<Block>
381		+ sc_client_api::UsageProvider<Block>
382		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
383		+ Send
384		+ Sync
385		+ 'static,
386	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
387{
388	/// Create new basic transaction pool for a full node with the provided api.
389	pub fn new_full(
390		options: graph::Options,
391		is_validator: IsValidator,
392		prometheus: Option<&PrometheusRegistry>,
393		spawner: impl SpawnEssentialNamed,
394		client: Arc<Client>,
395	) -> Arc<Self> {
396		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
397		let pool = Arc::new(Self::with_revalidation_type(
398			options,
399			is_validator,
400			pool_api,
401			prometheus,
402			RevalidationType::Full,
403			spawner,
404			client.usage_info().chain.best_number,
405			client.usage_info().chain.best_hash,
406			client.usage_info().chain.finalized_hash,
407		));
408
409		pool
410	}
411}
412
413impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
414	for BasicPool<FullChainApi<Client, Block>, Block>
415where
416	Block: BlockT,
417	Client: sp_api::ProvideRuntimeApi<Block>
418		+ sc_client_api::BlockBackend<Block>
419		+ sc_client_api::blockchain::HeaderBackend<Block>
420		+ sp_runtime::traits::BlockIdTo<Block>
421		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
422	Client: Send + Sync + 'static,
423	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
424{
425	type Block = Block;
426	type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
427	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
428
429	fn submit_local(
430		&self,
431		at: Block::Hash,
432		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
433	) -> Result<Self::Hash, Self::Error> {
434		use sp_runtime::{
435			traits::SaturatedConversion, transaction_validity::TransactionValidityError,
436		};
437
438		let validity = self
439			.api
440			.validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
441			.map_err(|e| {
442				Self::Error::Pool(match e {
443					TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
444					TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
445				})
446			})?;
447
448		let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
449		let block_number = self
450			.api
451			.block_id_to_number(&BlockId::hash(at))?
452			.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
453
454		let validated = ValidatedTransaction::valid_at(
455			block_number.saturated_into::<u64>(),
456			hash,
457			TransactionSource::Local,
458			xt,
459			bytes,
460			validity,
461		);
462
463		self.pool.validated_pool().submit(vec![validated]).remove(0)
464	}
465}
466
467#[cfg_attr(test, derive(Debug))]
468enum RevalidationStatus<N> {
469	/// The revalidation has never been completed.
470	NotScheduled,
471	/// The revalidation is scheduled.
472	Scheduled(Option<Instant>, Option<N>),
473	/// The revalidation is in progress.
474	InProgress,
475}
476
477enum RevalidationStrategy<N> {
478	Always,
479	Light(RevalidationStatus<N>),
480}
481
482struct RevalidationAction {
483	revalidate: bool,
484	resubmit: bool,
485}
486
487impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
488	pub fn clear(&mut self) {
489		if let Self::Light(status) = self {
490			status.clear()
491		}
492	}
493
494	pub fn next(
495		&mut self,
496		block: N,
497		revalidate_time_period: Option<std::time::Duration>,
498		revalidate_block_period: Option<N>,
499	) -> RevalidationAction {
500		match self {
501			Self::Light(status) => RevalidationAction {
502				revalidate: status.next_required(
503					block,
504					revalidate_time_period,
505					revalidate_block_period,
506				),
507				resubmit: false,
508			},
509			Self::Always => RevalidationAction { revalidate: true, resubmit: true },
510		}
511	}
512}
513
514impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
515	/// Called when revalidation is completed.
516	pub fn clear(&mut self) {
517		*self = Self::NotScheduled;
518	}
519
520	/// Returns true if revalidation is required.
521	pub fn next_required(
522		&mut self,
523		block: N,
524		revalidate_time_period: Option<std::time::Duration>,
525		revalidate_block_period: Option<N>,
526	) -> bool {
527		match *self {
528			Self::NotScheduled => {
529				*self = Self::Scheduled(
530					revalidate_time_period.map(|period| Instant::now() + period),
531					revalidate_block_period.map(|period| block + period),
532				);
533				false
534			},
535			Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
536				let is_required =
537					revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
538						revalidate_at_block.map(|at| block >= at).unwrap_or(false);
539				if is_required {
540					*self = Self::InProgress;
541				}
542				is_required
543			},
544			Self::InProgress => false,
545		}
546	}
547}
548
549/// Prune the known txs for the given block.
550async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = Block>>(
551	block_hash: Block::Hash,
552	api: &Api,
553	pool: &graph::Pool<Api>,
554) -> Vec<ExtrinsicHash<Api>> {
555	let extrinsics = api
556		.block_body(block_hash)
557		.await
558		.unwrap_or_else(|e| {
559			log::warn!("Prune known transactions: error request: {}", e);
560			None
561		})
562		.unwrap_or_default();
563
564	let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
565
566	log::trace!(target: LOG_TARGET, "Pruning transactions: {:?}", hashes);
567
568	let header = match api.block_header(block_hash) {
569		Ok(Some(h)) => h,
570		Ok(None) => {
571			log::debug!(target: LOG_TARGET, "Could not find header for {:?}.", block_hash);
572			return hashes
573		},
574		Err(e) => {
575			log::debug!(target: LOG_TARGET, "Error retrieving header for {:?}: {}", block_hash, e);
576			return hashes
577		},
578	};
579
580	if let Err(e) = pool.prune(block_hash, *header.parent_hash(), &extrinsics).await {
581		log::error!("Cannot prune known in the pool: {}", e);
582	}
583
584	hashes
585}
586
587impl<PoolApi, Block> BasicPool<PoolApi, Block>
588where
589	Block: BlockT,
590	PoolApi: 'static + graph::ChainApi<Block = Block>,
591{
592	/// Handles enactment and retraction of blocks, prunes stale transactions
593	/// (that have already been enacted) and resubmits transactions that were
594	/// retracted.
595	async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
596		log::trace!(target: LOG_TARGET, "handle_enactment tree_route: {tree_route:?}");
597		let pool = self.pool.clone();
598		let api = self.api.clone();
599
600		let (hash, block_number) = match tree_route.last() {
601			Some(HashAndNumber { hash, number }) => (hash, number),
602			None => {
603				log::warn!(
604					target: LOG_TARGET,
605					"Skipping ChainEvent - no last block in tree route {:?}",
606					tree_route,
607				);
608				return
609			},
610		};
611
612		let next_action = self.revalidation_strategy.lock().next(
613			*block_number,
614			Some(std::time::Duration::from_secs(60)),
615			Some(20u32.into()),
616		);
617
618		// We keep track of everything we prune so that later we won't add
619		// transactions with those hashes from the retracted blocks.
620		let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
621
622		// If there is a tree route, we use this to prune known tx based on the enacted
623		// blocks. Before pruning enacted transactions, we inform the listeners about
624		// retracted blocks and their transactions. This order is important, because
625		// if we enact and retract the same transaction at the same time, we want to
626		// send first the retract and than the prune event.
627		for retracted in tree_route.retracted() {
628			// notify txs awaiting finality that it has been retracted
629			pool.validated_pool().on_block_retracted(retracted.hash);
630		}
631
632		future::join_all(
633			tree_route
634				.enacted()
635				.iter()
636				.map(|h| prune_known_txs_for_block(h.hash, &*api, &*pool)),
637		)
638		.await
639		.into_iter()
640		.for_each(|enacted_log| {
641			pruned_log.extend(enacted_log);
642		});
643
644		self.metrics
645			.report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
646
647		if next_action.resubmit {
648			let mut resubmit_transactions = Vec::new();
649
650			for retracted in tree_route.retracted() {
651				let hash = retracted.hash;
652
653				let block_transactions = api
654					.block_body(hash)
655					.await
656					.unwrap_or_else(|e| {
657						log::warn!("Failed to fetch block body: {}", e);
658						None
659					})
660					.unwrap_or_default()
661					.into_iter()
662					.filter(|tx| tx.is_signed().unwrap_or(true));
663
664				let mut resubmitted_to_report = 0;
665
666				resubmit_transactions.extend(block_transactions.into_iter().filter(|tx| {
667					let tx_hash = pool.hash_of(tx);
668					let contains = pruned_log.contains(&tx_hash);
669
670					// need to count all transactions, not just filtered, here
671					resubmitted_to_report += 1;
672
673					if !contains {
674						log::debug!(
675							target: LOG_TARGET,
676							"[{:?}]: Resubmitting from retracted block {:?}",
677							tx_hash,
678							hash,
679						);
680					}
681					!contains
682				}));
683
684				self.metrics.report(|metrics| {
685					metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
686				});
687			}
688
689			if let Err(e) = pool
690				.resubmit_at(
691					*hash,
692					// These transactions are coming from retracted blocks, we should
693					// simply consider them external.
694					TransactionSource::External,
695					resubmit_transactions,
696				)
697				.await
698			{
699				log::debug!(
700					target: LOG_TARGET,
701					"[{:?}] Error re-submitting transactions: {}",
702					hash,
703					e,
704				)
705			}
706		}
707
708		let extra_pool = pool.clone();
709		// After #5200 lands, this arguably might be moved to the
710		// handler of "all blocks notification".
711		self.ready_poll
712			.lock()
713			.trigger(*block_number, move || Box::new(extra_pool.validated_pool().ready()));
714
715		if next_action.revalidate {
716			let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
717			self.revalidation_queue.revalidate_later(*hash, hashes).await;
718
719			self.revalidation_strategy.lock().clear();
720		}
721	}
722}
723
724#[async_trait]
725impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
726where
727	Block: BlockT,
728	PoolApi: 'static + graph::ChainApi<Block = Block>,
729{
730	async fn maintain(&self, event: ChainEvent<Self::Block>) {
731		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
732		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
733			match self.api.tree_route(from, to) {
734				Ok(tree_route) => Ok(tree_route),
735				Err(e) =>
736					return Err(format!(
737						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
738					)),
739			}
740		};
741		let block_id_to_number =
742			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
743
744		let result =
745			self.enactment_state
746				.lock()
747				.update(&event, &compute_tree_route, &block_id_to_number);
748
749		match result {
750			Err(msg) => {
751				log::debug!(target: LOG_TARGET, "{msg}");
752				self.enactment_state.lock().force_update(&event);
753			},
754			Ok(EnactmentAction::Skip) => return,
755			Ok(EnactmentAction::HandleFinalization) => {},
756			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
757				self.handle_enactment(tree_route).await;
758			},
759		};
760
761		if let ChainEvent::Finalized { hash, tree_route } = event {
762			log::trace!(
763				target: LOG_TARGET,
764				"on-finalized enacted: {tree_route:?}, previously finalized: \
765				{prev_finalized_block:?}",
766			);
767
768			for hash in tree_route.iter().chain(std::iter::once(&hash)) {
769				if let Err(e) = self.pool.validated_pool().on_block_finalized(*hash).await {
770					log::warn!(
771						target: LOG_TARGET,
772						"Error occurred while attempting to notify watchers about finalization {}: {}",
773						hash, e
774					)
775				}
776			}
777		}
778	}
779}
780
781/// Inform the transaction pool about imported and finalized blocks.
782pub async fn notification_future<Client, Pool, Block>(client: Arc<Client>, txpool: Arc<Pool>)
783where
784	Block: BlockT,
785	Client: sc_client_api::BlockchainEvents<Block>,
786	Pool: MaintainedTransactionPool<Block = Block>,
787{
788	let import_stream = client
789		.import_notification_stream()
790		.filter_map(|n| ready(n.try_into().ok()))
791		.fuse();
792	let finality_stream = client.finality_notification_stream().map(Into::into).fuse();
793
794	futures::stream::select(import_stream, finality_stream)
795		.for_each(|evt| txpool.maintain(evt))
796		.await
797}