use super::metrics::MetricsLink as PrometheusMetrics;
use crate::{
common::log_xt::log_xt_trace,
graph::{
self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash,
IsValidator, ValidatedTransaction, ValidatedTransactionFor,
},
LOG_TARGET,
};
use parking_lot::Mutex;
use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus};
use sp_blockchain::HashAndNumber;
use sp_runtime::{
generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
SaturatedConversion,
};
use std::{collections::HashMap, sync::Arc, time::Instant};
pub(super) struct RevalidationResult<ChainApi: graph::ChainApi> {
revalidated: HashMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>,
invalid_hashes: Vec<ExtrinsicHash<ChainApi>>,
}
pub(super) type RevalidationResultReceiver<ChainApi> =
tokio::sync::mpsc::Receiver<RevalidationResult<ChainApi>>;
pub(super) type RevalidationResultSender<ChainApi> =
tokio::sync::mpsc::Sender<RevalidationResult<ChainApi>>;
pub(super) type FinishRevalidationRequestReceiver = tokio::sync::mpsc::Receiver<()>;
pub(super) type FinishRevalidationRequestSender = tokio::sync::mpsc::Sender<()>;
pub(super) struct FinishRevalidationLocalChannels<ChainApi: graph::ChainApi> {
finish_revalidation_request_tx: Option<FinishRevalidationRequestSender>,
revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
}
impl<ChainApi: graph::ChainApi> FinishRevalidationLocalChannels<ChainApi> {
pub fn new(
finish_revalidation_request_tx: FinishRevalidationRequestSender,
revalidation_result_rx: RevalidationResultReceiver<ChainApi>,
) -> Self {
Self {
finish_revalidation_request_tx: Some(finish_revalidation_request_tx),
revalidation_result_rx,
}
}
fn remove_sender(&mut self) {
self.finish_revalidation_request_tx = None;
}
}
pub(super) struct FinishRevalidationWorkerChannels<ChainApi: graph::ChainApi> {
finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
revalidation_result_tx: RevalidationResultSender<ChainApi>,
}
impl<ChainApi: graph::ChainApi> FinishRevalidationWorkerChannels<ChainApi> {
pub fn new(
finish_revalidation_request_rx: FinishRevalidationRequestReceiver,
revalidation_result_tx: RevalidationResultSender<ChainApi>,
) -> Self {
Self { finish_revalidation_request_rx, revalidation_result_tx }
}
}
pub(super) struct View<ChainApi: graph::ChainApi> {
pub(super) pool: graph::Pool<ChainApi>,
pub(super) at: HashAndNumber<ChainApi::Block>,
revalidation_worker_channels: Mutex<Option<FinishRevalidationLocalChannels<ChainApi>>>,
metrics: PrometheusMetrics,
}
impl<ChainApi> View<ChainApi>
where
ChainApi: graph::ChainApi,
<ChainApi::Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
at: HashAndNumber<ChainApi::Block>,
options: graph::Options,
metrics: PrometheusMetrics,
is_validator: IsValidator,
) -> Self {
metrics.report(|metrics| metrics.non_cloned_views.inc());
Self {
pool: graph::Pool::new(options, is_validator, api),
at,
revalidation_worker_channels: Mutex::from(None),
metrics,
}
}
pub(super) fn new_from_other(&self, at: &HashAndNumber<ChainApi::Block>) -> Self {
View {
at: at.clone(),
pool: self.pool.deep_clone(),
revalidation_worker_channels: Mutex::from(None),
metrics: self.metrics.clone(),
}
}
pub(super) async fn submit_many(
&self,
xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)>,
) -> Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>> {
if log::log_enabled!(target: LOG_TARGET, log::Level::Trace) {
let xts = xts.into_iter().collect::<Vec<_>>();
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "[{:?}] view::submit_many at:{}", self.at.hash);
self.pool.submit_at(&self.at, xts).await
} else {
self.pool.submit_at(&self.at, xts).await
}
}
pub(super) async fn submit_and_watch(
&self,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
) -> Result<Watcher<ExtrinsicHash<ChainApi>, ExtrinsicHash<ChainApi>>, ChainApi::Error> {
log::trace!(target: LOG_TARGET, "[{:?}] view::submit_and_watch at:{}", self.pool.validated_pool().api().hash_and_length(&xt).0, self.at.hash);
self.pool.submit_and_watch(&self.at, source, xt).await
}
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash);
let validity = self
.pool
.validated_pool()
.api()
.validate_transaction_blocking(
self.at.hash,
sc_transaction_pool_api::TransactionSource::Local,
Arc::from(xt.clone()),
)?
.map_err(|e| {
match e {
TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
}
.into()
})?;
let block_number = self
.pool
.validated_pool()
.api()
.block_id_to_number(&BlockId::hash(self.at.hash))?
.ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?;
let validated = ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash,
TimedTransactionSource::new_local(true),
Arc::from(xt),
length,
validity,
);
self.pool.validated_pool().submit(vec![validated]).remove(0)
}
pub(super) fn status(&self) -> PoolStatus {
self.pool.validated_pool().status()
}
pub(super) fn create_watcher(
&self,
tx_hash: ExtrinsicHash<ChainApi>,
) -> Watcher<ExtrinsicHash<ChainApi>, ExtrinsicHash<ChainApi>> {
self.pool.validated_pool().create_watcher(tx_hash)
}
pub(super) async fn revalidate(
&self,
finish_revalidation_worker_channels: FinishRevalidationWorkerChannels<ChainApi>,
) {
let FinishRevalidationWorkerChannels {
mut finish_revalidation_request_rx,
revalidation_result_tx,
} = finish_revalidation_worker_channels;
log::trace!(target:LOG_TARGET, "view::revalidate: at {} starting", self.at.hash);
let start = Instant::now();
let validated_pool = self.pool.validated_pool();
let api = validated_pool.api();
let batch: Vec<_> = validated_pool.ready().collect();
let batch_len = batch.len();
let mut invalid_hashes = Vec::new();
let mut revalidated = HashMap::new();
let mut validation_results = vec![];
let mut batch_iter = batch.into_iter();
loop {
let mut should_break = false;
tokio::select! {
_ = finish_revalidation_request_rx.recv() => {
log::trace!(target: LOG_TARGET, "view::revalidate: finish revalidation request received at {}.", self.at.hash);
break
}
_ = async {
if let Some(tx) = batch_iter.next() {
let validation_result = (api.validate_transaction(self.at.hash, tx.source.clone().into(), tx.data.clone()).await, tx.hash, tx);
validation_results.push(validation_result);
} else {
self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
should_break = true;
}
} => {}
}
if should_break {
break;
}
}
let revalidation_duration = start.elapsed();
self.metrics.report(|metrics| {
metrics.view_revalidation_duration.observe(revalidation_duration.as_secs_f64());
});
log::debug!(
target:LOG_TARGET,
"view::revalidate: at {:?} count: {}/{} took {:?}",
self.at.hash,
validation_results.len(),
batch_len,
revalidation_duration
);
log_xt_trace!(data:tuple, target:LOG_TARGET, validation_results.iter().map(|x| (x.1, &x.0)), "[{:?}] view::revalidateresult: {:?}");
for (validation_result, tx_hash, tx) in validation_results {
match validation_result {
Ok(Err(TransactionValidityError::Invalid(_))) => {
invalid_hashes.push(tx_hash);
},
Ok(Ok(validity)) => {
revalidated.insert(
tx_hash,
ValidatedTransaction::valid_at(
self.at.number.saturated_into::<u64>(),
tx_hash,
tx.source.clone(),
tx.data.clone(),
api.hash_and_length(&tx.data).1,
validity,
),
);
},
Ok(Err(TransactionValidityError::Unknown(e))) => {
log::trace!(
target: LOG_TARGET,
"[{:?}]: Removing. Cannot determine transaction validity: {:?}",
tx_hash,
e
);
invalid_hashes.push(tx_hash);
},
Err(validation_err) => {
log::trace!(
target: LOG_TARGET,
"[{:?}]: Removing due to error during revalidation: {}",
tx_hash,
validation_err
);
invalid_hashes.push(tx_hash);
},
}
}
log::trace!(target:LOG_TARGET, "view::revalidate: sending revalidation result at {}", self.at.hash);
if let Err(e) = revalidation_result_tx
.send(RevalidationResult { invalid_hashes, revalidated })
.await
{
log::trace!(target:LOG_TARGET, "view::revalidate: sending revalidation_result at {} failed {:?}", self.at.hash, e);
}
}
pub(super) async fn start_background_revalidation(
view: Arc<Self>,
revalidation_queue: Arc<
super::revalidation_worker::RevalidationQueue<ChainApi, ChainApi::Block>,
>,
) {
log::trace!(target:LOG_TARGET,"view::start_background_revalidation: at {}", view.at.hash);
let (finish_revalidation_request_tx, finish_revalidation_request_rx) =
tokio::sync::mpsc::channel(1);
let (revalidation_result_tx, revalidation_result_rx) = tokio::sync::mpsc::channel(1);
let finish_revalidation_worker_channels = FinishRevalidationWorkerChannels::new(
finish_revalidation_request_rx,
revalidation_result_tx,
);
let finish_revalidation_local_channels = FinishRevalidationLocalChannels::new(
finish_revalidation_request_tx,
revalidation_result_rx,
);
*view.revalidation_worker_channels.lock() = Some(finish_revalidation_local_channels);
revalidation_queue
.revalidate_view(view.clone(), finish_revalidation_worker_channels)
.await;
}
pub(super) async fn finish_revalidation(&self) {
log::trace!(target:LOG_TARGET,"view::finish_revalidation: at {}", self.at.hash);
let Some(revalidation_worker_channels) = self.revalidation_worker_channels.lock().take()
else {
log::trace!(target:LOG_TARGET, "view::finish_revalidation: no finish_revalidation_request_tx");
return
};
let FinishRevalidationLocalChannels {
finish_revalidation_request_tx,
mut revalidation_result_rx,
} = revalidation_worker_channels;
if let Some(finish_revalidation_request_tx) = finish_revalidation_request_tx {
if let Err(e) = finish_revalidation_request_tx.send(()).await {
log::trace!(target:LOG_TARGET, "view::finish_revalidation: sending cancellation request at {} failed {:?}", self.at.hash, e);
}
}
if let Some(revalidation_result) = revalidation_result_rx.recv().await {
let start = Instant::now();
let revalidated_len = revalidation_result.revalidated.len();
let validated_pool = self.pool.validated_pool();
validated_pool.remove_invalid(&revalidation_result.invalid_hashes);
if revalidated_len > 0 {
self.pool.resubmit(revalidation_result.revalidated);
}
self.metrics.report(|metrics| {
let _ = (
revalidation_result
.invalid_hashes
.len()
.try_into()
.map(|v| metrics.view_revalidation_invalid_txs.inc_by(v)),
revalidated_len
.try_into()
.map(|v| metrics.view_revalidation_resubmitted_txs.inc_by(v)),
);
});
log::debug!(
target:LOG_TARGET,
"view::finish_revalidation: applying revalidation result invalid: {} revalidated: {} at {:?} took {:?}",
revalidation_result.invalid_hashes.len(),
revalidated_len,
self.at.hash,
start.elapsed()
);
}
}
pub(super) fn is_imported(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
const IGNORE_BANNED: bool = false;
self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err()
}
}