1use crate::{
17 message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
18 message_lane_loop::{
19 NoncesSubmitArtifacts, SourceClient as MessageLaneSourceClient, SourceClientState,
20 TargetClient as MessageLaneTargetClient, TargetClientState,
21 },
22 message_race_loop::{
23 MessageRace, NoncesRange, SourceClient, SourceClientNonces, TargetClient,
24 TargetClientNonces,
25 },
26 message_race_strategy::BasicStrategy,
27 metrics::MessageLaneLoopMetrics,
28};
29
30use async_trait::async_trait;
31use bp_messages::MessageNonce;
32use futures::stream::FusedStream;
33use relay_utils::{FailedClient, TrackedTransactionStatus, TransactionTracker};
34use std::{marker::PhantomData, ops::RangeInclusive};
35
/// [`BasicStrategy`] instantiation used by the receiving confirmations race.
///
/// It is parameterized with the lane's *target* header number and hash first, followed by the
/// lane's *source* header number and hash. Nonces are tracked as
/// `RangeInclusive<MessageNonce>` and the relayed proof is the lane's `MessagesReceivingProof`.
type ReceivingConfirmationsBasicStrategy<P> = BasicStrategy<
    <P as MessageLane>::TargetHeaderNumber,
    <P as MessageLane>::TargetHeaderHash,
    <P as MessageLane>::SourceHeaderNumber,
    <P as MessageLane>::SourceHeaderHash,
    RangeInclusive<MessageNonce>,
    <P as MessageLane>::MessagesReceivingProof,
>;
45
46pub async fn run<P: MessageLane>(
48 source_client: impl MessageLaneSourceClient<P>,
49 source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
50 target_client: impl MessageLaneTargetClient<P>,
51 target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
52 metrics_msg: Option<MessageLaneLoopMetrics>,
53) -> Result<(), FailedClient> {
54 crate::message_race_loop::run(
55 ReceivingConfirmationsRaceSource {
56 client: target_client,
57 metrics_msg: metrics_msg.clone(),
58 _phantom: Default::default(),
59 },
60 target_state_updates,
61 ReceivingConfirmationsRaceTarget {
62 client: source_client,
63 metrics_msg,
64 _phantom: Default::default(),
65 },
66 source_state_updates,
67 ReceivingConfirmationsBasicStrategy::<P>::new(),
68 )
69 .await
70}
71
72pub async fn relay_messages_delivery_confirmation<P: MessageLane>(
74 source_client: impl MessageLaneSourceClient<P>,
75 target_client: impl MessageLaneTargetClient<P>,
76 at: TargetHeaderIdOf<P>,
77) -> Result<(), ()> {
78 let (at, proof) = target_client.prove_messages_receiving(at.clone()).await.map_err(|e| {
80 log::error!(
81 target: "bridge",
82 "Failed to generate messages delivery proof at {:?}: {:?}",
83 at,
84 e,
85 );
86 })?;
87 let tx_tracker =
89 source_client
90 .submit_messages_receiving_proof(None, at, proof)
91 .await
92 .map_err(|e| {
93 log::error!(
94 target: "bridge",
95 "Failed to submit messages delivery proof: {:?}",
96 e,
97 );
98 })?;
99
100 match tx_tracker.wait().await {
101 TrackedTransactionStatus::Finalized(_) => Ok(()),
102 TrackedTransactionStatus::Lost => {
103 log::error!("Transaction with messages delivery proof is considered lost");
104 Err(())
105 },
106 }
107}
108
/// Marker type for the receiving confirmations race.
///
/// It stores no data; `P` is only carried at the type level.
struct ReceivingConfirmationsRace<P>(PhantomData<P>);
111
112impl<P: MessageLane> MessageRace for ReceivingConfirmationsRace<P> {
113 type SourceHeaderId = TargetHeaderIdOf<P>;
114 type TargetHeaderId = SourceHeaderIdOf<P>;
115
116 type MessageNonce = MessageNonce;
117 type Proof = P::MessagesReceivingProof;
118
119 fn source_name() -> String {
120 format!("{}::ReceivingConfirmationsDelivery", P::TARGET_NAME)
121 }
122
123 fn target_name() -> String {
124 format!("{}::ReceivingConfirmationsDelivery", P::SOURCE_NAME)
125 }
126}
127
128struct ReceivingConfirmationsRaceSource<P: MessageLane, C> {
130 client: C,
131 metrics_msg: Option<MessageLaneLoopMetrics>,
132 _phantom: PhantomData<P>,
133}
134
135#[async_trait]
136impl<P, C> SourceClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceSource<P, C>
137where
138 P: MessageLane,
139 C: MessageLaneTargetClient<P>,
140{
141 type Error = C::Error;
142 type NoncesRange = RangeInclusive<MessageNonce>;
143 type ProofParameters = ();
144
145 async fn nonces(
146 &self,
147 at_block: TargetHeaderIdOf<P>,
148 prev_latest_nonce: MessageNonce,
149 ) -> Result<(TargetHeaderIdOf<P>, SourceClientNonces<Self::NoncesRange>), Self::Error> {
150 let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
151 if let Some(metrics_msg) = self.metrics_msg.as_ref() {
152 metrics_msg.update_target_latest_received_nonce(latest_received_nonce);
153 }
154 Ok((
155 at_block,
156 SourceClientNonces {
157 new_nonces: prev_latest_nonce + 1..=latest_received_nonce,
158 confirmed_nonce: None,
159 },
160 ))
161 }
162
163 #[allow(clippy::unit_arg)]
164 async fn generate_proof(
165 &self,
166 at_block: TargetHeaderIdOf<P>,
167 nonces: RangeInclusive<MessageNonce>,
168 _proof_parameters: Self::ProofParameters,
169 ) -> Result<
170 (TargetHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesReceivingProof),
171 Self::Error,
172 > {
173 self.client
174 .prove_messages_receiving(at_block)
175 .await
176 .map(|(at_block, proof)| (at_block, nonces, proof))
177 }
178}
179
180struct ReceivingConfirmationsRaceTarget<P: MessageLane, C> {
182 client: C,
183 metrics_msg: Option<MessageLaneLoopMetrics>,
184 _phantom: PhantomData<P>,
185}
186
187#[async_trait]
188impl<P, C> TargetClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceTarget<P, C>
189where
190 P: MessageLane,
191 C: MessageLaneSourceClient<P>,
192{
193 type Error = C::Error;
194 type TargetNoncesData = ();
195 type BatchTransaction = C::BatchTransaction;
196 type TransactionTracker = C::TransactionTracker;
197
198 async fn require_source_header(
199 &self,
200 id: TargetHeaderIdOf<P>,
201 ) -> Result<Option<C::BatchTransaction>, Self::Error> {
202 self.client.require_target_header_on_source(id).await
203 }
204
205 async fn nonces(
206 &self,
207 at_block: SourceHeaderIdOf<P>,
208 update_metrics: bool,
209 ) -> Result<(SourceHeaderIdOf<P>, TargetClientNonces<()>), Self::Error> {
210 let (at_block, latest_confirmed_nonce) =
211 self.client.latest_confirmed_received_nonce(at_block).await?;
212 if update_metrics {
213 if let Some(metrics_msg) = self.metrics_msg.as_ref() {
214 metrics_msg.update_source_latest_confirmed_nonce(latest_confirmed_nonce);
215 }
216 }
217 Ok((at_block, TargetClientNonces { latest_nonce: latest_confirmed_nonce, nonces_data: () }))
218 }
219
220 async fn submit_proof(
221 &self,
222 maybe_batch_tx: Option<Self::BatchTransaction>,
223 generated_at_block: TargetHeaderIdOf<P>,
224 nonces: RangeInclusive<MessageNonce>,
225 proof: P::MessagesReceivingProof,
226 ) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error> {
227 let tx_tracker = self
228 .client
229 .submit_messages_receiving_proof(maybe_batch_tx, generated_at_block, proof)
230 .await?;
231 Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
232 }
233}
234
235impl NoncesRange for RangeInclusive<MessageNonce> {
236 fn begin(&self) -> MessageNonce {
237 *RangeInclusive::<MessageNonce>::start(self)
238 }
239
240 fn end(&self) -> MessageNonce {
241 *RangeInclusive::<MessageNonce>::end(self)
242 }
243
244 fn greater_than(self, nonce: MessageNonce) -> Option<Self> {
245 let next_nonce = nonce + 1;
246 let end = *self.end();
247 if next_nonce > end {
248 None
249 } else {
250 Some(std::cmp::max(self.begin(), next_nonce)..=end)
251 }
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the `NoncesRange` implementation for `RangeInclusive` on the `20..=30` range.
    #[test]
    fn range_inclusive_works_as_nonces_range() {
        let range = 20..=30;

        assert_eq!(NoncesRange::begin(&range), 20);
        assert_eq!(NoncesRange::end(&range), 30);

        // (nonce passed to `greater_than`, expected remainder of the range)
        let cases = [
            (10, Some(20..=30)),
            (19, Some(20..=30)),
            (20, Some(21..=30)),
            (25, Some(26..=30)),
            (29, Some(30..=30)),
            (30, None),
        ];
        for (nonce, expected) in &cases {
            assert_eq!(range.clone().greater_than(*nonce), *expected);
        }
    }
}