1use futures::{future::join_all, FutureExt};
35use itertools::Itertools;
36use parking_lot::RwLock;
37use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
38use sp_blockchain::HashAndNumber;
39use sp_runtime::{
40 traits::Block as BlockT,
41 transaction_validity::{InvalidTransaction, TransactionValidityError},
42};
43use std::{
44 collections::HashSet,
45 future::Future,
46 pin::Pin,
47 sync::{
48 atomic::{self, AtomicU64},
49 mpsc::{
50 channel as sync_bridge_channel, Receiver as SyncBridgeReceiver,
51 Sender as SyncBridgeSender,
52 },
53 Arc,
54 },
55 time::Instant,
56};
57use tracing::{debug, trace};
58
59use crate::{
60 common::tracing_log_xt::log_xt_trace,
61 graph,
62 graph::{base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash},
63 ValidateTransactionPriority, LOG_TARGET,
64};
65
66use super::{
67 metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener,
68 view_store::ViewStore,
69};
70
71mod tx_mem_pool_map;
72
/// Revalidation period, expressed in blocks.
///
/// A transaction becomes a revalidation candidate once the finalized block number exceeds its
/// `validated_at` value by more than this period.
pub(crate) const TXMEMPOOL_REVALIDATION_PERIOD: u64 = 10;

/// Upper bound on the number of transactions picked for a single revalidation pass.
pub(crate) const TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE: usize = 1000;

/// Panic message used when the response of the sync bridge task cannot be received.
const SYNC_BRIDGE_EXPECT: &str = "The mempool blocking task shall not be terminated. qed.";
80
/// A transaction kept in the mempool together with the bookkeeping data stored for it.
pub(crate) struct TxInMemPool<ChainApi, Block>
where
    Block: BlockT,
    ChainApi: graph::ChainApi<Block = Block> + 'static,
{
    /// Whether the transaction was submitted as a watched one.
    watched: bool,
    /// The extrinsic itself.
    tx: ExtrinsicFor<ChainApi>,
    /// Size of the extrinsic in bytes (callers pass the length from `hash_and_length`).
    bytes: usize,
    /// Origin of the transaction, carrying a timestamp.
    source: TimedTransactionSource,
    /// Block number recorded for the most recent validation; revalidation overwrites it with
    /// the finalized block number.
    validated_at: AtomicU64,
    /// Priority of the transaction; `None` until a priority is provided.
    priority: RwLock<Option<TransactionPriority>>,
}
104
105impl<ChainApi, Block> TxInMemPool<ChainApi, Block>
106where
107 Block: BlockT,
108 ChainApi: graph::ChainApi<Block = Block> + 'static,
109{
110 pub(crate) fn is_watched(&self) -> bool {
114 self.watched
115 }
116
117 fn new_unwatched(
119 source: TransactionSource,
120 tx: ExtrinsicFor<ChainApi>,
121 bytes: usize,
122 validated_at: u64,
123 ) -> Self {
124 Self::new(false, source, tx, bytes, validated_at)
125 }
126
127 fn new_watched(
129 source: TransactionSource,
130 tx: ExtrinsicFor<ChainApi>,
131 bytes: usize,
132 validated_at: u64,
133 ) -> Self {
134 Self::new(true, source, tx, bytes, validated_at)
135 }
136
137 fn new(
139 watched: bool,
140 source: TransactionSource,
141 tx: ExtrinsicFor<ChainApi>,
142 bytes: usize,
143 validated_at: u64,
144 ) -> Self {
145 Self::new_with_optional_priority(watched, source, tx, bytes, None, validated_at)
146 }
147
148 fn new_with_priority(
150 watched: bool,
151 source: TransactionSource,
152 tx: ExtrinsicFor<ChainApi>,
153 bytes: usize,
154 priority: TransactionPriority,
155 validated_at: u64,
156 ) -> Self {
157 Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority), validated_at)
158 }
159
160 fn new_with_optional_priority(
162 watched: bool,
163 source: TransactionSource,
164 tx: ExtrinsicFor<ChainApi>,
165 bytes: usize,
166 priority: Option<TransactionPriority>,
167 validated_at: u64,
168 ) -> Self {
169 Self {
170 watched,
171 tx,
172 source: TimedTransactionSource::from_transaction_source(source, true),
173 validated_at: AtomicU64::new(validated_at),
174 bytes,
175 priority: priority.into(),
176 }
177 }
178
179 pub(crate) fn tx(&self) -> ExtrinsicFor<ChainApi> {
183 self.tx.clone()
184 }
185
186 pub(crate) fn source(&self) -> TimedTransactionSource {
188 self.source.clone()
189 }
190
191 pub(crate) fn priority(&self) -> Option<TransactionPriority> {
193 *self.priority.read()
194 }
195}
196
197impl<ChainApi, Block> std::fmt::Debug for TxInMemPool<ChainApi, Block>
198where
199 Block: BlockT,
200 ChainApi: graph::ChainApi<Block = Block> + 'static,
201{
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 f.debug_struct("TxInMemPool")
204 .field("watched", &self.watched)
205 .field("tx", &"...")
206 .field("bytes", &self.bytes)
207 .field("source", &self.source)
208 .field("validated_at", &self.validated_at)
209 .field("priority", &self.priority)
210 .finish()
211 }
212}
213
214impl<ChainApi, Block> std::cmp::PartialEq for TxInMemPool<ChainApi, Block>
215where
216 Block: BlockT,
217 ChainApi: graph::ChainApi<Block = Block> + 'static,
218{
219 fn eq(&self, other: &Self) -> bool {
220 self.watched == other.watched &&
221 self.tx == other.tx &&
222 self.bytes == other.bytes &&
223 self.source == other.source &&
224 *self.priority.read() == *other.priority.read() &&
225 self.validated_at.load(atomic::Ordering::Relaxed) ==
226 other.validated_at.load(atomic::Ordering::Relaxed)
227 }
228}
229
230#[derive(Debug, Clone, Copy, Eq, PartialEq)]
231struct MempoolTxPriority(pub Option<TransactionPriority>);
232
233impl Ord for MempoolTxPriority {
234 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
235 match (&self.0, &other.0) {
236 (Some(a), Some(b)) => a.cmp(b),
237 (Some(_), None) => std::cmp::Ordering::Less,
238 (None, Some(_)) => std::cmp::Ordering::Greater,
239 (None, None) => std::cmp::Ordering::Equal,
240 }
241 }
242}
243impl From<Option<TransactionPriority>> for MempoolTxPriority {
244 fn from(value: Option<TransactionPriority>) -> Self {
245 MempoolTxPriority(value)
246 }
247}
248
249impl PartialOrd for MempoolTxPriority {
250 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
251 Some(self.cmp(other))
252 }
253}
254
/// Size reporting for the size-tracked mempool store.
impl<ChainApi, Block> tx_mem_pool_map::Size for Arc<TxInMemPool<ChainApi, Block>>
where
    Block: BlockT,
    ChainApi: graph::ChainApi<Block = Block> + 'static,
{
    /// Returns the byte size recorded for the transaction when the entry was created.
    fn size(&self) -> usize {
        self.bytes
    }
}
264
265impl<ChainApi, Block> tx_mem_pool_map::PriorityAndTimestamp for Arc<TxInMemPool<ChainApi, Block>>
266where
267 Block: BlockT,
268 ChainApi: graph::ChainApi<Block = Block> + 'static,
269{
270 type Priority = MempoolTxPriority;
271 type Timestamp = Option<Instant>;
272
273 fn priority(&self) -> Self::Priority {
274 TxInMemPool::priority(self).into()
275 }
276
277 fn timestamp(&self) -> Self::Timestamp {
278 self.source().timestamp
279 }
280}
281
/// Store backing the mempool: entries are keyed by extrinsic hash, ordered by a
/// (priority, timestamp) key, and hold shared [`TxInMemPool`] values.
type InternalTxMemPoolMap<ChainApi, Block> = tx_mem_pool_map::SizeTrackedStore<
    ExtrinsicHash<ChainApi>,
    tx_mem_pool_map::PriorityKey<MempoolTxPriority, Option<Instant>>,
    Arc<TxInMemPool<ChainApi, Block>>,
>;

/// Boxed future returned by [`TxMemPool::new`]; it serves the requests of the `*_sync` methods.
pub type TxMemPoolBlockingTask = Pin<Box<dyn Future<Output = ()> + Send>>;
292
/// In-memory pool of transactions, limited both by transaction count and by total byte size.
pub(super) struct TxMemPool<ChainApi, Block>
where
    Block: BlockT,
    ChainApi: graph::ChainApi<Block = Block> + 'static,
{
    /// Chain API used to compute hashes and lengths and to (re)validate transactions.
    api: Arc<ChainApi>,

    /// Listener notified about transactions invalidated during revalidation.
    listener: Arc<MultiViewListener<ChainApi>>,

    /// Sending half of the channel feeding requests to the sync bridge task.
    sync_channel: SyncBridgeSender<TxMemPoolSyncRequest<ChainApi, Block>>,

    /// The stored transactions, keyed by extrinsic hash.
    transactions: InternalTxMemPoolMap<ChainApi, Block>,

    /// Prometheus metrics handle.
    metrics: PrometheusMetrics,

    /// Maximum number of transactions the pool may hold.
    max_transactions_count: usize,

    /// Maximum total size, in bytes, of the transactions the pool may hold.
    max_transactions_total_bytes: usize,
}
333
334#[derive(Debug)]
336pub(super) struct InsertionInfo<Hash> {
337 pub(super) hash: Hash,
338 pub(super) source: TimedTransactionSource,
339 pub(super) removed: Vec<Hash>,
340}
341
342impl<Hash> InsertionInfo<Hash> {
343 fn new(hash: Hash, source: TimedTransactionSource) -> Self {
344 Self::new_with_removed(hash, source, Default::default())
345 }
346 fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
347 Self { hash, source, removed }
348 }
349}
350
351impl<ChainApi, Block> TxMemPool<ChainApi, Block>
352where
353 Block: BlockT,
354 ChainApi: graph::ChainApi<Block = Block> + 'static,
355 <Block as BlockT>::Hash: Unpin,
356{
357 pub(super) fn new(
360 api: Arc<ChainApi>,
361 listener: Arc<MultiViewListener<ChainApi>>,
362 metrics: PrometheusMetrics,
363 max_transactions_count: usize,
364 max_transactions_total_bytes: usize,
365 ) -> (Self, TxMemPoolBlockingTask) {
366 let (sync_channel, rx) = sync_bridge_channel();
367 let task = Self::sync_bridge_task(rx);
368 (
369 Self {
370 api,
371 listener,
372 sync_channel,
373 transactions: Default::default(),
374 metrics,
375 max_transactions_count,
376 max_transactions_total_bytes,
377 },
378 task.boxed(),
379 )
380 }
381
382 #[cfg(test)]
384 fn new_test(
385 api: Arc<ChainApi>,
386 max_transactions_count: usize,
387 max_transactions_total_bytes: usize,
388 ) -> Self {
389 let (sync_channel, rx) = sync_bridge_channel();
390 tokio::task::spawn_blocking(move || Self::sync_bridge_task(rx));
391 Self {
392 api,
393 listener: Arc::from(MultiViewListener::new_with_worker(Default::default()).0),
394 transactions: Default::default(),
395 metrics: Default::default(),
396 sync_channel,
397 max_transactions_count,
398 max_transactions_total_bytes,
399 }
400 }
401
402 pub(super) async fn get_by_hash(
404 &self,
405 hash: ExtrinsicHash<ChainApi>,
406 ) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
407 self.transactions.read().await.get(&hash).map(Clone::clone)
408 }
409
410 pub async fn unwatched_and_watched_count(&self) -> (usize, usize) {
412 let transactions = self.transactions.read().await;
413 let watched_count = transactions.values().filter(|t| t.is_watched()).count();
414 (transactions.len() - watched_count, watched_count)
415 }
416
 /// Returns the number of transactions reported by the underlying store.
 pub fn len(&self) -> usize {
     self.transactions.len()
 }
421
422 #[cfg(test)]
424 pub fn bytes(&self) -> usize {
425 return self.transactions.bytes()
426 }
427
428 fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool {
430 length > self.max_transactions_count ||
431 current_total_bytes > self.max_transactions_total_bytes
432 }
433
434 async fn try_insert(
437 &self,
438 tx_hash: ExtrinsicHash<ChainApi>,
439 tx: TxInMemPool<ChainApi, Block>,
440 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
441 let mut transactions = self.transactions.write().await;
442
443 let bytes = self.transactions.bytes();
444
445 let result = match (
446 self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
447 transactions.contains_key(&tx_hash),
448 ) {
449 (false, false) => {
450 let source = tx.source();
451 transactions.insert(tx_hash, Arc::from(tx));
452 Ok(InsertionInfo::new(tx_hash, source))
453 },
454 (_, true) =>
455 Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(tx_hash))),
456 (true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
457 };
458 trace!(
459 target: LOG_TARGET,
460 ?tx_hash,
461 result_hash = ?result.as_ref().map(|r| r.hash),
462 "mempool::try_insert"
463 );
464 result
465 }
466
467 pub(super) async fn try_insert_with_replacement(
480 &self,
481 new_tx: ExtrinsicFor<ChainApi>,
482 priority: TransactionPriority,
483 source: TransactionSource,
484 validated_at: u64,
485 watched: bool,
486 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
487 let (hash, length) = self.api.hash_and_length(&new_tx);
488 let new_tx =
489 TxInMemPool::new_with_priority(watched, source, new_tx, length, priority, validated_at);
490 if new_tx.bytes > self.max_transactions_total_bytes {
491 return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
492 }
493
494 let mut transactions = self.transactions.write().await;
495
496 if transactions.contains_key(&hash) {
497 return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
498 }
499
500 let source = new_tx.source();
506 let new_tx = Arc::new(new_tx);
507 let insertion_result = transactions.try_insert_with_replacement(
508 self.max_transactions_total_bytes,
509 hash,
510 new_tx,
511 );
512 debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
513 match insertion_result {
514 None => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
515 Some(to_be_removed) => Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed)),
516 }
517 }
518
519 pub(super) async fn extend_unwatched(
524 &self,
525 source: TransactionSource,
526 validated_at: u64,
527 xts: &[ExtrinsicFor<ChainApi>],
528 ) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
529 {
530 let insert_futures = xts.into_iter().map(|xt| {
531 let api = self.api.clone();
532 let xt = xt.clone();
533 async move {
534 let (hash, length) = api.hash_and_length(&xt);
535 self.try_insert(hash, TxInMemPool::new_unwatched(source, xt, length, validated_at))
536 .await
537 }
538 });
539
540 join_all(insert_futures).await
541 }
542
543 pub(super) async fn push_watched(
546 &self,
547 source: TransactionSource,
548 validated_at: u64,
549 xt: ExtrinsicFor<ChainApi>,
550 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
551 let (hash, length) = self.api.hash_and_length(&xt);
552 self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length, validated_at))
553 .await
554 }
555
556 pub(super) async fn with_transactions<F, R>(&self, f: F) -> R
564 where
565 F: Fn(
566 std::collections::hash_map::Iter<
567 ExtrinsicHash<ChainApi>,
568 Arc<TxInMemPool<ChainApi, Block>>,
569 >,
570 ) -> R,
571 {
572 self.transactions.read().await.with_items(f)
573 }
574
575 pub(super) async fn remove_transactions(&self, tx_hashes: &[ExtrinsicHash<ChainApi>]) {
577 log_xt_trace!(target: LOG_TARGET, tx_hashes, "mempool::remove_transaction");
578 let mut transactions = self.transactions.write().await;
579 for tx_hash in tx_hashes {
580 transactions.remove(tx_hash);
581 }
582 }
583
584 async fn revalidate_inner(&self, finalized_block: HashAndNumber<Block>) -> Vec<Block::Hash> {
588 trace!(
589 target: LOG_TARGET,
590 ?finalized_block,
591 "mempool::revalidate_inner"
592 );
593 let start = Instant::now();
594
595 let (total_count, to_be_validated) = {
596 (
597 self.transactions.len(),
598 self.with_transactions(|iter| {
599 iter.filter(|(_, xt)| {
600 let finalized_block_number = finalized_block.number.into().as_u64();
601 xt.validated_at.load(atomic::Ordering::Relaxed) +
602 TXMEMPOOL_REVALIDATION_PERIOD <
603 finalized_block_number
604 })
605 .sorted_by_key(|(_, tx)| tx.validated_at.load(atomic::Ordering::Relaxed))
606 .take(TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE)
607 .map(|(k, v)| (*k, v.clone()))
608 .collect::<Vec<_>>()
609 })
610 .await,
611 )
612 };
613
614 let validations_futures = to_be_validated.into_iter().map(|(xt_hash, xt)| {
615 self.api
616 .validate_transaction(
617 finalized_block.hash,
618 xt.source.clone().into(),
619 xt.tx(),
620 ValidateTransactionPriority::Maintained,
621 )
622 .map(move |validation_result| {
623 xt.validated_at
624 .store(finalized_block.number.into().as_u64(), atomic::Ordering::Relaxed);
625 (xt_hash, validation_result)
626 })
627 });
628 let validation_results = futures::future::join_all(validations_futures).await;
629 let validated_count = validation_results.len();
630
631 let duration = start.elapsed();
632
633 let invalid_hashes = validation_results
634 .into_iter()
635 .filter_map(|(tx_hash, validation_result)| match validation_result {
636 Ok(Ok(_)) |
637 Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Future))) => None,
638 Err(_) |
639 Ok(Err(TransactionValidityError::Unknown(_))) |
640 Ok(Err(TransactionValidityError::Invalid(_))) => {
641 trace!(
642 target: LOG_TARGET,
643 ?tx_hash,
644 ?validation_result,
645 "mempool::revalidate_inner invalid"
646 );
647 Some(tx_hash)
648 },
649 })
650 .collect::<Vec<_>>();
651
652 debug!(
653 target: LOG_TARGET,
654 ?finalized_block,
655 validated_count,
656 total_count,
657 invalid_hashes = invalid_hashes.len(),
658 ?duration,
659 "mempool::revalidate_inner"
660 );
661
662 invalid_hashes
663 }
664
665 pub(super) async fn purge_finalized_transactions(
667 &self,
668 finalized_xts: &Vec<ExtrinsicHash<ChainApi>>,
669 ) {
670 debug!(
671 target: LOG_TARGET,
672 count = finalized_xts.len(),
673 "purge_finalized_transactions"
674 );
675 log_xt_trace!(target: LOG_TARGET, finalized_xts, "purged finalized transactions");
676 let mut transactions = self.transactions.write().await;
677 finalized_xts.iter().for_each(|t| {
678 transactions.remove(t);
679 });
680 }
681
682 pub(super) async fn revalidate(
685 &self,
686 view_store: Arc<ViewStore<ChainApi, Block>>,
687 finalized_block: HashAndNumber<Block>,
688 ) {
689 let revalidated_invalid_hashes = self.revalidate_inner(finalized_block.clone()).await;
690
691 let mut invalid_hashes_subtrees =
692 revalidated_invalid_hashes.clone().into_iter().collect::<HashSet<_>>();
693 for tx in &revalidated_invalid_hashes {
694 invalid_hashes_subtrees.extend(
695 view_store
696 .remove_transaction_subtree(*tx, |_, _| {})
697 .into_iter()
698 .map(|tx| tx.hash),
699 );
700 }
701
702 {
703 let mut transactions = self.transactions.write().await;
704 invalid_hashes_subtrees.iter().for_each(|tx_hash| {
705 transactions.remove(&tx_hash);
706 });
707 };
708
709 self.metrics.report(|metrics| {
710 metrics
711 .mempool_revalidation_invalid_txs
712 .inc_by(invalid_hashes_subtrees.len() as _)
713 });
714
715 let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len();
716 let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len();
717
718 let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::<Vec<_>>();
719
720 self.listener.transactions_invalidated(&invalid_hashes_subtrees);
724 view_store
725 .import_notification_sink
726 .clean_notified_items(&invalid_hashes_subtrees);
727 view_store
728 .dropped_stream_controller
729 .remove_transactions(invalid_hashes_subtrees);
730
731 trace!(
732 target: LOG_TARGET,
733 ?finalized_block,
734 revalidated_invalid_hashes_len,
735 invalid_hashes_subtrees_len,
736 "mempool::revalidate"
737 );
738 }
739
740 pub(super) async fn update_transaction_priority(
742 &self,
743 hash: ExtrinsicHash<ChainApi>,
744 prio: Option<TransactionPriority>,
745 ) {
746 if let Some(priority) = prio {
747 let mut transactions = self.transactions.write().await;
748
749 transactions.update_item(&hash, |t| {
750 *t.priority.write() = Some(priority);
751 });
752 }
753 }
754
755 pub(super) async fn count_unknown_transactions<'a>(
758 &self,
759 hashes: impl Iterator<Item = &'a ExtrinsicHash<ChainApi>>,
760 ) -> usize {
761 let transactions = self.transactions.read().await;
762 hashes.filter(|tx_hash| !transactions.contains_key(tx_hash)).count()
763 }
764}
765
/// Result type of `TxMemPool::extend_unwatched`, as carried over the sync bridge.
type ExtendUnwatchedResult<ChainApi> =
    Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>;

/// Result type of `TxMemPool::try_insert_with_replacement`, as carried over the sync bridge.
type TryInsertWithReplacementResult<ChainApi> =
    Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>;
773
/// Request sent from the `*_sync` methods of [`TxMemPool`] to the sync bridge task.
///
/// Every variant carries the mempool to operate on, the arguments of the corresponding async
/// method and, as the last field, the sender used to deliver the response.
enum TxMemPoolSyncRequest<ChainApi, Block>
where
    Block: BlockT,
    ChainApi: graph::ChainApi<Block = Block> + 'static,
{
    /// Calls `TxMemPool::remove_transactions` with the given hashes.
    RemoveTransactions(
        Arc<TxMemPool<ChainApi, Block>>,
        Vec<ExtrinsicHash<ChainApi>>,
        SyncBridgeSender<()>,
    ),
    /// Calls `TxMemPool::extend_unwatched` with the given source, `validated_at` and
    /// extrinsics.
    ExtendUnwatched(
        Arc<TxMemPool<ChainApi, Block>>,
        TransactionSource,
        u64,
        Vec<ExtrinsicFor<ChainApi>>,
        SyncBridgeSender<ExtendUnwatchedResult<ChainApi>>,
    ),
    /// Calls `TxMemPool::update_transaction_priority` with the given hash and priority.
    UpdateTransactionPriority(
        Arc<TxMemPool<ChainApi, Block>>,
        ExtrinsicHash<ChainApi>,
        Option<TransactionPriority>,
        SyncBridgeSender<()>,
    ),
    /// Calls `TxMemPool::try_insert_with_replacement` with the given extrinsic, priority,
    /// source, `validated_at` and `watched` flag.
    TryInsertWithReplacement(
        Arc<TxMemPool<ChainApi, Block>>,
        ExtrinsicFor<ChainApi>,
        TransactionPriority,
        TransactionSource,
        u64,
        bool,
        SyncBridgeSender<TryInsertWithReplacementResult<ChainApi>>,
    ),
}
808
809impl<ChainApi, Block> TxMemPoolSyncRequest<ChainApi, Block>
810where
811 Block: BlockT,
812 ChainApi: graph::ChainApi<Block = Block> + 'static,
813{
814 fn remove_transactions(
815 mempool: Arc<TxMemPool<ChainApi, Block>>,
816 hashes: Vec<ExtrinsicHash<ChainApi>>,
817 ) -> (SyncBridgeReceiver<()>, Self) {
818 let (tx, rx) = sync_bridge_channel();
819 (rx, Self::RemoveTransactions(mempool, hashes, tx))
820 }
821
822 fn extend_unwatched(
823 mempool: Arc<TxMemPool<ChainApi, Block>>,
824 source: TransactionSource,
825 validated_at: u64,
826 xts: Vec<ExtrinsicFor<ChainApi>>,
827 ) -> (SyncBridgeReceiver<ExtendUnwatchedResult<ChainApi>>, Self) {
828 let (tx, rx) = sync_bridge_channel();
829 (rx, Self::ExtendUnwatched(mempool, source, validated_at, xts, tx))
830 }
831
832 fn update_transaction_priority(
833 mempool: Arc<TxMemPool<ChainApi, Block>>,
834 hash: ExtrinsicHash<ChainApi>,
835 prio: Option<TransactionPriority>,
836 ) -> (SyncBridgeReceiver<()>, Self) {
837 let (tx, rx) = sync_bridge_channel();
838 (rx, Self::UpdateTransactionPriority(mempool, hash, prio, tx))
839 }
840
841 fn try_insert_with_replacement(
842 mempool: Arc<TxMemPool<ChainApi, Block>>,
843 new_tx: ExtrinsicFor<ChainApi>,
844 priority: TransactionPriority,
845 source: TransactionSource,
846 validated_at: u64,
847 watched: bool,
848 ) -> (SyncBridgeReceiver<TryInsertWithReplacementResult<ChainApi>>, Self) {
849 let (tx, rx) = sync_bridge_channel();
850 (
851 rx,
852 Self::TryInsertWithReplacement(
853 mempool,
854 new_tx,
855 priority,
856 source,
857 validated_at,
858 watched,
859 tx,
860 ),
861 )
862 }
863}
864
865impl<ChainApi, Block> TxMemPool<ChainApi, Block>
866where
867 Block: BlockT,
868 ChainApi: graph::ChainApi<Block = Block> + 'static,
869 <Block as BlockT>::Hash: Unpin,
870{
871 async fn sync_bridge_task(rx: SyncBridgeReceiver<TxMemPoolSyncRequest<ChainApi, Block>>) {
872 for request in rx {
873 Self::handle_request(request).await;
874 }
875 }
876
877 async fn handle_request(request: TxMemPoolSyncRequest<ChainApi, Block>) {
878 match request {
879 TxMemPoolSyncRequest::RemoveTransactions(mempool, hashes, tx) => {
880 mempool.remove_transactions(&hashes).await;
881 if let Err(error) = tx.send(()) {
882 debug!(target: LOG_TARGET, ?error, "RemoveTransaction: sending response failed");
883 }
884 },
885 TxMemPoolSyncRequest::ExtendUnwatched(mempool, source, validated_at, txs, tx) => {
886 let result = mempool.extend_unwatched(source, validated_at, &txs).await;
887 if let Err(error) = tx.send(result) {
888 debug!(target: LOG_TARGET, ?error, "ExtendUnwatched: sending response failed");
889 }
890 },
891 TxMemPoolSyncRequest::UpdateTransactionPriority(mempool, hash, prio, tx) => {
892 let result = mempool.update_transaction_priority(hash, prio).await;
893 if let Err(error) = tx.send(result) {
894 debug!(target: LOG_TARGET, ?error, "UpdateTransactionPriority2: sending response failed");
895 }
896 },
897 TxMemPoolSyncRequest::TryInsertWithReplacement(
898 mempool,
899 new_tx,
900 priority,
901 source,
902 validated_at,
903 watched,
904 tx,
905 ) => {
906 let result = mempool
907 .try_insert_with_replacement(new_tx, priority, source, validated_at, watched)
908 .await;
909 if let Err(error) = tx.send(result) {
910 debug!(target: LOG_TARGET, ?error, "TryInsertWithReplacementSync: sending response failed");
911 }
912 },
913 }
914 }
915
916 pub(super) fn try_insert_with_replacement_sync(
917 self: Arc<Self>,
918 new_tx: ExtrinsicFor<ChainApi>,
919 priority: TransactionPriority,
920 source: TransactionSource,
921 validated_at: u64,
922 watched: bool,
923 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
924 let (response, request) = TxMemPoolSyncRequest::try_insert_with_replacement(
925 self.clone(),
926 new_tx,
927 priority,
928 source,
929 validated_at,
930 watched,
931 );
932 let _ = self.sync_channel.send(request);
933 response.recv().expect(SYNC_BRIDGE_EXPECT)
934 }
935
936 pub(super) fn extend_unwatched_sync(
937 self: Arc<Self>,
938 source: TransactionSource,
939 validated_at: u64,
940 xts: Vec<ExtrinsicFor<ChainApi>>,
941 ) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
942 {
943 let (response, request) =
944 TxMemPoolSyncRequest::extend_unwatched(self.clone(), source, validated_at, xts);
945 let _ = self.sync_channel.send(request);
946 response.recv().expect(SYNC_BRIDGE_EXPECT)
947 }
948
949 pub(super) fn remove_transactions_sync(
950 self: Arc<Self>,
951 tx_hashes: Vec<ExtrinsicHash<ChainApi>>,
952 ) {
953 let (response, request) =
954 TxMemPoolSyncRequest::remove_transactions(self.clone(), tx_hashes);
955 let _ = self.sync_channel.send(request);
956 response.recv().expect(SYNC_BRIDGE_EXPECT)
957 }
958
959 pub(super) fn update_transaction_priority_sync(
960 self: Arc<Self>,
961 hash: ExtrinsicHash<ChainApi>,
962 prio: Option<TransactionPriority>,
963 ) {
964 let (response, request) =
965 TxMemPoolSyncRequest::update_transaction_priority(self.clone(), hash, prio);
966 let _ = self.sync_channel.send(request);
967 response.recv().expect(SYNC_BRIDGE_EXPECT)
968 }
969}
970
971#[cfg(test)]
972mod tx_mem_pool_tests {
973 use futures::future::join_all;
974 use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256};
975 use substrate_test_runtime_client::Sr25519Keyring::*;
976
977 use crate::{
978 common::tests::TestApi, fork_aware_txpool::view_store::ViewStoreSubmitOutcome,
979 graph::ChainApi,
980 };
981
982 use super::*;
983
984 fn uxt(nonce: u64) -> Extrinsic {
985 crate::common::tests::uxt(Transfer {
986 from: Alice.into(),
987 to: AccountId::from_h256(H256::from_low_u64_be(2)),
988 amount: 5,
989 nonce,
990 })
991 }
992
993 #[tokio::test]
994 async fn extend_unwatched_obeys_limit() {
995 let max = 10;
996 let api = Arc::from(TestApi::default());
997 let mempool = TxMemPool::new_test(api, max, usize::MAX);
998
999 let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1000
1001 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1002 assert!(results.iter().take(max).all(Result::is_ok));
1003 assert!(matches!(
1004 results.into_iter().last().unwrap().unwrap_err(),
1005 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1006 ));
1007 }
1008
1009 #[tokio::test]
1010 async fn extend_unwatched_detects_already_imported() {
1011 sp_tracing::try_init_simple();
1012 let max = 10;
1013 let api = Arc::from(TestApi::default());
1014 let mempool = TxMemPool::new_test(api, max, usize::MAX);
1015
1016 let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1017 xts.push(xts.iter().last().unwrap().clone());
1018
1019 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1020 assert!(results.iter().take(max - 1).all(Result::is_ok));
1021 assert!(matches!(
1022 results.into_iter().last().unwrap().unwrap_err(),
1023 sc_transaction_pool_api::error::Error::AlreadyImported(_)
1024 ));
1025 }
1026
1027 #[tokio::test]
1028 async fn push_obeys_limit() {
1029 let max = 10;
1030 let api = Arc::from(TestApi::default());
1031 let mempool = TxMemPool::new_test(api, max, usize::MAX);
1032
1033 let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1034
1035 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1036 assert!(results.iter().all(Result::is_ok));
1037
1038 let xt = Arc::from(uxt(98));
1039 let result = mempool.push_watched(TransactionSource::External, 0, xt).await;
1040 assert!(matches!(
1041 result.unwrap_err(),
1042 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1043 ));
1044 let xt = Arc::from(uxt(99));
1045 let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt]).await;
1046 assert!(matches!(
1047 result.pop().unwrap().unwrap_err(),
1048 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1049 ));
1050 }
1051
1052 #[tokio::test]
1053 async fn push_detects_already_imported() {
1054 let max = 10;
1055 let api = Arc::from(TestApi::default());
1056 let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX);
1057
1058 let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
1059 let xt0 = xts.iter().last().unwrap().clone();
1060 let xt1 = xts.iter().next().unwrap().clone();
1061
1062 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1063 assert!(results.iter().all(Result::is_ok));
1064
1065 let result = mempool.push_watched(TransactionSource::External, 0, xt0).await;
1066 assert!(matches!(
1067 result.unwrap_err(),
1068 sc_transaction_pool_api::error::Error::AlreadyImported(_)
1069 ));
1070 let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt1]).await;
1071 assert!(matches!(
1072 result.pop().unwrap().unwrap_err(),
1073 sc_transaction_pool_api::error::Error::AlreadyImported(_)
1074 ));
1075 }
1076
 /// Checks that watched and unwatched transactions are counted separately.
 ///
 /// NOTE(review): the `trace!(..., "xxx")` calls look like leftover debugging output; they
 /// are kept untouched here.
 #[tokio::test]
 async fn count_works() {
     sp_tracing::try_init_simple();
     trace!(target:LOG_TARGET,line=line!(),"xxx");

     let max = 100;
     let api = Arc::from(TestApi::default());
     let mempool = TxMemPool::new_test(api, max, usize::MAX);
     trace!(target:LOG_TARGET,line=line!(),"xxx");

     // Ten unwatched transactions, nonces 0..10.
     let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
     trace!(target:LOG_TARGET,line=line!(),"xxx");

     let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts0).await;
     trace!(target:LOG_TARGET,line=line!(),"xxx");
     assert!(results.iter().all(Result::is_ok));
     trace!(target:LOG_TARGET,line=line!(),"xxx");

     // Five watched transactions built from the even nonces 0, 2, 4, 6 and 8.
     let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
     trace!(target:LOG_TARGET,line=line!(),"xxx");
     let results = xts1
         .into_iter()
         .map(|t| mempool.push_watched(TransactionSource::External, 0, t));
     trace!(target:LOG_TARGET,line=line!(),"xxx");
     let results = join_all(results).await;
     trace!(target:LOG_TARGET,line=line!(),"xxx");
     assert!(results.iter().all(Result::is_ok));
     assert_eq!(mempool.unwatched_and_watched_count().await, (10, 5));
 }
1106
1107 const LARGE_XT_SIZE: usize = 1129;
1109
1110 fn large_uxt(x: usize) -> Extrinsic {
1111 ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
1112 }
1113
1114 #[tokio::test]
1115 async fn push_obeys_size_limit() {
1116 sp_tracing::try_init_simple();
1117 let max = 10;
1118 let api = Arc::from(TestApi::default());
1119 let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
1120
1121 let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1122
1123 let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1124
1125 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1126 assert!(results.iter().all(Result::is_ok));
1127 assert_eq!(mempool.bytes(), total_xts_bytes);
1128
1129 let xt = Arc::from(large_uxt(98));
1130 let result = mempool.push_watched(TransactionSource::External, 0, xt).await;
1131 assert!(matches!(
1132 result.unwrap_err(),
1133 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1134 ));
1135
1136 let xt = Arc::from(large_uxt(99));
1137 let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt]).await;
1138 assert!(matches!(
1139 result.pop().unwrap().unwrap_err(),
1140 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1141 ));
1142 }
1143
 /// A high-priority transaction of the same size replaces exactly one low-priority one.
 #[tokio::test]
 async fn replacing_txs_works_for_same_tx_size() {
     sp_tracing::try_init_simple();
     let max = 10;
     let api = Arc::from(TestApi::default());
     let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);

     let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();

     let low_prio = 0u64;
     let hi_prio = u64::MAX;

     let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
     // One submit outcome (carrying the low priority) and one hash per transaction.
     let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
         .iter()
         .map(|t| {
             let h = api.hash_and_length(t).0;
             (ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
         })
         .unzip();

     // Fill the pool up to its size limit.
     let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
     assert!(results.iter().all(Result::is_ok));
     assert_eq!(mempool.bytes(), total_xts_bytes);

     // Give every stored transaction the low priority.
     for o in submit_outcomes {
         mempool.update_transaction_priority(o.hash(), o.priority()).await;
     }

     let xt = Arc::from(large_uxt(98));
     let hash = api.hash_and_length(&xt).0;
     let result = mempool
         .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
         .await
         .unwrap();

     assert_eq!(result.hash, hash);
     // Only the first inserted transaction is expected to be removed.
     assert_eq!(result.removed, hashes[0..1]);
 }
1183
1184 #[tokio::test]
1185 async fn replacing_txs_removes_proper_size_of_txs() {
1186 sp_tracing::try_init_simple();
1187 let max = 10;
1188 let api = Arc::from(TestApi::default());
1189 let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
1190
1191 let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1192
1193 let low_prio = 0u64;
1194 let hi_prio = u64::MAX;
1195
1196 let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1197 let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
1198 .iter()
1199 .map(|t| {
1200 let h = api.hash_and_length(t).0;
1201 (ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
1202 })
1203 .unzip();
1204
1205 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1206 assert!(results.iter().all(Result::is_ok));
1207 assert_eq!(mempool.bytes(), total_xts_bytes);
1208 assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE);
1209
1210 for o in submit_outcomes {
1211 mempool.update_transaction_priority(o.hash(), o.priority()).await;
1212 }
1213
1214 let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build());
1216 let (hash, length) = api.hash_and_length(&xt);
1217 assert_eq!(length, 1130);
1218 let result = mempool
1219 .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1220 .await
1221 .unwrap();
1222
1223 assert_eq!(result.hash, hash);
1224 assert_eq!(result.removed, hashes[0..2]);
1225 }
1226
1227 #[tokio::test]
1228 async fn replacing_txs_removes_proper_size_and_prios() {
1229 sp_tracing::try_init_simple();
1230 const COUNT: usize = 10;
1231 let api = Arc::from(TestApi::default());
1232 let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
1233
1234 let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1235
1236 let hi_prio = u64::MAX;
1237
1238 let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1239 let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
1240 .iter()
1241 .enumerate()
1242 .map(|(prio, t)| {
1243 let h = api.hash_and_length(t).0;
1244 (ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h)
1245 })
1246 .unzip();
1247
1248 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1249 assert!(results.iter().all(Result::is_ok));
1250 assert_eq!(mempool.bytes(), total_xts_bytes);
1251
1252 for o in submit_outcomes {
1253 mempool.update_transaction_priority(o.hash(), o.priority()).await;
1254 }
1255
1256 let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
1258 let (hash, length) = api.hash_and_length(&xt);
1259 assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
1261 let result = mempool
1262 .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1263 .await
1264 .unwrap();
1265
1266 assert_eq!(result.hash, hash);
1267 assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev()));
1268 }
1269
1270 #[tokio::test]
1271 async fn replacing_txs_skips_lower_prio_tx() {
1272 sp_tracing::try_init_simple();
1273 const COUNT: usize = 10;
1274 let api = Arc::from(TestApi::default());
1275 let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
1276
1277 let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1278
1279 let hi_prio = 100u64;
1280 let low_prio = 10u64;
1281
1282 let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1283 let submit_outcomes: Vec<ViewStoreSubmitOutcome<TestApi>> = xts
1284 .iter()
1285 .map(|t| {
1286 let h = api.hash_and_length(t).0;
1287 ViewStoreSubmitOutcome::new(h, Some(hi_prio))
1288 })
1289 .collect::<Vec<_>>();
1290
1291 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1292 assert!(results.iter().all(Result::is_ok));
1293 assert_eq!(mempool.bytes(), total_xts_bytes);
1294
1295 for o in submit_outcomes {
1296 mempool.update_transaction_priority(o.hash(), o.priority()).await;
1297 }
1298
1299 let xt = Arc::from(large_uxt(98));
1300 let result = mempool
1301 .try_insert_with_replacement(xt, low_prio, TransactionSource::External, 0, false)
1302 .await;
1303
1304 assert!(matches!(
1306 result.unwrap_err(),
1307 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1308 ));
1309 }
1310
1311 #[tokio::test]
1312 async fn replacing_txs_is_skipped_if_prios_are_not_set() {
1313 sp_tracing::try_init_simple();
1314 const COUNT: usize = 10;
1315 let api = Arc::from(TestApi::default());
1316 let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
1317
1318 let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
1319
1320 let hi_prio = u64::MAX;
1321
1322 let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
1323
1324 let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
1325 assert!(results.iter().all(Result::is_ok));
1326 assert_eq!(mempool.bytes(), total_xts_bytes);
1327
1328 let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
1330 let length = api.hash_and_length(&xt).1;
1331 assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
1333
1334 let result = mempool
1335 .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
1336 .await;
1337
1338 assert!(matches!(
1340 result.unwrap_err(),
1341 sc_transaction_pool_api::error::Error::ImmediatelyDropped
1342 ));
1343 }
1344}