1use super::{
22 import_notification_sink::MultiViewImportNotificationSink,
23 multi_view_listener::{MultiViewListener, TxStatusStream},
24 view::{View, ViewPoolObserver},
25};
26use crate::{
27 fork_aware_txpool::dropped_watcher::MultiViewDroppedWatcherController,
28 graph::{
29 self,
30 base_pool::{TimedTransactionSource, Transaction},
31 BaseSubmitOutcome, BlockHash, ExtrinsicFor, ExtrinsicHash, TransactionFor,
32 ValidatedPoolSubmitOutcome,
33 },
34 ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
35};
36use itertools::Itertools;
37use parking_lot::RwLock;
38use sc_transaction_pool_api::{
39 error::Error as PoolError, PoolStatus, TransactionTag as Tag, TxInvalidityReportMap,
40};
41use sp_blockchain::{HashAndNumber, TreeRoute};
42use sp_runtime::{
43 generic::BlockId,
44 traits::{Block as BlockT, Header, One, Saturating},
45 transaction_validity::{InvalidTransaction, TransactionValidityError},
46};
47use std::{
48 collections::{hash_map::Entry, HashMap, HashSet},
49 sync::Arc,
50 time::Instant,
51};
52use tracing::{debug, instrument, trace, warn, Level};
53
54#[derive(Clone)]
57struct PendingTxSubmission<ChainApi>
58where
59 ChainApi: graph::ChainApi,
60{
61 xt: ExtrinsicFor<ChainApi>,
63 source: TimedTransactionSource,
65}
66
67type RemovalCallback<ChainApi> = Arc<
70 dyn Fn(
71 &mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
72 ExtrinsicHash<ChainApi>,
73 ) + Send
74 + Sync,
75>;
76
77struct PendingTxRemoval<ChainApi>
80where
81 ChainApi: graph::ChainApi,
82{
83 xt_hash: ExtrinsicHash<ChainApi>,
85 listener_action: RemovalCallback<ChainApi>,
87}
88
89enum PreInsertAction<ChainApi>
92where
93 ChainApi: graph::ChainApi,
94{
95 SubmitTx(PendingTxSubmission<ChainApi>),
98
99 RemoveSubtree(PendingTxRemoval<ChainApi>),
101}
102
103struct PendingPreInsertTask<ChainApi>
106where
107 ChainApi: graph::ChainApi,
108{
109 action: PreInsertAction<ChainApi>,
111 processed: bool,
114}
115
116impl<ChainApi> PendingPreInsertTask<ChainApi>
117where
118 ChainApi: graph::ChainApi,
119{
120 fn new_submission_action(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource) -> Self {
122 Self {
123 processed: false,
124 action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source }),
125 }
126 }
127
128 fn new_removal_action(
130 xt_hash: ExtrinsicHash<ChainApi>,
131 listener: RemovalCallback<ChainApi>,
132 ) -> Self {
133 Self {
134 processed: false,
135 action: PreInsertAction::RemoveSubtree(PendingTxRemoval {
136 xt_hash,
137 listener_action: listener,
138 }),
139 }
140 }
141
142 fn mark_processed(&mut self) {
145 self.processed = true;
146 }
147}
148
149pub(super) struct ViewStore<ChainApi, Block>
151where
152 Block: BlockT,
153 ChainApi: graph::ChainApi<Block = Block>,
154{
155 pub(super) api: Arc<ChainApi>,
157 pub(super) active_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
161 pub(super) inactive_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
166 pub(super) listener: Arc<MultiViewListener<ChainApi>>,
170 pub(super) most_recent_view: RwLock<Option<Arc<View<ChainApi>>>>,
173 pub(super) dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
175 pub(super) import_notification_sink:
178 MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
179 pending_txs_tasks: RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingPreInsertTask<ChainApi>>>,
185}
186
187pub(super) type ViewStoreSubmitOutcome<ChainApi> =
189 BaseSubmitOutcome<ChainApi, TxStatusStream<ChainApi>>;
190
191impl<ChainApi: graph::ChainApi> From<ValidatedPoolSubmitOutcome<ChainApi>>
192 for ViewStoreSubmitOutcome<ChainApi>
193{
194 fn from(value: ValidatedPoolSubmitOutcome<ChainApi>) -> Self {
195 Self::new(value.hash(), value.priority())
196 }
197}
198
199impl<ChainApi, Block> ViewStore<ChainApi, Block>
200where
201 Block: BlockT,
202 ChainApi: graph::ChainApi<Block = Block> + 'static,
203 <Block as BlockT>::Hash: Unpin,
204{
205 pub(super) fn new(
207 api: Arc<ChainApi>,
208 listener: Arc<MultiViewListener<ChainApi>>,
209 dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
210 import_notification_sink: MultiViewImportNotificationSink<
211 Block::Hash,
212 ExtrinsicHash<ChainApi>,
213 >,
214 ) -> Self {
215 Self {
216 api,
217 active_views: Default::default(),
218 inactive_views: Default::default(),
219 listener,
220 most_recent_view: RwLock::from(None),
221 dropped_stream_controller,
222 import_notification_sink,
223 pending_txs_tasks: Default::default(),
224 }
225 }
226
227 pub(super) async fn submit(
229 &self,
230 xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)> + Clone,
231 ) -> HashMap<Block::Hash, Vec<Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error>>> {
232 let submit_futures = {
233 let active_views = self.active_views.read();
234 active_views
235 .values()
236 .map(|view| {
237 let view = view.clone();
238 let xts = xts.clone();
239 async move {
240 (
241 view.at.hash,
242 view.submit_many(xts, ValidateTransactionPriority::Submitted)
243 .await
244 .into_iter()
245 .map(|r| r.map(Into::into))
246 .collect::<Vec<_>>(),
247 )
248 }
249 })
250 .collect::<Vec<_>>()
251 };
252 let results = futures::future::join_all(submit_futures).await;
253
254 HashMap::<_, _>::from_iter(results.into_iter())
255 }
256
257 pub(super) fn submit_local(
259 &self,
260 xt: ExtrinsicFor<ChainApi>,
261 ) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
262 let active_views = self.active_views.read().values().cloned().collect::<Vec<_>>();
263
264 let tx_hash = self.api.hash_and_length(&xt).0;
265
266 let result = active_views
267 .iter()
268 .map(|view| view.submit_local(xt.clone()))
269 .find_or_first(Result::is_ok);
270
271 match result {
272 Some(Err(error)) => {
273 trace!(
274 target: LOG_TARGET,
275 ?tx_hash,
276 %error,
277 "submit_local failed"
278 );
279 Err(error)
280 },
281 None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
282 Some(Ok(r)) => Ok(r.into()),
283 }
284 }
285
286 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::sumbit_and_watch")]
294 pub(super) async fn submit_and_watch(
295 &self,
296 _at: Block::Hash,
297 source: TimedTransactionSource,
298 xt: ExtrinsicFor<ChainApi>,
299 ) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
300 let tx_hash = self.api.hash_and_length(&xt).0;
301 let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
302 return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
303 };
304 let submit_futures = {
305 let active_views = self.active_views.read();
306 active_views
307 .values()
308 .map(|view| {
309 let view = view.clone();
310 let xt = xt.clone();
311 let source = source.clone();
312 async move {
313 view.submit_one(source, xt, ValidateTransactionPriority::Submitted).await
314 }
315 })
316 .collect::<Vec<_>>()
317 };
318 let result = futures::future::join_all(submit_futures)
319 .await
320 .into_iter()
321 .find_or_first(Result::is_ok);
322
323 match result {
324 Some(Err(error)) => {
325 trace!(
326 target: LOG_TARGET,
327 ?tx_hash,
328 %error,
329 "submit_and_watch failed"
330 );
331 return Err(error);
332 },
333 Some(Ok(result)) =>
334 Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)),
335 None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
336 }
337 }
338
339 pub(super) fn status(&self) -> HashMap<Block::Hash, PoolStatus> {
341 self.active_views.read().iter().map(|(h, v)| (*h, v.status())).collect()
342 }
343
344 pub(super) fn is_empty(&self) -> bool {
346 self.active_views.read().is_empty() && self.inactive_views.read().is_empty()
347 }
348
349 pub(super) fn find_view_descendent_up_to_number(
355 &self,
356 at: &HashAndNumber<Block>,
357 up_to: <<Block as BlockT>::Header as Header>::Number,
358 ) -> Option<(Arc<View<ChainApi>>, Vec<Block::Hash>)> {
359 let mut enacted_blocks = Vec::new();
360 let mut at_hash = at.hash;
361 let mut at_number = at.number;
362
363 while at_number >= up_to {
366 if let Some((view, _)) = self.get_view_at(at_hash, true) {
368 return Some((view, enacted_blocks));
369 }
370
371 enacted_blocks.push(at_hash);
372
373 let header = self.api.block_header(at_hash).ok().flatten()?;
375 at_hash = *header.parent_hash();
376 at_number = at_number.saturating_sub(One::one());
377 }
378
379 None
380 }
381
382 pub(super) fn find_best_view(
396 &self,
397 tree_route: &TreeRoute<Block>,
398 ) -> Option<Arc<View<ChainApi>>> {
399 let active_views = self.active_views.read();
400 let best_view = {
401 tree_route
402 .retracted()
403 .iter()
404 .chain(std::iter::once(tree_route.common_block()))
405 .chain(tree_route.enacted().iter())
406 .rev()
407 .find(|block| active_views.contains_key(&block.hash))
408 };
409 best_view.map(|h| {
410 active_views
411 .get(&h.hash)
412 .expect("hash was just found in the map's keys. qed")
413 .clone()
414 })
415 }
416
417 pub(super) fn ready(&self) -> ReadyIteratorFor<ChainApi> {
422 let ready_iterator =
423 self.most_recent_view.read().as_ref().map(|v| v.pool.validated_pool().ready());
424
425 if let Some(ready_iterator) = ready_iterator {
426 return Box::new(ready_iterator)
427 } else {
428 return Box::new(std::iter::empty())
429 }
430 }
431
432 pub(super) fn futures(
437 &self,
438 ) -> Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>> {
439 self.most_recent_view
440 .read()
441 .as_ref()
442 .and_then(|view| self.futures_at(view.at.hash))
443 .unwrap_or_default()
444 }
445
446 pub(super) fn futures_at(
448 &self,
449 at: Block::Hash,
450 ) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
451 self.get_view_at(at, true)
452 .map(|(v, _)| v.pool.validated_pool().pool.read().futures().cloned().collect())
453 }
454
455 pub(super) async fn finalize_route(
462 &self,
463 finalized_hash: Block::Hash,
464 tree_route: &[Block::Hash],
465 ) -> Vec<ExtrinsicHash<ChainApi>> {
466 debug!(
467 target: LOG_TARGET,
468 ?finalized_hash,
469 ?tree_route,
470 "finalize_route"
471 );
472 let mut finalized_transactions = Vec::new();
473
474 for block in tree_route.iter().chain(std::iter::once(&finalized_hash)) {
475 let extrinsics = self
476 .api
477 .block_body(*block)
478 .await
479 .unwrap_or_else(|error| {
480 warn!(
481 target: LOG_TARGET,
482 %error,
483 "Finalize route: error request"
484 );
485 None
486 })
487 .unwrap_or_default()
488 .iter()
489 .map(|e| self.api.hash_and_length(&e).0)
490 .collect::<Vec<_>>();
491
492 extrinsics
493 .iter()
494 .enumerate()
495 .for_each(|(i, tx_hash)| self.listener.transaction_finalized(*tx_hash, *block, i));
496
497 finalized_transactions.extend(extrinsics);
498 }
499
500 debug!(
501 target: LOG_TARGET,
502 "finalize_route: done"
503 );
504 finalized_transactions
505 }
506
507 pub(super) fn ready_transaction(
512 &self,
513 at: Block::Hash,
514 tx_hash: &ExtrinsicHash<ChainApi>,
515 ) -> Option<TransactionFor<ChainApi>> {
516 self.active_views
517 .read()
518 .get(&at)
519 .and_then(|v| v.pool.validated_pool().ready_by_hash(tx_hash))
520 }
521
522 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::insert_new_view")]
527 pub(super) async fn insert_new_view(
528 &self,
529 view: Arc<View<ChainApi>>,
530 tree_route: &TreeRoute<Block>,
531 ) {
532 self.apply_pending_tx_replacements(view.clone()).await;
533
534 let start = Instant::now();
535 self.insert_new_view_sync(view, tree_route);
536
537 debug!(
538 target: LOG_TARGET,
539 inactive_views = ?self.inactive_views.read().keys(),
540 duration = ?start.elapsed(),
541 "insert_new_view"
542 );
543 }
544
545 pub(super) fn insert_new_view_sync(
554 &self,
555 view: Arc<View<ChainApi>>,
556 tree_route: &TreeRoute<Block>,
557 ) {
558 {
560 let mut most_recent_view_lock = self.most_recent_view.write();
561 let mut active_views = self.active_views.write();
562 let mut inactive_views = self.inactive_views.write();
563
564 std::iter::once(tree_route.common_block())
565 .chain(tree_route.enacted().iter())
566 .map(|block| block.hash)
567 .for_each(|hash| {
568 active_views.remove(&hash).map(|view| {
569 inactive_views.insert(hash, view);
570 });
571 });
572 active_views.insert(view.at.hash, view.clone());
573 most_recent_view_lock.replace(view.clone());
574 };
575 }
576
577 pub(super) fn get_view_at(
583 &self,
584 at: Block::Hash,
585 allow_inactive: bool,
586 ) -> Option<(Arc<View<ChainApi>>, bool)> {
587 if let Some(view) = self.active_views.read().get(&at) {
588 return Some((view.clone(), false));
589 }
590 if allow_inactive {
591 if let Some(view) = self.inactive_views.read().get(&at) {
592 return Some((view.clone(), true))
593 }
594 };
595 None
596 }
597
598 pub(crate) async fn handle_finalized(
612 &self,
613 finalized_hash: Block::Hash,
614 tree_route: &[Block::Hash],
615 ) -> Vec<ExtrinsicHash<ChainApi>> {
616 let finalized_xts = self.finalize_route(finalized_hash, tree_route).await;
617 let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
618
619 let mut dropped_views = vec![];
620 {
622 let mut active_views = self.active_views.write();
623 let mut inactive_views = self.inactive_views.write();
624 active_views.retain(|hash, v| {
625 let retain = match finalized_number {
626 Err(_) | Ok(None) => *hash == finalized_hash,
627 Ok(Some(n)) if v.at.number == n => *hash == finalized_hash,
628 Ok(Some(n)) => v.at.number > n,
629 };
630 if !retain {
631 dropped_views.push(*hash);
632 }
633 retain
634 });
635
636 inactive_views.retain(|hash, v| {
637 let retain = match finalized_number {
638 Err(_) | Ok(None) => false,
639 Ok(Some(n)) => v.at.number >= n,
640 };
641 if !retain {
642 dropped_views.push(*hash);
643 }
644 retain
645 });
646
647 debug!(
648 target: LOG_TARGET,
649 inactive_views = ?inactive_views.keys(),
650 ?dropped_views,
651 "handle_finalized"
652 );
653 }
654
655 self.listener.remove_stale_controllers();
656 self.dropped_stream_controller.remove_transactions(finalized_xts.clone());
657
658 self.listener.remove_view(finalized_hash);
659 for view in dropped_views {
660 self.listener.remove_view(view);
661 self.dropped_stream_controller.remove_view(view);
662 }
663
664 finalized_xts
665 }
666
667 pub(crate) async fn finish_background_revalidations(&self) {
672 let start = Instant::now();
673 let finish_revalidation_futures = {
674 let active_views = self.active_views.read();
675 active_views
676 .values()
677 .map(|view| {
678 let view = view.clone();
679 async move { view.finish_revalidation().await }
680 })
681 .collect::<Vec<_>>()
682 };
683 debug!(
684 target: LOG_TARGET,
685 duration = ?start.elapsed(),
686 "finish_background_revalidations before"
687 );
688 futures::future::join_all(finish_revalidation_futures).await;
689 debug!(
690 target: LOG_TARGET,
691 duration = ?start.elapsed(),
692 "finish_background_revalidations after"
693 );
694 }
695
696 pub(crate) fn report_invalid(
722 &self,
723 at: Option<Block::Hash>,
724 invalid_tx_errors: TxInvalidityReportMap<ExtrinsicHash<ChainApi>>,
725 ) -> Vec<TransactionFor<ChainApi>> {
726 let mut remove_from_view = vec![];
727 let mut remove_from_pool = vec![];
728
729 invalid_tx_errors.into_iter().for_each(|(hash, e)| match e {
730 Some(TransactionValidityError::Invalid(
731 InvalidTransaction::Future | InvalidTransaction::Stale,
732 )) => {
733 remove_from_view.push(hash);
734 },
735 _ => {
736 remove_from_pool.push(hash);
737 },
738 });
739
740 at.map(|at| {
743 self.get_view_at(at, true)
744 .map(|(view, _)| view.remove_subtree(&remove_from_view, false, |_, _| {}))
745 });
746
747 let mut removed = vec![];
748 for tx_hash in &remove_from_pool {
749 let removed_from_pool = self.remove_transaction_subtree(*tx_hash, |_, _| {});
750 removed_from_pool
751 .iter()
752 .find(|tx| tx.hash == *tx_hash)
753 .map(|tx| removed.push(tx.clone()));
754 }
755
756 self.listener.transactions_invalidated(&remove_from_pool);
757
758 removed
759 }
760
761 pub(super) async fn replace_transaction(
774 &self,
775 source: TimedTransactionSource,
776 xt: ExtrinsicFor<ChainApi>,
777 replaced: ExtrinsicHash<ChainApi>,
778 ) {
779 if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
780 entry.insert(PendingPreInsertTask::new_submission_action(xt.clone(), source.clone()));
781 } else {
782 return
783 };
784
785 let tx_hash = self.api.hash_and_length(&xt).0;
786 trace!(
787 target: LOG_TARGET,
788 ?replaced,
789 ?tx_hash,
790 "replace_transaction"
791 );
792 self.replace_transaction_in_views(source, xt, tx_hash, replaced).await;
793
794 if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
795 replacement.mark_processed();
796 }
797 }
798
799 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::apply_pending_tx_replacements")]
803 async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
804 let start = Instant::now();
805 let mut futures = vec![];
806 let mut replace_count = 0;
807 let mut remove_count = 0;
808
809 for replacement in self.pending_txs_tasks.read().values() {
810 match replacement.action {
811 PreInsertAction::SubmitTx(ref submission) => {
812 replace_count += 1;
813 let xt_hash = self.api.hash_and_length(&submission.xt).0;
814 futures.push(self.replace_transaction_in_view(
815 view.clone(),
816 submission.source.clone(),
817 submission.xt.clone(),
818 xt_hash,
819 ));
820 },
821 PreInsertAction::RemoveSubtree(ref removal) => {
822 remove_count += 1;
823 view.remove_subtree(&[removal.xt_hash], true, &*removal.listener_action);
824 },
825 }
826 }
827 let _ = futures::future::join_all(futures).await;
828 self.pending_txs_tasks.write().retain(|_, r| !r.processed);
829 debug!(
830 target: LOG_TARGET,
831 at_hash = ?view.at.hash,
832 replace_count,
833 remove_count,
834 duration = ?start.elapsed(),
835 count = ?self.pending_txs_tasks.read().len(),
836 "apply_pending_tx_replacements"
837 );
838 }
839
840 async fn replace_transaction_in_view(
844 &self,
845 view: Arc<View<ChainApi>>,
846 source: TimedTransactionSource,
847 xt: ExtrinsicFor<ChainApi>,
848 tx_hash: ExtrinsicHash<ChainApi>,
849 ) {
850 if let Err(error) =
851 view.submit_one(source, xt, ValidateTransactionPriority::Maintained).await
852 {
853 trace!(
854 target: LOG_TARGET,
855 ?tx_hash,
856 at_hash = ?view.at.hash,
857 %error,
858 "replace_transaction: submit failed"
859 );
860 }
861 }
862
863 async fn replace_transaction_in_views(
868 &self,
869 source: TimedTransactionSource,
870 xt: ExtrinsicFor<ChainApi>,
871 tx_hash: ExtrinsicHash<ChainApi>,
872 replaced: ExtrinsicHash<ChainApi>,
873 ) {
874 let submit_futures = {
875 let active_views = self.active_views.read();
876 let inactive_views = self.inactive_views.read();
877 active_views
878 .iter()
879 .chain(inactive_views.iter())
880 .filter(|(_, view)| view.is_imported(&replaced))
881 .map(|(_, view)| {
882 self.replace_transaction_in_view(
883 view.clone(),
884 source.clone(),
885 xt.clone(),
886 tx_hash,
887 )
888 })
889 .collect::<Vec<_>>()
890 };
891 let _results = futures::future::join_all(submit_futures).await;
892 }
893
894 pub(super) fn remove_transaction_subtree<F>(
911 &self,
912 xt_hash: ExtrinsicHash<ChainApi>,
913 listener_action: F,
914 ) -> Vec<TransactionFor<ChainApi>>
915 where
916 F: Fn(
917 &mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
918 ExtrinsicHash<ChainApi>,
919 ) + Clone
920 + Send
921 + Sync
922 + 'static,
923 {
924 if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) {
925 entry.insert(PendingPreInsertTask::new_removal_action(
926 xt_hash,
927 Arc::from(listener_action.clone()),
928 ));
929 };
930
931 let mut seen = HashSet::new();
932
933 let removed = self
934 .active_views
935 .read()
936 .iter()
937 .chain(self.inactive_views.read().iter())
938 .filter(|(_, view)| view.is_imported(&xt_hash))
939 .flat_map(|(_, view)| view.remove_subtree(&[xt_hash], true, &listener_action))
940 .filter_map(|xt| seen.insert(xt.hash).then(|| xt.clone()))
941 .collect();
942
943 if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
944 removal_action.mark_processed();
945 }
946
947 removed
948 }
949
950 pub(crate) fn finality_stall_view_cleanup(&self, at: &HashAndNumber<Block>, threshold: usize) {
960 let mut dropped_views = vec![];
961 {
962 let mut active_views = self.active_views.write();
963 let mut inactive_views = self.inactive_views.write();
964 let mut f = |hash: &BlockHash<ChainApi>, v: &View<ChainApi>| -> bool {
965 let diff = at.number.saturating_sub(v.at.number);
966 if diff.into() > threshold.into() {
967 dropped_views.push(*hash);
968 false
969 } else {
970 true
971 }
972 };
973
974 active_views.retain(|h, v| f(h, v));
975 inactive_views.retain(|h, v| f(h, v));
976 }
977
978 if !dropped_views.is_empty() {
979 for view in dropped_views {
980 self.listener.remove_view(view);
981 self.dropped_stream_controller.remove_view(view);
982 }
983 }
984 }
985
986 pub(crate) fn provides_tags_from_inactive_views(
989 &self,
990 block_hashes: Vec<&HashAndNumber<Block>>,
991 mut xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
992 ) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
993 let mut provides_tags_map = HashMap::new();
994
995 block_hashes.into_iter().for_each(|hn| {
996 if let Some((view, _)) = self.get_view_at(hn.hash, true) {
998 let provides_tags = view.pool.validated_pool().extrinsics_tags(&xts_hashes);
999 let xts_provides_tags = xts_hashes
1000 .iter()
1001 .zip(provides_tags.into_iter())
1002 .filter_map(|(hash, maybe_tags)| maybe_tags.map(|tags| (*hash, tags)))
1003 .collect::<HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>>>();
1004
1005 xts_hashes.retain(|xth| !xts_provides_tags.contains_key(xth));
1007
1008 provides_tags_map.extend(xts_provides_tags);
1010 }
1011 });
1012
1013 provides_tags_map
1014 }
1015}