1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
34// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
89// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
1314// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
1617//! Utilities used by different relays.
1819pub use bp_runtime::HeaderId;
20pub use error::Error;
21pub use relay_loop::{relay_loop, relay_metrics};
22pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto};
23use std::fmt::Debug;
2425use async_trait::async_trait;
26use backoff::{backoff::Backoff, ExponentialBackoff};
27use futures::future::{BoxFuture, FutureExt};
28use std::time::Duration;
29use thiserror::Error;
3031/// Default relay loop stall timeout. If transactions generated by relay are immortal, then
32/// this timeout is used.
33///
34/// There are no any strict requirements on block time in Substrate. But we assume here that all
35/// Substrate-based chains will be designed to produce relatively fast (compared to the slowest
36/// blockchains) blocks. So 1 hour seems to be a good guess for (even congested) chains to mine
37/// transaction, or remove it from the pool.
38pub const STALL_TIMEOUT: Duration = Duration::from_secs(60 * 60);
3940/// Max delay after connection-unrelated error happened before we'll try the
41/// same request again.
42pub const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
43/// Delay after connection-related error happened before we'll try
44/// reconnection again.
45pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
4647pub mod error;
48pub mod initialize;
49pub mod metrics;
50pub mod relay_loop;
5152/// Block number traits shared by all chains that relay is able to serve.
53pub trait BlockNumberBase:
54'static
55+ From<u32>
56 + UniqueSaturatedInto<u64>
57 + Ord
58 + Clone
59 + Copy
60 + Default
61 + Send
62 + Sync
63 + std::fmt::Debug
64 + std::fmt::Display
65 + std::hash::Hash
66 + std::ops::Add<Output = Self>
67 + std::ops::Sub<Output = Self>
68 + num_traits::CheckedSub
69 + num_traits::Saturating
70 + num_traits::Zero
71 + num_traits::One
72{
73}
7475impl<T> BlockNumberBase for T where
76T: 'static
77+ From<u32>
78 + UniqueSaturatedInto<u64>
79 + Ord
80 + Clone
81 + Copy
82 + Default
83 + Send
84 + Sync
85 + std::fmt::Debug
86 + std::fmt::Display
87 + std::hash::Hash
88 + std::ops::Add<Output = Self>
89 + std::ops::Sub<Output = Self>
90 + num_traits::CheckedSub
91 + num_traits::Saturating
92 + num_traits::Zero
93 + num_traits::One
94{
95}
9697/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
98#[macro_export]
99macro_rules! bail_on_error {
100 ($result: expr) => {
101match $result {
102 (client, Ok(result)) => (client, result),
103 (client, Err(error)) => return (client, Err(error)),
104 }
105 };
106}
107108/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
109#[macro_export]
110macro_rules! bail_on_arg_error {
111 ($result: expr, $client: ident) => {
112match $result {
113Ok(result) => result,
114Err(error) => return ($client, Err(error)),
115 }
116 };
117}
118119/// Error type that can signal connection errors.
120pub trait MaybeConnectionError {
121/// Returns true if error (maybe) represents connection error.
122fn is_connection_error(&self) -> bool;
123}
124125/// Final status of the tracked transaction.
126#[derive(Debug, Clone, Copy, PartialEq)]
127pub enum TrackedTransactionStatus<BlockId> {
128/// Transaction has been lost.
129Lost,
130/// Transaction has been mined and finalized at given block.
131Finalized(BlockId),
132}
133134/// Transaction tracker.
135#[async_trait]
136pub trait TransactionTracker: Send {
137/// Header id, used by the chain.
138type HeaderId: Clone + Debug + Send;
139140/// Wait until transaction is either finalized or invalidated/lost.
141async fn wait(self) -> TrackedTransactionStatus<Self::HeaderId>;
142}
143144/// Future associated with `TransactionTracker`, monitoring the transaction status.
145pub type TrackedTransactionFuture<'a, T> =
146 BoxFuture<'a, TrackedTransactionStatus<<T as TransactionTracker>::HeaderId>>;
147148/// Stringified error that may be either connection-related or not.
149#[derive(Error, Debug)]
150pub enum StringifiedMaybeConnectionError {
151/// The error is connection-related error.
152#[error("{0}")]
153Connection(String),
154/// The error is connection-unrelated error.
155#[error("{0}")]
156NonConnection(String),
157}
158159impl StringifiedMaybeConnectionError {
160/// Create new stringified connection error.
161pub fn new(is_connection_error: bool, error: String) -> Self {
162if is_connection_error {
163 StringifiedMaybeConnectionError::Connection(error)
164 } else {
165 StringifiedMaybeConnectionError::NonConnection(error)
166 }
167 }
168}
169170impl MaybeConnectionError for StringifiedMaybeConnectionError {
171fn is_connection_error(&self) -> bool {
172match *self {
173 StringifiedMaybeConnectionError::Connection(_) => true,
174 StringifiedMaybeConnectionError::NonConnection(_) => false,
175 }
176 }
177}
178179/// Exponential backoff for connection-unrelated errors retries.
180pub fn retry_backoff() -> ExponentialBackoff {
181 ExponentialBackoff {
182// we do not want relayer to stop
183max_elapsed_time: None,
184 max_interval: MAX_BACKOFF_INTERVAL,
185 ..Default::default()
186 }
187}
188189/// Compact format of IDs vector.
190pub fn format_ids<Id: std::fmt::Debug>(mut ids: impl ExactSizeIterator<Item = Id>) -> String {
191const NTH_PROOF: &str = "we have checked len; qed";
192match ids.len() {
1930 => "<nothing>".into(),
1941 => format!("{:?}", ids.next().expect(NTH_PROOF)),
1952 => {
196let id0 = ids.next().expect(NTH_PROOF);
197let id1 = ids.next().expect(NTH_PROOF);
198format!("[{id0:?}, {id1:?}]")
199 },
200 len => {
201let id0 = ids.next().expect(NTH_PROOF);
202let id_last = ids.last().expect(NTH_PROOF);
203format!("{len}:[{id0:?} ... {id_last:?}]")
204 },
205 }
206}
207208/// Stream that emits item every `timeout_ms` milliseconds.
209pub fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
210 futures::stream::unfold((), move |_| async move {
211 async_std::task::sleep(timeout).await;
212Some(((), ()))
213 })
214}
215216/// Which client has caused error.
217#[derive(Debug, Eq, Clone, Copy, PartialEq)]
218pub enum FailedClient {
219/// It is the source client who has caused error.
220Source,
221/// It is the target client who has caused error.
222Target,
223/// Both clients are failing, or we just encountered some other error that
224 /// should be treated like that.
225Both,
226}
227228/// Future process result.
229#[derive(Debug, Clone, Copy)]
230pub enum ProcessFutureResult {
231/// Future has been processed successfully.
232Success,
233/// Future has failed with non-connection error.
234Failed,
235/// Future has failed with connection error.
236ConnectionFailed,
237}
238239impl ProcessFutureResult {
240/// Returns true if result is Success.
241pub fn is_ok(self) -> bool {
242match self {
243 ProcessFutureResult::Success => true,
244 ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false,
245 }
246 }
247248/// Returns `Ok(())` if future has succeeded.
249 /// Returns `Err(failed_client)` otherwise.
250pub fn fail_if_error(self, failed_client: FailedClient) -> Result<(), FailedClient> {
251if self.is_ok() {
252Ok(())
253 } else {
254Err(failed_client)
255 }
256 }
257258/// Returns Ok(true) if future has succeeded.
259 /// Returns Ok(false) if future has failed with non-connection error.
260 /// Returns Err if future is `ConnectionFailed`.
261pub fn fail_if_connection_error(
262self,
263 failed_client: FailedClient,
264 ) -> Result<bool, FailedClient> {
265match self {
266 ProcessFutureResult::Success => Ok(true),
267 ProcessFutureResult::Failed => Ok(false),
268 ProcessFutureResult::ConnectionFailed => Err(failed_client),
269 }
270 }
271}
272273/// Process result of the future from a client.
274pub fn process_future_result<TResult, TError, TGoOfflineFuture>(
275 result: Result<TResult, TError>,
276 retry_backoff: &mut ExponentialBackoff,
277 on_success: impl FnOnce(TResult),
278 go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
279 go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
280 error_pattern: impl FnOnce() -> String,
281) -> ProcessFutureResult
282where
283TError: std::fmt::Debug + MaybeConnectionError,
284 TGoOfflineFuture: FutureExt,
285{
286match result {
287Ok(result) => {
288 on_success(result);
289 retry_backoff.reset();
290 ProcessFutureResult::Success
291 },
292Err(error) if error.is_connection_error() => {
293log::error!(
294 target: "bridge",
295"{}: {:?}. Going to restart",
296 error_pattern(),
297 error,
298 );
299300 retry_backoff.reset();
301 go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse());
302 ProcessFutureResult::ConnectionFailed
303 },
304Err(error) => {
305let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
306log::error!(
307 target: "bridge",
308"{}: {:?}. Retrying in {}",
309 error_pattern(),
310 error,
311 retry_delay.as_secs_f64(),
312 );
313314 go_offline_future.set(go_offline(retry_delay).fuse());
315 ProcessFutureResult::Failed
316 },
317 }
318}