use super::{
multi_view_listener::{MultiViewListener, TxStatusStream},
view::View,
};
use crate::{
fork_aware_txpool::dropped_watcher::MultiViewDroppedWatcherController,
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
ExtrinsicFor, ExtrinsicHash, TransactionFor,
},
ReadyIteratorFor, LOG_TARGET,
};
use futures::prelude::*;
use itertools::Itertools;
use parking_lot::RwLock;
use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus};
use sp_blockchain::TreeRoute;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
time::Instant,
};
#[derive(Clone)]
struct PendingTxReplacement<ChainApi>
where
ChainApi: graph::ChainApi,
{
processed: bool,
xt: ExtrinsicFor<ChainApi>,
source: TimedTransactionSource,
watched: bool,
}
impl<ChainApi> PendingTxReplacement<ChainApi>
where
ChainApi: graph::ChainApi,
{
fn new(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource, watched: bool) -> Self {
Self { processed: false, xt, source, watched }
}
}
pub(super) struct ViewStore<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block>,
{
pub(super) api: Arc<ChainApi>,
pub(super) active_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
pub(super) inactive_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
pub(super) listener: Arc<MultiViewListener<ChainApi>>,
pub(super) most_recent_view: RwLock<Option<Block::Hash>>,
pub(super) dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
pending_txs_replacements:
RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingTxReplacement<ChainApi>>>,
}
impl<ChainApi, Block> ViewStore<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
) -> Self {
Self {
api,
active_views: Default::default(),
inactive_views: Default::default(),
listener,
most_recent_view: RwLock::from(None),
dropped_stream_controller,
pending_txs_replacements: Default::default(),
}
}
pub(super) async fn submit(
&self,
xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)> + Clone,
) -> HashMap<Block::Hash, Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>>> {
let submit_futures = {
let active_views = self.active_views.read();
active_views
.iter()
.map(|(_, view)| {
let view = view.clone();
let xts = xts.clone();
async move { (view.at.hash, view.submit_many(xts).await) }
})
.collect::<Vec<_>>()
};
let results = futures::future::join_all(submit_futures).await;
HashMap::<_, _>::from_iter(results.into_iter())
}
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
let active_views = self
.active_views
.read()
.iter()
.map(|(_, view)| view.clone())
.collect::<Vec<_>>();
let tx_hash = self.api.hash_and_length(&xt).0;
let result = active_views
.iter()
.map(|view| view.submit_local(xt.clone()))
.find_or_first(Result::is_ok);
if let Some(Err(err)) = result {
log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
return Err(err)
};
Ok(tx_hash)
}
pub(super) async fn submit_and_watch(
&self,
_at: Block::Hash,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
) -> Result<TxStatusStream<ChainApi>, ChainApi::Error> {
let tx_hash = self.api.hash_and_length(&xt).0;
let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
};
let submit_and_watch_futures = {
let active_views = self.active_views.read();
active_views
.iter()
.map(|(_, view)| {
let view = view.clone();
let xt = xt.clone();
let source = source.clone();
async move {
match view.submit_and_watch(source, xt).await {
Ok(watcher) => {
self.listener.add_view_watcher_for_tx(
tx_hash,
view.at.hash,
watcher.into_stream().boxed(),
);
Ok(())
},
Err(e) => Err(e),
}
}
})
.collect::<Vec<_>>()
};
let maybe_error = futures::future::join_all(submit_and_watch_futures)
.await
.into_iter()
.find_or_first(Result::is_ok);
if let Some(Err(err)) = maybe_error {
log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
return Err(err);
};
Ok(external_watcher)
}
pub(super) fn status(&self) -> HashMap<Block::Hash, PoolStatus> {
self.active_views.read().iter().map(|(h, v)| (*h, v.status())).collect()
}
pub(super) fn is_empty(&self) -> bool {
self.active_views.read().is_empty() && self.inactive_views.read().is_empty()
}
pub(super) fn find_best_view(
&self,
tree_route: &TreeRoute<Block>,
) -> Option<Arc<View<ChainApi>>> {
let active_views = self.active_views.read();
let best_view = {
tree_route
.retracted()
.iter()
.chain(std::iter::once(tree_route.common_block()))
.chain(tree_route.enacted().iter())
.rev()
.find(|block| active_views.contains_key(&block.hash))
};
best_view.map(|h| {
active_views
.get(&h.hash)
.expect("hash was just found in the map's keys. qed")
.clone()
})
}
pub(super) fn ready(&self) -> ReadyIteratorFor<ChainApi> {
let ready_iterator = self
.most_recent_view
.read()
.map(|at| self.get_view_at(at, true))
.flatten()
.map(|(v, _)| v.pool.validated_pool().ready());
if let Some(ready_iterator) = ready_iterator {
return Box::new(ready_iterator)
} else {
return Box::new(std::iter::empty())
}
}
pub(super) fn futures(
&self,
) -> Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>> {
self.most_recent_view
.read()
.map(|at| self.futures_at(at))
.flatten()
.unwrap_or_default()
}
pub(super) fn futures_at(
&self,
at: Block::Hash,
) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
self.get_view_at(at, true)
.map(|(v, _)| v.pool.validated_pool().pool.read().futures().cloned().collect())
}
pub(super) async fn finalize_route(
&self,
finalized_hash: Block::Hash,
tree_route: &[Block::Hash],
) -> Vec<ExtrinsicHash<ChainApi>> {
log::trace!(target: LOG_TARGET, "finalize_route finalized_hash:{finalized_hash:?} tree_route: {tree_route:?}");
let mut finalized_transactions = Vec::new();
for block in tree_route.iter().chain(std::iter::once(&finalized_hash)) {
let extrinsics = self
.api
.block_body(*block)
.await
.unwrap_or_else(|e| {
log::warn!(target: LOG_TARGET, "Finalize route: error request: {}", e);
None
})
.unwrap_or_default()
.iter()
.map(|e| self.api.hash_and_length(&e).0)
.collect::<Vec<_>>();
extrinsics
.iter()
.enumerate()
.for_each(|(i, tx_hash)| self.listener.finalize_transaction(*tx_hash, *block, i));
finalized_transactions.extend(extrinsics);
}
finalized_transactions
}
pub(super) fn ready_transaction(
&self,
at: Block::Hash,
tx_hash: &ExtrinsicHash<ChainApi>,
) -> Option<TransactionFor<ChainApi>> {
self.active_views
.read()
.get(&at)
.and_then(|v| v.pool.validated_pool().ready_by_hash(tx_hash))
}
pub(super) async fn insert_new_view(
&self,
view: Arc<View<ChainApi>>,
tree_route: &TreeRoute<Block>,
) {
self.apply_pending_tx_replacements(view.clone()).await;
{
let mut most_recent_view_lock = self.most_recent_view.write();
let mut active_views = self.active_views.write();
let mut inactive_views = self.inactive_views.write();
std::iter::once(tree_route.common_block())
.chain(tree_route.enacted().iter())
.map(|block| block.hash)
.for_each(|hash| {
active_views.remove(&hash).map(|view| {
inactive_views.insert(hash, view);
});
});
active_views.insert(view.at.hash, view.clone());
most_recent_view_lock.replace(view.at.hash);
};
log::trace!(target:LOG_TARGET,"insert_new_view: inactive_views: {:?}", self.inactive_views.read().keys());
}
pub(super) fn get_view_at(
&self,
at: Block::Hash,
allow_inactive: bool,
) -> Option<(Arc<View<ChainApi>>, bool)> {
if let Some(view) = self.active_views.read().get(&at) {
return Some((view.clone(), false));
}
if allow_inactive {
if let Some(view) = self.inactive_views.read().get(&at) {
return Some((view.clone(), true))
}
};
None
}
pub(crate) async fn handle_pre_finalized(&self, finalized_hash: Block::Hash) {
let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
let mut removed_views = vec![];
{
let active_views = self.active_views.read();
let inactive_views = self.inactive_views.read();
active_views
.iter()
.filter(|(hash, v)| !match finalized_number {
Err(_) | Ok(None) => **hash == finalized_hash,
Ok(Some(n)) if v.at.number == n => **hash == finalized_hash,
Ok(Some(n)) => v.at.number > n,
})
.map(|(_, v)| removed_views.push(v.at.hash))
.for_each(drop);
inactive_views
.iter()
.filter(|(_, v)| !match finalized_number {
Err(_) | Ok(None) => false,
Ok(Some(n)) => v.at.number >= n,
})
.map(|(_, v)| removed_views.push(v.at.hash))
.for_each(drop);
}
log::trace!(target:LOG_TARGET,"handle_pre_finalized: removed_views: {:?}", removed_views);
removed_views.iter().for_each(|view| {
self.dropped_stream_controller.remove_view(*view);
});
}
pub(crate) async fn handle_finalized(
&self,
finalized_hash: Block::Hash,
tree_route: &[Block::Hash],
) -> Vec<ExtrinsicHash<ChainApi>> {
let finalized_xts = self.finalize_route(finalized_hash, tree_route).await;
let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
let mut dropped_views = vec![];
{
let mut active_views = self.active_views.write();
let mut inactive_views = self.inactive_views.write();
active_views.retain(|hash, v| {
let retain = match finalized_number {
Err(_) | Ok(None) => *hash == finalized_hash,
Ok(Some(n)) if v.at.number == n => *hash == finalized_hash,
Ok(Some(n)) => v.at.number > n,
};
if !retain {
dropped_views.push(*hash);
}
retain
});
inactive_views.retain(|hash, v| {
let retain = match finalized_number {
Err(_) | Ok(None) => false,
Ok(Some(n)) => v.at.number >= n,
};
if !retain {
dropped_views.push(*hash);
}
retain
});
log::trace!(target:LOG_TARGET,"handle_finalized: inactive_views: {:?}", inactive_views.keys());
}
log::trace!(target:LOG_TARGET,"handle_finalized: dropped_views: {:?}", dropped_views);
self.listener.remove_stale_controllers();
self.dropped_stream_controller.remove_finalized_txs(finalized_xts.clone());
self.listener.remove_view(finalized_hash);
for view in dropped_views {
self.listener.remove_view(view);
self.dropped_stream_controller.remove_view(view);
}
finalized_xts
}
pub(crate) async fn finish_background_revalidations(&self) {
let start = Instant::now();
let finish_revalidation_futures = {
let active_views = self.active_views.read();
active_views
.iter()
.map(|(_, view)| {
let view = view.clone();
async move { view.finish_revalidation().await }
})
.collect::<Vec<_>>()
};
futures::future::join_all(finish_revalidation_futures).await;
log::trace!(target:LOG_TARGET,"finish_background_revalidations took {:?}", start.elapsed());
}
pub(super) async fn replace_transaction(
&self,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
replaced: ExtrinsicHash<ChainApi>,
watched: bool,
) {
if let Entry::Vacant(entry) = self.pending_txs_replacements.write().entry(replaced) {
entry.insert(PendingTxReplacement::new(xt.clone(), source.clone(), watched));
} else {
return
};
let xt_hash = self.api.hash_and_length(&xt).0;
log::trace!(target:LOG_TARGET,"[{replaced:?}] replace_transaction wtih {xt_hash:?}, w:{watched}");
self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await;
if let Some(replacement) = self.pending_txs_replacements.write().get_mut(&replaced) {
replacement.processed = true;
}
}
async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
let mut futures = vec![];
for replacement in self.pending_txs_replacements.read().values() {
let xt_hash = self.api.hash_and_length(&replacement.xt).0;
futures.push(self.replace_transaction_in_view(
view.clone(),
replacement.source.clone(),
replacement.xt.clone(),
xt_hash,
replacement.watched,
));
}
let _results = futures::future::join_all(futures).await;
self.pending_txs_replacements.write().retain(|_, r| r.processed);
}
async fn replace_transaction_in_view(
&self,
view: Arc<View<ChainApi>>,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
xt_hash: ExtrinsicHash<ChainApi>,
watched: bool,
) {
if watched {
match view.submit_and_watch(source, xt).await {
Ok(watcher) => {
self.listener.add_view_watcher_for_tx(
xt_hash,
view.at.hash,
watcher.into_stream().boxed(),
);
},
Err(e) => {
log::trace!(
target:LOG_TARGET,
"[{:?}] replace_transaction: submit_and_watch to {} failed {}",
xt_hash, view.at.hash, e
);
},
}
} else {
if let Some(Err(e)) = view.submit_many(std::iter::once((source, xt))).await.pop() {
log::trace!(
target:LOG_TARGET,
"[{:?}] replace_transaction: submit to {} failed {}",
xt_hash, view.at.hash, e
);
}
}
}
async fn replace_transaction_in_views(
&self,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
xt_hash: ExtrinsicHash<ChainApi>,
replaced: ExtrinsicHash<ChainApi>,
watched: bool,
) {
if watched && !self.listener.contains_tx(&xt_hash) {
log::trace!(
target:LOG_TARGET,
"error: replace_transaction_in_views: no listener for watched transaction {:?}",
xt_hash,
);
return;
}
let submit_futures = {
let active_views = self.active_views.read();
let inactive_views = self.inactive_views.read();
active_views
.iter()
.chain(inactive_views.iter())
.filter(|(_, view)| view.is_imported(&replaced))
.map(|(_, view)| {
self.replace_transaction_in_view(
view.clone(),
source.clone(),
xt.clone(),
xt_hash,
watched,
)
})
.collect::<Vec<_>>()
};
let _results = futures::future::join_all(submit_futures).await;
}
}