referrerpolicy=no-referrer-when-downgrade

sc_consensus_beefy/
worker.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	communication::{
21		gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage},
22		request_response::outgoing_requests_engine::ResponseInfo,
23	},
24	error::Error,
25	find_authorities_change,
26	fisherman::Fisherman,
27	justification::BeefyVersionedFinalityProof,
28	keystore::BeefyKeystore,
29	metric_inc, metric_set,
30	metrics::VoterMetrics,
31	round::{Rounds, VoteImportResult},
32	BeefyComms, BeefyVoterLinks, UnpinnedFinalityNotification, LOG_TARGET,
33};
34use sp_application_crypto::RuntimeAppPublic;
35
36use codec::{Codec, Decode, DecodeAll, Encode};
37use futures::{stream::Fuse, FutureExt, StreamExt};
38use log::{debug, error, info, trace, warn};
39use sc_client_api::{Backend, HeaderBackend};
40use sc_utils::notification::NotificationReceiver;
41use sp_api::ProvideRuntimeApi;
42use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
43use sp_consensus::SyncOracle;
44use sp_consensus_beefy::{
45	AuthorityIdBound, BeefyApi, Commitment, DoubleVotingProof, PayloadProvider, ValidatorSet,
46	VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
47};
48use sp_runtime::{
49	generic::BlockId,
50	traits::{Block, Header, NumberFor, Zero},
51	SaturatedConversion,
52};
53use std::{
54	collections::{BTreeMap, VecDeque},
55	fmt::Debug,
56	marker::PhantomData,
57	sync::Arc,
58};
59
60/// Bound for the number of pending justifications - use 2400 - the max number
61/// of justifications possible in a single session.
62const MAX_BUFFERED_JUSTIFICATIONS: usize = 2400;
63
64pub(crate) enum RoundAction {
65	Drop,
66	Process,
67	Enqueue,
68}
69
70/// Responsible for the voting strategy.
71/// It chooses which incoming votes to accept and which votes to generate.
72/// Keeps track of voting seen for current and future rounds.
73///
74/// Note: this is part of `PersistedState` so any changes here should also bump
75/// aux-db schema version.
76#[derive(Debug, Decode, Encode, PartialEq)]
77pub(crate) struct VoterOracle<B: Block, AuthorityId: AuthorityIdBound> {
78	/// Queue of known sessions. Keeps track of voting rounds (block numbers) within each session.
79	///
80	/// There are three voter states corresponding to three queue states:
81	/// 1. voter uninitialized: queue empty,
82	/// 2. up-to-date - all mandatory blocks leading up to current GRANDPA finalized: queue has ONE
83	///    element, the 'current session' where `mandatory_done == true`,
84	/// 3. lagging behind GRANDPA: queue has [1, N] elements, where all `mandatory_done == false`.
85	///    In this state, every time a session gets its mandatory block BEEFY finalized, it's
86	///    popped off the queue, eventually getting to state `2. up-to-date`.
87	sessions: VecDeque<Rounds<B, AuthorityId>>,
88	/// Min delta in block numbers between two blocks, BEEFY should vote on.
89	min_block_delta: u32,
90	/// Best block we received a GRANDPA finality for.
91	best_grandpa_block_header: <B as Block>::Header,
92	/// Best block a BEEFY voting round has been concluded for.
93	best_beefy_block: NumberFor<B>,
94	_phantom: PhantomData<fn() -> AuthorityId>,
95}
96
97impl<B: Block, AuthorityId> VoterOracle<B, AuthorityId>
98where
99	AuthorityId: AuthorityIdBound,
100{
101	/// Verify provided `sessions` satisfies requirements, then build `VoterOracle`.
102	pub fn checked_new(
103		sessions: VecDeque<Rounds<B, AuthorityId>>,
104		min_block_delta: u32,
105		grandpa_header: <B as Block>::Header,
106		best_beefy: NumberFor<B>,
107	) -> Option<Self> {
108		let mut prev_start = Zero::zero();
109		let mut prev_validator_id = None;
110		// verifies the
111		let mut validate = || -> bool {
112			let best_grandpa = *grandpa_header.number();
113			if sessions.is_empty() || best_beefy > best_grandpa {
114				return false;
115			}
116			for (idx, session) in sessions.iter().enumerate() {
117				let start = session.session_start();
118				if session.validators().is_empty() {
119					return false;
120				}
121				if start > best_grandpa || start <= prev_start {
122					return false;
123				}
124				#[cfg(not(test))]
125				if let Some(prev_id) = prev_validator_id {
126					if session.validator_set_id() <= prev_id {
127						return false;
128					}
129				}
130				if idx != 0 && session.mandatory_done() {
131					return false;
132				}
133				prev_start = session.session_start();
134				prev_validator_id = Some(session.validator_set_id());
135			}
136			true
137		};
138		if validate() {
139			Some(VoterOracle {
140				sessions,
141				// Always target at least one block better than current best beefy.
142				min_block_delta: min_block_delta.max(1),
143				best_grandpa_block_header: grandpa_header,
144				best_beefy_block: best_beefy,
145				_phantom: PhantomData,
146			})
147		} else {
148			error!(
149				target: LOG_TARGET,
150				"🥩 Invalid sessions queue: {:?}; best-beefy {:?} best-grandpa-header {:?}.",
151				sessions,
152				best_beefy,
153				grandpa_header
154			);
155			None
156		}
157	}
158
159	// Return reference to rounds pertaining to first session in the queue.
160	// Voting will always happen at the head of the queue.
161	fn active_rounds(&self) -> Result<&Rounds<B, AuthorityId>, Error> {
162		self.sessions.front().ok_or(Error::UninitSession)
163	}
164
165	// Return mutable reference to rounds pertaining to first session in the queue.
166	// Voting will always happen at the head of the queue.
167	fn active_rounds_mut(&mut self) -> Result<&mut Rounds<B, AuthorityId>, Error> {
168		self.sessions.front_mut().ok_or(Error::UninitSession)
169	}
170
171	fn current_validator_set(&self) -> Result<&ValidatorSet<AuthorityId>, Error> {
172		self.active_rounds().map(|r| r.validator_set())
173	}
174
175	// Prune the sessions queue to keep the Oracle in one of the expected three states.
176	//
177	// To be called on each BEEFY finality and on each new rounds/session addition.
178	fn try_prune(&mut self) {
179		if self.sessions.len() > 1 {
180			// when there's multiple sessions, only keep the `!mandatory_done()` ones.
181			self.sessions.retain(|s| !s.mandatory_done())
182		}
183	}
184
185	/// Check if an observed session can be added to the Oracle.
186	pub fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
187		let latest_known_session_start =
188			self.sessions.back().map(|session| session.session_start());
189		Some(session_start) > latest_known_session_start
190	}
191
192	/// Add new observed session to the Oracle.
193	pub fn add_session(&mut self, rounds: Rounds<B, AuthorityId>) {
194		self.sessions.push_back(rounds);
195		// Once we add a new session we can drop/prune previous session if it's been finalized.
196		self.try_prune();
197	}
198
199	/// Finalize a particular block.
200	pub fn finalize(&mut self, block: NumberFor<B>) -> Result<(), Error> {
201		// Conclude voting round for this block.
202		self.active_rounds_mut()?.conclude(block);
203		// Prune any now "finalized" sessions from queue.
204		self.try_prune();
205		Ok(())
206	}
207
208	/// Return current pending mandatory block, if any, plus its active validator set.
209	pub fn mandatory_pending(&self) -> Option<(NumberFor<B>, ValidatorSet<AuthorityId>)> {
210		self.sessions.front().and_then(|round| {
211			if round.mandatory_done() {
212				None
213			} else {
214				Some((round.session_start(), round.validator_set().clone()))
215			}
216		})
217	}
218
219	/// Return `(A, B)` tuple representing inclusive [A, B] interval of votes to accept.
220	pub fn accepted_interval(&self) -> Result<(NumberFor<B>, NumberFor<B>), Error> {
221		let rounds = self.sessions.front().ok_or(Error::UninitSession)?;
222
223		if rounds.mandatory_done() {
224			// There's only one session active and its mandatory is done.
225			// Accept any vote for a GRANDPA finalized block in a better round.
226			Ok((
227				rounds.session_start().max(self.best_beefy_block),
228				(*self.best_grandpa_block_header.number()),
229			))
230		} else {
231			// Current session has mandatory not done.
232			// Only accept votes for the mandatory block in the front of queue.
233			Ok((rounds.session_start(), rounds.session_start()))
234		}
235	}
236
237	/// Utility function to quickly decide what to do for each round.
238	pub fn triage_round(&self, round: NumberFor<B>) -> Result<RoundAction, Error> {
239		let (start, end) = self.accepted_interval()?;
240		if start <= round && round <= end {
241			Ok(RoundAction::Process)
242		} else if round > end {
243			Ok(RoundAction::Enqueue)
244		} else {
245			Ok(RoundAction::Drop)
246		}
247	}
248
249	/// Return `Some(number)` if we should be voting on block `number`,
250	/// return `None` if there is no block we should vote on.
251	pub fn voting_target(&self) -> Option<NumberFor<B>> {
252		let rounds = self.sessions.front().or_else(|| {
253			debug!(target: LOG_TARGET, "🥩 No voting round started");
254			None
255		})?;
256		let best_grandpa = *self.best_grandpa_block_header.number();
257		let best_beefy = self.best_beefy_block;
258
259		// `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`.
260		let target =
261			vote_target(best_grandpa, best_beefy, rounds.session_start(), self.min_block_delta);
262		trace!(
263			target: LOG_TARGET,
264			"🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}",
265			best_beefy,
266			best_grandpa,
267			target
268		);
269		target
270	}
271}
272
273/// BEEFY voter state persisted in aux DB.
274///
275/// Note: Any changes here should also bump aux-db schema version.
276#[derive(Debug, Decode, Encode, PartialEq)]
277pub(crate) struct PersistedState<B: Block, AuthorityId: AuthorityIdBound> {
278	/// Best block we voted on.
279	best_voted: NumberFor<B>,
280	/// Chooses which incoming votes to accept and which votes to generate.
281	/// Keeps track of voting seen for current and future rounds.
282	voting_oracle: VoterOracle<B, AuthorityId>,
283	/// Pallet-beefy genesis block - block number when BEEFY consensus started for this chain.
284	pallet_genesis: NumberFor<B>,
285}
286
287impl<B: Block, AuthorityId: AuthorityIdBound> PersistedState<B, AuthorityId> {
288	pub fn checked_new(
289		grandpa_header: <B as Block>::Header,
290		best_beefy: NumberFor<B>,
291		sessions: VecDeque<Rounds<B, AuthorityId>>,
292		min_block_delta: u32,
293		pallet_genesis: NumberFor<B>,
294	) -> Option<Self> {
295		VoterOracle::checked_new(sessions, min_block_delta, grandpa_header, best_beefy).map(
296			|voting_oracle| PersistedState {
297				best_voted: Zero::zero(),
298				voting_oracle,
299				pallet_genesis,
300			},
301		)
302	}
303
304	pub fn pallet_genesis(&self) -> NumberFor<B> {
305		self.pallet_genesis
306	}
307
308	pub(crate) fn set_min_block_delta(&mut self, min_block_delta: u32) {
309		self.voting_oracle.min_block_delta = min_block_delta.max(1);
310	}
311
312	pub fn best_beefy(&self) -> NumberFor<B> {
313		self.voting_oracle.best_beefy_block
314	}
315
316	pub(crate) fn set_best_beefy(&mut self, best_beefy: NumberFor<B>) {
317		self.voting_oracle.best_beefy_block = best_beefy;
318	}
319
320	pub(crate) fn set_best_grandpa(&mut self, best_grandpa: <B as Block>::Header) {
321		self.voting_oracle.best_grandpa_block_header = best_grandpa;
322	}
323
324	pub fn voting_oracle(&self) -> &VoterOracle<B, AuthorityId> {
325		&self.voting_oracle
326	}
327
328	pub(crate) fn gossip_filter_config(&self) -> Result<GossipFilterCfg<B, AuthorityId>, Error> {
329		let (start, end) = self.voting_oracle.accepted_interval()?;
330		let validator_set = self.voting_oracle.current_validator_set()?;
331		Ok(GossipFilterCfg { start, end, validator_set })
332	}
333
334	/// Handle session changes by starting new voting round for mandatory blocks.
335	pub fn init_session_at(
336		&mut self,
337		new_session_start: NumberFor<B>,
338		validator_set: ValidatorSet<AuthorityId>,
339		key_store: &BeefyKeystore<AuthorityId>,
340		metrics: &Option<VoterMetrics>,
341		is_authority: bool,
342	) {
343		debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set);
344
345		// BEEFY should finalize a mandatory block during each session.
346		if let Ok(active_session) = self.voting_oracle.active_rounds() {
347			if !active_session.mandatory_done() {
348				debug!(
349					target: LOG_TARGET,
350					"🥩 New session {} while active session {} is still lagging.",
351					validator_set.id(),
352					active_session.validator_set_id(),
353				);
354				metric_inc!(metrics, beefy_lagging_sessions);
355			}
356		}
357
358		// verify we have some BEEFY key available in keystore when role is authority.
359		if is_authority && key_store.public_keys().map_or(false, |k| k.is_empty()) {
360			error!(
361				target: LOG_TARGET,
362				"🥩 for session starting at block {:?} no BEEFY authority key found in store, \
363				you must generate valid session keys \
364				(https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#generating-the-session-keys)",
365				new_session_start,
366			);
367			metric_inc!(metrics, beefy_no_authority_found_in_store);
368		}
369
370		let id = validator_set.id();
371		self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set));
372		metric_set!(metrics, beefy_validator_set_id, id);
373		info!(
374			target: LOG_TARGET,
375			"🥩 New Rounds for validator set id: {:?} with session_start {:?}",
376			id,
377			new_session_start
378		);
379	}
380}
381
382/// A BEEFY worker/voter that follows the BEEFY protocol
383pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S, N, AuthorityId: AuthorityIdBound> {
384	// utilities
385	pub backend: Arc<BE>,
386	pub runtime: Arc<RuntimeApi>,
387	pub key_store: Arc<BeefyKeystore<AuthorityId>>,
388	pub payload_provider: P,
389	pub sync: Arc<S>,
390	pub fisherman: Arc<Fisherman<B, BE, RuntimeApi, AuthorityId>>,
391
392	// communication (created once, but returned and reused if worker is restarted/reinitialized)
393	pub comms: BeefyComms<B, N, AuthorityId>,
394
395	// channels
396	/// Links between the block importer, the background voter and the RPC layer.
397	pub links: BeefyVoterLinks<B, AuthorityId>,
398
399	// voter state
400	/// Buffer holding justifications for future processing.
401	pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B, AuthorityId>>,
402	/// Persisted voter state.
403	pub persisted_state: PersistedState<B, AuthorityId>,
404	/// BEEFY voter metrics
405	pub metrics: Option<VoterMetrics>,
406	/// Node runs under "Authority" role.
407	pub is_authority: bool,
408}
409
410impl<B, BE, P, R, S, N, AuthorityId> BeefyWorker<B, BE, P, R, S, N, AuthorityId>
411where
412	B: Block + Codec,
413	BE: Backend<B>,
414	P: PayloadProvider<B>,
415	S: SyncOracle,
416	R: ProvideRuntimeApi<B>,
417	R::Api: BeefyApi<B, AuthorityId>,
418	AuthorityId: AuthorityIdBound,
419{
420	fn best_grandpa_block(&self) -> NumberFor<B> {
421		*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
422	}
423
424	fn voting_oracle(&self) -> &VoterOracle<B, AuthorityId> {
425		&self.persisted_state.voting_oracle
426	}
427
428	#[cfg(test)]
429	fn active_rounds(&mut self) -> Result<&Rounds<B, AuthorityId>, Error> {
430		self.persisted_state.voting_oracle.active_rounds()
431	}
432
433	/// Handle session changes by starting new voting round for mandatory blocks.
434	fn init_session_at(
435		&mut self,
436		validator_set: ValidatorSet<AuthorityId>,
437		new_session_start: NumberFor<B>,
438	) {
439		self.persisted_state.init_session_at(
440			new_session_start,
441			validator_set,
442			&self.key_store,
443			&self.metrics,
444			self.is_authority,
445		);
446	}
447
448	fn handle_finality_notification(
449		&mut self,
450		notification: &UnpinnedFinalityNotification<B>,
451	) -> Result<(), Error> {
452		let header = &notification.header;
453		debug!(
454			target: LOG_TARGET,
455			"🥩 Finality notification: header(number {:?}, hash {:?}) tree_route {:?}",
456			header.number(),
457			notification.hash,
458			notification.tree_route,
459		);
460
461		match self.runtime.runtime_api().beefy_genesis(notification.hash) {
462			Ok(Some(genesis)) if genesis != self.persisted_state.pallet_genesis => {
463				debug!(target: LOG_TARGET, "🥩 ConsensusReset detected. Expected genesis: {}, found genesis: {}", self.persisted_state.pallet_genesis, genesis);
464				return Err(Error::ConsensusReset)
465			},
466			Ok(_) => {},
467			Err(api_error) => {
468				// This can happen in case the block was already pruned.
469				// Mostly after warp sync when finality notifications are piled up.
470				debug!(target: LOG_TARGET, "🥩 Unable to check beefy genesis: {}", api_error);
471			},
472		}
473
474		let mut new_session_added = false;
475		if *header.number() > self.best_grandpa_block() {
476			// update best GRANDPA finalized block we have seen
477			self.persisted_state.set_best_grandpa(header.clone());
478
479			// Check all (newly) finalized blocks for new session(s).
480			let backend = self.backend.clone();
481			for header in notification
482				.tree_route
483				.iter()
484				.map(|hash| {
485					backend
486						.blockchain()
487						.expect_header(*hash)
488						.expect("just finalized block should be available; qed.")
489				})
490				.chain(std::iter::once(header.clone()))
491			{
492				if let Some(new_validator_set) = find_authorities_change::<B, AuthorityId>(&header)
493				{
494					self.init_session_at(new_validator_set, *header.number());
495					new_session_added = true;
496				}
497			}
498
499			if new_session_added {
500				crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
501					.map_err(|e| Error::Backend(e.to_string()))?;
502			}
503
504			// Update gossip validator votes filter.
505			if let Err(e) = self
506				.persisted_state
507				.gossip_filter_config()
508				.map(|filter| self.comms.gossip_validator.update_filter(filter))
509			{
510				error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
511			}
512		}
513
514		Ok(())
515	}
516
517	/// Based on [VoterOracle] this vote is either processed here or discarded.
518	fn triage_incoming_vote(
519		&mut self,
520		vote: VoteMessage<NumberFor<B>, AuthorityId, <AuthorityId as RuntimeAppPublic>::Signature>,
521	) -> Result<(), Error>
522	where
523		<AuthorityId as RuntimeAppPublic>::Signature: Encode + Decode,
524	{
525		let block_num = vote.commitment.block_number;
526		match self.voting_oracle().triage_round(block_num)? {
527			RoundAction::Process =>
528				if let Some(finality_proof) = self.handle_vote(vote)? {
529					let gossip_proof =
530						GossipMessage::<B, AuthorityId>::FinalityProof(finality_proof);
531					let encoded_proof = gossip_proof.encode();
532					self.comms.gossip_engine.gossip_message(
533						proofs_topic::<B>(),
534						encoded_proof,
535						true,
536					);
537				},
538			RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_votes),
539			RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
540		};
541		Ok(())
542	}
543
544	/// Based on [VoterOracle] this justification is either processed here or enqueued for later.
545	///
546	/// Expects `justification` to be valid.
547	fn triage_incoming_justif(
548		&mut self,
549		justification: BeefyVersionedFinalityProof<B, AuthorityId>,
550	) -> Result<(), Error> {
551		let signed_commitment = match justification {
552			VersionedFinalityProof::V1(ref sc) => sc,
553		};
554		let block_num = signed_commitment.commitment.block_number;
555		match self.voting_oracle().triage_round(block_num)? {
556			RoundAction::Process => {
557				debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num);
558				metric_inc!(self.metrics, beefy_imported_justifications);
559				self.finalize(justification)?
560			},
561			RoundAction::Enqueue => {
562				debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num);
563				if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS {
564					self.pending_justifications.entry(block_num).or_insert(justification);
565					metric_inc!(self.metrics, beefy_buffered_justifications);
566				} else {
567					metric_inc!(self.metrics, beefy_buffered_justifications_dropped);
568					warn!(
569						target: LOG_TARGET,
570						"🥩 Buffer justification dropped for round: {:?}.", block_num
571					);
572				}
573			},
574			RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_justifications),
575		};
576		Ok(())
577	}
578
579	fn handle_vote(
580		&mut self,
581		vote: VoteMessage<NumberFor<B>, AuthorityId, <AuthorityId as RuntimeAppPublic>::Signature>,
582	) -> Result<Option<BeefyVersionedFinalityProof<B, AuthorityId>>, Error> {
583		let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
584
585		let block_number = vote.commitment.block_number;
586		match rounds.add_vote(vote) {
587			VoteImportResult::RoundConcluded(signed_commitment) => {
588				let finality_proof = VersionedFinalityProof::V1(signed_commitment);
589				debug!(
590					target: LOG_TARGET,
591					"🥩 Round #{} concluded, finality_proof: {:?}.", block_number, finality_proof
592				);
593				// We created the `finality_proof` and know to be valid.
594				// New state is persisted after finalization.
595				self.finalize(finality_proof.clone())?;
596				metric_inc!(self.metrics, beefy_good_votes_processed);
597				return Ok(Some(finality_proof));
598			},
599			VoteImportResult::Ok => {
600				// Persist state after handling mandatory block vote.
601				if self
602					.voting_oracle()
603					.mandatory_pending()
604					.map(|(mandatory_num, _)| mandatory_num == block_number)
605					.unwrap_or(false)
606				{
607					crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
608						.map_err(|e| Error::Backend(e.to_string()))?;
609				}
610				metric_inc!(self.metrics, beefy_good_votes_processed);
611			},
612			VoteImportResult::DoubleVoting(proof) => {
613				metric_inc!(self.metrics, beefy_equivocation_votes);
614				self.report_double_voting(proof)?;
615			},
616			VoteImportResult::Invalid => metric_inc!(self.metrics, beefy_invalid_votes),
617			VoteImportResult::Stale => metric_inc!(self.metrics, beefy_stale_votes),
618		};
619		Ok(None)
620	}
621
622	/// Provide BEEFY finality for block based on `finality_proof`:
623	/// 1. Prune now-irrelevant past sessions from the oracle,
624	/// 2. Set BEEFY best block,
625	/// 3. Persist voter state,
626	/// 4. Send best block hash and `finality_proof` to RPC worker.
627	///
628	/// Expects `finality proof` to be valid and for a block > current-best-beefy.
629	fn finalize(
630		&mut self,
631		finality_proof: BeefyVersionedFinalityProof<B, AuthorityId>,
632	) -> Result<(), Error> {
633		let block_num = match finality_proof {
634			VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number,
635		};
636
637		if block_num <= self.persisted_state.voting_oracle.best_beefy_block {
638			// we've already finalized this round before, short-circuit.
639			return Ok(());
640		}
641
642		// Finalize inner round and update voting_oracle state.
643		self.persisted_state.voting_oracle.finalize(block_num)?;
644
645		// Set new best BEEFY block number.
646		self.persisted_state.set_best_beefy(block_num);
647		crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
648			.map_err(|e| Error::Backend(e.to_string()))?;
649
650		metric_set!(self.metrics, beefy_best_block, block_num);
651
652		self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
653
654		if let Err(e) = self
655			.backend
656			.blockchain()
657			.expect_block_hash_from_id(&BlockId::Number(block_num))
658			.and_then(|hash| {
659				self.links
660					.to_rpc_best_block_sender
661					.notify(|| Ok::<_, ()>(hash))
662					.expect("forwards closure result; the closure always returns Ok; qed.");
663
664				self.backend
665					.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
666			}) {
667			debug!(
668				target: LOG_TARGET,
669				"🥩 Error {:?} on appending justification: {:?}", e, finality_proof
670			);
671		}
672
673		self.links
674			.to_rpc_justif_sender
675			.notify(|| Ok::<_, ()>(finality_proof))
676			.expect("forwards closure result; the closure always returns Ok; qed.");
677
678		// Update gossip validator votes filter.
679		self.persisted_state
680			.gossip_filter_config()
681			.map(|filter| self.comms.gossip_validator.update_filter(filter))?;
682		Ok(())
683	}
684
685	/// Handle previously buffered justifications, that now land in the voting interval.
686	fn try_pending_justifications(&mut self) -> Result<(), Error> {
687		// Interval of blocks for which we can process justifications and votes right now.
688		let (start, end) = self.voting_oracle().accepted_interval()?;
689		// Process pending justifications.
690		if !self.pending_justifications.is_empty() {
691			// These are still pending.
692			let still_pending =
693				self.pending_justifications.split_off(&end.saturating_add(1u32.into()));
694			// These can be processed.
695			let justifs_to_process = self.pending_justifications.split_off(&start);
696			// The rest can be dropped.
697			self.pending_justifications = still_pending;
698
699			for (num, justification) in justifs_to_process.into_iter() {
700				debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num);
701				metric_inc!(self.metrics, beefy_imported_justifications);
702				if let Err(err) = self.finalize(justification) {
703					error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err);
704				}
705			}
706			metric_set!(
707				self.metrics,
708				beefy_buffered_justifications,
709				self.pending_justifications.len()
710			);
711		}
712		Ok(())
713	}
714
715	/// Decide if should vote, then vote.. or don't..
716	fn try_to_vote(&mut self) -> Result<(), Error> {
717		// Vote if there's now a new vote target.
718		if let Some(target) = self.voting_oracle().voting_target() {
719			metric_set!(self.metrics, beefy_should_vote_on, target);
720			if target > self.persisted_state.best_voted {
721				self.do_vote(target)?;
722			}
723		}
724		Ok(())
725	}
726
727	/// Create and gossip Signed Commitment for block number `target_number`.
728	///
729	/// Also handle this self vote by calling `self.handle_vote()` for it.
730	fn do_vote(&mut self, target_number: NumberFor<B>) -> Result<(), Error> {
731		debug!(target: LOG_TARGET, "🥩 Try voting on {}", target_number);
732
733		// Most of the time we get here, `target` is actually `best_grandpa`,
734		// avoid getting header from backend in that case.
735		let target_header = if target_number == self.best_grandpa_block() {
736			self.persisted_state.voting_oracle.best_grandpa_block_header.clone()
737		} else {
738			let hash = self
739				.backend
740				.blockchain()
741				.expect_block_hash_from_id(&BlockId::Number(target_number))
742				.map_err(|err| {
743					let err_msg = format!(
744						"Couldn't get hash for block #{:?} (error: {:?}), skipping vote..",
745						target_number, err
746					);
747					Error::Backend(err_msg)
748				})?;
749
750			self.backend.blockchain().expect_header(hash).map_err(|err| {
751				let err_msg = format!(
752					"Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..",
753					target_number, hash, err
754				);
755				Error::Backend(err_msg)
756			})?
757		};
758		let target_hash = target_header.hash();
759
760		let payload = if let Some(hash) = self.payload_provider.payload(&target_header) {
761			hash
762		} else {
763			warn!(target: LOG_TARGET, "🥩 No MMR root digest found for: {:?}", target_hash);
764			return Ok(());
765		};
766
767		let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
768		let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
769
770		let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
771			debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id);
772			id
773		} else {
774			debug!(
775				target: LOG_TARGET,
776				"🥩 Missing validator id - can't vote for: {:?}", target_hash
777			);
778			return Ok(());
779		};
780
781		let commitment = Commitment { payload, block_number: target_number, validator_set_id };
782		let encoded_commitment = commitment.encode();
783
784		let signature = match self.key_store.sign(&authority_id, &encoded_commitment) {
785			Ok(sig) => sig,
786			Err(err) => {
787				warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err);
788				return Ok(());
789			},
790		};
791
792		trace!(
793			target: LOG_TARGET,
794			"🥩 Produced signature using {:?}, is_valid: {:?}",
795			authority_id,
796			BeefyKeystore::verify(&authority_id, &signature, &encoded_commitment)
797		);
798
799		let vote = VoteMessage { commitment, id: authority_id, signature };
800		if let Some(finality_proof) = self.handle_vote(vote.clone()).map_err(|err| {
801			error!(target: LOG_TARGET, "🥩 Error handling self vote: {}", err);
802			err
803		})? {
804			let encoded_proof =
805				GossipMessage::<B, AuthorityId>::FinalityProof(finality_proof).encode();
806			self.comms
807				.gossip_engine
808				.gossip_message(proofs_topic::<B>(), encoded_proof, true);
809		} else {
810			metric_inc!(self.metrics, beefy_votes_sent);
811			debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
812			let encoded_vote = GossipMessage::<B, AuthorityId>::Vote(vote).encode();
813			self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
814		}
815
816		// Persist state after vote to avoid double voting in case of voter restarts.
817		self.persisted_state.best_voted = target_number;
818		metric_set!(self.metrics, beefy_best_voted, target_number);
819		crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
820			.map_err(|e| Error::Backend(e.to_string()))
821	}
822
823	fn process_new_state(&mut self) {
824		// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
825		if let Err(err) = self.try_pending_justifications() {
826			debug!(target: LOG_TARGET, "🥩 {}", err);
827		}
828
829		// Don't bother voting or requesting justifications during major sync.
830		if !self.sync.is_major_syncing() {
831			// There were external events, 'state' is changed, author a vote if needed/possible.
832			if let Err(err) = self.try_to_vote() {
833				debug!(target: LOG_TARGET, "🥩 {}", err);
834			}
835			// If the current target is a mandatory block,
836			// make sure there's also an on-demand justification request out for it.
837			if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
838				// This only starts new request if there isn't already an active one.
839				self.comms.on_demand_justifications.request(block, active);
840			}
841		}
842	}
843
844	/// Main loop for BEEFY worker.
845	///
846	/// Run the main async loop which is driven by finality notifications and gossiped votes.
847	/// Should never end, returns `Error` otherwise.
848	pub(crate) async fn run(
849		mut self,
850		block_import_justif: &mut Fuse<
851			NotificationReceiver<BeefyVersionedFinalityProof<B, AuthorityId>>,
852		>,
853		finality_notifications: &mut Fuse<crate::FinalityNotifications<B>>,
854	) -> (Error, BeefyComms<B, N, AuthorityId>) {
855		info!(
856			target: LOG_TARGET,
857			"🥩 run BEEFY worker, best grandpa: #{:?}.",
858			self.best_grandpa_block()
859		);
860
861		let mut votes = Box::pin(
862			self.comms
863				.gossip_engine
864				.messages_for(votes_topic::<B>())
865				.filter_map(|notification| async move {
866					let vote =
867						GossipMessage::<B, AuthorityId>::decode_all(&mut &notification.message[..])
868							.ok()
869							.and_then(|message| message.unwrap_vote());
870					trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote);
871					vote
872				})
873				.fuse(),
874		);
875		let mut gossip_proofs = Box::pin(
876			self.comms
877				.gossip_engine
878				.messages_for(proofs_topic::<B>())
879				.filter_map(|notification| async move {
880					let proof =
881						GossipMessage::<B, AuthorityId>::decode_all(&mut &notification.message[..])
882							.ok()
883							.and_then(|message| message.unwrap_finality_proof());
884					trace!(target: LOG_TARGET, "🥩 Got gossip proof message: {:?}", proof);
885					proof
886				})
887				.fuse(),
888		);
889
890		self.process_new_state();
891		let error = loop {
892			// Mutable reference used to drive the gossip engine.
893			let mut gossip_engine = &mut self.comms.gossip_engine;
894
895			// Wait for, and handle external events.
896			// The branches below only change 'state', actual voting happens afterwards,
897			// based on the new resulting 'state'.
898			futures::select_biased! {
899				// Use `select_biased!` to prioritize order below.
900				// Process finality notifications first since these drive the voter.
901				notification = finality_notifications.next() => {
902					if let Some(notif) = notification {
903						if let Err(err) = self.handle_finality_notification(&notif) {
904							break err;
905						}
906					} else {
907						break Error::FinalityStreamTerminated;
908					}
909				},
910				// Make sure to pump gossip engine.
911				_ = gossip_engine => {
912					break Error::GossipEngineTerminated;
913				},
914				// Process incoming justifications as these can make some in-flight votes obsolete.
915				response_info = self.comms.on_demand_justifications.next().fuse() => {
916					match response_info {
917						ResponseInfo::ValidProof(justif, peer_report) => {
918							if let Err(err) = self.triage_incoming_justif(justif) {
919								debug!(target: LOG_TARGET, "🥩 {}", err);
920							}
921							self.comms.gossip_engine.report(peer_report.who, peer_report.cost_benefit);
922						},
923						ResponseInfo::PeerReport(peer_report) => {
924							self.comms.gossip_engine.report(peer_report.who, peer_report.cost_benefit);
925						},
926						ResponseInfo::Pending => {},
927					}
928				},
929				justif = block_import_justif.next() => {
930					if let Some(justif) = justif {
931						// Block import justifications have already been verified to be valid
932						// by `BeefyBlockImport`.
933						if let Err(err) = self.triage_incoming_justif(justif) {
934							debug!(target: LOG_TARGET, "🥩 {}", err);
935						}
936					} else {
937						break Error::BlockImportStreamTerminated;
938					}
939				},
940				justif = gossip_proofs.next() => {
941					if let Some(justif) = justif {
942						// Gossiped justifications have already been verified by `GossipValidator`.
943						if let Err(err) = self.triage_incoming_justif(justif) {
944							debug!(target: LOG_TARGET, "🥩 {}", err);
945						}
946					} else {
947						break Error::FinalityProofGossipStreamTerminated;
948					}
949				},
950				// Finally process incoming votes.
951				vote = votes.next() => {
952					if let Some(vote) = vote {
953						// Votes have already been verified to be valid by the gossip validator.
954						if let Err(err) = self.triage_incoming_vote(vote) {
955							debug!(target: LOG_TARGET, "🥩 {}", err);
956						}
957					} else {
958						break Error::VotesGossipStreamTerminated;
959					}
960				},
961			}
962
963			// Act on changed 'state'.
964			self.process_new_state();
965		};
966
967		// return error _and_ `comms` that can be reused
968		(error, self.comms)
969	}
970
971	/// Report the given equivocation to the BEEFY runtime module.
972	fn report_double_voting(
973		&self,
974		proof: DoubleVotingProof<
975			NumberFor<B>,
976			AuthorityId,
977			<AuthorityId as RuntimeAppPublic>::Signature,
978		>,
979	) -> Result<(), Error> {
980		let rounds = self.persisted_state.voting_oracle.active_rounds()?;
981		self.fisherman.report_double_voting(proof, rounds)
982	}
983}
984
985/// Calculate next block number to vote on.
986///
987/// Return `None` if there is no votable target yet.
988fn vote_target<N>(best_grandpa: N, best_beefy: N, session_start: N, min_delta: u32) -> Option<N>
989where
990	N: AtLeast32Bit + Copy + Debug,
991{
992	// if the mandatory block (session_start) does not have a beefy justification yet,
993	// we vote on it
994	let target = if best_beefy < session_start {
995		debug!(target: LOG_TARGET, "🥩 vote target - mandatory block: #{:?}", session_start);
996		session_start
997	} else {
998		let diff = best_grandpa.saturating_sub(best_beefy) + 1u32.into();
999		let diff = diff.saturated_into::<u32>() / 2;
1000		let target = best_beefy + min_delta.max(diff.next_power_of_two()).into();
1001		trace!(
1002			target: LOG_TARGET,
1003			"🥩 vote target - diff: {:?}, next_power_of_two: {:?}, target block: #{:?}",
1004			diff,
1005			diff.next_power_of_two(),
1006			target,
1007		);
1008
1009		target
1010	};
1011
1012	// Don't vote for targets until they've been finalized
1013	// (`target` can be > `best_grandpa` when `min_delta` is big enough).
1014	if target > best_grandpa {
1015		None
1016	} else {
1017		Some(target)
1018	}
1019}
1020
1021#[cfg(test)]
1022pub(crate) mod tests {
1023	use super::*;
1024	use crate::{
1025		communication::{
1026			gossip::{tests::TestNetwork, GossipValidator},
1027			notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
1028			request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
1029		},
1030		tests::{
1031			create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet,
1032			TestApi,
1033		},
1034		BeefyRPCLinks, KnownPeers,
1035	};
1036	use futures::{future::poll_fn, task::Poll};
1037	use parking_lot::Mutex;
1038	use sc_client_api::{Backend as BackendT, HeaderBackend};
1039	use sc_network_gossip::GossipEngine;
1040	use sc_network_sync::SyncingService;
1041	use sc_network_test::TestNetFactory;
1042	use sp_blockchain::Backend as BlockchainBackendT;
1043	use sp_consensus_beefy::{
1044		ecdsa_crypto, known_payloads,
1045		known_payloads::MMR_ROOT_ID,
1046		mmr::MmrRootProvider,
1047		test_utils::{generate_double_voting_proof, Keyring},
1048		ConsensusLog, Payload, SignedCommitment,
1049	};
1050	use sp_runtime::traits::{Header as HeaderT, One};
1051	use substrate_test_runtime_client::{
1052		runtime::{Block, Digest, DigestItem, Header},
1053		Backend,
1054	};
1055
1056	impl<B: super::Block, AuthorityId: AuthorityIdBound> PersistedState<B, AuthorityId> {
1057		pub fn active_round(&self) -> Result<&Rounds<B, AuthorityId>, Error> {
1058			self.voting_oracle.active_rounds()
1059		}
1060
1061		pub fn best_grandpa_number(&self) -> NumberFor<B> {
1062			*self.voting_oracle.best_grandpa_block_header.number()
1063		}
1064	}
1065
1066	impl<B: super::Block> VoterOracle<B, ecdsa_crypto::AuthorityId> {
1067		pub fn sessions(&self) -> &VecDeque<Rounds<B, ecdsa_crypto::AuthorityId>> {
1068			&self.sessions
1069		}
1070	}
1071
1072	fn create_beefy_worker(
1073		peer: &mut BeefyPeer,
1074		key: &Keyring<ecdsa_crypto::AuthorityId>,
1075		min_block_delta: u32,
1076		genesis_validator_set: ValidatorSet<ecdsa_crypto::AuthorityId>,
1077	) -> BeefyWorker<
1078		Block,
1079		Backend,
1080		MmrRootProvider<Block, TestApi>,
1081		TestApi,
1082		Arc<SyncingService<Block>>,
1083		TestNetwork,
1084		ecdsa_crypto::AuthorityId,
1085	> {
1086		let keystore = create_beefy_keystore(key);
1087
1088		let (to_rpc_justif_sender, from_voter_justif_stream) =
1089			BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();
1090		let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
1091			BeefyBestBlockStream::<Block>::channel();
1092		let (_, from_block_import_justif_stream) =
1093			BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();
1094
1095		let beefy_rpc_links =
1096			BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream };
1097		*peer.data.beefy_rpc_links.lock() = Some(beefy_rpc_links);
1098
1099		let links = BeefyVoterLinks {
1100			from_block_import_justif_stream,
1101			to_rpc_justif_sender,
1102			to_rpc_best_block_sender,
1103		};
1104
1105		let backend = peer.client().as_backend();
1106		let beefy_genesis = 1;
1107		let api = Arc::new(TestApi::with_validator_set(&genesis_validator_set));
1108		let network = peer.network_service().clone();
1109		let sync = peer.sync_service().clone();
1110		let notification_service = peer
1111			.take_notification_service(&crate::tests::beefy_gossip_proto_name())
1112			.unwrap();
1113		let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
1114		let gossip_validator =
1115			GossipValidator::new(known_peers.clone(), Arc::new(TestNetwork::new().0));
1116		let gossip_validator = Arc::new(gossip_validator);
1117		let gossip_engine = GossipEngine::new(
1118			network.clone(),
1119			sync.clone(),
1120			notification_service,
1121			"/beefy/1",
1122			gossip_validator.clone(),
1123			None,
1124		);
1125		let metrics = None;
1126		let on_demand_justifications = OnDemandJustificationsEngine::new(
1127			network.clone(),
1128			"/beefy/justifs/1".into(),
1129			known_peers,
1130			None,
1131		);
1132		// Push 1 block - will start first session.
1133		let hashes = peer.push_blocks(1, false);
1134		backend.finalize_block(hashes[0], None).unwrap();
1135		let first_header = backend
1136			.blockchain()
1137			.expect_header(backend.blockchain().info().best_hash)
1138			.unwrap();
1139		let persisted_state = PersistedState::checked_new(
1140			first_header,
1141			Zero::zero(),
1142			vec![Rounds::new(One::one(), genesis_validator_set)].into(),
1143			min_block_delta,
1144			beefy_genesis,
1145		)
1146		.unwrap();
1147		let payload_provider = MmrRootProvider::new(api.clone());
1148		let comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications };
1149		let key_store: Arc<BeefyKeystore<ecdsa_crypto::AuthorityId>> =
1150			Arc::new(Some(keystore).into());
1151		BeefyWorker {
1152			backend: backend.clone(),
1153			runtime: api.clone(),
1154			key_store: key_store.clone(),
1155			metrics,
1156			payload_provider,
1157			sync: Arc::new(sync),
1158			fisherman: Arc::new(Fisherman::new(backend, api, key_store)),
1159			links,
1160			comms,
1161			pending_justifications: BTreeMap::new(),
1162			persisted_state,
1163			is_authority: true,
1164		}
1165	}
1166
1167	#[test]
1168	fn vote_on_min_block_delta() {
1169		let t = vote_target(1u32, 1, 1, 4);
1170		assert_eq!(None, t);
1171		let t = vote_target(2u32, 1, 1, 4);
1172		assert_eq!(None, t);
1173		let t = vote_target(4u32, 2, 1, 4);
1174		assert_eq!(None, t);
1175		let t = vote_target(6u32, 2, 1, 4);
1176		assert_eq!(Some(6), t);
1177
1178		let t = vote_target(9u32, 4, 1, 4);
1179		assert_eq!(Some(8), t);
1180
1181		let t = vote_target(10u32, 10, 1, 8);
1182		assert_eq!(None, t);
1183		let t = vote_target(12u32, 10, 1, 8);
1184		assert_eq!(None, t);
1185		let t = vote_target(18u32, 10, 1, 8);
1186		assert_eq!(Some(18), t);
1187	}
1188
1189	#[test]
1190	fn vote_on_power_of_two() {
1191		let t = vote_target(1008u32, 1000, 1, 4);
1192		assert_eq!(Some(1004), t);
1193
1194		let t = vote_target(1016u32, 1000, 1, 4);
1195		assert_eq!(Some(1008), t);
1196
1197		let t = vote_target(1032u32, 1000, 1, 4);
1198		assert_eq!(Some(1016), t);
1199
1200		let t = vote_target(1064u32, 1000, 1, 4);
1201		assert_eq!(Some(1032), t);
1202
1203		let t = vote_target(1128u32, 1000, 1, 4);
1204		assert_eq!(Some(1064), t);
1205
1206		let t = vote_target(1256u32, 1000, 1, 4);
1207		assert_eq!(Some(1128), t);
1208
1209		let t = vote_target(1512u32, 1000, 1, 4);
1210		assert_eq!(Some(1256), t);
1211
1212		let t = vote_target(1024u32, 1, 1, 4);
1213		assert_eq!(Some(513), t);
1214	}
1215
1216	#[test]
1217	fn vote_on_target_block() {
1218		let t = vote_target(1008u32, 1002, 1, 4);
1219		assert_eq!(Some(1006), t);
1220		let t = vote_target(1010u32, 1002, 1, 4);
1221		assert_eq!(Some(1006), t);
1222
1223		let t = vote_target(1016u32, 1006, 1, 4);
1224		assert_eq!(Some(1014), t);
1225		let t = vote_target(1022u32, 1006, 1, 4);
1226		assert_eq!(Some(1014), t);
1227
1228		let t = vote_target(1032u32, 1012, 1, 4);
1229		assert_eq!(Some(1028), t);
1230		let t = vote_target(1044u32, 1012, 1, 4);
1231		assert_eq!(Some(1028), t);
1232
1233		let t = vote_target(1064u32, 1014, 1, 4);
1234		assert_eq!(Some(1046), t);
1235		let t = vote_target(1078u32, 1014, 1, 4);
1236		assert_eq!(Some(1046), t);
1237
1238		let t = vote_target(1128u32, 1008, 1, 4);
1239		assert_eq!(Some(1072), t);
1240		let t = vote_target(1136u32, 1008, 1, 4);
1241		assert_eq!(Some(1072), t);
1242	}
1243
1244	#[test]
1245	fn vote_on_mandatory_block() {
1246		let t = vote_target(1008u32, 1002, 1004, 4);
1247		assert_eq!(Some(1004), t);
1248		let t = vote_target(1016u32, 1006, 1007, 4);
1249		assert_eq!(Some(1007), t);
1250		let t = vote_target(1064u32, 1014, 1063, 4);
1251		assert_eq!(Some(1063), t);
1252		let t = vote_target(1320u32, 1012, 1234, 4);
1253		assert_eq!(Some(1234), t);
1254
1255		let t = vote_target(1128u32, 1008, 1008, 4);
1256		assert_eq!(Some(1072), t);
1257	}
1258
1259	#[test]
1260	fn should_vote_target() {
1261		let header = Header::new(
1262			1u32.into(),
1263			Default::default(),
1264			Default::default(),
1265			Default::default(),
1266			Digest::default(),
1267		);
1268		let mut oracle = VoterOracle::<Block, ecdsa_crypto::AuthorityId> {
1269			best_beefy_block: 0,
1270			best_grandpa_block_header: header,
1271			min_block_delta: 1,
1272			sessions: VecDeque::new(),
1273			_phantom: PhantomData,
1274		};
1275		let voting_target_with = |oracle: &mut VoterOracle<Block, ecdsa_crypto::AuthorityId>,
1276		                          best_beefy: NumberFor<Block>,
1277		                          best_grandpa: NumberFor<Block>|
1278		 -> Option<NumberFor<Block>> {
1279			oracle.best_beefy_block = best_beefy;
1280			oracle.best_grandpa_block_header.number = best_grandpa;
1281			oracle.voting_target()
1282		};
1283
1284		// rounds not initialized -> should vote: `None`
1285		assert_eq!(voting_target_with(&mut oracle, 0, 1), None);
1286
1287		let keys = &[Keyring::Alice];
1288		let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
1289
1290		oracle.add_session(Rounds::new(1, validator_set.clone()));
1291
1292		// under min delta
1293		oracle.min_block_delta = 4;
1294		assert_eq!(voting_target_with(&mut oracle, 1, 1), None);
1295		assert_eq!(voting_target_with(&mut oracle, 2, 5), None);
1296
1297		// vote on min delta
1298		assert_eq!(voting_target_with(&mut oracle, 4, 9), Some(8));
1299		oracle.min_block_delta = 8;
1300		assert_eq!(voting_target_with(&mut oracle, 10, 18), Some(18));
1301
1302		// vote on power of two
1303		oracle.min_block_delta = 1;
1304		assert_eq!(voting_target_with(&mut oracle, 1000, 1008), Some(1004));
1305		assert_eq!(voting_target_with(&mut oracle, 1000, 1016), Some(1008));
1306
1307		// nothing new to vote on
1308		assert_eq!(voting_target_with(&mut oracle, 1000, 1000), None);
1309
1310		// vote on mandatory
1311		oracle.sessions.clear();
1312		oracle.add_session(Rounds::new(1000, validator_set.clone()));
1313		assert_eq!(voting_target_with(&mut oracle, 0, 1008), Some(1000));
1314		oracle.sessions.clear();
1315		oracle.add_session(Rounds::new(1001, validator_set.clone()));
1316		assert_eq!(voting_target_with(&mut oracle, 1000, 1008), Some(1001));
1317	}
1318
1319	#[test]
1320	fn test_oracle_accepted_interval() {
1321		let keys = &[Keyring::Alice];
1322		let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
1323
1324		let header = Header::new(
1325			1u32.into(),
1326			Default::default(),
1327			Default::default(),
1328			Default::default(),
1329			Digest::default(),
1330		);
1331		let mut oracle = VoterOracle::<Block, ecdsa_crypto::AuthorityId> {
1332			best_beefy_block: 0,
1333			best_grandpa_block_header: header,
1334			min_block_delta: 1,
1335			sessions: VecDeque::new(),
1336			_phantom: PhantomData,
1337		};
1338		let accepted_interval_with =
1339			|oracle: &mut VoterOracle<Block, ecdsa_crypto::AuthorityId>,
1340			 best_grandpa: NumberFor<Block>|
1341			 -> Result<(NumberFor<Block>, NumberFor<Block>), Error> {
1342				oracle.best_grandpa_block_header.number = best_grandpa;
1343				oracle.accepted_interval()
1344			};
1345
1346		// rounds not initialized -> should accept votes: `None`
1347		assert!(accepted_interval_with(&mut oracle, 1).is_err());
1348
1349		let session_one = 1;
1350		oracle.add_session(Rounds::new(session_one, validator_set.clone()));
1351		// mandatory not done, only accept mandatory
1352		for i in 0..15 {
1353			assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_one, session_one)));
1354		}
1355
1356		// add more sessions, nothing changes
1357		let session_two = 11;
1358		let session_three = 21;
1359		oracle.add_session(Rounds::new(session_two, validator_set.clone()));
1360		oracle.add_session(Rounds::new(session_three, validator_set.clone()));
1361		// mandatory not done, should accept mandatory for session_one
1362		for i in session_three..session_three + 15 {
1363			assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_one, session_one)));
1364		}
1365
1366		// simulate finish mandatory for session one, prune oracle
1367		oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true);
1368		oracle.try_prune();
1369		// session_one pruned, should accept mandatory for session_two
1370		for i in session_three..session_three + 15 {
1371			assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_two, session_two)));
1372		}
1373
1374		// simulate finish mandatory for session two, prune oracle
1375		oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true);
1376		oracle.try_prune();
1377		// session_two pruned, should accept mandatory for session_three
1378		for i in session_three..session_three + 15 {
1379			assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, session_three)));
1380		}
1381
1382		// simulate finish mandatory for session three
1383		oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true);
1384		// verify all other blocks in this session are now open to voting
1385		for i in session_three..session_three + 15 {
1386			assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, i)));
1387		}
1388		// pruning does nothing in this case
1389		oracle.try_prune();
1390		for i in session_three..session_three + 15 {
1391			assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, i)));
1392		}
1393
1394		// adding new session automatically prunes "finalized" previous session
1395		let session_four = 31;
1396		oracle.add_session(Rounds::new(session_four, validator_set.clone()));
1397		assert_eq!(oracle.sessions.front().unwrap().session_start(), session_four);
1398		assert_eq!(
1399			accepted_interval_with(&mut oracle, session_four + 10),
1400			Ok((session_four, session_four))
1401		);
1402	}
1403
1404	#[test]
1405	fn extract_authorities_change_digest() {
1406		let mut header = Header::new(
1407			1u32.into(),
1408			Default::default(),
1409			Default::default(),
1410			Default::default(),
1411			Digest::default(),
1412		);
1413
1414		// verify empty digest shows nothing
1415		assert!(find_authorities_change::<Block, ecdsa_crypto::AuthorityId>(&header).is_none());
1416
1417		let peers = &[Keyring::One, Keyring::Two];
1418		let id = 42;
1419		let validator_set = ValidatorSet::new(make_beefy_ids(peers), id).unwrap();
1420		header.digest_mut().push(DigestItem::Consensus(
1421			BEEFY_ENGINE_ID,
1422			ConsensusLog::<ecdsa_crypto::AuthorityId>::AuthoritiesChange(validator_set.clone())
1423				.encode(),
1424		));
1425
1426		// verify validator set is correctly extracted from digest
1427		let extracted = find_authorities_change::<Block, ecdsa_crypto::AuthorityId>(&header);
1428		assert_eq!(extracted, Some(validator_set));
1429	}
1430
1431	#[tokio::test]
1432	async fn should_finalize_correctly() {
1433		let keys = [Keyring::Alice];
1434		let validator_set = ValidatorSet::new(make_beefy_ids(&keys), 0).unwrap();
1435		let mut net = BeefyTestNet::new(1);
1436		let backend = net.peer(0).client().as_backend();
1437		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
1438		// remove default session, will manually add custom one.
1439		worker.persisted_state.voting_oracle.sessions.clear();
1440
1441		let keys = keys.iter().cloned().enumerate();
1442		let (mut best_block_streams, mut finality_proofs) =
1443			get_beefy_streams(&mut net, keys.clone());
1444		let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
1445		let mut finality_proof = finality_proofs.drain(..).next().unwrap();
1446
1447		let create_finality_proof = |block_num: NumberFor<Block>| {
1448			let commitment = Commitment {
1449				payload: Payload::from_single_entry(known_payloads::MMR_ROOT_ID, vec![]),
1450				block_number: block_num,
1451				validator_set_id: validator_set.id(),
1452			};
1453			VersionedFinalityProof::V1(SignedCommitment { commitment, signatures: vec![None] })
1454		};
1455
1456		// no 'best beefy block' or finality proofs
1457		assert_eq!(worker.persisted_state.best_beefy(), 0);
1458		poll_fn(move |cx| {
1459			assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending);
1460			assert_eq!(finality_proof.poll_next_unpin(cx), Poll::Pending);
1461			Poll::Ready(())
1462		})
1463		.await;
1464
1465		let client = net.peer(0).client().as_client();
1466		// unknown hash for block #1
1467		let (mut best_block_streams, mut finality_proofs) =
1468			get_beefy_streams(&mut net, keys.clone());
1469		let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
1470		let mut finality_proof = finality_proofs.drain(..).next().unwrap();
1471		let justif = create_finality_proof(1);
1472		// create new session at block #1
1473		worker
1474			.persisted_state
1475			.voting_oracle
1476			.add_session(Rounds::new(1, validator_set.clone()));
1477		// try to finalize block #1
1478		worker.finalize(justif.clone()).unwrap();
1479		// verify block finalized
1480		assert_eq!(worker.persisted_state.best_beefy(), 1);
1481		poll_fn(move |cx| {
1482			// expect Some(hash-of-block-1)
1483			match best_block_stream.poll_next_unpin(cx) {
1484				Poll::Ready(Some(hash)) => {
1485					let block_num = client.number(hash).unwrap();
1486					assert_eq!(block_num, Some(1));
1487				},
1488				v => panic!("unexpected value: {:?}", v),
1489			}
1490			// commitment streamed
1491			match finality_proof.poll_next_unpin(cx) {
1492				// expect justification
1493				Poll::Ready(Some(received)) => assert_eq!(received, justif),
1494				v => panic!("unexpected value: {:?}", v),
1495			}
1496			Poll::Ready(())
1497		})
1498		.await;
1499
1500		// generate 2 blocks, try again expect success
1501		let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys);
1502		let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
1503		let hashes = net.peer(0).push_blocks(1, false);
1504		// finalize 1 and 2 without justifications (hashes does not contain genesis)
1505		let hashof2 = hashes[0];
1506		backend.finalize_block(hashof2, None).unwrap();
1507
1508		let justif = create_finality_proof(2);
1509		// create new session at block #2
1510		worker.persisted_state.voting_oracle.add_session(Rounds::new(2, validator_set));
1511		worker.finalize(justif).unwrap();
1512		// verify old session pruned
1513		assert_eq!(worker.voting_oracle().sessions.len(), 1);
1514		// new session starting at #2 is in front
1515		assert_eq!(worker.active_rounds().unwrap().session_start(), 2);
1516		// verify block finalized
1517		assert_eq!(worker.persisted_state.best_beefy(), 2);
1518		poll_fn(move |cx| {
1519			match best_block_stream.poll_next_unpin(cx) {
1520				// expect Some(hash-of-block-2)
1521				Poll::Ready(Some(hash)) => {
1522					let block_num = net.peer(0).client().as_client().number(hash).unwrap();
1523					assert_eq!(block_num, Some(2));
1524				},
1525				v => panic!("unexpected value: {:?}", v),
1526			}
1527			Poll::Ready(())
1528		})
1529		.await;
1530
1531		// check BEEFY justifications are also appended to backend
1532		let justifs = backend.blockchain().justifications(hashof2).unwrap().unwrap();
1533		assert!(justifs.get(BEEFY_ENGINE_ID).is_some())
1534	}
1535
1536	#[tokio::test]
1537	async fn should_init_session() {
1538		let keys = &[Keyring::Alice, Keyring::Bob];
1539		let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
1540		let mut net = BeefyTestNet::new(1);
1541		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
1542
1543		let worker_rounds = worker.active_rounds().unwrap();
1544		assert_eq!(worker_rounds.session_start(), 1);
1545		assert_eq!(worker_rounds.validators(), validator_set.validators());
1546		assert_eq!(worker_rounds.validator_set_id(), validator_set.id());
1547
1548		// new validator set
1549		let keys = &[Keyring::Bob];
1550		let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap();
1551
1552		worker.init_session_at(new_validator_set.clone(), 11);
1553		// Since mandatory is not done for old rounds, we still get those.
1554		let rounds = worker.persisted_state.voting_oracle.active_rounds_mut().unwrap();
1555		assert_eq!(rounds.validator_set_id(), validator_set.id());
1556		// Let's finalize mandatory.
1557		rounds.test_set_mandatory_done(true);
1558		worker.persisted_state.voting_oracle.try_prune();
1559		// Now we should get the next round.
1560		let rounds = worker.active_rounds().unwrap();
1561		// Expect new values.
1562		assert_eq!(rounds.session_start(), 11);
1563		assert_eq!(rounds.validators(), new_validator_set.validators());
1564		assert_eq!(rounds.validator_set_id(), new_validator_set.id());
1565	}
1566
1567	#[tokio::test]
1568	async fn should_not_report_bad_old_or_self_equivocations() {
1569		let block_num = 1;
1570		let set_id = 1;
1571		let keys = [Keyring::Alice];
1572		let validator_set = ValidatorSet::new(make_beefy_ids(&keys), set_id).unwrap();
1573		// Alice votes on good MMR roots, equivocations are allowed/expected
1574		let mut api_alice = TestApi::with_validator_set(&validator_set);
1575		api_alice.allow_equivocations();
1576		let api_alice = Arc::new(api_alice);
1577
1578		let mut net = BeefyTestNet::new(1);
1579		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
1580		worker.runtime = api_alice.clone();
1581		worker.fisherman = Arc::new(Fisherman::new(
1582			worker.backend.clone(),
1583			worker.runtime.clone(),
1584			worker.key_store.clone(),
1585		));
1586
1587		// let there be a block with num = 1:
1588		let _ = net.peer(0).push_blocks(1, false);
1589
1590		let payload1 = Payload::from_single_entry(MMR_ROOT_ID, vec![42]);
1591		let payload2 = Payload::from_single_entry(MMR_ROOT_ID, vec![128]);
1592
1593		// generate an equivocation proof, with Bob as perpetrator
1594		let good_proof = generate_double_voting_proof(
1595			(block_num, payload1.clone(), set_id, &Keyring::Bob),
1596			(block_num, payload2.clone(), set_id, &Keyring::Bob),
1597		);
1598		{
1599			// expect voter (Alice) to successfully report it
1600			assert_eq!(worker.report_double_voting(good_proof.clone()), Ok(()));
1601			// verify Alice reports Bob equivocation to runtime
1602			let reported = api_alice.reported_equivocations.as_ref().unwrap().lock();
1603			assert_eq!(reported.len(), 1);
1604			assert_eq!(*reported.get(0).unwrap(), good_proof);
1605		}
1606		api_alice.reported_equivocations.as_ref().unwrap().lock().clear();
1607
1608		// now let's try with a bad proof
1609		let mut bad_proof = good_proof.clone();
1610		bad_proof.first.id = Keyring::Charlie.public();
1611		// bad proofs are simply ignored
1612		assert_eq!(worker.report_double_voting(bad_proof), Ok(()));
1613		// verify nothing reported to runtime
1614		assert!(api_alice.reported_equivocations.as_ref().unwrap().lock().is_empty());
1615
1616		// now let's try with old set it
1617		let mut old_proof = good_proof.clone();
1618		old_proof.first.commitment.validator_set_id = 0;
1619		old_proof.second.commitment.validator_set_id = 0;
1620		// old proofs are simply ignored
1621		assert_eq!(worker.report_double_voting(old_proof), Ok(()));
1622		// verify nothing reported to runtime
1623		assert!(api_alice.reported_equivocations.as_ref().unwrap().lock().is_empty());
1624
1625		// now let's try reporting a self-equivocation
1626		let self_proof = generate_double_voting_proof(
1627			(block_num, payload1.clone(), set_id, &Keyring::Alice),
1628			(block_num, payload2.clone(), set_id, &Keyring::Alice),
1629		);
1630		// equivocations done by 'self' are simply ignored (not reported)
1631		assert_eq!(worker.report_double_voting(self_proof), Ok(()));
1632		// verify nothing reported to runtime
1633		assert!(api_alice.reported_equivocations.as_ref().unwrap().lock().is_empty());
1634	}
1635}