referrerpolicy=no-referrer-when-downgrade

messages_relay/
message_race_receiving.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14//! Message receiving race delivers proof-of-messages-delivery from "lane.target" to "lane.source".
15
16use crate::{
17	message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
18	message_lane_loop::{
19		NoncesSubmitArtifacts, SourceClient as MessageLaneSourceClient, SourceClientState,
20		TargetClient as MessageLaneTargetClient, TargetClientState,
21	},
22	message_race_loop::{
23		MessageRace, NoncesRange, SourceClient, SourceClientNonces, TargetClient,
24		TargetClientNonces,
25	},
26	message_race_strategy::BasicStrategy,
27	metrics::MessageLaneLoopMetrics,
28};
29
30use async_trait::async_trait;
31use bp_messages::MessageNonce;
32use futures::stream::FusedStream;
33use relay_utils::{FailedClient, TrackedTransactionStatus, TransactionTracker};
34use std::{marker::PhantomData, ops::RangeInclusive};
35
/// Message receiving confirmations delivery strategy.
///
/// This is `BasicStrategy` instantiated for the receiving confirmations race: the header
/// number/hash parameters are given in (lane target, lane source) order, nonces are passed
/// around as `RangeInclusive<MessageNonce>` and the proof type is the lane's
/// `MessagesReceivingProof`.
type ReceivingConfirmationsBasicStrategy<P> = BasicStrategy<
	<P as MessageLane>::TargetHeaderNumber,
	<P as MessageLane>::TargetHeaderHash,
	<P as MessageLane>::SourceHeaderNumber,
	<P as MessageLane>::SourceHeaderHash,
	RangeInclusive<MessageNonce>,
	<P as MessageLane>::MessagesReceivingProof,
>;
45
46/// Run receiving confirmations race.
47pub async fn run<P: MessageLane>(
48	source_client: impl MessageLaneSourceClient<P>,
49	source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
50	target_client: impl MessageLaneTargetClient<P>,
51	target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
52	metrics_msg: Option<MessageLaneLoopMetrics>,
53) -> Result<(), FailedClient> {
54	crate::message_race_loop::run(
55		ReceivingConfirmationsRaceSource {
56			client: target_client,
57			metrics_msg: metrics_msg.clone(),
58			_phantom: Default::default(),
59		},
60		target_state_updates,
61		ReceivingConfirmationsRaceTarget {
62			client: source_client,
63			metrics_msg,
64			_phantom: Default::default(),
65		},
66		source_state_updates,
67		ReceivingConfirmationsBasicStrategy::<P>::new(),
68	)
69	.await
70}
71
72/// Relay messages delivery confirmation.
73pub async fn relay_messages_delivery_confirmation<P: MessageLane>(
74	source_client: impl MessageLaneSourceClient<P>,
75	target_client: impl MessageLaneTargetClient<P>,
76	at: TargetHeaderIdOf<P>,
77) -> Result<(), ()> {
78	// prepare messages delivery proof
79	let (at, proof) = target_client.prove_messages_receiving(at.clone()).await.map_err(|e| {
80		log::error!(
81			target: "bridge",
82			"Failed to generate messages delivery proof at {:?}: {:?}",
83			at,
84			e,
85		);
86	})?;
87	// submit messages delivery proof to the source node
88	let tx_tracker =
89		source_client
90			.submit_messages_receiving_proof(None, at, proof)
91			.await
92			.map_err(|e| {
93				log::error!(
94					target: "bridge",
95					"Failed to submit messages delivery proof: {:?}",
96					e,
97				);
98			})?;
99
100	match tx_tracker.wait().await {
101		TrackedTransactionStatus::Finalized(_) => Ok(()),
102		TrackedTransactionStatus::Lost => {
103			log::error!("Transaction with messages delivery proof is considered lost");
104			Err(())
105		},
106	}
107}
108
/// Messages receiving confirmations race.
///
/// Zero-sized marker type: it stores nothing but the lane type `P`, so that the `MessageRace`
/// implementation has a type to be attached to.
struct ReceivingConfirmationsRace<P>(PhantomData<P>);
111
112impl<P: MessageLane> MessageRace for ReceivingConfirmationsRace<P> {
113	type SourceHeaderId = TargetHeaderIdOf<P>;
114	type TargetHeaderId = SourceHeaderIdOf<P>;
115
116	type MessageNonce = MessageNonce;
117	type Proof = P::MessagesReceivingProof;
118
119	fn source_name() -> String {
120		format!("{}::ReceivingConfirmationsDelivery", P::TARGET_NAME)
121	}
122
123	fn target_name() -> String {
124		format!("{}::ReceivingConfirmationsDelivery", P::SOURCE_NAME)
125	}
126}
127
128/// Message receiving confirmations race source, which is a target of the lane.
129struct ReceivingConfirmationsRaceSource<P: MessageLane, C> {
130	client: C,
131	metrics_msg: Option<MessageLaneLoopMetrics>,
132	_phantom: PhantomData<P>,
133}
134
135#[async_trait]
136impl<P, C> SourceClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceSource<P, C>
137where
138	P: MessageLane,
139	C: MessageLaneTargetClient<P>,
140{
141	type Error = C::Error;
142	type NoncesRange = RangeInclusive<MessageNonce>;
143	type ProofParameters = ();
144
145	async fn nonces(
146		&self,
147		at_block: TargetHeaderIdOf<P>,
148		prev_latest_nonce: MessageNonce,
149	) -> Result<(TargetHeaderIdOf<P>, SourceClientNonces<Self::NoncesRange>), Self::Error> {
150		let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
151		if let Some(metrics_msg) = self.metrics_msg.as_ref() {
152			metrics_msg.update_target_latest_received_nonce(latest_received_nonce);
153		}
154		Ok((
155			at_block,
156			SourceClientNonces {
157				new_nonces: prev_latest_nonce + 1..=latest_received_nonce,
158				confirmed_nonce: None,
159			},
160		))
161	}
162
163	#[allow(clippy::unit_arg)]
164	async fn generate_proof(
165		&self,
166		at_block: TargetHeaderIdOf<P>,
167		nonces: RangeInclusive<MessageNonce>,
168		_proof_parameters: Self::ProofParameters,
169	) -> Result<
170		(TargetHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesReceivingProof),
171		Self::Error,
172	> {
173		self.client
174			.prove_messages_receiving(at_block)
175			.await
176			.map(|(at_block, proof)| (at_block, nonces, proof))
177	}
178}
179
180/// Message receiving confirmations race target, which is a source of the lane.
181struct ReceivingConfirmationsRaceTarget<P: MessageLane, C> {
182	client: C,
183	metrics_msg: Option<MessageLaneLoopMetrics>,
184	_phantom: PhantomData<P>,
185}
186
187#[async_trait]
188impl<P, C> TargetClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceTarget<P, C>
189where
190	P: MessageLane,
191	C: MessageLaneSourceClient<P>,
192{
193	type Error = C::Error;
194	type TargetNoncesData = ();
195	type BatchTransaction = C::BatchTransaction;
196	type TransactionTracker = C::TransactionTracker;
197
198	async fn require_source_header(
199		&self,
200		id: TargetHeaderIdOf<P>,
201	) -> Result<Option<C::BatchTransaction>, Self::Error> {
202		self.client.require_target_header_on_source(id).await
203	}
204
205	async fn nonces(
206		&self,
207		at_block: SourceHeaderIdOf<P>,
208		update_metrics: bool,
209	) -> Result<(SourceHeaderIdOf<P>, TargetClientNonces<()>), Self::Error> {
210		let (at_block, latest_confirmed_nonce) =
211			self.client.latest_confirmed_received_nonce(at_block).await?;
212		if update_metrics {
213			if let Some(metrics_msg) = self.metrics_msg.as_ref() {
214				metrics_msg.update_source_latest_confirmed_nonce(latest_confirmed_nonce);
215			}
216		}
217		Ok((at_block, TargetClientNonces { latest_nonce: latest_confirmed_nonce, nonces_data: () }))
218	}
219
220	async fn submit_proof(
221		&self,
222		maybe_batch_tx: Option<Self::BatchTransaction>,
223		generated_at_block: TargetHeaderIdOf<P>,
224		nonces: RangeInclusive<MessageNonce>,
225		proof: P::MessagesReceivingProof,
226	) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error> {
227		let tx_tracker = self
228			.client
229			.submit_messages_receiving_proof(maybe_batch_tx, generated_at_block, proof)
230			.await?;
231		Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
232	}
233}
234
235impl NoncesRange for RangeInclusive<MessageNonce> {
236	fn begin(&self) -> MessageNonce {
237		*RangeInclusive::<MessageNonce>::start(self)
238	}
239
240	fn end(&self) -> MessageNonce {
241		*RangeInclusive::<MessageNonce>::end(self)
242	}
243
244	fn greater_than(self, nonce: MessageNonce) -> Option<Self> {
245		let next_nonce = nonce + 1;
246		let end = *self.end();
247		if next_nonce > end {
248			None
249		} else {
250			Some(std::cmp::max(self.begin(), next_nonce)..=end)
251		}
252	}
253}
254
#[cfg(test)]
mod tests {
	use super::*;

	/// Checks `NoncesRange` accessors and `greater_than` of the `20..=30` range for nonces
	/// below the range, inside the range and at its upper bound.
	#[test]
	fn range_inclusive_works_as_nonces_range() {
		let range = 20..=30;

		assert_eq!(NoncesRange::begin(&range), 20);
		assert_eq!(NoncesRange::end(&range), 30);

		// (nonce, expected result of `greater_than(nonce)`)
		let cases: [(MessageNonce, Option<RangeInclusive<MessageNonce>>); 6] = [
			(10, Some(20..=30)),
			(19, Some(20..=30)),
			(20, Some(21..=30)),
			(25, Some(26..=30)),
			(29, Some(30..=30)),
			(30, None),
		];
		for (nonce, expected) in cases.iter() {
			assert_eq!(range.clone().greater_than(*nonce), *expected);
		}
	}
}