sc_rpc_spec_v2/transaction/
transaction_broadcast.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! API implementation for broadcasting transactions.
20
21use crate::{
22	common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer,
23	SubscriptionTaskExecutor,
24};
25use codec::Decode;
26use futures::{FutureExt, Stream, StreamExt};
27use futures_util::stream::AbortHandle;
28use jsonrpsee::{
29	core::{async_trait, RpcResult},
30	ConnectionId, Extensions,
31};
32use parking_lot::RwLock;
33use rand::{distributions::Alphanumeric, Rng};
34use sc_client_api::BlockchainEvents;
35use sc_transaction_pool_api::{
36	error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource,
37};
38use sp_blockchain::HeaderBackend;
39use sp_core::Bytes;
40use sp_runtime::traits::Block as BlockT;
41use std::{collections::HashMap, sync::Arc};
42
43use super::error::ErrorBroadcast;
44
45/// An API for transaction RPC calls.
46pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
47	/// Substrate client.
48	client: Arc<Client>,
49	/// Transactions pool.
50	pool: Arc<Pool>,
51	/// Executor to spawn subscriptions.
52	executor: SubscriptionTaskExecutor,
53	/// The broadcast operation IDs.
54	broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
55	/// Keep track of how many concurrent operations are active for each connection.
56	rpc_connections: RpcConnections,
57}
58
59/// The state of a broadcast operation.
60struct BroadcastState<Pool: TransactionPool> {
61	/// Handle to abort the running future that broadcasts the transaction.
62	handle: AbortHandle,
63	/// Associated tx hash.
64	tx_hash: <Pool as TransactionPool>::Hash,
65}
66
67impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
68	/// Creates a new [`TransactionBroadcast`].
69	pub fn new(
70		client: Arc<Client>,
71		pool: Arc<Pool>,
72		executor: SubscriptionTaskExecutor,
73		max_transactions_per_connection: usize,
74	) -> Self {
75		TransactionBroadcast {
76			client,
77			pool,
78			executor,
79			broadcast_ids: Default::default(),
80			rpc_connections: RpcConnections::new(max_transactions_per_connection),
81		}
82	}
83
84	/// Generate an unique operation ID for the `transaction_broadcast` RPC method.
85	pub fn generate_unique_id(&self) -> String {
86		let generate_operation_id = || {
87			// The length of the operation ID.
88			const OPERATION_ID_LEN: usize = 16;
89
90			rand::thread_rng()
91				.sample_iter(Alphanumeric)
92				.take(OPERATION_ID_LEN)
93				.map(char::from)
94				.collect::<String>()
95		};
96
97		let mut id = generate_operation_id();
98
99		let broadcast_ids = self.broadcast_ids.read();
100
101		while broadcast_ids.contains_key(&id) {
102			id = generate_operation_id();
103		}
104
105		id
106	}
107}
108
109/// Currently we treat all RPC transactions as externals.
110///
111/// Possibly in the future we could allow opt-in for special treatment
112/// of such transactions, so that the block authors can inject
113/// some unique transactions via RPC and have them included in the pool.
114const TX_SOURCE: TransactionSource = TransactionSource::External;
115
116#[async_trait]
117impl<Pool, Client> TransactionBroadcastApiServer for TransactionBroadcast<Pool, Client>
118where
119	Pool: TransactionPool + Sync + Send + 'static,
120	Pool::Error: IntoPoolError,
121	<Pool::Block as BlockT>::Hash: Unpin,
122	Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
123{
124	async fn broadcast(&self, ext: &Extensions, bytes: Bytes) -> RpcResult<Option<String>> {
125		let pool = self.pool.clone();
126		let conn_id = ext
127			.get::<ConnectionId>()
128			.copied()
129			.expect("ConnectionId is always set by jsonrpsee; qed");
130
131		// The unique ID of this operation.
132		let id = self.generate_unique_id();
133
134		// Ensure that the connection has not reached the maximum number of active operations.
135		let Some(reserved_connection) = self.rpc_connections.reserve_space(conn_id) else {
136			return Ok(None)
137		};
138		let Some(reserved_identifier) = reserved_connection.register(id.clone()) else {
139			// This can only happen if the generated operation ID is not unique.
140			return Ok(None)
141		};
142
143		// The JSON-RPC server might check whether the transaction is valid before broadcasting it.
144		// If it does so and if the transaction is invalid, the server should silently do nothing
145		// and the JSON-RPC client is not informed of the problem. Invalid transactions should still
146		// count towards the limit to the number of simultaneously broadcasted transactions.
147		let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
148			return Ok(Some(id));
149		};
150		// Save the tx hash to remove it later.
151		let tx_hash = pool.hash_of(&decoded_extrinsic);
152
153		// The compiler can no longer deduce the type of the stream and complains
154		// about `one type is more general than the other`.
155		let mut best_block_import_stream: std::pin::Pin<
156			Box<dyn Stream<Item = <Pool::Block as BlockT>::Hash> + Send>,
157		> =
158			Box::pin(self.client.import_notification_stream().filter_map(
159				|notification| async move { notification.is_new_best.then_some(notification.hash) },
160			));
161
162		let broadcast_transaction_fut = async move {
163			// Flag to determine if the we should broadcast the transaction again.
164			let mut is_done = false;
165
166			while !is_done {
167				// Wait for the last block to become available.
168				let Some(best_block_hash) =
169					last_stream_element(&mut best_block_import_stream).await
170				else {
171					return;
172				};
173
174				let mut stream = match pool
175					.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone())
176					.await
177				{
178					Ok(stream) => stream,
179					// The transaction was not included to the pool.
180					Err(e) => {
181						let Ok(pool_err) = e.into_pool_error() else { return };
182
183						if pool_err.is_retriable() {
184							// Try to resubmit the transaction at a later block for
185							// recoverable errors.
186							continue
187						} else {
188							return;
189						}
190					},
191				};
192
193				while let Some(event) = stream.next().await {
194					// Check if the transaction could be submitted again
195					// at a later time.
196					if event.is_retriable() {
197						break;
198					}
199
200					// Stop if this is the final event of the transaction stream
201					// and the event is not retriable.
202					if event.is_final() {
203						is_done = true;
204						break;
205					}
206				}
207			}
208		};
209
210		// Convert the future into an abortable future, for easily terminating it from the
211		// `transaction_stop` method.
212		let (fut, handle) = futures::future::abortable(broadcast_transaction_fut);
213		let broadcast_ids = self.broadcast_ids.clone();
214		let drop_id = id.clone();
215		let pool = self.pool.clone();
216		// The future expected by the executor must be `Future<Output = ()>` instead of
217		// `Future<Output = Result<(), Aborted>>`.
218		let fut = fut.map(move |result| {
219			// Connection space is cleaned when this object is dropped.
220			drop(reserved_identifier);
221
222			// Remove the entry from the broadcast IDs map.
223			let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
224
225			// The broadcast was not stopped.
226			if result.is_ok() {
227				return
228			}
229
230			// Best effort pool removal (tx can already be finalized).
231			pool.remove_invalid(&[broadcast_state.tx_hash]);
232		});
233
234		// Keep track of this entry and the abortable handle.
235		{
236			let mut broadcast_ids = self.broadcast_ids.write();
237			broadcast_ids.insert(id.clone(), BroadcastState { handle, tx_hash });
238		}
239
240		sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
241
242		Ok(Some(id))
243	}
244
245	async fn stop_broadcast(
246		&self,
247		ext: &Extensions,
248		operation_id: String,
249	) -> Result<(), ErrorBroadcast> {
250		let conn_id = ext
251			.get::<ConnectionId>()
252			.copied()
253			.expect("ConnectionId is always set by jsonrpsee; qed");
254
255		// The operation ID must correlate to the same connection ID.
256		if !self.rpc_connections.contains_identifier(conn_id, &operation_id) {
257			return Err(ErrorBroadcast::InvalidOperationID)
258		}
259
260		let mut broadcast_ids = self.broadcast_ids.write();
261
262		let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
263			return Err(ErrorBroadcast::InvalidOperationID)
264		};
265
266		broadcast_state.handle.abort();
267
268		Ok(())
269	}
270}
271
272/// Returns the last element of the provided stream, or `None` if the stream is closed.
273async fn last_stream_element<S>(stream: &mut S) -> Option<S::Item>
274where
275	S: Stream + Unpin,
276{
277	let Some(mut element) = stream.next().await else { return None };
278
279	// We are effectively polling the stream for the last available item at this time.
280	// The `now_or_never` returns `None` if the stream is `Pending`.
281	//
282	// If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`.
283	while let Some(next) = stream.next().now_or_never() {
284		let Some(next) = next else {
285			// Nothing to do if the stream terminated.
286			return None
287		};
288		element = next;
289	}
290
291	Some(element)
292}
293
#[cfg(test)]
mod tests {
	use super::*;
	use tokio_stream::wrappers::ReceiverStream;

	/// Exercises `last_stream_element` with one queued item, several queued items,
	/// and a sender dropped while items are still queued.
	#[tokio::test]
	async fn check_last_stream_element() {
		let (sender, receiver) = tokio::sync::mpsc::channel(16);
		let mut stream = ReceiverStream::new(receiver);

		// A single queued element is returned as-is.
		sender.send(1).await.unwrap();
		assert_eq!(last_stream_element(&mut stream).await, Some(1));

		// With several queued elements only the newest one is returned.
		for value in 1..=3 {
			sender.send(value).await.unwrap();
		}
		assert_eq!(last_stream_element(&mut stream).await, Some(3));

		// Closing the channel while elements are still queued yields `None`.
		for value in 1..=2 {
			sender.send(value).await.unwrap();
		}
		drop(sender);
		assert_eq!(last_stream_element(&mut stream).await, None);
	}
}