referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/transaction/
transaction.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! API implementation for submitting transactions.
20
21use crate::{
22	transaction::{
23		api::TransactionApiServer,
24		error::Error,
25		event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent},
26	},
27	SubscriptionTaskExecutor,
28};
29
30use codec::Decode;
31use futures::{StreamExt, TryFutureExt};
32use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
33
34use super::metrics::{InstanceMetrics, Metrics};
35
36use sc_rpc::utils::{RingBuffer, Subscription};
37use sc_transaction_pool_api::{
38	error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
39	TransactionStatus,
40};
41use sp_blockchain::HeaderBackend;
42use sp_core::Bytes;
43use sp_runtime::traits::Block as BlockT;
44use std::sync::Arc;
45
46pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
47
48/// An API for transaction RPC calls.
49pub struct Transaction<Pool, Client> {
50	/// Substrate client.
51	client: Arc<Client>,
52	/// Transactions pool.
53	pool: Arc<Pool>,
54	/// Executor to spawn subscriptions.
55	executor: SubscriptionTaskExecutor,
56	/// Metrics for transactions.
57	metrics: Option<Metrics>,
58}
59
60impl<Pool, Client> Transaction<Pool, Client> {
61	/// Creates a new [`Transaction`].
62	pub fn new(
63		client: Arc<Client>,
64		pool: Arc<Pool>,
65		executor: SubscriptionTaskExecutor,
66		metrics: Option<Metrics>,
67	) -> Self {
68		Transaction { client, pool, executor, metrics }
69	}
70}
71
72/// Currently we treat all RPC transactions as externals.
73///
74/// Possibly in the future we could allow opt-in for special treatment
75/// of such transactions, so that the block authors can inject
76/// some unique transactions via RPC and have them included in the pool.
77const TX_SOURCE: TransactionSource = TransactionSource::External;
78
79#[async_trait]
80impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
81where
82	Pool: TransactionPool + Sync + Send + 'static,
83	Pool::Hash: Unpin,
84	<Pool::Block as BlockT>::Hash: Unpin,
85	Client: HeaderBackend<Pool::Block> + Send + Sync + 'static,
86{
87	fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
88		let client = self.client.clone();
89		let pool = self.pool.clone();
90
91		// Get a new transaction metrics instance and increment the counter.
92		let mut metrics = InstanceMetrics::new(self.metrics.clone());
93
94		let fut = async move {
95			let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
96				Ok(decoded_extrinsic) => decoded_extrinsic,
97				Err(e) => {
98					log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);
99
100					let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
101
102					let event = TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
103						error: "Extrinsic bytes cannot be decoded".into(),
104					});
105
106					metrics.register_event(&event);
107
108					// The transaction is invalid.
109					let _ = sink.send(&event).await;
110					return
111				},
112			};
113
114			let best_block_hash = client.info().best_hash;
115
116			let submit = pool
117				.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
118				.map_err(|e| {
119					e.into_pool_error()
120						.map(Error::from)
121						.unwrap_or_else(|e| Error::Verification(Box::new(e)))
122				});
123
124			let Ok(sink) = pending.accept().await.map(Subscription::from) else {
125				return;
126			};
127
128			match submit.await {
129				Ok(stream) => {
130					let stream = stream
131						.filter_map(|event| {
132							let event = handle_event(event);
133
134							event.as_ref().inspect(|event| {
135								metrics.register_event(event);
136							});
137
138							async move { event }
139						})
140						.boxed();
141
142					// If the subscription is too slow older events will be overwritten.
143					sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
144				},
145				Err(err) => {
146					// We have not created an `Watcher` for the tx. Make sure the
147					// error is still propagated as an event.
148					let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
149
150					metrics.register_event(&event);
151
152					_ = sink.send(&event).await;
153				},
154			};
155		};
156
157		sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
158	}
159}
160
161/// Handle events generated by the transaction-pool and convert them
162/// to the new API expected state.
163#[inline]
164fn handle_event<Hash: Clone, BlockHash: Clone>(
165	event: TransactionStatus<Hash, BlockHash>,
166) -> Option<TransactionEvent<BlockHash>> {
167	match event {
168		TransactionStatus::Ready | TransactionStatus::Future =>
169			Some(TransactionEvent::<BlockHash>::Validated),
170		TransactionStatus::InBlock((hash, index)) =>
171			Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { hash, index }))),
172		TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
173		TransactionStatus::FinalityTimeout(_) =>
174			Some(TransactionEvent::Dropped(TransactionDropped {
175				error: "Maximum number of finality watchers has been reached".into(),
176			})),
177		TransactionStatus::Finalized((hash, index)) =>
178			Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
179		TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
180			error: "Extrinsic was rendered invalid by another extrinsic".into(),
181		})),
182		TransactionStatus::Dropped => Some(TransactionEvent::Dropped(TransactionDropped {
183			error: "Extrinsic dropped from the pool due to exceeding limits".into(),
184		})),
185		TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
186			error: "Extrinsic marked as invalid".into(),
187		})),
188		// These are the events that are not supported by the new API.
189		TransactionStatus::Broadcast(_) => None,
190	}
191}