pub use sp_consensus_grandpa::{AuthorityList, SetId};
use crate::{
block_relay_protocol::{BlockDownloader, BlockResponseError},
service::network::NetworkServiceHandle,
strategy::{
chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
SyncingAction,
},
types::{BadPeer, SyncState, SyncStatus},
LOG_TARGET,
};
use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
use log::{debug, error, trace, warn};
use sc_network::{IfDisconnected, ProtocolName};
use sc_network_common::sync::message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
};
use sc_network_types::PeerId;
use sp_blockchain::HeaderBackend;
use sp_runtime::{
traits::{Block as BlockT, Header, NumberFor, Zero},
Justifications, SaturatedConversion,
};
use std::{any::Any, collections::HashMap, fmt, sync::Arc};
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
pub struct EncodedProof(pub Vec<u8>);
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
pub begin: B::Hash,
}
pub enum VerificationResult<Block: BlockT> {
Partial(SetId, AuthorityList, Block::Hash),
Complete(SetId, AuthorityList, Block::Header),
}
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
fn generate(
&self,
start: Block::Hash,
) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
fn verify(
&self,
proof: &EncodedProof,
set_id: SetId,
authorities: AuthorityList,
) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
fn current_authorities(&self) -> AuthorityList;
}
mod rep {
use sc_network::ReputationChange as Rep;
pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
AwaitingPeers { required_peers: usize },
DownloadingWarpProofs,
DownloadingTargetBlock,
DownloadingState,
ImportingState,
DownloadingBlocks(NumberFor<Block>),
Complete,
}
impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::AwaitingPeers { required_peers } =>
write!(f, "Waiting for {required_peers} peers to be connected"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
Self::DownloadingState => write!(f, "Downloading state"),
Self::ImportingState => write!(f, "Importing state"),
Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
Self::Complete => write!(f, "Warp sync is complete"),
}
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
pub phase: WarpSyncPhase<Block>,
pub total_bytes: u64,
}
pub enum WarpSyncConfig<Block: BlockT> {
WithProvider(Arc<dyn WarpSyncProvider<Block>>),
WithTarget(<Block as BlockT>::Header),
}
enum Phase<B: BlockT> {
WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
WarpProof {
set_id: SetId,
authorities: AuthorityList,
last_hash: B::Hash,
warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
},
TargetBlock(B::Header),
Complete,
}
enum PeerState {
Available,
DownloadingProofs,
DownloadingTargetBlock,
}
impl PeerState {
fn is_available(&self) -> bool {
matches!(self, PeerState::Available)
}
}
struct Peer<B: BlockT> {
best_number: NumberFor<B>,
state: PeerState,
}
pub struct WarpSyncResult<B: BlockT> {
pub target_header: B::Header,
pub target_body: Option<Vec<B::Extrinsic>>,
pub target_justifications: Option<Justifications>,
}
pub struct WarpSync<B: BlockT, Client> {
phase: Phase<B>,
client: Arc<Client>,
total_proof_bytes: u64,
total_state_bytes: u64,
peers: HashMap<PeerId, Peer<B>>,
disconnected_peers: DisconnectedPeers,
protocol_name: Option<ProtocolName>,
block_downloader: Arc<dyn BlockDownloader<B>>,
actions: Vec<SyncingAction<B>>,
result: Option<WarpSyncResult<B>>,
}
impl<B, Client> WarpSync<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + 'static,
{
pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
pub fn new(
client: Arc<Client>,
warp_sync_config: WarpSyncConfig<B>,
protocol_name: Option<ProtocolName>,
block_downloader: Arc<dyn BlockDownloader<B>>,
) -> Self {
if client.info().finalized_state.is_some() {
error!(
target: LOG_TARGET,
"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
);
return Self {
client,
phase: Phase::Complete,
total_proof_bytes: 0,
total_state_bytes: 0,
peers: HashMap::new(),
disconnected_peers: DisconnectedPeers::new(),
protocol_name,
block_downloader,
actions: vec![SyncingAction::Finished],
result: None,
}
}
let phase = match warp_sync_config {
WarpSyncConfig::WithProvider(warp_sync_provider) =>
Phase::WaitingForPeers { warp_sync_provider },
WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
};
Self {
client,
phase,
total_proof_bytes: 0,
total_state_bytes: 0,
peers: HashMap::new(),
disconnected_peers: DisconnectedPeers::new(),
protocol_name,
block_downloader,
actions: Vec::new(),
result: None,
}
}
pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
self.try_to_start_warp_sync();
}
pub fn remove_peer(&mut self, peer_id: &PeerId) {
if let Some(state) = self.peers.remove(peer_id) {
if !state.state.is_available() {
if let Some(bad_peer) =
self.disconnected_peers.on_disconnect_during_request(*peer_id)
{
self.actions.push(SyncingAction::DropPeer(bad_peer));
}
}
}
}
#[must_use]
pub fn on_validated_block_announce(
&mut self,
is_best: bool,
peer_id: PeerId,
announce: &BlockAnnounce<B::Header>,
) -> Option<(B::Hash, NumberFor<B>)> {
is_best.then(|| {
let best_number = *announce.header.number();
let best_hash = announce.header.hash();
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.best_number = best_number;
}
(best_hash, best_number)
})
}
fn try_to_start_warp_sync(&mut self) {
let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
if self.peers.len() < MIN_PEERS_TO_START_WARP_SYNC {
return
}
self.phase = Phase::WarpProof {
set_id: 0,
authorities: warp_sync_provider.current_authorities(),
last_hash: self.client.info().genesis_hash,
warp_sync_provider: Arc::clone(warp_sync_provider),
};
trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
}
pub fn on_generic_response(
&mut self,
peer_id: &PeerId,
protocol_name: ProtocolName,
response: Box<dyn Any + Send>,
) {
if &protocol_name == self.block_downloader.protocol_name() {
let Ok(response) = response
.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
else {
warn!(target: LOG_TARGET, "Failed to downcast block response");
debug_assert!(false);
return;
};
let (request, response) = *response;
let blocks = match response {
Ok(blocks) => blocks,
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
target: LOG_TARGET,
"Failed to decode block response from peer {:?}: {:?}.",
peer_id,
e
);
self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
return;
},
Err(BlockResponseError::ExtractionFailed(e)) => {
debug!(
target: LOG_TARGET,
"Failed to extract blocks from peer response {:?}: {:?}.",
peer_id,
e
);
self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
return;
},
};
self.on_block_response(*peer_id, request, blocks);
} else {
let Ok(response) = response.downcast::<Vec<u8>>() else {
warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
debug_assert!(false);
return;
};
self.on_warp_proof_response(peer_id, EncodedProof(*response));
}
}
pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.state = PeerState::Available;
}
let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
&mut self.phase
else {
debug!(target: LOG_TARGET, "Unexpected warp proof response");
self.actions
.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
return
};
match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
Err(e) => {
debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
self.actions
.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
},
Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash)) => {
log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
*set_id = new_set_id;
*authorities = new_authorities;
*last_hash = new_last_hash;
self.total_proof_bytes += response.0.len() as u64;
},
Ok(VerificationResult::Complete(new_set_id, _, header)) => {
log::debug!(
target: LOG_TARGET,
"Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
new_set_id,
header.hash(),
header.number(),
);
self.total_proof_bytes += response.0.len() as u64;
self.phase = Phase::TargetBlock(header);
},
}
}
pub fn on_block_response(
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
blocks: Vec<BlockData<B>>,
) {
if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
self.actions.push(SyncingAction::DropPeer(bad_peer));
}
}
fn on_block_response_inner(
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
mut blocks: Vec<BlockData<B>>,
) -> Result<(), BadPeer> {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.state = PeerState::Available;
}
let Phase::TargetBlock(header) = &mut self.phase else {
debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
};
if blocks.is_empty() {
debug!(
target: LOG_TARGET,
"Downloading target block failed: empty block response from {peer_id}",
);
return Err(BadPeer(peer_id, rep::NO_BLOCK))
}
if blocks.len() > 1 {
debug!(
target: LOG_TARGET,
"Too many blocks ({}) in warp target block response from {peer_id}",
blocks.len(),
);
return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
}
validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
let block = blocks.pop().expect("`blocks` len checked above; qed");
let Some(block_header) = &block.header else {
debug!(
target: LOG_TARGET,
"Downloading target block failed: missing header in response from {peer_id}.",
);
return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
};
if block_header != header {
debug!(
target: LOG_TARGET,
"Downloading target block failed: different header in response from {peer_id}.",
);
return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
}
if block.body.is_none() {
debug!(
target: LOG_TARGET,
"Downloading target block failed: missing body in response from {peer_id}.",
);
return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
}
self.result = Some(WarpSyncResult {
target_header: header.clone(),
target_body: block.body,
target_justifications: block.justifications,
});
self.phase = Phase::Complete;
self.actions.push(SyncingAction::Finished);
Ok(())
}
fn schedule_next_peer(
&mut self,
new_state: PeerState,
min_best_number: Option<NumberFor<B>>,
) -> Option<PeerId> {
let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
if targets.is_empty() {
return None
}
targets.sort();
let median = targets[targets.len() / 2];
let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
for (peer_id, peer) in self.peers.iter_mut() {
if peer.state.is_available() &&
peer.best_number >= threshold &&
self.disconnected_peers.is_peer_available(peer_id)
{
peer.state = new_state;
return Some(*peer_id)
}
}
None
}
fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
let begin = *last_hash;
if self
.peers
.values()
.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
{
return None
}
let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
let request = WarpProofRequest { begin };
let Some(protocol_name) = self.protocol_name.clone() else {
warn!(
target: LOG_TARGET,
"Trying to send warp sync request when no protocol is configured {request:?}",
);
return None;
};
Some((peer_id, protocol_name, request))
}
fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
let Phase::TargetBlock(target_header) = &self.phase else { return None };
if self
.peers
.values()
.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
{
return None
}
let target_hash = target_header.hash();
let target_number = *target_header.number();
let peer_id =
self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
trace!(
target: LOG_TARGET,
"New target block request to {peer_id}, target: {} ({}).",
target_hash,
target_number,
);
Some((
peer_id,
BlockRequest::<B> {
id: 0,
fields: BlockAttributes::HEADER |
BlockAttributes::BODY |
BlockAttributes::JUSTIFICATION,
from: FromBlock::Hash(target_hash),
direction: Direction::Ascending,
max: Some(1),
},
))
}
pub fn progress(&self) -> WarpSyncProgress<B> {
match &self.phase {
Phase::WaitingForPeers { .. } => WarpSyncProgress {
phase: WarpSyncPhase::AwaitingPeers {
required_peers: MIN_PEERS_TO_START_WARP_SYNC,
},
total_bytes: self.total_proof_bytes,
},
Phase::WarpProof { .. } => WarpSyncProgress {
phase: WarpSyncPhase::DownloadingWarpProofs,
total_bytes: self.total_proof_bytes,
},
Phase::TargetBlock(_) => WarpSyncProgress {
phase: WarpSyncPhase::DownloadingTargetBlock,
total_bytes: self.total_proof_bytes,
},
Phase::Complete => WarpSyncProgress {
phase: WarpSyncPhase::Complete,
total_bytes: self.total_proof_bytes + self.total_state_bytes,
},
}
}
pub fn num_peers(&self) -> usize {
self.peers.len()
}
pub fn status(&self) -> SyncStatus<B> {
SyncStatus {
state: match &self.phase {
Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
Phase::Complete => SyncState::Idle,
},
best_seen_block: match &self.phase {
Phase::WaitingForPeers { .. } => None,
Phase::WarpProof { .. } => None,
Phase::TargetBlock(header) => Some(*header.number()),
Phase::Complete => None,
},
num_peers: self.peers.len().saturated_into(),
queued_blocks: 0,
state_sync: None,
warp_sync: Some(self.progress()),
}
}
#[must_use]
pub fn actions(
&mut self,
network_service: &NetworkServiceHandle,
) -> impl Iterator<Item = SyncingAction<B>> {
let warp_proof_request =
self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
trace!(
target: LOG_TARGET,
"Created `WarpProofRequest` to {}, request: {:?}.",
peer_id,
request,
);
let (tx, rx) = oneshot::channel();
network_service.start_request(
peer_id,
protocol_name,
request.encode(),
tx,
IfDisconnected::ImmediateError,
);
SyncingAction::StartRequest {
peer_id,
key: Self::STRATEGY_KEY,
request: async move {
Ok(rx.await?.and_then(|(response, protocol_name)| {
Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
}))
}
.boxed(),
remove_obsolete: false,
}
});
self.actions.extend(warp_proof_request);
let target_block_request =
self.target_block_request().into_iter().map(|(peer_id, request)| {
let downloader = self.block_downloader.clone();
SyncingAction::StartRequest {
peer_id,
key: Self::STRATEGY_KEY,
request: async move {
Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
|(response, protocol_name)| {
let decoded_response =
downloader.block_response_into_blocks(&request, response);
let result =
Box::new((request, decoded_response)) as Box<dyn Any + Send>;
Ok((result, protocol_name))
},
))
}
.boxed(),
remove_obsolete: true,
}
});
self.actions.extend(target_block_request);
std::mem::take(&mut self.actions).into_iter()
}
#[must_use]
pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
self.result.take()
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
use sc_block_builder::BlockBuilderBuilder;
use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
use sp_consensus_grandpa::{AuthorityList, SetId};
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{io::ErrorKind, sync::Arc};
use substrate_test_runtime_client::{
runtime::{Block, Hash},
BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
};
mockall::mock! {
pub Client<B: BlockT> {}
impl<B: BlockT> HeaderBackend<B> for Client<B> {
fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
fn info(&self) -> Info<B>;
fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
fn number(
&self,
hash: B::Hash,
) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
}
}
mockall::mock! {
pub WarpSyncProvider<B: BlockT> {}
impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
fn generate(
&self,
start: B::Hash,
) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
fn verify(
&self,
proof: &EncodedProof,
set_id: SetId,
authorities: AuthorityList,
) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
fn current_authorities(&self) -> AuthorityList;
}
}
fn mock_client_with_state() -> MockClient<Block> {
let mut client = MockClient::<Block>::new();
let genesis_hash = Hash::random();
client.expect_info().return_once(move || Info {
best_hash: genesis_hash,
best_number: 0,
genesis_hash,
finalized_hash: genesis_hash,
finalized_number: 0,
finalized_state: Some((genesis_hash, 0)),
number_leaves: 0,
block_gap: None,
});
client
}
fn mock_client_without_state() -> MockClient<Block> {
let mut client = MockClient::<Block>::new();
let genesis_hash = Hash::random();
client.expect_info().returning(move || Info {
best_hash: genesis_hash,
best_number: 0,
genesis_hash,
finalized_hash: genesis_hash,
finalized_number: 0,
finalized_state: None,
number_leaves: 0,
block_gap: None,
});
client
}
#[test]
fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
let client = mock_client_with_state();
let provider = MockWarpSyncProvider::<Block>::new();
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(actions[0], SyncingAction::Finished));
assert!(warp_sync.take_result().is_none());
}
#[test]
fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
let client = mock_client_with_state();
let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
1,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(actions[0], SyncingAction::Finished));
assert!(warp_sync.take_result().is_none());
}
#[test]
fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
let client = mock_client_without_state();
let provider = MockWarpSyncProvider::<Block>::new();
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
assert_eq!(warp_sync.actions(&network_handle).count(), 0)
}
#[test]
fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
let client = mock_client_without_state();
let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
1,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
assert_eq!(warp_sync.actions(&network_handle).count(), 0)
}
#[test]
fn warp_sync_is_started_only_when_there_is_enough_peers() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
}
warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
}
#[test]
fn no_peer_is_scheduled_if_no_peers_connected() {
let client = mock_client_without_state();
let provider = MockWarpSyncProvider::<Block>::new();
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
}
#[test]
fn enough_peers_are_used_in_tests() {
assert!(
10 >= MIN_PEERS_TO_START_WARP_SYNC,
"Tests must be updated to use that many initial peers.",
);
}
#[test]
fn at_least_median_synced_peer_is_scheduled() {
for _ in 0..100 {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
}
}
#[test]
fn min_best_number_peer_is_scheduled() {
for _ in 0..10 {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
}
}
#[test]
fn backedoff_number_peer_is_not_scheduled() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
let ninth_peer =
*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
let tenth_peer =
*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
warp_sync.remove_peer(&tenth_peer);
assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
warp_sync.add_peer(tenth_peer, H256::random(), 10);
let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
assert_eq!(tenth_peer, peer_id.unwrap());
warp_sync.remove_peer(&tenth_peer);
assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
warp_sync.add_peer(tenth_peer, H256::random(), 10);
let peer_id: Option<PeerId> =
warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
assert!(peer_id.is_none());
let peer_id: Option<PeerId> =
warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
assert_eq!(ninth_peer, peer_id.unwrap());
}
#[test]
fn no_warp_proof_request_in_another_phase() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync = WarpSync::new(
Arc::new(client),
config,
Some(ProtocolName::Static("")),
Arc::new(MockBlockDownloader::new()),
);
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
));
assert!(warp_sync.warp_proof_request().is_none());
}
#[test]
fn warp_proof_request_starts_at_last_hash() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync = WarpSync::new(
Arc::new(client),
config,
Some(ProtocolName::Static("")),
Arc::new(MockBlockDownloader::new()),
);
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
let known_last_hash = Hash::random();
match &mut warp_sync.phase {
Phase::WarpProof { last_hash, .. } => {
*last_hash = known_last_hash;
},
_ => panic!("Invalid phase."),
}
let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
assert_eq!(request.begin, known_last_hash);
}
#[test]
fn no_parallel_warp_proof_requests() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync = WarpSync::new(
Arc::new(client),
config,
Some(ProtocolName::Static("")),
Arc::new(MockBlockDownloader::new()),
);
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
assert!(warp_sync.warp_proof_request().is_some());
assert!(warp_sync.warp_proof_request().is_none());
}
#[test]
fn bad_warp_proof_response_drops_peer() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
provider.expect_verify().return_once(|_proof, _set_id, _authorities| {
Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync = WarpSync::new(
Arc::new(client),
config,
Some(ProtocolName::Static("")),
Arc::new(MockBlockDownloader::new()),
);
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
panic!("Invalid action");
};
warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
let actions = std::mem::take(&mut warp_sync.actions);
assert_eq!(actions.len(), 1);
assert!(matches!(
actions[0],
SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
));
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
}
#[test]
fn partial_warp_proof_doesnt_advance_phase() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
provider.expect_verify().return_once(|_proof, set_id, authorities| {
Ok(VerificationResult::Partial(set_id, authorities, Hash::random()))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync = WarpSync::new(
Arc::new(client),
config,
Some(ProtocolName::Static("")),
Arc::new(MockBlockDownloader::new()),
);
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
panic!("Invalid action");
};
warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
assert!(warp_sync.actions.is_empty(), "No extra actions generated");
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
}
#[test]
fn complete_warp_proof_advances_phase() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync = WarpSync::new(
client,
config,
Some(ProtocolName::Static("")),
Arc::new(MockBlockDownloader::new()),
);
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
panic!("Invalid action.");
};
warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
assert!(warp_sync.actions.is_empty(), "No extra actions generated.");
assert!(
matches!(warp_sync.phase, Phase::TargetBlock(header) if header == *target_block.header())
);
}
#[test]
fn no_target_block_requests_in_another_phase() {
let client = mock_client_without_state();
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(Arc::new(client), config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
assert!(warp_sync.target_block_request().is_none());
}
#[test]
fn target_block_request_is_correct() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
let (_peer_id, request) = warp_sync.target_block_request().unwrap();
assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
assert_eq!(
request.fields,
BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
);
assert_eq!(request.max, Some(1));
}
#[test]
fn externally_set_target_block_is_requested() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let target_header = target_block.header().clone();
let config = WarpSyncConfig::WithTarget(target_header);
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
let (_peer_id, request) = warp_sync.target_block_request().unwrap();
assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
assert_eq!(
request.fields,
BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
);
assert_eq!(request.max, Some(1));
}
#[test]
fn no_parallel_target_block_requests() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
assert!(warp_sync.target_block_request().is_some());
assert!(warp_sync.target_block_request().is_none());
}
#[test]
fn target_block_response_with_no_blocks_drops_peer() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
let (peer_id, request) = warp_sync.target_block_request().unwrap();
let response = Vec::new();
assert!(matches!(
warp_sync.on_block_response_inner(peer_id, request, response),
Err(BadPeer(id, _rep)) if id == peer_id,
));
}
#[test]
fn target_block_response_with_extra_blocks_drops_peer() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap();
extra_block_builder
.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
.unwrap();
let extra_block = extra_block_builder.build().unwrap().block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
let (peer_id, request) = warp_sync.target_block_request().unwrap();
let response = vec![
BlockData::<Block> {
hash: target_block.header().hash(),
header: Some(target_block.header().clone()),
body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
indexed_body: None,
receipt: None,
message_queue: None,
justification: None,
justifications: None,
},
BlockData::<Block> {
hash: extra_block.header().hash(),
header: Some(extra_block.header().clone()),
body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
indexed_body: None,
receipt: None,
message_queue: None,
justification: None,
justifications: None,
},
];
assert!(matches!(
warp_sync.on_block_response_inner(peer_id, request, response),
Err(BadPeer(id, _rep)) if id == peer_id,
));
}
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let target_block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap();
wrong_block_builder
.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
.unwrap();
let wrong_block = wrong_block_builder.build().unwrap().block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
let (peer_id, request) = warp_sync.target_block_request().unwrap();
let response = vec![BlockData::<Block> {
hash: wrong_block.header().hash(),
header: Some(wrong_block.header().clone()),
body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
indexed_body: None,
receipt: None,
message_queue: None,
justification: None,
justifications: None,
}];
assert!(matches!(
warp_sync.on_block_response_inner(peer_id, request, response),
Err(BadPeer(id, _rep)) if id == peer_id,
));
}
#[test]
fn correct_target_block_response_sets_strategy_result() {
let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
let mut provider = MockWarpSyncProvider::<Block>::new();
provider
.expect_current_authorities()
.once()
.return_const(AuthorityList::default());
let mut target_block_builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap();
target_block_builder
.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
.unwrap();
let target_block = target_block_builder.build().unwrap().block;
let target_header = target_block.header().clone();
provider.expect_verify().return_once(move |_proof, set_id, authorities| {
Ok(VerificationResult::Complete(set_id, authorities, target_header))
});
let config = WarpSyncConfig::WithProvider(Arc::new(provider));
let mut warp_sync =
WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()));
for best_number in 1..11 {
warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
}
warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
let (peer_id, request) = warp_sync.target_block_request().unwrap();
let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
let response = vec![BlockData::<Block> {
hash: target_block.header().hash(),
header: Some(target_block.header().clone()),
body: body.clone(),
indexed_body: None,
receipt: None,
message_queue: None,
justification: None,
justifications: justifications.clone(),
}];
assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(actions[0], SyncingAction::Finished));
let result = warp_sync.take_result().unwrap();
assert_eq!(result.target_header, *target_block.header());
assert_eq!(result.target_body, body);
assert_eq!(result.target_justifications, justifications);
}
}