sc_rpc_spec_v2/transaction/
transaction.rs1use crate::{
22 transaction::{
23 api::TransactionApiServer,
24 error::Error,
25 event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent},
26 },
27 SubscriptionTaskExecutor,
28};
29
30use codec::Decode;
31use futures::{StreamExt, TryFutureExt};
32use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
33
34use super::metrics::{InstanceMetrics, Metrics};
35
36use sc_rpc::utils::{RingBuffer, Subscription};
37use sc_transaction_pool_api::{
38 error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
39 TransactionStatus,
40};
41use sp_blockchain::HeaderBackend;
42use sp_core::Bytes;
43use sp_runtime::traits::Block as BlockT;
44use std::sync::Arc;
45
/// Target passed to the `log` macros for messages emitted by this module.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
47
48pub struct Transaction<Pool, Client> {
50 client: Arc<Client>,
52 pool: Arc<Pool>,
54 executor: SubscriptionTaskExecutor,
56 metrics: Option<Metrics>,
58}
59
60impl<Pool, Client> Transaction<Pool, Client> {
61 pub fn new(
63 client: Arc<Client>,
64 pool: Arc<Pool>,
65 executor: SubscriptionTaskExecutor,
66 metrics: Option<Metrics>,
67 ) -> Self {
68 Transaction { client, pool, executor, metrics }
69 }
70}
71
72const TX_SOURCE: TransactionSource = TransactionSource::External;
78
79#[async_trait]
80impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
81where
82 Pool: TransactionPool + Sync + Send + 'static,
83 Pool::Hash: Unpin,
84 <Pool::Block as BlockT>::Hash: Unpin,
85 Client: HeaderBackend<Pool::Block> + Send + Sync + 'static,
86{
87 fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
88 let client = self.client.clone();
89 let pool = self.pool.clone();
90
91 let mut metrics = InstanceMetrics::new(self.metrics.clone());
93
94 let fut = async move {
95 let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
96 Ok(decoded_extrinsic) => decoded_extrinsic,
97 Err(e) => {
98 log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);
99
100 let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
101
102 let event = TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
103 error: "Extrinsic bytes cannot be decoded".into(),
104 });
105
106 metrics.register_event(&event);
107
108 let _ = sink.send(&event).await;
110 return
111 },
112 };
113
114 let best_block_hash = client.info().best_hash;
115
116 let submit = pool
117 .submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
118 .map_err(|e| {
119 e.into_pool_error()
120 .map(Error::from)
121 .unwrap_or_else(|e| Error::Verification(Box::new(e)))
122 });
123
124 let Ok(sink) = pending.accept().await.map(Subscription::from) else {
125 return;
126 };
127
128 match submit.await {
129 Ok(stream) => {
130 let stream = stream
131 .filter_map(|event| {
132 let event = handle_event(event);
133
134 event.as_ref().inspect(|event| {
135 metrics.register_event(event);
136 });
137
138 async move { event }
139 })
140 .boxed();
141
142 sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
144 },
145 Err(err) => {
146 let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
149
150 metrics.register_event(&event);
151
152 _ = sink.send(&event).await;
153 },
154 };
155 };
156
157 sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
158 }
159}
160
161#[inline]
164fn handle_event<Hash: Clone, BlockHash: Clone>(
165 event: TransactionStatus<Hash, BlockHash>,
166) -> Option<TransactionEvent<BlockHash>> {
167 match event {
168 TransactionStatus::Ready | TransactionStatus::Future =>
169 Some(TransactionEvent::<BlockHash>::Validated),
170 TransactionStatus::InBlock((hash, index)) =>
171 Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { hash, index }))),
172 TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
173 TransactionStatus::FinalityTimeout(_) =>
174 Some(TransactionEvent::Dropped(TransactionDropped {
175 error: "Maximum number of finality watchers has been reached".into(),
176 })),
177 TransactionStatus::Finalized((hash, index)) =>
178 Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
179 TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
180 error: "Extrinsic was rendered invalid by another extrinsic".into(),
181 })),
182 TransactionStatus::Dropped => Some(TransactionEvent::Dropped(TransactionDropped {
183 error: "Extrinsic dropped from the pool due to exceeding limits".into(),
184 })),
185 TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
186 error: "Extrinsic marked as invalid".into(),
187 })),
188 TransactionStatus::Broadcast(_) => None,
190 }
191}