1use crate::LOG_TARGET;
22use codec::Encode;
23use futures::{
24 channel::{mpsc, oneshot},
25 future::{ready, Future, FutureExt, Ready},
26 lock::Mutex,
27 SinkExt, StreamExt,
28};
29use std::{marker::PhantomData, pin::Pin, sync::Arc};
30
31use prometheus_endpoint::Registry as PrometheusRegistry;
32use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
33use sp_api::{ApiExt, ProvideRuntimeApi};
34use sp_blockchain::{HeaderMetadata, TreeRoute};
35use sp_core::traits::SpawnEssentialNamed;
36use sp_runtime::{
37 generic::BlockId,
38 traits::{self, Block as BlockT, BlockIdTo},
39 transaction_validity::{TransactionSource, TransactionValidity},
40};
41use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
42
43use crate::{
44 error::{self, Error},
45 graph,
46 metrics::{ApiMetrics, ApiMetricsExt},
47};
48
49pub struct FullChainApi<Client, Block> {
51 client: Arc<Client>,
52 _marker: PhantomData<Block>,
53 metrics: Option<Arc<ApiMetrics>>,
54 validation_pool: Arc<Mutex<mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
55}
56
57fn spawn_validation_pool_task(
59 name: &'static str,
60 receiver: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
61 spawner: &impl SpawnEssentialNamed,
62) {
63 spawner.spawn_essential_blocking(
64 name,
65 Some("transaction-pool"),
66 async move {
67 loop {
68 let task = receiver.lock().await.next().await;
69 match task {
70 None => return,
71 Some(task) => task.await,
72 }
73 }
74 }
75 .boxed(),
76 );
77}
78
79impl<Client, Block> FullChainApi<Client, Block> {
80 pub fn new(
82 client: Arc<Client>,
83 prometheus: Option<&PrometheusRegistry>,
84 spawner: &impl SpawnEssentialNamed,
85 ) -> Self {
86 let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
87 Err(err) => {
88 log::warn!(
89 target: LOG_TARGET,
90 "Failed to register transaction pool api prometheus metrics: {:?}",
91 err,
92 );
93 None
94 },
95 Ok(api) => Some(Arc::new(api)),
96 });
97
98 let (sender, receiver) = mpsc::channel(0);
99
100 let receiver = Arc::new(Mutex::new(receiver));
101 spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
102 spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);
103
104 FullChainApi {
105 client,
106 validation_pool: Arc::new(Mutex::new(sender)),
107 _marker: Default::default(),
108 metrics,
109 }
110 }
111}
112
113impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
114where
115 Block: BlockT,
116 Client: ProvideRuntimeApi<Block>
117 + BlockBackend<Block>
118 + BlockIdTo<Block>
119 + HeaderBackend<Block>
120 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
121 Client: Send + Sync + 'static,
122 Client::Api: TaggedTransactionQueue<Block>,
123{
124 type Block = Block;
125 type Error = error::Error;
126 type ValidationFuture =
127 Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
128 type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
129
130 fn block_body(&self, hash: Block::Hash) -> Self::BodyFuture {
131 ready(self.client.block_body(hash).map_err(error::Error::from))
132 }
133
134 fn validate_transaction(
135 &self,
136 at: <Self::Block as BlockT>::Hash,
137 source: TransactionSource,
138 uxt: graph::ExtrinsicFor<Self>,
139 ) -> Self::ValidationFuture {
140 let (tx, rx) = oneshot::channel();
141 let client = self.client.clone();
142 let validation_pool = self.validation_pool.clone();
143 let metrics = self.metrics.clone();
144
145 async move {
146 metrics.report(|m| m.validations_scheduled.inc());
147
148 validation_pool
149 .lock()
150 .await
151 .send(
152 async move {
153 let res = validate_transaction_blocking(&*client, at, source, uxt);
154 let _ = tx.send(res);
155 metrics.report(|m| m.validations_finished.inc());
156 }
157 .boxed(),
158 )
159 .await
160 .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
161
162 match rx.await {
163 Ok(r) => r,
164 Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
165 }
166 }
167 .boxed()
168 }
169
170 fn block_id_to_number(
171 &self,
172 at: &BlockId<Self::Block>,
173 ) -> error::Result<Option<graph::NumberFor<Self>>> {
174 self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
175 }
176
177 fn block_id_to_hash(
178 &self,
179 at: &BlockId<Self::Block>,
180 ) -> error::Result<Option<graph::BlockHash<Self>>> {
181 self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
182 }
183
184 fn hash_and_length(
185 &self,
186 ex: &graph::ExtrinsicFor<Self>,
187 ) -> (graph::ExtrinsicHash<Self>, usize) {
188 ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
189 }
190
191 fn block_header(
192 &self,
193 hash: <Self::Block as BlockT>::Hash,
194 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
195 self.client.header(hash).map_err(Into::into)
196 }
197
198 fn tree_route(
199 &self,
200 from: <Self::Block as BlockT>::Hash,
201 to: <Self::Block as BlockT>::Hash,
202 ) -> Result<TreeRoute<Self::Block>, Self::Error> {
203 sp_blockchain::tree_route::<Block, Client>(&*self.client, from, to).map_err(Into::into)
204 }
205}
206
207fn validate_transaction_blocking<Client, Block>(
210 client: &Client,
211 at: Block::Hash,
212 source: TransactionSource,
213 uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
214) -> error::Result<TransactionValidity>
215where
216 Block: BlockT,
217 Client: ProvideRuntimeApi<Block>
218 + BlockBackend<Block>
219 + BlockIdTo<Block>
220 + HeaderBackend<Block>
221 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
222 Client: Send + Sync + 'static,
223 Client::Api: TaggedTransactionQueue<Block>,
224{
225 sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
226 {
227 let runtime_api = client.runtime_api();
228 let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
229 runtime_api
230 .api_version::<dyn TaggedTransactionQueue<Block>>(at)
231 .map_err(|e| Error::RuntimeApi(e.to_string()))?
232 .ok_or_else(|| Error::RuntimeApi(
233 format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
234 ))
235 }?;
236
237 use sp_api::Core;
238
239 sp_tracing::within_span!(
240 sp_tracing::Level::TRACE, "runtime::validate_transaction";
241 {
242 if api_version >= 3 {
243 runtime_api.validate_transaction(at, source, uxt, at)
244 .map_err(|e| Error::RuntimeApi(e.to_string()))
245 } else {
246 let block_number = client.to_number(&BlockId::Hash(at))
247 .map_err(|e| Error::RuntimeApi(e.to_string()))?
248 .ok_or_else(||
249 Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
250 )?;
251
252 runtime_api.initialize_block(at, &sp_runtime::traits::Header::new(
254 block_number + sp_runtime::traits::One::one(),
255 Default::default(),
256 Default::default(),
257 at,
258 Default::default()),
259 ).map_err(|e| Error::RuntimeApi(e.to_string()))?;
260
261 if api_version == 2 {
262 #[allow(deprecated)] runtime_api.validate_transaction_before_version_3(at, source, uxt)
264 .map_err(|e| Error::RuntimeApi(e.to_string()))
265 } else {
266 #[allow(deprecated)] runtime_api.validate_transaction_before_version_2(at, uxt)
268 .map_err(|e| Error::RuntimeApi(e.to_string()))
269 }
270 }
271 })
272 })
273}
274
275impl<Client, Block> FullChainApi<Client, Block>
276where
277 Block: BlockT,
278 Client: ProvideRuntimeApi<Block>
279 + BlockBackend<Block>
280 + BlockIdTo<Block>
281 + HeaderBackend<Block>
282 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
283 Client: Send + Sync + 'static,
284 Client::Api: TaggedTransactionQueue<Block>,
285{
286 pub fn validate_transaction_blocking(
291 &self,
292 at: Block::Hash,
293 source: TransactionSource,
294 uxt: graph::ExtrinsicFor<Self>,
295 ) -> error::Result<TransactionValidity> {
296 validate_transaction_blocking(&*self.client, at, source, uxt)
297 }
298}