use crate::{
finality_base::best_synced_header_id,
messages::{
BatchProofTransaction, MessageLaneAdapter, ReceiveMessagesDeliveryProofCallBuilder,
SubstrateMessageLane,
},
on_demand::OnDemandRelay,
proofs::to_raw_storage_proof,
TransactionParams,
};
use async_std::sync::Arc;
use async_trait::async_trait;
use bp_messages::{
storage_keys::{operating_mode_key, outbound_lane_data_key},
target_chain::FromBridgedChainMessagesProof,
ChainWithMessages as _, InboundMessageDetails, MessageNonce, MessagePayload,
MessagesOperatingMode, OutboundMessageDetails,
};
use bp_runtime::{BasicOperatingMode, HeaderIdProvider, RangeInclusiveExt};
use codec::{Decode, Encode};
use frame_support::weights::Weight;
use messages_relay::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{
ClientState, MessageDetails, MessageDetailsMap, MessageProofParameters, SourceClient,
SourceClientState,
},
};
use num_traits::Zero;
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, Chain, ChainWithMessages, Client,
Error as SubstrateError, HashOf, HeaderIdOf, TransactionEra, TransactionTracker,
UnsignedTransaction,
};
use relay_utils::relay_loop::Client as RelayClient;
use sp_core::Pair;
use std::ops::RangeInclusive;
pub type SubstrateMessagesProof<C, L> = (Weight, FromBridgedChainMessagesProof<HashOf<C>, L>);
type MessagesToRefine<'a> = Vec<(MessagePayload, &'a mut OutboundMessageDetails)>;
#[derive(Decode)]
struct LegacyOutboundLaneData {
#[allow(unused)]
oldest_unpruned_nonce: MessageNonce,
latest_received_nonce: MessageNonce,
latest_generated_nonce: MessageNonce,
}
pub struct SubstrateMessagesSource<P: SubstrateMessageLane, SourceClnt, TargetClnt> {
source_client: SourceClnt,
target_client: TargetClnt,
lane_id: P::LaneId,
transaction_params: TransactionParams<AccountKeyPairOf<P::SourceChain>>,
target_to_source_headers_relay: Option<Arc<dyn OnDemandRelay<P::TargetChain, P::SourceChain>>>,
}
impl<P: SubstrateMessageLane, SourceClnt: Client<P::SourceChain>, TargetClnt>
SubstrateMessagesSource<P, SourceClnt, TargetClnt>
{
pub fn new(
source_client: SourceClnt,
target_client: TargetClnt,
lane_id: P::LaneId,
transaction_params: TransactionParams<AccountKeyPairOf<P::SourceChain>>,
target_to_source_headers_relay: Option<
Arc<dyn OnDemandRelay<P::TargetChain, P::SourceChain>>,
>,
) -> Self {
SubstrateMessagesSource {
source_client,
target_client,
lane_id,
transaction_params,
target_to_source_headers_relay,
}
}
async fn outbound_lane_data(
&self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Option<LegacyOutboundLaneData>, SubstrateError> {
self.source_client
.storage_value(
id.hash(),
outbound_lane_data_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
),
)
.await
}
async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> {
ensure_messages_pallet_active::<P::SourceChain, P::TargetChain, _>(&self.source_client)
.await
}
}
impl<P: SubstrateMessageLane, SourceClnt: Clone, TargetClnt: Clone> Clone
for SubstrateMessagesSource<P, SourceClnt, TargetClnt>
{
fn clone(&self) -> Self {
Self {
source_client: self.source_client.clone(),
target_client: self.target_client.clone(),
lane_id: self.lane_id,
transaction_params: self.transaction_params.clone(),
target_to_source_headers_relay: self.target_to_source_headers_relay.clone(),
}
}
}
#[async_trait]
impl<
P: SubstrateMessageLane,
SourceClnt: Client<P::SourceChain>,
TargetClnt: Client<P::TargetChain>,
> RelayClient for SubstrateMessagesSource<P, SourceClnt, TargetClnt>
{
type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
self.source_client.reconnect().await?;
self.target_client.reconnect().await?;
if let Some(ref mut target_to_source_headers_relay) = self.target_to_source_headers_relay {
target_to_source_headers_relay.reconnect().await?;
}
Ok(())
}
}
#[async_trait]
impl<
P: SubstrateMessageLane,
SourceClnt: Client<P::SourceChain>,
TargetClnt: Client<P::TargetChain>,
> SourceClient<MessageLaneAdapter<P>> for SubstrateMessagesSource<P, SourceClnt, TargetClnt>
where
AccountIdOf<P::SourceChain>: From<<AccountKeyPairOf<P::SourceChain> as Pair>::Public>,
{
type BatchTransaction =
BatchProofTransaction<P::SourceChain, P::TargetChain, P::SourceBatchCallBuilder>;
type TransactionTracker = TransactionTracker<P::SourceChain, SourceClnt>;
async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
self.source_client.ensure_synced().await?;
self.target_client.ensure_synced().await?;
self.ensure_pallet_active().await?;
read_client_state_from_both_chains(&self.source_client, &self.target_client).await
}
async fn latest_generated_nonce(
&self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<(SourceHeaderIdOf<MessageLaneAdapter<P>>, MessageNonce), SubstrateError> {
let latest_generated_nonce = self
.outbound_lane_data(id)
.await?
.map(|data| data.latest_generated_nonce)
.unwrap_or(0);
Ok((id, latest_generated_nonce))
}
async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<(SourceHeaderIdOf<MessageLaneAdapter<P>>, MessageNonce), SubstrateError> {
let latest_received_nonce = self
.outbound_lane_data(id)
.await?
.map(|data| data.latest_received_nonce)
.unwrap_or(0);
Ok((id, latest_received_nonce))
}
async fn generated_message_details(
&self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
nonces: RangeInclusive<MessageNonce>,
) -> Result<MessageDetailsMap<BalanceOf<P::SourceChain>>, SubstrateError> {
let mut out_msgs_details: Vec<_> = self
.source_client
.state_call::<_, Vec<_>>(
id.hash(),
P::TargetChain::TO_CHAIN_MESSAGE_DETAILS_METHOD.into(),
(self.lane_id, *nonces.start(), *nonces.end()),
)
.await?;
validate_out_msgs_details::<P::SourceChain>(&out_msgs_details, nonces)?;
let mut msgs_to_refine = vec![];
for out_msg_details in out_msgs_details.iter_mut() {
let msg_key = bp_messages::storage_keys::message_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
out_msg_details.nonce,
);
let msg_payload: MessagePayload =
self.source_client.storage_value(id.hash(), msg_key).await?.ok_or_else(|| {
SubstrateError::Custom(format!(
"Message to {} {:?}/{} is missing from runtime the storage of {} at {:?}",
P::TargetChain::NAME,
self.lane_id,
out_msg_details.nonce,
P::SourceChain::NAME,
id,
))
})?;
msgs_to_refine.push((msg_payload, out_msg_details));
}
let best_target_header_hash = self.target_client.best_header_hash().await?;
for mut msgs_to_refine_batch in split_msgs_to_refine::<
P::SourceChain,
P::TargetChain,
P::LaneId,
>(self.lane_id, msgs_to_refine)?
{
let in_msgs_details = self
.target_client
.state_call::<_, Vec<InboundMessageDetails>>(
best_target_header_hash,
P::SourceChain::FROM_CHAIN_MESSAGE_DETAILS_METHOD.into(),
(self.lane_id, &msgs_to_refine_batch),
)
.await?;
if in_msgs_details.len() != msgs_to_refine_batch.len() {
return Err(SubstrateError::Custom(format!(
"Call of {} at {} has returned {} entries instead of expected {}",
P::SourceChain::FROM_CHAIN_MESSAGE_DETAILS_METHOD,
P::TargetChain::NAME,
in_msgs_details.len(),
msgs_to_refine_batch.len(),
)))
}
for ((_, out_msg_details), in_msg_details) in
msgs_to_refine_batch.iter_mut().zip(in_msgs_details)
{
log::trace!(
target: "bridge",
"Refined weight of {}->{} message {:?}/{}: at-source: {}, at-target: {}",
P::SourceChain::NAME,
P::TargetChain::NAME,
self.lane_id,
out_msg_details.nonce,
out_msg_details.dispatch_weight,
in_msg_details.dispatch_weight,
);
out_msg_details.dispatch_weight = in_msg_details.dispatch_weight;
}
}
let mut msgs_details_map = MessageDetailsMap::new();
for out_msg_details in out_msgs_details {
msgs_details_map.insert(
out_msg_details.nonce,
MessageDetails {
dispatch_weight: out_msg_details.dispatch_weight,
size: out_msg_details.size as _,
reward: Zero::zero(),
},
);
}
Ok(msgs_details_map)
}
async fn prove_messages(
&self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
nonces: RangeInclusive<MessageNonce>,
proof_parameters: MessageProofParameters,
) -> Result<
(
SourceHeaderIdOf<MessageLaneAdapter<P>>,
RangeInclusive<MessageNonce>,
<MessageLaneAdapter<P> as MessageLane>::MessagesProof,
),
SubstrateError,
> {
let mut storage_keys = Vec::with_capacity(nonces.saturating_len() as usize);
for message_nonce in nonces.clone() {
let message_key = bp_messages::storage_keys::message_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
message_nonce,
);
storage_keys.push(message_key);
}
if proof_parameters.outbound_state_proof_required {
storage_keys.push(outbound_lane_data_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
));
}
let storage_proof =
self.source_client.prove_storage(id.hash(), storage_keys.clone()).await?;
let proof = FromBridgedChainMessagesProof {
bridged_header_hash: id.1,
storage_proof: to_raw_storage_proof::<P::SourceChain>(storage_proof),
lane: self.lane_id,
nonces_start: *nonces.start(),
nonces_end: *nonces.end(),
};
Ok((id, nonces, (proof_parameters.dispatch_weight, proof)))
}
async fn submit_messages_receiving_proof(
&self,
maybe_batch_tx: Option<Self::BatchTransaction>,
_generated_at_block: TargetHeaderIdOf<MessageLaneAdapter<P>>,
proof: <MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof,
) -> Result<Self::TransactionTracker, SubstrateError> {
let messages_proof_call =
P::ReceiveMessagesDeliveryProofCallBuilder::build_receive_messages_delivery_proof_call(
proof,
maybe_batch_tx.is_none(),
);
let final_call = match maybe_batch_tx {
Some(batch_tx) => batch_tx.append_call_and_build(messages_proof_call),
None => messages_proof_call,
};
let transaction_params = self.transaction_params.clone();
self.source_client
.submit_and_watch_signed_extrinsic(
&self.transaction_params.signer,
move |best_block_id, transaction_nonce| {
Ok(UnsignedTransaction::new(final_call.into(), transaction_nonce)
.era(TransactionEra::new(best_block_id, transaction_params.mortality)))
},
)
.await
}
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Option<Self::BatchTransaction>, SubstrateError> {
if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
if let Some(batch_tx) =
BatchProofTransaction::new(target_to_source_headers_relay.clone(), id.0).await?
{
return Ok(Some(batch_tx))
}
target_to_source_headers_relay.require_more_headers(id.0).await;
}
Ok(None)
}
}
pub(crate) async fn ensure_messages_pallet_active<AtChain, WithChain, AtChainClient>(
client: &AtChainClient,
) -> Result<(), SubstrateError>
where
AtChain: ChainWithMessages,
WithChain: ChainWithMessages,
AtChainClient: Client<AtChain>,
{
let operating_mode = client
.storage_value(
client.best_header_hash().await?,
operating_mode_key(WithChain::WITH_CHAIN_MESSAGES_PALLET_NAME),
)
.await?;
let is_halted =
operating_mode == Some(MessagesOperatingMode::Basic(BasicOperatingMode::Halted));
if is_halted {
Err(SubstrateError::BridgePalletIsHalted)
} else {
Ok(())
}
}
pub async fn read_client_state<SelfChain, PeerChain>(
self_client: &impl Client<SelfChain>,
) -> Result<ClientState<HeaderIdOf<SelfChain>, HeaderIdOf<PeerChain>>, SubstrateError>
where
SelfChain: Chain,
PeerChain: Chain,
{
let self_best_finalized_id = self_client.best_finalized_header().await?.id();
let self_best_id = self_client.best_header().await?.id();
let peer_on_self_best_finalized_id =
best_synced_header_id::<PeerChain, SelfChain>(self_client, self_best_id.hash()).await?;
Ok(ClientState {
best_self: self_best_id,
best_finalized_self: self_best_finalized_id,
best_finalized_peer_at_best_self: peer_on_self_best_finalized_id,
actual_best_finalized_peer_at_best_self: peer_on_self_best_finalized_id,
})
}
pub async fn read_client_state_from_both_chains<SelfChain, PeerChain>(
self_client: &impl Client<SelfChain>,
peer_client: &impl Client<PeerChain>,
) -> Result<ClientState<HeaderIdOf<SelfChain>, HeaderIdOf<PeerChain>>, SubstrateError>
where
SelfChain: Chain,
PeerChain: Chain,
{
let mut client_state = read_client_state::<SelfChain, PeerChain>(self_client).await?;
client_state.actual_best_finalized_peer_at_best_self =
match client_state.best_finalized_peer_at_best_self.as_ref() {
Some(peer_on_self_best_finalized_id) => {
let actual_peer_on_self_best_finalized =
peer_client.header_by_number(peer_on_self_best_finalized_id.number()).await?;
Some(actual_peer_on_self_best_finalized.id())
},
_ => client_state.best_finalized_peer_at_best_self,
};
Ok(client_state)
}
pub async fn best_finalized_peer_header_at_self<SelfChain, PeerChain>(
self_client: &impl Client<SelfChain>,
at_self_hash: HashOf<SelfChain>,
) -> Result<Option<HeaderIdOf<PeerChain>>, SubstrateError>
where
SelfChain: Chain,
PeerChain: Chain,
{
self_client
.state_call::<_, Option<_>>(
at_self_hash,
PeerChain::BEST_FINALIZED_HEADER_ID_METHOD.into(),
(),
)
.await
}
fn validate_out_msgs_details<C: Chain>(
out_msgs_details: &[OutboundMessageDetails],
nonces: RangeInclusive<MessageNonce>,
) -> Result<(), SubstrateError> {
let make_missing_nonce_error = |expected_nonce| {
Err(SubstrateError::Custom(format!(
"Missing nonce {expected_nonce} in message_details call result. Expected all nonces from {nonces:?}",
)))
};
if out_msgs_details.len() > nonces.clone().count() {
return Err(SubstrateError::Custom(
"More messages than requested returned by the message_details call.".into(),
))
}
if out_msgs_details.is_empty() && !nonces.is_empty() {
return make_missing_nonce_error(*nonces.end())
}
let mut nonces_iter = nonces.clone().rev().peekable();
let mut out_msgs_details_iter = out_msgs_details.iter().rev();
while let Some((out_msg_details, &nonce)) = out_msgs_details_iter.next().zip(nonces_iter.peek())
{
nonces_iter.next();
if out_msg_details.nonce != nonce {
return make_missing_nonce_error(nonce)
}
}
if nonces_iter.peek().is_some() {
log::info!(
target: "bridge",
"Some messages are missing from the {} node: {:?}. Target node may be out of sync?",
C::NAME,
nonces_iter.rev().collect::<Vec<_>>(),
);
}
Ok(())
}
fn split_msgs_to_refine<Source: Chain + ChainWithMessages, Target: Chain, LaneId: Encode + Copy>(
lane_id: LaneId,
msgs_to_refine: MessagesToRefine,
) -> Result<Vec<MessagesToRefine>, SubstrateError> {
let max_batch_size = Target::max_extrinsic_size() as usize;
let mut batches = vec![];
let mut current_msgs_batch = msgs_to_refine;
while !current_msgs_batch.is_empty() {
let mut next_msgs_batch = vec![];
while (lane_id, ¤t_msgs_batch).encoded_size() > max_batch_size {
if current_msgs_batch.len() <= 1 {
return Err(SubstrateError::Custom(format!(
"Call of {} at {} can't be executed even if only one message is supplied. \
max_extrinsic_size(): {}",
Source::FROM_CHAIN_MESSAGE_DETAILS_METHOD,
Target::NAME,
Target::max_extrinsic_size(),
)))
}
if let Some(msg) = current_msgs_batch.pop() {
next_msgs_batch.insert(0, msg);
}
}
batches.push(current_msgs_batch);
current_msgs_batch = next_msgs_batch;
}
Ok(batches)
}
#[cfg(test)]
mod tests {
use super::*;
use bp_messages::{HashedLaneId, LaneIdType};
use relay_substrate_client::test_chain::TestChain;
type TestLaneIdType = HashedLaneId;
fn message_details_from_rpc(
nonces: RangeInclusive<MessageNonce>,
) -> Vec<OutboundMessageDetails> {
nonces
.into_iter()
.map(|nonce| bp_messages::OutboundMessageDetails {
nonce,
dispatch_weight: Weight::zero(),
size: 0,
})
.collect()
}
#[test]
fn validate_out_msgs_details_succeeds_if_no_messages_are_missing() {
assert!(validate_out_msgs_details::<TestChain>(&message_details_from_rpc(1..=3), 1..=3,)
.is_ok());
}
#[test]
fn validate_out_msgs_details_succeeds_if_head_messages_are_missing() {
assert!(validate_out_msgs_details::<TestChain>(&message_details_from_rpc(2..=3), 1..=3,)
.is_ok())
}
#[test]
fn validate_out_msgs_details_fails_if_mid_messages_are_missing() {
let mut message_details_from_rpc = message_details_from_rpc(1..=3);
message_details_from_rpc.remove(1);
assert!(matches!(
validate_out_msgs_details::<TestChain>(&message_details_from_rpc, 1..=3,),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn validate_out_msgs_details_map_fails_if_tail_messages_are_missing() {
assert!(matches!(
validate_out_msgs_details::<TestChain>(&message_details_from_rpc(1..=2), 1..=3,),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn validate_out_msgs_details_fails_if_all_messages_are_missing() {
assert!(matches!(
validate_out_msgs_details::<TestChain>(&[], 1..=3),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn validate_out_msgs_details_fails_if_more_messages_than_nonces() {
assert!(matches!(
validate_out_msgs_details::<TestChain>(&message_details_from_rpc(1..=5), 2..=5,),
Err(SubstrateError::Custom(_))
));
}
fn check_split_msgs_to_refine(
payload_sizes: Vec<usize>,
expected_batches: Result<Vec<usize>, ()>,
) {
let mut out_msgs_details = vec![];
for (idx, _) in payload_sizes.iter().enumerate() {
out_msgs_details.push(OutboundMessageDetails {
nonce: idx as MessageNonce,
dispatch_weight: Weight::zero(),
size: 0,
});
}
let mut msgs_to_refine = vec![];
for (&payload_size, out_msg_details) in
payload_sizes.iter().zip(out_msgs_details.iter_mut())
{
let payload = vec![1u8; payload_size];
msgs_to_refine.push((payload, out_msg_details));
}
let maybe_batches = split_msgs_to_refine::<TestChain, TestChain, TestLaneIdType>(
TestLaneIdType::try_new(1, 2).unwrap(),
msgs_to_refine,
);
match expected_batches {
Ok(expected_batches) => {
let batches = maybe_batches.unwrap();
let mut idx = 0;
assert_eq!(batches.len(), expected_batches.len());
for (batch, &expected_batch_size) in batches.iter().zip(expected_batches.iter()) {
assert_eq!(batch.len(), expected_batch_size);
for msg_to_refine in batch {
assert_eq!(msg_to_refine.0.len(), payload_sizes[idx]);
idx += 1;
}
}
},
Err(_) => {
matches!(maybe_batches, Err(SubstrateError::Custom(_)));
},
}
}
#[test]
fn test_split_msgs_to_refine() {
let max_extrinsic_size = 100000;
check_split_msgs_to_refine(vec![max_extrinsic_size], Err(()));
check_split_msgs_to_refine(vec![50, 100, max_extrinsic_size, 200], Err(()));
check_split_msgs_to_refine(vec![100, 200, 300, 400], Ok(vec![4]));
check_split_msgs_to_refine(
vec![
50,
100,
max_extrinsic_size - 500,
500,
1000,
1500,
max_extrinsic_size - 3500,
5000,
10000,
],
Ok(vec![3, 4, 2]),
);
check_split_msgs_to_refine(
vec![
50,
100,
max_extrinsic_size - 150,
500,
1000,
1500,
max_extrinsic_size - 3000,
5000,
10000,
],
Ok(vec![2, 1, 3, 1, 2]),
);
check_split_msgs_to_refine(
vec![
5000,
10000,
max_extrinsic_size - 3500,
500,
1000,
1500,
max_extrinsic_size - 500,
50,
100,
],
Ok(vec![2, 4, 3]),
);
}
#[test]
fn outbound_lane_data_wrapper_is_compatible() {
let bytes_without_state =
vec![1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0];
let bytes_with_state = {
let mut b = bytes_without_state.clone();
b.push(0);
b
};
let full = bp_messages::OutboundLaneData {
oldest_unpruned_nonce: 1,
latest_received_nonce: 2,
latest_generated_nonce: 3,
state: bp_messages::LaneState::Opened,
};
assert_eq!(full.encode(), bytes_with_state);
assert_ne!(full.encode(), bytes_without_state);
let decoded: LegacyOutboundLaneData = Decode::decode(&mut &bytes_with_state[..]).unwrap();
assert_eq!(full.oldest_unpruned_nonce, decoded.oldest_unpruned_nonce);
assert_eq!(full.latest_received_nonce, decoded.latest_received_nonce);
assert_eq!(full.latest_generated_nonce, decoded.latest_generated_nonce);
let decoded: LegacyOutboundLaneData =
Decode::decode(&mut &bytes_without_state[..]).unwrap();
assert_eq!(full.oldest_unpruned_nonce, decoded.oldest_unpruned_nonce);
assert_eq!(full.latest_received_nonce, decoded.latest_received_nonce);
assert_eq!(full.latest_generated_nonce, decoded.latest_generated_nonce);
}
}