referrerpolicy=no-referrer-when-downgrade

sc_consensus_grandpa/communication/
gossip.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Gossip and politeness for polite-grandpa.
20//!
21//! This module implements the following message types:
22//! #### Neighbor Packet
23//!
24//! The neighbor packet is sent to only our neighbors. It contains this information
25//!
26//!   - Current Round
27//!   - Current voter set ID
28//!   - Last finalized hash from commit messages.
29//!
30//! If a peer is at a given voter set, it is impolite to send messages from
31//! an earlier voter set. It is extremely impolite to send messages
32//! from a future voter set. "future-set" messages can be dropped and ignored.
33//!
//! If a peer is at round r, it is impolite to send messages about r-2 or earlier and extremely
//! impolite to send messages about r+2 or later. "future-round" messages can
//! be dropped and ignored.
37//!
38//! It is impolite to send a neighbor packet which moves backwards or does not progress
39//! protocol state.
40//!
//! A neighbor packet is beneficial if it conveys some progress in the protocol state of the peer.
42//!
43//! #### Prevote / Precommit
44//!
45//! These are votes within a round. Noting that we receive these messages
46//! from our peers who are not necessarily voters, we have to account the benefit
47//! based on what they might have seen.
48//!
49//! #### Propose
50//!
51//! This is a broadcast by a known voter of the last-round estimate.
52//!
53//! #### Commit
54//!
55//! These are used to announce past agreement of finality.
56//!
57//! It is impolite to send commits which are earlier than the last commit
58//! sent. It is especially impolite to send commits which are invalid, or from
59//! a different Set ID than the receiving peer has indicated.
60//!
61//! Sending a commit is polite when it may finalize something that the receiving peer
62//! was not aware of.
63//!
64//! #### Catch Up
65//!
66//! These allow a peer to request another peer, which they perceive to be in a
67//! later round, to provide all the votes necessary to complete a given round
68//! `R`.
69//!
//! It is impolite to send a catch up request for a round `R` to a peer whose
//! announced view is behind `R`. It is also impolite to send a catch up request
//! to a peer in a different Set ID.
73//!
//! The logic for issuing and tracking pending catch up requests is implemented
//! in the `GossipValidator`. A catch up request is issued anytime we see a
//! neighbor packet from a peer at a round `CATCH_UP_THRESHOLD` higher than the
//! round we are at.
78//!
79//! ## Expiration
80//!
81//! We keep some amount of recent rounds' messages, but do not accept new ones from rounds
82//! older than our current_round - 1.
83//!
84//! ## Message Validation
85//!
//! We only send polite messages to peers.
87
88use ahash::{AHashMap, AHashSet};
89use codec::{Decode, DecodeAll, Encode};
90use log::{debug, trace};
91use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};
92use rand::seq::SliceRandom;
93use sc_network::ReputationChange;
94use sc_network_common::role::ObservedRole;
95use sc_network_gossip::{MessageIntent, ValidatorContext};
96use sc_network_types::PeerId;
97use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG};
98use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
99use sp_consensus_grandpa::AuthorityId;
100use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
101
102use super::{benefit, cost, Round, SetId, NEIGHBOR_REBROADCAST_PERIOD};
103use crate::{environment, CatchUp, CompactCommit, SignedMessage, LOG_TARGET};
104
105use std::{
106	collections::{HashSet, VecDeque},
107	time::{Duration, Instant},
108};
109
/// Delay added to `Instant::now()` to schedule the first rebroadcast when the
/// gossip state is created.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
// NOTE(review): presumably how long we wait for an answer to a catch up request
// before considering it expired -- the use site is outside this section, confirm.
const CATCH_UP_REQUEST_TIMEOUT: Duration = Duration::from_secs(45);
// NOTE(review): presumably how long an answered catch up may stay in the
// "processing" state -- the use site is outside this section, confirm.
const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum number of rounds we are behind a peer before issuing a
/// catch up request.
const CATCH_UP_THRESHOLD: u64 = 2;

/// The total round duration measured in periods of gossip duration:
/// 2 gossip durations for prevote timer
/// 2 gossip durations for precommit timer
/// 1 gossip duration for precommits to spread
const ROUND_DURATION: u32 = 5;

/// The period, measured in rounds, since the latest round start, after which we will start
/// propagating gossip messages to more nodes than just the lucky ones.
const PROPAGATION_SOME: f32 = 1.5;

/// The period, measured in rounds, since the latest round start, after which we will start
/// propagating gossip messages to all the nodes we are connected to.
const PROPAGATION_ALL: f32 = 3.0;

/// Size of the randomly picked peer sets used for the first gossip stage and for light
/// clients.
///
/// Assuming a network of 3000 nodes, using a fanout of 4, after about 6 iterations
/// of gossip a message has very likely reached all nodes on the network (`log4(3000)`).
const LUCKY_PEERS: usize = 4;

/// A peer together with the reputation change that should be applied to it.
type Report = (PeerId, ReputationChange);
136
/// An outcome of examining a message against a view of the protocol state.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Consider {
	/// Accept the message.
	Accept,
	/// Message is from the past (an older set or round, or a commit that does not
	/// finalize anything higher than what the view already knows). Reject.
	RejectPast,
	/// Message is from the future (a newer set or round than the view). Reject.
	RejectFuture,
	/// Message cannot be evaluated, e.g. because there is no local view to compare
	/// it against yet. Reject.
	RejectOutOfScope,
}
149
/// A view of protocol state.
#[derive(Debug)]
struct View<N> {
	/// The current round we are at.
	round: Round,
	/// The current voter set id.
	set_id: SetId,
	/// Commit-finalized block height, if any.
	last_commit: Option<N>,
	/// Last time we heard from the peer, used for spamming detection.
	last_update: Option<Instant>,
}
158
159impl<N> Default for View<N> {
160	fn default() -> Self {
161		View { round: Round(1), set_id: SetId(0), last_commit: None, last_update: None }
162	}
163}
164
165impl<N: Ord> View<N> {
166	/// Consider a round and set ID combination under a current view.
167	fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
168		// only from current set
169		if set_id < self.set_id {
170			return Consider::RejectPast
171		}
172		if set_id > self.set_id {
173			return Consider::RejectFuture
174		}
175
176		// only r-1 ... r+1
177		if round.0 > self.round.0.saturating_add(1) {
178			return Consider::RejectFuture
179		}
180		if round.0 < self.round.0.saturating_sub(1) {
181			return Consider::RejectPast
182		}
183
184		Consider::Accept
185	}
186
187	/// Consider a set-id global message. Rounds are not taken into account, but are implicitly
188	/// because we gate on finalization of a further block than a previous commit.
189	fn consider_global(&self, set_id: SetId, number: N) -> Consider {
190		// only from current set
191		if set_id < self.set_id {
192			return Consider::RejectPast
193		}
194		if set_id > self.set_id {
195			return Consider::RejectFuture
196		}
197
198		// only commits which claim to prove a higher block number than
199		// the one we're aware of.
200		match self.last_commit {
201			None => Consider::Accept,
202			Some(ref num) =>
203				if num < &number {
204					Consider::Accept
205				} else {
206					Consider::RejectPast
207				},
208		}
209	}
210}
211
/// A local view of protocol state. Similar to `View` but we additionally track
/// the round and set id at which the last commit was observed, and the instant
/// at which the current round started.
struct LocalView<N> {
	/// The round we are currently at.
	round: Round,
	/// The voter set id we are currently at.
	set_id: SetId,
	/// The last observed commit, if any: the finalized block height together with the
	/// round and set id the commit is from.
	last_commit: Option<(N, Round, SetId)>,
	/// The instant at which the current round started (reset on every round or set change).
	round_start: Instant,
}
221
222impl<N> LocalView<N> {
223	/// Creates a new `LocalView` at the given set id and round.
224	fn new(set_id: SetId, round: Round) -> LocalView<N> {
225		LocalView { set_id, round, last_commit: None, round_start: Instant::now() }
226	}
227
228	/// Converts the local view to a `View` discarding round and set id
229	/// information about the last commit.
230	fn as_view(&self) -> View<&N> {
231		View {
232			round: self.round,
233			set_id: self.set_id,
234			last_commit: self.last_commit_height(),
235			last_update: None,
236		}
237	}
238
239	/// Update the set ID. implies a reset to round 1.
240	fn update_set(&mut self, set_id: SetId) {
241		if set_id != self.set_id {
242			self.set_id = set_id;
243			self.round = Round(1);
244			self.round_start = Instant::now();
245		}
246	}
247
248	/// Updates the current round.
249	fn update_round(&mut self, round: Round) {
250		self.round = round;
251		self.round_start = Instant::now();
252	}
253
254	/// Returns the height of the block that the last observed commit finalizes.
255	fn last_commit_height(&self) -> Option<&N> {
256		self.last_commit.as_ref().map(|(number, _, _)| number)
257	}
258}
259
/// Number of complete GRANDPA rounds whose topics we keep in addition to the topics of
/// the current and next round.
const KEEP_RECENT_ROUNDS: usize = 3;

/// Tracks gossip topics that we are keeping messages for. We keep topics of:
///
/// - the last `KEEP_RECENT_ROUNDS` complete GRANDPA rounds,
///
/// - the topic for the current and next round,
///
/// - and a global topic for commit and catch-up messages.
struct KeepTopics<B: BlockT> {
	/// The highest set id that was pushed so far.
	current_set: SetId,
	/// The tracked `(round, set id)` pairs, oldest first.
	rounds: VecDeque<(Round, SetId)>,
	/// Maps a topic to the round and set id it stands for. The round is `None` for
	/// the global topic.
	reverse_map: AHashMap<B::Hash, (Option<Round>, SetId)>,
}
274
275impl<B: BlockT> KeepTopics<B> {
276	fn new() -> Self {
277		KeepTopics {
278			current_set: SetId(0),
279			rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 2),
280			reverse_map: Default::default(),
281		}
282	}
283
284	fn push(&mut self, round: Round, set_id: SetId) {
285		self.current_set = std::cmp::max(self.current_set, set_id);
286
287		// under normal operation the given round is already tracked (since we
288		// track one round ahead). if we skip rounds (with a catch up) the given
289		// round topic might not be tracked yet.
290		if !self.rounds.contains(&(round, set_id)) {
291			self.rounds.push_back((round, set_id));
292		}
293
294		// we also accept messages for the next round
295		self.rounds.push_back((Round(round.0.saturating_add(1)), set_id));
296
297		// the 2 is for the current and next round.
298		while self.rounds.len() > KEEP_RECENT_ROUNDS + 2 {
299			let _ = self.rounds.pop_front();
300		}
301
302		let mut map = AHashMap::with_capacity(KEEP_RECENT_ROUNDS + 3);
303		map.insert(super::global_topic::<B>(self.current_set.0), (None, self.current_set));
304
305		for &(round, set) in &self.rounds {
306			map.insert(super::round_topic::<B>(round.0, set.0), (Some(round), set));
307		}
308
309		self.reverse_map = map;
310	}
311
312	fn topic_info(&self, topic: &B::Hash) -> Option<(Option<Round>, SetId)> {
313		self.reverse_map.get(topic).cloned()
314	}
315}
316
317// topics to send to a neighbor based on their view.
318fn neighbor_topics<B: BlockT>(view: &View<NumberFor<B>>) -> Vec<B::Hash> {
319	let s = view.set_id;
320	let mut topics =
321		vec![super::global_topic::<B>(s.0), super::round_topic::<B>(view.round.0, s.0)];
322
323	if view.round.0 != 0 {
324		let r = Round(view.round.0 - 1);
325		topics.push(super::round_topic::<B>(r.0, s.0))
326	}
327
328	topics
329}
330
/// Grandpa gossip message type.
/// This is the root type that gets encoded and sent on the network.
#[derive(Debug, Encode, Decode)]
pub(super) enum GossipMessage<Block: BlockT> {
	/// Grandpa message with round and set info.
	Vote(VoteMessage<Block>),
	/// Grandpa commit message with round and set info.
	Commit(FullCommitMessage<Block>),
	/// A (versioned) neighbor packet. Not repropagated.
	Neighbor(VersionedNeighborPacket<NumberFor<Block>>),
	/// Grandpa catch up request message with round and set info. Not repropagated.
	CatchUpRequest(CatchUpRequestMessage),
	/// Grandpa catch up message with set info. Not repropagated.
	CatchUp(FullCatchUpMessage<Block>),
}
346
347impl<Block: BlockT> From<NeighborPacket<NumberFor<Block>>> for GossipMessage<Block> {
348	fn from(neighbor: NeighborPacket<NumberFor<Block>>) -> Self {
349		GossipMessage::Neighbor(VersionedNeighborPacket::V1(neighbor))
350	}
351}
352
/// Network level vote message with topic information (round and set id).
#[derive(Debug, Encode, Decode)]
pub(super) struct VoteMessage<Block: BlockT> {
	/// The round this message is from.
	pub(super) round: Round,
	/// The voter set ID this message is from.
	pub(super) set_id: SetId,
	/// The signed message itself.
	pub(super) message: SignedMessage<Block::Header>,
}
363
/// Network level commit message with topic information (round and set id).
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCommitMessage<Block: BlockT> {
	/// The round this message is from.
	pub(super) round: Round,
	/// The voter set ID this message is from.
	pub(super) set_id: SetId,
	/// The compact commit message.
	pub(super) message: CompactCommit<Block::Header>,
}
374
/// V1 neighbor packet. Neighbor packets are sent from nodes to their peers
/// and are not repropagated. These contain information about the node's state.
#[derive(Debug, Encode, Decode, Clone)]
pub(super) struct NeighborPacket<N> {
	/// The round the node is currently at.
	pub(super) round: Round,
	/// The set ID the node is currently at.
	pub(super) set_id: SetId,
	/// The height of the highest block finalized by a commit the node has observed.
	pub(super) commit_finalized_height: N,
}
386
/// A versioned neighbor packet.
#[derive(Debug, Encode, Decode)]
pub(super) enum VersionedNeighborPacket<N> {
	/// Version 1 of the neighbor packet, encoded with variant index 1.
	#[codec(index = 1)]
	V1(NeighborPacket<N>),
}
393
394impl<N> VersionedNeighborPacket<N> {
395	fn into_neighbor_packet(self) -> NeighborPacket<N> {
396		match self {
397			VersionedNeighborPacket::V1(p) => p,
398		}
399	}
400}
401
/// A catch up request for a given round (or any further round) localized by set id.
#[derive(Clone, Debug, Encode, Decode)]
pub(super) struct CatchUpRequestMessage {
	/// The round that we want to catch up to.
	pub(super) round: Round,
	/// The voter set ID this message is from.
	pub(super) set_id: SetId,
}
410
/// Network level catch up message with topic information (set id).
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCatchUpMessage<Block: BlockT> {
	/// The voter set ID this message is from.
	pub(super) set_id: SetId,
	/// The catch up message.
	pub(super) message: CatchUp<Block::Header>,
}
419
/// Misbehavior that peers can perform.
///
/// `cost` gives a cost that can be used to perform cost/benefit analysis of a
/// peer.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(super) enum Misbehavior {
	/// Invalid neighbor message, considering the last one.
	InvalidViewChange,
	/// Duplicate neighbor message.
	DuplicateNeighborMessage,
	/// Could not decode neighbor message. Carries the bytes-length of the packet.
	UndecodablePacket(i32),
	/// Bad catch up message (invalid signatures).
	BadCatchUpMessage { signatures_checked: i32 },
	/// Bad commit message.
	BadCommitMessage { signatures_checked: i32, blocks_loaded: i32, equivocations_caught: i32 },
	/// A message received that's from the future relative to our view.
	/// Always misbehavior.
	FutureMessage,
	/// A message received that cannot be evaluated relative to our view.
	/// This happens before we have a view and have sent out neighbor packets.
	/// Always misbehavior.
	OutOfScopeMessage,
}
444
445impl Misbehavior {
446	pub(super) fn cost(&self) -> ReputationChange {
447		use Misbehavior::*;
448
449		match *self {
450			InvalidViewChange => cost::INVALID_VIEW_CHANGE,
451			DuplicateNeighborMessage => cost::DUPLICATE_NEIGHBOR_MESSAGE,
452			UndecodablePacket(bytes) => ReputationChange::new(
453				bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
454				"Grandpa: Bad packet",
455			),
456			BadCatchUpMessage { signatures_checked } => ReputationChange::new(
457				cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked),
458				"Grandpa: Bad cath-up message",
459			),
460			BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => {
461				let cost = cost::PER_SIGNATURE_CHECKED
462					.saturating_mul(signatures_checked)
463					.saturating_add(cost::PER_BLOCK_LOADED.saturating_mul(blocks_loaded));
464
465				let benefit = equivocations_caught.saturating_mul(benefit::PER_EQUIVOCATION);
466
467				ReputationChange::new(
468					(benefit as i32).saturating_add(cost as i32),
469					"Grandpa: Bad commit",
470				)
471			},
472			FutureMessage => cost::FUTURE_MESSAGE,
473			OutOfScopeMessage => cost::OUT_OF_SCOPE_MESSAGE,
474		}
475	}
476}
477
/// What we know about a connected peer.
#[derive(Debug)]
struct PeerInfo<N> {
	/// The last protocol view we recorded for the peer.
	view: View<N>,
	/// The role of the peer, as observed when it connected.
	roles: ObservedRole,
}
483
484impl<N> PeerInfo<N> {
485	fn new(roles: ObservedRole) -> Self {
486		PeerInfo { view: View::default(), roles }
487	}
488}
489
/// The peers we're connected to in gossip.
struct Peers<N> {
	/// Everything we know about each connected peer, keyed by peer id.
	inner: AHashMap<PeerId, PeerInfo<N>>,
	/// The randomly picked set of `LUCKY_PEERS` we'll gossip to in the first stage of round
	/// gossiping.
	first_stage_peers: AHashSet<PeerId>,
	/// The randomly picked set of peers we'll gossip to in the second stage of gossiping if the
	/// first stage didn't allow us to spread the voting data enough to conclude the round. This
	/// set should have size `sqrt(connected_peers)`.
	second_stage_peers: HashSet<PeerId>,
	/// The randomly picked set of `LUCKY_PEERS` light clients we'll gossip commit messages to.
	lucky_light_peers: HashSet<PeerId>,
	/// Neighbor packet rebroadcast period --- we reduce the reputation of peers sending duplicate
	/// packets too often.
	neighbor_rebroadcast_period: Duration,
}
506
507impl<N: Ord> Peers<N> {
508	fn new(neighbor_rebroadcast_period: Duration) -> Self {
509		Peers {
510			inner: Default::default(),
511			first_stage_peers: Default::default(),
512			second_stage_peers: Default::default(),
513			lucky_light_peers: Default::default(),
514			neighbor_rebroadcast_period,
515		}
516	}
517
518	fn new_peer(&mut self, who: PeerId, role: ObservedRole) {
519		match role {
520			ObservedRole::Authority if self.first_stage_peers.len() < LUCKY_PEERS => {
521				self.first_stage_peers.insert(who);
522			},
523			ObservedRole::Authority if self.second_stage_peers.len() < LUCKY_PEERS => {
524				self.second_stage_peers.insert(who);
525			},
526			ObservedRole::Light if self.lucky_light_peers.len() < LUCKY_PEERS => {
527				self.lucky_light_peers.insert(who);
528			},
529			_ => {},
530		}
531
532		self.inner.insert(who, PeerInfo::new(role));
533	}
534
535	fn peer_disconnected(&mut self, who: &PeerId) {
536		self.inner.remove(who);
537		// This does not happen often enough compared to round duration,
538		// so we don't reshuffle.
539		self.first_stage_peers.remove(who);
540		self.second_stage_peers.remove(who);
541		self.lucky_light_peers.remove(who);
542	}
543
544	// returns a reference to the new view, if the peer is known.
545	fn update_peer_state(
546		&mut self,
547		who: &PeerId,
548		update: NeighborPacket<N>,
549	) -> Result<Option<&View<N>>, Misbehavior> {
550		let Some(peer) = self.inner.get_mut(who) else { return Ok(None) };
551
552		let invalid_change = peer.view.set_id > update.set_id ||
553			peer.view.round > update.round && peer.view.set_id == update.set_id ||
554			peer.view.last_commit.as_ref() > Some(&update.commit_finalized_height);
555
556		if invalid_change {
557			return Err(Misbehavior::InvalidViewChange)
558		}
559
560		let now = Instant::now();
561		let duplicate_packet = (update.set_id, update.round, Some(&update.commit_finalized_height)) ==
562			(peer.view.set_id, peer.view.round, peer.view.last_commit.as_ref());
563
564		if duplicate_packet {
565			if let Some(last_update) = peer.view.last_update {
566				if now < last_update + self.neighbor_rebroadcast_period / 2 {
567					return Err(Misbehavior::DuplicateNeighborMessage)
568				}
569			}
570		}
571
572		peer.view = View {
573			round: update.round,
574			set_id: update.set_id,
575			last_commit: Some(update.commit_finalized_height),
576			last_update: Some(now),
577		};
578
579		trace!(
580			target: LOG_TARGET,
581			"Peer {} updated view. Now at {:?}, {:?}",
582			who,
583			peer.view.round,
584			peer.view.set_id
585		);
586
587		Ok(Some(&peer.view))
588	}
589
590	fn update_commit_height(&mut self, who: &PeerId, new_height: N) -> Result<(), Misbehavior> {
591		let peer = match self.inner.get_mut(who) {
592			None => return Ok(()),
593			Some(p) => p,
594		};
595
596		// this doesn't allow a peer to send us unlimited commits with the
597		// same height, because there is still a misbehavior condition based on
598		// sending commits that are <= the best we are aware of.
599		if peer.view.last_commit.as_ref() > Some(&new_height) {
600			return Err(Misbehavior::InvalidViewChange)
601		}
602
603		peer.view.last_commit = Some(new_height);
604
605		Ok(())
606	}
607
	/// Returns what we know about `who`, or `None` if the peer is not tracked.
	fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<N>> {
		self.inner.get(who)
	}
611
	/// Randomly re-selects the first stage, second stage and lucky light peer sets from
	/// the currently tracked peers, replacing the previous selection.
	fn reshuffle(&mut self) {
		// we want to randomly select peers into three sets according to the following logic:
		// - first set: LUCKY_PEERS random peers where at least LUCKY_PEERS/2 are authorities
		//   (unless we're not connected to that many authorities)
		// - second set: max(LUCKY_PEERS, sqrt(peers)) peers where at least LUCKY_PEERS are
		//   authorities.
		// - third set: LUCKY_PEERS random light client peers

		// a single random permutation of all peers drives every selection below.
		let shuffled_peers = {
			let mut peers =
				self.inner.iter().map(|(peer_id, info)| (*peer_id, info)).collect::<Vec<_>>();

			peers.shuffle(&mut rand::thread_rng());
			peers
		};

		// the authorities, in shuffled order (lazily filtered).
		let shuffled_authorities = shuffled_peers.iter().filter_map(|(peer_id, info)| {
			if matches!(info.roles, ObservedRole::Authority) {
				Some(peer_id)
			} else {
				None
			}
		});

		let mut first_stage_peers = AHashSet::new();
		let mut second_stage_peers = HashSet::new();

		// we start by allocating authorities to the first stage set and when the minimum of
		// `LUCKY_PEERS / 2` is filled we start allocating to the second stage set.
		let half_lucky = LUCKY_PEERS / 2;
		let one_and_a_half_lucky = LUCKY_PEERS + half_lucky;
		for (n_authorities_added, peer_id) in shuffled_authorities.enumerate() {
			if n_authorities_added < half_lucky {
				first_stage_peers.insert(*peer_id);
			} else if n_authorities_added < one_and_a_half_lucky {
				second_stage_peers.insert(*peer_id);
			} else {
				break
			}
		}

		// fill up first and second sets with remaining peers (either full or authorities)
		// prioritizing filling the first set over the second.
		let n_second_stage_peers = LUCKY_PEERS.max((shuffled_peers.len() as f32).sqrt() as usize);
		for (peer_id, info) in &shuffled_peers {
			if info.roles.is_light() {
				continue
			}

			if first_stage_peers.len() < LUCKY_PEERS {
				// a peer promoted to the first set must not stay in the second one.
				first_stage_peers.insert(*peer_id);
				second_stage_peers.remove(peer_id);
			} else if second_stage_peers.len() < n_second_stage_peers {
				if !first_stage_peers.contains(peer_id) {
					second_stage_peers.insert(*peer_id);
				}
			} else {
				// both sets are full, nothing left to do.
				break
			}
		}

		// pick `LUCKY_PEERS` random light peers
		let lucky_light_peers = shuffled_peers
			.into_iter()
			.filter_map(|(peer_id, info)| if info.roles.is_light() { Some(peer_id) } else { None })
			.take(LUCKY_PEERS)
			.collect();

		self.first_stage_peers = first_stage_peers;
		self.second_stage_peers = second_stage_peers;
		self.lucky_light_peers = lucky_light_peers;
	}
685}
686
/// What to do with a gossip message after validating it. Every variant carries the
/// reputation change (cost/benefit) to apply to the originator.
#[derive(Debug, PartialEq)]
pub(super) enum Action<H> {
	/// Repropagate under given topic, to the given peers, applying cost/benefit to originator.
	Keep(H, ReputationChange),
	/// Discard and process.
	// NOTE(review): `H` is presumably the topic here as well -- confirm at the use sites.
	ProcessAndDiscard(H, ReputationChange),
	/// Discard, applying cost/benefit to originator.
	Discard(ReputationChange),
}
696
/// State of catch up request handling.
#[derive(Debug)]
enum PendingCatchUp {
	/// No pending catch up requests.
	None,
	/// Pending catch up request which has not been answered yet.
	// NOTE(review): presumably `who` is the peer the request was sent to and `instant`
	// the time it was issued -- the code setting these is outside this section, confirm.
	Requesting { who: PeerId, request: CatchUpRequestMessage, instant: Instant },
	/// Pending catch up request that was answered and is being processed.
	Processing { instant: Instant },
}
707
/// Configuration for the round catch-up mechanism.
enum CatchUpConfig {
	/// Catch-up requests are enabled, our node will issue them whenever it sees a
	/// neighbor packet for a round further than `CATCH_UP_THRESHOLD`. If
	/// `only_from_authorities` is set, the node will only send catch-up
	/// requests to other authorities it is connected to. This is useful if the
	/// GRANDPA observer protocol is live on the network, in which case full
	/// nodes (non-authorities) don't have the necessary round data to answer
	/// catch-up requests.
	Enabled { only_from_authorities: bool },
	/// Catch-up requests are disabled, our node will never issue them. This is
	/// useful for the GRANDPA observer mode, where we are only interested in
	/// commit messages and don't need to follow the full round protocol.
	Disabled,
}
723
724impl CatchUpConfig {
725	fn enabled(only_from_authorities: bool) -> CatchUpConfig {
726		CatchUpConfig::Enabled { only_from_authorities }
727	}
728
729	fn disabled() -> CatchUpConfig {
730		CatchUpConfig::Disabled
731	}
732
733	fn request_allowed<N>(&self, peer: &PeerInfo<N>) -> bool {
734		match self {
735			CatchUpConfig::Disabled => false,
736			CatchUpConfig::Enabled { only_from_authorities, .. } => match peer.roles {
737				ObservedRole::Authority => true,
738				ObservedRole::Light => false,
739				ObservedRole::Full => !only_from_authorities,
740			},
741		}
742	}
743}
744
/// The mutable state behind the gossip validation logic.
struct Inner<Block: BlockT> {
	/// Our own view of the protocol; `None` until a voter set has been noted.
	local_view: Option<LocalView<NumberFor<Block>>>,
	/// The peers we are connected to and what we know about them.
	peers: Peers<NumberFor<Block>>,
	/// The topics we currently keep messages for.
	live_topics: KeepTopics<Block>,
	/// The authorities of the voter set that was last noted.
	authorities: Vec<AuthorityId>,
	/// The GRANDPA configuration this state was created with.
	config: crate::Config,
	/// Initialised to `REBROADCAST_AFTER` past the creation time.
	// NOTE(review): presumably the next instant at which messages get rebroadcast --
	// the code consuming it is outside this section, confirm.
	next_rebroadcast: Instant,
	/// State of catch up request handling.
	pending_catch_up: PendingCatchUp,
	/// Whether, and to which kind of peers, catch-up requests may be issued.
	catch_up_config: CatchUpConfig,
}
755
/// An optional neighbor packet paired with a list of peers.
// NOTE(review): the peers are presumably the packet's recipients -- the function
// building this value is outside this section, confirm.
type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)>;
757
758impl<Block: BlockT> Inner<Block> {
759	fn new(config: crate::Config) -> Self {
760		let catch_up_config = if config.observer_enabled {
761			if config.local_role.is_authority() {
762				// since the observer protocol is enabled, we will only issue
763				// catch-up requests if we are an authority (and only to other
764				// authorities).
765				CatchUpConfig::enabled(true)
766			} else {
767				// otherwise, we are running the observer protocol and don't
768				// care about catch-up requests.
769				CatchUpConfig::disabled()
770			}
771		} else {
772			// if the observer protocol isn't enabled and we're not a light client, then any full
773			// node should be able to answer catch-up requests.
774			CatchUpConfig::enabled(false)
775		};
776
777		Inner {
778			local_view: None,
779			peers: Peers::new(NEIGHBOR_REBROADCAST_PERIOD),
780			live_topics: KeepTopics::new(),
781			next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
782			authorities: Vec::new(),
783			pending_catch_up: PendingCatchUp::None,
784			catch_up_config,
785			config,
786		}
787	}
788
789	/// Note a round in the current set has started. Does nothing if the last
790	/// call to the function was with the same `round`.
791	fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
792		let local_view = self.local_view.as_mut()?;
793		if local_view.round == round {
794			// Do not send neighbor packets out if `round` has not changed ---
795			// such behavior is punishable.
796			return None
797		}
798
799		let set_id = local_view.set_id;
800
801		debug!(
802			target: LOG_TARGET,
803			"Voter {} noting beginning of round {:?} to network.",
804			self.config.name(),
805			(round, set_id)
806		);
807
808		local_view.update_round(round);
809
810		self.live_topics.push(round, set_id);
811		self.peers.reshuffle();
812
813		self.multicast_neighbor_packet()
814	}
815
816	/// Note that a voter set with given ID has started. Does nothing if the last
817	/// call to the function was with the same `set_id`.
818	fn note_set(&mut self, set_id: SetId, authorities: Vec<AuthorityId>) -> MaybeMessage<Block> {
819		let local_view = match self.local_view {
820			ref mut x @ None => x.get_or_insert(LocalView::new(set_id, Round(1))),
821			Some(ref mut v) => {
822				if v.set_id == set_id {
823					let diff_authorities = self.authorities.iter().collect::<HashSet<_>>() !=
824						authorities.iter().collect::<HashSet<_>>();
825
826					if diff_authorities {
827						debug!(
828							target: LOG_TARGET,
829							"Gossip validator noted set {:?} twice with different authorities. \
830							Was the authority set hard forked?",
831							set_id,
832						);
833
834						self.authorities = authorities;
835					}
836
837					// Do not send neighbor packets out if the `set_id` has not changed ---
838					// such behavior is punishable.
839					return None
840				} else {
841					v
842				}
843			},
844		};
845
846		local_view.update_set(set_id);
847		self.live_topics.push(Round(1), set_id);
848		self.authorities = authorities;
849
850		self.multicast_neighbor_packet()
851	}
852
853	/// Note that we've imported a commit finalizing a given block. Does nothing if the last
854	/// call to the function was with the same or higher `finalized` number.
855	/// `set_id` & `round` are the ones the commit message is from.
856	fn note_commit_finalized(
857		&mut self,
858		round: Round,
859		set_id: SetId,
860		finalized: NumberFor<Block>,
861	) -> MaybeMessage<Block> {
862		let local_view = self.local_view.as_mut()?;
863		if local_view.last_commit_height() < Some(&finalized) {
864			local_view.last_commit = Some((finalized, round, set_id));
865		} else {
866			return None
867		}
868
869		self.multicast_neighbor_packet()
870	}
871
872	fn consider_vote(&self, round: Round, set_id: SetId) -> Consider {
873		self.local_view
874			.as_ref()
875			.map(LocalView::as_view)
876			.map(|v| v.consider_vote(round, set_id))
877			.unwrap_or(Consider::RejectOutOfScope)
878	}
879
880	fn consider_global(&self, set_id: SetId, number: NumberFor<Block>) -> Consider {
881		self.local_view
882			.as_ref()
883			.map(LocalView::as_view)
884			.map(|v| v.consider_global(set_id, &number))
885			.unwrap_or(Consider::RejectOutOfScope)
886	}
887
	/// The reputation cost applied for a message rejected as being from the past.
	///
	/// The peer, round and set id are currently unused: the cost is a fixed constant.
	fn cost_past_rejection(
		&self,
		_who: &PeerId,
		_round: Round,
		_set_id: SetId,
	) -> ReputationChange {
		// hardcoded for now.
		cost::PAST_REJECTION
	}
897
898	fn validate_round_message(
899		&self,
900		who: &PeerId,
901		full: &VoteMessage<Block>,
902	) -> Action<Block::Hash> {
903		match self.consider_vote(full.round, full.set_id) {
904			Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()),
905			Consider::RejectOutOfScope =>
906				return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
907			Consider::RejectPast =>
908				return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)),
909			Consider::Accept => {},
910		}
911
912		// ensure authority is part of the set.
913		if !self.authorities.contains(&full.message.id) {
914			debug!(target: LOG_TARGET, "Message from unknown voter: {}", full.message.id);
915			telemetry!(
916				self.config.telemetry;
917				CONSENSUS_DEBUG;
918				"afg.bad_msg_signature";
919				"signature" => ?full.message.id,
920			);
921			return Action::Discard(cost::UNKNOWN_VOTER)
922		}
923
924		if !sp_consensus_grandpa::check_message_signature(
925			&full.message.message,
926			&full.message.id,
927			&full.message.signature,
928			full.round.0,
929			full.set_id.0,
930		)
931		.is_valid()
932		{
933			debug!(target: LOG_TARGET, "Bad message signature {}", full.message.id);
934			telemetry!(
935				self.config.telemetry;
936				CONSENSUS_DEBUG;
937				"afg.bad_msg_signature";
938				"signature" => ?full.message.id,
939			);
940			return Action::Discard(cost::BAD_SIGNATURE)
941		}
942
943		let topic = super::round_topic::<Block>(full.round.0, full.set_id.0);
944		Action::Keep(topic, benefit::ROUND_MESSAGE)
945	}
946
947	fn validate_commit_message(
948		&mut self,
949		who: &PeerId,
950		full: &FullCommitMessage<Block>,
951	) -> Action<Block::Hash> {
952		if let Err(misbehavior) = self.peers.update_commit_height(who, full.message.target_number) {
953			return Action::Discard(misbehavior.cost())
954		}
955
956		match self.consider_global(full.set_id, full.message.target_number) {
957			Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()),
958			Consider::RejectPast =>
959				return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)),
960			Consider::RejectOutOfScope =>
961				return Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
962			Consider::Accept => {},
963		}
964
965		if full.message.precommits.len() != full.message.auth_data.len() ||
966			full.message.precommits.is_empty()
967		{
968			debug!(target: LOG_TARGET, "Malformed compact commit");
969			telemetry!(
970				self.config.telemetry;
971				CONSENSUS_DEBUG;
972				"afg.malformed_compact_commit";
973				"precommits_len" => ?full.message.precommits.len(),
974				"auth_data_len" => ?full.message.auth_data.len(),
975				"precommits_is_empty" => ?full.message.precommits.is_empty(),
976			);
977			return Action::Discard(cost::MALFORMED_COMMIT)
978		}
979
980		// always discard commits initially and rebroadcast after doing full
981		// checking.
982		let topic = super::global_topic::<Block>(full.set_id.0);
983		Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_COMMIT)
984	}
985
986	fn validate_catch_up_message(
987		&mut self,
988		who: &PeerId,
989		full: &FullCatchUpMessage<Block>,
990	) -> Action<Block::Hash> {
991		match &self.pending_catch_up {
992			PendingCatchUp::Requesting { who: peer, request, instant } => {
993				if peer != who {
994					return Action::Discard(Misbehavior::OutOfScopeMessage.cost())
995				}
996
997				if request.set_id != full.set_id {
998					return Action::Discard(cost::MALFORMED_CATCH_UP)
999				}
1000
1001				if request.round.0 > full.message.round_number {
1002					return Action::Discard(cost::MALFORMED_CATCH_UP)
1003				}
1004
1005				if full.message.prevotes.is_empty() || full.message.precommits.is_empty() {
1006					return Action::Discard(cost::MALFORMED_CATCH_UP)
1007				}
1008
1009				// move request to pending processing state, we won't push out
1010				// any catch up requests until we import this one (either with a
1011				// success or failure).
1012				self.pending_catch_up = PendingCatchUp::Processing { instant: *instant };
1013
1014				// always discard catch up messages, they're point-to-point
1015				let topic = super::global_topic::<Block>(full.set_id.0);
1016				Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_CATCH_UP)
1017			},
1018			_ => Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
1019		}
1020	}
1021
1022	fn note_catch_up_message_processed(&mut self) {
1023		match &self.pending_catch_up {
1024			PendingCatchUp::Processing { .. } => {
1025				self.pending_catch_up = PendingCatchUp::None;
1026			},
1027			state => debug!(
1028				target: LOG_TARGET,
1029				"Noted processed catch up message when state was: {:?}", state,
1030			),
1031		}
1032	}
1033
1034	fn handle_catch_up_request(
1035		&mut self,
1036		who: &PeerId,
1037		request: CatchUpRequestMessage,
1038		set_state: &environment::SharedVoterSetState<Block>,
1039	) -> (Option<GossipMessage<Block>>, Action<Block::Hash>) {
1040		let Some(local_view) = &self.local_view else {
1041			return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
1042		};
1043
1044		if request.set_id != local_view.set_id {
1045			// NOTE: When we're close to a set change there is potentially a
1046			// race where the peer sent us the request before it observed that
1047			// we had transitioned to a new set. In this case we charge a lower
1048			// cost.
1049			if request.set_id.0.saturating_add(1) == local_view.set_id.0 &&
1050				local_view.round.0.saturating_sub(CATCH_UP_THRESHOLD) == 0
1051			{
1052				return (None, Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP))
1053			}
1054
1055			return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
1056		}
1057
1058		match self.peers.peer(who) {
1059			None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
1060			Some(peer) if peer.view.round >= request.round =>
1061				return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
1062			_ => {},
1063		}
1064
1065		let last_completed_round = set_state.read().last_completed_round();
1066		if last_completed_round.number < request.round.0 {
1067			return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()))
1068		}
1069
1070		trace!(
1071			target: LOG_TARGET,
1072			"Replying to catch-up request for round {} from {} with round {}",
1073			request.round.0,
1074			who,
1075			last_completed_round.number,
1076		);
1077
1078		let mut prevotes = Vec::new();
1079		let mut precommits = Vec::new();
1080
1081		// NOTE: the set of votes stored in `LastCompletedRound` is a minimal
1082		// set of votes, i.e. at most one equivocation is stored per voter. The
1083		// code below assumes this invariant is maintained when creating the
1084		// catch up reply since peers won't accept catch-up messages that have
1085		// too many equivocations (we exceed the fault-tolerance bound).
1086		for vote in last_completed_round.votes {
1087			match vote.message {
1088				finality_grandpa::Message::Prevote(prevote) => {
1089					prevotes.push(finality_grandpa::SignedPrevote {
1090						prevote,
1091						signature: vote.signature,
1092						id: vote.id,
1093					});
1094				},
1095				finality_grandpa::Message::Precommit(precommit) => {
1096					precommits.push(finality_grandpa::SignedPrecommit {
1097						precommit,
1098						signature: vote.signature,
1099						id: vote.id,
1100					});
1101				},
1102				_ => {},
1103			}
1104		}
1105
1106		let (base_hash, base_number) = last_completed_round.base;
1107
1108		let catch_up = CatchUp::<Block::Header> {
1109			round_number: last_completed_round.number,
1110			prevotes,
1111			precommits,
1112			base_hash,
1113			base_number,
1114		};
1115
1116		let full_catch_up = GossipMessage::CatchUp::<Block>(FullCatchUpMessage {
1117			set_id: request.set_id,
1118			message: catch_up,
1119		});
1120
1121		(Some(full_catch_up), Action::Discard(cost::CATCH_UP_REPLY))
1122	}
1123
1124	fn try_catch_up(&mut self, who: &PeerId) -> (Option<GossipMessage<Block>>, Option<Report>) {
1125		let mut catch_up = None;
1126		let mut report = None;
1127
1128		// if the peer is on the same set and ahead of us by a margin bigger
1129		// than `CATCH_UP_THRESHOLD` then we should ask it for a catch up
1130		// message. we only send catch-up requests to authorities, observers
1131		// won't be able to reply since they don't follow the full GRANDPA
1132		// protocol and therefore might not have the vote data available.
1133		if let (Some(peer), Some(local_view)) = (self.peers.peer(who), &self.local_view) {
1134			if self.catch_up_config.request_allowed(peer) &&
1135				peer.view.set_id == local_view.set_id &&
1136				peer.view.round.0.saturating_sub(CATCH_UP_THRESHOLD) > local_view.round.0
1137			{
1138				// send catch up request if allowed
1139				let round = peer.view.round.0 - 1; // peer.view.round is > 0
1140				let request =
1141					CatchUpRequestMessage { set_id: peer.view.set_id, round: Round(round) };
1142
1143				let (catch_up_allowed, catch_up_report) = self.note_catch_up_request(who, &request);
1144
1145				if catch_up_allowed {
1146					debug!(
1147						target: LOG_TARGET,
1148						"Sending catch-up request for round {} to {}", round, who,
1149					);
1150
1151					catch_up = Some(GossipMessage::<Block>::CatchUpRequest(request));
1152				}
1153
1154				report = catch_up_report;
1155			}
1156		}
1157
1158		(catch_up, report)
1159	}
1160
1161	fn import_neighbor_message(
1162		&mut self,
1163		who: &PeerId,
1164		update: NeighborPacket<NumberFor<Block>>,
1165	) -> (Vec<Block::Hash>, Action<Block::Hash>, Option<GossipMessage<Block>>, Option<Report>) {
1166		let update_res = self.peers.update_peer_state(who, update);
1167
1168		let (cost_benefit, topics) = match update_res {
1169			Ok(view) =>
1170				(benefit::NEIGHBOR_MESSAGE, view.map(|view| neighbor_topics::<Block>(view))),
1171			Err(misbehavior) => (misbehavior.cost(), None),
1172		};
1173
1174		let (catch_up, report) =
1175			if update_res.is_ok() { self.try_catch_up(who) } else { (None, None) };
1176
1177		let neighbor_topics = topics.unwrap_or_default();
1178
1179		// always discard neighbor messages, it's only valid for one hop.
1180		let action = Action::Discard(cost_benefit);
1181
1182		(neighbor_topics, action, catch_up, report)
1183	}
1184
1185	fn multicast_neighbor_packet(&self) -> MaybeMessage<Block> {
1186		self.local_view.as_ref().map(|local_view| {
1187			let packet = NeighborPacket {
1188				round: local_view.round,
1189				set_id: local_view.set_id,
1190				commit_finalized_height: *local_view.last_commit_height().unwrap_or(&Zero::zero()),
1191			};
1192
1193			let peers = self.peers.inner.keys().cloned().collect();
1194
1195			(peers, packet)
1196		})
1197	}
1198
1199	fn note_catch_up_request(
1200		&mut self,
1201		who: &PeerId,
1202		catch_up_request: &CatchUpRequestMessage,
1203	) -> (bool, Option<Report>) {
1204		let report = match &self.pending_catch_up {
1205			PendingCatchUp::Requesting { who: peer, instant, .. } => {
1206				if instant.elapsed() <= CATCH_UP_REQUEST_TIMEOUT {
1207					return (false, None)
1208				} else {
1209					// report peer for timeout
1210					Some((*peer, cost::CATCH_UP_REQUEST_TIMEOUT))
1211				}
1212			},
1213			PendingCatchUp::Processing { instant, .. } => {
1214				if instant.elapsed() < CATCH_UP_PROCESS_TIMEOUT {
1215					return (false, None)
1216				} else {
1217					None
1218				}
1219			},
1220			_ => None,
1221		};
1222
1223		self.pending_catch_up = PendingCatchUp::Requesting {
1224			who: *who,
1225			request: catch_up_request.clone(),
1226			instant: Instant::now(),
1227		};
1228
1229		(true, report)
1230	}
1231
1232	/// The initial logic for filtering round messages follows the given state
1233	/// transitions:
1234	///
1235	/// - State 1: allowed to LUCKY_PEERS random peers (where at least LUCKY_PEERS/2 are
1236	///   authorities)
1237	/// - State 2: allowed to max(LUCKY_PEERS, sqrt(random peers)) (where at least LUCKY_PEERS are
1238	///   authorities)
1239	/// - State 3: allowed to all peers
1240	///
1241	/// Transitions will be triggered on repropagation attempts by the underlying gossip layer.
1242	fn round_message_allowed(&self, who: &PeerId) -> bool {
1243		let round_duration = self.config.gossip_duration * ROUND_DURATION;
1244		let round_elapsed = match self.local_view {
1245			Some(ref local_view) => local_view.round_start.elapsed(),
1246			None => return false,
1247		};
1248
1249		if round_elapsed < round_duration.mul_f32(PROPAGATION_SOME) {
1250			self.peers.first_stage_peers.contains(who)
1251		} else if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) {
1252			self.peers.first_stage_peers.contains(who) ||
1253				self.peers.second_stage_peers.contains(who)
1254		} else {
1255			self.peers.peer(who).map(|info| !info.roles.is_light()).unwrap_or(false)
1256		}
1257	}
1258
1259	/// The initial logic for filtering global messages follows the given state
1260	/// transitions:
1261	///
1262	/// - State 1: allowed to max(LUCKY_PEERS, sqrt(peers)) (where at least LUCKY_PEERS are
1263	///   authorities)
1264	/// - State 2: allowed to all peers
1265	///
1266	/// We are more lenient with global messages since there should be a lot
1267	/// less global messages than round messages (just commits), and we want
1268	/// these to propagate to non-authorities fast enough so that they can
1269	/// observe finality.
1270	///
1271	/// Transitions will be triggered on repropagation attempts by the
1272	/// underlying gossip layer, which should happen every 30 seconds.
1273	fn global_message_allowed(&self, who: &PeerId) -> bool {
1274		let round_duration = self.config.gossip_duration * ROUND_DURATION;
1275		let round_elapsed = match self.local_view {
1276			Some(ref local_view) => local_view.round_start.elapsed(),
1277			None => return false,
1278		};
1279
1280		if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) {
1281			self.peers.first_stage_peers.contains(who) ||
1282				self.peers.second_stage_peers.contains(who) ||
1283				self.peers.lucky_light_peers.contains(who)
1284		} else {
1285			true
1286		}
1287	}
1288}
1289
/// Prometheus metrics for [`GossipValidator`].
pub(crate) struct Metrics {
	/// Counter of validated messages, labelled by `message` and `action`.
	messages_validated: CounterVec<U64>,
}
1294
1295impl Metrics {
1296	pub(crate) fn register(
1297		registry: &prometheus_endpoint::Registry,
1298	) -> Result<Self, PrometheusError> {
1299		Ok(Self {
1300			messages_validated: register(
1301				CounterVec::new(
1302					Opts::new(
1303						"substrate_finality_grandpa_communication_gossip_validator_messages",
1304						"Number of messages validated by the finality grandpa gossip validator.",
1305					),
1306					&["message", "action"],
1307				)?,
1308				registry,
1309			)?,
1310		})
1311	}
1312}
1313
/// A validator for GRANDPA gossip messages.
pub(super) struct GossipValidator<Block: BlockT> {
	/// Validator state, guarded by a read-write lock.
	inner: parking_lot::RwLock<Inner<Block>>,
	/// Shared voter set state, consulted when answering catch up requests.
	set_state: environment::SharedVoterSetState<Block>,
	/// Sending half of the channel on which peer reputation reports are emitted.
	report_sender: TracingUnboundedSender<PeerReport>,
	/// Prometheus metrics, if a registry was provided and registration succeeded.
	metrics: Option<Metrics>,
	/// Telemetry handle used when reporting undecodable messages.
	telemetry: Option<TelemetryHandle>,
}
1322
1323impl<Block: BlockT> GossipValidator<Block> {
1324	/// Create a new gossip-validator. The current set is initialized to 0. If
1325	/// `catch_up_enabled` is set to false then the validator will not issue any
1326	/// catch up requests (useful e.g. when running just the GRANDPA observer).
1327	pub(super) fn new(
1328		config: crate::Config,
1329		set_state: environment::SharedVoterSetState<Block>,
1330		prometheus_registry: Option<&Registry>,
1331		telemetry: Option<TelemetryHandle>,
1332	) -> (GossipValidator<Block>, TracingUnboundedReceiver<PeerReport>) {
1333		let metrics = match prometheus_registry.map(Metrics::register) {
1334			Some(Ok(metrics)) => Some(metrics),
1335			Some(Err(e)) => {
1336				debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
1337				None
1338			},
1339			None => None,
1340		};
1341
1342		let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000);
1343		let val = GossipValidator {
1344			inner: parking_lot::RwLock::new(Inner::new(config)),
1345			set_state,
1346			report_sender: tx,
1347			metrics,
1348			telemetry,
1349		};
1350
1351		(val, rx)
1352	}
1353
1354	/// Note a round in the current set has started.
1355	pub(super) fn note_round<F>(&self, round: Round, send_neighbor: F)
1356	where
1357		F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
1358	{
1359		let maybe_msg = self.inner.write().note_round(round);
1360		if let Some((to, msg)) = maybe_msg {
1361			send_neighbor(to, msg);
1362		}
1363	}
1364
1365	/// Note that a voter set with given ID has started. Updates the current set to given
1366	/// value and initializes the round to 0.
1367	pub(super) fn note_set<F>(&self, set_id: SetId, authorities: Vec<AuthorityId>, send_neighbor: F)
1368	where
1369		F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
1370	{
1371		let maybe_msg = self.inner.write().note_set(set_id, authorities);
1372		if let Some((to, msg)) = maybe_msg {
1373			send_neighbor(to, msg);
1374		}
1375	}
1376
1377	/// Note that we've imported a commit finalizing a given block.
1378	/// `set_id` & `round` are the ones the commit message is from and not necessarily
1379	/// the latest set ID & round started.
1380	pub(super) fn note_commit_finalized<F>(
1381		&self,
1382		round: Round,
1383		set_id: SetId,
1384		finalized: NumberFor<Block>,
1385		send_neighbor: F,
1386	) where
1387		F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>),
1388	{
1389		let maybe_msg = self.inner.write().note_commit_finalized(round, set_id, finalized);
1390
1391		if let Some((to, msg)) = maybe_msg {
1392			send_neighbor(to, msg);
1393		}
1394	}
1395
1396	/// Note that we've processed a catch up message.
1397	pub(super) fn note_catch_up_message_processed(&self) {
1398		self.inner.write().note_catch_up_message_processed();
1399	}
1400
1401	fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
1402		let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
1403	}
1404
1405	pub(super) fn do_validate(
1406		&self,
1407		who: &PeerId,
1408		mut data: &[u8],
1409	) -> (Action<Block::Hash>, Vec<Block::Hash>, Option<GossipMessage<Block>>) {
1410		let mut broadcast_topics = Vec::new();
1411		let mut peer_reply = None;
1412
1413		// Message name for Prometheus metric recording.
1414		let message_name;
1415
1416		let action = {
1417			match GossipMessage::<Block>::decode_all(&mut data) {
1418				Ok(GossipMessage::Vote(ref message)) => {
1419					message_name = Some("vote");
1420					self.inner.write().validate_round_message(who, message)
1421				},
1422				Ok(GossipMessage::Commit(ref message)) => {
1423					message_name = Some("commit");
1424					self.inner.write().validate_commit_message(who, message)
1425				},
1426				Ok(GossipMessage::Neighbor(update)) => {
1427					message_name = Some("neighbor");
1428					let (topics, action, catch_up, report) = self
1429						.inner
1430						.write()
1431						.import_neighbor_message(who, update.into_neighbor_packet());
1432
1433					if let Some((peer, cost_benefit)) = report {
1434						self.report(peer, cost_benefit);
1435					}
1436
1437					broadcast_topics = topics;
1438					peer_reply = catch_up;
1439					action
1440				},
1441				Ok(GossipMessage::CatchUp(ref message)) => {
1442					message_name = Some("catch_up");
1443					self.inner.write().validate_catch_up_message(who, message)
1444				},
1445				Ok(GossipMessage::CatchUpRequest(request)) => {
1446					message_name = Some("catch_up_request");
1447					let (reply, action) =
1448						self.inner.write().handle_catch_up_request(who, request, &self.set_state);
1449
1450					peer_reply = reply;
1451					action
1452				},
1453				Err(e) => {
1454					message_name = None;
1455					debug!(target: LOG_TARGET, "Error decoding message: {}", e);
1456					telemetry!(
1457						self.telemetry;
1458						CONSENSUS_DEBUG;
1459						"afg.err_decoding_msg";
1460						"" => "",
1461					);
1462
1463					let len = std::cmp::min(i32::MAX as usize, data.len()) as i32;
1464					Action::Discard(Misbehavior::UndecodablePacket(len).cost())
1465				},
1466			}
1467		};
1468
1469		// Prometheus metric recording.
1470		if let (Some(metrics), Some(message_name)) = (&self.metrics, message_name) {
1471			let action_name = match action {
1472				Action::Keep(_, _) => "keep",
1473				Action::ProcessAndDiscard(_, _) => "process_and_discard",
1474				Action::Discard(_) => "discard",
1475			};
1476			metrics.messages_validated.with_label_values(&[message_name, action_name]).inc();
1477		}
1478
1479		(action, broadcast_topics, peer_reply)
1480	}
1481
	/// Test-only access to the lock guarding the validator's internal state.
	#[cfg(test)]
	fn inner(&self) -> &parking_lot::RwLock<Inner<Block>> {
		&self.inner
	}
1486}
1487
1488impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Block> {
1489	fn new_peer(
1490		&self,
1491		context: &mut dyn ValidatorContext<Block>,
1492		who: &PeerId,
1493		roles: ObservedRole,
1494	) {
1495		let packet = {
1496			let mut inner = self.inner.write();
1497			inner.peers.new_peer(*who, roles);
1498
1499			inner.local_view.as_ref().map(|v| NeighborPacket {
1500				round: v.round,
1501				set_id: v.set_id,
1502				commit_finalized_height: *v.last_commit_height().unwrap_or(&Zero::zero()),
1503			})
1504		};
1505
1506		if let Some(packet) = packet {
1507			let packet_data = GossipMessage::<Block>::from(packet).encode();
1508			context.send_message(who, packet_data);
1509		}
1510	}
1511
1512	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
1513		self.inner.write().peers.peer_disconnected(who);
1514	}
1515
1516	fn validate(
1517		&self,
1518		context: &mut dyn ValidatorContext<Block>,
1519		who: &PeerId,
1520		data: &[u8],
1521	) -> sc_network_gossip::ValidationResult<Block::Hash> {
1522		let (action, broadcast_topics, peer_reply) = self.do_validate(who, data);
1523
1524		// not with lock held!
1525		if let Some(msg) = peer_reply {
1526			context.send_message(who, msg.encode());
1527		}
1528
1529		for topic in broadcast_topics {
1530			context.send_topic(who, topic, false);
1531		}
1532
1533		match action {
1534			Action::Keep(topic, cb) => {
1535				self.report(*who, cb);
1536				context.broadcast_message(topic, data.to_vec(), false);
1537				sc_network_gossip::ValidationResult::ProcessAndKeep(topic)
1538			},
1539			Action::ProcessAndDiscard(topic, cb) => {
1540				self.report(*who, cb);
1541				sc_network_gossip::ValidationResult::ProcessAndDiscard(topic)
1542			},
1543			Action::Discard(cb) => {
1544				self.report(*who, cb);
1545				sc_network_gossip::ValidationResult::Discard
1546			},
1547		}
1548	}
1549
1550	fn message_allowed<'a>(
1551		&'a self,
1552	) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
1553		let (inner, do_rebroadcast) = {
1554			use parking_lot::RwLockWriteGuard;
1555
1556			let mut inner = self.inner.write();
1557			let now = Instant::now();
1558			let do_rebroadcast = if now >= inner.next_rebroadcast {
1559				inner.next_rebroadcast = now + REBROADCAST_AFTER;
1560				true
1561			} else {
1562				false
1563			};
1564
1565			// downgrade to read-lock.
1566			(RwLockWriteGuard::downgrade(inner), do_rebroadcast)
1567		};
1568
1569		Box::new(move |who, intent, topic, mut data| {
1570			if let MessageIntent::PeriodicRebroadcast = intent {
1571				return do_rebroadcast
1572			}
1573
1574			let peer = match inner.peers.peer(who) {
1575				None => return false,
1576				Some(x) => x,
1577			};
1578
1579			// if the topic is not something we're keeping at the moment,
1580			// do not send.
1581			let Some((maybe_round, set_id)) = inner.live_topics.topic_info(topic) else {
1582				return false
1583			};
1584
1585			if let MessageIntent::Broadcast = intent {
1586				if maybe_round.is_some() {
1587					if !inner.round_message_allowed(who) {
1588						// early return if the vote message isn't allowed at this stage.
1589						return false
1590					}
1591				} else if !inner.global_message_allowed(who) {
1592					// early return if the global message isn't allowed at this stage.
1593					return false
1594				}
1595			}
1596
1597			// if the topic is not something the peer accepts, discard.
1598			if let Some(round) = maybe_round {
1599				return peer.view.consider_vote(round, set_id) == Consider::Accept
1600			}
1601
1602			// global message.
1603			let Some(local_view) = &inner.local_view else {
1604				return false // cannot evaluate until we have a local view.
1605			};
1606
1607			match GossipMessage::<Block>::decode_all(&mut data) {
1608				Err(_) => false,
1609				Ok(GossipMessage::Commit(full)) => {
1610					// we only broadcast commit messages if they're for the same
1611					// set the peer is in and if the commit is better than the
1612					// last received by peer, additionally we make sure to only
1613					// broadcast our best commit.
1614					peer.view.consider_global(set_id, full.message.target_number) ==
1615						Consider::Accept && Some(&full.message.target_number) ==
1616						local_view.last_commit_height()
1617				},
1618				Ok(GossipMessage::Neighbor(_)) => false,
1619				Ok(GossipMessage::CatchUpRequest(_)) => false,
1620				Ok(GossipMessage::CatchUp(_)) => false,
1621				Ok(GossipMessage::Vote(_)) => false, // should not be the case.
1622			}
1623		})
1624	}
1625
1626	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
1627		let inner = self.inner.read();
1628		Box::new(move |topic, mut data| {
1629			// if the topic is not one of the ones that we are keeping at the moment,
1630			// it is expired.
1631			match inner.live_topics.topic_info(&topic) {
1632				None => return true,
1633				// round messages don't require further checking.
1634				Some((Some(_), _)) => return false,
1635				Some((None, _)) => {},
1636			};
1637
1638			let Some(local_view) = &inner.local_view else {
1639				return true // no local view means we can't evaluate or hold any topic.
1640			};
1641
1642			// global messages -- only keep the best commit.
1643			match GossipMessage::<Block>::decode_all(&mut data) {
1644				Err(_) => true,
1645				Ok(GossipMessage::Commit(full)) => match local_view.last_commit {
1646					Some((number, round, set_id)) =>
1647					// we expire any commit message that doesn't target the same block
1648					// as our best commit or isn't from the same round and set id
1649						!(full.message.target_number == number &&
1650							full.round == round && full.set_id == set_id),
1651					None => true,
1652				},
1653				Ok(_) => true,
1654			}
1655		})
1656	}
1657}
1658
/// Report specifying a reputation change for a given peer.
pub(super) struct PeerReport {
	/// The peer the report is about.
	pub who: PeerId,
	/// The reputation change (cost or benefit) to apply to the peer.
	pub cost_benefit: ReputationChange,
}
1664
1665#[cfg(test)]
1666mod tests {
1667	use super::{super::NEIGHBOR_REBROADCAST_PERIOD, environment::SharedVoterSetState, *};
1668	use crate::communication;
1669	use sc_network::config::Role;
1670	use sc_network_gossip::Validator as GossipValidatorT;
1671	use sp_core::{crypto::UncheckedFrom, H256};
1672	use std::time::Instant;
1673	use substrate_test_runtime_client::runtime::{Block, Header};
1674
	// Arbitrary GRANDPA configuration for the tests below (the concrete values are
	// not really needed): authority role, no keystore, no telemetry.
	fn config() -> crate::Config {
		crate::Config {
			gossip_duration: Duration::from_millis(10),
			justification_generation_period: 256,
			keystore: None,
			name: None,
			local_role: Role::Authority,
			observer_enabled: true,
			telemetry: None,
			protocol_name: communication::grandpa_protocol_name::NAME.into(),
		}
	}
1688
1689	// dummy voter set state
1690	fn voter_set_state() -> SharedVoterSetState<Block> {
1691		use crate::{authorities::AuthoritySet, environment::VoterSetState};
1692
1693		let base = (H256::zero(), 0);
1694
1695		let voters = vec![(AuthorityId::unchecked_from([1; 32]), 1)];
1696		let voters = AuthoritySet::genesis(voters).unwrap();
1697
1698		let set_state = VoterSetState::live(0, &voters, base);
1699
1700		set_state.into()
1701	}
1702
1703	#[test]
1704	fn view_vote_rules() {
1705		let view = View {
1706			round: Round(100),
1707			set_id: SetId(1),
1708			last_commit: Some(1000u64),
1709			last_update: None,
1710		};
1711
1712		assert_eq!(view.consider_vote(Round(98), SetId(1)), Consider::RejectPast);
1713		assert_eq!(view.consider_vote(Round(1), SetId(0)), Consider::RejectPast);
1714		assert_eq!(view.consider_vote(Round(1000), SetId(0)), Consider::RejectPast);
1715
1716		assert_eq!(view.consider_vote(Round(99), SetId(1)), Consider::Accept);
1717		assert_eq!(view.consider_vote(Round(100), SetId(1)), Consider::Accept);
1718		assert_eq!(view.consider_vote(Round(101), SetId(1)), Consider::Accept);
1719
1720		assert_eq!(view.consider_vote(Round(102), SetId(1)), Consider::RejectFuture);
1721		assert_eq!(view.consider_vote(Round(1), SetId(2)), Consider::RejectFuture);
1722		assert_eq!(view.consider_vote(Round(1000), SetId(2)), Consider::RejectFuture);
1723	}
1724
1725	#[test]
1726	fn view_global_message_rules() {
1727		let view = View {
1728			round: Round(100),
1729			set_id: SetId(2),
1730			last_commit: Some(1000u64),
1731			last_update: None,
1732		};
1733
1734		assert_eq!(view.consider_global(SetId(3), 1), Consider::RejectFuture);
1735		assert_eq!(view.consider_global(SetId(3), 1000), Consider::RejectFuture);
1736		assert_eq!(view.consider_global(SetId(3), 10000), Consider::RejectFuture);
1737
1738		assert_eq!(view.consider_global(SetId(1), 1), Consider::RejectPast);
1739		assert_eq!(view.consider_global(SetId(1), 1000), Consider::RejectPast);
1740		assert_eq!(view.consider_global(SetId(1), 10000), Consider::RejectPast);
1741
1742		assert_eq!(view.consider_global(SetId(2), 1), Consider::RejectPast);
1743		assert_eq!(view.consider_global(SetId(2), 1000), Consider::RejectPast);
1744		assert_eq!(view.consider_global(SetId(2), 1001), Consider::Accept);
1745		assert_eq!(view.consider_global(SetId(2), 10000), Consider::Accept);
1746	}
1747
1748	#[test]
1749	fn unknown_peer_cannot_be_updated() {
1750		let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
1751		let id = PeerId::random();
1752
1753		let update =
1754			NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50 };
1755
1756		let res = peers.update_peer_state(&id, update.clone());
1757		assert!(res.unwrap().is_none());
1758
1759		// connect & disconnect.
1760		peers.new_peer(id, ObservedRole::Authority);
1761		peers.peer_disconnected(&id);
1762
1763		let res = peers.update_peer_state(&id, update.clone());
1764		assert!(res.unwrap().is_none());
1765	}
1766
1767	#[test]
1768	fn update_peer_state() {
1769		let update1 =
1770			NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50u32 };
1771
1772		let update2 =
1773			NeighborPacket { round: Round(6), set_id: SetId(10), commit_finalized_height: 60 };
1774
1775		let update3 =
1776			NeighborPacket { round: Round(2), set_id: SetId(11), commit_finalized_height: 61 };
1777
1778		let update4 =
1779			NeighborPacket { round: Round(3), set_id: SetId(11), commit_finalized_height: 80 };
1780
1781		// Use shorter rebroadcast period to safely roll the clock back in the last test
1782		// and don't hit the system boot time on systems with unsigned time.
1783		const SHORT_NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(1);
1784		let mut peers = Peers::new(SHORT_NEIGHBOR_REBROADCAST_PERIOD);
1785		let id = PeerId::random();
1786
1787		peers.new_peer(id, ObservedRole::Authority);
1788
1789		let check_update = |peers: &mut Peers<_>, update: NeighborPacket<_>| {
1790			let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap();
1791			assert_eq!(view.round, update.round);
1792			assert_eq!(view.set_id, update.set_id);
1793			assert_eq!(view.last_commit, Some(update.commit_finalized_height));
1794		};
1795
1796		check_update(&mut peers, update1);
1797		check_update(&mut peers, update2);
1798		check_update(&mut peers, update3);
1799		check_update(&mut peers, update4.clone());
1800
1801		// Allow duplicate neighbor packets if enough time has passed.
1802		peers.inner.get_mut(&id).unwrap().view.last_update =
1803			Some(Instant::now() - SHORT_NEIGHBOR_REBROADCAST_PERIOD);
1804		check_update(&mut peers, update4);
1805	}
1806
1807	#[test]
1808	fn invalid_view_change() {
1809		let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
1810
1811		let id = PeerId::random();
1812		peers.new_peer(id, ObservedRole::Authority);
1813
1814		peers
1815			.update_peer_state(
1816				&id,
1817				NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 },
1818			)
1819			.unwrap()
1820			.unwrap();
1821
1822		let mut check_update = move |update: NeighborPacket<_>, misbehavior| {
1823			let err = peers.update_peer_state(&id, update.clone()).unwrap_err();
1824			assert_eq!(err, misbehavior);
1825		};
1826
1827		// round moves backwards.
1828		check_update(
1829			NeighborPacket { round: Round(9), set_id: SetId(10), commit_finalized_height: 10 },
1830			Misbehavior::InvalidViewChange,
1831		);
1832		// set ID moves backwards.
1833		check_update(
1834			NeighborPacket { round: Round(10), set_id: SetId(9), commit_finalized_height: 10 },
1835			Misbehavior::InvalidViewChange,
1836		);
1837		// commit finalized height moves backwards.
1838		check_update(
1839			NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 9 },
1840			Misbehavior::InvalidViewChange,
1841		);
1842		// duplicate packet without grace period.
1843		check_update(
1844			NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 },
1845			Misbehavior::DuplicateNeighborMessage,
1846		);
1847		// commit finalized height moves backwards while round moves forward.
1848		check_update(
1849			NeighborPacket { round: Round(11), set_id: SetId(10), commit_finalized_height: 9 },
1850			Misbehavior::InvalidViewChange,
1851		);
1852		// commit finalized height moves backwards while set ID moves forward.
1853		check_update(
1854			NeighborPacket { round: Round(10), set_id: SetId(11), commit_finalized_height: 9 },
1855			Misbehavior::InvalidViewChange,
1856		);
1857	}
1858
1859	#[test]
1860	fn messages_not_expired_immediately() {
1861		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
1862
1863		let set_id = 1;
1864
1865		val.note_set(SetId(set_id), Vec::new(), |_, _| {});
1866
1867		for round_num in 1u64..10 {
1868			val.note_round(Round(round_num), |_, _| {});
1869		}
1870
1871		{
1872			let mut is_expired = val.message_expired();
1873			let last_kept_round = 10u64 - KEEP_RECENT_ROUNDS as u64 - 1;
1874
1875			// messages from old rounds are expired.
1876			for round_num in 1u64..last_kept_round {
1877				let topic = communication::round_topic::<Block>(round_num, 1);
1878				assert!(is_expired(topic, &[1, 2, 3]));
1879			}
1880
1881			// messages from not-too-old rounds are not expired.
1882			for round_num in last_kept_round..10 {
1883				let topic = communication::round_topic::<Block>(round_num, 1);
1884				assert!(!is_expired(topic, &[1, 2, 3]));
1885			}
1886		}
1887	}
1888
1889	#[test]
1890	fn message_from_unknown_authority_discarded() {
1891		assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE);
1892
1893		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
1894		let set_id = 1;
1895		let auth = AuthorityId::unchecked_from([1u8; 32]);
1896		let peer = PeerId::random();
1897
1898		val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
1899		val.note_round(Round(1), |_, _| {});
1900
1901		let inner = val.inner.read();
1902		let unknown_voter = inner.validate_round_message(
1903			&peer,
1904			&VoteMessage {
1905				round: Round(1),
1906				set_id: SetId(set_id),
1907				message: SignedMessage::<Header> {
1908					message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote {
1909						target_hash: Default::default(),
1910						target_number: 10,
1911					}),
1912					signature: UncheckedFrom::unchecked_from([1; 64]),
1913					id: UncheckedFrom::unchecked_from([2u8; 32]),
1914				},
1915			},
1916		);
1917
1918		let bad_sig = inner.validate_round_message(
1919			&peer,
1920			&VoteMessage {
1921				round: Round(1),
1922				set_id: SetId(set_id),
1923				message: SignedMessage::<Header> {
1924					message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote {
1925						target_hash: Default::default(),
1926						target_number: 10,
1927					}),
1928					signature: UncheckedFrom::unchecked_from([1; 64]),
1929					id: auth.clone(),
1930				},
1931			},
1932		);
1933
1934		assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER));
1935		assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE));
1936	}
1937
1938	#[test]
1939	fn unsolicited_catch_up_messages_discarded() {
1940		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
1941
1942		let set_id = 1;
1943		let auth = AuthorityId::unchecked_from([1u8; 32]);
1944		let peer = PeerId::random();
1945
1946		val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
1947		val.note_round(Round(1), |_, _| {});
1948
1949		let validate_catch_up = || {
1950			let mut inner = val.inner.write();
1951			inner.validate_catch_up_message(
1952				&peer,
1953				&FullCatchUpMessage {
1954					set_id: SetId(set_id),
1955					message: finality_grandpa::CatchUp {
1956						round_number: 10,
1957						prevotes: Default::default(),
1958						precommits: Default::default(),
1959						base_hash: Default::default(),
1960						base_number: Default::default(),
1961					},
1962				},
1963			)
1964		};
1965
1966		// the catch up is discarded because we have no pending request
1967		assert_eq!(validate_catch_up(), Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));
1968
1969		let noted = val.inner.write().note_catch_up_request(
1970			&peer,
1971			&CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) },
1972		);
1973
1974		assert!(noted.0);
1975
1976		// catch up is allowed because we have requested it, but it's rejected
1977		// because it's malformed (empty prevotes and precommits)
1978		assert_eq!(validate_catch_up(), Action::Discard(cost::MALFORMED_CATCH_UP));
1979	}
1980
1981	#[test]
1982	fn unanswerable_catch_up_requests_discarded() {
1983		// create voter set state with round 2 completed
1984		let set_state: SharedVoterSetState<Block> = {
1985			let mut completed_rounds = voter_set_state().read().completed_rounds();
1986
1987			completed_rounds.push(environment::CompletedRound {
1988				number: 2,
1989				state: finality_grandpa::round::State::genesis(Default::default()),
1990				base: Default::default(),
1991				votes: Default::default(),
1992			});
1993
1994			let mut current_rounds = environment::CurrentRounds::<Block>::new();
1995			current_rounds.insert(3, environment::HasVoted::No);
1996
1997			let set_state =
1998				environment::VoterSetState::<Block>::Live { completed_rounds, current_rounds };
1999
2000			set_state.into()
2001		};
2002
2003		let (val, _) = GossipValidator::<Block>::new(config(), set_state.clone(), None, None);
2004
2005		let set_id = 1;
2006		let auth = AuthorityId::unchecked_from([1u8; 32]);
2007		let peer = PeerId::random();
2008
2009		val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
2010		val.note_round(Round(3), |_, _| {});
2011
2012		// add the peer making the request to the validator,
2013		// otherwise it is discarded
2014		let mut inner = val.inner.write();
2015		inner.peers.new_peer(peer, ObservedRole::Authority);
2016
2017		let res = inner.handle_catch_up_request(
2018			&peer,
2019			CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) },
2020			&set_state,
2021		);
2022
2023		// we're at round 3, a catch up request for round 10 is out of scope
2024		assert!(res.0.is_none());
2025		assert_eq!(res.1, Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));
2026
2027		let res = inner.handle_catch_up_request(
2028			&peer,
2029			CatchUpRequestMessage { set_id: SetId(set_id), round: Round(2) },
2030			&set_state,
2031		);
2032
2033		// a catch up request for round 2 should be answered successfully
2034		match res.0.unwrap() {
2035			GossipMessage::CatchUp(catch_up) => {
2036				assert_eq!(catch_up.set_id, SetId(set_id));
2037				assert_eq!(catch_up.message.round_number, 2);
2038
2039				assert_eq!(res.1, Action::Discard(cost::CATCH_UP_REPLY));
2040			},
2041			_ => panic!("expected catch up message"),
2042		};
2043	}
2044
2045	#[test]
2046	fn detects_honest_out_of_scope_catch_requests() {
2047		let set_state = voter_set_state();
2048		let (val, _) = GossipValidator::<Block>::new(config(), set_state.clone(), None, None);
2049
2050		// the validator starts at set id 2
2051		val.note_set(SetId(2), Vec::new(), |_, _| {});
2052
2053		// add the peer making the request to the validator,
2054		// otherwise it is discarded
2055		let peer = PeerId::random();
2056		val.inner.write().peers.new_peer(peer, ObservedRole::Authority);
2057
2058		let send_request = |set_id, round| {
2059			let mut inner = val.inner.write();
2060			inner.handle_catch_up_request(
2061				&peer,
2062				CatchUpRequestMessage { set_id: SetId(set_id), round: Round(round) },
2063				&set_state,
2064			)
2065		};
2066
2067		let assert_res = |res: (Option<_>, Action<_>), honest| {
2068			assert!(res.0.is_none());
2069			assert_eq!(
2070				res.1,
2071				if honest {
2072					Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP)
2073				} else {
2074					Action::Discard(Misbehavior::OutOfScopeMessage.cost())
2075				},
2076			);
2077		};
2078
2079		// the validator is at set id 2 and round 0. requests for set id 1
2080		// should not be answered but they should be considered an honest
2081		// mistake
2082		assert_res(send_request(1, 1), true);
2083
2084		assert_res(send_request(1, 10), true);
2085
2086		// requests for set id 0 should be considered out of scope
2087		assert_res(send_request(0, 1), false);
2088
2089		assert_res(send_request(0, 10), false);
2090
2091		// after the validator progresses further than CATCH_UP_THRESHOLD in set
2092		// id 2, any request for set id 1 should no longer be considered an
2093		// honest mistake.
2094		val.note_round(Round(3), |_, _| {});
2095
2096		assert_res(send_request(1, 1), false);
2097
2098		assert_res(send_request(1, 2), false);
2099	}
2100
2101	#[test]
2102	fn issues_catch_up_request_on_neighbor_packet_import() {
2103		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2104
2105		// the validator starts at set id 1.
2106		val.note_set(SetId(1), Vec::new(), |_, _| {});
2107
2108		// add the peer making the request to the validator,
2109		// otherwise it is discarded.
2110		let peer = PeerId::random();
2111		val.inner.write().peers.new_peer(peer, ObservedRole::Authority);
2112
2113		let import_neighbor_message = |set_id, round| {
2114			let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2115				&peer,
2116				NeighborPacket {
2117					round: Round(round),
2118					set_id: SetId(set_id),
2119					commit_finalized_height: 42,
2120				},
2121			);
2122
2123			catch_up_request
2124		};
2125
2126		// importing a neighbor message from a peer in the same set in a later
2127		// round should lead to a catch up request for the previous round.
2128		match import_neighbor_message(1, 42) {
2129			Some(GossipMessage::CatchUpRequest(request)) => {
2130				assert_eq!(request.set_id, SetId(1));
2131				assert_eq!(request.round, Round(41));
2132			},
2133			_ => panic!("expected catch up message"),
2134		}
2135
2136		// we note that we're at round 41.
2137		val.note_round(Round(41), |_, _| {});
2138
2139		// if we import a neighbor message within CATCH_UP_THRESHOLD then we
2140		// won't request a catch up.
2141		match import_neighbor_message(1, 42) {
2142			None => {},
2143			_ => panic!("expected no catch up message"),
2144		}
2145
2146		// or if the peer is on a lower round.
2147		match import_neighbor_message(1, 40) {
2148			None => {},
2149			_ => panic!("expected no catch up message"),
2150		}
2151
2152		// we also don't request a catch up if the peer is in a different set.
2153		match import_neighbor_message(2, 42) {
2154			None => {},
2155			_ => panic!("expected no catch up message"),
2156		}
2157	}
2158
2159	#[test]
2160	fn doesnt_send_catch_up_requests_when_disabled() {
2161		// we create a gossip validator with catch up requests disabled.
2162		let config = {
2163			let mut c = config();
2164
2165			// if the observer protocol is enabled and we are not an authority,
2166			// then we don't issue any catch-up requests.
2167			c.local_role = Role::Full;
2168			c.observer_enabled = true;
2169
2170			c
2171		};
2172
2173		let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2174
2175		// the validator starts at set id 1.
2176		val.note_set(SetId(1), Vec::new(), |_, _| {});
2177
2178		// add the peer making the request to the validator,
2179		// otherwise it is discarded.
2180		let peer = PeerId::random();
2181		val.inner.write().peers.new_peer(peer, ObservedRole::Authority);
2182
2183		// importing a neighbor message from a peer in the same set in a later
2184		// round should lead to a catch up request but since they're disabled
2185		// we should get `None`.
2186		let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2187			&peer,
2188			NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
2189		);
2190
2191		match catch_up_request {
2192			None => {},
2193			_ => panic!("expected no catch up message"),
2194		}
2195	}
2196
2197	#[test]
2198	fn doesnt_send_catch_up_requests_to_non_authorities_when_observer_enabled() {
2199		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2200
2201		// the validator starts at set id 1.
2202		val.note_set(SetId(1), Vec::new(), |_, _| {});
2203
2204		// add the peers making the requests to the validator,
2205		// otherwise it is discarded.
2206		let peer_authority = PeerId::random();
2207		let peer_full = PeerId::random();
2208
2209		val.inner.write().peers.new_peer(peer_authority, ObservedRole::Authority);
2210		val.inner.write().peers.new_peer(peer_full, ObservedRole::Full);
2211
2212		let import_neighbor_message = |peer| {
2213			let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2214				&peer,
2215				NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
2216			);
2217
2218			catch_up_request
2219		};
2220
2221		// importing a neighbor message from a peer in the same set in a later
2222		// round should lead to a catch up request but since the node is not an
2223		// authority we should get `None`.
2224		if import_neighbor_message(peer_full).is_some() {
2225			panic!("expected no catch up message");
2226		}
2227
2228		// importing the same neighbor message from a peer who is an authority
2229		// should lead to a catch up request.
2230		match import_neighbor_message(peer_authority) {
2231			Some(GossipMessage::CatchUpRequest(request)) => {
2232				assert_eq!(request.set_id, SetId(1));
2233				assert_eq!(request.round, Round(41));
2234			},
2235			_ => panic!("expected catch up message"),
2236		}
2237	}
2238
2239	#[test]
2240	fn sends_catch_up_requests_to_non_authorities_when_observer_disabled() {
2241		let config = {
2242			let mut c = config();
2243
2244			// if the observer protocol is disable any full-node should be able
2245			// to answer catch-up requests.
2246			c.observer_enabled = false;
2247
2248			c
2249		};
2250
2251		let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2252
2253		// the validator starts at set id 1.
2254		val.note_set(SetId(1), Vec::new(), |_, _| {});
2255
2256		// add the peer making the requests to the validator, otherwise it is
2257		// discarded.
2258		let peer_full = PeerId::random();
2259		val.inner.write().peers.new_peer(peer_full, ObservedRole::Full);
2260
2261		let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
2262			&peer_full,
2263			NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 },
2264		);
2265
2266		// importing a neighbor message from a peer in the same set in a later
2267		// round should lead to a catch up request, the node is not an
2268		// authority, but since the observer protocol is disabled we should
2269		// issue a catch-up request to it anyway.
2270		match catch_up_request {
2271			Some(GossipMessage::CatchUpRequest(request)) => {
2272				assert_eq!(request.set_id, SetId(1));
2273				assert_eq!(request.round, Round(41));
2274			},
2275			_ => panic!("expected catch up message"),
2276		}
2277	}
2278
2279	#[test]
2280	fn doesnt_expire_next_round_messages() {
2281		// NOTE: this is a regression test
2282		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2283
2284		// the validator starts at set id 1.
2285		val.note_set(SetId(1), Vec::new(), |_, _| {});
2286
2287		// we are at round 10
2288		val.note_round(Round(9), |_, _| {});
2289		val.note_round(Round(10), |_, _| {});
2290
2291		let mut is_expired = val.message_expired();
2292
2293		// we accept messages from rounds 9, 10 and 11
2294		// therefore neither of those should be considered expired
2295		for round in &[9, 10, 11] {
2296			assert!(!is_expired(communication::round_topic::<Block>(*round, 1), &[]))
2297		}
2298	}
2299
2300	#[test]
2301	fn progressively_gossips_to_more_peers_as_round_duration_increases() {
2302		let mut config = config();
2303		config.gossip_duration = Duration::from_secs(300); // Set to high value to prevent test race
2304		let round_duration = config.gossip_duration * ROUND_DURATION;
2305
2306		let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2307
2308		// the validator start at set id 0
2309		val.note_set(SetId(0), Vec::new(), |_, _| {});
2310
2311		// add 60 peers, 30 authorities and 30 full nodes
2312		let mut authorities = Vec::new();
2313		authorities.resize_with(30, || PeerId::random());
2314
2315		let mut full_nodes = Vec::new();
2316		full_nodes.resize_with(30, || PeerId::random());
2317
2318		for i in 0..30 {
2319			val.inner.write().peers.new_peer(authorities[i], ObservedRole::Authority);
2320
2321			val.inner.write().peers.new_peer(full_nodes[i], ObservedRole::Full);
2322		}
2323
2324		let test = |rounds_elapsed, peers| {
2325			// rewind n round durations
2326			val.inner.write().local_view.as_mut().unwrap().round_start = Instant::now() -
2327				Duration::from_millis(
2328					(round_duration.as_millis() as f32 * rounds_elapsed) as u64,
2329				);
2330
2331			val.inner.write().peers.reshuffle();
2332
2333			let mut message_allowed = val.message_allowed();
2334
2335			move || {
2336				let mut allowed = 0;
2337				for peer in peers {
2338					if message_allowed(
2339						peer,
2340						MessageIntent::Broadcast,
2341						&communication::round_topic::<Block>(1, 0),
2342						&[],
2343					) {
2344						allowed += 1;
2345					}
2346				}
2347				allowed
2348			}
2349		};
2350
2351		fn trial<F: FnMut() -> usize>(mut test: F) -> usize {
2352			let mut results = Vec::new();
2353			let n = 1000;
2354
2355			for _ in 0..n {
2356				results.push(test());
2357			}
2358
2359			let n = results.len();
2360			let sum: usize = results.iter().sum();
2361
2362			sum / n
2363		}
2364
2365		let all_peers = authorities.iter().chain(full_nodes.iter()).cloned().collect();
2366
2367		// on the first attempt we will only gossip to 4 peers, either
2368		// authorities or full nodes, but we'll guarantee that half of those
2369		// are authorities
2370		assert!(trial(test(1.0, &authorities)) >= LUCKY_PEERS / 2);
2371		assert_eq!(trial(test(1.0, &all_peers)), LUCKY_PEERS);
2372
2373		// after more than 1.5 round durations have elapsed we should gossip to
2374		// `sqrt(peers)` we're connected to, but we guarantee that at least 4 of
2375		// those peers are authorities (plus the `LUCKY_PEERS` from the previous
2376		// stage)
2377		assert!(trial(test(PROPAGATION_SOME * 1.1, &authorities)) >= LUCKY_PEERS);
2378		assert_eq!(
2379			trial(test(2.0, &all_peers)),
2380			LUCKY_PEERS + (all_peers.len() as f64).sqrt() as usize,
2381		);
2382
2383		// after 3 rounds durations we should gossip to all peers we are
2384		// connected to
2385		assert_eq!(trial(test(PROPAGATION_ALL * 1.1, &all_peers)), all_peers.len());
2386	}
2387
2388	#[test]
2389	fn never_gossips_round_messages_to_light_clients() {
2390		let config = config();
2391		let round_duration = config.gossip_duration * ROUND_DURATION;
2392		let (val, _) = GossipValidator::<Block>::new(config, voter_set_state(), None, None);
2393
2394		// the validator starts at set id 0
2395		val.note_set(SetId(0), Vec::new(), |_, _| {});
2396
2397		// add a new light client as peer
2398		let light_peer = PeerId::random();
2399
2400		val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);
2401
2402		assert!(!val.message_allowed()(
2403			&light_peer,
2404			MessageIntent::Broadcast,
2405			&communication::round_topic::<Block>(1, 0),
2406			&[],
2407		));
2408
2409		// we reverse the round start time so that the elapsed time is higher
2410		// (which should lead to more peers getting the message)
2411		val.inner.write().local_view.as_mut().unwrap().round_start =
2412			Instant::now() - round_duration * 10;
2413
2414		// even after the round has been going for 10 round durations we will never
2415		// gossip to light clients
2416		assert!(!val.message_allowed()(
2417			&light_peer,
2418			MessageIntent::Broadcast,
2419			&communication::round_topic::<Block>(1, 0),
2420			&[],
2421		));
2422
2423		// update the peer state and local state wrt commits
2424		val.inner
2425			.write()
2426			.peers
2427			.update_peer_state(
2428				&light_peer,
2429				NeighborPacket { round: Round(1), set_id: SetId(0), commit_finalized_height: 1 },
2430			)
2431			.unwrap();
2432
2433		val.note_commit_finalized(Round(1), SetId(0), 2, |_, _| {});
2434
2435		let commit = {
2436			let commit = finality_grandpa::CompactCommit {
2437				target_hash: H256::random(),
2438				target_number: 2,
2439				precommits: Vec::new(),
2440				auth_data: Vec::new(),
2441			};
2442
2443			communication::gossip::GossipMessage::<Block>::Commit(
2444				communication::gossip::FullCommitMessage {
2445					round: Round(2),
2446					set_id: SetId(0),
2447					message: commit,
2448				},
2449			)
2450			.encode()
2451		};
2452
2453		// global messages are gossiped to light clients though
2454		assert!(val.message_allowed()(
2455			&light_peer,
2456			MessageIntent::Broadcast,
2457			&communication::global_topic::<Block>(0),
2458			&commit,
2459		));
2460	}
2461
2462	#[test]
2463	fn only_gossip_commits_to_peers_on_same_set() {
2464		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2465
2466		// the validator starts at set id 1
2467		val.note_set(SetId(1), Vec::new(), |_, _| {});
2468
2469		// add a new peer at set id 1
2470		let peer1 = PeerId::random();
2471
2472		val.inner.write().peers.new_peer(peer1, ObservedRole::Authority);
2473
2474		val.inner
2475			.write()
2476			.peers
2477			.update_peer_state(
2478				&peer1,
2479				NeighborPacket { round: Round(1), set_id: SetId(1), commit_finalized_height: 1 },
2480			)
2481			.unwrap();
2482
2483		// peer2 will default to set id 0
2484		let peer2 = PeerId::random();
2485		val.inner.write().peers.new_peer(peer2, ObservedRole::Authority);
2486
2487		// create a commit for round 1 of set id 1
2488		// targeting a block at height 2
2489		let commit = {
2490			let commit = finality_grandpa::CompactCommit {
2491				target_hash: H256::random(),
2492				target_number: 2,
2493				precommits: Vec::new(),
2494				auth_data: Vec::new(),
2495			};
2496
2497			communication::gossip::GossipMessage::<Block>::Commit(
2498				communication::gossip::FullCommitMessage {
2499					round: Round(1),
2500					set_id: SetId(1),
2501					message: commit,
2502				},
2503			)
2504			.encode()
2505		};
2506
2507		// note the commit in the validator
2508		val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {});
2509
2510		let mut message_allowed = val.message_allowed();
2511
2512		// the commit should be allowed to peer 1
2513		assert!(message_allowed(
2514			&peer1,
2515			MessageIntent::Broadcast,
2516			&communication::global_topic::<Block>(1),
2517			&commit,
2518		));
2519
2520		// but disallowed to peer 2 since the peer is on set id 0
2521		// the commit should be allowed to peer 1
2522		assert!(!message_allowed(
2523			&peer2,
2524			MessageIntent::Broadcast,
2525			&communication::global_topic::<Block>(1),
2526			&commit,
2527		));
2528	}
2529
2530	#[test]
2531	fn expire_commits_from_older_rounds() {
2532		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2533
2534		let commit = |round, set_id, target_number| {
2535			let commit = finality_grandpa::CompactCommit {
2536				target_hash: H256::random(),
2537				target_number,
2538				precommits: Vec::new(),
2539				auth_data: Vec::new(),
2540			};
2541
2542			communication::gossip::GossipMessage::<Block>::Commit(
2543				communication::gossip::FullCommitMessage {
2544					round: Round(round),
2545					set_id: SetId(set_id),
2546					message: commit,
2547				},
2548			)
2549			.encode()
2550		};
2551
2552		// note the beginning of a new set with id 1
2553		val.note_set(SetId(1), Vec::new(), |_, _| {});
2554
2555		// note a commit for round 1 in the validator
2556		// finalizing a block at height 2
2557		val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {});
2558
2559		let mut message_expired = val.message_expired();
2560
2561		// a commit message for round 1 that finalizes the same height as we
2562		// have observed previously should not be expired
2563		assert!(!message_expired(communication::global_topic::<Block>(1), &commit(1, 1, 2),));
2564
2565		// it should be expired if it is for a lower block
2566		assert!(message_expired(communication::global_topic::<Block>(1), &commit(1, 1, 1)));
2567
2568		// or the same block height but from the previous round
2569		assert!(message_expired(communication::global_topic::<Block>(1), &commit(0, 1, 2)));
2570	}
2571
2572	#[test]
2573	fn allow_noting_different_authorities_for_same_set() {
2574		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2575
2576		let a1 = vec![UncheckedFrom::unchecked_from([0; 32])];
2577		val.note_set(SetId(1), a1.clone(), |_, _| {});
2578
2579		assert_eq!(val.inner().read().authorities, a1);
2580
2581		let a2 =
2582			vec![UncheckedFrom::unchecked_from([1; 32]), UncheckedFrom::unchecked_from([2; 32])];
2583		val.note_set(SetId(1), a2.clone(), |_, _| {});
2584
2585		assert_eq!(val.inner().read().authorities, a2);
2586	}
2587
2588	#[test]
2589	fn sends_neighbor_packets_to_all_peers_when_starting_a_new_round() {
2590		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2591
2592		// initialize the validator to a stable set id
2593		val.note_set(SetId(1), Vec::new(), |_, _| {});
2594
2595		let authority_peer = PeerId::random();
2596		let full_peer = PeerId::random();
2597		let light_peer = PeerId::random();
2598
2599		val.inner.write().peers.new_peer(authority_peer, ObservedRole::Authority);
2600		val.inner.write().peers.new_peer(full_peer, ObservedRole::Full);
2601		val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);
2602
2603		val.note_round(Round(2), |peers, message| {
2604			assert_eq!(peers.len(), 3);
2605			assert!(peers.contains(&authority_peer));
2606			assert!(peers.contains(&full_peer));
2607			assert!(peers.contains(&light_peer));
2608			assert!(matches!(message, NeighborPacket { set_id: SetId(1), round: Round(2), .. }));
2609		});
2610	}
2611
2612	#[test]
2613	fn sends_neighbor_packets_to_all_peers_when_starting_a_new_set() {
2614		let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None, None);
2615
2616		// initialize the validator to a stable set id
2617		val.note_set(SetId(1), Vec::new(), |_, _| {});
2618
2619		let authority_peer = PeerId::random();
2620		let full_peer = PeerId::random();
2621		let light_peer = PeerId::random();
2622
2623		val.inner.write().peers.new_peer(authority_peer, ObservedRole::Authority);
2624		val.inner.write().peers.new_peer(full_peer, ObservedRole::Full);
2625		val.inner.write().peers.new_peer(light_peer, ObservedRole::Light);
2626
2627		val.note_set(SetId(2), Vec::new(), |peers, message| {
2628			assert_eq!(peers.len(), 3);
2629			assert!(peers.contains(&authority_peer));
2630			assert!(peers.contains(&full_peer));
2631			assert!(peers.contains(&light_peer));
2632			assert!(matches!(message, NeighborPacket { set_id: SetId(2), round: Round(1), .. }));
2633		});
2634	}
2635}