sc_transaction_pool/fork_aware_txpool/
revalidation_worker.rs1use std::{marker::PhantomData, pin::Pin, sync::Arc};
25
26use crate::{graph::ChainApi, LOG_TARGET};
27use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28use sp_blockchain::HashAndNumber;
29use sp_runtime::traits::Block as BlockT;
30
31use super::{tx_mem_pool::TxMemPool, view_store::ViewStore};
32use futures::prelude::*;
33use tracing::{debug, warn};
34
35use super::view::{FinishRevalidationWorkerChannels, View};
36
37enum WorkerPayload<Api, Block>
39where
40 Block: BlockT,
41 Api: ChainApi<Block = Block> + 'static,
42{
43 RevalidateView(Arc<View<Api>>, FinishRevalidationWorkerChannels<Api>),
47 RevalidateMempool(Arc<TxMemPool<Api, Block>>, Arc<ViewStore<Api, Block>>, HashAndNumber<Block>),
49}
50
51struct RevalidationWorker<Block: BlockT> {
53 _phantom: PhantomData<Block>,
54}
55
56impl<Block> RevalidationWorker<Block>
57where
58 Block: BlockT,
59 <Block as BlockT>::Hash: Unpin,
60{
61 fn new() -> Self {
63 Self { _phantom: Default::default() }
64 }
65
66 pub async fn run<Api: ChainApi<Block = Block> + 'static>(
71 self,
72 from_queue: TracingUnboundedReceiver<WorkerPayload<Api, Block>>,
73 ) {
74 let mut from_queue = from_queue.fuse();
75
76 loop {
77 let Some(payload) = from_queue.next().await else {
78 break;
80 };
81 match payload {
82 WorkerPayload::RevalidateView(view, worker_channels) =>
83 view.revalidate(worker_channels).await,
84 WorkerPayload::RevalidateMempool(
85 mempool,
86 view_store,
87 finalized_hash_and_number,
88 ) => mempool.revalidate(view_store, finalized_hash_and_number).await,
89 };
90 }
91 }
92}
93
94pub struct RevalidationQueue<Api, Block>
98where
99 Api: ChainApi<Block = Block> + 'static,
100 Block: BlockT,
101{
102 background: Option<TracingUnboundedSender<WorkerPayload<Api, Block>>>,
103}
104
105impl<Api, Block> RevalidationQueue<Api, Block>
106where
107 Api: ChainApi<Block = Block> + 'static,
108 Block: BlockT,
109 <Block as BlockT>::Hash: Unpin,
110{
111 pub fn new() -> Self {
115 Self { background: None }
116 }
117
118 pub fn new_with_worker() -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
122 let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);
123 (Self { background: Some(to_worker) }, RevalidationWorker::new().run(from_queue).boxed())
124 }
125
126 pub async fn revalidate_view(
134 &self,
135 view: Arc<View<Api>>,
136 finish_revalidation_worker_channels: FinishRevalidationWorkerChannels<Api>,
137 ) {
138 debug!(
139 target: LOG_TARGET,
140 view_at_hash = ?view.at.hash,
141 "revalidation_queue::revalidate_view: Sending view to revalidation queue"
142 );
143
144 if let Some(ref to_worker) = self.background {
145 if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateView(
146 view,
147 finish_revalidation_worker_channels,
148 )) {
149 warn!(
150 target: LOG_TARGET,
151 ?error,
152 "revalidation_queue::revalidate_view: Failed to update background worker"
153 );
154 }
155 } else {
156 view.revalidate(finish_revalidation_worker_channels).await
157 }
158 }
159
160 pub async fn revalidate_mempool(
168 &self,
169 mempool: Arc<TxMemPool<Api, Block>>,
170 view_store: Arc<ViewStore<Api, Block>>,
171 finalized_hash: HashAndNumber<Block>,
172 ) {
173 debug!(
174 target: LOG_TARGET,
175 ?finalized_hash,
176 "Sent mempool to revalidation queue"
177 );
178
179 if let Some(ref to_worker) = self.background {
180 if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateMempool(
181 mempool,
182 view_store,
183 finalized_hash,
184 )) {
185 warn!(
186 target: LOG_TARGET,
187 ?error,
188 "Failed to update background worker"
189 );
190 }
191 } else {
192 mempool.revalidate(view_store, finalized_hash).await
193 }
194 }
195}
196
197#[cfg(test)]
198mod tests {
200 use super::*;
201 use crate::{
202 common::tests::{uxt, TestApi},
203 fork_aware_txpool::view::FinishRevalidationLocalChannels,
204 TimedTransactionSource, ValidateTransactionPriority,
205 };
206 use futures::executor::block_on;
207 use substrate_test_runtime::{AccountId, Transfer, H256};
208 use substrate_test_runtime_client::Sr25519Keyring::Alice;
209 #[test]
210 fn revalidation_queue_works() {
211 let api = Arc::new(TestApi::default());
212 let block0 = api.expect_hash_and_number(0);
213
214 let view = Arc::new(
215 View::new(api.clone(), block0, Default::default(), Default::default(), false.into()).0,
216 );
217 let queue = Arc::new(RevalidationQueue::new());
218
219 let uxt = uxt(Transfer {
220 from: Alice.into(),
221 to: AccountId::from_h256(H256::from_low_u64_be(2)),
222 amount: 5,
223 nonce: 0,
224 });
225
226 let _ = block_on(view.submit_many(
227 std::iter::once((TimedTransactionSource::new_external(false), uxt.clone().into())),
228 ValidateTransactionPriority::Submitted,
229 ));
230 assert_eq!(api.validation_requests().len(), 1);
231
232 let (finish_revalidation_request_tx, finish_revalidation_request_rx) =
233 tokio::sync::mpsc::channel(1);
234 let (revalidation_result_tx, revalidation_result_rx) = tokio::sync::mpsc::channel(1);
235
236 let finish_revalidation_worker_channels = FinishRevalidationWorkerChannels::new(
237 finish_revalidation_request_rx,
238 revalidation_result_tx,
239 );
240
241 let _finish_revalidation_local_channels = FinishRevalidationLocalChannels::new(
242 finish_revalidation_request_tx,
243 revalidation_result_rx,
244 );
245
246 block_on(queue.revalidate_view(view.clone(), finish_revalidation_worker_channels));
247
248 assert_eq!(api.validation_requests().len(), 2);
249 assert_eq!(view.status().ready, 1);
251 }
252}