1#![recursion_limit = "256"]
22#![warn(missing_docs)]
23#![warn(unused_extern_crates)]
24
25mod api;
26mod enactment_state;
27pub mod error;
28mod graph;
29mod metrics;
30mod revalidation;
31#[cfg(test)]
32mod tests;
33
34pub use crate::api::FullChainApi;
35use async_trait::async_trait;
36use enactment_state::{EnactmentAction, EnactmentState};
37use futures::{
38 channel::oneshot,
39 future::{self, ready},
40 prelude::*,
41};
42pub use graph::{
43 base_pool::Limit as PoolLimit, ChainApi, Options, Pool, Transaction, ValidatedTransaction,
44};
45use parking_lot::Mutex;
46use std::{
47 collections::{HashMap, HashSet},
48 pin::Pin,
49 sync::Arc,
50};
51
52use graph::{ExtrinsicHash, IsValidator};
53use sc_transaction_pool_api::{
54 error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
55 PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, TransactionSource,
56 TransactionStatusStreamFor, TxHash,
57};
58use sp_core::traits::SpawnEssentialNamed;
59use sp_runtime::{
60 generic::BlockId,
61 traits::{AtLeast32Bit, Block as BlockT, Extrinsic, Header as HeaderT, NumberFor, Zero},
62};
63use std::time::Instant;
64
65use crate::metrics::MetricsLink as PrometheusMetrics;
66use prometheus_endpoint::Registry as PrometheusRegistry;
67
68use sp_blockchain::{HashAndNumber, TreeRoute};
69
/// Log target passed to the `log` macros in this file.
pub(crate) const LOG_TARGET: &str = "txpool";

/// Boxed, sendable [`ReadyTransactions`] iterator yielding shared (`Arc`) pool transactions
/// with the given hash and payload types.
type BoxedReadyIterator<Hash, Data> =
    Box<dyn ReadyTransactions<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;

/// [`BoxedReadyIterator`] specialised to the hash and extrinsic types of `PoolApi`.
type ReadyIteratorFor<PoolApi> =
    BoxedReadyIterator<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;

/// Pinned, boxed, sendable future resolving to a [`ReadyIteratorFor`].
type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output = ReadyIteratorFor<PoolApi>> + Send>>;

/// A [`BasicPool`] whose chain API is a [`FullChainApi`] over `Client`.
pub type FullPool<Block, Client> = BasicPool<FullChainApi<Client, Block>, Block>;
82
/// Transaction pool wrapping a [`graph::Pool`] together with the state needed to maintain it
/// as chain events arrive.
pub struct BasicPool<PoolApi, Block>
where
    Block: BlockT,
    PoolApi: graph::ChainApi<Block = Block>,
{
    /// The underlying transaction graph pool.
    pool: Arc<graph::Pool<PoolApi>>,
    /// Chain API used for validation, block body/header lookups and tree routes.
    api: Arc<PoolApi>,
    /// Decides, per enactment, whether to revalidate and/or resubmit transactions.
    revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
    /// Queue that ready transaction hashes are handed to for revalidation.
    revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
    /// Callers of `ready_at` that are waiting for a block which has not been processed yet.
    ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
    /// Metrics handle; the constructors build it from an optional Prometheus registry or
    /// from its `Default`.
    metrics: PrometheusMetrics,
    /// Initialised with the best and finalized block hashes; consulted in `maintain` to turn a
    /// chain event into an `EnactmentAction`.
    enactment_state: Arc<Mutex<EnactmentState<Block>>>,
}
97
/// Bookkeeping for callers waiting on a value of type `T` as of a particular block number.
struct ReadyPoll<T, Block: BlockT> {
    /// Block number given to the most recent `trigger` call, or the initial value if
    /// `trigger` has not been called yet.
    updated_at: NumberFor<Block>,
    /// Pending waiters: the block number each one waits for and the channel to answer on.
    pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
}
102
103impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
104 fn default() -> Self {
105 Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
106 }
107}
108
109impl<T, Block: BlockT> ReadyPoll<T, Block> {
110 fn new(best_block_number: NumberFor<Block>) -> Self {
111 Self { updated_at: best_block_number, pollers: Default::default() }
112 }
113
114 fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
115 self.updated_at = number;
116
117 let mut idx = 0;
118 while idx < self.pollers.len() {
119 if self.pollers[idx].0 <= number {
120 let poller_sender = self.pollers.swap_remove(idx);
121 log::debug!(target: LOG_TARGET, "Sending ready signal at block {}", number);
122 let _ = poller_sender.1.send(iterator_factory());
123 } else {
124 idx += 1;
125 }
126 }
127 }
128
129 fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
130 let (sender, receiver) = oneshot::channel();
131 self.pollers.push((number, sender));
132 receiver
133 }
134
135 fn updated_at(&self) -> NumberFor<Block> {
136 self.updated_at
137 }
138}
139
/// Selects how a [`BasicPool`] revalidates its transactions.
pub enum RevalidationType {
    /// Light revalidation.
    ///
    /// `with_revalidation_type` creates the revalidation queue without a background task and
    /// installs the scheduled `RevalidationStrategy::Light` strategy, which never requests
    /// resubmission of transactions from retracted blocks.
    Light,

    /// Full revalidation.
    ///
    /// `with_revalidation_type` spawns the revalidation queue's background task and installs
    /// `RevalidationStrategy::Always`, which requests both revalidation and resubmission on
    /// every enactment.
    Full,
}
156
157impl<PoolApi, Block> BasicPool<PoolApi, Block>
158where
159 Block: BlockT,
160 PoolApi: graph::ChainApi<Block = Block> + 'static,
161{
162 pub fn new_test(
164 pool_api: Arc<PoolApi>,
165 best_block_hash: Block::Hash,
166 finalized_hash: Block::Hash,
167 options: graph::Options,
168 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
169 let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone()));
170 let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
171 pool_api.clone(),
172 pool.clone(),
173 finalized_hash,
174 );
175 (
176 Self {
177 api: pool_api,
178 pool,
179 revalidation_queue: Arc::new(revalidation_queue),
180 revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
181 ready_poll: Default::default(),
182 metrics: Default::default(),
183 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
184 best_block_hash,
185 finalized_hash,
186 ))),
187 },
188 background_task,
189 )
190 }
191
192 pub fn with_revalidation_type(
195 options: graph::Options,
196 is_validator: IsValidator,
197 pool_api: Arc<PoolApi>,
198 prometheus: Option<&PrometheusRegistry>,
199 revalidation_type: RevalidationType,
200 spawner: impl SpawnEssentialNamed,
201 best_block_number: NumberFor<Block>,
202 best_block_hash: Block::Hash,
203 finalized_hash: Block::Hash,
204 ) -> Self {
205 let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
206 let (revalidation_queue, background_task) = match revalidation_type {
207 RevalidationType::Light =>
208 (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
209 RevalidationType::Full => {
210 let (queue, background) = revalidation::RevalidationQueue::new_background(
211 pool_api.clone(),
212 pool.clone(),
213 finalized_hash,
214 );
215 (queue, Some(background))
216 },
217 };
218
219 if let Some(background_task) = background_task {
220 spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
221 }
222
223 Self {
224 api: pool_api,
225 pool,
226 revalidation_queue: Arc::new(revalidation_queue),
227 revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
228 RevalidationType::Light =>
229 RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
230 RevalidationType::Full => RevalidationStrategy::Always,
231 })),
232 ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
233 metrics: PrometheusMetrics::new(prometheus),
234 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
235 best_block_hash,
236 finalized_hash,
237 ))),
238 }
239 }
240
241 pub fn pool(&self) -> &Arc<graph::Pool<PoolApi>> {
243 &self.pool
244 }
245
246 pub fn api(&self) -> &PoolApi {
248 &self.api
249 }
250}
251
252impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
253where
254 Block: BlockT,
255 PoolApi: 'static + graph::ChainApi<Block = Block>,
256{
257 type Block = PoolApi::Block;
258 type Hash = graph::ExtrinsicHash<PoolApi>;
259 type InPoolTransaction = graph::base_pool::Transaction<TxHash<Self>, TransactionFor<Self>>;
260 type Error = PoolApi::Error;
261
262 fn submit_at(
263 &self,
264 at: <Self::Block as BlockT>::Hash,
265 source: TransactionSource,
266 xts: Vec<TransactionFor<Self>>,
267 ) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
268 let pool = self.pool.clone();
269
270 self.metrics
271 .report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
272
273 async move { pool.submit_at(at, source, xts).await }.boxed()
274 }
275
276 fn submit_one(
277 &self,
278 at: <Self::Block as BlockT>::Hash,
279 source: TransactionSource,
280 xt: TransactionFor<Self>,
281 ) -> PoolFuture<TxHash<Self>, Self::Error> {
282 let pool = self.pool.clone();
283
284 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
285
286 async move { pool.submit_one(at, source, xt).await }.boxed()
287 }
288
289 fn submit_and_watch(
290 &self,
291 at: <Self::Block as BlockT>::Hash,
292 source: TransactionSource,
293 xt: TransactionFor<Self>,
294 ) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
295 let pool = self.pool.clone();
296
297 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
298
299 async move {
300 let watcher = pool.submit_and_watch(at, source, xt).await?;
301
302 Ok(watcher.into_stream().boxed())
303 }
304 .boxed()
305 }
306
307 fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
308 let removed = self.pool.validated_pool().remove_invalid(hashes);
309 self.metrics
310 .report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
311 removed
312 }
313
314 fn status(&self) -> PoolStatus {
315 self.pool.validated_pool().status()
316 }
317
318 fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
319 self.pool.validated_pool().import_notification_stream()
320 }
321
322 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
323 self.pool.hash_of(xt)
324 }
325
326 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
327 self.pool.validated_pool().on_broadcasted(propagations)
328 }
329
330 fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
331 self.pool.validated_pool().ready_by_hash(hash)
332 }
333
334 fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
335 let status = self.status();
336 if status.ready == 0 && status.future == 0 {
341 return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
342 }
343
344 if self.ready_poll.lock().updated_at() >= at {
345 log::trace!(target: LOG_TARGET, "Transaction pool already processed block #{}", at);
346 let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
347 return async move { iterator }.boxed()
348 }
349
350 self.ready_poll
351 .lock()
352 .add(at)
353 .map(|received| {
354 received.unwrap_or_else(|e| {
355 log::warn!("Error receiving pending set: {:?}", e);
356 Box::new(std::iter::empty())
357 })
358 })
359 .boxed()
360 }
361
362 fn ready(&self) -> ReadyIteratorFor<PoolApi> {
363 Box::new(self.pool.validated_pool().ready())
364 }
365
366 fn futures(&self) -> Vec<Self::InPoolTransaction> {
367 let pool = self.pool.validated_pool().pool.read();
368
369 pool.futures().cloned().collect::<Vec<_>>()
370 }
371}
372
373impl<Block, Client> FullPool<Block, Client>
374where
375 Block: BlockT,
376 Client: sp_api::ProvideRuntimeApi<Block>
377 + sc_client_api::BlockBackend<Block>
378 + sc_client_api::blockchain::HeaderBackend<Block>
379 + sp_runtime::traits::BlockIdTo<Block>
380 + sc_client_api::ExecutorProvider<Block>
381 + sc_client_api::UsageProvider<Block>
382 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
383 + Send
384 + Sync
385 + 'static,
386 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
387{
388 pub fn new_full(
390 options: graph::Options,
391 is_validator: IsValidator,
392 prometheus: Option<&PrometheusRegistry>,
393 spawner: impl SpawnEssentialNamed,
394 client: Arc<Client>,
395 ) -> Arc<Self> {
396 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
397 let pool = Arc::new(Self::with_revalidation_type(
398 options,
399 is_validator,
400 pool_api,
401 prometheus,
402 RevalidationType::Full,
403 spawner,
404 client.usage_info().chain.best_number,
405 client.usage_info().chain.best_hash,
406 client.usage_info().chain.finalized_hash,
407 ));
408
409 pool
410 }
411}
412
413impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
414 for BasicPool<FullChainApi<Client, Block>, Block>
415where
416 Block: BlockT,
417 Client: sp_api::ProvideRuntimeApi<Block>
418 + sc_client_api::BlockBackend<Block>
419 + sc_client_api::blockchain::HeaderBackend<Block>
420 + sp_runtime::traits::BlockIdTo<Block>
421 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
422 Client: Send + Sync + 'static,
423 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
424{
425 type Block = Block;
426 type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
427 type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
428
429 fn submit_local(
430 &self,
431 at: Block::Hash,
432 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
433 ) -> Result<Self::Hash, Self::Error> {
434 use sp_runtime::{
435 traits::SaturatedConversion, transaction_validity::TransactionValidityError,
436 };
437
438 let validity = self
439 .api
440 .validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
441 .map_err(|e| {
442 Self::Error::Pool(match e {
443 TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
444 TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
445 })
446 })?;
447
448 let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
449 let block_number = self
450 .api
451 .block_id_to_number(&BlockId::hash(at))?
452 .ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
453
454 let validated = ValidatedTransaction::valid_at(
455 block_number.saturated_into::<u64>(),
456 hash,
457 TransactionSource::Local,
458 xt,
459 bytes,
460 validity,
461 );
462
463 self.pool.validated_pool().submit(vec![validated]).remove(0)
464 }
465}
466
/// Progress of the scheduled revalidation used by `RevalidationStrategy::Light`.
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
    /// No revalidation is scheduled.
    NotScheduled,
    /// Revalidation becomes due once the given instant has passed or the given block number
    /// has been reached, whichever applies first. A `None` component is never due.
    Scheduled(Option<Instant>, Option<N>),
    /// A revalidation was reported as due and the status has not been cleared since.
    InProgress,
}
476
/// How the pool decides, per enactment, whether to revalidate and resubmit.
enum RevalidationStrategy<N> {
    /// `next` always requests both revalidation and resubmission.
    Always,
    /// `next` requests revalidation according to the wrapped schedule and never requests
    /// resubmission.
    Light(RevalidationStatus<N>),
}
481
/// Result of `RevalidationStrategy::next`: what `handle_enactment` should do for this block.
struct RevalidationAction {
    /// Whether the ready transactions should be queued for revalidation.
    revalidate: bool,
    /// Whether transactions from retracted blocks should be resubmitted to the pool.
    resubmit: bool,
}
486
487impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
488 pub fn clear(&mut self) {
489 if let Self::Light(status) = self {
490 status.clear()
491 }
492 }
493
494 pub fn next(
495 &mut self,
496 block: N,
497 revalidate_time_period: Option<std::time::Duration>,
498 revalidate_block_period: Option<N>,
499 ) -> RevalidationAction {
500 match self {
501 Self::Light(status) => RevalidationAction {
502 revalidate: status.next_required(
503 block,
504 revalidate_time_period,
505 revalidate_block_period,
506 ),
507 resubmit: false,
508 },
509 Self::Always => RevalidationAction { revalidate: true, resubmit: true },
510 }
511 }
512}
513
514impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
515 pub fn clear(&mut self) {
517 *self = Self::NotScheduled;
518 }
519
520 pub fn next_required(
522 &mut self,
523 block: N,
524 revalidate_time_period: Option<std::time::Duration>,
525 revalidate_block_period: Option<N>,
526 ) -> bool {
527 match *self {
528 Self::NotScheduled => {
529 *self = Self::Scheduled(
530 revalidate_time_period.map(|period| Instant::now() + period),
531 revalidate_block_period.map(|period| block + period),
532 );
533 false
534 },
535 Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
536 let is_required =
537 revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
538 revalidate_at_block.map(|at| block >= at).unwrap_or(false);
539 if is_required {
540 *self = Self::InProgress;
541 }
542 is_required
543 },
544 Self::InProgress => false,
545 }
546 }
547}
548
549async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = Block>>(
551 block_hash: Block::Hash,
552 api: &Api,
553 pool: &graph::Pool<Api>,
554) -> Vec<ExtrinsicHash<Api>> {
555 let extrinsics = api
556 .block_body(block_hash)
557 .await
558 .unwrap_or_else(|e| {
559 log::warn!("Prune known transactions: error request: {}", e);
560 None
561 })
562 .unwrap_or_default();
563
564 let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
565
566 log::trace!(target: LOG_TARGET, "Pruning transactions: {:?}", hashes);
567
568 let header = match api.block_header(block_hash) {
569 Ok(Some(h)) => h,
570 Ok(None) => {
571 log::debug!(target: LOG_TARGET, "Could not find header for {:?}.", block_hash);
572 return hashes
573 },
574 Err(e) => {
575 log::debug!(target: LOG_TARGET, "Error retrieving header for {:?}: {}", block_hash, e);
576 return hashes
577 },
578 };
579
580 if let Err(e) = pool.prune(block_hash, *header.parent_hash(), &extrinsics).await {
581 log::error!("Cannot prune known in the pool: {}", e);
582 }
583
584 hashes
585}
586
587impl<PoolApi, Block> BasicPool<PoolApi, Block>
588where
589 Block: BlockT,
590 PoolApi: 'static + graph::ChainApi<Block = Block>,
591{
592 async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
596 log::trace!(target: LOG_TARGET, "handle_enactment tree_route: {tree_route:?}");
597 let pool = self.pool.clone();
598 let api = self.api.clone();
599
600 let (hash, block_number) = match tree_route.last() {
601 Some(HashAndNumber { hash, number }) => (hash, number),
602 None => {
603 log::warn!(
604 target: LOG_TARGET,
605 "Skipping ChainEvent - no last block in tree route {:?}",
606 tree_route,
607 );
608 return
609 },
610 };
611
612 let next_action = self.revalidation_strategy.lock().next(
613 *block_number,
614 Some(std::time::Duration::from_secs(60)),
615 Some(20u32.into()),
616 );
617
618 let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
621
622 for retracted in tree_route.retracted() {
628 pool.validated_pool().on_block_retracted(retracted.hash);
630 }
631
632 future::join_all(
633 tree_route
634 .enacted()
635 .iter()
636 .map(|h| prune_known_txs_for_block(h.hash, &*api, &*pool)),
637 )
638 .await
639 .into_iter()
640 .for_each(|enacted_log| {
641 pruned_log.extend(enacted_log);
642 });
643
644 self.metrics
645 .report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
646
647 if next_action.resubmit {
648 let mut resubmit_transactions = Vec::new();
649
650 for retracted in tree_route.retracted() {
651 let hash = retracted.hash;
652
653 let block_transactions = api
654 .block_body(hash)
655 .await
656 .unwrap_or_else(|e| {
657 log::warn!("Failed to fetch block body: {}", e);
658 None
659 })
660 .unwrap_or_default()
661 .into_iter()
662 .filter(|tx| tx.is_signed().unwrap_or(true));
663
664 let mut resubmitted_to_report = 0;
665
666 resubmit_transactions.extend(block_transactions.into_iter().filter(|tx| {
667 let tx_hash = pool.hash_of(tx);
668 let contains = pruned_log.contains(&tx_hash);
669
670 resubmitted_to_report += 1;
672
673 if !contains {
674 log::debug!(
675 target: LOG_TARGET,
676 "[{:?}]: Resubmitting from retracted block {:?}",
677 tx_hash,
678 hash,
679 );
680 }
681 !contains
682 }));
683
684 self.metrics.report(|metrics| {
685 metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
686 });
687 }
688
689 if let Err(e) = pool
690 .resubmit_at(
691 *hash,
692 TransactionSource::External,
695 resubmit_transactions,
696 )
697 .await
698 {
699 log::debug!(
700 target: LOG_TARGET,
701 "[{:?}] Error re-submitting transactions: {}",
702 hash,
703 e,
704 )
705 }
706 }
707
708 let extra_pool = pool.clone();
709 self.ready_poll
712 .lock()
713 .trigger(*block_number, move || Box::new(extra_pool.validated_pool().ready()));
714
715 if next_action.revalidate {
716 let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
717 self.revalidation_queue.revalidate_later(*hash, hashes).await;
718
719 self.revalidation_strategy.lock().clear();
720 }
721 }
722}
723
724#[async_trait]
725impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
726where
727 Block: BlockT,
728 PoolApi: 'static + graph::ChainApi<Block = Block>,
729{
730 async fn maintain(&self, event: ChainEvent<Self::Block>) {
731 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
732 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
733 match self.api.tree_route(from, to) {
734 Ok(tree_route) => Ok(tree_route),
735 Err(e) =>
736 return Err(format!(
737 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
738 )),
739 }
740 };
741 let block_id_to_number =
742 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
743
744 let result =
745 self.enactment_state
746 .lock()
747 .update(&event, &compute_tree_route, &block_id_to_number);
748
749 match result {
750 Err(msg) => {
751 log::debug!(target: LOG_TARGET, "{msg}");
752 self.enactment_state.lock().force_update(&event);
753 },
754 Ok(EnactmentAction::Skip) => return,
755 Ok(EnactmentAction::HandleFinalization) => {},
756 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
757 self.handle_enactment(tree_route).await;
758 },
759 };
760
761 if let ChainEvent::Finalized { hash, tree_route } = event {
762 log::trace!(
763 target: LOG_TARGET,
764 "on-finalized enacted: {tree_route:?}, previously finalized: \
765 {prev_finalized_block:?}",
766 );
767
768 for hash in tree_route.iter().chain(std::iter::once(&hash)) {
769 if let Err(e) = self.pool.validated_pool().on_block_finalized(*hash).await {
770 log::warn!(
771 target: LOG_TARGET,
772 "Error occurred while attempting to notify watchers about finalization {}: {}",
773 hash, e
774 )
775 }
776 }
777 }
778 }
779}
780
781pub async fn notification_future<Client, Pool, Block>(client: Arc<Client>, txpool: Arc<Pool>)
783where
784 Block: BlockT,
785 Client: sc_client_api::BlockchainEvents<Block>,
786 Pool: MaintainedTransactionPool<Block = Block>,
787{
788 let import_stream = client
789 .import_notification_stream()
790 .filter_map(|n| ready(n.try_into().ok()))
791 .fuse();
792 let finality_stream = client.finality_notification_stream().map(Into::into).fuse();
793
794 futures::stream::select(import_stream, finality_stream)
795 .for_each(|evt| txpool.maintain(evt))
796 .await
797}