referrerpolicy=no-referrer-when-downgrade

sc_network_gossip/
state_machine.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext};
20
21use ahash::AHashSet;
22use sc_network_types::PeerId;
23use schnellru::{ByLength, LruMap};
24
25use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
26use sc_network::{types::ProtocolName, NotificationService};
27use sc_network_common::role::ObservedRole;
28use sp_runtime::traits::{Block as BlockT, Hash, HashingFor};
29use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};
30
31// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
32// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
33// the current main gossip user (GRANDPA). Currently there are ~800 validators on Kusama, as such,
34// each GRANDPA round should generate ~1600 messages, and we currently keep track of the last 2
35// completed rounds and the current live one. That makes it so that at any point we will be holding
36// ~4800 live messages.
37//
38// Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then
39// this cache should take about 256 KB of memory.
40const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;
41
42const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);
43
44pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
45
46mod rep {
47	use sc_network::ReputationChange as Rep;
48	/// Reputation change when a peer sends us a gossip message that we didn't know about.
49	pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip");
50	/// Reputation change when a peer sends us a gossip message that we already knew about.
51	pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
52}
53
54struct PeerConsensus<H> {
55	known_messages: AHashSet<H>,
56}
57
58/// Topic stream message with sender.
59#[derive(Clone, Debug, Eq, PartialEq)]
60pub struct TopicNotification {
61	/// Message data.
62	pub message: Vec<u8>,
63	/// Sender if available.
64	pub sender: Option<PeerId>,
65}
66
67struct MessageEntry<B: BlockT> {
68	message_hash: B::Hash,
69	topic: B::Hash,
70	message: Vec<u8>,
71	sender: Option<PeerId>,
72}
73
74/// Local implementation of `ValidatorContext`.
75struct NetworkContext<'g, 'p, B: BlockT> {
76	gossip: &'g mut ConsensusGossip<B>,
77	notification_service: &'p mut Box<dyn NotificationService>,
78}
79
80impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
81	/// Broadcast all messages with given topic to peers that do not have it yet.
82	fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
83		self.gossip.broadcast_topic(self.notification_service, topic, force);
84	}
85
86	/// Broadcast a message to all peers that have not received it previously.
87	fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
88		self.gossip.multicast(self.notification_service, topic, message, force);
89	}
90
91	/// Send addressed message to a peer.
92	fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
93		self.notification_service.send_sync_notification(who, message);
94	}
95
96	/// Send all messages with given topic to a peer.
97	fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
98		self.gossip.send_topic(self.notification_service, who, topic, force);
99	}
100}
101
102fn propagate<'a, B: BlockT, I>(
103	notification_service: &mut Box<dyn NotificationService>,
104	protocol: ProtocolName,
105	messages: I,
106	intent: MessageIntent,
107	peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
108	validator: &Arc<dyn Validator<B>>,
109)
110// (msg_hash, topic, message)
111where
112	I: Clone + IntoIterator<Item = (&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
113{
114	let mut message_allowed = validator.message_allowed();
115
116	for (id, ref mut peer) in peers.iter_mut() {
117		for (message_hash, topic, message) in messages.clone() {
118			let intent = match intent {
119				MessageIntent::Broadcast { .. } =>
120					if peer.known_messages.contains(message_hash) {
121						continue
122					} else {
123						MessageIntent::Broadcast
124					},
125				MessageIntent::PeriodicRebroadcast => {
126					if peer.known_messages.contains(message_hash) {
127						MessageIntent::PeriodicRebroadcast
128					} else {
129						// peer doesn't know message, so the logic should treat it as an
130						// initial broadcast.
131						MessageIntent::Broadcast
132					}
133				},
134				other => other,
135			};
136
137			if !message_allowed(id, intent, topic, message) {
138				continue
139			}
140
141			peer.known_messages.insert(*message_hash);
142
143			tracing::trace!(
144				target: "gossip",
145				to = %id,
146				%protocol,
147				?message,
148				"Propagating message",
149			);
150			notification_service.send_sync_notification(id, message.clone());
151		}
152	}
153}
154
155/// Consensus network protocol handler. Manages statements and candidate requests.
156pub struct ConsensusGossip<B: BlockT> {
157	peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
158	messages: Vec<MessageEntry<B>>,
159	known_messages: LruMap<B::Hash, ()>,
160	protocol: ProtocolName,
161	validator: Arc<dyn Validator<B>>,
162	next_broadcast: Instant,
163	metrics: Option<Metrics>,
164}
165
166impl<B: BlockT> ConsensusGossip<B> {
167	/// Create a new instance using the given validator.
168	pub fn new(
169		validator: Arc<dyn Validator<B>>,
170		protocol: ProtocolName,
171		metrics_registry: Option<&Registry>,
172	) -> Self {
173		let metrics = match metrics_registry.map(Metrics::register) {
174			Some(Ok(metrics)) => Some(metrics),
175			Some(Err(e)) => {
176				tracing::debug!(target: "gossip", "Failed to register metrics: {:?}", e);
177				None
178			},
179			None => None,
180		};
181
182		ConsensusGossip {
183			peers: HashMap::new(),
184			messages: Default::default(),
185			known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
186			protocol,
187			validator,
188			next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
189			metrics,
190		}
191	}
192
193	/// Handle new connected peer.
194	pub fn new_peer(
195		&mut self,
196		notification_service: &mut Box<dyn NotificationService>,
197		who: PeerId,
198		role: ObservedRole,
199	) {
200		tracing::trace!(
201			target:"gossip",
202			%who,
203			protocol = %self.protocol,
204			?role,
205			"Registering peer",
206		);
207		self.peers.insert(who, PeerConsensus { known_messages: Default::default() });
208
209		let validator = self.validator.clone();
210		let mut context = NetworkContext { gossip: self, notification_service };
211		validator.new_peer(&mut context, &who, role);
212	}
213
214	fn register_message_hashed(
215		&mut self,
216		message_hash: B::Hash,
217		topic: B::Hash,
218		message: Vec<u8>,
219		sender: Option<PeerId>,
220	) {
221		if self.known_messages.insert(message_hash, ()) {
222			self.messages.push(MessageEntry { message_hash, topic, message, sender });
223
224			if let Some(ref metrics) = self.metrics {
225				metrics.registered_messages.inc();
226			}
227		}
228	}
229
230	/// Registers a message without propagating it to any peers. The message
231	/// becomes available to new peers or when the service is asked to gossip
232	/// the message's topic. No validation is performed on the message, if the
233	/// message is already expired it should be dropped on the next garbage
234	/// collection.
235	pub fn register_message(&mut self, topic: B::Hash, message: Vec<u8>) {
236		let message_hash = HashingFor::<B>::hash(&message[..]);
237		self.register_message_hashed(message_hash, topic, message, None);
238	}
239
240	/// Call when a peer has been disconnected to stop tracking gossip status.
241	pub fn peer_disconnected(
242		&mut self,
243		notification_service: &mut Box<dyn NotificationService>,
244		who: PeerId,
245	) {
246		let validator = self.validator.clone();
247		let mut context = NetworkContext { gossip: self, notification_service };
248		validator.peer_disconnected(&mut context, &who);
249		self.peers.remove(&who);
250	}
251
252	/// Perform periodic maintenance
253	pub fn tick(&mut self, notification_service: &mut Box<dyn NotificationService>) {
254		self.collect_garbage();
255		if Instant::now() >= self.next_broadcast {
256			self.rebroadcast(notification_service);
257			self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
258		}
259	}
260
261	/// Rebroadcast all messages to all peers.
262	fn rebroadcast(&mut self, notification_service: &mut Box<dyn NotificationService>) {
263		let messages = self
264			.messages
265			.iter()
266			.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
267
268		propagate(
269			notification_service,
270			self.protocol.clone(),
271			messages,
272			MessageIntent::PeriodicRebroadcast,
273			&mut self.peers,
274			&self.validator,
275		);
276	}
277
278	/// Broadcast all messages with given topic.
279	pub fn broadcast_topic(
280		&mut self,
281		notification_service: &mut Box<dyn NotificationService>,
282		topic: B::Hash,
283		force: bool,
284	) {
285		let messages = self.messages.iter().filter_map(|entry| {
286			if entry.topic == topic {
287				Some((&entry.message_hash, &entry.topic, &entry.message))
288			} else {
289				None
290			}
291		});
292		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
293		propagate(
294			notification_service,
295			self.protocol.clone(),
296			messages,
297			intent,
298			&mut self.peers,
299			&self.validator,
300		);
301	}
302
303	/// Prune old or no longer relevant consensus messages. Provide a predicate
304	/// for pruning, which returns `false` when the items with a given topic should be pruned.
305	pub fn collect_garbage(&mut self) {
306		let known_messages = &mut self.known_messages;
307		let before = self.messages.len();
308
309		let mut message_expired = self.validator.message_expired();
310		self.messages.retain(|entry| !message_expired(entry.topic, &entry.message));
311
312		let expired_messages = before - self.messages.len();
313
314		if let Some(ref metrics) = self.metrics {
315			metrics.expired_messages.inc_by(expired_messages as u64)
316		}
317
318		tracing::trace!(
319			target: "gossip",
320			protocol = %self.protocol,
321			"Cleaned up {} stale messages, {} left ({} known)",
322			expired_messages,
323			self.messages.len(),
324			known_messages.len(),
325		);
326
327		for (_, ref mut peer) in self.peers.iter_mut() {
328			peer.known_messages.retain(|h| known_messages.get(h).is_some());
329		}
330	}
331
332	/// Get valid messages received in the past for a topic (might have expired meanwhile).
333	pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
334		self.messages
335			.iter()
336			.filter(move |e| e.topic == topic)
337			.map(|entry| TopicNotification { message: entry.message.clone(), sender: entry.sender })
338	}
339
340	/// Register incoming messages and return the ones that are new and valid (according to a gossip
341	/// validator) and should thus be forwarded to the upper layers.
342	pub fn on_incoming(
343		&mut self,
344		network: &mut dyn Network<B>,
345		notification_service: &mut Box<dyn NotificationService>,
346		who: PeerId,
347		messages: Vec<Vec<u8>>,
348	) -> Vec<(B::Hash, TopicNotification)> {
349		let mut to_forward = vec![];
350
351		if !messages.is_empty() {
352			tracing::trace!(
353				target: "gossip",
354				messages_num = %messages.len(),
355				%who,
356				protocol = %self.protocol,
357				"Received messages from peer",
358			);
359		}
360
361		for message in messages {
362			let message_hash = HashingFor::<B>::hash(&message[..]);
363
364			if self.known_messages.get(&message_hash).is_some() {
365				tracing::trace!(
366					target: "gossip",
367					%who,
368					protocol = %self.protocol,
369					"Ignored already known message",
370				);
371
372				// If the peer already send us the message once, let's report them.
373				if self
374					.peers
375					.get_mut(&who)
376					.map_or(false, |p| !p.known_messages.insert(message_hash))
377				{
378					network.report_peer(who, rep::DUPLICATE_GOSSIP);
379				}
380				continue
381			}
382
383			// validate the message
384			let validation = {
385				let validator = self.validator.clone();
386				let mut context = NetworkContext { gossip: self, notification_service };
387				validator.validate(&mut context, &who, &message)
388			};
389
390			let (topic, keep) = match validation {
391				ValidationResult::ProcessAndKeep(topic) => (topic, true),
392				ValidationResult::ProcessAndDiscard(topic) => (topic, false),
393				ValidationResult::Discard => {
394					tracing::trace!(
395						target: "gossip",
396						%who,
397						protocol = %self.protocol,
398						"Discard message from peer",
399					);
400					continue
401				},
402			};
403
404			let peer = match self.peers.get_mut(&who) {
405				Some(peer) => peer,
406				None => {
407					tracing::error!(
408						target: "gossip",
409						%who,
410						protocol = %self.protocol,
411						"Got message from unregistered peer",
412					);
413					continue
414				},
415			};
416
417			network.report_peer(who, rep::GOSSIP_SUCCESS);
418			peer.known_messages.insert(message_hash);
419			to_forward
420				.push((topic, TopicNotification { message: message.clone(), sender: Some(who) }));
421
422			if keep {
423				self.register_message_hashed(message_hash, topic, message, Some(who));
424			}
425		}
426
427		to_forward
428	}
429
430	/// Send all messages with given topic to a peer.
431	pub fn send_topic(
432		&mut self,
433		notification_service: &mut Box<dyn NotificationService>,
434		who: &PeerId,
435		topic: B::Hash,
436		force: bool,
437	) {
438		let mut message_allowed = self.validator.message_allowed();
439
440		if let Some(ref mut peer) = self.peers.get_mut(who) {
441			for entry in self.messages.iter().filter(|m| m.topic == topic) {
442				let intent =
443					if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
444
445				if !force && peer.known_messages.contains(&entry.message_hash) {
446					continue
447				}
448
449				if !message_allowed(who, intent, &entry.topic, &entry.message) {
450					continue
451				}
452
453				peer.known_messages.insert(entry.message_hash);
454
455				tracing::trace!(
456					target: "gossip",
457					to = %who,
458					protocol = %self.protocol,
459					?entry.message,
460					"Sending topic message",
461				);
462				notification_service.send_sync_notification(who, entry.message.clone());
463			}
464		}
465	}
466
467	/// Multicast a message to all peers.
468	pub fn multicast(
469		&mut self,
470		notification_service: &mut Box<dyn NotificationService>,
471		topic: B::Hash,
472		message: Vec<u8>,
473		force: bool,
474	) {
475		let message_hash = HashingFor::<B>::hash(&message);
476		self.register_message_hashed(message_hash, topic, message.clone(), None);
477		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
478		propagate(
479			notification_service,
480			self.protocol.clone(),
481			iter::once((&message_hash, &topic, &message)),
482			intent,
483			&mut self.peers,
484			&self.validator,
485		);
486	}
487
488	/// Send addressed message to a peer. The message is not kept or multicast
489	/// later on.
490	pub fn send_message(
491		&mut self,
492		notification_service: &mut Box<dyn NotificationService>,
493		who: &PeerId,
494		message: Vec<u8>,
495	) {
496		let peer = match self.peers.get_mut(who) {
497			None => return,
498			Some(peer) => peer,
499		};
500
501		let message_hash = HashingFor::<B>::hash(&message);
502
503		tracing::trace!(
504			target: "gossip",
505			to = %who,
506			protocol = %self.protocol,
507			?message,
508			"Sending direct message",
509		);
510
511		peer.known_messages.insert(message_hash);
512		notification_service.send_sync_notification(who, message)
513	}
514}
515
516struct Metrics {
517	registered_messages: Counter<U64>,
518	expired_messages: Counter<U64>,
519}
520
521impl Metrics {
522	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
523		Ok(Self {
524			registered_messages: register(
525				Counter::new(
526					"substrate_network_gossip_registered_messages_total",
527					"Number of registered messages by the gossip service.",
528				)?,
529				registry,
530			)?,
531			expired_messages: register(
532				Counter::new(
533					"substrate_network_gossip_expired_messages_total",
534					"Number of expired messages by the gossip service.",
535				)?,
536				registry,
537			)?,
538		})
539	}
540}
541
542#[cfg(test)]
543mod tests {
544	use super::*;
545	use futures::prelude::*;
546	use sc_network::{
547		config::MultiaddrWithPeerId, event::Event, service::traits::NotificationEvent, MessageSink,
548		NetworkBlock, NetworkEventStream, NetworkPeers, ReputationChange,
549	};
550	use sc_network_types::multiaddr::Multiaddr;
551	use sp_runtime::{
552		testing::{Block as RawBlock, MockCallU64, TestXt, H256},
553		traits::NumberFor,
554	};
555	use std::{
556		collections::HashSet,
557		pin::Pin,
558		sync::{Arc, Mutex},
559	};
560
561	type Block = RawBlock<TestXt<MockCallU64, ()>>;
562
563	macro_rules! push_msg {
564		($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
565			if $consensus.known_messages.insert($hash, ()) {
566				$consensus.messages.push(MessageEntry {
567					message_hash: $hash,
568					topic: $topic,
569					message: $m,
570					sender: None,
571				});
572			}
573		};
574	}
575
576	struct AllowAll;
577	impl Validator<Block> for AllowAll {
578		fn validate(
579			&self,
580			_context: &mut dyn ValidatorContext<Block>,
581			_sender: &PeerId,
582			_data: &[u8],
583		) -> ValidationResult<H256> {
584			ValidationResult::ProcessAndKeep(H256::default())
585		}
586	}
587
588	struct DiscardAll;
589	impl Validator<Block> for DiscardAll {
590		fn validate(
591			&self,
592			_context: &mut dyn ValidatorContext<Block>,
593			_sender: &PeerId,
594			_data: &[u8],
595		) -> ValidationResult<H256> {
596			ValidationResult::Discard
597		}
598	}
599
600	#[derive(Clone, Default)]
601	struct NoOpNetwork {
602		inner: Arc<Mutex<NoOpNetworkInner>>,
603	}
604
605	#[derive(Clone, Default)]
606	struct NoOpNetworkInner {
607		peer_reports: Vec<(PeerId, ReputationChange)>,
608	}
609
610	#[async_trait::async_trait]
611	impl NetworkPeers for NoOpNetwork {
612		fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
613			unimplemented!();
614		}
615
616		fn set_authorized_only(&self, _reserved_only: bool) {
617			unimplemented!();
618		}
619
620		fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
621			unimplemented!();
622		}
623
624		fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
625			self.inner.lock().unwrap().peer_reports.push((peer_id, cost_benefit));
626		}
627
628		fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
629			unimplemented!()
630		}
631
632		fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
633			unimplemented!();
634		}
635
636		fn accept_unreserved_peers(&self) {
637			unimplemented!();
638		}
639
640		fn deny_unreserved_peers(&self) {
641			unimplemented!();
642		}
643
644		fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
645			unimplemented!();
646		}
647
648		fn remove_reserved_peer(&self, _peer_id: PeerId) {
649			unimplemented!();
650		}
651
652		fn set_reserved_peers(
653			&self,
654			_protocol: ProtocolName,
655			_peers: HashSet<Multiaddr>,
656		) -> Result<(), String> {
657			unimplemented!();
658		}
659
660		fn add_peers_to_reserved_set(
661			&self,
662			_protocol: ProtocolName,
663			_peers: HashSet<Multiaddr>,
664		) -> Result<(), String> {
665			unimplemented!();
666		}
667
668		fn remove_peers_from_reserved_set(
669			&self,
670			_protocol: ProtocolName,
671			_peers: Vec<PeerId>,
672		) -> Result<(), String> {
673			unimplemented!();
674		}
675
676		fn sync_num_connected(&self) -> usize {
677			unimplemented!();
678		}
679
680		fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
681			None
682		}
683
684		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
685			unimplemented!();
686		}
687	}
688
689	impl NetworkEventStream for NoOpNetwork {
690		fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
691			unimplemented!();
692		}
693	}
694
695	impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
696		fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
697			unimplemented!();
698		}
699
700		fn new_best_block_imported(
701			&self,
702			_hash: <Block as BlockT>::Hash,
703			_number: NumberFor<Block>,
704		) {
705			unimplemented!();
706		}
707	}
708
709	#[derive(Debug, Default)]
710	struct NoOpNotificationService {}
711
712	#[async_trait::async_trait]
713	impl NotificationService for NoOpNotificationService {
714		/// Instruct `Notifications` to open a new substream for `peer`.
715		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
716			unimplemented!();
717		}
718
719		/// Instruct `Notifications` to close substream for `peer`.
720		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
721			unimplemented!();
722		}
723
724		/// Send synchronous `notification` to `peer`.
725		fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
726			unimplemented!();
727		}
728
729		/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
730		async fn send_async_notification(
731			&mut self,
732			_peer: &PeerId,
733			_notification: Vec<u8>,
734		) -> Result<(), sc_network::error::Error> {
735			unimplemented!();
736		}
737
738		/// Set handshake for the notification protocol replacing the old handshake.
739		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
740			unimplemented!();
741		}
742
743		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
744			unimplemented!();
745		}
746
747		/// Get next event from the `Notifications` event stream.
748		async fn next_event(&mut self) -> Option<NotificationEvent> {
749			None
750		}
751
752		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
753			unimplemented!();
754		}
755
756		fn protocol(&self) -> &ProtocolName {
757			unimplemented!();
758		}
759
760		fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
761			unimplemented!();
762		}
763	}
764
765	#[test]
766	fn collects_garbage() {
767		struct AllowOne;
768		impl Validator<Block> for AllowOne {
769			fn validate(
770				&self,
771				_context: &mut dyn ValidatorContext<Block>,
772				_sender: &PeerId,
773				data: &[u8],
774			) -> ValidationResult<H256> {
775				if data[0] == 1 {
776					ValidationResult::ProcessAndKeep(H256::default())
777				} else {
778					ValidationResult::Discard
779				}
780			}
781
782			fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
783				Box::new(move |_topic, data| data[0] != 1)
784			}
785		}
786
787		let prev_hash = H256::random();
788		let best_hash = H256::random();
789		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
790		let m1_hash = H256::random();
791		let m2_hash = H256::random();
792		let m1 = vec![1, 2, 3];
793		let m2 = vec![4, 5, 6];
794
795		push_msg!(consensus, prev_hash, m1_hash, m1);
796		push_msg!(consensus, best_hash, m2_hash, m2);
797		consensus.known_messages.insert(m1_hash, ());
798		consensus.known_messages.insert(m2_hash, ());
799
800		consensus.collect_garbage();
801		assert_eq!(consensus.messages.len(), 2);
802		assert_eq!(consensus.known_messages.len(), 2);
803
804		consensus.validator = Arc::new(AllowOne);
805
806		// m2 is expired
807		consensus.collect_garbage();
808		assert_eq!(consensus.messages.len(), 1);
809		// known messages are only pruned based on size.
810		assert_eq!(consensus.known_messages.len(), 2);
811		assert!(consensus.known_messages.get(&m2_hash).is_some());
812	}
813
814	#[test]
815	fn message_stream_include_those_sent_before_asking() {
816		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
817
818		// Register message.
819		let message = vec![4, 5, 6];
820		let topic = HashingFor::<Block>::hash(&[1, 2, 3]);
821		consensus.register_message(topic, message.clone());
822
823		assert_eq!(
824			consensus.messages_for(topic).next(),
825			Some(TopicNotification { message, sender: None }),
826		);
827	}
828
829	#[test]
830	fn can_keep_multiple_messages_per_topic() {
831		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
832
833		let topic = [1; 32].into();
834		let msg_a = vec![1, 2, 3];
835		let msg_b = vec![4, 5, 6];
836
837		consensus.register_message(topic, msg_a);
838		consensus.register_message(topic, msg_b);
839
840		assert_eq!(consensus.messages.len(), 2);
841	}
842
843	#[test]
844	fn peer_is_removed_on_disconnect() {
845		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
846
847		let mut notification_service: Box<dyn NotificationService> =
848			Box::new(NoOpNotificationService::default());
849
850		let peer_id = PeerId::random();
851		consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
852		assert!(consensus.peers.contains_key(&peer_id));
853
854		consensus.peer_disconnected(&mut notification_service, peer_id);
855		assert!(!consensus.peers.contains_key(&peer_id));
856	}
857
858	#[test]
859	fn on_incoming_ignores_discarded_messages() {
860		let mut notification_service: Box<dyn NotificationService> =
861			Box::new(NoOpNotificationService::default());
862		let to_forward = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None)
863			.on_incoming(
864				&mut NoOpNetwork::default(),
865				&mut notification_service,
866				PeerId::random(),
867				vec![vec![1, 2, 3]],
868			);
869
870		assert!(
871			to_forward.is_empty(),
872			"Expected `on_incoming` to ignore discarded message but got {:?}",
873			to_forward,
874		);
875	}
876
877	#[test]
878	fn on_incoming_ignores_unregistered_peer() {
879		let mut network = NoOpNetwork::default();
880		let mut notification_service: Box<dyn NotificationService> =
881			Box::new(NoOpNotificationService::default());
882		let remote = PeerId::random();
883
884		let to_forward = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None)
885			.on_incoming(
886				&mut network,
887				&mut notification_service,
888				// Unregistered peer.
889				remote,
890				vec![vec![1, 2, 3]],
891			);
892
893		assert!(
894			to_forward.is_empty(),
895			"Expected `on_incoming` to ignore message from unregistered peer but got {:?}",
896			to_forward,
897		);
898	}
899
900	// Two peers can send us the same gossip message. We should not report the second peer
901	// sending the gossip message as long as its the first time the peer send us this message.
902	#[test]
903	fn do_not_report_peer_for_first_time_duplicate_gossip_message() {
904		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
905
906		let mut network = NoOpNetwork::default();
907		let mut notification_service: Box<dyn NotificationService> =
908			Box::new(NoOpNotificationService::default());
909
910		let peer_id = PeerId::random();
911		consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
912		assert!(consensus.peers.contains_key(&peer_id));
913
914		let peer_id2 = PeerId::random();
915		consensus.new_peer(&mut notification_service, peer_id2, ObservedRole::Full);
916		assert!(consensus.peers.contains_key(&peer_id2));
917
918		let message = vec![vec![1, 2, 3]];
919		consensus.on_incoming(&mut network, &mut notification_service, peer_id, message.clone());
920		consensus.on_incoming(&mut network, &mut notification_service, peer_id2, message.clone());
921
922		assert_eq!(
923			vec![(peer_id, rep::GOSSIP_SUCCESS)],
924			network.inner.lock().unwrap().peer_reports
925		);
926	}
927}