referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
revalidation_worker.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! The background worker for the [`View`] and [`TxMemPool`] revalidation.
20//!
21//! The [*Background tasks*](../index.html#background-tasks) section provides some extra details on
22//! the revalidation process.
23
24use std::{marker::PhantomData, pin::Pin, sync::Arc};
25
26use crate::{graph::ChainApi, LOG_TARGET};
27use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28use sp_blockchain::HashAndNumber;
29use sp_runtime::traits::Block as BlockT;
30
31use super::{tx_mem_pool::TxMemPool, view_store::ViewStore};
32use futures::prelude::*;
33use tracing::{debug, warn};
34
35use super::view::{FinishRevalidationWorkerChannels, View};
36
37/// Revalidation request payload sent from the queue to the worker.
38enum WorkerPayload<Api, Block>
39where
40	Block: BlockT,
41	Api: ChainApi<Block = Block> + 'static,
42{
43	/// Request to revalidated the given instance of the [`View`]
44	///
45	/// Communication channels with maintain thread are also provided.
46	RevalidateView(Arc<View<Api>>, FinishRevalidationWorkerChannels<Api>),
47	/// Request to revalidated the given instance of the [`TxMemPool`] at provided block hash.
48	RevalidateMempool(Arc<TxMemPool<Api, Block>>, Arc<ViewStore<Api, Block>>, HashAndNumber<Block>),
49}
50
51/// The background revalidation worker.
52struct RevalidationWorker<Block: BlockT> {
53	_phantom: PhantomData<Block>,
54}
55
56impl<Block> RevalidationWorker<Block>
57where
58	Block: BlockT,
59	<Block as BlockT>::Hash: Unpin,
60{
61	/// Create a new instance of the background worker.
62	fn new() -> Self {
63		Self { _phantom: Default::default() }
64	}
65
66	/// A background worker main loop.
67	///
68	/// Waits for and dispatches the [`WorkerPayload`] messages sent from the
69	/// [`RevalidationQueue`].
70	pub async fn run<Api: ChainApi<Block = Block> + 'static>(
71		self,
72		from_queue: TracingUnboundedReceiver<WorkerPayload<Api, Block>>,
73	) {
74		let mut from_queue = from_queue.fuse();
75
76		loop {
77			let Some(payload) = from_queue.next().await else {
78				// R.I.P. worker!
79				break;
80			};
81			match payload {
82				WorkerPayload::RevalidateView(view, worker_channels) =>
83					view.revalidate(worker_channels).await,
84				WorkerPayload::RevalidateMempool(
85					mempool,
86					view_store,
87					finalized_hash_and_number,
88				) => mempool.revalidate(view_store, finalized_hash_and_number).await,
89			};
90		}
91	}
92}
93
94/// A Revalidation queue.
95///
96/// Allows to send the revalidation requests to the [`RevalidationWorker`].
97pub struct RevalidationQueue<Api, Block>
98where
99	Api: ChainApi<Block = Block> + 'static,
100	Block: BlockT,
101{
102	background: Option<TracingUnboundedSender<WorkerPayload<Api, Block>>>,
103}
104
105impl<Api, Block> RevalidationQueue<Api, Block>
106where
107	Api: ChainApi<Block = Block> + 'static,
108	Block: BlockT,
109	<Block as BlockT>::Hash: Unpin,
110{
111	/// New revalidation queue without background worker.
112	///
113	/// All validation requests will be blocking.
114	pub fn new() -> Self {
115		Self { background: None }
116	}
117
118	/// New revalidation queue with background worker.
119	///
120	/// All validation requests will be executed in the background.
121	pub fn new_with_worker() -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
122		let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
123		(Self { background: Some(to_worker) }, RevalidationWorker::new().run(from_queue).boxed())
124	}
125
126	/// Queue the view for later revalidation.
127	///
128	/// If the queue is configured with background worker, this will return immediately.
129	/// If the queue is configured without background worker, this will resolve after
130	/// revalidation is actually done.
131	///
132	/// Schedules execution of the [`View::revalidate`].
133	pub async fn revalidate_view(
134		&self,
135		view: Arc<View<Api>>,
136		finish_revalidation_worker_channels: FinishRevalidationWorkerChannels<Api>,
137	) {
138		debug!(
139			target: LOG_TARGET,
140			view_at_hash = ?view.at.hash,
141			"revalidation_queue::revalidate_view: Sending view to revalidation queue"
142		);
143
144		if let Some(ref to_worker) = self.background {
145			if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateView(
146				view,
147				finish_revalidation_worker_channels,
148			)) {
149				warn!(
150					target: LOG_TARGET,
151					?error,
152					"revalidation_queue::revalidate_view: Failed to update background worker"
153				);
154			}
155		} else {
156			view.revalidate(finish_revalidation_worker_channels).await
157		}
158	}
159
160	/// Revalidates the given mempool instance.
161	///
162	/// If queue configured with background worker, this will return immediately.
163	/// If queue configured without background worker, this will resolve after
164	/// revalidation is actually done.
165	///
166	/// Schedules execution of the [`TxMemPool::revalidate`].
167	pub async fn revalidate_mempool(
168		&self,
169		mempool: Arc<TxMemPool<Api, Block>>,
170		view_store: Arc<ViewStore<Api, Block>>,
171		finalized_hash: HashAndNumber<Block>,
172	) {
173		debug!(
174			target: LOG_TARGET,
175			?finalized_hash,
176			"Sent mempool to revalidation queue"
177		);
178
179		if let Some(ref to_worker) = self.background {
180			if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateMempool(
181				mempool,
182				view_store,
183				finalized_hash,
184			)) {
185				warn!(
186					target: LOG_TARGET,
187					?error,
188					"Failed to update background worker"
189				);
190			}
191		} else {
192			mempool.revalidate(view_store, finalized_hash).await
193		}
194	}
195}
196
#[cfg(test)]
//todo: add more tests [#5480]
mod tests {
	use super::*;
	use crate::{
		common::tests::{uxt, TestApi},
		fork_aware_txpool::view::FinishRevalidationLocalChannels,
		TimedTransactionSource, ValidateTransactionPriority,
	};
	use futures::executor::block_on;
	use substrate_test_runtime::{AccountId, Transfer, H256};
	use substrate_test_runtime_client::Sr25519Keyring::Alice;

	/// Submits a single transfer to a view, revalidates that view through a queue created
	/// without a background worker, and checks that exactly one additional validation request
	/// was issued while the transaction is still reported as ready.
	#[test]
	fn revalidation_queue_works() {
		let api = Arc::new(TestApi::default());
		let genesis = api.expect_hash_and_number(0);

		let view = Arc::new(
			View::new(api.clone(), genesis, Default::default(), Default::default(), false.into()).0,
		);
		// No worker: `revalidate_view` below resolves only after the revalidation is done.
		let queue = Arc::new(RevalidationQueue::new());

		let transfer = uxt(Transfer {
			from: Alice.into(),
			to: AccountId::from_h256(H256::from_low_u64_be(2)),
			amount: 5,
			nonce: 0,
		});

		let submission =
			std::iter::once((TimedTransactionSource::new_external(false), transfer.clone().into()));
		let _ = block_on(view.submit_many(submission, ValidateTransactionPriority::Submitted));
		assert_eq!(api.validation_requests().len(), 1);

		let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
		let (result_tx, result_rx) = tokio::sync::mpsc::channel(1);

		let worker_channels = FinishRevalidationWorkerChannels::new(request_rx, result_tx);
		// Bound to a named variable so the local ends of both channels stay alive for the whole
		// test.
		let _local_channels = FinishRevalidationLocalChannels::new(request_tx, result_rx);

		block_on(queue.revalidate_view(view.clone(), worker_channels));

		assert_eq!(api.validation_requests().len(), 2);
		// number of ready
		assert_eq!(view.status().ready, 1);
	}
}
252}