referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/single_state_txpool/
revalidation.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Pool periodic revalidation.

21use crate::graph::{
22	BlockHash, ChainApi, ExtrinsicHash, ValidateTransactionPriority, ValidatedTransaction,
23};
24use futures::prelude::*;
25use indexmap::IndexMap;
26use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
27use sp_runtime::{
28	generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError,
29};
30use std::{
31	collections::{BTreeMap, HashMap, HashSet},
32	pin::Pin,
33	sync::Arc,
34	time::Duration,
35};
36use tracing::{debug, trace, warn};
37
/// Delay between two revalidation rounds of the worker created by
/// `RevalidationQueue::new_background`.
const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);

/// Lower bound for the number of transactions a single revalidation round may take
/// from the queue.
const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;

/// Tracing target used by every log statement in this module.
const LOG_TARGET: &str = "txpool::revalidation";
43
44type Pool<Api> = crate::graph::Pool<Api, ()>;
45
46/// Payload from queue to worker.
47struct WorkerPayload<Api: ChainApi> {
48	at: BlockHash<Api>,
49	transactions: Vec<ExtrinsicHash<Api>>,
50}
51
52/// Async revalidation worker.
53///
54/// Implements future and can be spawned in place or in background.
55struct RevalidationWorker<Api: ChainApi> {
56	api: Arc<Api>,
57	pool: Arc<Pool<Api>>,
58	best_block: BlockHash<Api>,
59	block_ordered: BTreeMap<BlockHash<Api>, HashSet<ExtrinsicHash<Api>>>,
60	members: HashMap<ExtrinsicHash<Api>, BlockHash<Api>>,
61}
62
63impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
64
65/// Revalidate batch of transaction.
66///
67/// Each transaction is validated  against chain, and invalid are
68/// removed from the `pool`, while valid are resubmitted.
69async fn batch_revalidate<Api: ChainApi>(
70	pool: Arc<Pool<Api>>,
71	api: Arc<Api>,
72	at: BlockHash<Api>,
73	batch: impl IntoIterator<Item = ExtrinsicHash<Api>>,
74) {
75	// This conversion should work. Otherwise, for unknown block the revalidation shall be skipped,
76	// all the transactions will be kept in the validated pool, and can be scheduled for
77	// revalidation with the next request.
78	let block_number = match api.block_id_to_number(&BlockId::Hash(at)) {
79		Ok(Some(n)) => n,
80		Ok(None) => {
81			trace!(
82				target: LOG_TARGET,
83				?at,
84				"Revalidation skipped: could not get block number"
85			);
86			return
87		},
88		Err(error) => {
89			trace!(
90				target: LOG_TARGET,
91				?at,
92				?error,
93				"Revalidation skipped."
94			);
95			return
96		},
97	};
98
99	let mut invalid_hashes = Vec::new();
100	let mut revalidated = IndexMap::new();
101
102	let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| {
103		pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
104			api.validate_transaction(
105				at,
106				ext.source.clone().into(),
107				ext.data.clone(),
108				ValidateTransactionPriority::Submitted,
109			)
110			.map(move |validation_result| (validation_result, ext_hash, ext))
111		})
112	}))
113	.await;
114
115	for (validation_result, tx_hash, ext) in validation_results {
116		match validation_result {
117			Ok(Err(TransactionValidityError::Invalid(error))) => {
118				trace!(
119					target: LOG_TARGET,
120					?tx_hash,
121					?error,
122					"Revalidation: invalid."
123				);
124				invalid_hashes.push(tx_hash);
125			},
126			Ok(Err(TransactionValidityError::Unknown(error))) => {
127				// skipping unknown, they might be pushed by valid or invalid transaction
128				// when latter resubmitted.
129				trace!(
130					target: LOG_TARGET,
131					?tx_hash,
132					?error,
133					"Unknown during revalidation."
134				);
135			},
136			Ok(Ok(validity)) => {
137				revalidated.insert(
138					tx_hash,
139					ValidatedTransaction::valid_at(
140						block_number.saturated_into::<u64>(),
141						tx_hash,
142						ext.source.clone(),
143						ext.data.clone(),
144						api.hash_and_length(&ext.data).1,
145						validity,
146					),
147				);
148			},
149			Err(error) => {
150				trace!(
151					target: LOG_TARGET,
152					?tx_hash,
153					?error,
154					"Removing due to error during revalidation."
155				);
156				invalid_hashes.push(tx_hash);
157			},
158		}
159	}
160
161	pool.validated_pool().remove_invalid(&invalid_hashes);
162	if revalidated.len() > 0 {
163		pool.resubmit(revalidated);
164	}
165}
166
167impl<Api: ChainApi> RevalidationWorker<Api> {
168	fn new(api: Arc<Api>, pool: Arc<Pool<Api>>, best_block: BlockHash<Api>) -> Self {
169		Self {
170			api,
171			pool,
172			best_block,
173			block_ordered: Default::default(),
174			members: Default::default(),
175		}
176	}
177
178	fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
179		let mut queued_exts = Vec::new();
180		let mut left =
181			std::cmp::max(MIN_BACKGROUND_REVALIDATION_BATCH_SIZE, self.members.len() / 4);
182
183		// Take maximum of count transaction by order
184		// which they got into the pool
185		while left > 0 {
186			let first_block = match self.block_ordered.keys().next().cloned() {
187				Some(bn) => bn,
188				None => break,
189			};
190			let mut block_drained = false;
191			if let Some(extrinsics) = self.block_ordered.get_mut(&first_block) {
192				let to_queue = extrinsics.iter().take(left).cloned().collect::<Vec<_>>();
193				if to_queue.len() == extrinsics.len() {
194					block_drained = true;
195				} else {
196					for xt in &to_queue {
197						extrinsics.remove(xt);
198					}
199				}
200				left -= to_queue.len();
201				queued_exts.extend(to_queue);
202			}
203
204			if block_drained {
205				self.block_ordered.remove(&first_block);
206			}
207		}
208
209		for hash in queued_exts.iter() {
210			self.members.remove(hash);
211		}
212
213		queued_exts
214	}
215
216	fn len(&self) -> usize {
217		self.block_ordered.iter().map(|b| b.1.len()).sum()
218	}
219
220	fn push(&mut self, worker_payload: WorkerPayload<Api>) {
221		// we don't add something that already scheduled for revalidation
222		let transactions = worker_payload.transactions;
223		let block_number = worker_payload.at;
224
225		for tx_hash in transactions {
226			// we don't add something that already scheduled for revalidation
227			if self.members.contains_key(&tx_hash) {
228				trace!(
229					target: LOG_TARGET,
230					?tx_hash,
231					"Skipped adding for revalidation: Already there."
232				);
233
234				continue
235			}
236
237			self.block_ordered
238				.entry(block_number)
239				.and_modify(|value| {
240					value.insert(tx_hash);
241				})
242				.or_insert_with(|| {
243					let mut bt = HashSet::new();
244					bt.insert(tx_hash);
245					bt
246				});
247			self.members.insert(tx_hash, block_number);
248		}
249	}
250
251	/// Background worker main loop.
252	///
253	/// It does two things: periodically tries to process some transactions
254	/// from the queue and also accepts messages to enqueue some more
255	/// transactions from the pool.
256	pub async fn run(
257		mut self,
258		from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
259		interval: Duration,
260	) {
261		let interval_fut = futures_timer::Delay::new(interval);
262		let from_queue = from_queue.fuse();
263		futures::pin_mut!(interval_fut, from_queue);
264		let this = &mut self;
265
266		loop {
267			futures::select! {
268				// Using `fuse()` in here is okay, because we reset the interval when it has fired.
269				_ = (&mut interval_fut).fuse() => {
270					let next_batch = this.prepare_batch();
271					let batch_len = next_batch.len();
272
273					batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
274
275					if batch_len > 0 || this.len() > 0 {
276						trace!(
277							target: LOG_TARGET,
278							batch_len,
279							queue_len = this.len(),
280							"Revalidated transactions. Left in the queue for revalidation."
281						);
282					}
283
284					interval_fut.reset(interval);
285				},
286				workload = from_queue.next() => {
287					match workload {
288						Some(worker_payload) => {
289							this.best_block = worker_payload.at;
290							this.push(worker_payload);
291
292							if this.members.len() > 0 {
293								trace!(
294									target: LOG_TARGET,
295									at = ?this.best_block,
296									transactions = ?this.members,
297									"Updated revalidation queue."
298								);
299							}
300
301							continue;
302						},
303						// R.I.P. worker!
304						None => break,
305					}
306				}
307			}
308		}
309	}
310}
311
312/// Revalidation queue.
313///
314/// Can be configured background (`new_background`)
315/// or immediate (just `new`).
316pub struct RevalidationQueue<Api: ChainApi> {
317	pool: Arc<Pool<Api>>,
318	api: Arc<Api>,
319	background: Option<TracingUnboundedSender<WorkerPayload<Api>>>,
320}
321
322impl<Api: ChainApi> RevalidationQueue<Api>
323where
324	Api: 'static,
325{
326	/// New revalidation queue without background worker.
327	pub fn new(api: Arc<Api>, pool: Arc<Pool<Api>>) -> Self {
328		Self { api, pool, background: None }
329	}
330
331	/// New revalidation queue with background worker.
332	pub fn new_with_interval(
333		api: Arc<Api>,
334		pool: Arc<Pool<Api>>,
335		interval: Duration,
336		best_block: BlockHash<Api>,
337	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
338		let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
339
340		let worker = RevalidationWorker::new(api.clone(), pool.clone(), best_block);
341
342		let queue = Self { api, pool, background: Some(to_worker) };
343
344		(queue, worker.run(from_queue, interval).boxed())
345	}
346
347	/// New revalidation queue with background worker.
348	pub fn new_background(
349		api: Arc<Api>,
350		pool: Arc<Pool<Api>>,
351		best_block: BlockHash<Api>,
352	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
353		Self::new_with_interval(api, pool, BACKGROUND_REVALIDATION_INTERVAL, best_block)
354	}
355
356	/// Queue some transaction for later revalidation.
357	///
358	/// If queue configured with background worker, this will return immediately.
359	/// If queue configured without background worker, this will resolve after
360	/// revalidation is actually done.
361	pub async fn revalidate_later(
362		&self,
363		at: BlockHash<Api>,
364		transactions: Vec<ExtrinsicHash<Api>>,
365	) {
366		if transactions.len() > 0 {
367			debug!(
368				target: LOG_TARGET,
369				transaction_count = transactions.len(),
370				"Sent transactions to revalidation queue."
371			);
372		}
373
374		if let Some(ref to_worker) = self.background {
375			if let Err(error) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
376				warn!(
377					target: LOG_TARGET,
378					?error,
379					"Failed to update background worker."
380				);
381			}
382		} else {
383			debug!(
384				target: LOG_TARGET,
385				"Batch revalidate direct call."
386			);
387			let pool = self.pool.clone();
388			let api = self.api.clone();
389			batch_revalidate(pool, api, at, transactions).await
390		}
391	}
392}
393
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		common::tests::{uxt, TestApi},
		graph::Pool,
		TimedTransactionSource,
	};
	use futures::executor::block_on;
	use substrate_test_runtime::{AccountId, Transfer, H256};
	use substrate_test_runtime_client::Sr25519Keyring::{Alice, Bob};

	/// Pool type used by the revalidation queue under test.
	type TestPool = super::Pool<TestApi>;

	/// Builds a fresh test API, a pool on top of it and a queue without background worker.
	fn setup() -> (Arc<TestApi>, Arc<TestPool>, Arc<RevalidationQueue<TestApi>>) {
		let api = Arc::new(TestApi::default());
		let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
			Default::default(),
			true.into(),
			api.clone(),
		));
		let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

		(api, pool, queue)
	}

	#[test]
	fn revalidation_queue_works() {
		let (api, pool, queue) = setup();

		let xt = uxt(Transfer {
			from: Alice.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		});

		let han_of_block0 = api.expect_hash_and_number(0);

		let submitted = block_on(pool.submit_one(
			&han_of_block0,
			TimedTransactionSource::new_external(false),
			xt.clone().into(),
		));
		let xt_hash = submitted.expect("Should be valid").hash();

		block_on(queue.revalidate_later(han_of_block0.hash, vec![xt_hash]));

		// one validation on submission, a second one from the synchronous revalidation
		assert_eq!(api.validation_requests().len(), 2);
		// the transaction is still ready
		assert_eq!(pool.validated_pool().status().ready, 1);
	}

	#[test]
	fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
		let (api, pool, queue) = setup();

		let from_alice = uxt(Transfer {
			from: Alice.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		});
		let from_bob = uxt(Transfer {
			from: Bob.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 4,
			nonce: 1,
		});

		let han_of_block0 = api.expect_hash_and_number(0);
		let unknown_block = H256::repeat_byte(0x13);

		let source = TimedTransactionSource::new_external(false);
		let submission_results = block_on(pool.submit_at(
			&han_of_block0,
			vec![(source.clone(), from_alice.into()), (source, from_bob.into())],
			ValidateTransactionPriority::Submitted,
		));
		let xt_hashes = submission_results
			.into_iter()
			.map(|result| result.expect("Should be valid").hash())
			.collect::<Vec<_>>();

		assert_eq!(api.validation_requests().len(), 2);
		assert_eq!(pool.validated_pool().status().ready, 2);

		// block 0 is known, so both transactions get validated once more
		block_on(queue.revalidate_later(han_of_block0.hash, xt_hashes.clone()));
		assert_eq!(api.validation_requests().len(), 4);
		assert_eq!(pool.validated_pool().status().ready, 2);

		// for the unknown block the revalidation has to be skipped entirely
		block_on(queue.revalidate_later(unknown_block, xt_hashes));
		// no additional validation request was made
		assert_eq!(api.validation_requests().len(), 4);
		// and the number of ready transactions did not change
		assert_eq!(pool.validated_pool().status().ready, 2);
	}
}