1use crate::config::*;
30
31use codec::{Decode, Encode};
32use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt};
33use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
34use sc_network::{
35 config::{NonReservedPeerMode, SetConfig},
36 error, multiaddr,
37 peer_store::PeerStoreProvider,
38 service::{
39 traits::{NotificationEvent, NotificationService, ValidationResult},
40 NotificationMetrics,
41 },
42 types::ProtocolName,
43 utils::{interval, LruHashSet},
44 NetworkBackend, NetworkEventStream, NetworkPeers,
45};
46use sc_network_common::role::ObservedRole;
47use sc_network_sync::{SyncEvent, SyncEventStream};
48use sc_network_types::PeerId;
49use sp_runtime::traits::Block as BlockT;
50use sp_statement_store::{
51 Hash, NetworkPriority, Statement, StatementSource, StatementStore, SubmitResult,
52};
53use std::{
54 collections::{hash_map::Entry, HashMap, HashSet},
55 iter,
56 num::NonZeroUsize,
57 pin::Pin,
58 sync::Arc,
59};
60
61pub mod config;
62
63pub type Statements = Vec<Statement>;
65pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
67
68mod rep {
69 use sc_network::ReputationChange as Rep;
70 pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
75 pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
77 pub const GOOD_STATEMENT: Rep = Rep::new(1 << 7, "Good statement");
79 pub const BAD_STATEMENT: Rep = Rep::new(-(1 << 12), "Bad statement");
81 pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
83 pub const EXCELLENT_STATEMENT: Rep = Rep::new(1 << 8, "High priority statement");
85}
86
/// Log target used by every log line emitted from this module.
const LOG_TARGET: &str = "statement-gossip";
88
89struct Metrics {
90 propagated_statements: Counter<U64>,
91}
92
93impl Metrics {
94 fn register(r: &Registry) -> Result<Self, PrometheusError> {
95 Ok(Self {
96 propagated_statements: register(
97 Counter::new(
98 "substrate_sync_propagated_statements",
99 "Number of statements propagated to at least one peer",
100 )?,
101 r,
102 )?,
103 })
104 }
105}
106
107pub struct StatementHandlerPrototype {
109 protocol_name: ProtocolName,
110 notification_service: Box<dyn NotificationService>,
111}
112
113impl StatementHandlerPrototype {
114 pub fn new<
116 Hash: AsRef<[u8]>,
117 Block: BlockT,
118 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
119 >(
120 genesis_hash: Hash,
121 fork_id: Option<&str>,
122 metrics: NotificationMetrics,
123 peer_store_handle: Arc<dyn PeerStoreProvider>,
124 ) -> (Self, Net::NotificationProtocolConfig) {
125 let genesis_hash = genesis_hash.as_ref();
126 let protocol_name = if let Some(fork_id) = fork_id {
127 format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
128 } else {
129 format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
130 };
131 let (config, notification_service) = Net::notification_config(
132 protocol_name.clone().into(),
133 Vec::new(),
134 MAX_STATEMENT_SIZE,
135 None,
136 SetConfig {
137 in_peers: 0,
138 out_peers: 0,
139 reserved_nodes: Vec::new(),
140 non_reserved_mode: NonReservedPeerMode::Deny,
141 },
142 metrics,
143 peer_store_handle,
144 );
145
146 (Self { protocol_name: protocol_name.into(), notification_service }, config)
147 }
148
149 pub fn build<
154 N: NetworkPeers + NetworkEventStream,
155 S: SyncEventStream + sp_consensus::SyncOracle,
156 >(
157 self,
158 network: N,
159 sync: S,
160 statement_store: Arc<dyn StatementStore>,
161 metrics_registry: Option<&Registry>,
162 executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
163 ) -> error::Result<StatementHandler<N, S>> {
164 let sync_event_stream = sync.event_stream("statement-handler-sync");
165 let (queue_sender, mut queue_receiver) = async_channel::bounded(100_000);
166
167 let store = statement_store.clone();
168 executor(
169 async move {
170 loop {
171 let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
172 queue_receiver.next().await;
173 match task {
174 None => return,
175 Some((statement, completion)) => {
176 let result = store.submit(statement, StatementSource::Network);
177 if completion.send(result).is_err() {
178 log::debug!(
179 target: LOG_TARGET,
180 "Error sending validation completion"
181 );
182 }
183 },
184 }
185 }
186 }
187 .boxed(),
188 );
189
190 let handler = StatementHandler {
191 protocol_name: self.protocol_name,
192 notification_service: self.notification_service,
193 propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
194 as Pin<Box<dyn Stream<Item = ()> + Send>>)
195 .fuse(),
196 pending_statements: FuturesUnordered::new(),
197 pending_statements_peers: HashMap::new(),
198 network,
199 sync,
200 sync_event_stream: sync_event_stream.fuse(),
201 peers: HashMap::new(),
202 statement_store,
203 queue_sender,
204 metrics: if let Some(r) = metrics_registry {
205 Some(Metrics::register(r)?)
206 } else {
207 None
208 },
209 };
210
211 Ok(handler)
212 }
213}
214
215pub struct StatementHandler<
217 N: NetworkPeers + NetworkEventStream,
218 S: SyncEventStream + sp_consensus::SyncOracle,
219> {
220 protocol_name: ProtocolName,
221 propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
223 pending_statements:
225 FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
226 pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
231 network: N,
233 sync: S,
235 sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
237 notification_service: Box<dyn NotificationService>,
239 peers: HashMap<PeerId, Peer>,
241 statement_store: Arc<dyn StatementStore>,
242 queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
243 metrics: Option<Metrics>,
245}
246
247#[derive(Debug)]
249struct Peer {
250 known_statements: LruHashSet<Hash>,
252 role: ObservedRole,
253}
254
255impl<N, S> StatementHandler<N, S>
256where
257 N: NetworkPeers + NetworkEventStream,
258 S: SyncEventStream + sp_consensus::SyncOracle,
259{
260 pub async fn run(mut self) {
263 loop {
264 futures::select! {
265 _ = self.propagate_timeout.next() => {
266 self.propagate_statements();
267 },
268 (hash, result) = self.pending_statements.select_next_some() => {
269 if let Some(peers) = self.pending_statements_peers.remove(&hash) {
270 if let Some(result) = result {
271 peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
272 }
273 } else {
274 log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
275 }
276 },
277 sync_event = self.sync_event_stream.next() => {
278 if let Some(sync_event) = sync_event {
279 self.handle_sync_event(sync_event);
280 } else {
281 return;
283 }
284 }
285 event = self.notification_service.next_event().fuse() => {
286 if let Some(event) = event {
287 self.handle_notification_event(event)
288 } else {
289 return
291 }
292 }
293 }
294 }
295 }
296
297 fn handle_sync_event(&mut self, event: SyncEvent) {
298 match event {
299 SyncEvent::PeerConnected(remote) => {
300 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
301 .collect::<multiaddr::Multiaddr>();
302 let result = self.network.add_peers_to_reserved_set(
303 self.protocol_name.clone(),
304 iter::once(addr).collect(),
305 );
306 if let Err(err) = result {
307 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
308 }
309 },
310 SyncEvent::PeerDisconnected(remote) => {
311 let result = self.network.remove_peers_from_reserved_set(
312 self.protocol_name.clone(),
313 iter::once(remote).collect(),
314 );
315 if let Err(err) = result {
316 log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
317 }
318 },
319 }
320 }
321
322 fn handle_notification_event(&mut self, event: NotificationEvent) {
323 match event {
324 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
325 let result = self
327 .network
328 .peer_role(peer, handshake)
329 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
330 let _ = result_tx.send(result);
331 },
332 NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
333 let Some(role) = self.network.peer_role(peer, handshake) else {
334 log::debug!(target: LOG_TARGET, "role for {peer} couldn't be determined");
335 return
336 };
337
338 let _was_in = self.peers.insert(
339 peer,
340 Peer {
341 known_statements: LruHashSet::new(
342 NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
343 ),
344 role,
345 },
346 );
347 debug_assert!(_was_in.is_none());
348 },
349 NotificationEvent::NotificationStreamClosed { peer } => {
350 let _peer = self.peers.remove(&peer);
351 debug_assert!(_peer.is_some());
352 },
353 NotificationEvent::NotificationReceived { peer, notification } => {
354 if self.sync.is_major_syncing() {
356 log::trace!(
357 target: LOG_TARGET,
358 "{peer}: Ignoring statements while major syncing or offline"
359 );
360 return
361 }
362
363 if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
364 self.on_statements(peer, statements);
365 } else {
366 log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
367 }
368 },
369 }
370 }
371
372 fn on_statements(&mut self, who: PeerId, statements: Statements) {
374 log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
375 if let Some(ref mut peer) = self.peers.get_mut(&who) {
376 for s in statements {
377 if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
378 log::debug!(
379 target: LOG_TARGET,
380 "Ignoring any further statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
381 MAX_PENDING_STATEMENTS,
382 );
383 break
384 }
385
386 let hash = s.hash();
387 peer.known_statements.insert(hash);
388
389 self.network.report_peer(who, rep::ANY_STATEMENT);
390
391 match self.pending_statements_peers.entry(hash) {
392 Entry::Vacant(entry) => {
393 let (completion_sender, completion_receiver) = oneshot::channel();
394 match self.queue_sender.try_send((s, completion_sender)) {
395 Ok(()) => {
396 self.pending_statements.push(
397 async move {
398 let res = completion_receiver.await;
399 (hash, res.ok())
400 }
401 .boxed(),
402 );
403 entry.insert(HashSet::from_iter([who]));
404 },
405 Err(async_channel::TrySendError::Full(_)) => {
406 log::debug!(
407 target: LOG_TARGET,
408 "Dropped statement because validation channel is full",
409 );
410 },
411 Err(async_channel::TrySendError::Closed(_)) => {
412 log::trace!(
413 target: LOG_TARGET,
414 "Dropped statement because validation channel is closed",
415 );
416 },
417 }
418 },
419 Entry::Occupied(mut entry) => {
420 if !entry.get_mut().insert(who) {
421 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
423 }
424 },
425 }
426 }
427 }
428 }
429
430 fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
431 match import {
432 SubmitResult::New(NetworkPriority::High) =>
433 self.network.report_peer(who, rep::EXCELLENT_STATEMENT),
434 SubmitResult::New(NetworkPriority::Low) =>
435 self.network.report_peer(who, rep::GOOD_STATEMENT),
436 SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
437 SubmitResult::KnownExpired => {},
438 SubmitResult::Ignored => {},
439 SubmitResult::Bad(_) => self.network.report_peer(who, rep::BAD_STATEMENT),
440 SubmitResult::InternalError(_) => {},
441 }
442 }
443
444 pub fn propagate_statement(&mut self, hash: &Hash) {
446 if self.sync.is_major_syncing() {
448 return
449 }
450
451 log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
452 if let Ok(Some(statement)) = self.statement_store.statement(hash) {
453 self.do_propagate_statements(&[(*hash, statement)]);
454 }
455 }
456
457 fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
458 let mut propagated_statements = 0;
459
460 for (who, peer) in self.peers.iter_mut() {
461 if matches!(peer.role, ObservedRole::Light) {
463 continue
464 }
465
466 let to_send = statements
467 .iter()
468 .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
469 .collect::<Vec<_>>();
470
471 propagated_statements += to_send.len();
472
473 if !to_send.is_empty() {
474 log::trace!(target: LOG_TARGET, "Sending {} statements to {}", to_send.len(), who);
475 self.notification_service.send_sync_notification(who, to_send.encode());
476 }
477 }
478
479 if let Some(ref metrics) = self.metrics {
480 metrics.propagated_statements.inc_by(propagated_statements as _)
481 }
482 }
483
484 fn propagate_statements(&mut self) {
486 if self.sync.is_major_syncing() {
488 return
489 }
490
491 log::debug!(target: LOG_TARGET, "Propagating statements");
492 if let Ok(statements) = self.statement_store.statements() {
493 self.do_propagate_statements(&statements);
494 }
495 }
496}