1use futures::{channel::mpsc, prelude::*};
33use log::{debug, trace};
34use parking_lot::Mutex;
35use prometheus_endpoint::Registry;
36use std::{
37 pin::Pin,
38 sync::Arc,
39 task::{Context, Poll},
40 time::Duration,
41};
42
43use codec::{Decode, DecodeAll, Encode};
44use finality_grandpa::{
45 voter,
46 voter_set::VoterSet,
47 Message::{Precommit, Prevote, PrimaryPropose},
48};
49use sc_network::{NetworkBlock, NetworkSyncForkRequest, NotificationService, ReputationChange};
50use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
51use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
52use sp_keystore::KeystorePtr;
53use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
54
55use crate::{
56 environment::HasVoted, CatchUp, Commit, CommunicationIn, CommunicationOutH, CompactCommit,
57 Error, Message, SignedMessage, LOG_TARGET,
58};
59use gossip::{
60 FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage,
61};
62use sc_network_sync::SyncEventStream;
63use sc_utils::mpsc::TracingUnboundedReceiver;
64use sp_consensus_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber};
65
66pub mod gossip;
67mod periodic;
68
69#[cfg(test)]
70pub(crate) mod tests;
71
/// Period (two minutes) handed to `periodic::NeighborPacketWorker::new` when a
/// `NetworkBridge` is constructed.
///
/// NOTE(review): presumably the interval after which the last neighbor packet is
/// rebroadcast; the worker itself lives in the `periodic` module.
pub(crate) const NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(2 * 60);
74
75pub mod grandpa_protocol_name {
76 use sc_chain_spec::ChainSpec;
77 use sc_network::types::ProtocolName;
78
79 pub(crate) const NAME: &str = "/grandpa/1";
80 pub(crate) const LEGACY_NAMES: [&str; 1] = ["/paritytech/grandpa/1"];
82
83 pub fn standard_name<Hash: AsRef<[u8]>>(
87 genesis_hash: &Hash,
88 chain_spec: &Box<dyn ChainSpec>,
89 ) -> ProtocolName {
90 let genesis_hash = genesis_hash.as_ref();
91 let chain_prefix = match chain_spec.fork_id() {
92 Some(fork_id) => format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), fork_id),
93 None => format!("/{}", array_bytes::bytes2hex("", genesis_hash)),
94 };
95 format!("{}{}", chain_prefix, NAME).into()
96 }
97}
98
99mod cost {
101 use sc_network::ReputationChange as Rep;
102 pub(super) const PAST_REJECTION: Rep = Rep::new(-50, "Grandpa: Past message");
103 pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "Grandpa: Bad signature");
104 pub(super) const MALFORMED_CATCH_UP: Rep = Rep::new(-1000, "Grandpa: Malformed cath-up");
105 pub(super) const MALFORMED_COMMIT: Rep = Rep::new(-1000, "Grandpa: Malformed commit");
106 pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Future message");
107 pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "Grandpa: Unknown voter");
108
109 pub(super) const INVALID_VIEW_CHANGE: Rep = Rep::new(-500, "Grandpa: Invalid view change");
110 pub(super) const DUPLICATE_NEIGHBOR_MESSAGE: Rep =
111 Rep::new(-500, "Grandpa: Duplicate neighbor message without grace period");
112 pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
113 pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
114 pub(super) const PER_BLOCK_LOADED: i32 = -10;
115 pub(super) const INVALID_CATCH_UP: Rep = Rep::new(-5000, "Grandpa: Invalid catch-up");
116 pub(super) const INVALID_COMMIT: Rep = Rep::new(-5000, "Grandpa: Invalid commit");
117 pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Out-of-scope message");
118 pub(super) const CATCH_UP_REQUEST_TIMEOUT: Rep =
119 Rep::new(-200, "Grandpa: Catch-up request timeout");
120
121 pub(super) const CATCH_UP_REPLY: Rep = Rep::new(-200, "Grandpa: Catch-up reply");
123 pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: Rep =
124 Rep::new(-200, "Grandpa: Out-of-scope catch-up");
125}
126
/// Reputation rewards granted to peers for useful gossip.
///
/// Every `Rep` constant pairs a positive reputation delta with a human-readable
/// reason.
mod benefit {
    use sc_network::ReputationChange as Rep;
    pub(super) const NEIGHBOR_MESSAGE: Rep = Rep::new(100, "Grandpa: Neighbor message");
    pub(super) const ROUND_MESSAGE: Rep = Rep::new(100, "Grandpa: Round message");
    pub(super) const BASIC_VALIDATED_CATCH_UP: Rep = Rep::new(200, "Grandpa: Catch-up message");
    pub(super) const BASIC_VALIDATED_COMMIT: Rep = Rep::new(100, "Grandpa: Commit");
    /// Per-unit reward for an equivocation; the multiplication happens outside this file.
    pub(super) const PER_EQUIVOCATION: i32 = 10;
}
136
137pub struct LocalIdKeystore((AuthorityId, KeystorePtr));
140
141impl LocalIdKeystore {
142 fn local_id(&self) -> &AuthorityId {
144 &(self.0).0
145 }
146
147 fn keystore(&self) -> KeystorePtr {
149 (self.0).1.clone()
150 }
151}
152
153impl From<(AuthorityId, KeystorePtr)> for LocalIdKeystore {
154 fn from(inner: (AuthorityId, KeystorePtr)) -> LocalIdKeystore {
155 LocalIdKeystore(inner)
156 }
157}
158
/// Largest voter-set size for which telemetry about received votes and commits is
/// still emitted; larger sets skip those telemetry events entirely.
const TELEMETRY_VOTERS_LIMIT: usize = 10;
163
/// Shorthand for the bounds this module requires from a network handle: a
/// `GossipNetwork` that is `Clone + Send + 'static`.
///
/// The trait has no items of its own.
pub trait Network<Block: BlockT>: GossipNetwork<Block> + Clone + Send + 'static {}

// Blanket impl: every type that satisfies the bounds is a `Network`.
impl<Block, T> Network<Block> for T
where
    Block: BlockT,
    T: GossipNetwork<Block> + Clone + Send + 'static,
{
}
175
/// Shorthand for the bounds this module requires from a syncing handle: fork
/// requests, the `NetworkBlock` interface and a sync event stream, on a type that
/// is `Clone + Send + 'static`.
///
/// The trait has no items of its own.
pub trait Syncing<Block: BlockT>:
    NetworkSyncForkRequest<Block::Hash, NumberFor<Block>>
    + NetworkBlock<Block::Hash, NumberFor<Block>>
    + SyncEventStream
    + Clone
    + Send
    + 'static
{
}

// Blanket impl: every type that satisfies the bounds is a `Syncing`.
impl<Block, T> Syncing<Block> for T
where
    Block: BlockT,
    T: NetworkSyncForkRequest<Block::Hash, NumberFor<Block>>
        + NetworkBlock<Block::Hash, NumberFor<Block>>
        + SyncEventStream
        + Clone
        + Send
        + 'static,
{
}
200
201pub(crate) fn round_topic<B: BlockT>(round: RoundNumber, set_id: SetIdNumber) -> B::Hash {
203 <<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
204}
205
206pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
208 <<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes())
209}
210
/// Connects the GRANDPA voter to the gossip network.
///
/// The bridge is also a `Future` (see its `Future` impl): polling it forwards
/// neighbor packets and peer reports to the gossip engine and drives the engine.
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>, S: Syncing<B>> {
    // Network handle; a clone is handed to the gossip engine on construction.
    service: N,
    // Syncing handle; used for fork requests and cloned into the gossip engine.
    sync: S,
    // Gossip engine shared with the streams and sinks this bridge hands out.
    gossip_engine: Arc<Mutex<GossipEngine<B>>>,
    // Validator that is told about sets, rounds and finalized commits.
    validator: Arc<GossipValidator<B>>,

    // Sending half for neighbor packets; used by the callbacks passed to the validator.
    neighbor_sender: periodic::NeighborPacketSender<B>,

    // Stream of `(recipients, packet)` pairs that `poll` sends out through the
    // gossip engine.
    neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,

    // Peer reports produced by the validator; `poll` forwards them to the gossip
    // engine's `report`.
    gossip_validator_report_stream: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,

    // Optional telemetry handle, cloned into the streams and sinks created here.
    telemetry: Option<TelemetryHandle>,
}
239
// Declares the bridge `Unpin` for every choice of type parameters.
impl<B: BlockT, N: Network<B>, S: Syncing<B>> Unpin for NetworkBridge<B, N, S> {}
241
242impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
243 pub(crate) fn new(
248 service: N,
249 sync: S,
250 notification_service: Box<dyn NotificationService>,
251 config: crate::Config,
252 set_state: crate::environment::SharedVoterSetState<B>,
253 prometheus_registry: Option<&Registry>,
254 telemetry: Option<TelemetryHandle>,
255 ) -> Self {
256 let protocol = config.protocol_name.clone();
257 let (validator, report_stream) =
258 GossipValidator::new(config, set_state.clone(), prometheus_registry, telemetry.clone());
259
260 let validator = Arc::new(validator);
261 let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
262 service.clone(),
263 sync.clone(),
264 notification_service,
265 protocol,
266 validator.clone(),
267 prometheus_registry,
268 )));
269
270 {
271 let completed = set_state.read().completed_rounds();
274 let (set_id, voters) = completed.set_info();
275 validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {});
276 for round in completed.iter() {
277 let topic = round_topic::<B>(round.number, set_id);
278
279 validator.note_round(Round(round.number), |_, _| {});
282
283 for signed in round.votes.iter() {
284 let message = gossip::GossipMessage::Vote(gossip::VoteMessage::<B> {
285 message: signed.clone(),
286 round: Round(round.number),
287 set_id: SetId(set_id),
288 });
289
290 gossip_engine.lock().register_gossip_message(topic, message.encode());
291 }
292
293 trace!(
294 target: LOG_TARGET,
295 "Registered {} messages for topic {:?} (round: {}, set_id: {})",
296 round.votes.len(),
297 topic,
298 round.number,
299 set_id,
300 );
301 }
302 }
303
304 let (neighbor_packet_worker, neighbor_packet_sender) =
305 periodic::NeighborPacketWorker::new(NEIGHBOR_REBROADCAST_PERIOD);
306
307 NetworkBridge {
308 service,
309 sync,
310 gossip_engine,
311 validator,
312 neighbor_sender: neighbor_packet_sender,
313 neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
314 gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)),
315 telemetry,
316 }
317 }
318
319 pub(crate) fn note_round(&self, round: Round, set_id: SetId, voters: &VoterSet<AuthorityId>) {
321 self.validator.note_set(
323 set_id,
324 voters.iter().map(|(v, _)| v.clone()).collect(),
325 |to, neighbor| self.neighbor_sender.send(to, neighbor),
326 );
327
328 self.validator
329 .note_round(round, |to, neighbor| self.neighbor_sender.send(to, neighbor));
330 }
331
332 pub(crate) fn round_communication(
335 &self,
336 keystore: Option<LocalIdKeystore>,
337 round: Round,
338 set_id: SetId,
339 voters: Arc<VoterSet<AuthorityId>>,
340 has_voted: HasVoted<B::Header>,
341 ) -> (impl Stream<Item = SignedMessage<B::Header>> + Unpin, OutgoingMessages<B>) {
342 self.note_round(round, set_id, &voters);
343
344 let keystore = keystore.and_then(|ks| {
345 let id = ks.local_id();
346 if voters.contains(id) {
347 Some(ks)
348 } else {
349 None
350 }
351 });
352
353 let topic = round_topic::<B>(round.0, set_id.0);
354 let telemetry = self.telemetry.clone();
355 let incoming =
356 self.gossip_engine.lock().messages_for(topic).filter_map(move |notification| {
357 let decoded = GossipMessage::<B>::decode_all(&mut ¬ification.message[..]);
358
359 match decoded {
360 Err(ref e) => {
361 debug!(
362 target: LOG_TARGET,
363 "Skipping malformed message {:?}: {}", notification, e
364 );
365 future::ready(None)
366 },
367 Ok(GossipMessage::Vote(msg)) => {
368 if !voters.contains(&msg.message.id) {
370 debug!(
371 target: LOG_TARGET,
372 "Skipping message from unknown voter {}", msg.message.id
373 );
374 return future::ready(None);
375 }
376
377 if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
378 match &msg.message.message {
379 PrimaryPropose(propose) => {
380 telemetry!(
381 telemetry;
382 CONSENSUS_INFO;
383 "afg.received_propose";
384 "voter" => ?format!("{}", msg.message.id),
385 "target_number" => ?propose.target_number,
386 "target_hash" => ?propose.target_hash,
387 );
388 },
389 Prevote(prevote) => {
390 telemetry!(
391 telemetry;
392 CONSENSUS_INFO;
393 "afg.received_prevote";
394 "voter" => ?format!("{}", msg.message.id),
395 "target_number" => ?prevote.target_number,
396 "target_hash" => ?prevote.target_hash,
397 );
398 },
399 Precommit(precommit) => {
400 telemetry!(
401 telemetry;
402 CONSENSUS_INFO;
403 "afg.received_precommit";
404 "voter" => ?format!("{}", msg.message.id),
405 "target_number" => ?precommit.target_number,
406 "target_hash" => ?precommit.target_hash,
407 );
408 },
409 };
410 }
411
412 future::ready(Some(msg.message))
413 },
414 _ => {
415 debug!(target: LOG_TARGET, "Skipping unknown message type");
416 future::ready(None)
417 },
418 }
419 });
420
421 let (tx, out_rx) = mpsc::channel(0);
422 let outgoing = OutgoingMessages::<B> {
423 keystore,
424 round: round.0,
425 set_id: set_id.0,
426 network: self.gossip_engine.clone(),
427 sender: tx,
428 has_voted,
429 telemetry: self.telemetry.clone(),
430 };
431
432 let incoming = stream::select(incoming, out_rx);
436
437 (incoming, outgoing)
438 }
439
440 pub(crate) fn global_communication(
442 &self,
443 set_id: SetId,
444 voters: Arc<VoterSet<AuthorityId>>,
445 is_voter: bool,
446 ) -> (
447 impl Stream<Item = CommunicationIn<B>>,
448 impl Sink<CommunicationOutH<B, B::Hash>, Error = Error> + Unpin,
449 ) {
450 self.validator.note_set(
451 set_id,
452 voters.iter().map(|(v, _)| v.clone()).collect(),
453 |to, neighbor| self.neighbor_sender.send(to, neighbor),
454 );
455
456 let topic = global_topic::<B>(set_id.0);
457 let incoming = incoming_global(
458 self.gossip_engine.clone(),
459 topic,
460 voters,
461 self.validator.clone(),
462 self.neighbor_sender.clone(),
463 self.telemetry.clone(),
464 );
465
466 let outgoing = CommitsOut::<B>::new(
467 self.gossip_engine.clone(),
468 set_id.0,
469 is_voter,
470 self.validator.clone(),
471 self.neighbor_sender.clone(),
472 self.telemetry.clone(),
473 );
474
475 let outgoing = outgoing.with(|out| {
476 let voter::CommunicationOut::Commit(round, commit) = out;
477 future::ok((round, commit))
478 });
479
480 (incoming, outgoing)
481 }
482
483 pub(crate) fn gossip_commit(
485 &self,
486 round: RoundNumber,
487 set_id: SetIdNumber,
488 commit: Commit<B::Header>,
489 ) {
490 let target_number = commit.target_number;
491 let round = Round(round);
492 let set_id = SetId(set_id);
493 let topic = global_topic::<B>(set_id.0);
494 let encoded = encode_commit_message::<B>(round, set_id, commit);
495
496 self.validator
497 .note_commit_finalized(round, set_id, target_number, |to, neighbor| {
498 self.neighbor_sender.send(to, neighbor)
499 });
500 self.gossip_engine.lock().gossip_message(topic, encoded, false);
501 }
502
    /// Forwards a fork request for block `hash` / `number` to the syncing handle,
    /// naming `peers` as the peers to ask.
    ///
    /// This is a plain delegation to `self.sync.set_sync_fork_request`.
    pub(crate) fn set_sync_fork_request(
        &self,
        peers: Vec<sc_network_types::PeerId>,
        hash: B::Hash,
        number: NumberFor<B>,
    ) {
        self.sync.set_sync_fork_request(peers, hash, number)
    }
517}
518
519impl<B: BlockT, N: Network<B>, S: Syncing<B>> Future for NetworkBridge<B, N, S> {
520 type Output = Result<(), Error>;
521
522 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
523 loop {
524 match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
525 Poll::Ready(Some((to, packet))) => {
526 self.gossip_engine.lock().send_message(to, packet.encode());
527 },
528 Poll::Ready(None) => {
529 return Poll::Ready(Err(Error::Network(
530 "Neighbor packet worker stream closed.".into(),
531 )))
532 },
533 Poll::Pending => break,
534 }
535 }
536
537 loop {
538 match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
539 Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
540 self.gossip_engine.lock().report(who, cost_benefit);
541 },
542 Poll::Ready(None) => {
543 return Poll::Ready(Err(Error::Network(
544 "Gossip validator report stream closed.".into(),
545 )))
546 },
547 Poll::Pending => break,
548 }
549 }
550
551 match self.gossip_engine.lock().poll_unpin(cx) {
552 Poll::Ready(()) => {
553 return Poll::Ready(Err(Error::Network("Gossip engine future finished.".into())))
554 },
555 Poll::Pending => {},
556 }
557
558 Poll::Pending
559 }
560}
561
/// Builds the stream of set-wide messages (commits and catch-ups) for `topic`.
///
/// Every notification received on `topic` is decoded as a `GossipMessage`.
/// Undecodable notifications are traced and dropped, commit and catch-up messages
/// are checked and turned into `CommunicationIn` values, and anything else is
/// logged and dropped.
///
/// Each yielded item carries a callback that the consumer invokes with the
/// processing outcome; the callback reports bad senders to the gossip engine and,
/// for good commits, re-gossips the original bytes.
fn incoming_global<B: BlockT>(
    gossip_engine: Arc<Mutex<GossipEngine<B>>>,
    topic: B::Hash,
    voters: Arc<VoterSet<AuthorityId>>,
    gossip_validator: Arc<GossipValidator<B>>,
    neighbor_sender: periodic::NeighborPacketSender<B>,
    telemetry: Option<TelemetryHandle>,
) -> impl Stream<Item = CommunicationIn<B>> {
    // Handles one decoded commit message; returns `None` when the commit is rejected.
    let process_commit = {
        let telemetry = telemetry.clone();
        move |msg: FullCommitMessage<B>,
              mut notification: sc_network_gossip::TopicNotification,
              gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
              gossip_validator: &Arc<GossipValidator<B>>,
              voters: &VoterSet<AuthorityId>| {
            // Commit telemetry is only emitted for small voter sets.
            if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
                let precommits_signed_by: Vec<String> =
                    msg.message.auth_data.iter().map(move |(_, a)| format!("{}", a)).collect();

                telemetry!(
                    telemetry;
                    CONSENSUS_INFO;
                    "afg.received_commit";
                    "contains_precommits_signed_by" => ?precommits_signed_by,
                    "target_number" => ?msg.message.target_number.clone(),
                    "target_hash" => ?msg.message.target_hash.clone(),
                );
            }

            // Weight and signature checks; a failure is charged to the sender, if known.
            if let Err(cost) = check_compact_commit::<B>(
                &msg.message,
                voters,
                msg.round,
                msg.set_id,
                telemetry.as_ref(),
            ) {
                if let Some(who) = notification.sender {
                    gossip_engine.lock().report(who, cost);
                }

                return None;
            }

            let round = msg.round;
            let set_id = msg.set_id;
            let commit = msg.message;
            let finalized_number = commit.target_number;
            let gossip_validator = gossip_validator.clone();
            let gossip_engine = gossip_engine.clone();
            let neighbor_sender = neighbor_sender.clone();
            // Invoked by the consumer once the commit has been processed.
            let cb = move |outcome| match outcome {
                voter::CommitProcessingOutcome::Good(_) => {
                    // Note the finalized target on the validator, then re-gossip the
                    // original encoded message.
                    gossip_validator.note_commit_finalized(
                        round,
                        set_id,
                        finalized_number,
                        |to, neighbor| neighbor_sender.send(to, neighbor),
                    );

                    gossip_engine.lock().gossip_message(topic, notification.message.clone(), false);
                },
                voter::CommitProcessingOutcome::Bad(_) => {
                    // `take` empties the sender slot, so the peer is reported at most once.
                    if let Some(who) = notification.sender.take() {
                        gossip_engine.lock().report(who, cost::INVALID_COMMIT);
                    }
                },
            };

            let cb = voter::Callback::Work(Box::new(cb));

            Some(voter::CommunicationIn::Commit(round.0, commit, cb))
        }
    };

    // Handles one decoded catch-up message; returns `None` when it is rejected.
    let process_catch_up = move |msg: FullCatchUpMessage<B>,
                                 mut notification: sc_network_gossip::TopicNotification,
                                 gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
                                 gossip_validator: &Arc<GossipValidator<B>>,
                                 voters: &VoterSet<AuthorityId>| {
        let gossip_validator = gossip_validator.clone();
        let gossip_engine = gossip_engine.clone();

        // Weight and signature checks; a failure is charged to the sender, if known.
        if let Err(cost) = check_catch_up::<B>(&msg.message, voters, msg.set_id, telemetry.clone())
        {
            if let Some(who) = notification.sender {
                gossip_engine.lock().report(who, cost);
            }

            return None;
        }

        // Invoked by the consumer once the catch-up has been processed.
        let cb = move |outcome| {
            if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
                // `take` empties the sender slot, so the peer is reported at most once.
                if let Some(who) = notification.sender.take() {
                    gossip_engine.lock().report(who, cost::INVALID_CATCH_UP);
                }
            }

            // Runs for every outcome, good or bad.
            gossip_validator.note_catch_up_message_processed();
        };

        let cb = voter::Callback::Work(Box::new(cb));

        Some(voter::CommunicationIn::CatchUp(msg.message, cb))
    };

    gossip_engine
        .clone()
        .lock()
        .messages_for(topic)
        // Stage 1: decode, dropping (and tracing) anything that is not a valid message.
        .filter_map(|notification| {
            let decoded = GossipMessage::<B>::decode_all(&mut &notification.message[..]);
            if let Err(ref e) = decoded {
                trace!(
                    target: LOG_TARGET,
                    "Skipping malformed commit message {:?}: {}",
                    notification,
                    e
                );
            }
            future::ready(decoded.map(move |d| (notification, d)).ok())
        })
        // Stage 2: dispatch on the message kind.
        .filter_map(move |(notification, msg)| {
            future::ready(match msg {
                GossipMessage::Commit(msg) => {
                    process_commit(msg, notification, &gossip_engine, &gossip_validator, &voters)
                },
                GossipMessage::CatchUp(msg) => {
                    process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &voters)
                },
                _ => {
                    debug!(target: LOG_TARGET, "Skipping unknown message type");
                    None
                },
            })
        })
}
705
706impl<B: BlockT, N: Network<B>, S: Syncing<B>> Clone for NetworkBridge<B, N, S> {
707 fn clone(&self) -> Self {
708 NetworkBridge {
709 service: self.service.clone(),
710 sync: self.sync.clone(),
711 gossip_engine: self.gossip_engine.clone(),
712 validator: Arc::clone(&self.validator),
713 neighbor_sender: self.neighbor_sender.clone(),
714 neighbor_packet_worker: self.neighbor_packet_worker.clone(),
715 gossip_validator_report_stream: self.gossip_validator_report_stream.clone(),
716 telemetry: self.telemetry.clone(),
717 }
718 }
719}
720
/// Newtype around a round number, so it cannot be confused with a set id.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)]
pub struct Round(pub RoundNumber);
724
/// Newtype around a voter-set id, so it cannot be confused with a round number.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)]
pub struct SetId(pub SetIdNumber);
728
/// Sink for the votes the local node casts in one round.
///
/// Votes written to the sink are signed, gossiped on the round's topic and also
/// pushed into `sender` (see the `Sink` impl).
pub(crate) struct OutgoingMessages<Block: BlockT> {
    // Round and set the outgoing votes are signed for.
    round: RoundNumber,
    set_id: SetIdNumber,
    // Signing identity; with `None` the sink accepts votes but sends nothing.
    keystore: Option<LocalIdKeystore>,
    // Receives a copy of every vote that was signed and gossiped.
    sender: mpsc::Sender<SignedMessage<Block::Header>>,
    // Gossip engine used to announce the target block and gossip the vote.
    network: Arc<Mutex<GossipEngine<Block>>>,
    // Votes already cast in this round; they replace the incoming vote of the same kind.
    has_voted: HasVoted<Block::Header>,
    // Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
}
745
// Declares the sink `Unpin`; the `Sink` impl takes `&mut self.sender` through
// `Pin<&mut Self>`, which needs it.
impl<B: BlockT> Unpin for OutgoingMessages<B> {}
747
748impl<Block: BlockT> Sink<Message<Block::Header>> for OutgoingMessages<Block> {
749 type Error = Error;
750
751 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
752 Sink::poll_ready(Pin::new(&mut self.sender), cx).map(|elem| {
753 elem.map_err(|e| {
754 Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
755 })
756 })
757 }
758
759 fn start_send(
760 mut self: Pin<&mut Self>,
761 mut msg: Message<Block::Header>,
762 ) -> Result<(), Self::Error> {
763 match &mut msg {
765 finality_grandpa::Message::PrimaryPropose(ref mut vote) => {
766 if let Some(propose) = self.has_voted.propose() {
767 *vote = propose.clone();
768 }
769 },
770 finality_grandpa::Message::Prevote(ref mut vote) => {
771 if let Some(prevote) = self.has_voted.prevote() {
772 *vote = prevote.clone();
773 }
774 },
775 finality_grandpa::Message::Precommit(ref mut vote) => {
776 if let Some(precommit) = self.has_voted.precommit() {
777 *vote = precommit.clone();
778 }
779 },
780 }
781
782 if let Some(ref keystore) = self.keystore {
784 let target_hash = *(msg.target().0);
785 let signed = sp_consensus_grandpa::sign_message(
786 keystore.keystore(),
787 msg,
788 keystore.local_id().clone(),
789 self.round,
790 self.set_id,
791 )
792 .ok_or_else(|| {
793 Error::Signing(format!(
794 "Failed to sign GRANDPA vote for round {} targeting {:?}",
795 self.round, target_hash
796 ))
797 })?;
798
799 let message = GossipMessage::Vote(VoteMessage::<Block> {
800 message: signed.clone(),
801 round: Round(self.round),
802 set_id: SetId(self.set_id),
803 });
804
805 debug!(
806 target: LOG_TARGET,
807 "Announcing block {} to peers which we voted on in round {} in set {}",
808 target_hash,
809 self.round,
810 self.set_id,
811 );
812
813 telemetry!(
814 self.telemetry;
815 CONSENSUS_DEBUG;
816 "afg.announcing_blocks_to_voted_peers";
817 "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
818 );
819
820 self.network.lock().announce(target_hash, None);
822
823 let topic = round_topic::<Block>(self.round, self.set_id);
825 self.network.lock().gossip_message(topic, message.encode(), false);
826
827 return self.sender.start_send(signed).map_err(|e| {
829 Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
830 });
831 };
832
833 Ok(())
834 }
835
836 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
837 Poll::Ready(Ok(()))
838 }
839
840 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
841 Sink::poll_close(Pin::new(&mut self.sender), cx).map(|elem| {
842 elem.map_err(|e| {
843 Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
844 })
845 })
846 }
847}
848
849fn check_compact_commit<Block: BlockT>(
852 msg: &CompactCommit<Block::Header>,
853 voters: &VoterSet<AuthorityId>,
854 round: Round,
855 set_id: SetId,
856 telemetry: Option<&TelemetryHandle>,
857) -> Result<(), ReputationChange> {
858 let f = voters.total_weight() - voters.threshold();
860 let full_threshold = (f + voters.total_weight()).0;
861
862 let mut total_weight = 0;
864 for (_, ref id) in &msg.auth_data {
865 if let Some(weight) = voters.get(id).map(|info| info.weight()) {
866 total_weight += weight.get();
867 if total_weight > full_threshold {
868 return Err(cost::MALFORMED_COMMIT);
869 }
870 } else {
871 debug!(target: LOG_TARGET, "Skipping commit containing unknown voter {}", id);
872 return Err(cost::MALFORMED_COMMIT);
873 }
874 }
875
876 if total_weight < voters.threshold().get() {
877 return Err(cost::MALFORMED_COMMIT);
878 }
879
880 let mut buf = Vec::new();
882 for (i, (precommit, (sig, id))) in msg.precommits.iter().zip(&msg.auth_data).enumerate() {
883 use crate::communication::gossip::Misbehavior;
884 use finality_grandpa::Message as GrandpaMessage;
885
886 if !sp_consensus_grandpa::check_message_signature_with_buffer(
887 &GrandpaMessage::Precommit(precommit.clone()),
888 id,
889 sig,
890 round.0,
891 set_id.0,
892 &mut buf,
893 )
894 .is_valid()
895 {
896 debug!(target: LOG_TARGET, "Bad commit message signature {}", id);
897 telemetry!(
898 telemetry;
899 CONSENSUS_DEBUG;
900 "afg.bad_commit_msg_signature";
901 "id" => ?id,
902 );
903 let cost = Misbehavior::BadCommitMessage {
904 signatures_checked: i as i32,
905 blocks_loaded: 0,
906 equivocations_caught: 0,
907 }
908 .cost();
909
910 return Err(cost);
911 }
912 }
913
914 Ok(())
915}
916
917fn check_catch_up<Block: BlockT>(
920 msg: &CatchUp<Block::Header>,
921 voters: &VoterSet<AuthorityId>,
922 set_id: SetId,
923 telemetry: Option<TelemetryHandle>,
924) -> Result<(), ReputationChange> {
925 let f = voters.total_weight() - voters.threshold();
927 let full_threshold = (f + voters.total_weight()).0;
928
929 fn check_weight<'a>(
931 voters: &'a VoterSet<AuthorityId>,
932 votes: impl Iterator<Item = &'a AuthorityId>,
933 full_threshold: u64,
934 ) -> Result<(), ReputationChange> {
935 let mut total_weight = 0;
936
937 for id in votes {
938 if let Some(weight) = voters.get(id).map(|info| info.weight()) {
939 total_weight += weight.get();
940 if total_weight > full_threshold {
941 return Err(cost::MALFORMED_CATCH_UP);
942 }
943 } else {
944 debug!(
945 target: LOG_TARGET,
946 "Skipping catch up message containing unknown voter {}", id
947 );
948 return Err(cost::MALFORMED_CATCH_UP);
949 }
950 }
951
952 if total_weight < voters.threshold().get() {
953 return Err(cost::MALFORMED_CATCH_UP);
954 }
955
956 Ok(())
957 }
958
959 check_weight(voters, msg.prevotes.iter().map(|vote| &vote.id), full_threshold)?;
960
961 check_weight(voters, msg.precommits.iter().map(|vote| &vote.id), full_threshold)?;
962
963 fn check_signatures<'a, B, I>(
964 messages: I,
965 round: RoundNumber,
966 set_id: SetIdNumber,
967 mut signatures_checked: usize,
968 buf: &mut Vec<u8>,
969 telemetry: Option<TelemetryHandle>,
970 ) -> Result<usize, ReputationChange>
971 where
972 B: BlockT,
973 I: Iterator<Item = (Message<B::Header>, &'a AuthorityId, &'a AuthoritySignature)>,
974 {
975 use crate::communication::gossip::Misbehavior;
976
977 for (msg, id, sig) in messages {
978 signatures_checked += 1;
979
980 if !sp_consensus_grandpa::check_message_signature_with_buffer(
981 &msg, id, sig, round, set_id, buf,
982 )
983 .is_valid()
984 {
985 debug!(target: LOG_TARGET, "Bad catch up message signature {}", id);
986 telemetry!(
987 telemetry;
988 CONSENSUS_DEBUG;
989 "afg.bad_catch_up_msg_signature";
990 "id" => ?id,
991 );
992
993 let cost = Misbehavior::BadCatchUpMessage {
994 signatures_checked: signatures_checked as i32,
995 }
996 .cost();
997
998 return Err(cost);
999 }
1000 }
1001
1002 Ok(signatures_checked)
1003 }
1004
1005 let mut buf = Vec::new();
1006
1007 let signatures_checked = check_signatures::<Block, _>(
1009 msg.prevotes.iter().map(|vote| {
1010 (finality_grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature)
1011 }),
1012 msg.round_number,
1013 set_id.0,
1014 0,
1015 &mut buf,
1016 telemetry.clone(),
1017 )?;
1018
1019 check_signatures::<Block, _>(
1021 msg.precommits.iter().map(|vote| {
1022 (
1023 finality_grandpa::Message::Precommit(vote.precommit.clone()),
1024 &vote.id,
1025 &vote.signature,
1026 )
1027 }),
1028 msg.round_number,
1029 set_id.0,
1030 signatures_checked,
1031 &mut buf,
1032 telemetry,
1033 )?;
1034
1035 Ok(())
1036}
1037
1038fn encode_commit_message<Block: BlockT>(
1040 round: Round,
1041 set_id: SetId,
1042 commit: Commit<Block::Header>,
1043) -> Vec<u8> {
1044 let (precommits, auth_data) = commit
1045 .precommits
1046 .into_iter()
1047 .map(|signed| (signed.precommit, (signed.signature, signed.id)))
1048 .unzip();
1049
1050 let compact_commit = CompactCommit::<Block::Header> {
1051 target_hash: commit.target_hash,
1052 target_number: commit.target_number,
1053 precommits,
1054 auth_data,
1055 };
1056
1057 let message = GossipMessage::Commit(FullCommitMessage::<Block> {
1058 round,
1059 set_id,
1060 message: compact_commit,
1061 });
1062
1063 message.encode()
1064}
1065
/// Sink that gossips the commits produced by the local voter.
struct CommitsOut<Block: BlockT> {
    // Gossip engine the encoded commits are handed to.
    network: Arc<Mutex<GossipEngine<Block>>>,
    // Voter set the commits belong to; also determines the gossip topic.
    set_id: SetId,
    // When `false`, commits written to the sink are dropped without being sent.
    is_voter: bool,
    // Validator that is told about every commit that is sent.
    gossip_validator: Arc<GossipValidator<Block>>,
    // Passes on neighbor packets produced by the validator.
    neighbor_sender: periodic::NeighborPacketSender<Block>,
    // Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
}
1075
1076impl<Block: BlockT> CommitsOut<Block> {
1077 pub(crate) fn new(
1079 network: Arc<Mutex<GossipEngine<Block>>>,
1080 set_id: SetIdNumber,
1081 is_voter: bool,
1082 gossip_validator: Arc<GossipValidator<Block>>,
1083 neighbor_sender: periodic::NeighborPacketSender<Block>,
1084 telemetry: Option<TelemetryHandle>,
1085 ) -> Self {
1086 CommitsOut {
1087 network,
1088 set_id: SetId(set_id),
1089 is_voter,
1090 gossip_validator,
1091 neighbor_sender,
1092 telemetry,
1093 }
1094 }
1095}
1096
1097impl<Block: BlockT> Sink<(RoundNumber, Commit<Block::Header>)> for CommitsOut<Block> {
1098 type Error = Error;
1099
1100 fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1101 Poll::Ready(Ok(()))
1102 }
1103
1104 fn start_send(
1105 self: Pin<&mut Self>,
1106 input: (RoundNumber, Commit<Block::Header>),
1107 ) -> Result<(), Self::Error> {
1108 if !self.is_voter {
1109 return Ok(());
1110 }
1111
1112 let (round, commit) = input;
1113 let round = Round(round);
1114
1115 telemetry!(
1116 self.telemetry;
1117 CONSENSUS_DEBUG;
1118 "afg.commit_issued";
1119 "target_number" => ?commit.target_number,
1120 "target_hash" => ?commit.target_hash,
1121 );
1122
1123 let target_number = commit.target_number;
1124 let topic = global_topic::<Block>(self.set_id.0);
1125 let encoded = encode_commit_message::<Block>(round, self.set_id, commit);
1126
1127 self.gossip_validator.note_commit_finalized(
1130 round,
1131 self.set_id,
1132 target_number,
1133 |to, neighbor| self.neighbor_sender.send(to, neighbor),
1134 );
1135 self.network.lock().gossip_message(topic, encoded, false);
1136
1137 Ok(())
1138 }
1139
1140 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1141 Poll::Ready(Ok(()))
1142 }
1143
1144 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1145 Poll::Ready(Ok(()))
1146 }
1147}