1use std::{
20 collections::{HashMap, HashSet},
21 hash,
22 sync::Arc,
23};
24
25use crate::LOG_TARGET;
26use futures::channel::mpsc::{channel, Sender};
27use parking_lot::{Mutex, RwLock};
28use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
29use serde::Serialize;
30use sp_runtime::{
31 generic::BlockId,
32 traits::{self, SaturatedConversion},
33 transaction_validity::{TransactionSource, TransactionTag as Tag, ValidTransaction},
34};
35use std::time::Instant;
36
37use super::{
38 base_pool::{self as base, PruneStatus},
39 listener::Listener,
40 pool::{
41 BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
42 },
43 rotator::PoolRotator,
44 watcher::Watcher,
45};
46
47#[derive(Debug)]
49pub enum ValidatedTransaction<Hash, Ex, Error> {
50 Valid(base::Transaction<Hash, Ex>),
52 Invalid(Hash, Error),
54 Unknown(Hash, Error),
58}
59
60impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
61 pub fn valid_at(
63 at: u64,
64 hash: Hash,
65 source: TransactionSource,
66 data: Ex,
67 bytes: usize,
68 validity: ValidTransaction,
69 ) -> Self {
70 Self::Valid(base::Transaction {
71 data,
72 bytes,
73 hash,
74 source,
75 priority: validity.priority,
76 requires: validity.requires,
77 provides: validity.provides,
78 propagate: validity.propagate,
79 valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
80 })
81 }
82}
83
84pub type ValidatedTransactionFor<B> =
86 ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
87
/// Boxed, thread-safe predicate (`Fn() -> bool + Send + Sync`).
///
/// The pool calls it when deciding whether a transaction with `propagate == false` may be
/// imported.
pub struct IsValidator(Box<dyn Fn() -> bool + Send + Sync>);

/// Builds a predicate that always answers with the given constant.
impl From<bool> for IsValidator {
    fn from(is_validator: bool) -> Self {
        let constant: Box<dyn Fn() -> bool + Send + Sync> = Box::new(move || is_validator);
        IsValidator(constant)
    }
}

/// Wraps an already boxed predicate as is.
impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
    fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
        IsValidator(is_validator)
    }
}
102
103pub struct ValidatedPool<B: ChainApi> {
105 api: Arc<B>,
106 is_validator: IsValidator,
107 options: Options,
108 listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
109 pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
110 import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
111 rotator: PoolRotator<ExtrinsicHash<B>>,
112}
113
114impl<B: ChainApi> ValidatedPool<B> {
115 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
117 let base_pool = base::BasePool::new(options.reject_future_transactions);
118 let ban_time = options.ban_time;
119 Self {
120 is_validator,
121 options,
122 listener: Default::default(),
123 api,
124 pool: RwLock::new(base_pool),
125 import_notification_sinks: Default::default(),
126 rotator: PoolRotator::new(ban_time),
127 }
128 }
129
130 pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item = ExtrinsicHash<B>>) {
132 self.rotator.ban(now, hashes)
133 }
134
135 pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
137 self.rotator.is_banned(hash)
138 }
139
140 pub fn check_is_known(
146 &self,
147 tx_hash: &ExtrinsicHash<B>,
148 ignore_banned: bool,
149 ) -> Result<(), B::Error> {
150 if !ignore_banned && self.is_banned(tx_hash) {
151 Err(error::Error::TemporarilyBanned.into())
152 } else if self.pool.read().is_imported(tx_hash) {
153 Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
154 } else {
155 Ok(())
156 }
157 }
158
159 pub fn submit(
161 &self,
162 txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
163 ) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
164 let results = txs
165 .into_iter()
166 .map(|validated_tx| self.submit_one(validated_tx))
167 .collect::<Vec<_>>();
168
169 let removed = if results.iter().any(|res| res.is_ok()) {
171 self.enforce_limits()
172 } else {
173 Default::default()
174 };
175
176 results
177 .into_iter()
178 .map(|res| match res {
179 Ok(ref hash) if removed.contains(hash) =>
180 Err(error::Error::ImmediatelyDropped.into()),
181 other => other,
182 })
183 .collect()
184 }
185
186 fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
188 match tx {
189 ValidatedTransaction::Valid(tx) => {
190 if !tx.propagate && !(self.is_validator.0)() {
191 return Err(error::Error::Unactionable.into())
192 }
193
194 let imported = self.pool.write().import(tx)?;
195
196 if let base::Imported::Ready { ref hash, .. } = imported {
197 let sinks = &mut self.import_notification_sinks.lock();
198 sinks.retain_mut(|sink| match sink.try_send(*hash) {
199 Ok(()) => true,
200 Err(e) =>
201 if e.is_full() {
202 log::warn!(
203 target: LOG_TARGET,
204 "[{:?}] Trying to notify an import but the channel is full",
205 hash,
206 );
207 true
208 } else {
209 false
210 },
211 });
212 }
213
214 let mut listener = self.listener.write();
215 fire_events(&mut *listener, &imported);
216 Ok(*imported.hash())
217 },
218 ValidatedTransaction::Invalid(hash, err) => {
219 self.rotator.ban(&Instant::now(), std::iter::once(hash));
220 Err(err)
221 },
222 ValidatedTransaction::Unknown(hash, err) => {
223 self.listener.write().invalid(&hash);
224 Err(err)
225 },
226 }
227 }
228
229 fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
230 let status = self.pool.read().status();
231 let ready_limit = &self.options.ready;
232 let future_limit = &self.options.future;
233
234 log::debug!(target: LOG_TARGET, "Pool Status: {:?}", status);
235 if ready_limit.is_exceeded(status.ready, status.ready_bytes) ||
236 future_limit.is_exceeded(status.future, status.future_bytes)
237 {
238 log::debug!(
239 target: LOG_TARGET,
240 "Enforcing limits ({}/{}kB ready, {}/{}kB future",
241 ready_limit.count,
242 ready_limit.total_bytes / 1024,
243 future_limit.count,
244 future_limit.total_bytes / 1024,
245 );
246
247 let removed = {
249 let mut pool = self.pool.write();
250 let removed = pool
251 .enforce_limits(ready_limit, future_limit)
252 .into_iter()
253 .map(|x| x.hash)
254 .collect::<HashSet<_>>();
255 self.rotator.ban(&Instant::now(), removed.iter().copied());
257 removed
258 };
259 if !removed.is_empty() {
260 log::debug!(target: LOG_TARGET, "Enforcing limits: {} dropped", removed.len());
261 }
262
263 let mut listener = self.listener.write();
265 for h in &removed {
266 listener.dropped(h, None);
267 }
268
269 removed
270 } else {
271 Default::default()
272 }
273 }
274
275 pub fn submit_and_watch(
277 &self,
278 tx: ValidatedTransactionFor<B>,
279 ) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
280 match tx {
281 ValidatedTransaction::Valid(tx) => {
282 let hash = self.api.hash_and_length(&tx.data).0;
283 let watcher = self.listener.write().create_watcher(hash);
284 self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
285 .pop()
286 .expect("One extrinsic passed; one result returned; qed")
287 .map(|_| watcher)
288 },
289 ValidatedTransaction::Invalid(hash, err) => {
290 self.rotator.ban(&Instant::now(), std::iter::once(hash));
291 Err(err)
292 },
293 ValidatedTransaction::Unknown(_, err) => Err(err),
294 }
295 }
296
297 pub fn resubmit(
302 &self,
303 mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
304 ) {
305 #[derive(Debug, Clone, Copy, PartialEq)]
306 enum Status {
307 Future,
308 Ready,
309 Failed,
310 Dropped,
311 }
312
313 let (mut initial_statuses, final_statuses) = {
314 let mut pool = self.pool.write();
315
316 let mut initial_statuses = HashMap::new();
324 let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
325 while !updated_transactions.is_empty() {
326 let hash = updated_transactions
327 .keys()
328 .next()
329 .cloned()
330 .expect("transactions is not empty; qed");
331
332 let removed = pool.remove_subtree(&[hash]);
336 for removed_tx in removed {
337 let removed_hash = removed_tx.hash;
338 let updated_transaction = updated_transactions.remove(&removed_hash);
339 let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
340 updated_tx
341 } else {
342 let transaction = match Arc::try_unwrap(removed_tx) {
345 Ok(transaction) => transaction,
346 Err(transaction) => transaction.duplicate(),
347 };
348 ValidatedTransaction::Valid(transaction)
349 };
350
351 initial_statuses.insert(removed_hash, Status::Ready);
352 txs_to_resubmit.push((removed_hash, tx_to_resubmit));
353 }
354 updated_transactions.remove(&hash);
356 }
357
358 pool.with_futures_enabled(|pool, reject_future_transactions| {
363 let mut final_statuses = HashMap::new();
365 for (hash, tx_to_resubmit) in txs_to_resubmit {
366 match tx_to_resubmit {
367 ValidatedTransaction::Valid(tx) => match pool.import(tx) {
368 Ok(imported) => match imported {
369 base::Imported::Ready { promoted, failed, removed, .. } => {
370 final_statuses.insert(hash, Status::Ready);
371 for hash in promoted {
372 final_statuses.insert(hash, Status::Ready);
373 }
374 for hash in failed {
375 final_statuses.insert(hash, Status::Failed);
376 }
377 for tx in removed {
378 final_statuses.insert(tx.hash, Status::Dropped);
379 }
380 },
381 base::Imported::Future { .. } => {
382 final_statuses.insert(hash, Status::Future);
383 },
384 },
385 Err(err) => {
386 log::warn!(
391 target: LOG_TARGET,
392 "[{:?}] Removing invalid transaction from update: {}",
393 hash,
394 err,
395 );
396 final_statuses.insert(hash, Status::Failed);
397 },
398 },
399 ValidatedTransaction::Invalid(_, _) |
400 ValidatedTransaction::Unknown(_, _) => {
401 final_statuses.insert(hash, Status::Failed);
402 },
403 }
404 }
405
406 if reject_future_transactions {
409 for future_tx in pool.clear_future() {
410 final_statuses.insert(future_tx.hash, Status::Dropped);
411 }
412 }
413
414 (initial_statuses, final_statuses)
415 })
416 };
417
418 let mut listener = self.listener.write();
420 for (hash, final_status) in final_statuses {
421 let initial_status = initial_statuses.remove(&hash);
422 if initial_status.is_none() || Some(final_status) != initial_status {
423 match final_status {
424 Status::Future => listener.future(&hash),
425 Status::Ready => listener.ready(&hash, None),
426 Status::Dropped => listener.dropped(&hash, None),
427 Status::Failed => listener.invalid(&hash),
428 }
429 }
430 }
431 }
432
433 pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
435 self.pool
436 .read()
437 .by_hashes(hashes)
438 .into_iter()
439 .map(|existing_in_pool| {
440 existing_in_pool.map(|transaction| transaction.provides.to_vec())
441 })
442 .collect()
443 }
444
445 pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
447 self.pool.read().ready_by_hash(hash)
448 }
449
450 pub fn prune_tags(
452 &self,
453 tags: impl IntoIterator<Item = Tag>,
454 ) -> Result<PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>>, B::Error> {
455 let status = self.pool.write().prune_tags(tags);
457 {
460 let mut listener = self.listener.write();
461 for promoted in &status.promoted {
462 fire_events(&mut *listener, promoted);
463 }
464 for f in &status.failed {
465 listener.dropped(f, None);
466 }
467 }
468
469 Ok(status)
470 }
471
472 pub fn resubmit_pruned(
474 &self,
475 at: &BlockId<B::Block>,
476 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
477 pruned_hashes: Vec<ExtrinsicHash<B>>,
478 pruned_xts: Vec<ValidatedTransactionFor<B>>,
479 ) -> Result<(), B::Error> {
480 debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
481
482 let results = self.submit(pruned_xts);
484
485 let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| {
488 match r.map_err(error::IntoPoolError::into_pool_error) {
489 Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
490 _ => None,
491 }
492 });
493 let hashes = hashes.chain(known_imported_hashes.into_iter());
496 self.fire_pruned(at, hashes)?;
497
498 self.clear_stale(at)?;
501 Ok(())
502 }
503
504 pub fn fire_pruned(
506 &self,
507 at: &BlockId<B::Block>,
508 hashes: impl Iterator<Item = ExtrinsicHash<B>>,
509 ) -> Result<(), B::Error> {
510 let header_hash = self
511 .api
512 .block_id_to_hash(at)?
513 .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?;
514 let mut listener = self.listener.write();
515 let mut set = HashSet::with_capacity(hashes.size_hint().0);
516 for h in hashes {
517 if !set.contains(&h) {
520 listener.pruned(header_hash, &h);
521 set.insert(h);
522 }
523 }
524 Ok(())
525 }
526
527 pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
533 let block_number = self
534 .api
535 .block_id_to_number(at)?
536 .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?
537 .saturated_into::<u64>();
538 let now = Instant::now();
539 let to_remove = {
540 self.ready()
541 .filter(|tx| self.rotator.ban_if_stale(&now, block_number, tx))
542 .map(|tx| tx.hash)
543 .collect::<Vec<_>>()
544 };
545 let futures_to_remove: Vec<ExtrinsicHash<B>> = {
546 let p = self.pool.read();
547 let mut hashes = Vec::new();
548 for tx in p.futures() {
549 if self.rotator.ban_if_stale(&now, block_number, tx) {
550 hashes.push(tx.hash);
551 }
552 }
553 hashes
554 };
555 self.remove_invalid(&to_remove);
557 self.remove_invalid(&futures_to_remove);
558 self.rotator.clear_timeouts(&now);
560
561 Ok(())
562 }
563
564 pub fn api(&self) -> &B {
566 &self.api
567 }
568
569 pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
574 const CHANNEL_BUFFER_SIZE: usize = 1024;
575
576 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
577 self.import_notification_sinks.lock().push(sink);
578 stream
579 }
580
581 pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
583 let mut listener = self.listener.write();
584 for (hash, peers) in propagated.into_iter() {
585 listener.broadcasted(&hash, peers);
586 }
587 }
588
589 pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
596 if hashes.is_empty() {
598 return vec![]
599 }
600
601 log::debug!(target: LOG_TARGET, "Removing invalid transactions: {:?}", hashes);
602
603 self.rotator.ban(&Instant::now(), hashes.iter().cloned());
605
606 let invalid = self.pool.write().remove_subtree(hashes);
607
608 log::debug!(target: LOG_TARGET, "Removed invalid transactions: {:?}", invalid);
609
610 let mut listener = self.listener.write();
611 for tx in &invalid {
612 listener.invalid(&tx.hash);
613 }
614
615 invalid
616 }
617
618 pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
620 self.pool.read().ready()
621 }
622
623 pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
625 self.pool.read().futures().map(|tx| (tx.hash, tx.data.clone())).collect()
626 }
627
628 pub fn status(&self) -> PoolStatus {
630 self.pool.read().status()
631 }
632
633 pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
635 log::trace!(
636 target: LOG_TARGET,
637 "Attempting to notify watchers of finalization for {}",
638 block_hash,
639 );
640 self.listener.write().finalized(block_hash);
641 Ok(())
642 }
643
644 pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
646 self.listener.write().retracted(block_hash)
647 }
648}
649
650fn fire_events<H, B, Ex>(listener: &mut Listener<H, B>, imported: &base::Imported<H, Ex>)
651where
652 H: hash::Hash + Eq + traits::Member + Serialize,
653 B: ChainApi,
654{
655 match *imported {
656 base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
657 listener.ready(hash, None);
658 failed.iter().for_each(|f| listener.invalid(f));
659 removed.iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
660 promoted.iter().for_each(|p| listener.ready(p, None));
661 },
662 base::Imported::Future { ref hash } => listener.future(hash),
663 }
664}