1use ahash::{AHashMap, AHashSet};
89use codec::{Decode, DecodeAll, Encode};
90use log::{debug, trace};
91use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};
92use rand::seq::SliceRandom;
93use sc_network::ReputationChange;
94use sc_network_common::role::ObservedRole;
95use sc_network_gossip::{MessageIntent, ValidatorContext};
96use sc_network_types::PeerId;
97use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG};
98use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
99use sp_consensus_grandpa::AuthorityId;
100use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
101
102use super::{benefit, cost, Round, SetId, NEIGHBOR_REBROADCAST_PERIOD};
103use crate::{environment, CatchUp, CompactCommit, SignedMessage, LOG_TARGET};
104
105use std::{
106 collections::{HashSet, VecDeque},
107 time::{Duration, Instant},
108};
109
/// Delay added to the current instant to schedule the first rebroadcast when
/// the validator's inner state is created.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
/// While a catch-up request has been pending for no longer than this, no new
/// request is issued; afterwards the unresponsive peer is reported.
const CATCH_UP_REQUEST_TIMEOUT: Duration = Duration::from_secs(45);
/// While a catch-up message has been processing for less than this, no new
/// catch-up request is issued.
const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(30);
/// Number of rounds subtracted (saturating) from a round number before it is
/// compared when deciding whether a catch-up is warranted.
const CATCH_UP_THRESHOLD: u64 = 2;

/// Multiplier applied to the configured gossip duration to obtain the round
/// duration used for staged message propagation.
const ROUND_DURATION: u32 = 5;

/// Multiple of the round duration during which round messages are only
/// allowed to first-stage peers.
const PROPAGATION_SOME: f32 = 1.5;

/// Multiple of the round duration after which message propagation is no
/// longer restricted to the randomly selected peer sets.
const PROPAGATION_ALL: f32 = 3.0;

/// Target size of the first-stage peer set, lower bound for the second-stage
/// set, and cap on the number of selected light peers.
const LUCKY_PEERS: usize = 4;

/// A peer together with the reputation change to report for it.
type Report = (PeerId, ReputationChange);
136
/// Verdict reached when checking a message's position against a view.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Consider {
    /// The message is within the view's accepted range.
    Accept,
    /// The message is for an earlier set or round, or does not advance the
    /// last known commit.
    RejectPast,
    /// The message is for a later set or round than the view accepts.
    RejectFuture,
    /// There is no local view to compare the message against.
    RejectOutOfScope,
}
149
150#[derive(Debug)]
152struct View<N> {
153 round: Round, set_id: SetId, last_commit: Option<N>, last_update: Option<Instant>, }
158
159impl<N> Default for View<N> {
160 fn default() -> Self {
161 View { round: Round(1), set_id: SetId(0), last_commit: None, last_update: None }
162 }
163}
164
165impl<N: Ord> View<N> {
166 fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
168 if set_id < self.set_id {
170 return Consider::RejectPast
171 }
172 if set_id > self.set_id {
173 return Consider::RejectFuture
174 }
175
176 if round.0 > self.round.0.saturating_add(1) {
178 return Consider::RejectFuture
179 }
180 if round.0 < self.round.0.saturating_sub(1) {
181 return Consider::RejectPast
182 }
183
184 Consider::Accept
185 }
186
187 fn consider_global(&self, set_id: SetId, number: N) -> Consider {
190 if set_id < self.set_id {
192 return Consider::RejectPast
193 }
194 if set_id > self.set_id {
195 return Consider::RejectFuture
196 }
197
198 match self.last_commit {
201 None => Consider::Accept,
202 Some(ref num) =>
203 if num < &number {
204 Consider::Accept
205 } else {
206 Consider::RejectPast
207 },
208 }
209 }
210}
211
212struct LocalView<N> {
216 round: Round,
217 set_id: SetId,
218 last_commit: Option<(N, Round, SetId)>,
219 round_start: Instant,
220}
221
222impl<N> LocalView<N> {
223 fn new(set_id: SetId, round: Round) -> LocalView<N> {
225 LocalView { set_id, round, last_commit: None, round_start: Instant::now() }
226 }
227
228 fn as_view(&self) -> View<&N> {
231 View {
232 round: self.round,
233 set_id: self.set_id,
234 last_commit: self.last_commit_height(),
235 last_update: None,
236 }
237 }
238
239 fn update_set(&mut self, set_id: SetId) {
241 if set_id != self.set_id {
242 self.set_id = set_id;
243 self.round = Round(1);
244 self.round_start = Instant::now();
245 }
246 }
247
248 fn update_round(&mut self, round: Round) {
250 self.round = round;
251 self.round_start = Instant::now();
252 }
253
254 fn last_commit_height(&self) -> Option<&N> {
256 self.last_commit.as_ref().map(|(number, _, _)| number)
257 }
258}
259
260const KEEP_RECENT_ROUNDS: usize = 3;
261
262struct KeepTopics<B: BlockT> {
270 current_set: SetId,
271 rounds: VecDeque<(Round, SetId)>,
272 reverse_map: AHashMap<B::Hash, (Option<Round>, SetId)>,
273}
274
275impl<B: BlockT> KeepTopics<B> {
276 fn new() -> Self {
277 KeepTopics {
278 current_set: SetId(0),
279 rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 2),
280 reverse_map: Default::default(),
281 }
282 }
283
284 fn push(&mut self, round: Round, set_id: SetId) {
285 self.current_set = std::cmp::max(self.current_set, set_id);
286
287 if !self.rounds.contains(&(round, set_id)) {
291 self.rounds.push_back((round, set_id));
292 }
293
294 self.rounds.push_back((Round(round.0.saturating_add(1)), set_id));
296
297 while self.rounds.len() > KEEP_RECENT_ROUNDS + 2 {
299 let _ = self.rounds.pop_front();
300 }
301
302 let mut map = AHashMap::with_capacity(KEEP_RECENT_ROUNDS + 3);
303 map.insert(super::global_topic::<B>(self.current_set.0), (None, self.current_set));
304
305 for &(round, set) in &self.rounds {
306 map.insert(super::round_topic::<B>(round.0, set.0), (Some(round), set));
307 }
308
309 self.reverse_map = map;
310 }
311
312 fn topic_info(&self, topic: &B::Hash) -> Option<(Option<Round>, SetId)> {
313 self.reverse_map.get(topic).cloned()
314 }
315}
316
317fn neighbor_topics<B: BlockT>(view: &View<NumberFor<B>>) -> Vec<B::Hash> {
319 let s = view.set_id;
320 let mut topics =
321 vec![super::global_topic::<B>(s.0), super::round_topic::<B>(view.round.0, s.0)];
322
323 if view.round.0 != 0 {
324 let r = Round(view.round.0 - 1);
325 topics.push(super::round_topic::<B>(r.0, s.0))
326 }
327
328 topics
329}
330
/// The messages exchanged over gossip, SCALE-encodable and decodable.
#[derive(Debug, Encode, Decode)]
pub(super) enum GossipMessage<Block: BlockT> {
    /// A signed vote for a round.
    Vote(VoteMessage<Block>),
    /// A compact commit for a round.
    Commit(FullCommitMessage<Block>),
    /// A versioned neighbor packet announcing a peer's view.
    Neighbor(VersionedNeighborPacket<NumberFor<Block>>),
    /// A request for a catch-up message.
    CatchUpRequest(CatchUpRequestMessage),
    /// A catch-up message.
    CatchUp(FullCatchUpMessage<Block>),
}

impl<Block: BlockT> From<NeighborPacket<NumberFor<Block>>> for GossipMessage<Block> {
    /// Wraps a neighbor packet as a `V1` versioned `Neighbor` gossip message.
    fn from(neighbor: NeighborPacket<NumberFor<Block>>) -> Self {
        GossipMessage::Neighbor(VersionedNeighborPacket::V1(neighbor))
    }
}
352
/// A signed vote message tagged with the round and set it belongs to.
#[derive(Debug, Encode, Decode)]
pub(super) struct VoteMessage<Block: BlockT> {
    /// The round the vote is for.
    pub(super) round: Round,
    /// The voter set the vote is for.
    pub(super) set_id: SetId,
    /// The signed message itself.
    pub(super) message: SignedMessage<Block::Header>,
}
363
/// A compact commit tagged with the round and set it belongs to.
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCommitMessage<Block: BlockT> {
    /// The round the commit is for.
    pub(super) round: Round,
    /// The voter set the commit is for.
    pub(super) set_id: SetId,
    /// The compact commit itself.
    pub(super) message: CompactCommit<Block::Header>,
}
374
/// A packet describing the sender's view: round, set and last commit height.
#[derive(Debug, Encode, Decode, Clone)]
pub(super) struct NeighborPacket<N> {
    /// The round the sender is at.
    pub(super) round: Round,
    /// The voter set the sender is at.
    pub(super) set_id: SetId,
    /// The height of the sender's last finalized commit.
    pub(super) commit_finalized_height: N,
}
386
387#[derive(Debug, Encode, Decode)]
389pub(super) enum VersionedNeighborPacket<N> {
390 #[codec(index = 1)]
391 V1(NeighborPacket<N>),
392}
393
394impl<N> VersionedNeighborPacket<N> {
395 fn into_neighbor_packet(self) -> NeighborPacket<N> {
396 match self {
397 VersionedNeighborPacket::V1(p) => p,
398 }
399 }
400}
401
/// A request for a catch-up message covering at least the given round of the
/// given set.
#[derive(Clone, Debug, Encode, Decode)]
pub(super) struct CatchUpRequestMessage {
    /// The round being requested.
    pub(super) round: Round,
    /// The voter set the request is for.
    pub(super) set_id: SetId,
}
410
/// A catch-up message tagged with the set it belongs to.
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCatchUpMessage<Block: BlockT> {
    /// The voter set the catch-up is for.
    pub(super) set_id: SetId,
    /// The catch-up payload itself.
    pub(super) message: CatchUp<Block::Header>,
}
419
420#[derive(Clone, Copy, Debug, PartialEq)]
425pub(super) enum Misbehavior {
426 InvalidViewChange,
428 DuplicateNeighborMessage,
430 UndecodablePacket(i32),
432 BadCatchUpMessage { signatures_checked: i32 },
434 BadCommitMessage { signatures_checked: i32, blocks_loaded: i32, equivocations_caught: i32 },
436 FutureMessage,
439 OutOfScopeMessage,
443}
444
445impl Misbehavior {
446 pub(super) fn cost(&self) -> ReputationChange {
447 use Misbehavior::*;
448
449 match *self {
450 InvalidViewChange => cost::INVALID_VIEW_CHANGE,
451 DuplicateNeighborMessage => cost::DUPLICATE_NEIGHBOR_MESSAGE,
452 UndecodablePacket(bytes) => ReputationChange::new(
453 bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
454 "Grandpa: Bad packet",
455 ),
456 BadCatchUpMessage { signatures_checked } => ReputationChange::new(
457 cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked),
458 "Grandpa: Bad cath-up message",
459 ),
460 BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => {
461 let cost = cost::PER_SIGNATURE_CHECKED
462 .saturating_mul(signatures_checked)
463 .saturating_add(cost::PER_BLOCK_LOADED.saturating_mul(blocks_loaded));
464
465 let benefit = equivocations_caught.saturating_mul(benefit::PER_EQUIVOCATION);
466
467 ReputationChange::new(
468 (benefit as i32).saturating_add(cost as i32),
469 "Grandpa: Bad commit",
470 )
471 },
472 FutureMessage => cost::FUTURE_MESSAGE,
473 OutOfScopeMessage => cost::OUT_OF_SCOPE_MESSAGE,
474 }
475 }
476}
477
478#[derive(Debug)]
479struct PeerInfo<N> {
480 view: View<N>,
481 roles: ObservedRole,
482}
483
484impl<N> PeerInfo<N> {
485 fn new(roles: ObservedRole) -> Self {
486 PeerInfo { view: View::default(), roles }
487 }
488}
489
/// The connected peers, plus the subsets selected for staged message propagation.
struct Peers<N> {
    /// Every connected peer and what is known about it.
    inner: AHashMap<PeerId, PeerInfo<N>>,
    /// Peers allowed to receive round messages from the start of a round.
    first_stage_peers: AHashSet<PeerId>,
    /// Peers that additionally become eligible for round messages once the
    /// first propagation window has elapsed.
    second_stage_peers: HashSet<PeerId>,
    /// Light peers selected to receive global messages while propagation is
    /// still restricted.
    lucky_light_peers: HashSet<PeerId>,
    /// An unchanged neighbor packet repeated within half of this period is
    /// treated as a duplicate.
    neighbor_rebroadcast_period: Duration,
}
506
507impl<N: Ord> Peers<N> {
508 fn new(neighbor_rebroadcast_period: Duration) -> Self {
509 Peers {
510 inner: Default::default(),
511 first_stage_peers: Default::default(),
512 second_stage_peers: Default::default(),
513 lucky_light_peers: Default::default(),
514 neighbor_rebroadcast_period,
515 }
516 }
517
518 fn new_peer(&mut self, who: PeerId, role: ObservedRole) {
519 match role {
520 ObservedRole::Authority if self.first_stage_peers.len() < LUCKY_PEERS => {
521 self.first_stage_peers.insert(who);
522 },
523 ObservedRole::Authority if self.second_stage_peers.len() < LUCKY_PEERS => {
524 self.second_stage_peers.insert(who);
525 },
526 ObservedRole::Light if self.lucky_light_peers.len() < LUCKY_PEERS => {
527 self.lucky_light_peers.insert(who);
528 },
529 _ => {},
530 }
531
532 self.inner.insert(who, PeerInfo::new(role));
533 }
534
535 fn peer_disconnected(&mut self, who: &PeerId) {
536 self.inner.remove(who);
537 self.first_stage_peers.remove(who);
540 self.second_stage_peers.remove(who);
541 self.lucky_light_peers.remove(who);
542 }
543
544 fn update_peer_state(
546 &mut self,
547 who: &PeerId,
548 update: NeighborPacket<N>,
549 ) -> Result<Option<&View<N>>, Misbehavior> {
550 let Some(peer) = self.inner.get_mut(who) else { return Ok(None) };
551
552 let invalid_change = peer.view.set_id > update.set_id ||
553 peer.view.round > update.round && peer.view.set_id == update.set_id ||
554 peer.view.last_commit.as_ref() > Some(&update.commit_finalized_height);
555
556 if invalid_change {
557 return Err(Misbehavior::InvalidViewChange)
558 }
559
560 let now = Instant::now();
561 let duplicate_packet = (update.set_id, update.round, Some(&update.commit_finalized_height)) ==
562 (peer.view.set_id, peer.view.round, peer.view.last_commit.as_ref());
563
564 if duplicate_packet {
565 if let Some(last_update) = peer.view.last_update {
566 if now < last_update + self.neighbor_rebroadcast_period / 2 {
567 return Err(Misbehavior::DuplicateNeighborMessage)
568 }
569 }
570 }
571
572 peer.view = View {
573 round: update.round,
574 set_id: update.set_id,
575 last_commit: Some(update.commit_finalized_height),
576 last_update: Some(now),
577 };
578
579 trace!(
580 target: LOG_TARGET,
581 "Peer {} updated view. Now at {:?}, {:?}",
582 who,
583 peer.view.round,
584 peer.view.set_id
585 );
586
587 Ok(Some(&peer.view))
588 }
589
590 fn update_commit_height(&mut self, who: &PeerId, new_height: N) -> Result<(), Misbehavior> {
591 let peer = match self.inner.get_mut(who) {
592 None => return Ok(()),
593 Some(p) => p,
594 };
595
596 if peer.view.last_commit.as_ref() > Some(&new_height) {
600 return Err(Misbehavior::InvalidViewChange)
601 }
602
603 peer.view.last_commit = Some(new_height);
604
605 Ok(())
606 }
607
/// Returns what is known about the given peer, or `None` if it is not connected.
fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<N>> {
    self.inner.get(who)
}
611
/// Randomly re-selects the first-stage, second-stage and lucky light peer
/// sets from the currently connected peers, replacing the previous selection.
fn reshuffle(&mut self) {
    // Snapshot all connected peers in a random order; every selection below
    // walks this same shuffled list.
    let shuffled_peers = {
        let mut peers =
            self.inner.iter().map(|(peer_id, info)| (*peer_id, info)).collect::<Vec<_>>();

        peers.shuffle(&mut rand::thread_rng());
        peers
    };

    // Lazily yields only the authorities, in shuffled order.
    let shuffled_authorities = shuffled_peers.iter().filter_map(|(peer_id, info)| {
        if matches!(info.roles, ObservedRole::Authority) {
            Some(peer_id)
        } else {
            None
        }
    });

    let mut first_stage_peers = AHashSet::new();
    let mut second_stage_peers = HashSet::new();

    // The first `LUCKY_PEERS / 2` authorities go to the first stage and the
    // next `LUCKY_PEERS` authorities go to the second stage.
    let half_lucky = LUCKY_PEERS / 2;
    let one_and_a_half_lucky = LUCKY_PEERS + half_lucky;
    for (n_authorities_added, peer_id) in shuffled_authorities.enumerate() {
        if n_authorities_added < half_lucky {
            first_stage_peers.insert(*peer_id);
        } else if n_authorities_added < one_and_a_half_lucky {
            second_stage_peers.insert(*peer_id);
        } else {
            break
        }
    }

    // Fill the remaining slots from all non-light peers, topping up the first
    // stage to `LUCKY_PEERS` before the second stage, whose target size is
    // `max(LUCKY_PEERS, sqrt(number of connected peers))`.
    let n_second_stage_peers = LUCKY_PEERS.max((shuffled_peers.len() as f32).sqrt() as usize);
    for (peer_id, info) in &shuffled_peers {
        if info.roles.is_light() {
            continue
        }

        if first_stage_peers.len() < LUCKY_PEERS {
            // A peer promoted to the first stage must not stay in the second.
            first_stage_peers.insert(*peer_id);
            second_stage_peers.remove(peer_id);
        } else if second_stage_peers.len() < n_second_stage_peers {
            if !first_stage_peers.contains(peer_id) {
                second_stage_peers.insert(*peer_id);
            }
        } else {
            break
        }
    }

    // Pick up to `LUCKY_PEERS` light peers, in shuffled order.
    let lucky_light_peers = shuffled_peers
        .into_iter()
        .filter_map(|(peer_id, info)| if info.roles.is_light() { Some(peer_id) } else { None })
        .take(LUCKY_PEERS)
        .collect();

    self.first_stage_peers = first_stage_peers;
    self.second_stage_peers = second_stage_peers;
    self.lucky_light_peers = lucky_light_peers;
}
685}
686
/// Outcome of validating a gossip message; every variant carries the
/// reputation change that validation decided on.
#[derive(Debug, PartialEq)]
pub(super) enum Action<H> {
    /// Keep the message under the given topic.
    Keep(H, ReputationChange),
    /// Process the message under the given topic, then discard it.
    ProcessAndDiscard(H, ReputationChange),
    /// Discard the message.
    Discard(ReputationChange),
}
696
/// State of the (at most one) outstanding catch-up exchange.
#[derive(Debug)]
enum PendingCatchUp {
    /// No catch-up is pending.
    None,
    /// A request was sent to `who` at `instant` and a reply is awaited.
    Requesting { who: PeerId, request: CatchUpRequestMessage, instant: Instant },
    /// A catch-up reply was accepted and is being processed; `instant` is
    /// carried over from the request.
    Processing { instant: Instant },
}
707
708enum CatchUpConfig {
710 Enabled { only_from_authorities: bool },
718 Disabled,
722}
723
724impl CatchUpConfig {
725 fn enabled(only_from_authorities: bool) -> CatchUpConfig {
726 CatchUpConfig::Enabled { only_from_authorities }
727 }
728
729 fn disabled() -> CatchUpConfig {
730 CatchUpConfig::Disabled
731 }
732
733 fn request_allowed<N>(&self, peer: &PeerInfo<N>) -> bool {
734 match self {
735 CatchUpConfig::Disabled => false,
736 CatchUpConfig::Enabled { only_from_authorities, .. } => match peer.roles {
737 ObservedRole::Authority => true,
738 ObservedRole::Light => false,
739 ObservedRole::Full => !only_from_authorities,
740 },
741 }
742 }
743}
744
/// The mutable state behind the gossip validator.
struct Inner<Block: BlockT> {
    /// Our own view; `None` until a set has been noted.
    local_view: Option<LocalView<NumberFor<Block>>>,
    /// The connected peers and the subsets selected for propagation.
    peers: Peers<NumberFor<Block>>,
    /// The topics currently considered live.
    live_topics: KeepTopics<Block>,
    /// The authorities of the current set; round messages from other voters
    /// are rejected.
    authorities: Vec<AuthorityId>,
    /// The configuration this validator was created with.
    config: crate::Config,
    /// Instant initialised to creation time plus `REBROADCAST_AFTER`.
    next_rebroadcast: Instant,
    /// State of the outstanding catch-up exchange, if any.
    pending_catch_up: PendingCatchUp,
    /// Whether and to whom catch-up requests may be sent.
    catch_up_config: CatchUpConfig,
}

/// An optional neighbor packet together with the peers it should be sent to.
type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)>;
757
758impl<Block: BlockT> Inner<Block> {
759 fn new(config: crate::Config) -> Self {
760 let catch_up_config = if config.observer_enabled {
761 if config.local_role.is_authority() {
762 CatchUpConfig::enabled(true)
766 } else {
767 CatchUpConfig::disabled()
770 }
771 } else {
772 CatchUpConfig::enabled(false)
775 };
776
777 Inner {
778 local_view: None,
779 peers: Peers::new(NEIGHBOR_REBROADCAST_PERIOD),
780 live_topics: KeepTopics::new(),
781 next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
782 authorities: Vec::new(),
783 pending_catch_up: PendingCatchUp::None,
784 catch_up_config,
785 config,
786 }
787 }
788
789 fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
792 let local_view = self.local_view.as_mut()?;
793 if local_view.round == round {
794 return None
797 }
798
799 let set_id = local_view.set_id;
800
801 debug!(
802 target: LOG_TARGET,
803 "Voter {} noting beginning of round {:?} to network.",
804 self.config.name(),
805 (round, set_id)
806 );
807
808 local_view.update_round(round);
809
810 self.live_topics.push(round, set_id);
811 self.peers.reshuffle();
812
813 self.multicast_neighbor_packet()
814 }
815
816 fn note_set(&mut self, set_id: SetId, authorities: Vec<AuthorityId>) -> MaybeMessage<Block> {
819 let local_view = match self.local_view {
820 ref mut x @ None => x.get_or_insert(LocalView::new(set_id, Round(1))),
821 Some(ref mut v) => {
822 if v.set_id == set_id {
823 let diff_authorities = self.authorities.iter().collect::<HashSet<_>>() !=
824 authorities.iter().collect::<HashSet<_>>();
825
826 if diff_authorities {
827 debug!(
828 target: LOG_TARGET,
829 "Gossip validator noted set {:?} twice with different authorities. \
830 Was the authority set hard forked?",
831 set_id,
832 );
833
834 self.authorities = authorities;
835 }
836
837 return None
840 } else {
841 v
842 }
843 },
844 };
845
846 local_view.update_set(set_id);
847 self.live_topics.push(Round(1), set_id);
848 self.authorities = authorities;
849
850 self.multicast_neighbor_packet()
851 }
852
853 fn note_commit_finalized(
857 &mut self,
858 round: Round,
859 set_id: SetId,
860 finalized: NumberFor<Block>,
861 ) -> MaybeMessage<Block> {
862 let local_view = self.local_view.as_mut()?;
863 if local_view.last_commit_height() < Some(&finalized) {
864 local_view.last_commit = Some((finalized, round, set_id));
865 } else {
866 return None
867 }
868
869 self.multicast_neighbor_packet()
870 }
871
872 fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
873 self.local_view
874 .as_ref()
875 .map(LocalView::as_view)
876 .map(|v| v.consider_vote(round, set_id))
877 .unwrap_or(Consider::RejectOutOfScope)
878 }
879
880 fn consider_global(&self, set_id: SetId, number: NumberFor<Block>) -> Consider {
881 self.local_view
882 .as_ref()
883 .map(LocalView::as_view)
884 .map(|v| v.consider_global(set_id, &number))
885 .unwrap_or(Consider::RejectOutOfScope)
886 }
887
/// Returns the reputation cost for a message rejected as being in the past.
///
/// The peer, round and set are currently unused: the cost is always
/// `cost::PAST_REJECTION`.
fn cost_past_rejection(
    &self,
    _who: &PeerId,
    _round: Round,
    _set_id: SetId,
) -> ReputationChange {
    cost::PAST_REJECTION
}
897
898 fn validate_round_message(
899 &self,
900 who: &PeerId,
901 full: &VoteMessage<Block>,
902 ) -> Action<Block::Hash> {
903 match self.consider_vote(full.round, full.set_id) {
904 Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()),
905 Consider::RejectOutOfScope =>
906 return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
907 Consider::RejectPast =>
908 return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)),
909 Consider::Accept => {},
910 }
911
912 if !self.authorities.contains(&full.message.id) {
914 debug!(target: LOG_TARGET, "Message from unknown voter: {}", full.message.id);
915 telemetry!(
916 self.config.telemetry;
917 CONSENSUS_DEBUG;
918 "afg.bad_msg_signature";
919 "signature" => ?full.message.id,
920 );
921 return Action::Discard(cost::UNKNOWN_VOTER)
922 }
923
924 if !sp_consensus_grandpa::check_message_signature(
925 &full.message.message,
926 &full.message.id,
927 &full.message.signature,
928 full.round.0,
929 full.set_id.0,
930 )
931 .is_valid()
932 {
933 debug!(target: LOG_TARGET, "Bad message signature {}", full.message.id);
934 telemetry!(
935 self.config.telemetry;
936 CONSENSUS_DEBUG;
937 "afg.bad_msg_signature";
938 "signature" => ?full.message.id,
939 );
940 return Action::Discard(cost::BAD_SIGNATURE)
941 }
942
943 let topic = super::round_topic::<Block>(full.round.0, full.set_id.0);
944 Action::Keep(topic, benefit::ROUND_MESSAGE)
945 }
946
947 fn validate_commit_message(
948 &mut self,
949 who: &PeerId,
950 full: &FullCommitMessage<Block>,
951 ) -> Action<Block::Hash> {
952 if let Err(misbehavior) = self.peers.update_commit_height(who, full.message.target_number) {
953 return Action::Discard(misbehavior.cost())
954 }
955
956 match self.consider_global(full.set_id, full.message.target_number) {
957 Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()),
958 Consider::RejectPast =>
959 return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)),
960 Consider::RejectOutOfScope =>
961 return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
962 Consider::Accept => {},
963 }
964
965 if full.message.precommits.len() != full.message.auth_data.len() ||
966 full.message.precommits.is_empty()
967 {
968 debug!(target: LOG_TARGET, "Malformed compact commit");
969 telemetry!(
970 self.config.telemetry;
971 CONSENSUS_DEBUG;
972 "afg.malformed_compact_commit";
973 "precommits_len" => ?full.message.precommits.len(),
974 "auth_data_len" => ?full.message.auth_data.len(),
975 "precommits_is_empty" => ?full.message.precommits.is_empty(),
976 );
977 return Action::Discard(cost::MALFORMED_COMMIT)
978 }
979
980 let topic = super::global_topic::<Block>(full.set_id.0);
983 Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_COMMIT)
984 }
985
986 fn validate_catch_up_message(
987 &mut self,
988 who: &PeerId,
989 full: &FullCatchUpMessage<Block>,
990 ) -> Action<Block::Hash> {
991 match &self.pending_catch_up {
992 PendingCatchUp::Requesting { who: peer, request, instant } => {
993 if peer != who {
994 return Action::Discard(Misbehavior::OutOfScopeMessage.cost())
995 }
996
997 if request.set_id != full.set_id {
998 return Action::Discard(cost::MALFORMED_CATCH_UP)
999 }
1000
1001 if request.round.0 > full.message.round_number {
1002 return Action::Discard(cost::MALFORMED_CATCH_UP)
1003 }
1004
1005 if full.message.prevotes.is_empty() || full.message.precommits.is_empty() {
1006 return Action::Discard(cost::MALFORMED_CATCH_UP)
1007 }
1008
1009 self.pending_catch_up = PendingCatchUp::Processing { instant: *instant };
1013
1014 let topic = super::global_topic::<Block>(full.set_id.0);
1016 Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_CATCH_UP)
1017 },
1018 _ => Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
1019 }
1020 }
1021
1022 fn note_catch_up_message_processed(&mut self) {
1023 match &self.pending_catch_up {
1024 PendingCatchUp::Processing { .. } => {
1025 self.pending_catch_up = PendingCatchUp::None;
1026 },
1027 state => debug!(
1028 target: LOG_TARGET,
1029 "Noted processed catch up message when state was: {:?}", state,
1030 ),
1031 }
1032 }
1033
1034 fn handle_catch_up_request(
1035 &mut self,
1036 who: &PeerId,
1037 request: CatchUpRequestMessage,
1038 set_state: &environment::SharedVoterSetState<Block>,
1039 ) -> (Option<GossipMessage<Block>>, Action<Block::Hash>) {
1040 let Some(local_view) = &self.local_view else {
1041 return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
1042 };
1043
1044 if request.set_id != local_view.set_id {
1045 if request.set_id.0.saturating_add(1) == local_view.set_id.0 &&
1050 local_view.round.0.saturating_sub(CATCH_UP_THRESHOLD) == 0
1051 {
1052 return (None, Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP))
1053 }
1054
1055 return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
1056 }
1057
1058 match self.peers.peer(who) {
1059 None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
1060 Some(peer) if peer.view.round >= request.round =>
1061 return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
1062 _ => {},
1063 }
1064
1065 let last_completed_round = set_state.read().last_completed_round();
1066 if last_completed_round.number < request.round.0 {
1067 return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
1068 }
1069
1070 trace!(
1071 target: LOG_TARGET,
1072 "Replying to catch-up request for round {} from {} with round {}",
1073 request.round.0,
1074 who,
1075 last_completed_round.number,
1076 );
1077
1078 let mut prevotes = Vec::new();
1079 let mut precommits = Vec::new();
1080
1081 for vote in last_completed_round.votes {
1087 match vote.message {
1088 finality_grandpa::Message::Prevote(prevote) => {
1089 prevotes.push(finality_grandpa::SignedPrevote {
1090 prevote,
1091 signature: vote.signature,
1092 id: vote.id,
1093 });
1094 },
1095 finality_grandpa::Message::Precommit(precommit) => {
1096 precommits.push(finality_grandpa::SignedPrecommit {
1097 precommit,
1098 signature: vote.signature,
1099 id: vote.id,
1100 });
1101 },
1102 _ => {},
1103 }
1104 }
1105
1106 let (base_hash, base_number) = last_completed_round.base;
1107
1108 let catch_up = CatchUp::<Block::Header> {
1109 round_number: last_completed_round.number,
1110 prevotes,
1111 precommits,
1112 base_hash,
1113 base_number,
1114 };
1115
1116 let full_catch_up = GossipMessage::CatchUp::<Block>(FullCatchUpMessage {
1117 set_id: request.set_id,
1118 message: catch_up,
1119 });
1120
1121 (Some(full_catch_up), Action::Discard(cost::CATCH_UP_REPLY))
1122 }
1123
1124 fn try_catch_up(&mut self, who: &PeerId) -> (Option<GossipMessage<Block>>, Option<Report>) {
1125 let mut catch_up = None;
1126 let mut report = None;
1127
1128 if let (Some(peer), Some(local_view)) = (self.peers.peer(who), &self.local_view) {
1134 if self.catch_up_config.request_allowed(peer) &&
1135 peer.view.set_id == local_view.set_id &&
1136 peer.view.round.0.saturating_sub(CATCH_UP_THRESHOLD) > local_view.round.0
1137 {
1138 let round = peer.view.round.0 - 1; let request =
1141 CatchUpRequestMessage { set_id: peer.view.set_id, round: Round(round) };
1142
1143 let (catch_up_allowed, catch_up_report) = self.note_catch_up_request(who, &request);
1144
1145 if catch_up_allowed {
1146 debug!(
1147 target: LOG_TARGET,
1148 "Sending catch-up request for round {} to {}", round, who,
1149 );
1150
1151 catch_up = Some(GossipMessage::<Block>::CatchUpRequest(request));
1152 }
1153
1154 report = catch_up_report;
1155 }
1156 }
1157
1158 (catch_up, report)
1159 }
1160
1161 fn import_neighbor_message(
1162 &mut self,
1163 who: &PeerId,
1164 update: NeighborPacket<NumberFor<Block>>,
1165 ) -> (Vec<Block::Hash>, Action<Block::Hash>, Option<GossipMessage<Block>>, Option<Report>) {
1166 let update_res = self.peers.update_peer_state(who, update);
1167
1168 let (cost_benefit, topics) = match update_res {
1169 Ok(view) =>
1170 (benefit::NEIGHBOR_MESSAGE, view.map(|view| neighbor_topics::<Block>(view))),
1171 Err(misbehavior) => (misbehavior.cost(), None),
1172 };
1173
1174 let (catch_up, report) =
1175 if update_res.is_ok() { self.try_catch_up(who) } else { (None, None) };
1176
1177 let neighbor_topics = topics.unwrap_or_default();
1178
1179 let action = Action::Discard(cost_benefit);
1181
1182 (neighbor_topics, action, catch_up, report)
1183 }
1184
1185 fn multicast_neighbor_packet(&self) -> MaybeMessage<Block> {
1186 self.local_view.as_ref().map(|local_view| {
1187 let packet = NeighborPacket {
1188 round: local_view.round,
1189 set_id: local_view.set_id,
1190 commit_finalized_height: *local_view.last_commit_height().unwrap_or(&Zero::zero()),
1191 };
1192
1193 let peers = self.peers.inner.keys().cloned().collect();
1194
1195 (peers, packet)
1196 })
1197 }
1198
1199 fn note_catch_up_request(
1200 &mut self,
1201 who: &PeerId,
1202 catch_up_request: &CatchUpRequestMessage,
1203 ) -> (bool, Option<Report>) {
1204 let report = match &self.pending_catch_up {
1205 PendingCatchUp::Requesting { who: peer, instant, .. } => {
1206 if instant.elapsed() <= CATCH_UP_REQUEST_TIMEOUT {
1207 return (false, None)
1208 } else {
1209 Some((*peer, cost::CATCH_UP_REQUEST_TIMEOUT))
1211 }
1212 },
1213 PendingCatchUp::Processing { instant, .. } => {
1214 if instant.elapsed() < CATCH_UP_PROCESS_TIMEOUT {
1215 return (false, None)
1216 } else {
1217 None
1218 }
1219 },
1220 _ => None,
1221 };
1222
1223 self.pending_catch_up = PendingCatchUp::Requesting {
1224 who: *who,
1225 request: catch_up_request.clone(),
1226 instant: Instant::now(),
1227 };
1228
1229 (true, report)
1230 }
1231
1232 fn round_message_allowed(&self, who: &PeerId) -> bool {
1243 let round_duration = self.config.gossip_duration * ROUND_DURATION;
1244 let round_elapsed = match self.local_view {
1245 Some(ref local_view) => local_view.round_start.elapsed(),
1246 None => return false,
1247 };
1248
1249 if round_elapsed < round_duration.mul_f32(PROPAGATION_SOME) {
1250 self.peers.first_stage_peers.contains(who)
1251 } else if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) {
1252 self.peers.first_stage_peers.contains(who) ||
1253 self.peers.second_stage_peers.contains(who)
1254 } else {
1255 self.peers.peer(who).map(|info| !info.roles.is_light()).unwrap_or(false)
1256 }
1257 }
1258
1259 fn global_message_allowed(&self, who: &PeerId) -> bool {
1274 let round_duration = self.config.gossip_duration * ROUND_DURATION;
1275 let round_elapsed = match self.local_view {
1276 Some(ref local_view) => local_view.round_start.elapsed(),
1277 None => return false,
1278 };
1279
1280 if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) {
1281 self.peers.first_stage_peers.contains(who) ||
1282 self.peers.second_stage_peers.contains(who) ||
1283 self.peers.lucky_light_peers.contains(who)
1284 } else {
1285 true
1286 }
1287 }
1288}
1289
1290pub(crate) struct Metrics {
1292 messages_validated: CounterVec<U64>,
1293}
1294
1295impl Metrics {
1296 pub(crate) fn register(
1297 registry: &prometheus_endpoint::Registry,
1298 ) -> Result<Self, PrometheusError> {
1299 Ok(Self {
1300 messages_validated: register(
1301 CounterVec::new(
1302 Opts::new(
1303 "substrate_finality_grandpa_communication_gossip_validator_messages",
1304 "Number of messages validated by the finality grandpa gossip validator.",
1305 ),
1306 &["message", "action"],
1307 )?,
1308 registry,
1309 )?,
1310 })
1311 }
1312}
1313
/// A validator for GRANDPA gossip messages.
pub(super) struct GossipValidator<Block: BlockT> {
    /// The mutable validator state, behind a read-write lock.
    inner: parking_lot::RwLock<Inner<Block>>,
    /// Shared voter set state, consulted when answering catch-up requests.
    set_state: environment::SharedVoterSetState<Block>,
    /// Channel on which peer reputation reports are sent.
    report_sender: TracingUnboundedSender<PeerReport>,
    /// Metrics, when a registry was provided and registration succeeded.
    metrics: Option<Metrics>,
    /// Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
}
1322
1323impl<Block: BlockT> GossipValidator<Block> {
1324 pub(super) fn new(
1328 config: crate::Config,
1329 set_state: environment::SharedVoterSetState<Block>,
1330 prometheus_registry: Option<&Registry>,
1331 telemetry: Option<TelemetryHandle>,
1332 ) -> (GossipValidator<Block>, TracingUnboundedReceiver<PeerReport>) {
1333 let metrics = match prometheus_registry.map(Metrics::register) {
1334 Some(Ok(metrics)) => Some(metrics),
1335 Some(Err(e)) => {
1336 debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
1337 None
1338 },
1339 None => None,
1340 };
1341
1342 let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000);
1343 let val = GossipValidator {
1344 inner: parking_lot::RwLock::new(Inner::new(config)),
1345 set_state,
1346 report_sender: tx,
1347 metrics,
1348 telemetry,
1349 };
1350
1351 (val, rx)
1352 }
1353
1354 pub(super) fn note_round<F>(&self, round: Round, send_neighbor: F)
1356 where
1357 F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
1358 {
1359 let maybe_msg = self.inner.write().note_round(round);
1360 if let Some((to, msg)) = maybe_msg {
1361 send_neighbor(to, msg);
1362 }
1363 }
1364
1365 pub(super) fn note_set<F>(&self, set_id: SetId, authorities: Vec<AuthorityId>, send_neighbor: F)
1368 where
1369 F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
1370 {
1371 let maybe_msg = self.inner.write().note_set(set_id, authorities);
1372 if let Some((to, msg)) = maybe_msg {
1373 send_neighbor(to, msg);
1374 }
1375 }
1376
1377 pub(super) fn note_commit_finalized<F>(
1381 &self,
1382 round: Round,
1383 set_id: SetId,
1384 finalized: NumberFor<Block>,
1385 send_neighbor: F,
1386 ) where
1387 F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
1388 {
1389 let maybe_msg = self.inner.write().note_commit_finalized(round, set_id, finalized);
1390
1391 if let Some((to, msg)) = maybe_msg {
1392 send_neighbor(to, msg);
1393 }
1394 }
1395
/// Notes that the catch-up message being processed is done, under the
/// inner state's write lock.
pub(super) fn note_catch_up_message_processed(&self) {
    self.inner.write().note_catch_up_message_processed();
}
1400
/// Sends a reputation change for `who` on the report channel; a failed send
/// is deliberately ignored.
fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
    let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
}
1404
1405 pub(super) fn do_validate(
1406 &self,
1407 who: &PeerId,
1408 mut data: &[u8],
1409 ) -> (Action<Block::Hash>, Vec<Block::Hash>, Option<GossipMessage<Block>>) {
1410 let mut broadcast_topics = Vec::new();
1411 let mut peer_reply = None;
1412
1413 let message_name;
1415
1416 let action = {
1417 match GossipMessage::<Block>::decode_all(&mut data) {
1418 Ok(GossipMessage::Vote(ref message)) => {
1419 message_name = Some("vote");
1420 self.inner.write().validate_round_message(who, message)
1421 },
1422 Ok(GossipMessage::Commit(ref message)) => {
1423 message_name = Some("commit");
1424 self.inner.write().validate_commit_message(who, message)
1425 },
1426 Ok(GossipMessage::Neighbor(update)) => {
1427 message_name = Some("neighbor");
1428 let (topics, action, catch_up, report) = self
1429 .inner
1430 .write()
1431 .import_neighbor_message(who, update.into_neighbor_packet());
1432
1433 if let Some((peer, cost_benefit)) = report {
1434 self.report(peer, cost_benefit);
1435 }
1436
1437 broadcast_topics = topics;
1438 peer_reply = catch_up;
1439 action
1440 },
1441 Ok(GossipMessage::CatchUp(ref message)) => {
1442 message_name = Some("catch_up");
1443 self.inner.write().validate_catch_up_message(who, message)
1444 },
1445 Ok(GossipMessage::CatchUpRequest(request)) => {
1446 message_name = Some("catch_up_request");
1447 let (reply, action) =
1448 self.inner.write().handle_catch_up_request(who, request, &self.set_state);
1449
1450 peer_reply = reply;
1451 action
1452 },
1453 Err(e) => {
1454 message_name = None;
1455 debug!(target: LOG_TARGET, "Error decoding message: {}", e);
1456 telemetry!(
1457 self.telemetry;
1458 CONSENSUS_DEBUG;
1459 "afg.err_decoding_msg";
1460 "" => "",
1461 );
1462
1463 let len = std::cmp::min(i32::MAX as usize, data.len()) as i32;
1464 Action::Discard(Misbehavior::UndecodablePacket(len).cost())
1465 },
1466 }
1467 };
1468
1469 if let (Some(metrics), Some(message_name)) = (&self.metrics, message_name) {
1471 let action_name = match action {
1472 Action::Keep(_, _) => "keep",
1473 Action::ProcessAndDiscard(_, _) => "process_and_discard",
1474 Action::Discard(_) => "discard",
1475 };
1476 metrics.messages_validated.with_label_values(&[message_name, action_name]).inc();
1477 }
1478
1479 (action, broadcast_topics, peer_reply)
1480 }
1481
1482 #[cfg(test)]
1483 fn inner(&self) -> &parking_lot::RwLock<Inner<Block>> {
1484 &self.inner
1485 }
1486}
1487
1488impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Block> {
1489 fn new_peer(
1490 &self,
1491 context: &mut dyn ValidatorContext<Block>,
1492 who: &PeerId,
1493 roles: ObservedRole,
1494 ) {
1495 let packet = {
1496 let mut inner = self.inner.write();
1497 inner.peers.new_peer(*who, roles);
1498
1499 inner.local_view.as_ref().map(|v| NeighborPacket {
1500 round: v.round,
1501 set_id: v.set_id,
1502 commit_finalized_height: *v.last_commit_height().unwrap_or(&Zero::zero()),
1503 })
1504 };
1505
1506 if let Some(packet) = packet {
1507 let packet_data = GossipMessage::<Block>::from(packet).encode();
1508 context.send_message(who, packet_data);
1509 }
1510 }
1511
1512 fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
1513 self.inner.write().peers.peer_disconnected(who);
1514 }
1515
1516 fn validate(
1517 &self,
1518 context: &mut dyn ValidatorContext<Block>,
1519 who: &PeerId,
1520 data: &[u8],
1521 ) -> sc_network_gossip::ValidationResult<Block::Hash> {
1522 let (action, broadcast_topics, peer_reply) = self.do_validate(who, data);
1523
1524 if let Some(msg) = peer_reply {
1526 context.send_message(who, msg.encode());
1527 }
1528
1529 for topic in broadcast_topics {
1530 context.send_topic(who, topic, false);
1531 }
1532
1533 match action {
1534 Action::Keep(topic, cb) => {
1535 self.report(*who, cb);
1536 context.broadcast_message(topic, data.to_vec(), false);
1537 sc_network_gossip::ValidationResult::ProcessAndKeep(topic)
1538 },
1539 Action::ProcessAndDiscard(topic, cb) => {
1540 self.report(*who, cb);
1541 sc_network_gossip::ValidationResult::ProcessAndDiscard(topic)
1542 },
1543 Action::Discard(cb) => {
1544 self.report(*who, cb);
1545 sc_network_gossip::ValidationResult::Discard
1546 },
1547 }
1548 }
1549
1550 fn message_allowed<'a>(
1551 &'a self,
1552 ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
1553 let (inner, do_rebroadcast) = {
1554 use parking_lot::RwLockWriteGuard;
1555
1556 let mut inner = self.inner.write();
1557 let now = Instant::now();
1558 let do_rebroadcast = if now >= inner.next_rebroadcast {
1559 inner.next_rebroadcast = now + REBROADCAST_AFTER;
1560 true
1561 } else {
1562 false
1563 };
1564
1565 (RwLockWriteGuard::downgrade(inner), do_rebroadcast)
1567 };
1568
1569 Box::new(move |who, intent, topic, mut data| {
1570 if let MessageIntent::PeriodicRebroadcast = intent {
1571 return do_rebroadcast
1572 }
1573
1574 let peer = match inner.peers.peer(who) {
1575 None => return false,
1576 Some(x) => x,
1577 };
1578
1579 let Some((maybe_round, set_id)) = inner.live_topics.topic_info(topic) else {
1582 return false
1583 };
1584
1585 if let MessageIntent::Broadcast = intent {
1586 if maybe_round.is_some() {
1587 if !inner.round_message_allowed(who) {
1588 return false
1590 }
1591 } else if !inner.global_message_allowed(who) {
1592 return false
1594 }
1595 }
1596
1597 if let Some(round) = maybe_round {
1599 return peer.view.consider_vote(round, set_id) == Consider::Accept
1600 }
1601
1602 let Some(local_view) = &inner.local_view else {
1604 return false };
1606
1607 match GossipMessage::<Block>::decode_all(&mut data) {
1608 Err(_) => false,
1609 Ok(GossipMessage::Commit(full)) => {
1610 peer.view.consider_global(set_id, full.message.target_number) ==
1615 Consider::Accept && Some(&full.message.target_number) ==
1616 local_view.last_commit_height()
1617 },
1618 Ok(GossipMessage::Neighbor(_)) => false,
1619 Ok(GossipMessage::CatchUpRequest(_)) => false,
1620 Ok(GossipMessage::CatchUp(_)) => false,
1621 Ok(GossipMessage::Vote(_)) => false, }
1623 })
1624 }
1625
1626 fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
1627 let inner = self.inner.read();
1628 Box::new(move |topic, mut data| {
1629 match inner.live_topics.topic_info(&topic) {
1632 None => return true,
1633 Some((Some(_), _)) => return false,
1635 Some((None, _)) => {},
1636 };
1637
1638 let Some(local_view) = &inner.local_view else {
1639 return true };
1641
1642 match GossipMessage::<Block>::decode_all(&mut data) {
1644 Err(_) => true,
1645 Ok(GossipMessage::Commit(full)) => match local_view.last_commit {
1646 Some((number, round, set_id)) =>
1647 !(full.message.target_number == number &&
1650 full.round == round && full.set_id == set_id),
1651 None => true,
1652 },
1653 Ok(_) => true,
1654 }
1655 })
1656 }
1657}
1658
1659pub(super) struct PeerReport {
1661 pub who: PeerId,
1662 pub cost_benefit: ReputationChange,
1663}
1664
1665#[cfg(test)]
1666mod tests {
1667 use super::{super::NEIGHBOR_REBROADCAST_PERIOD, environment::SharedVoterSetState, *};
1668 use crate::communication;
1669 use sc_network::config::Role;
1670 use sc_network_gossip::Validator as GossipValidatorT;
1671 use sp_core::{crypto::UncheckedFrom, H256};
1672 use std::time::Instant;
1673 use substrate_test_runtime_client::runtime::{Block, Header};
1674
1675 fn config() -> crate::Config {
1677 crate::Config {
1678 gossip_duration: Duration::from_millis(10),
1679 justification_generation_period: 256,
1680 keystore: None,
1681 name: None,
1682 local_role: Role::Authority,
1683 observer_enabled: true,
1684 telemetry: None,
1685 protocol_name: communication::grandpa_protocol_name::NAME.into(),
1686 }
1687 }
1688
1689 fn voter_set_state() -> SharedVoterSetState<Block> {
1691 use crate::{authorities::AuthoritySet, environment::VoterSetState};
1692
1693 let base = (H256::zero(), 0);
1694
1695 let voters = vec![(AuthorityId::unchecked_from([1; 32]), 1)];
1696 let voters = AuthoritySet::genesis(voters).unwrap();
1697
1698 let set_state = VoterSetState::live(0, &voters, base);
1699
1700 set_state.into()
1701 }
1702
1703 #[test]
1704 fn view_vote_rules() {
1705 let view = View {
1706 round: Round(100),
1707 set_id: SetId(1),
1708 last_commit: Some(1000u64),
1709 last_update: None,
1710 };
1711
1712 assert_eq!(view.consider_vote(Round(98), SetId(1)), Consider::RejectPast);
1713 assert_eq!(view.consider_vote(Round(1), SetId(0)), Consider::RejectPast);
1714 assert_eq!(view.consider_vote(Round(1000), SetId(0)), Consider::RejectPast);
1715
1716 assert_eq!(view.consider_vote(Round(99), SetId(1)), Consider::Accept);
1717 assert_eq!(view.consider_vote(Round(100), SetId(1)), Consider::Accept);
1718 assert_eq!(view.consider_vote(Round(101), SetId(1)), Consider::Accept);
1719
1720 assert_eq!(view.consider_vote(Round(102), SetId(1)), Consider::RejectFuture);
1721 assert_eq!(view.consider_vote(Round(1), SetId(2)), Consider::RejectFuture);
1722 assert_eq!(view.consider_vote(Round(1000), SetId(2)), Consider::RejectFuture);
1723 }
1724
1725 #[test]
1726 fn view_global_message_rules() {
1727 let view = View {
1728 round: Round(100),
1729 set_id: SetId(2),
1730 last_commit: Some(1000u64),
1731 last_update: None,
1732 };
1733
1734 assert_eq!(view.consider_global(SetId(3), 1), Consider::RejectFuture);
1735 assert_eq!(view.consider_global(SetId(3), 1000), Consider::RejectFuture);
1736 assert_eq!(view.consider_global(SetId(3), 10000), Consider::RejectFuture);
1737
1738 assert_eq!(view.consider_global(SetId(1), 1), Consider::RejectPast);
1739 assert_eq!(view.consider_global(SetId(1), 1000), Consider::RejectPast);
1740 assert_eq!(view.consider_global(SetId(1), 10000), Consider::RejectPast);
1741
1742 assert_eq!(view.consider_global(SetId(2), 1), Consider::RejectPast);
1743 assert_eq!(view.consider_global(SetId(2), 1000), Consider::RejectPast);
1744 assert_eq!(view.consider_global(SetId(2), 1001), Consider::Accept);
1745 assert_eq!(view.consider_global(SetId(2), 10000), Consider::Accept);
1746 }
1747
1748 #[test]
1749 fn unknown_peer_cannot_be_updated() {
1750 let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
1751 let id = PeerId::random();
1752
1753 let update =
1754 NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50 };
1755
1756 let res = peers.update_peer_state(&id, update.clone());
1757 assert!(res.unwrap().is_none());
1758
1759 peers.new_peer(id, ObservedRole::Authority);
1761 peers.peer_disconnected(&id);
1762
1763 let res = peers.update_peer_state(&id, update.clone());
1764 assert!(res.unwrap().is_none());
1765 }
1766
1767 #[test]
1768 fn update_peer_state() {
1769 let update1 =
1770 NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50u32 };
1771
1772 let update2 =
1773 NeighborPacket { round: Round(6), set_id: SetId(10), commit_finalized_height: 60 };
1774
1775 let update3 =
1776 NeighborPacket { round: Round(2), set_id: SetId(11), commit_finalized_height: 61 };
1777
1778 let update4 =
1779 NeighborPacket { round: Round(3), set_id: SetId(11), commit_finalized_height: 80 };
1780
1781 const SHORT_NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(1);
1784 let mut peers = Peers::new(SHORT_NEIGHBOR_REBROADCAST_PERIOD);
1785 let id = PeerId::random();
1786
1787 peers.new_peer(id, ObservedRole::Authority);
1788
1789 let check_update = |peers: &mut Peers<_>, update: NeighborPacket<_>| {
1790 let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap();
1791 assert_eq!(view.round, update.round);
1792 assert_eq!(view.set_id, update.set_id);
1793 assert_eq!(view.last_commit, Some(update.commit_finalized_height));
1794 };
1795
1796 check_update(&mut peers, update1);
1797 check_update(&mut peers, update2);
1798 check_update(&mut peers, update3);
1799 check_update(&mut peers, update4.clone());
1800
1801 peers.inner.get_mut(&id).unwrap().view.last_update =
1803 Some(Instant::now() - SHORT_NEIGHBOR_REBROADCAST_PERIOD);
1804 check_update(&mut peers, update4);
1805 }
1806
1807 #[test]
1808 fn invalid_view_change() {
1809 let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
1810
1811 let id = PeerId::random();
1812 peers.new_peer(id, ObservedRole::Authority);
1813
1814 peers
1815 .update_peer_state(
1816 &id,
1817 NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 },
1818 )
1819 .unwrap()
1820 .unwrap();
1821
1822 let mut check_update = move |update: NeighborPacket<_>, misbehavior| {
1823 let err = peers.update_peer_state(&id, update.clone()).unwrap_err();
1824 assert_eq!(err, misbehavior);
1825 };
1826
1827 check_update(
1829 NeighborPacket { round: Round(9), set_id: SetId(10), commit_finalized_height: 10 },
1830 Misbehavior::InvalidViewChange,
1831 );
1832 check_update(
1834 NeighborPacket { round: Round(10), set_id: SetId(9), commit_finalized_height: 10 },
1835 Misbehavior::InvalidViewChange,
1836 );
1837 check_update(
1839 NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 9 },
1840 Misbehavior::InvalidViewChange,
1841 );
1842 check_update(
1844 NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 },
1845 Misbehavior::DuplicateNeighborMessage,
1846 );
1847 check_update(
1849 NeighborPacket { round: Round(11), set_id: SetId(10), commit_finalized_height: 9 },
1850 Misbehavior::InvalidViewChange,
1851 );
1852 check_update(
1854 NeighborPacket { round: Round(10), set_id: SetId(11), commit_finalized_height: 9 },
1855 Misbehavior::InvalidViewChange,
1856 );
1857 }
1858
1859 #[test]
1860 fn messages_not_expired_immediately() {
1861 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
1862
1863 let set_id = 1;
1864
1865 val.note_set(SetId(set_id), Vec::new(), |_, _| {});
1866
1867 for round_num in 1u64..10 {
1868 val.note_round(Round(round_num), |_, _| {});
1869 }
1870
1871 {
1872 let mut is_expired = val.message_expired();
1873 let last_kept_round = 10u64 - KEEP_RECENT_ROUNDS as u64 - 1;
1874
1875 for round_num in 1u64..last_kept_round {
1877 let topic = communication::round_topic::<Block>(round_num, 1);
1878 assert!(is_expired(topic, &[1, 2, 3]));
1879 }
1880
1881 for round_num in last_kept_round..10 {
1883 let topic = communication::round_topic::<Block>(round_num, 1);
1884 assert!(!is_expired(topic, &[1, 2, 3]));
1885 }
1886 }
1887 }
1888
1889 #[test]
1890 fn message_from_unknown_authority_discarded() {
1891 assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE);
1892
1893 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
1894 let set_id = 1;
1895 let auth = AuthorityId::unchecked_from([1u8; 32]);
1896 let peer = PeerId::random();
1897
1898 val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
1899 val.note_round(Round(1), |_, _| {});
1900
1901 let inner = val.inner.read();
1902 let unknown_voter = inner.validate_round_message(
1903 &peer,
1904 &VoteMessage {
1905 round: Round(1),
1906 set_id: SetId(set_id),
1907 message: SignedMessage::<Header> {
1908 message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote {
1909 target_hash: Default::default(),
1910 target_number: 10,
1911 }),
1912 signature: UncheckedFrom::unchecked_from([1; 64]),
1913 id: UncheckedFrom::unchecked_from([2u8; 32]),
1914 },
1915 },
1916 );
1917
1918 let bad_sig = inner.validate_round_message(
1919 &peer,
1920 &VoteMessage {
1921 round: Round(1),
1922 set_id: SetId(set_id),
1923 message: SignedMessage::<Header> {
1924 message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote {
1925 target_hash: Default::default(),
1926 target_number: 10,
1927 }),
1928 signature: UncheckedFrom::unchecked_from([1; 64]),
1929 id: auth.clone(),
1930 },
1931 },
1932 );
1933
1934 assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER));
1935 assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE));
1936 }
1937
1938 #[test]
1939 fn unsolicited_catch_up_messages_discarded() {
1940 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
1941
1942 let set_id = 1;
1943 let auth = AuthorityId::unchecked_from([1u8; 32]);
1944 let peer = PeerId::random();
1945
1946 val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
1947 val.note_round(Round(1), |_, _| {});
1948
1949 let validate_catch_up = || {
1950 let mut inner = val.inner.write();
1951 inner.validate_catch_up_message(
1952 &peer,
1953 &FullCatchUpMessage {
1954 set_id: SetId(set_id),
1955 message: finality_grandpa::CatchUp {
1956 round_number: 10,
1957 prevotes: Default::default(),
1958 precommits: Default::default(),
1959 base_hash: Default::default(),
1960 base_number: Default::default(),
1961 },
1962 },
1963 )
1964 };
1965
1966 assert_eq!(validate_catch_up(), Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));
1968
1969 let noted = val.inner.write().note_catch_up_request(
1970 &peer,
1971 &CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) },
1972 );
1973
1974 assert!(noted.0);
1975
1976 assert_eq!(validate_catch_up(), Action::Discard(cost::MALFORMED_CATCH_UP));
1979 }
1980
1981 #[test]
1982 fn unanswerable_catch_up_requests_discarded() {
1983 let set_state: SharedVoterSetState<Block> = {
1985 let mut completed_rounds = voter_set_state().read().completed_rounds();
1986
1987 completed_rounds.push(environment::CompletedRound {
1988 number: 2,
1989 state: finality_grandpa::round::State::genesis(Default::default()),
1990 base: Default::default(),
1991 votes: Default::default(),
1992 });
1993
1994 let mut current_rounds = environment::CurrentRounds::<Block>::new();
1995 current_rounds.insert(3, environment::HasVoted::No);
1996
1997 let set_state =
1998 environment::VoterSetState::<Block>::Live { completed_rounds, current_rounds };
1999
2000 set_state.into()
2001 };
2002
2003 let (val, _) = GossipValidator::<Block>::new(config(), set_state.clone(), None, None);
2004
2005 let set_id = 1;
2006 let auth = AuthorityId::unchecked_from([1u8; 32]);
2007 let peer = PeerId::random();
2008
2009 val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
2010 val.note_round(Round(3), |_, _| {});
2011
2012 let mut inner = val.inner.write();
2015 inner.peers.new_peer(peer, ObservedRole::Authority);
2016
2017 let res = inner.handle_catch_up_request(
2018 &peer,
2019 CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) },
2020 &set_state,
2021 );
2022
2023 assert!(res.0.is_none());
2025 assert_eq!(res.1, Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));
2026
2027 let res = inner.handle_catch_up_request(
2028 &peer,
2029 CatchUpRequestMessage { set_id: SetId(set_id), round: Round(2) },
2030 &set_state,
2031 );
2032
2033 match res.0.unwrap() {
2035 GossipMessage::CatchUp(catch_up) => {
2036 assert_eq!(catch_up.set_id, SetId(set_id));
2037 assert_eq!(catch_up.message.round_number, 2);
2038
2039 assert_eq!(res.1, Action::Discard(cost::CATCH_UP_REPLY));
2040 },
2041 _ => panic!("expected catch up message"),
2042 };
2043 }
2044
2045 #[test]
2046 fn detects_honest_out_of_scope_catch_requests() {
2047 let set_state = voter_set_state();
2048 let (val, _) = GossipValidator::<Block>::new(config(), set_state.clone(), None, None);
2049
2050 val.note_set(SetId(2), Vec::new(), |_, _| {});
2052
2053 let peer = PeerId::random();
2056 val.inner.write().peers.new_peer(peer, ObservedRole::Authority);
2057
2058 let send_request = |set_id, round| {
2059 let mut inner = val.inner.write();
2060 inner.handle_catch_up_request(
2061 &peer,
2062 CatchUpRequestMessage { set_id: SetId(set_id), round: Round(round) },
2063 &set_state,
2064 )
2065 };
2066
2067 let assert_res = |res: (Option<_>, Action<_>), honest| {
2068 assert!(res.0.is_none());
2069 assert_eq!(
2070 res.1,
2071 if honest {
2072 Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP)
2073 } else {
2074 Action::Discard(Misbehavior::OutOfScopeMessage.cost())
2075 },
2076 );
2077 };
2078
2079 assert_res(send_request(1, 1), true);
2083
2084 assert_res(send_request(1, 10), true);
2085
2086 assert_res(send_request(0, 1), false);
2088
2089 assert_res(send_request(0, 10), false);
2090
2091 val.note_round(Round(3), |_, _| {});
2095
2096 assert_res(send_request(1, 1), false);
2097
2098 assert_res(send_request(1, 2), false);
2099 }
2100
2101 #[test]
2102 fn issues_catch_up_request_on_neighbor_packet_import() {
2103 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2104
2105 val.note_set(SetId(1), Vec::new(), |_, _| {});
2107
2108 let peer = PeerId::random();
2111 val.inner.write().peers.new_peer(peer, ObservedRole::Authority);
2112
2113 let import_neighbor_message = |set_id, round| {
2114 let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2115 &peer,
2116 NeighborPacket {
2117 round: Round(round),
2118 set_id: SetId(set_id),
2119 commit_finalized_height: 42,
2120 },
2121 );
2122
2123 catch_up_request
2124 };
2125
2126 match import_neighbor_message(1, 42) {
2129 Some(GossipMessage::CatchUpRequest(request)) => {
2130 assert_eq!(request.set_id, SetId(1));
2131 assert_eq!(request.round, Round(41));
2132 },
2133 _ => panic!("expected catch up message"),
2134 }
2135
2136 val.note_round(Round(41), |_, _| {});
2138
2139 match import_neighbor_message(1, 42) {
2142 None => {},
2143 _ => panic!("expected no catch up message"),
2144 }
2145
2146 match import_neighbor_message(1, 40) {
2148 None => {},
2149 _ => panic!("expected no catch up message"),
2150 }
2151
2152 match import_neighbor_message(2, 42) {
2154 None => {},
2155 _ => panic!("expected no catch up message"),
2156 }
2157 }
2158
2159 #[test]
2160 fn doesnt_send_catch_up_requests_when_disabled() {
2161 let config = {
2163 let mut c = config();
2164
2165 c.local_role = Role::Full;
2168 c.observer_enabled = true;
2169
2170 c
2171 };
2172
2173 let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2174
2175 val.note_set(SetId(1), Vec::new(), |_, _| {});
2177
2178 let peer = PeerId::random();
2181 val.inner.write().peers.new_peer(peer, ObservedRole::Authority);
2182
2183 let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2187 &peer,
2188 NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
2189 );
2190
2191 match catch_up_request {
2192 None => {},
2193 _ => panic!("expected no catch up message"),
2194 }
2195 }
2196
2197 #[test]
2198 fn doesnt_send_catch_up_requests_to_non_authorities_when_observer_enabled() {
2199 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2200
2201 val.note_set(SetId(1), Vec::new(), |_, _| {});
2203
2204 let peer_authority = PeerId::random();
2207 let peer_full = PeerId::random();
2208
2209 val.inner.write().peers.new_peer(peer_authority, ObservedRole::Authority);
2210 val.inner.write().peers.new_peer(peer_full, ObservedRole::Full);
2211
2212 let import_neighbor_message = |peer| {
2213 let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2214 &peer,
2215 NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
2216 );
2217
2218 catch_up_request
2219 };
2220
2221 if import_neighbor_message(peer_full).is_some() {
2225 panic!("expected no catch up message");
2226 }
2227
2228 match import_neighbor_message(peer_authority) {
2231 Some(GossipMessage::CatchUpRequest(request)) => {
2232 assert_eq!(request.set_id, SetId(1));
2233 assert_eq!(request.round, Round(41));
2234 },
2235 _ => panic!("expected catch up message"),
2236 }
2237 }
2238
2239 #[test]
2240 fn sends_catch_up_requests_to_non_authorities_when_observer_disabled() {
2241 let config = {
2242 let mut c = config();
2243
2244 c.observer_enabled = false;
2247
2248 c
2249 };
2250
2251 let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2252
2253 val.note_set(SetId(1), Vec::new(), |_, _| {});
2255
2256 let peer_full = PeerId::random();
2259 val.inner.write().peers.new_peer(peer_full, ObservedRole::Full);
2260
2261 let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2262 &peer_full,
2263 NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
2264 );
2265
2266 match catch_up_request {
2271 Some(GossipMessage::CatchUpRequest(request)) => {
2272 assert_eq!(request.set_id, SetId(1));
2273 assert_eq!(request.round, Round(41));
2274 },
2275 _ => panic!("expected catch up message"),
2276 }
2277 }
2278
2279 #[test]
2280 fn doesnt_expire_next_round_messages() {
2281 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2283
2284 val.note_set(SetId(1), Vec::new(), |_, _| {});
2286
2287 val.note_round(Round(9), |_, _| {});
2289 val.note_round(Round(10), |_, _| {});
2290
2291 let mut is_expired = val.message_expired();
2292
2293 for round in &[9, 10, 11] {
2296 assert!(!is_expired(communication::round_topic::<Block>(*round, 1), &[]))
2297 }
2298 }
2299
2300 #[test]
2301 fn progressively_gossips_to_more_peers_as_round_duration_increases() {
2302 let mut config = config();
2303 config.gossip_duration = Duration::from_secs(300); let round_duration = config.gossip_duration * ROUND_DURATION;
2305
2306 let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2307
2308 val.note_set(SetId(0), Vec::new(), |_, _| {});
2310
2311 let mut authorities = Vec::new();
2313 authorities.resize_with(30, || PeerId::random());
2314
2315 let mut full_nodes = Vec::new();
2316 full_nodes.resize_with(30, || PeerId::random());
2317
2318 for i in 0..30 {
2319 val.inner.write().peers.new_peer(authorities[i], ObservedRole::Authority);
2320
2321 val.inner.write().peers.new_peer(full_nodes[i], ObservedRole::Full);
2322 }
2323
2324 let test = |rounds_elapsed, peers| {
2325 val.inner.write().local_view.as_mut().unwrap().round_start = Instant::now() -
2327 Duration::from_millis(
2328 (round_duration.as_millis() as f32 * rounds_elapsed) as u64,
2329 );
2330
2331 val.inner.write().peers.reshuffle();
2332
2333 let mut message_allowed = val.message_allowed();
2334
2335 move || {
2336 let mut allowed = 0;
2337 for peer in peers {
2338 if message_allowed(
2339 peer,
2340 MessageIntent::Broadcast,
2341 &communication::round_topic::<Block>(1, 0),
2342 &[],
2343 ) {
2344 allowed += 1;
2345 }
2346 }
2347 allowed
2348 }
2349 };
2350
2351 fn trial<F: FnMut() -> usize>(mut test: F) -> usize {
2352 let mut results = Vec::new();
2353 let n = 1000;
2354
2355 for _ in 0..n {
2356 results.push(test());
2357 }
2358
2359 let n = results.len();
2360 let sum: usize = results.iter().sum();
2361
2362 sum / n
2363 }
2364
2365 let all_peers = authorities.iter().chain(full_nodes.iter()).cloned().collect();
2366
2367 assert!(trial(test(1.0, &authorities)) >= LUCKY_PEERS / 2);
2371 assert_eq!(trial(test(1.0, &all_peers)), LUCKY_PEERS);
2372
2373 assert!(trial(test(PROPAGATION_SOME * 1.1, &authorities)) >= LUCKY_PEERS);
2378 assert_eq!(
2379 trial(test(2.0, &all_peers)),
2380 LUCKY_PEERS + (all_peers.len() as f64).sqrt() as usize,
2381 );
2382
2383 assert_eq!(trial(test(PROPAGATION_ALL * 1.1, &all_peers)), all_peers.len());
2386 }
2387
2388 #[test]
2389 fn never_gossips_round_messages_to_light_clients() {
2390 let config = config();
2391 let round_duration = config.gossip_duration * ROUND_DURATION;
2392 let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2393
2394 val.note_set(SetId(0), Vec::new(), |_, _| {});
2396
2397 let light_peer = PeerId::random();
2399
2400 val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);
2401
2402 assert!(!val.message_allowed()(
2403 &light_peer,
2404 MessageIntent::Broadcast,
2405 &communication::round_topic::<Block>(1, 0),
2406 &[],
2407 ));
2408
2409 val.inner.write().local_view.as_mut().unwrap().round_start =
2412 Instant::now() - round_duration * 10;
2413
2414 assert!(!val.message_allowed()(
2417 &light_peer,
2418 MessageIntent::Broadcast,
2419 &communication::round_topic::<Block>(1, 0),
2420 &[],
2421 ));
2422
2423 val.inner
2425 .write()
2426 .peers
2427 .update_peer_state(
2428 &light_peer,
2429 NeighborPacket { round: Round(1), set_id: SetId(0), commit_finalized_height: 1 },
2430 )
2431 .unwrap();
2432
2433 val.note_commit_finalized(Round(1), SetId(0), 2, |_, _| {});
2434
2435 let commit = {
2436 let commit = finality_grandpa::CompactCommit {
2437 target_hash: H256::random(),
2438 target_number: 2,
2439 precommits: Vec::new(),
2440 auth_data: Vec::new(),
2441 };
2442
2443 communication::gossip::GossipMessage::<Block>::Commit(
2444 communication::gossip::FullCommitMessage {
2445 round: Round(2),
2446 set_id: SetId(0),
2447 message: commit,
2448 },
2449 )
2450 .encode()
2451 };
2452
2453 assert!(val.message_allowed()(
2455 &light_peer,
2456 MessageIntent::Broadcast,
2457 &communication::global_topic::<Block>(0),
2458 &commit,
2459 ));
2460 }
2461
2462 #[test]
2463 fn only_gossip_commits_to_peers_on_same_set() {
2464 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2465
2466 val.note_set(SetId(1), Vec::new(), |_, _| {});
2468
2469 let peer1 = PeerId::random();
2471
2472 val.inner.write().peers.new_peer(peer1, ObservedRole::Authority);
2473
2474 val.inner
2475 .write()
2476 .peers
2477 .update_peer_state(
2478 &peer1,
2479 NeighborPacket { round: Round(1), set_id: SetId(1), commit_finalized_height: 1 },
2480 )
2481 .unwrap();
2482
2483 let peer2 = PeerId::random();
2485 val.inner.write().peers.new_peer(peer2, ObservedRole::Authority);
2486
2487 let commit = {
2490 let commit = finality_grandpa::CompactCommit {
2491 target_hash: H256::random(),
2492 target_number: 2,
2493 precommits: Vec::new(),
2494 auth_data: Vec::new(),
2495 };
2496
2497 communication::gossip::GossipMessage::<Block>::Commit(
2498 communication::gossip::FullCommitMessage {
2499 round: Round(1),
2500 set_id: SetId(1),
2501 message: commit,
2502 },
2503 )
2504 .encode()
2505 };
2506
2507 val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {});
2509
2510 let mut message_allowed = val.message_allowed();
2511
2512 assert!(message_allowed(
2514 &peer1,
2515 MessageIntent::Broadcast,
2516 &communication::global_topic::<Block>(1),
2517 &commit,
2518 ));
2519
2520 assert!(!message_allowed(
2523 &peer2,
2524 MessageIntent::Broadcast,
2525 &communication::global_topic::<Block>(1),
2526 &commit,
2527 ));
2528 }
2529
2530 #[test]
2531 fn expire_commits_from_older_rounds() {
2532 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2533
2534 let commit = |round, set_id, target_number| {
2535 let commit = finality_grandpa::CompactCommit {
2536 target_hash: H256::random(),
2537 target_number,
2538 precommits: Vec::new(),
2539 auth_data: Vec::new(),
2540 };
2541
2542 communication::gossip::GossipMessage::<Block>::Commit(
2543 communication::gossip::FullCommitMessage {
2544 round: Round(round),
2545 set_id: SetId(set_id),
2546 message: commit,
2547 },
2548 )
2549 .encode()
2550 };
2551
2552 val.note_set(SetId(1), Vec::new(), |_, _| {});
2554
2555 val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {});
2558
2559 let mut message_expired = val.message_expired();
2560
2561 assert!(!message_expired(communication::global_topic::<Block>(1), &commit(1, 1, 2),));
2564
2565 assert!(message_expired(communication::global_topic::<Block>(1), &commit(1, 1, 1)));
2567
2568 assert!(message_expired(communication::global_topic::<Block>(1), &commit(0, 1, 2)));
2570 }
2571
2572 #[test]
2573 fn allow_noting_different_authorities_for_same_set() {
2574 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2575
2576 let a1 = vec![UncheckedFrom::unchecked_from([0; 32])];
2577 val.note_set(SetId(1), a1.clone(), |_, _| {});
2578
2579 assert_eq!(val.inner().read().authorities, a1);
2580
2581 let a2 =
2582 vec![UncheckedFrom::unchecked_from([1; 32]), UncheckedFrom::unchecked_from([2; 32])];
2583 val.note_set(SetId(1), a2.clone(), |_, _| {});
2584
2585 assert_eq!(val.inner().read().authorities, a2);
2586 }
2587
2588 #[test]
2589 fn sends_neighbor_packets_to_all_peers_when_starting_a_new_round() {
2590 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2591
2592 val.note_set(SetId(1), Vec::new(), |_, _| {});
2594
2595 let authority_peer = PeerId::random();
2596 let full_peer = PeerId::random();
2597 let light_peer = PeerId::random();
2598
2599 val.inner.write().peers.new_peer(authority_peer, ObservedRole::Authority);
2600 val.inner.write().peers.new_peer(full_peer, ObservedRole::Full);
2601 val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);
2602
2603 val.note_round(Round(2), |peers, message| {
2604 assert_eq!(peers.len(), 3);
2605 assert!(peers.contains(&authority_peer));
2606 assert!(peers.contains(&full_peer));
2607 assert!(peers.contains(&light_peer));
2608 assert!(matches!(message, NeighborPacket { set_id: SetId(1), round: Round(2), .. }));
2609 });
2610 }
2611
2612 #[test]
2613 fn sends_neighbor_packets_to_all_peers_when_starting_a_new_set() {
2614 let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2615
2616 val.note_set(SetId(1), Vec::new(), |_, _| {});
2618
2619 let authority_peer = PeerId::random();
2620 let full_peer = PeerId::random();
2621 let light_peer = PeerId::random();
2622
2623 val.inner.write().peers.new_peer(authority_peer, ObservedRole::Authority);
2624 val.inner.write().peers.new_peer(full_peer, ObservedRole::Full);
2625 val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);
2626
2627 val.note_set(SetId(2), Vec::new(), |peers, message| {
2628 assert_eq!(peers.len(), 3);
2629 assert!(peers.contains(&authority_peer));
2630 assert!(peers.contains(&full_peer));
2631 assert!(peers.contains(&light_peer));
2632 assert!(matches!(message, NeighborPacket { set_id: SetId(2), round: Round(1), .. }));
2633 });
2634 }
2635}