1use crate::{
20 common::{
21 sliding_stat::SyncDurationSlidingStats, tracing_log_xt::log_xt_trace, STAT_SLIDING_WINDOW,
22 },
23 insert_and_log_throttled_sync, LOG_TARGET,
24};
25use futures::channel::mpsc::{channel, Sender};
26use indexmap::IndexMap;
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
29use sp_blockchain::HashAndNumber;
30use sp_runtime::{
31 traits::SaturatedConversion,
32 transaction_validity::{TransactionTag as Tag, ValidTransaction},
33};
34use std::{
35 collections::{HashMap, HashSet},
36 sync::Arc,
37 time::{Duration, Instant},
38};
39use tracing::{debug, trace, warn, Level};
40
41use super::{
42 base_pool::{self as base, PruneStatus},
43 listener::EventHandler,
44 pool::{
45 BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
46 },
47 rotator::{BanReason, PoolRotator},
48 watcher::Watcher,
49};
50
51#[derive(Debug)]
53pub enum ValidatedTransaction<Hash, Ex, Error> {
54 Valid(base::Transaction<Hash, Ex>),
56 Invalid(Hash, Error),
58 Unknown(Hash, Error),
62}
63
64impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
65 pub fn valid_at(
67 at: u64,
68 hash: Hash,
69 source: base::TimedTransactionSource,
70 data: Ex,
71 bytes: usize,
72 validity: ValidTransaction,
73 ) -> Self {
74 Self::Valid(base::Transaction {
75 data,
76 bytes,
77 hash,
78 source,
79 priority: validity.priority,
80 requires: validity.requires,
81 provides: validity.provides,
82 propagate: validity.propagate,
83 valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
84 })
85 }
86
87 pub fn priority(&self) -> Option<TransactionPriority> {
89 match self {
90 ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
91 _ => None,
92 }
93 }
94}
95
96pub type ValidatedTransactionFor<B> =
98 ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
99
100pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>;
102
103#[derive(Clone)]
105pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);
106
107impl From<bool> for IsValidator {
108 fn from(is_validator: bool) -> Self {
109 Self(Arc::new(Box::new(move || is_validator)))
110 }
111}
112
113impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
114 fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
115 Self(Arc::new(is_validator))
116 }
117}
118
119pub struct BaseSubmitOutcome<B: ChainApi, W> {
121 hash: ExtrinsicHash<B>,
123 watcher: Option<W>,
125
126 priority: Option<TransactionPriority>,
128}
129
130pub type ValidatedPoolSubmitOutcome<B> =
132 BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
133
134impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
135 pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
137 Self { hash, priority, watcher: None }
138 }
139
140 pub fn with_watcher(mut self, watcher: W) -> Self {
142 self.watcher = Some(watcher);
143 self
144 }
145
146 pub fn priority(&self) -> Option<TransactionPriority> {
148 self.priority
149 }
150
151 pub fn hash(&self) -> ExtrinsicHash<B> {
153 self.hash
154 }
155
156 pub fn expect_watcher(&mut self) -> W {
159 self.watcher.take().expect("watcher was set in submit_and_watch. qed")
160 }
161}
162
163pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> {
165 api: Arc<B>,
166 is_validator: IsValidator,
167 options: Options,
168 event_dispatcher: RwLock<EventDispatcher<B, L>>,
169 pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
170 import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
171 rotator: PoolRotator<ExtrinsicHash<B>>,
172 enforce_limits_stats: SyncDurationSlidingStats,
173}
174
175impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> {
176 fn clone(&self) -> Self {
177 Self {
178 api: self.api.clone(),
179 is_validator: self.is_validator.clone(),
180 options: self.options.clone(),
181 event_dispatcher: Default::default(),
182 pool: RwLock::from(self.pool.read().clone()),
183 import_notification_sinks: Default::default(),
184 rotator: self.rotator.clone(),
185 enforce_limits_stats: self.enforce_limits_stats.clone(),
186 }
187 }
188}
189
190impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
191 pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
192 Self {
193 event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some(
194 event_handler,
195 ))),
196 ..self.clone()
197 }
198 }
199
200 pub fn new_with_staticly_sized_rotator(
202 options: Options,
203 is_validator: IsValidator,
204 api: Arc<B>,
205 ) -> Self {
206 let ban_time = options.ban_time;
207 Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None)
208 }
209
210 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212 let ban_time = options.ban_time;
213 let total_count = options.total_count();
214 Self::new_with_rotator(
215 options,
216 is_validator,
217 api,
218 PoolRotator::new_with_expected_size(ban_time, total_count),
219 None,
220 )
221 }
222
223 pub fn new_with_event_handler(
225 options: Options,
226 is_validator: IsValidator,
227 api: Arc<B>,
228 event_handler: L,
229 ) -> Self {
230 let ban_time = options.ban_time;
231 let total_count = options.total_count();
232 Self::new_with_rotator(
233 options,
234 is_validator,
235 api,
236 PoolRotator::new_with_expected_size(ban_time, total_count),
237 Some(event_handler),
238 )
239 }
240
241 fn new_with_rotator(
242 options: Options,
243 is_validator: IsValidator,
244 api: Arc<B>,
245 rotator: PoolRotator<ExtrinsicHash<B>>,
246 event_handler: Option<L>,
247 ) -> Self {
248 let base_pool = base::BasePool::new(options.reject_future_transactions);
249 Self {
250 is_validator,
251 options,
252 event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)),
253 api,
254 pool: RwLock::new(base_pool),
255 import_notification_sinks: Default::default(),
256 rotator,
257 enforce_limits_stats: SyncDurationSlidingStats::new(Duration::from_secs(
258 STAT_SLIDING_WINDOW,
259 )),
260 }
261 }
262
263 pub fn ban(
265 &self,
266 now: &Instant,
267 hashes: impl IntoIterator<Item = ExtrinsicHash<B>>,
268 reason: BanReason,
269 ) {
270 self.rotator.ban(now, hashes, reason)
271 }
272
273 pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
275 self.rotator.is_banned(hash)
276 }
277
278 pub fn unban_if_validation(&self, hash: &ExtrinsicHash<B>) -> bool {
283 self.rotator.unban_if_validation(hash)
284 }
285
286 pub fn check_is_known(
292 &self,
293 tx_hash: &ExtrinsicHash<B>,
294 ignore_banned: bool,
295 ) -> Result<(), B::Error> {
296 if !ignore_banned && self.is_banned(tx_hash) {
297 Err(error::Error::TemporarilyBanned.into())
298 } else if self.pool.read().is_imported(tx_hash) {
299 Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
300 } else {
301 Ok(())
302 }
303 }
304
305 pub fn submit(
307 &self,
308 txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
309 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
310 let results = txs
311 .into_iter()
312 .map(|validated_tx| self.submit_one(validated_tx))
313 .collect::<Vec<_>>();
314
315 let removed = if results.iter().any(|res| res.is_ok()) {
317 let start = Instant::now();
318 let removed = self.enforce_limits();
319 insert_and_log_throttled_sync!(
320 Level::DEBUG,
321 target:"txpool",
322 prefix:"enforce_limits_stats",
323 self.enforce_limits_stats,
324 start.elapsed().into()
325 );
326 removed
327 } else {
328 Default::default()
329 };
330
331 results
332 .into_iter()
333 .map(|res| match res {
334 Ok(outcome) if removed.contains(&outcome.hash) => {
335 Err(error::Error::ImmediatelyDropped.into())
336 },
337 other => other,
338 })
339 .collect()
340 }
341
342 fn submit_one(
344 &self,
345 tx: ValidatedTransactionFor<B>,
346 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
347 match tx {
348 ValidatedTransaction::Valid(tx) => {
349 let priority = tx.priority;
350 trace!(
351 target: LOG_TARGET,
352 tx_hash = ?tx.hash,
353 "ValidatedPool::submit_one"
354 );
355 if !tx.propagate && !(self.is_validator.0)() {
356 return Err(error::Error::Unactionable.into());
357 }
358
359 let imported = self.pool.write().import(tx)?;
360
361 if let base::Imported::Ready { ref hash, .. } = imported {
362 let sinks = &mut self.import_notification_sinks.lock();
363 sinks.retain_mut(|sink| match sink.try_send(*hash) {
364 Ok(()) => true,
365 Err(e) => {
366 if e.is_full() {
367 warn!(
368 target: LOG_TARGET,
369 tx_hash = ?hash,
370 "Trying to notify an import but the channel is full"
371 );
372 true
373 } else {
374 false
375 }
376 },
377 });
378 }
379
380 let mut event_dispatcher = self.event_dispatcher.write();
381 fire_events(&mut *event_dispatcher, &imported);
382 Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
383 },
384 ValidatedTransaction::Invalid(tx_hash, error) => {
385 trace!(
386 target: LOG_TARGET,
387 ?tx_hash,
388 ?error,
389 "ValidatedPool::submit_one invalid"
390 );
391 self.rotator
392 .ban(&Instant::now(), std::iter::once(tx_hash), BanReason::Validation);
393 Err(error)
394 },
395 ValidatedTransaction::Unknown(tx_hash, error) => {
396 trace!(
397 target: LOG_TARGET,
398 ?tx_hash,
399 ?error,
400 "ValidatedPool::submit_one unknown"
401 );
402 self.event_dispatcher.write().invalid(&tx_hash);
403 Err(error)
404 },
405 }
406 }
407
408 fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
409 let status = self.pool.read().status();
410 let ready_limit = &self.options.ready;
411 let future_limit = &self.options.future;
412
413 if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
414 future_limit.is_exceeded(status.future, status.future_bytes)
415 {
416 trace!(
417 target: LOG_TARGET,
418 ready_count = ready_limit.count,
419 ready_kb = ready_limit.total_bytes / 1024,
420 future_count = future_limit.count,
421 future_kb = future_limit.total_bytes / 1024,
422 "Enforcing limits"
423 );
424
425 let removed = {
427 let mut pool = self.pool.write();
428 let removed = pool
429 .enforce_limits(ready_limit, future_limit)
430 .into_iter()
431 .map(|x| x.hash)
432 .collect::<HashSet<_>>();
433 self.rotator.ban(
435 &Instant::now(),
436 removed.iter().copied(),
437 BanReason::LimitsEnforced,
438 );
439 removed
440 };
441 if !removed.is_empty() {
442 trace!(
443 target: LOG_TARGET,
444 dropped_count = removed.len(),
445 "Enforcing limits"
446 );
447 }
448
449 let mut event_dispatcher = self.event_dispatcher.write();
451 for h in &removed {
452 event_dispatcher.limits_enforced(h);
453 }
454
455 removed
456 } else {
457 Default::default()
458 }
459 }
460
461 pub fn submit_and_watch(
463 &self,
464 tx: ValidatedTransactionFor<B>,
465 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
466 match tx {
467 ValidatedTransaction::Valid(tx) => {
468 let hash = self.api.hash_and_length(&tx.data).0;
469 let watcher = self.create_watcher(hash);
470 self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
471 .pop()
472 .expect("One extrinsic passed; one result returned; qed")
473 .map(|outcome| outcome.with_watcher(watcher))
474 },
475 ValidatedTransaction::Invalid(hash, err) => {
476 self.rotator.ban(&Instant::now(), std::iter::once(hash), BanReason::Validation);
477 Err(err)
478 },
479 ValidatedTransaction::Unknown(_, err) => Err(err),
480 }
481 }
482
483 pub fn create_watcher(
485 &self,
486 tx_hash: ExtrinsicHash<B>,
487 ) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> {
488 self.event_dispatcher.write().create_watcher(tx_hash)
489 }
490
491 pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> {
493 self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect()
494 }
495
496 pub fn resubmit(
501 &self,
502 mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
503 ) {
504 #[derive(Debug, Clone, Copy, PartialEq)]
505 enum Status {
506 Future,
507 Ready,
508 Failed,
509 Dropped,
510 }
511
512 let (mut initial_statuses, final_statuses) = {
513 let mut pool = self.pool.write();
514
515 let mut initial_statuses = HashMap::new();
523 let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
524 while !updated_transactions.is_empty() {
525 let hash = updated_transactions
526 .keys()
527 .next()
528 .cloned()
529 .expect("transactions is not empty; qed");
530
531 let removed = pool.remove_subtree(&[hash]);
535 for removed_tx in removed {
536 let removed_hash = removed_tx.hash;
537 let updated_transaction = updated_transactions.shift_remove(&removed_hash);
538 let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
539 updated_tx
540 } else {
541 let transaction = match Arc::try_unwrap(removed_tx) {
544 Ok(transaction) => transaction,
545 Err(transaction) => transaction.duplicate(),
546 };
547 ValidatedTransaction::Valid(transaction)
548 };
549
550 initial_statuses.insert(removed_hash, Status::Ready);
551 txs_to_resubmit.push((removed_hash, tx_to_resubmit));
552 }
553 updated_transactions.shift_remove(&hash);
555 }
556
557 pool.with_futures_enabled(|pool, reject_future_transactions| {
562 let mut final_statuses = HashMap::new();
564 for (tx_hash, tx_to_resubmit) in txs_to_resubmit {
565 match tx_to_resubmit {
566 ValidatedTransaction::Valid(tx) => match pool.import(tx) {
567 Ok(imported) => match imported {
568 base::Imported::Ready { promoted, failed, removed, .. } => {
569 final_statuses.insert(tx_hash, Status::Ready);
570 for hash in promoted {
571 final_statuses.insert(hash, Status::Ready);
572 }
573 for hash in failed {
574 final_statuses.insert(hash, Status::Failed);
575 }
576 for tx in removed {
577 final_statuses.insert(tx.hash, Status::Dropped);
578 }
579 },
580 base::Imported::Future { .. } => {
581 final_statuses.insert(tx_hash, Status::Future);
582 },
583 },
584 Err(error) => {
585 warn!(
590 target: LOG_TARGET,
591 ?tx_hash,
592 %error,
593 "Removing invalid transaction from update"
594 );
595 final_statuses.insert(tx_hash, Status::Failed);
596 },
597 },
598 ValidatedTransaction::Invalid(_, _) |
599 ValidatedTransaction::Unknown(_, _) => {
600 final_statuses.insert(tx_hash, Status::Failed);
601 },
602 }
603 }
604
605 if reject_future_transactions {
608 for future_tx in pool.clear_future() {
609 final_statuses.insert(future_tx.hash, Status::Dropped);
610 }
611 }
612
613 (initial_statuses, final_statuses)
614 })
615 };
616
617 let mut event_dispatcher = self.event_dispatcher.write();
619 for (hash, final_status) in final_statuses {
620 let initial_status = initial_statuses.remove(&hash);
621 if initial_status.is_none() || Some(final_status) != initial_status {
622 match final_status {
623 Status::Future => event_dispatcher.future(&hash),
624 Status::Ready => event_dispatcher.ready(&hash, None),
625 Status::Dropped => event_dispatcher.dropped(&hash),
626 Status::Failed => event_dispatcher.invalid(&hash),
627 }
628 }
629 }
630 }
631
632 pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
634 self.pool
635 .read()
636 .by_hashes(hashes)
637 .into_iter()
638 .map(|existing_in_pool| {
639 existing_in_pool.map(|transaction| transaction.provides.to_vec())
640 })
641 .collect()
642 }
643
644 pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
646 self.pool.read().ready_by_hash(hash)
647 }
648
649 pub fn prune_tags(
651 &self,
652 tags: impl IntoIterator<Item = Tag>,
653 ) -> PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>> {
654 let status = self.pool.write().prune_tags(tags);
656 {
659 let mut event_dispatcher = self.event_dispatcher.write();
660 for promoted in &status.promoted {
661 fire_events(&mut *event_dispatcher, promoted);
662 }
663 for f in &status.failed {
664 event_dispatcher.dropped(f);
665 }
666 }
667
668 status
669 }
670
671 pub fn resubmit_pruned(
673 &self,
674 at: &HashAndNumber<B::Block>,
675 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
676 pruned_hashes: Vec<ExtrinsicHash<B>>,
677 pruned_xts: Vec<ValidatedTransactionFor<B>>,
678 ) {
679 debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
680
681 self.submit(pruned_xts);
687
688 self.fire_pruned(at, known_imported_hashes.into_iter());
691
692 self.clear_stale(at);
695 }
696
697 pub fn fire_pruned(
699 &self,
700 at: &HashAndNumber<B::Block>,
701 hashes: impl Iterator<Item = ExtrinsicHash<B>>,
702 ) {
703 let mut event_dispatcher = self.event_dispatcher.write();
704 let mut set = HashSet::with_capacity(hashes.size_hint().0);
705 for h in hashes {
706 if !set.contains(&h) {
709 event_dispatcher.pruned(at.hash, &h);
710 set.insert(h);
711 }
712 }
713 }
714
715 pub fn clear_stale(&self, at: &HashAndNumber<B::Block>) {
721 let HashAndNumber { number, .. } = *at;
722 let number = number.saturated_into::<u64>();
723 let now = Instant::now();
724 let to_remove = {
725 self.ready()
726 .filter(|tx| self.rotator.ban_if_stale(&now, number, tx))
727 .map(|tx| tx.hash)
728 .collect::<Vec<_>>()
729 };
730 let futures_to_remove: Vec<ExtrinsicHash<B>> = {
731 let p = self.pool.read();
732 let mut hashes = Vec::new();
733 for tx in p.futures() {
734 if self.rotator.ban_if_stale(&now, number, tx) {
735 hashes.push(tx.hash);
736 }
737 }
738 hashes
739 };
740 debug!(
741 target:LOG_TARGET,
742 to_remove_len=to_remove.len(),
743 futures_to_remove_len=futures_to_remove.len(),
744 "clear_stale"
745 );
746 self.remove_invalid(&to_remove);
748 self.remove_invalid(&futures_to_remove);
749 self.rotator.clear_timeouts(&now);
751 }
752
753 pub fn api(&self) -> &B {
755 &self.api
756 }
757
758 pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
763 const CHANNEL_BUFFER_SIZE: usize = 1024;
764
765 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
766 self.import_notification_sinks.lock().push(sink);
767 stream
768 }
769
770 pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
772 let mut event_dispatcher = self.event_dispatcher.write();
773 for (hash, peers) in propagated.into_iter() {
774 event_dispatcher.broadcasted(&hash, peers);
775 }
776 }
777
778 pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
790 if hashes.is_empty() {
792 return vec![];
793 }
794
795 let invalid = self.remove_subtree(hashes, true, |listener, removed_tx_hash| {
796 listener.invalid(&removed_tx_hash);
797 });
798
799 trace!(
800 target: LOG_TARGET,
801 removed_count = hashes.len(),
802 invalid_count = invalid.len(),
803 "Removed invalid transactions"
804 );
805 log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
806
807 invalid
808 }
809
810 pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
812 self.pool.read().ready()
813 }
814
815 pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
817 self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
818 }
819
820 pub fn status(&self) -> PoolStatus {
822 self.pool.read().status()
823 }
824
825 pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
827 trace!(
828 target: LOG_TARGET,
829 ?block_hash,
830 "Attempting to notify watchers of finalization"
831 );
832 self.event_dispatcher.write().finalized(block_hash);
833 Ok(())
834 }
835
836 pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
838 self.event_dispatcher.write().retracted(block_hash)
839 }
840
841 pub fn retrigger_notifications(&self) {
846 let pool = self.pool.read();
847 let mut event_dispatcher = self.event_dispatcher.write();
848 pool.ready().for_each(|r| {
849 event_dispatcher.ready(&r.hash, None);
850 });
851 pool.futures().for_each(|f| {
852 event_dispatcher.future(&f.hash);
853 });
854 }
855
856 pub fn remove_subtree<F>(
871 &self,
872 hashes: &[ExtrinsicHash<B>],
873 ban_transactions: bool,
874 event_dispatcher_action: F,
875 ) -> Vec<TransactionFor<B>>
876 where
877 F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
878 {
879 if ban_transactions {
881 self.rotator.ban(&Instant::now(), hashes.iter().cloned(), BanReason::Validation);
882 };
883 let removed = self.pool.write().remove_subtree(hashes);
884
885 removed
886 .into_iter()
887 .map(|tx| {
888 let removed_tx_hash = tx.hash;
889 let mut event_dispatcher = self.event_dispatcher.write();
890 event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
891 tx.clone()
892 })
893 .collect::<Vec<_>>()
894 }
895}
896
897fn fire_events<B, L, Ex>(
898 event_dispatcher: &mut EventDispatcher<B, L>,
899 imported: &base::Imported<ExtrinsicHash<B>, Ex>,
900) where
901 B: ChainApi,
902 L: EventHandler<B>,
903{
904 match *imported {
905 base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
906 event_dispatcher.ready(hash, None);
907 failed.iter().for_each(|f| event_dispatcher.invalid(f));
908 removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
909 promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
910 },
911 base::Imported::Future { ref hash } => event_dispatcher.future(hash),
912 }
913}