use crate::message_race_loop::{
NoncesRange, RaceState, RaceStrategy, SourceClientNonces, TargetClientNonces,
};
use async_trait::async_trait;
use bp_messages::MessageNonce;
use relay_utils::HeaderId;
use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, ops::RangeInclusive};
/// Queue of nonce ranges, where every range is paired with the id of the source header
/// it has been queued at.
pub type SourceRangesQueue<SourceHeaderHash, SourceHeaderNumber, SourceNoncesRange> =
VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>;
/// Basic race strategy: remembers nonce ranges reported by the source client together with
/// the latest nonce reported by the target client, and uses both to decide which nonces
/// may be selected for delivery.
#[derive(Debug)]
pub struct BasicStrategy<
SourceHeaderNumber,
SourceHeaderHash,
TargetHeaderNumber,
TargetHeaderHash,
SourceNoncesRange,
Proof,
> {
/// Nonce ranges reported by the source client, each tagged with the source header it was
/// queued at. New ranges are appended to the back; ranges covered by finalized target
/// nonces are removed from the front.
source_queue: SourceRangesQueue<SourceHeaderHash, SourceHeaderNumber, SourceNoncesRange>,
/// Latest nonce reported by the target client. `None` until the first target update and
/// after `reset_best_target_nonces`.
best_target_nonce: Option<MessageNonce>,
/// Marker for the type parameters that are not stored in any field.
_phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
}
impl<
        SourceHeaderNumber,
        SourceHeaderHash,
        TargetHeaderNumber,
        TargetHeaderHash,
        SourceNoncesRange,
        Proof,
    >
    BasicStrategy<
        SourceHeaderNumber,
        SourceHeaderHash,
        TargetHeaderNumber,
        TargetHeaderHash,
        SourceNoncesRange,
        Proof,
    >
where
    SourceHeaderHash: Clone,
    SourceHeaderNumber: Clone + Ord,
    SourceNoncesRange: NoncesRange,
{
    /// Creates a strategy with an empty source queue and an unknown best target nonce.
    pub fn new() -> Self {
        Self { source_queue: VecDeque::new(), best_target_nonce: None, _phantom: PhantomData }
    }

    /// Read-only access to the queue of source nonce ranges.
    pub(crate) fn source_queue(
        &self,
    ) -> &VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)> {
        &self.source_queue
    }

    /// Mutable access to the queue of source nonce ranges (only compiled for tests).
    #[cfg(test)]
    pub(crate) fn source_queue_mut(
        &mut self,
    ) -> &mut VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)> {
        &mut self.source_queue
    }

    /// Returns the inclusive range of `source_queue` indices whose entries may be selected
    /// for delivery.
    ///
    /// The range starts at the first entry that ends above the best target nonce and
    /// extends over the following entries that were queued at source headers not greater
    /// than `race_state.best_finalized_source_header_id_at_best_target()`.
    ///
    /// Returns `None` if:
    /// - the best target nonce is unknown;
    /// - `race_state` already has nonces to submit or submitted nonces;
    /// - no queued entry ends above the best target nonce;
    /// - `race_state` has no best finalized source header at the best target;
    /// - the first candidate entry was queued at a source header greater than that header.
    pub fn available_source_queue_indices<
        RS: RaceState<
            HeaderId<SourceHeaderHash, SourceHeaderNumber>,
            HeaderId<TargetHeaderHash, TargetHeaderNumber>,
        >,
    >(
        &self,
        race_state: RS,
    ) -> Option<RangeInclusive<usize>> {
        let target_nonce = self.best_target_nonce?;

        // Nothing new is selected while something is already selected or submitted.
        if race_state.nonces_to_submit().is_some() || race_state.nonces_submitted().is_some() {
            return None;
        }

        // Leading entries that end at or below the target nonce are skipped.
        let first = self.source_queue.iter().position(|(_, nonces)| nonces.end() > target_nonce)?;

        // Count the consecutive entries, starting at `first`, that were queued at source
        // headers not greater than the one reported by the race state.
        let finalized_at_target = race_state.best_finalized_source_header_id_at_best_target()?;
        let deliverable = self
            .source_queue
            .iter()
            .skip(first)
            .take_while(|(queued_at, _)| queued_at.0 <= finalized_at_target.0)
            .count();

        // Zero deliverable entries means there is no range to return.
        let last = first + deliverable.checked_sub(1)?;
        Some(first..=last)
    }

    /// Removes every nonce that is less than or equal to `nonce` from the front of the
    /// source queue.
    ///
    /// Entries are popped from the front until one of them still has nonces greater than
    /// `nonce`; that remainder is put back at the front and the loop stops.
    fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) {
        loop {
            let (queued_at, queued_range) = match self.source_queue.pop_front() {
                Some(entry) => entry,
                // The queue is exhausted: everything has been removed.
                None => return,
            };

            if let Some(remainder) = queued_range.greater_than(nonce) {
                self.source_queue.push_front((queued_at, remainder));
                return;
            }
        }
    }
}
#[async_trait]
impl<
        SourceHeaderNumber,
        SourceHeaderHash,
        TargetHeaderNumber,
        TargetHeaderHash,
        SourceNoncesRange,
        Proof,
    >
    RaceStrategy<
        HeaderId<SourceHeaderHash, SourceHeaderNumber>,
        HeaderId<TargetHeaderHash, TargetHeaderNumber>,
        Proof,
    >
    for BasicStrategy<
        SourceHeaderNumber,
        SourceHeaderHash,
        TargetHeaderNumber,
        TargetHeaderHash,
        SourceNoncesRange,
        Proof,
    >
where
    SourceHeaderHash: Clone + Debug + Send + Sync,
    SourceHeaderNumber: Clone + Ord + Debug + Send + Sync,
    SourceNoncesRange: NoncesRange + Debug + Send + Sync,
    TargetHeaderHash: Debug + Send + Sync,
    TargetHeaderNumber: Debug + Send + Sync,
    Proof: Debug + Send + Sync,
{
    type SourceNoncesRange = SourceNoncesRange;
    type ProofParameters = ();
    type TargetNoncesData = ();

    /// Returns `true` if the source queue holds no entries.
    fn is_empty(&self) -> bool {
        self.source_queue.is_empty()
    }

    /// Returns the id of the source header at which the newest queue entry was queued, if
    /// that header is greater than the best finalized source header at the best target.
    ///
    /// Returns `None` if the race state does not know that header, if the queue is empty,
    /// or if the newest entry was queued at a header that is not greater.
    async fn required_source_header_at_target<
        RS: RaceState<
            HeaderId<SourceHeaderHash, SourceHeaderNumber>,
            HeaderId<TargetHeaderHash, TargetHeaderNumber>,
        >,
    >(
        &self,
        race_state: RS,
    ) -> Option<HeaderId<SourceHeaderHash, SourceHeaderNumber>> {
        let finalized_at_target = race_state.best_finalized_source_header_id_at_best_target()?;
        let (newest_queued_at, _) = self.source_queue.back()?;

        if newest_queued_at.0 > finalized_at_target.0 {
            Some(newest_queued_at.clone())
        } else {
            None
        }
    }

    /// Returns the greater of the last queued nonce and the best target nonce.
    ///
    /// Returns `None` while the best target nonce is unknown, even if the queue is not
    /// empty.
    fn best_at_source(&self) -> Option<MessageNonce> {
        let target_nonce = self.best_target_nonce?;
        let newest_queued = self.source_queue.back().map(|(_, range)| range.end());

        Some(newest_queued.map_or(target_nonce, |queued| std::cmp::max(queued, target_nonce)))
    }

    /// Returns the best known target nonce, if any.
    fn best_at_target(&self) -> Option<MessageNonce> {
        self.best_target_nonce
    }

    /// Queues the part of `nonces.new_nonces` that is not known yet, tagging it with
    /// `at_block`.
    ///
    /// "Known" means: not greater than the end of the newest queue entry, or, if the queue
    /// is empty, not greater than the best target nonce (zero if that is unknown too).
    fn source_nonces_updated(
        &mut self,
        at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>,
        nonces: SourceClientNonces<SourceNoncesRange>,
    ) {
        let newest_known = match self.source_queue.back() {
            Some((_, range)) => range.end(),
            None => self.best_target_nonce.unwrap_or_default(),
        };

        if let Some(fresh) = nonces.new_nonces.greater_than(newest_known) {
            self.source_queue.push_back((at_block, fresh));
        }
    }

    /// Forgets the best target nonce.
    fn reset_best_target_nonces(&mut self) {
        self.best_target_nonce = None;
    }

    /// Records `nonces.latest_nonce` as the best target nonce.
    ///
    /// Before that, the selected nonces (`nonces_to_submit`) and the submitted nonces
    /// (`nonces_submitted`) of `race_state` are each reset if the latest target nonce has
    /// reached the start of the respective range.
    fn best_target_nonces_updated<
        RS: RaceState<
            HeaderId<SourceHeaderHash, SourceHeaderNumber>,
            HeaderId<TargetHeaderHash, TargetHeaderNumber>,
        >,
    >(
        &mut self,
        nonces: TargetClientNonces<()>,
        race_state: &mut RS,
    ) {
        let latest = nonces.latest_nonce;

        // The target already has at least the first of the selected nonces.
        let selection_is_stale =
            race_state.nonces_to_submit().map_or(false, |selected| latest >= *selected.start());
        if selection_is_stale {
            log::trace!(
                target: "bridge",
                "Latest nonce at target is {}. Clearing nonces to submit: {:?}",
                latest,
                race_state.nonces_to_submit(),
            );
            race_state.reset_nonces_to_submit();
        }

        // The target already has at least the first of the submitted nonces.
        let submission_is_stale =
            race_state.nonces_submitted().map_or(false, |submitted| latest >= *submitted.start());
        if submission_is_stale {
            log::trace!(
                target: "bridge",
                "Latest nonce at target is {}. Clearing submitted nonces: {:?}",
                latest,
                race_state.nonces_submitted(),
            );
            race_state.reset_nonces_submitted();
        }

        self.best_target_nonce = Some(latest);
    }

    /// Drops queued nonces up to and including `nonces.latest_nonce` and makes sure that
    /// the best target nonce is not lower than that nonce.
    fn finalized_target_nonces_updated<
        RS: RaceState<
            HeaderId<SourceHeaderHash, SourceHeaderNumber>,
            HeaderId<TargetHeaderHash, TargetHeaderNumber>,
        >,
    >(
        &mut self,
        nonces: TargetClientNonces<()>,
        _race_state: &mut RS,
    ) {
        let finalized = nonces.latest_nonce;
        self.remove_le_nonces_from_source_queue(finalized);

        // Keep the known best nonce only if it is ahead of the finalized one.
        self.best_target_nonce = Some(match self.best_target_nonce {
            Some(best) if best > finalized => best,
            _ => finalized,
        });
    }

    /// Selects the range of nonces to deliver, based on
    /// `available_source_queue_indices`.
    ///
    /// The range begins at the greater of `best_target_nonce + 1` and the beginning of the
    /// first available entry, and ends at the end of the last available entry. Returns
    /// `None` if no queue entries are available.
    async fn select_nonces_to_deliver<
        RS: RaceState<
            HeaderId<SourceHeaderHash, SourceHeaderNumber>,
            HeaderId<TargetHeaderHash, TargetHeaderNumber>,
        >,
    >(
        &self,
        race_state: RS,
    ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
        let indices = self.available_source_queue_indices(race_state)?;

        let first_undelivered = self.best_target_nonce? + 1;
        let first_available = &self.source_queue[*indices.start()].1;
        let range_begin = std::cmp::max(first_undelivered, first_available.begin());

        let last_available = &self.source_queue[*indices.end()].1;
        let range_end = last_available.end();

        Some((range_begin..=range_end, ()))
    }
}
// Unit tests for `BasicStrategy`.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::tests::{
header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderHash,
TestSourceHeaderNumber,
},
message_race_loop::RaceStateImpl,
};
// Nonce range type used by all tests in this module.
type SourceNoncesRange = RangeInclusive<MessageNonce>;
// Race state specialized for the test message lane.
type TestRaceStateImpl = RaceStateImpl<
SourceHeaderIdOf<TestMessageLane>,
TargetHeaderIdOf<TestMessageLane>,
TestMessagesProof,
(),
>;
// `BasicStrategy` with all type parameters taken from a `MessageLane` implementation.
// Within this module the alias shadows the `BasicStrategy` name imported by `use super::*`.
type BasicStrategy<P> = super::BasicStrategy<
<P as MessageLane>::SourceHeaderNumber,
<P as MessageLane>::SourceHeaderHash,
<P as MessageLane>::TargetHeaderNumber,
<P as MessageLane>::TargetHeaderHash,
SourceNoncesRange,
<P as MessageLane>::MessagesProof,
>;
// Builds a source client update that carries `new_nonces` and no confirmed nonce.
fn source_nonces(new_nonces: SourceNoncesRange) -> SourceClientNonces<SourceNoncesRange> {
SourceClientNonces { new_nonces, confirmed_nonce: None }
}
// Builds a target client update that carries `latest_nonce` and no extra data.
fn target_nonces(latest_nonce: MessageNonce) -> TargetClientNonces<()> {
TargetClientNonces { latest_nonce, nonces_data: () }
}
// A fresh strategy is empty; it stops being empty once source nonces are queued.
#[test]
fn strategy_is_empty_works() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
assert!(strategy.is_empty());
strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
assert!(!strategy.is_empty());
}
// `best_at_source` is `None` while the target nonce is unknown, even with queued nonces.
// Once the target reports nonce 10 (above the queued 1..=5), `best_at_source` is 10 and
// the queue itself is left untouched.
#[test]
fn best_at_source_is_never_lower_than_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
assert_eq!(strategy.best_at_source(), None);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
assert_eq!(strategy.best_at_source(), None);
strategy.best_target_nonces_updated(target_nonces(10), &mut TestRaceStateImpl::default());
assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]);
assert_eq!(strategy.best_at_source(), Some(10));
}
// Source nonces 1..=5 that arrive after the target has reported nonce 10 are not queued.
#[test]
fn source_nonce_is_never_lower_than_known_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.best_target_nonces_updated(target_nonces(10), &mut TestRaceStateImpl::default());
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
assert_eq!(strategy.source_queue, vec![]);
}
// Source updates that do not go beyond the already queued nonces leave the queue unchanged.
#[test]
fn source_nonce_is_never_lower_than_latest_known_source_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
strategy.source_nonces_updated(header_id(2), source_nonces(1..=3));
strategy.source_nonces_updated(header_id(2), source_nonces(1..=5));
assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]);
}
// A finalized target nonce removes fully covered entries from the queue and trims the
// front entry that is only partially covered.
#[test]
fn updated_target_nonce_removes_queued_entries() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
strategy.source_nonces_updated(header_id(2), source_nonces(6..=10));
strategy.source_nonces_updated(header_id(3), source_nonces(11..=15));
strategy.source_nonces_updated(header_id(4), source_nonces(16..=20));
strategy
.finalized_target_nonces_updated(target_nonces(15), &mut TestRaceStateImpl::default());
assert_eq!(strategy.source_queue, vec![(header_id(4), 16..=20)]);
strategy
.finalized_target_nonces_updated(target_nonces(17), &mut TestRaceStateImpl::default());
assert_eq!(strategy.source_queue, vec![(header_id(4), 18..=20)]);
}
// With nonces 5..=10 selected for submission: a target nonce of 4 keeps the selection,
// while every target nonce from 5 to 11 clears it.
#[test]
fn selected_nonces_are_dropped_on_target_nonce_update() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
strategy.best_target_nonces_updated(target_nonces(4), &mut state);
assert!(state.nonces_to_submit.is_some());
for nonce in 5..=11 {
// restore the selection, because the previous iteration has cleared it
state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
strategy.best_target_nonces_updated(target_nonces(nonce), &mut state);
assert!(state.nonces_to_submit.is_none());
}
}
// With nonces 5..=10 submitted: a target nonce of 4 keeps them, while every target nonce
// from 5 to 11 clears them.
#[test]
fn submitted_nonces_are_dropped_on_target_nonce_update() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_submitted = Some(5..=10);
strategy.best_target_nonces_updated(target_nonces(4), &mut state);
assert!(state.nonces_submitted.is_some());
for nonce in 5..=11 {
// restore the submitted range, because the previous iteration has cleared it
state.nonces_submitted = Some(5..=10);
strategy.best_target_nonces_updated(target_nonces(nonce), &mut state);
assert!(state.nonces_submitted.is_none());
}
}
// Nothing is selected while the race state still has nonces selected for submission.
#[async_std::test]
async fn nothing_is_selected_if_something_is_already_selected() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None)));
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
}
// Nothing is selected while the race state still has submitted nonces.
#[async_std::test]
async fn nothing_is_selected_if_something_is_already_submitted() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_submitted = Some(1..=10);
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
}
// Only nonces queued at source headers up to the best finalized source header at the
// target are selected; nonces already reported by the target are never selected again.
#[async_std::test]
async fn select_nonces_to_deliver_works() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
strategy.source_nonces_updated(header_id(2), source_nonces(2..=2));
strategy.source_nonces_updated(header_id(3), source_nonces(3..=6));
strategy.source_nonces_updated(header_id(5), source_nonces(7..=8));
// the entry queued at header 5 is not available while the target only knows header 4
state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=6, ())));
// nonces 1..=6 are at the target now and header 5 is still unknown there
strategy.best_target_nonces_updated(target_nonces(6), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
// header 5 is known at the target => the remaining nonces may be selected
state.best_finalized_source_header_id_at_best_target = Some(header_id(5));
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((7..=8, ())));
// everything that has been queued is at the target
strategy.best_target_nonces_updated(target_nonces(8), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
}
// The range of available queue indices grows together with the best finalized source
// header known at the target, and stops growing once the whole queue is covered.
#[test]
fn available_source_queue_indices_works() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=3));
strategy.source_nonces_updated(header_id(2), source_nonces(4..=6));
strategy.source_nonces_updated(header_id(3), source_nonces(7..=9));
state.best_finalized_source_header_id_at_best_target = Some(header_id(0));
assert_eq!(strategy.available_source_queue_indices(state.clone()), None);
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=0));
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=1));
state.best_finalized_source_header_id_at_best_target = Some(header_id(3));
assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=2));
state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
assert_eq!(strategy.available_source_queue_indices(state), Some(0..=2));
}
// Removing nonces trims the front entry, drops whole entries, empties the queue and is a
// no-op on an empty queue.
#[test]
fn remove_le_nonces_from_source_queue_works() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=3));
strategy.source_nonces_updated(header_id(2), source_nonces(4..=6));
strategy.source_nonces_updated(header_id(3), source_nonces(7..=9));
// Flattens the queue into the plain list of nonces it contains.
fn source_queue_nonces(
source_queue: &SourceRangesQueue<
TestSourceHeaderHash,
TestSourceHeaderNumber,
SourceNoncesRange,
>,
) -> Vec<MessageNonce> {
source_queue.iter().flat_map(|(_, range)| range.clone()).collect()
}
strategy.remove_le_nonces_from_source_queue(1);
assert_eq!(source_queue_nonces(&strategy.source_queue), vec![2, 3, 4, 5, 6, 7, 8, 9],);
strategy.remove_le_nonces_from_source_queue(5);
assert_eq!(source_queue_nonces(&strategy.source_queue), vec![6, 7, 8, 9],);
strategy.remove_le_nonces_from_source_queue(9);
assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::<MessageNonce>::new(),);
strategy.remove_le_nonces_from_source_queue(100);
assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::<MessageNonce>::new(),);
}
// If the best target nonce goes back from 1 to 0, nonce 1 is selected for delivery again,
// together with nonce 2 that has been queued in the meantime.
#[async_std::test]
async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() {
let source_header_1 = header_id(1);
let target_header_1 = header_id(1);
// initial state: nonce 0 is at the target and nonce 1 is queued at source header 1
let mut state = TestRaceStateImpl {
best_finalized_source_header_id_at_source: Some(source_header_1),
best_finalized_source_header_id_at_best_target: Some(source_header_1),
best_target_header_id: Some(target_header_1),
best_finalized_target_header_id: Some(target_header_1),
nonces_to_submit: None,
nonces_to_submit_batch: None,
nonces_submitted: None,
};
let mut strategy = BasicStrategy::<TestMessageLane> {
source_queue: vec![(header_id(1), 1..=1)].into_iter().collect(),
best_target_nonce: Some(0),
_phantom: PhantomData,
};
// nonce 1 is selected and marked as submitted
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=1, ())),);
state.nonces_submitted = Some(1..=1);
// nonce 2 is queued at source header 2
let source_header_2 = header_id(2);
state.best_finalized_source_header_id_at_source = Some(source_header_2);
strategy.source_nonces_updated(
source_header_2,
SourceClientNonces { new_nonces: 2..=2, confirmed_nonce: None },
);
// the best target header reports nonce 1...
let target_header_2 = header_id(2);
state.best_target_header_id = Some(target_header_2);
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 1, nonces_data: () },
&mut state,
);
// ...and then the best target nonce goes back to 0
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 0, nonces_data: () },
&mut state,
);
// a header with another id becomes the best and finalized target header; the finalized
// target nonce is 0
let target_header_2_fork = header_id(2_1);
state.best_finalized_source_header_id_at_source = Some(source_header_2);
state.best_finalized_source_header_id_at_best_target = Some(source_header_2);
state.best_target_header_id = Some(target_header_2_fork);
state.best_finalized_target_header_id = Some(target_header_2_fork);
strategy.finalized_target_nonces_updated(
TargetClientNonces { latest_nonce: 0, nonces_data: () },
&mut state,
);
// both nonces are selected for delivery
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=2, ())),);
}
}