1use crate::{
20 state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL},
21 Network, Syncing, Validator,
22};
23
24use sc_network::{
25 service::traits::{NotificationEvent, ValidationResult},
26 types::ProtocolName,
27 NotificationService, ReputationChange,
28};
29use sc_network_sync::SyncEvent;
30
31use futures::{
32 channel::mpsc::{channel, Receiver, Sender},
33 prelude::*,
34};
35use log::trace;
36use prometheus_endpoint::Registry;
37use sc_network_types::PeerId;
38use sp_runtime::traits::Block as BlockT;
39use std::{
40 collections::{HashMap, VecDeque},
41 pin::Pin,
42 sync::Arc,
43 task::{Context, Poll},
44};
45
46pub struct GossipEngine<B: BlockT> {
49 state_machine: ConsensusGossip<B>,
50 network: Box<dyn Network<B> + Send>,
51 sync: Box<dyn Syncing<B>>,
52 periodic_maintenance_interval: futures_timer::Delay,
53 protocol: ProtocolName,
54
55 sync_event_stream: Pin<Box<dyn Stream<Item = SyncEvent> + Send>>,
57 notification_service: Box<dyn NotificationService>,
59 message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
61 forwarding_state: ForwardingState<B>,
63
64 is_terminated: bool,
65}
66
67enum ForwardingState<B: BlockT> {
72 Idle,
75 Busy(VecDeque<(B::Hash, TopicNotification)>),
79}
80
81impl<B: BlockT> Unpin for GossipEngine<B> {}
82
83impl<B: BlockT> GossipEngine<B> {
84 pub fn new<N, S>(
86 network: N,
87 sync: S,
88 notification_service: Box<dyn NotificationService>,
89 protocol: impl Into<ProtocolName>,
90 validator: Arc<dyn Validator<B>>,
91 metrics_registry: Option<&Registry>,
92 ) -> Self
93 where
94 B: 'static,
95 N: Network<B> + Send + Clone + 'static,
96 S: Syncing<B> + Send + Clone + 'static,
97 {
98 let protocol = protocol.into();
99 let sync_event_stream = sync.event_stream("network-gossip");
100
101 GossipEngine {
102 state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
103 network: Box::new(network),
104 sync: Box::new(sync),
105 notification_service,
106 periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
107 protocol,
108
109 sync_event_stream,
110 message_sinks: HashMap::new(),
111 forwarding_state: ForwardingState::Idle,
112
113 is_terminated: false,
114 }
115 }
116
117 pub fn report(&self, who: PeerId, reputation: ReputationChange) {
118 self.network.report_peer(who, reputation);
119 }
120
121 pub fn register_gossip_message(&mut self, topic: B::Hash, message: Vec<u8>) {
127 self.state_machine.register_message(topic, message);
128 }
129
130 pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
132 self.state_machine.broadcast_topic(&mut self.notification_service, topic, force);
133 }
134
135 pub fn messages_for(&mut self, topic: B::Hash) -> Receiver<TopicNotification> {
137 let past_messages = self.state_machine.messages_for(topic).collect::<Vec<_>>();
138 let (mut tx, rx) = channel(usize::max(past_messages.len(), 10));
144
145 for notification in past_messages {
146 tx.try_send(notification)
147 .expect("receiver known to be live, and buffer size known to suffice; qed");
148 }
149
150 self.message_sinks.entry(topic).or_default().push(tx);
151
152 rx
153 }
154
155 pub fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
157 self.state_machine.send_topic(&mut self.notification_service, who, topic, force)
158 }
159
160 pub fn gossip_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
162 self.state_machine
163 .multicast(&mut self.notification_service, topic, message, force)
164 }
165
166 pub fn send_message(&mut self, who: Vec<PeerId>, data: Vec<u8>) {
169 for who in &who {
170 self.state_machine
171 .send_message(&mut self.notification_service, who, data.clone());
172 }
173 }
174
175 pub fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
180 self.sync.announce_block(block, associated_data);
181 }
182
183 pub fn take_notification_service(self) -> Box<dyn NotificationService> {
185 self.notification_service
186 }
187}
188
189impl<B: BlockT> Future for GossipEngine<B> {
190 type Output = ();
191
192 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
193 let this = &mut *self;
194
195 'outer: loop {
196 match &mut this.forwarding_state {
197 ForwardingState::Idle => {
198 let next_notification_event =
199 this.notification_service.next_event().poll_unpin(cx);
200 let sync_event_stream = this.sync_event_stream.poll_next_unpin(cx);
201
202 if next_notification_event.is_pending() && sync_event_stream.is_pending() {
203 break
204 }
205
206 match next_notification_event {
207 Poll::Ready(Some(event)) => match event {
208 NotificationEvent::ValidateInboundSubstream {
209 peer,
210 handshake,
211 result_tx,
212 ..
213 } => {
214 let result = this
216 .network
217 .peer_role(peer, handshake)
218 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
219 let _ = result_tx.send(result);
220 },
221 NotificationEvent::NotificationStreamOpened {
222 peer, handshake, ..
223 } =>
224 if let Some(role) = this.network.peer_role(peer, handshake) {
225 this.state_machine.new_peer(
226 &mut this.notification_service,
227 peer,
228 role,
229 );
230 } else {
231 log::debug!(target: "gossip", "role for {peer} couldn't be determined");
232 },
233 NotificationEvent::NotificationStreamClosed { peer } => {
234 this.state_machine
235 .peer_disconnected(&mut this.notification_service, peer);
236 },
237 NotificationEvent::NotificationReceived { peer, notification } => {
238 let to_forward = this.state_machine.on_incoming(
239 &mut *this.network,
240 &mut this.notification_service,
241 peer,
242 vec![notification],
243 );
244 this.forwarding_state = ForwardingState::Busy(to_forward.into());
245 },
246 },
247 Poll::Ready(None) => {
249 self.is_terminated = true;
250 return Poll::Ready(())
251 },
252 Poll::Pending => {},
253 }
254
255 match sync_event_stream {
256 Poll::Ready(Some(event)) => match event {
257 SyncEvent::PeerConnected(remote) =>
258 this.network.add_set_reserved(remote, this.protocol.clone()),
259 SyncEvent::PeerDisconnected(remote) =>
260 this.network.remove_set_reserved(remote, this.protocol.clone()),
261 },
262 Poll::Ready(None) => {
264 self.is_terminated = true;
265 return Poll::Ready(())
266 },
267 Poll::Pending => {},
268 }
269 },
270 ForwardingState::Busy(to_forward) => {
271 let (topic, notification) = match to_forward.pop_front() {
272 Some(n) => n,
273 None => {
274 this.forwarding_state = ForwardingState::Idle;
275 continue
276 },
277 };
278
279 let sinks = match this.message_sinks.get_mut(&topic) {
280 Some(sinks) => sinks,
281 None => continue,
282 };
283
284 for sink in sinks.iter_mut() {
286 match sink.poll_ready(cx) {
287 Poll::Ready(Ok(())) => {},
288 Poll::Ready(Err(_)) => {},
290 Poll::Pending => {
291 to_forward.push_front((topic, notification));
293 break 'outer
294 },
295 }
296 }
297
298 sinks.retain(|sink| !sink.is_closed()); if sinks.is_empty() {
302 this.message_sinks.remove(&topic);
303 continue
304 }
305
306 trace!(
307 target: "gossip",
308 "Pushing consensus message to sinks for {}.", topic,
309 );
310
311 for sink in sinks {
313 match sink.start_send(notification.clone()) {
314 Ok(()) => {},
315 Err(e) if e.is_full() => {
316 unreachable!("Previously ensured that all sinks are ready; qed.")
317 },
318 Err(_) => {},
320 }
321 }
322 },
323 }
324 }
325
326 while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
327 this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
328 this.state_machine.tick(&mut this.notification_service);
329
330 this.message_sinks.retain(|_, sinks| {
331 sinks.retain(|sink| !sink.is_closed());
332 !sinks.is_empty()
333 });
334 }
335
336 Poll::Pending
337 }
338}
339
340impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
341 fn is_terminated(&self) -> bool {
342 self.is_terminated
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349 use crate::{ValidationResult, ValidatorContext};
350 use codec::{DecodeAll, Encode};
351 use futures::{
352 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
353 executor::{block_on, block_on_stream},
354 future::poll_fn,
355 };
356 use quickcheck::{Arbitrary, Gen, QuickCheck};
357 use sc_network::{
358 config::MultiaddrWithPeerId,
359 service::traits::{Direction, MessageSink, NotificationEvent},
360 Event, NetworkBlock, NetworkEventStream, NetworkPeers, NotificationService, Roles,
361 };
362 use sc_network_common::role::ObservedRole;
363 use sc_network_sync::SyncEventStream;
364 use sc_network_types::multiaddr::Multiaddr;
365 use sp_runtime::{
366 testing::H256,
367 traits::{Block as BlockT, NumberFor},
368 };
369 use std::{
370 collections::HashSet,
371 sync::{Arc, Mutex},
372 };
373 use substrate_test_runtime_client::runtime::Block;
374
/// Stateless network stand-in handed to `GossipEngine::new` by the tests.
#[derive(Clone, Default)]
struct TestNetwork {}
377
378 #[async_trait::async_trait]
379 impl NetworkPeers for TestNetwork {
380 fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
381 unimplemented!();
382 }
383
384 fn set_authorized_only(&self, _reserved_only: bool) {
385 unimplemented!();
386 }
387
388 fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
389 unimplemented!();
390 }
391
392 fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {}
393
394 fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
395 unimplemented!()
396 }
397
398 fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
399 unimplemented!();
400 }
401
402 fn accept_unreserved_peers(&self) {
403 unimplemented!();
404 }
405
406 fn deny_unreserved_peers(&self) {
407 unimplemented!();
408 }
409
410 fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
411 unimplemented!();
412 }
413
414 fn remove_reserved_peer(&self, _peer_id: PeerId) {
415 unimplemented!();
416 }
417
418 fn set_reserved_peers(
419 &self,
420 _protocol: ProtocolName,
421 _peers: HashSet<Multiaddr>,
422 ) -> Result<(), String> {
423 unimplemented!();
424 }
425
426 fn add_peers_to_reserved_set(
427 &self,
428 _protocol: ProtocolName,
429 _peers: HashSet<Multiaddr>,
430 ) -> Result<(), String> {
431 unimplemented!();
432 }
433
434 fn remove_peers_from_reserved_set(
435 &self,
436 _protocol: ProtocolName,
437 _peers: Vec<PeerId>,
438 ) -> Result<(), String> {
439 unimplemented!();
440 }
441
442 fn sync_num_connected(&self) -> usize {
443 unimplemented!();
444 }
445
446 fn peer_role(&self, _peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
447 Roles::decode_all(&mut &handshake[..])
448 .ok()
449 .and_then(|role| Some(ObservedRole::from(role)))
450 }
451
452 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
453 unimplemented!();
454 }
455 }
456
457 impl NetworkEventStream for TestNetwork {
458 fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
459 unimplemented!();
460 }
461 }
462
463 impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestNetwork {
464 fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
465 unimplemented!();
466 }
467
468 fn new_best_block_imported(
469 &self,
470 _hash: <Block as BlockT>::Hash,
471 _number: NumberFor<Block>,
472 ) {
473 unimplemented!();
474 }
475 }
476
477 #[derive(Clone, Default)]
478 struct TestSync {
479 inner: Arc<Mutex<TestSyncInner>>,
480 }
481
482 #[derive(Clone, Default)]
483 struct TestSyncInner {
484 event_senders: Vec<UnboundedSender<SyncEvent>>,
485 }
486
487 impl SyncEventStream for TestSync {
488 fn event_stream(
489 &self,
490 _name: &'static str,
491 ) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
492 let (tx, rx) = unbounded();
493 self.inner.lock().unwrap().event_senders.push(tx);
494
495 Box::pin(rx)
496 }
497 }
498
499 impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestSync {
500 fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
501 unimplemented!();
502 }
503
504 fn new_best_block_imported(
505 &self,
506 _hash: <Block as BlockT>::Hash,
507 _number: NumberFor<Block>,
508 ) {
509 unimplemented!();
510 }
511 }
512
513 #[derive(Debug)]
514 pub(crate) struct TestNotificationService {
515 rx: UnboundedReceiver<NotificationEvent>,
516 }
517
518 #[async_trait::async_trait]
519 impl sc_network::service::traits::NotificationService for TestNotificationService {
520 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
521 unimplemented!();
522 }
523
524 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
525 unimplemented!();
526 }
527
528 fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
529 unimplemented!();
530 }
531
532 async fn send_async_notification(
533 &mut self,
534 _peer: &PeerId,
535 _notification: Vec<u8>,
536 ) -> Result<(), sc_network::error::Error> {
537 unimplemented!();
538 }
539
540 async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
541 unimplemented!();
542 }
543
544 fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
545 unimplemented!();
546 }
547
548 async fn next_event(&mut self) -> Option<NotificationEvent> {
549 self.rx.next().await
550 }
551
552 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
553 unimplemented!();
554 }
555
556 fn protocol(&self) -> &ProtocolName {
557 unimplemented!();
558 }
559
560 fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
561 unimplemented!();
562 }
563 }
564
/// Validator used by the tests that keeps every message, under the default topic.
struct AllowAll;
566 impl Validator<Block> for AllowAll {
567 fn validate(
568 &self,
569 _context: &mut dyn ValidatorContext<Block>,
570 _sender: &PeerId,
571 _data: &[u8],
572 ) -> ValidationResult<H256> {
573 ValidationResult::ProcessAndKeep(H256::default())
574 }
575 }
576
/// The engine must complete on its first poll when the notification event channel has
/// already been closed.
#[test]
fn returns_when_network_event_stream_closes() {
    let network = TestNetwork::default();
    let sync = Arc::new(TestSync::default());
    let (event_tx, event_rx) = unbounded();
    let notification_service = Box::new(TestNotificationService { rx: event_rx });
    let mut gossip_engine = GossipEngine::<Block>::new(
        network.clone(),
        sync,
        notification_service,
        "/my_protocol",
        Arc::new(AllowAll {}),
        None,
    );

    // Closing the sender ends the notification service's event stream.
    drop(event_tx);

    block_on(poll_fn(move |ctx| {
        if gossip_engine.poll_unpin(ctx).is_pending() {
            panic!(
                "Expected gossip engine to finish on first poll, given that \
                 `GossipEngine.network_event_stream` closes right away."
            )
        }
        Poll::Ready(())
    }))
}
609
610 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
611 async fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
612 let topic = H256::default();
613 let protocol = ProtocolName::from("/my_protocol");
614 let remote_peer = PeerId::random();
615 let network = TestNetwork::default();
616 let sync = Arc::new(TestSync::default());
617 let (mut tx, rx) = unbounded();
618 let notification_service = Box::new(TestNotificationService { rx });
619
620 let mut gossip_engine = GossipEngine::<Block>::new(
621 network.clone(),
622 sync.clone(),
623 notification_service,
624 protocol.clone(),
625 Arc::new(AllowAll {}),
626 None,
627 );
628
629 tx.send(NotificationEvent::NotificationStreamOpened {
631 peer: remote_peer,
632 direction: Direction::Inbound,
633 negotiated_fallback: None,
634 handshake: Roles::FULL.encode(),
635 })
636 .await
637 .unwrap();
638
639 let messages = vec![vec![1], vec![2]];
640
641 tx.send(NotificationEvent::NotificationReceived {
643 peer: remote_peer,
644 notification: messages[0].clone().into(),
645 })
646 .await
647 .unwrap();
648
649 let mut subscribers = vec![];
650 for _ in 0..2 {
651 subscribers.push(gossip_engine.messages_for(topic));
652 }
653
654 tx.send(NotificationEvent::NotificationReceived {
656 peer: remote_peer,
657 notification: messages[1].clone().into(),
658 })
659 .await
660 .unwrap();
661
662 tokio::spawn(gossip_engine);
663
664 let mut subscribers =
667 subscribers.into_iter().map(|s| block_on_stream(s)).collect::<Vec<_>>();
668
669 for message in messages {
671 for subscriber in subscribers.iter_mut() {
672 assert_eq!(
673 subscriber.next(),
674 Some(TopicNotification { message: message.clone(), sender: Some(remote_peer) }),
675 );
676 }
677 }
678 }
679
/// Property test: for arbitrary subscriber channels (capacity and topic) and arbitrary
/// batches of incoming messages, the number of messages received per topic across all
/// channels must equal the number of messages sent on that topic multiplied by the number
/// of channels subscribed to it.
#[test]
fn forwarding_to_different_size_and_topic_channels() {
    /// Capacity and topic of one subscriber channel.
    #[derive(Clone, Debug)]
    struct ChannelLengthAndTopic {
        length: usize,
        topic: H256,
    }

    impl Arbitrary for ChannelLengthAndTopic {
        fn arbitrary(g: &mut Gen) -> Self {
            let possible_length = (0..100).collect::<Vec<usize>>();
            let possible_topics = (0..10).collect::<Vec<u64>>();
            Self {
                length: *g.choose(&possible_length).unwrap(),
                // Topics come from a small set so that channels and messages overlap.
                topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
            }
        }
    }

    /// An incoming message, identified only by the topic it belongs to.
    #[derive(Clone, Debug)]
    struct Message {
        topic: H256,
    }

    impl Arbitrary for Message {
        fn arbitrary(g: &mut Gen) -> Self {
            let possible_topics = (0..10).collect::<Vec<u64>>();
            Self {
                // Same topic set as `ChannelLengthAndTopic`.
                topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
            }
        }
    }

    /// Validator that reads the topic from the first 32 bytes of the message.
    struct TestValidator;

    impl Validator<Block> for TestValidator {
        fn validate(
            &self,
            _context: &mut dyn ValidatorContext<Block>,
            _sender: &PeerId,
            data: &[u8],
        ) -> ValidationResult<H256> {
            ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
        }
    }

    fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
        let protocol = ProtocolName::from("/my_protocol");
        let remote_peer = PeerId::random();
        let network = TestNetwork::default();
        let sync = Arc::new(TestSync::default());
        let (mut tx, rx) = unbounded();
        let notification_service = Box::new(TestNotificationService { rx });

        let num_channels_per_topic = channels.iter().fold(
            HashMap::new(),
            |mut acc, ChannelLengthAndTopic { topic, .. }| {
                acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
                acc
            },
        );

        let expected_msgs_per_topic_all_chan = notifications
            .iter()
            .fold(HashMap::new(), |mut acc, messages| {
                for message in messages {
                    acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
                }
                acc
            })
            .into_iter()
            // Messages sent on the topic times the channels subscribed to it.
            .map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
            .collect::<HashMap<H256, _>>();

        let mut gossip_engine = GossipEngine::<Block>::new(
            network.clone(),
            sync.clone(),
            notification_service,
            protocol.clone(),
            Arc::new(TestValidator {}),
            None,
        );

        // Create one channel per requested (capacity, topic) pair.
        let (txs, mut rxs) = channels
            .iter()
            .map(|ChannelLengthAndTopic { length, topic }| (*topic, channel(*length)))
            .fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
                acc.0.push((topic, tx));
                acc.1.push((topic, rx));
                acc
            });

        // Install the senders directly as sinks, bypassing `messages_for`.
        for (topic, tx) in txs {
            match gossip_engine.message_sinks.get_mut(&topic) {
                Some(entry) => entry.push(tx),
                None => {
                    gossip_engine.message_sinks.insert(topic, vec![tx]);
                },
            }
        }

        // Register the remote peer with the engine.
        tx.start_send(NotificationEvent::NotificationStreamOpened {
            peer: remote_peer,
            direction: Direction::Inbound,
            negotiated_fallback: None,
            handshake: Roles::FULL.encode(),
        })
        .unwrap();

        // Queue every message as its own notification event.
        for (i_notification, messages) in notifications.iter().enumerate() {
            let messages: Vec<Vec<u8>> = messages
                .into_iter()
                .enumerate()
                .map(|(i_message, Message { topic })| {
                    // The message body starts with the topic, which `TestValidator` reads back.
                    let mut message = topic.as_bytes().to_vec();

                    // Append both indices so that every message is distinct.
                    message.push(i_notification.try_into().unwrap());
                    message.push(i_message.try_into().unwrap());

                    message.into()
                })
                .collect();

            for message in messages {
                tx.start_send(NotificationEvent::NotificationReceived {
                    peer: remote_peer,
                    notification: message,
                })
                .unwrap();
            }
        }

        let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();

        // Alternate between polling the engine and draining the receivers until a full round
        // over the receivers yields nothing.
        block_on(poll_fn(|cx| {
            loop {
                if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
                    unreachable!(
                        "Event stream sender side is not dropped, thus gossip engine does not \
                         terminate",
                    );
                }

                let mut progress = false;

                for (topic, rx) in rxs.iter_mut() {
                    match rx.poll_next_unpin(cx) {
                        Poll::Ready(Some(_)) => {
                            progress = true;
                            received_msgs_per_topic_all_chan
                                .entry(*topic)
                                .and_modify(|e| *e += 1)
                                .or_insert(1);
                        },
                        Poll::Ready(None) => {
                            unreachable!("Sender side of channel is never dropped")
                        },
                        Poll::Pending => {},
                    }
                }

                if !progress {
                    break
                }
            }
            Poll::Ready(())
        }));

        // Every expected topic was received the expected number of times ...
        for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
            assert_eq!(
                received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0),
                expected_num,
            );
        }
        // ... and nothing was received on a topic beyond what was expected. This loop must
        // walk the *received* map; walking the expected map would compare it with itself.
        for (received_topic, received_num) in received_msgs_per_topic_all_chan.iter() {
            assert_eq!(
                expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0),
                received_num,
            );
        }
    }

    // Past regressions.
    prop(vec![], vec![vec![Message { topic: H256::default() }]]);
    prop(
        vec![ChannelLengthAndTopic { length: 71, topic: H256::default() }],
        vec![vec![Message { topic: H256::default() }]],
    );

    QuickCheck::new().quickcheck(prop as fn(_, _))
}
890}