use crate::{
strategy::chain_sync::{PeerSync, PeerSyncState},
LOG_TARGET,
};
use fork_tree::ForkTree;
use log::{debug, trace, warn};
use prometheus_endpoint::{
prometheus::core::GenericGauge, register, GaugeVec, Opts, PrometheusError, Registry, U64,
};
use sc_network_types::PeerId;
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::{
collections::{HashMap, HashSet, VecDeque},
time::{Duration, Instant},
};
/// How long a failed request is held against the peer that failed it.
///
/// `Matcher::next` discards failure records older than this, after which the same request may
/// be sent to that peer again.
const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
/// An extra request, identified by a block hash and the matching block number.
type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);
/// Prometheus gauges describing the state of the extra requests.
///
/// All four gauges belong to the `substrate_sync_extra_justifications` gauge vector and differ
/// only in the value of its `status` label.
#[derive(Debug)]
struct Metrics {
/// Gauge labelled `pending`: requests waiting in the pending queue.
pending: GenericGauge<U64>,
/// Gauge labelled `active`: requests currently assigned to a peer.
active: GenericGauge<U64>,
/// Gauge labelled `failed`: number of entries in the failed-request map.
failed: GenericGauge<U64>,
/// Gauge labelled `importing`: requests whose response has been handed out for import.
importing: GenericGauge<U64>,
}
impl Metrics {
    /// Registers the `substrate_sync_extra_justifications` gauge vector with `registry` and
    /// returns one gauge per value of its `status` label.
    ///
    /// # Errors
    ///
    /// Returns the `PrometheusError` raised while creating or registering the gauge vector.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let opts = Opts::new(
            "substrate_sync_extra_justifications",
            "Number of extra justifications requests",
        );
        let by_status = register(GaugeVec::<U64>::new(opts, &["status"])?, registry)?;
        // Every gauge is the same vector narrowed down to a single `status` value.
        let gauge = |status: &str| by_status.with_label_values(&[status]);

        Ok(Self {
            pending: gauge("pending"),
            active: gauge("active"),
            failed: gauge("failed"),
            importing: gauge("importing"),
        })
    }
}
/// Tracks extra requests (identified by block hash and number) through their life cycle:
/// pending, active at a peer, failed, and importing.
///
/// Scheduled requests are recorded in a fork tree; only requests that are roots of that tree
/// are kept in the queues once finality advances.
#[derive(Debug)]
pub(crate) struct ExtraRequests<B: BlockT> {
/// Fork tree of every scheduled request; it carries no per-node data.
tree: ForkTree<B::Hash, NumberFor<B>, ()>,
/// Block number recorded by the most recent finalization handled by this structure.
best_seen_finalized_number: NumberFor<B>,
/// Requests waiting to be sent to a peer, in dispatch order.
pending_requests: VecDeque<ExtraRequest<B>>,
/// Requests currently in flight, keyed by the peer they were sent to (at most one per peer).
active_requests: HashMap<PeerId, ExtraRequest<B>>,
/// For each request, the peers that returned an empty response and when they did so.
failed_requests: HashMap<ExtraRequest<B>, Vec<(PeerId, Instant)>>,
/// Requests whose response has been returned to the caller for import.
importing_requests: HashSet<ExtraRequest<B>>,
/// Name of the kind of extra data being requested; only used in log messages.
request_type_name: &'static str,
/// Prometheus gauges, or `None` when no registry was supplied or registration failed.
metrics: Option<Metrics>,
}
impl<B: BlockT> ExtraRequests<B> {
/// Creates an empty tracker.
///
/// `request_type_name` is only used in log messages. When `metrics_registry` is provided the
/// gauges are registered with it; a registration failure is logged and the tracker simply
/// runs without metrics.
pub(crate) fn new(
    request_type_name: &'static str,
    metrics_registry: Option<&Registry>,
) -> Self {
    let metrics = metrics_registry.and_then(|registry| match Metrics::register(registry) {
        Ok(metrics) => Some(metrics),
        Err(error) => {
            log::error!(
                target: LOG_TARGET,
                "Failed to register `ExtraRequests` metrics {error}",
            );
            None
        },
    });

    Self {
        tree: ForkTree::new(),
        best_seen_finalized_number: Zero::zero(),
        pending_requests: VecDeque::new(),
        active_requests: HashMap::new(),
        failed_requests: HashMap::new(),
        importing_requests: HashSet::new(),
        request_type_name,
        metrics,
    }
}
/// Drops the fork tree together with all pending, active and failed requests, and zeroes the
/// matching gauges.
///
/// `importing_requests` and `best_seen_finalized_number` are left untouched.
pub(crate) fn reset(&mut self) {
    self.tree = ForkTree::new();
    self.pending_requests.clear();
    self.active_requests.clear();
    self.failed_requests.clear();

    let Some(metrics) = self.metrics.as_ref() else { return };
    for gauge in [&metrics.pending, &metrics.active, &metrics.failed] {
        gauge.set(0);
    }
}
/// Returns a [`Matcher`] that pairs the currently pending requests with peers.
///
/// The matcher mutably borrows `self` for as long as it lives.
pub(crate) fn matcher(&mut self) -> Matcher<B> {
Matcher::new(self)
}
pub(crate) fn schedule<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
where
F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
{
match self.tree.import(request.0, request.1, (), &is_descendent_of) {
Ok(true) => {
self.pending_requests.push_back((request.0, request.1));
if let Some(metrics) = &self.metrics {
metrics.pending.inc();
}
},
Err(fork_tree::Error::Revert) => {
},
Err(err) => {
debug!(target: LOG_TARGET, "Failed to insert request {:?} into tree: {}", request, err);
},
_ => (),
}
}
/// Handles the disconnection of peer `who`.
///
/// If a request was in flight to that peer it is moved back to the front of the pending
/// queue; otherwise nothing happens.
pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
    let Some(request) = self.active_requests.remove(who) else { return };

    self.pending_requests.push_front(request);
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.active.dec();
        metrics.pending.inc();
    }
}
/// Processes the response `resp` received from peer `who`.
///
/// The response is attributed to whatever request is currently active for `who`:
///
/// * no active request: a trace message is logged and `None` is returned;
/// * `Some(r)`: the request is marked as importing and
///   `Some((who, hash, number, r))` is returned;
/// * `None`: the failure is recorded against `who`, the request goes back to the front of the
///   pending queue and `None` is returned.
pub(crate) fn on_response<R>(
    &mut self,
    who: PeerId,
    resp: Option<R>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, R)> {
    let Some(request) = self.active_requests.remove(&who) else {
        trace!(target: LOG_TARGET,
            "No active {} request to {:?}",
            self.request_type_name, who,
        );
        return None
    };
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.active.dec();
    }

    match resp {
        Some(payload) => {
            trace!(target: LOG_TARGET,
                "Queuing import of {} from {:?} for {:?}",
                self.request_type_name, who, request,
            );
            // Only count the request once, even if it is already being imported.
            let newly_importing = self.importing_requests.insert(request);
            if newly_importing {
                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.importing.inc();
                }
            }
            Some((who, request.0, request.1, payload))
        },
        None => {
            trace!(target: LOG_TARGET,
                "Empty {} response from {:?} for {:?}",
                self.request_type_name, who, request,
            );
            // Remember that this peer could not serve the request, then retry it first.
            self.failed_requests.entry(request).or_default().push((who, Instant::now()));
            self.pending_requests.push_front(request);
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
                metrics.pending.inc();
            }
            None
        },
    }
}
pub(crate) fn on_block_finalized<F>(
&mut self,
best_finalized_hash: &B::Hash,
best_finalized_number: NumberFor<B>,
is_descendent_of: F,
) -> Result<(), fork_tree::Error<ClientError>>
where
F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
{
let request = (*best_finalized_hash, best_finalized_number);
if self.try_finalize_root::<()>(request, Ok(request), false) {
return Ok(())
}
if best_finalized_number > self.best_seen_finalized_number {
match self.tree.finalize_with_ancestors(
best_finalized_hash,
best_finalized_number,
&is_descendent_of,
) {
Err(fork_tree::Error::Revert) => {
},
Err(err) => return Err(err),
Ok(_) => {},
}
self.best_seen_finalized_number = best_finalized_number;
}
let roots = self.tree.roots().collect::<HashSet<_>>();
self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
if let Some(metrics) = &self.metrics {
metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
metrics.active.set(self.active_requests.len().try_into().unwrap_or(u64::MAX));
metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
}
Ok(())
}
/// Completes the import of `request` with the given `result`.
///
/// Returns `false`, without changing anything, when `request` is not being imported.
/// Otherwise the request stops being tracked as importing and `true` is returned after one of:
///
/// * `result` is `Err`: the request is pushed to the front of the pending queue if
///   `reschedule_on_failure` is set;
/// * `result` is `Ok` but the block is not a root of the tree: a warning is logged;
/// * `result` is `Ok` and the root was finalized: active and failed requests are dropped, the
///   pending queue is rebuilt from the tree's roots and the best seen finalized number is set
///   to the finalized block's number.
pub(crate) fn try_finalize_root<E>(
    &mut self,
    request: ExtraRequest<B>,
    result: Result<ExtraRequest<B>, E>,
    reschedule_on_failure: bool,
) -> bool {
    let was_importing = self.importing_requests.remove(&request);
    if !was_importing {
        return false
    }
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.importing.dec();
    }

    let Ok((finalized_hash, finalized_number)) = result else {
        // The import failed; optionally retry the request before anything else.
        if reschedule_on_failure {
            self.pending_requests.push_front(request);
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.pending.inc();
            }
        }
        return true
    };

    let finalized_root = self.tree.finalize_root(&finalized_hash);
    if finalized_root.is_none() {
        warn!(target: LOG_TARGET,
            "‼️ Imported {:?} {:?} which isn't a root in the tree: {:?}",
            finalized_hash, finalized_number, self.tree.roots().collect::<Vec<_>>()
        );
        return true
    }

    // Everything tracked so far is superseded: start again from the tree's current roots.
    self.failed_requests.clear();
    self.active_requests.clear();
    self.pending_requests =
        self.tree.roots().map(|(&hash, &number, _)| (hash, number)).collect();

    if let Some(metrics) = self.metrics.as_ref() {
        metrics.failed.set(0);
        metrics.active.set(0);
        metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
    }
    self.best_seen_finalized_number = finalized_number;

    true
}
/// Test-only accessor: iterates over the in-flight requests together with the peer each one
/// was sent to.
#[cfg(test)]
pub(crate) fn active_requests(&self) -> impl Iterator<Item = (&PeerId, &ExtraRequest<B>)> {
self.active_requests.iter()
}
/// Test-only accessor: iterates over the pending requests in queue order.
#[cfg(test)]
pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
self.pending_requests.iter()
}
}
/// Pairs pending extra requests with peers; obtained from `ExtraRequests::matcher`.
#[derive(Debug)]
pub(crate) struct Matcher<'a, B: BlockT> {
/// Number of requests `next` may still put back unmatched before it gives up. It starts at
/// the length of the pending queue when the matcher is created.
remaining: usize,
/// The request tracker this matcher operates on.
extras: &'a mut ExtraRequests<B>,
}
impl<'a, B: BlockT> Matcher<'a, B> {
/// Creates a matcher over `extras`, budgeting one attempt per request pending right now.
fn new(extras: &'a mut ExtraRequests<B>) -> Self {
    let remaining = extras.pending_requests.len();
    Self { remaining, extras }
}
/// Finds the next pending request that can be sent to one of `peers`.
///
/// Pending requests are tried from the front of the queue. A peer qualifies for a request
/// when it is in the `Available` state, its best number is at least the request's block
/// number, it has no active request, and it has no recent failure recorded for this request.
/// On a match the request becomes active for that peer and `Some((peer, request))` is
/// returned. A request without a qualifying peer is moved to the back of the queue.
///
/// Returns `None` once every request that was pending when the matcher was created has been
/// tried without success.
///
/// Failure records older than `EXTRA_RETRY_WAIT` are discarded on every call, and requests
/// left without any failure record are removed from the failed map so that the `failed` gauge
/// reflects the cleanup.
pub(crate) fn next(
    &mut self,
    peers: &HashMap<PeerId, PeerSync<B>>,
) -> Option<(PeerId, ExtraRequest<B>)> {
    if self.remaining == 0 {
        return None
    }

    // Expire old failures so those peers can be retried. Entries whose list becomes empty are
    // dropped as well: keeping them would make the gauge below count requests that no longer
    // have any failure on record.
    self.extras.failed_requests.retain(|_, failures| {
        failures.retain(|(_, failed_at)| failed_at.elapsed() < EXTRA_RETRY_WAIT);
        !failures.is_empty()
    });
    if let Some(metrics) = &self.extras.metrics {
        metrics
            .failed
            .set(self.extras.failed_requests.len().try_into().unwrap_or(u64::MAX));
    }

    while let Some(request) = self.extras.pending_requests.pop_front() {
        if let Some(metrics) = &self.extras.metrics {
            metrics.pending.dec();
        }

        let candidate = peers.iter().find(|(peer, sync)| {
            sync.state == PeerSyncState::Available &&
                // The peer must have reached the block the request is about.
                sync.best_number >= request.1 &&
                // At most one active request per peer.
                !self.extras.active_requests.contains_key(*peer) &&
                // Skip peers that recently failed to answer this very request.
                !self.extras.failed_requests.get(&request).is_some_and(|failures| {
                    failures.iter().any(|(failed_peer, _)| failed_peer == *peer)
                })
        });

        if let Some((peer, _)) = candidate {
            self.extras.active_requests.insert(*peer, request);
            if let Some(metrics) = &self.extras.metrics {
                metrics.active.inc();
            }
            trace!(target: LOG_TARGET,
                "Sending {} request to {:?} for {:?}",
                self.extras.request_type_name, peer, request,
            );
            return Some((*peer, request))
        }

        // Nobody can serve this request right now: re-queue it and spend one attempt.
        self.extras.pending_requests.push_back(request);
        if let Some(metrics) = &self.extras.metrics {
            metrics.pending.inc();
        }
        self.remaining -= 1;

        if self.remaining == 0 {
            break
        }
    }

    None
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::strategy::chain_sync::PeerSync;
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sp_blockchain::Error as ClientError;
use sp_test_primitives::{Block, BlockNumber, Hash};
use std::collections::{HashMap, HashSet};
/// Property: with one request scheduled per available peer, the matcher hands the requests
/// out in the order in which they were queued.
#[test]
fn requests_are_processed_in_order() {
    fn property(mut peers: ArbitraryPeers) {
        let mut extras = ExtraRequests::<Block>::new("test", None);

        // Schedule exactly as many requests as there are peers in the `Available` state.
        let available = peers
            .0
            .values()
            .filter(|sync| sync.state == PeerSyncState::Available)
            .count();
        for number in 0..available {
            extras.schedule((Hash::random(), number as u64), |a, b| Ok(a[0] >= b[0]))
        }

        let expected_order = extras.pending_requests.clone();
        let mut matcher = extras.matcher();

        for expected in expected_order.iter() {
            let (peer, dispatched) = matcher.next(&peers.0).unwrap();
            assert_eq!(expected, &dispatched);

            // The chosen peer is now busy downloading the justification.
            let sync = peers.0.get_mut(&peer).unwrap();
            sync.state = PeerSyncState::DownloadingJustification(dispatched.0);
        }
    }

    QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
}
/// Property: scheduling a request that is not already a root of the tree grows the pending
/// queue by exactly one.
#[test]
fn new_roots_schedule_new_request() {
    fn property(data: Vec<BlockNumber>) {
        let mut extras = ExtraRequests::<Block>::new("test", None);

        for (index, number) in data.into_iter().enumerate() {
            let hash = [index as u8; 32].into();
            let pending_before = extras.pending_requests.len();
            let already_root = extras
                .tree
                .roots()
                .any(|(&root_hash, &root_number, _)| hash == root_hash && number == root_number);

            extras.schedule((hash, number), |a, b| Ok(a[0] >= b[0]));

            if !already_root {
                assert_eq!(1 + pending_before, extras.pending_requests.len())
            }
        }
    }

    QuickCheck::new().quickcheck(property as fn(Vec<BlockNumber>))
}
/// Property: once every peer holding an active request disconnects, exactly the previously
/// active requests are pending again.
#[test]
fn disconnecting_implies_rescheduling() {
    fn property(mut peers: ArbitraryPeers) -> bool {
        let mut extras = ExtraRequests::<Block>::new("test", None);

        let available = peers
            .0
            .values()
            .filter(|sync| sync.state == PeerSyncState::Available)
            .count();
        for number in 0..available {
            extras.schedule((Hash::random(), number as u64), |a, b| Ok(a[0] >= b[0]))
        }

        // Dispatch as many requests as the matcher is willing to hand out.
        let mut matcher = extras.matcher();
        while let Some((peer, dispatched)) = matcher.next(&peers.0) {
            let sync = peers.0.get_mut(&peer).unwrap();
            sync.state = PeerSyncState::DownloadingJustification(dispatched.0);
        }
        assert!(extras.pending_requests.is_empty());

        let active_peers: Vec<_> = extras.active_requests.keys().cloned().collect();
        let previously_active: HashSet<_> = extras.active_requests.values().cloned().collect();

        for peer in &active_peers {
            extras.peer_disconnected(peer)
        }
        assert!(extras.active_requests.is_empty());

        let rescheduled: HashSet<_> = extras.pending_requests.iter().cloned().collect();
        previously_active == rescheduled
    }

    QuickCheck::new().quickcheck(property as fn(ArbitraryPeers) -> bool)
}
/// Property: an empty response puts the request back into the pending queue and records
/// exactly one failure for the peer that sent it.
#[test]
fn no_response_reschedules() {
    fn property(mut peers: ArbitraryPeers) {
        let mut extras = ExtraRequests::<Block>::new("test", None);

        let available = peers
            .0
            .values()
            .filter(|sync| sync.state == PeerSyncState::Available)
            .count();
        for number in 0..available {
            extras.schedule((Hash::random(), number as u64), |a, b| Ok(a[0] >= b[0]))
        }

        let mut matcher = extras.matcher();
        while let Some((peer, dispatched)) = matcher.next(&peers.0) {
            let sync = peers.0.get_mut(&peer).unwrap();
            sync.state = PeerSyncState::DownloadingJustification(dispatched.0);
        }

        let active: Vec<_> = extras
            .active_requests
            .iter()
            .map(|(&peer, &request)| (peer, request))
            .collect();

        for (peer, request) in &active {
            // Before the response: no failure on record and the request is not pending.
            assert!(extras.failed_requests.get(request).is_none());
            assert!(!extras.pending_requests.contains(request));

            assert!(extras.on_response::<()>(*peer, None).is_none());

            // After the empty response: pending again, with one failure for this peer.
            assert!(extras.pending_requests.contains(request));
            let failures_for_peer = extras
                .failed_requests
                .get(request)
                .unwrap()
                .iter()
                .filter(|(failed_peer, _)| failed_peer == peer)
                .count();
            assert_eq!(1, failures_for_peer)
        }
    }

    QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
}
/// A request for #6 must stay pending when an import for it only finalizes #5, and must be
/// gone once #6 itself has been finalized.
#[test]
fn request_is_rescheduled_when_earlier_block_is_finalized() {
    sp_tracing::try_init_simple();

    let mut extras = ExtraRequests::<Block>::new("test", None);

    let hash4 = [4; 32].into();
    let hash5 = [5; 32].into();
    let hash6 = [6; 32].into();
    let hash7 = [7; 32].into();

    fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
        Ok(target[0] >= base[0])
    }

    // Make #4 the last finalized block of the tree.
    extras.tree.import(hash4, 4, (), &is_descendent_of).unwrap();
    extras.tree.finalize_root(&hash4);

    // Schedule a request for #6.
    extras.schedule((hash6, 6), is_descendent_of);

    // The import of the #6 request reports #5 as finalized.
    extras.importing_requests.insert((hash6, 6));
    extras.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
    extras.try_finalize_root::<()>((hash6, 6), Ok((hash5, 5)), true);

    // The request for #6 is still pending.
    assert_eq!(extras.pending_requests.iter().collect::<Vec<_>>(), vec![&(hash6, 6)]);

    // Now #6 and then #7 are finalized while the #6 request is importing again.
    extras.importing_requests.insert((hash6, 6));
    extras.on_block_finalized(&hash6, 6, is_descendent_of).unwrap();
    extras.on_block_finalized(&hash7, 7, is_descendent_of).unwrap();
    extras.try_finalize_root::<()>((hash6, 6), Ok((hash7, 7)), true);

    // Nothing is left to request.
    assert_eq!(
        extras.pending_requests.iter().collect::<Vec<_>>(),
        Vec::<&(Hash, u64)>::new()
    );
}
/// Finalizing #5 without ever being told about #4 must still remove the #4 root from the
/// tree.
#[test]
fn ancestor_roots_are_finalized_when_finality_notification_is_missed() {
    let mut extras = ExtraRequests::<Block>::new("test", None);

    let hash4 = [4; 32].into();
    let hash5 = [5; 32].into();

    fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
        Ok(target[0] >= base[0])
    }

    // Schedule a request for #4.
    extras.schedule((hash4, 4), is_descendent_of);

    // This importing entry does not match the `(hash5, 5)` request finalized below, so
    // `on_block_finalized` takes the path that finalizes the tree with ancestors.
    extras.importing_requests.insert((hash4, 5));
    extras.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();

    assert_eq!(extras.tree.roots().count(), 0);
}
/// Quickcheck wrapper producing an arbitrary `PeerSyncState`.
#[derive(Debug, Clone)]
struct ArbitraryPeerSyncState(PeerSyncState<Block>);

impl Arbitrary for ArbitraryPeerSyncState {
    /// Picks one of four peer states from an arbitrary byte taken modulo 4.
    fn arbitrary(g: &mut Gen) -> Self {
        let selector = u8::arbitrary(g) % 4;
        Self(match selector {
            0 => PeerSyncState::Available,
            1 => PeerSyncState::DownloadingNew(BlockNumber::arbitrary(g)),
            2 => PeerSyncState::DownloadingStale(Hash::random()),
            _ => PeerSyncState::DownloadingJustification(Hash::random()),
        })
    }
}
/// Quickcheck wrapper producing an arbitrary `PeerSync`.
#[derive(Debug, Clone)]
struct ArbitraryPeerSync(PeerSync<Block>);

impl Arbitrary for ArbitraryPeerSync {
    /// Builds a peer with a random id and best hash, arbitrary numbers and an arbitrary state.
    fn arbitrary(g: &mut Gen) -> Self {
        // The values are drawn in the same order as the fields of `PeerSync` are listed.
        let peer_id = PeerId::random();
        let common_number = u64::arbitrary(g);
        let best_hash = Hash::random();
        let best_number = u64::arbitrary(g);
        let state = ArbitraryPeerSyncState::arbitrary(g).0;

        Self(PeerSync { peer_id, common_number, best_hash, best_number, state })
    }
}
/// Quickcheck wrapper producing an arbitrary set of peers keyed by their id.
#[derive(Debug, Clone)]
struct ArbitraryPeers(HashMap<PeerId, PeerSync<Block>>);

impl Arbitrary for ArbitraryPeers {
    /// Generates `g.size()` arbitrary peers.
    fn arbitrary(g: &mut Gen) -> Self {
        let count = g.size();
        let mut peers = HashMap::with_capacity(count);
        peers.extend((0..count).map(|_| {
            let sync = ArbitraryPeerSync::arbitrary(g).0;
            (sync.peer_id, sync)
        }));
        Self(peers)
    }
}
}