1use super::{
22 dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
23 import_notification_sink::MultiViewImportNotificationSink,
24 metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
25 multi_view_listener::MultiViewListener,
26 tx_mem_pool::{InsertionInfo, TxMemPool},
27 view::{ImportedStatus, View},
28 view_store::ViewStore,
29};
30use crate::{
31 api::FullChainApi,
32 common::{
33 sliding_stat::DurationSlidingStats,
34 tracing_log_xt::{log_xt_debug, log_xt_trace},
35 STAT_SLIDING_WINDOW,
36 },
37 enactment_state::{EnactmentAction, EnactmentState},
38 fork_aware_txpool::{
39 dropped_watcher::{DroppedReason, DroppedTransaction},
40 revalidation_worker,
41 },
42 graph::{
43 self,
44 base_pool::{TimedTransactionSource, Transaction},
45 BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
46 },
47 insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
48 LOG_TARGET_STAT,
49};
50use async_trait::async_trait;
51use futures::{
52 channel::oneshot,
53 future::{self},
54 prelude::*,
55 FutureExt,
56};
57use parking_lot::Mutex;
58use prometheus_endpoint::Registry as PrometheusRegistry;
59use sc_transaction_pool_api::{
60 error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
61 MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
62 TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
63};
64use sp_blockchain::{HashAndNumber, TreeRoute};
65use sp_core::traits::SpawnEssentialNamed;
66use sp_runtime::{
67 generic::BlockId,
68 traits::{Block as BlockT, NumberFor},
69 transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
70 Saturating,
71};
72use std::{
73 collections::{BTreeMap, HashMap, HashSet},
74 pin::Pin,
75 sync::Arc,
76 time::{Duration, Instant},
77};
78use tokio::select;
79use tracing::{debug, instrument, trace, warn, Level};
80
81const FINALITY_TIMEOUT_THRESHOLD: usize = 128;
85
86const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;
91
92pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
94
95struct ReadyPoll<T, Block>
98where
99 Block: BlockT,
100{
101 pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
102}
103
104impl<T, Block> ReadyPoll<T, Block>
105where
106 Block: BlockT,
107{
108 fn new() -> Self {
110 Self { pollers: Default::default() }
111 }
112
113 fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
116 let (s, r) = oneshot::channel();
117 self.pollers.entry(at).or_default().push(s);
118 r
119 }
120
121 fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
126 debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
127 let Some(pollers) = self.pollers.remove(&at) else { return };
128 pollers.into_iter().for_each(|p| {
129 debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
130 let _ = p.send(ready_iterator());
131 });
132 }
133
134 fn remove_cancelled(&mut self) {
136 self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
137 }
138}
139
140pub struct ForkAwareTxPool<ChainApi, Block>
144where
145 Block: BlockT,
146 ChainApi: graph::ChainApi<Block = Block> + 'static,
147{
148 api: Arc<ChainApi>,
150
151 mempool: Arc<TxMemPool<ChainApi, Block>>,
153
154 view_store: Arc<ViewStore<ChainApi, Block>>,
156
157 ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,
159
160 metrics: PrometheusMetrics,
162
163 events_metrics_collector: EventsMetricsCollector<ChainApi>,
165
166 enactment_state: Arc<Mutex<EnactmentState<Block>>>,
168
169 revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,
171
172 import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
175
176 options: Options,
178
179 is_validator: IsValidator,
181
182 finality_timeout_threshold: usize,
188
189 included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,
197
198 submit_stats: DurationSlidingStats,
200
201 submit_and_watch_stats: DurationSlidingStats,
203}
204
205impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
206where
207 Block: BlockT,
208 ChainApi: graph::ChainApi<Block = Block> + 'static,
209 <Block as BlockT>::Hash: Unpin,
210{
211 fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
215 if let Some(block_number) =
216 self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
217 {
218 let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
219 let tree_route =
220 &TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
221 let view = self.build_and_plug_view(None, &at_best, &tree_route);
222 self.view_store.insert_new_view_sync(view.into(), &tree_route);
223 trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
224 };
225 self
226 }
227
228 pub fn new_test(
231 pool_api: Arc<ChainApi>,
232 best_block_hash: Block::Hash,
233 finalized_hash: Block::Hash,
234 finality_timeout_threshold: Option<usize>,
235 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
236 Self::new_test_with_limits(
237 pool_api,
238 best_block_hash,
239 finalized_hash,
240 Options::default().ready,
241 Options::default().future,
242 usize::MAX,
243 finality_timeout_threshold,
244 )
245 }
246
247 pub fn new_test_with_limits(
250 pool_api: Arc<ChainApi>,
251 best_block_hash: Block::Hash,
252 finalized_hash: Block::Hash,
253 ready_limits: crate::PoolLimit,
254 future_limits: crate::PoolLimit,
255 mempool_max_transactions_count: usize,
256 finality_timeout_threshold: Option<usize>,
257 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
258 let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
259 let listener = Arc::new(listener);
260
261 let (import_notification_sink, import_notification_sink_task) =
262 MultiViewImportNotificationSink::new_with_worker();
263
264 let (mempool, mempool_task) = TxMemPool::new(
265 pool_api.clone(),
266 listener.clone(),
267 Default::default(),
268 mempool_max_transactions_count,
269 ready_limits.total_bytes + future_limits.total_bytes,
270 );
271 let mempool = Arc::from(mempool);
272
273 let (dropped_stream_controller, dropped_stream) =
274 MultiViewDroppedWatcherController::<ChainApi>::new();
275
276 let view_store = Arc::new(ViewStore::new(
277 pool_api.clone(),
278 listener,
279 dropped_stream_controller,
280 import_notification_sink.clone(),
281 ));
282
283 let dropped_monitor_task = Self::dropped_monitor_task(
284 dropped_stream,
285 mempool.clone(),
286 view_store.clone(),
287 import_notification_sink.clone(),
288 );
289
290 let combined_tasks = async move {
291 tokio::select! {
292 _ = listener_task => {},
293 _ = import_notification_sink_task => {},
294 _ = dropped_monitor_task => {}
295 }
296 }
297 .boxed();
298
299 let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
300
301 (
302 Self {
303 mempool,
304 api: pool_api,
305 view_store,
306 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
307 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
308 best_block_hash,
309 finalized_hash,
310 ))),
311 revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
312 import_notification_sink,
313 options,
314 is_validator: false.into(),
315 metrics: Default::default(),
316 events_metrics_collector: EventsMetricsCollector::default(),
317 finality_timeout_threshold: finality_timeout_threshold
318 .unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
319 included_transactions: Default::default(),
320 submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
321 submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
322 STAT_SLIDING_WINDOW,
323 )),
324 }
325 .inject_initial_view(best_block_hash),
326 [combined_tasks, mempool_task],
327 )
328 }
329
330 async fn dropped_monitor_task(
338 mut dropped_stream: StreamOfDropped<ChainApi>,
339 mempool: Arc<TxMemPool<ChainApi, Block>>,
340 view_store: Arc<ViewStore<ChainApi, Block>>,
341 import_notification_sink: MultiViewImportNotificationSink<
342 Block::Hash,
343 ExtrinsicHash<ChainApi>,
344 >,
345 ) {
346 let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
347 loop {
348 let Some(dropped) = dropped_stream.next().await else {
349 debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
350 break;
351 };
352 let start = Instant::now();
353 let tx_hash = dropped.tx_hash;
354 trace!(
355 target: LOG_TARGET,
356 ?tx_hash,
357 reason = ?dropped.reason,
358 "fatp::dropped notification, removing"
359 );
360 match dropped.reason {
361 DroppedReason::Usurped(new_tx_hash) => {
362 if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
363 view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
364 } else {
365 trace!(
366 target: LOG_TARGET,
367 tx_hash = ?new_tx_hash,
368 "error: dropped_monitor_task: no entry in mempool for new transaction"
369 );
370 };
371 },
372 DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
373 view_store.remove_transaction_subtree(tx_hash, |_, _| {});
374 },
375 DroppedReason::Viewless => {
376 if let Some(tx) = mempool.get_by_hash(tx_hash).await {
377 trace!(
378 target: LOG_TARGET,
379 ?tx_hash,
380 "dropped_monitor_task: transaction became viewless, marking for unban"
381 );
382 tx.set_needs_unban();
383 }
384 continue;
385 },
386 };
387
388 mempool.remove_transactions(&[tx_hash]).await;
389 import_notification_sink.clean_notified_items(&[tx_hash]);
390 view_store.listener.transaction_dropped(dropped);
391 insert_and_log_throttled!(
392 Level::DEBUG,
393 target:LOG_TARGET_STAT,
394 prefix:"dropped_stats",
395 dropped_stats,
396 start.elapsed().into()
397 );
398 }
399 }
400
401 pub fn new_with_background_worker(
406 options: Options,
407 is_validator: IsValidator,
408 pool_api: Arc<ChainApi>,
409 prometheus: Option<&PrometheusRegistry>,
410 spawner: impl SpawnEssentialNamed,
411 best_block_hash: Block::Hash,
412 finalized_hash: Block::Hash,
413 ) -> Self {
414 let metrics = PrometheusMetrics::new(prometheus);
415 let (events_metrics_collector, event_metrics_task) =
416 EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());
417
418 let (listener, listener_task) =
419 MultiViewListener::new_with_worker(events_metrics_collector.clone());
420 let listener = Arc::new(listener);
421
422 let (revalidation_queue, revalidation_task) =
423 revalidation_worker::RevalidationQueue::new_with_worker();
424
425 let (import_notification_sink, import_notification_sink_task) =
426 MultiViewImportNotificationSink::new_with_worker();
427
428 let (mempool, blocking_mempool_task) = TxMemPool::new(
429 pool_api.clone(),
430 listener.clone(),
431 metrics.clone(),
432 options.total_count(),
433 options.ready.total_bytes + options.future.total_bytes,
434 );
435 let mempool = Arc::from(mempool);
436
437 let (dropped_stream_controller, dropped_stream) =
438 MultiViewDroppedWatcherController::<ChainApi>::new();
439
440 let view_store = Arc::new(ViewStore::new(
441 pool_api.clone(),
442 listener,
443 dropped_stream_controller,
444 import_notification_sink.clone(),
445 ));
446
447 let dropped_monitor_task = Self::dropped_monitor_task(
448 dropped_stream,
449 mempool.clone(),
450 view_store.clone(),
451 import_notification_sink.clone(),
452 );
453
454 let combined_tasks = async move {
455 tokio::select! {
456 _ = listener_task => {}
457 _ = revalidation_task => {},
458 _ = import_notification_sink_task => {},
459 _ = dropped_monitor_task => {}
460 _ = event_metrics_task => {},
461 }
462 }
463 .boxed();
464 spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
465 spawner.spawn_essential_blocking(
466 "txpool-background",
467 Some("transaction-pool"),
468 blocking_mempool_task,
469 );
470
471 Self {
472 mempool,
473 api: pool_api,
474 view_store,
475 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
476 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
477 best_block_hash,
478 finalized_hash,
479 ))),
480 revalidation_queue: Arc::from(revalidation_queue),
481 import_notification_sink,
482 options,
483 metrics,
484 events_metrics_collector,
485 is_validator,
486 finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
487 included_transactions: Default::default(),
488 submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
489 submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
490 STAT_SLIDING_WINDOW,
491 )),
492 }
493 .inject_initial_view(best_block_hash)
494 }
495
496 pub fn api(&self) -> &ChainApi {
498 &self.api
499 }
500
501 pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
503 self.view_store.status()
504 }
505
506 pub fn active_views_count(&self) -> usize {
508 self.view_store.active_views.read().len()
509 }
510
511 pub fn inactive_views_count(&self) -> usize {
513 self.view_store.inactive_views.read().len()
514 }
515
516 fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
521 self.view_store
522 .active_views
523 .read()
524 .iter()
525 .map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
526 .collect()
527 }
528
529 pub fn has_view(&self, hash: &Block::Hash) -> bool {
531 self.view_store.active_views.read().contains_key(hash)
532 }
533
534 pub async fn mempool_len(&self) -> (usize, usize) {
538 self.mempool.unwatched_and_watched_count().await
539 }
540
541 pub fn futures_at(
545 &self,
546 at: Block::Hash,
547 ) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
548 self.view_store.futures_at(at)
549 }
550
551 pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
564 let start = Instant::now();
565 let api = self.api.clone();
566 debug!(
567 target: LOG_TARGET,
568 ?at,
569 "fatp::ready_at_light"
570 );
571
572 let at_number = self.api.resolve_block_number(at).ok();
573 let finalized_number = self
574 .api
575 .resolve_block_number(self.enactment_state.lock().recent_finalized_block())
576 .ok();
577
578 if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
581 let at_hn = HashAndNumber { hash: at, number: at_number };
582 finalized_number.and_then(|finalized_number| {
583 self.view_store
584 .find_view_descendent_up_to_number(&at_hn, finalized_number)
585 .map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
586 })
587 }) {
588 let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
589 let mut all_extrinsics = vec![];
590 for h in enacted_blocks {
591 let extrinsics = api
592 .block_body(h)
593 .await
594 .unwrap_or_else(|error| {
595 warn!(
596 target: LOG_TARGET,
597 %error,
598 "Compute ready light transactions: error request"
599 );
600 None
601 })
602 .unwrap_or_default()
603 .into_iter()
604 .map(|t| api.hash_and_length(&t).0);
605 all_extrinsics.extend(extrinsics);
606 }
607
608 let before_count = tmp_view.pool.validated_pool().status().ready;
609 let tags = tmp_view
610 .pool
611 .validated_pool()
612 .extrinsics_tags(&all_extrinsics)
613 .into_iter()
614 .flatten()
615 .flatten()
616 .collect::<Vec<_>>();
617 let _ = tmp_view.pool.validated_pool().prune_tags(tags);
618
619 let after_count = tmp_view.pool.validated_pool().status().ready;
620 debug!(
621 target: LOG_TARGET,
622 ?at,
623 best_view_hash = ?view.at.hash,
624 before_count,
625 to_be_removed = all_extrinsics.len(),
626 after_count,
627 duration = ?start.elapsed(),
628 "fatp::ready_at_light -> light"
629 );
630 Box::new(tmp_view.pool.validated_pool().ready())
631 } else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
632 debug!(
637 target: LOG_TARGET,
638 ?at,
639 duration = ?start.elapsed(),
640 "fatp::ready_at_light -> most_recent_view"
641 );
642 Box::new(most_recent_view.pool.validated_pool().ready())
643 } else {
644 let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
645 debug!(
646 target: LOG_TARGET,
647 ?at,
648 duration = ?start.elapsed(),
649 "fatp::ready_at_light -> empty"
650 );
651 empty
652 }
653 }
654
655 async fn ready_at_with_timeout_internal(
666 &self,
667 at: Block::Hash,
668 timeout: std::time::Duration,
669 ) -> ReadyIteratorFor<ChainApi> {
670 debug!(
671 target: LOG_TARGET,
672 ?at,
673 ?timeout,
674 "fatp::ready_at_with_timeout"
675 );
676 let timeout = futures_timer::Delay::new(timeout);
677 let (view_already_exists, ready_at) = self.ready_at_internal(at);
678
679 if view_already_exists {
680 return ready_at.await;
681 }
682
683 let maybe_ready = async move {
684 select! {
685 ready = ready_at => Some(ready),
686 _ = timeout => {
687 debug!(
688 target: LOG_TARGET,
689 ?at,
690 "Timeout fired waiting for transaction pool at block. Proceeding with production."
691 );
692 None
693 }
694 }
695 };
696
697 let fall_back_ready = self.ready_at_light(at);
698 let (maybe_ready, fall_back_ready) =
699 futures::future::join(maybe_ready, fall_back_ready).await;
700 maybe_ready.unwrap_or(fall_back_ready)
701 }
702
703 fn ready_at_internal(
704 &self,
705 at: Block::Hash,
706 ) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
707 let mut ready_poll = self.ready_poll.lock();
708
709 if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
710 debug!(
711 target: LOG_TARGET,
712 ?at,
713 ?inactive,
714 "fatp::ready_at_internal"
715 );
716 let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
717 return (true, async move { iterator }.boxed());
718 }
719
720 let pending = ready_poll
721 .add(at)
722 .map(|received| {
723 received.unwrap_or_else(|error| {
724 warn!(
725 target: LOG_TARGET,
726 %error,
727 "Error receiving ready-set iterator"
728 );
729 Box::new(std::iter::empty())
730 })
731 })
732 .boxed();
733 debug!(
734 target: LOG_TARGET,
735 ?at,
736 pending_keys = ?ready_poll.pollers.keys(),
737 "fatp::ready_at_internal"
738 );
739 (false, pending)
740 }
741
742 async fn submit_and_watch_inner(
744 &self,
745 at: Block::Hash,
746 source: TransactionSource,
747 xt: TransactionFor<Self>,
748 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
749 let xt = Arc::from(xt);
750
751 let at_number = self
752 .api
753 .block_id_to_number(&BlockId::Hash(at))
754 .ok()
755 .flatten()
756 .unwrap_or_default()
757 .into()
758 .as_u64();
759
760 let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
761 Ok(result) => result,
762 Err(TxPoolApiError::ImmediatelyDropped) => {
763 self.attempt_transaction_replacement(source, at_number, true, xt.clone())
764 .await?
765 },
766 Err(e) => return Err(e.into()),
767 };
768
769 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
770 self.events_metrics_collector.report_submitted(&insertion);
771
772 match self.view_store.submit_and_watch(at, insertion.source, xt).await {
773 Err(e) => {
774 self.mempool.remove_transactions(&[insertion.hash]).await;
775 Err(e.into())
776 },
777 Ok(mut outcome) => {
778 self.mempool
779 .update_transaction_priority(outcome.hash(), outcome.priority())
780 .await;
781 Ok(outcome.expect_watcher())
782 },
783 }
784 }
785
786 async fn submit_at_inner(
788 &self,
789 at: Block::Hash,
790 source: TransactionSource,
791 xts: Vec<TransactionFor<Self>>,
792 ) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
793 let at_number = self
794 .api
795 .block_id_to_number(&BlockId::Hash(at))
796 .ok()
797 .flatten()
798 .unwrap_or_default()
799 .into()
800 .as_u64();
801 let view_store = self.view_store.clone();
802 let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
803 let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;
804
805 if view_store.is_empty() {
806 return Ok(mempool_results
807 .into_iter()
808 .map(|r| r.map(|r| r.hash).map_err(Into::into))
809 .collect::<Vec<_>>());
810 }
811
812 let retries = mempool_results
814 .into_iter()
815 .zip(xts.clone())
816 .map(|(result, xt)| async move {
817 match result {
818 Err(TxPoolApiError::ImmediatelyDropped) => {
819 self.attempt_transaction_replacement(source, at_number, false, xt).await
820 },
821 _ => result,
822 }
823 })
824 .collect::<Vec<_>>();
825
826 let mempool_results = futures::future::join_all(retries).await;
827
828 let to_be_submitted = mempool_results
830 .iter()
831 .zip(xts)
832 .filter_map(|(result, xt)| {
833 result.as_ref().ok().map(|insertion| {
834 self.events_metrics_collector.report_submitted(&insertion);
835 (insertion.source.clone(), xt)
836 })
837 })
838 .collect::<Vec<_>>();
839
840 self.metrics
841 .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
842
843 let mempool = self.mempool.clone();
846 let results_map = view_store.submit(to_be_submitted.into_iter()).await;
847 let mut submission_results = reduce_multiview_result(results_map).into_iter();
848
849 const RESULTS_ASSUMPTION : &str =
866 "The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
867 let merged_results = mempool_results.into_iter().map(|result| {
868 result.map_err(Into::into).and_then(|insertion| {
869 Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
870 })
871 });
872
873 let mut final_results = vec![];
874 for r in merged_results {
875 match r {
876 Ok((hash, submission_result)) => match submission_result {
877 Ok(r) => {
878 mempool.update_transaction_priority(r.hash(), r.priority()).await;
879 final_results.push(Ok(r.hash()));
880 },
881 Err(e) => {
882 mempool.remove_transactions(&[hash]).await;
883 final_results.push(Err(e));
884 },
885 },
886 Err(e) => final_results.push(Err(e)),
887 }
888 }
889
890 Ok(final_results)
891 }
892
893 pub fn import_notification_sink_len(&self) -> usize {
897 self.import_notification_sink.notified_items_len()
898 }
899}
900
901fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
928 let mut values = input.values();
929 let Some(first) = values.next() else {
930 return Default::default();
931 };
932 let length = first.len();
933 debug_assert!(values.all(|x| length == x.len()));
934
935 input
936 .into_values()
937 .reduce(|mut agg_results, results| {
938 agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
939 if agg_r.is_err() {
940 *agg_r = r;
941 }
942 });
943 agg_results
944 })
945 .unwrap_or_default()
946}
947
948#[async_trait]
949impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
950where
951 Block: BlockT,
952 ChainApi: 'static + graph::ChainApi<Block = Block>,
953 <Block as BlockT>::Hash: Unpin,
954{
955 type Block = ChainApi::Block;
956 type Hash = ExtrinsicHash<ChainApi>;
957 type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
958 type Error = ChainApi::Error;
959
960 async fn submit_at(
967 &self,
968 at: <Self::Block as BlockT>::Hash,
969 source: TransactionSource,
970 xts: Vec<TransactionFor<Self>>,
971 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
972 let start = Instant::now();
973 trace!(
974 target: LOG_TARGET,
975 count = xts.len(),
976 active_views_count = self.active_views_count(),
977 "fatp::submit_at"
978 );
979 log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
980 let result = self.submit_at_inner(at, source, xts).await;
981 insert_and_log_throttled!(
982 Level::DEBUG,
983 target:LOG_TARGET_STAT,
984 prefix:"submit_stats",
985 self.submit_stats,
986 start.elapsed().into()
987 );
988 result
989 }
990
991 async fn submit_one(
995 &self,
996 _at: <Self::Block as BlockT>::Hash,
997 source: TransactionSource,
998 xt: TransactionFor<Self>,
999 ) -> Result<TxHash<Self>, Self::Error> {
1000 trace!(
1001 target: LOG_TARGET,
1002 tx_hash = ?self.tx_hash(&xt),
1003 active_views_count = self.active_views_count(),
1004 "fatp::submit_one"
1005 );
1006 match self.submit_at(_at, source, vec![xt]).await {
1007 Ok(mut v) => {
1008 v.pop().expect("There is exactly one element in result of submit_at. qed.")
1009 },
1010 Err(e) => Err(e),
1011 }
1012 }
1013
1014 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
1019 async fn submit_and_watch(
1020 &self,
1021 at: <Self::Block as BlockT>::Hash,
1022 source: TransactionSource,
1023 xt: TransactionFor<Self>,
1024 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
1025 let start = Instant::now();
1026 trace!(
1027 target: LOG_TARGET,
1028 tx_hash = ?self.tx_hash(&xt),
1029 views = self.active_views_count(),
1030 "fatp::submit_and_watch"
1031 );
1032 let result = self.submit_and_watch_inner(at, source, xt).await;
1033 insert_and_log_throttled!(
1034 Level::DEBUG,
1035 target:LOG_TARGET_STAT,
1036 prefix:"submit_and_watch_stats",
1037 self.submit_and_watch_stats,
1038 start.elapsed().into()
1039 );
1040 result
1041 }
1042
1043 async fn report_invalid(
1052 &self,
1053 at: Option<<Self::Block as BlockT>::Hash>,
1054 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
1055 ) -> Vec<Arc<Self::InPoolTransaction>> {
1056 debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
1057 log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
1058 self.metrics
1059 .report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
1060
1061 let removed = self.view_store.report_invalid(at, invalid_tx_errors);
1062
1063 let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
1064 self.mempool.remove_transactions(&removed_hashes).await;
1065 self.import_notification_sink.clean_notified_items(&removed_hashes);
1066
1067 self.metrics
1068 .report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
1069
1070 removed
1071 }
1072
1073 fn status(&self) -> PoolStatus {
1081 self.view_store
1082 .most_recent_view
1083 .read()
1084 .as_ref()
1085 .map(|v| v.status())
1086 .unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
1087 }
1088
1089 fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
1094 self.import_notification_sink.event_stream()
1095 }
1096
1097 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1099 self.api().hash_and_length(xt).0
1100 }
1101
1102 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
1104 self.view_store.listener.transactions_broadcasted(propagations);
1105 }
1106
1107 fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
1113 let most_recent_view_hash =
1114 self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
1115 let result = most_recent_view_hash
1116 .and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
1117 trace!(
1118 target: LOG_TARGET,
1119 ?tx_hash,
1120 is_ready = result.is_some(),
1121 most_recent_view = ?most_recent_view_hash,
1122 "ready_transaction"
1123 );
1124 result
1125 }
1126
1127 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
1129 let (_, result) = self.ready_at_internal(at);
1130 result.await
1131 }
1132
1133 fn ready(&self) -> ReadyIteratorFor<ChainApi> {
1138 self.view_store.ready()
1139 }
1140
1141 fn futures(&self) -> Vec<Self::InPoolTransaction> {
1146 self.view_store.futures()
1147 }
1148
1149 async fn ready_at_with_timeout(
1154 &self,
1155 at: <Self::Block as BlockT>::Hash,
1156 timeout: std::time::Duration,
1157 ) -> ReadyIteratorFor<ChainApi> {
1158 self.ready_at_with_timeout_internal(at, timeout).await
1159 }
1160}
1161
1162impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
1163 for ForkAwareTxPool<ChainApi, Block>
1164where
1165 Block: BlockT,
1166 ChainApi: 'static + graph::ChainApi<Block = Block>,
1167 <Block as BlockT>::Hash: Unpin,
1168{
1169 type Block = Block;
1170 type Hash = ExtrinsicHash<ChainApi>;
1171 type Error = ChainApi::Error;
1172
1173 fn submit_local(
1174 &self,
1175 at: Block::Hash,
1176 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
1177 ) -> Result<Self::Hash, Self::Error> {
1178 trace!(
1179 target: LOG_TARGET,
1180 active_views_count = self.active_views_count(),
1181 "fatp::submit_local"
1182 );
1183 let xt = Arc::from(xt);
1184 let at_number = self
1185 .api
1186 .block_id_to_number(&BlockId::Hash(at))
1187 .ok()
1188 .flatten()
1189 .unwrap_or_default()
1190 .into()
1191 .as_u64();
1192
1193 let result = self
1195 .mempool
1196 .clone()
1197 .extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
1198 .remove(0);
1199
1200 let insertion = match result {
1201 Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
1202 TransactionSource::Local,
1203 false,
1204 xt.clone(),
1205 ),
1206 _ => result,
1207 }?;
1208
1209 self.view_store
1210 .submit_local(xt)
1211 .inspect_err(|_| {
1212 self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
1213 })
1214 .map(|outcome| {
1215 self.mempool
1216 .clone()
1217 .update_transaction_priority_sync(outcome.hash(), outcome.priority());
1218 outcome.hash()
1219 })
1220 .or_else(|_| Ok(insertion.hash))
1221 }
1222}
1223
1224impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1225where
1226 Block: BlockT,
1227 ChainApi: graph::ChainApi<Block = Block> + 'static,
1228 <Block as BlockT>::Hash: Unpin,
1229{
1230 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
1238 async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
1239 let hash_and_number = match tree_route.last() {
1240 Some(hash_and_number) => hash_and_number,
1241 None => {
1242 warn!(
1243 target: LOG_TARGET,
1244 ?tree_route,
1245 "Skipping ChainEvent - no last block in tree route"
1246 );
1247 return;
1248 },
1249 };
1250
1251 if self.has_view(&hash_and_number.hash) {
1252 debug!(
1253 target: LOG_TARGET,
1254 ?hash_and_number,
1255 "view already exists for block"
1256 );
1257 return;
1258 }
1259
1260 let best_view = self.view_store.find_best_view(tree_route);
1261 let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;
1262
1263 if let Some(view) = new_view {
1264 {
1265 let view = view.clone();
1266 self.ready_poll.lock().trigger(hash_and_number.hash, move || {
1267 Box::from(view.pool.validated_pool().ready())
1268 });
1269 }
1270
1271 View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
1272 }
1273
1274 self.finality_stall_cleanup(hash_and_number).await;
1275 }
1276
1277 async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
1287 let (oldest_block_number, finality_timedout_blocks) = {
1288 let mut included_transactions = self.included_transactions.lock();
1289
1290 let Some(oldest_block_number) =
1291 included_transactions.first_key_value().map(|(k, _)| k.number)
1292 else {
1293 return;
1294 };
1295
1296 if at.number.saturating_sub(oldest_block_number).into() <=
1297 self.finality_timeout_threshold.into()
1298 {
1299 return;
1300 }
1301
1302 let mut finality_timedout_blocks =
1303 indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
1304
1305 included_transactions.retain(
1306 |HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
1307 let diff = at.number.saturating_sub(*view_number);
1308 if diff.into() > self.finality_timeout_threshold.into() {
1309 finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
1310 false
1311 } else {
1312 true
1313 }
1314 },
1315 );
1316
1317 (oldest_block_number, finality_timedout_blocks)
1318 };
1319
1320 if !finality_timedout_blocks.is_empty() {
1321 self.ready_poll.lock().remove_cancelled();
1322 self.view_store.listener.remove_stale_controllers();
1323 }
1324
1325 let finality_timedout_blocks_len = finality_timedout_blocks.len();
1326
1327 for (block_hash, tx_hashes) in finality_timedout_blocks {
1328 self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
1329
1330 self.mempool.remove_transactions(&tx_hashes).await;
1331 self.import_notification_sink.clean_notified_items(&tx_hashes);
1332 self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
1333 }
1334
1335 self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
1336
1337 debug!(
1338 target: LOG_TARGET,
1339 ?at,
1340 included_transactions_len = ?self.included_transactions.lock().len(),
1341 finality_timedout_blocks_len,
1342 ?oldest_block_number,
1343 "finality_stall_cleanup"
1344 );
1345 }
1346
1347 fn build_and_plug_view(
1356 &self,
1357 origin_view: Option<Arc<View<ChainApi>>>,
1358 at: &HashAndNumber<Block>,
1359 tree_route: &TreeRoute<Block>,
1360 ) -> View<ChainApi> {
1361 let enter = Instant::now();
1362 let (view, view_dropped_stream, view_aggregated_stream) =
1363 if let Some(origin_view) = origin_view {
1364 let (mut view, view_dropped_stream, view_aggragated_stream) =
1365 View::new_from_other(&origin_view, at);
1366 if !tree_route.retracted().is_empty() {
1367 view.pool.clear_recently_pruned();
1368 }
1369 (view, view_dropped_stream, view_aggragated_stream)
1370 } else {
1371 debug!(
1372 target: LOG_TARGET,
1373 ?at,
1374 "creating non-cloned view"
1375 );
1376 View::new(
1377 self.api.clone(),
1378 at.clone(),
1379 self.options.clone(),
1380 self.metrics.clone(),
1381 self.is_validator.clone(),
1382 )
1383 };
1384 debug!(
1385 target: LOG_TARGET,
1386 ?at,
1387 duration = ?enter.elapsed(),
1388 "build_new_view::clone_view"
1389 );
1390
1391 self.import_notification_sink.add_view(
1394 view.at.hash,
1395 view.pool.validated_pool().import_notification_stream().boxed(),
1396 );
1397
1398 self.view_store
1399 .dropped_stream_controller
1400 .add_view(view.at.hash, view_dropped_stream.boxed());
1401
1402 self.view_store
1403 .listener
1404 .add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1405
1406 view
1407 }
1408
1409 async fn build_and_update_view(
1417 &self,
1418 origin_view: Option<Arc<View<ChainApi>>>,
1419 at: &HashAndNumber<Block>,
1420 tree_route: &TreeRoute<Block>,
1421 ) -> Option<Arc<View<ChainApi>>> {
1422 let start = Instant::now();
1423 debug!(
1424 target: LOG_TARGET,
1425 ?at,
1426 origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
1427 ?tree_route,
1428 "build_new_view"
1429 );
1430
1431 let mut view = self.build_and_plug_view(origin_view, at, tree_route);
1432
1433 view.pool.validated_pool().retrigger_notifications();
1436 debug!(
1437 target: LOG_TARGET,
1438 ?at,
1439 duration = ?start.elapsed(),
1440 "register_listeners"
1441 );
1442
1443 let start = Instant::now();
1446 self.update_view_with_fork(&view, tree_route, at.clone()).await;
1447 debug!(
1448 target: LOG_TARGET,
1449 ?at,
1450 duration = ?start.elapsed(),
1451 "update_view_with_fork"
1452 );
1453
1454 let start = Instant::now();
1456 self.update_view_with_mempool(&mut view).await;
1457 debug!(
1458 target: LOG_TARGET,
1459 ?at,
1460 duration= ?start.elapsed(),
1461 "update_view_with_mempool"
1462 );
1463 let view = Arc::from(view);
1464 self.view_store.insert_new_view(view.clone(), tree_route).await;
1465
1466 debug!(
1467 target: LOG_TARGET,
1468 duration = ?start.elapsed(),
1469 ?at,
1470 "build_new_view"
1471 );
1472 Some(view)
1473 }
1474
1475 async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
1480 if let Some(txs) = self.included_transactions.lock().get(at) {
1481 return txs.clone();
1482 };
1483
1484 debug!(
1485 target: LOG_TARGET,
1486 ?at,
1487 "fetch_block_transactions from api"
1488 );
1489
1490 self.api
1491 .block_body(at.hash)
1492 .await
1493 .unwrap_or_else(|error| {
1494 warn!(
1495 target: LOG_TARGET,
1496 %error,
1497 "fetch_block_transactions: error request"
1498 );
1499 None
1500 })
1501 .unwrap_or_default()
1502 .into_iter()
1503 .map(|t| self.hash_of(&t))
1504 .collect::<Vec<_>>()
1505 }
1506
1507 async fn txs_included_since_finalized(
1512 &self,
1513 at: &HashAndNumber<Block>,
1514 ) -> HashSet<TxHash<Self>> {
1515 let start = Instant::now();
1516 let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
1517
1518 let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
1519 return Default::default();
1520 };
1521
1522 let mut all_txs = HashSet::new();
1523
1524 for block in tree_route.enacted().iter() {
1525 if at.number.saturating_sub(block.number).into() <=
1529 self.finality_timeout_threshold.into()
1530 {
1531 all_txs.extend(self.fetch_block_transactions(block).await);
1532 }
1533 }
1534
1535 debug!(
1536 target: LOG_TARGET,
1537 ?at,
1538 ?recent_finalized_block,
1539 extrinsics_count = all_txs.len(),
1540 duration = ?start.elapsed(),
1541 "fatp::txs_included_since_finalized"
1542 );
1543 all_txs
1544 }
1545
1546 async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
1555 let xts_count = self.mempool.unwatched_and_watched_count().await;
1556 debug!(
1557 target: LOG_TARGET,
1558 view_at = ?view.at,
1559 ?xts_count,
1560 active_views_count = self.active_views_count(),
1561 "update_view_with_mempool"
1562 );
1563 let included_xts = self.txs_included_since_finalized(&view.at).await;
1564
1565 let view_hash = view.at.hash;
1566 let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
1567 .mempool
1568 .with_transactions(|iter| {
1569 iter.filter(|(hash, _)| {
1570 match view.imported_status(hash) {
1571 ImportedStatus::Banned => {
1572 trace!(
1573 target: LOG_TARGET,
1574 ?hash,
1575 ?view_hash,
1576 "update_view_with_mempool: skipped (temporarily banned)"
1577 );
1578 return false;
1579 },
1580 ImportedStatus::Imported => return false,
1581 ImportedStatus::NotImported => {},
1582 }
1583 !included_xts.contains(hash)
1584 })
1585 .map(|(k, v)| (*k, v.clone()))
1586 .take(MEMPOOL_TO_VIEW_BATCH_SIZE)
1588 .collect::<HashMap<_, _>>()
1589 })
1590 .await
1591 .into_iter()
1592 .map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
1593 .unzip();
1594
1595 let results = view
1596 .submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
1597 .await
1598 .into_iter()
1599 .zip(hashes)
1600 .map(|(result, tx_hash)| async move {
1601 match result {
1602 Ok(outcome) => Ok(self
1603 .mempool
1604 .update_transaction_priority(outcome.hash(), outcome.priority())
1605 .await),
1606 Err(error) => {
1607 trace!(
1608 target: LOG_TARGET,
1609 ?tx_hash,
1610 ?error,
1611 "update_view_with_mempool: tx rejected from view"
1612 );
1613 Err(tx_hash)
1614 },
1615 }
1616 })
1617 .collect::<Vec<_>>();
1618
1619 let results = futures::future::join_all(results).await;
1620
1621 let submitted_count = results.len();
1622
1623 debug!(
1624 target: LOG_TARGET,
1625 view_at_hash = ?view.at.hash,
1626 submitted_count,
1627 mempool_len = self.mempool.len(),
1628 "update_view_with_mempool"
1629 );
1630
1631 self.metrics
1632 .report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
1633
1634 if self.view_store.is_empty() {
1637 for result in results {
1638 if let Err(tx_hash) = result {
1639 self.view_store.listener.transactions_invalidated(&[tx_hash]);
1640 self.mempool.remove_transactions(&[tx_hash]).await;
1641 }
1642 }
1643 }
1644 }
1645
1646 async fn collect_provides_tags_from_view_store(
1653 &self,
1654 tree_route: &TreeRoute<Block>,
1655 xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
1656 ) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
1657 let blocks_hashes = tree_route
1658 .retracted()
1659 .iter()
1660 .skip(1)
1662 .chain(
1664 std::iter::once(tree_route.common_block())
1665 .chain(tree_route.enacted().iter().rev().skip(1)),
1666 )
1667 .collect::<Vec<&HashAndNumber<Block>>>();
1668
1669 self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
1670 }
1671
1672 pub async fn collect_extrinsics(
1674 &self,
1675 blocks: &[HashAndNumber<Block>],
1676 ) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
1677 future::join_all(blocks.iter().map(|hn| async move {
1678 (
1679 hn.hash,
1680 self.api
1681 .block_body(hn.hash)
1682 .await
1683 .unwrap_or_else(|e| {
1684 warn!(target: LOG_TARGET, %e, ": block_body error request");
1685 None
1686 })
1687 .unwrap_or_default(),
1688 )
1689 }))
1690 .await
1691 .into_iter()
1692 .collect()
1693 }
1694
1695 async fn update_view_with_fork(
1700 &self,
1701 view: &View<ChainApi>,
1702 tree_route: &TreeRoute<Block>,
1703 hash_and_number: HashAndNumber<Block>,
1704 ) {
1705 debug!(
1706 target: LOG_TARGET,
1707 ?tree_route,
1708 at = ?view.at,
1709 "update_view_with_fork"
1710 );
1711 let api = self.api.clone();
1712
1713 let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;
1715
1716 let known_provides_tags = Arc::new(
1719 self.collect_provides_tags_from_view_store(
1720 tree_route,
1721 extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
1722 )
1723 .await,
1724 );
1725
1726 debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());
1727
1728 let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1731 future::join_all(tree_route.enacted().iter().map(|hn| {
1732 let api = api.clone();
1733 let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
1734 let known_provides_tags = known_provides_tags.clone();
1735 async move {
1736 (
1737 hn,
1738 crate::prune_known_txs_for_block(
1739 hn,
1740 &*api,
1741 &view.pool,
1742 Some(xts),
1743 Some(known_provides_tags),
1744 )
1745 .await,
1746 )
1747 }
1748 }))
1749 .await
1750 .into_iter()
1751 .for_each(|(key, enacted_log)| {
1752 pruned_log.extend(enacted_log.clone());
1753 self.included_transactions.lock().insert(key.clone(), enacted_log);
1754 });
1755
1756 let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
1757 self.metrics
1758 .report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));
1759
1760 {
1762 let mut resubmit_transactions = Vec::new();
1763
1764 for retracted in tree_route.retracted().iter().rev() {
1765 let hash = retracted.hash;
1766
1767 let block_transactions = api
1768 .block_body(hash)
1769 .await
1770 .unwrap_or_else(|error| {
1771 warn!(
1772 target: LOG_TARGET,
1773 %error,
1774 "Failed to fetch block body"
1775 );
1776 None
1777 })
1778 .unwrap_or_default()
1779 .into_iter();
1780
1781 let mut resubmitted_to_report = 0;
1782
1783 let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
1784 |(tx_hash, _)| {
1785 let contains = pruned_log.contains(&tx_hash);
1786
1787 resubmitted_to_report += 1;
1789
1790 if !contains {
1791 trace!(
1792 target: LOG_TARGET,
1793 ?tx_hash,
1794 ?hash,
1795 "Resubmitting from retracted block"
1796 );
1797 }
1798 !contains
1799 },
1800 );
1801 let mut result = vec![];
1802 for (tx_hash, tx) in txs {
1803 result.push(
1804 self.mempool
1806 .get_by_hash(tx_hash)
1807 .await
1808 .map(|tx| (tx.source(), tx.tx()))
1809 .unwrap_or_else(|| {
1810 (TimedTransactionSource::new_external(true), Arc::from(tx))
1813 }),
1814 );
1815 }
1816 resubmit_transactions.extend(result);
1817
1818 self.metrics.report(|metrics| {
1819 metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1820 });
1821 }
1822
1823 let _ = view
1824 .pool
1825 .resubmit_at(
1826 &hash_and_number,
1827 resubmit_transactions,
1828 ValidateTransactionPriority::Maintained,
1829 )
1830 .await;
1831 }
1832 }
1833
1834 async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1840 let start = Instant::now();
1841 let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1842 debug!(
1843 target: LOG_TARGET,
1844 ?finalized_number,
1845 ?tree_route,
1846 active_views_count = self.active_views_count(),
1847 "handle_finalized"
1848 );
1849 let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1850
1851 self.mempool.purge_finalized_transactions(&finalized_xts).await;
1852 self.import_notification_sink.clean_notified_items(&finalized_xts);
1853
1854 self.metrics
1855 .report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1856
1857 if let Ok(Some(finalized_number)) = finalized_number {
1858 self.included_transactions
1859 .lock()
1860 .retain(|cached_block, _| finalized_number < cached_block.number);
1861 self.revalidation_queue
1862 .revalidate_mempool(
1863 self.mempool.clone(),
1864 self.view_store.clone(),
1865 HashAndNumber { hash: finalized_hash, number: finalized_number },
1866 )
1867 .await;
1868 } else {
1869 debug!(
1870 target: LOG_TARGET,
1871 ?finalized_number,
1872 "handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
1873 );
1874 }
1875
1876 self.ready_poll.lock().remove_cancelled();
1877
1878 debug!(
1879 target: LOG_TARGET,
1880 active_views_count = self.active_views_count(),
1881 included_transactions_len = ?self.included_transactions.lock().len(),
1882 duration = ?start.elapsed(),
1883 "handle_finalized after"
1884 );
1885 }
1886
1887 fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1889 self.api.hash_and_length(xt).0
1890 }
1891
1892 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1902 async fn attempt_transaction_replacement(
1903 &self,
1904 source: TransactionSource,
1905 at_number: u64,
1906 watched: bool,
1907 xt: ExtrinsicFor<ChainApi>,
1908 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1909 let best_view = self
1910 .view_store
1911 .most_recent_view
1912 .read()
1913 .as_ref()
1914 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1915 .clone();
1916
1917 let (xt_hash, validated_tx) = best_view
1918 .pool
1919 .verify_one(
1920 best_view.at.hash,
1921 best_view.at.number,
1922 TimedTransactionSource::from_transaction_source(source, false),
1923 xt.clone(),
1924 crate::graph::CheckBannedBeforeVerify::Yes,
1925 ValidateTransactionPriority::Submitted,
1926 )
1927 .await;
1928
1929 let Some(priority) = validated_tx.priority() else {
1930 return Err(TxPoolApiError::ImmediatelyDropped);
1931 };
1932
1933 let insertion_info = self
1934 .mempool
1935 .try_insert_with_replacement(xt, priority, source, at_number, watched)
1936 .await?;
1937 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1938 }
1939
1940 fn attempt_transaction_replacement_sync(
1942 &self,
1943 source: TransactionSource,
1944 watched: bool,
1945 xt: ExtrinsicFor<ChainApi>,
1946 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1947 let HashAndNumber { number: at_number, hash: at_hash } = self
1948 .view_store
1949 .most_recent_view
1950 .read()
1951 .as_ref()
1952 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1953 .at;
1954
1955 let ValidTransaction { priority, .. } = self
1956 .api
1957 .validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
1958 .map_err(|_| TxPoolApiError::ImmediatelyDropped)?
1959 .map_err(|e| match e {
1960 TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
1961 TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
1962 })?;
1963 let xt_hash = self.hash_of(&xt);
1964
1965 let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
1966 xt,
1967 priority,
1968 source,
1969 at_number.into().as_u64(),
1970 watched,
1971 )?;
1972 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1973 }
1974
1975 fn post_attempt_transaction_replacement(
1976 &self,
1977 tx_hash: ExtrinsicHash<ChainApi>,
1978 insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1979 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1980 for worst_hash in &insertion_info.removed {
1981 trace!(
1982 target: LOG_TARGET,
1983 tx_hash = ?worst_hash,
1984 new_tx_hash = ?tx_hash,
1985 "removed: replaced by"
1986 );
1987 self.view_store
1988 .listener
1989 .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1990
1991 self.view_store
1992 .remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1993 listener.limits_enforced(&removed_tx_hash);
1994 });
1995 }
1996
1997 return Ok(insertion_info);
1998 }
1999}
2000
2001#[async_trait]
2002impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
2003where
2004 Block: BlockT,
2005 ChainApi: 'static + graph::ChainApi<Block = Block>,
2006 <Block as BlockT>::Hash: Unpin,
2007{
2008 async fn maintain(&self, event: ChainEvent<Self::Block>) {
2010 let start = Instant::now();
2011 debug!(
2012 target: LOG_TARGET,
2013 ?event,
2014 "processing event"
2015 );
2016
2017 self.view_store.finish_background_revalidations().await;
2018
2019 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
2020
2021 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
2022 match self.api.tree_route(from, to) {
2023 Ok(tree_route) => Ok(tree_route),
2024 Err(e) => {
2025 return Err(format!(
2026 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
2027 ))
2028 },
2029 }
2030 };
2031 let block_id_to_number =
2032 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
2033
2034 let result =
2035 self.enactment_state
2036 .lock()
2037 .update(&event, &compute_tree_route, &block_id_to_number);
2038
2039 match result {
2040 Err(error) => {
2041 debug!(
2042 target: LOG_TARGET,
2043 %error,
2044 "enactment_state::update error"
2045 );
2046 self.enactment_state.lock().force_update(&event);
2047 },
2048 Ok(EnactmentAction::Skip) => return,
2049 Ok(EnactmentAction::HandleFinalization) => {
2050 },
2058 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
2059 self.handle_new_block(&tree_route).await;
2060 },
2061 };
2062
2063 match event {
2064 ChainEvent::NewBestBlock { .. } => {},
2065 ChainEvent::Finalized { hash, ref tree_route } => {
2066 self.handle_finalized(hash, tree_route).await;
2067
2068 debug!(
2069 target: LOG_TARGET,
2070 ?tree_route,
2071 ?prev_finalized_block,
2072 "on-finalized enacted"
2073 );
2074 },
2075 }
2076
2077 let duration = start.elapsed();
2078 let mempool_len = self.mempool_len().await;
2079 debug!(
2080 target: LOG_TARGET,
2081 txs = ?mempool_len,
2082 a = self.active_views_count(),
2083 i = self.inactive_views_count(),
2084 views = ?self.views_stats(),
2085 ?event,
2086 ?duration,
2087 "maintain"
2088 );
2089
2090 self.metrics.report(|metrics| {
2091 let (unwatched, watched) = mempool_len;
2092 let _ = (
2093 self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
2094 self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
2095 watched.try_into().map(|v| metrics.watched_txs.set(v)),
2096 unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
2097 );
2098 metrics.maintain_duration.observe(duration.as_secs_f64());
2099 });
2100 }
2101}
2102
2103impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2104where
2105 Block: BlockT,
2106 Client: sp_api::ProvideRuntimeApi<Block>
2107 + sc_client_api::BlockBackend<Block>
2108 + sc_client_api::blockchain::HeaderBackend<Block>
2109 + sp_runtime::traits::BlockIdTo<Block>
2110 + sc_client_api::ExecutorProvider<Block>
2111 + sc_client_api::UsageProvider<Block>
2112 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
2113 + Send
2114 + Sync
2115 + 'static,
2116 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
2117 <Block as BlockT>::Hash: std::marker::Unpin,
2118{
2119 pub fn new_full(
2121 options: Options,
2122 is_validator: IsValidator,
2123 prometheus: Option<&PrometheusRegistry>,
2124 spawner: impl SpawnEssentialNamed,
2125 client: Arc<Client>,
2126 ) -> Self {
2127 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2128 let pool = Self::new_with_background_worker(
2129 options,
2130 is_validator,
2131 pool_api,
2132 prometheus,
2133 spawner,
2134 client.usage_info().chain.best_hash,
2135 client.usage_info().chain.finalized_hash,
2136 );
2137
2138 pool
2139 }
2140}
2141
2142#[cfg(test)]
2143mod reduce_multiview_result_tests {
2144 use super::*;
2145 use sp_core::H256;
2146 #[derive(Debug, PartialEq, Clone)]
2147 enum Error {
2148 Custom(u8),
2149 }
2150
2151 #[test]
2152 fn empty() {
2153 sp_tracing::try_init_simple();
2154 let input = HashMap::default();
2155 let r = reduce_multiview_result::<H256, H256, Error>(input);
2156 assert!(r.is_empty());
2157 }
2158
2159 #[test]
2160 fn errors_only() {
2161 sp_tracing::try_init_simple();
2162 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2163 (
2164 H256::repeat_byte(0x13),
2165 vec![
2166 Err(Error::Custom(10)),
2167 Err(Error::Custom(11)),
2168 Err(Error::Custom(12)),
2169 Err(Error::Custom(13)),
2170 ],
2171 ),
2172 (
2173 H256::repeat_byte(0x14),
2174 vec![
2175 Err(Error::Custom(20)),
2176 Err(Error::Custom(21)),
2177 Err(Error::Custom(22)),
2178 Err(Error::Custom(23)),
2179 ],
2180 ),
2181 (
2182 H256::repeat_byte(0x15),
2183 vec![
2184 Err(Error::Custom(30)),
2185 Err(Error::Custom(31)),
2186 Err(Error::Custom(32)),
2187 Err(Error::Custom(33)),
2188 ],
2189 ),
2190 ];
2191 let input = HashMap::from_iter(v.clone());
2192 let r = reduce_multiview_result(input);
2193
2194 assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
2196 }
2197
2198 #[test]
2199 #[should_panic]
2200 #[cfg(debug_assertions)]
2201 fn invalid_lengths() {
2202 sp_tracing::try_init_simple();
2203 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2204 (H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
2205 (H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
2206 ];
2207 let input = HashMap::from_iter(v);
2208 let _ = reduce_multiview_result(input);
2209 }
2210
2211 #[test]
2212 fn only_hashes() {
2213 sp_tracing::try_init_simple();
2214
2215 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2216 (
2217 H256::repeat_byte(0x13),
2218 vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
2219 ),
2220 (
2221 H256::repeat_byte(0x14),
2222 vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
2223 ),
2224 ];
2225 let input = HashMap::from_iter(v);
2226 let r = reduce_multiview_result(input);
2227
2228 assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
2229 }
2230
2231 #[test]
2232 fn one_view() {
2233 sp_tracing::try_init_simple();
2234 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
2235 H256::repeat_byte(0x13),
2236 vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
2237 )];
2238 let input = HashMap::from_iter(v);
2239 let r = reduce_multiview_result(input);
2240
2241 assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
2242 }
2243
2244 #[test]
2245 fn mix() {
2246 sp_tracing::try_init_simple();
2247 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2248 (
2249 H256::repeat_byte(0x13),
2250 vec![
2251 Ok(H256::repeat_byte(0x10)),
2252 Err(Error::Custom(11)),
2253 Err(Error::Custom(12)),
2254 Err(Error::Custom(33)),
2255 ],
2256 ),
2257 (
2258 H256::repeat_byte(0x14),
2259 vec![
2260 Err(Error::Custom(20)),
2261 Ok(H256::repeat_byte(0x21)),
2262 Err(Error::Custom(22)),
2263 Err(Error::Custom(33)),
2264 ],
2265 ),
2266 (
2267 H256::repeat_byte(0x15),
2268 vec![
2269 Err(Error::Custom(30)),
2270 Err(Error::Custom(31)),
2271 Ok(H256::repeat_byte(0x32)),
2272 Err(Error::Custom(33)),
2273 ],
2274 ),
2275 ];
2276 let input = HashMap::from_iter(v);
2277 let r = reduce_multiview_result(input);
2278
2279 assert_eq!(
2280 r,
2281 vec![
2282 Ok(H256::repeat_byte(0x10)),
2283 Ok(H256::repeat_byte(0x21)),
2284 Ok(H256::repeat_byte(0x32)),
2285 Err(Error::Custom(33))
2286 ]
2287 );
2288 }
2289}