referrerpolicy=no-referrer-when-downgrade

sc_network_transactions/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transactions handling to plug on top of the network service.
20//!
21//! Usage:
22//!
23//! - Use [`TransactionsHandlerPrototype::new`] to create a prototype.
24//! - Pass the `NonDefaultSetConfig` returned from [`TransactionsHandlerPrototype::new`] to the
25//!   network configuration as an extra peers set.
26//! - Use [`TransactionsHandlerPrototype::build`] then [`TransactionsHandler::run`] to obtain a
27//! `Future` that processes transactions.
28
29use crate::config::*;
30
31use codec::{Decode, Encode};
32use futures::{prelude::*, stream::FuturesUnordered};
33use log::{debug, trace, warn};
34
35use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
36use sc_network::{
37	config::{NonReservedPeerMode, ProtocolId, SetConfig},
38	error, multiaddr,
39	peer_store::PeerStoreProvider,
40	service::{
41		traits::{NotificationEvent, NotificationService, ValidationResult},
42		NotificationMetrics,
43	},
44	types::ProtocolName,
45	utils::{interval, LruHashSet},
46	NetworkBackend, NetworkEventStream, NetworkPeers,
47};
48use sc_network_common::{role::ObservedRole, ExHashT};
49use sc_network_sync::{SyncEvent, SyncEventStream};
50use sc_network_types::PeerId;
51use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
52use sp_runtime::traits::Block as BlockT;
53
54use std::{
55	collections::{hash_map::Entry, HashMap},
56	iter,
57	num::NonZeroUsize,
58	pin::Pin,
59	sync::Arc,
60	task::Poll,
61};
62
63pub mod config;
64
65/// A set of transactions.
66pub type Transactions<E> = Vec<E>;
67
68/// Logging target for the file.
69const LOG_TARGET: &str = "sync";
70
71mod rep {
72	use sc_network::ReputationChange as Rep;
73	/// Reputation change when a peer sends us any transaction.
74	///
75	/// This forces node to verify it, thus the negative value here. Once transaction is verified,
76	/// reputation change should be refunded with `ANY_TRANSACTION_REFUND`
77	pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
78	/// Reputation change when a peer sends us any transaction that is not invalid.
79	pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
80	/// Reputation change when a peer sends us an transaction that we didn't know about.
81	pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
82	/// Reputation change when a peer sends us a bad transaction.
83	pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
84}
85
86struct Metrics {
87	propagated_transactions: Counter<U64>,
88}
89
90impl Metrics {
91	fn register(r: &Registry) -> Result<Self, PrometheusError> {
92		Ok(Self {
93			propagated_transactions: register(
94				Counter::new(
95					"substrate_sync_propagated_transactions",
96					"Number of transactions propagated to at least one peer",
97				)?,
98				r,
99			)?,
100		})
101	}
102}
103
104struct PendingTransaction<H> {
105	validation: TransactionImportFuture,
106	tx_hash: H,
107}
108
109impl<H> Unpin for PendingTransaction<H> {}
110
111impl<H: ExHashT> Future for PendingTransaction<H> {
112	type Output = (H, TransactionImport);
113
114	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
115		if let Poll::Ready(import_result) = self.validation.poll_unpin(cx) {
116			return Poll::Ready((self.tx_hash.clone(), import_result))
117		}
118
119		Poll::Pending
120	}
121}
122
123/// Prototype for a [`TransactionsHandler`].
124pub struct TransactionsHandlerPrototype {
125	/// Name of the transaction protocol.
126	protocol_name: ProtocolName,
127
128	/// Handle that is used to communicate with `sc_network::Notifications`.
129	notification_service: Box<dyn NotificationService>,
130}
131
132impl TransactionsHandlerPrototype {
133	/// Create a new instance.
134	pub fn new<
135		Hash: AsRef<[u8]>,
136		Block: BlockT,
137		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
138	>(
139		protocol_id: ProtocolId,
140		genesis_hash: Hash,
141		fork_id: Option<&str>,
142		metrics: NotificationMetrics,
143		peer_store_handle: Arc<dyn PeerStoreProvider>,
144	) -> (Self, Net::NotificationProtocolConfig) {
145		let genesis_hash = genesis_hash.as_ref();
146		let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
147			format!("/{}/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
148		} else {
149			format!("/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash))
150		}
151		.into();
152		let (config, notification_service) = Net::notification_config(
153			protocol_name.clone(),
154			vec![format!("/{}/transactions/1", protocol_id.as_ref()).into()],
155			MAX_TRANSACTIONS_SIZE,
156			None,
157			SetConfig {
158				in_peers: 0,
159				out_peers: 0,
160				reserved_nodes: Vec::new(),
161				non_reserved_mode: NonReservedPeerMode::Deny,
162			},
163			metrics,
164			peer_store_handle,
165		);
166
167		(Self { protocol_name, notification_service }, config)
168	}
169
170	/// Turns the prototype into the actual handler. Returns a controller that allows controlling
171	/// the behaviour of the handler while it's running.
172	///
173	/// Important: the transactions handler is initially disabled and doesn't gossip transactions.
174	/// Gossiping is enabled when major syncing is done.
175	pub fn build<
176		B: BlockT + 'static,
177		H: ExHashT,
178		N: NetworkPeers + NetworkEventStream,
179		S: SyncEventStream + sp_consensus::SyncOracle,
180	>(
181		self,
182		network: N,
183		sync: S,
184		transaction_pool: Arc<dyn TransactionPool<H, B>>,
185		metrics_registry: Option<&Registry>,
186	) -> error::Result<(TransactionsHandler<B, H, N, S>, TransactionsHandlerController<H>)> {
187		let sync_event_stream = sync.event_stream("transactions-handler-sync");
188		let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000);
189
190		let handler = TransactionsHandler {
191			protocol_name: self.protocol_name,
192			notification_service: self.notification_service,
193			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
194				as Pin<Box<dyn Stream<Item = ()> + Send>>)
195				.fuse(),
196			pending_transactions: FuturesUnordered::new(),
197			pending_transactions_peers: HashMap::new(),
198			network,
199			sync,
200			sync_event_stream: sync_event_stream.fuse(),
201			peers: HashMap::new(),
202			transaction_pool,
203			from_controller,
204			metrics: if let Some(r) = metrics_registry {
205				Some(Metrics::register(r)?)
206			} else {
207				None
208			},
209		};
210
211		let controller = TransactionsHandlerController { to_handler };
212
213		Ok((handler, controller))
214	}
215}
216
217/// Controls the behaviour of a [`TransactionsHandler`] it is connected to.
218pub struct TransactionsHandlerController<H: ExHashT> {
219	to_handler: TracingUnboundedSender<ToHandler<H>>,
220}
221
222impl<H: ExHashT> TransactionsHandlerController<H> {
223	/// You may call this when new transactions are imported by the transaction pool.
224	///
225	/// All transactions will be fetched from the `TransactionPool` that was passed at
226	/// initialization as part of the configuration and propagated to peers.
227	pub fn propagate_transactions(&self) {
228		let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);
229	}
230
231	/// You must call when new a transaction is imported by the transaction pool.
232	///
233	/// This transaction will be fetched from the `TransactionPool` that was passed at
234	/// initialization as part of the configuration and propagated to peers.
235	pub fn propagate_transaction(&self, hash: H) {
236		let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));
237	}
238}
239
240enum ToHandler<H: ExHashT> {
241	PropagateTransactions,
242	PropagateTransaction(H),
243}
244
245/// Handler for transactions. Call [`TransactionsHandler::run`] to start the processing.
246pub struct TransactionsHandler<
247	B: BlockT + 'static,
248	H: ExHashT,
249	N: NetworkPeers + NetworkEventStream,
250	S: SyncEventStream + sp_consensus::SyncOracle,
251> {
252	protocol_name: ProtocolName,
253	/// Interval at which we call `propagate_transactions`.
254	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
255	/// Pending transactions verification tasks.
256	pending_transactions: FuturesUnordered<PendingTransaction<H>>,
257	/// As multiple peers can send us the same transaction, we group
258	/// these peers using the transaction hash while the transaction is
259	/// imported. This prevents that we import the same transaction
260	/// multiple times concurrently.
261	pending_transactions_peers: HashMap<H, Vec<PeerId>>,
262	/// Network service to use to send messages and manage peers.
263	network: N,
264	/// Syncing service.
265	sync: S,
266	/// Receiver for syncing-related events.
267	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
268	// All connected peers
269	peers: HashMap<PeerId, Peer<H>>,
270	transaction_pool: Arc<dyn TransactionPool<H, B>>,
271	from_controller: TracingUnboundedReceiver<ToHandler<H>>,
272	/// Prometheus metrics.
273	metrics: Option<Metrics>,
274	/// Handle that is used to communicate with `sc_network::Notifications`.
275	notification_service: Box<dyn NotificationService>,
276}
277
278/// Peer information
279#[derive(Debug)]
280struct Peer<H: ExHashT> {
281	/// Holds a set of transactions known to this peer.
282	known_transactions: LruHashSet<H>,
283	role: ObservedRole,
284}
285
286impl<B, H, N, S> TransactionsHandler<B, H, N, S>
287where
288	B: BlockT + 'static,
289	H: ExHashT,
290	N: NetworkPeers + NetworkEventStream,
291	S: SyncEventStream + sp_consensus::SyncOracle,
292{
293	/// Turns the [`TransactionsHandler`] into a future that should run forever and not be
294	/// interrupted.
295	pub async fn run(mut self) {
296		loop {
297			futures::select! {
298				_ = self.propagate_timeout.next() => {
299					self.propagate_transactions();
300				},
301				(tx_hash, result) = self.pending_transactions.select_next_some() => {
302					if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
303						peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
304					} else {
305						warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
306					}
307				},
308				sync_event = self.sync_event_stream.next() => {
309					if let Some(sync_event) = sync_event {
310						self.handle_sync_event(sync_event);
311					} else {
312						// Syncing has seemingly closed. Closing as well.
313						return;
314					}
315				}
316				message = self.from_controller.select_next_some() => {
317					match message {
318						ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
319						ToHandler::PropagateTransactions => self.propagate_transactions(),
320					}
321				},
322				event = self.notification_service.next_event().fuse() => {
323					if let Some(event) = event {
324						self.handle_notification_event(event)
325					} else {
326						// `Notifications` has seemingly closed. Closing as well.
327						return
328					}
329				}
330			}
331		}
332	}
333
334	fn handle_notification_event(&mut self, event: NotificationEvent) {
335		match event {
336			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
337				// only accept peers whose role can be determined
338				let result = self
339					.network
340					.peer_role(peer, handshake)
341					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
342				let _ = result_tx.send(result);
343			},
344			NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
345				let Some(role) = self.network.peer_role(peer, handshake) else {
346					log::debug!(target: "sub-libp2p", "role for {peer} couldn't be determined");
347					return
348				};
349
350				let _was_in = self.peers.insert(
351					peer,
352					Peer {
353						known_transactions: LruHashSet::new(
354							NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero"),
355						),
356						role,
357					},
358				);
359				debug_assert!(_was_in.is_none());
360			},
361			NotificationEvent::NotificationStreamClosed { peer } => {
362				let _peer = self.peers.remove(&peer);
363				debug_assert!(_peer.is_some());
364			},
365			NotificationEvent::NotificationReceived { peer, notification } => {
366				if let Ok(m) =
367					<Transactions<B::Extrinsic> as Decode>::decode(&mut notification.as_ref())
368				{
369					self.on_transactions(peer, m);
370				} else {
371					warn!(target: "sub-libp2p", "Failed to decode transactions list from peer {peer}");
372					self.network.report_peer(peer, rep::BAD_TRANSACTION);
373				}
374			},
375		}
376	}
377
378	fn handle_sync_event(&mut self, event: SyncEvent) {
379		match event {
380			SyncEvent::PeerConnected(remote) => {
381				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
382					.collect::<multiaddr::Multiaddr>();
383				let result = self.network.add_peers_to_reserved_set(
384					self.protocol_name.clone(),
385					iter::once(addr).collect(),
386				);
387				if let Err(err) = result {
388					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
389				}
390			},
391			SyncEvent::PeerDisconnected(remote) => {
392				let result = self.network.remove_peers_from_reserved_set(
393					self.protocol_name.clone(),
394					iter::once(remote).collect(),
395				);
396				if let Err(err) = result {
397					log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);
398				}
399			},
400		}
401	}
402
403	/// Called when peer sends us new transactions
404	fn on_transactions(&mut self, who: PeerId, transactions: Transactions<B::Extrinsic>) {
405		// Accept transactions only when node is not major syncing
406		if self.sync.is_major_syncing() {
407			trace!(target: LOG_TARGET, "{} Ignoring transactions while major syncing", who);
408			return
409		}
410
411		trace!(target: LOG_TARGET, "Received {} transactions from {}", transactions.len(), who);
412		if let Some(ref mut peer) = self.peers.get_mut(&who) {
413			for t in transactions {
414				if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
415					debug!(
416						target: LOG_TARGET,
417						"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
418						MAX_PENDING_TRANSACTIONS,
419					);
420					break
421				}
422
423				let hash = self.transaction_pool.hash_of(&t);
424				peer.known_transactions.insert(hash.clone());
425
426				self.network.report_peer(who, rep::ANY_TRANSACTION);
427
428				match self.pending_transactions_peers.entry(hash.clone()) {
429					Entry::Vacant(entry) => {
430						self.pending_transactions.push(PendingTransaction {
431							validation: self.transaction_pool.import(t),
432							tx_hash: hash,
433						});
434						entry.insert(vec![who]);
435					},
436					Entry::Occupied(mut entry) => {
437						entry.get_mut().push(who);
438					},
439				}
440			}
441		}
442	}
443
444	fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
445		match import {
446			TransactionImport::KnownGood =>
447				self.network.report_peer(who, rep::ANY_TRANSACTION_REFUND),
448			TransactionImport::NewGood => self.network.report_peer(who, rep::GOOD_TRANSACTION),
449			TransactionImport::Bad => self.network.report_peer(who, rep::BAD_TRANSACTION),
450			TransactionImport::None => {},
451		}
452	}
453
454	/// Propagate one transaction.
455	pub fn propagate_transaction(&mut self, hash: &H) {
456		// Accept transactions only when node is not major syncing
457		if self.sync.is_major_syncing() {
458			return
459		}
460
461		debug!(target: LOG_TARGET, "Propagating transaction [{:?}]", hash);
462		if let Some(transaction) = self.transaction_pool.transaction(hash) {
463			let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
464			self.transaction_pool.on_broadcasted(propagated_to);
465		} else {
466			debug!(target: "sync", "Propagating transaction failure [{:?}]", hash);
467		}
468	}
469
470	fn do_propagate_transactions(
471		&mut self,
472		transactions: &[(H, Arc<B::Extrinsic>)],
473	) -> HashMap<H, Vec<String>> {
474		let mut propagated_to = HashMap::<_, Vec<_>>::new();
475		let mut propagated_transactions = 0;
476
477		for (who, peer) in self.peers.iter_mut() {
478			// never send transactions to the light node
479			if matches!(peer.role, ObservedRole::Light) {
480				continue
481			}
482
483			let (hashes, to_send): (Vec<_>, Transactions<_>) = transactions
484				.iter()
485				.filter(|(hash, _)| peer.known_transactions.insert(hash.clone()))
486				.cloned()
487				.unzip();
488
489			propagated_transactions += hashes.len();
490
491			if !to_send.is_empty() {
492				for hash in hashes {
493					propagated_to.entry(hash).or_default().push(who.to_base58());
494				}
495				trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
496				// Historically, the format of a notification of the transactions protocol
497				// consisted in a (SCALE-encoded) `Vec<Transaction>`.
498				// After RFC 56, the format was modified in a backwards-compatible way to be
499				// a (SCALE-encoded) tuple `(Compact(1), Transaction)`, which is the same encoding
500				// as a `Vec` of length one. This is no coincidence, as the change was
501				// intentionally done in a backwards-compatible way.
502				// In other words, the `Vec` that is sent below **must** always have only a single
503				// element in it.
504				// See <https://github.com/polkadot-fellows/RFCs/blob/main/text/0056-one-transaction-per-notification.md>
505				for to_send in to_send {
506					let _ = self
507						.notification_service
508						.send_sync_notification(who, vec![to_send].encode());
509				}
510			}
511		}
512
513		if let Some(ref metrics) = self.metrics {
514			metrics.propagated_transactions.inc_by(propagated_transactions as _)
515		}
516
517		propagated_to
518	}
519
520	/// Call when we must propagate ready transactions to peers.
521	fn propagate_transactions(&mut self) {
522		// Accept transactions only when node is not major syncing
523		if self.sync.is_major_syncing() {
524			return
525		}
526
527		let transactions = self.transaction_pool.transactions();
528
529		if transactions.is_empty() {
530			return
531		}
532
533		debug!(target: LOG_TARGET, "Propagating transactions");
534
535		let propagated_to = self.do_propagate_transactions(&transactions);
536		self.transaction_pool.on_broadcasted(propagated_to);
537	}
538}