1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Extrinsics status updates.
use futures::Stream;
use sc_transaction_pool_api::TransactionStatus;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
/// Extrinsic watcher.
///
/// Represents a stream of status updates for a particular extrinsic.
///
/// `H` is the type of the transaction hash and `BH` the type of the block
/// hash carried inside the `TransactionStatus` updates.
#[derive(Debug)]
pub struct Watcher<H, BH> {
/// Receiving half of the channel on which `TransactionStatus` updates
/// for the watched extrinsic are delivered.
receiver: TracingUnboundedReceiver<TransactionStatus<H, BH>>,
/// Transaction hash of the watched extrinsic.
hash: H,
}
impl<H, BH> Watcher<H, BH> {
    /// Borrows the hash of the transaction this watcher is following.
    pub fn hash(&self) -> &H {
        let Self { hash, .. } = self;
        hash
    }

    /// Turns the watcher into a stream of status updates.
    ///
    /// The watcher is consumed and its underlying receiver is handed back as
    /// an opaque [`Stream`] yielding `TransactionStatus` items; the stored
    /// hash is discarded.
    pub fn into_stream(self) -> impl Stream<Item = TransactionStatus<H, BH>> {
        let Self { receiver, .. } = self;
        receiver
    }
}
/// Sender part of the watcher. Exposed only for testing purposes.
///
/// Keeps one channel sender per watcher created through `new_watcher` and
/// delivers a copy of every status update to each of them.
#[derive(Debug)]
pub struct Sender<H, BH> {
/// Sending halves of the watcher channels (despite the field name, these
/// are the senders; the matching receivers live inside each `Watcher`).
receivers: Vec<TracingUnboundedSender<TransactionStatus<H, BH>>>,
/// Set to `true` once a status after which no more notifications are sent
/// has been emitted; `is_done` then reports `true`.
is_finalized: bool,
}
impl<H, BH> Default for Sender<H, BH> {
fn default() -> Self {
Sender { receivers: Default::default(), is_finalized: false }
}
}
impl<H: Clone, BH: Clone> Sender<H, BH> {
    /// Registers a new watcher for the extrinsic identified by `hash`.
    ///
    /// A fresh channel is created: its sending half is stored in this sender
    /// and its receiving half is returned inside the new [`Watcher`].
    pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
        let (sink, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
        self.receivers.push(sink);
        Watcher { hash, receiver }
    }

    /// Transaction became ready.
    pub fn ready(&mut self) {
        self.send(TransactionStatus::Ready);
    }

    /// Transaction was moved to future.
    pub fn future(&mut self) {
        self.send(TransactionStatus::Future);
    }

    /// Some state change (perhaps another extrinsic was included) rendered this extrinsic
    /// invalid.
    ///
    /// Watchers receive `Usurped(hash)`; afterwards the sender is marked as finalized.
    pub fn usurped(&mut self, hash: H) {
        self.send_final(TransactionStatus::Usurped(hash));
    }

    /// Extrinsic has been included in the block with the given hash, at position `index`.
    pub fn in_block(&mut self, hash: BH, index: usize) {
        self.send(TransactionStatus::InBlock((hash, index)));
    }

    /// Extrinsic has been finalized by a finality gadget.
    ///
    /// Watchers receive `Finalized((hash, index))`; afterwards the sender is marked as
    /// finalized.
    pub fn finalized(&mut self, hash: BH, index: usize) {
        self.send_final(TransactionStatus::Finalized((hash, index)));
    }

    /// Reports a finality timeout for the block with the given hash.
    ///
    /// Watchers receive `FinalityTimeout(hash)`; afterwards the sender is marked as
    /// finalized.
    pub fn finality_timeout(&mut self, hash: BH) {
        self.send_final(TransactionStatus::FinalityTimeout(hash));
    }

    /// The block this extrinsic was included in has been retracted.
    pub fn retracted(&mut self, hash: BH) {
        self.send(TransactionStatus::Retracted(hash));
    }

    /// Extrinsic has been marked as invalid by the block builder.
    ///
    /// The sender is marked as finalized because no more notifications follow.
    pub fn invalid(&mut self) {
        self.send_final(TransactionStatus::Invalid);
    }

    /// Transaction has been dropped from the pool because of the limit.
    ///
    /// Afterwards the sender is marked as finalized.
    pub fn dropped(&mut self) {
        self.send_final(TransactionStatus::Dropped);
    }

    /// The extrinsic has been broadcast to the given peers.
    pub fn broadcast(&mut self, peers: Vec<String>) {
        self.send(TransactionStatus::Broadcast(peers));
    }

    /// Returns `true` if there are no more listeners for this extrinsic or it was finalized.
    pub fn is_done(&self) -> bool {
        if self.is_finalized {
            return true;
        }
        self.receivers.is_empty()
    }

    /// Sends `status` to every watcher and then marks this sender as finalized.
    fn send_final(&mut self, status: TransactionStatus<H, BH>) {
        self.send(status);
        self.is_finalized = true;
    }

    /// Delivers a clone of `status` to every registered watcher.
    ///
    /// Any channel whose send fails is removed from the list, so it is not
    /// tried again on later updates.
    fn send(&mut self, status: TransactionStatus<H, BH>) {
        self.receivers.retain(|watcher| match watcher.unbounded_send(status.clone()) {
            Ok(_) => true,
            Err(_) => false,
        });
    }
}