sc_rpc_spec_v2/transaction/
transaction_broadcast.rs1use crate::{
22 common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer,
23 SubscriptionTaskExecutor,
24};
25use codec::Decode;
26use futures::{FutureExt, Stream, StreamExt};
27use futures_util::stream::AbortHandle;
28use jsonrpsee::{
29 core::{async_trait, RpcResult},
30 ConnectionId, Extensions,
31};
32use parking_lot::RwLock;
33use rand::{distributions::Alphanumeric, Rng};
34use sc_client_api::BlockchainEvents;
35use sc_transaction_pool_api::{
36 error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource,
37};
38use sp_blockchain::HeaderBackend;
39use sp_core::Bytes;
40use sp_runtime::traits::Block as BlockT;
41use std::{collections::HashMap, sync::Arc};
42
43use super::error::ErrorBroadcast;
44
45pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
47 client: Arc<Client>,
49 pool: Arc<Pool>,
51 executor: SubscriptionTaskExecutor,
53 broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
55 rpc_connections: RpcConnections,
57}
58
59struct BroadcastState<Pool: TransactionPool> {
61 handle: AbortHandle,
63 tx_hash: <Pool as TransactionPool>::Hash,
65}
66
67impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
68 pub fn new(
70 client: Arc<Client>,
71 pool: Arc<Pool>,
72 executor: SubscriptionTaskExecutor,
73 max_transactions_per_connection: usize,
74 ) -> Self {
75 TransactionBroadcast {
76 client,
77 pool,
78 executor,
79 broadcast_ids: Default::default(),
80 rpc_connections: RpcConnections::new(max_transactions_per_connection),
81 }
82 }
83
84 pub fn generate_unique_id(&self) -> String {
86 let generate_operation_id = || {
87 const OPERATION_ID_LEN: usize = 16;
89
90 rand::thread_rng()
91 .sample_iter(Alphanumeric)
92 .take(OPERATION_ID_LEN)
93 .map(char::from)
94 .collect::<String>()
95 };
96
97 let mut id = generate_operation_id();
98
99 let broadcast_ids = self.broadcast_ids.read();
100
101 while broadcast_ids.contains_key(&id) {
102 id = generate_operation_id();
103 }
104
105 id
106 }
107}
108
109const TX_SOURCE: TransactionSource = TransactionSource::External;
115
116#[async_trait]
117impl<Pool, Client> TransactionBroadcastApiServer for TransactionBroadcast<Pool, Client>
118where
119 Pool: TransactionPool + Sync + Send + 'static,
120 Pool::Error: IntoPoolError,
121 <Pool::Block as BlockT>::Hash: Unpin,
122 Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
123{
124 async fn broadcast(&self, ext: &Extensions, bytes: Bytes) -> RpcResult<Option<String>> {
125 let pool = self.pool.clone();
126 let conn_id = ext
127 .get::<ConnectionId>()
128 .copied()
129 .expect("ConnectionId is always set by jsonrpsee; qed");
130
131 let id = self.generate_unique_id();
133
134 let Some(reserved_connection) = self.rpc_connections.reserve_space(conn_id) else {
136 return Ok(None)
137 };
138 let Some(reserved_identifier) = reserved_connection.register(id.clone()) else {
139 return Ok(None)
141 };
142
143 let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
148 return Ok(Some(id));
149 };
150 let tx_hash = pool.hash_of(&decoded_extrinsic);
152
153 let mut best_block_import_stream: std::pin::Pin<
156 Box<dyn Stream<Item = <Pool::Block as BlockT>::Hash> + Send>,
157 > =
158 Box::pin(self.client.import_notification_stream().filter_map(
159 |notification| async move { notification.is_new_best.then_some(notification.hash) },
160 ));
161
162 let broadcast_transaction_fut = async move {
163 let mut is_done = false;
165
166 while !is_done {
167 let Some(best_block_hash) =
169 last_stream_element(&mut best_block_import_stream).await
170 else {
171 return;
172 };
173
174 let mut stream = match pool
175 .submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone())
176 .await
177 {
178 Ok(stream) => stream,
179 Err(e) => {
181 let Ok(pool_err) = e.into_pool_error() else { return };
182
183 if pool_err.is_retriable() {
184 continue
187 } else {
188 return;
189 }
190 },
191 };
192
193 while let Some(event) = stream.next().await {
194 if event.is_retriable() {
197 break;
198 }
199
200 if event.is_final() {
203 is_done = true;
204 break;
205 }
206 }
207 }
208 };
209
210 let (fut, handle) = futures::future::abortable(broadcast_transaction_fut);
213 let broadcast_ids = self.broadcast_ids.clone();
214 let drop_id = id.clone();
215 let pool = self.pool.clone();
216 let fut = fut.map(move |result| {
219 drop(reserved_identifier);
221
222 let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
224
225 if result.is_ok() {
227 return
228 }
229
230 pool.remove_invalid(&[broadcast_state.tx_hash]);
232 });
233
234 {
236 let mut broadcast_ids = self.broadcast_ids.write();
237 broadcast_ids.insert(id.clone(), BroadcastState { handle, tx_hash });
238 }
239
240 sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
241
242 Ok(Some(id))
243 }
244
245 async fn stop_broadcast(
246 &self,
247 ext: &Extensions,
248 operation_id: String,
249 ) -> Result<(), ErrorBroadcast> {
250 let conn_id = ext
251 .get::<ConnectionId>()
252 .copied()
253 .expect("ConnectionId is always set by jsonrpsee; qed");
254
255 if !self.rpc_connections.contains_identifier(conn_id, &operation_id) {
257 return Err(ErrorBroadcast::InvalidOperationID)
258 }
259
260 let mut broadcast_ids = self.broadcast_ids.write();
261
262 let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
263 return Err(ErrorBroadcast::InvalidOperationID)
264 };
265
266 broadcast_state.handle.abort();
267
268 Ok(())
269 }
270}
271
272async fn last_stream_element<S>(stream: &mut S) -> Option<S::Item>
274where
275 S: Stream + Unpin,
276{
277 let Some(mut element) = stream.next().await else { return None };
278
279 while let Some(next) = stream.next().now_or_never() {
284 let Some(next) = next else {
285 return None
287 };
288 element = next;
289 }
290
291 Some(element)
292}
293
294#[cfg(test)]
295mod tests {
296 use super::*;
297 use tokio_stream::wrappers::ReceiverStream;
298
299 #[tokio::test]
300 async fn check_last_stream_element() {
301 let (tx, rx) = tokio::sync::mpsc::channel(16);
302
303 let mut stream = ReceiverStream::new(rx);
304 tx.send(1).await.unwrap();
306 assert_eq!(last_stream_element(&mut stream).await, Some(1));
307
308 tx.send(1).await.unwrap();
310 tx.send(2).await.unwrap();
311 tx.send(3).await.unwrap();
312 assert_eq!(last_stream_element(&mut stream).await, Some(3));
313
314 tx.send(1).await.unwrap();
316 tx.send(2).await.unwrap();
317 drop(tx);
318 assert_eq!(last_stream_element(&mut stream).await, None);
319 }
320}