referrerpolicy=no-referrer-when-downgrade

relay_utils/
lib.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Utilities used by different relays.
18
19pub use bp_runtime::HeaderId;
20pub use error::Error;
21pub use relay_loop::{relay_loop, relay_metrics};
22pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto};
23use std::fmt::Debug;
24
25use async_trait::async_trait;
26use backoff::{backoff::Backoff, ExponentialBackoff};
27use futures::future::{BoxFuture, FutureExt};
28use std::time::Duration;
29use thiserror::Error;
30
/// Default stall timeout of the relay loop. This is the timeout that is used when the
/// transactions generated by the relay are immortal.
///
/// There are no strict requirements on block time in Substrate. We assume, however, that all
/// Substrate-based chains are designed to produce blocks relatively fast (compared to the
/// slowest blockchains). So one hour seems to be a good guess for (even congested) chains to
/// either mine a transaction, or remove it from the pool.
pub const STALL_TIMEOUT: Duration = Duration::from_secs(3_600);

/// Maximal delay between a connection-unrelated error and the next attempt to issue the same
/// request.
pub const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
/// Delay between a connection-related error and the next reconnection attempt.
pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
46
47pub mod error;
48pub mod initialize;
49pub mod metrics;
50pub mod relay_loop;
51
52/// Block number traits shared by all chains that relay is able to serve.
53pub trait BlockNumberBase:
54	'static
55	+ From<u32>
56	+ UniqueSaturatedInto<u64>
57	+ Ord
58	+ Clone
59	+ Copy
60	+ Default
61	+ Send
62	+ Sync
63	+ std::fmt::Debug
64	+ std::fmt::Display
65	+ std::hash::Hash
66	+ std::ops::Add<Output = Self>
67	+ std::ops::Sub<Output = Self>
68	+ num_traits::CheckedSub
69	+ num_traits::Saturating
70	+ num_traits::Zero
71	+ num_traits::One
72{
73}
74
75impl<T> BlockNumberBase for T where
76	T: 'static
77		+ From<u32>
78		+ UniqueSaturatedInto<u64>
79		+ Ord
80		+ Clone
81		+ Copy
82		+ Default
83		+ Send
84		+ Sync
85		+ std::fmt::Debug
86		+ std::fmt::Display
87		+ std::hash::Hash
88		+ std::ops::Add<Output = Self>
89		+ std::ops::Sub<Output = Self>
90		+ num_traits::CheckedSub
91		+ num_traits::Saturating
92		+ num_traits::Zero
93		+ num_traits::One
94{
95}
96
/// Macro that turns the `(client, Ok(value))` tuple into the `(client, value)` tuple.
///
/// If the tuple holds `Err(error)` instead, the macro makes the enclosing function return
/// `(client, Err(error))` immediately.
#[macro_export]
macro_rules! bail_on_error {
	($outcome:expr) => {
		match $outcome {
			(client, Err(error)) => return (client, Err(error)),
			(client, Ok(value)) => (client, value),
		}
	};
}
107
/// Macro that unwraps `Ok(value)` into `value`.
///
/// If the result is `Err(error)` instead, the macro makes the enclosing function return
/// `(client, Err(error))` immediately, where `client` is the identifier that has been passed
/// as the second argument.
#[macro_export]
macro_rules! bail_on_arg_error {
	($outcome:expr, $client:ident) => {
		match $outcome {
			Err(error) => return ($client, Err(error)),
			Ok(value) => value,
		}
	};
}
118
/// Implemented by error types that are able to signal connection errors.
pub trait MaybeConnectionError {
	/// Returns `true` if this error (maybe) represents a connection error.
	fn is_connection_error(&self) -> bool;
}
124
/// Terminal state of a transaction that has been tracked.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TrackedTransactionStatus<BlockId> {
	/// The transaction has been lost.
	Lost,
	/// The transaction has been mined and finalized at the given block.
	Finalized(BlockId),
}
133
134/// Transaction tracker.
135#[async_trait]
136pub trait TransactionTracker: Send {
137	/// Header id, used by the chain.
138	type HeaderId: Clone + Debug + Send;
139
140	/// Wait until transaction is either finalized or invalidated/lost.
141	async fn wait(self) -> TrackedTransactionStatus<Self::HeaderId>;
142}
143
144/// Future associated with `TransactionTracker`, monitoring the transaction status.
145pub type TrackedTransactionFuture<'a, T> =
146	BoxFuture<'a, TrackedTransactionStatus<<T as TransactionTracker>::HeaderId>>;
147
148/// Stringified error that may be either connection-related or not.
149#[derive(Error, Debug)]
150pub enum StringifiedMaybeConnectionError {
151	/// The error is connection-related error.
152	#[error("{0}")]
153	Connection(String),
154	/// The error is connection-unrelated error.
155	#[error("{0}")]
156	NonConnection(String),
157}
158
159impl StringifiedMaybeConnectionError {
160	/// Create new stringified connection error.
161	pub fn new(is_connection_error: bool, error: String) -> Self {
162		if is_connection_error {
163			StringifiedMaybeConnectionError::Connection(error)
164		} else {
165			StringifiedMaybeConnectionError::NonConnection(error)
166		}
167	}
168}
169
170impl MaybeConnectionError for StringifiedMaybeConnectionError {
171	fn is_connection_error(&self) -> bool {
172		match *self {
173			StringifiedMaybeConnectionError::Connection(_) => true,
174			StringifiedMaybeConnectionError::NonConnection(_) => false,
175		}
176	}
177}
178
179/// Exponential backoff for connection-unrelated errors retries.
180pub fn retry_backoff() -> ExponentialBackoff {
181	ExponentialBackoff {
182		// we do not want relayer to stop
183		max_elapsed_time: None,
184		max_interval: MAX_BACKOFF_INTERVAL,
185		..Default::default()
186	}
187}
188
/// Formats the sequence of IDs in a compact way.
///
/// Depending on the number of IDs, the result is:
/// - `<nothing>` if there are no IDs;
/// - the `Debug` representation of the ID if there is exactly one;
/// - `[first, second]` if there are exactly two;
/// - `len:[first ... last]` otherwise.
///
/// Panics if the iterator yields fewer items than its `len()` has reported.
pub fn format_ids<Id: std::fmt::Debug>(mut ids: impl ExactSizeIterator<Item = Id>) -> String {
	const NTH_PROOF: &str = "we have checked len; qed";

	let total = ids.len();
	if total == 0 {
		return "<nothing>".into();
	}

	// every non-empty sequence starts with the first ID
	let first = ids.next().expect(NTH_PROOF);
	match total {
		1 => format!("{first:?}"),
		2 => {
			let second = ids.next().expect(NTH_PROOF);
			format!("[{first:?}, {second:?}]")
		},
		_ => {
			// the remaining iterator is consumed, because only its last ID is printed
			let last = ids.last().expect(NTH_PROOF);
			format!("{total}:[{first:?} ... {last:?}]")
		},
	}
}
207
208/// Stream that emits item every `timeout_ms` milliseconds.
209pub fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
210	futures::stream::unfold((), move |_| async move {
211		async_std::task::sleep(timeout).await;
212		Some(((), ()))
213	})
214}
215
/// Identifies the client that has caused an error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FailedClient {
	/// The error has been caused by the source client.
	Source,
	/// The error has been caused by the target client.
	Target,
	/// Both clients are failing, or we have encountered some other error that should be
	/// treated like that.
	Both,
}
227
/// Outcome of processing the result of a future.
#[derive(Clone, Copy, Debug)]
pub enum ProcessFutureResult {
	/// The future has been processed successfully.
	Success,
	/// The future has failed with a non-connection error.
	Failed,
	/// The future has failed with a connection error.
	ConnectionFailed,
}
238
239impl ProcessFutureResult {
240	/// Returns true if result is Success.
241	pub fn is_ok(self) -> bool {
242		match self {
243			ProcessFutureResult::Success => true,
244			ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false,
245		}
246	}
247
248	/// Returns `Ok(())` if future has succeeded.
249	/// Returns `Err(failed_client)` otherwise.
250	pub fn fail_if_error(self, failed_client: FailedClient) -> Result<(), FailedClient> {
251		if self.is_ok() {
252			Ok(())
253		} else {
254			Err(failed_client)
255		}
256	}
257
258	/// Returns Ok(true) if future has succeeded.
259	/// Returns Ok(false) if future has failed with non-connection error.
260	/// Returns Err if future is `ConnectionFailed`.
261	pub fn fail_if_connection_error(
262		self,
263		failed_client: FailedClient,
264	) -> Result<bool, FailedClient> {
265		match self {
266			ProcessFutureResult::Success => Ok(true),
267			ProcessFutureResult::Failed => Ok(false),
268			ProcessFutureResult::ConnectionFailed => Err(failed_client),
269		}
270	}
271}
272
273/// Process result of the future from a client.
274pub fn process_future_result<TResult, TError, TGoOfflineFuture>(
275	result: Result<TResult, TError>,
276	retry_backoff: &mut ExponentialBackoff,
277	on_success: impl FnOnce(TResult),
278	go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
279	go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
280	error_pattern: impl FnOnce() -> String,
281) -> ProcessFutureResult
282where
283	TError: std::fmt::Debug + MaybeConnectionError,
284	TGoOfflineFuture: FutureExt,
285{
286	match result {
287		Ok(result) => {
288			on_success(result);
289			retry_backoff.reset();
290			ProcessFutureResult::Success
291		},
292		Err(error) if error.is_connection_error() => {
293			log::error!(
294				target: "bridge",
295				"{}: {:?}. Going to restart",
296				error_pattern(),
297				error,
298			);
299
300			retry_backoff.reset();
301			go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse());
302			ProcessFutureResult::ConnectionFailed
303		},
304		Err(error) => {
305			let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
306			log::error!(
307				target: "bridge",
308				"{}: {:?}. Retrying in {}",
309				error_pattern(),
310				error,
311				retry_delay.as_secs_f64(),
312			);
313
314			go_offline_future.set(go_offline(retry_delay).fuse());
315			ProcessFutureResult::Failed
316		},
317	}
318}