referrerpolicy=no-referrer-when-downgrade

sc_network_gossip/
bridge.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL},
21	Network, Syncing, Validator,
22};
23
24use sc_network::{
25	service::traits::{NotificationEvent, ValidationResult},
26	types::ProtocolName,
27	NotificationService, ReputationChange,
28};
29use sc_network_sync::SyncEvent;
30
31use futures::{
32	channel::mpsc::{channel, Receiver, Sender},
33	prelude::*,
34};
35use log::trace;
36use prometheus_endpoint::Registry;
37use sc_network_types::PeerId;
38use sp_runtime::traits::Block as BlockT;
39use std::{
40	collections::{HashMap, VecDeque},
41	pin::Pin,
42	sync::Arc,
43	task::{Context, Poll},
44};
45
/// Wraps around an implementation of the [`Network`] trait and provides gossiping capabilities on
/// top of it.
///
/// The engine is itself a `Future`: polling it processes notification and sync events, forwards
/// incoming messages to the receivers handed out by `messages_for` and runs the periodic
/// maintenance of the underlying state machine.
pub struct GossipEngine<B: BlockT> {
	/// Gossip state machine that the public methods of the engine delegate to.
	state_machine: ConsensusGossip<B>,
	/// Network handle, used for reputation reports, peer role lookups and reserved-set updates.
	network: Box<dyn Network<B> + Send>,
	/// Syncing handle, used to announce blocks.
	sync: Box<dyn Syncing<B>>,
	/// Timer created with [`PERIODIC_MAINTENANCE_INTERVAL`] and reset to it each time it fires.
	periodic_maintenance_interval: futures_timer::Delay,
	/// Protocol name handed to the state machine and used when updating the reserved peer set.
	protocol: ProtocolName,

	/// Incoming events from the syncing service.
	sync_event_stream: Pin<Box<dyn Stream<Item = SyncEvent> + Send>>,
	/// Handle for polling notification-related events.
	notification_service: Box<dyn NotificationService>,
	/// Outgoing events to the consumer, keyed by topic.
	message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
	/// Buffered messages (see [`ForwardingState`]).
	forwarding_state: ForwardingState<B>,

	/// Set once either event source has closed; reported through `FusedFuture::is_terminated`.
	is_terminated: bool,
}
66
/// A gossip engine receives messages from the network via the `notification_service` and forwards
/// them to upper layers via the `message_sinks`. In the scenario where messages have been received
/// from the network but a subscribed message sink is not yet ready to receive the messages, the
/// messages are buffered. To model this process a gossip engine can be in two states.
enum ForwardingState<B: BlockT> {
	/// The gossip engine is currently not forwarding any messages and will poll the network for
	/// more messages to forward.
	Idle,
	/// The gossip engine is in the process of forwarding messages and thus will not poll the
	/// network for more messages until it has sent all current messages into the subscribed
	/// message sinks. Each queued entry pairs a topic with the notification to deliver.
	Busy(VecDeque<(B::Hash, TopicNotification)>),
}
80
// `GossipEngine` is explicitly marked `Unpin`; its `Future::poll` implementation relies on this to
// obtain a plain `&mut Self` out of the `Pin<&mut Self>` it is handed.
impl<B: BlockT> Unpin for GossipEngine<B> {}
82
83impl<B: BlockT> GossipEngine<B> {
84	/// Create a new instance.
85	pub fn new<N, S>(
86		network: N,
87		sync: S,
88		notification_service: Box<dyn NotificationService>,
89		protocol: impl Into<ProtocolName>,
90		validator: Arc<dyn Validator<B>>,
91		metrics_registry: Option<&Registry>,
92	) -> Self
93	where
94		B: 'static,
95		N: Network<B> + Send + Clone + 'static,
96		S: Syncing<B> + Send + Clone + 'static,
97	{
98		let protocol = protocol.into();
99		let sync_event_stream = sync.event_stream("network-gossip");
100
101		GossipEngine {
102			state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
103			network: Box::new(network),
104			sync: Box::new(sync),
105			notification_service,
106			periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
107			protocol,
108
109			sync_event_stream,
110			message_sinks: HashMap::new(),
111			forwarding_state: ForwardingState::Idle,
112
113			is_terminated: false,
114		}
115	}
116
117	pub fn report(&self, who: PeerId, reputation: ReputationChange) {
118		self.network.report_peer(who, reputation);
119	}
120
121	/// Registers a message without propagating it to any peers. The message
122	/// becomes available to new peers or when the service is asked to gossip
123	/// the message's topic. No validation is performed on the message, if the
124	/// message is already expired it should be dropped on the next garbage
125	/// collection.
126	pub fn register_gossip_message(&mut self, topic: B::Hash, message: Vec<u8>) {
127		self.state_machine.register_message(topic, message);
128	}
129
130	/// Broadcast all messages with given topic.
131	pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
132		self.state_machine.broadcast_topic(&mut self.notification_service, topic, force);
133	}
134
135	/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
136	pub fn messages_for(&mut self, topic: B::Hash) -> Receiver<TopicNotification> {
137		let past_messages = self.state_machine.messages_for(topic).collect::<Vec<_>>();
138		// The channel length is not critical for correctness. By the implementation of `channel`
139		// each sender is guaranteed a single buffer slot, making it a non-rendezvous channel and
140		// thus preventing direct dead-locks. A minimum channel length of 10 is an estimate based on
141		// the fact that despite `NotificationsReceived` having a `Vec` of messages, it only ever
142		// contains a single message.
143		let (mut tx, rx) = channel(usize::max(past_messages.len(), 10));
144
145		for notification in past_messages {
146			tx.try_send(notification)
147				.expect("receiver known to be live, and buffer size known to suffice; qed");
148		}
149
150		self.message_sinks.entry(topic).or_default().push(tx);
151
152		rx
153	}
154
155	/// Send all messages with given topic to a peer.
156	pub fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
157		self.state_machine.send_topic(&mut self.notification_service, who, topic, force)
158	}
159
160	/// Multicast a message to all peers.
161	pub fn gossip_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
162		self.state_machine
163			.multicast(&mut self.notification_service, topic, message, force)
164	}
165
166	/// Send addressed message to the given peers. The message is not kept or multicast
167	/// later on.
168	pub fn send_message(&mut self, who: Vec<PeerId>, data: Vec<u8>) {
169		for who in &who {
170			self.state_machine
171				.send_message(&mut self.notification_service, who, data.clone());
172		}
173	}
174
175	/// Notify everyone we're connected to that we have the given block.
176	///
177	/// Note: this method isn't strictly related to gossiping and should eventually be moved
178	/// somewhere else.
179	pub fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
180		self.sync.announce_block(block, associated_data);
181	}
182
183	/// Consume [`GossipEngine`] and return the notification service.
184	pub fn take_notification_service(self) -> Box<dyn NotificationService> {
185		self.notification_service
186	}
187}
188
// Drives the gossip engine: processes notification and sync events, forwards buffered messages
// to the subscribed sinks and runs periodic maintenance.
impl<B: BlockT> Future for GossipEngine<B> {
	type Output = ();

	/// Polls the engine.
	///
	/// Returns `Poll::Ready(())` only once the notification event stream or the sync event stream
	/// has ended (`is_terminated` is set in that case); otherwise returns `Poll::Pending`.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = &mut *self;

		'outer: loop {
			match &mut this.forwarding_state {
				ForwardingState::Idle => {
					// Poll both event sources before acting on either of them.
					let next_notification_event =
						this.notification_service.next_event().poll_unpin(cx);
					let sync_event_stream = this.sync_event_stream.poll_next_unpin(cx);

					// Nothing to process right now, continue with periodic maintenance below.
					if next_notification_event.is_pending() && sync_event_stream.is_pending() {
						break
					}

					match next_notification_event {
						Poll::Ready(Some(event)) => match event {
							NotificationEvent::ValidateInboundSubstream {
								peer,
								handshake,
								result_tx,
								..
							} => {
								// only accept peers whose role can be determined
								let result = this
									.network
									.peer_role(peer, handshake)
									.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
								// A failure to deliver the verdict is deliberately ignored.
								let _ = result_tx.send(result);
							},
							NotificationEvent::NotificationStreamOpened {
								peer, handshake, ..
							} =>
								// Peers are only registered with the state machine when their role
								// can be determined from the handshake.
								if let Some(role) = this.network.peer_role(peer, handshake) {
									this.state_machine.new_peer(
										&mut this.notification_service,
										peer,
										role,
									);
								} else {
									log::debug!(target: "gossip", "role for {peer} couldn't be determined");
								},
							NotificationEvent::NotificationStreamClosed { peer } => {
								this.state_machine
									.peer_disconnected(&mut this.notification_service, peer);
							},
							NotificationEvent::NotificationReceived { peer, notification } => {
								let to_forward = this.state_machine.on_incoming(
									&mut *this.network,
									&mut this.notification_service,
									peer,
									vec![notification],
								);
								// Switch to `Busy`; the next loop iteration starts forwarding.
								this.forwarding_state = ForwardingState::Busy(to_forward.into());
							},
						},
						// The notification event stream closed. Terminate the [`GossipEngine`] as
						// well.
						Poll::Ready(None) => {
							self.is_terminated = true;
							return Poll::Ready(())
						},
						Poll::Pending => {},
					}

					match sync_event_stream {
						// Mirror the peers of the syncing service in the reserved set of the
						// gossip protocol.
						Poll::Ready(Some(event)) => match event {
							SyncEvent::PeerConnected(remote) =>
								this.network.add_set_reserved(remote, this.protocol.clone()),
							SyncEvent::PeerDisconnected(remote) =>
								this.network.remove_set_reserved(remote, this.protocol.clone()),
						},
						// The sync event stream closed. Terminate the [`GossipEngine`] as well.
						Poll::Ready(None) => {
							self.is_terminated = true;
							return Poll::Ready(())
						},
						Poll::Pending => {},
					}
				},
				ForwardingState::Busy(to_forward) => {
					// Take the next buffered message, or go back to `Idle` once the queue is empty.
					let (topic, notification) = match to_forward.pop_front() {
						Some(n) => n,
						None => {
							this.forwarding_state = ForwardingState::Idle;
							continue
						},
					};

					// Messages for topics without any subscriber are dropped.
					let sinks = match this.message_sinks.get_mut(&topic) {
						Some(sinks) => sinks,
						None => continue,
					};

					// Make sure all sinks for the given topic are ready.
					for sink in sinks.iter_mut() {
						match sink.poll_ready(cx) {
							Poll::Ready(Ok(())) => {},
							// Receiver has been dropped. Ignore for now, filtered out in (1).
							Poll::Ready(Err(_)) => {},
							Poll::Pending => {
								// Push back onto queue for later.
								to_forward.push_front((topic, notification));
								break 'outer
							},
						}
					}

					// Filter out all closed sinks.
					sinks.retain(|sink| !sink.is_closed()); // (1)

					// No live subscriber left for this topic: forget the topic and drop the message.
					if sinks.is_empty() {
						this.message_sinks.remove(&topic);
						continue
					}

					trace!(
						target: "gossip",
						"Pushing consensus message to sinks for {}.", topic,
					);

					// Send the notification on each sink.
					for sink in sinks {
						match sink.start_send(notification.clone()) {
							Ok(()) => {},
							Err(e) if e.is_full() => {
								unreachable!("Previously ensured that all sinks are ready; qed.")
							},
							// Receiver got dropped. Will be removed in next iteration (See (1)).
							Err(_) => {},
						}
					}
				},
			}
		}

		// Periodic maintenance: each time the timer has fired, re-arm it, tick the state machine
		// and garbage collect closed sinks. The loop only ends once the timer reports pending.
		while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
			this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
			this.state_machine.tick(&mut this.notification_service);

			this.message_sinks.retain(|_, sinks| {
				sinks.retain(|sink| !sink.is_closed());
				!sinks.is_empty()
			});
		}

		Poll::Pending
	}
}
339
impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
	/// Returns `true` once `poll` has observed the end of the notification or sync event stream.
	fn is_terminated(&self) -> bool {
		self.is_terminated
	}
}
345
346#[cfg(test)]
347mod tests {
348	use super::*;
349	use crate::{ValidationResult, ValidatorContext};
350	use codec::{DecodeAll, Encode};
351	use futures::{
352		channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
353		executor::{block_on, block_on_stream},
354		future::poll_fn,
355	};
356	use quickcheck::{Arbitrary, Gen, QuickCheck};
357	use sc_network::{
358		config::MultiaddrWithPeerId,
359		service::traits::{Direction, MessageSink, NotificationEvent},
360		Event, NetworkBlock, NetworkEventStream, NetworkPeers, NotificationService, Roles,
361	};
362	use sc_network_common::role::ObservedRole;
363	use sc_network_sync::SyncEventStream;
364	use sc_network_types::multiaddr::Multiaddr;
365	use sp_runtime::{
366		testing::H256,
367		traits::{Block as BlockT, NumberFor},
368	};
369	use std::{
370		collections::HashSet,
371		sync::{Arc, Mutex},
372	};
373	use substrate_test_runtime_client::runtime::Block;
374
	/// Network stub used by the tests in this module. Its trait implementations only provide
	/// `report_peer` and `peer_role`; every other method panics via `unimplemented!()`.
	#[derive(Clone, Default)]
	struct TestNetwork {}
377
378	#[async_trait::async_trait]
379	impl NetworkPeers for TestNetwork {
380		fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
381			unimplemented!();
382		}
383
384		fn set_authorized_only(&self, _reserved_only: bool) {
385			unimplemented!();
386		}
387
388		fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
389			unimplemented!();
390		}
391
392		fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {}
393
394		fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
395			unimplemented!()
396		}
397
398		fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
399			unimplemented!();
400		}
401
402		fn accept_unreserved_peers(&self) {
403			unimplemented!();
404		}
405
406		fn deny_unreserved_peers(&self) {
407			unimplemented!();
408		}
409
410		fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
411			unimplemented!();
412		}
413
414		fn remove_reserved_peer(&self, _peer_id: PeerId) {
415			unimplemented!();
416		}
417
418		fn set_reserved_peers(
419			&self,
420			_protocol: ProtocolName,
421			_peers: HashSet<Multiaddr>,
422		) -> Result<(), String> {
423			unimplemented!();
424		}
425
426		fn add_peers_to_reserved_set(
427			&self,
428			_protocol: ProtocolName,
429			_peers: HashSet<Multiaddr>,
430		) -> Result<(), String> {
431			unimplemented!();
432		}
433
434		fn remove_peers_from_reserved_set(
435			&self,
436			_protocol: ProtocolName,
437			_peers: Vec<PeerId>,
438		) -> Result<(), String> {
439			unimplemented!();
440		}
441
442		fn sync_num_connected(&self) -> usize {
443			unimplemented!();
444		}
445
446		fn peer_role(&self, _peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
447			Roles::decode_all(&mut &handshake[..])
448				.ok()
449				.and_then(|role| Some(ObservedRole::from(role)))
450		}
451
452		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
453			unimplemented!();
454		}
455	}
456
	// Stub implementation: requesting an event stream from the test network panics via
	// `unimplemented!()`.
	impl NetworkEventStream for TestNetwork {
		fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
			unimplemented!();
		}
	}
462
	// Stub implementation: both methods panic via `unimplemented!()` when called.
	impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestNetwork {
		fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
			unimplemented!();
		}

		fn new_best_block_imported(
			&self,
			_hash: <Block as BlockT>::Hash,
			_number: NumberFor<Block>,
		) {
			unimplemented!();
		}
	}
476
	/// Syncing service stub. Clones share the same inner state through the `Arc<Mutex<_>>`.
	#[derive(Clone, Default)]
	struct TestSync {
		/// Shared state holding the sender side of every event stream handed out so far.
		inner: Arc<Mutex<TestSyncInner>>,
	}
481
	/// Inner state of [`TestSync`].
	#[derive(Clone, Default)]
	struct TestSyncInner {
		/// One sender per call to `event_stream`; retaining it keeps the stream from ending.
		event_senders: Vec<UnboundedSender<SyncEvent>>,
	}
486
487	impl SyncEventStream for TestSync {
488		fn event_stream(
489			&self,
490			_name: &'static str,
491		) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
492			let (tx, rx) = unbounded();
493			self.inner.lock().unwrap().event_senders.push(tx);
494
495			Box::pin(rx)
496		}
497	}
498
	// Stub implementation: both methods panic via `unimplemented!()` when called.
	impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestSync {
		fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
			unimplemented!();
		}

		fn new_best_block_imported(
			&self,
			_hash: <Block as BlockT>::Hash,
			_number: NumberFor<Block>,
		) {
			unimplemented!();
		}
	}
512
	/// Notification service stub whose events are fed in by the test through an unbounded channel.
	#[derive(Debug)]
	pub(crate) struct TestNotificationService {
		/// Receiving side of the channel; `next_event` yields whatever the test sends into it.
		rx: UnboundedReceiver<NotificationEvent>,
	}
517
	// Only `next_event` is functional; every other method panics via `unimplemented!()`.
	#[async_trait::async_trait]
	impl sc_network::service::traits::NotificationService for TestNotificationService {
		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!();
		}

		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!();
		}

		fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
			unimplemented!();
		}

		async fn send_async_notification(
			&mut self,
			_peer: &PeerId,
			_notification: Vec<u8>,
		) -> Result<(), sc_network::error::Error> {
			unimplemented!();
		}

		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!();
		}

		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!();
		}

		/// Yields the next event sent by the test, or `None` once the sender side is gone.
		async fn next_event(&mut self) -> Option<NotificationEvent> {
			self.rx.next().await
		}

		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
			unimplemented!();
		}

		fn protocol(&self) -> &ProtocolName {
			unimplemented!();
		}

		fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
			unimplemented!();
		}
	}
564
	/// Validator that accepts every message and files it under the default topic.
	struct AllowAll;
	impl Validator<Block> for AllowAll {
		/// Ignores context, sender and payload and always answers `ProcessAndKeep` with
		/// `H256::default()` as the topic.
		fn validate(
			&self,
			_context: &mut dyn ValidatorContext<Block>,
			_sender: &PeerId,
			_data: &[u8],
		) -> ValidationResult<H256> {
			ValidationResult::ProcessAndKeep(H256::default())
		}
	}
576
577	/// Regression test for the case where the `GossipEngine.network_event_stream` closes. One
578	/// should not ignore a `Poll::Ready(None)` as `poll_next_unpin` will panic on subsequent calls.
579	///
580	/// See https://github.com/paritytech/substrate/issues/5000 for details.
581	#[test]
582	fn returns_when_network_event_stream_closes() {
583		let network = TestNetwork::default();
584		let sync = Arc::new(TestSync::default());
585		let (tx, rx) = unbounded();
586		let notification_service = Box::new(TestNotificationService { rx });
587		let mut gossip_engine = GossipEngine::<Block>::new(
588			network.clone(),
589			sync,
590			notification_service,
591			"/my_protocol",
592			Arc::new(AllowAll {}),
593			None,
594		);
595
596		// drop notification service sender side.
597		drop(tx);
598
599		block_on(poll_fn(move |ctx| {
600			if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
601				panic!(
602					"Expected gossip engine to finish on first poll, given that \
603					 `GossipEngine.network_event_stream` closes right away."
604				)
605			}
606			Poll::Ready(())
607		}))
608	}
609
610	#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
611	async fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
612		let topic = H256::default();
613		let protocol = ProtocolName::from("/my_protocol");
614		let remote_peer = PeerId::random();
615		let network = TestNetwork::default();
616		let sync = Arc::new(TestSync::default());
617		let (mut tx, rx) = unbounded();
618		let notification_service = Box::new(TestNotificationService { rx });
619
620		let mut gossip_engine = GossipEngine::<Block>::new(
621			network.clone(),
622			sync.clone(),
623			notification_service,
624			protocol.clone(),
625			Arc::new(AllowAll {}),
626			None,
627		);
628
629		// Register the remote peer.
630		tx.send(NotificationEvent::NotificationStreamOpened {
631			peer: remote_peer,
632			direction: Direction::Inbound,
633			negotiated_fallback: None,
634			handshake: Roles::FULL.encode(),
635		})
636		.await
637		.unwrap();
638
639		let messages = vec![vec![1], vec![2]];
640
641		// Send first event before subscribing.
642		tx.send(NotificationEvent::NotificationReceived {
643			peer: remote_peer,
644			notification: messages[0].clone().into(),
645		})
646		.await
647		.unwrap();
648
649		let mut subscribers = vec![];
650		for _ in 0..2 {
651			subscribers.push(gossip_engine.messages_for(topic));
652		}
653
654		// Send second event after subscribing.
655		tx.send(NotificationEvent::NotificationReceived {
656			peer: remote_peer,
657			notification: messages[1].clone().into(),
658		})
659		.await
660		.unwrap();
661
662		tokio::spawn(gossip_engine);
663
664		// Note: `block_on_stream()`-derived iterator will block the current thread,
665		//       so we need a `multi_thread` `tokio::test` runtime flavor.
666		let mut subscribers =
667			subscribers.into_iter().map(|s| block_on_stream(s)).collect::<Vec<_>>();
668
669		// Expect each subscriber to receive both events.
670		for message in messages {
671			for subscriber in subscribers.iter_mut() {
672				assert_eq!(
673					subscriber.next(),
674					Some(TopicNotification { message: message.clone(), sender: Some(remote_peer) }),
675				);
676			}
677		}
678	}
679
680	#[test]
681	fn forwarding_to_different_size_and_topic_channels() {
682		#[derive(Clone, Debug)]
683		struct ChannelLengthAndTopic {
684			length: usize,
685			topic: H256,
686		}
687
688		impl Arbitrary for ChannelLengthAndTopic {
689			fn arbitrary(g: &mut Gen) -> Self {
690				let possible_length = (0..100).collect::<Vec<usize>>();
691				let possible_topics = (0..10).collect::<Vec<u64>>();
692				Self {
693					length: *g.choose(&possible_length).unwrap(),
694					// Make sure channel topics and message topics overlap by choosing a small
695					// range.
696					topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
697				}
698			}
699		}
700
701		#[derive(Clone, Debug)]
702		struct Message {
703			topic: H256,
704		}
705
706		impl Arbitrary for Message {
707			fn arbitrary(g: &mut Gen) -> Self {
708				let possible_topics = (0..10).collect::<Vec<u64>>();
709				Self {
710					// Make sure channel topics and message topics overlap by choosing a small
711					// range.
712					topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
713				}
714			}
715		}
716
717		/// Validator that always returns `ProcessAndKeep` interpreting the first 32 bytes of data
718		/// as the message topic.
719		struct TestValidator;
720
721		impl Validator<Block> for TestValidator {
722			fn validate(
723				&self,
724				_context: &mut dyn ValidatorContext<Block>,
725				_sender: &PeerId,
726				data: &[u8],
727			) -> ValidationResult<H256> {
728				ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
729			}
730		}
731
732		fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
733			let protocol = ProtocolName::from("/my_protocol");
734			let remote_peer = PeerId::random();
735			let network = TestNetwork::default();
736			let sync = Arc::new(TestSync::default());
737			let (mut tx, rx) = unbounded();
738			let notification_service = Box::new(TestNotificationService { rx });
739
740			let num_channels_per_topic = channels.iter().fold(
741				HashMap::new(),
742				|mut acc, ChannelLengthAndTopic { topic, .. }| {
743					acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
744					acc
745				},
746			);
747
748			let expected_msgs_per_topic_all_chan = notifications
749				.iter()
750				.fold(HashMap::new(), |mut acc, messages| {
751					for message in messages {
752						acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
753					}
754					acc
755				})
756				.into_iter()
757				// Messages are cloned for each channel with the corresponding topic, thus multiply
758				// with the amount of channels per topic. If there is no channel for a given topic,
759				// don't expect any messages for the topic to be received.
760				.map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
761				.collect::<HashMap<H256, _>>();
762
763			let mut gossip_engine = GossipEngine::<Block>::new(
764				network.clone(),
765				sync.clone(),
766				notification_service,
767				protocol.clone(),
768				Arc::new(TestValidator {}),
769				None,
770			);
771
772			// Create channels.
773			let (txs, mut rxs) = channels
774				.iter()
775				.map(|ChannelLengthAndTopic { length, topic }| (*topic, channel(*length)))
776				.fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
777					acc.0.push((topic, tx));
778					acc.1.push((topic, rx));
779					acc
780				});
781
782			// Insert sender sides into `gossip_engine`.
783			for (topic, tx) in txs {
784				match gossip_engine.message_sinks.get_mut(&topic) {
785					Some(entry) => entry.push(tx),
786					None => {
787						gossip_engine.message_sinks.insert(topic, vec![tx]);
788					},
789				}
790			}
791
792			// Register the remote peer.
793			tx.start_send(NotificationEvent::NotificationStreamOpened {
794				peer: remote_peer,
795				direction: Direction::Inbound,
796				negotiated_fallback: None,
797				handshake: Roles::FULL.encode(),
798			})
799			.unwrap();
800
801			// Send messages into the network event stream.
802			for (i_notification, messages) in notifications.iter().enumerate() {
803				let messages: Vec<Vec<u8>> = messages
804					.into_iter()
805					.enumerate()
806					.map(|(i_message, Message { topic })| {
807						// Embed the topic in the first 256 bytes of the message to be extracted by
808						// the [`TestValidator`] later on.
809						let mut message = topic.as_bytes().to_vec();
810
811						// Make sure the message is unique via `i_notification` and `i_message` to
812						// ensure [`ConsensusBridge`] does not deduplicate it.
813						message.push(i_notification.try_into().unwrap());
814						message.push(i_message.try_into().unwrap());
815
816						message.into()
817					})
818					.collect();
819
820				for message in messages {
821					tx.start_send(NotificationEvent::NotificationReceived {
822						peer: remote_peer,
823						notification: message,
824					})
825					.unwrap();
826				}
827			}
828
829			let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();
830
831			// Poll both gossip engine and each receiver and track the amount of received messages.
832			block_on(poll_fn(|cx| {
833				loop {
834					if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
835						unreachable!(
836							"Event stream sender side is not dropped, thus gossip engine does not \
837							 terminate",
838						);
839					}
840
841					let mut progress = false;
842
843					for (topic, rx) in rxs.iter_mut() {
844						match rx.poll_next_unpin(cx) {
845							Poll::Ready(Some(_)) => {
846								progress = true;
847								received_msgs_per_topic_all_chan
848									.entry(*topic)
849									.and_modify(|e| *e += 1)
850									.or_insert(1);
851							},
852							Poll::Ready(None) => {
853								unreachable!("Sender side of channel is never dropped")
854							},
855							Poll::Pending => {},
856						}
857					}
858
859					if !progress {
860						break
861					}
862				}
863				Poll::Ready(())
864			}));
865
866			// Compare amount of expected messages with amount of received messages.
867			for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
868				assert_eq!(
869					received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0),
870					expected_num,
871				);
872			}
873			for (received_topic, received_num) in expected_msgs_per_topic_all_chan.iter() {
874				assert_eq!(
875					expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0),
876					received_num,
877				);
878			}
879		}
880
881		// Past regressions.
882		prop(vec![], vec![vec![Message { topic: H256::default() }]]);
883		prop(
884			vec![ChannelLengthAndTopic { length: 71, topic: H256::default() }],
885			vec![vec![Message { topic: H256::default() }]],
886		);
887
888		QuickCheck::new().quickcheck(prop as fn(_, _))
889	}
890}