1use super::metrics::MetricsLink as PrometheusMetrics;
27use crate::{
28 common::tracing_log_xt::log_xt_trace,
29 graph::{
30 self, base_pool::TimedTransactionSource, BlockHash, ExtrinsicFor, ExtrinsicHash,
31 IsValidator, TransactionFor, ValidateTransactionPriority, ValidatedPoolSubmitOutcome,
32 ValidatedTransaction, ValidatedTransactionFor,
33 },
34 LOG_TARGET,
35};
36use indexmap::IndexMap;
37use parking_lot::Mutex;
38use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionStatus};
39use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
40use sp_blockchain::HashAndNumber;
41use sp_runtime::{
42 generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
43 SaturatedConversion,
44};
45use std::{sync::Arc, time::Instant};
46use tracing::{debug, instrument, trace, Level};
47
48pub(super) struct RevalidationResult<ChainApi: graph::ChainApi> {
49 revalidated: IndexMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>,
50 invalid_hashes: Vec<ExtrinsicHash<ChainApi>>,
51}
52
53pub(super) type RevalidationResultReceiver<ChainApi> =
55 tokio::sync::mpsc::Receiver<RevalidationResult<ChainApi>>;
56
57pub(super) type RevalidationResultSender<ChainApi> =
59 tokio::sync::mpsc::Sender<RevalidationResult<ChainApi>>;
60
61pub(super) type FinishRevalidationRequestReceiver = tokio::sync::mpsc::Receiver<()>;
63
64pub(super) type FinishRevalidationRequestSender = tokio::sync::mpsc::Sender<()>;
66
67pub(super) struct FinishRevalidationLocalChannels<ChainApi: graph::ChainApi> {
69 finish_revalidation_request_tx: Option<FinishRevalidationRequestSender>,
71 revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
73}
74
75impl<ChainApi: graph::ChainApi> FinishRevalidationLocalChannels<ChainApi> {
76 pub fn new(
78 finish_revalidation_request_tx: FinishRevalidationRequestSender,
79 revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
80 ) -> Self {
81 Self {
82 finish_revalidation_request_tx: Some(finish_revalidation_request_tx),
83 revalidation_result_rx,
84 }
85 }
86
87 fn remove_sender(&mut self) {
92 self.finish_revalidation_request_tx = None;
93 }
94}
95
96pub(super) struct FinishRevalidationWorkerChannels<ChainApi: graph::ChainApi> {
98 finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
100 revalidation_result_tx: RevalidationResultSender<ChainApi>,
102}
103
104impl<ChainApi: graph::ChainApi> FinishRevalidationWorkerChannels<ChainApi> {
105 pub fn new(
107 finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
108 revalidation_result_tx: RevalidationResultSender<ChainApi>,
109 ) -> Self {
110 Self { finish_revalidation_request_rx, revalidation_result_tx }
111 }
112}
113
114pub(super) type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>);
116const VIEW_STREAM_WARN_THRESHOLD: usize = 100_000;
118
119pub(super) type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
121
122type DroppedMonitoringStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
124
125pub(super) struct ViewPoolObserver<ChainApi: graph::ChainApi> {
131 dropped_stream_sink: TracingUnboundedSender<
137 TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
138 >,
139
140 aggregated_stream_sink: TracingUnboundedSender<
146 TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
147 >,
148}
149
150impl<C: graph::ChainApi> graph::EventHandler<C> for ViewPoolObserver<C> {
151 fn broadcasted(&self, _: ExtrinsicHash<C>, _: Vec<String>) {}
153 fn dropped(&self, _: ExtrinsicHash<C>) {}
154 fn finalized(&self, _: ExtrinsicHash<C>, _: BlockHash<C>, _: usize) {}
155 fn retracted(&self, _: ExtrinsicHash<C>, _: BlockHash<C>) {
156 }
158
159 fn ready(&self, tx: ExtrinsicHash<C>) {
160 let status = TransactionStatus::Ready;
161 self.send_to_dropped_stream_sink(tx, status.clone());
162 self.send_to_aggregated_stream_sink(tx, status);
163 }
164
165 fn future(&self, tx: ExtrinsicHash<C>) {
166 let status = TransactionStatus::Future;
167 self.send_to_dropped_stream_sink(tx, status.clone());
168 self.send_to_aggregated_stream_sink(tx, status);
169 }
170
171 fn limits_enforced(&self, tx: ExtrinsicHash<C>) {
172 self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped);
173 }
174
175 fn usurped(&self, tx: ExtrinsicHash<C>, by: ExtrinsicHash<C>) {
176 self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by));
177 }
178
179 fn invalid(&self, tx: ExtrinsicHash<C>) {
180 self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid);
181 }
182
183 fn pruned(&self, tx: ExtrinsicHash<C>, block_hash: BlockHash<C>, tx_index: usize) {
184 self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index)));
185 }
186
187 fn finality_timeout(&self, tx: ExtrinsicHash<C>, hash: BlockHash<C>) {
188 self.send_to_aggregated_stream_sink(tx, TransactionStatus::FinalityTimeout(hash));
190 }
191}
192
193impl<ChainApi: graph::ChainApi> ViewPoolObserver<ChainApi> {
194 fn new() -> (
205 Self,
206 DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
207 AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
208 ) {
209 let (dropped_stream_sink, dropped_stream) =
210 tracing_unbounded("mpsc_txpool_watcher", VIEW_STREAM_WARN_THRESHOLD);
211 let (aggregated_stream_sink, aggregated_stream) =
212 tracing_unbounded("mpsc_txpool_aggregated_stream", VIEW_STREAM_WARN_THRESHOLD);
213
214 (Self { dropped_stream_sink, aggregated_stream_sink }, dropped_stream, aggregated_stream)
215 }
216
217 fn send_to_dropped_stream_sink(
219 &self,
220 tx: ExtrinsicHash<ChainApi>,
221 status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
222 ) {
223 if let Err(e) = self.dropped_stream_sink.unbounded_send((tx, status.clone())) {
224 trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e);
225 }
226 }
227
228 fn send_to_aggregated_stream_sink(
230 &self,
231 tx: ExtrinsicHash<ChainApi>,
232 status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
233 ) {
234 if let Err(e) = self.aggregated_stream_sink.unbounded_send((tx, status.clone())) {
235 trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e);
236 }
237 }
238}
239
240pub(super) struct View<ChainApi: graph::ChainApi> {
245 pub(super) pool: graph::Pool<ChainApi, ViewPoolObserver<ChainApi>>,
247 pub(super) at: HashAndNumber<ChainApi::Block>,
249 revalidation_worker_channels: Mutex<Option<FinishRevalidationLocalChannels<ChainApi>>>,
251 metrics: PrometheusMetrics,
253}
254
255impl<ChainApi> View<ChainApi>
256where
257 ChainApi: graph::ChainApi,
258 <ChainApi::Block as BlockT>::Hash: Unpin,
259{
260 pub(super) fn new(
262 api: Arc<ChainApi>,
263 at: HashAndNumber<ChainApi::Block>,
264 options: graph::Options,
265 metrics: PrometheusMetrics,
266 is_validator: IsValidator,
267 ) -> (
268 Self,
269 DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
270 AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
271 ) {
272 metrics.report(|metrics| metrics.non_cloned_views.inc());
273 let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
274 (
275 Self {
276 pool: graph::Pool::new_with_event_handler(
277 options,
278 is_validator,
279 api,
280 event_handler,
281 ),
282 at,
283 revalidation_worker_channels: Mutex::from(None),
284 metrics,
285 },
286 dropped_stream,
287 aggregated_stream,
288 )
289 }
290
291 pub(super) fn new_from_other(
293 &self,
294 at: &HashAndNumber<ChainApi::Block>,
295 ) -> (
296 Self,
297 DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
298 AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
299 ) {
300 let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
301 (
302 View {
303 at: at.clone(),
304 pool: self.pool.deep_clone_with_event_handler(event_handler),
305 revalidation_worker_channels: Mutex::from(None),
306 metrics: self.metrics.clone(),
307 },
308 dropped_stream,
309 aggregated_stream,
310 )
311 }
312
313 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view::submit_one")]
315 pub(super) async fn submit_one(
316 &self,
317 source: TimedTransactionSource,
318 xt: ExtrinsicFor<ChainApi>,
319 validation_priority: ValidateTransactionPriority,
320 ) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
321 self.submit_many(std::iter::once((source, xt)), validation_priority)
322 .await
323 .pop()
324 .expect("There is exactly one result, qed.")
325 }
326
327 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view::submit_many")]
329 pub(super) async fn submit_many(
330 &self,
331 xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)>,
332 validation_priority: ValidateTransactionPriority,
333 ) -> Vec<Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error>> {
334 if tracing::enabled!(target: LOG_TARGET, tracing::Level::TRACE) {
335 let xts = xts.into_iter().collect::<Vec<_>>();
336 log_xt_trace!(
337 target: LOG_TARGET,
338 xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0),
339 "view::submit_many at:{}",
340 self.at.hash
341 );
342 self.pool.submit_at(&self.at, xts, validation_priority).await
343 } else {
344 self.pool.submit_at(&self.at, xts, validation_priority).await
345 }
346 }
347
348 pub(super) fn submit_local(
350 &self,
351 xt: ExtrinsicFor<ChainApi>,
352 ) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
353 let (tx_hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
354 trace!(
355 target: LOG_TARGET,
356 ?tx_hash,
357 view_at_hash = ?self.at.hash,
358 "view::submit_local"
359 );
360 let validity = self
361 .pool
362 .validated_pool()
363 .api()
364 .validate_transaction_blocking(
365 self.at.hash,
366 sc_transaction_pool_api::TransactionSource::Local,
367 Arc::from(xt.clone()),
368 )?
369 .map_err(|e| {
370 match e {
371 TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
372 TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
373 }
374 .into()
375 })?;
376
377 let block_number = self
378 .pool
379 .validated_pool()
380 .api()
381 .block_id_to_number(&BlockId::hash(self.at.hash))?
382 .ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?;
383
384 let validated = ValidatedTransaction::valid_at(
385 block_number.saturated_into::<u64>(),
386 tx_hash,
387 TimedTransactionSource::new_local(true),
388 Arc::from(xt),
389 length,
390 validity,
391 );
392
393 self.pool.validated_pool().submit(vec![validated]).remove(0)
394 }
395
396 pub(super) fn status(&self) -> PoolStatus {
398 self.pool.validated_pool().status()
399 }
400
401 pub(super) async fn revalidate(
412 &self,
413 finish_revalidation_worker_channels: FinishRevalidationWorkerChannels<ChainApi>,
414 ) {
415 let FinishRevalidationWorkerChannels {
416 mut finish_revalidation_request_rx,
417 revalidation_result_tx,
418 } = finish_revalidation_worker_channels;
419
420 debug!(
421 target: LOG_TARGET,
422 at_hash = ?self.at.hash,
423 "view::revalidate: at starting"
424 );
425 let start = Instant::now();
426 let validated_pool = self.pool.validated_pool();
427 let api = validated_pool.api();
428
429 let batch: Vec<_> = validated_pool.ready().collect();
430 let batch_len = batch.len();
431
432 let mut invalid_hashes = Vec::new();
437 let mut revalidated = IndexMap::new();
438
439 let mut validation_results = vec![];
440 let mut batch_iter = batch.into_iter();
441 loop {
442 let mut should_break = false;
443 tokio::select! {
444 _ = finish_revalidation_request_rx.recv() => {
445 debug!(
446 target: LOG_TARGET,
447 at_hash = ?self.at.hash,
448 "view::revalidate: finish revalidation request received"
449 );
450 break
451 }
452 _ = async {
453 if let Some(tx) = batch_iter.next() {
454 let validation_result = (
455 api.validate_transaction(self.at.hash,
456 tx.source.clone().into(), tx.data.clone(),
457 ValidateTransactionPriority::Maintained).await,
458 tx.hash,
459 tx
460 );
461 validation_results.push(validation_result);
462 } else {
463 self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
464 should_break = true;
465 }
466 } => {}
467 }
468
469 if should_break {
470 break;
471 }
472 }
473
474 let revalidation_duration = start.elapsed();
475 self.metrics.report(|metrics| {
476 metrics.view_revalidation_duration.observe(revalidation_duration.as_secs_f64());
477 });
478 debug!(
479 target: LOG_TARGET,
480 at_hash = ?self.at.hash,
481 count = validation_results.len(),
482 batch_len,
483 duration = ?revalidation_duration,
484 "view::revalidate"
485 );
486 log_xt_trace!(
487 data:tuple,
488 target:LOG_TARGET,
489 validation_results.iter().map(|x| (x.1, &x.0)),
490 "view::revalidate result: {:?}"
491 );
492 for (validation_result, tx_hash, tx) in validation_results {
493 match validation_result {
494 Ok(Err(TransactionValidityError::Invalid(_))) => {
495 invalid_hashes.push(tx_hash);
496 },
497 Ok(Ok(validity)) => {
498 revalidated.insert(
499 tx_hash,
500 ValidatedTransaction::valid_at(
501 self.at.number.saturated_into::<u64>(),
502 tx_hash,
503 tx.source.clone(),
504 tx.data.clone(),
505 api.hash_and_length(&tx.data).1,
506 validity,
507 ),
508 );
509 },
510 Ok(Err(TransactionValidityError::Unknown(error))) => {
511 trace!(
512 target: LOG_TARGET,
513 ?tx_hash,
514 ?error,
515 "Removing. Cannot determine transaction validity"
516 );
517 invalid_hashes.push(tx_hash);
518 },
519 Err(error) => {
520 trace!(
521 target: LOG_TARGET,
522 ?tx_hash,
523 %error,
524 "Removing due to error during revalidation"
525 );
526 invalid_hashes.push(tx_hash);
527 },
528 }
529 }
530
531 debug!(
532 target: LOG_TARGET,
533 at_hash = ?self.at.hash,
534 "view::revalidate: sending revalidation result"
535 );
536 if let Err(error) = revalidation_result_tx
537 .send(RevalidationResult { invalid_hashes, revalidated })
538 .await
539 {
540 trace!(
541 target: LOG_TARGET,
542 at_hash = ?self.at.hash,
543 ?error,
544 "view::revalidate: sending revalidation_result failed"
545 );
546 }
547 }
548
549 pub(super) async fn start_background_revalidation(
558 view: Arc<Self>,
559 revalidation_queue: Arc<
560 super::revalidation_worker::RevalidationQueue<ChainApi, ChainApi::Block>,
561 >,
562 ) {
563 debug!(
564 target: LOG_TARGET,
565 at_hash = ?view.at.hash,
566 "view::start_background_revalidation"
567 );
568 let (finish_revalidation_request_tx, finish_revalidation_request_rx) =
569 tokio::sync::mpsc::channel(1);
570 let (revalidation_result_tx, revalidation_result_rx) = tokio::sync::mpsc::channel(1);
571
572 let finish_revalidation_worker_channels = FinishRevalidationWorkerChannels::new(
573 finish_revalidation_request_rx,
574 revalidation_result_tx,
575 );
576
577 let finish_revalidation_local_channels = FinishRevalidationLocalChannels::new(
578 finish_revalidation_request_tx,
579 revalidation_result_rx,
580 );
581
582 *view.revalidation_worker_channels.lock() = Some(finish_revalidation_local_channels);
583 revalidation_queue
584 .revalidate_view(view.clone(), finish_revalidation_worker_channels)
585 .await;
586 }
587
588 pub(super) async fn finish_revalidation(&self) {
597 trace!(
598 target: LOG_TARGET,
599 at_hash = ?self.at.hash,
600 "view::finish_revalidation"
601 );
602 let Some(revalidation_worker_channels) = self.revalidation_worker_channels.lock().take()
603 else {
604 trace!(target:LOG_TARGET, "view::finish_revalidation: no finish_revalidation_request_tx");
605 return
606 };
607
608 let FinishRevalidationLocalChannels {
609 finish_revalidation_request_tx,
610 mut revalidation_result_rx,
611 } = revalidation_worker_channels;
612
613 if let Some(finish_revalidation_request_tx) = finish_revalidation_request_tx {
614 if let Err(error) = finish_revalidation_request_tx.send(()).await {
615 trace!(
616 target: LOG_TARGET,
617 at_hash = ?self.at.hash,
618 %error,
619 "view::finish_revalidation: sending cancellation request failed"
620 );
621 }
622 }
623
624 if let Some(revalidation_result) = revalidation_result_rx.recv().await {
625 let start = Instant::now();
626 let revalidated_len = revalidation_result.revalidated.len();
627 let validated_pool = self.pool.validated_pool();
628 validated_pool.remove_invalid(&revalidation_result.invalid_hashes);
629 if revalidated_len > 0 {
630 self.pool.resubmit(revalidation_result.revalidated);
631 }
632
633 self.metrics.report(|metrics| {
634 let _ = (
635 revalidation_result
636 .invalid_hashes
637 .len()
638 .try_into()
639 .map(|v| metrics.view_revalidation_invalid_txs.inc_by(v)),
640 revalidated_len
641 .try_into()
642 .map(|v| metrics.view_revalidation_resubmitted_txs.inc_by(v)),
643 );
644 });
645
646 debug!(
647 target: LOG_TARGET,
648 invalid = revalidation_result.invalid_hashes.len(),
649 revalidated = revalidated_len,
650 at_hash = ?self.at.hash,
651 duration = ?start.elapsed(),
652 "view::finish_revalidation: applying revalidation result"
653 );
654 }
655 }
656
657 pub(super) fn is_imported(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
659 const IGNORE_BANNED: bool = false;
660 self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err()
661 }
662
663 pub fn remove_subtree<F>(
667 &self,
668 hashes: &[ExtrinsicHash<ChainApi>],
669 ban_transactions: bool,
670 listener_action: F,
671 ) -> Vec<TransactionFor<ChainApi>>
672 where
673 F: Fn(
674 &mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
675 ExtrinsicHash<ChainApi>,
676 ),
677 {
678 self.pool
679 .validated_pool()
680 .remove_subtree(hashes, ban_transactions, listener_action)
681 }
682}