use codec::{Decode, DecodeAll, Encode};
use crate::{
best_justification, find_scheduled_change, AuthoritySetChanges, AuthoritySetHardFork,
BlockNumberOps, GrandpaJustification, SharedAuthoritySet,
};
use sc_client_api::Backend as ClientBackend;
use sc_network_sync::strategy::warp::{EncodedProof, VerificationResult, WarpSyncProvider};
use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
use sp_consensus_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, NumberFor, One},
};
use std::{collections::HashMap, sync::Arc};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Failed to decode block hash: {0}.")]
DecodeScale(#[from] codec::Error),
#[error("{0}")]
Client(#[from] sp_blockchain::Error),
#[error("{0}")]
InvalidRequest(String),
#[error("{0}")]
InvalidProof(String),
#[error("Missing required data to be able to answer request.")]
MissingData,
}
pub(super) const MAX_WARP_SYNC_PROOF_SIZE: usize = 8 * 1024 * 1024;
#[derive(Decode, Encode, Debug)]
pub struct WarpSyncFragment<Block: BlockT> {
pub header: Block::Header,
pub justification: GrandpaJustification<Block>,
}
#[derive(Decode, Encode)]
pub struct WarpSyncProof<Block: BlockT> {
proofs: Vec<WarpSyncFragment<Block>>,
is_finished: bool,
}
impl<Block: BlockT> WarpSyncProof<Block> {
fn generate<Backend>(
backend: &Backend,
begin: Block::Hash,
set_changes: &AuthoritySetChanges<NumberFor<Block>>,
) -> Result<WarpSyncProof<Block>, Error>
where
Backend: ClientBackend<Block>,
{
let blockchain = backend.blockchain();
let begin_number = blockchain
.block_number_from_id(&BlockId::Hash(begin))?
.ok_or_else(|| Error::InvalidRequest("Missing start block".to_string()))?;
if begin_number > blockchain.info().finalized_number {
return Err(Error::InvalidRequest("Start block is not finalized".to_string()))
}
let canon_hash = blockchain.hash(begin_number)?.expect(
"begin number is lower than finalized number; \
all blocks below finalized number must have been imported; \
qed.",
);
if canon_hash != begin {
return Err(Error::InvalidRequest(
"Start block is not in the finalized chain".to_string(),
))
}
let mut proofs = Vec::new();
let mut proofs_encoded_len = 0;
let mut proof_limit_reached = false;
let set_changes = set_changes.iter_from(begin_number).ok_or(Error::MissingData)?;
for (_, last_block) in set_changes {
let hash = blockchain.block_hash_from_id(&BlockId::Number(*last_block))?
.expect("header number comes from previously applied set changes; corresponding hash must exist in db; qed.");
let header = blockchain
.header(hash)?
.expect("header hash obtained from header number exists in db; corresponding header must exist in db too; qed.");
if find_scheduled_change::<Block>(&header).is_none() {
break
}
let justification = blockchain
.justifications(header.hash())?
.and_then(|just| just.into_justification(GRANDPA_ENGINE_ID))
.ok_or_else(|| Error::MissingData)?;
let justification = GrandpaJustification::<Block>::decode_all(&mut &justification[..])?;
let proof = WarpSyncFragment { header: header.clone(), justification };
let proof_size = proof.encoded_size();
if proofs_encoded_len + proof_size >= MAX_WARP_SYNC_PROOF_SIZE - 50 {
proof_limit_reached = true;
break
}
proofs_encoded_len += proof_size;
proofs.push(proof);
}
let is_finished = if proof_limit_reached {
false
} else {
let latest_justification = best_justification(backend)?.filter(|justification| {
let limit = proofs
.last()
.map(|proof| proof.justification.target().0 + One::one())
.unwrap_or(begin_number);
justification.target().0 >= limit
});
if let Some(latest_justification) = latest_justification {
let header = blockchain.header(latest_justification.target().1)?
.expect("header hash corresponds to a justification in db; must exist in db as well; qed.");
proofs.push(WarpSyncFragment { header, justification: latest_justification })
}
true
};
let final_outcome = WarpSyncProof { proofs, is_finished };
debug_assert!(final_outcome.encoded_size() <= MAX_WARP_SYNC_PROOF_SIZE);
Ok(final_outcome)
}
fn verify(
&self,
set_id: SetId,
authorities: AuthorityList,
hard_forks: &HashMap<(Block::Hash, NumberFor<Block>), (SetId, AuthorityList)>,
) -> Result<(SetId, AuthorityList), Error>
where
NumberFor<Block>: BlockNumberOps,
{
let mut current_set_id = set_id;
let mut current_authorities = authorities;
for (fragment_num, proof) in self.proofs.iter().enumerate() {
let hash = proof.header.hash();
let number = *proof.header.number();
if let Some((set_id, list)) = hard_forks.get(&(hash, number)) {
current_set_id = *set_id;
current_authorities = list.clone();
} else {
proof
.justification
.verify(current_set_id, ¤t_authorities)
.map_err(|err| Error::InvalidProof(err.to_string()))?;
if proof.justification.target().1 != hash {
return Err(Error::InvalidProof(
"Mismatch between header and justification".to_owned(),
))
}
if let Some(scheduled_change) = find_scheduled_change::<Block>(&proof.header) {
current_authorities = scheduled_change.next_authorities;
current_set_id += 1;
} else if fragment_num != self.proofs.len() - 1 || !self.is_finished {
return Err(Error::InvalidProof(
"Header is missing authority set change digest".to_string(),
))
}
}
}
Ok((current_set_id, current_authorities))
}
}
pub struct NetworkProvider<Block: BlockT, Backend: ClientBackend<Block>>
where
NumberFor<Block>: BlockNumberOps,
{
backend: Arc<Backend>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
hard_forks: HashMap<(Block::Hash, NumberFor<Block>), (SetId, AuthorityList)>,
}
impl<Block: BlockT, Backend: ClientBackend<Block>> NetworkProvider<Block, Backend>
where
NumberFor<Block>: BlockNumberOps,
{
pub fn new(
backend: Arc<Backend>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
hard_forks: Vec<AuthoritySetHardFork<Block>>,
) -> Self {
NetworkProvider {
backend,
authority_set,
hard_forks: hard_forks
.into_iter()
.map(|fork| (fork.block, (fork.set_id, fork.authorities)))
.collect(),
}
}
}
impl<Block: BlockT, Backend: ClientBackend<Block>> WarpSyncProvider<Block>
for NetworkProvider<Block, Backend>
where
NumberFor<Block>: BlockNumberOps,
{
fn generate(
&self,
start: Block::Hash,
) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>> {
let proof = WarpSyncProof::<Block>::generate(
&*self.backend,
start,
&self.authority_set.authority_set_changes(),
)
.map_err(Box::new)?;
Ok(EncodedProof(proof.encode()))
}
fn verify(
&self,
proof: &EncodedProof,
set_id: SetId,
authorities: AuthorityList,
) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>> {
let EncodedProof(proof) = proof;
let proof = WarpSyncProof::<Block>::decode_all(&mut proof.as_slice())
.map_err(|e| format!("Proof decoding error: {:?}", e))?;
let last_header = proof
.proofs
.last()
.map(|p| p.header.clone())
.ok_or_else(|| "Empty proof".to_string())?;
let (next_set_id, next_authorities) =
proof.verify(set_id, authorities, &self.hard_forks).map_err(Box::new)?;
if proof.is_finished {
Ok(VerificationResult::<Block>::Complete(next_set_id, next_authorities, last_header))
} else {
Ok(VerificationResult::<Block>::Partial(
next_set_id,
next_authorities,
last_header.hash(),
))
}
}
fn current_authorities(&self) -> AuthorityList {
self.authority_set.inner().current_authorities.clone()
}
}
#[cfg(test)]
mod tests {
use super::WarpSyncProof;
use crate::{AuthoritySetChanges, GrandpaJustification};
use codec::Encode;
use rand::prelude::*;
use sc_block_builder::BlockBuilderBuilder;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_consensus_grandpa::GRANDPA_ENGINE_ID;
use sp_keyring::Ed25519Keyring;
use std::sync::Arc;
use substrate_test_runtime_client::{
BlockBuilderExt, ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt,
TestClientBuilder, TestClientBuilderExt,
};
#[test]
fn warp_sync_proof_generate_verify() {
let mut rng = rand::rngs::StdRng::from_seed([0; 32]);
let builder = TestClientBuilder::new();
let backend = builder.backend();
let client = Arc::new(builder.build());
let available_authorities = Ed25519Keyring::iter().collect::<Vec<_>>();
let genesis_authorities = vec![(Ed25519Keyring::Alice.public().into(), 1)];
let mut current_authorities = vec![Ed25519Keyring::Alice];
let mut current_set_id = 0;
let mut authority_set_changes = Vec::new();
for n in 1..=100 {
let mut builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap();
let mut new_authorities = None;
if n != 0 && n % 10 == 0 {
let n_authorities = rng.gen_range(1..available_authorities.len());
let next_authorities = available_authorities
.choose_multiple(&mut rng, n_authorities)
.cloned()
.collect::<Vec<_>>();
new_authorities = Some(next_authorities.clone());
let next_authorities = next_authorities
.iter()
.map(|keyring| (keyring.public().into(), 1))
.collect::<Vec<_>>();
let digest = sp_runtime::generic::DigestItem::Consensus(
sp_consensus_grandpa::GRANDPA_ENGINE_ID,
sp_consensus_grandpa::ConsensusLog::ScheduledChange(
sp_consensus_grandpa::ScheduledChange { delay: 0u64, next_authorities },
)
.encode(),
);
builder.push_deposit_log_digest_item(digest).unwrap();
}
let block = builder.build().unwrap().block;
futures::executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
if let Some(new_authorities) = new_authorities {
let (target_hash, target_number) = {
let info = client.info();
(info.best_hash, info.best_number)
};
let mut precommits = Vec::new();
for keyring in ¤t_authorities {
let precommit = finality_grandpa::Precommit { target_hash, target_number };
let msg = finality_grandpa::Message::Precommit(precommit.clone());
let encoded = sp_consensus_grandpa::localized_payload(42, current_set_id, &msg);
let signature = keyring.sign(&encoded[..]).into();
let precommit = finality_grandpa::SignedPrecommit {
precommit,
signature,
id: keyring.public().into(),
};
precommits.push(precommit);
}
let commit = finality_grandpa::Commit { target_hash, target_number, precommits };
let justification = GrandpaJustification::from_commit(&client, 42, commit).unwrap();
client
.finalize_block(target_hash, Some((GRANDPA_ENGINE_ID, justification.encode())))
.unwrap();
authority_set_changes.push((current_set_id, n));
current_set_id += 1;
current_authorities = new_authorities;
}
}
let authority_set_changes = AuthoritySetChanges::from(authority_set_changes);
let genesis_hash = client.hash(0).unwrap().unwrap();
let warp_sync_proof =
WarpSyncProof::generate(&*backend, genesis_hash, &authority_set_changes).unwrap();
let (new_set_id, new_authorities) =
warp_sync_proof.verify(0, genesis_authorities, &Default::default()).unwrap();
let expected_authorities = current_authorities
.iter()
.map(|keyring| (keyring.public().into(), 1))
.collect::<Vec<_>>();
assert_eq!(new_set_id, current_set_id);
assert_eq!(new_authorities, expected_authorities);
}
}