#![cfg(feature = "full-node")]
use super::{HeaderProvider, HeaderProviderProvider};
use futures::channel::oneshot;
use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG;
use polkadot_node_subsystem::messages::{
ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_overseer::{AllMessages, Handle};
use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader};
use sp_consensus::{Error as ConsensusError, SelectChain};
use std::sync::Arc;
pub use sc_service::SpawnTaskHandle;
const MAX_FINALITY_LAG: polkadot_primitives::BlockNumber = PRIMITIVES_MAX_FINALITY_LAG;
const LOG_TARGET: &str = "parachain::chain-selection";
#[derive(Debug, Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
#[derive(Debug, Clone)]
struct MetricsInner {
approval_checking_finality_lag: prometheus::Gauge<prometheus::U64>,
disputes_finality_lag: prometheus::Gauge<prometheus::U64>,
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
approval_checking_finality_lag: prometheus::register(
prometheus::Gauge::with_opts(
prometheus::Opts::new(
"polkadot_parachain_approval_checking_finality_lag",
"How far behind the head of the chain the Approval Checking protocol wants to vote",
)
)?,
registry,
)?,
disputes_finality_lag: prometheus::register(
prometheus::Gauge::with_opts(
prometheus::Opts::new(
"polkadot_parachain_disputes_finality_lag",
"How far behind the head of the chain the Disputes protocol wants to vote",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
impl Metrics {
fn note_approval_checking_finality_lag(&self, lag: BlockNumber) {
if let Some(ref metrics) = self.0 {
metrics.approval_checking_finality_lag.set(lag as _);
}
}
fn note_disputes_finality_lag(&self, lag: BlockNumber) {
if let Some(ref metrics) = self.0 {
metrics.disputes_finality_lag.set(lag as _);
}
}
}
enum IsDisputesAwareWithOverseer<B: sc_client_api::Backend<PolkadotBlock>> {
Yes(SelectRelayChainInner<B, Handle>),
No,
}
impl<B> Clone for IsDisputesAwareWithOverseer<B>
where
B: sc_client_api::Backend<PolkadotBlock>,
SelectRelayChainInner<B, Handle>: Clone,
{
fn clone(&self) -> Self {
match self {
Self::Yes(ref inner) => Self::Yes(inner.clone()),
Self::No => Self::No,
}
}
}
pub struct SelectRelayChain<B: sc_client_api::Backend<PolkadotBlock>> {
longest_chain: sc_consensus::LongestChain<B, PolkadotBlock>,
selection: IsDisputesAwareWithOverseer<B>,
}
impl<B> Clone for SelectRelayChain<B>
where
B: sc_client_api::Backend<PolkadotBlock>,
SelectRelayChainInner<B, Handle>: Clone,
{
fn clone(&self) -> Self {
Self { longest_chain: self.longest_chain.clone(), selection: self.selection.clone() }
}
}
impl<B> SelectRelayChain<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
pub fn new_longest_chain(backend: Arc<B>) -> Self {
gum::debug!(target: LOG_TARGET, "Using {} chain selection algorithm", "longest");
Self {
longest_chain: sc_consensus::LongestChain::new(backend.clone()),
selection: IsDisputesAwareWithOverseer::No,
}
}
pub fn new_with_overseer(
backend: Arc<B>,
overseer: Handle,
metrics: Metrics,
spawn_handle: Option<SpawnTaskHandle>,
approval_voting_parallel_enabled: bool,
) -> Self {
gum::debug!(target: LOG_TARGET, "Using dispute aware relay-chain selection algorithm",);
SelectRelayChain {
longest_chain: sc_consensus::LongestChain::new(backend.clone()),
selection: IsDisputesAwareWithOverseer::Yes(SelectRelayChainInner::new(
backend,
overseer,
metrics,
spawn_handle,
approval_voting_parallel_enabled,
)),
}
}
pub fn as_longest_chain(&self) -> &sc_consensus::LongestChain<B, PolkadotBlock> {
&self.longest_chain
}
}
#[async_trait::async_trait]
impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
async fn leaves(&self) -> Result<Vec<Hash>, ConsensusError> {
match self.selection {
IsDisputesAwareWithOverseer::Yes(ref selection) => selection.leaves().await,
IsDisputesAwareWithOverseer::No => self.longest_chain.leaves().await,
}
}
async fn best_chain(&self) -> Result<PolkadotHeader, ConsensusError> {
match self.selection {
IsDisputesAwareWithOverseer::Yes(ref selection) => selection.best_chain().await,
IsDisputesAwareWithOverseer::No => self.longest_chain.best_chain().await,
}
}
async fn finality_target(
&self,
target_hash: Hash,
maybe_max_number: Option<BlockNumber>,
) -> Result<Hash, ConsensusError> {
if let IsDisputesAwareWithOverseer::Yes(ref selection) = self.selection {
selection
.finality_target_with_longest_chain(target_hash, maybe_max_number)
.await
} else {
self.longest_chain.finality_target(target_hash, maybe_max_number).await
}
}
}
pub struct SelectRelayChainInner<B, OH> {
backend: Arc<B>,
overseer: OH,
metrics: Metrics,
spawn_handle: Option<SpawnTaskHandle>,
approval_voting_parallel_enabled: bool,
}
impl<B, OH> SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock>,
OH: OverseerHandleT,
{
pub fn new(
backend: Arc<B>,
overseer: OH,
metrics: Metrics,
spawn_handle: Option<SpawnTaskHandle>,
approval_voting_parallel_enabled: bool,
) -> Self {
SelectRelayChainInner {
backend,
overseer,
metrics,
spawn_handle,
approval_voting_parallel_enabled,
}
}
fn block_header(&self, hash: Hash) -> Result<PolkadotHeader, ConsensusError> {
match HeaderProvider::header(self.backend.header_provider(), hash) {
Ok(Some(header)) => Ok(header),
Ok(None) =>
Err(ConsensusError::ChainLookup(format!("Missing header with hash {:?}", hash,))),
Err(e) => Err(ConsensusError::ChainLookup(format!(
"Lookup failed for header with hash {:?}: {:?}",
hash, e,
))),
}
}
fn block_number(&self, hash: Hash) -> Result<BlockNumber, ConsensusError> {
match HeaderProvider::number(self.backend.header_provider(), hash) {
Ok(Some(number)) => Ok(number),
Ok(None) =>
Err(ConsensusError::ChainLookup(format!("Missing number with hash {:?}", hash,))),
Err(e) => Err(ConsensusError::ChainLookup(format!(
"Lookup failed for number with hash {:?}: {:?}",
hash, e,
))),
}
}
}
impl<B, OH> Clone for SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock> + Send + Sync,
OH: OverseerHandleT,
{
fn clone(&self) -> Self {
SelectRelayChainInner {
backend: self.backend.clone(),
overseer: self.overseer.clone(),
metrics: self.metrics.clone(),
spawn_handle: self.spawn_handle.clone(),
approval_voting_parallel_enabled: self.approval_voting_parallel_enabled,
}
}
}
#[derive(thiserror::Error, Debug)]
enum Error {
#[error("Request for leaves from chain selection got canceled")]
LeavesCanceled(oneshot::Canceled),
#[error("Request for leaves from chain selection got canceled")]
BestLeafContainingCanceled(oneshot::Canceled),
#[error("Request for determining the undisputed chain from DisputeCoordinator got canceled")]
DetermineUndisputedChainCanceled(oneshot::Canceled),
#[error("Request approved ancestor from approval voting got canceled")]
ApprovedAncestorCanceled(oneshot::Canceled),
#[error("ChainSelection returned no leaves")]
EmptyLeaves,
}
#[async_trait::async_trait]
pub trait OverseerHandleT: Clone + Send + Sync {
async fn send_msg<M: Send + Into<AllMessages>>(&mut self, msg: M, origin: &'static str);
}
#[async_trait::async_trait]
impl OverseerHandleT for Handle {
async fn send_msg<M: Send + Into<AllMessages>>(&mut self, msg: M, origin: &'static str) {
Handle::send_msg(self, msg, origin).await
}
}
impl<B, OH> SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock>,
OH: OverseerHandleT + 'static,
{
async fn leaves(&self) -> Result<Vec<Hash>, ConsensusError> {
let (tx, rx) = oneshot::channel();
self.overseer
.clone()
.send_msg(ChainSelectionMessage::Leaves(tx), std::any::type_name::<Self>())
.await;
let leaves = rx
.await
.map_err(Error::LeavesCanceled)
.map_err(|e| ConsensusError::Other(Box::new(e)))?;
gum::trace!(target: LOG_TARGET, ?leaves, "Chain selection leaves");
Ok(leaves)
}
async fn best_chain(&self) -> Result<PolkadotHeader, ConsensusError> {
let best_leaf = *self
.leaves()
.await?
.first()
.ok_or_else(|| ConsensusError::Other(Box::new(Error::EmptyLeaves)))?;
gum::trace!(target: LOG_TARGET, ?best_leaf, "Best chain");
self.block_header(best_leaf)
}
pub(crate) async fn finality_target_with_longest_chain(
&self,
target_hash: Hash,
maybe_max_number: Option<BlockNumber>,
) -> Result<Hash, ConsensusError> {
let mut overseer = self.overseer.clone();
let subchain_head = {
let (tx, rx) = oneshot::channel();
overseer
.send_msg(
ChainSelectionMessage::BestLeafContaining(target_hash, tx),
std::any::type_name::<Self>(),
)
.await;
let best = rx
.await
.map_err(Error::BestLeafContainingCanceled)
.map_err(|e| ConsensusError::Other(Box::new(e)))?;
gum::trace!(target: LOG_TARGET, ?best, "Best leaf containing");
match best {
None => return Ok(target_hash),
Some(best) => best,
}
};
let target_number = self.block_number(target_hash)?;
let subchain_head = match maybe_max_number {
None => subchain_head,
Some(max) => {
if max <= target_number {
if max < target_number {
gum::warn!(
LOG_TARGET,
max_number = max,
target_number,
"`finality_target` max number is less than target number",
);
}
return Ok(target_hash)
}
let subchain_header = self.block_header(subchain_head)?;
if subchain_header.number <= max {
gum::trace!(target: LOG_TARGET, ?subchain_head, "Constrained sub-chain head",);
subchain_head
} else {
let (ancestor_hash, _) =
crate::grandpa_support::walk_backwards_to_target_block(
self.backend.header_provider(),
max,
&subchain_header,
)
.map_err(|e| ConsensusError::ChainLookup(format!("{:?}", e)))?;
gum::trace!(
target: LOG_TARGET,
?ancestor_hash,
"Grandpa walk backwards sub-chain head"
);
ancestor_hash
}
},
};
let initial_leaf = subchain_head;
let initial_leaf_number = self.block_number(initial_leaf)?;
let (subchain_head, subchain_number, subchain_block_descriptions) = {
let (tx, rx) = oneshot::channel();
if self.approval_voting_parallel_enabled {
overseer
.send_msg(
ApprovalVotingParallelMessage::ApprovedAncestor(
subchain_head,
target_number,
tx,
),
std::any::type_name::<Self>(),
)
.await;
} else {
overseer
.send_msg(
ApprovalVotingMessage::ApprovedAncestor(subchain_head, target_number, tx),
std::any::type_name::<Self>(),
)
.await;
}
match rx
.await
.map_err(Error::ApprovedAncestorCanceled)
.map_err(|e| ConsensusError::Other(Box::new(e)))?
{
None => (target_hash, target_number, Vec::new()),
Some(HighestApprovedAncestorBlock { number, hash, descriptions }) =>
(hash, number, descriptions),
}
};
gum::trace!(target: LOG_TARGET, ?subchain_head, "Ancestor approval restriction applied",);
let lag = initial_leaf_number.saturating_sub(subchain_number);
self.metrics.note_approval_checking_finality_lag(lag);
if let Some(spawn_handle) = &self.spawn_handle {
let mut overseer_handle = self.overseer.clone();
let approval_voting_parallel_enabled = self.approval_voting_parallel_enabled;
let lag_update_task = async move {
if approval_voting_parallel_enabled {
overseer_handle
.send_msg(
ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag),
std::any::type_name::<Self>(),
)
.await;
} else {
overseer_handle
.send_msg(
ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag),
std::any::type_name::<Self>(),
)
.await;
}
};
spawn_handle.spawn(
"approval-checking-lag-update",
Some("relay-chain-selection"),
Box::pin(lag_update_task),
);
}
let (lag, subchain_head) = {
if Some(subchain_block_descriptions.len() as _) !=
subchain_number.checked_sub(target_number)
{
gum::error!(
LOG_TARGET,
present_block_descriptions = subchain_block_descriptions.len(),
target_number,
subchain_number,
"Mismatch of anticipated block descriptions and block number difference.",
);
return Ok(target_hash)
}
let (tx, rx) = oneshot::channel();
overseer
.send_msg(
DisputeCoordinatorMessage::DetermineUndisputedChain {
base: (target_number, target_hash),
block_descriptions: subchain_block_descriptions,
tx,
},
std::any::type_name::<Self>(),
)
.await;
let (lag, subchain_head) =
match rx.await.map_err(Error::DetermineUndisputedChainCanceled) {
Ok((subchain_number, subchain_head)) => {
let lag_disputes = initial_leaf_number.saturating_sub(subchain_number);
self.metrics.note_disputes_finality_lag(lag_disputes);
(lag_disputes, subchain_head)
},
Err(e) => {
gum::error!(
target: LOG_TARGET,
error = ?e,
"Call to `DetermineUndisputedChain` failed",
);
return Ok(target_hash)
},
};
(lag, subchain_head)
};
gum::trace!(
target: LOG_TARGET,
?subchain_head,
"Disputed blocks in ancestry restriction applied",
);
if lag > MAX_FINALITY_LAG {
let safe_target = initial_leaf_number - MAX_FINALITY_LAG;
if safe_target <= target_number {
gum::warn!(target: LOG_TARGET, ?target_hash, "Safeguard enforced finalization");
Ok(target_hash)
} else {
let initial_leaf_header = self.block_header(initial_leaf)?;
let (forced_target, _) = crate::grandpa_support::walk_backwards_to_target_block(
self.backend.header_provider(),
safe_target,
&initial_leaf_header,
)
.map_err(|e| ConsensusError::ChainLookup(format!("{:?}", e)))?;
gum::warn!(
target: LOG_TARGET,
?forced_target,
"Safeguard enforced finalization of child"
);
Ok(forced_target)
}
} else {
Ok(subchain_head)
}
}
}