use crate::{
messages::source::best_finalized_peer_header_at_self,
on_demand::OnDemandRelay,
parachains::{
source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter,
SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline,
},
TransactionParams,
};
use async_std::{
channel::{unbounded, Receiver, Sender},
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use bp_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use bp_polkadot_core::parachains::{ParaHash, ParaId};
use bp_runtime::HeaderIdProvider;
use futures::{select, FutureExt};
use num_traits::Zero;
use parachains_relay::parachains_loop::{AvailableHeader, SourceClient, TargetClient};
use relay_substrate_client::{
is_ancient_block, AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client,
Error as SubstrateError, HashOf, HeaderIdOf, ParachainBase,
};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient,
HeaderId, UniqueSaturatedInto,
};
use std::fmt::Debug;
#[derive(Clone)]
pub struct OnDemandParachainsRelay<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt> {
relay_task_name: String,
required_header_number_sender: Sender<BlockNumberOf<P::SourceParachain>>,
source_relay_client: SourceRelayClnt,
target_client: TargetClnt,
on_demand_source_relay_to_target_headers:
Arc<dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>>,
}
impl<
P: SubstrateParachainsPipeline,
SourceRelayClnt: Client<P::SourceRelayChain>,
TargetClnt: Client<P::TargetChain>,
> OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>
{
pub fn new(
source_relay_client: SourceRelayClnt,
target_client: TargetClnt,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
on_demand_source_relay_to_target_headers: Arc<
dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
>,
) -> Self
where
P::SourceParachain: Chain<Hash = ParaHash>,
P::SourceRelayChain:
Chain<BlockNumber = RelayBlockNumber, Hash = RelayBlockHash, Hasher = RelayBlockHasher>,
AccountIdOf<P::TargetChain>:
From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
{
let (required_header_number_sender, required_header_number_receiver) = unbounded();
let this = OnDemandParachainsRelay {
relay_task_name: on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(
),
required_header_number_sender,
source_relay_client: source_relay_client.clone(),
target_client: target_client.clone(),
on_demand_source_relay_to_target_headers: on_demand_source_relay_to_target_headers
.clone(),
};
async_std::task::spawn(async move {
background_task::<P>(
source_relay_client,
target_client,
target_transaction_params,
on_demand_source_relay_to_target_headers,
required_header_number_receiver,
)
.await;
});
this
}
}
#[async_trait]
impl<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>
OnDemandRelay<P::SourceParachain, P::TargetChain>
for OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>
where
P::SourceParachain: Chain<Hash = ParaHash>,
SourceRelayClnt: Client<P::SourceRelayChain>,
TargetClnt: Client<P::TargetChain>,
{
async fn reconnect(&self) -> Result<(), SubstrateError> {
self.source_relay_client.clone().reconnect().await?;
self.target_client.clone().reconnect().await?;
self.on_demand_source_relay_to_target_headers.reconnect().await
}
async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
if let Err(e) = self.required_header_number_sender.send(required_header).await {
log::trace!(
target: "bridge",
"[{}] Failed to request {} header {:?}: {:?}",
self.relay_task_name,
P::SourceParachain::NAME,
required_header,
e,
);
}
}
async fn prove_header(
&self,
required_parachain_header: BlockNumberOf<P::SourceParachain>,
) -> Result<(HeaderIdOf<P::SourceParachain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
let parachains_source = ParachainsSource::<P, _>::new(
self.source_relay_client.clone(),
Arc::new(Mutex::new(AvailableHeader::Missing)),
);
let env = (self, ¶chains_source);
let (need_to_prove_relay_block, selected_relay_block, selected_parachain_block) =
select_headers_to_prove(env, required_parachain_header).await?;
log::debug!(
target: "bridge",
"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} and {} head {:?}",
self.relay_task_name,
P::SourceParachain::NAME,
required_parachain_header,
P::SourceParachain::NAME,
selected_parachain_block,
P::SourceRelayChain::NAME,
if need_to_prove_relay_block {
Some(selected_relay_block)
} else {
None
},
);
let mut calls = Vec::new();
let mut proved_relay_block = selected_relay_block;
if need_to_prove_relay_block {
let (relay_block, relay_prove_call) = self
.on_demand_source_relay_to_target_headers
.prove_header(selected_relay_block.number())
.await?;
proved_relay_block = relay_block;
calls.extend(relay_prove_call);
}
let para_id = ParaId(P::SourceParachain::PARACHAIN_ID);
let mut proved_parachain_block = selected_parachain_block;
if proved_relay_block != selected_relay_block {
proved_parachain_block = parachains_source
.on_chain_para_head_id(proved_relay_block)
.await?
.ok_or_else(|| {
SubstrateError::MissingRequiredParachainHead(
para_id,
proved_relay_block.number().unique_saturated_into(),
)
})?;
log::debug!(
target: "bridge",
"[{}] Selected to prove {} head {:?} and {} head {:?}. Instead proved {} head {:?} and {} head {:?}",
self.relay_task_name,
P::SourceParachain::NAME,
selected_parachain_block,
P::SourceRelayChain::NAME,
selected_relay_block,
P::SourceParachain::NAME,
proved_parachain_block,
P::SourceRelayChain::NAME,
proved_relay_block,
);
}
let (para_proof, para_hash) =
parachains_source.prove_parachain_head(proved_relay_block).await?;
calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
proved_relay_block,
vec![(para_id, para_hash)],
para_proof,
false,
));
Ok((proved_parachain_block, calls))
}
}
async fn background_task<P: SubstrateParachainsPipeline>(
source_relay_client: impl Client<P::SourceRelayChain>,
target_client: impl Client<P::TargetChain>,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
on_demand_source_relay_to_target_headers: Arc<
dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
>,
required_parachain_header_number_receiver: Receiver<BlockNumberOf<P::SourceParachain>>,
) where
P::SourceParachain: Chain<Hash = ParaHash>,
P::SourceRelayChain:
Chain<BlockNumber = RelayBlockNumber, Hash = RelayBlockHash, Hasher = RelayBlockHasher>,
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
{
let relay_task_name = on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>();
let target_transactions_mortality = target_transaction_params.mortality;
let mut relay_state = RelayState::Idle;
let mut required_parachain_header_number = Zero::zero();
let required_para_header_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));
let mut restart_relay = true;
let parachains_relay_task = futures::future::Fuse::terminated();
futures::pin_mut!(parachains_relay_task);
let mut parachains_source = ParachainsSource::<P, _>::new(
source_relay_client.clone(),
required_para_header_ref.clone(),
);
let mut parachains_target = ParachainsTarget::<P, _, _>::new(
source_relay_client.clone(),
target_client.clone(),
target_transaction_params.clone(),
);
loop {
select! {
new_required_parachain_header_number = required_parachain_header_number_receiver.recv().fuse() => {
let new_required_parachain_header_number = match new_required_parachain_header_number {
Ok(new_required_parachain_header_number) => new_required_parachain_header_number,
Err(e) => {
log::error!(
target: "bridge",
"[{}] Background task has exited with error: {:?}",
relay_task_name,
e,
);
return;
},
};
if new_required_parachain_header_number > required_parachain_header_number {
log::trace!(
target: "bridge",
"[{}] More {} headers required. Going to sync up to the {}",
relay_task_name,
P::SourceParachain::NAME,
new_required_parachain_header_number,
);
required_parachain_header_number = new_required_parachain_header_number;
}
},
_ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
_ = parachains_relay_task => {
restart_relay = true;
},
}
let relay_data = read_relay_data(
¶chains_source,
¶chains_target,
required_parachain_header_number,
)
.await;
match relay_data {
Ok(relay_data) => {
let prev_relay_state = relay_state;
relay_state = select_headers_to_relay(&relay_data, relay_state);
log::trace!(
target: "bridge",
"[{}] Selected new relay state: {:?} using old state {:?} and data {:?}",
relay_task_name,
relay_state,
prev_relay_state,
relay_data,
);
},
Err(failed_client) => {
relay_utils::relay_loop::reconnect_failed_client(
failed_client,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut parachains_source,
&mut parachains_target,
)
.await;
continue
},
}
match relay_state {
RelayState::Idle => (),
RelayState::RelayingRelayHeader(required_relay_header) => {
on_demand_source_relay_to_target_headers
.require_more_headers(required_relay_header)
.await;
},
RelayState::RelayingParaHeader(required_para_header) => {
*required_para_header_ref.lock().await =
AvailableHeader::Available(required_para_header);
},
}
if restart_relay {
let stall_timeout = relay_substrate_client::transaction_stall_timeout(
target_transactions_mortality,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
relay_utils::STALL_TIMEOUT,
);
log::info!(
target: "bridge",
"[{}] Starting on-demand-parachains relay task\n\t\
Tx mortality: {:?} (~{}m)\n\t\
Stall timeout: {:?}",
relay_task_name,
target_transactions_mortality,
stall_timeout.as_secs_f64() / 60.0f64,
stall_timeout,
);
parachains_relay_task.set(
parachains_relay::parachains_loop::run(
parachains_source.clone(),
parachains_target.clone(),
MetricsParams::disabled(),
false,
futures::future::pending(),
)
.fuse(),
);
restart_relay = false;
}
}
}
fn on_demand_parachains_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
format!("{}-to-{}-on-demand-parachain", SourceChain::NAME, TargetChain::NAME)
}
#[derive(Clone, Copy, Debug, PartialEq)]
enum RelayState<ParaHash, ParaNumber, RelayNumber> {
Idle,
RelayingRelayHeader(RelayNumber),
RelayingParaHeader(HeaderId<ParaHash, ParaNumber>),
}
#[derive(Debug)]
struct RelayData<ParaHash, ParaNumber, RelayNumber> {
pub required_para_header: ParaNumber,
pub para_header_at_target: Option<ParaNumber>,
pub para_header_at_source: Option<HeaderId<ParaHash, ParaNumber>>,
pub para_header_at_relay_header_at_target: Option<HeaderId<ParaHash, ParaNumber>>,
pub relay_header_at_source: RelayNumber,
pub relay_header_at_target: Option<RelayNumber>,
}
async fn read_relay_data<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>(
source: &ParachainsSource<P, SourceRelayClnt>,
target: &ParachainsTarget<P, SourceRelayClnt, TargetClnt>,
required_header_number: BlockNumberOf<P::SourceParachain>,
) -> Result<
RelayData<
HashOf<P::SourceParachain>,
BlockNumberOf<P::SourceParachain>,
BlockNumberOf<P::SourceRelayChain>,
>,
FailedClient,
>
where
SourceRelayClnt: Client<P::SourceRelayChain>,
TargetClnt: Client<P::TargetChain>,
ParachainsTarget<P, SourceRelayClnt, TargetClnt>:
TargetClient<ParachainsPipelineAdapter<P>> + RelayClient<Error = SubstrateError>,
{
let map_target_err = |e| {
log::error!(
target: "bridge",
"[{}] Failed to read relay data from {} client: {:?}",
on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(),
P::TargetChain::NAME,
e,
);
FailedClient::Target
};
let map_source_err = |e| {
log::error!(
target: "bridge",
"[{}] Failed to read relay data from {} client: {:?}",
on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(),
P::SourceRelayChain::NAME,
e,
);
FailedClient::Source
};
let best_target_block_hash = target.best_block().await.map_err(map_target_err)?.1;
let para_header_at_target = best_finalized_peer_header_at_self::<
P::TargetChain,
P::SourceParachain,
>(target.target_client(), best_target_block_hash)
.await;
let para_header_at_target = match para_header_at_target {
Ok(Some(para_header_at_target)) => Some(para_header_at_target.0),
Ok(None) => None,
Err(e) => return Err(map_target_err(e)),
};
let best_finalized_relay_header =
source.client().best_finalized_header().await.map_err(map_source_err)?;
let best_finalized_relay_block_id = best_finalized_relay_header.id();
let para_header_at_source = source
.on_chain_para_head_id(best_finalized_relay_block_id)
.await
.map_err(map_source_err)?;
let relay_header_at_source = best_finalized_relay_block_id.0;
let relay_header_at_target = best_finalized_peer_header_at_self::<
P::TargetChain,
P::SourceRelayChain,
>(target.target_client(), best_target_block_hash)
.await
.map_err(map_target_err)?;
let available_relay_header_at_target =
relay_header_at_target.filter(|relay_header_at_target| {
!is_ancient_block(relay_header_at_target.number(), relay_header_at_source)
});
let para_header_at_relay_header_at_target =
if let Some(available_relay_header_at_target) = available_relay_header_at_target {
source
.on_chain_para_head_id(available_relay_header_at_target)
.await
.map_err(map_source_err)?
} else {
None
};
Ok(RelayData {
required_para_header: required_header_number,
para_header_at_target,
para_header_at_source,
relay_header_at_source,
relay_header_at_target: relay_header_at_target
.map(|relay_header_at_target| relay_header_at_target.0),
para_header_at_relay_header_at_target,
})
}
fn select_headers_to_relay<ParaHash, ParaNumber, RelayNumber>(
data: &RelayData<ParaHash, ParaNumber, RelayNumber>,
state: RelayState<ParaHash, ParaNumber, RelayNumber>,
) -> RelayState<ParaHash, ParaNumber, RelayNumber>
where
ParaHash: Clone,
ParaNumber: Copy + PartialOrd + Zero,
RelayNumber: Copy + Debug + Ord,
{
let relay_header_at_target = match data.relay_header_at_target {
Some(relay_header_at_target) => relay_header_at_target,
None => return RelayState::Idle,
};
if let &RelayState::RelayingRelayHeader(relay_header_number) = &state {
if relay_header_at_target < relay_header_number {
return state
}
if let Some(para_header_at_relay_header_at_target) =
data.para_header_at_relay_header_at_target.as_ref()
{
return RelayState::RelayingParaHeader(para_header_at_relay_header_at_target.clone())
}
}
if let RelayState::RelayingParaHeader(para_header_id) = &state {
let para_header_at_target_or_zero = data.para_header_at_target.unwrap_or_else(Zero::zero);
if para_header_at_target_or_zero < para_header_id.0 {
return state
}
}
let para_header_at_source = match data.para_header_at_source {
Some(ref para_header_at_source) => para_header_at_source.clone(),
None => return RelayState::Idle,
};
let (required_para_header, para_header_at_target) = match data.para_header_at_target {
Some(para_header_at_target) => (data.required_para_header, para_header_at_target),
None => (para_header_at_source.0, Zero::zero()),
};
if required_para_header <= para_header_at_target {
return RelayState::Idle
}
if required_para_header > para_header_at_source.0 {
return RelayState::Idle
}
if relay_header_at_target < data.relay_header_at_source {
return RelayState::RelayingRelayHeader(data.relay_header_at_source)
}
RelayState::RelayingParaHeader(para_header_at_source)
}
#[async_trait]
trait SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH> {
fn parachain_id(&self) -> ParaId;
async fn best_finalized_relay_block_at_source(
&self,
) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
async fn best_finalized_relay_block_at_target(
&self,
) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
async fn best_finalized_para_block_at_source(
&self,
at_relay_block: HeaderId<RBH, RBN>,
) -> Result<Option<HeaderId<PBH, PBN>>, SubstrateError>;
}
#[async_trait]
impl<'a, P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>
SelectHeadersToProveEnvironment<
BlockNumberOf<P::SourceRelayChain>,
HashOf<P::SourceRelayChain>,
BlockNumberOf<P::SourceParachain>,
HashOf<P::SourceParachain>,
>
for (
&'a OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>,
&'a ParachainsSource<P, SourceRelayClnt>,
)
where
SourceRelayClnt: Client<P::SourceRelayChain>,
TargetClnt: Client<P::TargetChain>,
{
fn parachain_id(&self) -> ParaId {
ParaId(P::SourceParachain::PARACHAIN_ID)
}
async fn best_finalized_relay_block_at_source(
&self,
) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
Ok(self.0.source_relay_client.best_finalized_header().await?.id())
}
async fn best_finalized_relay_block_at_target(
&self,
) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
Ok(crate::messages::source::read_client_state::<P::TargetChain, P::SourceRelayChain>(
&self.0.target_client,
)
.await?
.best_finalized_peer_at_best_self
.ok_or(SubstrateError::BridgePalletIsNotInitialized)?)
}
async fn best_finalized_para_block_at_source(
&self,
at_relay_block: HeaderIdOf<P::SourceRelayChain>,
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
self.1.on_chain_para_head_id(at_relay_block).await
}
}
async fn select_headers_to_prove<RBN, RBH, PBN, PBH>(
env: impl SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH>,
required_parachain_header: PBN,
) -> Result<(bool, HeaderId<RBH, RBN>, HeaderId<PBH, PBN>), SubstrateError>
where
RBH: Copy,
RBN: BlockNumberBase,
PBH: Copy,
PBN: BlockNumberBase,
{
let best_finalized_relay_block_at_source = env.best_finalized_relay_block_at_source().await?;
let best_finalized_relay_block_at_target = env.best_finalized_relay_block_at_target().await?;
let best_possible_parachain_block = env
.best_finalized_para_block_at_source(best_finalized_relay_block_at_source)
.await?
.filter(|best_possible_parachain_block| {
best_possible_parachain_block.number() >= required_parachain_header
})
.ok_or(SubstrateError::MissingRequiredParachainHead(
env.parachain_id(),
required_parachain_header.unique_saturated_into(),
))?;
let may_use_state_at_best_finalized_relay_block_at_target = !is_ancient_block(
best_finalized_relay_block_at_target.number(),
best_finalized_relay_block_at_source.number(),
);
let selection = if may_use_state_at_best_finalized_relay_block_at_target {
env.best_finalized_para_block_at_source(best_finalized_relay_block_at_target)
.await?
.filter(|best_finalized_para_block_at_target| {
best_finalized_para_block_at_target.number() >= required_parachain_header
})
.map(|best_finalized_para_block_at_target| {
(false, best_finalized_relay_block_at_target, best_finalized_para_block_at_target)
})
} else {
None
};
Ok(selection.unwrap_or((
true,
best_finalized_relay_block_at_source,
best_possible_parachain_block,
)))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn relay_waits_for_relay_header_to_be_delivered() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: Some(700),
para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
},
RelayState::RelayingRelayHeader(750),
),
RelayState::RelayingRelayHeader(750),
);
}
#[test]
fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: Some(750),
para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
},
RelayState::RelayingRelayHeader(750),
),
RelayState::RelayingParaHeader(HeaderId(100, 100)),
);
}
#[test]
fn relay_selects_better_para_header_after_better_relay_header_is_delivered() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: Some(780),
para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
},
RelayState::RelayingRelayHeader(750),
),
RelayState::RelayingParaHeader(HeaderId(105, 105)),
);
}
#[test]
fn relay_waits_for_para_header_to_be_delivered() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: Some(50),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: Some(780),
para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
},
RelayState::RelayingParaHeader(HeaderId(105, 105)),
),
RelayState::RelayingParaHeader(HeaderId(105, 105)),
);
}
#[test]
fn relay_stays_idle_if_required_para_header_is_already_delivered() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 90,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: Some(780),
para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
},
RelayState::Idle,
),
RelayState::Idle,
);
}
#[test]
fn relay_waits_for_required_para_header_to_appear_at_source_1() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: Some(105),
para_header_at_source: None,
relay_header_at_source: 800,
relay_header_at_target: Some(780),
para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
},
RelayState::Idle,
),
RelayState::Idle,
);
}
#[test]
fn relay_waits_for_required_para_header_to_appear_at_source_2() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(110, 110)),
relay_header_at_source: 800,
relay_header_at_target: Some(780),
para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
},
RelayState::Idle,
),
RelayState::Idle,
);
}
#[test]
fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: Some(780),
para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
},
RelayState::Idle,
),
RelayState::RelayingRelayHeader(800),
);
}
#[test]
fn relay_starts_relaying_para_header_when_new_para_header_is_requested() {
assert_eq!(
select_headers_to_relay(
&RelayData {
required_para_header: 120,
para_header_at_target: Some(105),
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: Some(800),
para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
},
RelayState::Idle,
),
RelayState::RelayingParaHeader(HeaderId(125, 125)),
);
}
#[test]
fn relay_goes_idle_when_parachain_is_deregistered() {
assert_eq!(
select_headers_to_relay::<i32, _, _>(
&RelayData {
required_para_header: 120,
para_header_at_target: Some(105),
para_header_at_source: None,
relay_header_at_source: 800,
relay_header_at_target: Some(800),
para_header_at_relay_header_at_target: None,
},
RelayState::RelayingRelayHeader(800),
),
RelayState::Idle,
);
}
#[test]
fn relay_starts_relaying_first_parachain_header() {
assert_eq!(
select_headers_to_relay::<i32, _, _>(
&RelayData {
required_para_header: 0,
para_header_at_target: None,
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: Some(800),
para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
},
RelayState::Idle,
),
RelayState::RelayingParaHeader(HeaderId(125, 125)),
);
}
#[test]
fn relay_starts_relaying_relay_header_to_relay_first_parachain_header() {
assert_eq!(
select_headers_to_relay::<i32, _, _>(
&RelayData {
required_para_header: 0,
para_header_at_target: None,
para_header_at_source: Some(HeaderId(125, 125)),
relay_header_at_source: 800,
relay_header_at_target: Some(700),
para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
},
RelayState::Idle,
),
RelayState::RelayingRelayHeader(800),
);
}
#[async_trait]
impl SelectHeadersToProveEnvironment<u32, u32, u32, u32> for (u32, u32, u32, u32) {
fn parachain_id(&self) -> ParaId {
ParaId(0)
}
async fn best_finalized_relay_block_at_source(
&self,
) -> Result<HeaderId<u32, u32>, SubstrateError> {
Ok(HeaderId(self.0, self.0))
}
async fn best_finalized_relay_block_at_target(
&self,
) -> Result<HeaderId<u32, u32>, SubstrateError> {
Ok(HeaderId(self.1, self.1))
}
async fn best_finalized_para_block_at_source(
&self,
at_relay_block: HeaderId<u32, u32>,
) -> Result<Option<HeaderId<u32, u32>>, SubstrateError> {
if at_relay_block.0 == self.0 {
Ok(Some(HeaderId(self.2, self.2)))
} else if at_relay_block.0 == self.1 {
Ok(Some(HeaderId(self.3, self.3)))
} else {
Ok(None)
}
}
}
#[async_std::test]
async fn select_headers_to_prove_returns_err_if_required_para_block_is_missing_at_source() {
assert!(matches!(
select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 300_u32,).await,
Err(SubstrateError::MissingRequiredParachainHead(ParaId(0), 300_u64)),
));
}
#[async_std::test]
async fn select_headers_to_prove_fails_to_use_existing_ancient_relay_block() {
assert_eq!(
select_headers_to_prove((220_u32, 10_u32, 200_u32, 100_u32), 100_u32,)
.await
.map_err(drop),
Ok((true, HeaderId(220, 220), HeaderId(200, 200))),
);
}
#[async_std::test]
async fn select_headers_to_prove_is_able_to_use_existing_recent_relay_block() {
assert_eq!(
select_headers_to_prove((40_u32, 10_u32, 200_u32, 100_u32), 100_u32,)
.await
.map_err(drop),
Ok((false, HeaderId(10, 10), HeaderId(100, 100))),
);
}
#[async_std::test]
async fn select_headers_to_prove_uses_new_relay_block() {
assert_eq!(
select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 200_u32,)
.await
.map_err(drop),
Ok((true, HeaderId(20, 20), HeaderId(200, 200))),
);
}
}