1use std::{
22 collections::{BTreeMap, HashMap, HashSet},
23 pin::Pin,
24 sync::Arc,
25};
26
27use crate::{
28 graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction},
29 LOG_TARGET,
30};
31use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
32use sp_runtime::{
33 generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError,
34};
35
36use futures::prelude::*;
37use std::time::Duration;
38
/// Timer interval used by `RevalidationQueue::new_background` for the background worker:
/// each time it elapses the worker revalidates one batch of queued transactions.
const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);

/// Lower bound for the per-round batch budget computed in `RevalidationWorker::prepare_batch`
/// (the budget is the larger of this value and a quarter of the queued transactions).
const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
42
/// Message sent from `RevalidationQueue` to the background `RevalidationWorker`.
struct WorkerPayload<Api: ChainApi> {
    /// Block hash the worker adopts as its `best_block` and queues the transactions under.
    at: BlockHash<Api>,
    /// Hashes of the transactions to enqueue for revalidation.
    transactions: Vec<ExtrinsicHash<Api>>,
}
48
/// State of the asynchronous revalidation worker driven by `RevalidationWorker::run`.
struct RevalidationWorker<Api: ChainApi> {
    /// Chain API used to resolve block numbers and validate transactions.
    api: Arc<Api>,
    /// Pool whose ready transactions are revalidated, resubmitted or removed.
    pool: Arc<Pool<Api>>,
    /// Block hash every batch is validated against; replaced by the `at` of each received payload.
    best_block: BlockHash<Api>,
    /// Queued transaction hashes grouped by the block hash they were pushed with.
    // NOTE(review): the key is a block *hash*, so the `BTreeMap` iterates in hash order rather
    // than block-height order — confirm this is the intended draining order.
    block_ordered: BTreeMap<BlockHash<Api>, HashSet<ExtrinsicHash<Api>>>,
    /// Reverse index (transaction hash -> block hash) used to skip already queued transactions.
    members: HashMap<ExtrinsicHash<Api>, BlockHash<Api>>,
}
59
// Declares the worker `Unpin` for every `Api`, without requiring `Api: Unpin` itself.
impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
61
62async fn batch_revalidate<Api: ChainApi>(
67 pool: Arc<Pool<Api>>,
68 api: Arc<Api>,
69 at: BlockHash<Api>,
70 batch: impl IntoIterator<Item = ExtrinsicHash<Api>>,
71) {
72 let block_number = match api.block_id_to_number(&BlockId::Hash(at)) {
76 Ok(Some(n)) => n,
77 Ok(None) => {
78 log::debug!(target: LOG_TARGET, "revalidation skipped at block {at:?}, could not get block number.");
79 return
80 },
81 Err(e) => {
82 log::debug!(target: LOG_TARGET, "revalidation skipped at block {at:?}: {e:?}.");
83 return
84 },
85 };
86
87 let mut invalid_hashes = Vec::new();
88 let mut revalidated = HashMap::new();
89
90 let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| {
91 pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
92 api.validate_transaction(at, ext.source, ext.data.clone())
93 .map(move |validation_result| (validation_result, ext_hash, ext))
94 })
95 }))
96 .await;
97
98 for (validation_result, ext_hash, ext) in validation_results {
99 match validation_result {
100 Ok(Err(TransactionValidityError::Invalid(err))) => {
101 log::debug!(
102 target: LOG_TARGET,
103 "[{:?}]: Revalidation: invalid {:?}",
104 ext_hash,
105 err,
106 );
107 invalid_hashes.push(ext_hash);
108 },
109 Ok(Err(TransactionValidityError::Unknown(err))) => {
110 log::trace!(
113 target: LOG_TARGET,
114 "[{:?}]: Unknown during revalidation: {:?}",
115 ext_hash,
116 err,
117 );
118 },
119 Ok(Ok(validity)) => {
120 revalidated.insert(
121 ext_hash,
122 ValidatedTransaction::valid_at(
123 block_number.saturated_into::<u64>(),
124 ext_hash,
125 ext.source,
126 ext.data.clone(),
127 api.hash_and_length(&ext.data).1,
128 validity,
129 ),
130 );
131 },
132 Err(validation_err) => {
133 log::debug!(
134 target: LOG_TARGET,
135 "[{:?}]: Removing due to error during revalidation: {}",
136 ext_hash,
137 validation_err
138 );
139 invalid_hashes.push(ext_hash);
140 },
141 }
142 }
143
144 pool.validated_pool().remove_invalid(&invalid_hashes);
145 if revalidated.len() > 0 {
146 pool.resubmit(revalidated);
147 }
148}
149
150impl<Api: ChainApi> RevalidationWorker<Api> {
151 fn new(api: Arc<Api>, pool: Arc<Pool<Api>>, best_block: BlockHash<Api>) -> Self {
152 Self {
153 api,
154 pool,
155 best_block,
156 block_ordered: Default::default(),
157 members: Default::default(),
158 }
159 }
160
161 fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
162 let mut queued_exts = Vec::new();
163 let mut left =
164 std::cmp::max(MIN_BACKGROUND_REVALIDATION_BATCH_SIZE, self.members.len() / 4);
165
166 while left > 0 {
169 let first_block = match self.block_ordered.keys().next().cloned() {
170 Some(bn) => bn,
171 None => break,
172 };
173 let mut block_drained = false;
174 if let Some(extrinsics) = self.block_ordered.get_mut(&first_block) {
175 let to_queue = extrinsics.iter().take(left).cloned().collect::<Vec<_>>();
176 if to_queue.len() == extrinsics.len() {
177 block_drained = true;
178 } else {
179 for xt in &to_queue {
180 extrinsics.remove(xt);
181 }
182 }
183 left -= to_queue.len();
184 queued_exts.extend(to_queue);
185 }
186
187 if block_drained {
188 self.block_ordered.remove(&first_block);
189 }
190 }
191
192 for hash in queued_exts.iter() {
193 self.members.remove(hash);
194 }
195
196 queued_exts
197 }
198
199 fn len(&self) -> usize {
200 self.block_ordered.iter().map(|b| b.1.len()).sum()
201 }
202
203 fn push(&mut self, worker_payload: WorkerPayload<Api>) {
204 let transactions = worker_payload.transactions;
206 let block_number = worker_payload.at;
207
208 for ext_hash in transactions {
209 if self.members.contains_key(&ext_hash) {
211 log::trace!(
212 target: LOG_TARGET,
213 "[{:?}] Skipped adding for revalidation: Already there.",
214 ext_hash,
215 );
216
217 continue
218 }
219
220 self.block_ordered
221 .entry(block_number)
222 .and_modify(|value| {
223 value.insert(ext_hash);
224 })
225 .or_insert_with(|| {
226 let mut bt = HashSet::new();
227 bt.insert(ext_hash);
228 bt
229 });
230 self.members.insert(ext_hash, block_number);
231 }
232 }
233
234 pub async fn run(
240 mut self,
241 from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
242 interval: Duration,
243 ) {
244 let interval_fut = futures_timer::Delay::new(interval);
245 let from_queue = from_queue.fuse();
246 futures::pin_mut!(interval_fut, from_queue);
247 let this = &mut self;
248
249 loop {
250 futures::select! {
251 _ = (&mut interval_fut).fuse() => {
253 let next_batch = this.prepare_batch();
254 let batch_len = next_batch.len();
255
256 batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
257
258 if batch_len > 0 || this.len() > 0 {
259 log::debug!(
260 target: LOG_TARGET,
261 "Revalidated {} transactions. Left in the queue for revalidation: {}.",
262 batch_len,
263 this.len(),
264 );
265 }
266
267 interval_fut.reset(interval);
268 },
269 workload = from_queue.next() => {
270 match workload {
271 Some(worker_payload) => {
272 this.best_block = worker_payload.at;
273 this.push(worker_payload);
274
275 if this.members.len() > 0 {
276 log::debug!(
277 target: LOG_TARGET,
278 "Updated revalidation queue at {:?}. Transactions: {:?}",
279 this.best_block,
280 this.members,
281 );
282 }
283
284 continue;
285 },
286 None => break,
288 }
289 }
290 }
291 }
292 }
293}
294
/// Front end for transaction revalidation.
///
/// Depending on how it was constructed, revalidation requests are either executed inline or
/// forwarded to a background `RevalidationWorker`.
pub struct RevalidationQueue<Api: ChainApi> {
    /// Pool whose transactions are revalidated when no background worker is attached.
    pool: Arc<Pool<Api>>,
    /// Chain API used for inline revalidation.
    api: Arc<Api>,
    /// Sender towards the background worker; `None` means requests are revalidated inline.
    background: Option<TracingUnboundedSender<WorkerPayload<Api>>>,
}
304
305impl<Api: ChainApi> RevalidationQueue<Api>
306where
307 Api: 'static,
308{
309 pub fn new(api: Arc<Api>, pool: Arc<Pool<Api>>) -> Self {
311 Self { api, pool, background: None }
312 }
313
314 pub fn new_with_interval(
316 api: Arc<Api>,
317 pool: Arc<Pool<Api>>,
318 interval: Duration,
319 best_block: BlockHash<Api>,
320 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
321 let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
322
323 let worker = RevalidationWorker::new(api.clone(), pool.clone(), best_block);
324
325 let queue = Self { api, pool, background: Some(to_worker) };
326
327 (queue, worker.run(from_queue, interval).boxed())
328 }
329
330 pub fn new_background(
332 api: Arc<Api>,
333 pool: Arc<Pool<Api>>,
334 best_block: BlockHash<Api>,
335 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
336 Self::new_with_interval(api, pool, BACKGROUND_REVALIDATION_INTERVAL, best_block)
337 }
338
339 pub async fn revalidate_later(
345 &self,
346 at: BlockHash<Api>,
347 transactions: Vec<ExtrinsicHash<Api>>,
348 ) {
349 if transactions.len() > 0 {
350 log::debug!(
351 target: LOG_TARGET,
352 "Sent {} transactions to revalidation queue",
353 transactions.len(),
354 );
355 }
356
357 if let Some(ref to_worker) = self.background {
358 if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
359 log::warn!(target: LOG_TARGET, "Failed to update background worker: {:?}", e);
360 }
361 } else {
362 let pool = self.pool.clone();
363 let api = self.api.clone();
364 batch_revalidate(pool, api, at, transactions).await
365 }
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        graph::Pool,
        tests::{uxt, TestApi},
    };
    use futures::executor::block_on;
    use sc_transaction_pool_api::TransactionSource;
    use substrate_test_runtime::{AccountId, Transfer, H256};
    use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};

    /// Builds a test API, a pool on top of it and a revalidation queue without a background
    /// worker, so that `revalidate_later` revalidates inline.
    fn setup() -> (Arc<TestApi>, Arc<Pool<TestApi>>, Arc<RevalidationQueue<TestApi>>) {
        let api = Arc::new(TestApi::default());
        let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
        let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));
        (api, pool, queue)
    }

    #[test]
    fn revalidation_queue_works() {
        let (api, pool, queue) = setup();

        let xt = uxt(Transfer {
            from: Alice.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(2)),
            amount: 5,
            nonce: 0,
        });

        let hash_of_block0 = api.expect_hash_from_number(0);

        let submitted = pool.submit_one(hash_of_block0, TransactionSource::External, xt.clone());
        let xt_hash = block_on(submitted).expect("Should be valid");

        block_on(queue.revalidate_later(hash_of_block0, vec![xt_hash]));

        // Two validation requests in total after submission plus revalidation.
        assert_eq!(api.validation_requests().len(), 2);
        // The transaction is still ready afterwards.
        assert_eq!(pool.validated_pool().status().ready, 1);
    }

    #[test]
    fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
        let (api, pool, queue) = setup();

        let uxt0 = uxt(Transfer {
            from: Alice.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(2)),
            amount: 5,
            nonce: 0,
        });
        let uxt1 = uxt(Transfer {
            from: Bob.into(),
            to: AccountId::from_h256(H256::from_low_u64_be(2)),
            amount: 4,
            nonce: 1,
        });

        let hash_of_block0 = api.expect_hash_from_number(0);
        let unknown_block = H256::repeat_byte(0x13);

        let submitted =
            block_on(pool.submit_at(hash_of_block0, TransactionSource::External, vec![uxt0, uxt1]))
                .expect("Should be valid");
        let uxt_hashes: Vec<_> =
            submitted.into_iter().map(|r| r.expect("Should be valid")).collect();

        assert_eq!(api.validation_requests().len(), 2);
        assert_eq!(pool.validated_pool().status().ready, 2);

        // Revalidating at a known block validates both transactions once more.
        block_on(queue.revalidate_later(hash_of_block0, uxt_hashes.clone()));
        assert_eq!(api.validation_requests().len(), 4);
        assert_eq!(pool.validated_pool().status().ready, 2);

        // Revalidating at an unknown block issues no further validation requests
        // and leaves the pool unchanged.
        block_on(queue.revalidate_later(unknown_block, uxt_hashes));
        assert_eq!(api.validation_requests().len(), 4);
        assert_eq!(pool.validated_pool().status().ready, 2);
    }
}