1use crate::{
20 common::{
21 sliding_stat::SyncDurationSlidingStats, tracing_log_xt::log_xt_trace, STAT_SLIDING_WINDOW,
22 },
23 insert_and_log_throttled_sync, LOG_TARGET,
24};
25use futures::channel::mpsc::{channel, Sender};
26use indexmap::IndexMap;
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
29use sp_blockchain::HashAndNumber;
30use sp_runtime::{
31 traits::SaturatedConversion,
32 transaction_validity::{TransactionTag as Tag, ValidTransaction},
33};
34use std::{
35 collections::{HashMap, HashSet},
36 sync::Arc,
37 time::{Duration, Instant},
38};
39use tracing::{debug, trace, warn, Level};
40
41use super::{
42 base_pool::{self as base, PruneStatus},
43 listener::EventHandler,
44 pool::{
45 BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
46 },
47 rotator::PoolRotator,
48 watcher::Watcher,
49};
50
51#[derive(Debug)]
53pub enum ValidatedTransaction<Hash, Ex, Error> {
54 Valid(base::Transaction<Hash, Ex>),
56 Invalid(Hash, Error),
58 Unknown(Hash, Error),
62}
63
64impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
65 pub fn valid_at(
67 at: u64,
68 hash: Hash,
69 source: base::TimedTransactionSource,
70 data: Ex,
71 bytes: usize,
72 validity: ValidTransaction,
73 ) -> Self {
74 Self::Valid(base::Transaction {
75 data,
76 bytes,
77 hash,
78 source,
79 priority: validity.priority,
80 requires: validity.requires,
81 provides: validity.provides,
82 propagate: validity.propagate,
83 valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
84 })
85 }
86
87 pub fn priority(&self) -> Option<TransactionPriority> {
89 match self {
90 ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
91 _ => None,
92 }
93 }
94}
95
96pub type ValidatedTransactionFor<B> =
98 ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
99
100pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>;
102
103#[derive(Clone)]
105pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);
106
107impl From<bool> for IsValidator {
108 fn from(is_validator: bool) -> Self {
109 Self(Arc::new(Box::new(move || is_validator)))
110 }
111}
112
113impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
114 fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
115 Self(Arc::new(is_validator))
116 }
117}
118
119pub struct BaseSubmitOutcome<B: ChainApi, W> {
121 hash: ExtrinsicHash<B>,
123 watcher: Option<W>,
125
126 priority: Option<TransactionPriority>,
128}
129
130pub type ValidatedPoolSubmitOutcome<B> =
132 BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
133
134impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
135 pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
137 Self { hash, priority, watcher: None }
138 }
139
140 pub fn with_watcher(mut self, watcher: W) -> Self {
142 self.watcher = Some(watcher);
143 self
144 }
145
146 pub fn priority(&self) -> Option<TransactionPriority> {
148 self.priority
149 }
150
151 pub fn hash(&self) -> ExtrinsicHash<B> {
153 self.hash
154 }
155
156 pub fn expect_watcher(&mut self) -> W {
159 self.watcher.take().expect("watcher was set in submit_and_watch. qed")
160 }
161}
162
163pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> {
165 api: Arc<B>,
166 is_validator: IsValidator,
167 options: Options,
168 event_dispatcher: RwLock<EventDispatcher<B, L>>,
169 pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
170 import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
171 rotator: PoolRotator<ExtrinsicHash<B>>,
172 enforce_limits_stats: SyncDurationSlidingStats,
173}
174
175impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> {
176 fn clone(&self) -> Self {
177 Self {
178 api: self.api.clone(),
179 is_validator: self.is_validator.clone(),
180 options: self.options.clone(),
181 event_dispatcher: Default::default(),
182 pool: RwLock::from(self.pool.read().clone()),
183 import_notification_sinks: Default::default(),
184 rotator: self.rotator.clone(),
185 enforce_limits_stats: self.enforce_limits_stats.clone(),
186 }
187 }
188}
189
190impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
191 pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
192 Self {
193 event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some(
194 event_handler,
195 ))),
196 ..self.clone()
197 }
198 }
199
200 pub fn new_with_staticly_sized_rotator(
202 options: Options,
203 is_validator: IsValidator,
204 api: Arc<B>,
205 ) -> Self {
206 let ban_time = options.ban_time;
207 Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None)
208 }
209
210 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212 let ban_time = options.ban_time;
213 let total_count = options.total_count();
214 Self::new_with_rotator(
215 options,
216 is_validator,
217 api,
218 PoolRotator::new_with_expected_size(ban_time, total_count),
219 None,
220 )
221 }
222
223 pub fn new_with_event_handler(
225 options: Options,
226 is_validator: IsValidator,
227 api: Arc<B>,
228 event_handler: L,
229 ) -> Self {
230 let ban_time = options.ban_time;
231 let total_count = options.total_count();
232 Self::new_with_rotator(
233 options,
234 is_validator,
235 api,
236 PoolRotator::new_with_expected_size(ban_time, total_count),
237 Some(event_handler),
238 )
239 }
240
241 fn new_with_rotator(
242 options: Options,
243 is_validator: IsValidator,
244 api: Arc<B>,
245 rotator: PoolRotator<ExtrinsicHash<B>>,
246 event_handler: Option<L>,
247 ) -> Self {
248 let base_pool = base::BasePool::new(options.reject_future_transactions);
249 Self {
250 is_validator,
251 options,
252 event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)),
253 api,
254 pool: RwLock::new(base_pool),
255 import_notification_sinks: Default::default(),
256 rotator,
257 enforce_limits_stats: SyncDurationSlidingStats::new(Duration::from_secs(
258 STAT_SLIDING_WINDOW,
259 )),
260 }
261 }
262
263 pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
265 self.rotator.ban(now, hashes)
266 }
267
268 pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
270 self.rotator.is_banned(hash)
271 }
272
273 pub fn check_is_known(
279 &self,
280 tx_hash: &ExtrinsicHash<B>,
281 ignore_banned: bool,
282 ) -> Result<(), B::Error> {
283 if !ignore_banned && self.is_banned(tx_hash) {
284 Err(error::Error::TemporarilyBanned.into())
285 } else if self.pool.read().is_imported(tx_hash) {
286 Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
287 } else {
288 Ok(())
289 }
290 }
291
292 pub fn submit(
294 &self,
295 txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
296 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
297 let results = txs
298 .into_iter()
299 .map(|validated_tx| self.submit_one(validated_tx))
300 .collect::<Vec<_>>();
301
302 let removed = if results.iter().any(|res| res.is_ok()) {
304 let start = Instant::now();
305 let removed = self.enforce_limits();
306 insert_and_log_throttled_sync!(
307 Level::DEBUG,
308 target:"txpool",
309 prefix:"enforce_limits_stats",
310 self.enforce_limits_stats,
311 start.elapsed().into()
312 );
313 removed
314 } else {
315 Default::default()
316 };
317
318 results
319 .into_iter()
320 .map(|res| match res {
321 Ok(outcome) if removed.contains(&outcome.hash) =>
322 Err(error::Error::ImmediatelyDropped.into()),
323 other => other,
324 })
325 .collect()
326 }
327
328 fn submit_one(
330 &self,
331 tx: ValidatedTransactionFor<B>,
332 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
333 match tx {
334 ValidatedTransaction::Valid(tx) => {
335 let priority = tx.priority;
336 trace!(
337 target: LOG_TARGET,
338 tx_hash = ?tx.hash,
339 "ValidatedPool::submit_one"
340 );
341 if !tx.propagate && !(self.is_validator.0)() {
342 return Err(error::Error::Unactionable.into())
343 }
344
345 let imported = self.pool.write().import(tx)?;
346
347 if let base::Imported::Ready { ref hash, .. } = imported {
348 let sinks = &mut self.import_notification_sinks.lock();
349 sinks.retain_mut(|sink| match sink.try_send(*hash) {
350 Ok(()) => true,
351 Err(e) =>
352 if e.is_full() {
353 warn!(
354 target: LOG_TARGET,
355 tx_hash = ?hash,
356 "Trying to notify an import but the channel is full"
357 );
358 true
359 } else {
360 false
361 },
362 });
363 }
364
365 let mut event_dispatcher = self.event_dispatcher.write();
366 fire_events(&mut *event_dispatcher, &imported);
367 Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
368 },
369 ValidatedTransaction::Invalid(tx_hash, error) => {
370 trace!(
371 target: LOG_TARGET,
372 ?tx_hash,
373 ?error,
374 "ValidatedPool::submit_one invalid"
375 );
376 self.rotator.ban(&Instant::now(), std::iter::once(tx_hash));
377 Err(error)
378 },
379 ValidatedTransaction::Unknown(tx_hash, error) => {
380 trace!(
381 target: LOG_TARGET,
382 ?tx_hash,
383 ?error,
384 "ValidatedPool::submit_one unknown"
385 );
386 self.event_dispatcher.write().invalid(&tx_hash);
387 Err(error)
388 },
389 }
390 }
391
392 fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
393 let status = self.pool.read().status();
394 let ready_limit = &self.options.ready;
395 let future_limit = &self.options.future;
396
397 if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
398 future_limit.is_exceeded(status.future, status.future_bytes)
399 {
400 trace!(
401 target: LOG_TARGET,
402 ready_count = ready_limit.count,
403 ready_kb = ready_limit.total_bytes / 1024,
404 future_count = future_limit.count,
405 future_kb = future_limit.total_bytes / 1024,
406 "Enforcing limits"
407 );
408
409 let removed = {
411 let mut pool = self.pool.write();
412 let removed = pool
413 .enforce_limits(ready_limit, future_limit)
414 .into_iter()
415 .map(|x| x.hash)
416 .collect::<HashSet<_>>();
417 self.rotator.ban(&Instant::now(), removed.iter().copied());
419 removed
420 };
421 if !removed.is_empty() {
422 trace!(
423 target: LOG_TARGET,
424 dropped_count = removed.len(),
425 "Enforcing limits"
426 );
427 }
428
429 let mut event_dispatcher = self.event_dispatcher.write();
431 for h in &removed {
432 event_dispatcher.limits_enforced(h);
433 }
434
435 removed
436 } else {
437 Default::default()
438 }
439 }
440
441 pub fn submit_and_watch(
443 &self,
444 tx: ValidatedTransactionFor<B>,
445 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
446 match tx {
447 ValidatedTransaction::Valid(tx) => {
448 let hash = self.api.hash_and_length(&tx.data).0;
449 let watcher = self.create_watcher(hash);
450 self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
451 .pop()
452 .expect("One extrinsic passed; one result returned; qed")
453 .map(|outcome| outcome.with_watcher(watcher))
454 },
455 ValidatedTransaction::Invalid(hash, err) => {
456 self.rotator.ban(&Instant::now(), std::iter::once(hash));
457 Err(err)
458 },
459 ValidatedTransaction::Unknown(_, err) => Err(err),
460 }
461 }
462
463 pub fn create_watcher(
465 &self,
466 tx_hash: ExtrinsicHash<B>,
467 ) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> {
468 self.event_dispatcher.write().create_watcher(tx_hash)
469 }
470
471 pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> {
473 self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect()
474 }
475
476 pub fn resubmit(
481 &self,
482 mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
483 ) {
484 #[derive(Debug, Clone, Copy, PartialEq)]
485 enum Status {
486 Future,
487 Ready,
488 Failed,
489 Dropped,
490 }
491
492 let (mut initial_statuses, final_statuses) = {
493 let mut pool = self.pool.write();
494
495 let mut initial_statuses = HashMap::new();
503 let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
504 while !updated_transactions.is_empty() {
505 let hash = updated_transactions
506 .keys()
507 .next()
508 .cloned()
509 .expect("transactions is not empty; qed");
510
511 let removed = pool.remove_subtree(&[hash]);
515 for removed_tx in removed {
516 let removed_hash = removed_tx.hash;
517 let updated_transaction = updated_transactions.shift_remove(&removed_hash);
518 let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
519 updated_tx
520 } else {
521 let transaction = match Arc::try_unwrap(removed_tx) {
524 Ok(transaction) => transaction,
525 Err(transaction) => transaction.duplicate(),
526 };
527 ValidatedTransaction::Valid(transaction)
528 };
529
530 initial_statuses.insert(removed_hash, Status::Ready);
531 txs_to_resubmit.push((removed_hash, tx_to_resubmit));
532 }
533 updated_transactions.shift_remove(&hash);
535 }
536
537 pool.with_futures_enabled(|pool, reject_future_transactions| {
542 let mut final_statuses = HashMap::new();
544 for (tx_hash, tx_to_resubmit) in txs_to_resubmit {
545 match tx_to_resubmit {
546 ValidatedTransaction::Valid(tx) => match pool.import(tx) {
547 Ok(imported) => match imported {
548 base::Imported::Ready { promoted, failed, removed, .. } => {
549 final_statuses.insert(tx_hash, Status::Ready);
550 for hash in promoted {
551 final_statuses.insert(hash, Status::Ready);
552 }
553 for hash in failed {
554 final_statuses.insert(hash, Status::Failed);
555 }
556 for tx in removed {
557 final_statuses.insert(tx.hash, Status::Dropped);
558 }
559 },
560 base::Imported::Future { .. } => {
561 final_statuses.insert(tx_hash, Status::Future);
562 },
563 },
564 Err(error) => {
565 warn!(
570 target: LOG_TARGET,
571 ?tx_hash,
572 %error,
573 "Removing invalid transaction from update"
574 );
575 final_statuses.insert(tx_hash, Status::Failed);
576 },
577 },
578 ValidatedTransaction::Invalid(_, _) |
579 ValidatedTransaction::Unknown(_, _) => {
580 final_statuses.insert(tx_hash, Status::Failed);
581 },
582 }
583 }
584
585 if reject_future_transactions {
588 for future_tx in pool.clear_future() {
589 final_statuses.insert(future_tx.hash, Status::Dropped);
590 }
591 }
592
593 (initial_statuses, final_statuses)
594 })
595 };
596
597 let mut event_dispatcher = self.event_dispatcher.write();
599 for (hash, final_status) in final_statuses {
600 let initial_status = initial_statuses.remove(&hash);
601 if initial_status.is_none() || Some(final_status) != initial_status {
602 match final_status {
603 Status::Future => event_dispatcher.future(&hash),
604 Status::Ready => event_dispatcher.ready(&hash, None),
605 Status::Dropped => event_dispatcher.dropped(&hash),
606 Status::Failed => event_dispatcher.invalid(&hash),
607 }
608 }
609 }
610 }
611
612 pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
614 self.pool
615 .read()
616 .by_hashes(hashes)
617 .into_iter()
618 .map(|existing_in_pool| {
619 existing_in_pool.map(|transaction| transaction.provides.to_vec())
620 })
621 .collect()
622 }
623
624 pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
626 self.pool.read().ready_by_hash(hash)
627 }
628
629 pub fn prune_tags(
631 &self,
632 tags: impl IntoIterator<Item = Tag>,
633 ) -> PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>> {
634 let status = self.pool.write().prune_tags(tags);
636 {
639 let mut event_dispatcher = self.event_dispatcher.write();
640 for promoted in &status.promoted {
641 fire_events(&mut *event_dispatcher, promoted);
642 }
643 for f in &status.failed {
644 event_dispatcher.dropped(f);
645 }
646 }
647
648 status
649 }
650
651 pub fn resubmit_pruned(
653 &self,
654 at: &HashAndNumber<B::Block>,
655 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
656 pruned_hashes: Vec<ExtrinsicHash<B>>,
657 pruned_xts: Vec<ValidatedTransactionFor<B>>,
658 ) {
659 debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
660
661 let results = self.submit(pruned_xts);
663
664 let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
667 match r.map_err(error::IntoPoolError::into_pool_error) {
668 Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
669 _ => None,
670 }
671 });
672 let hashes = hashes.chain(known_imported_hashes.into_iter());
675 self.fire_pruned(at, hashes);
676
677 self.clear_stale(at);
680 }
681
682 pub fn fire_pruned(
684 &self,
685 at: &HashAndNumber<B::Block>,
686 hashes: impl Iterator<Item = ExtrinsicHash<B>>,
687 ) {
688 let mut event_dispatcher = self.event_dispatcher.write();
689 let mut set = HashSet::with_capacity(hashes.size_hint().0);
690 for h in hashes {
691 if !set.contains(&h) {
694 event_dispatcher.pruned(at.hash, &h);
695 set.insert(h);
696 }
697 }
698 }
699
700 pub fn clear_stale(&self, at: &HashAndNumber<B::Block>) {
706 let HashAndNumber { number, .. } = *at;
707 let number = number.saturated_into::<u64>();
708 let now = Instant::now();
709 let to_remove = {
710 self.ready()
711 .filter(|tx| self.rotator.ban_if_stale(&now, number, tx))
712 .map(|tx| tx.hash)
713 .collect::<Vec<_>>()
714 };
715 let futures_to_remove: Vec<ExtrinsicHash<B>> = {
716 let p = self.pool.read();
717 let mut hashes = Vec::new();
718 for tx in p.futures() {
719 if self.rotator.ban_if_stale(&now, number, tx) {
720 hashes.push(tx.hash);
721 }
722 }
723 hashes
724 };
725 debug!(
726 target:LOG_TARGET,
727 to_remove_len=to_remove.len(),
728 futures_to_remove_len=futures_to_remove.len(),
729 "clear_stale"
730 );
731 self.remove_invalid(&to_remove);
733 self.remove_invalid(&futures_to_remove);
734 self.rotator.clear_timeouts(&now);
736 }
737
738 pub fn api(&self) -> &B {
740 &self.api
741 }
742
743 pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
748 const CHANNEL_BUFFER_SIZE: usize = 1024;
749
750 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
751 self.import_notification_sinks.lock().push(sink);
752 stream
753 }
754
755 pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
757 let mut event_dispatcher = self.event_dispatcher.write();
758 for (hash, peers) in propagated.into_iter() {
759 event_dispatcher.broadcasted(&hash, peers);
760 }
761 }
762
763 pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
775 if hashes.is_empty() {
777 return vec![]
778 }
779
780 let invalid = self.remove_subtree(hashes, true, |listener, removed_tx_hash| {
781 listener.invalid(&removed_tx_hash);
782 });
783
784 trace!(
785 target: LOG_TARGET,
786 removed_count = hashes.len(),
787 invalid_count = invalid.len(),
788 "Removed invalid transactions"
789 );
790 log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
791
792 invalid
793 }
794
795 pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
797 self.pool.read().ready()
798 }
799
800 pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
802 self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
803 }
804
805 pub fn status(&self) -> PoolStatus {
807 self.pool.read().status()
808 }
809
810 pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
812 trace!(
813 target: LOG_TARGET,
814 ?block_hash,
815 "Attempting to notify watchers of finalization"
816 );
817 self.event_dispatcher.write().finalized(block_hash);
818 Ok(())
819 }
820
821 pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
823 self.event_dispatcher.write().retracted(block_hash)
824 }
825
826 pub fn retrigger_notifications(&self) {
831 let pool = self.pool.read();
832 let mut event_dispatcher = self.event_dispatcher.write();
833 pool.ready().for_each(|r| {
834 event_dispatcher.ready(&r.hash, None);
835 });
836 pool.futures().for_each(|f| {
837 event_dispatcher.future(&f.hash);
838 });
839 }
840
841 pub fn remove_subtree<F>(
856 &self,
857 hashes: &[ExtrinsicHash<B>],
858 ban_transactions: bool,
859 event_dispatcher_action: F,
860 ) -> Vec<TransactionFor<B>>
861 where
862 F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
863 {
864 if ban_transactions {
866 self.rotator.ban(&Instant::now(), hashes.iter().cloned());
867 };
868 let removed = self.pool.write().remove_subtree(hashes);
869
870 removed
871 .into_iter()
872 .map(|tx| {
873 let removed_tx_hash = tx.hash;
874 let mut event_dispatcher = self.event_dispatcher.write();
875 event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
876 tx.clone()
877 })
878 .collect::<Vec<_>>()
879 }
880}
881
882fn fire_events<B, L, Ex>(
883 event_dispatcher: &mut EventDispatcher<B, L>,
884 imported: &base::Imported<ExtrinsicHash<B>, Ex>,
885) where
886 B: ChainApi,
887 L: EventHandler<B>,
888{
889 match *imported {
890 base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
891 event_dispatcher.ready(hash, None);
892 failed.iter().for_each(|f| event_dispatcher.invalid(f));
893 removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
894 promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
895 },
896 base::Imported::Future { ref hash } => event_dispatcher.future(hash),
897 }
898}