1pub use bp_runtime::HeaderId;
20pub use error::Error;
21pub use relay_loop::{relay_loop, relay_metrics};
22pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto};
23use std::fmt::Debug;
24
25use async_trait::async_trait;
26use backoff::{backoff::Backoff, ExponentialBackoff};
27use futures::future::{BoxFuture, FutureExt};
28use std::time::Duration;
29use thiserror::Error;
30
/// Stall timeout: one hour.
///
/// NOTE(review): not referenced in this part of the file; presumably relay
/// loops use it to decide that progress has stalled — confirm at call sites.
pub const STALL_TIMEOUT: Duration = Duration::from_secs(60 * 60);

/// Upper bound (one minute) on the interval between retries; used as
/// `max_interval` of the backoff built by [`retry_backoff`].
pub const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
/// Delay (ten seconds) handed to the go-offline future when a connection
/// error is hit in [`process_future_result`]; also the fallback retry delay
/// when the backoff yields no next interval.
pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
46
47pub mod error;
48pub mod initialize;
49pub mod metrics;
50pub mod relay_loop;
51
52pub trait BlockNumberBase:
54 'static
55 + From<u32>
56 + UniqueSaturatedInto<u64>
57 + Ord
58 + Clone
59 + Copy
60 + Default
61 + Send
62 + Sync
63 + std::fmt::Debug
64 + std::fmt::Display
65 + std::hash::Hash
66 + std::ops::Add<Output = Self>
67 + std::ops::Sub<Output = Self>
68 + num_traits::CheckedSub
69 + num_traits::Saturating
70 + num_traits::Zero
71 + num_traits::One
72{
73}
74
75impl<T> BlockNumberBase for T where
76 T: 'static
77 + From<u32>
78 + UniqueSaturatedInto<u64>
79 + Ord
80 + Clone
81 + Copy
82 + Default
83 + Send
84 + Sync
85 + std::fmt::Debug
86 + std::fmt::Display
87 + std::hash::Hash
88 + std::ops::Add<Output = Self>
89 + std::ops::Sub<Output = Self>
90 + num_traits::CheckedSub
91 + num_traits::Saturating
92 + num_traits::Zero
93 + num_traits::One
94{
95}
96
/// Unwraps a `(client, Result<T, E>)` pair.
///
/// On `Ok`, the macro evaluates to `(client, value)`. On `Err`, it returns
/// `(client, Err(error))` from the *enclosing function*, so that function must
/// itself return a `(client, Result<_, E>)` pair.
#[macro_export]
macro_rules! bail_on_error {
    ($result:expr) => {
        match $result {
            // Failure: hand the client back to the caller together with the error.
            (client, Err(error)) => return (client, Err(error)),
            // Success: keep going with the client and the unwrapped value.
            (client, Ok(value)) => (client, value),
        }
    };
}
107
/// Unwraps a plain `Result<T, E>` while a client value is held separately.
///
/// On `Ok`, the macro evaluates to the inner value. On `Err`, it returns
/// `($client, Err(error))` from the *enclosing function*, where `$client` is
/// the identifier of the client variable passed as the second argument.
#[macro_export]
macro_rules! bail_on_arg_error {
    ($result:expr, $client:ident) => {
        match $result {
            // Failure: give the client back alongside the error.
            Err(error) => return ($client, Err(error)),
            // Success: continue with the unwrapped value.
            Ok(value) => value,
        }
    };
}
118
/// Implemented by error types that can tell whether they represent a
/// connection failure.
///
/// [`process_future_result`] uses this to choose between its
/// connection-failure branch and its retry-with-backoff branch.
pub trait MaybeConnectionError {
    /// Returns `true` if this error is a connection error.
    fn is_connection_error(&self) -> bool;
}
124
/// Final status of a tracked transaction, as reported by
/// [`TransactionTracker::wait`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TrackedTransactionStatus<BlockId> {
    /// The transaction has been lost.
    Lost,
    /// The transaction has been finalized; carries a block identifier
    /// (presumably the block in which it was finalized — confirm with the
    /// tracker implementations).
    Finalized(BlockId),
}
133
134#[async_trait]
136pub trait TransactionTracker: Send {
137 type HeaderId: Clone + Debug + Send;
139
140 async fn wait(self) -> TrackedTransactionStatus<Self::HeaderId>;
142}
143
144pub type TrackedTransactionFuture<'a, T> =
146 BoxFuture<'a, TrackedTransactionStatus<<T as TransactionTracker>::HeaderId>>;
147
148#[derive(Error, Debug)]
150pub enum StringifiedMaybeConnectionError {
151 #[error("{0}")]
153 Connection(String),
154 #[error("{0}")]
156 NonConnection(String),
157}
158
159impl StringifiedMaybeConnectionError {
160 pub fn new(is_connection_error: bool, error: String) -> Self {
162 if is_connection_error {
163 StringifiedMaybeConnectionError::Connection(error)
164 } else {
165 StringifiedMaybeConnectionError::NonConnection(error)
166 }
167 }
168}
169
170impl MaybeConnectionError for StringifiedMaybeConnectionError {
171 fn is_connection_error(&self) -> bool {
172 match *self {
173 StringifiedMaybeConnectionError::Connection(_) => true,
174 StringifiedMaybeConnectionError::NonConnection(_) => false,
175 }
176 }
177}
178
179pub fn retry_backoff() -> ExponentialBackoff {
181 ExponentialBackoff {
182 max_elapsed_time: None,
184 max_interval: MAX_BACKOFF_INTERVAL,
185 ..Default::default()
186 }
187}
188
/// Renders a sequence of ids compactly for logging.
///
/// * no ids: `<nothing>`
/// * one id: the id's `Debug` output
/// * two ids: `[first, second]`
/// * more: `len:[first ... last]`
///
/// # Panics
///
/// Panics if the iterator yields fewer items than its `len()` reported.
pub fn format_ids<Id: std::fmt::Debug>(mut ids: impl ExactSizeIterator<Item = Id>) -> String {
    const NTH_PROOF: &str = "we have checked len; qed";

    let total = ids.len();
    if total == 0 {
        return "<nothing>".into();
    }

    // Every remaining shape starts with the first id.
    let first = ids.next().expect(NTH_PROOF);
    match total {
        1 => format!("{first:?}"),
        2 => {
            let second = ids.next().expect(NTH_PROOF);
            format!("[{first:?}, {second:?}]")
        },
        _ => {
            // Only the endpoints are shown; `last` drains the rest of the iterator.
            let final_id = ids.last().expect(NTH_PROOF);
            format!("{total}:[{first:?} ... {final_id:?}]")
        },
    }
}
207
208pub fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
210 futures::stream::unfold((), move |_| async move {
211 async_std::task::sleep(timeout).await;
212 Some(((), ()))
213 })
214}
215
/// Identifies which client is reported as failed.
///
/// Used as the error type of [`ProcessFutureResult::fail_if_error`] and
/// [`ProcessFutureResult::fail_if_connection_error`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FailedClient {
    /// The source client has failed.
    Source,
    /// The target client has failed.
    Target,
    /// Both the source and the target clients have failed.
    Both,
}
227
/// Outcome reported by [`process_future_result`].
#[derive(Clone, Copy, Debug)]
pub enum ProcessFutureResult {
    /// The future completed with `Ok`.
    Success,
    /// The future failed with an error that is not a connection error.
    Failed,
    /// The future failed with a connection error.
    ConnectionFailed,
}
238
239impl ProcessFutureResult {
240 pub fn is_ok(self) -> bool {
242 match self {
243 ProcessFutureResult::Success => true,
244 ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false,
245 }
246 }
247
248 pub fn fail_if_error(self, failed_client: FailedClient) -> Result<(), FailedClient> {
251 if self.is_ok() {
252 Ok(())
253 } else {
254 Err(failed_client)
255 }
256 }
257
258 pub fn fail_if_connection_error(
262 self,
263 failed_client: FailedClient,
264 ) -> Result<bool, FailedClient> {
265 match self {
266 ProcessFutureResult::Success => Ok(true),
267 ProcessFutureResult::Failed => Ok(false),
268 ProcessFutureResult::ConnectionFailed => Err(failed_client),
269 }
270 }
271}
272
273pub fn process_future_result<TResult, TError, TGoOfflineFuture>(
275 result: Result<TResult, TError>,
276 retry_backoff: &mut ExponentialBackoff,
277 on_success: impl FnOnce(TResult),
278 go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
279 go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
280 error_pattern: impl FnOnce() -> String,
281) -> ProcessFutureResult
282where
283 TError: std::fmt::Debug + MaybeConnectionError,
284 TGoOfflineFuture: FutureExt,
285{
286 match result {
287 Ok(result) => {
288 on_success(result);
289 retry_backoff.reset();
290 ProcessFutureResult::Success
291 },
292 Err(error) if error.is_connection_error() => {
293 log::error!(
294 target: "bridge",
295 "{}: {:?}. Going to restart",
296 error_pattern(),
297 error,
298 );
299
300 retry_backoff.reset();
301 go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse());
302 ProcessFutureResult::ConnectionFailed
303 },
304 Err(error) => {
305 let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
306 log::error!(
307 target: "bridge",
308 "{}: {:?}. Retrying in {}",
309 error_pattern(),
310 error,
311 retry_delay.as_secs_f64(),
312 );
313
314 go_offline_future.set(go_offline(retry_delay).fuse());
315 ProcessFutureResult::Failed
316 },
317 }
318}