use super::metrics::MetricsLink as PrometheusMetrics;
use crate::{
common::tracing_log_xt::log_xt_trace,
graph::{
self, base_pool::TimedTransactionSource, BlockHash, ExtrinsicFor, ExtrinsicHash,
IsValidator, TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction,
ValidatedTransactionFor,
},
LOG_TARGET,
};
use indexmap::IndexMap;
use parking_lot::Mutex;
use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionStatus};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::HashAndNumber;
use sp_runtime::{
generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
SaturatedConversion,
};
use std::{sync::Arc, time::Instant};
use tracing::{debug, trace};
pub(super) struct RevalidationResult<ChainApi: graph::ChainApi> {
revalidated: IndexMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>,
invalid_hashes: Vec<ExtrinsicHash<ChainApi>>,
}
pub(super) type RevalidationResultReceiver<ChainApi> =
tokio::sync::mpsc::Receiver<RevalidationResult<ChainApi>>;
pub(super) type RevalidationResultSender<ChainApi> =
tokio::sync::mpsc::Sender<RevalidationResult<ChainApi>>;
pub(super) type FinishRevalidationRequestReceiver = tokio::sync::mpsc::Receiver<()>;
pub(super) type FinishRevalidationRequestSender = tokio::sync::mpsc::Sender<()>;
pub(super) struct FinishRevalidationLocalChannels<ChainApi: graph::ChainApi> {
finish_revalidation_request_tx: Option<FinishRevalidationRequestSender>,
revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
}
impl<ChainApi: graph::ChainApi> FinishRevalidationLocalChannels<ChainApi> {
pub fn new(
finish_revalidation_request_tx: FinishRevalidationRequestSender,
revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
) -> Self {
Self {
finish_revalidation_request_tx: Some(finish_revalidation_request_tx),
revalidation_result_rx,
}
}
fn remove_sender(&mut self) {
self.finish_revalidation_request_tx = None;
}
}
pub(super) struct FinishRevalidationWorkerChannels<ChainApi: graph::ChainApi> {
finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
revalidation_result_tx: RevalidationResultSender<ChainApi>,
}
impl<ChainApi: graph::ChainApi> FinishRevalidationWorkerChannels<ChainApi> {
pub fn new(
finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
revalidation_result_tx: RevalidationResultSender<ChainApi>,
) -> Self {
Self { finish_revalidation_request_rx, revalidation_result_tx }
}
}
pub(super) type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>);
const VIEW_STREAM_WARN_THRESHOLD: usize = 100_000;
pub(super) type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
type DroppedMonitoringStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
pub(super) struct ViewPoolObserver<ChainApi: graph::ChainApi> {
dropped_stream_sink: TracingUnboundedSender<
TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
>,
aggregated_stream_sink: TracingUnboundedSender<
TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
>,
}
impl<C: graph::ChainApi> graph::EventHandler<C> for ViewPoolObserver<C> {
fn broadcasted(&self, _: ExtrinsicHash<C>, _: Vec<String>) {}
fn dropped(&self, _: ExtrinsicHash<C>) {}
fn finalized(&self, _: ExtrinsicHash<C>, _: BlockHash<C>, _: usize) {}
fn retracted(&self, _: ExtrinsicHash<C>, _: BlockHash<C>) {
}
fn ready(&self, tx: ExtrinsicHash<C>) {
let status = TransactionStatus::Ready;
self.send_to_dropped_stream_sink(tx, status.clone());
self.send_to_aggregated_stream_sink(tx, status);
}
fn future(&self, tx: ExtrinsicHash<C>) {
let status = TransactionStatus::Future;
self.send_to_dropped_stream_sink(tx, status.clone());
self.send_to_aggregated_stream_sink(tx, status);
}
fn limits_enforced(&self, tx: ExtrinsicHash<C>) {
self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped);
}
fn usurped(&self, tx: ExtrinsicHash<C>, by: ExtrinsicHash<C>) {
self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by));
}
fn invalid(&self, tx: ExtrinsicHash<C>) {
self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid);
}
fn pruned(&self, tx: ExtrinsicHash<C>, block_hash: BlockHash<C>, tx_index: usize) {
self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index)));
}
fn finality_timeout(&self, tx: ExtrinsicHash<C>, hash: BlockHash<C>) {
self.send_to_aggregated_stream_sink(tx, TransactionStatus::FinalityTimeout(hash));
}
}
impl<ChainApi: graph::ChainApi> ViewPoolObserver<ChainApi> {
fn new() -> (
Self,
DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
) {
let (dropped_stream_sink, dropped_stream) =
tracing_unbounded("mpsc_txpool_watcher", VIEW_STREAM_WARN_THRESHOLD);
let (aggregated_stream_sink, aggregated_stream) =
tracing_unbounded("mpsc_txpool_aggregated_stream", VIEW_STREAM_WARN_THRESHOLD);
(Self { dropped_stream_sink, aggregated_stream_sink }, dropped_stream, aggregated_stream)
}
fn send_to_dropped_stream_sink(
&self,
tx: ExtrinsicHash<ChainApi>,
status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
) {
if let Err(e) = self.dropped_stream_sink.unbounded_send((tx, status.clone())) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e);
}
}
fn send_to_aggregated_stream_sink(
&self,
tx: ExtrinsicHash<ChainApi>,
status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
) {
if let Err(e) = self.aggregated_stream_sink.unbounded_send((tx, status.clone())) {
trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e);
}
}
}
pub(super) struct View<ChainApi: graph::ChainApi> {
pub(super) pool: graph::Pool<ChainApi, ViewPoolObserver<ChainApi>>,
pub(super) at: HashAndNumber<ChainApi::Block>,
revalidation_worker_channels: Mutex<Option<FinishRevalidationLocalChannels<ChainApi>>>,
metrics: PrometheusMetrics,
}
impl<ChainApi> View<ChainApi>
where
ChainApi: graph::ChainApi,
<ChainApi::Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
at: HashAndNumber<ChainApi::Block>,
options: graph::Options,
metrics: PrometheusMetrics,
is_validator: IsValidator,
) -> (
Self,
DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
) {
metrics.report(|metrics| metrics.non_cloned_views.inc());
let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
(
Self {
pool: graph::Pool::new_with_event_handler(
options,
is_validator,
api,
event_handler,
),
at,
revalidation_worker_channels: Mutex::from(None),
metrics,
},
dropped_stream,
aggregated_stream,
)
}
pub(super) fn new_from_other(
&self,
at: &HashAndNumber<ChainApi::Block>,
) -> (
Self,
DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
) {
let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
(
View {
at: at.clone(),
pool: self.pool.deep_clone_with_event_handler(event_handler),
revalidation_worker_channels: Mutex::from(None),
metrics: self.metrics.clone(),
},
dropped_stream,
aggregated_stream,
)
}
pub(super) async fn submit_one(
&self,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
self.submit_many(std::iter::once((source, xt)))
.await
.pop()
.expect("There is exactly one result, qed.")
}
pub(super) async fn submit_many(
&self,
xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)>,
) -> Vec<Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error>> {
if tracing::enabled!(target: LOG_TARGET, tracing::Level::TRACE) {
let xts = xts.into_iter().collect::<Vec<_>>();
log_xt_trace!(
target: LOG_TARGET,
xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0),
"view::submit_many at:{}",
self.at.hash
);
self.pool.submit_at(&self.at, xts).await
} else {
self.pool.submit_at(&self.at, xts).await
}
}
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
let (tx_hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
trace!(
target: LOG_TARGET,
?tx_hash,
view_at_hash = ?self.at.hash,
"view::submit_local"
);
let validity = self
.pool
.validated_pool()
.api()
.validate_transaction_blocking(
self.at.hash,
sc_transaction_pool_api::TransactionSource::Local,
Arc::from(xt.clone()),
)?
.map_err(|e| {
match e {
TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
}
.into()
})?;
let block_number = self
.pool
.validated_pool()
.api()
.block_id_to_number(&BlockId::hash(self.at.hash))?
.ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?;
let validated = ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
tx_hash,
TimedTransactionSource::new_local(true),
Arc::from(xt),
length,
validity,
);
self.pool.validated_pool().submit(vec![validated]).remove(0)
}
pub(super) fn status(&self) -> PoolStatus {
self.pool.validated_pool().status()
}
pub(super) async fn revalidate(
&self,
finish_revalidation_worker_channels: FinishRevalidationWorkerChannels<ChainApi>,
) {
let FinishRevalidationWorkerChannels {
mut finish_revalidation_request_rx,
revalidation_result_tx,
} = finish_revalidation_worker_channels;
trace!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
"view::revalidate: at starting"
);
let start = Instant::now();
let validated_pool = self.pool.validated_pool();
let api = validated_pool.api();
let batch: Vec<_> = validated_pool.ready().collect();
let batch_len = batch.len();
let mut invalid_hashes = Vec::new();
let mut revalidated = IndexMap::new();
let mut validation_results = vec![];
let mut batch_iter = batch.into_iter();
loop {
let mut should_break = false;
tokio::select! {
_ = finish_revalidation_request_rx.recv() => {
trace!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
"view::revalidate: finish revalidation request received"
);
break
}
_ = async {
if let Some(tx) = batch_iter.next() {
let validation_result = (api.validate_transaction(self.at.hash, tx.source.clone().into(), tx.data.clone()).await, tx.hash, tx);
validation_results.push(validation_result);
} else {
self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
should_break = true;
}
} => {}
}
if should_break {
break;
}
}
let revalidation_duration = start.elapsed();
self.metrics.report(|metrics| {
metrics.view_revalidation_duration.observe(revalidation_duration.as_secs_f64());
});
debug!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
count = validation_results.len(),
batch_len,
duration = ?revalidation_duration,
"view::revalidate"
);
log_xt_trace!(
data:tuple,
target:LOG_TARGET,
validation_results.iter().map(|x| (x.1, &x.0)),
"view::revalidate result: {:?}"
);
for (validation_result, tx_hash, tx) in validation_results {
match validation_result {
Ok(Err(TransactionValidityError::Invalid(_))) => {
invalid_hashes.push(tx_hash);
},
Ok(Ok(validity)) => {
revalidated.insert(
tx_hash,
ValidatedTransaction::valid_at(
self.at.number.saturated_into::<u64>(),
tx_hash,
tx.source.clone(),
tx.data.clone(),
api.hash_and_length(&tx.data).1,
validity,
),
);
},
Ok(Err(TransactionValidityError::Unknown(error))) => {
trace!(
target: LOG_TARGET,
?tx_hash,
?error,
"Removing. Cannot determine transaction validity"
);
invalid_hashes.push(tx_hash);
},
Err(error) => {
trace!(
target: LOG_TARGET,
?tx_hash,
%error,
"Removing due to error during revalidation"
);
invalid_hashes.push(tx_hash);
},
}
}
trace!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
"view::revalidate: sending revalidation result"
);
if let Err(error) = revalidation_result_tx
.send(RevalidationResult { invalid_hashes, revalidated })
.await
{
trace!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
?error,
"view::revalidate: sending revalidation_result failed"
);
}
}
pub(super) async fn start_background_revalidation(
view: Arc<Self>,
revalidation_queue: Arc<
super::revalidation_worker::RevalidationQueue<ChainApi, ChainApi::Block>,
>,
) {
trace!(
target: LOG_TARGET,
at_hash = ?view.at.hash,
"view::start_background_revalidation"
);
let (finish_revalidation_request_tx, finish_revalidation_request_rx) =
tokio::sync::mpsc::channel(1);
let (revalidation_result_tx, revalidation_result_rx) = tokio::sync::mpsc::channel(1);
let finish_revalidation_worker_channels = FinishRevalidationWorkerChannels::new(
finish_revalidation_request_rx,
revalidation_result_tx,
);
let finish_revalidation_local_channels = FinishRevalidationLocalChannels::new(
finish_revalidation_request_tx,
revalidation_result_rx,
);
*view.revalidation_worker_channels.lock() = Some(finish_revalidation_local_channels);
revalidation_queue
.revalidate_view(view.clone(), finish_revalidation_worker_channels)
.await;
}
pub(super) async fn finish_revalidation(&self) {
trace!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
"view::finish_revalidation"
);
let Some(revalidation_worker_channels) = self.revalidation_worker_channels.lock().take()
else {
trace!(target:LOG_TARGET, "view::finish_revalidation: no finish_revalidation_request_tx");
return
};
let FinishRevalidationLocalChannels {
finish_revalidation_request_tx,
mut revalidation_result_rx,
} = revalidation_worker_channels;
if let Some(finish_revalidation_request_tx) = finish_revalidation_request_tx {
if let Err(error) = finish_revalidation_request_tx.send(()).await {
trace!(
target: LOG_TARGET,
at_hash = ?self.at.hash,
%error,
"view::finish_revalidation: sending cancellation request failed"
);
}
}
if let Some(revalidation_result) = revalidation_result_rx.recv().await {
let start = Instant::now();
let revalidated_len = revalidation_result.revalidated.len();
let validated_pool = self.pool.validated_pool();
validated_pool.remove_invalid(&revalidation_result.invalid_hashes);
if revalidated_len > 0 {
self.pool.resubmit(revalidation_result.revalidated);
}
self.metrics.report(|metrics| {
let _ = (
revalidation_result
.invalid_hashes
.len()
.try_into()
.map(|v| metrics.view_revalidation_invalid_txs.inc_by(v)),
revalidated_len
.try_into()
.map(|v| metrics.view_revalidation_resubmitted_txs.inc_by(v)),
);
});
debug!(
target: LOG_TARGET,
invalid = revalidation_result.invalid_hashes.len(),
revalidated = revalidated_len,
at_hash = ?self.at.hash,
duration = ?start.elapsed(),
"view::finish_revalidation: applying revalidation result"
);
}
}
pub(super) fn is_imported(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
const IGNORE_BANNED: bool = false;
self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err()
}
pub fn remove_subtree<F>(
&self,
hashes: &[ExtrinsicHash<ChainApi>],
ban_transactions: bool,
listener_action: F,
) -> Vec<TransactionFor<ChainApi>>
where
F: Fn(
&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
ExtrinsicHash<ChainApi>,
),
{
self.pool
.validated_pool()
.remove_subtree(hashes, ban_transactions, listener_action)
}
}