sc_transaction_pool/
revalidation.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Pool periodic revalidation.
20
21use std::{
22	collections::{BTreeMap, HashMap, HashSet},
23	pin::Pin,
24	sync::Arc,
25};
26
27use crate::{
28	graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction},
29	LOG_TARGET,
30};
31use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
32use sp_runtime::{
33	generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError,
34};
35
36use futures::prelude::*;
37use std::time::Duration;
38
/// Default pause between two rounds of the background revalidation worker.
///
/// Passed as the worker interval by `RevalidationQueue::new_background`.
const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);

/// Floor for the per-round batch limit of the background worker.
///
/// Each round the worker dequeues up to `max(this, queued / 4)` transactions.
const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
42
43/// Payload from queue to worker.
44struct WorkerPayload<Api: ChainApi> {
45	at: BlockHash<Api>,
46	transactions: Vec<ExtrinsicHash<Api>>,
47}
48
49/// Async revalidation worker.
50///
51/// Implements future and can be spawned in place or in background.
52struct RevalidationWorker<Api: ChainApi> {
53	api: Arc<Api>,
54	pool: Arc<Pool<Api>>,
55	best_block: BlockHash<Api>,
56	block_ordered: BTreeMap<BlockHash<Api>, HashSet<ExtrinsicHash<Api>>>,
57	members: HashMap<ExtrinsicHash<Api>, BlockHash<Api>>,
58}
59
60impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
61
62/// Revalidate batch of transaction.
63///
64/// Each transaction is validated  against chain, and invalid are
65/// removed from the `pool`, while valid are resubmitted.
66async fn batch_revalidate<Api: ChainApi>(
67	pool: Arc<Pool<Api>>,
68	api: Arc<Api>,
69	at: BlockHash<Api>,
70	batch: impl IntoIterator<Item = ExtrinsicHash<Api>>,
71) {
72	// This conversion should work. Otherwise, for unknown block the revalidation shall be skipped,
73	// all the transactions will be kept in the validated pool, and can be scheduled for
74	// revalidation with the next request.
75	let block_number = match api.block_id_to_number(&BlockId::Hash(at)) {
76		Ok(Some(n)) => n,
77		Ok(None) => {
78			log::debug!(target: LOG_TARGET, "revalidation skipped at block {at:?}, could not get block number.");
79			return
80		},
81		Err(e) => {
82			log::debug!(target: LOG_TARGET, "revalidation skipped at block {at:?}: {e:?}.");
83			return
84		},
85	};
86
87	let mut invalid_hashes = Vec::new();
88	let mut revalidated = HashMap::new();
89
90	let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| {
91		pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
92			api.validate_transaction(at, ext.source, ext.data.clone())
93				.map(move |validation_result| (validation_result, ext_hash, ext))
94		})
95	}))
96	.await;
97
98	for (validation_result, ext_hash, ext) in validation_results {
99		match validation_result {
100			Ok(Err(TransactionValidityError::Invalid(err))) => {
101				log::debug!(
102					target: LOG_TARGET,
103					"[{:?}]: Revalidation: invalid {:?}",
104					ext_hash,
105					err,
106				);
107				invalid_hashes.push(ext_hash);
108			},
109			Ok(Err(TransactionValidityError::Unknown(err))) => {
110				// skipping unknown, they might be pushed by valid or invalid transaction
111				// when latter resubmitted.
112				log::trace!(
113					target: LOG_TARGET,
114					"[{:?}]: Unknown during revalidation: {:?}",
115					ext_hash,
116					err,
117				);
118			},
119			Ok(Ok(validity)) => {
120				revalidated.insert(
121					ext_hash,
122					ValidatedTransaction::valid_at(
123						block_number.saturated_into::<u64>(),
124						ext_hash,
125						ext.source,
126						ext.data.clone(),
127						api.hash_and_length(&ext.data).1,
128						validity,
129					),
130				);
131			},
132			Err(validation_err) => {
133				log::debug!(
134					target: LOG_TARGET,
135					"[{:?}]: Removing due to error during revalidation: {}",
136					ext_hash,
137					validation_err
138				);
139				invalid_hashes.push(ext_hash);
140			},
141		}
142	}
143
144	pool.validated_pool().remove_invalid(&invalid_hashes);
145	if revalidated.len() > 0 {
146		pool.resubmit(revalidated);
147	}
148}
149
150impl<Api: ChainApi> RevalidationWorker<Api> {
151	fn new(api: Arc<Api>, pool: Arc<Pool<Api>>, best_block: BlockHash<Api>) -> Self {
152		Self {
153			api,
154			pool,
155			best_block,
156			block_ordered: Default::default(),
157			members: Default::default(),
158		}
159	}
160
161	fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
162		let mut queued_exts = Vec::new();
163		let mut left =
164			std::cmp::max(MIN_BACKGROUND_REVALIDATION_BATCH_SIZE, self.members.len() / 4);
165
166		// Take maximum of count transaction by order
167		// which they got into the pool
168		while left > 0 {
169			let first_block = match self.block_ordered.keys().next().cloned() {
170				Some(bn) => bn,
171				None => break,
172			};
173			let mut block_drained = false;
174			if let Some(extrinsics) = self.block_ordered.get_mut(&first_block) {
175				let to_queue = extrinsics.iter().take(left).cloned().collect::<Vec<_>>();
176				if to_queue.len() == extrinsics.len() {
177					block_drained = true;
178				} else {
179					for xt in &to_queue {
180						extrinsics.remove(xt);
181					}
182				}
183				left -= to_queue.len();
184				queued_exts.extend(to_queue);
185			}
186
187			if block_drained {
188				self.block_ordered.remove(&first_block);
189			}
190		}
191
192		for hash in queued_exts.iter() {
193			self.members.remove(hash);
194		}
195
196		queued_exts
197	}
198
199	fn len(&self) -> usize {
200		self.block_ordered.iter().map(|b| b.1.len()).sum()
201	}
202
203	fn push(&mut self, worker_payload: WorkerPayload<Api>) {
204		// we don't add something that already scheduled for revalidation
205		let transactions = worker_payload.transactions;
206		let block_number = worker_payload.at;
207
208		for ext_hash in transactions {
209			// we don't add something that already scheduled for revalidation
210			if self.members.contains_key(&ext_hash) {
211				log::trace!(
212					target: LOG_TARGET,
213					"[{:?}] Skipped adding for revalidation: Already there.",
214					ext_hash,
215				);
216
217				continue
218			}
219
220			self.block_ordered
221				.entry(block_number)
222				.and_modify(|value| {
223					value.insert(ext_hash);
224				})
225				.or_insert_with(|| {
226					let mut bt = HashSet::new();
227					bt.insert(ext_hash);
228					bt
229				});
230			self.members.insert(ext_hash, block_number);
231		}
232	}
233
234	/// Background worker main loop.
235	///
236	/// It does two things: periodically tries to process some transactions
237	/// from the queue and also accepts messages to enqueue some more
238	/// transactions from the pool.
239	pub async fn run(
240		mut self,
241		from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
242		interval: Duration,
243	) {
244		let interval_fut = futures_timer::Delay::new(interval);
245		let from_queue = from_queue.fuse();
246		futures::pin_mut!(interval_fut, from_queue);
247		let this = &mut self;
248
249		loop {
250			futures::select! {
251				// Using `fuse()` in here is okay, because we reset the interval when it has fired.
252				_ = (&mut interval_fut).fuse() => {
253					let next_batch = this.prepare_batch();
254					let batch_len = next_batch.len();
255
256					batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
257
258					if batch_len > 0 || this.len() > 0 {
259						log::debug!(
260							target: LOG_TARGET,
261							"Revalidated {} transactions. Left in the queue for revalidation: {}.",
262							batch_len,
263							this.len(),
264						);
265					}
266
267					interval_fut.reset(interval);
268				},
269				workload = from_queue.next() => {
270					match workload {
271						Some(worker_payload) => {
272							this.best_block = worker_payload.at;
273							this.push(worker_payload);
274
275							if this.members.len() > 0 {
276								log::debug!(
277									target: LOG_TARGET,
278									"Updated revalidation queue at {:?}. Transactions: {:?}",
279									this.best_block,
280									this.members,
281								);
282							}
283
284							continue;
285						},
286						// R.I.P. worker!
287						None => break,
288					}
289				}
290			}
291		}
292	}
293}
294
295/// Revalidation queue.
296///
297/// Can be configured background (`new_background`)
298/// or immediate (just `new`).
299pub struct RevalidationQueue<Api: ChainApi> {
300	pool: Arc<Pool<Api>>,
301	api: Arc<Api>,
302	background: Option<TracingUnboundedSender<WorkerPayload<Api>>>,
303}
304
305impl<Api: ChainApi> RevalidationQueue<Api>
306where
307	Api: 'static,
308{
309	/// New revalidation queue without background worker.
310	pub fn new(api: Arc<Api>, pool: Arc<Pool<Api>>) -> Self {
311		Self { api, pool, background: None }
312	}
313
314	/// New revalidation queue with background worker.
315	pub fn new_with_interval(
316		api: Arc<Api>,
317		pool: Arc<Pool<Api>>,
318		interval: Duration,
319		best_block: BlockHash<Api>,
320	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
321		let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
322
323		let worker = RevalidationWorker::new(api.clone(), pool.clone(), best_block);
324
325		let queue = Self { api, pool, background: Some(to_worker) };
326
327		(queue, worker.run(from_queue, interval).boxed())
328	}
329
330	/// New revalidation queue with background worker.
331	pub fn new_background(
332		api: Arc<Api>,
333		pool: Arc<Pool<Api>>,
334		best_block: BlockHash<Api>,
335	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
336		Self::new_with_interval(api, pool, BACKGROUND_REVALIDATION_INTERVAL, best_block)
337	}
338
339	/// Queue some transaction for later revalidation.
340	///
341	/// If queue configured with background worker, this will return immediately.
342	/// If queue configured without background worker, this will resolve after
343	/// revalidation is actually done.
344	pub async fn revalidate_later(
345		&self,
346		at: BlockHash<Api>,
347		transactions: Vec<ExtrinsicHash<Api>>,
348	) {
349		if transactions.len() > 0 {
350			log::debug!(
351				target: LOG_TARGET,
352				"Sent {} transactions to revalidation queue",
353				transactions.len(),
354			);
355		}
356
357		if let Some(ref to_worker) = self.background {
358			if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
359				log::warn!(target: LOG_TARGET, "Failed to update background worker: {:?}", e);
360			}
361		} else {
362			let pool = self.pool.clone();
363			let api = self.api.clone();
364			batch_revalidate(pool, api, at, transactions).await
365		}
366	}
367}
368
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		graph::Pool,
		tests::{uxt, TestApi},
	};
	use futures::executor::block_on;
	use sc_transaction_pool_api::TransactionSource;
	use substrate_test_runtime::{AccountId, Transfer, H256};
	use substrate_test_runtime_client::AccountKeyring::{Alice, Bob};

	/// A queue without background worker revalidates right away and keeps a transaction
	/// that is still valid in the ready set.
	#[test]
	fn revalidation_queue_works() {
		let api = Arc::new(TestApi::default());
		let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
		let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

		let transfer = Transfer {
			from: Alice.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		};
		let xt = uxt(transfer);

		let block0_hash = api.expect_hash_from_number(0);

		let submitted =
			block_on(pool.submit_one(block0_hash, TransactionSource::External, xt.clone()));
		let xt_hash = submitted.expect("Should be valid");

		block_on(queue.revalidate_later(block0_hash, vec![xt_hash]));

		// one validation on submission plus one from the synchronous revalidation
		assert_eq!(api.validation_requests().len(), 2);
		// the transaction is still ready
		assert_eq!(pool.validated_pool().status().ready, 1);
	}

	/// Revalidation at a block hash the API does not know must neither validate anything
	/// nor change the ready set.
	#[test]
	fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
		let api = Arc::new(TestApi::default());
		let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
		let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

		let from_alice = uxt(Transfer {
			from: Alice.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		});
		let from_bob = uxt(Transfer {
			from: Bob.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 4,
			nonce: 1,
		});

		let block0_hash = api.expect_hash_from_number(0);
		let unknown_block = H256::repeat_byte(0x13);

		let submitted = block_on(pool.submit_at(
			block0_hash,
			TransactionSource::External,
			vec![from_alice, from_bob],
		))
		.expect("Should be valid");
		let xt_hashes: Vec<_> =
			submitted.into_iter().map(|r| r.expect("Should be valid")).collect();

		assert_eq!(api.validation_requests().len(), 2);
		assert_eq!(pool.validated_pool().status().ready, 2);

		// revalidation works fine for block 0:
		block_on(queue.revalidate_later(block0_hash, xt_hashes.clone()));
		assert_eq!(api.validation_requests().len(), 4);
		assert_eq!(pool.validated_pool().status().ready, 2);

		// revalidation shall be skipped for unknown block:
		block_on(queue.revalidate_later(unknown_block, xt_hashes));
		// no revalidation shall be done
		assert_eq!(api.validation_requests().len(), 4);
		// number of ready shall not change
		assert_eq!(pool.validated_pool().status().ready, 2);
	}
}