1use super::{
22 dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23 import_notification_sink::MultiViewImportNotificationSink,
24 metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
25 multi_view_listener::MultiViewListener,
26 tx_mem_pool::{InsertionInfo, TxMemPool},
27 view::View,
28 view_store::ViewStore,
29};
30use crate::{
31 api::FullChainApi,
32 common::{
33 sliding_stat::DurationSlidingStats,
34 tracing_log_xt::{log_xt_debug, log_xt_trace},
35 STAT_SLIDING_WINDOW,
36 },
37 enactment_state::{EnactmentAction, EnactmentState},
38 fork_aware_txpool::{
39 dropped_watcher::{DroppedReason, DroppedTransaction},
40 revalidation_worker,
41 },
42 graph::{
43 self,
44 base_pool::{TimedTransactionSource, Transaction},
45 BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
46 },
47 insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
48 LOG_TARGET_STAT,
49};
50use async_trait::async_trait;
51use futures::{
52 channel::oneshot,
53 future::{self},
54 prelude::*,
55 FutureExt,
56};
57use parking_lot::Mutex;
58use prometheus_endpoint::Registry as PrometheusRegistry;
59use sc_transaction_pool_api::{
60 error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
61 MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
62 TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
63};
64use sp_blockchain::{HashAndNumber, TreeRoute};
65use sp_core::traits::SpawnEssentialNamed;
66use sp_runtime::{
67 generic::BlockId,
68 traits::{Block as BlockT, NumberFor},
69 transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
70 Saturating,
71};
72use std::{
73 collections::{BTreeMap, HashMap, HashSet},
74 pin::Pin,
75 sync::Arc,
76 time::{Duration, Instant},
77};
78use tokio::select;
79use tracing::{debug, instrument, trace, warn, Level};
80
/// Default number of blocks after which transactions included in a block that
/// is still not finalized are treated as timed out (see
/// `finality_stall_cleanup`). Used whenever a pool is built without an explicit
/// threshold.
const FINALITY_TIMEOUT_THRESHOLD: usize = 128;

// NOTE(review): the name suggests the number of mempool transactions pushed to
// a view in one batch; no use is visible in this part of the file — confirm.
const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;

/// A boxed, sendable future with no output; the type of the background tasks
/// handed out by the pool constructors.
pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
94
/// Registry of parties waiting for a value of type `T` (in this file: the
/// ready-transactions iterator) that will be produced for a specific block.
struct ReadyPoll<T, Block>
where
    Block: BlockT,
{
    /// Sending halves of the pending requests, grouped by the hash of the
    /// block each request is waiting for.
    pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
}
103
104impl<T, Block> ReadyPoll<T, Block>
105where
106 Block: BlockT,
107{
108 fn new() -> Self {
110 Self { pollers: Default::default() }
111 }
112
113 fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
116 let (s, r) = oneshot::channel();
117 self.pollers.entry(at).or_default().push(s);
118 r
119 }
120
121 fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
126 debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
127 let Some(pollers) = self.pollers.remove(&at) else { return };
128 pollers.into_iter().for_each(|p| {
129 debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
130 let _ = p.send(ready_iterator());
131 });
132 }
133
134 fn remove_cancelled(&mut self) {
136 self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
137 }
138}
139
/// Transaction pool that maintains a separate view of the pool per block
/// (fork), backed by a shared mempool.
pub struct ForkAwareTxPool<ChainApi, Block>
where
    Block: BlockT,
    ChainApi: graph::ChainApi<Block = Block> + 'static,
{
    /// Chain API handle, used here for block number lookups, block bodies and
    /// transaction hashing.
    api: Arc<ChainApi>,

    /// Shared transaction buffer; submissions are inserted here first and only
    /// then forwarded to the views.
    mempool: Arc<TxMemPool<ChainApi, Block>>,

    /// Store of the per-block views (active and inactive).
    view_store: Arc<ViewStore<ChainApi, Block>>,

    /// Waiters for the ready iterator of blocks for which no view exists yet.
    ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,

    /// Prometheus metrics handle.
    metrics: PrometheusMetrics,

    /// Collector receiving per-transaction events for metrics purposes.
    events_metrics_collector: EventsMetricsCollector<ChainApi>,

    /// Enactment state, created from the best and finalized block hashes;
    /// queried here for the most recently finalized block.
    enactment_state: Arc<Mutex<EnactmentState<Block>>>,

    /// Queue handed to views when their background revalidation is started.
    revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,

    /// Sink aggregating the import notifications of all views; source of the
    /// stream returned by `import_notification_stream`.
    import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,

    /// Pool options, passed to every newly created (non-cloned) view.
    options: Options,

    /// Validator flag, passed to every newly created (non-cloned) view.
    is_validator: IsValidator,

    /// Number of blocks after which transactions included in a block are
    /// treated as timed out with regard to finality.
    finality_timeout_threshold: usize,

    /// Transaction hashes grouped by the block (hash and number) they belong
    /// to. Entries older than `finality_timeout_threshold` blocks are evicted
    /// by `finality_stall_cleanup`.
    // NOTE(review): the code inserting into this map is outside the visible
    // part of the file; the cleanup relies on the first key being the oldest
    // block — confirm `HashAndNumber` orders by number.
    included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,

    /// Sliding-window duration statistics of `submit_at` calls.
    submit_stats: DurationSlidingStats,

    /// Sliding-window duration statistics of `submit_and_watch` calls.
    submit_and_watch_stats: DurationSlidingStats,
}
204
205impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
206where
207 Block: BlockT,
208 ChainApi: graph::ChainApi<Block = Block> + 'static,
209 <Block as BlockT>::Hash: Unpin,
210{
211 fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
215 if let Some(block_number) =
216 self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
217 {
218 let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
219 let tree_route =
220 &TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
221 let view = self.build_and_plug_view(None, &at_best, &tree_route);
222 self.view_store.insert_new_view_sync(view.into(), &tree_route);
223 trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
224 };
225 self
226 }
227
228 pub fn new_test(
231 pool_api: Arc<ChainApi>,
232 best_block_hash: Block::Hash,
233 finalized_hash: Block::Hash,
234 finality_timeout_threshold: Option<usize>,
235 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
236 Self::new_test_with_limits(
237 pool_api,
238 best_block_hash,
239 finalized_hash,
240 Options::default().ready,
241 Options::default().future,
242 usize::MAX,
243 finality_timeout_threshold,
244 )
245 }
246
247 pub fn new_test_with_limits(
250 pool_api: Arc<ChainApi>,
251 best_block_hash: Block::Hash,
252 finalized_hash: Block::Hash,
253 ready_limits: crate::PoolLimit,
254 future_limits: crate::PoolLimit,
255 mempool_max_transactions_count: usize,
256 finality_timeout_threshold: Option<usize>,
257 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
258 let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
259 let listener = Arc::new(listener);
260
261 let (import_notification_sink, import_notification_sink_task) =
262 MultiViewImportNotificationSink::new_with_worker();
263
264 let (mempool, mempool_task) = TxMemPool::new(
265 pool_api.clone(),
266 listener.clone(),
267 Default::default(),
268 mempool_max_transactions_count,
269 ready_limits.total_bytes + future_limits.total_bytes,
270 );
271 let mempool = Arc::from(mempool);
272
273 let (dropped_stream_controller, dropped_stream) =
274 MultiViewDroppedWatcherController::<ChainApi>::new();
275
276 let view_store = Arc::new(ViewStore::new(
277 pool_api.clone(),
278 listener,
279 dropped_stream_controller,
280 import_notification_sink.clone(),
281 ));
282
283 let dropped_monitor_task = Self::dropped_monitor_task(
284 dropped_stream,
285 mempool.clone(),
286 view_store.clone(),
287 import_notification_sink.clone(),
288 );
289
290 let combined_tasks = async move {
291 tokio::select! {
292 _ = listener_task => {},
293 _ = import_notification_sink_task => {},
294 _ = dropped_monitor_task => {}
295 }
296 }
297 .boxed();
298
299 let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
300
301 (
302 Self {
303 mempool,
304 api: pool_api,
305 view_store,
306 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
307 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
308 best_block_hash,
309 finalized_hash,
310 ))),
311 revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
312 import_notification_sink,
313 options,
314 is_validator: false.into(),
315 metrics: Default::default(),
316 events_metrics_collector: EventsMetricsCollector::default(),
317 finality_timeout_threshold: finality_timeout_threshold
318 .unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
319 included_transactions: Default::default(),
320 submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
321 submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
322 STAT_SLIDING_WINDOW,
323 )),
324 }
325 .inject_initial_view(best_block_hash),
326 [combined_tasks, mempool_task],
327 )
328 }
329
330 async fn dropped_monitor_task(
338 mut dropped_stream: StreamOfDropped<ChainApi>,
339 mempool: Arc<TxMemPool<ChainApi, Block>>,
340 view_store: Arc<ViewStore<ChainApi, Block>>,
341 import_notification_sink: MultiViewImportNotificationSink<
342 Block::Hash,
343 ExtrinsicHash<ChainApi>,
344 >,
345 ) {
346 let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
347 loop {
348 let Some(dropped) = dropped_stream.next().await else {
349 debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
350 break;
351 };
352 let start = Instant::now();
353 let tx_hash = dropped.tx_hash;
354 trace!(
355 target: LOG_TARGET,
356 ?tx_hash,
357 reason = ?dropped.reason,
358 "fatp::dropped notification, removing"
359 );
360 match dropped.reason {
361 DroppedReason::Usurped(new_tx_hash) => {
362 if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
363 view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
364 } else {
365 trace!(
366 target: LOG_TARGET,
367 tx_hash = ?new_tx_hash,
368 "error: dropped_monitor_task: no entry in mempool for new transaction"
369 );
370 };
371 },
372 DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
373 view_store.remove_transaction_subtree(tx_hash, |_, _| {});
374 },
375 };
376
377 mempool.remove_transactions(&[tx_hash]).await;
378 import_notification_sink.clean_notified_items(&[tx_hash]);
379 view_store.listener.transaction_dropped(dropped);
380 insert_and_log_throttled!(
381 Level::DEBUG,
382 target:LOG_TARGET_STAT,
383 prefix:"dropped_stats",
384 dropped_stats,
385 start.elapsed().into()
386 );
387 }
388 }
389
390 pub fn new_with_background_worker(
395 options: Options,
396 is_validator: IsValidator,
397 pool_api: Arc<ChainApi>,
398 prometheus: Option<&PrometheusRegistry>,
399 spawner: impl SpawnEssentialNamed,
400 best_block_hash: Block::Hash,
401 finalized_hash: Block::Hash,
402 ) -> Self {
403 let metrics = PrometheusMetrics::new(prometheus);
404 let (events_metrics_collector, event_metrics_task) =
405 EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());
406
407 let (listener, listener_task) =
408 MultiViewListener::new_with_worker(events_metrics_collector.clone());
409 let listener = Arc::new(listener);
410
411 let (revalidation_queue, revalidation_task) =
412 revalidation_worker::RevalidationQueue::new_with_worker();
413
414 let (import_notification_sink, import_notification_sink_task) =
415 MultiViewImportNotificationSink::new_with_worker();
416
417 let (mempool, blocking_mempool_task) = TxMemPool::new(
418 pool_api.clone(),
419 listener.clone(),
420 metrics.clone(),
421 options.total_count(),
422 options.ready.total_bytes + options.future.total_bytes,
423 );
424 let mempool = Arc::from(mempool);
425
426 let (dropped_stream_controller, dropped_stream) =
427 MultiViewDroppedWatcherController::<ChainApi>::new();
428
429 let view_store = Arc::new(ViewStore::new(
430 pool_api.clone(),
431 listener,
432 dropped_stream_controller,
433 import_notification_sink.clone(),
434 ));
435
436 let dropped_monitor_task = Self::dropped_monitor_task(
437 dropped_stream,
438 mempool.clone(),
439 view_store.clone(),
440 import_notification_sink.clone(),
441 );
442
443 let combined_tasks = async move {
444 tokio::select! {
445 _ = listener_task => {}
446 _ = revalidation_task => {},
447 _ = import_notification_sink_task => {},
448 _ = dropped_monitor_task => {}
449 _ = event_metrics_task => {},
450 }
451 }
452 .boxed();
453 spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
454 spawner.spawn_essential_blocking(
455 "txpool-background",
456 Some("transaction-pool"),
457 blocking_mempool_task,
458 );
459
460 Self {
461 mempool,
462 api: pool_api,
463 view_store,
464 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
465 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
466 best_block_hash,
467 finalized_hash,
468 ))),
469 revalidation_queue: Arc::from(revalidation_queue),
470 import_notification_sink,
471 options,
472 metrics,
473 events_metrics_collector,
474 is_validator,
475 finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
476 included_transactions: Default::default(),
477 submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
478 submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
479 STAT_SLIDING_WINDOW,
480 )),
481 }
482 .inject_initial_view(best_block_hash)
483 }
484
485 pub fn api(&self) -> &ChainApi {
487 &self.api
488 }
489
    /// Returns the pool status for every view, keyed by block hash, as
    /// reported by the view store.
    pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
        self.view_store.status()
    }
494
495 pub fn active_views_count(&self) -> usize {
497 self.view_store.active_views.read().len()
498 }
499
500 pub fn inactive_views_count(&self) -> usize {
502 self.view_store.inactive_views.read().len()
503 }
504
505 fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
510 self.view_store
511 .active_views
512 .read()
513 .iter()
514 .map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
515 .collect()
516 }
517
518 pub fn has_view(&self, hash: &Block::Hash) -> bool {
520 self.view_store.active_views.read().contains_key(hash)
521 }
522
    /// Returns the pair of counts reported by the mempool's
    /// `unwatched_and_watched_count` (unwatched first, watched second, going
    /// by the method name).
    pub async fn mempool_len(&self) -> (usize, usize) {
        self.mempool.unwatched_and_watched_count().await
    }
529
    /// Returns the future transactions the view store reports for block `at`.
    // NOTE(review): `None` presumably means no view exists for `at` — confirm
    // against `ViewStore::futures_at`.
    pub fn futures_at(
        &self,
        at: Block::Hash,
    ) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
        self.view_store.futures_at(at)
    }
539
540 pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
553 let start = Instant::now();
554 let api = self.api.clone();
555 debug!(
556 target: LOG_TARGET,
557 ?at,
558 "fatp::ready_at_light"
559 );
560
561 let at_number = self.api.resolve_block_number(at).ok();
562 let finalized_number = self
563 .api
564 .resolve_block_number(self.enactment_state.lock().recent_finalized_block())
565 .ok();
566
567 if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
570 let at_hn = HashAndNumber { hash: at, number: at_number };
571 finalized_number.and_then(|finalized_number| {
572 self.view_store
573 .find_view_descendent_up_to_number(&at_hn, finalized_number)
574 .map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
575 })
576 }) {
577 let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
578 let mut all_extrinsics = vec![];
579 for h in enacted_blocks {
580 let extrinsics = api
581 .block_body(h)
582 .await
583 .unwrap_or_else(|error| {
584 warn!(
585 target: LOG_TARGET,
586 %error,
587 "Compute ready light transactions: error request"
588 );
589 None
590 })
591 .unwrap_or_default()
592 .into_iter()
593 .map(|t| api.hash_and_length(&t).0);
594 all_extrinsics.extend(extrinsics);
595 }
596
597 let before_count = tmp_view.pool.validated_pool().status().ready;
598 let tags = tmp_view
599 .pool
600 .validated_pool()
601 .extrinsics_tags(&all_extrinsics)
602 .into_iter()
603 .flatten()
604 .flatten()
605 .collect::<Vec<_>>();
606 let _ = tmp_view.pool.validated_pool().prune_tags(tags);
607
608 let after_count = tmp_view.pool.validated_pool().status().ready;
609 debug!(
610 target: LOG_TARGET,
611 ?at,
612 best_view_hash = ?view.at.hash,
613 before_count,
614 to_be_removed = all_extrinsics.len(),
615 after_count,
616 duration = ?start.elapsed(),
617 "fatp::ready_at_light -> light"
618 );
619 Box::new(tmp_view.pool.validated_pool().ready())
620 } else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
621 debug!(
626 target: LOG_TARGET,
627 ?at,
628 duration = ?start.elapsed(),
629 "fatp::ready_at_light -> most_recent_view"
630 );
631 Box::new(most_recent_view.pool.validated_pool().ready())
632 } else {
633 let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
634 debug!(
635 target: LOG_TARGET,
636 ?at,
637 duration = ?start.elapsed(),
638 "fatp::ready_at_light -> empty"
639 );
640 empty
641 }
642 }
643
644 async fn ready_at_with_timeout_internal(
655 &self,
656 at: Block::Hash,
657 timeout: std::time::Duration,
658 ) -> ReadyIteratorFor<ChainApi> {
659 debug!(
660 target: LOG_TARGET,
661 ?at,
662 ?timeout,
663 "fatp::ready_at_with_timeout"
664 );
665 let timeout = futures_timer::Delay::new(timeout);
666 let (view_already_exists, ready_at) = self.ready_at_internal(at);
667
668 if view_already_exists {
669 return ready_at.await;
670 }
671
672 let maybe_ready = async move {
673 select! {
674 ready = ready_at => Some(ready),
675 _ = timeout => {
676 debug!(
677 target: LOG_TARGET,
678 ?at,
679 "Timeout fired waiting for transaction pool at block. Proceeding with production."
680 );
681 None
682 }
683 }
684 };
685
686 let fall_back_ready = self.ready_at_light(at);
687 let (maybe_ready, fall_back_ready) =
688 futures::future::join(maybe_ready, fall_back_ready).await;
689 maybe_ready.unwrap_or(fall_back_ready)
690 }
691
692 fn ready_at_internal(
693 &self,
694 at: Block::Hash,
695 ) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
696 let mut ready_poll = self.ready_poll.lock();
697
698 if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
699 debug!(
700 target: LOG_TARGET,
701 ?at,
702 ?inactive,
703 "fatp::ready_at_internal"
704 );
705 let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
706 return (true, async move { iterator }.boxed());
707 }
708
709 let pending = ready_poll
710 .add(at)
711 .map(|received| {
712 received.unwrap_or_else(|error| {
713 warn!(
714 target: LOG_TARGET,
715 %error,
716 "Error receiving ready-set iterator"
717 );
718 Box::new(std::iter::empty())
719 })
720 })
721 .boxed();
722 debug!(
723 target: LOG_TARGET,
724 ?at,
725 pending_keys = ?ready_poll.pollers.keys(),
726 "fatp::ready_at_internal"
727 );
728 (false, pending)
729 }
730
731 async fn submit_and_watch_inner(
733 &self,
734 at: Block::Hash,
735 source: TransactionSource,
736 xt: TransactionFor<Self>,
737 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
738 let xt = Arc::from(xt);
739
740 let at_number = self
741 .api
742 .block_id_to_number(&BlockId::Hash(at))
743 .ok()
744 .flatten()
745 .unwrap_or_default()
746 .into()
747 .as_u64();
748
749 let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
750 Ok(result) => result,
751 Err(TxPoolApiError::ImmediatelyDropped) => {
752 self.attempt_transaction_replacement(source, at_number, true, xt.clone())
753 .await?
754 },
755 Err(e) => return Err(e.into()),
756 };
757
758 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
759 self.events_metrics_collector.report_submitted(&insertion);
760
761 match self.view_store.submit_and_watch(at, insertion.source, xt).await {
762 Err(e) => {
763 self.mempool.remove_transactions(&[insertion.hash]).await;
764 Err(e.into())
765 },
766 Ok(mut outcome) => {
767 self.mempool
768 .update_transaction_priority(outcome.hash(), outcome.priority())
769 .await;
770 Ok(outcome.expect_watcher())
771 },
772 }
773 }
774
775 async fn submit_at_inner(
777 &self,
778 at: Block::Hash,
779 source: TransactionSource,
780 xts: Vec<TransactionFor<Self>>,
781 ) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
782 let at_number = self
783 .api
784 .block_id_to_number(&BlockId::Hash(at))
785 .ok()
786 .flatten()
787 .unwrap_or_default()
788 .into()
789 .as_u64();
790 let view_store = self.view_store.clone();
791 let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
792 let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;
793
794 if view_store.is_empty() {
795 return Ok(mempool_results
796 .into_iter()
797 .map(|r| r.map(|r| r.hash).map_err(Into::into))
798 .collect::<Vec<_>>());
799 }
800
801 let retries = mempool_results
803 .into_iter()
804 .zip(xts.clone())
805 .map(|(result, xt)| async move {
806 match result {
807 Err(TxPoolApiError::ImmediatelyDropped) => {
808 self.attempt_transaction_replacement(source, at_number, false, xt).await
809 },
810 _ => result,
811 }
812 })
813 .collect::<Vec<_>>();
814
815 let mempool_results = futures::future::join_all(retries).await;
816
817 let to_be_submitted = mempool_results
819 .iter()
820 .zip(xts)
821 .filter_map(|(result, xt)| {
822 result.as_ref().ok().map(|insertion| {
823 self.events_metrics_collector.report_submitted(&insertion);
824 (insertion.source.clone(), xt)
825 })
826 })
827 .collect::<Vec<_>>();
828
829 self.metrics
830 .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
831
832 let mempool = self.mempool.clone();
835 let results_map = view_store.submit(to_be_submitted.into_iter()).await;
836 let mut submission_results = reduce_multiview_result(results_map).into_iter();
837
838 const RESULTS_ASSUMPTION : &str =
855 "The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
856 let merged_results = mempool_results.into_iter().map(|result| {
857 result.map_err(Into::into).and_then(|insertion| {
858 Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
859 })
860 });
861
862 let mut final_results = vec![];
863 for r in merged_results {
864 match r {
865 Ok((hash, submission_result)) => match submission_result {
866 Ok(r) => {
867 mempool.update_transaction_priority(r.hash(), r.priority()).await;
868 final_results.push(Ok(r.hash()));
869 },
870 Err(e) => {
871 mempool.remove_transactions(&[hash]).await;
872 final_results.push(Err(e));
873 },
874 },
875 Err(e) => final_results.push(Err(e)),
876 }
877 }
878
879 Ok(final_results)
880 }
881
    /// Returns the number of notified items currently tracked by the import
    /// notification sink.
    pub fn import_notification_sink_len(&self) -> usize {
        self.import_notification_sink.notified_items_len()
    }
888}
889
/// Collapses per-view submission results into a single result per
/// transaction.
///
/// Every value of `input` holds the results of one view, position `i`
/// referring to the `i`-th submitted transaction; all vectors are expected to
/// have the same length (checked in debug builds only). For each position the
/// first `Ok` encountered while walking the views is kept; when every view
/// reports an error, the error of the view visited last is returned. The
/// visiting order is the map's iteration order.
///
/// An empty map yields an empty vector.
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
    let mut values = input.values();
    let length = match values.next() {
        Some(first) => first.len(),
        None => return Vec::new(),
    };
    debug_assert!(values.all(|x| length == x.len()));

    let mut per_view = input.into_values();
    // The first view seeds the aggregate; the map is known to be non-empty.
    let mut merged = per_view.next().unwrap_or_default();
    for view_results in per_view {
        for (slot, candidate) in merged.iter_mut().zip(view_results) {
            // Keep an already successful slot, otherwise take the newer result.
            if slot.is_err() {
                *slot = candidate;
            }
        }
    }
    merged
}
936
937#[async_trait]
938impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
939where
940 Block: BlockT,
941 ChainApi: 'static + graph::ChainApi<Block = Block>,
942 <Block as BlockT>::Hash: Unpin,
943{
944 type Block = ChainApi::Block;
945 type Hash = ExtrinsicHash<ChainApi>;
946 type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
947 type Error = ChainApi::Error;
948
949 async fn submit_at(
956 &self,
957 at: <Self::Block as BlockT>::Hash,
958 source: TransactionSource,
959 xts: Vec<TransactionFor<Self>>,
960 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
961 let start = Instant::now();
962 trace!(
963 target: LOG_TARGET,
964 count = xts.len(),
965 active_views_count = self.active_views_count(),
966 "fatp::submit_at"
967 );
968 log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
969 let result = self.submit_at_inner(at, source, xts).await;
970 insert_and_log_throttled!(
971 Level::DEBUG,
972 target:LOG_TARGET_STAT,
973 prefix:"submit_stats",
974 self.submit_stats,
975 start.elapsed().into()
976 );
977 result
978 }
979
980 async fn submit_one(
984 &self,
985 _at: <Self::Block as BlockT>::Hash,
986 source: TransactionSource,
987 xt: TransactionFor<Self>,
988 ) -> Result<TxHash<Self>, Self::Error> {
989 trace!(
990 target: LOG_TARGET,
991 tx_hash = ?self.tx_hash(&xt),
992 active_views_count = self.active_views_count(),
993 "fatp::submit_one"
994 );
995 match self.submit_at(_at, source, vec![xt]).await {
996 Ok(mut v) => {
997 v.pop().expect("There is exactly one element in result of submit_at. qed.")
998 },
999 Err(e) => Err(e),
1000 }
1001 }
1002
1003 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
1008 async fn submit_and_watch(
1009 &self,
1010 at: <Self::Block as BlockT>::Hash,
1011 source: TransactionSource,
1012 xt: TransactionFor<Self>,
1013 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
1014 let start = Instant::now();
1015 trace!(
1016 target: LOG_TARGET,
1017 tx_hash = ?self.tx_hash(&xt),
1018 views = self.active_views_count(),
1019 "fatp::submit_and_watch"
1020 );
1021 let result = self.submit_and_watch_inner(at, source, xt).await;
1022 insert_and_log_throttled!(
1023 Level::DEBUG,
1024 target:LOG_TARGET_STAT,
1025 prefix:"submit_and_watch_stats",
1026 self.submit_and_watch_stats,
1027 start.elapsed().into()
1028 );
1029 result
1030 }
1031
1032 async fn report_invalid(
1041 &self,
1042 at: Option<<Self::Block as BlockT>::Hash>,
1043 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
1044 ) -> Vec<Arc<Self::InPoolTransaction>> {
1045 debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
1046 log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
1047 self.metrics
1048 .report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
1049
1050 let removed = self.view_store.report_invalid(at, invalid_tx_errors);
1051
1052 let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
1053 self.mempool.remove_transactions(&removed_hashes).await;
1054 self.import_notification_sink.clean_notified_items(&removed_hashes);
1055
1056 self.metrics
1057 .report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
1058
1059 removed
1060 }
1061
1062 fn status(&self) -> PoolStatus {
1070 self.view_store
1071 .most_recent_view
1072 .read()
1073 .as_ref()
1074 .map(|v| v.status())
1075 .unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
1076 }
1077
1078 fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
1083 self.import_notification_sink.event_stream()
1084 }
1085
1086 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1088 self.api().hash_and_length(xt).0
1089 }
1090
1091 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
1093 self.view_store.listener.transactions_broadcasted(propagations);
1094 }
1095
1096 fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
1102 let most_recent_view_hash =
1103 self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
1104 let result = most_recent_view_hash
1105 .and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
1106 trace!(
1107 target: LOG_TARGET,
1108 ?tx_hash,
1109 is_ready = result.is_some(),
1110 most_recent_view = ?most_recent_view_hash,
1111 "ready_transaction"
1112 );
1113 result
1114 }
1115
1116 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
1118 let (_, result) = self.ready_at_internal(at);
1119 result.await
1120 }
1121
1122 fn ready(&self) -> ReadyIteratorFor<ChainApi> {
1127 self.view_store.ready()
1128 }
1129
1130 fn futures(&self) -> Vec<Self::InPoolTransaction> {
1135 self.view_store.futures()
1136 }
1137
1138 async fn ready_at_with_timeout(
1143 &self,
1144 at: <Self::Block as BlockT>::Hash,
1145 timeout: std::time::Duration,
1146 ) -> ReadyIteratorFor<ChainApi> {
1147 self.ready_at_with_timeout_internal(at, timeout).await
1148 }
1149}
1150
1151impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
1152 for ForkAwareTxPool<ChainApi, Block>
1153where
1154 Block: BlockT,
1155 ChainApi: 'static + graph::ChainApi<Block = Block>,
1156 <Block as BlockT>::Hash: Unpin,
1157{
1158 type Block = Block;
1159 type Hash = ExtrinsicHash<ChainApi>;
1160 type Error = ChainApi::Error;
1161
1162 fn submit_local(
1163 &self,
1164 at: Block::Hash,
1165 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
1166 ) -> Result<Self::Hash, Self::Error> {
1167 trace!(
1168 target: LOG_TARGET,
1169 active_views_count = self.active_views_count(),
1170 "fatp::submit_local"
1171 );
1172 let xt = Arc::from(xt);
1173 let at_number = self
1174 .api
1175 .block_id_to_number(&BlockId::Hash(at))
1176 .ok()
1177 .flatten()
1178 .unwrap_or_default()
1179 .into()
1180 .as_u64();
1181
1182 let result = self
1184 .mempool
1185 .clone()
1186 .extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
1187 .remove(0);
1188
1189 let insertion = match result {
1190 Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
1191 TransactionSource::Local,
1192 false,
1193 xt.clone(),
1194 ),
1195 _ => result,
1196 }?;
1197
1198 self.view_store
1199 .submit_local(xt)
1200 .inspect_err(|_| {
1201 self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
1202 })
1203 .map(|outcome| {
1204 self.mempool
1205 .clone()
1206 .update_transaction_priority_sync(outcome.hash(), outcome.priority());
1207 outcome.hash()
1208 })
1209 .or_else(|_| Ok(insertion.hash))
1210 }
1211}
1212
1213impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1214where
1215 Block: BlockT,
1216 ChainApi: graph::ChainApi<Block = Block> + 'static,
1217 <Block as BlockT>::Hash: Unpin,
1218{
1219 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
1227 async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
1228 let hash_and_number = match tree_route.last() {
1229 Some(hash_and_number) => hash_and_number,
1230 None => {
1231 warn!(
1232 target: LOG_TARGET,
1233 ?tree_route,
1234 "Skipping ChainEvent - no last block in tree route"
1235 );
1236 return;
1237 },
1238 };
1239
1240 if self.has_view(&hash_and_number.hash) {
1241 debug!(
1242 target: LOG_TARGET,
1243 ?hash_and_number,
1244 "view already exists for block"
1245 );
1246 return;
1247 }
1248
1249 let best_view = self.view_store.find_best_view(tree_route);
1250 let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;
1251
1252 if let Some(view) = new_view {
1253 {
1254 let view = view.clone();
1255 self.ready_poll.lock().trigger(hash_and_number.hash, move || {
1256 Box::from(view.pool.validated_pool().ready())
1257 });
1258 }
1259
1260 View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
1261 }
1262
1263 self.finality_stall_cleanup(hash_and_number).await;
1264 }
1265
1266 async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
1276 let (oldest_block_number, finality_timedout_blocks) = {
1277 let mut included_transactions = self.included_transactions.lock();
1278
1279 let Some(oldest_block_number) =
1280 included_transactions.first_key_value().map(|(k, _)| k.number)
1281 else {
1282 return;
1283 };
1284
1285 if at.number.saturating_sub(oldest_block_number).into() <=
1286 self.finality_timeout_threshold.into()
1287 {
1288 return;
1289 }
1290
1291 let mut finality_timedout_blocks =
1292 indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
1293
1294 included_transactions.retain(
1295 |HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
1296 let diff = at.number.saturating_sub(*view_number);
1297 if diff.into() > self.finality_timeout_threshold.into() {
1298 finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
1299 false
1300 } else {
1301 true
1302 }
1303 },
1304 );
1305
1306 (oldest_block_number, finality_timedout_blocks)
1307 };
1308
1309 if !finality_timedout_blocks.is_empty() {
1310 self.ready_poll.lock().remove_cancelled();
1311 self.view_store.listener.remove_stale_controllers();
1312 }
1313
1314 let finality_timedout_blocks_len = finality_timedout_blocks.len();
1315
1316 for (block_hash, tx_hashes) in finality_timedout_blocks {
1317 self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
1318
1319 self.mempool.remove_transactions(&tx_hashes).await;
1320 self.import_notification_sink.clean_notified_items(&tx_hashes);
1321 self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
1322 }
1323
1324 self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
1325
1326 debug!(
1327 target: LOG_TARGET,
1328 ?at,
1329 included_transactions_len = ?self.included_transactions.lock().len(),
1330 finality_timedout_blocks_len,
1331 ?oldest_block_number,
1332 "finality_stall_cleanup"
1333 );
1334 }
1335
1336 fn build_and_plug_view(
1345 &self,
1346 origin_view: Option<Arc<View<ChainApi>>>,
1347 at: &HashAndNumber<Block>,
1348 tree_route: &TreeRoute<Block>,
1349 ) -> View<ChainApi> {
1350 let enter = Instant::now();
1351 let (view, view_dropped_stream, view_aggregated_stream) =
1352 if let Some(origin_view) = origin_view {
1353 let (mut view, view_dropped_stream, view_aggragated_stream) =
1354 View::new_from_other(&origin_view, at);
1355 if !tree_route.retracted().is_empty() {
1356 view.pool.clear_recently_pruned();
1357 }
1358 (view, view_dropped_stream, view_aggragated_stream)
1359 } else {
1360 debug!(
1361 target: LOG_TARGET,
1362 ?at,
1363 "creating non-cloned view"
1364 );
1365 View::new(
1366 self.api.clone(),
1367 at.clone(),
1368 self.options.clone(),
1369 self.metrics.clone(),
1370 self.is_validator.clone(),
1371 )
1372 };
1373 debug!(
1374 target: LOG_TARGET,
1375 ?at,
1376 duration = ?enter.elapsed(),
1377 "build_new_view::clone_view"
1378 );
1379
1380 self.import_notification_sink.add_view(
1383 view.at.hash,
1384 view.pool.validated_pool().import_notification_stream().boxed(),
1385 );
1386
1387 self.view_store
1388 .dropped_stream_controller
1389 .add_view(view.at.hash, view_dropped_stream.boxed());
1390
1391 self.view_store
1392 .listener
1393 .add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1394
1395 view
1396 }
1397
1398 async fn build_and_update_view(
1406 &self,
1407 origin_view: Option<Arc<View<ChainApi>>>,
1408 at: &HashAndNumber<Block>,
1409 tree_route: &TreeRoute<Block>,
1410 ) -> Option<Arc<View<ChainApi>>> {
1411 let start = Instant::now();
1412 debug!(
1413 target: LOG_TARGET,
1414 ?at,
1415 origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
1416 ?tree_route,
1417 "build_new_view"
1418 );
1419
1420 let mut view = self.build_and_plug_view(origin_view, at, tree_route);
1421
1422 view.pool.validated_pool().retrigger_notifications();
1425 debug!(
1426 target: LOG_TARGET,
1427 ?at,
1428 duration = ?start.elapsed(),
1429 "register_listeners"
1430 );
1431
1432 let start = Instant::now();
1435 self.update_view_with_fork(&view, tree_route, at.clone()).await;
1436 debug!(
1437 target: LOG_TARGET,
1438 ?at,
1439 duration = ?start.elapsed(),
1440 "update_view_with_fork"
1441 );
1442
1443 let start = Instant::now();
1445 self.update_view_with_mempool(&mut view).await;
1446 debug!(
1447 target: LOG_TARGET,
1448 ?at,
1449 duration= ?start.elapsed(),
1450 "update_view_with_mempool"
1451 );
1452 let view = Arc::from(view);
1453 self.view_store.insert_new_view(view.clone(), tree_route).await;
1454
1455 debug!(
1456 target: LOG_TARGET,
1457 duration = ?start.elapsed(),
1458 ?at,
1459 "build_new_view"
1460 );
1461 Some(view)
1462 }
1463
1464 async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
1469 if let Some(txs) = self.included_transactions.lock().get(at) {
1470 return txs.clone();
1471 };
1472
1473 debug!(
1474 target: LOG_TARGET,
1475 ?at,
1476 "fetch_block_transactions from api"
1477 );
1478
1479 self.api
1480 .block_body(at.hash)
1481 .await
1482 .unwrap_or_else(|error| {
1483 warn!(
1484 target: LOG_TARGET,
1485 %error,
1486 "fetch_block_transactions: error request"
1487 );
1488 None
1489 })
1490 .unwrap_or_default()
1491 .into_iter()
1492 .map(|t| self.hash_of(&t))
1493 .collect::<Vec<_>>()
1494 }
1495
1496 async fn txs_included_since_finalized(
1501 &self,
1502 at: &HashAndNumber<Block>,
1503 ) -> HashSet<TxHash<Self>> {
1504 let start = Instant::now();
1505 let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
1506
1507 let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
1508 return Default::default();
1509 };
1510
1511 let mut all_txs = HashSet::new();
1512
1513 for block in tree_route.enacted().iter() {
1514 if at.number.saturating_sub(block.number).into() <=
1518 self.finality_timeout_threshold.into()
1519 {
1520 all_txs.extend(self.fetch_block_transactions(block).await);
1521 }
1522 }
1523
1524 debug!(
1525 target: LOG_TARGET,
1526 ?at,
1527 ?recent_finalized_block,
1528 extrinsics_count = all_txs.len(),
1529 duration = ?start.elapsed(),
1530 "fatp::txs_included_since_finalized"
1531 );
1532 all_txs
1533 }
1534
1535 async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
1544 let xts_count = self.mempool.unwatched_and_watched_count().await;
1545 debug!(
1546 target: LOG_TARGET,
1547 view_at = ?view.at,
1548 ?xts_count,
1549 active_views_count = self.active_views_count(),
1550 "update_view_with_mempool"
1551 );
1552 let included_xts = self.txs_included_since_finalized(&view.at).await;
1553
1554 let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
1555 .mempool
1556 .with_transactions(|iter| {
1557 iter.filter(|(hash, _)| !view.is_imported(&hash) && !included_xts.contains(&hash))
1558 .map(|(k, v)| (*k, v.clone()))
1559 .take(MEMPOOL_TO_VIEW_BATCH_SIZE)
1561 .collect::<HashMap<_, _>>()
1562 })
1563 .await
1564 .into_iter()
1565 .map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
1566 .unzip();
1567
1568 let results = view
1569 .submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
1570 .await
1571 .into_iter()
1572 .zip(hashes)
1573 .map(|(result, tx_hash)| async move {
1574 if let Ok(outcome) = result {
1575 Ok(self
1576 .mempool
1577 .update_transaction_priority(outcome.hash(), outcome.priority())
1578 .await)
1579 } else {
1580 Err(tx_hash)
1581 }
1582 })
1583 .collect::<Vec<_>>();
1584
1585 let results = futures::future::join_all(results).await;
1586
1587 let submitted_count = results.len();
1588
1589 debug!(
1590 target: LOG_TARGET,
1591 view_at_hash = ?view.at.hash,
1592 submitted_count,
1593 mempool_len = self.mempool.len(),
1594 "update_view_with_mempool"
1595 );
1596
1597 self.metrics
1598 .report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
1599
1600 if self.view_store.is_empty() {
1603 for result in results {
1604 if let Err(tx_hash) = result {
1605 self.view_store.listener.transactions_invalidated(&[tx_hash]);
1606 self.mempool.remove_transactions(&[tx_hash]).await;
1607 }
1608 }
1609 }
1610 }
1611
1612 async fn collect_provides_tags_from_view_store(
1619 &self,
1620 tree_route: &TreeRoute<Block>,
1621 xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
1622 ) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
1623 let blocks_hashes = tree_route
1624 .retracted()
1625 .iter()
1626 .skip(1)
1628 .chain(
1630 std::iter::once(tree_route.common_block())
1631 .chain(tree_route.enacted().iter().rev().skip(1)),
1632 )
1633 .collect::<Vec<&HashAndNumber<Block>>>();
1634
1635 self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
1636 }
1637
1638 pub async fn collect_extrinsics(
1640 &self,
1641 blocks: &[HashAndNumber<Block>],
1642 ) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
1643 future::join_all(blocks.iter().map(|hn| async move {
1644 (
1645 hn.hash,
1646 self.api
1647 .block_body(hn.hash)
1648 .await
1649 .unwrap_or_else(|e| {
1650 warn!(target: LOG_TARGET, %e, ": block_body error request");
1651 None
1652 })
1653 .unwrap_or_default(),
1654 )
1655 }))
1656 .await
1657 .into_iter()
1658 .collect()
1659 }
1660
1661 async fn update_view_with_fork(
1666 &self,
1667 view: &View<ChainApi>,
1668 tree_route: &TreeRoute<Block>,
1669 hash_and_number: HashAndNumber<Block>,
1670 ) {
1671 debug!(
1672 target: LOG_TARGET,
1673 ?tree_route,
1674 at = ?view.at,
1675 "update_view_with_fork"
1676 );
1677 let api = self.api.clone();
1678
1679 let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;
1681
1682 let known_provides_tags = Arc::new(
1685 self.collect_provides_tags_from_view_store(
1686 tree_route,
1687 extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
1688 )
1689 .await,
1690 );
1691
1692 debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());
1693
1694 let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1697 future::join_all(tree_route.enacted().iter().map(|hn| {
1698 let api = api.clone();
1699 let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
1700 let known_provides_tags = known_provides_tags.clone();
1701 async move {
1702 (
1703 hn,
1704 crate::prune_known_txs_for_block(
1705 hn,
1706 &*api,
1707 &view.pool,
1708 Some(xts),
1709 Some(known_provides_tags),
1710 )
1711 .await,
1712 )
1713 }
1714 }))
1715 .await
1716 .into_iter()
1717 .for_each(|(key, enacted_log)| {
1718 pruned_log.extend(enacted_log.clone());
1719 self.included_transactions.lock().insert(key.clone(), enacted_log);
1720 });
1721
1722 let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
1723 self.metrics
1724 .report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));
1725
1726 {
1728 let mut resubmit_transactions = Vec::new();
1729
1730 for retracted in tree_route.retracted() {
1731 let hash = retracted.hash;
1732
1733 let block_transactions = api
1734 .block_body(hash)
1735 .await
1736 .unwrap_or_else(|error| {
1737 warn!(
1738 target: LOG_TARGET,
1739 %error,
1740 "Failed to fetch block body"
1741 );
1742 None
1743 })
1744 .unwrap_or_default()
1745 .into_iter();
1746
1747 let mut resubmitted_to_report = 0;
1748
1749 let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
1750 |(tx_hash, _)| {
1751 let contains = pruned_log.contains(&tx_hash);
1752
1753 resubmitted_to_report += 1;
1755
1756 if !contains {
1757 trace!(
1758 target: LOG_TARGET,
1759 ?tx_hash,
1760 ?hash,
1761 "Resubmitting from retracted block"
1762 );
1763 }
1764 !contains
1765 },
1766 );
1767 let mut result = vec![];
1768 for (tx_hash, tx) in txs {
1769 result.push(
1770 self.mempool
1772 .get_by_hash(tx_hash)
1773 .await
1774 .map(|tx| (tx.source(), tx.tx()))
1775 .unwrap_or_else(|| {
1776 (TimedTransactionSource::new_external(true), Arc::from(tx))
1779 }),
1780 );
1781 }
1782 resubmit_transactions.extend(result);
1783
1784 self.metrics.report(|metrics| {
1785 metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1786 });
1787 }
1788
1789 let _ = view
1790 .pool
1791 .resubmit_at(
1792 &hash_and_number,
1793 resubmit_transactions,
1794 ValidateTransactionPriority::Maintained,
1795 )
1796 .await;
1797 }
1798 }
1799
1800 async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1806 let start = Instant::now();
1807 let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1808 debug!(
1809 target: LOG_TARGET,
1810 ?finalized_number,
1811 ?tree_route,
1812 active_views_count = self.active_views_count(),
1813 "handle_finalized"
1814 );
1815 let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1816
1817 self.mempool.purge_finalized_transactions(&finalized_xts).await;
1818 self.import_notification_sink.clean_notified_items(&finalized_xts);
1819
1820 self.metrics
1821 .report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1822
1823 if let Ok(Some(finalized_number)) = finalized_number {
1824 self.included_transactions
1825 .lock()
1826 .retain(|cached_block, _| finalized_number < cached_block.number);
1827 self.revalidation_queue
1828 .revalidate_mempool(
1829 self.mempool.clone(),
1830 self.view_store.clone(),
1831 HashAndNumber { hash: finalized_hash, number: finalized_number },
1832 )
1833 .await;
1834 } else {
1835 debug!(
1836 target: LOG_TARGET,
1837 ?finalized_number,
1838 "handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
1839 );
1840 }
1841
1842 self.ready_poll.lock().remove_cancelled();
1843
1844 debug!(
1845 target: LOG_TARGET,
1846 active_views_count = self.active_views_count(),
1847 included_transactions_len = ?self.included_transactions.lock().len(),
1848 duration = ?start.elapsed(),
1849 "handle_finalized after"
1850 );
1851 }
1852
1853 fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1855 self.api.hash_and_length(xt).0
1856 }
1857
1858 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1868 async fn attempt_transaction_replacement(
1869 &self,
1870 source: TransactionSource,
1871 at_number: u64,
1872 watched: bool,
1873 xt: ExtrinsicFor<ChainApi>,
1874 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1875 let best_view = self
1876 .view_store
1877 .most_recent_view
1878 .read()
1879 .as_ref()
1880 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1881 .clone();
1882
1883 let (xt_hash, validated_tx) = best_view
1884 .pool
1885 .verify_one(
1886 best_view.at.hash,
1887 best_view.at.number,
1888 TimedTransactionSource::from_transaction_source(source, false),
1889 xt.clone(),
1890 crate::graph::CheckBannedBeforeVerify::Yes,
1891 ValidateTransactionPriority::Submitted,
1892 )
1893 .await;
1894
1895 let Some(priority) = validated_tx.priority() else {
1896 return Err(TxPoolApiError::ImmediatelyDropped);
1897 };
1898
1899 let insertion_info = self
1900 .mempool
1901 .try_insert_with_replacement(xt, priority, source, at_number, watched)
1902 .await?;
1903 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1904 }
1905
1906 fn attempt_transaction_replacement_sync(
1908 &self,
1909 source: TransactionSource,
1910 watched: bool,
1911 xt: ExtrinsicFor<ChainApi>,
1912 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1913 let HashAndNumber { number: at_number, hash: at_hash } = self
1914 .view_store
1915 .most_recent_view
1916 .read()
1917 .as_ref()
1918 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1919 .at;
1920
1921 let ValidTransaction { priority, .. } = self
1922 .api
1923 .validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
1924 .map_err(|_| TxPoolApiError::ImmediatelyDropped)?
1925 .map_err(|e| match e {
1926 TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
1927 TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
1928 })?;
1929 let xt_hash = self.hash_of(&xt);
1930
1931 let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
1932 xt,
1933 priority,
1934 source,
1935 at_number.into().as_u64(),
1936 watched,
1937 )?;
1938 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1939 }
1940
1941 fn post_attempt_transaction_replacement(
1942 &self,
1943 tx_hash: ExtrinsicHash<ChainApi>,
1944 insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1945 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1946 for worst_hash in &insertion_info.removed {
1947 trace!(
1948 target: LOG_TARGET,
1949 tx_hash = ?worst_hash,
1950 new_tx_hash = ?tx_hash,
1951 "removed: replaced by"
1952 );
1953 self.view_store
1954 .listener
1955 .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1956
1957 self.view_store
1958 .remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1959 listener.limits_enforced(&removed_tx_hash);
1960 });
1961 }
1962
1963 return Ok(insertion_info);
1964 }
1965}
1966
1967#[async_trait]
1968impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
1969where
1970 Block: BlockT,
1971 ChainApi: 'static + graph::ChainApi<Block = Block>,
1972 <Block as BlockT>::Hash: Unpin,
1973{
1974 async fn maintain(&self, event: ChainEvent<Self::Block>) {
1976 let start = Instant::now();
1977 debug!(
1978 target: LOG_TARGET,
1979 ?event,
1980 "processing event"
1981 );
1982
1983 self.view_store.finish_background_revalidations().await;
1984
1985 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
1986
1987 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
1988 match self.api.tree_route(from, to) {
1989 Ok(tree_route) => Ok(tree_route),
1990 Err(e) => {
1991 return Err(format!(
1992 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
1993 ))
1994 },
1995 }
1996 };
1997 let block_id_to_number =
1998 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
1999
2000 let result =
2001 self.enactment_state
2002 .lock()
2003 .update(&event, &compute_tree_route, &block_id_to_number);
2004
2005 match result {
2006 Err(error) => {
2007 debug!(
2008 target: LOG_TARGET,
2009 %error,
2010 "enactment_state::update error"
2011 );
2012 self.enactment_state.lock().force_update(&event);
2013 },
2014 Ok(EnactmentAction::Skip) => return,
2015 Ok(EnactmentAction::HandleFinalization) => {
2016 },
2024 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
2025 self.handle_new_block(&tree_route).await;
2026 },
2027 };
2028
2029 match event {
2030 ChainEvent::NewBestBlock { .. } => {},
2031 ChainEvent::Finalized { hash, ref tree_route } => {
2032 self.handle_finalized(hash, tree_route).await;
2033
2034 debug!(
2035 target: LOG_TARGET,
2036 ?tree_route,
2037 ?prev_finalized_block,
2038 "on-finalized enacted"
2039 );
2040 },
2041 }
2042
2043 let duration = start.elapsed();
2044 let mempool_len = self.mempool_len().await;
2045 debug!(
2046 target: LOG_TARGET,
2047 txs = ?mempool_len,
2048 a = self.active_views_count(),
2049 i = self.inactive_views_count(),
2050 views = ?self.views_stats(),
2051 ?event,
2052 ?duration,
2053 "maintain"
2054 );
2055
2056 self.metrics.report(|metrics| {
2057 let (unwatched, watched) = mempool_len;
2058 let _ = (
2059 self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
2060 self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
2061 watched.try_into().map(|v| metrics.watched_txs.set(v)),
2062 unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
2063 );
2064 metrics.maintain_duration.observe(duration.as_secs_f64());
2065 });
2066 }
2067}
2068
2069impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2070where
2071 Block: BlockT,
2072 Client: sp_api::ProvideRuntimeApi<Block>
2073 + sc_client_api::BlockBackend<Block>
2074 + sc_client_api::blockchain::HeaderBackend<Block>
2075 + sp_runtime::traits::BlockIdTo<Block>
2076 + sc_client_api::ExecutorProvider<Block>
2077 + sc_client_api::UsageProvider<Block>
2078 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
2079 + Send
2080 + Sync
2081 + 'static,
2082 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
2083 <Block as BlockT>::Hash: std::marker::Unpin,
2084{
2085 pub fn new_full(
2087 options: Options,
2088 is_validator: IsValidator,
2089 prometheus: Option<&PrometheusRegistry>,
2090 spawner: impl SpawnEssentialNamed,
2091 client: Arc<Client>,
2092 ) -> Self {
2093 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2094 let pool = Self::new_with_background_worker(
2095 options,
2096 is_validator,
2097 pool_api,
2098 prometheus,
2099 spawner,
2100 client.usage_info().chain.best_hash,
2101 client.usage_info().chain.finalized_hash,
2102 );
2103
2104 pool
2105 }
2106}
2107
#[cfg(test)]
mod reduce_multiview_result_tests {
    use super::*;
    use sp_core::H256;

    #[derive(Debug, PartialEq, Clone)]
    enum Error {
        Custom(u8),
    }

    /// Per-view submission results, keyed by the view's block hash.
    type ViewResults = Vec<(H256, Vec<Result<H256, Error>>)>;

    /// Hash consisting of `byte` repeated.
    fn hash(byte: u8) -> H256 {
        H256::repeat_byte(byte)
    }

    /// Successful result carrying `hash(byte)`.
    fn ok(byte: u8) -> Result<H256, Error> {
        Ok(hash(byte))
    }

    /// Failed result carrying `Error::Custom(code)`.
    fn err(code: u8) -> Result<H256, Error> {
        Err(Error::Custom(code))
    }

    // No views at all: the reduced result is empty.
    #[test]
    fn empty() {
        sp_tracing::try_init_simple();
        let input = HashMap::default();
        let r = reduce_multiview_result::<H256, H256, Error>(input);
        assert!(r.is_empty());
    }

    // Only errors: the result must equal the error vector of one of the views.
    #[test]
    fn errors_only() {
        sp_tracing::try_init_simple();
        let v: ViewResults = vec![
            (hash(0x13), vec![err(10), err(11), err(12), err(13)]),
            (hash(0x14), vec![err(20), err(21), err(22), err(23)]),
            (hash(0x15), vec![err(30), err(31), err(32), err(33)]),
        ];
        let input = HashMap::from_iter(v.clone());
        let r = reduce_multiview_result(input);

        assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
    }

    // Views reporting a different number of results are expected to panic in debug builds.
    #[test]
    #[should_panic]
    #[cfg(debug_assertions)]
    fn invalid_lengths() {
        sp_tracing::try_init_simple();
        let v: ViewResults = vec![
            (hash(0x13), vec![err(12), err(13)]),
            (hash(0x14), vec![err(23)]),
        ];
        let input = HashMap::from_iter(v);
        let _ = reduce_multiview_result(input);
    }

    // All views agree on successful results.
    #[test]
    fn only_hashes() {
        sp_tracing::try_init_simple();

        let v: ViewResults = vec![
            (hash(0x13), vec![ok(0x13), ok(0x14)]),
            (hash(0x14), vec![ok(0x13), ok(0x14)]),
        ];
        let input = HashMap::from_iter(v);
        let r = reduce_multiview_result(input);

        assert_eq!(r, vec![ok(0x13), ok(0x14)]);
    }

    // A single view: its results are returned as they are.
    #[test]
    fn one_view() {
        sp_tracing::try_init_simple();
        let v: ViewResults = vec![(hash(0x13), vec![ok(0x10), err(11)])];
        let input = HashMap::from_iter(v);
        let r = reduce_multiview_result(input);

        assert_eq!(r, vec![ok(0x10), err(11)]);
    }

    // Mixed results: a success from any view wins for its position; the last position is an
    // error in every view and stays an error.
    #[test]
    fn mix() {
        sp_tracing::try_init_simple();
        let v: ViewResults = vec![
            (hash(0x13), vec![ok(0x10), err(11), err(12), err(33)]),
            (hash(0x14), vec![err(20), ok(0x21), err(22), err(33)]),
            (hash(0x15), vec![err(30), err(31), ok(0x32), err(33)]),
        ];
        let input = HashMap::from_iter(v);
        let r = reduce_multiview_result(input);

        assert_eq!(r, vec![ok(0x10), ok(0x21), ok(0x32), err(33)]);
    }
}