1use crate::{
22 common::{sliding_stat::DurationSlidingStats, STAT_SLIDING_WINDOW},
23 graph::ValidateTransactionPriority,
24 insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT,
25};
26use async_trait::async_trait;
27use codec::Encode;
28use futures::future::{Future, FutureExt};
29use prometheus_endpoint::Registry as PrometheusRegistry;
30use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
31use sp_api::{ApiExt, ProvideRuntimeApi};
32use sp_blockchain::{HeaderMetadata, TreeRoute};
33use sp_core::traits::SpawnEssentialNamed;
34use sp_runtime::{
35 generic::BlockId,
36 traits::{self, Block as BlockT, BlockIdTo},
37 transaction_validity::{TransactionSource, TransactionValidity},
38};
39use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
40use std::{
41 marker::PhantomData,
42 pin::Pin,
43 sync::Arc,
44 time::{Duration, Instant},
45};
46use tokio::sync::{mpsc, oneshot, Mutex};
47
48use super::{
49 error::{self, Error},
50 metrics::{ApiMetrics, ApiMetricsExt},
51};
52use crate::graph;
53use tracing::{trace, warn, Level};
54
55pub struct FullChainApi<Client, Block> {
57 client: Arc<Client>,
58 _marker: PhantomData<Block>,
59 metrics: Option<Arc<ApiMetrics>>,
60 validation_pool_normal: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
61 validation_pool_maintained: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
62 validate_transaction_normal_stats: DurationSlidingStats,
63 validate_transaction_maintained_stats: DurationSlidingStats,
64}
65
66fn spawn_validation_pool_task(
68 name: &'static str,
69 receiver_normal: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
70 receiver_maintained: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
71 spawner: &impl SpawnEssentialNamed,
72 stats: DurationSlidingStats,
73 blocking_stats: DurationSlidingStats,
74) {
75 spawner.spawn_essential_blocking(
76 name,
77 Some("transaction-pool"),
78 async move {
79 loop {
80 let start = Instant::now();
81
82 let task = {
83 let receiver_maintained = receiver_maintained.clone();
84 let receiver_normal = receiver_normal.clone();
85 tokio::select! {
86 Some(task) = async {
87 receiver_maintained.lock().await.recv().await
88 } => { task }
89 Some(task) = async {
90 receiver_normal.lock().await.recv().await
91 } => { task }
92 else => {
93 return
94 }
95 }
96 };
97
98 let blocking_duration = {
99 let start = Instant::now();
100 task.await;
101 start.elapsed()
102 };
103
104 insert_and_log_throttled!(
105 Level::DEBUG,
106 target:LOG_TARGET_STAT,
107 prefix:format!("validate_transaction_inner_stats"),
108 stats,
109 start.elapsed().into()
110 );
111 insert_and_log_throttled!(
112 Level::DEBUG,
113 target:LOG_TARGET_STAT,
114 prefix:format!("validate_transaction_blocking_stats"),
115 blocking_stats,
116 blocking_duration.into()
117 );
118 trace!(target:LOG_TARGET, duration=?start.elapsed(), "spawn_validation_pool_task");
119 }
120 }
121 .boxed(),
122 );
123}
124
125impl<Client, Block> FullChainApi<Client, Block> {
126 pub fn new(
128 client: Arc<Client>,
129 prometheus: Option<&PrometheusRegistry>,
130 spawner: &impl SpawnEssentialNamed,
131 ) -> Self {
132 let stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
133 let blocking_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
134
135 let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
136 Err(error) => {
137 warn!(
138 target: LOG_TARGET,
139 ?error,
140 "Failed to register transaction pool API Prometheus metrics"
141 );
142 None
143 },
144 Ok(api) => Some(Arc::new(api)),
145 });
146
147 let (sender, receiver) = mpsc::channel(1);
148 let (sender_maintained, receiver_maintained) = mpsc::channel(1);
149
150 let receiver = Arc::new(Mutex::new(receiver));
151 let receiver_maintained = Arc::new(Mutex::new(receiver_maintained));
152 spawn_validation_pool_task(
153 "transaction-pool-task-0",
154 receiver.clone(),
155 receiver_maintained.clone(),
156 spawner,
157 stats.clone(),
158 blocking_stats.clone(),
159 );
160 spawn_validation_pool_task(
161 "transaction-pool-task-1",
162 receiver,
163 receiver_maintained,
164 spawner,
165 stats.clone(),
166 blocking_stats.clone(),
167 );
168
169 FullChainApi {
170 client,
171 validation_pool_normal: sender,
172 validation_pool_maintained: sender_maintained,
173 _marker: Default::default(),
174 metrics,
175 validate_transaction_normal_stats: DurationSlidingStats::new(Duration::from_secs(
176 STAT_SLIDING_WINDOW,
177 )),
178 validate_transaction_maintained_stats: DurationSlidingStats::new(Duration::from_secs(
179 STAT_SLIDING_WINDOW,
180 )),
181 }
182 }
183}
184
185#[async_trait]
186impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
187where
188 Block: BlockT,
189 Client: ProvideRuntimeApi<Block>
190 + BlockBackend<Block>
191 + BlockIdTo<Block>
192 + HeaderBackend<Block>
193 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
194 Client: Send + Sync + 'static,
195 Client::Api: TaggedTransactionQueue<Block>,
196{
197 type Block = Block;
198 type Error = error::Error;
199
200 async fn block_body(
201 &self,
202 hash: Block::Hash,
203 ) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
204 self.client.block_body(hash).map_err(error::Error::from)
205 }
206
207 async fn validate_transaction(
208 &self,
209 at: <Self::Block as BlockT>::Hash,
210 source: TransactionSource,
211 uxt: graph::ExtrinsicFor<Self>,
212 validation_priority: ValidateTransactionPriority,
213 ) -> Result<TransactionValidity, Self::Error> {
214 let start = Instant::now();
215 let (tx, rx) = oneshot::channel();
216 let client = self.client.clone();
217 let (stats, validation_pool, prefix) =
218 if validation_priority == ValidateTransactionPriority::Maintained {
219 (
220 self.validate_transaction_maintained_stats.clone(),
221 self.validation_pool_maintained.clone(),
222 "validate_transaction_maintained_stats",
223 )
224 } else {
225 (
226 self.validate_transaction_normal_stats.clone(),
227 self.validation_pool_normal.clone(),
228 "validate_transaction_stats",
229 )
230 };
231 let metrics = self.metrics.clone();
232
233 metrics.report(|m| m.validations_scheduled.inc());
234
235 {
236 validation_pool
237 .send(
238 async move {
239 let res = validate_transaction_blocking(&*client, at, source, uxt);
240 let _ = tx.send(res);
241 metrics.report(|m| m.validations_finished.inc());
242 }
243 .boxed(),
244 )
245 .await
246 .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
247 }
248
249 let validity = match rx.await {
250 Ok(r) => r,
251 Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
252 };
253
254 insert_and_log_throttled!(
255 Level::DEBUG,
256 target:LOG_TARGET_STAT,
257 prefix:prefix,
258 stats,
259 start.elapsed().into()
260 );
261
262 validity
263 }
264
265 fn validate_transaction_blocking(
269 &self,
270 at: Block::Hash,
271 source: TransactionSource,
272 uxt: graph::ExtrinsicFor<Self>,
273 ) -> Result<TransactionValidity, Self::Error> {
274 validate_transaction_blocking(&*self.client, at, source, uxt)
275 }
276
277 fn block_id_to_number(
278 &self,
279 at: &BlockId<Self::Block>,
280 ) -> Result<Option<graph::NumberFor<Self>>, Self::Error> {
281 self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
282 }
283
284 fn block_id_to_hash(
285 &self,
286 at: &BlockId<Self::Block>,
287 ) -> Result<Option<graph::BlockHash<Self>>, Self::Error> {
288 self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
289 }
290
291 fn hash_and_length(
292 &self,
293 ex: &graph::RawExtrinsicFor<Self>,
294 ) -> (graph::ExtrinsicHash<Self>, usize) {
295 ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
296 }
297
298 fn block_header(
299 &self,
300 hash: <Self::Block as BlockT>::Hash,
301 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
302 self.client.header(hash).map_err(Into::into)
303 }
304
305 fn tree_route(
306 &self,
307 from: <Self::Block as BlockT>::Hash,
308 to: <Self::Block as BlockT>::Hash,
309 ) -> Result<TreeRoute<Self::Block>, Self::Error> {
310 sp_blockchain::tree_route::<Block, Client>(&*self.client, from, to).map_err(Into::into)
311 }
312}
313
314fn validate_transaction_blocking<Client, Block>(
317 client: &Client,
318 at: Block::Hash,
319 source: TransactionSource,
320 uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
321) -> error::Result<TransactionValidity>
322where
323 Block: BlockT,
324 Client: ProvideRuntimeApi<Block>
325 + BlockBackend<Block>
326 + BlockIdTo<Block>
327 + HeaderBackend<Block>
328 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
329 Client: Send + Sync + 'static,
330 Client::Api: TaggedTransactionQueue<Block>,
331{
332 let s = std::time::Instant::now();
333 let tx_hash = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
334
335 let result = sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
336 {
337 let runtime_api = client.runtime_api();
338 let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
339 runtime_api
340 .api_version::<dyn TaggedTransactionQueue<Block>>(at)
341 .map_err(|e| Error::RuntimeApi(e.to_string()))?
342 .ok_or_else(|| Error::RuntimeApi(
343 format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
344 ))
345 }?;
346
347 use sp_api::Core;
348
349 sp_tracing::within_span!(
350 sp_tracing::Level::TRACE, "runtime::validate_transaction";
351 {
352 if api_version >= 3 {
353 runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
354 .map_err(|e| Error::RuntimeApi(e.to_string()))
355 } else {
356 let block_number = client.to_number(&BlockId::Hash(at))
357 .map_err(|e| Error::RuntimeApi(e.to_string()))?
358 .ok_or_else(||
359 Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
360 )?;
361
362 runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
364 block_number + sp_runtime::traits::One::one(),
365 Default::default(),
366 Default::default(),
367 at,
368 Default::default()),
369 ).map_err(|e| Error::RuntimeApi(e.to_string()))?;
370
371 if api_version == 2 {
372 #[allow(deprecated)] runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
374 .map_err(|e| Error::RuntimeApi(e.to_string()))
375 } else {
376 #[allow(deprecated)] runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
378 .map_err(|e| Error::RuntimeApi(e.to_string()))
379 }
380 }
381 })
382 });
383 trace!(
384 target: LOG_TARGET,
385 ?tx_hash,
386 ?at,
387 duration = ?s.elapsed(),
388 "validate_transaction_blocking"
389 );
390 result
391}