1use crate::{
20 common::{
21 sliding_stat::SyncDurationSlidingStats, tracing_log_xt::log_xt_trace, STAT_SLIDING_WINDOW,
22 },
23 insert_and_log_throttled_sync, LOG_TARGET,
24};
25use futures::channel::mpsc::{channel, Sender};
26use indexmap::IndexMap;
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
29use sp_blockchain::HashAndNumber;
30use sp_runtime::{
31 traits::SaturatedConversion,
32 transaction_validity::{TransactionTag as Tag, ValidTransaction},
33};
34use std::{
35 collections::{HashMap, HashSet},
36 sync::Arc,
37 time::{Duration, Instant},
38};
39use tracing::{debug, trace, warn, Level};
40
41use super::{
42 base_pool::{self as base, PruneStatus},
43 listener::EventHandler,
44 pool::{
45 BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
46 },
47 rotator::PoolRotator,
48 watcher::Watcher,
49};
50
51#[derive(Debug)]
53pub enum ValidatedTransaction<Hash, Ex, Error> {
54 Valid(base::Transaction<Hash, Ex>),
56 Invalid(Hash, Error),
58 Unknown(Hash, Error),
62}
63
64impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
65 pub fn valid_at(
67 at: u64,
68 hash: Hash,
69 source: base::TimedTransactionSource,
70 data: Ex,
71 bytes: usize,
72 validity: ValidTransaction,
73 ) -> Self {
74 Self::Valid(base::Transaction {
75 data,
76 bytes,
77 hash,
78 source,
79 priority: validity.priority,
80 requires: validity.requires,
81 provides: validity.provides,
82 propagate: validity.propagate,
83 valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
84 })
85 }
86
87 pub fn priority(&self) -> Option<TransactionPriority> {
89 match self {
90 ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
91 _ => None,
92 }
93 }
94}
95
/// [`ValidatedTransaction`] instantiated with the extrinsic hash, extrinsic and error types
/// of the chain API `B`.
pub type ValidatedTransactionFor<B> =
    ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
99
/// The `listener` module's `EventDispatcher`, with the extrinsic hash of `B` as its first type
/// parameter.
pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>;
102
/// Cloneable handle to a thread-safe predicate returning a `bool`.
///
/// The pool calls the wrapped closure to decide whether non-propagable transactions may be
/// accepted (see `submit_one`). Cloning only bumps the `Arc` reference count.
#[derive(Clone)]
pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);

impl From<bool> for IsValidator {
    /// Wraps a fixed answer in a closure that always returns it.
    fn from(is_validator: bool) -> Self {
        let constant: Box<dyn Fn() -> bool + Send + Sync> = Box::new(move || is_validator);
        Self(Arc::new(constant))
    }
}

impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
    /// Takes ownership of an already boxed predicate.
    fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
        let shared = Arc::new(is_validator);
        Self(shared)
    }
}
118
119pub struct BaseSubmitOutcome<B: ChainApi, W> {
121 hash: ExtrinsicHash<B>,
123 watcher: Option<W>,
125
126 priority: Option<TransactionPriority>,
128}
129
130pub type ValidatedPoolSubmitOutcome<B> =
132 BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
133
134impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
135 pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
137 Self { hash, priority, watcher: None }
138 }
139
140 pub fn with_watcher(mut self, watcher: W) -> Self {
142 self.watcher = Some(watcher);
143 self
144 }
145
146 pub fn priority(&self) -> Option<TransactionPriority> {
148 self.priority
149 }
150
151 pub fn hash(&self) -> ExtrinsicHash<B> {
153 self.hash
154 }
155
156 pub fn expect_watcher(&mut self) -> W {
159 self.watcher.take().expect("watcher was set in submit_and_watch. qed")
160 }
161}
162
163pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> {
165 api: Arc<B>,
166 is_validator: IsValidator,
167 options: Options,
168 event_dispatcher: RwLock<EventDispatcher<B, L>>,
169 pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
170 import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
171 rotator: PoolRotator<ExtrinsicHash<B>>,
172 enforce_limits_stats: SyncDurationSlidingStats,
173}
174
175impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> {
176 fn clone(&self) -> Self {
177 Self {
178 api: self.api.clone(),
179 is_validator: self.is_validator.clone(),
180 options: self.options.clone(),
181 event_dispatcher: Default::default(),
182 pool: RwLock::from(self.pool.read().clone()),
183 import_notification_sinks: Default::default(),
184 rotator: self.rotator.clone(),
185 enforce_limits_stats: self.enforce_limits_stats.clone(),
186 }
187 }
188}
189
190impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
191 pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
192 Self {
193 event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some(
194 event_handler,
195 ))),
196 ..self.clone()
197 }
198 }
199
200 pub fn new_with_staticly_sized_rotator(
202 options: Options,
203 is_validator: IsValidator,
204 api: Arc<B>,
205 ) -> Self {
206 let ban_time = options.ban_time;
207 Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None)
208 }
209
210 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
212 let ban_time = options.ban_time;
213 let total_count = options.total_count();
214 Self::new_with_rotator(
215 options,
216 is_validator,
217 api,
218 PoolRotator::new_with_expected_size(ban_time, total_count),
219 None,
220 )
221 }
222
223 pub fn new_with_event_handler(
225 options: Options,
226 is_validator: IsValidator,
227 api: Arc<B>,
228 event_handler: L,
229 ) -> Self {
230 let ban_time = options.ban_time;
231 let total_count = options.total_count();
232 Self::new_with_rotator(
233 options,
234 is_validator,
235 api,
236 PoolRotator::new_with_expected_size(ban_time, total_count),
237 Some(event_handler),
238 )
239 }
240
241 fn new_with_rotator(
242 options: Options,
243 is_validator: IsValidator,
244 api: Arc<B>,
245 rotator: PoolRotator<ExtrinsicHash<B>>,
246 event_handler: Option<L>,
247 ) -> Self {
248 let base_pool = base::BasePool::new(options.reject_future_transactions);
249 Self {
250 is_validator,
251 options,
252 event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)),
253 api,
254 pool: RwLock::new(base_pool),
255 import_notification_sinks: Default::default(),
256 rotator,
257 enforce_limits_stats: SyncDurationSlidingStats::new(Duration::from_secs(
258 STAT_SLIDING_WINDOW,
259 )),
260 }
261 }
262
263 pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
265 self.rotator.ban(now, hashes)
266 }
267
268 pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
270 self.rotator.is_banned(hash)
271 }
272
273 pub fn check_is_known(
279 &self,
280 tx_hash: &ExtrinsicHash<B>,
281 ignore_banned: bool,
282 ) -> Result<(), B::Error> {
283 if !ignore_banned && self.is_banned(tx_hash) {
284 Err(error::Error::TemporarilyBanned.into())
285 } else if self.pool.read().is_imported(tx_hash) {
286 Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
287 } else {
288 Ok(())
289 }
290 }
291
292 pub fn submit(
294 &self,
295 txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
296 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
297 let results = txs
298 .into_iter()
299 .map(|validated_tx| self.submit_one(validated_tx))
300 .collect::<Vec<_>>();
301
302 let removed = if results.iter().any(|res| res.is_ok()) {
304 let start = Instant::now();
305 let removed = self.enforce_limits();
306 insert_and_log_throttled_sync!(
307 Level::DEBUG,
308 target:"txpool",
309 prefix:"enforce_limits_stats",
310 self.enforce_limits_stats,
311 start.elapsed().into()
312 );
313 removed
314 } else {
315 Default::default()
316 };
317
318 results
319 .into_iter()
320 .map(|res| match res {
321 Ok(outcome) if removed.contains(&outcome.hash) => {
322 Err(error::Error::ImmediatelyDropped.into())
323 },
324 other => other,
325 })
326 .collect()
327 }
328
329 fn submit_one(
331 &self,
332 tx: ValidatedTransactionFor<B>,
333 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
334 match tx {
335 ValidatedTransaction::Valid(tx) => {
336 let priority = tx.priority;
337 trace!(
338 target: LOG_TARGET,
339 tx_hash = ?tx.hash,
340 "ValidatedPool::submit_one"
341 );
342 if !tx.propagate && !(self.is_validator.0)() {
343 return Err(error::Error::Unactionable.into());
344 }
345
346 let imported = self.pool.write().import(tx)?;
347
348 if let base::Imported::Ready { ref hash, .. } = imported {
349 let sinks = &mut self.import_notification_sinks.lock();
350 sinks.retain_mut(|sink| match sink.try_send(*hash) {
351 Ok(()) => true,
352 Err(e) => {
353 if e.is_full() {
354 warn!(
355 target: LOG_TARGET,
356 tx_hash = ?hash,
357 "Trying to notify an import but the channel is full"
358 );
359 true
360 } else {
361 false
362 }
363 },
364 });
365 }
366
367 let mut event_dispatcher = self.event_dispatcher.write();
368 fire_events(&mut *event_dispatcher, &imported);
369 Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
370 },
371 ValidatedTransaction::Invalid(tx_hash, error) => {
372 trace!(
373 target: LOG_TARGET,
374 ?tx_hash,
375 ?error,
376 "ValidatedPool::submit_one invalid"
377 );
378 self.rotator.ban(&Instant::now(), std::iter::once(tx_hash));
379 Err(error)
380 },
381 ValidatedTransaction::Unknown(tx_hash, error) => {
382 trace!(
383 target: LOG_TARGET,
384 ?tx_hash,
385 ?error,
386 "ValidatedPool::submit_one unknown"
387 );
388 self.event_dispatcher.write().invalid(&tx_hash);
389 Err(error)
390 },
391 }
392 }
393
394 fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
395 let status = self.pool.read().status();
396 let ready_limit = &self.options.ready;
397 let future_limit = &self.options.future;
398
399 if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
400 future_limit.is_exceeded(status.future, status.future_bytes)
401 {
402 trace!(
403 target: LOG_TARGET,
404 ready_count = ready_limit.count,
405 ready_kb = ready_limit.total_bytes / 1024,
406 future_count = future_limit.count,
407 future_kb = future_limit.total_bytes / 1024,
408 "Enforcing limits"
409 );
410
411 let removed = {
413 let mut pool = self.pool.write();
414 let removed = pool
415 .enforce_limits(ready_limit, future_limit)
416 .into_iter()
417 .map(|x| x.hash)
418 .collect::<HashSet<_>>();
419 self.rotator.ban(&Instant::now(), removed.iter().copied());
421 removed
422 };
423 if !removed.is_empty() {
424 trace!(
425 target: LOG_TARGET,
426 dropped_count = removed.len(),
427 "Enforcing limits"
428 );
429 }
430
431 let mut event_dispatcher = self.event_dispatcher.write();
433 for h in &removed {
434 event_dispatcher.limits_enforced(h);
435 }
436
437 removed
438 } else {
439 Default::default()
440 }
441 }
442
443 pub fn submit_and_watch(
445 &self,
446 tx: ValidatedTransactionFor<B>,
447 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
448 match tx {
449 ValidatedTransaction::Valid(tx) => {
450 let hash = self.api.hash_and_length(&tx.data).0;
451 let watcher = self.create_watcher(hash);
452 self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
453 .pop()
454 .expect("One extrinsic passed; one result returned; qed")
455 .map(|outcome| outcome.with_watcher(watcher))
456 },
457 ValidatedTransaction::Invalid(hash, err) => {
458 self.rotator.ban(&Instant::now(), std::iter::once(hash));
459 Err(err)
460 },
461 ValidatedTransaction::Unknown(_, err) => Err(err),
462 }
463 }
464
465 pub fn create_watcher(
467 &self,
468 tx_hash: ExtrinsicHash<B>,
469 ) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> {
470 self.event_dispatcher.write().create_watcher(tx_hash)
471 }
472
473 pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> {
475 self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect()
476 }
477
478 pub fn resubmit(
483 &self,
484 mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
485 ) {
486 #[derive(Debug, Clone, Copy, PartialEq)]
487 enum Status {
488 Future,
489 Ready,
490 Failed,
491 Dropped,
492 }
493
494 let (mut initial_statuses, final_statuses) = {
495 let mut pool = self.pool.write();
496
497 let mut initial_statuses = HashMap::new();
505 let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
506 while !updated_transactions.is_empty() {
507 let hash = updated_transactions
508 .keys()
509 .next()
510 .cloned()
511 .expect("transactions is not empty; qed");
512
513 let removed = pool.remove_subtree(&[hash]);
517 for removed_tx in removed {
518 let removed_hash = removed_tx.hash;
519 let updated_transaction = updated_transactions.shift_remove(&removed_hash);
520 let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
521 updated_tx
522 } else {
523 let transaction = match Arc::try_unwrap(removed_tx) {
526 Ok(transaction) => transaction,
527 Err(transaction) => transaction.duplicate(),
528 };
529 ValidatedTransaction::Valid(transaction)
530 };
531
532 initial_statuses.insert(removed_hash, Status::Ready);
533 txs_to_resubmit.push((removed_hash, tx_to_resubmit));
534 }
535 updated_transactions.shift_remove(&hash);
537 }
538
539 pool.with_futures_enabled(|pool, reject_future_transactions| {
544 let mut final_statuses = HashMap::new();
546 for (tx_hash, tx_to_resubmit) in txs_to_resubmit {
547 match tx_to_resubmit {
548 ValidatedTransaction::Valid(tx) => match pool.import(tx) {
549 Ok(imported) => match imported {
550 base::Imported::Ready { promoted, failed, removed, .. } => {
551 final_statuses.insert(tx_hash, Status::Ready);
552 for hash in promoted {
553 final_statuses.insert(hash, Status::Ready);
554 }
555 for hash in failed {
556 final_statuses.insert(hash, Status::Failed);
557 }
558 for tx in removed {
559 final_statuses.insert(tx.hash, Status::Dropped);
560 }
561 },
562 base::Imported::Future { .. } => {
563 final_statuses.insert(tx_hash, Status::Future);
564 },
565 },
566 Err(error) => {
567 warn!(
572 target: LOG_TARGET,
573 ?tx_hash,
574 %error,
575 "Removing invalid transaction from update"
576 );
577 final_statuses.insert(tx_hash, Status::Failed);
578 },
579 },
580 ValidatedTransaction::Invalid(_, _) |
581 ValidatedTransaction::Unknown(_, _) => {
582 final_statuses.insert(tx_hash, Status::Failed);
583 },
584 }
585 }
586
587 if reject_future_transactions {
590 for future_tx in pool.clear_future() {
591 final_statuses.insert(future_tx.hash, Status::Dropped);
592 }
593 }
594
595 (initial_statuses, final_statuses)
596 })
597 };
598
599 let mut event_dispatcher = self.event_dispatcher.write();
601 for (hash, final_status) in final_statuses {
602 let initial_status = initial_statuses.remove(&hash);
603 if initial_status.is_none() || Some(final_status) != initial_status {
604 match final_status {
605 Status::Future => event_dispatcher.future(&hash),
606 Status::Ready => event_dispatcher.ready(&hash, None),
607 Status::Dropped => event_dispatcher.dropped(&hash),
608 Status::Failed => event_dispatcher.invalid(&hash),
609 }
610 }
611 }
612 }
613
614 pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
616 self.pool
617 .read()
618 .by_hashes(hashes)
619 .into_iter()
620 .map(|existing_in_pool| {
621 existing_in_pool.map(|transaction| transaction.provides.to_vec())
622 })
623 .collect()
624 }
625
626 pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
628 self.pool.read().ready_by_hash(hash)
629 }
630
631 pub fn prune_tags(
633 &self,
634 tags: impl IntoIterator<Item = Tag>,
635 ) -> PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>> {
636 let status = self.pool.write().prune_tags(tags);
638 {
641 let mut event_dispatcher = self.event_dispatcher.write();
642 for promoted in &status.promoted {
643 fire_events(&mut *event_dispatcher, promoted);
644 }
645 for f in &status.failed {
646 event_dispatcher.dropped(f);
647 }
648 }
649
650 status
651 }
652
653 pub fn resubmit_pruned(
655 &self,
656 at: &HashAndNumber<B::Block>,
657 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
658 pruned_hashes: Vec<ExtrinsicHash<B>>,
659 pruned_xts: Vec<ValidatedTransactionFor<B>>,
660 ) {
661 debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
662
663 let results = self.submit(pruned_xts);
665
666 let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
669 match r.map_err(error::IntoPoolError::into_pool_error) {
670 Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
671 _ => None,
672 }
673 });
674 let hashes = hashes.chain(known_imported_hashes.into_iter());
677 self.fire_pruned(at, hashes);
678
679 self.clear_stale(at);
682 }
683
684 pub fn fire_pruned(
686 &self,
687 at: &HashAndNumber<B::Block>,
688 hashes: impl Iterator<Item = ExtrinsicHash<B>>,
689 ) {
690 let mut event_dispatcher = self.event_dispatcher.write();
691 let mut set = HashSet::with_capacity(hashes.size_hint().0);
692 for h in hashes {
693 if !set.contains(&h) {
696 event_dispatcher.pruned(at.hash, &h);
697 set.insert(h);
698 }
699 }
700 }
701
702 pub fn clear_stale(&self, at: &HashAndNumber<B::Block>) {
708 let HashAndNumber { number, .. } = *at;
709 let number = number.saturated_into::<u64>();
710 let now = Instant::now();
711 let to_remove = {
712 self.ready()
713 .filter(|tx| self.rotator.ban_if_stale(&now, number, tx))
714 .map(|tx| tx.hash)
715 .collect::<Vec<_>>()
716 };
717 let futures_to_remove: Vec<ExtrinsicHash<B>> = {
718 let p = self.pool.read();
719 let mut hashes = Vec::new();
720 for tx in p.futures() {
721 if self.rotator.ban_if_stale(&now, number, tx) {
722 hashes.push(tx.hash);
723 }
724 }
725 hashes
726 };
727 debug!(
728 target:LOG_TARGET,
729 to_remove_len=to_remove.len(),
730 futures_to_remove_len=futures_to_remove.len(),
731 "clear_stale"
732 );
733 self.remove_invalid(&to_remove);
735 self.remove_invalid(&futures_to_remove);
736 self.rotator.clear_timeouts(&now);
738 }
739
740 pub fn api(&self) -> &B {
742 &self.api
743 }
744
745 pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
750 const CHANNEL_BUFFER_SIZE: usize = 1024;
751
752 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
753 self.import_notification_sinks.lock().push(sink);
754 stream
755 }
756
757 pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
759 let mut event_dispatcher = self.event_dispatcher.write();
760 for (hash, peers) in propagated.into_iter() {
761 event_dispatcher.broadcasted(&hash, peers);
762 }
763 }
764
765 pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
777 if hashes.is_empty() {
779 return vec![];
780 }
781
782 let invalid = self.remove_subtree(hashes, true, |listener, removed_tx_hash| {
783 listener.invalid(&removed_tx_hash);
784 });
785
786 trace!(
787 target: LOG_TARGET,
788 removed_count = hashes.len(),
789 invalid_count = invalid.len(),
790 "Removed invalid transactions"
791 );
792 log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
793
794 invalid
795 }
796
797 pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
799 self.pool.read().ready()
800 }
801
802 pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
804 self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
805 }
806
807 pub fn status(&self) -> PoolStatus {
809 self.pool.read().status()
810 }
811
812 pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
814 trace!(
815 target: LOG_TARGET,
816 ?block_hash,
817 "Attempting to notify watchers of finalization"
818 );
819 self.event_dispatcher.write().finalized(block_hash);
820 Ok(())
821 }
822
823 pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
825 self.event_dispatcher.write().retracted(block_hash)
826 }
827
828 pub fn retrigger_notifications(&self) {
833 let pool = self.pool.read();
834 let mut event_dispatcher = self.event_dispatcher.write();
835 pool.ready().for_each(|r| {
836 event_dispatcher.ready(&r.hash, None);
837 });
838 pool.futures().for_each(|f| {
839 event_dispatcher.future(&f.hash);
840 });
841 }
842
843 pub fn remove_subtree<F>(
858 &self,
859 hashes: &[ExtrinsicHash<B>],
860 ban_transactions: bool,
861 event_dispatcher_action: F,
862 ) -> Vec<TransactionFor<B>>
863 where
864 F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
865 {
866 if ban_transactions {
868 self.rotator.ban(&Instant::now(), hashes.iter().cloned());
869 };
870 let removed = self.pool.write().remove_subtree(hashes);
871
872 removed
873 .into_iter()
874 .map(|tx| {
875 let removed_tx_hash = tx.hash;
876 let mut event_dispatcher = self.event_dispatcher.write();
877 event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
878 tx.clone()
879 })
880 .collect::<Vec<_>>()
881 }
882}
883
884fn fire_events<B, L, Ex>(
885 event_dispatcher: &mut EventDispatcher<B, L>,
886 imported: &base::Imported<ExtrinsicHash<B>, Ex>,
887) where
888 B: ChainApi,
889 L: EventHandler<B>,
890{
891 match *imported {
892 base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
893 event_dispatcher.ready(hash, None);
894 failed.iter().for_each(|f| event_dispatcher.invalid(f));
895 removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
896 promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
897 },
898 base::Imported::Future { ref hash } => event_dispatcher.future(hash),
899 }
900}