1use futures::{channel::mpsc, prelude::*};
33use log::{debug, trace};
34use parking_lot::Mutex;
35use prometheus_endpoint::Registry;
36use std::{
37 pin::Pin,
38 sync::Arc,
39 task::{Context, Poll},
40 time::Duration,
41};
42
43use codec::{Decode, DecodeAll, Encode};
44use finality_grandpa::{
45 voter,
46 voter_set::VoterSet,
47 Message::{Precommit, Prevote, PrimaryPropose},
48};
49use sc_network::{NetworkBlock, NetworkSyncForkRequest, NotificationService, ReputationChange};
50use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
51use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
52use sp_keystore::KeystorePtr;
53use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
54
55use crate::{
56 environment::HasVoted, CatchUp, Commit, CommunicationIn, CommunicationOutH, CompactCommit,
57 Error, Message, SignedMessage, LOG_TARGET,
58};
59use gossip::{
60 FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage,
61};
62use sc_network_sync::SyncEventStream;
63use sc_utils::mpsc::TracingUnboundedReceiver;
64use sp_consensus_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber};
65
66pub mod gossip;
67mod periodic;
68
69#[cfg(test)]
70pub(crate) mod tests;
71
/// Period (two minutes) handed to `periodic::NeighborPacketWorker::new` when the
/// `NetworkBridge` is constructed.
// NOTE(review): the name suggests this is the interval at which neighbor packets are
// rebroadcast; the worker itself lives in the `periodic` module — confirm there.
pub(crate) const NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(2 * 60);
74
75pub mod grandpa_protocol_name {
76 use sc_chain_spec::ChainSpec;
77 use sc_network::types::ProtocolName;
78
79 pub(crate) const NAME: &str = "/grandpa/1";
80 pub(crate) const LEGACY_NAMES: [&str; 1] = ["/paritytech/grandpa/1"];
82
83 pub fn standard_name<Hash: AsRef<[u8]>>(
87 genesis_hash: &Hash,
88 chain_spec: &Box<dyn ChainSpec>,
89 ) -> ProtocolName {
90 let genesis_hash = genesis_hash.as_ref();
91 let chain_prefix = match chain_spec.fork_id() {
92 Some(fork_id) => format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), fork_id),
93 None => format!("/{}", array_bytes::bytes2hex("", genesis_hash)),
94 };
95 format!("{}{}", chain_prefix, NAME).into()
96 }
97}
98
99mod cost {
101 use sc_network::ReputationChange as Rep;
102 pub(super) const PAST_REJECTION: Rep = Rep::new(-50, "Grandpa: Past message");
103 pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "Grandpa: Bad signature");
104 pub(super) const MALFORMED_CATCH_UP: Rep = Rep::new(-1000, "Grandpa: Malformed cath-up");
105 pub(super) const MALFORMED_COMMIT: Rep = Rep::new(-1000, "Grandpa: Malformed commit");
106 pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Future message");
107 pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "Grandpa: Unknown voter");
108
109 pub(super) const INVALID_VIEW_CHANGE: Rep = Rep::new(-500, "Grandpa: Invalid view change");
110 pub(super) const DUPLICATE_NEIGHBOR_MESSAGE: Rep =
111 Rep::new(-500, "Grandpa: Duplicate neighbor message without grace period");
112 pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
113 pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
114 pub(super) const PER_BLOCK_LOADED: i32 = -10;
115 pub(super) const INVALID_CATCH_UP: Rep = Rep::new(-5000, "Grandpa: Invalid catch-up");
116 pub(super) const INVALID_COMMIT: Rep = Rep::new(-5000, "Grandpa: Invalid commit");
117 pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Out-of-scope message");
118 pub(super) const CATCH_UP_REQUEST_TIMEOUT: Rep =
119 Rep::new(-200, "Grandpa: Catch-up request timeout");
120
121 pub(super) const CATCH_UP_REPLY: Rep = Rep::new(-200, "Grandpa: Catch-up reply");
123 pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: Rep =
124 Rep::new(-200, "Grandpa: Out-of-scope catch-up");
125}
126
/// Reputation rewards granted to peers for useful GRANDPA messages.
///
/// `Rep` constants are complete reputation changes (value plus reason string);
/// `PER_EQUIVOCATION` is a bare per-unit value.
mod benefit {
    use sc_network::ReputationChange as Rep;
    pub(super) const NEIGHBOR_MESSAGE: Rep = Rep::new(100, "Grandpa: Neighbor message");
    pub(super) const ROUND_MESSAGE: Rep = Rep::new(100, "Grandpa: Round message");
    pub(super) const BASIC_VALIDATED_CATCH_UP: Rep = Rep::new(200, "Grandpa: Catch-up message");
    pub(super) const BASIC_VALIDATED_COMMIT: Rep = Rep::new(100, "Grandpa: Commit");
    pub(super) const PER_EQUIVOCATION: i32 = 10;
}
136
137pub struct LocalIdKeystore((AuthorityId, KeystorePtr));
140
141impl LocalIdKeystore {
142 fn local_id(&self) -> &AuthorityId {
144 &(self.0).0
145 }
146
147 fn keystore(&self) -> KeystorePtr {
149 (self.0).1.clone()
150 }
151}
152
153impl From<(AuthorityId, KeystorePtr)> for LocalIdKeystore {
154 fn from(inner: (AuthorityId, KeystorePtr)) -> LocalIdKeystore {
155 LocalIdKeystore(inner)
156 }
157}
158
/// Largest voter-set size for which per-message telemetry is emitted for received votes
/// and commits; both call sites check `voters.len().get() <= TELEMETRY_VOTERS_LIMIT`.
const TELEMETRY_VOTERS_LIMIT: usize = 10;
163
164pub trait Network<Block: BlockT>: GossipNetwork<Block> + Clone + Send + 'static {}
168
169impl<Block, T> Network<Block> for T
170where
171 Block: BlockT,
172 T: GossipNetwork<Block> + Clone + Send + 'static,
173{
174}
175
176pub trait Syncing<Block: BlockT>:
180 NetworkSyncForkRequest<Block::Hash, NumberFor<Block>>
181 + NetworkBlock<Block::Hash, NumberFor<Block>>
182 + SyncEventStream
183 + Clone
184 + Send
185 + 'static
186{
187}
188
189impl<Block, T> Syncing<Block> for T
190where
191 Block: BlockT,
192 T: NetworkSyncForkRequest<Block::Hash, NumberFor<Block>>
193 + NetworkBlock<Block::Hash, NumberFor<Block>>
194 + SyncEventStream
195 + Clone
196 + Send
197 + 'static,
198{
199}
200
201pub(crate) fn round_topic<B: BlockT>(round: RoundNumber, set_id: SetIdNumber) -> B::Hash {
203 <<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
204}
205
206pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
208 <<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes())
209}
210
/// Bridge between the underlying network/syncing services and the GRANDPA voter.
///
/// Also a `Future` that must be polled to drive the gossip engine, the neighbor packet
/// worker and the peer report stream.
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>, S: Syncing<B>> {
    /// Network handle; a clone is given to the gossip engine on construction.
    service: N,
    /// Syncing handle; a clone is given to the gossip engine and it serves
    /// `set_sync_fork_request`.
    sync: S,
    /// Gossip engine shared with the streams and sinks handed out by this bridge.
    gossip_engine: Arc<Mutex<GossipEngine<B>>>,
    /// Gossip validator, also registered with the gossip engine.
    validator: Arc<GossipValidator<B>>,

    /// Sending half paired with `neighbor_packet_worker`; used as the callback whenever
    /// the validator produces a neighbor packet.
    neighbor_sender: periodic::NeighborPacketSender<B>,

    /// Stream of `(recipients, packet)` pairs; each item is sent through the gossip
    /// engine when the bridge is polled.
    neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,

    /// Peer reports produced by the gossip validator; forwarded to the gossip engine's
    /// `report` when the bridge is polled.
    gossip_validator_report_stream: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,

    /// Optional telemetry handle, cloned into the streams and sinks created here.
    telemetry: Option<TelemetryHandle>,
}

// Declared unconditionally; `Future::poll` accesses fields directly through `Pin<&mut Self>`.
impl<B: BlockT, N: Network<B>, S: Syncing<B>> Unpin for NetworkBridge<B, N, S> {}
241
242impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
243 pub(crate) fn new(
248 service: N,
249 sync: S,
250 notification_service: Box<dyn NotificationService>,
251 config: crate::Config,
252 set_state: crate::environment::SharedVoterSetState<B>,
253 prometheus_registry: Option<&Registry>,
254 telemetry: Option<TelemetryHandle>,
255 ) -> Self {
256 let protocol = config.protocol_name.clone();
257 let (validator, report_stream) =
258 GossipValidator::new(config, set_state.clone(), prometheus_registry, telemetry.clone());
259
260 let validator = Arc::new(validator);
261 let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
262 service.clone(),
263 sync.clone(),
264 notification_service,
265 protocol,
266 validator.clone(),
267 prometheus_registry,
268 )));
269
270 {
271 let completed = set_state.read().completed_rounds();
274 let (set_id, voters) = completed.set_info();
275 validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {});
276 for round in completed.iter() {
277 let topic = round_topic::<B>(round.number, set_id);
278
279 validator.note_round(Round(round.number), |_, _| {});
282
283 for signed in round.votes.iter() {
284 let message = gossip::GossipMessage::Vote(gossip::VoteMessage::<B> {
285 message: signed.clone(),
286 round: Round(round.number),
287 set_id: SetId(set_id),
288 });
289
290 gossip_engine.lock().register_gossip_message(topic, message.encode());
291 }
292
293 trace!(
294 target: LOG_TARGET,
295 "Registered {} messages for topic {:?} (round: {}, set_id: {})",
296 round.votes.len(),
297 topic,
298 round.number,
299 set_id,
300 );
301 }
302 }
303
304 let (neighbor_packet_worker, neighbor_packet_sender) =
305 periodic::NeighborPacketWorker::new(NEIGHBOR_REBROADCAST_PERIOD);
306
307 NetworkBridge {
308 service,
309 sync,
310 gossip_engine,
311 validator,
312 neighbor_sender: neighbor_packet_sender,
313 neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
314 gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)),
315 telemetry,
316 }
317 }
318
319 pub(crate) fn note_round(&self, round: Round, set_id: SetId, voters: &VoterSet<AuthorityId>) {
321 self.validator.note_set(
323 set_id,
324 voters.iter().map(|(v, _)| v.clone()).collect(),
325 |to, neighbor| self.neighbor_sender.send(to, neighbor),
326 );
327
328 self.validator
329 .note_round(round, |to, neighbor| self.neighbor_sender.send(to, neighbor));
330 }
331
332 pub(crate) fn round_communication(
335 &self,
336 keystore: Option<LocalIdKeystore>,
337 round: Round,
338 set_id: SetId,
339 voters: Arc<VoterSet<AuthorityId>>,
340 has_voted: HasVoted<B::Header>,
341 ) -> (impl Stream<Item = SignedMessage<B::Header>> + Unpin, OutgoingMessages<B>) {
342 self.note_round(round, set_id, &voters);
343
344 let keystore = keystore.and_then(|ks| {
345 let id = ks.local_id();
346 if voters.contains(id) {
347 Some(ks)
348 } else {
349 None
350 }
351 });
352
353 let topic = round_topic::<B>(round.0, set_id.0);
354 let telemetry = self.telemetry.clone();
355 let incoming =
356 self.gossip_engine.lock().messages_for(topic).filter_map(move |notification| {
357 let decoded = GossipMessage::<B>::decode_all(&mut ¬ification.message[..]);
358
359 match decoded {
360 Err(ref e) => {
361 debug!(
362 target: LOG_TARGET,
363 "Skipping malformed message {:?}: {}", notification, e
364 );
365 future::ready(None)
366 },
367 Ok(GossipMessage::Vote(msg)) => {
368 if !voters.contains(&msg.message.id) {
370 debug!(
371 target: LOG_TARGET,
372 "Skipping message from unknown voter {}", msg.message.id
373 );
374 return future::ready(None)
375 }
376
377 if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
378 match &msg.message.message {
379 PrimaryPropose(propose) => {
380 telemetry!(
381 telemetry;
382 CONSENSUS_INFO;
383 "afg.received_propose";
384 "voter" => ?format!("{}", msg.message.id),
385 "target_number" => ?propose.target_number,
386 "target_hash" => ?propose.target_hash,
387 );
388 },
389 Prevote(prevote) => {
390 telemetry!(
391 telemetry;
392 CONSENSUS_INFO;
393 "afg.received_prevote";
394 "voter" => ?format!("{}", msg.message.id),
395 "target_number" => ?prevote.target_number,
396 "target_hash" => ?prevote.target_hash,
397 );
398 },
399 Precommit(precommit) => {
400 telemetry!(
401 telemetry;
402 CONSENSUS_INFO;
403 "afg.received_precommit";
404 "voter" => ?format!("{}", msg.message.id),
405 "target_number" => ?precommit.target_number,
406 "target_hash" => ?precommit.target_hash,
407 );
408 },
409 };
410 }
411
412 future::ready(Some(msg.message))
413 },
414 _ => {
415 debug!(target: LOG_TARGET, "Skipping unknown message type");
416 future::ready(None)
417 },
418 }
419 });
420
421 let (tx, out_rx) = mpsc::channel(0);
422 let outgoing = OutgoingMessages::<B> {
423 keystore,
424 round: round.0,
425 set_id: set_id.0,
426 network: self.gossip_engine.clone(),
427 sender: tx,
428 has_voted,
429 telemetry: self.telemetry.clone(),
430 };
431
432 let incoming = stream::select(incoming, out_rx);
436
437 (incoming, outgoing)
438 }
439
440 pub(crate) fn global_communication(
442 &self,
443 set_id: SetId,
444 voters: Arc<VoterSet<AuthorityId>>,
445 is_voter: bool,
446 ) -> (
447 impl Stream<Item = CommunicationIn<B>>,
448 impl Sink<CommunicationOutH<B, B::Hash>, Error = Error> + Unpin,
449 ) {
450 self.validator.note_set(
451 set_id,
452 voters.iter().map(|(v, _)| v.clone()).collect(),
453 |to, neighbor| self.neighbor_sender.send(to, neighbor),
454 );
455
456 let topic = global_topic::<B>(set_id.0);
457 let incoming = incoming_global(
458 self.gossip_engine.clone(),
459 topic,
460 voters,
461 self.validator.clone(),
462 self.neighbor_sender.clone(),
463 self.telemetry.clone(),
464 );
465
466 let outgoing = CommitsOut::<B>::new(
467 self.gossip_engine.clone(),
468 set_id.0,
469 is_voter,
470 self.validator.clone(),
471 self.neighbor_sender.clone(),
472 self.telemetry.clone(),
473 );
474
475 let outgoing = outgoing.with(|out| {
476 let voter::CommunicationOut::Commit(round, commit) = out;
477 future::ok((round, commit))
478 });
479
480 (incoming, outgoing)
481 }
482
483 pub(crate) fn set_sync_fork_request(
490 &self,
491 peers: Vec<sc_network_types::PeerId>,
492 hash: B::Hash,
493 number: NumberFor<B>,
494 ) {
495 self.sync.set_sync_fork_request(peers, hash, number)
496 }
497}
498
499impl<B: BlockT, N: Network<B>, S: Syncing<B>> Future for NetworkBridge<B, N, S> {
500 type Output = Result<(), Error>;
501
502 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
503 loop {
504 match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
505 Poll::Ready(Some((to, packet))) => {
506 self.gossip_engine.lock().send_message(to, packet.encode());
507 },
508 Poll::Ready(None) =>
509 return Poll::Ready(Err(Error::Network(
510 "Neighbor packet worker stream closed.".into(),
511 ))),
512 Poll::Pending => break,
513 }
514 }
515
516 loop {
517 match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
518 Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
519 self.gossip_engine.lock().report(who, cost_benefit);
520 },
521 Poll::Ready(None) =>
522 return Poll::Ready(Err(Error::Network(
523 "Gossip validator report stream closed.".into(),
524 ))),
525 Poll::Pending => break,
526 }
527 }
528
529 match self.gossip_engine.lock().poll_unpin(cx) {
530 Poll::Ready(()) =>
531 return Poll::Ready(Err(Error::Network("Gossip engine future finished.".into()))),
532 Poll::Pending => {},
533 }
534
535 Poll::Pending
536 }
537}
538
539fn incoming_global<B: BlockT>(
540 gossip_engine: Arc<Mutex<GossipEngine<B>>>,
541 topic: B::Hash,
542 voters: Arc<VoterSet<AuthorityId>>,
543 gossip_validator: Arc<GossipValidator<B>>,
544 neighbor_sender: periodic::NeighborPacketSender<B>,
545 telemetry: Option<TelemetryHandle>,
546) -> impl Stream<Item = CommunicationIn<B>> {
547 let process_commit = {
548 let telemetry = telemetry.clone();
549 move |msg: FullCommitMessage<B>,
550 mut notification: sc_network_gossip::TopicNotification,
551 gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
552 gossip_validator: &Arc<GossipValidator<B>>,
553 voters: &VoterSet<AuthorityId>| {
554 if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
555 let precommits_signed_by: Vec<String> =
556 msg.message.auth_data.iter().map(move |(_, a)| format!("{}", a)).collect();
557
558 telemetry!(
559 telemetry;
560 CONSENSUS_INFO;
561 "afg.received_commit";
562 "contains_precommits_signed_by" => ?precommits_signed_by,
563 "target_number" => ?msg.message.target_number.clone(),
564 "target_hash" => ?msg.message.target_hash.clone(),
565 );
566 }
567
568 if let Err(cost) = check_compact_commit::<B>(
569 &msg.message,
570 voters,
571 msg.round,
572 msg.set_id,
573 telemetry.as_ref(),
574 ) {
575 if let Some(who) = notification.sender {
576 gossip_engine.lock().report(who, cost);
577 }
578
579 return None
580 }
581
582 let round = msg.round;
583 let set_id = msg.set_id;
584 let commit = msg.message;
585 let finalized_number = commit.target_number;
586 let gossip_validator = gossip_validator.clone();
587 let gossip_engine = gossip_engine.clone();
588 let neighbor_sender = neighbor_sender.clone();
589 let cb = move |outcome| match outcome {
590 voter::CommitProcessingOutcome::Good(_) => {
591 gossip_validator.note_commit_finalized(
595 round,
596 set_id,
597 finalized_number,
598 |to, neighbor| neighbor_sender.send(to, neighbor),
599 );
600
601 gossip_engine.lock().gossip_message(topic, notification.message.clone(), false);
602 },
603 voter::CommitProcessingOutcome::Bad(_) => {
604 if let Some(who) = notification.sender.take() {
606 gossip_engine.lock().report(who, cost::INVALID_COMMIT);
607 }
608 },
609 };
610
611 let cb = voter::Callback::Work(Box::new(cb));
612
613 Some(voter::CommunicationIn::Commit(round.0, commit, cb))
614 }
615 };
616
617 let process_catch_up = move |msg: FullCatchUpMessage<B>,
618 mut notification: sc_network_gossip::TopicNotification,
619 gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
620 gossip_validator: &Arc<GossipValidator<B>>,
621 voters: &VoterSet<AuthorityId>| {
622 let gossip_validator = gossip_validator.clone();
623 let gossip_engine = gossip_engine.clone();
624
625 if let Err(cost) = check_catch_up::<B>(&msg.message, voters, msg.set_id, telemetry.clone())
626 {
627 if let Some(who) = notification.sender {
628 gossip_engine.lock().report(who, cost);
629 }
630
631 return None
632 }
633
634 let cb = move |outcome| {
635 if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
636 if let Some(who) = notification.sender.take() {
638 gossip_engine.lock().report(who, cost::INVALID_CATCH_UP);
639 }
640 }
641
642 gossip_validator.note_catch_up_message_processed();
643 };
644
645 let cb = voter::Callback::Work(Box::new(cb));
646
647 Some(voter::CommunicationIn::CatchUp(msg.message, cb))
648 };
649
650 gossip_engine
651 .clone()
652 .lock()
653 .messages_for(topic)
654 .filter_map(|notification| {
655 let decoded = GossipMessage::<B>::decode_all(&mut ¬ification.message[..]);
657 if let Err(ref e) = decoded {
658 trace!(
659 target: LOG_TARGET,
660 "Skipping malformed commit message {:?}: {}",
661 notification,
662 e
663 );
664 }
665 future::ready(decoded.map(move |d| (notification, d)).ok())
666 })
667 .filter_map(move |(notification, msg)| {
668 future::ready(match msg {
669 GossipMessage::Commit(msg) =>
670 process_commit(msg, notification, &gossip_engine, &gossip_validator, &voters),
671 GossipMessage::CatchUp(msg) =>
672 process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &voters),
673 _ => {
674 debug!(target: LOG_TARGET, "Skipping unknown message type");
675 None
676 },
677 })
678 })
679}
680
681impl<B: BlockT, N: Network<B>, S: Syncing<B>> Clone for NetworkBridge<B, N, S> {
682 fn clone(&self) -> Self {
683 NetworkBridge {
684 service: self.service.clone(),
685 sync: self.sync.clone(),
686 gossip_engine: self.gossip_engine.clone(),
687 validator: Arc::clone(&self.validator),
688 neighbor_sender: self.neighbor_sender.clone(),
689 neighbor_packet_worker: self.neighbor_packet_worker.clone(),
690 gossip_validator_report_stream: self.gossip_validator_report_stream.clone(),
691 telemetry: self.telemetry.clone(),
692 }
693 }
694}
695
/// Newtype around a `RoundNumber`, so rounds are not mixed up with set ids in signatures.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)]
pub struct Round(pub RoundNumber);
699
/// Newtype around a `SetIdNumber`, so set ids are not mixed up with rounds in signatures.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)]
pub struct SetId(pub SetIdNumber);
703
/// Sink for the votes this node casts in one round.
///
/// Each message is signed (when a keystore is present), gossiped on the round topic and
/// passed on through `sender`.
pub(crate) struct OutgoingMessages<Block: BlockT> {
    /// Round the votes belong to.
    round: RoundNumber,
    /// Authority set the votes belong to.
    set_id: SetIdNumber,
    /// Local id and keystore used for signing; `None` means nothing is signed or sent.
    keystore: Option<LocalIdKeystore>,
    /// Channel on which the signed message is sent after it has been gossiped.
    sender: mpsc::Sender<SignedMessage<Block::Header>>,
    /// Gossip engine used to announce the target block and gossip the vote.
    network: Arc<Mutex<GossipEngine<Block>>>,
    /// Votes already recorded for this round; they replace a new vote of the same kind.
    has_voted: HasVoted<Block::Header>,
    /// Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
}

// Declared unconditionally; the `Sink` impl accesses fields mutably through `Pin<&mut Self>`.
impl<B: BlockT> Unpin for OutgoingMessages<B> {}
722
723impl<Block: BlockT> Sink<Message<Block::Header>> for OutgoingMessages<Block> {
724 type Error = Error;
725
726 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
727 Sink::poll_ready(Pin::new(&mut self.sender), cx).map(|elem| {
728 elem.map_err(|e| {
729 Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
730 })
731 })
732 }
733
734 fn start_send(
735 mut self: Pin<&mut Self>,
736 mut msg: Message<Block::Header>,
737 ) -> Result<(), Self::Error> {
738 match &mut msg {
740 finality_grandpa::Message::PrimaryPropose(ref mut vote) => {
741 if let Some(propose) = self.has_voted.propose() {
742 *vote = propose.clone();
743 }
744 },
745 finality_grandpa::Message::Prevote(ref mut vote) => {
746 if let Some(prevote) = self.has_voted.prevote() {
747 *vote = prevote.clone();
748 }
749 },
750 finality_grandpa::Message::Precommit(ref mut vote) => {
751 if let Some(precommit) = self.has_voted.precommit() {
752 *vote = precommit.clone();
753 }
754 },
755 }
756
757 if let Some(ref keystore) = self.keystore {
759 let target_hash = *(msg.target().0);
760 let signed = sp_consensus_grandpa::sign_message(
761 keystore.keystore(),
762 msg,
763 keystore.local_id().clone(),
764 self.round,
765 self.set_id,
766 )
767 .ok_or_else(|| {
768 Error::Signing(format!(
769 "Failed to sign GRANDPA vote for round {} targeting {:?}",
770 self.round, target_hash
771 ))
772 })?;
773
774 let message = GossipMessage::Vote(VoteMessage::<Block> {
775 message: signed.clone(),
776 round: Round(self.round),
777 set_id: SetId(self.set_id),
778 });
779
780 debug!(
781 target: LOG_TARGET,
782 "Announcing block {} to peers which we voted on in round {} in set {}",
783 target_hash,
784 self.round,
785 self.set_id,
786 );
787
788 telemetry!(
789 self.telemetry;
790 CONSENSUS_DEBUG;
791 "afg.announcing_blocks_to_voted_peers";
792 "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
793 );
794
795 self.network.lock().announce(target_hash, None);
797
798 let topic = round_topic::<Block>(self.round, self.set_id);
800 self.network.lock().gossip_message(topic, message.encode(), false);
801
802 return self.sender.start_send(signed).map_err(|e| {
804 Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
805 })
806 };
807
808 Ok(())
809 }
810
811 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
812 Poll::Ready(Ok(()))
813 }
814
815 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
816 Sink::poll_close(Pin::new(&mut self.sender), cx).map(|elem| {
817 elem.map_err(|e| {
818 Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
819 })
820 })
821 }
822}
823
824fn check_compact_commit<Block: BlockT>(
827 msg: &CompactCommit<Block::Header>,
828 voters: &VoterSet<AuthorityId>,
829 round: Round,
830 set_id: SetId,
831 telemetry: Option<&TelemetryHandle>,
832) -> Result<(), ReputationChange> {
833 let f = voters.total_weight() - voters.threshold();
835 let full_threshold = (f + voters.total_weight()).0;
836
837 let mut total_weight = 0;
839 for (_, ref id) in &msg.auth_data {
840 if let Some(weight) = voters.get(id).map(|info| info.weight()) {
841 total_weight += weight.get();
842 if total_weight > full_threshold {
843 return Err(cost::MALFORMED_COMMIT)
844 }
845 } else {
846 debug!(target: LOG_TARGET, "Skipping commit containing unknown voter {}", id);
847 return Err(cost::MALFORMED_COMMIT)
848 }
849 }
850
851 if total_weight < voters.threshold().get() {
852 return Err(cost::MALFORMED_COMMIT)
853 }
854
855 let mut buf = Vec::new();
857 for (i, (precommit, (sig, id))) in msg.precommits.iter().zip(&msg.auth_data).enumerate() {
858 use crate::communication::gossip::Misbehavior;
859 use finality_grandpa::Message as GrandpaMessage;
860
861 if !sp_consensus_grandpa::check_message_signature_with_buffer(
862 &GrandpaMessage::Precommit(precommit.clone()),
863 id,
864 sig,
865 round.0,
866 set_id.0,
867 &mut buf,
868 )
869 .is_valid()
870 {
871 debug!(target: LOG_TARGET, "Bad commit message signature {}", id);
872 telemetry!(
873 telemetry;
874 CONSENSUS_DEBUG;
875 "afg.bad_commit_msg_signature";
876 "id" => ?id,
877 );
878 let cost = Misbehavior::BadCommitMessage {
879 signatures_checked: i as i32,
880 blocks_loaded: 0,
881 equivocations_caught: 0,
882 }
883 .cost();
884
885 return Err(cost)
886 }
887 }
888
889 Ok(())
890}
891
892fn check_catch_up<Block: BlockT>(
895 msg: &CatchUp<Block::Header>,
896 voters: &VoterSet<AuthorityId>,
897 set_id: SetId,
898 telemetry: Option<TelemetryHandle>,
899) -> Result<(), ReputationChange> {
900 let f = voters.total_weight() - voters.threshold();
902 let full_threshold = (f + voters.total_weight()).0;
903
904 fn check_weight<'a>(
906 voters: &'a VoterSet<AuthorityId>,
907 votes: impl Iterator<Item = &'a AuthorityId>,
908 full_threshold: u64,
909 ) -> Result<(), ReputationChange> {
910 let mut total_weight = 0;
911
912 for id in votes {
913 if let Some(weight) = voters.get(id).map(|info| info.weight()) {
914 total_weight += weight.get();
915 if total_weight > full_threshold {
916 return Err(cost::MALFORMED_CATCH_UP)
917 }
918 } else {
919 debug!(
920 target: LOG_TARGET,
921 "Skipping catch up message containing unknown voter {}", id
922 );
923 return Err(cost::MALFORMED_CATCH_UP)
924 }
925 }
926
927 if total_weight < voters.threshold().get() {
928 return Err(cost::MALFORMED_CATCH_UP)
929 }
930
931 Ok(())
932 }
933
934 check_weight(voters, msg.prevotes.iter().map(|vote| &vote.id), full_threshold)?;
935
936 check_weight(voters, msg.precommits.iter().map(|vote| &vote.id), full_threshold)?;
937
938 fn check_signatures<'a, B, I>(
939 messages: I,
940 round: RoundNumber,
941 set_id: SetIdNumber,
942 mut signatures_checked: usize,
943 buf: &mut Vec<u8>,
944 telemetry: Option<TelemetryHandle>,
945 ) -> Result<usize, ReputationChange>
946 where
947 B: BlockT,
948 I: Iterator<Item = (Message<B::Header>, &'a AuthorityId, &'a AuthoritySignature)>,
949 {
950 use crate::communication::gossip::Misbehavior;
951
952 for (msg, id, sig) in messages {
953 signatures_checked += 1;
954
955 if !sp_consensus_grandpa::check_message_signature_with_buffer(
956 &msg, id, sig, round, set_id, buf,
957 )
958 .is_valid()
959 {
960 debug!(target: LOG_TARGET, "Bad catch up message signature {}", id);
961 telemetry!(
962 telemetry;
963 CONSENSUS_DEBUG;
964 "afg.bad_catch_up_msg_signature";
965 "id" => ?id,
966 );
967
968 let cost = Misbehavior::BadCatchUpMessage {
969 signatures_checked: signatures_checked as i32,
970 }
971 .cost();
972
973 return Err(cost)
974 }
975 }
976
977 Ok(signatures_checked)
978 }
979
980 let mut buf = Vec::new();
981
982 let signatures_checked = check_signatures::<Block, _>(
984 msg.prevotes.iter().map(|vote| {
985 (finality_grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature)
986 }),
987 msg.round_number,
988 set_id.0,
989 0,
990 &mut buf,
991 telemetry.clone(),
992 )?;
993
994 check_signatures::<Block, _>(
996 msg.precommits.iter().map(|vote| {
997 (
998 finality_grandpa::Message::Precommit(vote.precommit.clone()),
999 &vote.id,
1000 &vote.signature,
1001 )
1002 }),
1003 msg.round_number,
1004 set_id.0,
1005 signatures_checked,
1006 &mut buf,
1007 telemetry,
1008 )?;
1009
1010 Ok(())
1011}
1012
/// Sink that gossips locally produced commit messages on the global topic of a set.
struct CommitsOut<Block: BlockT> {
    /// Gossip engine used to send the commit.
    network: Arc<Mutex<GossipEngine<Block>>>,
    /// Authority set the commits belong to.
    set_id: SetId,
    /// When `false`, `start_send` accepts commits but sends nothing.
    is_voter: bool,
    /// Validator that is told about each commit before it is gossiped.
    gossip_validator: Arc<GossipValidator<Block>>,
    /// Receives any neighbor packet the validator produces.
    neighbor_sender: periodic::NeighborPacketSender<Block>,
    /// Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
}
1022
1023impl<Block: BlockT> CommitsOut<Block> {
1024 pub(crate) fn new(
1026 network: Arc<Mutex<GossipEngine<Block>>>,
1027 set_id: SetIdNumber,
1028 is_voter: bool,
1029 gossip_validator: Arc<GossipValidator<Block>>,
1030 neighbor_sender: periodic::NeighborPacketSender<Block>,
1031 telemetry: Option<TelemetryHandle>,
1032 ) -> Self {
1033 CommitsOut {
1034 network,
1035 set_id: SetId(set_id),
1036 is_voter,
1037 gossip_validator,
1038 neighbor_sender,
1039 telemetry,
1040 }
1041 }
1042}
1043
1044impl<Block: BlockT> Sink<(RoundNumber, Commit<Block::Header>)> for CommitsOut<Block> {
1045 type Error = Error;
1046
1047 fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1048 Poll::Ready(Ok(()))
1049 }
1050
1051 fn start_send(
1052 self: Pin<&mut Self>,
1053 input: (RoundNumber, Commit<Block::Header>),
1054 ) -> Result<(), Self::Error> {
1055 if !self.is_voter {
1056 return Ok(())
1057 }
1058
1059 let (round, commit) = input;
1060 let round = Round(round);
1061
1062 telemetry!(
1063 self.telemetry;
1064 CONSENSUS_DEBUG;
1065 "afg.commit_issued";
1066 "target_number" => ?commit.target_number,
1067 "target_hash" => ?commit.target_hash,
1068 );
1069 let (precommits, auth_data) = commit
1070 .precommits
1071 .into_iter()
1072 .map(|signed| (signed.precommit, (signed.signature, signed.id)))
1073 .unzip();
1074
1075 let compact_commit = CompactCommit::<Block::Header> {
1076 target_hash: commit.target_hash,
1077 target_number: commit.target_number,
1078 precommits,
1079 auth_data,
1080 };
1081
1082 let message = GossipMessage::Commit(FullCommitMessage::<Block> {
1083 round,
1084 set_id: self.set_id,
1085 message: compact_commit,
1086 });
1087
1088 let topic = global_topic::<Block>(self.set_id.0);
1089
1090 self.gossip_validator.note_commit_finalized(
1093 round,
1094 self.set_id,
1095 commit.target_number,
1096 |to, neighbor| self.neighbor_sender.send(to, neighbor),
1097 );
1098 self.network.lock().gossip_message(topic, message.encode(), false);
1099
1100 Ok(())
1101 }
1102
1103 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1104 Poll::Ready(Ok(()))
1105 }
1106
1107 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
1108 Poll::Ready(Ok(()))
1109 }
1110}