pub use bp_runtime::HeaderId;
pub use error::Error;
pub use relay_loop::{relay_loop, relay_metrics};
pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto};
use std::fmt::Debug;
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use thiserror::Error;
pub const STALL_TIMEOUT: Duration = Duration::from_secs(60 * 60);
pub const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
pub mod error;
pub mod initialize;
pub mod metrics;
pub mod relay_loop;
pub trait BlockNumberBase:
'static
+ From<u32>
+ UniqueSaturatedInto<u64>
+ Ord
+ Clone
+ Copy
+ Default
+ Send
+ Sync
+ std::fmt::Debug
+ std::fmt::Display
+ std::hash::Hash
+ std::ops::Add<Output = Self>
+ std::ops::Sub<Output = Self>
+ num_traits::CheckedSub
+ num_traits::Saturating
+ num_traits::Zero
+ num_traits::One
{
}
impl<T> BlockNumberBase for T where
T: 'static
+ From<u32>
+ UniqueSaturatedInto<u64>
+ Ord
+ Clone
+ Copy
+ Default
+ Send
+ Sync
+ std::fmt::Debug
+ std::fmt::Display
+ std::hash::Hash
+ std::ops::Add<Output = Self>
+ std::ops::Sub<Output = Self>
+ num_traits::CheckedSub
+ num_traits::Saturating
+ num_traits::Zero
+ num_traits::One
{
}
#[macro_export]
macro_rules! bail_on_error {
($result: expr) => {
match $result {
(client, Ok(result)) => (client, result),
(client, Err(error)) => return (client, Err(error)),
}
};
}
#[macro_export]
macro_rules! bail_on_arg_error {
($result: expr, $client: ident) => {
match $result {
Ok(result) => result,
Err(error) => return ($client, Err(error)),
}
};
}
pub trait MaybeConnectionError {
fn is_connection_error(&self) -> bool;
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TrackedTransactionStatus<BlockId> {
Lost,
Finalized(BlockId),
}
#[async_trait]
pub trait TransactionTracker: Send {
type HeaderId: Clone + Debug + Send;
async fn wait(self) -> TrackedTransactionStatus<Self::HeaderId>;
}
pub type TrackedTransactionFuture<'a, T> =
BoxFuture<'a, TrackedTransactionStatus<<T as TransactionTracker>::HeaderId>>;
#[derive(Error, Debug)]
pub enum StringifiedMaybeConnectionError {
#[error("{0}")]
Connection(String),
#[error("{0}")]
NonConnection(String),
}
impl StringifiedMaybeConnectionError {
pub fn new(is_connection_error: bool, error: String) -> Self {
if is_connection_error {
StringifiedMaybeConnectionError::Connection(error)
} else {
StringifiedMaybeConnectionError::NonConnection(error)
}
}
}
impl MaybeConnectionError for StringifiedMaybeConnectionError {
fn is_connection_error(&self) -> bool {
match *self {
StringifiedMaybeConnectionError::Connection(_) => true,
StringifiedMaybeConnectionError::NonConnection(_) => false,
}
}
}
pub fn retry_backoff() -> ExponentialBackoff {
ExponentialBackoff {
max_elapsed_time: None,
max_interval: MAX_BACKOFF_INTERVAL,
..Default::default()
}
}
pub fn format_ids<Id: std::fmt::Debug>(mut ids: impl ExactSizeIterator<Item = Id>) -> String {
const NTH_PROOF: &str = "we have checked len; qed";
match ids.len() {
0 => "<nothing>".into(),
1 => format!("{:?}", ids.next().expect(NTH_PROOF)),
2 => {
let id0 = ids.next().expect(NTH_PROOF);
let id1 = ids.next().expect(NTH_PROOF);
format!("[{id0:?}, {id1:?}]")
},
len => {
let id0 = ids.next().expect(NTH_PROOF);
let id_last = ids.last().expect(NTH_PROOF);
format!("{len}:[{id0:?} ... {id_last:?}]")
},
}
}
pub fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
async_std::task::sleep(timeout).await;
Some(((), ()))
})
}
#[derive(Debug, Eq, Clone, Copy, PartialEq)]
pub enum FailedClient {
Source,
Target,
Both,
}
#[derive(Debug, Clone, Copy)]
pub enum ProcessFutureResult {
Success,
Failed,
ConnectionFailed,
}
impl ProcessFutureResult {
pub fn is_ok(self) -> bool {
match self {
ProcessFutureResult::Success => true,
ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false,
}
}
pub fn fail_if_error(self, failed_client: FailedClient) -> Result<(), FailedClient> {
if self.is_ok() {
Ok(())
} else {
Err(failed_client)
}
}
pub fn fail_if_connection_error(
self,
failed_client: FailedClient,
) -> Result<bool, FailedClient> {
match self {
ProcessFutureResult::Success => Ok(true),
ProcessFutureResult::Failed => Ok(false),
ProcessFutureResult::ConnectionFailed => Err(failed_client),
}
}
}
pub fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
retry_backoff: &mut ExponentialBackoff,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
error_pattern: impl FnOnce() -> String,
) -> ProcessFutureResult
where
TError: std::fmt::Debug + MaybeConnectionError,
TGoOfflineFuture: FutureExt,
{
match result {
Ok(result) => {
on_success(result);
retry_backoff.reset();
ProcessFutureResult::Success
},
Err(error) if error.is_connection_error() => {
log::error!(
target: "bridge",
"{}: {:?}. Going to restart",
error_pattern(),
error,
);
retry_backoff.reset();
go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse());
ProcessFutureResult::ConnectionFailed
},
Err(error) => {
let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
log::error!(
target: "bridge",
"{}: {:?}. Retrying in {}",
error_pattern(),
error,
retry_delay.as_secs_f64(),
);
go_offline_future.set(go_offline(retry_delay).fuse());
ProcessFutureResult::Failed
},
}
}