1use crate::config::*;
30
31use codec::{Decode, Encode};
32use futures::{prelude::*, stream::FuturesUnordered};
33use log::{debug, trace, warn};
34
35use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
36use sc_network::{
37 config::{NonReservedPeerMode, ProtocolId, SetConfig},
38 error, multiaddr,
39 peer_store::PeerStoreProvider,
40 service::{
41 traits::{NotificationEvent, NotificationService, ValidationResult},
42 NotificationMetrics,
43 },
44 types::ProtocolName,
45 utils::{interval, LruHashSet},
46 NetworkBackend, NetworkEventStream, NetworkPeers,
47};
48use sc_network_common::{role::ObservedRole, ExHashT};
49use sc_network_sync::{SyncEvent, SyncEventStream};
50use sc_network_types::PeerId;
51use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
52use sp_runtime::traits::Block as BlockT;
53
54use std::{
55 collections::{hash_map::Entry, HashMap},
56 iter,
57 num::NonZeroUsize,
58 pin::Pin,
59 sync::Arc,
60 task::Poll,
61};
62
pub mod config;

/// A list of transactions, as carried by a single transactions-protocol notification.
///
/// Incoming notifications are decoded into this type and outgoing notifications are the
/// encoding of a value of this type.
pub type Transactions<E> = Vec<E>;

/// Log target used by most log statements in this file.
const LOG_TARGET: &str = "sync";
70
71mod rep {
72 use sc_network::ReputationChange as Rep;
73 pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
78 pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
80 pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
82 pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
84}
85
86struct Metrics {
87 propagated_transactions: Counter<U64>,
88}
89
90impl Metrics {
91 fn register(r: &Registry) -> Result<Self, PrometheusError> {
92 Ok(Self {
93 propagated_transactions: register(
94 Counter::new(
95 "substrate_sync_propagated_transactions",
96 "Number of transactions propagated to at least one peer",
97 )?,
98 r,
99 )?,
100 })
101 }
102}
103
104struct PendingTransaction<H> {
105 validation: TransactionImportFuture,
106 tx_hash: H,
107}
108
109impl<H> Unpin for PendingTransaction<H> {}
110
111impl<H: ExHashT> Future for PendingTransaction<H> {
112 type Output = (H, TransactionImport);
113
114 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
115 if let Poll::Ready(import_result) = self.validation.poll_unpin(cx) {
116 return Poll::Ready((self.tx_hash.clone(), import_result))
117 }
118
119 Poll::Pending
120 }
121}
122
123pub struct TransactionsHandlerPrototype {
125 protocol_name: ProtocolName,
127
128 notification_service: Box<dyn NotificationService>,
130}
131
132impl TransactionsHandlerPrototype {
133 pub fn new<
135 Hash: AsRef<[u8]>,
136 Block: BlockT,
137 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
138 >(
139 protocol_id: ProtocolId,
140 genesis_hash: Hash,
141 fork_id: Option<&str>,
142 metrics: NotificationMetrics,
143 peer_store_handle: Arc<dyn PeerStoreProvider>,
144 ) -> (Self, Net::NotificationProtocolConfig) {
145 let genesis_hash = genesis_hash.as_ref();
146 let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
147 format!("/{}/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
148 } else {
149 format!("/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash))
150 }
151 .into();
152 let (config, notification_service) = Net::notification_config(
153 protocol_name.clone(),
154 vec![format!("/{}/transactions/1", protocol_id.as_ref()).into()],
155 MAX_TRANSACTIONS_SIZE,
156 None,
157 SetConfig {
158 in_peers: 0,
159 out_peers: 0,
160 reserved_nodes: Vec::new(),
161 non_reserved_mode: NonReservedPeerMode::Deny,
162 },
163 metrics,
164 peer_store_handle,
165 );
166
167 (Self { protocol_name, notification_service }, config)
168 }
169
170 pub fn build<
176 B: BlockT + 'static,
177 H: ExHashT,
178 N: NetworkPeers + NetworkEventStream,
179 S: SyncEventStream + sp_consensus::SyncOracle,
180 >(
181 self,
182 network: N,
183 sync: S,
184 transaction_pool: Arc<dyn TransactionPool<H, B>>,
185 metrics_registry: Option<&Registry>,
186 ) -> error::Result<(TransactionsHandler<B, H, N, S>, TransactionsHandlerController<H>)> {
187 let sync_event_stream = sync.event_stream("transactions-handler-sync");
188 let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000);
189
190 let handler = TransactionsHandler {
191 protocol_name: self.protocol_name,
192 notification_service: self.notification_service,
193 propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
194 as Pin<Box<dyn Stream<Item = ()> + Send>>)
195 .fuse(),
196 pending_transactions: FuturesUnordered::new(),
197 pending_transactions_peers: HashMap::new(),
198 network,
199 sync,
200 sync_event_stream: sync_event_stream.fuse(),
201 peers: HashMap::new(),
202 transaction_pool,
203 from_controller,
204 metrics: if let Some(r) = metrics_registry {
205 Some(Metrics::register(r)?)
206 } else {
207 None
208 },
209 };
210
211 let controller = TransactionsHandlerController { to_handler };
212
213 Ok((handler, controller))
214 }
215}
216
217pub struct TransactionsHandlerController<H: ExHashT> {
219 to_handler: TracingUnboundedSender<ToHandler<H>>,
220}
221
222impl<H: ExHashT> TransactionsHandlerController<H> {
223 pub fn propagate_transactions(&self) {
228 let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);
229 }
230
231 pub fn propagate_transaction(&self, hash: H) {
236 let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));
237 }
238}
239
240enum ToHandler<H: ExHashT> {
241 PropagateTransactions,
242 PropagateTransaction(H),
243}
244
245pub struct TransactionsHandler<
247 B: BlockT + 'static,
248 H: ExHashT,
249 N: NetworkPeers + NetworkEventStream,
250 S: SyncEventStream + sp_consensus::SyncOracle,
251> {
252 protocol_name: ProtocolName,
253 propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
255 pending_transactions: FuturesUnordered<PendingTransaction<H>>,
257 pending_transactions_peers: HashMap<H, Vec<PeerId>>,
262 network: N,
264 sync: S,
266 sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
268 peers: HashMap<PeerId, Peer<H>>,
270 transaction_pool: Arc<dyn TransactionPool<H, B>>,
271 from_controller: TracingUnboundedReceiver<ToHandler<H>>,
272 metrics: Option<Metrics>,
274 notification_service: Box<dyn NotificationService>,
276}
277
278#[derive(Debug)]
280struct Peer<H: ExHashT> {
281 known_transactions: LruHashSet<H>,
283 role: ObservedRole,
284}
285
286impl<B, H, N, S> TransactionsHandler<B, H, N, S>
287where
288 B: BlockT + 'static,
289 H: ExHashT,
290 N: NetworkPeers + NetworkEventStream,
291 S: SyncEventStream + sp_consensus::SyncOracle,
292{
293 pub async fn run(mut self) {
296 loop {
297 futures::select! {
298 _ = self.propagate_timeout.next() => {
299 self.propagate_transactions();
300 },
301 (tx_hash, result) = self.pending_transactions.select_next_some() => {
302 if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
303 peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
304 } else {
305 warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
306 }
307 },
308 sync_event = self.sync_event_stream.next() => {
309 if let Some(sync_event) = sync_event {
310 self.handle_sync_event(sync_event);
311 } else {
312 return;
314 }
315 }
316 message = self.from_controller.select_next_some() => {
317 match message {
318 ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
319 ToHandler::PropagateTransactions => self.propagate_transactions(),
320 }
321 },
322 event = self.notification_service.next_event().fuse() => {
323 if let Some(event) = event {
324 self.handle_notification_event(event)
325 } else {
326 return
328 }
329 }
330 }
331 }
332 }
333
334 fn handle_notification_event(&mut self, event: NotificationEvent) {
335 match event {
336 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
337 let result = self
339 .network
340 .peer_role(peer, handshake)
341 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
342 let _ = result_tx.send(result);
343 },
344 NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
345 let Some(role) = self.network.peer_role(peer, handshake) else {
346 log::debug!(target: "sub-libp2p", "role for {peer} couldn't be determined");
347 return
348 };
349
350 let _was_in = self.peers.insert(
351 peer,
352 Peer {
353 known_transactions: LruHashSet::new(
354 NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero"),
355 ),
356 role,
357 },
358 );
359 debug_assert!(_was_in.is_none());
360 },
361 NotificationEvent::NotificationStreamClosed { peer } => {
362 let _peer = self.peers.remove(&peer);
363 debug_assert!(_peer.is_some());
364 },
365 NotificationEvent::NotificationReceived { peer, notification } => {
366 if let Ok(m) =
367 <Transactions<B::Extrinsic> as Decode>::decode(&mut notification.as_ref())
368 {
369 self.on_transactions(peer, m);
370 } else {
371 warn!(target: "sub-libp2p", "Failed to decode transactions list from peer {peer}");
372 self.network.report_peer(peer, rep::BAD_TRANSACTION);
373 }
374 },
375 }
376 }
377
378 fn handle_sync_event(&mut self, event: SyncEvent) {
379 match event {
380 SyncEvent::PeerConnected(remote) => {
381 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
382 .collect::<multiaddr::Multiaddr>();
383 let result = self.network.add_peers_to_reserved_set(
384 self.protocol_name.clone(),
385 iter::once(addr).collect(),
386 );
387 if let Err(err) = result {
388 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
389 }
390 },
391 SyncEvent::PeerDisconnected(remote) => {
392 let result = self.network.remove_peers_from_reserved_set(
393 self.protocol_name.clone(),
394 iter::once(remote).collect(),
395 );
396 if let Err(err) = result {
397 log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);
398 }
399 },
400 }
401 }
402
403 fn on_transactions(&mut self, who: PeerId, transactions: Transactions<B::Extrinsic>) {
405 if self.sync.is_major_syncing() {
407 trace!(target: LOG_TARGET, "{} Ignoring transactions while major syncing", who);
408 return
409 }
410
411 trace!(target: LOG_TARGET, "Received {} transactions from {}", transactions.len(), who);
412 if let Some(ref mut peer) = self.peers.get_mut(&who) {
413 for t in transactions {
414 if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
415 debug!(
416 target: LOG_TARGET,
417 "Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
418 MAX_PENDING_TRANSACTIONS,
419 );
420 break
421 }
422
423 let hash = self.transaction_pool.hash_of(&t);
424 peer.known_transactions.insert(hash.clone());
425
426 self.network.report_peer(who, rep::ANY_TRANSACTION);
427
428 match self.pending_transactions_peers.entry(hash.clone()) {
429 Entry::Vacant(entry) => {
430 self.pending_transactions.push(PendingTransaction {
431 validation: self.transaction_pool.import(t),
432 tx_hash: hash,
433 });
434 entry.insert(vec![who]);
435 },
436 Entry::Occupied(mut entry) => {
437 entry.get_mut().push(who);
438 },
439 }
440 }
441 }
442 }
443
444 fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
445 match import {
446 TransactionImport::KnownGood =>
447 self.network.report_peer(who, rep::ANY_TRANSACTION_REFUND),
448 TransactionImport::NewGood => self.network.report_peer(who, rep::GOOD_TRANSACTION),
449 TransactionImport::Bad => self.network.report_peer(who, rep::BAD_TRANSACTION),
450 TransactionImport::None => {},
451 }
452 }
453
454 pub fn propagate_transaction(&mut self, hash: &H) {
456 if self.sync.is_major_syncing() {
458 return
459 }
460
461 debug!(target: LOG_TARGET, "Propagating transaction [{:?}]", hash);
462 if let Some(transaction) = self.transaction_pool.transaction(hash) {
463 let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
464 self.transaction_pool.on_broadcasted(propagated_to);
465 } else {
466 debug!(target: "sync", "Propagating transaction failure [{:?}]", hash);
467 }
468 }
469
470 fn do_propagate_transactions(
471 &mut self,
472 transactions: &[(H, Arc<B::Extrinsic>)],
473 ) -> HashMap<H, Vec<String>> {
474 let mut propagated_to = HashMap::<_, Vec<_>>::new();
475 let mut propagated_transactions = 0;
476
477 for (who, peer) in self.peers.iter_mut() {
478 if matches!(peer.role, ObservedRole::Light) {
480 continue
481 }
482
483 let (hashes, to_send): (Vec<_>, Transactions<_>) = transactions
484 .iter()
485 .filter(|(hash, _)| peer.known_transactions.insert(hash.clone()))
486 .cloned()
487 .unzip();
488
489 propagated_transactions += hashes.len();
490
491 if !to_send.is_empty() {
492 for hash in hashes {
493 propagated_to.entry(hash).or_default().push(who.to_base58());
494 }
495 trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
496 for to_send in to_send {
506 let _ = self
507 .notification_service
508 .send_sync_notification(who, vec![to_send].encode());
509 }
510 }
511 }
512
513 if let Some(ref metrics) = self.metrics {
514 metrics.propagated_transactions.inc_by(propagated_transactions as _)
515 }
516
517 propagated_to
518 }
519
520 fn propagate_transactions(&mut self) {
522 if self.sync.is_major_syncing() {
524 return
525 }
526
527 let transactions = self.transaction_pool.transactions();
528
529 if transactions.is_empty() {
530 return
531 }
532
533 debug!(target: LOG_TARGET, "Propagating transactions");
534
535 let propagated_to = self.do_propagate_transactions(&transactions);
536 self.transaction_pool.on_broadcasted(propagated_to);
537 }
538}