1use crate::graph::{
22 BlockHash, ChainApi, ExtrinsicHash, ValidateTransactionPriority, ValidatedTransaction,
23};
24use futures::prelude::*;
25use indexmap::IndexMap;
26use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
27use sp_runtime::{
28 generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError,
29};
30use std::{
31 collections::{BTreeMap, HashMap, HashSet},
32 pin::Pin,
33 sync::Arc,
34 time::Duration,
35};
36use tracing::{debug, trace, warn};
37
38const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);
39
40const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
41
42const LOG_TARGET: &str = "txpool::revalidation";
43
44type Pool<Api> = crate::graph::Pool<Api, ()>;
45
46struct WorkerPayload<Api: ChainApi> {
48 at: BlockHash<Api>,
49 transactions: Vec<ExtrinsicHash<Api>>,
50}
51
52struct RevalidationWorker<Api: ChainApi> {
56 api: Arc<Api>,
57 pool: Arc<Pool<Api>>,
58 best_block: BlockHash<Api>,
59 block_ordered: BTreeMap<BlockHash<Api>, HashSet<ExtrinsicHash<Api>>>,
60 members: HashMap<ExtrinsicHash<Api>, BlockHash<Api>>,
61}
62
63impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
64
65async fn batch_revalidate<Api: ChainApi>(
70 pool: Arc<Pool<Api>>,
71 api: Arc<Api>,
72 at: BlockHash<Api>,
73 batch: impl IntoIterator<Item = ExtrinsicHash<Api>>,
74) {
75 let block_number = match api.block_id_to_number(&BlockId::Hash(at)) {
79 Ok(Some(n)) => n,
80 Ok(None) => {
81 trace!(
82 target: LOG_TARGET,
83 ?at,
84 "Revalidation skipped: could not get block number"
85 );
86 return
87 },
88 Err(error) => {
89 trace!(
90 target: LOG_TARGET,
91 ?at,
92 ?error,
93 "Revalidation skipped."
94 );
95 return
96 },
97 };
98
99 let mut invalid_hashes = Vec::new();
100 let mut revalidated = IndexMap::new();
101
102 let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| {
103 pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
104 api.validate_transaction(
105 at,
106 ext.source.clone().into(),
107 ext.data.clone(),
108 ValidateTransactionPriority::Submitted,
109 )
110 .map(move |validation_result| (validation_result, ext_hash, ext))
111 })
112 }))
113 .await;
114
115 for (validation_result, tx_hash, ext) in validation_results {
116 match validation_result {
117 Ok(Err(TransactionValidityError::Invalid(error))) => {
118 trace!(
119 target: LOG_TARGET,
120 ?tx_hash,
121 ?error,
122 "Revalidation: invalid."
123 );
124 invalid_hashes.push(tx_hash);
125 },
126 Ok(Err(TransactionValidityError::Unknown(error))) => {
127 trace!(
130 target: LOG_TARGET,
131 ?tx_hash,
132 ?error,
133 "Unknown during revalidation."
134 );
135 },
136 Ok(Ok(validity)) => {
137 revalidated.insert(
138 tx_hash,
139 ValidatedTransaction::valid_at(
140 block_number.saturated_into::<u64>(),
141 tx_hash,
142 ext.source.clone(),
143 ext.data.clone(),
144 api.hash_and_length(&ext.data).1,
145 validity,
146 ),
147 );
148 },
149 Err(error) => {
150 trace!(
151 target: LOG_TARGET,
152 ?tx_hash,
153 ?error,
154 "Removing due to error during revalidation."
155 );
156 invalid_hashes.push(tx_hash);
157 },
158 }
159 }
160
161 pool.validated_pool().remove_invalid(&invalid_hashes);
162 if revalidated.len() > 0 {
163 pool.resubmit(revalidated);
164 }
165}
166
167impl<Api: ChainApi> RevalidationWorker<Api> {
168 fn new(api: Arc<Api>, pool: Arc<Pool<Api>>, best_block: BlockHash<Api>) -> Self {
169 Self {
170 api,
171 pool,
172 best_block,
173 block_ordered: Default::default(),
174 members: Default::default(),
175 }
176 }
177
178 fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
179 let mut queued_exts = Vec::new();
180 let mut left =
181 std::cmp::max(MIN_BACKGROUND_REVALIDATION_BATCH_SIZE, self.members.len() / 4);
182
183 while left > 0 {
186 let first_block = match self.block_ordered.keys().next().cloned() {
187 Some(bn) => bn,
188 None => break,
189 };
190 let mut block_drained = false;
191 if let Some(extrinsics) = self.block_ordered.get_mut(&first_block) {
192 let to_queue = extrinsics.iter().take(left).cloned().collect::<Vec<_>>();
193 if to_queue.len() == extrinsics.len() {
194 block_drained = true;
195 } else {
196 for xt in &to_queue {
197 extrinsics.remove(xt);
198 }
199 }
200 left -= to_queue.len();
201 queued_exts.extend(to_queue);
202 }
203
204 if block_drained {
205 self.block_ordered.remove(&first_block);
206 }
207 }
208
209 for hash in queued_exts.iter() {
210 self.members.remove(hash);
211 }
212
213 queued_exts
214 }
215
216 fn len(&self) -> usize {
217 self.block_ordered.iter().map(|b| b.1.len()).sum()
218 }
219
220 fn push(&mut self, worker_payload: WorkerPayload<Api>) {
221 let transactions = worker_payload.transactions;
223 let block_number = worker_payload.at;
224
225 for tx_hash in transactions {
226 if self.members.contains_key(&tx_hash) {
228 trace!(
229 target: LOG_TARGET,
230 ?tx_hash,
231 "Skipped adding for revalidation: Already there."
232 );
233
234 continue
235 }
236
237 self.block_ordered
238 .entry(block_number)
239 .and_modify(|value| {
240 value.insert(tx_hash);
241 })
242 .or_insert_with(|| {
243 let mut bt = HashSet::new();
244 bt.insert(tx_hash);
245 bt
246 });
247 self.members.insert(tx_hash, block_number);
248 }
249 }
250
251 pub async fn run(
257 mut self,
258 from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
259 interval: Duration,
260 ) {
261 let interval_fut = futures_timer::Delay::new(interval);
262 let from_queue = from_queue.fuse();
263 futures::pin_mut!(interval_fut, from_queue);
264 let this = &mut self;
265
266 loop {
267 futures::select! {
268 _ = (&mut interval_fut).fuse() => {
270 let next_batch = this.prepare_batch();
271 let batch_len = next_batch.len();
272
273 batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
274
275 if batch_len > 0 || this.len() > 0 {
276 trace!(
277 target: LOG_TARGET,
278 batch_len,
279 queue_len = this.len(),
280 "Revalidated transactions. Left in the queue for revalidation."
281 );
282 }
283
284 interval_fut.reset(interval);
285 },
286 workload = from_queue.next() => {
287 match workload {
288 Some(worker_payload) => {
289 this.best_block = worker_payload.at;
290 this.push(worker_payload);
291
292 if this.members.len() > 0 {
293 trace!(
294 target: LOG_TARGET,
295 at = ?this.best_block,
296 transactions = ?this.members,
297 "Updated revalidation queue."
298 );
299 }
300
301 continue;
302 },
303 None => break,
305 }
306 }
307 }
308 }
309 }
310}
311
312pub struct RevalidationQueue<Api: ChainApi> {
317 pool: Arc<Pool<Api>>,
318 api: Arc<Api>,
319 background: Option<TracingUnboundedSender<WorkerPayload<Api>>>,
320}
321
322impl<Api: ChainApi> RevalidationQueue<Api>
323where
324 Api: 'static,
325{
326 pub fn new(api: Arc<Api>, pool: Arc<Pool<Api>>) -> Self {
328 Self { api, pool, background: None }
329 }
330
331 pub fn new_with_interval(
333 api: Arc<Api>,
334 pool: Arc<Pool<Api>>,
335 interval: Duration,
336 best_block: BlockHash<Api>,
337 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
338 let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
339
340 let worker = RevalidationWorker::new(api.clone(), pool.clone(), best_block);
341
342 let queue = Self { api, pool, background: Some(to_worker) };
343
344 (queue, worker.run(from_queue, interval).boxed())
345 }
346
347 pub fn new_background(
349 api: Arc<Api>,
350 pool: Arc<Pool<Api>>,
351 best_block: BlockHash<Api>,
352 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
353 Self::new_with_interval(api, pool, BACKGROUND_REVALIDATION_INTERVAL, best_block)
354 }
355
356 pub async fn revalidate_later(
362 &self,
363 at: BlockHash<Api>,
364 transactions: Vec<ExtrinsicHash<Api>>,
365 ) {
366 if transactions.len() > 0 {
367 debug!(
368 target: LOG_TARGET,
369 transaction_count = transactions.len(),
370 "Sent transactions to revalidation queue."
371 );
372 }
373
374 if let Some(ref to_worker) = self.background {
375 if let Err(error) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
376 warn!(
377 target: LOG_TARGET,
378 ?error,
379 "Failed to update background worker."
380 );
381 }
382 } else {
383 debug!(
384 target: LOG_TARGET,
385 "Batch revalidate direct call."
386 );
387 let pool = self.pool.clone();
388 let api = self.api.clone();
389 batch_revalidate(pool, api, at, transactions).await
390 }
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397 use crate::{
398 common::tests::{uxt, TestApi},
399 graph::Pool,
400 TimedTransactionSource,
401 };
402 use futures::executor::block_on;
403 use substrate_test_runtime::{AccountId, Transfer, H256};
404 use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};
405
406 #[test]
407 fn revalidation_queue_works() {
408 let api = Arc::new(TestApi::default());
409 let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
410 Default::default(),
411 true.into(),
412 api.clone(),
413 ));
414 let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));
415
416 let uxt = uxt(Transfer {
417 from: Alice.into(),
418 to: AccountId::from_h256(H256::from_low_u64_be(2)),
419 amount: 5,
420 nonce: 0,
421 });
422
423 let han_of_block0 = api.expect_hash_and_number(0);
424
425 let uxt_hash = block_on(pool.submit_one(
426 &han_of_block0,
427 TimedTransactionSource::new_external(false),
428 uxt.clone().into(),
429 ))
430 .expect("Should be valid")
431 .hash();
432
433 block_on(queue.revalidate_later(han_of_block0.hash, vec![uxt_hash]));
434
435 assert_eq!(api.validation_requests().len(), 2);
437 assert_eq!(pool.validated_pool().status().ready, 1);
439 }
440
441 #[test]
442 fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
443 let api = Arc::new(TestApi::default());
444 let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
445 Default::default(),
446 true.into(),
447 api.clone(),
448 ));
449 let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));
450
451 let uxt0 = uxt(Transfer {
452 from: Alice.into(),
453 to: AccountId::from_h256(H256::from_low_u64_be(2)),
454 amount: 5,
455 nonce: 0,
456 });
457 let uxt1 = uxt(Transfer {
458 from: Bob.into(),
459 to: AccountId::from_h256(H256::from_low_u64_be(2)),
460 amount: 4,
461 nonce: 1,
462 });
463
464 let han_of_block0 = api.expect_hash_and_number(0);
465 let unknown_block = H256::repeat_byte(0x13);
466
467 let source = TimedTransactionSource::new_external(false);
468 let uxt_hashes = block_on(pool.submit_at(
469 &han_of_block0,
470 vec![(source.clone(), uxt0.into()), (source, uxt1.into())],
471 ValidateTransactionPriority::Submitted,
472 ))
473 .into_iter()
474 .map(|r| r.expect("Should be valid").hash())
475 .collect::<Vec<_>>();
476
477 assert_eq!(api.validation_requests().len(), 2);
478 assert_eq!(pool.validated_pool().status().ready, 2);
479
480 block_on(queue.revalidate_later(han_of_block0.hash, uxt_hashes.clone()));
482 assert_eq!(api.validation_requests().len(), 4);
483 assert_eq!(pool.validated_pool().status().ready, 2);
484
485 block_on(queue.revalidate_later(unknown_block, uxt_hashes));
487 assert_eq!(api.validation_requests().len(), 4);
489 assert_eq!(pool.validated_pool().status().ready, 2);
491 }
492}