use super::{
dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
import_notification_sink::MultiViewImportNotificationSink,
metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
multi_view_listener::MultiViewListener,
tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
view::View,
view_store::ViewStore,
};
use crate::{
api::FullChainApi,
common::tracing_log_xt::{log_xt_debug, log_xt_trace},
enactment_state::{EnactmentAction, EnactmentState},
fork_aware_txpool::{
dropped_watcher::{DroppedReason, DroppedTransaction},
revalidation_worker,
},
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options,
},
ReadyIteratorFor, LOG_TARGET,
};
use async_trait::async_trait;
use futures::{
channel::oneshot,
future::{self},
prelude::*,
FutureExt,
};
use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority,
TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor},
transaction_validity::{TransactionValidityError, ValidTransaction},
Saturating,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
pin::Pin,
sync::Arc,
time::Instant,
};
use tokio::select;
use tracing::{debug, info, trace, warn};
const FINALITY_TIMEOUT_THRESHOLD: usize = 128;
pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
struct ReadyPoll<T, Block>
where
Block: BlockT,
{
pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
}
impl<T, Block> ReadyPoll<T, Block>
where
Block: BlockT,
{
fn new() -> Self {
Self { pollers: Default::default() }
}
fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
let (s, r) = oneshot::channel();
self.pollers.entry(at).or_default().push(s);
r
}
fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
trace!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
let Some(pollers) = self.pollers.remove(&at) else { return };
pollers.into_iter().for_each(|p| {
debug!(target: LOG_TARGET, "trigger ready signal at block {}", at);
let _ = p.send(ready_iterator());
});
}
fn remove_cancelled(&mut self) {
self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
}
}
pub struct ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
api: Arc<ChainApi>,
mempool: Arc<TxMemPool<ChainApi, Block>>,
view_store: Arc<ViewStore<ChainApi, Block>>,
ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,
metrics: PrometheusMetrics,
events_metrics_collector: EventsMetricsCollector<ChainApi>,
enactment_state: Arc<Mutex<EnactmentState<Block>>>,
revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,
import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
options: Options,
is_validator: IsValidator,
finality_timeout_threshold: usize,
included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,
}
impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
pub fn new_test(
pool_api: Arc<ChainApi>,
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
finality_timeout_threshold: Option<usize>,
) -> (Self, ForkAwareTxPoolTask) {
Self::new_test_with_limits(
pool_api,
best_block_hash,
finalized_hash,
Options::default().ready,
Options::default().future,
usize::MAX,
finality_timeout_threshold,
)
}
pub fn new_test_with_limits(
pool_api: Arc<ChainApi>,
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
ready_limits: crate::PoolLimit,
future_limits: crate::PoolLimit,
mempool_max_transactions_count: usize,
finality_timeout_threshold: Option<usize>,
) -> (Self, ForkAwareTxPoolTask) {
let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
let listener = Arc::new(listener);
let (import_notification_sink, import_notification_sink_task) =
MultiViewImportNotificationSink::new_with_worker();
let mempool = Arc::from(TxMemPool::new(
pool_api.clone(),
listener.clone(),
Default::default(),
mempool_max_transactions_count,
ready_limits.total_bytes + future_limits.total_bytes,
));
let (dropped_stream_controller, dropped_stream) =
MultiViewDroppedWatcherController::<ChainApi>::new();
let view_store =
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
let dropped_monitor_task = Self::dropped_monitor_task(
dropped_stream,
mempool.clone(),
view_store.clone(),
import_notification_sink.clone(),
);
let combined_tasks = async move {
tokio::select! {
_ = listener_task => {},
_ = import_notification_sink_task => {},
_ = dropped_monitor_task => {}
}
}
.boxed();
let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
(
Self {
mempool,
api: pool_api,
view_store,
ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
enactment_state: Arc::new(Mutex::new(EnactmentState::new(
best_block_hash,
finalized_hash,
))),
revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
import_notification_sink,
options,
is_validator: false.into(),
metrics: Default::default(),
events_metrics_collector: EventsMetricsCollector::default(),
finality_timeout_threshold: finality_timeout_threshold
.unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
included_transactions: Default::default(),
},
combined_tasks,
)
}
async fn dropped_monitor_task(
mut dropped_stream: StreamOfDropped<ChainApi>,
mempool: Arc<TxMemPool<ChainApi, Block>>,
view_store: Arc<ViewStore<ChainApi, Block>>,
import_notification_sink: MultiViewImportNotificationSink<
Block::Hash,
ExtrinsicHash<ChainApi>,
>,
) {
loop {
let Some(dropped) = dropped_stream.next().await else {
debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
break;
};
let tx_hash = dropped.tx_hash;
trace!(
target: LOG_TARGET,
?tx_hash,
reason = ?dropped.reason,
"fatp::dropped notification, removing"
);
match dropped.reason {
DroppedReason::Usurped(new_tx_hash) => {
if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) {
view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
} else {
trace!(
target: LOG_TARGET,
tx_hash = ?new_tx_hash,
"error: dropped_monitor_task: no entry in mempool for new transaction"
);
};
},
DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
view_store.remove_transaction_subtree(tx_hash, |_, _| {});
},
};
mempool.remove_transactions(&[tx_hash]);
import_notification_sink.clean_notified_items(&[tx_hash]);
view_store.listener.transaction_dropped(dropped);
}
}
pub fn new_with_background_worker(
options: Options,
is_validator: IsValidator,
pool_api: Arc<ChainApi>,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnEssentialNamed,
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
) -> Self {
let metrics = PrometheusMetrics::new(prometheus);
let (events_metrics_collector, event_metrics_task) =
EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());
let (listener, listener_task) =
MultiViewListener::new_with_worker(events_metrics_collector.clone());
let listener = Arc::new(listener);
let (revalidation_queue, revalidation_task) =
revalidation_worker::RevalidationQueue::new_with_worker();
let (import_notification_sink, import_notification_sink_task) =
MultiViewImportNotificationSink::new_with_worker();
let mempool = Arc::from(TxMemPool::new(
pool_api.clone(),
listener.clone(),
metrics.clone(),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * options.total_count(),
options.ready.total_bytes + options.future.total_bytes,
));
let (dropped_stream_controller, dropped_stream) =
MultiViewDroppedWatcherController::<ChainApi>::new();
let view_store =
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
let dropped_monitor_task = Self::dropped_monitor_task(
dropped_stream,
mempool.clone(),
view_store.clone(),
import_notification_sink.clone(),
);
let combined_tasks = async move {
tokio::select! {
_ = listener_task => {}
_ = revalidation_task => {},
_ = import_notification_sink_task => {},
_ = dropped_monitor_task => {}
_ = event_metrics_task => {},
}
}
.boxed();
spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
Self {
mempool,
api: pool_api,
view_store,
ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
enactment_state: Arc::new(Mutex::new(EnactmentState::new(
best_block_hash,
finalized_hash,
))),
revalidation_queue: Arc::from(revalidation_queue),
import_notification_sink,
options,
metrics,
events_metrics_collector,
is_validator,
finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
included_transactions: Default::default(),
}
}
pub fn api(&self) -> &ChainApi {
&self.api
}
pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
self.view_store.status()
}
pub fn active_views_count(&self) -> usize {
self.view_store.active_views.read().len()
}
pub fn inactive_views_count(&self) -> usize {
self.view_store.inactive_views.read().len()
}
fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
self.view_store
.active_views
.read()
.iter()
.map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
.collect()
}
pub fn has_view(&self, hash: &Block::Hash) -> bool {
self.view_store.active_views.read().contains_key(hash)
}
pub fn mempool_len(&self) -> (usize, usize) {
self.mempool.unwatched_and_watched_count()
}
pub fn futures_at(
&self,
at: Block::Hash,
) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
self.view_store.futures_at(at)
}
pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
let start = Instant::now();
let api = self.api.clone();
trace!(
target: LOG_TARGET,
?at,
"fatp::ready_at_light"
);
let Ok(block_number) = self.api.resolve_block_number(at) else {
return Box::new(std::iter::empty())
};
let best_result = {
api.tree_route(self.enactment_state.lock().recent_finalized_block(), at).map(
|tree_route| {
if let Some((index, view)) =
tree_route.enacted().iter().enumerate().rev().skip(1).find_map(|(i, b)| {
self.view_store.get_view_at(b.hash, true).map(|(view, _)| (i, view))
}) {
let e = tree_route.enacted()[index..].to_vec();
(TreeRoute::new(e, 0).ok(), Some(view))
} else {
(None, None)
}
},
)
};
if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
let (tmp_view, _, _): (View<ChainApi>, _, _) =
View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number });
let mut all_extrinsics = vec![];
for h in best_tree_route.enacted() {
let extrinsics = api
.block_body(h.hash)
.await
.unwrap_or_else(|error| {
warn!(
target: LOG_TARGET,
%error,
"Compute ready light transactions: error request"
);
None
})
.unwrap_or_default()
.into_iter()
.map(|t| api.hash_and_length(&t).0);
all_extrinsics.extend(extrinsics);
}
let before_count = tmp_view.pool.validated_pool().status().ready;
let tags = tmp_view
.pool
.validated_pool()
.extrinsics_tags(&all_extrinsics)
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();
let _ = tmp_view.pool.validated_pool().prune_tags(tags);
let after_count = tmp_view.pool.validated_pool().status().ready;
debug!(
target: LOG_TARGET,
?at,
best_view_hash = ?best_view.at.hash,
before_count,
to_be_removed = all_extrinsics.len(),
after_count,
duration = ?start.elapsed(),
"fatp::ready_at_light"
);
Box::new(tmp_view.pool.validated_pool().ready())
} else {
let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
debug!(
target: LOG_TARGET,
?at,
duration = ?start.elapsed(),
"fatp::ready_at_light -> empty"
);
empty
}
}
async fn ready_at_with_timeout_internal(
&self,
at: Block::Hash,
timeout: std::time::Duration,
) -> ReadyIteratorFor<ChainApi> {
debug!(
target: LOG_TARGET,
?at,
?timeout,
"fatp::ready_at_with_timeout"
);
let timeout = futures_timer::Delay::new(timeout);
let (view_already_exists, ready_at) = self.ready_at_internal(at);
if view_already_exists {
return ready_at.await;
}
let maybe_ready = async move {
select! {
ready = ready_at => Some(ready),
_ = timeout => {
warn!(
target: LOG_TARGET,
?at,
"Timeout fired waiting for transaction pool at block. Proceeding with production."
);
None
}
}
};
let fall_back_ready = self.ready_at_light(at);
let (maybe_ready, fall_back_ready) =
futures::future::join(maybe_ready, fall_back_ready).await;
maybe_ready.unwrap_or(fall_back_ready)
}
fn ready_at_internal(
&self,
at: Block::Hash,
) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
let mut ready_poll = self.ready_poll.lock();
if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
debug!(
target: LOG_TARGET,
?at,
?inactive,
"fatp::ready_at_internal"
);
let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
return (true, async move { iterator }.boxed());
}
let pending = ready_poll
.add(at)
.map(|received| {
received.unwrap_or_else(|error| {
warn!(
target: LOG_TARGET,
%error,
"Error receiving ready-set iterator"
);
Box::new(std::iter::empty())
})
})
.boxed();
debug!(
target: LOG_TARGET,
?at,
pending_keys = ?ready_poll.pollers.keys(),
"fatp::ready_at_internal"
);
(false, pending)
}
}
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
let mut values = input.values();
let Some(first) = values.next() else {
return Default::default();
};
let length = first.len();
debug_assert!(values.all(|x| length == x.len()));
input
.into_values()
.reduce(|mut agg_results, results| {
agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
if agg_r.is_err() {
*agg_r = r;
}
});
agg_results
})
.unwrap_or_default()
}
#[async_trait]
impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: 'static + graph::ChainApi<Block = Block>,
<Block as BlockT>::Hash: Unpin,
{
type Block = ChainApi::Block;
type Hash = ExtrinsicHash<ChainApi>;
type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
type Error = ChainApi::Error;
async fn submit_at(
&self,
_: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
let view_store = self.view_store.clone();
debug!(
target: LOG_TARGET,
count = xts.len(),
active_views_count = self.active_views_count(),
"fatp::submit_at"
);
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
let mempool_results = self.mempool.extend_unwatched(source, &xts);
if view_store.is_empty() {
return Ok(mempool_results
.into_iter()
.map(|r| r.map(|r| r.hash).map_err(Into::into))
.collect::<Vec<_>>())
}
let retries = mempool_results
.into_iter()
.zip(xts.clone())
.map(|(result, xt)| async move {
match result {
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(source, false, xt).await,
_ => result,
}
})
.collect::<Vec<_>>();
let mempool_results = futures::future::join_all(retries).await;
let to_be_submitted = mempool_results
.iter()
.zip(xts)
.filter_map(|(result, xt)| {
result.as_ref().ok().map(|insertion| {
self.events_metrics_collector.report_submitted(&insertion);
(insertion.source.clone(), xt)
})
})
.collect::<Vec<_>>();
self.metrics
.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
let mempool = self.mempool.clone();
let results_map = view_store.submit(to_be_submitted.into_iter()).await;
let mut submission_results = reduce_multiview_result(results_map).into_iter();
const RESULTS_ASSUMPTION : &str =
"The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
Ok(mempool_results
.into_iter()
.map(|result| {
result.map_err(Into::into).and_then(|insertion| {
submission_results.next().expect(RESULTS_ASSUMPTION).inspect_err(|_| {
mempool.remove_transactions(&[insertion.hash]);
})
})
})
.map(|r| {
r.map(|r| {
mempool.update_transaction_priority(&r);
r.hash()
})
})
.collect::<Vec<_>>())
}
async fn submit_one(
&self,
_at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> Result<TxHash<Self>, Self::Error> {
trace!(
target: LOG_TARGET,
tx_hash = ?self.tx_hash(&xt),
active_views_count = self.active_views_count(),
"fatp::submit_one"
);
match self.submit_at(_at, source, vec![xt]).await {
Ok(mut v) =>
v.pop().expect("There is exactly one element in result of submit_at. qed."),
Err(e) => Err(e),
}
}
async fn submit_and_watch(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
trace!(
target: LOG_TARGET,
tx_hash = ?self.tx_hash(&xt),
views = self.active_views_count(),
"fatp::submit_and_watch"
);
let xt = Arc::from(xt);
let insertion = match self.mempool.push_watched(source, xt.clone()) {
Ok(result) => result,
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(source, true, xt.clone()).await?,
Err(e) => return Err(e.into()),
};
self.metrics.report(|metrics| metrics.submitted_transactions.inc());
self.events_metrics_collector.report_submitted(&insertion);
self.view_store
.submit_and_watch(at, insertion.source, xt)
.await
.inspect_err(|_| {
self.mempool.remove_transactions(&[insertion.hash]);
})
.map(|mut outcome| {
self.mempool.update_transaction_priority(&outcome);
outcome.expect_watcher()
})
}
fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
) -> Vec<Arc<Self::InPoolTransaction>> {
debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
self.metrics
.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
let removed = self.view_store.report_invalid(at, invalid_tx_errors);
let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
self.mempool.remove_transactions(&removed_hashes);
self.import_notification_sink.clean_notified_items(&removed_hashes);
self.metrics
.report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
removed
}
fn status(&self) -> PoolStatus {
self.view_store
.most_recent_view
.read()
.map(|hash| self.view_store.status()[&hash].clone())
.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
}
fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
self.import_notification_sink.event_stream()
}
fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.api().hash_and_length(xt).0
}
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
self.view_store.listener.transactions_broadcasted(propagations);
}
fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
let most_recent_view = self.view_store.most_recent_view.read();
let result = most_recent_view
.map(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash))
.flatten();
trace!(
target: LOG_TARGET,
?tx_hash,
is_ready = result.is_some(),
?most_recent_view,
"ready_transaction"
);
result
}
async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
let (_, result) = self.ready_at_internal(at);
result.await
}
fn ready(&self) -> ReadyIteratorFor<ChainApi> {
self.view_store.ready()
}
fn futures(&self) -> Vec<Self::InPoolTransaction> {
self.view_store.futures()
}
async fn ready_at_with_timeout(
&self,
at: <Self::Block as BlockT>::Hash,
timeout: std::time::Duration,
) -> ReadyIteratorFor<ChainApi> {
self.ready_at_with_timeout_internal(at, timeout).await
}
}
impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
for ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: 'static + graph::ChainApi<Block = Block>,
<Block as BlockT>::Hash: Unpin,
{
type Block = Block;
type Hash = ExtrinsicHash<ChainApi>;
type Error = ChainApi::Error;
fn submit_local(
&self,
_at: Block::Hash,
xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
debug!(
target: LOG_TARGET,
active_views_count = self.active_views_count(),
"fatp::submit_local"
);
let xt = Arc::from(xt);
let result =
self.mempool.extend_unwatched(TransactionSource::Local, &[xt.clone()]).remove(0);
let insertion = match result {
Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
TransactionSource::Local,
false,
xt.clone(),
),
_ => result,
}?;
self.view_store
.submit_local(xt)
.inspect_err(|_| {
self.mempool.remove_transactions(&[insertion.hash]);
})
.map(|outcome| {
self.mempool.update_transaction_priority(&outcome);
outcome.hash()
})
.or_else(|_| Ok(insertion.hash))
}
}
impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
let hash_and_number = match tree_route.last() {
Some(hash_and_number) => hash_and_number,
None => {
warn!(
target: LOG_TARGET,
?tree_route,
"Skipping ChainEvent - no last block in tree route"
);
return
},
};
if self.has_view(&hash_and_number.hash) {
trace!(
target: LOG_TARGET,
?hash_and_number,
"view already exists for block"
);
return
}
let best_view = self.view_store.find_best_view(tree_route);
let new_view = self.build_new_view(best_view, hash_and_number, tree_route).await;
if let Some(view) = new_view {
{
let view = view.clone();
self.ready_poll.lock().trigger(hash_and_number.hash, move || {
Box::from(view.pool.validated_pool().ready())
});
}
View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
}
self.finality_stall_cleanup(hash_and_number);
}
fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
let (oldest_block_number, finality_timedout_blocks) = {
let mut included_transactions = self.included_transactions.lock();
let Some(oldest_block_number) =
included_transactions.first_key_value().map(|(k, _)| k.number)
else {
return
};
if at.number.saturating_sub(oldest_block_number).into() <=
self.finality_timeout_threshold.into()
{
return
}
let mut finality_timedout_blocks =
indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
included_transactions.retain(
|HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
let diff = at.number.saturating_sub(*view_number);
if diff.into() > self.finality_timeout_threshold.into() {
finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
false
} else {
true
}
},
);
(oldest_block_number, finality_timedout_blocks)
};
if !finality_timedout_blocks.is_empty() {
self.ready_poll.lock().remove_cancelled();
self.view_store.listener.remove_stale_controllers();
}
let finality_timedout_blocks_len = finality_timedout_blocks.len();
for (block_hash, tx_hashes) in finality_timedout_blocks {
self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
self.mempool.remove_transactions(&tx_hashes);
self.import_notification_sink.clean_notified_items(&tx_hashes);
self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
}
self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
debug!(
target: LOG_TARGET,
?at,
included_transactions_len = ?self.included_transactions.lock().len(),
finality_timedout_blocks_len,
?oldest_block_number,
"finality_stall_cleanup"
);
}
async fn build_new_view(
&self,
origin_view: Option<Arc<View<ChainApi>>>,
at: &HashAndNumber<Block>,
tree_route: &TreeRoute<Block>,
) -> Option<Arc<View<ChainApi>>> {
debug!(
target: LOG_TARGET,
?at,
origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
?tree_route,
"build_new_view"
);
let (mut view, view_dropped_stream, view_aggregated_stream) =
if let Some(origin_view) = origin_view {
let (mut view, view_dropped_stream, view_aggragated_stream) =
View::new_from_other(&origin_view, at);
if !tree_route.retracted().is_empty() {
view.pool.clear_recently_pruned();
}
(view, view_dropped_stream, view_aggragated_stream)
} else {
debug!(
target: LOG_TARGET,
?at,
"creating non-cloned view"
);
View::new(
self.api.clone(),
at.clone(),
self.options.clone(),
self.metrics.clone(),
self.is_validator.clone(),
)
};
let start = Instant::now();
self.import_notification_sink.add_view(
view.at.hash,
view.pool.validated_pool().import_notification_stream().boxed(),
);
self.view_store
.dropped_stream_controller
.add_view(view.at.hash, view_dropped_stream.boxed());
self.view_store
.listener
.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
view.pool.validated_pool().retrigger_notifications();
debug!(
target: LOG_TARGET,
?at,
duration = ?start.elapsed(),
"register_listeners"
);
let start = Instant::now();
self.update_view_with_fork(&view, tree_route, at.clone()).await;
debug!(
target: LOG_TARGET,
?at,
duration = ?start.elapsed(),
"update_view_with_fork"
);
let start = Instant::now();
self.update_view_with_mempool(&mut view).await;
debug!(
target: LOG_TARGET,
?at,
duration= ?start.elapsed(),
"update_view_with_mempool"
);
let view = Arc::from(view);
self.view_store.insert_new_view(view.clone(), tree_route).await;
Some(view)
}
async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
if let Some(txs) = self.included_transactions.lock().get(at) {
return txs.clone()
};
trace!(
target: LOG_TARGET,
?at,
"fetch_block_transactions from api"
);
self.api
.block_body(at.hash)
.await
.unwrap_or_else(|error| {
warn!(
target: LOG_TARGET,
%error,
"fetch_block_transactions: error request"
);
None
})
.unwrap_or_default()
.into_iter()
.map(|t| self.hash_of(&t))
.collect::<Vec<_>>()
}
async fn txs_included_since_finalized(
&self,
at: &HashAndNumber<Block>,
) -> HashSet<TxHash<Self>> {
let start = Instant::now();
let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
return Default::default()
};
let mut all_txs = HashSet::new();
for block in tree_route.enacted().iter() {
if at.number.saturating_sub(block.number).into() <=
self.finality_timeout_threshold.into()
{
all_txs.extend(self.fetch_block_transactions(block).await);
}
}
debug!(
target: LOG_TARGET,
?at,
?recent_finalized_block,
extrinsics_count = all_txs.len(),
duration = ?start.elapsed(),
"fatp::txs_included_since_finalized"
);
all_txs
}
async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
debug!(
target: LOG_TARGET,
view_at = ?view.at,
xts_count = ?self.mempool.unwatched_and_watched_count(),
active_views_count = self.active_views_count(),
"update_view_with_mempool"
);
let included_xts = self.txs_included_since_finalized(&view.at).await;
let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
.mempool
.clone_transactions()
.into_iter()
.filter(|(hash, _)| !view.is_imported(hash))
.filter(|(hash, _)| !included_xts.contains(&hash))
.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
.unzip();
let results = view
.submit_many(xts_filtered)
.await
.into_iter()
.zip(hashes)
.map(|(result, tx_hash)| {
result
.map(|outcome| self.mempool.update_transaction_priority(&outcome.into()))
.or_else(|_| Err(tx_hash))
})
.collect::<Vec<_>>();
let submitted_count = results.len();
debug!(
target: LOG_TARGET,
view_at_hash = ?view.at.hash,
submitted_count,
mempool_len = self.mempool.len(),
"update_view_with_mempool"
);
self.metrics
.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
if self.view_store.is_empty() {
for result in results {
if let Err(tx_hash) = result {
self.view_store.listener.transactions_invalidated(&[tx_hash]);
self.mempool.remove_transactions(&[tx_hash]);
}
}
}
}
async fn update_view_with_fork(
&self,
view: &View<ChainApi>,
tree_route: &TreeRoute<Block>,
hash_and_number: HashAndNumber<Block>,
) {
debug!(
target: LOG_TARGET,
?tree_route,
at = ?view.at,
"update_view_with_fork"
);
let api = self.api.clone();
let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
future::join_all(tree_route.enacted().iter().map(|hn| {
let api = api.clone();
async move { (hn, crate::prune_known_txs_for_block(hn, &*api, &view.pool).await) }
}))
.await
.into_iter()
.for_each(|(key, enacted_log)| {
pruned_log.extend(enacted_log.clone());
self.included_transactions.lock().insert(key.clone(), enacted_log);
});
self.metrics.report(|metrics| {
metrics
.unknown_from_block_import_txs
.inc_by(self.mempool.count_unknown_transactions(pruned_log.iter()) as _)
});
{
let mut resubmit_transactions = Vec::new();
for retracted in tree_route.retracted() {
let hash = retracted.hash;
let block_transactions = api
.block_body(hash)
.await
.unwrap_or_else(|error| {
warn!(
target: LOG_TARGET,
%error,
"Failed to fetch block body"
);
None
})
.unwrap_or_default()
.into_iter();
let mut resubmitted_to_report = 0;
resubmit_transactions.extend(
block_transactions
.into_iter()
.map(|tx| (self.hash_of(&tx), tx))
.filter(|(tx_hash, _)| {
let contains = pruned_log.contains(&tx_hash);
resubmitted_to_report += 1;
if !contains {
trace!(
target: LOG_TARGET,
?tx_hash,
?hash,
"Resubmitting from retracted block"
);
}
!contains
})
.map(|(tx_hash, tx)| {
self.mempool
.get_by_hash(tx_hash)
.map(|tx| (tx.source(), tx.tx()))
.unwrap_or_else(|| {
(TimedTransactionSource::new_external(true), Arc::from(tx))
})
}),
);
self.metrics.report(|metrics| {
metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
});
}
let _ = view.pool.resubmit_at(&hash_and_number, resubmit_transactions).await;
}
}
async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
debug!(
target: LOG_TARGET,
?finalized_number,
?tree_route,
active_views_count = self.active_views_count(),
"handle_finalized"
);
let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
self.mempool.purge_finalized_transactions(&finalized_xts).await;
self.import_notification_sink.clean_notified_items(&finalized_xts);
self.metrics
.report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
if let Ok(Some(finalized_number)) = finalized_number {
self.included_transactions
.lock()
.retain(|cached_block, _| finalized_number < cached_block.number);
self.revalidation_queue
.revalidate_mempool(
self.mempool.clone(),
self.view_store.clone(),
HashAndNumber { hash: finalized_hash, number: finalized_number },
)
.await;
} else {
trace!(
target: LOG_TARGET,
?finalized_number,
"handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
);
}
self.ready_poll.lock().remove_cancelled();
debug!(
target: LOG_TARGET,
active_views_count = self.active_views_count(),
included_transactions_len = ?self.included_transactions.lock().len(),
"handle_finalized after"
);
}
fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.api.hash_and_length(xt).0
}
async fn attempt_transaction_replacement(
&self,
source: TransactionSource,
watched: bool,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let at = self
.view_store
.most_recent_view
.read()
.ok_or(TxPoolApiError::ImmediatelyDropped)?;
let (best_view, _) = self
.view_store
.get_view_at(at, false)
.ok_or(TxPoolApiError::ImmediatelyDropped)?;
let (xt_hash, validated_tx) = best_view
.pool
.verify_one(
best_view.at.hash,
best_view.at.number,
TimedTransactionSource::from_transaction_source(source, false),
xt.clone(),
crate::graph::CheckBannedBeforeVerify::Yes,
)
.await;
let Some(priority) = validated_tx.priority() else {
return Err(TxPoolApiError::ImmediatelyDropped)
};
self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
}
fn attempt_transaction_replacement_sync(
&self,
source: TransactionSource,
watched: bool,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let at = self
.view_store
.most_recent_view
.read()
.ok_or(TxPoolApiError::ImmediatelyDropped)?;
let ValidTransaction { priority, .. } = self
.api
.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))
.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
.map_err(|e| match e {
TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
})?;
let xt_hash = self.hash_of(&xt);
self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
}
fn attempt_transaction_replacement_inner(
&self,
xt: ExtrinsicFor<ChainApi>,
tx_hash: ExtrinsicHash<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
watched: bool,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let insertion_info =
self.mempool.try_insert_with_replacement(xt, priority, source, watched)?;
for worst_hash in &insertion_info.removed {
trace!(
target: LOG_TARGET,
tx_hash = ?worst_hash,
new_tx_hash = ?tx_hash,
"removed: replaced by"
);
self.view_store
.listener
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
self.view_store
.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
listener.limits_enforced(&removed_tx_hash);
});
}
return Ok(insertion_info)
}
}
#[async_trait]
impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: 'static + graph::ChainApi<Block = Block>,
<Block as BlockT>::Hash: Unpin,
{
async fn maintain(&self, event: ChainEvent<Self::Block>) {
let start = Instant::now();
debug!(
target: LOG_TARGET,
?event,
"processing event"
);
self.view_store.finish_background_revalidations().await;
let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
match self.api.tree_route(from, to) {
Ok(tree_route) => Ok(tree_route),
Err(e) =>
return Err(format!(
"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
)),
}
};
let block_id_to_number =
|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
let result =
self.enactment_state
.lock()
.update(&event, &compute_tree_route, &block_id_to_number);
match result {
Err(error) => {
trace!(
target: LOG_TARGET,
%error,
"enactment_state::update error"
);
self.enactment_state.lock().force_update(&event);
},
Ok(EnactmentAction::Skip) => return,
Ok(EnactmentAction::HandleFinalization) => {
},
Ok(EnactmentAction::HandleEnactment(tree_route)) => {
self.handle_new_block(&tree_route).await;
},
};
match event {
ChainEvent::NewBestBlock { .. } => {},
ChainEvent::Finalized { hash, ref tree_route } => {
self.handle_finalized(hash, tree_route).await;
trace!(
target: LOG_TARGET,
?tree_route,
?prev_finalized_block,
"on-finalized enacted"
);
},
}
let duration = start.elapsed();
info!(
target: LOG_TARGET,
txs = ?self.mempool_len(),
a = self.active_views_count(),
i = self.inactive_views_count(),
views = ?self.views_stats(),
?event,
?duration,
"maintain"
);
self.metrics.report(|metrics| {
let (unwatched, watched) = self.mempool_len();
let _ = (
self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
watched.try_into().map(|v| metrics.watched_txs.set(v)),
unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
);
metrics.maintain_duration.observe(duration.as_secs_f64());
});
}
}
impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>
+ sc_client_api::ExecutorProvider<Block>
+ sc_client_api::UsageProvider<Block>
+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
+ Send
+ Sync
+ 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
<Block as BlockT>::Hash: std::marker::Unpin,
{
pub fn new_full(
options: Options,
is_validator: IsValidator,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnEssentialNamed,
client: Arc<Client>,
) -> Self {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
let pool = Self::new_with_background_worker(
options,
is_validator,
pool_api,
prometheus,
spawner,
client.usage_info().chain.best_hash,
client.usage_info().chain.finalized_hash,
);
pool
}
}
#[cfg(test)]
mod reduce_multiview_result_tests {
use super::*;
use sp_core::H256;
#[derive(Debug, PartialEq, Clone)]
enum Error {
Custom(u8),
}
#[test]
fn empty() {
sp_tracing::try_init_simple();
let input = HashMap::default();
let r = reduce_multiview_result::<H256, H256, Error>(input);
assert!(r.is_empty());
}
#[test]
fn errors_only() {
sp_tracing::try_init_simple();
let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
(
H256::repeat_byte(0x13),
vec![
Err(Error::Custom(10)),
Err(Error::Custom(11)),
Err(Error::Custom(12)),
Err(Error::Custom(13)),
],
),
(
H256::repeat_byte(0x14),
vec![
Err(Error::Custom(20)),
Err(Error::Custom(21)),
Err(Error::Custom(22)),
Err(Error::Custom(23)),
],
),
(
H256::repeat_byte(0x15),
vec![
Err(Error::Custom(30)),
Err(Error::Custom(31)),
Err(Error::Custom(32)),
Err(Error::Custom(33)),
],
),
];
let input = HashMap::from_iter(v.clone());
let r = reduce_multiview_result(input);
assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
}
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn invalid_lengths() {
sp_tracing::try_init_simple();
let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
(H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
(H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
];
let input = HashMap::from_iter(v);
let _ = reduce_multiview_result(input);
}
#[test]
fn only_hashes() {
sp_tracing::try_init_simple();
let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
(
H256::repeat_byte(0x13),
vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
),
(
H256::repeat_byte(0x14),
vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
),
];
let input = HashMap::from_iter(v);
let r = reduce_multiview_result(input);
assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
}
#[test]
fn one_view() {
sp_tracing::try_init_simple();
let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
H256::repeat_byte(0x13),
vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
)];
let input = HashMap::from_iter(v);
let r = reduce_multiview_result(input);
assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
}
#[test]
fn mix() {
sp_tracing::try_init_simple();
let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
(
H256::repeat_byte(0x13),
vec![
Ok(H256::repeat_byte(0x10)),
Err(Error::Custom(11)),
Err(Error::Custom(12)),
Err(Error::Custom(33)),
],
),
(
H256::repeat_byte(0x14),
vec![
Err(Error::Custom(20)),
Ok(H256::repeat_byte(0x21)),
Err(Error::Custom(22)),
Err(Error::Custom(33)),
],
),
(
H256::repeat_byte(0x15),
vec![
Err(Error::Custom(30)),
Err(Error::Custom(31)),
Ok(H256::repeat_byte(0x32)),
Err(Error::Custom(33)),
],
),
];
let input = HashMap::from_iter(v);
let r = reduce_multiview_result(input);
assert_eq!(
r,
vec![
Ok(H256::repeat_byte(0x10)),
Ok(H256::repeat_byte(0x21)),
Ok(H256::repeat_byte(0x32)),
Err(Error::Custom(33))
]
);
}
}