1use crate::{
20 communication::{
21 gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage},
22 request_response::outgoing_requests_engine::ResponseInfo,
23 },
24 error::Error,
25 find_authorities_change,
26 fisherman::Fisherman,
27 justification::BeefyVersionedFinalityProof,
28 keystore::BeefyKeystore,
29 metric_inc, metric_set,
30 metrics::VoterMetrics,
31 round::{Rounds, VoteImportResult},
32 BeefyComms, BeefyVoterLinks, UnpinnedFinalityNotification, LOG_TARGET,
33};
34use sp_application_crypto::RuntimeAppPublic;
35
36use codec::{Codec, Decode, DecodeAll, Encode};
37use futures::{stream::Fuse, FutureExt, StreamExt};
38use log::{debug, error, info, trace, warn};
39use sc_client_api::{Backend, HeaderBackend};
40use sc_utils::notification::NotificationReceiver;
41use sp_api::ProvideRuntimeApi;
42use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
43use sp_consensus::SyncOracle;
44use sp_consensus_beefy::{
45 AuthorityIdBound, BeefyApi, Commitment, DoubleVotingProof, PayloadProvider, ValidatorSet,
46 VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
47};
48use sp_runtime::{
49 generic::BlockId,
50 traits::{Block, Header, NumberFor, Zero},
51 SaturatedConversion,
52};
53use std::{
54 collections::{BTreeMap, VecDeque},
55 fmt::Debug,
56 marker::PhantomData,
57 sync::Arc,
58};
59
/// Upper bound on the number of entries kept in the worker's `pending_justifications` buffer.
/// Once the buffer holds this many entries, further justifications that would be enqueued are
/// dropped instead.
const MAX_BUFFERED_JUSTIFICATIONS: usize = 2400;
63
/// Result of triaging a round number against the currently accepted interval
/// (see `VoterOracle::triage_round`).
pub(crate) enum RoundAction {
    /// The round is below the accepted interval.
    Drop,
    /// The round lies within the accepted interval.
    Process,
    /// The round is above the accepted interval.
    Enqueue,
}
69
/// Voting bookkeeping: the queue of known sessions together with the best GRANDPA and BEEFY
/// blocks. Answers which rounds are accepted and which block should be voted on next.
#[derive(Debug, Decode, Encode, PartialEq)]
pub(crate) struct VoterOracle<B: Block, AuthorityId: AuthorityIdBound> {
    /// Queue of session rounds; the front entry is the active session.
    sessions: VecDeque<Rounds<B, AuthorityId>>,
    /// Minimum step between the best BEEFY block and the next (non-mandatory) voting target.
    /// Always stored as at least 1.
    min_block_delta: u32,
    /// Header of the best GRANDPA-finalized block known to the voter.
    best_grandpa_block_header: <B as Block>::Header,
    /// Number of the best BEEFY-finalized block known to the voter.
    best_beefy_block: NumberFor<B>,
    /// Marker tying the `AuthorityId` type parameter to the struct.
    _phantom: PhantomData<fn() -> AuthorityId>,
}
96
97impl<B: Block, AuthorityId> VoterOracle<B, AuthorityId>
98where
99 AuthorityId: AuthorityIdBound,
100{
101 pub fn checked_new(
103 sessions: VecDeque<Rounds<B, AuthorityId>>,
104 min_block_delta: u32,
105 grandpa_header: <B as Block>::Header,
106 best_beefy: NumberFor<B>,
107 ) -> Option<Self> {
108 let mut prev_start = Zero::zero();
109 let mut prev_validator_id = None;
110 let mut validate = || -> bool {
112 let best_grandpa = *grandpa_header.number();
113 if sessions.is_empty() || best_beefy > best_grandpa {
114 return false;
115 }
116 for (idx, session) in sessions.iter().enumerate() {
117 let start = session.session_start();
118 if session.validators().is_empty() {
119 return false;
120 }
121 if start > best_grandpa || start <= prev_start {
122 return false;
123 }
124 #[cfg(not(test))]
125 if let Some(prev_id) = prev_validator_id {
126 if session.validator_set_id() <= prev_id {
127 return false;
128 }
129 }
130 if idx != 0 && session.mandatory_done() {
131 return false;
132 }
133 prev_start = session.session_start();
134 prev_validator_id = Some(session.validator_set_id());
135 }
136 true
137 };
138 if validate() {
139 Some(VoterOracle {
140 sessions,
141 min_block_delta: min_block_delta.max(1),
143 best_grandpa_block_header: grandpa_header,
144 best_beefy_block: best_beefy,
145 _phantom: PhantomData,
146 })
147 } else {
148 error!(
149 target: LOG_TARGET,
150 "🥩 Invalid sessions queue: {:?}; best-beefy {:?} best-grandpa-header {:?}.",
151 sessions,
152 best_beefy,
153 grandpa_header
154 );
155 None
156 }
157 }
158
159 fn active_rounds(&self) -> Result<&Rounds<B, AuthorityId>, Error> {
162 self.sessions.front().ok_or(Error::UninitSession)
163 }
164
165 fn active_rounds_mut(&mut self) -> Result<&mut Rounds<B, AuthorityId>, Error> {
168 self.sessions.front_mut().ok_or(Error::UninitSession)
169 }
170
171 fn current_validator_set(&self) -> Result<&ValidatorSet<AuthorityId>, Error> {
172 self.active_rounds().map(|r| r.validator_set())
173 }
174
175 fn try_prune(&mut self) {
179 if self.sessions.len() > 1 {
180 self.sessions.retain(|s| !s.mandatory_done())
182 }
183 }
184
185 pub fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
187 let latest_known_session_start =
188 self.sessions.back().map(|session| session.session_start());
189 Some(session_start) > latest_known_session_start
190 }
191
    /// Appends `rounds` to the back of the session queue, then prunes sessions whose mandatory
    /// block is already done (see `try_prune`).
    pub fn add_session(&mut self, rounds: Rounds<B, AuthorityId>) {
        self.sessions.push_back(rounds);
        self.try_prune();
    }
198
199 pub fn finalize(&mut self, block: NumberFor<B>) -> Result<(), Error> {
201 self.active_rounds_mut()?.conclude(block);
203 self.try_prune();
205 Ok(())
206 }
207
208 pub fn mandatory_pending(&self) -> Option<(NumberFor<B>, ValidatorSet<AuthorityId>)> {
210 self.sessions.front().and_then(|round| {
211 if round.mandatory_done() {
212 None
213 } else {
214 Some((round.session_start(), round.validator_set().clone()))
215 }
216 })
217 }
218
219 pub fn accepted_interval(&self) -> Result<(NumberFor<B>, NumberFor<B>), Error> {
221 let rounds = self.sessions.front().ok_or(Error::UninitSession)?;
222
223 if rounds.mandatory_done() {
224 Ok((
227 rounds.session_start().max(self.best_beefy_block),
228 (*self.best_grandpa_block_header.number()),
229 ))
230 } else {
231 Ok((rounds.session_start(), rounds.session_start()))
234 }
235 }
236
237 pub fn triage_round(&self, round: NumberFor<B>) -> Result<RoundAction, Error> {
239 let (start, end) = self.accepted_interval()?;
240 if start <= round && round <= end {
241 Ok(RoundAction::Process)
242 } else if round > end {
243 Ok(RoundAction::Enqueue)
244 } else {
245 Ok(RoundAction::Drop)
246 }
247 }
248
249 pub fn voting_target(&self) -> Option<NumberFor<B>> {
252 let rounds = self.sessions.front().or_else(|| {
253 debug!(target: LOG_TARGET, "🥩 No voting round started");
254 None
255 })?;
256 let best_grandpa = *self.best_grandpa_block_header.number();
257 let best_beefy = self.best_beefy_block;
258
259 let target =
261 vote_target(best_grandpa, best_beefy, rounds.session_start(), self.min_block_delta);
262 trace!(
263 target: LOG_TARGET,
264 "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}",
265 best_beefy,
266 best_grandpa,
267 target
268 );
269 target
270 }
271}
272
/// Voter state that the worker writes to the backend's auxiliary storage through
/// `aux_schema::write_voter_state`.
#[derive(Debug, Decode, Encode, PartialEq)]
pub(crate) struct PersistedState<B: Block, AuthorityId: AuthorityIdBound> {
    /// Highest block number this voter has voted on; starts at zero.
    best_voted: NumberFor<B>,
    /// Session queue and best-block bookkeeping used to pick rounds and voting targets.
    voting_oracle: VoterOracle<B, AuthorityId>,
    /// BEEFY genesis block this state was created for; compared with the runtime's
    /// `beefy_genesis` on finality notifications to detect a consensus reset.
    pallet_genesis: NumberFor<B>,
}
286
287impl<B: Block, AuthorityId: AuthorityIdBound> PersistedState<B, AuthorityId> {
288 pub fn checked_new(
289 grandpa_header: <B as Block>::Header,
290 best_beefy: NumberFor<B>,
291 sessions: VecDeque<Rounds<B, AuthorityId>>,
292 min_block_delta: u32,
293 pallet_genesis: NumberFor<B>,
294 ) -> Option<Self> {
295 VoterOracle::checked_new(sessions, min_block_delta, grandpa_header, best_beefy).map(
296 |voting_oracle| PersistedState {
297 best_voted: Zero::zero(),
298 voting_oracle,
299 pallet_genesis,
300 },
301 )
302 }
303
    /// Returns the BEEFY genesis block number stored in this state.
    pub fn pallet_genesis(&self) -> NumberFor<B> {
        self.pallet_genesis
    }
307
308 pub(crate) fn set_min_block_delta(&mut self, min_block_delta: u32) {
309 self.voting_oracle.min_block_delta = min_block_delta.max(1);
310 }
311
    /// Returns the best BEEFY block number recorded by the voting oracle.
    pub fn best_beefy(&self) -> NumberFor<B> {
        self.voting_oracle.best_beefy_block
    }
315
    /// Overwrites the best BEEFY block number recorded by the voting oracle.
    pub(crate) fn set_best_beefy(&mut self, best_beefy: NumberFor<B>) {
        self.voting_oracle.best_beefy_block = best_beefy;
    }
319
    /// Overwrites the best GRANDPA block header recorded by the voting oracle.
    pub(crate) fn set_best_grandpa(&mut self, best_grandpa: <B as Block>::Header) {
        self.voting_oracle.best_grandpa_block_header = best_grandpa;
    }
323
    /// Returns a shared reference to the voting oracle.
    pub fn voting_oracle(&self) -> &VoterOracle<B, AuthorityId> {
        &self.voting_oracle
    }
327
328 pub(crate) fn gossip_filter_config(&self) -> Result<GossipFilterCfg<B, AuthorityId>, Error> {
329 let (start, end) = self.voting_oracle.accepted_interval()?;
330 let validator_set = self.voting_oracle.current_validator_set()?;
331 Ok(GossipFilterCfg { start, end, validator_set })
332 }
333
334 pub fn init_session_at(
336 &mut self,
337 new_session_start: NumberFor<B>,
338 validator_set: ValidatorSet<AuthorityId>,
339 key_store: &BeefyKeystore<AuthorityId>,
340 metrics: &Option<VoterMetrics>,
341 is_authority: bool,
342 ) {
343 debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set);
344
345 if let Ok(active_session) = self.voting_oracle.active_rounds() {
347 if !active_session.mandatory_done() {
348 debug!(
349 target: LOG_TARGET,
350 "🥩 New session {} while active session {} is still lagging.",
351 validator_set.id(),
352 active_session.validator_set_id(),
353 );
354 metric_inc!(metrics, beefy_lagging_sessions);
355 }
356 }
357
358 if is_authority && key_store.public_keys().map_or(false, |k| k.is_empty()) {
360 error!(
361 target: LOG_TARGET,
362 "🥩 for session starting at block {:?} no BEEFY authority key found in store, \
363 you must generate valid session keys \
364 (https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#generating-the-session-keys)",
365 new_session_start,
366 );
367 metric_inc!(metrics, beefy_no_authority_found_in_store);
368 }
369
370 let id = validator_set.id();
371 self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set));
372 metric_set!(metrics, beefy_validator_set_id, id);
373 info!(
374 target: LOG_TARGET,
375 "🥩 New Rounds for validator set id: {:?} with session_start {:?}",
376 id,
377 new_session_start
378 );
379 }
380}
381
/// The BEEFY voter worker: holds everything needed to follow finality, exchange votes and
/// justifications over gossip, and vote on blocks.
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S, N, AuthorityId: AuthorityIdBound> {
    /// Client backend: header/hash lookups, appending justifications, persisting voter state.
    pub backend: Arc<BE>,
    /// Runtime API provider, queried for the BEEFY genesis block.
    pub runtime: Arc<RuntimeApi>,
    /// Keystore used to find the local authority id and to sign commitments.
    pub key_store: Arc<BeefyKeystore<AuthorityId>>,
    /// Produces the payload to commit to for a given header.
    pub payload_provider: P,
    /// Sync oracle; no votes are cast while it reports major syncing.
    pub sync: Arc<S>,
    /// Used to report double-voting proofs.
    pub fisherman: Arc<Fisherman<B, BE, RuntimeApi, AuthorityId>>,

    /// Communication handles: gossip engine, gossip validator, on-demand justifications engine.
    pub comms: BeefyComms<B, N, AuthorityId>,

    /// Links towards other components, including the RPC notification senders.
    pub links: BeefyVoterLinks<B, AuthorityId>,

    /// Justifications for rounds above the accepted interval, keyed by block number and
    /// bounded by `MAX_BUFFERED_JUSTIFICATIONS`.
    pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B, AuthorityId>>,
    /// Voter state that gets written to the backend's auxiliary storage.
    pub persisted_state: PersistedState<B, AuthorityId>,
    /// Optional metrics sink.
    pub metrics: Option<VoterMetrics>,
    /// Whether this node runs as an authority; forwarded when initializing sessions.
    pub is_authority: bool,
}
409
410impl<B, BE, P, R, S, N, AuthorityId> BeefyWorker<B, BE, P, R, S, N, AuthorityId>
411where
412 B: Block + Codec,
413 BE: Backend<B>,
414 P: PayloadProvider<B>,
415 S: SyncOracle,
416 R: ProvideRuntimeApi<B>,
417 R::Api: BeefyApi<B, AuthorityId>,
418 AuthorityId: AuthorityIdBound,
419{
420 fn best_grandpa_block(&self) -> NumberFor<B> {
421 *self.persisted_state.voting_oracle.best_grandpa_block_header.number()
422 }
423
    /// Shorthand for the voting oracle held in the persisted state.
    fn voting_oracle(&self) -> &VoterOracle<B, AuthorityId> {
        &self.persisted_state.voting_oracle
    }
427
    /// Test-only access to the active session's rounds.
    #[cfg(test)]
    fn active_rounds(&mut self) -> Result<&Rounds<B, AuthorityId>, Error> {
        self.persisted_state.voting_oracle.active_rounds()
    }
432
    /// Starts a new session for `validator_set` at `new_session_start` by delegating to
    /// `PersistedState::init_session_at` with the worker's keystore, metrics and authority flag.
    ///
    /// Note the argument order differs from the delegate: validator set first here.
    fn init_session_at(
        &mut self,
        validator_set: ValidatorSet<AuthorityId>,
        new_session_start: NumberFor<B>,
    ) {
        self.persisted_state.init_session_at(
            new_session_start,
            validator_set,
            &self.key_store,
            &self.metrics,
            self.is_authority,
        );
    }
447
448 fn handle_finality_notification(
449 &mut self,
450 notification: &UnpinnedFinalityNotification<B>,
451 ) -> Result<(), Error> {
452 let header = ¬ification.header;
453 debug!(
454 target: LOG_TARGET,
455 "🥩 Finality notification: header(number {:?}, hash {:?}) tree_route {:?}",
456 header.number(),
457 notification.hash,
458 notification.tree_route,
459 );
460
461 match self.runtime.runtime_api().beefy_genesis(notification.hash) {
462 Ok(Some(genesis)) if genesis != self.persisted_state.pallet_genesis => {
463 debug!(target: LOG_TARGET, "🥩 ConsensusReset detected. Expected genesis: {}, found genesis: {}", self.persisted_state.pallet_genesis, genesis);
464 return Err(Error::ConsensusReset)
465 },
466 Ok(_) => {},
467 Err(api_error) => {
468 debug!(target: LOG_TARGET, "🥩 Unable to check beefy genesis: {}", api_error);
471 },
472 }
473
474 let mut new_session_added = false;
475 if *header.number() > self.best_grandpa_block() {
476 self.persisted_state.set_best_grandpa(header.clone());
478
479 let backend = self.backend.clone();
481 for header in notification
482 .tree_route
483 .iter()
484 .map(|hash| {
485 backend
486 .blockchain()
487 .expect_header(*hash)
488 .expect("just finalized block should be available; qed.")
489 })
490 .chain(std::iter::once(header.clone()))
491 {
492 if let Some(new_validator_set) = find_authorities_change::<B, AuthorityId>(&header)
493 {
494 self.init_session_at(new_validator_set, *header.number());
495 new_session_added = true;
496 }
497 }
498
499 if new_session_added {
500 crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
501 .map_err(|e| Error::Backend(e.to_string()))?;
502 }
503
504 if let Err(e) = self
506 .persisted_state
507 .gossip_filter_config()
508 .map(|filter| self.comms.gossip_validator.update_filter(filter))
509 {
510 error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
511 }
512 }
513
514 Ok(())
515 }
516
517 fn triage_incoming_vote(
519 &mut self,
520 vote: VoteMessage<NumberFor<B>, AuthorityId, <AuthorityId as RuntimeAppPublic>::Signature>,
521 ) -> Result<(), Error>
522 where
523 <AuthorityId as RuntimeAppPublic>::Signature: Encode + Decode,
524 {
525 let block_num = vote.commitment.block_number;
526 match self.voting_oracle().triage_round(block_num)? {
527 RoundAction::Process =>
528 if let Some(finality_proof) = self.handle_vote(vote)? {
529 let gossip_proof =
530 GossipMessage::<B, AuthorityId>::FinalityProof(finality_proof);
531 let encoded_proof = gossip_proof.encode();
532 self.comms.gossip_engine.gossip_message(
533 proofs_topic::<B>(),
534 encoded_proof,
535 true,
536 );
537 },
538 RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_votes),
539 RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
540 };
541 Ok(())
542 }
543
544 fn triage_incoming_justif(
548 &mut self,
549 justification: BeefyVersionedFinalityProof<B, AuthorityId>,
550 ) -> Result<(), Error> {
551 let signed_commitment = match justification {
552 VersionedFinalityProof::V1(ref sc) => sc,
553 };
554 let block_num = signed_commitment.commitment.block_number;
555 match self.voting_oracle().triage_round(block_num)? {
556 RoundAction::Process => {
557 debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num);
558 metric_inc!(self.metrics, beefy_imported_justifications);
559 self.finalize(justification)?
560 },
561 RoundAction::Enqueue => {
562 debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num);
563 if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS {
564 self.pending_justifications.entry(block_num).or_insert(justification);
565 metric_inc!(self.metrics, beefy_buffered_justifications);
566 } else {
567 metric_inc!(self.metrics, beefy_buffered_justifications_dropped);
568 warn!(
569 target: LOG_TARGET,
570 "🥩 Buffer justification dropped for round: {:?}.", block_num
571 );
572 }
573 },
574 RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_justifications),
575 };
576 Ok(())
577 }
578
579 fn handle_vote(
580 &mut self,
581 vote: VoteMessage<NumberFor<B>, AuthorityId, <AuthorityId as RuntimeAppPublic>::Signature>,
582 ) -> Result<Option<BeefyVersionedFinalityProof<B, AuthorityId>>, Error> {
583 let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
584
585 let block_number = vote.commitment.block_number;
586 match rounds.add_vote(vote) {
587 VoteImportResult::RoundConcluded(signed_commitment) => {
588 let finality_proof = VersionedFinalityProof::V1(signed_commitment);
589 debug!(
590 target: LOG_TARGET,
591 "🥩 Round #{} concluded, finality_proof: {:?}.", block_number, finality_proof
592 );
593 self.finalize(finality_proof.clone())?;
596 metric_inc!(self.metrics, beefy_good_votes_processed);
597 return Ok(Some(finality_proof));
598 },
599 VoteImportResult::Ok => {
600 if self
602 .voting_oracle()
603 .mandatory_pending()
604 .map(|(mandatory_num, _)| mandatory_num == block_number)
605 .unwrap_or(false)
606 {
607 crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
608 .map_err(|e| Error::Backend(e.to_string()))?;
609 }
610 metric_inc!(self.metrics, beefy_good_votes_processed);
611 },
612 VoteImportResult::DoubleVoting(proof) => {
613 metric_inc!(self.metrics, beefy_equivocation_votes);
614 self.report_double_voting(proof)?;
615 },
616 VoteImportResult::Invalid => metric_inc!(self.metrics, beefy_invalid_votes),
617 VoteImportResult::Stale => metric_inc!(self.metrics, beefy_stale_votes),
618 };
619 Ok(None)
620 }
621
622 fn finalize(
630 &mut self,
631 finality_proof: BeefyVersionedFinalityProof<B, AuthorityId>,
632 ) -> Result<(), Error> {
633 let block_num = match finality_proof {
634 VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number,
635 };
636
637 if block_num <= self.persisted_state.voting_oracle.best_beefy_block {
638 return Ok(());
640 }
641
642 self.persisted_state.voting_oracle.finalize(block_num)?;
644
645 self.persisted_state.set_best_beefy(block_num);
647 crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
648 .map_err(|e| Error::Backend(e.to_string()))?;
649
650 metric_set!(self.metrics, beefy_best_block, block_num);
651
652 self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
653
654 if let Err(e) = self
655 .backend
656 .blockchain()
657 .expect_block_hash_from_id(&BlockId::Number(block_num))
658 .and_then(|hash| {
659 self.links
660 .to_rpc_best_block_sender
661 .notify(|| Ok::<_, ()>(hash))
662 .expect("forwards closure result; the closure always returns Ok; qed.");
663
664 self.backend
665 .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
666 }) {
667 debug!(
668 target: LOG_TARGET,
669 "🥩 Error {:?} on appending justification: {:?}", e, finality_proof
670 );
671 }
672
673 self.links
674 .to_rpc_justif_sender
675 .notify(|| Ok::<_, ()>(finality_proof))
676 .expect("forwards closure result; the closure always returns Ok; qed.");
677
678 self.persisted_state
680 .gossip_filter_config()
681 .map(|filter| self.comms.gossip_validator.update_filter(filter))?;
682 Ok(())
683 }
684
685 fn try_pending_justifications(&mut self) -> Result<(), Error> {
687 let (start, end) = self.voting_oracle().accepted_interval()?;
689 if !self.pending_justifications.is_empty() {
691 let still_pending =
693 self.pending_justifications.split_off(&end.saturating_add(1u32.into()));
694 let justifs_to_process = self.pending_justifications.split_off(&start);
696 self.pending_justifications = still_pending;
698
699 for (num, justification) in justifs_to_process.into_iter() {
700 debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num);
701 metric_inc!(self.metrics, beefy_imported_justifications);
702 if let Err(err) = self.finalize(justification) {
703 error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err);
704 }
705 }
706 metric_set!(
707 self.metrics,
708 beefy_buffered_justifications,
709 self.pending_justifications.len()
710 );
711 }
712 Ok(())
713 }
714
715 fn try_to_vote(&mut self) -> Result<(), Error> {
717 if let Some(target) = self.voting_oracle().voting_target() {
719 metric_set!(self.metrics, beefy_should_vote_on, target);
720 if target > self.persisted_state.best_voted {
721 self.do_vote(target)?;
722 }
723 }
724 Ok(())
725 }
726
727 fn do_vote(&mut self, target_number: NumberFor<B>) -> Result<(), Error> {
731 debug!(target: LOG_TARGET, "🥩 Try voting on {}", target_number);
732
733 let target_header = if target_number == self.best_grandpa_block() {
736 self.persisted_state.voting_oracle.best_grandpa_block_header.clone()
737 } else {
738 let hash = self
739 .backend
740 .blockchain()
741 .expect_block_hash_from_id(&BlockId::Number(target_number))
742 .map_err(|err| {
743 let err_msg = format!(
744 "Couldn't get hash for block #{:?} (error: {:?}), skipping vote..",
745 target_number, err
746 );
747 Error::Backend(err_msg)
748 })?;
749
750 self.backend.blockchain().expect_header(hash).map_err(|err| {
751 let err_msg = format!(
752 "Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..",
753 target_number, hash, err
754 );
755 Error::Backend(err_msg)
756 })?
757 };
758 let target_hash = target_header.hash();
759
760 let payload = if let Some(hash) = self.payload_provider.payload(&target_header) {
761 hash
762 } else {
763 warn!(target: LOG_TARGET, "🥩 No MMR root digest found for: {:?}", target_hash);
764 return Ok(());
765 };
766
767 let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
768 let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
769
770 let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
771 debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id);
772 id
773 } else {
774 debug!(
775 target: LOG_TARGET,
776 "🥩 Missing validator id - can't vote for: {:?}", target_hash
777 );
778 return Ok(());
779 };
780
781 let commitment = Commitment { payload, block_number: target_number, validator_set_id };
782 let encoded_commitment = commitment.encode();
783
784 let signature = match self.key_store.sign(&authority_id, &encoded_commitment) {
785 Ok(sig) => sig,
786 Err(err) => {
787 warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err);
788 return Ok(());
789 },
790 };
791
792 trace!(
793 target: LOG_TARGET,
794 "🥩 Produced signature using {:?}, is_valid: {:?}",
795 authority_id,
796 BeefyKeystore::verify(&authority_id, &signature, &encoded_commitment)
797 );
798
799 let vote = VoteMessage { commitment, id: authority_id, signature };
800 if let Some(finality_proof) = self.handle_vote(vote.clone()).map_err(|err| {
801 error!(target: LOG_TARGET, "🥩 Error handling self vote: {}", err);
802 err
803 })? {
804 let encoded_proof =
805 GossipMessage::<B, AuthorityId>::FinalityProof(finality_proof).encode();
806 self.comms
807 .gossip_engine
808 .gossip_message(proofs_topic::<B>(), encoded_proof, true);
809 } else {
810 metric_inc!(self.metrics, beefy_votes_sent);
811 debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
812 let encoded_vote = GossipMessage::<B, AuthorityId>::Vote(vote).encode();
813 self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
814 }
815
816 self.persisted_state.best_voted = target_number;
818 metric_set!(self.metrics, beefy_best_voted, target_number);
819 crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
820 .map_err(|e| Error::Backend(e.to_string()))
821 }
822
823 fn process_new_state(&mut self) {
824 if let Err(err) = self.try_pending_justifications() {
826 debug!(target: LOG_TARGET, "🥩 {}", err);
827 }
828
829 if !self.sync.is_major_syncing() {
831 if let Err(err) = self.try_to_vote() {
833 debug!(target: LOG_TARGET, "🥩 {}", err);
834 }
835 if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
838 self.comms.on_demand_justifications.request(block, active);
840 }
841 }
842 }
843
844 pub(crate) async fn run(
849 mut self,
850 block_import_justif: &mut Fuse<
851 NotificationReceiver<BeefyVersionedFinalityProof<B, AuthorityId>>,
852 >,
853 finality_notifications: &mut Fuse<crate::FinalityNotifications<B>>,
854 ) -> (Error, BeefyComms<B, N, AuthorityId>) {
855 info!(
856 target: LOG_TARGET,
857 "🥩 run BEEFY worker, best grandpa: #{:?}.",
858 self.best_grandpa_block()
859 );
860
861 let mut votes = Box::pin(
862 self.comms
863 .gossip_engine
864 .messages_for(votes_topic::<B>())
865 .filter_map(|notification| async move {
866 let vote =
867 GossipMessage::<B, AuthorityId>::decode_all(&mut ¬ification.message[..])
868 .ok()
869 .and_then(|message| message.unwrap_vote());
870 trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote);
871 vote
872 })
873 .fuse(),
874 );
875 let mut gossip_proofs = Box::pin(
876 self.comms
877 .gossip_engine
878 .messages_for(proofs_topic::<B>())
879 .filter_map(|notification| async move {
880 let proof =
881 GossipMessage::<B, AuthorityId>::decode_all(&mut ¬ification.message[..])
882 .ok()
883 .and_then(|message| message.unwrap_finality_proof());
884 trace!(target: LOG_TARGET, "🥩 Got gossip proof message: {:?}", proof);
885 proof
886 })
887 .fuse(),
888 );
889
890 self.process_new_state();
891 let error = loop {
892 let mut gossip_engine = &mut self.comms.gossip_engine;
894
895 futures::select_biased! {
899 notification = finality_notifications.next() => {
902 if let Some(notif) = notification {
903 if let Err(err) = self.handle_finality_notification(¬if) {
904 break err;
905 }
906 } else {
907 break Error::FinalityStreamTerminated;
908 }
909 },
910 _ = gossip_engine => {
912 break Error::GossipEngineTerminated;
913 },
914 response_info = self.comms.on_demand_justifications.next().fuse() => {
916 match response_info {
917 ResponseInfo::ValidProof(justif, peer_report) => {
918 if let Err(err) = self.triage_incoming_justif(justif) {
919 debug!(target: LOG_TARGET, "🥩 {}", err);
920 }
921 self.comms.gossip_engine.report(peer_report.who, peer_report.cost_benefit);
922 },
923 ResponseInfo::PeerReport(peer_report) => {
924 self.comms.gossip_engine.report(peer_report.who, peer_report.cost_benefit);
925 },
926 ResponseInfo::Pending => {},
927 }
928 },
929 justif = block_import_justif.next() => {
930 if let Some(justif) = justif {
931 if let Err(err) = self.triage_incoming_justif(justif) {
934 debug!(target: LOG_TARGET, "🥩 {}", err);
935 }
936 } else {
937 break Error::BlockImportStreamTerminated;
938 }
939 },
940 justif = gossip_proofs.next() => {
941 if let Some(justif) = justif {
942 if let Err(err) = self.triage_incoming_justif(justif) {
944 debug!(target: LOG_TARGET, "🥩 {}", err);
945 }
946 } else {
947 break Error::FinalityProofGossipStreamTerminated;
948 }
949 },
950 vote = votes.next() => {
952 if let Some(vote) = vote {
953 if let Err(err) = self.triage_incoming_vote(vote) {
955 debug!(target: LOG_TARGET, "🥩 {}", err);
956 }
957 } else {
958 break Error::VotesGossipStreamTerminated;
959 }
960 },
961 }
962
963 self.process_new_state();
965 };
966
967 (error, self.comms)
969 }
970
971 fn report_double_voting(
973 &self,
974 proof: DoubleVotingProof<
975 NumberFor<B>,
976 AuthorityId,
977 <AuthorityId as RuntimeAppPublic>::Signature,
978 >,
979 ) -> Result<(), Error> {
980 let rounds = self.persisted_state.voting_oracle.active_rounds()?;
981 self.fisherman.report_double_voting(proof, rounds)
982 }
983}
984
985fn vote_target<N>(best_grandpa: N, best_beefy: N, session_start: N, min_delta: u32) -> Option<N>
989where
990 N: AtLeast32Bit + Copy + Debug,
991{
992 let target = if best_beefy < session_start {
995 debug!(target: LOG_TARGET, "🥩 vote target - mandatory block: #{:?}", session_start);
996 session_start
997 } else {
998 let diff = best_grandpa.saturating_sub(best_beefy) + 1u32.into();
999 let diff = diff.saturated_into::<u32>() / 2;
1000 let target = best_beefy + min_delta.max(diff.next_power_of_two()).into();
1001 trace!(
1002 target: LOG_TARGET,
1003 "🥩 vote target - diff: {:?}, next_power_of_two: {:?}, target block: #{:?}",
1004 diff,
1005 diff.next_power_of_two(),
1006 target,
1007 );
1008
1009 target
1010 };
1011
1012 if target > best_grandpa {
1015 None
1016 } else {
1017 Some(target)
1018 }
1019}
1020
1021#[cfg(test)]
1022pub(crate) mod tests {
1023 use super::*;
1024 use crate::{
1025 communication::{
1026 gossip::{tests::TestNetwork, GossipValidator},
1027 notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
1028 request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
1029 },
1030 tests::{
1031 create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet,
1032 TestApi,
1033 },
1034 BeefyRPCLinks, KnownPeers,
1035 };
1036 use futures::{future::poll_fn, task::Poll};
1037 use parking_lot::Mutex;
1038 use sc_client_api::{Backend as BackendT, HeaderBackend};
1039 use sc_network_gossip::GossipEngine;
1040 use sc_network_sync::SyncingService;
1041 use sc_network_test::TestNetFactory;
1042 use sp_blockchain::Backend as BlockchainBackendT;
1043 use sp_consensus_beefy::{
1044 ecdsa_crypto, known_payloads,
1045 known_payloads::MMR_ROOT_ID,
1046 mmr::MmrRootProvider,
1047 test_utils::{generate_double_voting_proof, Keyring},
1048 ConsensusLog, Payload, SignedCommitment,
1049 };
1050 use sp_runtime::traits::{Header as HeaderT, One};
1051 use substrate_test_runtime_client::{
1052 runtime::{Block, Digest, DigestItem, Header},
1053 Backend,
1054 };
1055
    /// Test-only accessors on `PersistedState`.
    impl<B: super::Block, AuthorityId: AuthorityIdBound> PersistedState<B, AuthorityId> {
        /// Returns the active session's rounds, or an error when no session is queued.
        pub fn active_round(&self) -> Result<&Rounds<B, AuthorityId>, Error> {
            self.voting_oracle.active_rounds()
        }

        /// Returns the number of the best GRANDPA block header held by the oracle.
        pub fn best_grandpa_number(&self) -> NumberFor<B> {
            *self.voting_oracle.best_grandpa_block_header.number()
        }
    }
1065
    /// Test-only accessor on `VoterOracle` for the ECDSA authority id type.
    impl<B: super::Block> VoterOracle<B, ecdsa_crypto::AuthorityId> {
        /// Exposes the internal session queue.
        pub fn sessions(&self) -> &VecDeque<Rounds<B, ecdsa_crypto::AuthorityId>> {
            &self.sessions
        }
    }
1071
    /// Test fixture: builds a `BeefyWorker` for `peer` using `key` as the local keystore
    /// identity and `genesis_validator_set` as the first session's validators.
    ///
    /// Pushes and finalizes one block on the peer, and creates the persisted state with a
    /// single session starting at block 1, best BEEFY at zero and BEEFY genesis at block 1.
    ///
    /// Panics (via `unwrap`) if any of the setup steps fails.
    fn create_beefy_worker(
        peer: &mut BeefyPeer,
        key: &Keyring<ecdsa_crypto::AuthorityId>,
        min_block_delta: u32,
        genesis_validator_set: ValidatorSet<ecdsa_crypto::AuthorityId>,
    ) -> BeefyWorker<
        Block,
        Backend,
        MmrRootProvider<Block, TestApi>,
        TestApi,
        Arc<SyncingService<Block>>,
        TestNetwork,
        ecdsa_crypto::AuthorityId,
    > {
        let keystore = create_beefy_keystore(key);

        // Channels between the voter and RPC / block import.
        let (to_rpc_justif_sender, from_voter_justif_stream) =
            BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();
        let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
            BeefyBestBlockStream::<Block>::channel();
        // The sending half of the block-import channel is dropped right away.
        let (_, from_block_import_justif_stream) =
            BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();

        // Store the RPC receiving ends on the peer.
        let beefy_rpc_links =
            BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream };
        *peer.data.beefy_rpc_links.lock() = Some(beefy_rpc_links);

        let links = BeefyVoterLinks {
            from_block_import_justif_stream,
            to_rpc_justif_sender,
            to_rpc_best_block_sender,
        };

        let backend = peer.client().as_backend();
        let beefy_genesis = 1;
        let api = Arc::new(TestApi::with_validator_set(&genesis_validator_set));
        let network = peer.network_service().clone();
        let sync = peer.sync_service().clone();
        let notification_service = peer
            .take_notification_service(&crate::tests::beefy_gossip_proto_name())
            .unwrap();
        let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
        let gossip_validator =
            GossipValidator::new(known_peers.clone(), Arc::new(TestNetwork::new().0));
        let gossip_validator = Arc::new(gossip_validator);
        let gossip_engine = GossipEngine::new(
            network.clone(),
            sync.clone(),
            notification_service,
            "/beefy/1",
            gossip_validator.clone(),
            None,
        );
        let metrics = None;
        let on_demand_justifications = OnDemandJustificationsEngine::new(
            network.clone(),
            "/beefy/justifs/1".into(),
            known_peers,
            None,
        );
        // Push one block and finalize it; its header becomes the best GRANDPA header below.
        let hashes = peer.push_blocks(1, false);
        backend.finalize_block(hashes[0], None).unwrap();
        let first_header = backend
            .blockchain()
            .expect_header(backend.blockchain().info().best_hash)
            .unwrap();
        let persisted_state = PersistedState::checked_new(
            first_header,
            Zero::zero(),
            vec![Rounds::new(One::one(), genesis_validator_set)].into(),
            min_block_delta,
            beefy_genesis,
        )
        .unwrap();
        let payload_provider = MmrRootProvider::new(api.clone());
        let comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications };
        let key_store: Arc<BeefyKeystore<ecdsa_crypto::AuthorityId>> =
            Arc::new(Some(keystore).into());
        BeefyWorker {
            backend: backend.clone(),
            runtime: api.clone(),
            key_store: key_store.clone(),
            metrics,
            payload_provider,
            sync: Arc::new(sync),
            fisherman: Arc::new(Fisherman::new(backend, api, key_store)),
            links,
            comms,
            pending_justifications: BTreeMap::new(),
            persisted_state,
            is_authority: true,
        }
    }
1166
/// Runs `vote_target` over small block numbers where the last argument (4 or 8) is large
/// relative to the other inputs, checking which calls produce a target and which give `None`.
#[test]
fn vote_on_min_block_delta() {
    // Each row holds the four `vote_target` arguments in call order, then the expected result.
    // NOTE(review): the argument roles are presumably
    // (best_grandpa, best_beefy, session_start, min_delta) — confirm against `vote_target`.
    let cases = [
        (1u32, 1, 1, 4, None),
        (2u32, 1, 1, 4, None),
        (4u32, 2, 1, 4, None),
        (6u32, 2, 1, 4, Some(6)),
        (9u32, 4, 1, 4, Some(8)),
        (10u32, 10, 1, 8, None),
        (12u32, 10, 1, 8, None),
        (18u32, 10, 1, 8, Some(18)),
    ];

    for &(first, second, third, fourth, expected) in &cases {
        assert_eq!(expected, vote_target(first, second, third, fourth));
    }
}
1188
/// Checks that, with the second argument fixed at 1000, growing the first argument moves the
/// returned target through 1000 + 4, 8, 16, 32, 64, 128 and 256.
#[test]
fn vote_on_power_of_two() {
    // (first argument, expected target); the remaining arguments are always `1000, 1, 4`.
    let from_1000 = [
        (1008u32, 1004),
        (1016, 1008),
        (1032, 1016),
        (1064, 1032),
        (1128, 1064),
        (1256, 1128),
        (1512, 1256),
    ];

    for &(first, expected) in &from_1000 {
        assert_eq!(Some(expected), vote_target(first, 1000, 1, 4));
    }

    // Same shape of check, but with the second argument at 1 instead of 1000.
    assert_eq!(Some(513), vote_target(1024u32, 1, 1, 4));
}
1215
/// Checks `vote_target` for second arguments that are not round numbers: for each of them, two
/// different first arguments are expected to resolve to the same target.
#[test]
fn vote_on_target_block() {
    // (second argument, the two first arguments tried, target expected for both).
    // The third and fourth arguments are always `1` and `4`.
    let cases = [
        (1002, [1008u32, 1010], 1006),
        (1006, [1016, 1022], 1014),
        (1012, [1032, 1044], 1028),
        (1014, [1064, 1078], 1046),
        (1008, [1128, 1136], 1072),
    ];

    for &(second, firsts, expected) in &cases {
        for &first in &firsts {
            assert_eq!(Some(expected), vote_target(first, second, 1, 4));
        }
    }
}
1243
/// Checks `vote_target` when the third argument varies: in the first four rows it is larger than
/// the second argument and is itself the expected target; in the last row it equals the second
/// argument and a different target (1072) is expected.
#[test]
fn vote_on_mandatory_block() {
    // Three leading `vote_target` arguments in call order, then the expected result.
    // The fourth argument is always `4`.
    let cases = [
        (1008u32, 1002, 1004, Some(1004)),
        (1016, 1006, 1007, Some(1007)),
        (1064, 1014, 1063, Some(1063)),
        (1320, 1012, 1234, Some(1234)),
        (1128, 1008, 1008, Some(1072)),
    ];

    for &(first, second, third, expected) in &cases {
        assert_eq!(expected, vote_target(first, second, third, 4));
    }
}
1258
/// Drives `VoterOracle::voting_target()` through several combinations of best BEEFY block, best
/// GRANDPA block, `min_block_delta` and registered sessions.
#[test]
fn should_vote_target() {
    // Overwrites the oracle's best BEEFY number and best GRANDPA header number, then asks it
    // for the voting target.
    fn target_at(
        oracle: &mut VoterOracle<Block, ecdsa_crypto::AuthorityId>,
        best_beefy: NumberFor<Block>,
        best_grandpa: NumberFor<Block>,
    ) -> Option<NumberFor<Block>> {
        oracle.best_beefy_block = best_beefy;
        oracle.best_grandpa_block_header.number = best_grandpa;
        oracle.voting_target()
    }

    let mut oracle = VoterOracle::<Block, ecdsa_crypto::AuthorityId> {
        sessions: VecDeque::new(),
        min_block_delta: 1,
        best_grandpa_block_header: Header::new(
            1u32.into(),
            Default::default(),
            Default::default(),
            Default::default(),
            Digest::default(),
        ),
        best_beefy_block: 0,
        _phantom: PhantomData,
    };

    // The session queue is still empty: no target is returned.
    assert_eq!(target_at(&mut oracle, 0, 1), None);

    let keys = &[Keyring::Alice];
    let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
    oracle.add_session(Rounds::new(1, validator_set.clone()));

    // With `min_block_delta = 4`, these two inputs give no target...
    oracle.min_block_delta = 4;
    assert_eq!(target_at(&mut oracle, 1, 1), None);
    assert_eq!(target_at(&mut oracle, 2, 5), None);
    // ...while this one does.
    assert_eq!(target_at(&mut oracle, 4, 9), Some(8));

    // With `min_block_delta = 8`, the target is exactly best BEEFY + 8.
    oracle.min_block_delta = 8;
    assert_eq!(target_at(&mut oracle, 10, 18), Some(18));

    // With `min_block_delta = 1`, wider gaps give targets further from best BEEFY.
    oracle.min_block_delta = 1;
    assert_eq!(target_at(&mut oracle, 1000, 1008), Some(1004));
    assert_eq!(target_at(&mut oracle, 1000, 1016), Some(1008));

    // Best BEEFY equal to best GRANDPA: no target.
    assert_eq!(target_at(&mut oracle, 1000, 1000), None);

    // When the only session starts above best BEEFY, the session start is the target.
    oracle.sessions.clear();
    oracle.add_session(Rounds::new(1000, validator_set.clone()));
    assert_eq!(target_at(&mut oracle, 0, 1008), Some(1000));
    oracle.sessions.clear();
    oracle.add_session(Rounds::new(1001, validator_set.clone()));
    assert_eq!(target_at(&mut oracle, 1000, 1008), Some(1001));
}
1318
/// Walks a `VoterOracle` through adding sessions, marking the front session's mandatory block
/// as done, and pruning, checking what `accepted_interval()` returns at each stage.
#[test]
fn test_oracle_accepted_interval() {
    // Points the oracle's best GRANDPA header at `best_grandpa`, then queries the interval.
    fn interval_at(
        oracle: &mut VoterOracle<Block, ecdsa_crypto::AuthorityId>,
        best_grandpa: NumberFor<Block>,
    ) -> Result<(NumberFor<Block>, NumberFor<Block>), Error> {
        oracle.best_grandpa_block_header.number = best_grandpa;
        oracle.accepted_interval()
    }

    let keys = &[Keyring::Alice];
    let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();

    let mut oracle = VoterOracle::<Block, ecdsa_crypto::AuthorityId> {
        sessions: VecDeque::new(),
        min_block_delta: 1,
        best_grandpa_block_header: Header::new(
            1u32.into(),
            Default::default(),
            Default::default(),
            Default::default(),
            Digest::default(),
        ),
        best_beefy_block: 0,
        _phantom: PhantomData,
    };

    // With no session registered the oracle reports an error.
    assert!(interval_at(&mut oracle, 1).is_err());

    // One session whose mandatory block is not done: the interval is pinned to its start.
    let session_one = 1;
    oracle.add_session(Rounds::new(session_one, validator_set.clone()));
    for best_grandpa in 0..15 {
        assert_eq!(interval_at(&mut oracle, best_grandpa), Ok((session_one, session_one)));
    }

    // Queueing two more sessions does not change the answer, even for later heights.
    let session_two = 11;
    let session_three = 21;
    oracle.add_session(Rounds::new(session_two, validator_set.clone()));
    oracle.add_session(Rounds::new(session_three, validator_set.clone()));
    let late_heights = session_three..session_three + 15;
    for best_grandpa in late_heights.clone() {
        assert_eq!(interval_at(&mut oracle, best_grandpa), Ok((session_one, session_one)));
    }

    // Front session marked done and pruned: the interval moves to the second session's start.
    oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true);
    oracle.try_prune();
    for best_grandpa in late_heights.clone() {
        assert_eq!(interval_at(&mut oracle, best_grandpa), Ok((session_two, session_two)));
    }

    // Repeat once more: the interval moves to the third session's start.
    oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true);
    oracle.try_prune();
    for best_grandpa in late_heights.clone() {
        assert_eq!(interval_at(&mut oracle, best_grandpa), Ok((session_three, session_three)));
    }

    // Third session marked done (not pruned yet): the interval now extends to best GRANDPA.
    oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true);
    for best_grandpa in late_heights.clone() {
        assert_eq!(interval_at(&mut oracle, best_grandpa), Ok((session_three, best_grandpa)));
    }
    // Pruning at this point leaves the result unchanged.
    oracle.try_prune();
    for best_grandpa in late_heights.clone() {
        assert_eq!(interval_at(&mut oracle, best_grandpa), Ok((session_three, best_grandpa)));
    }

    // Adding a fourth session leaves it at the front of the queue, and the interval is pinned
    // to its start.
    let session_four = 31;
    oracle.add_session(Rounds::new(session_four, validator_set.clone()));
    assert_eq!(oracle.sessions.front().unwrap().session_start(), session_four);
    assert_eq!(interval_at(&mut oracle, session_four + 10), Ok((session_four, session_four)));
}
1403
/// Checks that `find_authorities_change` returns `None` for a header with an empty digest and
/// returns the validator set once an `AuthoritiesChange` consensus log has been pushed.
#[test]
fn extract_authorities_change_digest() {
    let mut header = Header::new(
        1u32.into(),
        Default::default(),
        Default::default(),
        Default::default(),
        Digest::default(),
    );

    // Nothing in the digest yet, so nothing to extract.
    assert!(find_authorities_change::<Block, ecdsa_crypto::AuthorityId>(&header).is_none());

    let peers = &[Keyring::One, Keyring::Two];
    let set_id = 42;
    let validator_set = ValidatorSet::new(make_beefy_ids(peers), set_id).unwrap();

    // Encode the authorities-change log and attach it under the BEEFY engine id.
    let change_log =
        ConsensusLog::<ecdsa_crypto::AuthorityId>::AuthoritiesChange(validator_set.clone());
    header
        .digest_mut()
        .push(DigestItem::Consensus(BEEFY_ENGINE_ID, change_log.encode()));

    // The same validator set must come back out.
    assert_eq!(
        find_authorities_change::<Block, ecdsa_crypto::AuthorityId>(&header),
        Some(validator_set)
    );
}
1430
1431 #[tokio::test]
1432 async fn should_finalize_correctly() {
1433 let keys = [Keyring::Alice];
1434 let validator_set = ValidatorSet::new(make_beefy_ids(&keys), 0).unwrap();
1435 let mut net = BeefyTestNet::new(1);
1436 let backend = net.peer(0).client().as_backend();
1437 let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
1438 worker.persisted_state.voting_oracle.sessions.clear();
1440
1441 let keys = keys.iter().cloned().enumerate();
1442 let (mut best_block_streams, mut finality_proofs) =
1443 get_beefy_streams(&mut net, keys.clone());
1444 let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
1445 let mut finality_proof = finality_proofs.drain(..).next().unwrap();
1446
1447 let create_finality_proof = |block_num: NumberFor<Block>| {
1448 let commitment = Commitment {
1449 payload: Payload::from_single_entry(known_payloads::MMR_ROOT_ID, vec![]),
1450 block_number: block_num,
1451 validator_set_id: validator_set.id(),
1452 };
1453 VersionedFinalityProof::V1(SignedCommitment { commitment, signatures: vec![None] })
1454 };
1455
1456 assert_eq!(worker.persisted_state.best_beefy(), 0);
1458 poll_fn(move |cx| {
1459 assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending);
1460 assert_eq!(finality_proof.poll_next_unpin(cx), Poll::Pending);
1461 Poll::Ready(())
1462 })
1463 .await;
1464
1465 let client = net.peer(0).client().as_client();
1466 let (mut best_block_streams, mut finality_proofs) =
1468 get_beefy_streams(&mut net, keys.clone());
1469 let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
1470 let mut finality_proof = finality_proofs.drain(..).next().unwrap();
1471 let justif = create_finality_proof(1);
1472 worker
1474 .persisted_state
1475 .voting_oracle
1476 .add_session(Rounds::new(1, validator_set.clone()));
1477 worker.finalize(justif.clone()).unwrap();
1479 assert_eq!(worker.persisted_state.best_beefy(), 1);
1481 poll_fn(move |cx| {
1482 match best_block_stream.poll_next_unpin(cx) {
1484 Poll::Ready(Some(hash)) => {
1485 let block_num = client.number(hash).unwrap();
1486 assert_eq!(block_num, Some(1));
1487 },
1488 v => panic!("unexpected value: {:?}", v),
1489 }
1490 match finality_proof.poll_next_unpin(cx) {
1492 Poll::Ready(Some(received)) => assert_eq!(received, justif),
1494 v => panic!("unexpected value: {:?}", v),
1495 }
1496 Poll::Ready(())
1497 })
1498 .await;
1499
1500 let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys);
1502 let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
1503 let hashes = net.peer(0).push_blocks(1, false);
1504 let hashof2 = hashes[0];
1506 backend.finalize_block(hashof2, None).unwrap();
1507
1508 let justif = create_finality_proof(2);
1509 worker.persisted_state.voting_oracle.add_session(Rounds::new(2, validator_set));
1511 worker.finalize(justif).unwrap();
1512 assert_eq!(worker.voting_oracle().sessions.len(), 1);
1514 assert_eq!(worker.active_rounds().unwrap().session_start(), 2);
1516 assert_eq!(worker.persisted_state.best_beefy(), 2);
1518 poll_fn(move |cx| {
1519 match best_block_stream.poll_next_unpin(cx) {
1520 Poll::Ready(Some(hash)) => {
1522 let block_num = net.peer(0).client().as_client().number(hash).unwrap();
1523 assert_eq!(block_num, Some(2));
1524 },
1525 v => panic!("unexpected value: {:?}", v),
1526 }
1527 Poll::Ready(())
1528 })
1529 .await;
1530
1531 let justifs = backend.blockchain().justifications(hashof2).unwrap().unwrap();
1533 assert!(justifs.get(BEEFY_ENGINE_ID).is_some())
1534 }
1535
1536 #[tokio::test]
1537 async fn should_init_session() {
1538 let keys = &[Keyring::Alice, Keyring::Bob];
1539 let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
1540 let mut net = BeefyTestNet::new(1);
1541 let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
1542
1543 let worker_rounds = worker.active_rounds().unwrap();
1544 assert_eq!(worker_rounds.session_start(), 1);
1545 assert_eq!(worker_rounds.validators(), validator_set.validators());
1546 assert_eq!(worker_rounds.validator_set_id(), validator_set.id());
1547
1548 let keys = &[Keyring::Bob];
1550 let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap();
1551
1552 worker.init_session_at(new_validator_set.clone(), 11);
1553 let rounds = worker.persisted_state.voting_oracle.active_rounds_mut().unwrap();
1555 assert_eq!(rounds.validator_set_id(), validator_set.id());
1556 rounds.test_set_mandatory_done(true);
1558 worker.persisted_state.voting_oracle.try_prune();
1559 let rounds = worker.active_rounds().unwrap();
1561 assert_eq!(rounds.session_start(), 11);
1563 assert_eq!(rounds.validators(), new_validator_set.validators());
1564 assert_eq!(rounds.validator_set_id(), new_validator_set.id());
1565 }
1566
1567 #[tokio::test]
1568 async fn should_not_report_bad_old_or_self_equivocations() {
1569 let block_num = 1;
1570 let set_id = 1;
1571 let keys = [Keyring::Alice];
1572 let validator_set = ValidatorSet::new(make_beefy_ids(&keys), set_id).unwrap();
1573 let mut api_alice = TestApi::with_validator_set(&validator_set);
1575 api_alice.allow_equivocations();
1576 let api_alice = Arc::new(api_alice);
1577
1578 let mut net = BeefyTestNet::new(1);
1579 let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
1580 worker.runtime = api_alice.clone();
1581 worker.fisherman = Arc::new(Fisherman::new(
1582 worker.backend.clone(),
1583 worker.runtime.clone(),
1584 worker.key_store.clone(),
1585 ));
1586
1587 let _ = net.peer(0).push_blocks(1, false);
1589
1590 let payload1 = Payload::from_single_entry(MMR_ROOT_ID, vec![42]);
1591 let payload2 = Payload::from_single_entry(MMR_ROOT_ID, vec![128]);
1592
1593 let good_proof = generate_double_voting_proof(
1595 (block_num, payload1.clone(), set_id, &Keyring::Bob),
1596 (block_num, payload2.clone(), set_id, &Keyring::Bob),
1597 );
1598 {
1599 assert_eq!(worker.report_double_voting(good_proof.clone()), Ok(()));
1601 let reported = api_alice.reported_equivocations.as_ref().unwrap().lock();
1603 assert_eq!(reported.len(), 1);
1604 assert_eq!(*reported.get(0).unwrap(), good_proof);
1605 }
1606 api_alice.reported_equivocations.as_ref().unwrap().lock().clear();
1607
1608 let mut bad_proof = good_proof.clone();
1610 bad_proof.first.id = Keyring::Charlie.public();
1611 assert_eq!(worker.report_double_voting(bad_proof), Ok(()));
1613 assert!(api_alice.reported_equivocations.as_ref().unwrap().lock().is_empty());
1615
1616 let mut old_proof = good_proof.clone();
1618 old_proof.first.commitment.validator_set_id = 0;
1619 old_proof.second.commitment.validator_set_id = 0;
1620 assert_eq!(worker.report_double_voting(old_proof), Ok(()));
1622 assert!(api_alice.reported_equivocations.as_ref().unwrap().lock().is_empty());
1624
1625 let self_proof = generate_double_voting_proof(
1627 (block_num, payload1.clone(), set_id, &Keyring::Alice),
1628 (block_num, payload2.clone(), set_id, &Keyring::Alice),
1629 );
1630 assert_eq!(worker.report_double_voting(self_proof), Ok(()));
1632 assert!(api_alice.reported_equivocations.as_ref().unwrap().lock().is_empty());
1634 }
1635}