1use crate::{
18 error::Error,
19 metrics::{Metric, MetricsAddress, MetricsParams},
20 FailedClient, MaybeConnectionError,
21};
22
23use async_trait::async_trait;
24use prometheus_endpoint::{init_prometheus, Registry};
25use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
26
/// Default pause between two consecutive reconnect attempts.
///
/// Both [`relay_loop`] and [`relay_metrics`] start with this value; it can be
/// overridden per loop with [`Loop::reconnect_delay`].
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
29
30#[async_trait]
32pub trait Client: 'static + Clone + Send + Sync {
33 type Error: 'static + Debug + MaybeConnectionError + Send + Sync;
35
36 async fn reconnect(&mut self) -> Result<(), Self::Error>;
38
39 async fn reconnect_until_success(&mut self, delay: Duration) {
41 loop {
42 match self.reconnect().await {
43 Ok(()) => break,
44 Err(error) => {
45 log::warn!(
46 target: "bridge",
47 "Failed to reconnect to client. Going to retry in {}s: {:?}",
48 delay.as_secs(),
49 error,
50 );
51
52 async_std::task::sleep(delay).await;
53 },
54 }
55 }
56 }
57}
58
59#[async_trait]
60impl Client for () {
61 type Error = crate::StringifiedMaybeConnectionError;
62
63 async fn reconnect(&mut self) -> Result<(), Self::Error> {
64 Ok(())
65 }
66}
67
68pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
70 Loop { reconnect_delay: RECONNECT_DELAY, source_client, target_client, loop_metric: None }
71}
72
73pub fn relay_metrics(params: MetricsParams) -> LoopMetrics<(), (), ()> {
76 LoopMetrics {
77 relay_loop: Loop {
78 reconnect_delay: RECONNECT_DELAY,
79 source_client: (),
80 target_client: (),
81 loop_metric: None,
82 },
83 address: params.address,
84 registry: params.registry,
85 loop_metric: None,
86 }
87}
88
/// Generic relay loop.
///
/// `SC` and `TC` are the source and target client types, `LM` is the type of the
/// optional loop metric that is handed to the loop callback.
pub struct Loop<SC, TC, LM> {
    /// Pause between attempts to reconnect a failed client.
    reconnect_delay: Duration,
    /// Source client; a clone of it is passed to the loop callback on every start.
    source_client: SC,
    /// Target client; a clone of it is passed to the loop callback on every start.
    target_client: TC,
    /// Optional metric; a clone of it is passed to the loop callback on every start.
    loop_metric: Option<LM>,
}
96
97pub struct LoopMetrics<SC, TC, LM> {
99 relay_loop: Loop<SC, TC, ()>,
100 address: Option<MetricsAddress>,
101 registry: Registry,
102 loop_metric: Option<LM>,
103}
104
105impl<SC, TC, LM> Loop<SC, TC, LM> {
106 #[must_use]
108 pub fn reconnect_delay(mut self, reconnect_delay: Duration) -> Self {
109 self.reconnect_delay = reconnect_delay;
110 self
111 }
112
113 pub fn with_metrics(self, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
115 LoopMetrics {
116 relay_loop: Loop {
117 reconnect_delay: self.reconnect_delay,
118 source_client: self.source_client,
119 target_client: self.target_client,
120 loop_metric: None,
121 },
122 address: params.address,
123 registry: params.registry,
124 loop_metric: None,
125 }
126 }
127
128 pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), Error>
134 where
135 R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
136 F: 'static + Send + Future<Output = Result<(), FailedClient>>,
137 SC: 'static + Client,
138 TC: 'static + Client,
139 LM: 'static + Send + Clone,
140 {
141 let run_loop_task = async move {
142 crate::initialize::initialize_loop(loop_name);
143
144 loop {
145 let loop_metric = self.loop_metric.clone();
146 let future_result =
147 run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric);
148 let result = future_result.await;
149
150 match result {
151 Ok(()) => break,
152 Err(failed_client) => {
153 log::debug!(target: "bridge", "Restarting relay loop");
154
155 reconnect_failed_client(
156 failed_client,
157 self.reconnect_delay,
158 &mut self.source_client,
159 &mut self.target_client,
160 )
161 .await
162 },
163 }
164 }
165 Ok(())
166 };
167
168 async_std::task::spawn(run_loop_task).await
169 }
170}
171
172impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
173 pub fn loop_metric<NewLM: Metric>(
177 self,
178 metric: NewLM,
179 ) -> Result<LoopMetrics<SC, TC, NewLM>, Error> {
180 metric.register(&self.registry)?;
181
182 Ok(LoopMetrics {
183 relay_loop: self.relay_loop,
184 address: self.address,
185 registry: self.registry,
186 loop_metric: Some(metric),
187 })
188 }
189
190 pub fn into_params(self) -> MetricsParams {
192 MetricsParams { address: self.address, registry: self.registry }
193 }
194
195 pub async fn expose(self) -> Result<Loop<SC, TC, LM>, Error> {
199 if let Some(address) = self.address {
200 let socket_addr = SocketAddr::new(
201 address
202 .host
203 .parse()
204 .map_err(|err| Error::ExposingMetricsInvalidHost(address.host.clone(), err))?,
205 address.port,
206 );
207
208 let registry = self.registry;
209 async_std::task::spawn(async move {
210 let runtime =
211 match tokio::runtime::Builder::new_current_thread().enable_all().build() {
212 Ok(runtime) => runtime,
213 Err(err) => {
214 log::trace!(
215 target: "bridge-metrics",
216 "Failed to create tokio runtime. Prometheus metrics are not available: {:?}",
217 err,
218 );
219 return
220 },
221 };
222
223 runtime.block_on(async move {
224 log::trace!(
225 target: "bridge-metrics",
226 "Starting prometheus endpoint at: {:?}",
227 socket_addr,
228 );
229 let result = init_prometheus(socket_addr, registry).await;
230 log::trace!(
231 target: "bridge-metrics",
232 "Prometheus endpoint has exited with result: {:?}",
233 result,
234 );
235 });
236 });
237 }
238
239 Ok(Loop {
240 reconnect_delay: self.relay_loop.reconnect_delay,
241 source_client: self.relay_loop.source_client,
242 target_client: self.relay_loop.target_client,
243 loop_metric: self.loop_metric,
244 })
245 }
246}
247
248pub async fn reconnect_failed_client(
250 failed_client: FailedClient,
251 reconnect_delay: Duration,
252 source_client: &mut impl Client,
253 target_client: &mut impl Client,
254) {
255 if failed_client == FailedClient::Source || failed_client == FailedClient::Both {
256 source_client.reconnect_until_success(reconnect_delay).await;
257 }
258
259 if failed_client == FailedClient::Target || failed_client == FailedClient::Both {
260 target_client.reconnect_until_success(reconnect_delay).await;
261 }
262}