use crate::{
block_relay_protocol::{BlockDownloader, BlockResponseError},
blocks::BlockCollection,
justification_requests::ExtraRequests,
schema::v1::{StateRequest, StateResponse},
service::network::NetworkServiceHandle,
strategy::{
disconnected_peers::DisconnectedPeers,
state_sync::{ImportResult, StateSync, StateSyncProvider},
warp::{WarpSyncPhase, WarpSyncProgress},
StrategyKey, SyncingAction, SyncingStrategy,
},
types::{BadPeer, SyncState, SyncStatus},
LOG_TARGET,
};
use futures::{channel::oneshot, FutureExt};
use log::{debug, error, info, trace, warn};
use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use prost::Message;
use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_network::{IfDisconnected, ProtocolName};
use sc_network_common::sync::message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
};
use sc_network_types::PeerId;
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
traits::{
Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
},
EncodedJustification, Justifications,
};
use std::{
any::Any,
collections::{HashMap, HashSet},
ops::Range,
sync::Arc,
};
// Unit tests for this module (compiled only in test builds).
#[cfg(test)]
mod test;
// Cap on blocks sitting in the import queue; usage not visible in this chunk —
// presumably throttles further downloads. TODO(review): confirm against the rest of the file.
const MAX_IMPORTING_BLOCKS: usize = 2048;
// How far ahead of the best queued block we download; also the base for the
// look-backwards window below.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;
// Window for searching backwards (half the download-ahead window); exact use not
// visible in this chunk — verify against the request-building code further down.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
// Finality-depth threshold related to starting state sync; usage not visible in this
// chunk — TODO(review): confirm where it gates `attempt_state_sync`.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
// If the target is more than this many blocks ahead of our best block, `status()`
// reports a "major sync" (Downloading/Importing) instead of Idle.
const MAJOR_SYNC_BLOCKS: u8 = 5;
/// Reputation changes applied to peers when sync detects misbehaviour (or when we
/// cannot serve/verify their data and must drop them).
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Applied when our own blockchain read fails while handling the peer's data.
	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
	/// Peer is on a different chain entirely — ban outright.
	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
	/// Peer sent a block with an incomplete header.
	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
	/// Peer sent a block that failed verification on import.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
	/// Peer sent a known-bad (blacklisted) block.
	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
	/// Peer response was missing requested block data.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
	/// Peer sent block data we never requested.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
	/// Peer sent an invalid justification.
	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
	/// Ancestor-search response did not match our chain (message reads "DB Error").
	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
	/// Peer response was incomplete.
	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
	/// Peer message could not be decoded/extracted.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
/// Prometheus gauges published by `ChainSync`.
struct Metrics {
	/// Number of blocks currently queued for import (`substrate_sync_queued_blocks`).
	queued_blocks: Gauge<U64>,
	/// Number of active fork sync targets (`substrate_sync_fork_targets`).
	fork_targets: Gauge<U64>,
}
impl Metrics {
	/// Creates the `ChainSync` gauges and registers them with the given Prometheus
	/// registry, returning the handles for later updates.
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		let queued_blocks = register(
			Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?,
			r,
		)?;
		let fork_targets = register(
			Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?,
			r,
		)?;
		Ok(Self { queued_blocks, fork_targets })
	}
}
/// Which peers are currently allowed to receive a new request.
#[derive(Debug, Clone)]
enum AllowedRequests {
	/// Only the peers in this set may be asked.
	Some(HashSet<PeerId>),
	/// Every peer may be asked.
	All,
}
impl AllowedRequests {
	/// Allow this peer to receive a request. No-op if all peers are already allowed.
	fn add(&mut self, id: &PeerId) {
		match self {
			Self::Some(set) => {
				set.insert(*id);
			},
			Self::All => {},
		}
	}
	/// Move the current value out, leaving the default (empty set) behind.
	fn take(&mut self) -> Self {
		std::mem::replace(self, Self::default())
	}
	/// Allow every peer to receive a request.
	fn set_all(&mut self) {
		*self = Self::All;
	}
	/// Whether this peer is currently allowed.
	fn contains(&self, id: &PeerId) -> bool {
		if let Self::Some(set) = self {
			set.contains(id)
		} else {
			true
		}
	}
	/// `true` only when no peer at all is allowed (`All` is never empty).
	fn is_empty(&self) -> bool {
		if let Self::Some(set) = self {
			set.is_empty()
		} else {
			false
		}
	}
	/// Reset to the default (empty set).
	fn clear(&mut self) {
		*self = Self::default();
	}
}
impl Default for AllowedRequests {
	/// Start with an empty explicit set: no peer is allowed until added.
	fn default() -> Self {
		Self::Some(HashSet::new())
	}
}
/// State of an ongoing block-gap (history) download.
struct GapSync<B: BlockT> {
	/// Blocks downloaded for the gap but not yet imported.
	blocks: BlockCollection<B>,
	/// Best gap block number queued for import so far.
	best_queued_number: NumberFor<B>,
	/// Block number at which the gap is closed (gap sync is dropped once an
	/// imported block reaches this number).
	target: NumberFor<B>,
}
/// Syncing mode for `ChainSync`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChainSyncMode {
	/// Full block sync: download and execute block bodies.
	Full,
	/// Download headers/bodies but skip execution and fetch the state instead
	/// (see `skip_execution` and the state-sync handling below).
	LightState {
		/// Skip storage proof download/verification during state sync.
		skip_proofs: bool,
		/// Chain stores bodies as indexed transactions (request INDEXED_BODY
		/// instead of BODY — see `required_block_attributes`).
		storage_chain_mode: bool,
	},
}
/// Per-peer sync bookkeeping.
#[derive(Debug, Clone)]
pub(crate) struct PeerSync<B: BlockT> {
	/// The peer's network identity.
	pub peer_id: PeerId,
	/// Highest block number known to be common between us and the peer.
	pub common_number: NumberFor<B>,
	/// The peer's best block hash, as announced.
	pub best_hash: B::Hash,
	/// The peer's best block number, as announced.
	pub best_number: NumberFor<B>,
	/// What (if anything) we are currently requesting from this peer.
	pub state: PeerSyncState<B>,
}
impl<B: BlockT> PeerSync<B> {
	/// Raise the recorded common-ancestor number; never lowers it.
	fn update_common_number(&mut self, new_common: NumberFor<B>) {
		if new_common <= self.common_number {
			return;
		}
		trace!(
			target: LOG_TARGET,
			"Updating peer {} common number from={} => to={}.",
			self.peer_id,
			self.common_number,
			new_common,
		);
		self.common_number = new_common;
	}
}
/// A fork block we want to download, plus which peers can serve it.
struct ForkTarget<B: BlockT> {
	/// Number of the fork head block.
	number: NumberFor<B>,
	/// Parent hash of the fork head, when known from an announcement.
	parent_hash: Option<B::Hash>,
	/// Peers known to have this fork.
	peers: HashSet<PeerId>,
}
/// What we are currently requesting from a peer, if anything.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum PeerSyncState<B: BlockT> {
	/// No request in flight; peer may be given work.
	Available,
	/// Binary/exponential search for the highest common ancestor.
	AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
	/// Downloading new blocks starting at the given number.
	DownloadingNew(NumberFor<B>),
	/// Downloading a stale/fork block identified by hash.
	DownloadingStale(B::Hash),
	/// Downloading a justification for the given block hash.
	DownloadingJustification(B::Hash),
	/// Downloading the chain state (state sync).
	DownloadingState,
	/// Downloading blocks to fill a block-history gap, starting at the given number.
	DownloadingGap(NumberFor<B>),
}
impl<B: BlockT> PeerSyncState<B> {
	/// `true` when the peer has no request in flight and can be given new work.
	pub fn is_available(&self) -> bool {
		match self {
			Self::Available => true,
			_ => false,
		}
	}
}
/// The main block-sync state machine: tracks peers, downloads, fork targets,
/// justification requests and (optionally) state/gap sync, and accumulates
/// `SyncingAction`s for the caller to execute.
pub struct ChainSync<B: BlockT, Client> {
	/// Chain client used for header/block/status queries.
	client: Arc<Client>,
	/// All connected peers and their per-peer sync state.
	peers: HashMap<PeerId, PeerSync<B>>,
	/// Tracks disconnects that happened while a request was in flight.
	disconnected_peers: DisconnectedPeers,
	/// Blocks downloaded (forward sync) but not yet imported.
	blocks: BlockCollection<B>,
	/// Best block number queued for import so far.
	best_queued_number: NumberFor<B>,
	/// Best block hash queued for import so far.
	best_queued_hash: B::Hash,
	/// Current sync mode (full or light-state).
	mode: ChainSyncMode,
	/// Pending extra justification requests.
	extra_justifications: ExtraRequests<B>,
	/// Hashes currently sitting in the import queue.
	queue_blocks: HashSet<B::Hash>,
	/// Deferred state-sync attempt `(hash, number, skip_proofs)`, run once we have
	/// peers and the import queue is empty (see `actions`/`on_block_finalized`).
	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
	/// Fork sync targets keyed by fork-head hash.
	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
	/// Which peers may currently receive a new request.
	allowed_requests: AllowedRequests,
	/// Maximum number of parallel block downloads.
	max_parallel_downloads: u32,
	/// Maximum number of blocks per block request.
	max_blocks_per_request: u32,
	/// Protocol name used for state requests.
	state_request_protocol_name: ProtocolName,
	/// Running count of blocks downloaded so far.
	downloaded_blocks: usize,
	/// Active state sync, if any.
	state_sync: Option<StateSync<B, Client>>,
	/// Re-import blocks that are already known (used when filling gaps).
	import_existing: bool,
	/// Downloader used to issue block requests.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Active block-history gap sync, if any.
	gap_sync: Option<GapSync<B>>,
	/// Actions accumulated for the caller, drained by `actions()`.
	actions: Vec<SyncingAction<B>>,
	/// Optional Prometheus metrics.
	metrics: Option<Metrics>,
}
impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
match self.add_peer_inner(peer_id, best_hash, best_number) {
Ok(Some(request)) => {
let action = self.create_block_request_action(peer_id, request);
self.actions.push(action);
},
Ok(None) => {},
Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
}
}
	/// Handle a peer disconnecting: drop its in-flight downloads, possibly penalize
	/// a disconnect-during-request, prune fork targets it was the last provider of,
	/// and queue any blocks that became ready for import as a result.
	fn remove_peer(&mut self, peer_id: &PeerId) {
		self.blocks.clear_peer_download(peer_id);
		if let Some(gap_sync) = &mut self.gap_sync {
			gap_sync.blocks.clear_peer_download(peer_id)
		}
		if let Some(state) = self.peers.remove(peer_id) {
			if !state.state.is_available() {
				// The peer left while we had a request in flight; this may earn
				// it a reputation penalty.
				if let Some(bad_peer) =
					self.disconnected_peers.on_disconnect_during_request(*peer_id)
				{
					self.actions.push(SyncingAction::DropPeer(bad_peer));
				}
			}
		}
		self.extra_justifications.peer_disconnected(peer_id);
		self.allowed_requests.set_all();
		// Drop fork targets no remaining peer can serve.
		self.fork_targets.retain(|_, target| {
			target.peers.remove(peer_id);
			!target.peers.is_empty()
		});
		if let Some(metrics) = &self.metrics {
			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
		}
		// Removing the peer's downloads may have unblocked a contiguous run of blocks.
		let blocks = self.ready_blocks();
		if !blocks.is_empty() {
			self.validate_and_queue_blocks(blocks, false);
		}
	}
	/// Process a (pre-validated) block announcement from `peer_id`.
	///
	/// Updates the peer's best block (when `is_best`), opportunistically advances the
	/// peer's common number, and may register a new fork target. Returns the peer's
	/// new best `(hash, number)` when it changed, `None` otherwise.
	fn on_validated_block_announce(
		&mut self,
		is_best: bool,
		peer_id: PeerId,
		announce: &BlockAnnounce<B::Header>,
	) -> Option<(B::Hash, NumberFor<B>)> {
		let number = *announce.header.number();
		let hash = announce.header.hash();
		let parent_status =
			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
		let known_parent = parent_status != BlockStatus::Unknown;
		let ancient_parent = parent_status == BlockStatus::InChainPruned;
		let known = self.is_known(&hash);
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			error!(target: LOG_TARGET, "๐ Called `on_validated_block_announce` with a bad peer ID {peer_id}");
			return Some((hash, number))
		};
		// While searching for a common ancestor, announcements are ignored so they
		// don't disturb the search state.
		if let PeerSyncState::AncestorSearch { .. } = peer.state {
			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
			return None
		}
		let peer_info = is_best.then(|| {
			peer.best_number = number;
			peer.best_hash = hash;
			(hash, number)
		});
		if is_best {
			if known && self.best_queued_number >= number {
				self.update_peer_common_number(&peer_id, number);
			} else if announce.header.parent_hash() == &self.best_queued_hash ||
				known_parent && self.best_queued_number >= number
			{
				// The announced block builds on something we have: the parent is common.
				self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
			}
		}
		self.allowed_requests.add(&peer_id);
		// Known or in-flight blocks only update fork-target peer sets.
		if known || self.is_already_downloading(&hash) {
			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
			if let Some(target) = self.fork_targets.get_mut(&hash) {
				target.peers.insert(peer_id);
			}
			return peer_info
		}
		if ancient_parent {
			trace!(
				target: LOG_TARGET,
				"Ignored ancient block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.header,
			);
			return peer_info
		}
		// Only track forks while idle; during a (major) sync they'd be noise.
		if self.status().state == SyncState::Idle {
			trace!(
				target: LOG_TARGET,
				"Added sync target for block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.summary(),
			);
			self.fork_targets
				.entry(hash)
				.or_insert_with(|| {
					if let Some(metrics) = &self.metrics {
						metrics.fork_targets.inc();
					}
					ForkTarget {
						number,
						parent_hash: Some(*announce.header.parent_hash()),
						peers: Default::default(),
					}
				})
				.peers
				.insert(peer_id);
		}
		peer_info
	}
	/// Explicitly request syncing of a fork block from the given peers (or, when
	/// `peers` is empty, from every peer whose best is at least `number`).
	fn set_sync_fork_request(
		&mut self,
		mut peers: Vec<PeerId>,
		hash: &B::Hash,
		number: NumberFor<B>,
	) {
		if peers.is_empty() {
			peers = self
				.peers
				.iter()
				.filter(|(_, peer)| peer.best_number >= number)
				.map(|(id, _)| *id)
				.collect();
			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with no peers specified. \
				Syncing from these peers {peers:?} instead.",
			);
		} else {
			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with {peers:?}",
			);
		}
		if self.is_known(hash) {
			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
			return
		}
		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
		for peer_id in &peers {
			if let Some(peer) = self.peers.get_mut(peer_id) {
				// Don't disturb an in-progress ancestor search.
				if let PeerSyncState::AncestorSearch { .. } = peer.state {
					continue
				}
				// The caller asserts the peer has this block; trust it and bump
				// the recorded best if needed.
				if number > peer.best_number {
					peer.best_number = number;
					peer.best_hash = *hash;
				}
				self.allowed_requests.add(peer_id);
			}
		}
		self.fork_targets
			.entry(*hash)
			.or_insert_with(|| {
				if let Some(metrics) = &self.metrics {
					metrics.fork_targets.inc();
				}
				ForkTarget { number, peers: Default::default(), parent_hash: None }
			})
			.peers
			.extend(peers);
	}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let client = &self.client;
self.extra_justifications
.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
}
	/// Drop all pending extra justification requests.
	fn clear_justification_requests(&mut self) {
		self.extra_justifications.reset();
	}
fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
self.extra_justifications
.try_finalize_root((hash, number), finalization_result, true);
self.allowed_requests.set_all();
}
	/// Dispatch a generic (type-erased) response to the right handler based on the
	/// protocol: state responses are raw bytes, block responses carry the original
	/// request plus the decode result. Unexpected keys/protocols are logged and
	/// trip a debug assertion.
	fn on_generic_response(
		&mut self,
		peer_id: &PeerId,
		key: StrategyKey,
		protocol_name: ProtocolName,
		response: Box<dyn Any + Send>,
	) {
		if Self::STRATEGY_KEY != key {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
			);
			debug_assert!(false);
			return;
		}
		if protocol_name == self.state_request_protocol_name {
			// State responses are transported as raw encoded bytes.
			let Ok(response) = response.downcast::<Vec<u8>>() else {
				warn!(target: LOG_TARGET, "Failed to downcast state response");
				debug_assert!(false);
				return;
			};
			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else if &protocol_name == self.block_downloader.protocol_name() {
			// Block responses carry the original request plus the decode result
			// (see `create_block_request_action`).
			let Ok(response) = response
				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
			else {
				warn!(target: LOG_TARGET, "Failed to downcast block response");
				debug_assert!(false);
				return;
			};
			let (request, response) = *response;
			let blocks = match response {
				Ok(blocks) => blocks,
				Err(BlockResponseError::DecodeFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to decode block response from peer {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
				Err(BlockResponseError::ExtractionFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to extract blocks from peer response {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
			};
			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response protocol {protocol_name}, strategy key \
				{key:?}",
			);
			debug_assert!(false);
		}
	}
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) {
trace!(target: LOG_TARGET, "Imported {imported} of {count}");
let mut has_error = false;
for (_, hash) in &results {
if self.queue_blocks.remove(hash) {
if let Some(metrics) = &self.metrics {
metrics.queued_blocks.dec();
}
}
self.blocks.clear_queued(hash);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_queued(hash);
}
}
for (result, hash) in results {
if has_error {
break
}
has_error |= result.is_err();
match result {
Ok(BlockImportStatus::ImportedKnown(number, peer_id)) =>
if let Some(peer) = peer_id {
self.update_peer_common_number(&peer, number);
},
Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
if aux.clear_justification_requests {
trace!(
target: LOG_TARGET,
"Block imported clears all pending justification requests {number}: {hash:?}",
);
self.clear_justification_requests();
}
if aux.needs_justification {
trace!(
target: LOG_TARGET,
"Block imported but requires justification {number}: {hash:?}",
);
self.request_justification(&hash, number);
}
if aux.bad_justification {
if let Some(ref peer) = peer_id {
warn!("๐ Sent block with bad justification to import");
self.actions.push(SyncingAction::DropPeer(BadPeer(
*peer,
rep::BAD_JUSTIFICATION,
)));
}
}
if let Some(peer) = peer_id {
self.update_peer_common_number(&peer, number);
}
let state_sync_complete =
self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
if state_sync_complete {
info!(
target: LOG_TARGET,
"State sync is complete ({} MiB), restarting block sync.",
self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
);
self.state_sync = None;
self.mode = ChainSyncMode::Full;
self.restart();
}
let gap_sync_complete =
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
if gap_sync_complete {
info!(
target: LOG_TARGET,
"Block history download is complete."
);
self.gap_sync = None;
}
},
Err(BlockImportError::IncompleteHeader(peer_id)) =>
if let Some(peer) = peer_id {
warn!(
target: LOG_TARGET,
"๐ Peer sent block with incomplete header to import",
);
self.actions
.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
self.restart();
},
Err(BlockImportError::VerificationFailed(peer_id, e)) => {
let extra_message = peer_id
.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
warn!(
target: LOG_TARGET,
"๐ Verification failed for block {hash:?}{extra_message}: {e:?}",
);
if let Some(peer) = peer_id {
self.actions
.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
}
self.restart();
},
Err(BlockImportError::BadBlock(peer_id)) =>
if let Some(peer) = peer_id {
warn!(
target: LOG_TARGET,
"๐ Block {hash:?} received from peer {peer} has been blacklisted",
);
self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
},
Err(BlockImportError::MissingState) => {
trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
},
e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
warn!(target: LOG_TARGET, "๐ Error importing block {hash:?}: {}", e.unwrap_err());
self.state_sync = None;
self.restart();
},
Err(BlockImportError::Cancelled) => {},
};
}
self.allowed_requests.set_all();
}
	/// React to a finalized block: clean up justification requests below it, and in
	/// light-state mode start (or defer) a state-sync attempt at the finalized block.
	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		let client = &self.client;
		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
			is_descendent_of(&**client, base, block)
		});
		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
			if self.state_sync.is_none() {
				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
					self.attempt_state_sync(*hash, number, *skip_proofs);
				} else {
					// Can't start now (no peers or imports in flight); retried from
					// `actions()` via `pending_state_sync_attempt`.
					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
				}
			}
		}
		if let Err(err) = r {
			warn!(
				target: LOG_TARGET,
				"๐ Error cleaning up pending extra justification data requests: {err}",
			);
		}
	}
	/// Inform sync of a new best block (delegates to `on_block_queued`).
	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
		self.on_block_queued(best_hash, best_number);
	}
	/// Whether we are far behind the network (Downloading/Importing state).
	fn is_major_syncing(&self) -> bool {
		self.status().state.is_major_syncing()
	}
	/// Number of peers currently tracked by sync.
	fn num_peers(&self) -> usize {
		self.peers.len()
	}
	/// Produce an overall sync status snapshot.
	///
	/// The sync target is the median of the peers' best block numbers; we report a
	/// major sync (Downloading/Importing) only when that target is more than
	/// `MAJOR_SYNC_BLOCKS` ahead of our best block.
	fn status(&self) -> SyncStatus<B> {
		let median_seen = self.median_seen();
		let best_seen_block =
			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
		let sync_state = if let Some(target) = median_seen {
			// A chain is classified as downloading if the provided best block is
			// more than `MAJOR_SYNC_BLOCKS` behind the best queued block.
			let best_block = self.client.info().best_number;
			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
				// If everything is queued already, we're importing rather than downloading.
				if target > self.best_queued_number {
					SyncState::Downloading { target }
				} else {
					SyncState::Importing { target }
				}
			} else {
				SyncState::Idle
			}
		} else {
			SyncState::Idle
		};
		// Gap sync is surfaced through the warp-sync progress field as a
		// "downloading blocks" phase.
		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
			total_bytes: 0,
		});
		SyncStatus {
			state: sync_state,
			best_seen_block,
			num_peers: self.peers.len() as u32,
			queued_blocks: self.queue_blocks.len() as u32,
			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
			warp_sync: warp_sync_progress,
		}
	}
	/// Total number of blocks downloaded since this strategy was created.
	fn num_downloaded_blocks(&self) -> usize {
		self.downloaded_blocks
	}
fn num_sync_requests(&self) -> usize {
self.fork_targets
.values()
.filter(|f| f.number <= self.best_queued_number)
.count()
}
	/// Drain the accumulated `SyncingAction`s for the caller to execute.
	///
	/// Before draining: retries any deferred state-sync attempt (once peers exist
	/// and the import queue is empty), then appends freshly built block,
	/// justification and state requests. State requests are started on the network
	/// here directly; their responses come back via `on_generic_response`.
	fn actions(
		&mut self,
		network_service: &NetworkServiceHandle,
	) -> Result<Vec<SyncingAction<B>>, ClientError> {
		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
				self.attempt_state_sync(hash, number, skip_proofs);
			}
		}
		let block_requests = self
			.block_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(block_requests);
		let justification_requests = self
			.justification_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(justification_requests);
		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
			trace!(
				target: LOG_TARGET,
				"Created `StrategyRequest` to {peer_id}.",
			);
			let (tx, rx) = oneshot::channel();
			// The request is fired immediately; the boxed future only forwards the
			// (type-erased) response back into the strategy.
			network_service.start_request(
				peer_id,
				self.state_request_protocol_name.clone(),
				request.encode_to_vec(),
				tx,
				IfDisconnected::ImmediateError,
			);
			SyncingAction::StartRequest {
				peer_id,
				key: Self::STRATEGY_KEY,
				request: async move {
					Ok(rx.await?.and_then(|(response, protocol_name)| {
						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
					}))
				}
				.boxed(),
				remove_obsolete: false,
			}
		});
		self.actions.extend(state_request);
		Ok(std::mem::take(&mut self.actions))
	}
}
impl<B, Client> ChainSync<B, Client>
where
B: BlockT,
Client: HeaderBackend<B>
+ BlockBackend<B>
+ HeaderMetadata<B, Error = sp_blockchain::Error>
+ ProofProvider<B>
+ Send
+ Sync
+ 'static,
{
	/// Key identifying `ChainSync` when routing generic requests/responses.
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
	/// Create a new `ChainSync`, reset its start point from the client's current
	/// chain info, and register the given initial peers.
	///
	/// Metric registration failure is logged and tolerated (sync runs without
	/// metrics rather than failing).
	pub fn new(
		mode: ChainSyncMode,
		client: Arc<Client>,
		max_parallel_downloads: u32,
		max_blocks_per_request: u32,
		state_request_protocol_name: ProtocolName,
		block_downloader: Arc<dyn BlockDownloader<B>>,
		metrics_registry: Option<&Registry>,
		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
	) -> Result<Self, ClientError> {
		let mut sync = Self {
			client,
			peers: HashMap::new(),
			disconnected_peers: DisconnectedPeers::new(),
			blocks: BlockCollection::new(),
			best_queued_hash: Default::default(),
			best_queued_number: Zero::zero(),
			extra_justifications: ExtraRequests::new("justification", metrics_registry),
			mode,
			queue_blocks: Default::default(),
			pending_state_sync_attempt: None,
			fork_targets: Default::default(),
			allowed_requests: Default::default(),
			max_parallel_downloads,
			max_blocks_per_request,
			state_request_protocol_name,
			downloaded_blocks: 0,
			state_sync: None,
			import_existing: false,
			block_downloader,
			gap_sync: None,
			actions: Vec::new(),
			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
				Ok(metrics) => Some(metrics),
				Err(err) => {
					log::error!(
						target: LOG_TARGET,
						"Failed to register `ChainSync` metrics {err:?}",
					);
					None
				},
			}),
		};
		sync.reset_sync_start_point()?;
		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
			sync.add_peer(peer_id, best_hash, best_number);
		});
		Ok(sync)
	}
	/// Register a new peer based on the status of its announced best block.
	///
	/// Returns `Ok(Some(request))` when an ancestor-search request should be sent,
	/// `Ok(None)` when the peer is registered with no immediate work, and
	/// `Err(BadPeer)` when the peer should be dropped (DB error, known-bad block,
	/// or genesis mismatch).
	#[must_use]
	fn add_peer_inner(
		&mut self,
		peer_id: PeerId,
		best_hash: B::Hash,
		best_number: NumberFor<B>,
	) -> Result<Option<BlockRequest<B>>, BadPeer> {
		match self.block_status(&best_hash) {
			Err(e) => {
				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
			},
			Ok(BlockStatus::KnownBad) => {
				info!(
					"๐ New peer {peer_id} with known bad best block {best_hash} ({best_number})."
				);
				Err(BadPeer(peer_id, rep::BAD_BLOCK))
			},
			Ok(BlockStatus::Unknown) => {
				// An unknown block at number zero means a different genesis — the
				// peer is on another chain entirely.
				if best_number.is_zero() {
					info!(
						"๐ New peer {} with unknown genesis hash {} ({}).",
						peer_id, best_hash, best_number,
					);
					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
				}
				// With many blocks in flight the peer's best may simply not be
				// imported yet; assume our best queued block is common and skip
				// the ancestor search.
				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() {
					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), assuming common block.",
						peer_id,
						self.best_queued_hash,
						self.best_queued_number
					);
					self.peers.insert(
						peer_id,
						PeerSync {
							peer_id,
							common_number: self.best_queued_number,
							best_hash,
							best_number,
							state: PeerSyncState::Available,
						},
					);
					return Ok(None);
				}
				// Otherwise start an ancestor search, unless we're at genesis
				// ourselves (then there is nothing to search for).
				let (state, req) = if self.best_queued_number.is_zero() {
					debug!(
						target: LOG_TARGET,
						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
					);
					(PeerSyncState::Available, None)
				} else {
					let common_best = std::cmp::min(self.best_queued_number, best_number);
					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
						peer_id,
						best_hash,
						best_number
					);
					(
						PeerSyncState::AncestorSearch {
							current: common_best,
							start: self.best_queued_number,
							state: AncestorSearchState::ExponentialBackoff(One::one()),
						},
						Some(ancestry_request::<B>(common_best)),
					)
				};
				self.allowed_requests.add(&peer_id);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: Zero::zero(),
						best_hash,
						best_number,
						state,
					},
				);
				Ok(req)
			},
			Ok(BlockStatus::Queued) |
			Ok(BlockStatus::InChainWithState) |
			Ok(BlockStatus::InChainPruned) => {
				debug!(
					target: LOG_TARGET,
					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
				);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						// We already have the peer's best block, so the common number
						// is simply the smaller of the two bests.
						common_number: std::cmp::min(self.best_queued_number, best_number),
						best_hash,
						best_number,
						state: PeerSyncState::Available,
					},
				);
				self.allowed_requests.add(&peer_id);
				Ok(None)
			},
		}
	}
	/// Build a `StartRequest` action that downloads blocks via the block downloader
	/// and packages the `(request, decode-result)` pair as a type-erased response
	/// (consumed later by `on_generic_response`).
	fn create_block_request_action(
		&mut self,
		peer_id: PeerId,
		request: BlockRequest<B>,
	) -> SyncingAction<B> {
		// Clone the `Arc` so the boxed future is `'static` and independent of `self`.
		let downloader = self.block_downloader.clone();
		SyncingAction::StartRequest {
			peer_id,
			key: Self::STRATEGY_KEY,
			request: async move {
				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
					|(response, protocol_name)| {
						let decoded_response =
							downloader.block_response_into_blocks(&request, response);
						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
						Ok((result, protocol_name))
					},
				))
			}
			.boxed(),
			// Block requests become obsolete when the peer's state changes.
			remove_obsolete: true,
		}
	}
	/// Handle block data received from `peer_id`, interpreting it according to the
	/// request state the peer was in (new download, gap download, stale/fork
	/// download, or ancestor search), then validate and queue any resulting blocks
	/// for import.
	///
	/// Returns `Err(BadPeer)` when the response is invalid (unrequested data, empty
	/// or mismatching responses, genesis mismatch during ancestor search, or a
	/// blockchain read error on our side).
	#[must_use]
	fn on_block_data(
		&mut self,
		peer_id: &PeerId,
		request: Option<BlockRequest<B>>,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		self.downloaded_blocks += response.blocks.len();
		let mut gap = false;
		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
			let mut blocks = response.blocks;
			// Descending responses are normalized to ascending order before insertion.
			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
				trace!(target: LOG_TARGET, "Reversing incoming block list");
				blocks.reverse()
			}
			self.allowed_requests.add(peer_id);
			if let Some(request) = request {
				match &mut peer.state {
					PeerSyncState::DownloadingNew(_) => {
						// Forward sync: insert into the shared collection and drain
						// whatever contiguous run became ready.
						self.blocks.clear_peer_download(peer_id);
						peer.state = PeerSyncState::Available;
						if let Some(start_block) =
							validate_blocks::<B>(&blocks, peer_id, Some(request))?
						{
							self.blocks.insert(start_block, blocks, *peer_id);
						}
						self.ready_blocks()
					},
					PeerSyncState::DownloadingGap(_) => {
						peer.state = PeerSyncState::Available;
						if let Some(gap_sync) = &mut self.gap_sync {
							gap_sync.blocks.clear_peer_download(peer_id);
							if let Some(start_block) =
								validate_blocks::<B>(&blocks, peer_id, Some(request))?
							{
								gap_sync.blocks.insert(start_block, blocks, *peer_id);
							}
							gap = true;
							// Gap blocks skip execution and may re-import existing blocks.
							let blocks: Vec<_> = gap_sync
								.blocks
								.ready_blocks(gap_sync.best_queued_number + One::one())
								.into_iter()
								.map(|block_data| {
									let justifications =
										block_data.block.justifications.or_else(|| {
											legacy_justification_mapping(
												block_data.block.justification,
											)
										});
									IncomingBlock {
										hash: block_data.block.hash,
										header: block_data.block.header,
										body: block_data.block.body,
										indexed_body: block_data.block.indexed_body,
										justifications,
										origin: block_data.origin,
										allow_missing_state: true,
										import_existing: self.import_existing,
										skip_execution: true,
										state: None,
									}
								})
								.collect();
							debug!(
								target: LOG_TARGET,
								"Drained {} gap blocks from {}",
								blocks.len(),
								gap_sync.best_queued_number,
							);
							blocks
						} else {
							// A gap response with no active gap sync is unexpected.
							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
					},
					PeerSyncState::DownloadingStale(_) => {
						peer.state = PeerSyncState::Available;
						if blocks.is_empty() {
							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
						blocks
							.into_iter()
							.map(|b| {
								let justifications = b
									.justifications
									.or_else(|| legacy_justification_mapping(b.justification));
								IncomingBlock {
									hash: b.hash,
									header: b.header,
									body: b.body,
									indexed_body: None,
									justifications,
									origin: Some(*peer_id),
									allow_missing_state: true,
									import_existing: self.import_existing,
									skip_execution: self.skip_execution(),
									state: None,
								}
							})
							.collect()
					},
					PeerSyncState::AncestorSearch { current, start, state } => {
						// Compare the peer's block at `current` with ours to decide
						// whether `current` is a common ancestor.
						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
							(Some(block), Ok(maybe_our_block_hash)) => {
								trace!(
									target: LOG_TARGET,
									"Got ancestry block #{} ({}) from peer {}",
									current,
									block.hash,
									peer_id,
								);
								maybe_our_block_hash.filter(|x| x == &block.hash)
							},
							(None, _) => {
								debug!(
									target: LOG_TARGET,
									"Invalid response when searching for ancestor from {peer_id}",
								);
								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
							},
							(_, Err(e)) => {
								info!(
									target: LOG_TARGET,
									"โ Error answering legitimate blockchain query: {e}",
								);
								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
							},
						};
						if matching_hash.is_some() {
							if *start < self.best_queued_number &&
								self.best_queued_number <= peer.best_number
							{
								// Our best advanced past the search start while the
								// search ran; assume the peer follows the same chain.
								trace!(
									target: LOG_TARGET,
									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									self.best_queued_number,
								);
								peer.common_number = self.best_queued_number;
							} else if peer.common_number < *current {
								trace!(
									target: LOG_TARGET,
									"Ancestry search: updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									*current,
								);
								peer.common_number = *current;
							}
						}
						// No match even at genesis ⇒ the peer is on another chain.
						if matching_hash.is_none() && current.is_zero() {
							trace!(
								target: LOG_TARGET,
								"Ancestry search: genesis mismatch for peer {peer_id}",
							);
							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
						}
						if let Some((next_state, next_num)) =
							handle_ancestor_search_state(state, *current, matching_hash.is_some())
						{
							// Search continues: issue the next ancestry request.
							peer.state = PeerSyncState::AncestorSearch {
								current: next_num,
								start: *start,
								state: next_state,
							};
							let request = ancestry_request::<B>(next_num);
							let action = self.create_block_request_action(*peer_id, request);
							self.actions.push(action);
							return Ok(());
						} else {
							// Search finished; if the peer's best is on a fork below
							// our best, remember it as a fork target.
							trace!(
								target: LOG_TARGET,
								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
								self.best_queued_hash,
								self.best_queued_number,
								peer.best_hash,
								peer.best_number,
								matching_hash,
								peer.common_number,
							);
							if peer.common_number < peer.best_number &&
								peer.best_number < self.best_queued_number
							{
								trace!(
									target: LOG_TARGET,
									"Added fork target {} for {}",
									peer.best_hash,
									peer_id,
								);
								self.fork_targets
									.entry(peer.best_hash)
									.or_insert_with(|| {
										if let Some(metrics) = &self.metrics {
											metrics.fork_targets.inc();
										}
										ForkTarget {
											number: peer.best_number,
											parent_hash: None,
											peers: Default::default(),
										}
									})
									.peers
									.insert(*peer_id);
							}
							peer.state = PeerSyncState::Available;
							return Ok(());
						}
					},
					PeerSyncState::Available |
					PeerSyncState::DownloadingJustification(..) |
					PeerSyncState::DownloadingState => Vec::new(),
				}
			} else {
				// No request recorded: unsolicited blocks (e.g. announced ones);
				// accept them leniently, skipping execution.
				validate_blocks::<B>(&blocks, peer_id, None)?;
				blocks
					.into_iter()
					.map(|b| {
						let justifications = b
							.justifications
							.or_else(|| legacy_justification_mapping(b.justification));
						IncomingBlock {
							hash: b.hash,
							header: b.header,
							body: b.body,
							indexed_body: None,
							justifications,
							origin: Some(*peer_id),
							allow_missing_state: true,
							import_existing: false,
							skip_execution: true,
							state: None,
						}
					})
					.collect()
			}
		} else {
			// Data from a peer we don't track was never requested.
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};
		self.validate_and_queue_blocks(new_blocks, gap);
		Ok(())
	}
	/// Route a decoded block response either to justification handling (when the
	/// request asked only for justifications) or to general block-data handling.
	fn on_block_response(
		&mut self,
		peer_id: &PeerId,
		key: StrategyKey,
		request: BlockRequest<B>,
		blocks: Vec<BlockData<B>>,
	) -> Result<(), BadPeer> {
		if key != Self::STRATEGY_KEY {
			error!(
				target: LOG_TARGET,
				"`on_block_response()` called with unexpected key {key:?} for chain sync",
			);
			debug_assert!(false);
		}
		let block_response = BlockResponse::<B> { id: request.id, blocks };
		// Lazily formats the first..last block-number range for the trace log.
		let blocks_range = || match (
			block_response
				.blocks
				.first()
				.and_then(|b| b.header.as_ref().map(|h| h.number())),
			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
		) {
			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
			(Some(first), Some(_)) => format!(" ({})", first),
			_ => Default::default(),
		};
		trace!(
			target: LOG_TARGET,
			"BlockResponse {} from {} with {} blocks {}",
			block_response.id,
			peer_id,
			block_response.blocks.len(),
			blocks_range(),
		);
		if request.fields == BlockAttributes::JUSTIFICATION {
			self.on_block_justification(*peer_id, block_response)
		} else {
			self.on_block_data(peer_id, Some(request), block_response)
		}
	}
	/// Handle a justification-only response from `peer_id`.
	///
	/// A response for the wrong block is a `BadPeer` error; an empty response is
	/// treated as "peer has no justification" (`None`). A completed request
	/// schedules an `ImportJustifications` action.
	#[must_use]
	fn on_block_justification(
		&mut self,
		peer_id: PeerId,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			error!(
				target: LOG_TARGET,
				"๐ Called on_block_justification with a peer ID of an unknown peer",
			);
			return Ok(());
		};
		self.allowed_requests.add(&peer_id);
		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
			peer.state = PeerSyncState::Available;
			let justification = if let Some(block) = response.blocks.into_iter().next() {
				if hash != block.hash {
					warn!(
						target: LOG_TARGET,
						"๐ Invalid block justification provided by {}: requested: {:?} got: {:?}",
						peer_id,
						hash,
						block.hash,
					);
					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
				}
				block
					.justifications
					.or_else(|| legacy_justification_mapping(block.justification))
			} else {
				// Empty response: the peer simply doesn't have the justification.
				trace!(
					target: LOG_TARGET,
					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
				);
				None
			};
			if let Some((peer_id, hash, number, justifications)) =
				self.extra_justifications.on_response(peer_id, justification)
			{
				self.actions.push(SyncingAction::ImportJustifications {
					peer_id,
					hash,
					number,
					justifications,
				});
				return Ok(());
			}
		}
		Ok(())
	}
/// Median of the best block numbers advertised by our connected peers, or
/// `None` when no peers are connected.
fn median_seen(&self) -> Option<NumberFor<B>> {
    let mut observed: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
    if observed.is_empty() {
        return None;
    }
    // Partial selection is enough to find the median — no full sort needed.
    let mid = observed.len() / 2;
    let (_, median, _) = observed.select_nth_unstable(mid);
    Some(*median)
}
/// Block attributes to ask peers for, depending on the current sync mode.
fn required_block_attributes(&self) -> BlockAttributes {
    let base = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;
    match self.mode {
        // Storage-chain light-state sync downloads indexed bodies instead of
        // regular block bodies; every other mode requests the full body.
        ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
            base | BlockAttributes::INDEXED_BODY,
        ChainSyncMode::Full | ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
            base | BlockAttributes::BODY,
    }
}
/// Whether block execution is skipped on import: true for any light-state
/// sync mode, false for full sync.
fn skip_execution(&self) -> bool {
    matches!(self.mode, ChainSyncMode::LightState { .. })
}
/// De-duplicate `new_blocks` against the import queue and schedule the rest
/// for import, keeping the sync bookkeeping (queued-block set, metrics,
/// best-queued pointers) up to date. `gap` marks blocks that were downloaded
/// for gap sync.
fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
    let before = new_blocks.len();
    new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
    let dropped = before - new_blocks.len();
    if dropped != 0 {
        debug!(
            target: LOG_TARGET,
            "Ignoring {} blocks that are already queued",
            dropped,
        );
    }
    // Gap-sync blocks and blocks received during major sync count as
    // initial-sync traffic; everything else arrived via announcements.
    // (`gap` is checked first so `status()` is only consulted when needed.)
    let origin = if gap || self.status().state.is_major_syncing() {
        BlockOrigin::NetworkInitialSync
    } else {
        BlockOrigin::NetworkBroadcast
    };
    let newest = new_blocks
        .last()
        .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())));
    if let Some((hash, number)) = newest {
        trace!(
            target: LOG_TARGET,
            "Accepted {} blocks ({:?}) with origin {:?}",
            new_blocks.len(),
            hash,
            origin,
        );
        self.on_block_queued(hash, number)
    }
    self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
    if let Some(metrics) = &self.metrics {
        metrics
            .queued_blocks
            .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
    }
    self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
}
/// Bump the recorded common block number for `peer_id`, if that peer is still
/// connected; unknown peers are ignored.
fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
    let Some(peer) = self.peers.get_mut(peer_id) else { return };
    peer.update_common_number(new_common);
}
/// Bookkeeping performed whenever a block has been queued for import: clears
/// any fork target it satisfies, advances gap-sync progress, and — when it
/// advances our best queued block — updates every peer's common number.
fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
    if self.fork_targets.remove(hash).is_some() {
        if let Some(metrics) = &self.metrics {
            metrics.fork_targets.dec();
        }
        trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
    }
    if let Some(gap_sync) = &mut self.gap_sync {
        let advances_gap = number > gap_sync.best_queued_number && number <= gap_sync.target;
        if advances_gap {
            gap_sync.best_queued_number = number;
        }
    }
    if number > self.best_queued_number {
        self.best_queued_number = number;
        self.best_queued_hash = *hash;
        for (peer_id, peer) in self.peers.iter_mut() {
            // Peers in ancestor search manage their own common number.
            if matches!(peer.state, PeerSyncState::AncestorSearch { .. }) {
                continue;
            }
            // The common number can never exceed what the peer itself has.
            let new_common_number = std::cmp::min(peer.best_number, number);
            trace!(
                target: LOG_TARGET,
                "Updating peer {} info, ours={}, common={}->{}, their best={}",
                peer_id,
                number,
                peer.common_number,
                new_common_number,
                peer.best_number,
            );
            peer.common_number = new_common_number;
        }
    }
    self.allowed_requests.set_all();
}
/// Restart the sync process: clear downloaded-but-unqueued blocks, re-read the
/// sync start point from the database, and re-register all peers (cancelling
/// their in-flight block/state requests, but keeping justification downloads
/// running).
fn restart(&mut self) {
self.blocks.clear();
if let Err(e) = self.reset_sync_start_point() {
warn!(target: LOG_TARGET, "๐ Unable to restart sync: {e}");
}
self.allowed_requests.set_all();
debug!(
target: LOG_TARGET,
"Restarted with {} ({})",
self.best_queued_number,
self.best_queued_hash,
);
// Re-add every peer from scratch so downloads and ancestor searches restart
// against the new start point.
let old_peers = std::mem::take(&mut self.peers);
old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
match peer_sync.state {
PeerSyncState::Available => {
self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
},
// Peers with an in-flight block/state request: cancel the request first,
// then re-add them like fresh peers.
PeerSyncState::AncestorSearch { .. } |
PeerSyncState::DownloadingNew(_) |
PeerSyncState::DownloadingStale(_) |
PeerSyncState::DownloadingGap(_) |
PeerSyncState::DownloadingState => {
self.actions
.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
},
// Justification downloads survive the restart; only the common number is
// reset to our (possibly changed) best queued block.
PeerSyncState::DownloadingJustification(_) => {
trace!(
target: LOG_TARGET,
"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
peer_id,
peer_sync.common_number,
self.best_queued_number,
);
peer_sync.common_number = self.best_queued_number;
self.peers.insert(peer_id, peer_sync);
},
}
});
}
/// Re-read the sync start point (best queued block, sync mode, gap sync) from
/// the client database. May downgrade light-state sync to full sync and may
/// enable `import_existing` when our best block has no state attached.
fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
let info = self.client.info();
// A database that already holds a finalized state cannot be fast-synced on
// top of — fall back to full sync.
if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
warn!(
target: LOG_TARGET,
"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
);
self.mode = ChainSyncMode::Full;
}
self.import_existing = false;
self.best_queued_hash = info.best_hash;
self.best_queued_number = info.best_number;
// Full sync with a best block that has no state: blocks must be re-imported
// (re-executed), starting from the finalized state if one exists, or from
// genesis otherwise.
if self.mode == ChainSyncMode::Full &&
self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
{
self.import_existing = true;
if let Some((hash, number)) = info.finalized_state {
debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
self.best_queued_hash = hash;
self.best_queued_number = number;
} else {
debug!(target: LOG_TARGET, "Restarting from genesis");
self.best_queued_hash = Default::default();
self.best_queued_number = Zero::zero();
}
}
// A gap in the stored chain (e.g. after warp sync) is downloaded separately
// by gap sync, starting just below the gap.
if let Some(BlockGap { start, end, .. }) = info.block_gap {
debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}");
self.gap_sync = Some(GapSync {
best_queued_number: start - One::one(),
target: end,
blocks: BlockCollection::new(),
});
}
trace!(
target: LOG_TARGET,
"Restarted sync at #{} ({:?})",
self.best_queued_number,
self.best_queued_hash,
);
Ok(())
}
/// Status of the given block, treating blocks already queued for import as
/// `Queued` before consulting the client database.
fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
    match self.queue_blocks.contains(hash) {
        true => Ok(BlockStatus::Queued),
        false => self.client.block_status(*hash),
    }
}
/// Whether the block is known locally (queued for import or present in the
/// database). Database errors are conservatively treated as "not known".
fn is_known(&self, hash: &B::Hash) -> bool {
    matches!(self.block_status(hash), Ok(status) if status != BlockStatus::Unknown)
}
/// Whether some peer is already downloading the given (stale/fork) block.
fn is_already_downloading(&self, hash: &B::Hash) -> bool {
    self.peers
        .values()
        .any(|p| p.state == PeerSyncState::DownloadingStale(*hash))
}
/// Drain all downloaded blocks that contiguously extend our best queued block
/// and convert them into `IncomingBlock`s ready for the import queue.
fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
    let first_needed = self.best_queued_number + One::one();
    let import_existing = self.import_existing;
    let skip_execution = self.skip_execution();
    self.blocks
        .ready_blocks(first_needed)
        .into_iter()
        .map(|block_data| {
            let block = block_data.block;
            // Prefer the multi-justification field, falling back to the legacy
            // single-justification encoding used by older peers.
            let justifications = block
                .justifications
                .or_else(|| legacy_justification_mapping(block.justification));
            IncomingBlock {
                hash: block.hash,
                header: block.header,
                body: block.body,
                indexed_body: block.indexed_body,
                justifications,
                origin: block_data.origin,
                allow_missing_state: true,
                import_existing,
                skip_execution,
                state: None,
            }
        })
        .collect()
}
/// Build justification-only block requests for every peer the extra-requests
/// matcher pairs with a pending justification, marking those peers busy.
fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
    let peers = &mut self.peers;
    let mut matcher = self.extra_justifications.matcher();
    let mut requests = Vec::new();
    while let Some((peer_id, request)) = matcher.next(peers) {
        // Mark the selected peer as busy downloading this justification.
        peers
            .get_mut(&peer_id)
            .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed")
            .state = PeerSyncState::DownloadingJustification(request.0);
        requests.push((
            peer_id,
            BlockRequest::<B> {
                id: 0,
                fields: BlockAttributes::JUSTIFICATION,
                from: FromBlock::Hash(request.0),
                direction: Direction::Ascending,
                max: Some(1),
            },
        ));
    }
    requests
}
/// Get block requests for every peer that can currently be asked for blocks.
///
/// Per peer, four request kinds are tried in order: ancestor search (when the
/// peer's common block is too far behind our best), new blocks towards the
/// peer's best, fork-target downloads, and gap-sync downloads.
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
// Block downloads are paused entirely while a state sync is in progress.
if self.allowed_requests.is_empty() || self.state_sync.is_some() {
return Vec::new();
}
// Backpressure: don't download further ahead of the import queue.
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: LOG_TARGET, "Too many blocks in the queue.");
return Vec::new();
}
let is_major_syncing = self.status().state.is_major_syncing();
let attrs = self.required_block_attributes();
// Split individual borrows out of `self` so the `filter_map` closure below
// can mutate peer entries while reading the other fields.
let blocks = &mut self.blocks;
let fork_targets = &mut self.fork_targets;
let last_finalized =
std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
let best_queued = self.best_queued_number;
let client = &self.client;
let queue_blocks = &self.queue_blocks;
let allowed_requests = self.allowed_requests.clone();
// During major sync, only one download range per peer is requested.
let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
let max_blocks_per_request = self.max_blocks_per_request;
let gap_sync = &mut self.gap_sync;
let disconnected_peers = &mut self.disconnected_peers;
let metrics = self.metrics.as_ref();
let requests = self
.peers
.iter_mut()
.filter_map(move |(&id, peer)| {
// Skip peers that are busy, not cleared for requests, or backed off
// after a recent disconnect.
if !peer.state.is_available() ||
!allowed_requests.contains(&id) ||
!disconnected_peers.is_peer_available(&id)
{
return None;
}
// The peer's common block is far behind our best while the peer itself
// is ahead of us: run an ancestor search to re-establish the common
// block before requesting anything else.
if best_queued.saturating_sub(peer.common_number) >
MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
best_queued < peer.best_number &&
peer.common_number < last_finalized &&
queue_blocks.len() <= MAJOR_SYNC_BLOCKS.into()
{
trace!(
target: LOG_TARGET,
"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
id,
peer.common_number,
best_queued,
);
let current = std::cmp::min(peer.best_number, best_queued);
peer.state = PeerSyncState::AncestorSearch {
current,
start: best_queued,
state: AncestorSearchState::ExponentialBackoff(One::one()),
};
Some((id, ancestry_request::<B>(current)))
// Regular download of new blocks towards the peer's best.
} else if let Some((range, req)) = peer_block_request(
&id,
peer,
blocks,
attrs,
max_parallel,
max_blocks_per_request,
last_finalized,
best_queued,
) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(
target: LOG_TARGET,
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
// Download of an announced fork this peer is registered for.
} else if let Some((hash, req)) = fork_sync_request(
&id,
fork_targets,
best_queued,
last_finalized,
attrs,
|hash| {
if queue_blocks.contains(hash) {
BlockStatus::Queued
} else {
client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
}
},
max_blocks_per_request,
metrics,
) {
trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
peer.state = PeerSyncState::DownloadingStale(hash);
Some((id, req))
// Download of blocks filling a gap in the stored chain, if gap sync is
// active.
} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
peer_gap_block_request(
&id,
peer,
&mut sync.blocks,
attrs,
sync.target,
sync.best_queued_number,
max_blocks_per_request,
)
}) {
peer.state = PeerSyncState::DownloadingGap(range.start);
trace!(
target: LOG_TARGET,
"New gap block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
} else {
None
}
})
.collect::<Vec<_>>();
if !requests.is_empty() {
self.allowed_requests.take();
}
requests
}
/// Get the next state request, if a state sync is active and a suitable peer
/// is available.
fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
if self.allowed_requests.is_empty() {
return None;
}
// Only one state request may be in flight at any given time.
if self.state_sync.is_some() &&
self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
{
return None;
}
if let Some(sync) = &self.state_sync {
if sync.is_complete() {
return None;
}
// Pick the first idle peer whose common block reaches the sync target,
// i.e. a peer that is known to be able to serve the requested state.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() &&
peer.common_number >= sync.target_number() &&
self.disconnected_peers.is_peer_available(&id)
{
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
// Suspend all other requests until this response arrives.
self.allowed_requests.clear();
return Some((*id, request));
}
}
}
None
}
/// Handle a raw state response from `peer_id`: decode it, mark the peer as
/// available again, and feed the data into the active state sync. A completed
/// state download is turned into a single block import that carries the
/// downloaded state.
#[must_use]
fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
let response = match StateResponse::decode(response) {
Ok(response) => response,
Err(error) => {
debug!(
target: LOG_TARGET,
"Failed to decode state response from peer {peer_id:?}: {error:?}.",
);
return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
},
};
// The peer has answered, so it becomes available for further requests.
if let Some(peer) = self.peers.get_mut(peer_id) {
if let PeerSyncState::DownloadingState = peer.state {
peer.state = PeerSyncState::Available;
self.allowed_requests.set_all();
}
}
let import_result = if let Some(sync) = &mut self.state_sync {
debug!(
target: LOG_TARGET,
"Importing state data from {} with {} keys, {} proof nodes.",
peer_id,
response.entries.len(),
response.proof.len(),
);
sync.import(response)
} else {
// No state sync in progress — the response was not (or no longer) asked
// for, so the peer is reported.
debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
};
match import_result {
// State download finished: queue a block import that carries the
// downloaded state alongside the header.
ImportResult::Import(hash, header, state, body, justifications) => {
let origin = BlockOrigin::NetworkInitialSync;
let block = IncomingBlock {
hash,
header: Some(header),
body,
indexed_body: None,
justifications,
origin: None,
allow_missing_state: true,
import_existing: true,
skip_execution: self.skip_execution(),
state: Some(state),
};
debug!(target: LOG_TARGET, "State download is complete. Import is queued");
self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
Ok(())
},
// More state chunks are needed; keep the sync going.
ImportResult::Continue => Ok(()),
ImportResult::BadResponse => {
debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
Err(BadPeer(*peer_id, rep::BAD_BLOCK))
},
}
}
/// Consider starting a state sync for the given finalized block.
///
/// State sync is only started when the finalized block is close enough
/// (within `STATE_SYNC_FINALITY_THRESHOLD` blocks) to the median best block
/// advertised by our peers, so we do not download the state of a block that
/// is already far behind the network.
fn attempt_state_sync(
    &mut self,
    finalized_hash: B::Hash,
    finalized_number: NumberFor<B>,
    skip_proofs: bool,
) {
    let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
    // Guard against indexing into an empty vector: with no connected peers
    // there is no median to compare against, so don't start state sync yet.
    if heads.is_empty() {
        return;
    }
    heads.sort_unstable();
    let median = heads[heads.len() / 2];
    if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
        if let Ok(Some(header)) = self.client.header(finalized_hash) {
            log::debug!(
                target: LOG_TARGET,
                "Starting state sync for #{finalized_number} ({finalized_hash})",
            );
            self.state_sync =
                Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
            self.allowed_requests.set_all();
        } else {
            // The header should always be available for a finalized block;
            // treat its absence as a logic error in debug builds.
            log::error!(
                target: LOG_TARGET,
                "Failed to start state sync: header for finalized block \
                #{finalized_number} ({finalized_hash}) is not available",
            );
            debug_assert!(false);
        }
    }
}
#[cfg(test)]
#[must_use]
/// Drain and return all currently queued syncing actions (test helper).
fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
    let queued = std::mem::take(&mut self.actions);
    queued.into_iter()
}
}
/// Wrap a legacy single-justification payload into the newer
/// multi-justification container, under the hard-coded `FRNK` consensus
/// engine id.
fn legacy_justification_mapping(
    justification: Option<EncodedJustification>,
) -> Option<Justifications> {
    let encoded = justification?;
    Some((*b"FRNK", encoded).into())
}
/// Build a request for the single header (plus justification) at `block`,
/// used to probe a peer during ancestor search.
fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
    let fields = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;
    BlockRequest::<B> {
        id: 0,
        fields,
        from: FromBlock::Number(block),
        direction: Direction::Ascending,
        max: Some(1),
    }
}
/// State of the search for the last block we have in common with a peer whose
/// chain differs from ours (see `handle_ancestor_search_state`).
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum AncestorSearchState<B: BlockT> {
/// Probing backwards with exponentially growing steps; the payload is the
/// distance to subtract for the next probe.
ExponentialBackoff(NumberFor<B>),
/// Binary search between the highest block number known to match (first
/// field) and the lowest known to mismatch (second field).
BinarySearch(NumberFor<B>, NumberFor<B>),
}
/// Advance the ancestor-search state machine after probing `curr_block_num`;
/// `block_hash_match` says whether our chain and the peer's agreed on that
/// block.
///
/// Returns the next state together with the next block number to probe, or
/// `None` when the search has converged.
fn handle_ancestor_search_state<B: BlockT>(
state: &AncestorSearchState<B>,
curr_block_num: NumberFor<B>,
block_hash_match: bool,
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
match state {
AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
let next_distance_to_tip = *next_distance_to_tip;
// A match on the very first (distance one) probe — nothing to narrow
// down, the search is done.
if block_hash_match && next_distance_to_tip == One::one() {
return None;
}
if block_hash_match {
// First match found: switch to binary search between this matched
// block and half the last step above it.
let left = curr_block_num;
let right = left + next_distance_to_tip / two;
let middle = left + (right - left) / two;
Some((AncestorSearchState::BinarySearch(left, right), middle))
} else {
// Still mismatching: jump further back (saturating at genesis) and
// double the step for the next probe.
let next_block_num =
curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
let next_distance_to_tip = next_distance_to_tip * two;
Some((
AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
next_block_num,
))
}
},
AncestorSearchState::BinarySearch(mut left, mut right) => {
if left >= curr_block_num {
return None;
}
// `left` tracks the highest known match, `right` the lowest known
// mismatch; shrink the interval from the matching side accordingly.
if block_hash_match {
left = curr_block_num;
} else {
right = curr_block_num;
}
assert!(right >= left);
// Converged once the midpoint stops moving.
let middle = left + (right - left) / two;
if middle == curr_block_num {
None
} else {
Some((AncestorSearchState::BinarySearch(left, right), middle))
}
},
}
}
/// Get a new-blocks request for the peer, if the block collection still needs
/// blocks this peer can serve. Returns the block-number range being requested
/// together with the request itself.
fn peer_block_request<B: BlockT>(
    id: &PeerId,
    peer: &PeerSync<B>,
    blocks: &mut BlockCollection<B>,
    attrs: BlockAttributes,
    max_parallel_downloads: u32,
    max_blocks_per_request: u32,
    finalized: NumberFor<B>,
    best_num: NumberFor<B>,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
    // Nothing to gain from a peer that is not ahead of us.
    if best_num >= peer.best_number {
        return None;
    }
    if peer.common_number < finalized {
        trace!(
            target: LOG_TARGET,
            "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
            id, peer.common_number, finalized, peer.best_number, best_num,
        );
    }
    let range = blocks.needed_blocks(
        *id,
        max_blocks_per_request,
        peer.best_number,
        peer.common_number,
        max_parallel_downloads,
        MAX_DOWNLOAD_AHEAD,
    )?;
    // Blocks are downloaded in descending order starting from the last block
    // of the range; when that block is the peer's best, ask by hash —
    // presumably to pin the response to the exact chain the peer announced.
    let last = range.end.saturating_sub(One::one());
    let from = if last == peer.best_number {
        FromBlock::Hash(peer.best_hash)
    } else {
        FromBlock::Number(last)
    };
    let count = (range.end - range.start).saturated_into::<u32>();
    Some((
        range,
        BlockRequest::<B> {
            id: 0,
            fields: attrs,
            from,
            direction: Direction::Descending,
            max: Some(count),
        },
    ))
}
/// Get a gap-sync block request for the peer, if blocks from the gap range
/// are still needed; only a single parallel download range is used per peer.
fn peer_gap_block_request<B: BlockT>(
    id: &PeerId,
    peer: &PeerSync<B>,
    blocks: &mut BlockCollection<B>,
    attrs: BlockAttributes,
    target: NumberFor<B>,
    common_number: NumberFor<B>,
    max_blocks_per_request: u32,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
    // Never ask beyond the gap target or beyond the peer's own best block.
    let upper_bound = std::cmp::min(peer.best_number, target);
    let range = blocks.needed_blocks(
        *id,
        max_blocks_per_request,
        upper_bound,
        common_number,
        1,
        MAX_DOWNLOAD_AHEAD,
    )?;
    // Download in descending order, starting from the last block of the range.
    let last = range.end.saturating_sub(One::one());
    let count = (range.end - range.start).saturated_into::<u32>();
    let request = BlockRequest::<B> {
        id: 0,
        fields: attrs,
        from: FromBlock::Number(last),
        direction: Direction::Descending,
        max: Some(count),
    };
    Some((range, request))
}
/// Get a fork-sync request for the peer, if it is registered for a fork
/// target that is still actionable. As a side effect, prunes fork targets
/// that have expired (at or below the finalized block) or become obsolete
/// (block already known locally) and refreshes the fork-target metric.
fn fork_sync_request<B: BlockT>(
    id: &PeerId,
    fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
    best_num: NumberFor<B>,
    finalized: NumberFor<B>,
    attributes: BlockAttributes,
    check_block: impl Fn(&B::Hash) -> BlockStatus,
    max_blocks_per_request: u32,
    metrics: Option<&Metrics>,
) -> Option<(B::Hash, BlockRequest<B>)> {
    // Drop fork targets that can no longer matter.
    fork_targets.retain(|hash, r| {
        if r.number <= finalized {
            trace!(
                target: LOG_TARGET,
                "Removed expired fork sync request {:?} (#{})",
                hash,
                r.number,
            );
            return false;
        }
        if check_block(hash) != BlockStatus::Unknown {
            trace!(
                target: LOG_TARGET,
                "Removed obsolete fork sync request {:?} (#{})",
                hash,
                r.number,
            );
            return false;
        }
        true
    });
    if let Some(metrics) = metrics {
        metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
    }
    for (hash, r) in fork_targets {
        if !r.peers.contains(id) {
            continue;
        }
        // Only download forks that are close enough to our best block.
        // (No redundant cast: `max_blocks_per_request` is already `u32`.)
        if r.number <= best_num ||
            (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request
        {
            // If the fork's parent is unknown locally, request the whole chain
            // back to the finalized block; otherwise one block is enough.
            let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
            let count = if parent_status == BlockStatus::Unknown {
                (r.number - finalized).saturated_into::<u32>()
            } else {
                1
            };
            trace!(
                target: LOG_TARGET,
                "Downloading requested fork {hash:?} from {id}, {count} blocks",
            );
            return Some((
                *hash,
                BlockRequest::<B> {
                    id: 0,
                    fields: attributes,
                    from: FromBlock::Hash(*hash),
                    direction: Direction::Descending,
                    max: Some(count),
                },
            ));
        } else {
            trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
        }
    }
    None
}
/// Is `block` a strict descendant of `base`? A block is not considered its
/// own descendant.
fn is_descendent_of<Block, T>(
    client: &T,
    base: &Block::Hash,
    block: &Block::Hash,
) -> sp_blockchain::Result<bool>
where
    Block: BlockT,
    T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
{
    if base == block {
        return Ok(false);
    }
    // `block` descends from `base` exactly when their lowest common ancestor
    // is `base` itself.
    let lca = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
    Ok(&lca.hash == base)
}
/// Validate a block response against the request it answers (if any).
///
/// When `request` is given, checks that the response does not exceed the
/// requested maximum, that it starts at the block the request named, and that
/// every block carries the header/body fields the request asked for. In all
/// cases, verifies that each provided header hashes to the advertised block
/// hash.
///
/// Returns the number of the first block on success, or the peer to report
/// (with the reputation change to apply) on failure.
pub fn validate_blocks<Block: BlockT>(
    blocks: &[BlockData<Block>],
    peer_id: &PeerId,
    request: Option<BlockRequest<Block>>,
) -> Result<Option<NumberFor<Block>>, BadPeer> {
    if let Some(request) = request {
        // NOTE: under `Option`'s ordering `Some(_) > None` always holds, so any
        // response to a request without `max` set is rejected here; requests
        // built in this module always set `max`.
        if Some(blocks.len() as _) > request.max {
            debug!(
                target: LOG_TARGET,
                "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
                peer_id,
                request.max,
                blocks.len(),
            );
            return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
        }
        // The response must start at the block the request named: the last
        // element for descending responses, the first one otherwise.
        let block_header =
            if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
                .and_then(|b| b.header.as_ref());
        let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
            FromBlock::Hash(hash) => h.hash() == hash,
            FromBlock::Number(n) => h.number() == &n,
        });
        if !expected_block {
            debug!(
                target: LOG_TARGET,
                "Received block that was not requested. Requested {:?}, got {:?}.",
                request.from,
                block_header,
            );
            return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
        }
        if request.fields.contains(BlockAttributes::HEADER) &&
            blocks.iter().any(|b| b.header.is_none())
        {
            trace!(
                target: LOG_TARGET,
                "Missing requested header for a block in response from {peer_id}.",
            );
            return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
        }
        if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
        {
            trace!(
                target: LOG_TARGET,
                "Missing requested body for a block in response from {peer_id}.",
            );
            return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
        }
    }
    // Independently of any request: every header must hash to the block hash
    // the peer advertised for it.
    for b in blocks {
        if let Some(header) = &b.header {
            let hash = header.hash();
            if hash != b.hash {
                debug!(
                    target: LOG_TARGET,
                    "Bad header received from {}. Expected hash {:?}, got {:?}",
                    peer_id,
                    b.hash,
                    hash,
                );
                return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
            }
        }
    }
    Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
}