sc_transaction_pool/
api.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Chain api required for the transaction pool.
20
21use crate::LOG_TARGET;
22use codec::Encode;
23use futures::{
24	channel::{mpsc, oneshot},
25	future::{ready, Future, FutureExt, Ready},
26	lock::Mutex,
27	SinkExt, StreamExt,
28};
29use std::{marker::PhantomData, pin::Pin, sync::Arc};
30
31use prometheus_endpoint::Registry as PrometheusRegistry;
32use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
33use sp_api::{ApiExt, ProvideRuntimeApi};
34use sp_blockchain::{HeaderMetadata, TreeRoute};
35use sp_core::traits::SpawnEssentialNamed;
36use sp_runtime::{
37	generic::BlockId,
38	traits::{self, Block as BlockT, BlockIdTo},
39	transaction_validity::{TransactionSource, TransactionValidity},
40};
41use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
42
43use crate::{
44	error::{self, Error},
45	graph,
46	metrics::{ApiMetrics, ApiMetricsExt},
47};
48
49/// The transaction pool logic for full client.
50pub struct FullChainApi<Client, Block> {
51	client: Arc<Client>,
52	_marker: PhantomData<Block>,
53	metrics: Option<Arc<ApiMetrics>>,
54	validation_pool: Arc<Mutex<mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
55}
56
57/// Spawn a validation task that will be used by the transaction pool to validate transactions.
58fn spawn_validation_pool_task(
59	name: &'static str,
60	receiver: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
61	spawner: &impl SpawnEssentialNamed,
62) {
63	spawner.spawn_essential_blocking(
64		name,
65		Some("transaction-pool"),
66		async move {
67			loop {
68				let task = receiver.lock().await.next().await;
69				match task {
70					None => return,
71					Some(task) => task.await,
72				}
73			}
74		}
75		.boxed(),
76	);
77}
78
79impl<Client, Block> FullChainApi<Client, Block> {
80	/// Create new transaction pool logic.
81	pub fn new(
82		client: Arc<Client>,
83		prometheus: Option<&PrometheusRegistry>,
84		spawner: &impl SpawnEssentialNamed,
85	) -> Self {
86		let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
87			Err(err) => {
88				log::warn!(
89					target: LOG_TARGET,
90					"Failed to register transaction pool api prometheus metrics: {:?}",
91					err,
92				);
93				None
94			},
95			Ok(api) => Some(Arc::new(api)),
96		});
97
98		let (sender, receiver) = mpsc::channel(0);
99
100		let receiver = Arc::new(Mutex::new(receiver));
101		spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
102		spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);
103
104		FullChainApi {
105			client,
106			validation_pool: Arc::new(Mutex::new(sender)),
107			_marker: Default::default(),
108			metrics,
109		}
110	}
111}
112
113impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
114where
115	Block: BlockT,
116	Client: ProvideRuntimeApi<Block>
117		+ BlockBackend<Block>
118		+ BlockIdTo<Block>
119		+ HeaderBackend<Block>
120		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
121	Client: Send + Sync + 'static,
122	Client::Api: TaggedTransactionQueue<Block>,
123{
124	type Block = Block;
125	type Error = error::Error;
126	type ValidationFuture =
127		Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
128	type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
129
130	fn block_body(&self, hash: Block::Hash) -> Self::BodyFuture {
131		ready(self.client.block_body(hash).map_err(error::Error::from))
132	}
133
134	fn validate_transaction(
135		&self,
136		at: <Self::Block as BlockT>::Hash,
137		source: TransactionSource,
138		uxt: graph::ExtrinsicFor<Self>,
139	) -> Self::ValidationFuture {
140		let (tx, rx) = oneshot::channel();
141		let client = self.client.clone();
142		let validation_pool = self.validation_pool.clone();
143		let metrics = self.metrics.clone();
144
145		async move {
146			metrics.report(|m| m.validations_scheduled.inc());
147
148			validation_pool
149				.lock()
150				.await
151				.send(
152					async move {
153						let res = validate_transaction_blocking(&*client, at, source, uxt);
154						let _ = tx.send(res);
155						metrics.report(|m| m.validations_finished.inc());
156					}
157					.boxed(),
158				)
159				.await
160				.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
161
162			match rx.await {
163				Ok(r) => r,
164				Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
165			}
166		}
167		.boxed()
168	}
169
170	fn block_id_to_number(
171		&self,
172		at: &BlockId<Self::Block>,
173	) -> error::Result<Option<graph::NumberFor<Self>>> {
174		self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
175	}
176
177	fn block_id_to_hash(
178		&self,
179		at: &BlockId<Self::Block>,
180	) -> error::Result<Option<graph::BlockHash<Self>>> {
181		self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
182	}
183
184	fn hash_and_length(
185		&self,
186		ex: &graph::ExtrinsicFor<Self>,
187	) -> (graph::ExtrinsicHash<Self>, usize) {
188		ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
189	}
190
191	fn block_header(
192		&self,
193		hash: <Self::Block as BlockT>::Hash,
194	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
195		self.client.header(hash).map_err(Into::into)
196	}
197
198	fn tree_route(
199		&self,
200		from: <Self::Block as BlockT>::Hash,
201		to: <Self::Block as BlockT>::Hash,
202	) -> Result<TreeRoute<Self::Block>, Self::Error> {
203		sp_blockchain::tree_route::<Block, Client>(&*self.client, from, to).map_err(Into::into)
204	}
205}
206
207/// Helper function to validate a transaction using a full chain API.
208/// This method will call into the runtime to perform the validation.
209fn validate_transaction_blocking<Client, Block>(
210	client: &Client,
211	at: Block::Hash,
212	source: TransactionSource,
213	uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
214) -> error::Result<TransactionValidity>
215where
216	Block: BlockT,
217	Client: ProvideRuntimeApi<Block>
218		+ BlockBackend<Block>
219		+ BlockIdTo<Block>
220		+ HeaderBackend<Block>
221		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
222	Client: Send + Sync + 'static,
223	Client::Api: TaggedTransactionQueue<Block>,
224{
225	sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
226	{
227		let runtime_api = client.runtime_api();
228		let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
229			runtime_api
230				.api_version::<dyn TaggedTransactionQueue<Block>>(at)
231				.map_err(|e| Error::RuntimeApi(e.to_string()))?
232				.ok_or_else(|| Error::RuntimeApi(
233					format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
234				))
235		}?;
236
237		use sp_api::Core;
238
239		sp_tracing::within_span!(
240			sp_tracing::Level::TRACE, "runtime::validate_transaction";
241		{
242			if api_version >= 3 {
243				runtime_api.validate_transaction(at, source, uxt, at)
244					.map_err(|e| Error::RuntimeApi(e.to_string()))
245			} else {
246				let block_number = client.to_number(&BlockId::Hash(at))
247					.map_err(|e| Error::RuntimeApi(e.to_string()))?
248					.ok_or_else(||
249						Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
250					)?;
251
252				// The old versions require us to call `initialize_block` before.
253				runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
254					block_number + sp_runtime::traits::One::one(),
255					Default::default(),
256					Default::default(),
257					at,
258					Default::default()),
259				).map_err(|e| Error::RuntimeApi(e.to_string()))?;
260
261				if api_version == 2 {
262					#[allow(deprecated)] // old validate_transaction
263					runtime_api.validate_transaction_before_version_3(at, source, uxt)
264						.map_err(|e| Error::RuntimeApi(e.to_string()))
265				} else {
266					#[allow(deprecated)] // old validate_transaction
267					runtime_api.validate_transaction_before_version_2(at, uxt)
268						.map_err(|e| Error::RuntimeApi(e.to_string()))
269				}
270			}
271		})
272	})
273}
274
275impl<Client, Block> FullChainApi<Client, Block>
276where
277	Block: BlockT,
278	Client: ProvideRuntimeApi<Block>
279		+ BlockBackend<Block>
280		+ BlockIdTo<Block>
281		+ HeaderBackend<Block>
282		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
283	Client: Send + Sync + 'static,
284	Client::Api: TaggedTransactionQueue<Block>,
285{
286	/// Validates a transaction by calling into the runtime, same as
287	/// `validate_transaction` but blocks the current thread when performing
288	/// validation. Only implemented for `FullChainApi` since we can call into
289	/// the runtime locally.
290	pub fn validate_transaction_blocking(
291		&self,
292		at: Block::Hash,
293		source: TransactionSource,
294		uxt: graph::ExtrinsicFor<Self>,
295	) -> error::Result<TransactionValidity> {
296		validate_transaction_blocking(&*self.client, at, source, uxt)
297	}
298}