referrerpolicy=no-referrer-when-downgrade

sc_consensus_grandpa/communication/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Communication streams for the polite-grandpa networking protocol.
20//!
21//! GRANDPA nodes communicate over a gossip network, where messages are not sent to
22//! peers until they have reached a given round.
23//!
24//! Rather than expressing protocol rules,
25//! polite-grandpa just carries a notion of impoliteness. Nodes which pass some arbitrary
26//! threshold of impoliteness are removed. Messages are either costly, or beneficial.
27//!
28//! For instance, it is _impolite_ to send the same message more than once.
29//! In the future, there will be a fallback for allowing sending the same message
30//! under certain conditions that are used to un-stick the protocol.
31
32use futures::{channel::mpsc, prelude::*};
33use log::{debug, trace};
34use parking_lot::Mutex;
35use prometheus_endpoint::Registry;
36use std::{
37	pin::Pin,
38	sync::Arc,
39	task::{Context, Poll},
40	time::Duration,
41};
42
43use codec::{Decode, DecodeAll, Encode};
44use finality_grandpa::{
45	voter,
46	voter_set::VoterSet,
47	Message::{Precommit, Prevote, PrimaryPropose},
48};
49use sc_network::{NetworkBlock, NetworkSyncForkRequest, NotificationService, ReputationChange};
50use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
51use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
52use sp_keystore::KeystorePtr;
53use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
54
55use crate::{
56	environment::HasVoted, CatchUp, Commit, CommunicationIn, CommunicationOutH, CompactCommit,
57	Error, Message, SignedMessage, LOG_TARGET,
58};
59use gossip::{
60	FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage,
61};
62use sc_network_sync::SyncEventStream;
63use sc_utils::mpsc::TracingUnboundedReceiver;
64use sp_consensus_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber};
65
66pub mod gossip;
67mod periodic;
68
69#[cfg(test)]
70pub(crate) mod tests;
71
72// How often to rebroadcast neighbor packets, in cases where no new packets are created.
73pub(crate) const NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(2 * 60);
74
75pub mod grandpa_protocol_name {
76	use sc_chain_spec::ChainSpec;
77	use sc_network::types::ProtocolName;
78
79	pub(crate) const NAME: &str = "/grandpa/1";
80	/// Old names for the notifications protocol, used for backward compatibility.
81	pub(crate) const LEGACY_NAMES: [&str; 1] = ["/paritytech/grandpa/1"];
82
83	/// Name of the notifications protocol used by GRANDPA.
84	///
85	/// Must be registered towards the networking in order for GRANDPA to properly function.
86	pub fn standard_name<Hash: AsRef<[u8]>>(
87		genesis_hash: &Hash,
88		chain_spec: &Box<dyn ChainSpec>,
89	) -> ProtocolName {
90		let genesis_hash = genesis_hash.as_ref();
91		let chain_prefix = match chain_spec.fork_id() {
92			Some(fork_id) => format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), fork_id),
93			None => format!("/{}", array_bytes::bytes2hex("", genesis_hash)),
94		};
95		format!("{}{}", chain_prefix, NAME).into()
96	}
97}
98
99// cost scalars for reporting peers.
100mod cost {
101	use sc_network::ReputationChange as Rep;
102	pub(super) const PAST_REJECTION: Rep = Rep::new(-50, "Grandpa: Past message");
103	pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "Grandpa: Bad signature");
104	pub(super) const MALFORMED_CATCH_UP: Rep = Rep::new(-1000, "Grandpa: Malformed cath-up");
105	pub(super) const MALFORMED_COMMIT: Rep = Rep::new(-1000, "Grandpa: Malformed commit");
106	pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Future message");
107	pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "Grandpa: Unknown voter");
108
109	pub(super) const INVALID_VIEW_CHANGE: Rep = Rep::new(-500, "Grandpa: Invalid view change");
110	pub(super) const DUPLICATE_NEIGHBOR_MESSAGE: Rep =
111		Rep::new(-500, "Grandpa: Duplicate neighbor message without grace period");
112	pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
113	pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
114	pub(super) const PER_BLOCK_LOADED: i32 = -10;
115	pub(super) const INVALID_CATCH_UP: Rep = Rep::new(-5000, "Grandpa: Invalid catch-up");
116	pub(super) const INVALID_COMMIT: Rep = Rep::new(-5000, "Grandpa: Invalid commit");
117	pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Out-of-scope message");
118	pub(super) const CATCH_UP_REQUEST_TIMEOUT: Rep =
119		Rep::new(-200, "Grandpa: Catch-up request timeout");
120
121	// cost of answering a catch up request
122	pub(super) const CATCH_UP_REPLY: Rep = Rep::new(-200, "Grandpa: Catch-up reply");
123	pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: Rep =
124		Rep::new(-200, "Grandpa: Out-of-scope catch-up");
125}
126
127// benefit scalars for reporting peers.
128mod benefit {
129	use sc_network::ReputationChange as Rep;
130	pub(super) const NEIGHBOR_MESSAGE: Rep = Rep::new(100, "Grandpa: Neighbor message");
131	pub(super) const ROUND_MESSAGE: Rep = Rep::new(100, "Grandpa: Round message");
132	pub(super) const BASIC_VALIDATED_CATCH_UP: Rep = Rep::new(200, "Grandpa: Catch-up message");
133	pub(super) const BASIC_VALIDATED_COMMIT: Rep = Rep::new(100, "Grandpa: Commit");
134	pub(super) const PER_EQUIVOCATION: i32 = 10;
135}
136
137/// A type that ties together our local authority id and a keystore where it is
138/// available for signing.
139pub struct LocalIdKeystore((AuthorityId, KeystorePtr));
140
141impl LocalIdKeystore {
142	/// Returns a reference to our local authority id.
143	fn local_id(&self) -> &AuthorityId {
144		&(self.0).0
145	}
146
147	/// Returns a reference to the keystore.
148	fn keystore(&self) -> KeystorePtr {
149		(self.0).1.clone()
150	}
151}
152
153impl From<(AuthorityId, KeystorePtr)> for LocalIdKeystore {
154	fn from(inner: (AuthorityId, KeystorePtr)) -> LocalIdKeystore {
155		LocalIdKeystore(inner)
156	}
157}
158
159/// If the voter set is larger than this value some telemetry events are not
160/// sent to avoid increasing usage resource on the node and flooding the
161/// telemetry server (e.g. received votes, received commits.)
162const TELEMETRY_VOTERS_LIMIT: usize = 10;
163
164/// A handle to the network.
165///
166/// Something that provides the capabilities needed for the `gossip_network::Network` trait.
167pub trait Network<Block: BlockT>: GossipNetwork<Block> + Clone + Send + 'static {}
168
169impl<Block, T> Network<Block> for T
170where
171	Block: BlockT,
172	T: GossipNetwork<Block> + Clone + Send + 'static,
173{
174}
175
176/// A handle to syncing-related services.
177///
178/// Something that provides the ability to set a fork sync request for a particular block.
179pub trait Syncing<Block: BlockT>:
180	NetworkSyncForkRequest<Block::Hash, NumberFor<Block>>
181	+ NetworkBlock<Block::Hash, NumberFor<Block>>
182	+ SyncEventStream
183	+ Clone
184	+ Send
185	+ 'static
186{
187}
188
189impl<Block, T> Syncing<Block> for T
190where
191	Block: BlockT,
192	T: NetworkSyncForkRequest<Block::Hash, NumberFor<Block>>
193		+ NetworkBlock<Block::Hash, NumberFor<Block>>
194		+ SyncEventStream
195		+ Clone
196		+ Send
197		+ 'static,
198{
199}
200
201/// Create a unique topic for a round and set-id combo.
202pub(crate) fn round_topic<B: BlockT>(round: RoundNumber, set_id: SetIdNumber) -> B::Hash {
203	<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
204}
205
206/// Create a unique topic for global messages on a set ID.
207pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
208	<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes())
209}
210
211/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
212pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>, S: Syncing<B>> {
213	service: N,
214	sync: S,
215	gossip_engine: Arc<Mutex<GossipEngine<B>>>,
216	validator: Arc<GossipValidator<B>>,
217
218	/// Sender side of the neighbor packet channel.
219	///
220	/// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to
221	/// the underlying `GossipEngine`.
222	neighbor_sender: periodic::NeighborPacketSender<B>,
223
224	/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
225	// `NetworkBridge` is required to be clonable, thus one needs to be able to clone its
226	// children, thus one has to wrap `neighbor_packet_worker` with an `Arc` `Mutex`.
227	neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,
228
229	/// Receiver side of the peer report stream populated by the gossip validator, forwarded to the
230	/// gossip engine.
231	// `NetworkBridge` is required to be clonable, thus one needs to be able to clone its
232	// children, thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given
233	// that it is just an `UnboundedReceiver`, one could also switch to a
234	// multi-producer-*multi*-consumer channel implementation.
235	gossip_validator_report_stream: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,
236
237	telemetry: Option<TelemetryHandle>,
238}
239
240impl<B: BlockT, N: Network<B>, S: Syncing<B>> Unpin for NetworkBridge<B, N, S> {}
241
242impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
243	/// Create a new NetworkBridge to the given NetworkService. Returns the service
244	/// handle.
245	/// On creation it will register previous rounds' votes with the gossip
246	/// service taken from the VoterSetState.
247	pub(crate) fn new(
248		service: N,
249		sync: S,
250		notification_service: Box<dyn NotificationService>,
251		config: crate::Config,
252		set_state: crate::environment::SharedVoterSetState<B>,
253		prometheus_registry: Option<&Registry>,
254		telemetry: Option<TelemetryHandle>,
255	) -> Self {
256		let protocol = config.protocol_name.clone();
257		let (validator, report_stream) =
258			GossipValidator::new(config, set_state.clone(), prometheus_registry, telemetry.clone());
259
260		let validator = Arc::new(validator);
261		let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
262			service.clone(),
263			sync.clone(),
264			notification_service,
265			protocol,
266			validator.clone(),
267			prometheus_registry,
268		)));
269
270		{
271			// register all previous votes with the gossip service so that they're
272			// available to peers potentially stuck on a previous round.
273			let completed = set_state.read().completed_rounds();
274			let (set_id, voters) = completed.set_info();
275			validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {});
276			for round in completed.iter() {
277				let topic = round_topic::<B>(round.number, set_id);
278
279				// we need to note the round with the gossip validator otherwise
280				// messages will be ignored.
281				validator.note_round(Round(round.number), |_, _| {});
282
283				for signed in round.votes.iter() {
284					let message = gossip::GossipMessage::Vote(gossip::VoteMessage::<B> {
285						message: signed.clone(),
286						round: Round(round.number),
287						set_id: SetId(set_id),
288					});
289
290					gossip_engine.lock().register_gossip_message(topic, message.encode());
291				}
292
293				trace!(
294					target: LOG_TARGET,
295					"Registered {} messages for topic {:?} (round: {}, set_id: {})",
296					round.votes.len(),
297					topic,
298					round.number,
299					set_id,
300				);
301			}
302		}
303
304		let (neighbor_packet_worker, neighbor_packet_sender) =
305			periodic::NeighborPacketWorker::new(NEIGHBOR_REBROADCAST_PERIOD);
306
307		NetworkBridge {
308			service,
309			sync,
310			gossip_engine,
311			validator,
312			neighbor_sender: neighbor_packet_sender,
313			neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
314			gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)),
315			telemetry,
316		}
317	}
318
319	/// Note the beginning of a new round to the `GossipValidator`.
320	pub(crate) fn note_round(&self, round: Round, set_id: SetId, voters: &VoterSet<AuthorityId>) {
321		// is a no-op if currently in that set.
322		self.validator.note_set(
323			set_id,
324			voters.iter().map(|(v, _)| v.clone()).collect(),
325			|to, neighbor| self.neighbor_sender.send(to, neighbor),
326		);
327
328		self.validator
329			.note_round(round, |to, neighbor| self.neighbor_sender.send(to, neighbor));
330	}
331
332	/// Get a stream of signature-checked round messages from the network as well as a sink for
333	/// round messages to the network all within the current set.
334	pub(crate) fn round_communication(
335		&self,
336		keystore: Option<LocalIdKeystore>,
337		round: Round,
338		set_id: SetId,
339		voters: Arc<VoterSet<AuthorityId>>,
340		has_voted: HasVoted<B::Header>,
341	) -> (impl Stream<Item = SignedMessage<B::Header>> + Unpin, OutgoingMessages<B>) {
342		self.note_round(round, set_id, &voters);
343
344		let keystore = keystore.and_then(|ks| {
345			let id = ks.local_id();
346			if voters.contains(id) {
347				Some(ks)
348			} else {
349				None
350			}
351		});
352
353		let topic = round_topic::<B>(round.0, set_id.0);
354		let telemetry = self.telemetry.clone();
355		let incoming =
356			self.gossip_engine.lock().messages_for(topic).filter_map(move |notification| {
357				let decoded = GossipMessage::<B>::decode_all(&mut &notification.message[..]);
358
359				match decoded {
360					Err(ref e) => {
361						debug!(
362							target: LOG_TARGET,
363							"Skipping malformed message {:?}: {}", notification, e
364						);
365						future::ready(None)
366					},
367					Ok(GossipMessage::Vote(msg)) => {
368						// check signature.
369						if !voters.contains(&msg.message.id) {
370							debug!(
371								target: LOG_TARGET,
372								"Skipping message from unknown voter {}", msg.message.id
373							);
374							return future::ready(None)
375						}
376
377						if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
378							match &msg.message.message {
379								PrimaryPropose(propose) => {
380									telemetry!(
381										telemetry;
382										CONSENSUS_INFO;
383										"afg.received_propose";
384										"voter" => ?format!("{}", msg.message.id),
385										"target_number" => ?propose.target_number,
386										"target_hash" => ?propose.target_hash,
387									);
388								},
389								Prevote(prevote) => {
390									telemetry!(
391										telemetry;
392										CONSENSUS_INFO;
393										"afg.received_prevote";
394										"voter" => ?format!("{}", msg.message.id),
395										"target_number" => ?prevote.target_number,
396										"target_hash" => ?prevote.target_hash,
397									);
398								},
399								Precommit(precommit) => {
400									telemetry!(
401										telemetry;
402										CONSENSUS_INFO;
403										"afg.received_precommit";
404										"voter" => ?format!("{}", msg.message.id),
405										"target_number" => ?precommit.target_number,
406										"target_hash" => ?precommit.target_hash,
407									);
408								},
409							};
410						}
411
412						future::ready(Some(msg.message))
413					},
414					_ => {
415						debug!(target: LOG_TARGET, "Skipping unknown message type");
416						future::ready(None)
417					},
418				}
419			});
420
421		let (tx, out_rx) = mpsc::channel(0);
422		let outgoing = OutgoingMessages::<B> {
423			keystore,
424			round: round.0,
425			set_id: set_id.0,
426			network: self.gossip_engine.clone(),
427			sender: tx,
428			has_voted,
429			telemetry: self.telemetry.clone(),
430		};
431
432		// Combine incoming votes from external GRANDPA nodes with outgoing
433		// votes from our own GRANDPA voter to have a single
434		// vote-import-pipeline.
435		let incoming = stream::select(incoming, out_rx);
436
437		(incoming, outgoing)
438	}
439
440	/// Set up the global communication streams.
441	pub(crate) fn global_communication(
442		&self,
443		set_id: SetId,
444		voters: Arc<VoterSet<AuthorityId>>,
445		is_voter: bool,
446	) -> (
447		impl Stream<Item = CommunicationIn<B>>,
448		impl Sink<CommunicationOutH<B, B::Hash>, Error = Error> + Unpin,
449	) {
450		self.validator.note_set(
451			set_id,
452			voters.iter().map(|(v, _)| v.clone()).collect(),
453			|to, neighbor| self.neighbor_sender.send(to, neighbor),
454		);
455
456		let topic = global_topic::<B>(set_id.0);
457		let incoming = incoming_global(
458			self.gossip_engine.clone(),
459			topic,
460			voters,
461			self.validator.clone(),
462			self.neighbor_sender.clone(),
463			self.telemetry.clone(),
464		);
465
466		let outgoing = CommitsOut::<B>::new(
467			self.gossip_engine.clone(),
468			set_id.0,
469			is_voter,
470			self.validator.clone(),
471			self.neighbor_sender.clone(),
472			self.telemetry.clone(),
473		);
474
475		let outgoing = outgoing.with(|out| {
476			let voter::CommunicationOut::Commit(round, commit) = out;
477			future::ok((round, commit))
478		});
479
480		(incoming, outgoing)
481	}
482
483	/// Notifies the sync service to try and sync the given block from the given
484	/// peers.
485	///
486	/// If the given vector of peers is empty then the underlying implementation
487	/// should make a best effort to fetch the block from any peers it is
488	/// connected to (NOTE: this assumption will change in the future #3629).
489	pub(crate) fn set_sync_fork_request(
490		&self,
491		peers: Vec<sc_network_types::PeerId>,
492		hash: B::Hash,
493		number: NumberFor<B>,
494	) {
495		self.sync.set_sync_fork_request(peers, hash, number)
496	}
497}
498
499impl<B: BlockT, N: Network<B>, S: Syncing<B>> Future for NetworkBridge<B, N, S> {
500	type Output = Result<(), Error>;
501
502	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
503		loop {
504			match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
505				Poll::Ready(Some((to, packet))) => {
506					self.gossip_engine.lock().send_message(to, packet.encode());
507				},
508				Poll::Ready(None) =>
509					return Poll::Ready(Err(Error::Network(
510						"Neighbor packet worker stream closed.".into(),
511					))),
512				Poll::Pending => break,
513			}
514		}
515
516		loop {
517			match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
518				Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
519					self.gossip_engine.lock().report(who, cost_benefit);
520				},
521				Poll::Ready(None) =>
522					return Poll::Ready(Err(Error::Network(
523						"Gossip validator report stream closed.".into(),
524					))),
525				Poll::Pending => break,
526			}
527		}
528
529		match self.gossip_engine.lock().poll_unpin(cx) {
530			Poll::Ready(()) =>
531				return Poll::Ready(Err(Error::Network("Gossip engine future finished.".into()))),
532			Poll::Pending => {},
533		}
534
535		Poll::Pending
536	}
537}
538
539fn incoming_global<B: BlockT>(
540	gossip_engine: Arc<Mutex<GossipEngine<B>>>,
541	topic: B::Hash,
542	voters: Arc<VoterSet<AuthorityId>>,
543	gossip_validator: Arc<GossipValidator<B>>,
544	neighbor_sender: periodic::NeighborPacketSender<B>,
545	telemetry: Option<TelemetryHandle>,
546) -> impl Stream<Item = CommunicationIn<B>> {
547	let process_commit = {
548		let telemetry = telemetry.clone();
549		move |msg: FullCommitMessage<B>,
550		      mut notification: sc_network_gossip::TopicNotification,
551		      gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
552		      gossip_validator: &Arc<GossipValidator<B>>,
553		      voters: &VoterSet<AuthorityId>| {
554			if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
555				let precommits_signed_by: Vec<String> =
556					msg.message.auth_data.iter().map(move |(_, a)| format!("{}", a)).collect();
557
558				telemetry!(
559					telemetry;
560					CONSENSUS_INFO;
561					"afg.received_commit";
562					"contains_precommits_signed_by" => ?precommits_signed_by,
563					"target_number" => ?msg.message.target_number.clone(),
564					"target_hash" => ?msg.message.target_hash.clone(),
565				);
566			}
567
568			if let Err(cost) = check_compact_commit::<B>(
569				&msg.message,
570				voters,
571				msg.round,
572				msg.set_id,
573				telemetry.as_ref(),
574			) {
575				if let Some(who) = notification.sender {
576					gossip_engine.lock().report(who, cost);
577				}
578
579				return None
580			}
581
582			let round = msg.round;
583			let set_id = msg.set_id;
584			let commit = msg.message;
585			let finalized_number = commit.target_number;
586			let gossip_validator = gossip_validator.clone();
587			let gossip_engine = gossip_engine.clone();
588			let neighbor_sender = neighbor_sender.clone();
589			let cb = move |outcome| match outcome {
590				voter::CommitProcessingOutcome::Good(_) => {
591					// if it checks out, gossip it. not accounting for
592					// any discrepancy between the actual ghost and the claimed
593					// finalized number.
594					gossip_validator.note_commit_finalized(
595						round,
596						set_id,
597						finalized_number,
598						|to, neighbor| neighbor_sender.send(to, neighbor),
599					);
600
601					gossip_engine.lock().gossip_message(topic, notification.message.clone(), false);
602				},
603				voter::CommitProcessingOutcome::Bad(_) => {
604					// report peer and do not gossip.
605					if let Some(who) = notification.sender.take() {
606						gossip_engine.lock().report(who, cost::INVALID_COMMIT);
607					}
608				},
609			};
610
611			let cb = voter::Callback::Work(Box::new(cb));
612
613			Some(voter::CommunicationIn::Commit(round.0, commit, cb))
614		}
615	};
616
617	let process_catch_up = move |msg: FullCatchUpMessage<B>,
618	                             mut notification: sc_network_gossip::TopicNotification,
619	                             gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
620	                             gossip_validator: &Arc<GossipValidator<B>>,
621	                             voters: &VoterSet<AuthorityId>| {
622		let gossip_validator = gossip_validator.clone();
623		let gossip_engine = gossip_engine.clone();
624
625		if let Err(cost) = check_catch_up::<B>(&msg.message, voters, msg.set_id, telemetry.clone())
626		{
627			if let Some(who) = notification.sender {
628				gossip_engine.lock().report(who, cost);
629			}
630
631			return None
632		}
633
634		let cb = move |outcome| {
635			if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
636				// report peer
637				if let Some(who) = notification.sender.take() {
638					gossip_engine.lock().report(who, cost::INVALID_CATCH_UP);
639				}
640			}
641
642			gossip_validator.note_catch_up_message_processed();
643		};
644
645		let cb = voter::Callback::Work(Box::new(cb));
646
647		Some(voter::CommunicationIn::CatchUp(msg.message, cb))
648	};
649
650	gossip_engine
651		.clone()
652		.lock()
653		.messages_for(topic)
654		.filter_map(|notification| {
655			// this could be optimized by decoding piecewise.
656			let decoded = GossipMessage::<B>::decode_all(&mut &notification.message[..]);
657			if let Err(ref e) = decoded {
658				trace!(
659					target: LOG_TARGET,
660					"Skipping malformed commit message {:?}: {}",
661					notification,
662					e
663				);
664			}
665			future::ready(decoded.map(move |d| (notification, d)).ok())
666		})
667		.filter_map(move |(notification, msg)| {
668			future::ready(match msg {
669				GossipMessage::Commit(msg) =>
670					process_commit(msg, notification, &gossip_engine, &gossip_validator, &voters),
671				GossipMessage::CatchUp(msg) =>
672					process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &voters),
673				_ => {
674					debug!(target: LOG_TARGET, "Skipping unknown message type");
675					None
676				},
677			})
678		})
679}
680
681impl<B: BlockT, N: Network<B>, S: Syncing<B>> Clone for NetworkBridge<B, N, S> {
682	fn clone(&self) -> Self {
683		NetworkBridge {
684			service: self.service.clone(),
685			sync: self.sync.clone(),
686			gossip_engine: self.gossip_engine.clone(),
687			validator: Arc::clone(&self.validator),
688			neighbor_sender: self.neighbor_sender.clone(),
689			neighbor_packet_worker: self.neighbor_packet_worker.clone(),
690			gossip_validator_report_stream: self.gossip_validator_report_stream.clone(),
691			telemetry: self.telemetry.clone(),
692		}
693	}
694}
695
696/// Type-safe wrapper around a round number.
697#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)]
698pub struct Round(pub RoundNumber);
699
700/// Type-safe wrapper around a set ID.
701#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)]
702pub struct SetId(pub SetIdNumber);
703
704/// A sink for outgoing messages to the network. Any messages that are sent will
705/// be replaced, as appropriate, according to the given `HasVoted`.
706/// NOTE: The votes are stored unsigned, which means that the signatures need to
707/// be "stable", i.e. we should end up with the exact same signed message if we
708/// use the same raw message and key to sign. This is currently true for
709/// `ed25519` and `BLS` signatures (which we might use in the future), care must
710/// be taken when switching to different key types.
711pub(crate) struct OutgoingMessages<Block: BlockT> {
712	round: RoundNumber,
713	set_id: SetIdNumber,
714	keystore: Option<LocalIdKeystore>,
715	sender: mpsc::Sender<SignedMessage<Block::Header>>,
716	network: Arc<Mutex<GossipEngine<Block>>>,
717	has_voted: HasVoted<Block::Header>,
718	telemetry: Option<TelemetryHandle>,
719}
720
721impl<B: BlockT> Unpin for OutgoingMessages<B> {}
722
723impl<Block: BlockT> Sink<Message<Block::Header>> for OutgoingMessages<Block> {
724	type Error = Error;
725
726	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
727		Sink::poll_ready(Pin::new(&mut self.sender), cx).map(|elem| {
728			elem.map_err(|e| {
729				Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
730			})
731		})
732	}
733
734	fn start_send(
735		mut self: Pin<&mut Self>,
736		mut msg: Message<Block::Header>,
737	) -> Result<(), Self::Error> {
738		// if we've voted on this round previously under the same key, send that vote instead
739		match &mut msg {
740			finality_grandpa::Message::PrimaryPropose(ref mut vote) => {
741				if let Some(propose) = self.has_voted.propose() {
742					*vote = propose.clone();
743				}
744			},
745			finality_grandpa::Message::Prevote(ref mut vote) => {
746				if let Some(prevote) = self.has_voted.prevote() {
747					*vote = prevote.clone();
748				}
749			},
750			finality_grandpa::Message::Precommit(ref mut vote) => {
751				if let Some(precommit) = self.has_voted.precommit() {
752					*vote = precommit.clone();
753				}
754			},
755		}
756
757		// when locals exist, sign messages on import
758		if let Some(ref keystore) = self.keystore {
759			let target_hash = *(msg.target().0);
760			let signed = sp_consensus_grandpa::sign_message(
761				keystore.keystore(),
762				msg,
763				keystore.local_id().clone(),
764				self.round,
765				self.set_id,
766			)
767			.ok_or_else(|| {
768				Error::Signing(format!(
769					"Failed to sign GRANDPA vote for round {} targeting {:?}",
770					self.round, target_hash
771				))
772			})?;
773
774			let message = GossipMessage::Vote(VoteMessage::<Block> {
775				message: signed.clone(),
776				round: Round(self.round),
777				set_id: SetId(self.set_id),
778			});
779
780			debug!(
781				target: LOG_TARGET,
782				"Announcing block {} to peers which we voted on in round {} in set {}",
783				target_hash,
784				self.round,
785				self.set_id,
786			);
787
788			telemetry!(
789				self.telemetry;
790				CONSENSUS_DEBUG;
791				"afg.announcing_blocks_to_voted_peers";
792				"block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
793			);
794
795			// announce the block we voted on to our peers.
796			self.network.lock().announce(target_hash, None);
797
798			// propagate the message to peers
799			let topic = round_topic::<Block>(self.round, self.set_id);
800			self.network.lock().gossip_message(topic, message.encode(), false);
801
802			// forward the message to the inner sender.
803			return self.sender.start_send(signed).map_err(|e| {
804				Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
805			})
806		};
807
808		Ok(())
809	}
810
811	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
812		Poll::Ready(Ok(()))
813	}
814
815	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
816		Sink::poll_close(Pin::new(&mut self.sender), cx).map(|elem| {
817			elem.map_err(|e| {
818				Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
819			})
820		})
821	}
822}
823
824// checks a compact commit. returns the cost associated with processing it if
825// the commit was bad.
826fn check_compact_commit<Block: BlockT>(
827	msg: &CompactCommit<Block::Header>,
828	voters: &VoterSet<AuthorityId>,
829	round: Round,
830	set_id: SetId,
831	telemetry: Option<&TelemetryHandle>,
832) -> Result<(), ReputationChange> {
833	// 4f + 1 = equivocations from f voters.
834	let f = voters.total_weight() - voters.threshold();
835	let full_threshold = (f + voters.total_weight()).0;
836
837	// check total weight is not out of range.
838	let mut total_weight = 0;
839	for (_, ref id) in &msg.auth_data {
840		if let Some(weight) = voters.get(id).map(|info| info.weight()) {
841			total_weight += weight.get();
842			if total_weight > full_threshold {
843				return Err(cost::MALFORMED_COMMIT)
844			}
845		} else {
846			debug!(target: LOG_TARGET, "Skipping commit containing unknown voter {}", id);
847			return Err(cost::MALFORMED_COMMIT)
848		}
849	}
850
851	if total_weight < voters.threshold().get() {
852		return Err(cost::MALFORMED_COMMIT)
853	}
854
855	// check signatures on all contained precommits.
856	let mut buf = Vec::new();
857	for (i, (precommit, (sig, id))) in msg.precommits.iter().zip(&msg.auth_data).enumerate() {
858		use crate::communication::gossip::Misbehavior;
859		use finality_grandpa::Message as GrandpaMessage;
860
861		if !sp_consensus_grandpa::check_message_signature_with_buffer(
862			&GrandpaMessage::Precommit(precommit.clone()),
863			id,
864			sig,
865			round.0,
866			set_id.0,
867			&mut buf,
868		)
869		.is_valid()
870		{
871			debug!(target: LOG_TARGET, "Bad commit message signature {}", id);
872			telemetry!(
873				telemetry;
874				CONSENSUS_DEBUG;
875				"afg.bad_commit_msg_signature";
876				"id" => ?id,
877			);
878			let cost = Misbehavior::BadCommitMessage {
879				signatures_checked: i as i32,
880				blocks_loaded: 0,
881				equivocations_caught: 0,
882			}
883			.cost();
884
885			return Err(cost)
886		}
887	}
888
889	Ok(())
890}
891
892// checks a catch up. returns the cost associated with processing it if
893// the catch up was bad.
894fn check_catch_up<Block: BlockT>(
895	msg: &CatchUp<Block::Header>,
896	voters: &VoterSet<AuthorityId>,
897	set_id: SetId,
898	telemetry: Option<TelemetryHandle>,
899) -> Result<(), ReputationChange> {
900	// 4f + 1 = equivocations from f voters.
901	let f = voters.total_weight() - voters.threshold();
902	let full_threshold = (f + voters.total_weight()).0;
903
904	// check total weight is not out of range for a set of votes.
905	fn check_weight<'a>(
906		voters: &'a VoterSet<AuthorityId>,
907		votes: impl Iterator<Item = &'a AuthorityId>,
908		full_threshold: u64,
909	) -> Result<(), ReputationChange> {
910		let mut total_weight = 0;
911
912		for id in votes {
913			if let Some(weight) = voters.get(id).map(|info| info.weight()) {
914				total_weight += weight.get();
915				if total_weight > full_threshold {
916					return Err(cost::MALFORMED_CATCH_UP)
917				}
918			} else {
919				debug!(
920					target: LOG_TARGET,
921					"Skipping catch up message containing unknown voter {}", id
922				);
923				return Err(cost::MALFORMED_CATCH_UP)
924			}
925		}
926
927		if total_weight < voters.threshold().get() {
928			return Err(cost::MALFORMED_CATCH_UP)
929		}
930
931		Ok(())
932	}
933
934	check_weight(voters, msg.prevotes.iter().map(|vote| &vote.id), full_threshold)?;
935
936	check_weight(voters, msg.precommits.iter().map(|vote| &vote.id), full_threshold)?;
937
938	fn check_signatures<'a, B, I>(
939		messages: I,
940		round: RoundNumber,
941		set_id: SetIdNumber,
942		mut signatures_checked: usize,
943		buf: &mut Vec<u8>,
944		telemetry: Option<TelemetryHandle>,
945	) -> Result<usize, ReputationChange>
946	where
947		B: BlockT,
948		I: Iterator<Item = (Message<B::Header>, &'a AuthorityId, &'a AuthoritySignature)>,
949	{
950		use crate::communication::gossip::Misbehavior;
951
952		for (msg, id, sig) in messages {
953			signatures_checked += 1;
954
955			if !sp_consensus_grandpa::check_message_signature_with_buffer(
956				&msg, id, sig, round, set_id, buf,
957			)
958			.is_valid()
959			{
960				debug!(target: LOG_TARGET, "Bad catch up message signature {}", id);
961				telemetry!(
962					telemetry;
963					CONSENSUS_DEBUG;
964					"afg.bad_catch_up_msg_signature";
965					"id" => ?id,
966				);
967
968				let cost = Misbehavior::BadCatchUpMessage {
969					signatures_checked: signatures_checked as i32,
970				}
971				.cost();
972
973				return Err(cost)
974			}
975		}
976
977		Ok(signatures_checked)
978	}
979
980	let mut buf = Vec::new();
981
982	// check signatures on all contained prevotes.
983	let signatures_checked = check_signatures::<Block, _>(
984		msg.prevotes.iter().map(|vote| {
985			(finality_grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature)
986		}),
987		msg.round_number,
988		set_id.0,
989		0,
990		&mut buf,
991		telemetry.clone(),
992	)?;
993
994	// check signatures on all contained precommits.
995	check_signatures::<Block, _>(
996		msg.precommits.iter().map(|vote| {
997			(
998				finality_grandpa::Message::Precommit(vote.precommit.clone()),
999				&vote.id,
1000				&vote.signature,
1001			)
1002		}),
1003		msg.round_number,
1004		set_id.0,
1005		signatures_checked,
1006		&mut buf,
1007		telemetry,
1008	)?;
1009
1010	Ok(())
1011}
1012
1013/// An output sink for commit messages.
1014struct CommitsOut<Block: BlockT> {
1015	network: Arc<Mutex<GossipEngine<Block>>>,
1016	set_id: SetId,
1017	is_voter: bool,
1018	gossip_validator: Arc<GossipValidator<Block>>,
1019	neighbor_sender: periodic::NeighborPacketSender<Block>,
1020	telemetry: Option<TelemetryHandle>,
1021}
1022
1023impl<Block: BlockT> CommitsOut<Block> {
1024	/// Create a new commit output stream.
1025	pub(crate) fn new(
1026		network: Arc<Mutex<GossipEngine<Block>>>,
1027		set_id: SetIdNumber,
1028		is_voter: bool,
1029		gossip_validator: Arc<GossipValidator<Block>>,
1030		neighbor_sender: periodic::NeighborPacketSender<Block>,
1031		telemetry: Option<TelemetryHandle>,
1032	) -> Self {
1033		CommitsOut {
1034			network,
1035			set_id: SetId(set_id),
1036			is_voter,
1037			gossip_validator,
1038			neighbor_sender,
1039			telemetry,
1040		}
1041	}
1042}
1043
1044impl<Block: BlockT> Sink<(RoundNumber, Commit<Block::Header>)> for CommitsOut<Block> {
1045	type Error = Error;
1046
1047	fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1048		Poll::Ready(Ok(()))
1049	}
1050
1051	fn start_send(
1052		self: Pin<&mut Self>,
1053		input: (RoundNumber, Commit<Block::Header>),
1054	) -> Result<(), Self::Error> {
1055		if !self.is_voter {
1056			return Ok(())
1057		}
1058
1059		let (round, commit) = input;
1060		let round = Round(round);
1061
1062		telemetry!(
1063			self.telemetry;
1064			CONSENSUS_DEBUG;
1065			"afg.commit_issued";
1066			"target_number" => ?commit.target_number,
1067			"target_hash" => ?commit.target_hash,
1068		);
1069		let (precommits, auth_data) = commit
1070			.precommits
1071			.into_iter()
1072			.map(|signed| (signed.precommit, (signed.signature, signed.id)))
1073			.unzip();
1074
1075		let compact_commit = CompactCommit::<Block::Header> {
1076			target_hash: commit.target_hash,
1077			target_number: commit.target_number,
1078			precommits,
1079			auth_data,
1080		};
1081
1082		let message = GossipMessage::Commit(FullCommitMessage::<Block> {
1083			round,
1084			set_id: self.set_id,
1085			message: compact_commit,
1086		});
1087
1088		let topic = global_topic::<Block>(self.set_id.0);
1089
1090		// the gossip validator needs to be made aware of the best commit-height we know of
1091		// before gossiping
1092		self.gossip_validator.note_commit_finalized(
1093			round,
1094			self.set_id,
1095			commit.target_number,
1096			|to, neighbor| self.neighbor_sender.send(to, neighbor),
1097		);
1098		self.network.lock().gossip_message(topic, message.encode(), false);
1099
1100		Ok(())
1101	}
1102
1103	fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1104		Poll::Ready(Ok(()))
1105	}
1106
1107	fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1108		Poll::Ready(Ok(()))
1109	}
1110}