referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/common/
api.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Chain api required for the transaction pool.
20
21use crate::{
22	common::{sliding_stat::DurationSlidingStats, STAT_SLIDING_WINDOW},
23	graph::ValidateTransactionPriority,
24	insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT,
25};
26use async_trait::async_trait;
27use codec::Encode;
28use futures::future::{Future, FutureExt};
29use prometheus_endpoint::Registry as PrometheusRegistry;
30use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
31use sp_api::{ApiExt, ProvideRuntimeApi};
32use sp_blockchain::{HeaderMetadata, TreeRoute};
33use sp_core::traits::SpawnEssentialNamed;
34use sp_runtime::{
35	generic::BlockId,
36	traits::{self, Block as BlockT, BlockIdTo},
37	transaction_validity::{TransactionSource, TransactionValidity},
38};
39use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
40use std::{
41	marker::PhantomData,
42	pin::Pin,
43	sync::Arc,
44	time::{Duration, Instant},
45};
46use tokio::sync::{mpsc, oneshot, Mutex};
47
48use super::{
49	error::{self, Error},
50	metrics::{ApiMetrics, ApiMetricsExt},
51};
52use crate::graph;
53use tracing::{trace, warn, Level};
54
55/// The transaction pool logic for full client.
56pub struct FullChainApi<Client, Block> {
57	client: Arc<Client>,
58	_marker: PhantomData<Block>,
59	metrics: Option<Arc<ApiMetrics>>,
60	validation_pool_normal: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
61	validation_pool_maintained: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
62	validate_transaction_normal_stats: DurationSlidingStats,
63	validate_transaction_maintained_stats: DurationSlidingStats,
64}
65
66/// Spawn a validation task that will be used by the transaction pool to validate transactions.
67fn spawn_validation_pool_task(
68	name: &'static str,
69	receiver_normal: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
70	receiver_maintained: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
71	spawner: &impl SpawnEssentialNamed,
72	stats: DurationSlidingStats,
73	blocking_stats: DurationSlidingStats,
74) {
75	spawner.spawn_essential_blocking(
76		name,
77		Some("transaction-pool"),
78		async move {
79			loop {
80				let start = Instant::now();
81
82				let task = {
83					let receiver_maintained = receiver_maintained.clone();
84					let receiver_normal = receiver_normal.clone();
85					tokio::select! {
86						Some(task) = async {
87							receiver_maintained.lock().await.recv().await
88						} => { task }
89						Some(task) = async {
90							receiver_normal.lock().await.recv().await
91						} => { task }
92						else => {
93							return
94						}
95					}
96				};
97
98				let blocking_duration = {
99					let start = Instant::now();
100					task.await;
101					start.elapsed()
102				};
103
104				insert_and_log_throttled!(
105					Level::DEBUG,
106					target:LOG_TARGET_STAT,
107					prefix:format!("validate_transaction_inner_stats"),
108					stats,
109					start.elapsed().into()
110				);
111				insert_and_log_throttled!(
112					Level::DEBUG,
113					target:LOG_TARGET_STAT,
114					prefix:format!("validate_transaction_blocking_stats"),
115					blocking_stats,
116					blocking_duration.into()
117				);
118				trace!(target:LOG_TARGET, duration=?start.elapsed(), "spawn_validation_pool_task");
119			}
120		}
121		.boxed(),
122	);
123}
124
125impl<Client, Block> FullChainApi<Client, Block> {
126	/// Create new transaction pool logic.
127	pub fn new(
128		client: Arc<Client>,
129		prometheus: Option<&PrometheusRegistry>,
130		spawner: &impl SpawnEssentialNamed,
131	) -> Self {
132		let stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
133		let blocking_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
134
135		let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
136			Err(error) => {
137				warn!(
138					target: LOG_TARGET,
139					?error,
140					"Failed to register transaction pool API Prometheus metrics"
141				);
142				None
143			},
144			Ok(api) => Some(Arc::new(api)),
145		});
146
147		let (sender, receiver) = mpsc::channel(1);
148		let (sender_maintained, receiver_maintained) = mpsc::channel(1);
149
150		let receiver = Arc::new(Mutex::new(receiver));
151		let receiver_maintained = Arc::new(Mutex::new(receiver_maintained));
152		spawn_validation_pool_task(
153			"transaction-pool-task-0",
154			receiver.clone(),
155			receiver_maintained.clone(),
156			spawner,
157			stats.clone(),
158			blocking_stats.clone(),
159		);
160		spawn_validation_pool_task(
161			"transaction-pool-task-1",
162			receiver,
163			receiver_maintained,
164			spawner,
165			stats.clone(),
166			blocking_stats.clone(),
167		);
168
169		FullChainApi {
170			client,
171			validation_pool_normal: sender,
172			validation_pool_maintained: sender_maintained,
173			_marker: Default::default(),
174			metrics,
175			validate_transaction_normal_stats: DurationSlidingStats::new(Duration::from_secs(
176				STAT_SLIDING_WINDOW,
177			)),
178			validate_transaction_maintained_stats: DurationSlidingStats::new(Duration::from_secs(
179				STAT_SLIDING_WINDOW,
180			)),
181		}
182	}
183}
184
185#[async_trait]
186impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
187where
188	Block: BlockT,
189	Client: ProvideRuntimeApi<Block>
190		+ BlockBackend<Block>
191		+ BlockIdTo<Block>
192		+ HeaderBackend<Block>
193		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
194	Client: Send + Sync + 'static,
195	Client::Api: TaggedTransactionQueue<Block>,
196{
197	type Block = Block;
198	type Error = error::Error;
199
200	async fn block_body(
201		&self,
202		hash: Block::Hash,
203	) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
204		self.client.block_body(hash).map_err(error::Error::from)
205	}
206
207	async fn validate_transaction(
208		&self,
209		at: <Self::Block as BlockT>::Hash,
210		source: TransactionSource,
211		uxt: graph::ExtrinsicFor<Self>,
212		validation_priority: ValidateTransactionPriority,
213	) -> Result<TransactionValidity, Self::Error> {
214		let start = Instant::now();
215		let (tx, rx) = oneshot::channel();
216		let client = self.client.clone();
217		let (stats, validation_pool, prefix) =
218			if validation_priority == ValidateTransactionPriority::Maintained {
219				(
220					self.validate_transaction_maintained_stats.clone(),
221					self.validation_pool_maintained.clone(),
222					"validate_transaction_maintained_stats",
223				)
224			} else {
225				(
226					self.validate_transaction_normal_stats.clone(),
227					self.validation_pool_normal.clone(),
228					"validate_transaction_stats",
229				)
230			};
231		let metrics = self.metrics.clone();
232
233		metrics.report(|m| m.validations_scheduled.inc());
234
235		{
236			validation_pool
237				.send(
238					async move {
239						let res = validate_transaction_blocking(&*client, at, source, uxt);
240						let _ = tx.send(res);
241						metrics.report(|m| m.validations_finished.inc());
242					}
243					.boxed(),
244				)
245				.await
246				.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
247		}
248
249		let validity = match rx.await {
250			Ok(r) => r,
251			Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
252		};
253
254		insert_and_log_throttled!(
255			Level::DEBUG,
256			target:LOG_TARGET_STAT,
257			prefix:prefix,
258			stats,
259			start.elapsed().into()
260		);
261
262		validity
263	}
264
265	/// Validates a transaction by calling into the runtime.
266	///
267	/// Same as `validate_transaction` but blocks the current thread when performing validation.
268	fn validate_transaction_blocking(
269		&self,
270		at: Block::Hash,
271		source: TransactionSource,
272		uxt: graph::ExtrinsicFor<Self>,
273	) -> Result<TransactionValidity, Self::Error> {
274		validate_transaction_blocking(&*self.client, at, source, uxt)
275	}
276
277	fn block_id_to_number(
278		&self,
279		at: &BlockId<Self::Block>,
280	) -> Result<Option<graph::NumberFor<Self>>, Self::Error> {
281		self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
282	}
283
284	fn block_id_to_hash(
285		&self,
286		at: &BlockId<Self::Block>,
287	) -> Result<Option<graph::BlockHash<Self>>, Self::Error> {
288		self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
289	}
290
291	fn hash_and_length(
292		&self,
293		ex: &graph::RawExtrinsicFor<Self>,
294	) -> (graph::ExtrinsicHash<Self>, usize) {
295		ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
296	}
297
298	fn block_header(
299		&self,
300		hash: <Self::Block as BlockT>::Hash,
301	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
302		self.client.header(hash).map_err(Into::into)
303	}
304
305	fn tree_route(
306		&self,
307		from: <Self::Block as BlockT>::Hash,
308		to: <Self::Block as BlockT>::Hash,
309	) -> Result<TreeRoute<Self::Block>, Self::Error> {
310		sp_blockchain::tree_route::<Block, Client>(&*self.client, from, to).map_err(Into::into)
311	}
312}
313
314/// Helper function to validate a transaction using a full chain API.
315/// This method will call into the runtime to perform the validation.
316fn validate_transaction_blocking<Client, Block>(
317	client: &Client,
318	at: Block::Hash,
319	source: TransactionSource,
320	uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
321) -> error::Result<TransactionValidity>
322where
323	Block: BlockT,
324	Client: ProvideRuntimeApi<Block>
325		+ BlockBackend<Block>
326		+ BlockIdTo<Block>
327		+ HeaderBackend<Block>
328		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
329	Client: Send + Sync + 'static,
330	Client::Api: TaggedTransactionQueue<Block>,
331{
332	let s = std::time::Instant::now();
333	let tx_hash = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
334
335	let result = sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
336	{
337		let runtime_api = client.runtime_api();
338		let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
339			runtime_api
340				.api_version::<dyn TaggedTransactionQueue<Block>>(at)
341				.map_err(|e| Error::RuntimeApi(e.to_string()))?
342				.ok_or_else(|| Error::RuntimeApi(
343					format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
344				))
345		}?;
346
347		use sp_api::Core;
348
349		sp_tracing::within_span!(
350			sp_tracing::Level::TRACE, "runtime::validate_transaction";
351		{
352			if api_version >= 3 {
353				runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
354					.map_err(|e| Error::RuntimeApi(e.to_string()))
355			} else {
356				let block_number = client.to_number(&BlockId::Hash(at))
357					.map_err(|e| Error::RuntimeApi(e.to_string()))?
358					.ok_or_else(||
359						Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
360					)?;
361
362				// The old versions require us to call `initialize_block` before.
363				runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
364					block_number + sp_runtime::traits::One::one(),
365					Default::default(),
366					Default::default(),
367					at,
368					Default::default()),
369				).map_err(|e| Error::RuntimeApi(e.to_string()))?;
370
371				if api_version == 2 {
372					#[allow(deprecated)] // old validate_transaction
373					runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
374						.map_err(|e| Error::RuntimeApi(e.to_string()))
375				} else {
376					#[allow(deprecated)] // old validate_transaction
377					runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
378						.map_err(|e| Error::RuntimeApi(e.to_string()))
379				}
380			}
381		})
382	});
383	trace!(
384		target: LOG_TARGET,
385		?tx_hash,
386		?at,
387		duration = ?s.elapsed(),
388		"validate_transaction_blocking"
389	);
390	result
391}