referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
view.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction pool view.
20//!
//! The View represents the state of the transaction pool at a given block. A view is created
//! when a new block is notified to the transaction pool. Views are removed on finalization.
23//!
24//! Refer to [*View*](../index.html#view) section for more details.
25
26use super::metrics::MetricsLink as PrometheusMetrics;
27use crate::{
28	common::tracing_log_xt::log_xt_trace,
29	graph::{
30		self, base_pool::TimedTransactionSource, BlockHash, ExtrinsicFor, ExtrinsicHash,
31		IsValidator, TransactionFor, ValidateTransactionPriority, ValidatedPoolSubmitOutcome,
32		ValidatedTransaction, ValidatedTransactionFor,
33	},
34	LOG_TARGET,
35};
36use indexmap::IndexMap;
37use parking_lot::Mutex;
38use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionStatus};
39use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
40use sp_blockchain::HashAndNumber;
41use sp_runtime::{
42	generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
43	SaturatedConversion,
44};
45use std::{sync::Arc, time::Instant};
46use tracing::{debug, instrument, trace, Level};
47
/// Outcome of a single [`View::revalidate`] run.
///
/// Built by [`View::revalidate`] and applied to the view's pool in
/// [`View::finish_revalidation`].
pub(super) struct RevalidationResult<ChainApi: graph::ChainApi> {
	/// Transactions that were successfully revalidated, keyed by transaction hash.
	revalidated: IndexMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>,
	/// Hashes of transactions to be removed: those found invalid, those whose validity could not
	/// be determined, and those for which revalidation returned an error.
	invalid_hashes: Vec<ExtrinsicHash<ChainApi>>,
}
52
/// Receiving end of the revalidation result channel; used on the `View` side to obtain the
/// [`RevalidationResult`] produced by the `RevalidationWorker`.
pub(super) type RevalidationResultReceiver<ChainApi> =
	tokio::sync::mpsc::Receiver<RevalidationResult<ChainApi>>;

/// Sending end of the revalidation result channel; used to send the [`RevalidationResult`] from
/// the `RevalidationWorker` to the `View`.
pub(super) type RevalidationResultSender<ChainApi> =
	tokio::sync::mpsc::Sender<RevalidationResult<ChainApi>>;

/// Receiving end of the finish-revalidation-request channel; used on the `RevalidationWorker`
/// side to receive the request sent by the `View`.
pub(super) type FinishRevalidationRequestReceiver = tokio::sync::mpsc::Receiver<()>;

/// Sending end of the finish-revalidation-request channel; used to send the request from the
/// `View` to the `RevalidationWorker`.
pub(super) type FinishRevalidationRequestSender = tokio::sync::mpsc::Sender<()>;
66
/// Endpoints of channels used on the `View` side (maintain thread).
pub(super) struct FinishRevalidationLocalChannels<ChainApi: graph::ChainApi> {
	/// Used to send the finish revalidation request.
	///
	/// Set to `None` by `remove_sender`; in that case [`View::finish_revalidation`] sends no
	/// request and only waits for the result.
	finish_revalidation_request_tx: Option<FinishRevalidationRequestSender>,
	/// Used to receive revalidation results.
	revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
}
74
75impl<ChainApi: graph::ChainApi> FinishRevalidationLocalChannels<ChainApi> {
76	/// Creates a new instance of endpoints for channels used on View side
77	pub fn new(
78		finish_revalidation_request_tx: FinishRevalidationRequestSender,
79		revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
80	) -> Self {
81		Self {
82			finish_revalidation_request_tx: Some(finish_revalidation_request_tx),
83			revalidation_result_rx,
84		}
85	}
86
87	/// Removes a finish revalidation sender
88	///
89	/// Should be called when revalidation was already terminated and finish revalidation message is
90	/// no longer expected.
91	fn remove_sender(&mut self) {
92		self.finish_revalidation_request_tx = None;
93	}
94}
95
/// Endpoints of channels used on the `RevalidationWorker` side (background thread).
///
/// Consumed by [`View::revalidate`], which listens on the request receiver while validating and
/// sends the final result over the result sender.
pub(super) struct FinishRevalidationWorkerChannels<ChainApi: graph::ChainApi> {
	/// Used to receive the finish revalidation request.
	finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
	/// Used to send revalidation results.
	revalidation_result_tx: RevalidationResultSender<ChainApi>,
}
103
104impl<ChainApi: graph::ChainApi> FinishRevalidationWorkerChannels<ChainApi> {
105	/// Creates a new instance of endpoints for channels used on `RevalidationWorker` side
106	pub fn new(
107		finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
108		revalidation_result_tx: RevalidationResultSender<ChainApi>,
109	) -> Self {
110		Self { finish_revalidation_request_rx, revalidation_result_tx }
111	}
112}
113
/// Single event used in the aggregated stream: a tuple containing the hash of a transaction and
/// its status.
pub(super) type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>);
/// Warning threshold for the (unbounded) channels used in the view's streams.
const VIEW_STREAM_WARN_THRESHOLD: usize = 100_000;

/// Stream of events providing statuses of all the transactions within the pool.
pub(super) type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;

/// Type alias for a stream of events intended to track dropped transactions.
///
/// It carries the same event type as [`AggregatedStream`].
type DroppedMonitoringStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
124
/// Notification handler for transactions updates triggered in `ValidatedPool`.
///
/// `ViewPoolObserver` handles transaction status changes notifications coming from an instance of
/// validated pool associated with the `View` and forwards them through specified channels
/// into the View's streams.
pub(super) struct ViewPoolObserver<ChainApi: graph::ChainApi> {
	/// The sink used to notify about transactions that were dropped by enforcing limits, were
	/// usurped, or are invalid.
	///
	/// Note: Ready and future statuses are also communicated through this channel, enabling the
	/// stream consumer to track views that reference the transaction.
	dropped_stream_sink: TracingUnboundedSender<
		TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
	>,

	/// The sink of the single, merged stream providing updates for all the transactions in the
	/// associated pool.
	///
	/// Note: some of the events which are currently ignored on the other side of this channel
	/// (external watcher) are not relayed.
	aggregated_stream_sink: TracingUnboundedSender<
		TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
	>,
}
149
150impl<C: graph::ChainApi> graph::EventHandler<C> for ViewPoolObserver<C> {
151	// note: skipped, notified by ForkAwareTxPool directly to multi view listener.
152	fn broadcasted(&self, _: ExtrinsicHash<C>, _: Vec<String>) {}
153	fn dropped(&self, _: ExtrinsicHash<C>) {}
154	fn finalized(&self, _: ExtrinsicHash<C>, _: BlockHash<C>, _: usize) {}
155	fn retracted(&self, _: ExtrinsicHash<C>, _: BlockHash<C>) {
156		// note: [#5479], we do not send to aggregated stream.
157	}
158
159	fn ready(&self, tx: ExtrinsicHash<C>) {
160		let status = TransactionStatus::Ready;
161		self.send_to_dropped_stream_sink(tx, status.clone());
162		self.send_to_aggregated_stream_sink(tx, status);
163	}
164
165	fn future(&self, tx: ExtrinsicHash<C>) {
166		let status = TransactionStatus::Future;
167		self.send_to_dropped_stream_sink(tx, status.clone());
168		self.send_to_aggregated_stream_sink(tx, status);
169	}
170
171	fn limits_enforced(&self, tx: ExtrinsicHash<C>) {
172		self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped);
173	}
174
175	fn usurped(&self, tx: ExtrinsicHash<C>, by: ExtrinsicHash<C>) {
176		self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by));
177	}
178
179	fn invalid(&self, tx: ExtrinsicHash<C>) {
180		self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid);
181	}
182
183	fn pruned(&self, tx: ExtrinsicHash<C>, block_hash: BlockHash<C>, tx_index: usize) {
184		self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index)));
185	}
186
187	fn finality_timeout(&self, tx: ExtrinsicHash<C>, hash: BlockHash<C>) {
188		//todo: do we need this? [related issue: #5482]
189		self.send_to_aggregated_stream_sink(tx, TransactionStatus::FinalityTimeout(hash));
190	}
191}
192
193impl<ChainApi: graph::ChainApi> ViewPoolObserver<ChainApi> {
194	/// Creates an instance of `ViewPoolObserver` together with associated view's streams.
195	///
196	/// This methods creates an event handler that shall be registered in the `ValidatedPool`
197	/// instance associated with the view. It also creates new view's streams:
198	/// - a single stream intended to watch dropped transactions only. The stream can be used to
199	///   subscribe to events related to dropping of all extrinsics in the pool.
200	/// - a single merged stream for all extrinsics in the associated pool. The stream can be used
201	/// to subscribe to life-cycle events of all extrinsics in the pool. For fork-aware
202	/// pool implementation this approach seems to be more efficient than using individual
203	/// streams for every transaction.
204	fn new() -> (
205		Self,
206		DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
207		AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
208	) {
209		let (dropped_stream_sink, dropped_stream) =
210			tracing_unbounded("mpsc_txpool_watcher", VIEW_STREAM_WARN_THRESHOLD);
211		let (aggregated_stream_sink, aggregated_stream) =
212			tracing_unbounded("mpsc_txpool_aggregated_stream", VIEW_STREAM_WARN_THRESHOLD);
213
214		(Self { dropped_stream_sink, aggregated_stream_sink }, dropped_stream, aggregated_stream)
215	}
216
217	/// Sends given event to the `dropped_stream_sink`.
218	fn send_to_dropped_stream_sink(
219		&self,
220		tx: ExtrinsicHash<ChainApi>,
221		status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
222	) {
223		if let Err(e) = self.dropped_stream_sink.unbounded_send((tx, status.clone())) {
224			trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e);
225		}
226	}
227
228	/// Sends given event to the `aggregated_stream_sink`.
229	fn send_to_aggregated_stream_sink(
230		&self,
231		tx: ExtrinsicHash<ChainApi>,
232		status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
233	) {
234		if let Err(e) = self.aggregated_stream_sink.unbounded_send((tx, status.clone())) {
235			trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e);
236		}
237	}
238}
239
/// Represents the state of transaction pool for given block.
///
/// Refer to [*View*](../index.html#view) section for more details on the purpose and life cycle of
/// the `View`.
pub(super) struct View<ChainApi: graph::ChainApi> {
	/// The internal pool keeping the set of ready and future transaction at the given block.
	pub(super) pool: graph::Pool<ChainApi, ViewPoolObserver<ChainApi>>,
	/// The hash and number of the block with which this view is associated.
	pub(super) at: HashAndNumber<ChainApi::Block>,
	/// Endpoints of communication channel with background worker.
	///
	/// Initially `None`; set by `start_background_revalidation` and taken (reset to `None`) by
	/// `finish_revalidation`.
	revalidation_worker_channels: Mutex<Option<FinishRevalidationLocalChannels<ChainApi>>>,
	/// Prometheus's metrics endpoint.
	metrics: PrometheusMetrics,
}
254
255impl<ChainApi> View<ChainApi>
256where
257	ChainApi: graph::ChainApi,
258	<ChainApi::Block as BlockT>::Hash: Unpin,
259{
260	/// Creates a new empty view.
261	pub(super) fn new(
262		api: Arc<ChainApi>,
263		at: HashAndNumber<ChainApi::Block>,
264		options: graph::Options,
265		metrics: PrometheusMetrics,
266		is_validator: IsValidator,
267	) -> (
268		Self,
269		DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
270		AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
271	) {
272		metrics.report(|metrics| metrics.non_cloned_views.inc());
273		let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
274		(
275			Self {
276				pool: graph::Pool::new_with_event_handler(
277					options,
278					is_validator,
279					api,
280					event_handler,
281				),
282				at,
283				revalidation_worker_channels: Mutex::from(None),
284				metrics,
285			},
286			dropped_stream,
287			aggregated_stream,
288		)
289	}
290
291	/// Creates a copy of the other view.
292	pub(super) fn new_from_other(
293		&self,
294		at: &HashAndNumber<ChainApi::Block>,
295	) -> (
296		Self,
297		DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
298		AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
299	) {
300		let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
301		(
302			View {
303				at: at.clone(),
304				pool: self.pool.deep_clone_with_event_handler(event_handler),
305				revalidation_worker_channels: Mutex::from(None),
306				metrics: self.metrics.clone(),
307			},
308			dropped_stream,
309			aggregated_stream,
310		)
311	}
312
313	/// Imports single unvalidated extrinsic into the view.
314	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view::submit_one")]
315	pub(super) async fn submit_one(
316		&self,
317		source: TimedTransactionSource,
318		xt: ExtrinsicFor<ChainApi>,
319		validation_priority: ValidateTransactionPriority,
320	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
321		self.submit_many(std::iter::once((source, xt)), validation_priority)
322			.await
323			.pop()
324			.expect("There is exactly one result, qed.")
325	}
326
327	/// Imports many unvalidated extrinsics into the view.
328	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view::submit_many")]
329	pub(super) async fn submit_many(
330		&self,
331		xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)>,
332		validation_priority: ValidateTransactionPriority,
333	) -> Vec<Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error>> {
334		if tracing::enabled!(target: LOG_TARGET, tracing::Level::TRACE) {
335			let xts = xts.into_iter().collect::<Vec<_>>();
336			log_xt_trace!(
337				target: LOG_TARGET,
338				xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0),
339				"view::submit_many at:{}",
340				self.at.hash
341			);
342			self.pool.submit_at(&self.at, xts, validation_priority).await
343		} else {
344			self.pool.submit_at(&self.at, xts, validation_priority).await
345		}
346	}
347
348	/// Synchronously imports single unvalidated extrinsics into the view.
349	pub(super) fn submit_local(
350		&self,
351		xt: ExtrinsicFor<ChainApi>,
352	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
353		let (tx_hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
354		trace!(
355			target: LOG_TARGET,
356			?tx_hash,
357			view_at_hash = ?self.at.hash,
358			"view::submit_local"
359		);
360		let validity = self
361			.pool
362			.validated_pool()
363			.api()
364			.validate_transaction_blocking(
365				self.at.hash,
366				sc_transaction_pool_api::TransactionSource::Local,
367				Arc::from(xt.clone()),
368			)?
369			.map_err(|e| {
370				match e {
371					TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
372					TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
373				}
374				.into()
375			})?;
376
377		let block_number = self
378			.pool
379			.validated_pool()
380			.api()
381			.block_id_to_number(&BlockId::hash(self.at.hash))?
382			.ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?;
383
384		let validated = ValidatedTransaction::valid_at(
385			block_number.saturated_into::<u64>(),
386			tx_hash,
387			TimedTransactionSource::new_local(true),
388			Arc::from(xt),
389			length,
390			validity,
391		);
392
393		self.pool.validated_pool().submit(vec![validated]).remove(0)
394	}
395
396	/// Status of the pool associated with the view.
397	pub(super) fn status(&self) -> PoolStatus {
398		self.pool.validated_pool().status()
399	}
400
401	/// Revalidates some part of transaction from the internal pool.
402	///
403	/// Intended to be called from the revalidation worker. The revalidation process can be
404	/// terminated by sending a message to the `rx` channel provided within
405	/// `finish_revalidation_worker_channels`. Revalidation results are sent back over the `tx`
406	/// channels and shall be applied in maintain thread.
407	///
408	/// View revalidation currently is not throttled, and until not terminated it will revalidate
409	/// all the transactions. Note: this can be improved if CPU usage due to revalidation becomes a
410	/// problem.
411	pub(super) async fn revalidate(
412		&self,
413		finish_revalidation_worker_channels: FinishRevalidationWorkerChannels<ChainApi>,
414	) {
415		let FinishRevalidationWorkerChannels {
416			mut finish_revalidation_request_rx,
417			revalidation_result_tx,
418		} = finish_revalidation_worker_channels;
419
420		debug!(
421			target: LOG_TARGET,
422			at_hash = ?self.at.hash,
423			"view::revalidate: at starting"
424		);
425		let start = Instant::now();
426		let validated_pool = self.pool.validated_pool();
427		let api = validated_pool.api();
428
429		let batch: Vec<_> = validated_pool.ready().collect();
430		let batch_len = batch.len();
431
432		//todo: sort batch by revalidation timestamp | maybe not needed at all? xts will be getting
433		//out of the view...
434		//todo: revalidate future, remove if invalid [#5496]
435
436		let mut invalid_hashes = Vec::new();
437		let mut revalidated = IndexMap::new();
438
439		let mut validation_results = vec![];
440		let mut batch_iter = batch.into_iter();
441		loop {
442			let mut should_break = false;
443			tokio::select! {
444				_ = finish_revalidation_request_rx.recv() => {
445					debug!(
446						target: LOG_TARGET,
447						at_hash = ?self.at.hash,
448						"view::revalidate: finish revalidation request received"
449					);
450					break
451				}
452				_ = async {
453					if let Some(tx) = batch_iter.next() {
454						let validation_result = (
455							api.validate_transaction(self.at.hash,
456								tx.source.clone().into(), tx.data.clone(),
457								ValidateTransactionPriority::Maintained).await,
458							tx.hash,
459							tx
460						);
461						validation_results.push(validation_result);
462					} else {
463						self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
464						should_break = true;
465					}
466				} => {}
467			}
468
469			if should_break {
470				break;
471			}
472		}
473
474		let revalidation_duration = start.elapsed();
475		self.metrics.report(|metrics| {
476			metrics.view_revalidation_duration.observe(revalidation_duration.as_secs_f64());
477		});
478		debug!(
479			target: LOG_TARGET,
480			at_hash = ?self.at.hash,
481			count = validation_results.len(),
482			batch_len,
483			duration = ?revalidation_duration,
484			"view::revalidate"
485		);
486		log_xt_trace!(
487			data:tuple,
488			target:LOG_TARGET,
489			validation_results.iter().map(|x| (x.1, &x.0)),
490			"view::revalidate result: {:?}"
491		);
492		for (validation_result, tx_hash, tx) in validation_results {
493			match validation_result {
494				Ok(Err(TransactionValidityError::Invalid(_))) => {
495					invalid_hashes.push(tx_hash);
496				},
497				Ok(Ok(validity)) => {
498					revalidated.insert(
499						tx_hash,
500						ValidatedTransaction::valid_at(
501							self.at.number.saturated_into::<u64>(),
502							tx_hash,
503							tx.source.clone(),
504							tx.data.clone(),
505							api.hash_and_length(&tx.data).1,
506							validity,
507						),
508					);
509				},
510				Ok(Err(TransactionValidityError::Unknown(error))) => {
511					trace!(
512						target: LOG_TARGET,
513						?tx_hash,
514						?error,
515						"Removing. Cannot determine transaction validity"
516					);
517					invalid_hashes.push(tx_hash);
518				},
519				Err(error) => {
520					trace!(
521						target: LOG_TARGET,
522						?tx_hash,
523						%error,
524						"Removing due to error during revalidation"
525					);
526					invalid_hashes.push(tx_hash);
527				},
528			}
529		}
530
531		debug!(
532			target: LOG_TARGET,
533			at_hash = ?self.at.hash,
534			"view::revalidate: sending revalidation result"
535		);
536		if let Err(error) = revalidation_result_tx
537			.send(RevalidationResult { invalid_hashes, revalidated })
538			.await
539		{
540			trace!(
541				target: LOG_TARGET,
542				at_hash = ?self.at.hash,
543				?error,
544				"view::revalidate: sending revalidation_result failed"
545			);
546		}
547	}
548
549	/// Sends revalidation request to the background worker.
550	///
551	/// Creates communication channels required to stop revalidation request and receive the
552	/// revalidation results and sends the revalidation request to the background worker.
553	///
554	/// Intended to be called from maintain thread, at the very end of the maintain process.
555	///
556	/// Refer to [*View revalidation*](../index.html#view-revalidation) for more details.
557	pub(super) async fn start_background_revalidation(
558		view: Arc<Self>,
559		revalidation_queue: Arc<
560			super::revalidation_worker::RevalidationQueue<ChainApi, ChainApi::Block>,
561		>,
562	) {
563		debug!(
564			target: LOG_TARGET,
565			at_hash = ?view.at.hash,
566			"view::start_background_revalidation"
567		);
568		let (finish_revalidation_request_tx, finish_revalidation_request_rx) =
569			tokio::sync::mpsc::channel(1);
570		let (revalidation_result_tx, revalidation_result_rx) = tokio::sync::mpsc::channel(1);
571
572		let finish_revalidation_worker_channels = FinishRevalidationWorkerChannels::new(
573			finish_revalidation_request_rx,
574			revalidation_result_tx,
575		);
576
577		let finish_revalidation_local_channels = FinishRevalidationLocalChannels::new(
578			finish_revalidation_request_tx,
579			revalidation_result_rx,
580		);
581
582		*view.revalidation_worker_channels.lock() = Some(finish_revalidation_local_channels);
583		revalidation_queue
584			.revalidate_view(view.clone(), finish_revalidation_worker_channels)
585			.await;
586	}
587
588	/// Terminates a background view revalidation.
589	///
590	/// Receives the results from the background worker and applies them to the internal pool.
591	/// Intended to be called from the maintain thread, at the very beginning of the maintain
592	/// process, before the new view is cloned and updated. Applying results before cloning ensures
593	/// that view contains up-to-date set of revalidated transactions.
594	///
595	/// Refer to [*View revalidation*](../index.html#view-revalidation) for more details.
596	pub(super) async fn finish_revalidation(&self) {
597		trace!(
598			target: LOG_TARGET,
599			at_hash = ?self.at.hash,
600			"view::finish_revalidation"
601		);
602		let Some(revalidation_worker_channels) = self.revalidation_worker_channels.lock().take()
603		else {
604			trace!(target:LOG_TARGET, "view::finish_revalidation: no finish_revalidation_request_tx");
605			return
606		};
607
608		let FinishRevalidationLocalChannels {
609			finish_revalidation_request_tx,
610			mut revalidation_result_rx,
611		} = revalidation_worker_channels;
612
613		if let Some(finish_revalidation_request_tx) = finish_revalidation_request_tx {
614			if let Err(error) = finish_revalidation_request_tx.send(()).await {
615				trace!(
616					target: LOG_TARGET,
617					at_hash = ?self.at.hash,
618					%error,
619					"view::finish_revalidation: sending cancellation request failed"
620				);
621			}
622		}
623
624		if let Some(revalidation_result) = revalidation_result_rx.recv().await {
625			let start = Instant::now();
626			let revalidated_len = revalidation_result.revalidated.len();
627			let validated_pool = self.pool.validated_pool();
628			validated_pool.remove_invalid(&revalidation_result.invalid_hashes);
629			if revalidated_len > 0 {
630				self.pool.resubmit(revalidation_result.revalidated);
631			}
632
633			self.metrics.report(|metrics| {
634				let _ = (
635					revalidation_result
636						.invalid_hashes
637						.len()
638						.try_into()
639						.map(|v| metrics.view_revalidation_invalid_txs.inc_by(v)),
640					revalidated_len
641						.try_into()
642						.map(|v| metrics.view_revalidation_resubmitted_txs.inc_by(v)),
643				);
644			});
645
646			debug!(
647				target: LOG_TARGET,
648				invalid = revalidation_result.invalid_hashes.len(),
649				revalidated = revalidated_len,
650				at_hash = ?self.at.hash,
651				duration = ?start.elapsed(),
652				"view::finish_revalidation: applying revalidation result"
653			);
654		}
655	}
656
657	/// Returns true if the transaction with given hash is already imported into the view.
658	pub(super) fn is_imported(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
659		const IGNORE_BANNED: bool = false;
660		self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err()
661	}
662
663	/// Removes the whole transaction subtree from the inner pool.
664	///
665	/// Refer to [`crate::graph::ValidatedPool::remove_subtree`] for more details.
666	pub fn remove_subtree<F>(
667		&self,
668		hashes: &[ExtrinsicHash<ChainApi>],
669		ban_transactions: bool,
670		listener_action: F,
671	) -> Vec<TransactionFor<ChainApi>>
672	where
673		F: Fn(
674			&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
675			ExtrinsicHash<ChainApi>,
676		),
677	{
678		self.pool
679			.validated_pool()
680			.remove_subtree(hashes, ban_transactions, listener_action)
681	}
682}