1use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext};
20
21use ahash::AHashSet;
22use sc_network_types::PeerId;
23use schnellru::{ByLength, LruMap};
24
25use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
26use sc_network::{types::ProtocolName, NotificationService};
27use sc_network_common::role::ObservedRole;
28use sp_runtime::traits::{Block as BlockT, Hash, HashingFor};
29use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};
30
/// Maximum number of message hashes remembered in the node-wide LRU cache of known messages.
const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;

/// Delay between two consecutive periodic rebroadcasts of the stored messages.
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);

/// Interval at which periodic maintenance is meant to run.
// NOTE(review): not referenced in this file; presumably the caller uses it to schedule
// `ConsensusGossip::tick` — confirm at the call site.
pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
45
46mod rep {
47 use sc_network::ReputationChange as Rep;
48 pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip");
50 pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
52}
53
54struct PeerConsensus<H> {
55 known_messages: AHashSet<H>,
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
60pub struct TopicNotification {
61 pub message: Vec<u8>,
63 pub sender: Option<PeerId>,
65}
66
67struct MessageEntry<B: BlockT> {
68 message_hash: B::Hash,
69 topic: B::Hash,
70 message: Vec<u8>,
71 sender: Option<PeerId>,
72}
73
74struct NetworkContext<'g, 'p, B: BlockT> {
76 gossip: &'g mut ConsensusGossip<B>,
77 notification_service: &'p mut Box<dyn NotificationService>,
78}
79
80impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
81 fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
83 self.gossip.broadcast_topic(self.notification_service, topic, force);
84 }
85
86 fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
88 self.gossip.multicast(self.notification_service, topic, message, force);
89 }
90
91 fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
93 self.notification_service.send_sync_notification(who, message);
94 }
95
96 fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
98 self.gossip.send_topic(self.notification_service, who, topic, force);
99 }
100}
101
102fn propagate<'a, B: BlockT, I>(
103 notification_service: &mut Box<dyn NotificationService>,
104 protocol: ProtocolName,
105 messages: I,
106 intent: MessageIntent,
107 peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
108 validator: &Arc<dyn Validator<B>>,
109)
110where
112 I: Clone + IntoIterator<Item = (&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
113{
114 let mut message_allowed = validator.message_allowed();
115
116 for (id, ref mut peer) in peers.iter_mut() {
117 for (message_hash, topic, message) in messages.clone() {
118 let intent = match intent {
119 MessageIntent::Broadcast { .. } =>
120 if peer.known_messages.contains(message_hash) {
121 continue
122 } else {
123 MessageIntent::Broadcast
124 },
125 MessageIntent::PeriodicRebroadcast => {
126 if peer.known_messages.contains(message_hash) {
127 MessageIntent::PeriodicRebroadcast
128 } else {
129 MessageIntent::Broadcast
132 }
133 },
134 other => other,
135 };
136
137 if !message_allowed(id, intent, topic, message) {
138 continue
139 }
140
141 peer.known_messages.insert(*message_hash);
142
143 tracing::trace!(
144 target: "gossip",
145 to = %id,
146 %protocol,
147 ?message,
148 "Propagating message",
149 );
150 notification_service.send_sync_notification(id, message.clone());
151 }
152 }
153}
154
155pub struct ConsensusGossip<B: BlockT> {
157 peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
158 messages: Vec<MessageEntry<B>>,
159 known_messages: LruMap<B::Hash, ()>,
160 protocol: ProtocolName,
161 validator: Arc<dyn Validator<B>>,
162 next_broadcast: Instant,
163 metrics: Option<Metrics>,
164}
165
166impl<B: BlockT> ConsensusGossip<B> {
167 pub fn new(
169 validator: Arc<dyn Validator<B>>,
170 protocol: ProtocolName,
171 metrics_registry: Option<&Registry>,
172 ) -> Self {
173 let metrics = match metrics_registry.map(Metrics::register) {
174 Some(Ok(metrics)) => Some(metrics),
175 Some(Err(e)) => {
176 tracing::debug!(target: "gossip", "Failed to register metrics: {:?}", e);
177 None
178 },
179 None => None,
180 };
181
182 ConsensusGossip {
183 peers: HashMap::new(),
184 messages: Default::default(),
185 known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
186 protocol,
187 validator,
188 next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
189 metrics,
190 }
191 }
192
193 pub fn new_peer(
195 &mut self,
196 notification_service: &mut Box<dyn NotificationService>,
197 who: PeerId,
198 role: ObservedRole,
199 ) {
200 tracing::trace!(
201 target:"gossip",
202 %who,
203 protocol = %self.protocol,
204 ?role,
205 "Registering peer",
206 );
207 self.peers.insert(who, PeerConsensus { known_messages: Default::default() });
208
209 let validator = self.validator.clone();
210 let mut context = NetworkContext { gossip: self, notification_service };
211 validator.new_peer(&mut context, &who, role);
212 }
213
214 fn register_message_hashed(
215 &mut self,
216 message_hash: B::Hash,
217 topic: B::Hash,
218 message: Vec<u8>,
219 sender: Option<PeerId>,
220 ) {
221 if self.known_messages.insert(message_hash, ()) {
222 self.messages.push(MessageEntry { message_hash, topic, message, sender });
223
224 if let Some(ref metrics) = self.metrics {
225 metrics.registered_messages.inc();
226 }
227 }
228 }
229
230 pub fn register_message(&mut self, topic: B::Hash, message: Vec<u8>) {
236 let message_hash = HashingFor::<B>::hash(&message[..]);
237 self.register_message_hashed(message_hash, topic, message, None);
238 }
239
240 pub fn peer_disconnected(
242 &mut self,
243 notification_service: &mut Box<dyn NotificationService>,
244 who: PeerId,
245 ) {
246 let validator = self.validator.clone();
247 let mut context = NetworkContext { gossip: self, notification_service };
248 validator.peer_disconnected(&mut context, &who);
249 self.peers.remove(&who);
250 }
251
252 pub fn tick(&mut self, notification_service: &mut Box<dyn NotificationService>) {
254 self.collect_garbage();
255 if Instant::now() >= self.next_broadcast {
256 self.rebroadcast(notification_service);
257 self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
258 }
259 }
260
261 fn rebroadcast(&mut self, notification_service: &mut Box<dyn NotificationService>) {
263 let messages = self
264 .messages
265 .iter()
266 .map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
267
268 propagate(
269 notification_service,
270 self.protocol.clone(),
271 messages,
272 MessageIntent::PeriodicRebroadcast,
273 &mut self.peers,
274 &self.validator,
275 );
276 }
277
278 pub fn broadcast_topic(
280 &mut self,
281 notification_service: &mut Box<dyn NotificationService>,
282 topic: B::Hash,
283 force: bool,
284 ) {
285 let messages = self.messages.iter().filter_map(|entry| {
286 if entry.topic == topic {
287 Some((&entry.message_hash, &entry.topic, &entry.message))
288 } else {
289 None
290 }
291 });
292 let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
293 propagate(
294 notification_service,
295 self.protocol.clone(),
296 messages,
297 intent,
298 &mut self.peers,
299 &self.validator,
300 );
301 }
302
303 pub fn collect_garbage(&mut self) {
306 let known_messages = &mut self.known_messages;
307 let before = self.messages.len();
308
309 let mut message_expired = self.validator.message_expired();
310 self.messages.retain(|entry| !message_expired(entry.topic, &entry.message));
311
312 let expired_messages = before - self.messages.len();
313
314 if let Some(ref metrics) = self.metrics {
315 metrics.expired_messages.inc_by(expired_messages as u64)
316 }
317
318 tracing::trace!(
319 target: "gossip",
320 protocol = %self.protocol,
321 "Cleaned up {} stale messages, {} left ({} known)",
322 expired_messages,
323 self.messages.len(),
324 known_messages.len(),
325 );
326
327 for (_, ref mut peer) in self.peers.iter_mut() {
328 peer.known_messages.retain(|h| known_messages.get(h).is_some());
329 }
330 }
331
332 pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
334 self.messages
335 .iter()
336 .filter(move |e| e.topic == topic)
337 .map(|entry| TopicNotification { message: entry.message.clone(), sender: entry.sender })
338 }
339
340 pub fn on_incoming(
343 &mut self,
344 network: &mut dyn Network<B>,
345 notification_service: &mut Box<dyn NotificationService>,
346 who: PeerId,
347 messages: Vec<Vec<u8>>,
348 ) -> Vec<(B::Hash, TopicNotification)> {
349 let mut to_forward = vec![];
350
351 if !messages.is_empty() {
352 tracing::trace!(
353 target: "gossip",
354 messages_num = %messages.len(),
355 %who,
356 protocol = %self.protocol,
357 "Received messages from peer",
358 );
359 }
360
361 for message in messages {
362 let message_hash = HashingFor::<B>::hash(&message[..]);
363
364 if self.known_messages.get(&message_hash).is_some() {
365 tracing::trace!(
366 target: "gossip",
367 %who,
368 protocol = %self.protocol,
369 "Ignored already known message",
370 );
371
372 if self
374 .peers
375 .get_mut(&who)
376 .map_or(false, |p| !p.known_messages.insert(message_hash))
377 {
378 network.report_peer(who, rep::DUPLICATE_GOSSIP);
379 }
380 continue
381 }
382
383 let validation = {
385 let validator = self.validator.clone();
386 let mut context = NetworkContext { gossip: self, notification_service };
387 validator.validate(&mut context, &who, &message)
388 };
389
390 let (topic, keep) = match validation {
391 ValidationResult::ProcessAndKeep(topic) => (topic, true),
392 ValidationResult::ProcessAndDiscard(topic) => (topic, false),
393 ValidationResult::Discard => {
394 tracing::trace!(
395 target: "gossip",
396 %who,
397 protocol = %self.protocol,
398 "Discard message from peer",
399 );
400 continue
401 },
402 };
403
404 let peer = match self.peers.get_mut(&who) {
405 Some(peer) => peer,
406 None => {
407 tracing::error!(
408 target: "gossip",
409 %who,
410 protocol = %self.protocol,
411 "Got message from unregistered peer",
412 );
413 continue
414 },
415 };
416
417 network.report_peer(who, rep::GOSSIP_SUCCESS);
418 peer.known_messages.insert(message_hash);
419 to_forward
420 .push((topic, TopicNotification { message: message.clone(), sender: Some(who) }));
421
422 if keep {
423 self.register_message_hashed(message_hash, topic, message, Some(who));
424 }
425 }
426
427 to_forward
428 }
429
430 pub fn send_topic(
432 &mut self,
433 notification_service: &mut Box<dyn NotificationService>,
434 who: &PeerId,
435 topic: B::Hash,
436 force: bool,
437 ) {
438 let mut message_allowed = self.validator.message_allowed();
439
440 if let Some(ref mut peer) = self.peers.get_mut(who) {
441 for entry in self.messages.iter().filter(|m| m.topic == topic) {
442 let intent =
443 if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
444
445 if !force && peer.known_messages.contains(&entry.message_hash) {
446 continue
447 }
448
449 if !message_allowed(who, intent, &entry.topic, &entry.message) {
450 continue
451 }
452
453 peer.known_messages.insert(entry.message_hash);
454
455 tracing::trace!(
456 target: "gossip",
457 to = %who,
458 protocol = %self.protocol,
459 ?entry.message,
460 "Sending topic message",
461 );
462 notification_service.send_sync_notification(who, entry.message.clone());
463 }
464 }
465 }
466
467 pub fn multicast(
469 &mut self,
470 notification_service: &mut Box<dyn NotificationService>,
471 topic: B::Hash,
472 message: Vec<u8>,
473 force: bool,
474 ) {
475 let message_hash = HashingFor::<B>::hash(&message);
476 self.register_message_hashed(message_hash, topic, message.clone(), None);
477 let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
478 propagate(
479 notification_service,
480 self.protocol.clone(),
481 iter::once((&message_hash, &topic, &message)),
482 intent,
483 &mut self.peers,
484 &self.validator,
485 );
486 }
487
488 pub fn send_message(
491 &mut self,
492 notification_service: &mut Box<dyn NotificationService>,
493 who: &PeerId,
494 message: Vec<u8>,
495 ) {
496 let peer = match self.peers.get_mut(who) {
497 None => return,
498 Some(peer) => peer,
499 };
500
501 let message_hash = HashingFor::<B>::hash(&message);
502
503 tracing::trace!(
504 target: "gossip",
505 to = %who,
506 protocol = %self.protocol,
507 ?message,
508 "Sending direct message",
509 );
510
511 peer.known_messages.insert(message_hash);
512 notification_service.send_sync_notification(who, message)
513 }
514}
515
516struct Metrics {
517 registered_messages: Counter<U64>,
518 expired_messages: Counter<U64>,
519}
520
521impl Metrics {
522 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
523 Ok(Self {
524 registered_messages: register(
525 Counter::new(
526 "substrate_network_gossip_registered_messages_total",
527 "Number of registered messages by the gossip service.",
528 )?,
529 registry,
530 )?,
531 expired_messages: register(
532 Counter::new(
533 "substrate_network_gossip_expired_messages_total",
534 "Number of expired messages by the gossip service.",
535 )?,
536 registry,
537 )?,
538 })
539 }
540}
541
542#[cfg(test)]
543mod tests {
544 use super::*;
545 use futures::prelude::*;
546 use sc_network::{
547 config::MultiaddrWithPeerId, event::Event, service::traits::NotificationEvent, MessageSink,
548 NetworkBlock, NetworkEventStream, NetworkPeers, ReputationChange,
549 };
550 use sc_network_types::multiaddr::Multiaddr;
551 use sp_runtime::{
552 testing::{Block as RawBlock, MockCallU64, TestXt, H256},
553 traits::NumberFor,
554 };
555 use std::{
556 collections::HashSet,
557 pin::Pin,
558 sync::{Arc, Mutex},
559 };
560
561 type Block = RawBlock<TestXt<MockCallU64, ()>>;
562
563 macro_rules! push_msg {
564 ($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
565 if $consensus.known_messages.insert($hash, ()) {
566 $consensus.messages.push(MessageEntry {
567 message_hash: $hash,
568 topic: $topic,
569 message: $m,
570 sender: None,
571 });
572 }
573 };
574 }
575
576 struct AllowAll;
577 impl Validator<Block> for AllowAll {
578 fn validate(
579 &self,
580 _context: &mut dyn ValidatorContext<Block>,
581 _sender: &PeerId,
582 _data: &[u8],
583 ) -> ValidationResult<H256> {
584 ValidationResult::ProcessAndKeep(H256::default())
585 }
586 }
587
588 struct DiscardAll;
589 impl Validator<Block> for DiscardAll {
590 fn validate(
591 &self,
592 _context: &mut dyn ValidatorContext<Block>,
593 _sender: &PeerId,
594 _data: &[u8],
595 ) -> ValidationResult<H256> {
596 ValidationResult::Discard
597 }
598 }
599
600 #[derive(Clone, Default)]
601 struct NoOpNetwork {
602 inner: Arc<Mutex<NoOpNetworkInner>>,
603 }
604
605 #[derive(Clone, Default)]
606 struct NoOpNetworkInner {
607 peer_reports: Vec<(PeerId, ReputationChange)>,
608 }
609
610 #[async_trait::async_trait]
611 impl NetworkPeers for NoOpNetwork {
612 fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
613 unimplemented!();
614 }
615
616 fn set_authorized_only(&self, _reserved_only: bool) {
617 unimplemented!();
618 }
619
620 fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
621 unimplemented!();
622 }
623
624 fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
625 self.inner.lock().unwrap().peer_reports.push((peer_id, cost_benefit));
626 }
627
628 fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
629 unimplemented!()
630 }
631
632 fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
633 unimplemented!();
634 }
635
636 fn accept_unreserved_peers(&self) {
637 unimplemented!();
638 }
639
640 fn deny_unreserved_peers(&self) {
641 unimplemented!();
642 }
643
644 fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
645 unimplemented!();
646 }
647
648 fn remove_reserved_peer(&self, _peer_id: PeerId) {
649 unimplemented!();
650 }
651
652 fn set_reserved_peers(
653 &self,
654 _protocol: ProtocolName,
655 _peers: HashSet<Multiaddr>,
656 ) -> Result<(), String> {
657 unimplemented!();
658 }
659
660 fn add_peers_to_reserved_set(
661 &self,
662 _protocol: ProtocolName,
663 _peers: HashSet<Multiaddr>,
664 ) -> Result<(), String> {
665 unimplemented!();
666 }
667
668 fn remove_peers_from_reserved_set(
669 &self,
670 _protocol: ProtocolName,
671 _peers: Vec<PeerId>,
672 ) -> Result<(), String> {
673 unimplemented!();
674 }
675
676 fn sync_num_connected(&self) -> usize {
677 unimplemented!();
678 }
679
680 fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
681 None
682 }
683
684 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
685 unimplemented!();
686 }
687 }
688
689 impl NetworkEventStream for NoOpNetwork {
690 fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
691 unimplemented!();
692 }
693 }
694
695 impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
696 fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
697 unimplemented!();
698 }
699
700 fn new_best_block_imported(
701 &self,
702 _hash: <Block as BlockT>::Hash,
703 _number: NumberFor<Block>,
704 ) {
705 unimplemented!();
706 }
707 }
708
709 #[derive(Debug, Default)]
710 struct NoOpNotificationService {}
711
712 #[async_trait::async_trait]
713 impl NotificationService for NoOpNotificationService {
714 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
716 unimplemented!();
717 }
718
719 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
721 unimplemented!();
722 }
723
724 fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
726 unimplemented!();
727 }
728
729 async fn send_async_notification(
731 &mut self,
732 _peer: &PeerId,
733 _notification: Vec<u8>,
734 ) -> Result<(), sc_network::error::Error> {
735 unimplemented!();
736 }
737
738 async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
740 unimplemented!();
741 }
742
743 fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
744 unimplemented!();
745 }
746
747 async fn next_event(&mut self) -> Option<NotificationEvent> {
749 None
750 }
751
752 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
753 unimplemented!();
754 }
755
756 fn protocol(&self) -> &ProtocolName {
757 unimplemented!();
758 }
759
760 fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
761 unimplemented!();
762 }
763 }
764
765 #[test]
766 fn collects_garbage() {
767 struct AllowOne;
768 impl Validator<Block> for AllowOne {
769 fn validate(
770 &self,
771 _context: &mut dyn ValidatorContext<Block>,
772 _sender: &PeerId,
773 data: &[u8],
774 ) -> ValidationResult<H256> {
775 if data[0] == 1 {
776 ValidationResult::ProcessAndKeep(H256::default())
777 } else {
778 ValidationResult::Discard
779 }
780 }
781
782 fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
783 Box::new(move |_topic, data| data[0] != 1)
784 }
785 }
786
787 let prev_hash = H256::random();
788 let best_hash = H256::random();
789 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
790 let m1_hash = H256::random();
791 let m2_hash = H256::random();
792 let m1 = vec![1, 2, 3];
793 let m2 = vec![4, 5, 6];
794
795 push_msg!(consensus, prev_hash, m1_hash, m1);
796 push_msg!(consensus, best_hash, m2_hash, m2);
797 consensus.known_messages.insert(m1_hash, ());
798 consensus.known_messages.insert(m2_hash, ());
799
800 consensus.collect_garbage();
801 assert_eq!(consensus.messages.len(), 2);
802 assert_eq!(consensus.known_messages.len(), 2);
803
804 consensus.validator = Arc::new(AllowOne);
805
806 consensus.collect_garbage();
808 assert_eq!(consensus.messages.len(), 1);
809 assert_eq!(consensus.known_messages.len(), 2);
811 assert!(consensus.known_messages.get(&m2_hash).is_some());
812 }
813
814 #[test]
815 fn message_stream_include_those_sent_before_asking() {
816 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
817
818 let message = vec![4, 5, 6];
820 let topic = HashingFor::<Block>::hash(&[1, 2, 3]);
821 consensus.register_message(topic, message.clone());
822
823 assert_eq!(
824 consensus.messages_for(topic).next(),
825 Some(TopicNotification { message, sender: None }),
826 );
827 }
828
829 #[test]
830 fn can_keep_multiple_messages_per_topic() {
831 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
832
833 let topic = [1; 32].into();
834 let msg_a = vec![1, 2, 3];
835 let msg_b = vec![4, 5, 6];
836
837 consensus.register_message(topic, msg_a);
838 consensus.register_message(topic, msg_b);
839
840 assert_eq!(consensus.messages.len(), 2);
841 }
842
843 #[test]
844 fn peer_is_removed_on_disconnect() {
845 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
846
847 let mut notification_service: Box<dyn NotificationService> =
848 Box::new(NoOpNotificationService::default());
849
850 let peer_id = PeerId::random();
851 consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
852 assert!(consensus.peers.contains_key(&peer_id));
853
854 consensus.peer_disconnected(&mut notification_service, peer_id);
855 assert!(!consensus.peers.contains_key(&peer_id));
856 }
857
858 #[test]
859 fn on_incoming_ignores_discarded_messages() {
860 let mut notification_service: Box<dyn NotificationService> =
861 Box::new(NoOpNotificationService::default());
862 let to_forward = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None)
863 .on_incoming(
864 &mut NoOpNetwork::default(),
865 &mut notification_service,
866 PeerId::random(),
867 vec![vec![1, 2, 3]],
868 );
869
870 assert!(
871 to_forward.is_empty(),
872 "Expected `on_incoming` to ignore discarded message but got {:?}",
873 to_forward,
874 );
875 }
876
877 #[test]
878 fn on_incoming_ignores_unregistered_peer() {
879 let mut network = NoOpNetwork::default();
880 let mut notification_service: Box<dyn NotificationService> =
881 Box::new(NoOpNotificationService::default());
882 let remote = PeerId::random();
883
884 let to_forward = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None)
885 .on_incoming(
886 &mut network,
887 &mut notification_service,
888 remote,
890 vec![vec![1, 2, 3]],
891 );
892
893 assert!(
894 to_forward.is_empty(),
895 "Expected `on_incoming` to ignore message from unregistered peer but got {:?}",
896 to_forward,
897 );
898 }
899
900 #[test]
903 fn do_not_report_peer_for_first_time_duplicate_gossip_message() {
904 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
905
906 let mut network = NoOpNetwork::default();
907 let mut notification_service: Box<dyn NotificationService> =
908 Box::new(NoOpNotificationService::default());
909
910 let peer_id = PeerId::random();
911 consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
912 assert!(consensus.peers.contains_key(&peer_id));
913
914 let peer_id2 = PeerId::random();
915 consensus.new_peer(&mut notification_service, peer_id2, ObservedRole::Full);
916 assert!(consensus.peers.contains_key(&peer_id2));
917
918 let message = vec![vec![1, 2, 3]];
919 consensus.on_incoming(&mut network, &mut notification_service, peer_id, message.clone());
920 consensus.on_incoming(&mut network, &mut notification_service, peer_id2, message.clone());
921
922 assert_eq!(
923 vec![(peer_id, rep::GOSSIP_SUCCESS)],
924 network.inner.lock().unwrap().peer_reports
925 );
926 }
927}