sc_transaction_pool_api/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction pool client facing API.
20#![warn(missing_docs)]
21
22pub mod error;
23
24use async_trait::async_trait;
25use codec::Codec;
26use futures::{Future, Stream};
27use serde::{de::DeserializeOwned, Deserialize, Serialize};
28use sp_core::offchain::TransactionPoolExt;
29use sp_runtime::traits::{Block as BlockT, Member, NumberFor};
30use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
31
32const LOG_TARGET: &str = "txpool::api";
33
34pub use sp_runtime::transaction_validity::{
35	TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
36};
37
38/// Transaction pool status.
39#[derive(Debug)]
40pub struct PoolStatus {
41	/// Number of transactions in the ready queue.
42	pub ready: usize,
43	/// Sum of bytes of ready transaction encodings.
44	pub ready_bytes: usize,
45	/// Number of transactions in the future queue.
46	pub future: usize,
47	/// Sum of bytes of ready transaction encodings.
48	pub future_bytes: usize,
49}
50
51impl PoolStatus {
52	/// Returns true if the are no transactions in the pool.
53	pub fn is_empty(&self) -> bool {
54		self.ready == 0 && self.future == 0
55	}
56}
57
58/// Possible transaction status events.
59///
60/// This events are being emitted by `TransactionPool` watchers,
61/// which are also exposed over RPC.
62///
63/// The status events can be grouped based on their kinds as:
64/// 1. Entering/Moving within the pool:
65/// 		- [Future](TransactionStatus::Future)
66/// 		- [Ready](TransactionStatus::Ready)
67/// 2. Inside `Ready` queue:
68/// 		- [Broadcast](TransactionStatus::Broadcast)
69/// 3. Leaving the pool:
70/// 		- [InBlock](TransactionStatus::InBlock)
71/// 		- [Invalid](TransactionStatus::Invalid)
72/// 		- [Usurped](TransactionStatus::Usurped)
73/// 		- [Dropped](TransactionStatus::Dropped)
74/// 	4. Re-entering the pool:
75/// 		- [Retracted](TransactionStatus::Retracted)
76/// 	5. Block finalized:
77/// 		- [Finalized](TransactionStatus::Finalized)
78/// 		- [FinalityTimeout](TransactionStatus::FinalityTimeout)
79///
80/// Transactions are first placed in either the `Ready` or `Future` queues of the transaction pool.
81/// Substrate validates the transaction before it enters the pool.
82///
83/// A transaction is placed in the `Future` queue if it will become valid at a future time.
84/// For example, submitting a transaction with a higher account nonce than the current
85/// expected nonce will place the transaction in the `Future` queue.
86///
87/// The events will always be received in the order described above, however
88/// there might be cases where transactions alternate between `Future` and `Ready`
89/// pool, and are `Broadcast` in the meantime.
90///
91/// There is also only single event causing the transaction to leave the pool.
92/// I.e. only one of the listed ones should be triggered.
93///
94/// Note that there are conditions that may cause transactions to reappear in the pool.
95/// 1. Due to possible forks, the transaction that ends up being in included
96/// in one block, may later re-enter the pool or be marked as invalid.
97/// 2. Transaction `Dropped` at one point, may later re-enter the pool if some other
98/// transactions are removed. A `Dropped` transaction may re-enter the pool only if it is
99/// resubmitted.
100/// 3. `Invalid` transaction may become valid at some point in the future.
101/// (Note that runtimes are encouraged to use `UnknownValidity` to inform the pool about
102/// such case). An `Invalid` transaction may re-enter the pool only if it is resubmitted.
103/// 4. `Retracted` transactions might be included in some next block.
104///
105/// The `FinalityTimeout` event will be emitted when the block did not reach finality
106/// within 512 blocks. This either indicates that finality is not available for your chain,
107/// or that finality gadget is lagging behind. If you choose to wait for finality longer, you can
108/// re-subscribe for a particular transaction hash manually again.
109///
110/// ### Last Event
111///
112/// The stream is considered finished when one of the following events happen:
113/// - [Finalized](TransactionStatus::Finalized)
114/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
115/// - [Usurped](TransactionStatus::Usurped)
116/// - [Invalid](TransactionStatus::Invalid)
117/// - [Dropped](TransactionStatus::Dropped)
118///
119/// See [`TransactionStatus::is_final`] for more details.
120///
121/// ### Resubmit Transactions
122///
123/// Users might resubmit the transaction at a later time for the following events:
124/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
125/// - [Invalid](TransactionStatus::Invalid)
126/// - [Dropped](TransactionStatus::Dropped)
127///
128/// See [`TransactionStatus::is_retriable`] for more details.
129#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
130#[serde(rename_all = "camelCase")]
131pub enum TransactionStatus<Hash, BlockHash> {
132	/// Transaction is part of the future queue.
133	Future,
134	/// Transaction is part of the ready queue.
135	Ready,
136	/// The transaction has been broadcast to the given peers.
137	Broadcast(Vec<String>),
138	/// Transaction has been included in block with given hash
139	/// at the given position.
140	#[serde(with = "v1_compatible")]
141	InBlock((BlockHash, TxIndex)),
142	/// The block this transaction was included in has been retracted.
143	Retracted(BlockHash),
144	/// Maximum number of finality watchers has been reached,
145	/// old watchers are being removed.
146	FinalityTimeout(BlockHash),
147	/// Transaction has been finalized by a finality-gadget, e.g GRANDPA.
148	#[serde(with = "v1_compatible")]
149	Finalized((BlockHash, TxIndex)),
150	/// Transaction has been replaced in the pool, by another transaction
151	/// that provides the same tags. (e.g. same (sender, nonce)).
152	Usurped(Hash),
153	/// Transaction has been dropped from the pool because of the limit.
154	Dropped,
155	/// Transaction is no longer valid in the current state.
156	Invalid,
157}
158
159impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
160	/// Returns true if this is the last event emitted by [`TransactionStatusStream`].
161	pub fn is_final(&self) -> bool {
162		// The state must be kept in sync with `crate::graph::Sender`.
163		match self {
164			Self::Usurped(_) |
165			Self::Finalized(_) |
166			Self::FinalityTimeout(_) |
167			Self::Invalid |
168			Self::Dropped => true,
169			_ => false,
170		}
171	}
172
173	/// Returns true if the transaction could be re-submitted to the pool in the future.
174	///
175	/// For example, `TransactionStatus::Dropped` is retriable, because the transaction
176	/// may enter the pool if there is space for it in the future.
177	pub fn is_retriable(&self) -> bool {
178		match self {
179			// The number of finality watchers has been reached.
180			Self::FinalityTimeout(_) |
181			// An invalid transaction might be valid at a later time.
182			Self::Invalid |
183			// The transaction was dropped because of the limits of the pool.
184			// It can reenter the pool when other transactions are removed / finalized.
185			Self::Dropped => true,
186			_ => false,
187		}
188	}
189}
190
191/// The stream of transaction events.
192pub type TransactionStatusStream<Hash, BlockHash> =
193	dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;
194
195/// The import notification event stream.
196pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;
197
198/// Transaction hash type for a pool.
199pub type TxHash<P> = <P as TransactionPool>::Hash;
200/// Block hash type for a pool.
201pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
202/// Transaction type for a pool.
203pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
204/// Type of transactions event stream for a pool.
205pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
206/// Transaction type for a local pool.
207pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
208/// Transaction's index within the block in which it was included.
209pub type TxIndex = usize;
210
211/// Typical future type used in transaction pool api.
212pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
213
214/// In-pool transaction interface.
215///
216/// The pool is container of transactions that are implementing this trait.
217/// See `sp_runtime::ValidTransaction` for details about every field.
218pub trait InPoolTransaction {
219	/// Transaction type.
220	type Transaction;
221	/// Transaction hash type.
222	type Hash;
223
224	/// Get the reference to the transaction data.
225	fn data(&self) -> &Self::Transaction;
226	/// Get hash of the transaction.
227	fn hash(&self) -> &Self::Hash;
228	/// Get priority of the transaction.
229	fn priority(&self) -> &TransactionPriority;
230	/// Get longevity of the transaction.
231	fn longevity(&self) -> &TransactionLongevity;
232	/// Get transaction dependencies.
233	fn requires(&self) -> &[TransactionTag];
234	/// Get tags that transaction provides.
235	fn provides(&self) -> &[TransactionTag];
236	/// Return a flag indicating if the transaction should be propagated to other peers.
237	fn is_propagable(&self) -> bool;
238}
239
240/// Transaction pool interface.
241pub trait TransactionPool: Send + Sync {
242	/// Block type.
243	type Block: BlockT;
244	/// Transaction hash type.
245	type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
246	/// In-pool transaction type.
247	type InPoolTransaction: InPoolTransaction<
248		Transaction = TransactionFor<Self>,
249		Hash = TxHash<Self>,
250	>;
251	/// Error type.
252	type Error: From<crate::error::Error> + crate::error::IntoPoolError;
253
254	// *** RPC
255
256	/// Returns a future that imports a bunch of unverified transactions to the pool.
257	fn submit_at(
258		&self,
259		at: <Self::Block as BlockT>::Hash,
260		source: TransactionSource,
261		xts: Vec<TransactionFor<Self>>,
262	) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
263
264	/// Returns a future that imports one unverified transaction to the pool.
265	fn submit_one(
266		&self,
267		at: <Self::Block as BlockT>::Hash,
268		source: TransactionSource,
269		xt: TransactionFor<Self>,
270	) -> PoolFuture<TxHash<Self>, Self::Error>;
271
272	/// Returns a future that import a single transaction and starts to watch their progress in the
273	/// pool.
274	fn submit_and_watch(
275		&self,
276		at: <Self::Block as BlockT>::Hash,
277		source: TransactionSource,
278		xt: TransactionFor<Self>,
279	) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;
280
281	// *** Block production / Networking
282	/// Get an iterator for ready transactions ordered by priority.
283	///
284	/// Guarantees to return only when transaction pool got updated at `at` block.
285	/// Guarantees to return immediately when `None` is passed.
286	fn ready_at(
287		&self,
288		at: NumberFor<Self::Block>,
289	) -> Pin<
290		Box<
291			dyn Future<
292					Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
293				> + Send,
294		>,
295	>;
296
297	/// Get an iterator for ready transactions ordered by priority.
298	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
299
300	// *** Block production
301	/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
302	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>;
303
304	// *** logging
305	/// Get futures transaction list.
306	fn futures(&self) -> Vec<Self::InPoolTransaction>;
307
308	/// Returns pool status.
309	fn status(&self) -> PoolStatus;
310
311	// *** logging / RPC / networking
312	/// Return an event stream of transactions imported to the pool.
313	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
314
315	// *** networking
316	/// Notify the pool about transactions broadcast.
317	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
318
319	/// Returns transaction hash
320	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
321
322	/// Return specific ready transaction by hash, if there is one.
323	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
324}
325
326/// An iterator of ready transactions.
327///
328/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
329/// last-returned element as invalid.
330///
331/// The implementation is then allowed, for performance reasons, to change the elements
332/// returned next, by e.g.  skipping elements that are known to depend on the reported
333/// transaction, which yields them invalid as well.
334pub trait ReadyTransactions: Iterator {
335	/// Report given transaction as invalid.
336	///
337	/// This might affect subsequent elements returned by the iterator, so dependent transactions
338	/// are skipped for performance reasons.
339	fn report_invalid(&mut self, _tx: &Self::Item);
340}
341
342/// A no-op implementation for an empty iterator.
343impl<T> ReadyTransactions for std::iter::Empty<T> {
344	fn report_invalid(&mut self, _tx: &T) {}
345}
346
347/// Events that the transaction pool listens for.
348pub enum ChainEvent<B: BlockT> {
349	/// New best block have been added to the chain.
350	NewBestBlock {
351		/// Hash of the block.
352		hash: B::Hash,
353		/// Tree route from old best to new best parent that was calculated on import.
354		///
355		/// If `None`, no re-org happened on import.
356		tree_route: Option<Arc<sp_blockchain::TreeRoute<B>>>,
357	},
358	/// An existing block has been finalized.
359	Finalized {
360		/// Hash of just finalized block.
361		hash: B::Hash,
362		/// Path from old finalized to new finalized parent.
363		tree_route: Arc<[B::Hash]>,
364	},
365}
366
367impl<B: BlockT> ChainEvent<B> {
368	/// Returns the block hash associated to the event.
369	pub fn hash(&self) -> B::Hash {
370		match self {
371			Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
372		}
373	}
374
375	/// Is `self == Self::Finalized`?
376	pub fn is_finalized(&self) -> bool {
377		matches!(self, Self::Finalized { .. })
378	}
379}
380
381/// Trait for transaction pool maintenance.
382#[async_trait]
383pub trait MaintainedTransactionPool: TransactionPool {
384	/// Perform maintenance
385	async fn maintain(&self, event: ChainEvent<Self::Block>);
386}
387
388/// Transaction pool interface for submitting local transactions that exposes a
389/// blocking interface for submission.
390pub trait LocalTransactionPool: Send + Sync {
391	/// Block type.
392	type Block: BlockT;
393	/// Transaction hash type.
394	type Hash: Hash + Eq + Member + Serialize;
395	/// Error type.
396	type Error: From<crate::error::Error> + crate::error::IntoPoolError;
397
398	/// Submits the given local unverified transaction to the pool blocking the
399	/// current thread for any necessary pre-verification.
400	/// NOTE: It MUST NOT be used for transactions that originate from the
401	/// network or RPC, since the validation is performed with
402	/// `TransactionSource::Local`.
403	fn submit_local(
404		&self,
405		at: <Self::Block as BlockT>::Hash,
406		xt: LocalTransactionFor<Self>,
407	) -> Result<Self::Hash, Self::Error>;
408}
409
410impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
411	type Block = T::Block;
412
413	type Hash = T::Hash;
414
415	type Error = T::Error;
416
417	fn submit_local(
418		&self,
419		at: <Self::Block as BlockT>::Hash,
420		xt: LocalTransactionFor<Self>,
421	) -> Result<Self::Hash, Self::Error> {
422		(**self).submit_local(at, xt)
423	}
424}
425
426/// An abstraction for [`LocalTransactionPool`]
427///
428/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without
429/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
430/// the wrapping in a `Arc`.
431trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
432	/// Submit transaction.
433	///
434	/// The transaction will end up in the pool and be propagated to others.
435	fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
436}
437
438impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
439	fn submit_at(
440		&self,
441		at: <TPool::Block as BlockT>::Hash,
442		extrinsic: <TPool::Block as BlockT>::Extrinsic,
443	) -> Result<(), ()> {
444		log::debug!(
445			target: LOG_TARGET,
446			"(offchain call) Submitting a transaction to the pool: {:?}",
447			extrinsic
448		);
449
450		let result = self.submit_local(at, extrinsic);
451
452		result.map(|_| ()).map_err(|e| {
453			log::warn!(
454				target: LOG_TARGET,
455				"(offchain call) Error submitting a transaction to the pool: {}",
456				e
457			)
458		})
459	}
460}
461
462/// Factory for creating [`TransactionPoolExt`]s.
463///
464/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
465/// the wasm execution environment to send transactions from an offchain call to the  runtime.
466#[derive(Clone)]
467pub struct OffchainTransactionPoolFactory<Block: BlockT> {
468	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
469}
470
471impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
472	/// Creates a new instance using the given `tx_pool`.
473	pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
474		Self { pool: Arc::new(tx_pool) as Arc<_> }
475	}
476
477	/// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
478	///
479	/// Transactions that are being submitted by this instance will be submitted with `block_hash`
480	/// as context for validation.
481	pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
482		TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
483	}
484}
485
486/// Wraps a `pool` and `block_hash` to implement [`sp_core::offchain::TransactionPool`].
487struct OffchainTransactionPool<Block: BlockT> {
488	block_hash: Block::Hash,
489	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
490}
491
492impl<Block: BlockT> sp_core::offchain::TransactionPool for OffchainTransactionPool<Block> {
493	fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
494		let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
495			Ok(t) => t,
496			Err(e) => {
497				log::error!(
498					target: LOG_TARGET,
499					"Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
500				);
501
502				return Err(())
503			},
504		};
505
506		self.pool.submit_at(self.block_hash, extrinsic)
507	}
508}
509
510/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
511mod v1_compatible {
512	use serde::{Deserialize, Deserializer, Serialize, Serializer};
513
514	pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
515	where
516		S: Serializer,
517		H: Serialize,
518	{
519		let (hash, _) = data;
520		serde::Serialize::serialize(&hash, serializer)
521	}
522
523	pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
524	where
525		D: Deserializer<'de>,
526		H: Deserialize<'de>,
527	{
528		let hash: H = serde::Deserialize::deserialize(deserializer)?;
529		Ok((hash, 0))
530	}
531}
532
533/// Transaction pool that rejects all submitted transactions.
534///
535/// Could be used for example in tests.
536pub struct RejectAllTxPool<Block>(PhantomData<Block>);
537
538impl<Block> Default for RejectAllTxPool<Block> {
539	fn default() -> Self {
540		Self(PhantomData)
541	}
542}
543
544impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
545	type Block = Block;
546
547	type Hash = Block::Hash;
548
549	type Error = error::Error;
550
551	fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
552		Err(error::Error::ImmediatelyDropped)
553	}
554}
555
556#[cfg(test)]
557mod tests {
558	use super::*;
559
560	#[test]
561	fn tx_status_compatibility() {
562		let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
563		let ser = serde_json::to_string(&event).unwrap();
564
565		let exp = r#"{"inBlock":1}"#;
566		assert_eq!(ser, exp);
567
568		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
569		assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
570
571		let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
572		let ser = serde_json::to_string(&event).unwrap();
573
574		let exp = r#"{"finalized":1}"#;
575		assert_eq!(ser, exp);
576
577		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
578		assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
579	}
580}