sc_rpc_spec_v2/transaction/
transaction_broadcast.rs1use crate::{
22	common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer,
23	SubscriptionTaskExecutor,
24};
25use codec::Decode;
26use futures::{FutureExt, Stream, StreamExt};
27use futures_util::stream::AbortHandle;
28use jsonrpsee::{
29	core::{async_trait, RpcResult},
30	ConnectionId, Extensions,
31};
32use parking_lot::RwLock;
33use rand::{distributions::Alphanumeric, Rng};
34use sc_client_api::BlockchainEvents;
35use sc_transaction_pool_api::{
36	error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource,
37};
38use sp_blockchain::HeaderBackend;
39use sp_core::Bytes;
40use sp_runtime::traits::Block as BlockT;
41use std::{collections::HashMap, sync::Arc};
42
43use super::error::ErrorBroadcast;
44
45pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
47	client: Arc<Client>,
49	pool: Arc<Pool>,
51	executor: SubscriptionTaskExecutor,
53	broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
55	rpc_connections: RpcConnections,
57}
58
59struct BroadcastState<Pool: TransactionPool> {
61	handle: AbortHandle,
63	tx_hash: <Pool as TransactionPool>::Hash,
65}
66
67impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
68	pub fn new(
70		client: Arc<Client>,
71		pool: Arc<Pool>,
72		executor: SubscriptionTaskExecutor,
73		max_transactions_per_connection: usize,
74	) -> Self {
75		TransactionBroadcast {
76			client,
77			pool,
78			executor,
79			broadcast_ids: Default::default(),
80			rpc_connections: RpcConnections::new(max_transactions_per_connection),
81		}
82	}
83
84	pub fn generate_unique_id(&self) -> String {
86		let generate_operation_id = || {
87			const OPERATION_ID_LEN: usize = 16;
89
90			rand::thread_rng()
91				.sample_iter(Alphanumeric)
92				.take(OPERATION_ID_LEN)
93				.map(char::from)
94				.collect::<String>()
95		};
96
97		let mut id = generate_operation_id();
98
99		let broadcast_ids = self.broadcast_ids.read();
100
101		while broadcast_ids.contains_key(&id) {
102			id = generate_operation_id();
103		}
104
105		id
106	}
107}
108
109const TX_SOURCE: TransactionSource = TransactionSource::External;
115
116#[async_trait]
117impl<Pool, Client> TransactionBroadcastApiServer for TransactionBroadcast<Pool, Client>
118where
119	Pool: TransactionPool + Sync + Send + 'static,
120	Pool::Error: IntoPoolError,
121	<Pool::Block as BlockT>::Hash: Unpin,
122	Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
123{
124	async fn broadcast(&self, ext: &Extensions, bytes: Bytes) -> RpcResult<Option<String>> {
125		let pool = self.pool.clone();
126		let conn_id = ext
127			.get::<ConnectionId>()
128			.copied()
129			.expect("ConnectionId is always set by jsonrpsee; qed");
130
131		let id = self.generate_unique_id();
133
134		let Some(reserved_connection) = self.rpc_connections.reserve_space(conn_id) else {
136			return Ok(None)
137		};
138		let Some(reserved_identifier) = reserved_connection.register(id.clone()) else {
139			return Ok(None)
141		};
142
143		let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
148			return Ok(Some(id));
149		};
150		let tx_hash = pool.hash_of(&decoded_extrinsic);
152
153		let best_hash = self.client.info().best_hash;
161
162		let mut best_block_import_stream: std::pin::Pin<
165			Box<dyn Stream<Item = <Pool::Block as BlockT>::Hash> + Send>,
166		> = Box::pin(futures::stream::select(
167			futures::stream::iter(std::iter::once(best_hash)),
168			self.client.import_notification_stream().filter_map(|notification| async move {
169				notification.is_new_best.then_some(notification.hash)
170			}),
171		));
172
173		let broadcast_transaction_fut = async move {
174			let mut is_done = false;
176
177			while !is_done {
178				let Some(best_block_hash) =
180					last_stream_element(&mut best_block_import_stream).await
181				else {
182					return;
183				};
184
185				let mut stream = match pool
186					.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone())
187					.await
188				{
189					Ok(stream) => stream,
190					Err(e) => {
192						let Ok(pool_err) = e.into_pool_error() else { return };
193
194						if pool_err.is_retriable() {
195							continue
198						} else {
199							return;
200						}
201					},
202				};
203
204				while let Some(event) = stream.next().await {
205					if event.is_retriable() {
208						break;
209					}
210
211					if event.is_final() {
214						is_done = true;
215						break;
216					}
217				}
218			}
219		};
220
221		let (fut, handle) = futures::future::abortable(broadcast_transaction_fut);
224		let broadcast_ids = self.broadcast_ids.clone();
225		let drop_id = id.clone();
226		let pool = self.pool.clone();
227		let fut = fut.then(move |result| {
230			async move {
231				drop(reserved_identifier);
233
234				let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
236
237				if result.is_ok() {
239					return
240				}
241
242				pool.report_invalid(None, [(broadcast_state.tx_hash, None)].into()).await;
244			}
245		});
246
247		{
249			let mut broadcast_ids = self.broadcast_ids.write();
250			broadcast_ids.insert(id.clone(), BroadcastState { handle, tx_hash });
251		}
252
253		sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
254
255		Ok(Some(id))
256	}
257
258	async fn stop_broadcast(
259		&self,
260		ext: &Extensions,
261		operation_id: String,
262	) -> Result<(), ErrorBroadcast> {
263		let conn_id = ext
264			.get::<ConnectionId>()
265			.copied()
266			.expect("ConnectionId is always set by jsonrpsee; qed");
267
268		if !self.rpc_connections.contains_identifier(conn_id, &operation_id) {
270			return Err(ErrorBroadcast::InvalidOperationID)
271		}
272
273		let mut broadcast_ids = self.broadcast_ids.write();
274
275		let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
276			return Err(ErrorBroadcast::InvalidOperationID)
277		};
278
279		broadcast_state.handle.abort();
280
281		Ok(())
282	}
283}
284
285async fn last_stream_element<S>(stream: &mut S) -> Option<S::Item>
287where
288	S: Stream + Unpin,
289{
290	let Some(mut element) = stream.next().await else { return None };
291
292	while let Some(next) = stream.next().now_or_never() {
297		let Some(next) = next else {
298			return None
300		};
301		element = next;
302	}
303
304	Some(element)
305}
306
307#[cfg(test)]
308mod tests {
309	use super::*;
310	use tokio_stream::wrappers::ReceiverStream;
311
312	#[tokio::test]
313	async fn check_last_stream_element() {
314		let (tx, rx) = tokio::sync::mpsc::channel(16);
315
316		let mut stream = ReceiverStream::new(rx);
317		tx.send(1).await.unwrap();
319		assert_eq!(last_stream_element(&mut stream).await, Some(1));
320
321		tx.send(1).await.unwrap();
323		tx.send(2).await.unwrap();
324		tx.send(3).await.unwrap();
325		assert_eq!(last_stream_element(&mut stream).await, Some(3));
326
327		tx.send(1).await.unwrap();
329		tx.send(2).await.unwrap();
330		drop(tx);
331		assert_eq!(last_stream_element(&mut stream).await, None);
332	}
333}