sc_network_statement/lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Statement handling to plug on top of the network service.
//!
//! Usage:
//!
//! - Use [`StatementHandlerPrototype::new`] to create a prototype.
//! - Pass the notification protocol config returned from [`StatementHandlerPrototype::new`] to the
//!   network configuration as an extra peers set.
//! - Use [`StatementHandlerPrototype::build`] then [`StatementHandler::run`] to obtain a
//!   `Future` that processes statements, as sketched in the example below.
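//!
//! A minimal wiring sketch (not taken from this crate's tests): `net_config`, `network`, `sync`,
//! `statement_store`, `spawn_handle` and the `Block`/`Net` type parameters are placeholders for
//! services the node is assumed to construct elsewhere.
//!
//! ```ignore
//! let (prototype, statement_protocol_config) =
//! 	StatementHandlerPrototype::new::<_, Block, Net>(
//! 		genesis_hash,
//! 		fork_id,
//! 		notification_metrics,
//! 		peer_store_handle,
//! 	);
//! // Register the returned config with the network configuration as an extra peer set
//! // (assuming a `FullNetworkConfiguration`-style builder).
//! net_config.add_notification_protocol(statement_protocol_config);
//!
//! // Once the network and syncing services exist, build the handler and spawn it.
//! let handler = prototype.build(network, sync, statement_store, None, |future| {
//! 	spawn_handle.spawn("statement-import", Some("networking"), future);
//! })?;
//! spawn_handle.spawn("statement-handler", Some("networking"), handler.run());
//! ```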

use crate::config::*;

use codec::{Decode, Encode};
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{
	config::{NonReservedPeerMode, SetConfig},
	error, multiaddr,
	peer_store::PeerStoreProvider,
	service::{
		traits::{NotificationEvent, NotificationService, ValidationResult},
		NotificationMetrics,
	},
	types::ProtocolName,
	utils::{interval, LruHashSet},
	NetworkBackend, NetworkEventStream, NetworkPeers,
};
use sc_network_common::role::ObservedRole;
use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_network_types::PeerId;
use sp_runtime::traits::Block as BlockT;
use sp_statement_store::{
	Hash, NetworkPriority, Statement, StatementSource, StatementStore, SubmitResult,
};
use std::{
	collections::{hash_map::Entry, HashMap, HashSet},
	iter,
	num::NonZeroUsize,
	pin::Pin,
	sync::Arc,
};

pub mod config;

/// A set of statements.
pub type Statements = Vec<Statement>;
/// Future resolving to statement import result.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;

mod rep {
	use sc_network::ReputationChange as Rep;
	/// Reputation change when a peer sends us any statement.
	///
	/// This forces the node to verify it, hence the negative value here. Once the statement is
	/// verified, the reputation change is refunded with `ANY_STATEMENT_REFUND`.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Reputation change (refund) when a statement a peer sent us turns out not to be invalid.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
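	// For a statement that turns out to be merely already known, the up-front charge and the
	// refund cancel out exactly: -(1 << 4) + (1 << 4) = 0.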
	/// Reputation change when a peer sends us a statement that we didn't know about.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 7, "Good statement");
	/// Reputation change when a peer sends us a bad statement.
	pub const BAD_STATEMENT: Rep = Rep::new(-(1 << 12), "Bad statement");
	/// Reputation change when a peer sends us a duplicate statement.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Reputation change when a peer sends us a particularly useful statement.
	pub const EXCELLENT_STATEMENT: Rep = Rep::new(1 << 8, "High priority statement");
}

const LOG_TARGET: &str = "statement-gossip";

struct Metrics {
	propagated_statements: Counter<U64>,
}

impl Metrics {
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			propagated_statements: register(
				Counter::new(
					"substrate_sync_propagated_statements",
					"Number of statements propagated to at least one peer",
				)?,
				r,
			)?,
		})
	}
}

/// Prototype for a [`StatementHandler`].
pub struct StatementHandlerPrototype {
	protocol_name: ProtocolName,
	notification_service: Box<dyn NotificationService>,
}

impl StatementHandlerPrototype {
	/// Create a new instance.
	pub fn new<
		Hash: AsRef<[u8]>,
		Block: BlockT,
		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
	>(
		genesis_hash: Hash,
		fork_id: Option<&str>,
		metrics: NotificationMetrics,
		peer_store_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, Net::NotificationProtocolConfig) {
		let genesis_hash = genesis_hash.as_ref();
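		// The on-the-wire protocol name is derived from the genesis hash and, if present, the
		// fork id: "/<genesis hash hex>/statement/1" or "/<genesis hash hex>/<fork id>/statement/1".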
		let protocol_name = if let Some(fork_id) = fork_id {
			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
		} else {
			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
		};
		let (config, notification_service) = Net::notification_config(
			protocol_name.clone().into(),
			Vec::new(),
			MAX_STATEMENT_SIZE,
			None,
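			// No regular slots are allocated for this protocol; peers are instead added to (and
			// removed from) its reserved set as the syncing layer reports connections, see
			// `handle_sync_event` below.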
			SetConfig {
				in_peers: 0,
				out_peers: 0,
				reserved_nodes: Vec::new(),
				non_reserved_mode: NonReservedPeerMode::Deny,
			},
			metrics,
			peer_store_handle,
		);

		(Self { protocol_name: protocol_name.into(), notification_service }, config)
	}

	/// Turns the prototype into the actual handler.
	///
	/// Important: the statement handler is initially disabled and doesn't gossip statements.
	/// Gossiping is enabled when major syncing is done.
	pub fn build<
		N: NetworkPeers + NetworkEventStream,
		S: SyncEventStream + sp_consensus::SyncOracle,
	>(
		self,
		network: N,
		sync: S,
		statement_store: Arc<dyn StatementStore>,
		metrics_registry: Option<&Registry>,
		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
	) -> error::Result<StatementHandler<N, S>> {
		let sync_event_stream = sync.event_stream("statement-handler-sync");
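		// Statements received from the network are pushed into this bounded queue and imported
		// into the statement store by a separate task driven by `executor`, so that store
		// writes do not block the network event loop.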
		let (queue_sender, mut queue_receiver) = async_channel::bounded(100_000);

		let store = statement_store.clone();
		executor(
			async move {
				loop {
					let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
						queue_receiver.next().await;
					match task {
						None => return,
						Some((statement, completion)) => {
							let result = store.submit(statement, StatementSource::Network);
							if completion.send(result).is_err() {
								log::debug!(
									target: LOG_TARGET,
									"Error sending validation completion"
								);
							}
						},
					}
				}
			}
			.boxed(),
		);

		let handler = StatementHandler {
			protocol_name: self.protocol_name,
			notification_service: self.notification_service,
			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream: sync_event_stream.fuse(),
			peers: HashMap::new(),
			statement_store,
			queue_sender,
			metrics: if let Some(r) = metrics_registry {
				Some(Metrics::register(r)?)
			} else {
				None
			},
		};

		Ok(handler)
	}
}

/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
> {
	protocol_name: ProtocolName,
	/// Interval at which we call `propagate_statements`.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// Pending statement verification tasks.
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	/// As multiple peers can send us the same statement, we group
	/// these peers using the statement hash while the statement is
	/// imported. This prevents us from importing the same statement
	/// multiple times concurrently.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	/// Network service to use to send messages and manage peers.
	network: N,
	/// Syncing service.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// Notification service.
	notification_service: Box<dyn NotificationService>,
	/// All connected peers.
	peers: HashMap<PeerId, Peer>,
	/// Statement store.
	statement_store: Arc<dyn StatementStore>,
	/// Sender half of the queue feeding the background import task.
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
}

/// Peer information.
#[derive(Debug)]
struct Peer {
	/// Holds a set of statements known to this peer.
	known_statements: LruHashSet<Hash>,
	/// Observed role of the peer.
	role: ObservedRole,
}

impl<N, S> StatementHandler<N, S>
where
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
{
	/// Turns the [`StatementHandler`] into a future that should run forever and not be
	/// interrupted.
	pub async fn run(mut self) {
		loop {
			futures::select! {
				_ = self.propagate_timeout.next() => {
					self.propagate_statements();
				},
				(hash, result) = self.pending_statements.select_next_some() => {
					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
						if let Some(result) = result {
							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
						}
					} else {
						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
					}
				},
				sync_event = self.sync_event_stream.next() => {
					if let Some(sync_event) = sync_event {
						self.handle_sync_event(sync_event);
					} else {
						// Syncing has seemingly closed. Closing as well.
						return;
					}
				}
				event = self.notification_service.next_event().fuse() => {
					if let Some(event) = event {
						self.handle_notification_event(event)
					} else {
						// `Notifications` has seemingly closed. Closing as well.
						return
					}
				}
			}
		}
	}

	fn handle_sync_event(&mut self, event: SyncEvent) {
		match event {
			SyncEvent::PeerConnected(remote) => {
				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
					.collect::<multiaddr::Multiaddr>();
				let result = self.network.add_peers_to_reserved_set(
					self.protocol_name.clone(),
					iter::once(addr).collect(),
				);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
				}
			},
			SyncEvent::PeerDisconnected(remote) => {
				let result = self.network.remove_peers_from_reserved_set(
					self.protocol_name.clone(),
					iter::once(remote).collect(),
				);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
				}
			},
		}
	}

	fn handle_notification_event(&mut self, event: NotificationEvent) {
		match event {
			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
				// Only accept peers whose role can be determined.
				let result = self
					.network
					.peer_role(peer, handshake)
					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
				let _ = result_tx.send(result);
			},
			NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
				let Some(role) = self.network.peer_role(peer, handshake) else {
					log::debug!(target: LOG_TARGET, "role for {peer} couldn't be determined");
					return
				};

				let _was_in = self.peers.insert(
					peer,
					Peer {
						known_statements: LruHashSet::new(
							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
						),
						role,
					},
				);
				debug_assert!(_was_in.is_none());
			},
			NotificationEvent::NotificationStreamClosed { peer } => {
				let _peer = self.peers.remove(&peer);
				debug_assert!(_peer.is_some());
			},
			NotificationEvent::NotificationReceived { peer, notification } => {
				// Accept statements only when the node is not major syncing.
				if self.sync.is_major_syncing() {
					log::trace!(
						target: LOG_TARGET,
						"{peer}: Ignoring statements while major syncing or offline"
					);
					return
				}

				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
					self.on_statements(peer, statements);
				} else {
					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
				}
			},
		}
	}

	/// Called when a peer sends us new statements.
	fn on_statements(&mut self, who: PeerId, statements: Statements) {
		log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
		if let Some(ref mut peer) = self.peers.get_mut(&who) {
			for s in statements {
				if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
					log::debug!(
						target: LOG_TARGET,
						"Ignoring any further statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
						MAX_PENDING_STATEMENTS,
					);
					break
				}

				let hash = s.hash();
				peer.known_statements.insert(hash);

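				// Charge a small amount of reputation up front for every statement received; it
				// is refunded or upgraded in `on_handle_statement_import` once the statement
				// store has processed the statement.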
				self.network.report_peer(who, rep::ANY_STATEMENT);

				match self.pending_statements_peers.entry(hash) {
					Entry::Vacant(entry) => {
						let (completion_sender, completion_receiver) = oneshot::channel();
						match self.queue_sender.try_send((s, completion_sender)) {
							Ok(()) => {
								self.pending_statements.push(
									async move {
										let res = completion_receiver.await;
										(hash, res.ok())
									}
									.boxed(),
								);
								entry.insert(HashSet::from_iter([who]));
							},
							Err(async_channel::TrySendError::Full(_)) => {
								log::debug!(
									target: LOG_TARGET,
									"Dropped statement because validation channel is full",
								);
							},
							Err(async_channel::TrySendError::Closed(_)) => {
								log::trace!(
									target: LOG_TARGET,
									"Dropped statement because validation channel is closed",
								);
							},
						}
					},
					Entry::Occupied(mut entry) => {
						if !entry.get_mut().insert(who) {
							// Already received this from the same peer.
							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
						}
					},
				}
			}
		}
	}

	fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
		match import {
			SubmitResult::New(NetworkPriority::High) =>
				self.network.report_peer(who, rep::EXCELLENT_STATEMENT),
			SubmitResult::New(NetworkPriority::Low) =>
				self.network.report_peer(who, rep::GOOD_STATEMENT),
			SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
			SubmitResult::KnownExpired => {},
			SubmitResult::Ignored => {},
			SubmitResult::Bad(_) => self.network.report_peer(who, rep::BAD_STATEMENT),
			SubmitResult::InternalError(_) => {},
		}
	}

	/// Propagate one statement.
	pub fn propagate_statement(&mut self, hash: &Hash) {
		// Send out statements only when the node is not major syncing.
		if self.sync.is_major_syncing() {
			return
		}

		log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
		if let Ok(Some(statement)) = self.statement_store.statement(hash) {
			self.do_propagate_statements(&[(*hash, statement)]);
		}
	}

	fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
		let mut propagated_statements = 0;

		for (who, peer) in self.peers.iter_mut() {
			// Never send statements to light nodes.
			if matches!(peer.role, ObservedRole::Light) {
				continue
			}

			let to_send = statements
				.iter()
				.filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
				.collect::<Vec<_>>();

			propagated_statements += to_send.len();

			if !to_send.is_empty() {
				log::trace!(target: LOG_TARGET, "Sending {} statements to {}", to_send.len(), who);
				self.notification_service.send_sync_notification(who, to_send.encode());
			}
		}

		if let Some(ref metrics) = self.metrics {
			metrics.propagated_statements.inc_by(propagated_statements as _)
		}
	}

	/// Call when we must propagate ready statements to peers.
	fn propagate_statements(&mut self) {
		// Send out statements only when the node is not major syncing.
		if self.sync.is_major_syncing() {
			return
		}

		log::debug!(target: LOG_TARGET, "Propagating statements");
		if let Ok(statements) = self.statement_store.statements() {
			self.do_propagate_statements(&statements);
		}
	}
}