1use crate::message_race_loop::{
21 NoncesRange, RaceState, RaceStrategy, SourceClientNonces, TargetClientNonces,
22};
23
24use async_trait::async_trait;
25use bp_messages::MessageNonce;
26use relay_utils::HeaderId;
27use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, ops::RangeInclusive};
28
29pub type SourceRangesQueue<SourceHeaderHash, SourceHeaderNumber, SourceNoncesRange> =
31 VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>;
32
33#[derive(Debug)]
35pub struct BasicStrategy<
36 SourceHeaderNumber,
37 SourceHeaderHash,
38 TargetHeaderNumber,
39 TargetHeaderHash,
40 SourceNoncesRange,
41 Proof,
42> {
43 source_queue: SourceRangesQueue<SourceHeaderHash, SourceHeaderNumber, SourceNoncesRange>,
48 best_target_nonce: Option<MessageNonce>,
51 _phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
53}
54
55impl<
56 SourceHeaderNumber,
57 SourceHeaderHash,
58 TargetHeaderNumber,
59 TargetHeaderHash,
60 SourceNoncesRange,
61 Proof,
62 >
63 BasicStrategy<
64 SourceHeaderNumber,
65 SourceHeaderHash,
66 TargetHeaderNumber,
67 TargetHeaderHash,
68 SourceNoncesRange,
69 Proof,
70 >
71where
72 SourceHeaderHash: Clone,
73 SourceHeaderNumber: Clone + Ord,
74 SourceNoncesRange: NoncesRange,
75{
76 pub fn new() -> Self {
78 BasicStrategy {
79 source_queue: VecDeque::new(),
80 best_target_nonce: None,
81 _phantom: Default::default(),
82 }
83 }
84
85 pub(crate) fn source_queue(
87 &self,
88 ) -> &VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)> {
89 &self.source_queue
90 }
91
92 #[cfg(test)]
94 pub(crate) fn source_queue_mut(
95 &mut self,
96 ) -> &mut VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)> {
97 &mut self.source_queue
98 }
99
100 pub fn available_source_queue_indices<
111 RS: RaceState<
112 HeaderId<SourceHeaderHash, SourceHeaderNumber>,
113 HeaderId<TargetHeaderHash, TargetHeaderNumber>,
114 >,
115 >(
116 &self,
117 race_state: RS,
118 ) -> Option<RangeInclusive<usize>> {
119 let best_target_nonce = self.best_target_nonce?;
121
122 if race_state.nonces_to_submit().is_some() {
124 return None
125 }
126
127 if race_state.nonces_submitted().is_some() {
129 return None
130 }
131
132 let begin_index = self
134 .source_queue
135 .iter()
136 .enumerate()
137 .skip_while(|(_, (_, nonces))| nonces.end() <= best_target_nonce)
138 .map(|(index, _)| index)
139 .next()?;
140
141 let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target()?;
149 let end_index = self
150 .source_queue
151 .iter()
152 .enumerate()
153 .skip(begin_index)
154 .take_while(|(_, (queued_at, _))| queued_at.0 <= best_header_at_target.0)
155 .map(|(index, _)| index)
156 .last()?;
157
158 Some(begin_index..=end_index)
159 }
160
161 fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) {
163 while let Some((queued_at, queued_range)) = self.source_queue.pop_front() {
164 if let Some(range_to_requeue) = queued_range.greater_than(nonce) {
165 self.source_queue.push_front((queued_at, range_to_requeue));
166 break
167 }
168 }
169 }
170}
171
172#[async_trait]
173impl<
174 SourceHeaderNumber,
175 SourceHeaderHash,
176 TargetHeaderNumber,
177 TargetHeaderHash,
178 SourceNoncesRange,
179 Proof,
180 >
181 RaceStrategy<
182 HeaderId<SourceHeaderHash, SourceHeaderNumber>,
183 HeaderId<TargetHeaderHash, TargetHeaderNumber>,
184 Proof,
185 >
186 for BasicStrategy<
187 SourceHeaderNumber,
188 SourceHeaderHash,
189 TargetHeaderNumber,
190 TargetHeaderHash,
191 SourceNoncesRange,
192 Proof,
193 >
194where
195 SourceHeaderHash: Clone + Debug + Send + Sync,
196 SourceHeaderNumber: Clone + Ord + Debug + Send + Sync,
197 SourceNoncesRange: NoncesRange + Debug + Send + Sync,
198 TargetHeaderHash: Debug + Send + Sync,
199 TargetHeaderNumber: Debug + Send + Sync,
200 Proof: Debug + Send + Sync,
201{
202 type SourceNoncesRange = SourceNoncesRange;
203 type ProofParameters = ();
204 type TargetNoncesData = ();
205
206 fn is_empty(&self) -> bool {
207 self.source_queue.is_empty()
208 }
209
210 async fn required_source_header_at_target<
211 RS: RaceState<
212 HeaderId<SourceHeaderHash, SourceHeaderNumber>,
213 HeaderId<TargetHeaderHash, TargetHeaderNumber>,
214 >,
215 >(
216 &self,
217 race_state: RS,
218 ) -> Option<HeaderId<SourceHeaderHash, SourceHeaderNumber>> {
219 let current_best = race_state.best_finalized_source_header_id_at_best_target()?;
220 self.source_queue
221 .back()
222 .and_then(|(h, _)| if h.0 > current_best.0 { Some(h.clone()) } else { None })
223 }
224
225 fn best_at_source(&self) -> Option<MessageNonce> {
226 let best_in_queue = self.source_queue.back().map(|(_, range)| range.end());
227 match (best_in_queue, self.best_target_nonce) {
228 (Some(best_in_queue), Some(best_target_nonce)) if best_in_queue > best_target_nonce =>
229 Some(best_in_queue),
230 (_, Some(best_target_nonce)) => Some(best_target_nonce),
231 (_, None) => None,
232 }
233 }
234
235 fn best_at_target(&self) -> Option<MessageNonce> {
236 self.best_target_nonce
237 }
238
239 fn source_nonces_updated(
240 &mut self,
241 at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>,
242 nonces: SourceClientNonces<SourceNoncesRange>,
243 ) {
244 let best_in_queue = self
245 .source_queue
246 .back()
247 .map(|(_, range)| range.end())
248 .or(self.best_target_nonce)
249 .unwrap_or_default();
250 self.source_queue.extend(
251 nonces
252 .new_nonces
253 .greater_than(best_in_queue)
254 .into_iter()
255 .map(move |range| (at_block.clone(), range)),
256 )
257 }
258
259 fn reset_best_target_nonces(&mut self) {
260 self.best_target_nonce = None;
261 }
262
263 fn best_target_nonces_updated<
264 RS: RaceState<
265 HeaderId<SourceHeaderHash, SourceHeaderNumber>,
266 HeaderId<TargetHeaderHash, TargetHeaderNumber>,
267 >,
268 >(
269 &mut self,
270 nonces: TargetClientNonces<()>,
271 race_state: &mut RS,
272 ) {
273 let nonce = nonces.latest_nonce;
274
275 let need_to_select_new_nonces = race_state
278 .nonces_to_submit()
279 .map(|nonces| nonce >= *nonces.start())
280 .unwrap_or(false);
281 if need_to_select_new_nonces {
282 log::trace!(
283 target: "bridge",
284 "Latest nonce at target is {}. Clearing nonces to submit: {:?}",
285 nonce,
286 race_state.nonces_to_submit(),
287 );
288
289 race_state.reset_nonces_to_submit();
290 }
291
292 let need_new_nonces_to_submit = race_state
295 .nonces_submitted()
296 .map(|nonces| nonce >= *nonces.start())
297 .unwrap_or(false);
298 if need_new_nonces_to_submit {
299 log::trace!(
300 target: "bridge",
301 "Latest nonce at target is {}. Clearing submitted nonces: {:?}",
302 nonce,
303 race_state.nonces_submitted(),
304 );
305
306 race_state.reset_nonces_submitted();
307 }
308
309 self.best_target_nonce = Some(nonce);
310 }
311
312 fn finalized_target_nonces_updated<
313 RS: RaceState<
314 HeaderId<SourceHeaderHash, SourceHeaderNumber>,
315 HeaderId<TargetHeaderHash, TargetHeaderNumber>,
316 >,
317 >(
318 &mut self,
319 nonces: TargetClientNonces<()>,
320 _race_state: &mut RS,
321 ) {
322 self.remove_le_nonces_from_source_queue(nonces.latest_nonce);
323 self.best_target_nonce = Some(std::cmp::max(
324 self.best_target_nonce.unwrap_or(nonces.latest_nonce),
325 nonces.latest_nonce,
326 ));
327 }
328
329 async fn select_nonces_to_deliver<
330 RS: RaceState<
331 HeaderId<SourceHeaderHash, SourceHeaderNumber>,
332 HeaderId<TargetHeaderHash, TargetHeaderNumber>,
333 >,
334 >(
335 &self,
336 race_state: RS,
337 ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
338 let available_indices = self.available_source_queue_indices(race_state)?;
339 let range_begin = std::cmp::max(
340 self.best_target_nonce? + 1,
341 self.source_queue[*available_indices.start()].1.begin(),
342 );
343 let range_end = self.source_queue[*available_indices.end()].1.end();
344 Some((range_begin..=range_end, ()))
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use super::*;
351 use crate::{
352 message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
353 message_lane_loop::tests::{
354 header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderHash,
355 TestSourceHeaderNumber,
356 },
357 message_race_loop::RaceStateImpl,
358 };
359
360 type SourceNoncesRange = RangeInclusive<MessageNonce>;
361
362 type TestRaceStateImpl = RaceStateImpl<
363 SourceHeaderIdOf<TestMessageLane>,
364 TargetHeaderIdOf<TestMessageLane>,
365 TestMessagesProof,
366 (),
367 >;
368
369 type BasicStrategy<P> = super::BasicStrategy<
370 <P as MessageLane>::SourceHeaderNumber,
371 <P as MessageLane>::SourceHeaderHash,
372 <P as MessageLane>::TargetHeaderNumber,
373 <P as MessageLane>::TargetHeaderHash,
374 SourceNoncesRange,
375 <P as MessageLane>::MessagesProof,
376 >;
377
378 fn source_nonces(new_nonces: SourceNoncesRange) -> SourceClientNonces<SourceNoncesRange> {
379 SourceClientNonces { new_nonces, confirmed_nonce: None }
380 }
381
382 fn target_nonces(latest_nonce: MessageNonce) -> TargetClientNonces<()> {
383 TargetClientNonces { latest_nonce, nonces_data: () }
384 }
385
386 #[test]
387 fn strategy_is_empty_works() {
388 let mut strategy = BasicStrategy::<TestMessageLane>::new();
389 assert!(strategy.is_empty());
390 strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
391 assert!(!strategy.is_empty());
392 }
393
394 #[test]
395 fn best_at_source_is_never_lower_than_target_nonce() {
396 let mut strategy = BasicStrategy::<TestMessageLane>::new();
397 assert_eq!(strategy.best_at_source(), None);
398 strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
399 assert_eq!(strategy.best_at_source(), None);
400 strategy.best_target_nonces_updated(target_nonces(10), &mut TestRaceStateImpl::default());
401 assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]);
402 assert_eq!(strategy.best_at_source(), Some(10));
403 }
404
405 #[test]
406 fn source_nonce_is_never_lower_than_known_target_nonce() {
407 let mut strategy = BasicStrategy::<TestMessageLane>::new();
408 strategy.best_target_nonces_updated(target_nonces(10), &mut TestRaceStateImpl::default());
409 strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
410 assert_eq!(strategy.source_queue, vec![]);
411 }
412
413 #[test]
414 fn source_nonce_is_never_lower_than_latest_known_source_nonce() {
415 let mut strategy = BasicStrategy::<TestMessageLane>::new();
416 strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
417 strategy.source_nonces_updated(header_id(2), source_nonces(1..=3));
418 strategy.source_nonces_updated(header_id(2), source_nonces(1..=5));
419 assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]);
420 }
421
422 #[test]
423 fn updated_target_nonce_removes_queued_entries() {
424 let mut strategy = BasicStrategy::<TestMessageLane>::new();
425 strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
426 strategy.source_nonces_updated(header_id(2), source_nonces(6..=10));
427 strategy.source_nonces_updated(header_id(3), source_nonces(11..=15));
428 strategy.source_nonces_updated(header_id(4), source_nonces(16..=20));
429 strategy
430 .finalized_target_nonces_updated(target_nonces(15), &mut TestRaceStateImpl::default());
431 assert_eq!(strategy.source_queue, vec![(header_id(4), 16..=20)]);
432 strategy
433 .finalized_target_nonces_updated(target_nonces(17), &mut TestRaceStateImpl::default());
434 assert_eq!(strategy.source_queue, vec![(header_id(4), 18..=20)]);
435 }
436
437 #[test]
438 fn selected_nonces_are_dropped_on_target_nonce_update() {
439 let mut state = TestRaceStateImpl::default();
440 let mut strategy = BasicStrategy::<TestMessageLane>::new();
441 state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
442 strategy.best_target_nonces_updated(target_nonces(4), &mut state);
444 assert!(state.nonces_to_submit.is_some());
445 for nonce in 5..=11 {
447 state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
448 strategy.best_target_nonces_updated(target_nonces(nonce), &mut state);
449 assert!(state.nonces_to_submit.is_none());
450 }
451 }
452
453 #[test]
454 fn submitted_nonces_are_dropped_on_target_nonce_update() {
455 let mut state = TestRaceStateImpl::default();
456 let mut strategy = BasicStrategy::<TestMessageLane>::new();
457 state.nonces_submitted = Some(5..=10);
458 strategy.best_target_nonces_updated(target_nonces(4), &mut state);
460 assert!(state.nonces_submitted.is_some());
461 for nonce in 5..=11 {
463 state.nonces_submitted = Some(5..=10);
464 strategy.best_target_nonces_updated(target_nonces(nonce), &mut state);
465 assert!(state.nonces_submitted.is_none());
466 }
467 }
468
469 #[async_std::test]
470 async fn nothing_is_selected_if_something_is_already_selected() {
471 let mut state = TestRaceStateImpl::default();
472 let mut strategy = BasicStrategy::<TestMessageLane>::new();
473 state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None)));
474 strategy.best_target_nonces_updated(target_nonces(0), &mut state);
475 strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
476 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
477 }
478
479 #[async_std::test]
480 async fn nothing_is_selected_if_something_is_already_submitted() {
481 let mut state = TestRaceStateImpl::default();
482 let mut strategy = BasicStrategy::<TestMessageLane>::new();
483 state.nonces_submitted = Some(1..=10);
484 strategy.best_target_nonces_updated(target_nonces(0), &mut state);
485 strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
486 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
487 }
488
489 #[async_std::test]
490 async fn select_nonces_to_deliver_works() {
491 let mut state = TestRaceStateImpl::default();
492 let mut strategy = BasicStrategy::<TestMessageLane>::new();
493 strategy.best_target_nonces_updated(target_nonces(0), &mut state);
494 strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
495 strategy.source_nonces_updated(header_id(2), source_nonces(2..=2));
496 strategy.source_nonces_updated(header_id(3), source_nonces(3..=6));
497 strategy.source_nonces_updated(header_id(5), source_nonces(7..=8));
498
499 state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
500 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=6, ())));
501 strategy.best_target_nonces_updated(target_nonces(6), &mut state);
502 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
503
504 state.best_finalized_source_header_id_at_best_target = Some(header_id(5));
505 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((7..=8, ())));
506 strategy.best_target_nonces_updated(target_nonces(8), &mut state);
507 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
508 }
509
510 #[test]
511 fn available_source_queue_indices_works() {
512 let mut state = TestRaceStateImpl::default();
513 let mut strategy = BasicStrategy::<TestMessageLane>::new();
514 strategy.best_target_nonces_updated(target_nonces(0), &mut state);
515 strategy.source_nonces_updated(header_id(1), source_nonces(1..=3));
516 strategy.source_nonces_updated(header_id(2), source_nonces(4..=6));
517 strategy.source_nonces_updated(header_id(3), source_nonces(7..=9));
518
519 state.best_finalized_source_header_id_at_best_target = Some(header_id(0));
520 assert_eq!(strategy.available_source_queue_indices(state.clone()), None);
521
522 state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
523 assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=0));
524
525 state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
526 assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=1));
527
528 state.best_finalized_source_header_id_at_best_target = Some(header_id(3));
529 assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=2));
530
531 state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
532 assert_eq!(strategy.available_source_queue_indices(state), Some(0..=2));
533 }
534
535 #[test]
536 fn remove_le_nonces_from_source_queue_works() {
537 let mut state = TestRaceStateImpl::default();
538 let mut strategy = BasicStrategy::<TestMessageLane>::new();
539 strategy.best_target_nonces_updated(target_nonces(0), &mut state);
540 strategy.source_nonces_updated(header_id(1), source_nonces(1..=3));
541 strategy.source_nonces_updated(header_id(2), source_nonces(4..=6));
542 strategy.source_nonces_updated(header_id(3), source_nonces(7..=9));
543
544 fn source_queue_nonces(
545 source_queue: &SourceRangesQueue<
546 TestSourceHeaderHash,
547 TestSourceHeaderNumber,
548 SourceNoncesRange,
549 >,
550 ) -> Vec<MessageNonce> {
551 source_queue.iter().flat_map(|(_, range)| range.clone()).collect()
552 }
553
554 strategy.remove_le_nonces_from_source_queue(1);
555 assert_eq!(source_queue_nonces(&strategy.source_queue), vec![2, 3, 4, 5, 6, 7, 8, 9],);
556
557 strategy.remove_le_nonces_from_source_queue(5);
558 assert_eq!(source_queue_nonces(&strategy.source_queue), vec![6, 7, 8, 9],);
559
560 strategy.remove_le_nonces_from_source_queue(9);
561 assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::<MessageNonce>::new(),);
562
563 strategy.remove_le_nonces_from_source_queue(100);
564 assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::<MessageNonce>::new(),);
565 }
566
567 #[async_std::test]
568 async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() {
569 let source_header_1 = header_id(1);
570 let target_header_1 = header_id(1);
571
572 let mut state = TestRaceStateImpl {
574 best_finalized_source_header_id_at_source: Some(source_header_1),
575 best_finalized_source_header_id_at_best_target: Some(source_header_1),
576 best_target_header_id: Some(target_header_1),
577 best_finalized_target_header_id: Some(target_header_1),
578 nonces_to_submit: None,
579 nonces_to_submit_batch: None,
580 nonces_submitted: None,
581 };
582
583 let mut strategy = BasicStrategy::<TestMessageLane> {
585 source_queue: vec![(header_id(1), 1..=1)].into_iter().collect(),
586 best_target_nonce: Some(0),
587 _phantom: PhantomData,
588 };
589 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=1, ())),);
590
591 state.nonces_submitted = Some(1..=1);
593
594 let source_header_2 = header_id(2);
596 state.best_finalized_source_header_id_at_source = Some(source_header_2);
597 strategy.source_nonces_updated(
598 source_header_2,
599 SourceClientNonces { new_nonces: 2..=2, confirmed_nonce: None },
600 );
601 let target_header_2 = header_id(2);
604 state.best_target_header_id = Some(target_header_2);
605 strategy.best_target_nonces_updated(
606 TargetClientNonces { latest_nonce: 1, nonces_data: () },
607 &mut state,
608 );
609
610 strategy.best_target_nonces_updated(
612 TargetClientNonces { latest_nonce: 0, nonces_data: () },
613 &mut state,
614 );
615
616 let target_header_2_fork = header_id(2_1);
618 state.best_finalized_source_header_id_at_source = Some(source_header_2);
619 state.best_finalized_source_header_id_at_best_target = Some(source_header_2);
620 state.best_target_header_id = Some(target_header_2_fork);
621 state.best_finalized_target_header_id = Some(target_header_2_fork);
622 strategy.finalized_target_nonces_updated(
623 TargetClientNonces { latest_nonce: 0, nonces_data: () },
624 &mut state,
625 );
626
627 assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=2, ())),);
629 }
630}