use crate::{Config, Pallet, LOG_TARGET};
use crate::{BridgeOf, Bridges};
use bp_messages::{
source_chain::{MessagesBridge, OnMessagesDelivered},
MessageNonce,
};
use bp_xcm_bridge_hub::{BridgeId, BridgeState, LocalXcmChannelManager, XcmAsPlainPayload};
use frame_support::{ensure, traits::Get};
use pallet_bridge_messages::{
Config as BridgeMessagesConfig, Error, Pallet as BridgeMessagesPallet,
};
use xcm::prelude::*;
use xcm_builder::{HaulBlob, HaulBlobError, HaulBlobExporter};
use xcm_executor::traits::ExportXcm;
const OUTBOUND_LANE_CONGESTED_THRESHOLD: MessageNonce = 8_192;
const OUTBOUND_LANE_UNCONGESTED_THRESHOLD: MessageNonce = 1_024;
pub type PalletAsHaulBlobExporter<T, I> = HaulBlobExporter<
DummyHaulBlob,
<T as Config<I>>::BridgedNetwork,
<T as Config<I>>::DestinationVersion,
<T as Config<I>>::MessageExportPrice,
>;
type MessagesPallet<T, I> = BridgeMessagesPallet<T, <T as Config<I>>::BridgeMessagesPalletInstance>;
impl<T: Config<I>, I: 'static> ExportXcm for Pallet<T, I>
where
T: BridgeMessagesConfig<T::BridgeMessagesPalletInstance, OutboundPayload = XcmAsPlainPayload>,
{
type Ticket = (
BridgeId,
BridgeOf<T, I>,
<MessagesPallet<T, I> as MessagesBridge<T::OutboundPayload, T::LaneId>>::SendMessageArgs,
XcmHash,
);
fn validate(
network: NetworkId,
channel: u32,
universal_source: &mut Option<InteriorLocation>,
destination: &mut Option<InteriorLocation>,
message: &mut Option<Xcm<()>>,
) -> Result<(Self::Ticket, Assets), SendError> {
log::trace!(
target: LOG_TARGET,
"Validate for network: {network:?}, channel: {channel:?}, universal_source: {universal_source:?}, destination: {destination:?}"
);
let bridge_origin_universal_location =
universal_source.clone().take().ok_or(SendError::MissingArgument)?;
let bridge_destination_universal_location = {
let dest = destination.clone().take().ok_or(SendError::MissingArgument)?;
match dest.global_consensus() {
Ok(dest_network) => {
log::trace!(
target: LOG_TARGET,
"Destination: {dest:?} is already universal, checking dest_network: {dest_network:?} and network: {network:?} if matches: {:?}",
dest_network == network
);
ensure!(dest_network == network, SendError::NotApplicable);
dest
},
Err(_) => {
dest.pushed_front_with(GlobalConsensus(network)).map_err(|error_data| {
log::error!(
target: LOG_TARGET,
"Destination: {:?} is not a universal and prepending with {:?} failed!",
error_data.0,
error_data.1,
);
SendError::NotApplicable
})?
},
}
};
let bridge_origin_relative_location =
bridge_origin_universal_location.relative_to(&T::UniversalLocation::get());
let locations = Self::bridge_locations(
bridge_origin_relative_location,
bridge_destination_universal_location.into(),
)
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"Validate `bridge_locations` with error: {e:?}",
);
SendError::NotApplicable
})?;
let bridge = Self::bridge(locations.bridge_id()).ok_or_else(|| {
log::error!(
target: LOG_TARGET,
"No opened bridge for requested bridge_origin_relative_location: {:?} and bridge_destination_universal_location: {:?}",
locations.bridge_origin_relative_location(),
locations.bridge_destination_universal_location(),
);
SendError::NotApplicable
})?;
let ((blob, id), price) = PalletAsHaulBlobExporter::<T, I>::validate(
network,
channel,
universal_source,
destination,
message,
)?;
let bridge_message = MessagesPallet::<T, I>::validate_message(bridge.lane_id, &blob)
.map_err(|e| {
match e {
Error::LanesManager(ref ei) =>
log::error!(target: LOG_TARGET, "LanesManager: {ei:?}"),
Error::MessageRejectedByPallet(ref ei) =>
log::error!(target: LOG_TARGET, "MessageRejectedByPallet: {ei:?}"),
Error::ReceptionConfirmation(ref ei) =>
log::error!(target: LOG_TARGET, "ReceptionConfirmation: {ei:?}"),
_ => (),
};
log::error!(
target: LOG_TARGET,
"XCM message {:?} cannot be exported because of bridge error: {:?} on bridge {:?} and laneId: {:?}",
id,
e,
locations,
bridge.lane_id,
);
SendError::Transport("BridgeValidateError")
})?;
Ok(((*locations.bridge_id(), bridge, bridge_message, id), price))
}
fn deliver(
(bridge_id, bridge, bridge_message, id): Self::Ticket,
) -> Result<XcmHash, SendError> {
let artifacts = MessagesPallet::<T, I>::send_message(bridge_message);
log::info!(
target: LOG_TARGET,
"XCM message {:?} has been enqueued at bridge {:?} and lane_id: {:?} with nonce {}",
id,
bridge_id,
bridge.lane_id,
artifacts.nonce,
);
Self::on_bridge_message_enqueued(bridge_id, bridge, artifacts.enqueued_messages);
Ok(id)
}
}
impl<T: Config<I>, I: 'static> OnMessagesDelivered<T::LaneId> for Pallet<T, I> {
fn on_messages_delivered(lane_id: T::LaneId, enqueued_messages: MessageNonce) {
Self::on_bridge_messages_delivered(lane_id, enqueued_messages);
}
}
impl<T: Config<I>, I: 'static> Pallet<T, I> {
fn on_bridge_message_enqueued(
bridge_id: BridgeId,
bridge: BridgeOf<T, I>,
enqueued_messages: MessageNonce,
) {
let is_congested = enqueued_messages > OUTBOUND_LANE_CONGESTED_THRESHOLD;
if !is_congested {
return
}
if bridge.state == BridgeState::Suspended {
return
}
let bridge_origin_relative_location = match bridge.bridge_origin_relative_location.try_as()
{
Ok(bridge_origin_relative_location) => bridge_origin_relative_location,
Err(_) => {
log::debug!(
target: LOG_TARGET,
"Failed to convert the bridge {:?} origin location {:?}",
bridge_id,
bridge.bridge_origin_relative_location,
);
return
},
};
let suspend_result =
T::LocalXcmChannelManager::suspend_bridge(bridge_origin_relative_location, bridge_id);
match suspend_result {
Ok(_) => {
log::debug!(
target: LOG_TARGET,
"Suspended the bridge {:?}, originated by the {:?}",
bridge_id,
bridge.bridge_origin_relative_location,
);
},
Err(e) => {
log::debug!(
target: LOG_TARGET,
"Failed to suspended the bridge {:?}, originated by the {:?}: {:?}",
bridge_id,
bridge.bridge_origin_relative_location,
e,
);
return
},
}
Bridges::<T, I>::mutate_extant(bridge_id, |bridge| {
bridge.state = BridgeState::Suspended;
});
}
fn on_bridge_messages_delivered(lane_id: T::LaneId, enqueued_messages: MessageNonce) {
let is_congested = enqueued_messages > OUTBOUND_LANE_UNCONGESTED_THRESHOLD;
if is_congested {
return
}
let (bridge_id, bridge) = match Self::bridge_by_lane_id(&lane_id) {
Some(bridge) if bridge.1.state == BridgeState::Suspended => bridge,
_ => {
return
},
};
let bridge_origin_relative_location = (*bridge.bridge_origin_relative_location).try_into();
let bridge_origin_relative_location = match bridge_origin_relative_location {
Ok(bridge_origin_relative_location) => bridge_origin_relative_location,
Err(e) => {
log::debug!(
target: LOG_TARGET,
"Failed to convert the bridge {:?} location for lane_id: {:?}, error {:?}",
bridge_id,
lane_id,
e,
);
return
},
};
let resume_result =
T::LocalXcmChannelManager::resume_bridge(&bridge_origin_relative_location, bridge_id);
match resume_result {
Ok(_) => {
log::debug!(
target: LOG_TARGET,
"Resumed the bridge {:?} and lane_id: {:?}, originated by the {:?}",
bridge_id,
lane_id,
bridge_origin_relative_location,
);
},
Err(e) => {
log::debug!(
target: LOG_TARGET,
"Failed to resume the bridge {:?} and lane_id: {:?}, originated by the {:?}: {:?}",
bridge_id,
lane_id,
bridge_origin_relative_location,
e,
);
return
},
}
Bridges::<T, I>::mutate_extant(bridge_id, |bridge| {
bridge.state = BridgeState::Opened;
});
}
}
pub struct DummyHaulBlob;
impl HaulBlob for DummyHaulBlob {
fn haul_blob(_blob: XcmAsPlainPayload) -> Result<(), HaulBlobError> {
Err(HaulBlobError::Transport("DummyHaulBlob"))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{mock::*, Bridges, LaneToBridge, LanesManagerOf};
use bp_runtime::RangeInclusiveExt;
use bp_xcm_bridge_hub::{Bridge, BridgeLocations, BridgeState};
use frame_support::{assert_ok, traits::EnsureOrigin};
use pallet_bridge_messages::InboundLaneStorage;
use xcm_builder::{NetworkExportTable, UnpaidRemoteExporter};
use xcm_executor::traits::{export_xcm, ConvertLocation};
fn universal_source() -> InteriorLocation {
SiblingUniversalLocation::get()
}
fn bridged_relative_destination() -> InteriorLocation {
BridgedRelativeDestination::get()
}
fn bridged_universal_destination() -> InteriorLocation {
BridgedUniversalDestination::get()
}
fn open_lane(origin: RuntimeOrigin) -> (BridgeLocations, TestLaneIdType) {
let with = bridged_asset_hub_universal_location();
let locations =
XcmOverBridge::bridge_locations_from_origin(origin, Box::new(with.into())).unwrap();
let lane_id = locations.calculate_lane_id(xcm::latest::VERSION).unwrap();
if !Bridges::<TestRuntime, ()>::contains_key(locations.bridge_id()) {
Bridges::<TestRuntime, ()>::insert(
locations.bridge_id(),
Bridge {
bridge_origin_relative_location: Box::new(SiblingLocation::get().into()),
bridge_origin_universal_location: Box::new(
locations.bridge_origin_universal_location().clone().into(),
),
bridge_destination_universal_location: Box::new(
locations.bridge_destination_universal_location().clone().into(),
),
state: BridgeState::Opened,
bridge_owner_account: LocationToAccountId::convert_location(
locations.bridge_origin_relative_location(),
)
.expect("valid accountId"),
deposit: 0,
lane_id,
},
);
LaneToBridge::<TestRuntime, ()>::insert(lane_id, locations.bridge_id());
let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
if lanes_manager.create_inbound_lane(lane_id).is_ok() {
assert_eq!(
0,
lanes_manager
.active_inbound_lane(lane_id)
.unwrap()
.storage()
.data()
.last_confirmed_nonce
);
}
if lanes_manager.create_outbound_lane(lane_id).is_ok() {
assert!(lanes_manager
.active_outbound_lane(lane_id)
.unwrap()
.queued_messages()
.is_empty());
}
}
assert_ok!(XcmOverBridge::do_try_state());
(*locations, lane_id)
}
fn open_lane_and_send_regular_message() -> (BridgeId, TestLaneIdType) {
let (locations, lane_id) = open_lane(OpenBridgeOrigin::sibling_parachain_origin());
export_xcm::<XcmOverBridge>(
BridgedRelayNetwork::get(),
0,
locations.bridge_origin_universal_location().clone(),
locations.bridge_destination_universal_location().clone(),
vec![Instruction::ClearOrigin].into(),
)
.unwrap();
(*locations.bridge_id(), lane_id)
}
#[test]
fn exporter_works() {
run_test(|| {
let (_, lane_id) = open_lane_and_send_regular_message();
assert!(!LanesManagerOf::<TestRuntime, ()>::new()
.active_outbound_lane(lane_id)
.unwrap()
.queued_messages()
.is_empty());
});
}
#[test]
fn exporter_does_not_suspend_the_bridge_if_outbound_bridge_queue_is_not_congested() {
run_test(|| {
let (bridge_id, _) = open_lane_and_send_regular_message();
assert!(!TestLocalXcmChannelManager::is_bridge_suspended(&bridge_id));
assert_eq!(XcmOverBridge::bridge(&bridge_id).unwrap().state, BridgeState::Opened);
});
}
#[test]
fn exporter_does_not_suspend_the_bridge_if_it_is_already_suspended() {
run_test(|| {
let (bridge_id, _) = open_lane_and_send_regular_message();
Bridges::<TestRuntime, ()>::mutate_extant(bridge_id, |bridge| {
bridge.state = BridgeState::Suspended;
});
for _ in 1..OUTBOUND_LANE_CONGESTED_THRESHOLD {
open_lane_and_send_regular_message();
}
open_lane_and_send_regular_message();
assert!(!TestLocalXcmChannelManager::is_bridge_suspended(&bridge_id));
});
}
#[test]
fn exporter_suspends_the_bridge_if_outbound_bridge_queue_is_congested() {
run_test(|| {
let (bridge_id, _) = open_lane_and_send_regular_message();
for _ in 1..OUTBOUND_LANE_CONGESTED_THRESHOLD {
open_lane_and_send_regular_message();
}
assert!(!TestLocalXcmChannelManager::is_bridge_suspended(&bridge_id));
assert_eq!(XcmOverBridge::bridge(&bridge_id).unwrap().state, BridgeState::Opened);
open_lane_and_send_regular_message();
assert!(TestLocalXcmChannelManager::is_bridge_suspended(&bridge_id));
assert_eq!(XcmOverBridge::bridge(&bridge_id).unwrap().state, BridgeState::Suspended);
});
}
#[test]
fn bridge_is_not_resumed_if_outbound_bridge_queue_is_still_congested() {
run_test(|| {
let (bridge_id, lane_id) = open_lane_and_send_regular_message();
Bridges::<TestRuntime, ()>::mutate_extant(bridge_id, |bridge| {
bridge.state = BridgeState::Suspended;
});
XcmOverBridge::on_bridge_messages_delivered(
lane_id,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD + 1,
);
assert!(!TestLocalXcmChannelManager::is_bridge_resumed(&bridge_id));
assert_eq!(XcmOverBridge::bridge(&bridge_id).unwrap().state, BridgeState::Suspended);
});
}
#[test]
fn bridge_is_not_resumed_if_it_was_not_suspended_before() {
run_test(|| {
let (bridge_id, lane_id) = open_lane_and_send_regular_message();
XcmOverBridge::on_bridge_messages_delivered(
lane_id,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
);
assert!(!TestLocalXcmChannelManager::is_bridge_resumed(&bridge_id));
assert_eq!(XcmOverBridge::bridge(&bridge_id).unwrap().state, BridgeState::Opened);
});
}
#[test]
fn bridge_is_resumed_when_enough_messages_are_delivered() {
run_test(|| {
let (bridge_id, lane_id) = open_lane_and_send_regular_message();
Bridges::<TestRuntime, ()>::mutate_extant(bridge_id, |bridge| {
bridge.state = BridgeState::Suspended;
});
XcmOverBridge::on_bridge_messages_delivered(
lane_id,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
);
assert!(TestLocalXcmChannelManager::is_bridge_resumed(&bridge_id));
assert_eq!(XcmOverBridge::bridge(&bridge_id).unwrap().state, BridgeState::Opened);
});
}
#[test]
fn export_fails_if_argument_is_missing() {
run_test(|| {
assert_eq!(
XcmOverBridge::validate(
BridgedRelayNetwork::get(),
0,
&mut None,
&mut Some(bridged_relative_destination()),
&mut Some(Vec::new().into()),
),
Err(SendError::MissingArgument),
);
assert_eq!(
XcmOverBridge::validate(
BridgedRelayNetwork::get(),
0,
&mut Some(universal_source()),
&mut None,
&mut Some(Vec::new().into()),
),
Err(SendError::MissingArgument),
);
})
}
#[test]
fn exporter_computes_correct_lane_id() {
run_test(|| {
assert_ne!(bridged_universal_destination(), bridged_relative_destination());
let locations = BridgeLocations::bridge_locations(
UniversalLocation::get(),
SiblingLocation::get(),
bridged_universal_destination(),
BridgedRelayNetwork::get(),
)
.unwrap();
let expected_bridge_id = locations.bridge_id();
let expected_lane_id = locations.calculate_lane_id(xcm::latest::VERSION).unwrap();
if LanesManagerOf::<TestRuntime, ()>::new()
.create_outbound_lane(expected_lane_id)
.is_ok()
{
Bridges::<TestRuntime, ()>::insert(
expected_bridge_id,
Bridge {
bridge_origin_relative_location: Box::new(
locations.bridge_origin_relative_location().clone().into(),
),
bridge_origin_universal_location: Box::new(
locations.bridge_origin_universal_location().clone().into(),
),
bridge_destination_universal_location: Box::new(
locations.bridge_destination_universal_location().clone().into(),
),
state: BridgeState::Opened,
bridge_owner_account: [0u8; 32].into(),
deposit: 0,
lane_id: expected_lane_id,
},
);
}
let ticket = XcmOverBridge::validate(
BridgedRelayNetwork::get(),
0,
&mut Some(universal_source()),
&mut Some(bridged_relative_destination()),
&mut Some(Vec::new().into()),
)
.unwrap()
.0;
assert_eq!(&ticket.0, expected_bridge_id);
assert_eq!(ticket.1.lane_id, expected_lane_id);
});
}
#[test]
fn exporter_is_compatible_with_pallet_xcm_bridge_hub_router() {
run_test(|| {
let dest = Location::new(2, BridgedUniversalDestination::get());
let origin = OpenBridgeOrigin::sibling_parachain_origin();
let origin_as_location =
OpenBridgeOriginOf::<TestRuntime, ()>::try_origin(origin.clone()).unwrap();
let (_, expected_lane_id) = open_lane(origin);
assert_eq!(
pallet_bridge_messages::Pallet::<TestRuntime, ()>::outbound_lane_data(
expected_lane_id
)
.unwrap()
.queued_messages()
.saturating_len(),
0
);
ExecuteXcmOverSendXcm::set_origin_for_execute(origin_as_location);
assert_ok!(send_xcm::<
UnpaidRemoteExporter<
NetworkExportTable<BridgeTable>,
ExecuteXcmOverSendXcm,
UniversalLocation,
>,
>(dest.clone(), Xcm::<()>::default()));
ExportMessageOriginUniversalLocation::set(Some(SiblingUniversalLocation::get()));
ExecuteXcmOverSendXcm::set_origin_for_execute(SiblingLocation::get());
assert_ok!(send_xcm::<XcmOverBridgeWrappedWithExportMessageRouter>(
dest.clone(),
Xcm::<()>::default()
));
assert_eq!(
pallet_bridge_messages::Pallet::<TestRuntime, ()>::outbound_lane_data(
expected_lane_id
)
.unwrap()
.queued_messages()
.saturating_len(),
2
);
})
}
#[test]
fn validate_works() {
run_test(|| {
let xcm: Xcm<()> = vec![ClearOrigin].into();
let mut xcm_wrapper = Some(xcm.clone());
let mut universal_source_wrapper = Some(universal_source());
let mut dest_wrapper = Some(bridged_relative_destination());
assert_eq!(
XcmOverBridge::validate(
NetworkId::ByGenesis([0; 32]),
0,
&mut universal_source_wrapper,
&mut dest_wrapper,
&mut xcm_wrapper,
),
Err(SendError::NotApplicable),
);
assert_eq!(&Some(xcm.clone()), &xcm_wrapper);
assert_eq!(&Some(universal_source()), &universal_source_wrapper);
assert_eq!(&Some(bridged_relative_destination()), &dest_wrapper);
let mut invalid_dest_wrapper = Some(
[GlobalConsensus(NetworkId::ByGenesis([0; 32])), Parachain(BRIDGED_ASSET_HUB_ID)]
.into(),
);
assert_eq!(
XcmOverBridge::validate(
BridgedRelayNetwork::get(),
0,
&mut Some(universal_source()),
&mut invalid_dest_wrapper,
&mut xcm_wrapper,
),
Err(SendError::NotApplicable),
);
assert_eq!(&Some(xcm.clone()), &xcm_wrapper);
assert_eq!(&Some(universal_source()), &universal_source_wrapper);
assert_eq!(
&Some(
[
GlobalConsensus(NetworkId::ByGenesis([0; 32]),),
Parachain(BRIDGED_ASSET_HUB_ID)
]
.into()
),
&invalid_dest_wrapper
);
let mut dest_without_lane_wrapper =
Some([GlobalConsensus(BridgedRelayNetwork::get()), Parachain(5679)].into());
assert_eq!(
XcmOverBridge::validate(
BridgedRelayNetwork::get(),
0,
&mut Some(universal_source()),
&mut dest_without_lane_wrapper,
&mut xcm_wrapper,
),
Err(SendError::NotApplicable),
);
assert_eq!(&Some(xcm.clone()), &xcm_wrapper);
assert_eq!(&Some(universal_source()), &universal_source_wrapper);
assert_eq!(
&Some([GlobalConsensus(BridgedRelayNetwork::get(),), Parachain(5679)].into()),
&dest_without_lane_wrapper
);
let _ = open_lane(OpenBridgeOrigin::sibling_parachain_origin());
let mut dest_wrapper = Some(bridged_relative_destination());
assert_ok!(XcmOverBridge::validate(
BridgedRelayNetwork::get(),
0,
&mut Some(universal_source()),
&mut dest_wrapper,
&mut xcm_wrapper,
));
assert_eq!(None, xcm_wrapper);
assert_eq!(&Some(universal_source()), &universal_source_wrapper);
assert_eq!(None, dest_wrapper);
});
}
#[test]
fn congestion_with_pallet_xcm_bridge_hub_router_works() {
run_test(|| {
let dest = Location::new(2, BridgedUniversalDestination::get());
fn router_bridge_state() -> pallet_xcm_bridge_hub_router::BridgeState {
pallet_xcm_bridge_hub_router::Bridge::<
TestRuntime,
XcmOverBridgeWrappedWithExportMessageRouterInstance,
>::get()
}
let origin = OpenBridgeOrigin::sibling_parachain_origin();
let origin_as_location =
OpenBridgeOriginOf::<TestRuntime, ()>::try_origin(origin.clone()).unwrap();
let (bridge_1, expected_lane_id_1) = open_lane(origin);
ExportMessageOriginUniversalLocation::set(Some(SiblingUniversalLocation::get()));
assert_eq!(
XcmOverBridge::bridge(bridge_1.bridge_id()).unwrap().state,
BridgeState::Opened
);
assert!(!router_bridge_state().is_congested);
assert!(!TestLocalXcmChannelManager::is_bridge_suspended(bridge_1.bridge_id()));
assert!(!TestLocalXcmChannelManager::is_bridge_resumed(bridge_1.bridge_id()));
for _ in 1..(OUTBOUND_LANE_CONGESTED_THRESHOLD + 2) {
ExecuteXcmOverSendXcm::set_origin_for_execute(origin_as_location.clone());
assert_ok!(send_xcm::<XcmOverBridgeWrappedWithExportMessageRouter>(
dest.clone(),
Xcm::<()>::default()
));
}
assert_eq!(
XcmOverBridge::bridge(bridge_1.bridge_id()).unwrap().state,
BridgeState::Suspended,
);
assert!(router_bridge_state().is_congested);
assert!(TestLocalXcmChannelManager::is_bridge_suspended(bridge_1.bridge_id()));
assert!(!TestLocalXcmChannelManager::is_bridge_resumed(bridge_1.bridge_id()));
XcmOverBridge::on_bridge_messages_delivered(
expected_lane_id_1,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
);
assert_eq!(
XcmOverBridge::bridge(bridge_1.bridge_id()).unwrap().state,
BridgeState::Opened
);
assert!(!router_bridge_state().is_congested);
assert!(TestLocalXcmChannelManager::is_bridge_resumed(bridge_1.bridge_id()));
})
}
}