1use super::{
22 dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23 import_notification_sink::MultiViewImportNotificationSink,
24 metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
25 multi_view_listener::MultiViewListener,
26 tx_mem_pool::{InsertionInfo, TxMemPool},
27 view::View,
28 view_store::ViewStore,
29};
30use crate::{
31 api::FullChainApi,
32 common::{
33 sliding_stat::DurationSlidingStats,
34 tracing_log_xt::{log_xt_debug, log_xt_trace},
35 STAT_SLIDING_WINDOW,
36 },
37 enactment_state::{EnactmentAction, EnactmentState},
38 fork_aware_txpool::{
39 dropped_watcher::{DroppedReason, DroppedTransaction},
40 revalidation_worker,
41 },
42 graph::{
43 self,
44 base_pool::{TimedTransactionSource, Transaction},
45 BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
46 },
47 insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
48 LOG_TARGET_STAT,
49};
50use async_trait::async_trait;
51use futures::{
52 channel::oneshot,
53 future::{self},
54 prelude::*,
55 FutureExt,
56};
57use parking_lot::Mutex;
58use prometheus_endpoint::Registry as PrometheusRegistry;
59use sc_transaction_pool_api::{
60 error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
61 MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
62 TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
63};
64use sp_blockchain::{HashAndNumber, TreeRoute};
65use sp_core::traits::SpawnEssentialNamed;
66use sp_runtime::{
67 generic::BlockId,
68 traits::{Block as BlockT, NumberFor},
69 transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
70 Saturating,
71};
72use std::{
73 collections::{BTreeMap, HashMap, HashSet},
74 pin::Pin,
75 sync::Arc,
76 time::{Duration, Instant},
77};
78use tokio::select;
79use tracing::{debug, info, instrument, trace, warn, Level};
80
/// Default finality-stall threshold, expressed as a block-number difference.
///
/// Used for the pool's `finality_timeout_threshold` whenever a constructor is not given an
/// explicit value; `finality_stall_cleanup` compares block-number differences against it.
const FINALITY_TIMEOUT_THRESHOLD: usize = 128;

// NOTE(review): not referenced in the visible part of this file. The name suggests the number of
// transactions moved from the mempool to a view in one batch — confirm at the use site.
const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;

/// Boxed, `Send` future with no output, used for the pool's background tasks.
///
/// The test constructors return such tasks to the caller instead of spawning them.
pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
94
/// Registry of one-shot senders waiting for a value of type `T`, grouped by block hash.
struct ReadyPoll<T, Block>
where
    Block: BlockT,
{
    /// Senders registered through `add`, keyed by the block hash they are waiting for.
    pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
}
103
104impl<T, Block> ReadyPoll<T, Block>
105where
106 Block: BlockT,
107{
108 fn new() -> Self {
110 Self { pollers: Default::default() }
111 }
112
113 fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
116 let (s, r) = oneshot::channel();
117 self.pollers.entry(at).or_default().push(s);
118 r
119 }
120
121 fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
126 debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
127 let Some(pollers) = self.pollers.remove(&at) else { return };
128 pollers.into_iter().for_each(|p| {
129 debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
130 let _ = p.send(ready_iterator());
131 });
132 }
133
134 fn remove_cancelled(&mut self) {
136 self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
137 }
138}
139
/// Transaction pool that keeps a shared mempool plus a store of per-block views.
pub struct ForkAwareTxPool<ChainApi, Block>
where
    Block: BlockT,
    ChainApi: graph::ChainApi<Block = Block> + 'static,
{
    /// Chain API used to resolve block numbers, fetch block bodies and hash extrinsics.
    api: Arc<ChainApi>,

    /// Mempool holding submitted transactions independently of any view.
    mempool: Arc<TxMemPool<ChainApi, Block>>,

    /// Store of the views (active, inactive, most recent) maintained by the pool.
    view_store: Arc<ViewStore<ChainApi, Block>>,

    /// Pollers waiting for the ready iterator of a block for which no view exists yet.
    ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,

    /// Prometheus metrics handle.
    metrics: PrometheusMetrics,

    /// Collector that is notified about submitted transactions.
    events_metrics_collector: EventsMetricsCollector<ChainApi>,

    /// Enactment state, initialised with the best and finalized block hashes; also provides the
    /// most recently finalized block.
    enactment_state: Arc<Mutex<EnactmentState<Block>>>,

    /// Queue handed to `View::start_background_revalidation` for newly created views.
    revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,

    /// Sink aggregating the import notification streams of all views.
    import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,

    /// Pool options (ready/future limits and friends) used when creating non-cloned views.
    options: Options,

    /// Validator flag passed to newly created views.
    is_validator: IsValidator,

    /// Block-number difference above which entries are treated as finality-timed-out.
    finality_timeout_threshold: usize,

    /// Transaction hashes grouped by block, ordered by the block's hash-and-number key.
    ///
    /// Drained by `finality_stall_cleanup`. NOTE(review): the map is populated outside the
    /// visible part of this file; presumably with the transactions included in each block.
    included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,

    /// Sliding-window duration statistics for `submit_at`.
    submit_stats: DurationSlidingStats,

    /// Sliding-window duration statistics for `submit_and_watch`.
    submit_and_watch_stats: DurationSlidingStats,
}
204
205impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
206where
207 Block: BlockT,
208 ChainApi: graph::ChainApi<Block = Block> + 'static,
209 <Block as BlockT>::Hash: Unpin,
210{
211 fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
215 if let Some(block_number) =
216 self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
217 {
218 let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
219 let tree_route =
220 &TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
221 let view = self.build_and_plug_view(None, &at_best, &tree_route);
222 self.view_store.insert_new_view_sync(view.into(), &tree_route);
223 trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
224 };
225 self
226 }
227
228 pub fn new_test(
231 pool_api: Arc<ChainApi>,
232 best_block_hash: Block::Hash,
233 finalized_hash: Block::Hash,
234 finality_timeout_threshold: Option<usize>,
235 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
236 Self::new_test_with_limits(
237 pool_api,
238 best_block_hash,
239 finalized_hash,
240 Options::default().ready,
241 Options::default().future,
242 usize::MAX,
243 finality_timeout_threshold,
244 )
245 }
246
    /// Creates a pool for tests with explicit limits.
    ///
    /// - `ready_limits` / `future_limits`: limits stored in the pool options; the sum of their
    ///   `total_bytes` is also passed to the mempool.
    /// - `mempool_max_transactions_count`: maximum transaction count passed to the mempool.
    /// - `finality_timeout_threshold`: falls back to `FINALITY_TIMEOUT_THRESHOLD` when `None`.
    ///
    /// Nothing is spawned here: the returned array holds the combined listener / import
    /// notification / dropped-monitor task and the mempool task, which the caller must poll.
    /// No revalidation worker and no metrics are set up.
    pub fn new_test_with_limits(
        pool_api: Arc<ChainApi>,
        best_block_hash: Block::Hash,
        finalized_hash: Block::Hash,
        ready_limits: crate::PoolLimit,
        future_limits: crate::PoolLimit,
        mempool_max_transactions_count: usize,
        finality_timeout_threshold: Option<usize>,
    ) -> (Self, [ForkAwareTxPoolTask; 2]) {
        let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
        let listener = Arc::new(listener);

        let (import_notification_sink, import_notification_sink_task) =
            MultiViewImportNotificationSink::new_with_worker();

        let (mempool, mempool_task) = TxMemPool::new(
            pool_api.clone(),
            listener.clone(),
            Default::default(),
            mempool_max_transactions_count,
            ready_limits.total_bytes + future_limits.total_bytes,
        );
        let mempool = Arc::from(mempool);

        let (dropped_stream_controller, dropped_stream) =
            MultiViewDroppedWatcherController::<ChainApi>::new();

        let view_store = Arc::new(ViewStore::new(
            pool_api.clone(),
            listener,
            dropped_stream_controller,
            import_notification_sink.clone(),
        ));

        let dropped_monitor_task = Self::dropped_monitor_task(
            dropped_stream,
            mempool.clone(),
            view_store.clone(),
            import_notification_sink.clone(),
        );

        // The combined task completes as soon as any of the three workers completes.
        let combined_tasks = async move {
            tokio::select! {
                _ = listener_task => {},
                _ = import_notification_sink_task => {},
                _ = dropped_monitor_task => {}
            }
        }
        .boxed();

        let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };

        (
            Self {
                mempool,
                api: pool_api,
                view_store,
                ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
                enactment_state: Arc::new(Mutex::new(EnactmentState::new(
                    best_block_hash,
                    finalized_hash,
                ))),
                revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
                import_notification_sink,
                options,
                is_validator: false.into(),
                metrics: Default::default(),
                events_metrics_collector: EventsMetricsCollector::default(),
                finality_timeout_threshold: finality_timeout_threshold
                    .unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
                included_transactions: Default::default(),
                submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
                submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
                    STAT_SLIDING_WINDOW,
                )),
            }
            .inject_initial_view(best_block_hash),
            [combined_tasks, mempool_task],
        )
    }
329
330 async fn dropped_monitor_task(
338 mut dropped_stream: StreamOfDropped<ChainApi>,
339 mempool: Arc<TxMemPool<ChainApi, Block>>,
340 view_store: Arc<ViewStore<ChainApi, Block>>,
341 import_notification_sink: MultiViewImportNotificationSink<
342 Block::Hash,
343 ExtrinsicHash<ChainApi>,
344 >,
345 ) {
346 let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
347 loop {
348 let Some(dropped) = dropped_stream.next().await else {
349 debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
350 break;
351 };
352 let start = Instant::now();
353 let tx_hash = dropped.tx_hash;
354 trace!(
355 target: LOG_TARGET,
356 ?tx_hash,
357 reason = ?dropped.reason,
358 "fatp::dropped notification, removing"
359 );
360 match dropped.reason {
361 DroppedReason::Usurped(new_tx_hash) => {
362 if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
363 view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
364 } else {
365 trace!(
366 target: LOG_TARGET,
367 tx_hash = ?new_tx_hash,
368 "error: dropped_monitor_task: no entry in mempool for new transaction"
369 );
370 };
371 },
372 DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
373 view_store.remove_transaction_subtree(tx_hash, |_, _| {});
374 },
375 };
376
377 mempool.remove_transactions(&[tx_hash]).await;
378 import_notification_sink.clean_notified_items(&[tx_hash]);
379 view_store.listener.transaction_dropped(dropped);
380 insert_and_log_throttled!(
381 Level::DEBUG,
382 target:LOG_TARGET_STAT,
383 prefix:"dropped_stats",
384 dropped_stats,
385 start.elapsed().into()
386 );
387 }
388 }
389
    /// Creates the pool and spawns its background workers on `spawner`.
    ///
    /// Two essential tasks are spawned, both named `txpool-background` in the
    /// `transaction-pool` group:
    /// - a combined task running the listener, revalidation, import-notification, dropped
    ///   monitor and event-metrics workers; it completes as soon as any of them completes;
    /// - a blocking task running the mempool worker.
    ///
    /// The mempool is limited to `options.total_count()` transactions and to the sum of the
    /// ready and future byte limits. An initial view is injected for `best_block_hash`.
    pub fn new_with_background_worker(
        options: Options,
        is_validator: IsValidator,
        pool_api: Arc<ChainApi>,
        prometheus: Option<&PrometheusRegistry>,
        spawner: impl SpawnEssentialNamed,
        best_block_hash: Block::Hash,
        finalized_hash: Block::Hash,
    ) -> Self {
        let metrics = PrometheusMetrics::new(prometheus);
        let (events_metrics_collector, event_metrics_task) =
            EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());

        let (listener, listener_task) =
            MultiViewListener::new_with_worker(events_metrics_collector.clone());
        let listener = Arc::new(listener);

        let (revalidation_queue, revalidation_task) =
            revalidation_worker::RevalidationQueue::new_with_worker();

        let (import_notification_sink, import_notification_sink_task) =
            MultiViewImportNotificationSink::new_with_worker();

        let (mempool, blocking_mempool_task) = TxMemPool::new(
            pool_api.clone(),
            listener.clone(),
            metrics.clone(),
            options.total_count(),
            options.ready.total_bytes + options.future.total_bytes,
        );
        let mempool = Arc::from(mempool);

        let (dropped_stream_controller, dropped_stream) =
            MultiViewDroppedWatcherController::<ChainApi>::new();

        let view_store = Arc::new(ViewStore::new(
            pool_api.clone(),
            listener,
            dropped_stream_controller,
            import_notification_sink.clone(),
        ));

        let dropped_monitor_task = Self::dropped_monitor_task(
            dropped_stream,
            mempool.clone(),
            view_store.clone(),
            import_notification_sink.clone(),
        );

        // Completes as soon as any single worker completes.
        let combined_tasks = async move {
            tokio::select! {
                _ = listener_task => {}
                _ = revalidation_task => {},
                _ = import_notification_sink_task => {},
                _ = dropped_monitor_task => {}
                _ = event_metrics_task => {},
            }
        }
        .boxed();
        spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
        spawner.spawn_essential_blocking(
            "txpool-background",
            Some("transaction-pool"),
            blocking_mempool_task,
        );

        Self {
            mempool,
            api: pool_api,
            view_store,
            ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
            enactment_state: Arc::new(Mutex::new(EnactmentState::new(
                best_block_hash,
                finalized_hash,
            ))),
            revalidation_queue: Arc::from(revalidation_queue),
            import_notification_sink,
            options,
            metrics,
            events_metrics_collector,
            is_validator,
            finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
            included_transactions: Default::default(),
            submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
            submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
                STAT_SLIDING_WINDOW,
            )),
        }
        .inject_initial_view(best_block_hash)
    }
484
    /// Returns a reference to the chain API used by this pool.
    pub fn api(&self) -> &ChainApi {
        &self.api
    }
489
    /// Returns the pool status reported by the view store, keyed by block hash.
    pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
        self.view_store.status()
    }
494
    /// Returns the number of active views currently held by the view store.
    pub fn active_views_count(&self) -> usize {
        self.view_store.active_views.read().len()
    }
499
    /// Returns the number of inactive views currently held by the view store.
    pub fn inactive_views_count(&self) -> usize {
        self.view_store.inactive_views.read().len()
    }
504
505 fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
510 self.view_store
511 .active_views
512 .read()
513 .iter()
514 .map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
515 .collect()
516 }
517
    /// Returns `true` if an active view exists for block `hash`.
    ///
    /// Inactive views are not consulted.
    pub fn has_view(&self, hash: &Block::Hash) -> bool {
        self.view_store.active_views.read().contains_key(hash)
    }
522
    /// Returns the mempool's transaction counts as reported by
    /// `TxMemPool::unwatched_and_watched_count`.
    ///
    /// NOTE(review): presumably `(unwatched, watched)` — confirm against the mempool.
    pub async fn mempool_len(&self) -> (usize, usize) {
        self.mempool.unwatched_and_watched_count().await
    }
529
    /// Returns the view store's answer for the future transactions at block `at`.
    ///
    /// NOTE(review): presumably `None` when no view exists for `at` — confirm in `ViewStore`.
    pub fn futures_at(
        &self,
        at: Block::Hash,
    ) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
        self.view_store.futures_at(at)
    }
539
    /// Returns a best-effort ready iterator for block `at` without building a regular view.
    ///
    /// Three outcomes are possible, tried in this order:
    /// 1. If the block numbers of `at` and of the recently finalized block resolve, and the view
    ///    store finds a suitable view via `find_view_descendent_up_to_number`, a temporary view
    ///    is cloned from it. The tags of all extrinsics found in the bodies of the returned
    ///    enacted blocks are pruned from that temporary view, and its ready iterator is returned.
    /// 2. Otherwise the ready iterator of the most recent view is returned, if there is one.
    /// 3. Otherwise an empty iterator is returned.
    pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
        let start = Instant::now();
        let api = self.api.clone();
        debug!(
            target: LOG_TARGET,
            ?at,
            "fatp::ready_at_light"
        );

        let at_number = self.api.resolve_block_number(at).ok();
        let finalized_number = self
            .api
            .resolve_block_number(self.enactment_state.lock().recent_finalized_block())
            .ok();

        // Both block numbers must resolve, and a matching view must exist, for the pruning path.
        if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
            let at_hn = HashAndNumber { hash: at, number: at_number };
            finalized_number.and_then(|finalized_number| {
                self.view_store
                    .find_view_descendent_up_to_number(&at_hn, finalized_number)
                    .map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
            })
        }) {
            let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
            let mut all_extrinsics = vec![];
            for h in enacted_blocks {
                // A failed or missing block body is logged and treated as an empty body.
                let extrinsics = api
                    .block_body(h)
                    .await
                    .unwrap_or_else(|error| {
                        warn!(
                            target: LOG_TARGET,
                            %error,
                            "Compute ready light transactions: error request"
                        );
                        None
                    })
                    .unwrap_or_default()
                    .into_iter()
                    .map(|t| api.hash_and_length(&t).0);
                all_extrinsics.extend(extrinsics);
            }

            let before_count = tmp_view.pool.validated_pool().status().ready;
            // Only extrinsics known to the temporary view yield tags; the rest are skipped.
            let tags = tmp_view
                .pool
                .validated_pool()
                .extrinsics_tags(&all_extrinsics)
                .into_iter()
                .flatten()
                .flatten()
                .collect::<Vec<_>>();
            let _ = tmp_view.pool.validated_pool().prune_tags(tags);

            let after_count = tmp_view.pool.validated_pool().status().ready;
            debug!(
                target: LOG_TARGET,
                ?at,
                best_view_hash = ?view.at.hash,
                before_count,
                to_be_removed = all_extrinsics.len(),
                after_count,
                duration = ?start.elapsed(),
                "fatp::ready_at_light -> light"
            );
            Box::new(tmp_view.pool.validated_pool().ready())
        } else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
            debug!(
                target: LOG_TARGET,
                ?at,
                duration = ?start.elapsed(),
                "fatp::ready_at_light -> most_recent_view"
            );
            Box::new(most_recent_view.pool.validated_pool().ready())
        } else {
            let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
            debug!(
                target: LOG_TARGET,
                ?at,
                duration = ?start.elapsed(),
                "fatp::ready_at_light -> empty"
            );
            empty
        }
    }
643
    /// Returns the ready iterator for block `at`, waiting at most `timeout` for its view.
    ///
    /// If the view already exists its iterator is returned right away. Otherwise the wait for
    /// the view (bounded by `timeout`) and the computation of the `ready_at_light` fallback are
    /// driven concurrently; the fallback is returned only if the timeout fires first.
    async fn ready_at_with_timeout_internal(
        &self,
        at: Block::Hash,
        timeout: std::time::Duration,
    ) -> ReadyIteratorFor<ChainApi> {
        debug!(
            target: LOG_TARGET,
            ?at,
            ?timeout,
            "fatp::ready_at_with_timeout"
        );
        // The timer is created before the poller is registered.
        let timeout = futures_timer::Delay::new(timeout);
        let (view_already_exists, ready_at) = self.ready_at_internal(at);

        if view_already_exists {
            return ready_at.await;
        }

        let maybe_ready = async move {
            select! {
                ready = ready_at => Some(ready),
                _ = timeout => {
                    debug!(
                        target: LOG_TARGET,
                        ?at,
                        "Timeout fired waiting for transaction pool at block. Proceeding with production."
                    );
                    None
                }
            }
        };

        // `join` waits for both futures, so the fallback is always computed to completion.
        let fall_back_ready = self.ready_at_light(at);
        let (maybe_ready, fall_back_ready) =
            futures::future::join(maybe_ready, fall_back_ready).await;
        maybe_ready.unwrap_or(fall_back_ready)
    }
691
    /// Returns a future resolving to the ready iterator for block `at`, plus a flag telling
    /// whether a view for `at` already exists.
    ///
    /// - View exists: the flag is `true` and the future resolves immediately.
    /// - No view yet: a poller is registered in `ready_poll`, the flag is `false`, and the
    ///   future resolves once the poller is triggered. If the sending side is dropped first,
    ///   the error is logged and an empty iterator is produced.
    fn ready_at_internal(
        &self,
        at: Block::Hash,
    ) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
        // The lock is taken before the view lookup and held until the poller is registered.
        // NOTE(review): presumably so that a concurrent `trigger` for `at` cannot slip in
        // between the lookup and the registration — confirm.
        let mut ready_poll = self.ready_poll.lock();

        if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
            debug!(
                target: LOG_TARGET,
                ?at,
                ?inactive,
                "fatp::ready_at_internal"
            );
            let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
            return (true, async move { iterator }.boxed());
        }

        let pending = ready_poll
            .add(at)
            .map(|received| {
                received.unwrap_or_else(|error| {
                    warn!(
                        target: LOG_TARGET,
                        %error,
                        "Error receiving ready-set iterator"
                    );
                    Box::new(std::iter::empty())
                })
            })
            .boxed();
        debug!(
            target: LOG_TARGET,
            ?at,
            pending_keys = ?ready_poll.pollers.keys(),
            "fatp::ready_at_internal"
        );
        (false, pending)
    }
730
    /// Submits a single transaction to the mempool and the view store and returns its status
    /// stream.
    ///
    /// If the mempool rejects the transaction as immediately dropped, a replacement of an
    /// existing transaction is attempted before giving up. If the view store rejects the
    /// transaction afterwards, it is removed from the mempool again and the error is returned.
    /// On success the priority reported by the view store is recorded in the mempool.
    ///
    /// # Errors
    ///
    /// Returns any error from the mempool insertion, the replacement attempt, or the view store
    /// submission.
    async fn submit_and_watch_inner(
        &self,
        at: Block::Hash,
        source: TransactionSource,
        xt: TransactionFor<Self>,
    ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
        let xt = Arc::from(xt);

        // Falls back to the default block number when `at` cannot be resolved.
        let at_number = self
            .api
            .block_id_to_number(&BlockId::Hash(at))
            .ok()
            .flatten()
            .unwrap_or_default()
            .into()
            .as_u64();

        let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
            Ok(result) => result,
            Err(TxPoolApiError::ImmediatelyDropped) =>
                self.attempt_transaction_replacement(source, at_number, true, xt.clone())
                    .await?,
            Err(e) => return Err(e.into()),
        };

        self.metrics.report(|metrics| metrics.submitted_transactions.inc());
        self.events_metrics_collector.report_submitted(&insertion);

        match self.view_store.submit_and_watch(at, insertion.source, xt).await {
            Err(e) => {
                // Keep the mempool consistent with the views: the transaction was not accepted.
                self.mempool.remove_transactions(&[insertion.hash]).await;
                Err(e.into())
            },
            Ok(mut outcome) => {
                self.mempool
                    .update_transaction_priority(outcome.hash(), outcome.priority())
                    .await;
                Ok(outcome.expect_watcher())
            },
        }
    }
773
    /// Submits a batch of unwatched transactions to the mempool and then to the views.
    ///
    /// The returned vector has one entry per input transaction, in input order. When the view
    /// store is empty only the mempool results are returned. Otherwise:
    /// 1. transactions the mempool dropped immediately get a replacement attempt;
    /// 2. transactions accepted by the mempool are submitted to the view store;
    /// 3. per-view results are merged with `reduce_multiview_result`;
    /// 4. a transaction rejected by the views is removed from the mempool, while an accepted
    ///    one has its priority recorded there.
    ///
    /// # Panics
    ///
    /// Panics if the view store returns fewer results than the number of transactions submitted
    /// to it (see `RESULTS_ASSUMPTION`).
    async fn submit_at_inner(
        &self,
        at: Block::Hash,
        source: TransactionSource,
        xts: Vec<TransactionFor<Self>>,
    ) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
        // Falls back to the default block number when `at` cannot be resolved.
        let at_number = self
            .api
            .block_id_to_number(&BlockId::Hash(at))
            .ok()
            .flatten()
            .unwrap_or_default()
            .into()
            .as_u64();
        let view_store = self.view_store.clone();
        let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
        let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;

        // Without any view there is nothing else to do: report the mempool outcome.
        if view_store.is_empty() {
            return Ok(mempool_results
                .into_iter()
                .map(|r| r.map(|r| r.hash).map_err(Into::into))
                .collect::<Vec<_>>())
        }

        // Retry the immediately dropped transactions by attempting a replacement.
        let retries = mempool_results
            .into_iter()
            .zip(xts.clone())
            .map(|(result, xt)| async move {
                match result {
                    Err(TxPoolApiError::ImmediatelyDropped) =>
                        self.attempt_transaction_replacement(source, at_number, false, xt).await,
                    _ => result,
                }
            })
            .collect::<Vec<_>>();

        let mempool_results = futures::future::join_all(retries).await;

        // Only transactions accepted by the mempool are forwarded to the views.
        let to_be_submitted = mempool_results
            .iter()
            .zip(xts)
            .filter_map(|(result, xt)| {
                result.as_ref().ok().map(|insertion| {
                    self.events_metrics_collector.report_submitted(&insertion);
                    (insertion.source.clone(), xt)
                })
            })
            .collect::<Vec<_>>();

        self.metrics
            .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

        let mempool = self.mempool.clone();
        let results_map = view_store.submit(to_be_submitted.into_iter()).await;
        let mut submission_results = reduce_multiview_result(results_map).into_iter();

        // Every `Ok` mempool result consumes exactly one view store result, in order; mempool
        // errors are passed through without consuming one.
        const RESULTS_ASSUMPTION : &str =
            "The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
        let merged_results = mempool_results.into_iter().map(|result| {
            result.map_err(Into::into).and_then(|insertion| {
                Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
            })
        });

        let mut final_results = vec![];
        for r in merged_results {
            match r {
                Ok((hash, submission_result)) => match submission_result {
                    Ok(r) => {
                        mempool.update_transaction_priority(r.hash(), r.priority()).await;
                        final_results.push(Ok(r.hash()));
                    },
                    Err(e) => {
                        // Rejected by the views: drop it from the mempool as well.
                        mempool.remove_transactions(&[hash]).await;
                        final_results.push(Err(e));
                    },
                },
                Err(e) => final_results.push(Err(e)),
            }
        }

        Ok(final_results)
    }
879
    /// Returns the number of notified items currently tracked by the import notification sink.
    pub fn import_notification_sink_len(&self) -> usize {
        self.import_notification_sink.notified_items_len()
    }
886}
887
/// Collapses per-view submission results into a single result per transaction.
///
/// Every value of `input` holds the results one view produced for the same ordered list of
/// transactions. For each position the first `Ok` encountered while walking the views is kept;
/// while a position still holds an `Err`, it is overwritten by the next view's result. The
/// views are visited in the map's iteration order, which is unspecified.
///
/// Returns an empty vector for an empty `input`. In debug builds all per-view vectors are
/// asserted to have the same length.
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
    let expected_len = input.values().next().map(Vec::len);
    debug_assert!(input.values().all(|results| Some(results.len()) == expected_len));

    let mut per_view = input.into_values();
    let Some(mut merged) = per_view.next() else {
        return Vec::new();
    };

    for view_results in per_view {
        for (slot, candidate) in merged.iter_mut().zip(view_results) {
            // An `Ok` already recorded for this transaction always wins.
            if slot.is_err() {
                *slot = candidate;
            }
        }
    }

    merged
}
934
935#[async_trait]
936impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
937where
938 Block: BlockT,
939 ChainApi: 'static + graph::ChainApi<Block = Block>,
940 <Block as BlockT>::Hash: Unpin,
941{
942 type Block = ChainApi::Block;
943 type Hash = ExtrinsicHash<ChainApi>;
944 type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
945 type Error = ChainApi::Error;
946
    /// Submits a batch of transactions at block `at` and returns one result per transaction.
    ///
    /// Thin wrapper around `submit_at_inner` that adds trace logging and records the call
    /// duration in `submit_stats`.
    async fn submit_at(
        &self,
        at: <Self::Block as BlockT>::Hash,
        source: TransactionSource,
        xts: Vec<TransactionFor<Self>>,
    ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
        let start = Instant::now();
        trace!(
            target: LOG_TARGET,
            count = xts.len(),
            active_views_count = self.active_views_count(),
            "fatp::submit_at"
        );
        log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
        let result = self.submit_at_inner(at, source, xts).await;
        insert_and_log_throttled!(
            Level::DEBUG,
            target:LOG_TARGET_STAT,
            prefix:"submit_stats",
            self.submit_stats,
            start.elapsed().into()
        );
        result
    }
977
978 async fn submit_one(
982 &self,
983 _at: <Self::Block as BlockT>::Hash,
984 source: TransactionSource,
985 xt: TransactionFor<Self>,
986 ) -> Result<TxHash<Self>, Self::Error> {
987 trace!(
988 target: LOG_TARGET,
989 tx_hash = ?self.tx_hash(&xt),
990 active_views_count = self.active_views_count(),
991 "fatp::submit_one"
992 );
993 match self.submit_at(_at, source, vec![xt]).await {
994 Ok(mut v) =>
995 v.pop().expect("There is exactly one element in result of submit_at. qed."),
996 Err(e) => Err(e),
997 }
998 }
999
    /// Submits a single transaction at block `at` and returns a stream of its status updates.
    ///
    /// Thin wrapper around `submit_and_watch_inner` that adds trace logging and records the
    /// call duration in `submit_and_watch_stats`.
    #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
    async fn submit_and_watch(
        &self,
        at: <Self::Block as BlockT>::Hash,
        source: TransactionSource,
        xt: TransactionFor<Self>,
    ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
        let start = Instant::now();
        trace!(
            target: LOG_TARGET,
            tx_hash = ?self.tx_hash(&xt),
            views = self.active_views_count(),
            "fatp::submit_and_watch"
        );
        let result = self.submit_and_watch_inner(at, source, xt).await;
        insert_and_log_throttled!(
            Level::DEBUG,
            target:LOG_TARGET_STAT,
            prefix:"submit_and_watch_stats",
            self.submit_and_watch_stats,
            start.elapsed().into()
        );
        result
    }
1028
1029 async fn report_invalid(
1038 &self,
1039 at: Option<<Self::Block as BlockT>::Hash>,
1040 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
1041 ) -> Vec<Arc<Self::InPoolTransaction>> {
1042 debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
1043 log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
1044 self.metrics
1045 .report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
1046
1047 let removed = self.view_store.report_invalid(at, invalid_tx_errors);
1048
1049 let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
1050 self.mempool.remove_transactions(&removed_hashes).await;
1051 self.import_notification_sink.clean_notified_items(&removed_hashes);
1052
1053 self.metrics
1054 .report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
1055
1056 removed
1057 }
1058
1059 fn status(&self) -> PoolStatus {
1067 self.view_store
1068 .most_recent_view
1069 .read()
1070 .as_ref()
1071 .map(|v| v.status())
1072 .unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
1073 }
1074
    /// Returns a new event stream obtained from the import notification sink.
    fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
        self.import_notification_sink.event_stream()
    }
1082
    /// Returns the hash of `xt`, taken from the first element of the API's `hash_and_length`.
    fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
        self.api().hash_and_length(xt).0
    }
1087
    /// Forwards the broadcast information to the view store's listener.
    ///
    /// `propagations` maps a transaction hash to a list of strings.
    /// NOTE(review): presumably the peers the transaction was propagated to — confirm.
    fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
        self.view_store.listener.transactions_broadcasted(propagations);
    }
1092
1093 fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
1099 let most_recent_view_hash =
1100 self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
1101 let result = most_recent_view_hash
1102 .and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
1103 trace!(
1104 target: LOG_TARGET,
1105 ?tx_hash,
1106 is_ready = result.is_some(),
1107 most_recent_view = ?most_recent_view_hash,
1108 "ready_transaction"
1109 );
1110 result
1111 }
1112
1113 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
1115 let (_, result) = self.ready_at_internal(at);
1116 result.await
1117 }
1118
    /// Returns the ready iterator provided by the view store.
    ///
    /// NOTE(review): which view backs the iterator is decided inside `ViewStore::ready` —
    /// confirm there.
    fn ready(&self) -> ReadyIteratorFor<ChainApi> {
        self.view_store.ready()
    }
1126
    /// Returns the future transactions provided by the view store.
    ///
    /// NOTE(review): which view backs the result is decided inside `ViewStore::futures` —
    /// confirm there.
    fn futures(&self) -> Vec<Self::InPoolTransaction> {
        self.view_store.futures()
    }
1134
    /// Returns the ready iterator for block `at`, waiting at most `timeout` for its view.
    ///
    /// Delegates to `ready_at_with_timeout_internal`.
    async fn ready_at_with_timeout(
        &self,
        at: <Self::Block as BlockT>::Hash,
        timeout: std::time::Duration,
    ) -> ReadyIteratorFor<ChainApi> {
        self.ready_at_with_timeout_internal(at, timeout).await
    }
1146}
1147
/// Synchronous submission of local transactions.
impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
    for ForkAwareTxPool<ChainApi, Block>
where
    Block: BlockT,
    ChainApi: 'static + graph::ChainApi<Block = Block>,
    <Block as BlockT>::Hash: Unpin,
{
    type Block = Block;
    type Hash = ExtrinsicHash<ChainApi>;
    type Error = ChainApi::Error;

    /// Submits a local transaction synchronously.
    ///
    /// The transaction is first inserted into the mempool with `TransactionSource::Local`; if
    /// it is dropped immediately, a replacement of an existing transaction is attempted. It is
    /// then submitted to the view store.
    ///
    /// # Errors
    ///
    /// Only mempool insertion / replacement failures are returned as errors. A view store
    /// failure removes the transaction from the mempool again, but the call still returns
    /// `Ok` with the mempool insertion hash.
    fn submit_local(
        &self,
        at: Block::Hash,
        xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
    ) -> Result<Self::Hash, Self::Error> {
        trace!(
            target: LOG_TARGET,
            active_views_count = self.active_views_count(),
            "fatp::submit_local"
        );
        let xt = Arc::from(xt);
        // Falls back to the default block number when `at` cannot be resolved.
        let at_number = self
            .api
            .block_id_to_number(&BlockId::Hash(at))
            .ok()
            .flatten()
            .unwrap_or_default()
            .into()
            .as_u64();

        // Exactly one transaction is submitted; its result is taken out with `remove(0)`.
        let result = self
            .mempool
            .clone()
            .extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
            .remove(0);

        let insertion = match result {
            Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
                TransactionSource::Local,
                false,
                xt.clone(),
            ),
            _ => result,
        }?;

        self.view_store
            .submit_local(xt)
            .inspect_err(|_| {
                self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
            })
            .map(|outcome| {
                self.mempool
                    .clone()
                    .update_transaction_priority_sync(outcome.hash(), outcome.priority());
                outcome.hash()
            })
            // NOTE(review): the view store error is discarded here although the transaction was
            // just removed from the mempool above — confirm this is intended.
            .or_else(|_| Ok(insertion.hash))
    }
}
1209
1210impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1211where
1212 Block: BlockT,
1213 ChainApi: graph::ChainApi<Block = Block> + 'static,
1214 <Block as BlockT>::Hash: Unpin,
1215{
1216 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
1224 async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
1225 let hash_and_number = match tree_route.last() {
1226 Some(hash_and_number) => hash_and_number,
1227 None => {
1228 warn!(
1229 target: LOG_TARGET,
1230 ?tree_route,
1231 "Skipping ChainEvent - no last block in tree route"
1232 );
1233 return
1234 },
1235 };
1236
1237 if self.has_view(&hash_and_number.hash) {
1238 debug!(
1239 target: LOG_TARGET,
1240 ?hash_and_number,
1241 "view already exists for block"
1242 );
1243 return
1244 }
1245
1246 let best_view = self.view_store.find_best_view(tree_route);
1247 let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;
1248
1249 if let Some(view) = new_view {
1250 {
1251 let view = view.clone();
1252 self.ready_poll.lock().trigger(hash_and_number.hash, move || {
1253 Box::from(view.pool.validated_pool().ready())
1254 });
1255 }
1256
1257 View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
1258 }
1259
1260 self.finality_stall_cleanup(hash_and_number).await;
1261 }
1262
1263 async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
1273 let (oldest_block_number, finality_timedout_blocks) = {
1274 let mut included_transactions = self.included_transactions.lock();
1275
1276 let Some(oldest_block_number) =
1277 included_transactions.first_key_value().map(|(k, _)| k.number)
1278 else {
1279 return
1280 };
1281
1282 if at.number.saturating_sub(oldest_block_number).into() <=
1283 self.finality_timeout_threshold.into()
1284 {
1285 return
1286 }
1287
1288 let mut finality_timedout_blocks =
1289 indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
1290
1291 included_transactions.retain(
1292 |HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
1293 let diff = at.number.saturating_sub(*view_number);
1294 if diff.into() > self.finality_timeout_threshold.into() {
1295 finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
1296 false
1297 } else {
1298 true
1299 }
1300 },
1301 );
1302
1303 (oldest_block_number, finality_timedout_blocks)
1304 };
1305
1306 if !finality_timedout_blocks.is_empty() {
1307 self.ready_poll.lock().remove_cancelled();
1308 self.view_store.listener.remove_stale_controllers();
1309 }
1310
1311 let finality_timedout_blocks_len = finality_timedout_blocks.len();
1312
1313 for (block_hash, tx_hashes) in finality_timedout_blocks {
1314 self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
1315
1316 self.mempool.remove_transactions(&tx_hashes).await;
1317 self.import_notification_sink.clean_notified_items(&tx_hashes);
1318 self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
1319 }
1320
1321 self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
1322
1323 debug!(
1324 target: LOG_TARGET,
1325 ?at,
1326 included_transactions_len = ?self.included_transactions.lock().len(),
1327 finality_timedout_blocks_len,
1328 ?oldest_block_number,
1329 "finality_stall_cleanup"
1330 );
1331 }
1332
1333 fn build_and_plug_view(
1342 &self,
1343 origin_view: Option<Arc<View<ChainApi>>>,
1344 at: &HashAndNumber<Block>,
1345 tree_route: &TreeRoute<Block>,
1346 ) -> View<ChainApi> {
1347 let enter = Instant::now();
1348 let (view, view_dropped_stream, view_aggregated_stream) =
1349 if let Some(origin_view) = origin_view {
1350 let (mut view, view_dropped_stream, view_aggragated_stream) =
1351 View::new_from_other(&origin_view, at);
1352 if !tree_route.retracted().is_empty() {
1353 view.pool.clear_recently_pruned();
1354 }
1355 (view, view_dropped_stream, view_aggragated_stream)
1356 } else {
1357 debug!(
1358 target: LOG_TARGET,
1359 ?at,
1360 "creating non-cloned view"
1361 );
1362 View::new(
1363 self.api.clone(),
1364 at.clone(),
1365 self.options.clone(),
1366 self.metrics.clone(),
1367 self.is_validator.clone(),
1368 )
1369 };
1370 debug!(
1371 target: LOG_TARGET,
1372 ?at,
1373 duration = ?enter.elapsed(),
1374 "build_new_view::clone_view"
1375 );
1376
1377 self.import_notification_sink.add_view(
1380 view.at.hash,
1381 view.pool.validated_pool().import_notification_stream().boxed(),
1382 );
1383
1384 self.view_store
1385 .dropped_stream_controller
1386 .add_view(view.at.hash, view_dropped_stream.boxed());
1387
1388 self.view_store
1389 .listener
1390 .add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1391
1392 view
1393 }
1394
1395 async fn build_and_update_view(
1403 &self,
1404 origin_view: Option<Arc<View<ChainApi>>>,
1405 at: &HashAndNumber<Block>,
1406 tree_route: &TreeRoute<Block>,
1407 ) -> Option<Arc<View<ChainApi>>> {
1408 let start = Instant::now();
1409 debug!(
1410 target: LOG_TARGET,
1411 ?at,
1412 origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
1413 ?tree_route,
1414 "build_new_view"
1415 );
1416
1417 let mut view = self.build_and_plug_view(origin_view, at, tree_route);
1418
1419 view.pool.validated_pool().retrigger_notifications();
1422 debug!(
1423 target: LOG_TARGET,
1424 ?at,
1425 duration = ?start.elapsed(),
1426 "register_listeners"
1427 );
1428
1429 let start = Instant::now();
1432 self.update_view_with_fork(&view, tree_route, at.clone()).await;
1433 debug!(
1434 target: LOG_TARGET,
1435 ?at,
1436 duration = ?start.elapsed(),
1437 "update_view_with_fork"
1438 );
1439
1440 let start = Instant::now();
1442 self.update_view_with_mempool(&mut view).await;
1443 debug!(
1444 target: LOG_TARGET,
1445 ?at,
1446 duration= ?start.elapsed(),
1447 "update_view_with_mempool"
1448 );
1449 let view = Arc::from(view);
1450 self.view_store.insert_new_view(view.clone(), tree_route).await;
1451
1452 debug!(
1453 target: LOG_TARGET,
1454 duration = ?start.elapsed(),
1455 ?at,
1456 "build_new_view"
1457 );
1458 Some(view)
1459 }
1460
1461 async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
1466 if let Some(txs) = self.included_transactions.lock().get(at) {
1467 return txs.clone()
1468 };
1469
1470 debug!(
1471 target: LOG_TARGET,
1472 ?at,
1473 "fetch_block_transactions from api"
1474 );
1475
1476 self.api
1477 .block_body(at.hash)
1478 .await
1479 .unwrap_or_else(|error| {
1480 warn!(
1481 target: LOG_TARGET,
1482 %error,
1483 "fetch_block_transactions: error request"
1484 );
1485 None
1486 })
1487 .unwrap_or_default()
1488 .into_iter()
1489 .map(|t| self.hash_of(&t))
1490 .collect::<Vec<_>>()
1491 }
1492
1493 async fn txs_included_since_finalized(
1498 &self,
1499 at: &HashAndNumber<Block>,
1500 ) -> HashSet<TxHash<Self>> {
1501 let start = Instant::now();
1502 let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
1503
1504 let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
1505 return Default::default()
1506 };
1507
1508 let mut all_txs = HashSet::new();
1509
1510 for block in tree_route.enacted().iter() {
1511 if at.number.saturating_sub(block.number).into() <=
1515 self.finality_timeout_threshold.into()
1516 {
1517 all_txs.extend(self.fetch_block_transactions(block).await);
1518 }
1519 }
1520
1521 debug!(
1522 target: LOG_TARGET,
1523 ?at,
1524 ?recent_finalized_block,
1525 extrinsics_count = all_txs.len(),
1526 duration = ?start.elapsed(),
1527 "fatp::txs_included_since_finalized"
1528 );
1529 all_txs
1530 }
1531
1532 async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
1541 let xts_count = self.mempool.unwatched_and_watched_count().await;
1542 debug!(
1543 target: LOG_TARGET,
1544 view_at = ?view.at,
1545 ?xts_count,
1546 active_views_count = self.active_views_count(),
1547 "update_view_with_mempool"
1548 );
1549 let included_xts = self.txs_included_since_finalized(&view.at).await;
1550
1551 let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
1552 .mempool
1553 .with_transactions(|iter| {
1554 iter.filter(|(hash, _)| !view.is_imported(&hash) && !included_xts.contains(&hash))
1555 .map(|(k, v)| (*k, v.clone()))
1556 .take(MEMPOOL_TO_VIEW_BATCH_SIZE)
1558 .collect::<HashMap<_, _>>()
1559 })
1560 .await
1561 .into_iter()
1562 .map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
1563 .unzip();
1564
1565 let results = view
1566 .submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
1567 .await
1568 .into_iter()
1569 .zip(hashes)
1570 .map(|(result, tx_hash)| async move {
1571 if let Ok(outcome) = result {
1572 Ok(self
1573 .mempool
1574 .update_transaction_priority(outcome.hash(), outcome.priority())
1575 .await)
1576 } else {
1577 Err(tx_hash)
1578 }
1579 })
1580 .collect::<Vec<_>>();
1581
1582 let results = futures::future::join_all(results).await;
1583
1584 let submitted_count = results.len();
1585
1586 debug!(
1587 target: LOG_TARGET,
1588 view_at_hash = ?view.at.hash,
1589 submitted_count,
1590 mempool_len = self.mempool.len(),
1591 "update_view_with_mempool"
1592 );
1593
1594 self.metrics
1595 .report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
1596
1597 if self.view_store.is_empty() {
1600 for result in results {
1601 if let Err(tx_hash) = result {
1602 self.view_store.listener.transactions_invalidated(&[tx_hash]);
1603 self.mempool.remove_transactions(&[tx_hash]).await;
1604 }
1605 }
1606 }
1607 }
1608
1609 async fn collect_provides_tags_from_view_store(
1616 &self,
1617 tree_route: &TreeRoute<Block>,
1618 xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
1619 ) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
1620 let blocks_hashes = tree_route
1621 .retracted()
1622 .iter()
1623 .skip(1)
1625 .chain(
1627 std::iter::once(tree_route.common_block())
1628 .chain(tree_route.enacted().iter().rev().skip(1)),
1629 )
1630 .collect::<Vec<&HashAndNumber<Block>>>();
1631
1632 self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
1633 }
1634
1635 pub async fn collect_extrinsics(
1637 &self,
1638 blocks: &[HashAndNumber<Block>],
1639 ) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
1640 future::join_all(blocks.iter().map(|hn| async move {
1641 (
1642 hn.hash,
1643 self.api
1644 .block_body(hn.hash)
1645 .await
1646 .unwrap_or_else(|e| {
1647 warn!(target: LOG_TARGET, %e, ": block_body error request");
1648 None
1649 })
1650 .unwrap_or_default(),
1651 )
1652 }))
1653 .await
1654 .into_iter()
1655 .collect()
1656 }
1657
1658 async fn update_view_with_fork(
1663 &self,
1664 view: &View<ChainApi>,
1665 tree_route: &TreeRoute<Block>,
1666 hash_and_number: HashAndNumber<Block>,
1667 ) {
1668 debug!(
1669 target: LOG_TARGET,
1670 ?tree_route,
1671 at = ?view.at,
1672 "update_view_with_fork"
1673 );
1674 let api = self.api.clone();
1675
1676 let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;
1678
1679 let known_provides_tags = Arc::new(
1682 self.collect_provides_tags_from_view_store(
1683 tree_route,
1684 extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
1685 )
1686 .await,
1687 );
1688
1689 debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());
1690
1691 let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1694 future::join_all(tree_route.enacted().iter().map(|hn| {
1695 let api = api.clone();
1696 let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
1697 let known_provides_tags = known_provides_tags.clone();
1698 async move {
1699 (
1700 hn,
1701 crate::prune_known_txs_for_block(
1702 hn,
1703 &*api,
1704 &view.pool,
1705 Some(xts),
1706 Some(known_provides_tags),
1707 )
1708 .await,
1709 )
1710 }
1711 }))
1712 .await
1713 .into_iter()
1714 .for_each(|(key, enacted_log)| {
1715 pruned_log.extend(enacted_log.clone());
1716 self.included_transactions.lock().insert(key.clone(), enacted_log);
1717 });
1718
1719 let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
1720 self.metrics
1721 .report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));
1722
1723 {
1725 let mut resubmit_transactions = Vec::new();
1726
1727 for retracted in tree_route.retracted() {
1728 let hash = retracted.hash;
1729
1730 let block_transactions = api
1731 .block_body(hash)
1732 .await
1733 .unwrap_or_else(|error| {
1734 warn!(
1735 target: LOG_TARGET,
1736 %error,
1737 "Failed to fetch block body"
1738 );
1739 None
1740 })
1741 .unwrap_or_default()
1742 .into_iter();
1743
1744 let mut resubmitted_to_report = 0;
1745
1746 let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
1747 |(tx_hash, _)| {
1748 let contains = pruned_log.contains(&tx_hash);
1749
1750 resubmitted_to_report += 1;
1752
1753 if !contains {
1754 trace!(
1755 target: LOG_TARGET,
1756 ?tx_hash,
1757 ?hash,
1758 "Resubmitting from retracted block"
1759 );
1760 }
1761 !contains
1762 },
1763 );
1764 let mut result = vec![];
1765 for (tx_hash, tx) in txs {
1766 result.push(
1767 self.mempool
1769 .get_by_hash(tx_hash)
1770 .await
1771 .map(|tx| (tx.source(), tx.tx()))
1772 .unwrap_or_else(|| {
1773 (TimedTransactionSource::new_external(true), Arc::from(tx))
1776 }),
1777 );
1778 }
1779 resubmit_transactions.extend(result);
1780
1781 self.metrics.report(|metrics| {
1782 metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1783 });
1784 }
1785
1786 let _ = view
1787 .pool
1788 .resubmit_at(
1789 &hash_and_number,
1790 resubmit_transactions,
1791 ValidateTransactionPriority::Maintained,
1792 )
1793 .await;
1794 }
1795 }
1796
1797 async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1803 let start = Instant::now();
1804 let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1805 debug!(
1806 target: LOG_TARGET,
1807 ?finalized_number,
1808 ?tree_route,
1809 active_views_count = self.active_views_count(),
1810 "handle_finalized"
1811 );
1812 let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1813
1814 self.mempool.purge_finalized_transactions(&finalized_xts).await;
1815 self.import_notification_sink.clean_notified_items(&finalized_xts);
1816
1817 self.metrics
1818 .report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1819
1820 if let Ok(Some(finalized_number)) = finalized_number {
1821 self.included_transactions
1822 .lock()
1823 .retain(|cached_block, _| finalized_number < cached_block.number);
1824 self.revalidation_queue
1825 .revalidate_mempool(
1826 self.mempool.clone(),
1827 self.view_store.clone(),
1828 HashAndNumber { hash: finalized_hash, number: finalized_number },
1829 )
1830 .await;
1831 } else {
1832 debug!(
1833 target: LOG_TARGET,
1834 ?finalized_number,
1835 "handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
1836 );
1837 }
1838
1839 self.ready_poll.lock().remove_cancelled();
1840
1841 debug!(
1842 target: LOG_TARGET,
1843 active_views_count = self.active_views_count(),
1844 included_transactions_len = ?self.included_transactions.lock().len(),
1845 duration = ?start.elapsed(),
1846 "handle_finalized after"
1847 );
1848 }
1849
1850 fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1852 self.api.hash_and_length(xt).0
1853 }
1854
1855 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1865 async fn attempt_transaction_replacement(
1866 &self,
1867 source: TransactionSource,
1868 at_number: u64,
1869 watched: bool,
1870 xt: ExtrinsicFor<ChainApi>,
1871 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1872 let best_view = self
1873 .view_store
1874 .most_recent_view
1875 .read()
1876 .as_ref()
1877 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1878 .clone();
1879
1880 let (xt_hash, validated_tx) = best_view
1881 .pool
1882 .verify_one(
1883 best_view.at.hash,
1884 best_view.at.number,
1885 TimedTransactionSource::from_transaction_source(source, false),
1886 xt.clone(),
1887 crate::graph::CheckBannedBeforeVerify::Yes,
1888 ValidateTransactionPriority::Submitted,
1889 )
1890 .await;
1891
1892 let Some(priority) = validated_tx.priority() else {
1893 return Err(TxPoolApiError::ImmediatelyDropped)
1894 };
1895
1896 let insertion_info = self
1897 .mempool
1898 .try_insert_with_replacement(xt, priority, source, at_number, watched)
1899 .await?;
1900 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1901 }
1902
1903 fn attempt_transaction_replacement_sync(
1905 &self,
1906 source: TransactionSource,
1907 watched: bool,
1908 xt: ExtrinsicFor<ChainApi>,
1909 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1910 let HashAndNumber { number: at_number, hash: at_hash } = self
1911 .view_store
1912 .most_recent_view
1913 .read()
1914 .as_ref()
1915 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1916 .at;
1917
1918 let ValidTransaction { priority, .. } = self
1919 .api
1920 .validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
1921 .map_err(|_| TxPoolApiError::ImmediatelyDropped)?
1922 .map_err(|e| match e {
1923 TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
1924 TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
1925 })?;
1926 let xt_hash = self.hash_of(&xt);
1927
1928 let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
1929 xt,
1930 priority,
1931 source,
1932 at_number.into().as_u64(),
1933 watched,
1934 )?;
1935 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1936 }
1937
1938 fn post_attempt_transaction_replacement(
1939 &self,
1940 tx_hash: ExtrinsicHash<ChainApi>,
1941 insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1942 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1943 for worst_hash in &insertion_info.removed {
1944 trace!(
1945 target: LOG_TARGET,
1946 tx_hash = ?worst_hash,
1947 new_tx_hash = ?tx_hash,
1948 "removed: replaced by"
1949 );
1950 self.view_store
1951 .listener
1952 .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1953
1954 self.view_store
1955 .remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1956 listener.limits_enforced(&removed_tx_hash);
1957 });
1958 }
1959
1960 return Ok(insertion_info)
1961 }
1962}
1963
1964#[async_trait]
1965impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
1966where
1967 Block: BlockT,
1968 ChainApi: 'static + graph::ChainApi<Block = Block>,
1969 <Block as BlockT>::Hash: Unpin,
1970{
1971 async fn maintain(&self, event: ChainEvent<Self::Block>) {
1973 let start = Instant::now();
1974 debug!(
1975 target: LOG_TARGET,
1976 ?event,
1977 "processing event"
1978 );
1979
1980 self.view_store.finish_background_revalidations().await;
1981
1982 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
1983
1984 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
1985 match self.api.tree_route(from, to) {
1986 Ok(tree_route) => Ok(tree_route),
1987 Err(e) =>
1988 return Err(format!(
1989 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
1990 )),
1991 }
1992 };
1993 let block_id_to_number =
1994 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
1995
1996 let result =
1997 self.enactment_state
1998 .lock()
1999 .update(&event, &compute_tree_route, &block_id_to_number);
2000
2001 match result {
2002 Err(error) => {
2003 debug!(
2004 target: LOG_TARGET,
2005 %error,
2006 "enactment_state::update error"
2007 );
2008 self.enactment_state.lock().force_update(&event);
2009 },
2010 Ok(EnactmentAction::Skip) => return,
2011 Ok(EnactmentAction::HandleFinalization) => {
2012 },
2020 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
2021 self.handle_new_block(&tree_route).await;
2022 },
2023 };
2024
2025 match event {
2026 ChainEvent::NewBestBlock { .. } => {},
2027 ChainEvent::Finalized { hash, ref tree_route } => {
2028 self.handle_finalized(hash, tree_route).await;
2029
2030 debug!(
2031 target: LOG_TARGET,
2032 ?tree_route,
2033 ?prev_finalized_block,
2034 "on-finalized enacted"
2035 );
2036 },
2037 }
2038
2039 let duration = start.elapsed();
2040 let mempool_len = self.mempool_len().await;
2041 info!(
2042 target: LOG_TARGET,
2043 txs = ?mempool_len,
2044 a = self.active_views_count(),
2045 i = self.inactive_views_count(),
2046 views = ?self.views_stats(),
2047 ?event,
2048 ?duration,
2049 "maintain"
2050 );
2051
2052 self.metrics.report(|metrics| {
2053 let (unwatched, watched) = mempool_len;
2054 let _ = (
2055 self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
2056 self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
2057 watched.try_into().map(|v| metrics.watched_txs.set(v)),
2058 unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
2059 );
2060 metrics.maintain_duration.observe(duration.as_secs_f64());
2061 });
2062 }
2063}
2064
2065impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2066where
2067 Block: BlockT,
2068 Client: sp_api::ProvideRuntimeApi<Block>
2069 + sc_client_api::BlockBackend<Block>
2070 + sc_client_api::blockchain::HeaderBackend<Block>
2071 + sp_runtime::traits::BlockIdTo<Block>
2072 + sc_client_api::ExecutorProvider<Block>
2073 + sc_client_api::UsageProvider<Block>
2074 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
2075 + Send
2076 + Sync
2077 + 'static,
2078 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
2079 <Block as BlockT>::Hash: std::marker::Unpin,
2080{
2081 pub fn new_full(
2083 options: Options,
2084 is_validator: IsValidator,
2085 prometheus: Option<&PrometheusRegistry>,
2086 spawner: impl SpawnEssentialNamed,
2087 client: Arc<Client>,
2088 ) -> Self {
2089 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2090 let pool = Self::new_with_background_worker(
2091 options,
2092 is_validator,
2093 pool_api,
2094 prometheus,
2095 spawner,
2096 client.usage_info().chain.best_hash,
2097 client.usage_info().chain.finalized_hash,
2098 );
2099
2100 pool
2101 }
2102}
2103
#[cfg(test)]
mod reduce_multiview_result_tests {
    use super::*;
    use sp_core::H256;

    #[derive(Debug, PartialEq, Clone)]
    enum Error {
        Custom(u8),
    }

    /// Input rows for `reduce_multiview_result`: a key hash paired with a list of results.
    type Rows = Vec<(H256, Vec<Result<H256, Error>>)>;

    /// Hash whose every byte equals `byte`.
    fn h256(byte: u8) -> H256 {
        H256::repeat_byte(byte)
    }

    /// Successful result carrying `h256(byte)`.
    fn accepted(byte: u8) -> Result<H256, Error> {
        Ok(h256(byte))
    }

    /// Failed result carrying `Error::Custom(code)`.
    fn rejected(code: u8) -> Result<H256, Error> {
        Err(Error::Custom(code))
    }

    #[test]
    fn empty() {
        sp_tracing::try_init_simple();
        let input = HashMap::default();
        let r = reduce_multiview_result::<H256, H256, Error>(input);
        assert!(r.is_empty());
    }

    #[test]
    fn errors_only() {
        sp_tracing::try_init_simple();
        let v: Rows = vec![
            (h256(0x13), vec![rejected(10), rejected(11), rejected(12), rejected(13)]),
            (h256(0x14), vec![rejected(20), rejected(21), rejected(22), rejected(23)]),
            (h256(0x15), vec![rejected(30), rejected(31), rejected(32), rejected(33)]),
        ];
        let input = HashMap::from_iter(v.clone());
        let r = reduce_multiview_result(input);

        // `HashMap` iteration order is unspecified, so the result may equal any of the rows.
        assert!(v.iter().any(|(_, expected)| &r == expected));
    }

    #[test]
    #[should_panic]
    #[cfg(debug_assertions)]
    fn invalid_lengths() {
        sp_tracing::try_init_simple();
        let v: Rows = vec![
            (h256(0x13), vec![rejected(12), rejected(13)]),
            (h256(0x14), vec![rejected(23)]),
        ];
        let input = HashMap::from_iter(v);
        let _ = reduce_multiview_result(input);
    }

    #[test]
    fn only_hashes() {
        sp_tracing::try_init_simple();

        let v: Rows = vec![
            (h256(0x13), vec![accepted(0x13), accepted(0x14)]),
            (h256(0x14), vec![accepted(0x13), accepted(0x14)]),
        ];
        let input = HashMap::from_iter(v);
        let r = reduce_multiview_result(input);

        assert_eq!(r, vec![accepted(0x13), accepted(0x14)]);
    }

    #[test]
    fn one_view() {
        sp_tracing::try_init_simple();
        let v: Rows = vec![(h256(0x13), vec![accepted(0x10), rejected(11)])];
        let input = HashMap::from_iter(v);
        let r = reduce_multiview_result(input);

        assert_eq!(r, vec![accepted(0x10), rejected(11)]);
    }

    #[test]
    fn mix() {
        sp_tracing::try_init_simple();
        let v: Rows = vec![
            (h256(0x13), vec![accepted(0x10), rejected(11), rejected(12), rejected(33)]),
            (h256(0x14), vec![rejected(20), accepted(0x21), rejected(22), rejected(33)]),
            (h256(0x15), vec![rejected(30), rejected(31), accepted(0x32), rejected(33)]),
        ];
        let input = HashMap::from_iter(v);
        let r = reduce_multiview_result(input);

        // Each position takes the success where one exists; the last position has none.
        assert_eq!(r, vec![accepted(0x10), accepted(0x21), accepted(0x32), rejected(33)]);
    }
}