use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
pin::Pin,
sync::Arc,
time::Duration,
};
use codec::{Decode, Encode};
use finality_grandpa::{
round::State as RoundState, voter, voter_set::VoterSet, BlockNumberOps, Error as GrandpaError,
};
use futures::prelude::*;
use futures_timer::Delay;
use log::{debug, warn};
use parking_lot::RwLock;
use prometheus_endpoint::{register, Counter, Gauge, PrometheusError, U64};
use sc_client_api::{
backend::{apply_aux, Backend as BackendT},
utils::is_descendent_of,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::ApiExt;
use sp_blockchain::HeaderMetadata;
use sp_consensus::SelectChain as SelectChainT;
use sp_consensus_grandpa::{
AuthorityId, AuthoritySignature, Equivocation, EquivocationProof, GrandpaApi, RoundNumber,
SetId, GRANDPA_ENGINE_ID,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
use crate::{
authorities::{AuthoritySet, SharedAuthoritySet},
communication::{Network as NetworkT, Syncing as SyncingT},
justification::GrandpaJustification,
local_authority_id,
notification::GrandpaJustificationSender,
until_imported::UntilVoteTargetImported,
voting_rule::VotingRule as VotingRuleT,
ClientForGrandpa, CommandOrError, Commit, Config, Error, NewAuthoritySet, Precommit, Prevote,
PrimaryPropose, SignedMessage, VoterCommand, LOG_TARGET,
};
/// Shorthand for `finality_grandpa::HistoricalVotes` instantiated with this block
/// type's hash and number together with the GRANDPA authority signature and id types.
type HistoricalVotes<Block> = finality_grandpa::HistoricalVotes<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// Data about a single completed round.
#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct CompletedRound<Block: BlockT> {
/// The round number.
pub number: RoundNumber,
/// The round state (a `finality_grandpa::round::State`).
pub state: RoundState<Block::Hash, NumberFor<Block>>,
/// The `(hash, number)` pair recorded as the base of the round.
pub base: (Block::Hash, NumberFor<Block>),
/// The signed votes recorded for the round.
pub votes: Vec<SignedMessage<Block::Header>>,
}
/// Bounded record of the most recently completed rounds of one authority set,
/// together with that set's id and voter ids.
#[derive(Debug, Clone, PartialEq)]
pub struct CompletedRounds<Block: BlockT> {
// Kept sorted by descending round number by `push`, so index 0 is the latest round.
rounds: Vec<CompletedRound<Block>>,
// Id of the authority set these rounds belong to.
set_id: SetId,
// Ids of the authorities of that set (weights are dropped in `new`).
voters: Vec<AuthorityId>,
}
/// Number of completed rounds retained by `CompletedRounds::push` before the
/// oldest entry is evicted.
const NUM_LAST_COMPLETED_ROUNDS: usize = 2;
impl<Block: BlockT> Encode for CompletedRounds<Block> {
    /// SCALE-encodes the value as the tuple `(rounds, set_id, voters)`.
    ///
    /// The fields are encoded through references. A `&Vec<T>` encodes to exactly
    /// the same bytes as a `Vec<&T>`, so no intermediate vector of references
    /// has to be allocated and the wire format read back by `Decode` is unchanged.
    fn encode(&self) -> Vec<u8> {
        (&self.rounds, &self.set_id, &self.voters).encode()
    }
}
// Marker impl with an empty body: opts `CompletedRounds` into `codec::EncodeLike`
// alongside the hand-written `Encode` impl.
impl<Block: BlockT> codec::EncodeLike for CompletedRounds<Block> {}
impl<Block: BlockT> Decode for CompletedRounds<Block> {
    /// Reads the `(rounds, set_id, voters)` tuple from `value` and rebuilds the
    /// struct from it. Any codec error is passed through unchanged.
    fn decode<I: codec::Input>(value: &mut I) -> Result<Self, codec::Error> {
        let (rounds, set_id, voters) =
            <(Vec<CompletedRound<Block>>, SetId, Vec<AuthorityId>)>::decode(value)?;

        Ok(Self { rounds, set_id, voters })
    }
}
impl<Block: BlockT> CompletedRounds<Block> {
    /// Creates a tracker holding only the given genesis round.
    ///
    /// The voter ids are copied out of `voters.current_authorities`; the second
    /// element of each authority entry is discarded.
    pub(crate) fn new(
        genesis: CompletedRound<Block>,
        set_id: SetId,
        voters: &AuthoritySet<Block::Hash, NumberFor<Block>>,
    ) -> CompletedRounds<Block> {
        let voters = voters.current_authorities.iter().map(|(id, _)| id.clone()).collect();

        let mut rounds = Vec::with_capacity(NUM_LAST_COMPLETED_ROUNDS);
        rounds.push(genesis);

        Self { rounds, set_id, voters }
    }

    /// Returns the set id and the voter ids these rounds belong to.
    pub fn set_info(&self) -> (SetId, &[AuthorityId]) {
        (self.set_id, self.voters.as_slice())
    }

    /// Iterates over the stored rounds in reverse storage order. Storage is
    /// newest-first, so the rounds come out oldest-first.
    pub fn iter(&self) -> impl Iterator<Item = &CompletedRound<Block>> {
        let newest_first = self.rounds.iter();
        newest_first.rev()
    }

    /// Returns the latest completed round, which is stored at index 0.
    ///
    /// # Panics
    ///
    /// Panics if no round is stored.
    pub fn last(&self) -> &CompletedRound<Block> {
        let newest = self.rounds.first();
        newest.expect("inner is never empty; always contains at least genesis; qed")
    }

    /// Inserts a completed round, keeping the list sorted by descending round
    /// number. A round with the same number replaces the stored one. When the
    /// list grows beyond `NUM_LAST_COMPLETED_ROUNDS`, the last (oldest) entry
    /// is dropped.
    pub fn push(&mut self, completed_round: CompletedRound<Block>) {
        let number = completed_round.number;

        // Comparing the new number against the stored one (instead of the other
        // way round) makes the binary search work on a descending list.
        let position = self.rounds.binary_search_by(|stored| number.cmp(&stored.number));

        match position {
            Ok(slot) => self.rounds[slot] = completed_round,
            Err(slot) => self.rounds.insert(slot, completed_round),
        }

        if self.rounds.len() > NUM_LAST_COMPLETED_ROUNDS {
            self.rounds.pop();
        }
    }
}
/// The rounds currently being tracked, mapping each round number to the vote
/// state recorded for it.
pub type CurrentRounds<Block> = BTreeMap<RoundNumber, HasVoted<<Block as BlockT>::Header>>;
/// The state of the current voter set: either actively voting or paused.
#[derive(Debug, Decode, Encode, PartialEq)]
pub enum VoterSetState<Block: BlockT> {
/// The voter is live.
Live {
/// The previously completed rounds.
completed_rounds: CompletedRounds<Block>,
/// Vote status for the rounds currently tracked.
current_rounds: CurrentRounds<Block>,
},
/// The voter is paused; only the completed rounds are kept.
Paused {
/// The previously completed rounds.
completed_rounds: CompletedRounds<Block>,
},
}
impl<Block: BlockT> VoterSetState<Block> {
pub(crate) fn live(
set_id: SetId,
authority_set: &AuthoritySet<Block::Hash, NumberFor<Block>>,
genesis_state: (Block::Hash, NumberFor<Block>),
) -> VoterSetState<Block> {
let state = RoundState::genesis((genesis_state.0, genesis_state.1));
let completed_rounds = CompletedRounds::new(
CompletedRound {
number: 0,
state,
base: (genesis_state.0, genesis_state.1),
votes: Vec::new(),
},
set_id,
authority_set,
);
let mut current_rounds = CurrentRounds::<Block>::new();
current_rounds.insert(1, HasVoted::No);
VoterSetState::Live { completed_rounds, current_rounds }
}
pub(crate) fn completed_rounds(&self) -> CompletedRounds<Block> {
match self {
VoterSetState::Live { completed_rounds, .. } => completed_rounds.clone(),
VoterSetState::Paused { completed_rounds } => completed_rounds.clone(),
}
}
pub(crate) fn last_completed_round(&self) -> CompletedRound<Block> {
match self {
VoterSetState::Live { completed_rounds, .. } => completed_rounds.last().clone(),
VoterSetState::Paused { completed_rounds } => completed_rounds.last().clone(),
}
}
pub fn with_current_round(
&self,
round: RoundNumber,
) -> Result<(&CompletedRounds<Block>, &CurrentRounds<Block>), Error> {
if let VoterSetState::Live { completed_rounds, current_rounds } = self {
if current_rounds.contains_key(&round) {
Ok((completed_rounds, current_rounds))
} else {
let msg = "Voter acting on a live round we are not tracking.";
Err(Error::Safety(msg.to_string()))
}
} else {
let msg = "Voter acting while in paused state.";
Err(Error::Safety(msg.to_string()))
}
}
}
/// Whether we have voted in a given round and, if so, which votes were cast.
#[derive(Clone, Debug, Decode, Encode, PartialEq)]
pub enum HasVoted<Header: HeaderT> {
/// No vote has been recorded for the round.
No,
/// A vote has been recorded, together with the authority id it was cast with.
Yes(AuthorityId, Vote<Header>),
}
/// The votes cast so far in a round. Each variant carries the votes of the
/// earlier stages alongside the newest one.
#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub enum Vote<Header: HeaderT> {
/// Only a primary proposal has been issued.
Propose(PrimaryPropose<Header>),
/// A prevote has been issued, with the primary proposal if there was one.
Prevote(Option<PrimaryPropose<Header>>, Prevote<Header>),
/// A precommit has been issued, with the earlier prevote and optional proposal.
Precommit(Option<PrimaryPropose<Header>>, Prevote<Header>, Precommit<Header>),
}
impl<Header: HeaderT> HasVoted<Header> {
    /// Returns the primary proposal recorded for the round, if any.
    pub fn propose(&self) -> Option<&PrimaryPropose<Header>> {
        match self {
            HasVoted::No => None,
            HasVoted::Yes(_, vote) => match vote {
                Vote::Propose(propose) => Some(propose),
                Vote::Prevote(propose, _) | Vote::Precommit(propose, _, _) => propose.as_ref(),
            },
        }
    }

    /// Returns the prevote recorded for the round, if any.
    pub fn prevote(&self) -> Option<&Prevote<Header>> {
        match self {
            HasVoted::No => None,
            HasVoted::Yes(_, vote) => match vote {
                Vote::Prevote(_, prevote) | Vote::Precommit(_, prevote, _) => Some(prevote),
                Vote::Propose(_) => None,
            },
        }
    }

    /// Returns the precommit recorded for the round, if any.
    pub fn precommit(&self) -> Option<&Precommit<Header>> {
        if let HasVoted::Yes(_, Vote::Precommit(_, _, precommit)) = self {
            Some(precommit)
        } else {
            None
        }
    }

    /// True when no primary proposal has been recorded yet.
    pub fn can_propose(&self) -> bool {
        let already_proposed = self.propose();
        already_proposed.is_none()
    }

    /// True when no prevote has been recorded yet.
    pub fn can_prevote(&self) -> bool {
        let already_prevoted = self.prevote();
        already_prevoted.is_none()
    }

    /// True when no precommit has been recorded yet.
    pub fn can_precommit(&self) -> bool {
        let already_precommitted = self.precommit();
        already_precommitted.is_none()
    }
}
/// A voter set state shared between handles; cloning clones the `Arc`s, so all
/// clones observe the same underlying data.
#[derive(Clone)]
pub struct SharedVoterSetState<Block: BlockT> {
// The voter set state itself.
inner: Arc<RwLock<VoterSetState<Block>>>,
// Authority id registered per round by `started_voting_on` and removed again
// by `finished_voting_on`.
voting: Arc<RwLock<HashMap<RoundNumber, AuthorityId>>>,
}
impl<Block: BlockT> From<VoterSetState<Block>> for SharedVoterSetState<Block> {
    /// Wraps a plain voter set state in a shared handle via `SharedVoterSetState::new`.
    fn from(set_state: VoterSetState<Block>) -> Self {
        Self::new(set_state)
    }
}
impl<Block: BlockT> SharedVoterSetState<Block> {
    /// Creates a shared handle around `state` with no rounds registered as
    /// being voted on.
    pub(crate) fn new(state: VoterSetState<Block>) -> Self {
        let inner = Arc::new(RwLock::new(state));
        let voting = Arc::new(RwLock::new(HashMap::new()));

        Self { inner, voting }
    }

    /// Acquires a read guard on the inner voter set state.
    pub(crate) fn read(&self) -> parking_lot::RwLockReadGuard<VoterSetState<Block>> {
        let state = &self.inner;
        state.read()
    }

    /// Returns the authority id registered for `round`, if any.
    pub(crate) fn voting_on(&self, round: RoundNumber) -> Option<AuthorityId> {
        let voting = self.voting.read();
        voting.get(&round).cloned()
    }

    /// Registers `local_id` as the authority id used for `round`, replacing any
    /// previous registration.
    pub(crate) fn started_voting_on(&self, round: RoundNumber, local_id: AuthorityId) {
        let mut voting = self.voting.write();
        voting.insert(round, local_id);
    }

    /// Forgets the authority id registered for `round`.
    pub(crate) fn finished_voting_on(&self, round: RoundNumber) {
        let mut voting = self.voting.write();
        voting.remove(&round);
    }

    /// Returns a copy of the vote status recorded for `round`.
    ///
    /// Yields `HasVoted::No` when the state is not live or the round is not
    /// being tracked.
    pub(crate) fn has_voted(&self, round: RoundNumber) -> HasVoted<Block::Header> {
        let state = self.inner.read();

        match &*state {
            VoterSetState::Live { current_rounds, .. } =>
                current_rounds.get(&round).cloned().unwrap_or(HasVoted::No),
            VoterSetState::Paused { .. } => HasVoted::No,
        }
    }

    /// Runs `f` with exclusive access to the inner state. The write lock is
    /// held for the whole call.
    fn with<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut VoterSetState<Block>) -> R,
    {
        let mut state = self.inner.write();
        f(&mut *state)
    }
}
/// Prometheus metrics for GRANDPA.
#[derive(Clone)]
pub(crate) struct Metrics {
// Highest completed round number; set in `update_voter_set_state`.
finality_grandpa_round: Gauge<U64>,
// Incremented each time a local prevote is recorded.
finality_grandpa_prevotes: Counter<U64>,
// Incremented each time a local precommit is recorded.
finality_grandpa_precommits: Counter<U64>,
}
impl Metrics {
    /// Creates the round gauge and the prevote/precommit counters and registers
    /// each of them, in that order, with `registry`.
    ///
    /// # Errors
    ///
    /// Returns the first `PrometheusError` raised while creating or registering
    /// a metric.
    pub(crate) fn register(
        registry: &prometheus_endpoint::Registry,
    ) -> Result<Self, PrometheusError> {
        let finality_grandpa_round = register(
            Gauge::new("substrate_finality_grandpa_round", "Highest completed GRANDPA round.")?,
            registry,
        )?;

        let finality_grandpa_prevotes = register(
            Counter::new(
                "substrate_finality_grandpa_prevotes_total",
                "Total number of GRANDPA prevotes cast locally.",
            )?,
            registry,
        )?;

        let finality_grandpa_precommits = register(
            Counter::new(
                "substrate_finality_grandpa_precommits_total",
                "Total number of GRANDPA precommits cast locally.",
            )?,
            registry,
        )?;

        Ok(Self { finality_grandpa_round, finality_grandpa_prevotes, finality_grandpa_precommits })
    }
}
/// The environment the GRANDPA voter runs in: chain access, networking,
/// persisted voter state and reporting hooks for one authority set.
pub(crate) struct Environment<
Backend,
Block: BlockT,
C,
N: NetworkT<Block>,
S: SyncingT<Block>,
SC,
VR,
> {
/// Client used for header lookups, runtime API calls and finality application.
pub(crate) client: Arc<C>,
/// Chain selection used to find the finality target and the best block.
pub(crate) select_chain: SC,
/// The voter set for this environment's authority set.
pub(crate) voters: Arc<VoterSet<AuthorityId>>,
/// Voter configuration (gossip duration, keystore, justification period, name).
pub(crate) config: Config,
/// The shared authority set, also consulted for pending changes.
pub(crate) authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
/// Bridge to the network used for per-round communication.
pub(crate) network: crate::communication::NetworkBridge<Block, N, S>,
/// Id of the authority set this environment votes for.
pub(crate) set_id: SetId,
/// Shared voter set state, persisted on every recorded vote.
pub(crate) voter_set_state: SharedVoterSetState<Block>,
/// Rule applied to restrict the vote target in `best_chain_containing`.
pub(crate) voting_rule: VR,
/// Optional Prometheus metrics.
pub(crate) metrics: Option<Metrics>,
/// Optional sender notified with justifications on finalization.
pub(crate) justification_sender: Option<GrandpaJustificationSender<Block>>,
/// Optional telemetry handle.
pub(crate) telemetry: Option<TelemetryHandle>,
/// Factory for the offchain transaction pool extension registered before
/// submitting equivocation reports.
pub(crate) offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
/// Ties the otherwise unused `Backend` type parameter to the struct.
pub(crate) _phantom: PhantomData<Backend>,
}
impl<BE, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR>
    Environment<BE, Block, C, N, S, SC, VR>
{
    /// Updates the voter set state under its write lock.
    ///
    /// `f` receives the current state and may return a replacement. When it
    /// does, the state is overwritten and, if metrics are enabled and the new
    /// state is live, the round gauge is set to the highest completed round
    /// number. When `f` returns `None` nothing changes.
    ///
    /// # Errors
    ///
    /// Any error returned by `f` is propagated and the state is left untouched.
    pub(crate) fn update_voter_set_state<F>(&self, f: F) -> Result<(), Error>
    where
        F: FnOnce(&VoterSetState<Block>) -> Result<Option<VoterSetState<Block>>, Error>,
    {
        self.voter_set_state.with(|voter_set_state| {
            let replacement = match f(voter_set_state)? {
                Some(replacement) => replacement,
                None => return Ok(()),
            };
            *voter_set_state = replacement;

            if let (Some(metrics), VoterSetState::Live { completed_rounds, .. }) =
                (self.metrics.as_ref(), &*voter_set_state)
            {
                let highest = completed_rounds
                    .rounds
                    .iter()
                    .map(|completed| completed.number)
                    .max()
                    .expect("There is always one completed round (genesis); qed");

                metrics.finality_grandpa_round.set(highest);
            }

            Ok(())
        })
    }
}
impl<BE, Block, C, N, S, SC, VR> Environment<BE, Block, C, N, S, SC, VR>
where
Block: BlockT,
BE: BackendT<Block>,
C: ClientForGrandpa<Block, BE>,
C::Api: GrandpaApi<Block>,
N: NetworkT<Block>,
S: SyncingT<Block>,
SC: SelectChainT<Block>,
{
/// Reports the given equivocation to the runtime by submitting an unsigned
/// report extrinsic through the runtime API at the best block.
///
/// Returns `Ok(())` without submitting anything when no key ownership proof
/// can be generated for the offender.
///
/// # Errors
///
/// Returns `Error::Safety` when the offender is the authority id we registered
/// for that round, when looking up the next authority set change fails, or when
/// a change is signalled at block zero. Header lookup failures are propagated,
/// and runtime API failures are returned as `Error::RuntimeApi`.
pub(crate) fn report_equivocation(
&self,
equivocation: Equivocation<Block::Hash, NumberFor<Block>>,
) -> Result<(), Error> {
// Never report ourselves: compare the offender with the id cached for the round.
if let Some(local_id) = self.voter_set_state.voting_on(equivocation.round_number()) {
if *equivocation.offender() == local_id {
return Err(Error::Safety(
"Refraining from sending equivocation report for our own equivocation.".into(),
))
}
}
let is_descendent_of = is_descendent_of(&*self.client, None);
let (best_block_hash, best_block_number) = {
let info = self.client.info();
(info.best_hash, info.best_number)
};
// NOTE: `authority_set` stays alive until the end of the function, including
// across the runtime API calls below.
let authority_set = self.authority_set.inner();
// Look up the next authority set change as seen from the best block.
let next_change = authority_set
.next_change(&best_block_hash, &is_descendent_of)
.map_err(|e| Error::Safety(e.to_string()))?;
// Pick the block at which the key ownership proof is generated.
let current_set_latest_hash = match next_change {
Some((_, n)) if n.is_zero() =>
return Err(Error::Safety("Authority set change signalled at genesis.".to_string())),
// The change lies beyond the best block, so the best block is used.
Some((_, n)) if n > best_block_number => best_block_hash,
// Otherwise use the parent of the block the change is registered at.
Some((h, _)) => {
let header = self.client.header(h)?.expect(
"got block hash from registered pending change; \
pending changes are only registered on block import; qed.",
);
*header.parent_hash()
},
// No change found: use the best block.
None => best_block_hash,
};
// Generate the key ownership proof for the offender at that block.
let key_owner_proof = match self
.client
.runtime_api()
.generate_key_ownership_proof(
current_set_latest_hash,
authority_set.set_id,
equivocation.offender().clone(),
)
.map_err(Error::RuntimeApi)?
{
Some(proof) => proof,
None => {
debug!(
target: LOG_TARGET,
"Equivocation offender is not part of the authority set."
);
return Ok(())
},
};
// The report itself is submitted at the best block, with the offchain
// transaction pool extension registered on the runtime API instance.
let equivocation_proof = EquivocationProof::new(authority_set.set_id, equivocation);
let mut runtime_api = self.client.runtime_api();
runtime_api.register_extension(
self.offchain_tx_pool_factory.offchain_transaction_pool(best_block_hash),
);
runtime_api
.submit_report_equivocation_unsigned_extrinsic(
best_block_hash,
equivocation_proof,
key_owner_proof,
)
.map_err(Error::RuntimeApi)?;
Ok(())
}
}
// Chain access for the GRANDPA voter, implemented by delegating to the
// free-standing `ancestry` function with this environment's client.
impl<BE, Block, C, N, S, SC, VR> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
for Environment<BE, Block, C, N, S, SC, VR>
where
Block: BlockT,
BE: BackendT<Block>,
C: ClientForGrandpa<Block, BE>,
N: NetworkT<Block>,
S: SyncingT<Block>,
SC: SelectChainT<Block>,
VR: VotingRuleT<Block, C>,
NumberFor<Block>: BlockNumberOps,
{
/// Returns the hashes on the route from `block` back to `base`, or
/// `GrandpaError::NotDescendent` when `block` does not descend from `base`.
fn ancestry(
&self,
base: Block::Hash,
block: Block::Hash,
) -> Result<Vec<Block::Hash>, GrandpaError> {
ancestry(&self.client, base, block)
}
}
/// Computes the ancestry of `block` back to `base`.
///
/// The result lists the hashes retracted on the tree route from `block` to
/// `base`, with the first retracted entry skipped.
///
/// # Errors
///
/// Returns `GrandpaError::NotDescendent` when `base == block`, when the tree
/// route cannot be computed (the underlying error is logged at debug level),
/// or when the common block of the route is not `base`.
pub(crate) fn ancestry<Block: BlockT, Client>(
    client: &Arc<Client>,
    base: Block::Hash,
    block: Block::Hash,
) -> Result<Vec<Block::Hash>, GrandpaError>
where
    Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
    if base == block {
        return Err(GrandpaError::NotDescendent)
    }

    let route = sp_blockchain::tree_route(&**client, block, base).map_err(|e| {
        debug!(
            target: LOG_TARGET,
            "Encountered error computing ancestry between block {:?} and base {:?}: {}",
            block,
            base,
            e
        );
        GrandpaError::NotDescendent
    })?;

    // If the common block is not `base`, then `block` is not on a chain built on `base`.
    if route.common_block().hash != base {
        return Err(GrandpaError::NotDescendent)
    }

    let hashes = route.retracted().iter().skip(1).map(|entry| entry.hash).collect();
    Ok(hashes)
}
// Implementation of the `finality_grandpa` voter environment: provides vote
// targets, per-round communication and timers, persists every vote cast by the
// local voter, and finalizes blocks.
impl<B, Block, C, N, S, SC, VR> voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, Block, C, N, S, SC, VR>
where
Block: BlockT,
B: BackendT<Block>,
C: ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block>,
N: NetworkT<Block>,
S: SyncingT<Block>,
SC: SelectChainT<Block> + 'static,
VR: VotingRuleT<Block, C> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
{
/// Boxed timer future used for the prevote, precommit and commit timers.
type Timer = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
/// Boxed future resolving to the block to vote on, if any.
type BestChain = Pin<
Box<
dyn Future<Output = Result<Option<(Block::Hash, NumberFor<Block>)>, Self::Error>>
+ Send,
>,
>;
/// Voter identity type.
type Id = AuthorityId;
/// Vote signature type.
type Signature = AuthoritySignature;
/// Stream of incoming signed round messages.
type In = Pin<
Box<
dyn Stream<
Item = Result<
::finality_grandpa::SignedMessage<
Block::Hash,
NumberFor<Block>,
Self::Signature,
Self::Id,
>,
Self::Error,
>,
> + Send,
>,
>;
/// Sink for outgoing round messages.
type Out = Pin<
Box<
dyn Sink<
::finality_grandpa::Message<Block::Hash, NumberFor<Block>>,
Error = Self::Error,
> + Send,
>,
>;
/// Either a voter command or an error.
type Error = CommandOrError<Block::Hash, NumberFor<Block>>;
/// Returns a future resolving to the block to vote on when building on `block`.
///
/// Resolves to `Ok(None)` when the shared authority set's id no longer matches
/// the set id this environment was created for.
fn best_chain_containing(&self, block: Block::Hash) -> Self::BestChain {
let client = self.client.clone();
let authority_set = self.authority_set.clone();
let select_chain = self.select_chain.clone();
let voting_rule = self.voting_rule.clone();
let set_id = self.set_id;
Box::pin(async move {
// The shared authority set may already have moved on to another set;
// in that case this voter must not produce a vote target.
if set_id != authority_set.set_id() {
return Ok(None)
}
best_chain_containing(block, client, authority_set, select_chain, voting_rule)
.await
.map_err(|e| e.into())
})
}
/// Builds the timers and the incoming/outgoing message channels for `round`.
///
/// Also registers the local authority id (if any) as the id voting on this
/// round, so that later callbacks use the same id for the whole round.
fn round_data(
&self,
round: RoundNumber,
) -> voter::RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
// Prevote timer fires after 2x the gossip duration, precommit after 4x.
let prevote_timer = Delay::new(self.config.gossip_duration * 2);
let precommit_timer = Delay::new(self.config.gossip_duration * 4);
let local_id = local_authority_id(&self.voters, self.config.keystore.as_ref());
// A persisted vote is only reused when it was cast with the current local id.
let has_voted = match self.voter_set_state.has_voted(round) {
HasVoted::Yes(id, vote) =>
if local_id.as_ref().map(|k| k == &id).unwrap_or(false) {
HasVoted::Yes(id, vote)
} else {
HasVoted::No
},
HasVoted::No => HasVoted::No,
};
// Cache the id used for this round; `proposed`, `prevoted` and
// `precommitted` read it back through `voting_on`.
if let Some(id) = local_id.as_ref() {
self.voter_set_state.started_voting_on(round, id.clone());
}
// Signing is only possible with both a local id and a keystore.
let keystore = match (local_id.as_ref(), self.config.keystore.as_ref()) {
(Some(id), Some(keystore)) => Some((id.clone(), keystore.clone()).into()),
_ => None,
};
let (incoming, outgoing) = self.network.round_communication(
keystore,
crate::communication::Round(round),
crate::communication::SetId(self.set_id),
self.voters.clone(),
has_voted,
);
// Incoming messages go through `UntilVoteTargetImported`, fed by the
// client's block import notifications.
let incoming = Box::pin(
UntilVoteTargetImported::new(
self.client.import_notification_stream(),
self.network.clone(),
self.client.clone(),
incoming,
"round",
None,
)
.map_err(Into::into),
);
let outgoing = Box::pin(outgoing.sink_err_into());
voter::RoundData {
voter_id: local_id,
prevote_timer: Box::pin(prevote_timer.map(Ok)),
precommit_timer: Box::pin(precommit_timer.map(Ok)),
incoming,
outgoing,
}
}
/// Records a primary proposal issued by the local voter in `round`.
///
/// Does nothing when no local id is registered for the round or when a
/// proposal is already recorded. Otherwise the new state is written to the
/// aux store before it replaces the in-memory state.
fn proposed(
&self,
round: RoundNumber,
propose: PrimaryPropose<Block::Header>,
) -> Result<(), Self::Error> {
let local_id = match self.voter_set_state.voting_on(round) {
Some(id) => id,
None => return Ok(()),
};
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
let current_round = current_rounds
.get(&round)
.expect("checked in with_current_round that key exists; qed.");
if !current_round.can_propose() {
return Ok(None)
}
// Work on a copy; the shared state is only replaced when `Some` is returned.
let mut current_rounds = current_rounds.clone();
let current_round = current_rounds
.get_mut(&round)
.expect("checked previously that key exists; qed.");
*current_round = HasVoted::Yes(local_id, Vote::Propose(propose));
let set_state = VoterSetState::<Block>::Live {
completed_rounds: completed_rounds.clone(),
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
/// Records a prevote issued by the local voter in `round`.
///
/// Does nothing when no local id is registered for the round or when a
/// prevote is already recorded. Otherwise reports telemetry/metrics, keeps
/// any recorded proposal, and persists the new state before applying it.
fn prevoted(
&self,
round: RoundNumber,
prevote: Prevote<Block::Header>,
) -> Result<(), Self::Error> {
let local_id = match self.voter_set_state.voting_on(round) {
Some(id) => id,
None => return Ok(()),
};
// Emits the telemetry event and bumps the prevote counter if metrics are enabled.
let report_prevote_metrics = |prevote: &Prevote<Block::Header>| {
telemetry!(
self.telemetry;
CONSENSUS_DEBUG;
"afg.prevote_issued";
"round" => round,
"target_number" => ?prevote.target_number,
"target_hash" => ?prevote.target_hash,
);
if let Some(metrics) = self.metrics.as_ref() {
metrics.finality_grandpa_prevotes.inc();
}
};
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
let current_round = current_rounds
.get(&round)
.expect("checked in with_current_round that key exists; qed.");
if !current_round.can_prevote() {
return Ok(None)
}
report_prevote_metrics(&prevote);
// Carry over the proposal (if any) into the new vote record.
let propose = current_round.propose();
let mut current_rounds = current_rounds.clone();
let current_round = current_rounds
.get_mut(&round)
.expect("checked previously that key exists; qed.");
*current_round = HasVoted::Yes(local_id, Vote::Prevote(propose.cloned(), prevote));
let set_state = VoterSetState::<Block>::Live {
completed_rounds: completed_rounds.clone(),
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
/// Records a precommit issued by the local voter in `round`.
///
/// Does nothing when no local id is registered for the round or when a
/// precommit is already recorded. Fails with `Error::Safety` when no prevote
/// has been recorded for the round. Note that telemetry/metrics are reported
/// before that prevote check.
fn precommitted(
&self,
round: RoundNumber,
precommit: Precommit<Block::Header>,
) -> Result<(), Self::Error> {
let local_id = match self.voter_set_state.voting_on(round) {
Some(id) => id,
None => return Ok(()),
};
// Emits the telemetry event and bumps the precommit counter if metrics are enabled.
let report_precommit_metrics = |precommit: &Precommit<Block::Header>| {
telemetry!(
self.telemetry;
CONSENSUS_DEBUG;
"afg.precommit_issued";
"round" => round,
"target_number" => ?precommit.target_number,
"target_hash" => ?precommit.target_hash,
);
if let Some(metrics) = self.metrics.as_ref() {
metrics.finality_grandpa_precommits.inc();
}
};
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
let current_round = current_rounds
.get(&round)
.expect("checked in with_current_round that key exists; qed.");
if !current_round.can_precommit() {
return Ok(None)
}
report_precommit_metrics(&precommit);
let propose = current_round.propose();
// The recorded state must be exactly "prevoted" at this point.
let prevote = match current_round {
HasVoted::Yes(_, Vote::Prevote(_, prevote)) => prevote,
_ => {
let msg = "Voter precommitting before prevoting.";
return Err(Error::Safety(msg.to_string()))
},
};
let mut current_rounds = current_rounds.clone();
let current_round = current_rounds
.get_mut(&round)
.expect("checked previously that key exists; qed.");
*current_round = HasVoted::Yes(
local_id,
Vote::Precommit(propose.cloned(), prevote.clone(), precommit),
);
let set_state = VoterSetState::<Block>::Live {
completed_rounds: completed_rounds.clone(),
current_rounds,
};
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
/// Records `round` as completed with the given state, base and votes seen so
/// far, stops tracking it as a current round and starts tracking `round + 1`.
///
/// Fails with `Error::Safety` when the voter set state is paused.
fn completed(
&self,
round: RoundNumber,
state: RoundState<Block::Hash, NumberFor<Block>>,
base: (Block::Hash, NumberFor<Block>),
historical_votes: &HistoricalVotes<Block>,
) -> Result<(), Self::Error> {
debug!(
target: LOG_TARGET,
"Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
self.config.name(),
round,
self.set_id,
state.estimate.as_ref().map(|e| e.1),
state.finalized.as_ref().map(|e| e.1),
);
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, current_rounds) =
if let VoterSetState::Live { completed_rounds, current_rounds } = voter_set_state {
(completed_rounds, current_rounds)
} else {
let msg = "Voter acting while in paused state.";
return Err(Error::Safety(msg.to_string()))
};
let mut completed_rounds = completed_rounds.clone();
let votes = historical_votes.seen().to_vec();
completed_rounds.push(CompletedRound {
number: round,
state: state.clone(),
base,
votes,
});
// Drop the finished round and make sure the next one is tracked,
// without overwriting an entry that already exists for it.
let mut current_rounds = current_rounds.clone();
current_rounds.remove(&round);
current_rounds.entry(round + 1).or_insert(HasVoted::No);
let set_state = VoterSetState::<Block>::Live { completed_rounds, current_rounds };
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
// The cached local authority id for this round is no longer needed.
self.voter_set_state.finished_voting_on(round);
Ok(())
}
/// Updates an already completed round with votes seen after completion and
/// with its final state.
///
/// When `round` is among the stored completed rounds, only the votes beyond
/// those already stored are appended and the round is written through
/// `write_concluded_round`. The whole voter set state is persisted in either
/// case. Fails with `Error::Safety` when the state is paused.
fn concluded(
&self,
round: RoundNumber,
state: RoundState<Block::Hash, NumberFor<Block>>,
_base: (Block::Hash, NumberFor<Block>),
historical_votes: &HistoricalVotes<Block>,
) -> Result<(), Self::Error> {
debug!(
target: LOG_TARGET,
"Voter {} concluded round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
self.config.name(),
round,
self.set_id,
state.estimate.as_ref().map(|e| e.1),
state.finalized.as_ref().map(|e| e.1),
);
self.update_voter_set_state(|voter_set_state| {
let (completed_rounds, current_rounds) =
if let VoterSetState::Live { completed_rounds, current_rounds } = voter_set_state {
(completed_rounds, current_rounds)
} else {
let msg = "Voter acting while in paused state.";
return Err(Error::Safety(msg.to_string()))
};
let mut completed_rounds = completed_rounds.clone();
if let Some(already_completed) =
completed_rounds.rounds.iter_mut().find(|r| r.number == round)
{
// Skip the votes that were already stored when the round completed.
let n_existing_votes = already_completed.votes.len();
already_completed
.votes
.extend(historical_votes.seen().iter().skip(n_existing_votes).cloned());
already_completed.state = state;
crate::aux_schema::write_concluded_round(&*self.client, already_completed)?;
}
let set_state = VoterSetState::<Block>::Live {
completed_rounds,
current_rounds: current_rounds.clone(),
};
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
Ok(Some(set_state))
})?;
Ok(())
}
/// Finalizes the given block using the commit produced in `round`, by
/// delegating to the free-standing `finalize_block` with `initial_sync = false`
/// and the configured justification generation period.
fn finalize_block(
&self,
hash: Block::Hash,
number: NumberFor<Block>,
round: RoundNumber,
commit: Commit<Block::Header>,
) -> Result<(), Self::Error> {
finalize_block(
self.client.clone(),
&self.authority_set,
Some(self.config.justification_generation_period),
hash,
number,
(round, commit).into(),
false,
self.justification_sender.as_ref(),
self.telemetry.clone(),
)
}
/// Returns a timer that fires after a random delay drawn from
/// `[0, 2 * gossip_duration)` milliseconds.
// NOTE(review): `gen_range` is given an empty range when `gossip_duration`
// is zero — confirm the configuration never allows that.
fn round_commit_timer(&self) -> Self::Timer {
use rand::{thread_rng, Rng};
let delay: u64 =
thread_rng().gen_range(0..2 * self.config.gossip_duration.as_millis() as u64);
Box::pin(Delay::new(Duration::from_millis(delay)).map(Ok))
}
/// Logs a detected prevote equivocation and tries to report it; a reporting
/// failure is logged and otherwise ignored.
fn prevote_equivocation(
&self,
_round: RoundNumber,
equivocation: finality_grandpa::Equivocation<
Self::Id,
Prevote<Block::Header>,
Self::Signature,
>,
) {
warn!(
target: LOG_TARGET,
"Detected prevote equivocation in the finality worker: {:?}", equivocation
);
if let Err(err) = self.report_equivocation(equivocation.into()) {
warn!(target: LOG_TARGET, "Error reporting prevote equivocation: {}", err);
}
}
/// Logs a detected precommit equivocation and tries to report it; a reporting
/// failure is logged and otherwise ignored.
fn precommit_equivocation(
&self,
_round: RoundNumber,
equivocation: finality_grandpa::Equivocation<
Self::Id,
Precommit<Block::Header>,
Self::Signature,
>,
) {
warn!(
target: LOG_TARGET,
"Detected precommit equivocation in the finality worker: {:?}", equivocation
);
if let Err(err) = self.report_equivocation(equivocation.into()) {
warn!(target: LOG_TARGET, "Error reporting precommit equivocation: {}", err);
}
}
}
/// Input to `finalize_block`: either a ready-made justification or a commit
/// from which a justification is built.
pub(crate) enum JustificationOrCommit<Block: BlockT> {
/// A full GRANDPA justification.
Justification(GrandpaJustification<Block>),
/// A round number together with the commit produced in that round.
Commit((RoundNumber, Commit<Block::Header>)),
}
impl<Block: BlockT> From<(RoundNumber, Commit<Block::Header>)> for JustificationOrCommit<Block> {
    /// Wraps a `(round, commit)` pair in the `Commit` variant.
    fn from(commit: (RoundNumber, Commit<Block::Header>)) -> JustificationOrCommit<Block> {
        Self::Commit(commit)
    }
}
impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationOrCommit<Block> {
    /// Wraps a justification in the `Justification` variant.
    fn from(justification: GrandpaJustification<Block>) -> JustificationOrCommit<Block> {
        Self::Justification(justification)
    }
}
/// Determines the block to vote on when building on top of `block`.
///
/// Steps, in order:
/// 1. load the header of `block` (returns `Ok(None)` if it is unknown);
/// 2. read the current vote limit from the authority set;
/// 3. ask `select_chain` for a finality target, falling back to `block` itself
///    when that fails;
/// 4. ask `select_chain` for the best block (returns `Ok(None)` on failure) and
///    replace it with the target when the two are inconsistent;
/// 5. walk the target back to the limit if it lies beyond it;
/// 6. apply the voting rule, accepting its result only when the restricted
///    number lies in `[base, target)`; otherwise the target is used.
///
/// # Errors
///
/// Propagates header lookup and `is_descendent_of` errors.
async fn best_chain_containing<Block, Backend, Client, SelectChain, VotingRule>(
block: Block::Hash,
client: Arc<Client>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
select_chain: SelectChain,
voting_rule: VotingRule,
) -> Result<Option<(Block::Hash, NumberFor<Block>)>, Error>
where
Backend: BackendT<Block>,
Block: BlockT,
Client: ClientForGrandpa<Block, Backend>,
SelectChain: SelectChainT<Block> + 'static,
VotingRule: VotingRuleT<Block, Client>,
{
let base_header = match client.header(block)? {
Some(h) => h,
None => {
warn!(
target: LOG_TARGET,
"Encountered error finding best chain containing {:?}: couldn't find base block",
block,
);
return Ok(None)
},
};
// Upper bound on the block number we may vote for, if the authority set
// currently imposes one.
let limit = authority_set.current_limit(*base_header.number());
debug!(
target: LOG_TARGET,
"Finding best chain containing block {:?} with number limit {:?}", block, limit
);
let mut target_header = match select_chain.finality_target(block, None).await {
Ok(target_hash) => client
.header(target_hash)?
.expect("Header known to exist after `finality_target` call; qed"),
Err(err) => {
debug!(
target: LOG_TARGET,
"Encountered error finding best chain containing {:?}: couldn't find target block: {}",
block,
err,
);
// Fall back to the base block when no target could be determined.
base_header.clone()
},
};
// The best block is queried after the finality target.
// NOTE(review): presumably so that a block imported in between cannot make
// the target newer than the best block — confirm.
let mut best_header = match select_chain.best_chain().await {
Ok(best_header) => best_header,
Err(err) => {
warn!(
target: LOG_TARGET,
"Encountered error finding best chain containing {:?}: couldn't find best block: {}",
block,
err,
);
return Ok(None)
},
};
let is_descendent_of = is_descendent_of(&*client, None);
// The best block is replaced by the target when the target is higher, is a
// different block at the same height, or when the descendant check fails.
// NOTE(review): if `is_descendent_of` returns false for identical hashes,
// this branch is also taken (harmlessly) when target == best — confirm.
if target_header.number() > best_header.number() ||
target_header.number() == best_header.number() &&
target_header.hash() != best_header.hash() ||
!is_descendent_of(&target_header.hash(), &best_header.hash())?
{
debug!(
target: LOG_TARGET,
"SelectChain returned a finality target inconsistent with its best block. Restricting best block to target block"
);
best_header = target_header.clone();
}
debug!(
target: LOG_TARGET,
"SelectChain: finality target: #{} ({}), best block: #{} ({})",
target_header.number(),
target_header.hash(),
best_header.number(),
best_header.hash(),
);
// Only restrict when a limit exists and the target lies beyond it.
if let Some(target_number) = limit.filter(|limit| limit < target_header.number()) {
// Follow parent links until the header at the limit number is reached.
loop {
if *target_header.number() < target_number {
unreachable!(
"we are traversing backwards from a known block; \
blocks are stored contiguously; \
qed"
);
}
if *target_header.number() == target_number {
break
}
target_header = client
.header(*target_header.parent_hash())?
.expect("Header known to exist after `finality_target` call; qed");
}
debug!(
target: LOG_TARGET,
"Finality target restricted to #{} ({}) due to pending authority set change",
target_header.number(),
target_header.hash()
)
}
// A restriction from the voting rule is only honoured within [base, target);
// anything else falls back to the (possibly limited) target.
Ok(voting_rule
.restrict_vote(client.clone(), &base_header, &best_header, &target_header)
.await
.filter(|(_, restricted_number)| {
restricted_number >= base_header.number() && restricted_number < target_header.number()
})
.or_else(|| Some((target_header.hash(), *target_header.number()))))
}
/// Decides whether a justification should be kept for the block `number`.
///
/// Returns `true` when the block enacts an authority set change, when nothing
/// beyond block zero has been finalized yet, or when `number` falls into a
/// different `justification_period`-sized bucket than the last finalized block.
///
/// # Panics
///
/// The bucket computation divides by `justification_period`, so a period of
/// zero panics unless one of the early returns applies.
pub(crate) fn should_process_justification<BE, Block, Client>(
    client: &Client,
    justification_period: u32,
    number: NumberFor<Block>,
    enacts_change: bool,
) -> bool
where
    Block: BlockT,
    BE: BackendT<Block>,
    Client: ClientForGrandpa<Block, BE>,
{
    if enacts_change {
        return true
    }

    let last_finalized = client.info().finalized_number;
    if last_finalized.is_zero() {
        return true
    }

    let period: NumberFor<Block> = justification_period.into();
    let last_bucket = last_finalized / period;
    let this_bucket = number / period;

    last_bucket != this_bucket
}
/// Finalizes the given block and applies any authority set change it triggers.
///
/// The authority set lock is taken first and held for the whole call. Inside
/// the import lock the function:
/// 1. applies standard authority set changes for `(hash, number)`;
/// 2. obtains a justification, either the supplied one or one built from the
///    commit, and decides whether it must be persisted;
/// 3. notifies `justification_sender`, if present;
/// 4. applies finality on the client and stores the best justification;
/// 5. persists the updated authority set when it changed.
///
/// Returns `Ok(())` straight away when the block is already finalized on the
/// canonical chain.
///
/// # Errors
///
/// Returns `CommandOrError::VoterCommand(ChangeAuthorities)` when a new
/// authority set was enacted; this signals the voter and is not a failure.
/// On a real error the in-memory authority set is rolled back to its previous
/// value and `CommandOrError::Error` is returned.
pub(crate) fn finalize_block<BE, Block, Client>(
    client: Arc<Client>,
    authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
    justification_generation_period: Option<u32>,
    hash: Block::Hash,
    number: NumberFor<Block>,
    justification_or_commit: JustificationOrCommit<Block>,
    initial_sync: bool,
    justification_sender: Option<&GrandpaJustificationSender<Block>>,
    telemetry: Option<TelemetryHandle>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>
where
    Block: BlockT,
    BE: BackendT<Block>,
    Client: ClientForGrandpa<Block, BE>,
{
    // The guard is held until the function returns, i.e. through all writes below.
    let mut authority_set = authority_set.inner();

    let status = client.info();

    if number <= status.finalized_number && client.hash(number)? == Some(hash) {
        // The format string reads "#<number> (<hash>)", so the number comes first.
        warn!(
            target: LOG_TARGET,
            "Re-finalized block #{:?} ({:?}) in the canonical chain, current best finalized is #{:?}",
            number,
            hash,
            status.finalized_number,
        );

        return Ok(())
    }

    // Snapshot used to roll back the in-memory set if anything below fails.
    let old_authority_set = authority_set.clone();

    let update_res: Result<_, Error> = client.lock_import_and_run(|import_op| {
        let status = authority_set
            .apply_standard_changes(
                hash,
                number,
                &is_descendent_of::<Block, _>(&*client, None),
                initial_sync,
                None,
            )
            .map_err(|e| Error::Safety(e.to_string()))?;

        // Sends the justification to the subscriber, if there is one, and logs
        // a failure instead of propagating it.
        fn notify_justification<Block: BlockT>(
            justification_sender: Option<&GrandpaJustificationSender<Block>>,
            justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
        ) {
            if let Some(sender) = justification_sender {
                if let Err(err) = sender.notify(justification) {
                    warn!(
                        target: LOG_TARGET,
                        "Error creating justification for subscriber: {}", err
                    );
                }
            }
        }

        // A supplied justification is always persisted. For a commit it depends
        // on the generation period, or only on whether a set change is enacted
        // when no period is given.
        let (justification_required, justification) = match justification_or_commit {
            JustificationOrCommit::Justification(justification) => (true, justification),
            JustificationOrCommit::Commit((round_number, commit)) => {
                let enacts_change = status.new_set_block.is_some();

                let justification_required = justification_generation_period
                    .map(|period| {
                        should_process_justification(&*client, period, number, enacts_change)
                    })
                    .unwrap_or(enacts_change);

                let justification =
                    GrandpaJustification::from_commit(&client, round_number, commit)?;

                (justification_required, justification)
            },
        };

        notify_justification(justification_sender, || Ok(justification.clone()));

        let persisted_justification = if justification_required {
            Some((GRANDPA_ENGINE_ID, justification.encode()))
        } else {
            None
        };

        client
            .apply_finality(import_op, hash, persisted_justification, true)
            .map_err(|e| {
                warn!(
                    target: LOG_TARGET,
                    "Error applying finality to block {:?}: {}",
                    (hash, number),
                    e
                );
                e
            })?;

        debug!(target: LOG_TARGET, "Finalizing blocks up to ({:?}, {})", number, hash);

        telemetry!(
            telemetry;
            CONSENSUS_INFO;
            "afg.finalized_blocks_up_to";
            "number" => ?number, "hash" => ?hash,
        );

        crate::aux_schema::update_best_justification(&justification, |insert| {
            apply_aux(import_op, insert, &[])
        })?;

        let new_authorities = if let Some((canon_hash, canon_number)) = status.new_set_block {
            // The authority set has changed.
            let (new_id, set_ref) = authority_set.current();

            // Large sets are logged by size only, to keep the log line short.
            if set_ref.len() > 16 {
                grandpa_log!(
                    initial_sync,
                    "👴 Applying GRANDPA set change to new set with {} authorities",
                    set_ref.len(),
                );
            } else {
                grandpa_log!(
                    initial_sync,
                    "👴 Applying GRANDPA set change to new set {:?}",
                    set_ref
                );
            }

            telemetry!(
                telemetry;
                CONSENSUS_INFO;
                "afg.generating_new_authority_set";
                "number" => ?canon_number, "hash" => ?canon_hash,
                "authorities" => ?set_ref.to_vec(),
                "set_id" => ?new_id,
            );
            Some(NewAuthoritySet {
                canon_hash,
                canon_number,
                set_id: new_id,
                authorities: set_ref.to_vec(),
            })
        } else {
            None
        };

        if status.changed {
            let write_result = crate::aux_schema::update_authority_set::<Block, _, _>(
                &authority_set,
                new_authorities.as_ref(),
                |insert| apply_aux(import_op, insert, &[]),
            );

            if let Err(e) = write_result {
                warn!(
                    target: LOG_TARGET,
                    "Failed to write updated authority set to disk. Bailing."
                );
                warn!(target: LOG_TARGET, "Node is in a potentially inconsistent state.");

                return Err(e.into())
            }
        }

        Ok(new_authorities.map(VoterCommand::ChangeAuthorities))
    });

    match update_res {
        // A set change is reported through the error channel as a voter command.
        Ok(Some(command)) => Err(CommandOrError::VoterCommand(command)),
        Ok(None) => Ok(()),
        Err(e) => {
            // Undo the in-memory changes made by `apply_standard_changes`.
            *authority_set = old_authority_set;

            Err(CommandOrError::Error(e))
        },
    }
}