use crate::{
error::Error,
metrics::{Metric, MetricsAddress, MetricsParams},
FailedClient, MaybeConnectionError,
};
use async_trait::async_trait;
use prometheus_endpoint::{init_prometheus, Registry};
use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
#[async_trait]
pub trait Client: 'static + Clone + Send + Sync {
type Error: 'static + Debug + MaybeConnectionError + Send + Sync;
async fn reconnect(&mut self) -> Result<(), Self::Error>;
async fn reconnect_until_success(&mut self, delay: Duration) {
loop {
match self.reconnect().await {
Ok(()) => break,
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to client. Going to retry in {}s: {:?}",
delay.as_secs(),
error,
);
async_std::task::sleep(delay).await;
},
}
}
}
}
#[async_trait]
impl Client for () {
type Error = crate::StringifiedMaybeConnectionError;
async fn reconnect(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
Loop { reconnect_delay: RECONNECT_DELAY, source_client, target_client, loop_metric: None }
}
pub fn relay_metrics(params: MetricsParams) -> LoopMetrics<(), (), ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
source_client: (),
target_client: (),
loop_metric: None,
},
address: params.address,
registry: params.registry,
loop_metric: None,
}
}
pub struct Loop<SC, TC, LM> {
reconnect_delay: Duration,
source_client: SC,
target_client: TC,
loop_metric: Option<LM>,
}
pub struct LoopMetrics<SC, TC, LM> {
relay_loop: Loop<SC, TC, ()>,
address: Option<MetricsAddress>,
registry: Registry,
loop_metric: Option<LM>,
}
impl<SC, TC, LM> Loop<SC, TC, LM> {
#[must_use]
pub fn reconnect_delay(mut self, reconnect_delay: Duration) -> Self {
self.reconnect_delay = reconnect_delay;
self
}
pub fn with_metrics(self, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: self.reconnect_delay,
source_client: self.source_client,
target_client: self.target_client,
loop_metric: None,
},
address: params.address,
registry: params.registry,
loop_metric: None,
}
}
pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), Error>
where
R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
F: 'static + Send + Future<Output = Result<(), FailedClient>>,
SC: 'static + Client,
TC: 'static + Client,
LM: 'static + Send + Clone,
{
let run_loop_task = async move {
crate::initialize::initialize_loop(loop_name);
loop {
let loop_metric = self.loop_metric.clone();
let future_result =
run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric);
let result = future_result.await;
match result {
Ok(()) => break,
Err(failed_client) => {
log::debug!(target: "bridge", "Restarting relay loop");
reconnect_failed_client(
failed_client,
self.reconnect_delay,
&mut self.source_client,
&mut self.target_client,
)
.await
},
}
}
Ok(())
};
async_std::task::spawn(run_loop_task).await
}
}
impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
pub fn loop_metric<NewLM: Metric>(
self,
metric: NewLM,
) -> Result<LoopMetrics<SC, TC, NewLM>, Error> {
metric.register(&self.registry)?;
Ok(LoopMetrics {
relay_loop: self.relay_loop,
address: self.address,
registry: self.registry,
loop_metric: Some(metric),
})
}
pub fn into_params(self) -> MetricsParams {
MetricsParams { address: self.address, registry: self.registry }
}
pub async fn expose(self) -> Result<Loop<SC, TC, LM>, Error> {
if let Some(address) = self.address {
let socket_addr = SocketAddr::new(
address
.host
.parse()
.map_err(|err| Error::ExposingMetricsInvalidHost(address.host.clone(), err))?,
address.port,
);
let registry = self.registry;
async_std::task::spawn(async move {
let runtime =
match tokio::runtime::Builder::new_current_thread().enable_all().build() {
Ok(runtime) => runtime,
Err(err) => {
log::trace!(
target: "bridge-metrics",
"Failed to create tokio runtime. Prometheus metrics are not available: {:?}",
err,
);
return
},
};
runtime.block_on(async move {
log::trace!(
target: "bridge-metrics",
"Starting prometheus endpoint at: {:?}",
socket_addr,
);
let result = init_prometheus(socket_addr, registry).await;
log::trace!(
target: "bridge-metrics",
"Prometheus endpoint has exited with result: {:?}",
result,
);
});
});
}
Ok(Loop {
reconnect_delay: self.relay_loop.reconnect_delay,
source_client: self.relay_loop.source_client,
target_client: self.relay_loop.target_client,
loop_metric: self.loop_metric,
})
}
}
pub async fn reconnect_failed_client(
failed_client: FailedClient,
reconnect_delay: Duration,
source_client: &mut impl Client,
target_client: &mut impl Client,
) {
if failed_client == FailedClient::Source || failed_client == FailedClient::Both {
source_client.reconnect_until_success(reconnect_delay).await;
}
if failed_client == FailedClient::Target || failed_client == FailedClient::Both {
target_client.reconnect_until_success(reconnect_delay).await;
}
}