use crate::finality::SubmitFinalityProofCallBuilder;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bp_header_chain::ConsensusLogReader;
use bp_runtime::HeaderIdProvider;
use futures::{select, FutureExt};
use num_traits::{One, Saturating, Zero};
use sp_runtime::traits::Header;
use finality_relay::{FinalitySyncParams, HeadersToRelay, TargetClient as FinalityTargetClient};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError,
HeaderIdOf,
};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError,
STALL_TIMEOUT,
};
use crate::{
finality::{
source::{RequiredHeaderNumberRef, SubstrateFinalitySource},
target::SubstrateFinalityTarget,
SubstrateFinalitySyncPipeline, RECENT_FINALITY_PROOFS_LIMIT,
},
finality_base::engine::Engine,
on_demand::OnDemandRelay,
TransactionParams,
};
/// On-demand Substrate-to-Substrate headers relay.
///
/// This relay syncs headers of `P::SourceChain` to `P::TargetChain` only when they are
/// explicitly required by some other component (see `require_more_headers`), instead of
/// relaying every header.
#[derive(Clone)]
pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline, SourceClnt, TargetClnt> {
/// Name of this relay task, used in log messages.
relay_task_name: String,
/// Shared number of the source chain header that we currently need to see finalized at the
/// target chain. Bumped by `require_more_headers`, read by the background sync task.
required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
/// Source chain client.
source_client: SourceClnt,
/// Target chain client.
target_client: TargetClnt,
}
impl<
	P: SubstrateFinalitySyncPipeline,
	SourceClnt: Client<P::SourceChain>,
	TargetClnt: Client<P::TargetChain>,
> OnDemandHeadersRelay<P, SourceClnt, TargetClnt>
{
	/// Creates a new on-demand headers relay and spawns the detached background task that
	/// performs the actual headers syncing.
	pub fn new(
		source_client: SourceClnt,
		target_client: TargetClnt,
		target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
		headers_to_relay: HeadersToRelay,
		metrics_params: Option<MetricsParams>,
	) -> Self
	where
		AccountIdOf<P::TargetChain>:
			From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
	{
		// no headers are required until someone calls `require_more_headers`
		let required_header_number = Arc::new(Mutex::new(Zero::zero()));

		let relay = Self {
			relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
			required_header_number: Arc::clone(&required_header_number),
			source_client: source_client.clone(),
			target_client: target_client.clone(),
		};

		// the background task gets its own copies of the clients and a shared reference to
		// the required header number
		async_std::task::spawn(background_task::<P>(
			source_client,
			target_client,
			target_transaction_params,
			headers_to_relay,
			required_header_number,
			metrics_params,
		));

		relay
	}
}
#[async_trait]
impl<
P: SubstrateFinalitySyncPipeline,
SourceClnt: Client<P::SourceChain>,
TargetClnt: Client<P::TargetChain>,
> OnDemandRelay<P::SourceChain, P::TargetChain>
for OnDemandHeadersRelay<P, SourceClnt, TargetClnt>
{
/// Reconnect both clients.
async fn reconnect(&self) -> Result<(), SubstrateError> {
// NOTE(review): `reconnect` is called on clones of the clients - presumably client clones
// share the underlying connection, so this also affects the clients owned by `self`;
// confirm against the `Client` implementation
self.source_client.clone().reconnect().await?;
self.target_client.clone().reconnect().await
}
/// Ask the background sync task to relay source chain headers at least up to the
/// `required_header` number.
async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
let mut required_header_number = self.required_header_number.lock().await;
// the required number may only grow - the background task syncs up to the maximal
// number that has ever been requested
if required_header > *required_header_number {
log::trace!(
target: "bridge",
"[{}] More {} headers required. Going to sync up to the {}",
self.relay_task_name,
P::SourceChain::NAME,
required_header,
);
*required_header_number = required_header;
}
}
/// Select a finalized source chain header with number at least `required_header`, and
/// return its id together with the target chain call(s) that submit its finality proof.
///
/// The selected header may be above `required_header` (see the "Requested to prove ...
/// Selected to prove ..." logs below). If the resulting call exceeds the target chain
/// limits, the next header is tried instead, at most `MAX_ITERATIONS` times; after the
/// last iteration the (possibly over-limit) call is returned anyway.
async fn prove_header(
&self,
required_header: BlockNumberOf<P::SourceChain>,
) -> Result<(HeaderIdOf<P::SourceChain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
// maximal number of headers we try before giving up on finding one whose proof fits
// the target chain limits
const MAX_ITERATIONS: u32 = 4;
let mut iterations = 0;
let mut current_required_header = required_header;
loop {
// ask the source chain for a finality proof of `current_required_header`; the
// returned header may be a different (higher) one
let finality_source =
SubstrateFinalitySource::<P, _>::new(self.source_client.clone(), None);
let (header, mut proof) =
finality_source.prove_block_finality(current_required_header).await?;
let header_id = header.id();
// verify the proof; the finality engine may optimize (mutate) the proof in place
let context = P::FinalityEngine::verify_and_optimize_proof(
&self.target_client,
&header,
&mut proof,
)
.await?;
// if the submit-finality-proof call would exceed target chain limits, retry with the
// next header (unless we are out of iterations)
let check_result = P::FinalityEngine::check_max_expected_call_limits(&header, &proof);
if check_result.is_weight_limit_exceeded || check_result.extra_size != 0 {
iterations += 1;
current_required_header = header_id.number().saturating_add(One::one());
if iterations < MAX_ITERATIONS {
log::debug!(
target: "bridge",
"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?}. But it exceeds limits: {:?}. \
Going to select next header",
self.relay_task_name,
P::SourceChain::NAME,
required_header,
P::SourceChain::NAME,
header_id,
check_result,
);
continue;
}
// out of iterations: fall through and return the over-limit call anyway
}
log::debug!(
target: "bridge",
"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} (after {} iterations)",
self.relay_task_name,
P::SourceChain::NAME,
required_header,
P::SourceChain::NAME,
header_id,
iterations,
);
let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(
header, proof, false, context,
);
return Ok((header_id, vec![call]));
}
}
}
/// Background task of the on-demand headers relay.
///
/// It (re)starts the inner finality relay whenever needed and periodically scans source
/// chain headers for mandatory headers (headers that schedule an authorities change),
/// requiring them to be relayed.
async fn background_task<P: SubstrateFinalitySyncPipeline>(
source_client: impl Client<P::SourceChain>,
target_client: impl Client<P::TargetChain>,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
headers_to_relay: HeadersToRelay,
required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
metrics_params: Option<MetricsParams>,
) where
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
{
let relay_task_name = on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>();
let target_transactions_mortality = target_transaction_params.mortality;
// the source is driven by the shared `required_header_number`
let mut finality_source = SubstrateFinalitySource::<P, _>::new(
source_client.clone(),
Some(required_header_number.clone()),
);
let mut finality_target =
SubstrateFinalityTarget::new(target_client.clone(), target_transaction_params);
// upper bound of ranges already scanned for mandatory headers - avoids rescanning the
// same headers on every iteration
let mut latest_non_mandatory_at_source = Zero::zero();
let mut restart_relay = true;
// inner finality relay future; initially terminated, so it is created on the first
// loop iteration (`restart_relay` starts as `true`)
let finality_relay_task = futures::future::Fuse::terminated();
futures::pin_mut!(finality_relay_task);
loop {
// wake up on a timer tick, or when the inner relay future completes (it is then
// restarted below)
select! {
_ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
_ = finality_relay_task => {
restart_relay = true;
},
}
// read best finalized source header known to the source node; on connection error,
// reconnect and retry on the next iteration
let best_finalized_source_header_at_source =
best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Source,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue
}
// same for the best finalized source header known to the target node
let best_finalized_source_header_at_target =
best_finalized_source_header_at_target::<P, _>(&finality_target, &relay_task_name)
.await;
if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Target,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue
}
// remember formatted results for logging, before they are consumed below
let best_finalized_source_header_at_source_fmt =
format!("{best_finalized_source_header_at_source:?}");
let best_finalized_source_header_at_target_fmt =
format!("{best_finalized_source_header_at_target:?}");
// select the range of source headers to scan for mandatory headers
let required_header_number_value = *required_header_number.lock().await;
let mandatory_scan_range = mandatory_headers_scan_range::<P::SourceChain>(
best_finalized_source_header_at_source.ok(),
best_finalized_source_header_at_target.ok(),
required_header_number_value,
)
.await;
log::trace!(
target: "bridge",
"[{}] Mandatory headers scan range: ({:?}, {:?}, {:?}) -> {:?}",
relay_task_name,
required_header_number_value,
best_finalized_source_header_at_source_fmt,
best_finalized_source_header_at_target_fmt,
mandatory_scan_range,
);
if let Some(mandatory_scan_range) = mandatory_scan_range {
// scan the range (skipping the already-scanned prefix) and, if a mandatory header
// is found, require it to be relayed
let relay_mandatory_header_result = relay_mandatory_header_from_range(
&finality_source,
&required_header_number,
best_finalized_source_header_at_target_fmt,
(
std::cmp::max(mandatory_scan_range.0, latest_non_mandatory_at_source),
mandatory_scan_range.1,
),
&relay_task_name,
)
.await;
match relay_mandatory_header_result {
// required header number has been bumped to the mandatory header
Ok(true) => (),
Ok(false) => {
// no mandatory headers in the range - remember that, so we don't rescan it
latest_non_mandatory_at_source = mandatory_scan_range.1;
log::trace!(
target: "bridge",
"[{}] No mandatory {} headers in the range {:?}",
relay_task_name,
P::SourceChain::NAME,
mandatory_scan_range,
);
},
Err(e) => {
log::warn!(
target: "bridge",
"[{}] Failed to scan mandatory {} headers range ({:?}): {:?}",
relay_task_name,
P::SourceChain::NAME,
mandatory_scan_range,
e,
);
if e.is_connection_error() {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Source,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue
}
},
}
}
// (re)start the inner finality relay if it has never been started, or has completed
if restart_relay {
let stall_timeout = relay_substrate_client::transaction_stall_timeout(
target_transactions_mortality,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
STALL_TIMEOUT,
);
log::info!(
target: "bridge",
"[{}] Starting on-demand headers relay task\n\t\
Headers to relay: {:?}\n\t\
Tx mortality: {:?} (~{}m)\n\t\
Stall timeout: {:?}",
relay_task_name,
headers_to_relay,
target_transactions_mortality,
stall_timeout.as_secs_f64() / 60.0f64,
stall_timeout,
);
finality_relay_task.set(
finality_relay::run(
finality_source.clone(),
finality_target.clone(),
FinalitySyncParams {
// tick at the rate of the slower chain
tick: std::cmp::max(
P::SourceChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
),
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
stall_timeout,
headers_to_relay,
},
metrics_params.clone().unwrap_or_else(MetricsParams::disabled),
// this future never resolves - NOTE(review): presumably an exit signal;
// confirm against `finality_relay::run`
futures::future::pending(),
)
.fuse(),
);
restart_relay = false;
}
}
}
/// Returns the range of source chain headers that we should scan for mandatory headers, or
/// `None` if everything known at the source is already queued for relaying.
async fn mandatory_headers_scan_range<C: Chain>(
	best_finalized_source_header_at_source: Option<C::BlockNumber>,
	best_finalized_source_header_at_target: Option<C::BlockNumber>,
	required_header_number: BlockNumberOf<C>,
) -> Option<(C::BlockNumber, C::BlockNumber)> {
	// substitute sane fallbacks for the values we failed to read
	let at_target = best_finalized_source_header_at_target.unwrap_or(required_header_number);
	let at_source = best_finalized_source_header_at_source.unwrap_or(at_target);

	if required_header_number >= at_source {
		// everything that the source knows about is already queued for relaying
		None
	} else {
		// scan everything above what the target already has, up to the source's best
		Some((at_target + One::one(), at_source))
	}
}
/// Searches the given range for a mandatory header and, if one is found, requires it to be
/// relayed by bumping the shared required header number.
///
/// Returns `Ok(true)` iff the required header number has been updated.
async fn relay_mandatory_header_from_range<P, SourceClnt>(
	finality_source: &SubstrateFinalitySource<P, SourceClnt>,
	required_header_number: &RequiredHeaderNumberRef<P::SourceChain>,
	best_finalized_source_header_at_target: String,
	range: (BlockNumberOf<P::SourceChain>, BlockNumberOf<P::SourceChain>),
	relay_task_name: &str,
) -> Result<bool, relay_substrate_client::Error>
where
	P: SubstrateFinalitySyncPipeline,
	SourceClnt: Client<P::SourceChain>,
{
	// nothing to do if there's no mandatory header in the range
	let mandatory_source_header_number =
		match find_mandatory_header_in_range(finality_source, range).await? {
			Some(number) => number,
			None => return Ok(false),
		};

	// nothing to do if the mandatory header is already queued for relaying
	let mut required_header_number = required_header_number.lock().await;
	if *required_header_number >= mandatory_source_header_number {
		return Ok(false)
	}

	log::trace!(
		target: "bridge",
		"[{}] Too many {} headers missing at target ({} vs {}). Going to sync up to the mandatory {}",
		relay_task_name,
		P::SourceChain::NAME,
		best_finalized_source_header_at_target,
		range.1,
		mandatory_source_header_number,
	);

	*required_header_number = mandatory_source_header_number;
	Ok(true)
}
/// Reads the best finalized source chain header number from the source node, logging (and
/// propagating) any error.
async fn best_finalized_source_header_at_source<P, SourceClnt>(
	finality_source: &SubstrateFinalitySource<P, SourceClnt>,
	relay_task_name: &str,
) -> Result<BlockNumberOf<P::SourceChain>, relay_substrate_client::Error>
where
	P: SubstrateFinalitySyncPipeline,
	SourceClnt: Client<P::SourceChain>,
{
	let result = finality_source.on_chain_best_finalized_block_number().await;
	if let Err(ref error) = result {
		log::error!(
			target: "bridge",
			"[{}] Failed to read best finalized source header from source: {:?}",
			relay_task_name,
			error,
		);
	}
	result
}
/// Reads the number of the best finalized source chain header known to the target node,
/// logging (and propagating) any error.
async fn best_finalized_source_header_at_target<P, TargetClnt>(
	finality_target: &SubstrateFinalityTarget<P, TargetClnt>,
	relay_task_name: &str,
) -> Result<
	BlockNumberOf<P::SourceChain>,
	<SubstrateFinalityTarget<P, TargetClnt> as RelayClient>::Error,
>
where
	P: SubstrateFinalitySyncPipeline,
	TargetClnt: Client<P::TargetChain>,
	AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
{
	match finality_target.best_finalized_source_block_id().await {
		// we only need the block number part of the id here
		Ok(id) => Ok(id.0),
		Err(error) => {
			log::error!(
				target: "bridge",
				"[{}] Failed to read best finalized source header from target: {:?}",
				relay_task_name,
				error,
			);
			Err(error)
		},
	}
}
/// Scans the given (inclusive) range of source chain headers and returns the number of the
/// first mandatory header - i.e. a header whose digest schedules an authorities change -
/// if there is one.
async fn find_mandatory_header_in_range<P, SourceClnt>(
	finality_source: &SubstrateFinalitySource<P, SourceClnt>,
	range: (BlockNumberOf<P::SourceChain>, BlockNumberOf<P::SourceChain>),
) -> Result<Option<BlockNumberOf<P::SourceChain>>, relay_substrate_client::Error>
where
	P: SubstrateFinalitySyncPipeline,
	SourceClnt: Client<P::SourceChain>,
{
	let mut number = range.0;
	loop {
		if number > range.1 {
			// the whole range has been scanned - no mandatory headers here
			return Ok(None)
		}
		let header = finality_source.client().header_by_number(number).await?;
		let schedules_change =
			<P::FinalityEngine as Engine<P::SourceChain>>::ConsensusLogReader::schedules_authorities_change(
				header.digest(),
			);
		if schedules_change {
			return Ok(Some(number))
		}
		number += One::one();
	}
}
/// Returns the name of the on-demand `SourceChain` -> `TargetChain` headers relay task.
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
	let mut name = String::from(SourceChain::NAME);
	name.push_str("-to-");
	name.push_str(TargetChain::NAME);
	name.push_str("-on-demand-headers");
	name
}
#[cfg(test)]
mod tests {
	use super::*;
	use relay_substrate_client::test_chain::TestChain;

	/// Best finalized source chain block known to the source node.
	const AT_SOURCE: Option<BlockNumberOf<TestChain>> = Some(10);
	/// Best finalized source chain block known to the target node.
	const AT_TARGET: Option<BlockNumberOf<TestChain>> = Some(1);

	#[async_std::test]
	async fn mandatory_headers_scan_range_selects_range_if_some_headers_are_missing() {
		let range = mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 0).await;
		assert_eq!(range, Some((AT_TARGET.unwrap() + 1, AT_SOURCE.unwrap())));
	}

	#[async_std::test]
	async fn mandatory_headers_scan_range_selects_nothing_if_already_queued() {
		let range =
			mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, AT_SOURCE.unwrap())
				.await;
		assert_eq!(range, None);
	}
}