use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use codec::Decode;
use log::debug;
use parking_lot::Mutex;
use sc_client_api::{backend::Backend, utils::is_descendent_of};
use sc_consensus::{
shared_data::{SharedDataLocked, SharedDataLockedUpgradable},
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
};
use sc_telemetry::TelemetryHandle;
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::{Core, RuntimeApiInfo};
use sp_blockchain::BlockStatus;
use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain};
use sp_consensus_grandpa::{ConsensusLog, GrandpaApi, ScheduledChange, SetId, GRANDPA_ENGINE_ID};
use sp_runtime::{
generic::OpaqueDigestItemId,
traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
Justification,
};
use crate::{
authorities::{AuthoritySet, DelayKind, PendingChange, SharedAuthoritySet},
environment,
justification::GrandpaJustification,
notification::GrandpaJustificationSender,
AuthoritySetChanges, ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand,
LOG_TARGET,
};
pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
inner: Arc<Client>,
justification_import_period: u32,
select_chain: SC,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
authority_set_hard_forks:
Mutex<HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>>,
justification_sender: GrandpaJustificationSender<Block>,
telemetry: Option<TelemetryHandle>,
_phantom: PhantomData<Backend>,
}
impl<Backend, Block: BlockT, Client, SC: Clone> Clone
for GrandpaBlockImport<Backend, Block, Client, SC>
{
fn clone(&self) -> Self {
GrandpaBlockImport {
inner: self.inner.clone(),
justification_import_period: self.justification_import_period,
select_chain: self.select_chain.clone(),
authority_set: self.authority_set.clone(),
send_voter_commands: self.send_voter_commands.clone(),
authority_set_hard_forks: Mutex::new(self.authority_set_hard_forks.lock().clone()),
justification_sender: self.justification_sender.clone(),
telemetry: self.telemetry.clone(),
_phantom: PhantomData,
}
}
}
#[async_trait::async_trait]
impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
for GrandpaBlockImport<BE, Block, Client, SC>
where
NumberFor<Block>: finality_grandpa::BlockNumberOps,
BE: Backend<Block>,
Client: ClientForGrandpa<Block, BE>,
SC: SelectChain<Block>,
{
type Error = ConsensusError;
async fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor<Block>)> {
let mut out = Vec::new();
let chain_info = self.inner.info();
let pending_changes: Vec<_> =
self.authority_set.inner().pending_changes().cloned().collect();
for pending_change in pending_changes {
if pending_change.delay_kind == DelayKind::Finalized &&
pending_change.effective_number() > chain_info.finalized_number &&
pending_change.effective_number() <= chain_info.best_number
{
let effective_block_hash = if !pending_change.delay.is_zero() {
self.select_chain
.finality_target(
pending_change.canon_hash,
Some(pending_change.effective_number()),
)
.await
} else {
Ok(pending_change.canon_hash)
};
if let Ok(hash) = effective_block_hash {
if let Ok(Some(header)) = self.inner.header(hash) {
if *header.number() == pending_change.effective_number() {
out.push((header.hash(), *header.number()));
}
}
}
}
}
out
}
async fn import_justification(
&mut self,
hash: Block::Hash,
number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
GrandpaBlockImport::import_justification(self, hash, number, justification, false, false)
}
}
enum AppliedChanges<H, N> {
Standard(bool), Forced(NewAuthoritySet<H, N>),
None,
}
impl<H, N> AppliedChanges<H, N> {
fn needs_justification(&self) -> bool {
match *self {
AppliedChanges::Standard(_) => true,
AppliedChanges::Forced(_) | AppliedChanges::None => false,
}
}
}
struct PendingSetChanges<Block: BlockT> {
just_in_case: Option<(
AuthoritySet<Block::Hash, NumberFor<Block>>,
SharedDataLockedUpgradable<AuthoritySet<Block::Hash, NumberFor<Block>>>,
)>,
applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
do_pause: bool,
}
impl<Block: BlockT> PendingSetChanges<Block> {
fn revert(self) {}
fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
self.just_in_case = None;
let applied_changes = std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
(applied_changes, self.do_pause)
}
}
impl<Block: BlockT> Drop for PendingSetChanges<Block> {
fn drop(&mut self) {
if let Some((old_set, mut authorities)) = self.just_in_case.take() {
*authorities.upgrade() = old_set;
}
}
}
pub fn find_scheduled_change<B: BlockT>(
header: &B::Header,
) -> Option<ScheduledChange<NumberFor<B>>> {
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
ConsensusLog::ScheduledChange(change) => Some(change),
_ => None,
};
header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}
pub fn find_forced_change<B: BlockT>(
header: &B::Header,
) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
ConsensusLog::ForcedChange(delay, change) => Some((delay, change)),
_ => None,
};
header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}
impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
where
NumberFor<Block>: finality_grandpa::BlockNumberOps,
BE: Backend<Block>,
Client: ClientForGrandpa<Block, BE>,
Client::Api: GrandpaApi<Block>,
for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
{
fn check_new_change(
&self,
header: &Block::Header,
hash: Block::Hash,
) -> Option<PendingChange<Block::Hash, NumberFor<Block>>> {
if let Some(change) = self.authority_set_hard_forks.lock().get(&hash) {
return Some(change.clone())
}
if let Some((median_last_finalized, change)) = find_forced_change::<Block>(header) {
return Some(PendingChange {
next_authorities: change.next_authorities,
delay: change.delay,
canon_height: *header.number(),
canon_hash: hash,
delay_kind: DelayKind::Best { median_last_finalized },
})
}
let change = find_scheduled_change::<Block>(header)?;
Some(PendingChange {
next_authorities: change.next_authorities,
delay: change.delay,
canon_height: *header.number(),
canon_hash: hash,
delay_kind: DelayKind::Finalized,
})
}
fn make_authorities_changes(
&self,
block: &mut BlockImportParams<Block>,
hash: Block::Hash,
initial_sync: bool,
) -> Result<PendingSetChanges<Block>, ConsensusError> {
struct InnerGuard<'a, H, N> {
old: Option<AuthoritySet<H, N>>,
guard: Option<SharedDataLocked<'a, AuthoritySet<H, N>>>,
}
impl<'a, H, N> InnerGuard<'a, H, N> {
fn as_mut(&mut self) -> &mut AuthoritySet<H, N> {
self.guard.as_mut().expect("only taken on deconstruction; qed")
}
fn set_old(&mut self, old: AuthoritySet<H, N>) {
if self.old.is_none() {
self.old = Some(old);
}
}
fn consume(
mut self,
) -> Option<(AuthoritySet<H, N>, SharedDataLocked<'a, AuthoritySet<H, N>>)> {
self.old
.take()
.map(|old| (old, self.guard.take().expect("only taken on deconstruction; qed")))
}
}
impl<'a, H, N> Drop for InnerGuard<'a, H, N> {
fn drop(&mut self) {
if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
*guard = old;
}
}
}
let number = *(block.header.number());
let maybe_change = self.check_new_change(&block.header, hash);
let parent_hash = *block.header.parent_hash();
let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));
let mut guard = InnerGuard { guard: Some(self.authority_set.inner_locked()), old: None };
let mut do_pause = false;
if let Some(change) = maybe_change {
let old = guard.as_mut().clone();
guard.set_old(old);
if let DelayKind::Best { .. } = change.delay_kind {
do_pause = true;
}
guard
.as_mut()
.add_pending_change(change, &is_descendent_of)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
}
let applied_changes = {
let forced_change_set = guard
.as_mut()
.apply_forced_changes(
hash,
number,
&is_descendent_of,
initial_sync,
self.telemetry.clone(),
)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))
.map_err(ConsensusError::from)?;
if let Some((median_last_finalized_number, new_set)) = forced_change_set {
let new_authorities = {
let (set_id, new_authorities) = new_set.current();
let best_finalized_number = self.inner.info().finalized_number;
let canon_number = best_finalized_number.min(median_last_finalized_number);
let canon_hash = self.inner.hash(canon_number)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
.expect(
"the given block number is less or equal than the current best finalized number; \
current best finalized number must exist in chain; qed."
);
NewAuthoritySet {
canon_number,
canon_hash,
set_id,
authorities: new_authorities.to_vec(),
}
};
let old = ::std::mem::replace(guard.as_mut(), new_set);
guard.set_old(old);
AppliedChanges::Forced(new_authorities)
} else {
let did_standard = guard
.as_mut()
.enacts_standard_change(hash, number, &is_descendent_of)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))
.map_err(ConsensusError::from)?;
if let Some(root) = did_standard {
AppliedChanges::Standard(root)
} else {
AppliedChanges::None
}
}
};
let just_in_case = guard.consume();
if let Some((_, ref authorities)) = just_in_case {
let authorities_change = match applied_changes {
AppliedChanges::Forced(ref new) => Some(new),
AppliedChanges::Standard(_) => None, AppliedChanges::None => None,
};
crate::aux_schema::update_authority_set::<Block, _, _>(
authorities,
authorities_change,
|insert| {
block
.auxiliary
.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
},
);
}
let just_in_case = just_in_case.map(|(o, i)| (o, i.release_mutex()));
Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
}
fn current_set_id(&self, hash: Block::Hash) -> Result<SetId, ConsensusError> {
let runtime_version = self.inner.runtime_api().version(hash).map_err(|e| {
ConsensusError::ClientImport(format!(
"Unable to retrieve current runtime version. {}",
e
))
})?;
if runtime_version
.api_version(&<dyn GrandpaApi<Block>>::ID)
.map_or(false, |v| v < 3)
{
for prefix in ["GrandpaFinality", "Grandpa"] {
let k = [
sp_crypto_hashing::twox_128(prefix.as_bytes()),
sp_crypto_hashing::twox_128(b"CurrentSetId"),
]
.concat();
if let Ok(Some(id)) =
self.inner.storage(hash, &sc_client_api::StorageKey(k.to_vec()))
{
if let Ok(id) = SetId::decode(&mut id.0.as_ref()) {
return Ok(id)
}
}
}
Err(ConsensusError::ClientImport("Unable to retrieve current set id.".into()))
} else {
self.inner
.runtime_api()
.current_set_id(hash)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))
}
}
async fn import_state(
&self,
mut block: BlockImportParams<Block>,
) -> Result<ImportResult, ConsensusError> {
let hash = block.post_hash();
let number = *block.header.number();
block.finalized = true;
let import_result = (&*self.inner).import_block(block).await;
match import_result {
Ok(ImportResult::Imported(aux)) => {
self.authority_set_hard_forks.lock().clear();
let authorities = self
.inner
.runtime_api()
.grandpa_authorities(hash)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
let set_id = self.current_set_id(hash)?;
let authority_set = AuthoritySet::new(
authorities.clone(),
set_id,
fork_tree::ForkTree::new(),
Vec::new(),
AuthoritySetChanges::empty(),
)
.ok_or_else(|| ConsensusError::ClientImport("Invalid authority list".into()))?;
*self.authority_set.inner_locked() = authority_set.clone();
crate::aux_schema::update_authority_set::<Block, _, _>(
&authority_set,
None,
|insert| self.inner.insert_aux(insert, []),
)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
let new_set =
NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities };
let _ = self
.send_voter_commands
.unbounded_send(VoterCommand::ChangeAuthorities(new_set));
Ok(ImportResult::Imported(aux))
},
Ok(r) => Ok(r),
Err(e) => Err(ConsensusError::ClientImport(e.to_string())),
}
}
}
#[async_trait::async_trait]
impl<BE, Block: BlockT, Client, SC> BlockImport<Block> for GrandpaBlockImport<BE, Block, Client, SC>
where
NumberFor<Block>: finality_grandpa::BlockNumberOps,
BE: Backend<Block>,
Client: ClientForGrandpa<Block, BE>,
Client::Api: GrandpaApi<Block>,
for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
SC: Send + Sync,
{
type Error = ConsensusError;
async fn import_block(
&self,
mut block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
let hash = block.post_hash();
let number = *block.header.number();
match self.inner.status(hash) {
Ok(BlockStatus::InChain) => {
let _justifications = block.justifications.take();
return (&*self.inner).import_block(block).await
},
Ok(BlockStatus::Unknown) => {},
Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
}
if block.with_state() {
return self.import_state(block).await
}
if number <= self.inner.info().finalized_number {
if self.check_new_change(&block.header, hash).is_some() {
if block.justifications.is_none() {
return Err(ConsensusError::ClientImport(
"Justification required when importing \
an old block with authority set change."
.into(),
))
}
let mut authority_set = self.authority_set.inner_locked();
authority_set.authority_set_changes.insert(number);
crate::aux_schema::update_authority_set::<Block, _, _>(
&authority_set,
None,
|insert| {
block
.auxiliary
.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
},
);
}
return (&*self.inner).import_block(block).await
}
let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;
let pending_changes = self.make_authorities_changes(&mut block, hash, initial_sync)?;
let mut justifications = block.justifications.take();
let import_result = (&*self.inner).import_block(block).await;
let mut imported_aux = {
match import_result {
Ok(ImportResult::Imported(aux)) => aux,
Ok(r) => {
debug!(
target: LOG_TARGET,
"Restoring old authority set after block import result: {:?}", r,
);
pending_changes.revert();
return Ok(r)
},
Err(e) => {
debug!(
target: LOG_TARGET,
"Restoring old authority set after block import error: {}", e,
);
pending_changes.revert();
return Err(ConsensusError::ClientImport(e.to_string()))
},
}
};
let (applied_changes, do_pause) = pending_changes.defuse();
if do_pause {
let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause(
"Forced change scheduled after inactivity".to_string(),
));
}
let needs_justification = applied_changes.needs_justification();
match applied_changes {
AppliedChanges::Forced(new) => {
let _ =
self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
imported_aux.clear_justification_requests = true;
},
AppliedChanges::Standard(false) => {
justifications.take();
},
_ => {},
}
let grandpa_justification =
justifications.and_then(|just| just.into_justification(GRANDPA_ENGINE_ID));
match grandpa_justification {
Some(justification) => {
if environment::should_process_justification(
&*self.inner,
self.justification_import_period,
number,
needs_justification,
) {
let import_res = self.import_justification(
hash,
number,
(GRANDPA_ENGINE_ID, justification),
needs_justification,
initial_sync,
);
import_res.unwrap_or_else(|err| {
if needs_justification {
debug!(
target: LOG_TARGET,
"Requesting justification from peers due to imported block #{} that enacts authority set change with invalid justification: {}",
number,
err
);
imported_aux.bad_justification = true;
imported_aux.needs_justification = true;
}
});
} else {
debug!(
target: LOG_TARGET,
"Ignoring unnecessary justification for block #{}",
number,
);
}
},
None =>
if needs_justification {
debug!(
target: LOG_TARGET,
"Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
number,
);
imported_aux.needs_justification = true;
},
}
Ok(ImportResult::Imported(imported_aux))
}
async fn check_block(
&self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block).await
}
}
impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Client, SC> {
pub(crate) fn new(
inner: Arc<Client>,
justification_import_period: u32,
select_chain: SC,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
justification_sender: GrandpaJustificationSender<Block>,
telemetry: Option<TelemetryHandle>,
) -> GrandpaBlockImport<Backend, Block, Client, SC> {
if let Some((_, change)) = authority_set_hard_forks
.iter()
.find(|(set_id, _)| *set_id == authority_set.set_id())
{
authority_set.inner().current_authorities = change.next_authorities.clone();
}
let authority_set_hard_forks = authority_set_hard_forks
.into_iter()
.map(|(_, change)| (change.canon_hash, change))
.collect::<HashMap<_, _>>();
{
let mut authority_set = authority_set.inner();
authority_set.pending_standard_changes =
authority_set.pending_standard_changes.clone().map(&mut |hash, _, original| {
authority_set_hard_forks.get(hash).cloned().unwrap_or(original)
});
}
GrandpaBlockImport {
inner,
justification_import_period,
select_chain,
authority_set,
send_voter_commands,
authority_set_hard_forks: Mutex::new(authority_set_hard_forks),
justification_sender,
telemetry,
_phantom: PhantomData,
}
}
}
impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
where
BE: Backend<Block>,
Client: ClientForGrandpa<Block, BE>,
NumberFor<Block>: finality_grandpa::BlockNumberOps,
{
fn import_justification(
&self,
hash: Block::Hash,
number: NumberFor<Block>,
justification: Justification,
enacts_change: bool,
initial_sync: bool,
) -> Result<(), ConsensusError> {
if justification.0 != GRANDPA_ENGINE_ID {
return Ok(())
}
let justification = GrandpaJustification::decode_and_verify_finalizes(
&justification.1,
(hash, number),
self.authority_set.set_id(),
&self.authority_set.current_authorities(),
);
let justification = match justification {
Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
Ok(justification) => justification,
};
let result = environment::finalize_block(
self.inner.clone(),
&self.authority_set,
None,
hash,
number,
justification.into(),
initial_sync,
Some(&self.justification_sender),
self.telemetry.clone(),
);
match result {
Err(CommandOrError::VoterCommand(command)) => {
grandpa_log!(
initial_sync,
"👴 Imported justification for block #{} that triggers \
command {}, signaling voter.",
number,
command,
);
let _ = self.send_voter_commands.unbounded_send(command);
},
Err(CommandOrError::Error(e)) =>
return Err(match e {
Error::Grandpa(error) => ConsensusError::ClientImport(error.to_string()),
Error::Network(error) => ConsensusError::ClientImport(error),
Error::Blockchain(error) => ConsensusError::ClientImport(error),
Error::Client(error) => ConsensusError::ClientImport(error.to_string()),
Error::Safety(error) => ConsensusError::ClientImport(error),
Error::Signing(error) => ConsensusError::ClientImport(error),
Error::Timer(error) => ConsensusError::ClientImport(error.to_string()),
Error::RuntimeApi(error) => ConsensusError::ClientImport(error.to_string()),
}),
Ok(_) => {
assert!(
!enacts_change,
"returns Ok when no authority set change should be enacted; qed;"
);
},
}
Ok(())
}
}