sc_rpc_spec_v2/transaction/
transaction.rs1use crate::{
22 transaction::{
23 api::TransactionApiServer,
24 error::Error,
25 event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent},
26 },
27 SubscriptionTaskExecutor,
28};
29
30use codec::Decode;
31use futures::{StreamExt, TryFutureExt};
32use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
33use sc_rpc::utils::{RingBuffer, Subscription};
34use sc_transaction_pool_api::{
35 error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
36 TransactionStatus,
37};
38use sp_blockchain::HeaderBackend;
39use sp_core::Bytes;
40use sp_runtime::traits::Block as BlockT;
41use std::sync::Arc;
42
/// Log target used by the log statements of this module.
pub(crate) const LOG_TARGET: &'static str = "rpc-spec-v2";
44
45pub struct Transaction<Pool, Client> {
47 client: Arc<Client>,
49 pool: Arc<Pool>,
51 executor: SubscriptionTaskExecutor,
53}
54
55impl<Pool, Client> Transaction<Pool, Client> {
56 pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
58 Transaction { client, pool, executor }
59 }
60}
61
62const TX_SOURCE: TransactionSource = TransactionSource::External;
68
69#[async_trait]
70impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
71where
72 Pool: TransactionPool + Sync + Send + 'static,
73 Pool::Hash: Unpin,
74 <Pool::Block as BlockT>::Hash: Unpin,
75 Client: HeaderBackend<Pool::Block> + Send + Sync + 'static,
76{
77 fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
78 let client = self.client.clone();
79 let pool = self.pool.clone();
80
81 let fut = async move {
82 let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
83 Ok(decoded_extrinsic) => decoded_extrinsic,
84 Err(e) => {
85 log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);
86
87 let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
88
89 let _ = sink
91 .send(&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
92 error: "Extrinsic bytes cannot be decoded".into(),
93 }))
94 .await;
95 return
96 },
97 };
98
99 let best_block_hash = client.info().best_hash;
100
101 let submit = pool
102 .submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
103 .map_err(|e| {
104 e.into_pool_error()
105 .map(Error::from)
106 .unwrap_or_else(|e| Error::Verification(Box::new(e)))
107 });
108
109 let Ok(sink) = pending.accept().await.map(Subscription::from) else {
110 return;
111 };
112
113 match submit.await {
114 Ok(stream) => {
115 let stream =
116 stream.filter_map(move |event| async move { handle_event(event) }).boxed();
117
118 sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
120 },
121 Err(err) => {
122 let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
125 _ = sink.send(&event).await;
126 },
127 };
128 };
129
130 sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
131 }
132}
133
134#[inline]
137pub fn handle_event<Hash: Clone, BlockHash: Clone>(
138 event: TransactionStatus<Hash, BlockHash>,
139) -> Option<TransactionEvent<BlockHash>> {
140 match event {
141 TransactionStatus::Ready | TransactionStatus::Future =>
142 Some(TransactionEvent::<BlockHash>::Validated),
143 TransactionStatus::InBlock((hash, index)) =>
144 Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { hash, index }))),
145 TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
146 TransactionStatus::FinalityTimeout(_) =>
147 Some(TransactionEvent::Dropped(TransactionDropped {
148 error: "Maximum number of finality watchers has been reached".into(),
149 })),
150 TransactionStatus::Finalized((hash, index)) =>
151 Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
152 TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
153 error: "Extrinsic was rendered invalid by another extrinsic".into(),
154 })),
155 TransactionStatus::Dropped => Some(TransactionEvent::Dropped(TransactionDropped {
156 error: "Extrinsic dropped from the pool due to exceeding limits".into(),
157 })),
158 TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
159 error: "Extrinsic marked as invalid".into(),
160 })),
161 TransactionStatus::Broadcast(_) => None,
163 }
164}