referrerpolicy=no-referrer-when-downgrade

relay_utils/
relay_loop.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	error::Error,
19	metrics::{Metric, MetricsAddress, MetricsParams},
20	FailedClient, MaybeConnectionError,
21};
22
23use async_trait::async_trait;
24use prometheus_endpoint::{init_prometheus, Registry};
25use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
26
/// Default pause between reconnect attempts.
///
/// Used as the initial `reconnect_delay` of loops created by `relay_loop` and `relay_metrics`;
/// it can be overridden with `Loop::reconnect_delay`.
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
29
30/// Basic blockchain client from relay perspective.
31#[async_trait]
32pub trait Client: 'static + Clone + Send + Sync {
33	/// Type of error these clients returns.
34	type Error: 'static + Debug + MaybeConnectionError + Send + Sync;
35
36	/// Try to reconnect to source node.
37	async fn reconnect(&mut self) -> Result<(), Self::Error>;
38
39	/// Try to reconnect to the source node in an infinite loop until it succeeds.
40	async fn reconnect_until_success(&mut self, delay: Duration) {
41		loop {
42			match self.reconnect().await {
43				Ok(()) => break,
44				Err(error) => {
45					log::warn!(
46						target: "bridge",
47						"Failed to reconnect to client. Going to retry in {}s: {:?}",
48						delay.as_secs(),
49						error,
50					);
51
52					async_std::task::sleep(delay).await;
53				},
54			}
55		}
56	}
57}
58
/// Placeholder client: `()` holds no connection, so reconnecting always succeeds immediately.
///
/// `relay_metrics` uses `()` for both clients of the loop it creates.
#[async_trait]
impl Client for () {
	type Error = crate::StringifiedMaybeConnectionError;

	// Nothing to reconnect to; report success right away.
	async fn reconnect(&mut self) -> Result<(), Self::Error> {
		Ok(())
	}
}
67
68/// Returns generic loop that may be customized and started.
69pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
70	Loop { reconnect_delay: RECONNECT_DELAY, source_client, target_client, loop_metric: None }
71}
72
73/// Returns generic relay loop metrics that may be customized and used in one or several relay
74/// loops.
75pub fn relay_metrics(params: MetricsParams) -> LoopMetrics<(), (), ()> {
76	LoopMetrics {
77		relay_loop: Loop {
78			reconnect_delay: RECONNECT_DELAY,
79			source_client: (),
80			target_client: (),
81			loop_metric: None,
82		},
83		address: params.address,
84		registry: params.registry,
85		loop_metric: None,
86	}
87}
88
/// Generic relay loop.
///
/// Holds everything `Loop::run` needs to repeatedly invoke the user-provided loop function and
/// to reconnect the clients in between.
pub struct Loop<SC, TC, LM> {
	/// Pause between reconnect attempts, handed to `reconnect_failed_client`.
	reconnect_delay: Duration,
	/// Source client; a clone is passed to the loop function on every (re)start.
	source_client: SC,
	/// Target client; a clone is passed to the loop function on every (re)start.
	target_client: TC,
	/// Optional loop metric; a clone is passed to the loop function on every (re)start.
	loop_metric: Option<LM>,
}
96
/// Relay loop metrics builder.
///
/// Collects the loop metric and the metrics registry, and is turned back into a `Loop` by
/// `LoopMetrics::expose`.
pub struct LoopMetrics<SC, TC, LM> {
	/// The loop being configured; its reconnect delay and clients end up in the final `Loop`.
	relay_loop: Loop<SC, TC, ()>,
	/// Address to expose metrics at; when `None`, `expose` does not start an endpoint.
	address: Option<MetricsAddress>,
	/// Registry that loop metrics are registered in and that the endpoint is started with.
	registry: Registry,
	/// Metric that will be attached to the final `Loop`, if any has been added.
	loop_metric: Option<LM>,
}
104
105impl<SC, TC, LM> Loop<SC, TC, LM> {
106	/// Customize delay between reconnect attempts.
107	#[must_use]
108	pub fn reconnect_delay(mut self, reconnect_delay: Duration) -> Self {
109		self.reconnect_delay = reconnect_delay;
110		self
111	}
112
113	/// Start building loop metrics using given prefix.
114	pub fn with_metrics(self, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
115		LoopMetrics {
116			relay_loop: Loop {
117				reconnect_delay: self.reconnect_delay,
118				source_client: self.source_client,
119				target_client: self.target_client,
120				loop_metric: None,
121			},
122			address: params.address,
123			registry: params.registry,
124			loop_metric: None,
125		}
126	}
127
128	/// Run relay loop.
129	///
130	/// This function represents an outer loop, which in turn calls provided `run_loop` function to
131	/// do actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
132	/// target or both) and calls `run_loop` again.
133	pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), Error>
134	where
135		R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
136		F: 'static + Send + Future<Output = Result<(), FailedClient>>,
137		SC: 'static + Client,
138		TC: 'static + Client,
139		LM: 'static + Send + Clone,
140	{
141		let run_loop_task = async move {
142			crate::initialize::initialize_loop(loop_name);
143
144			loop {
145				let loop_metric = self.loop_metric.clone();
146				let future_result =
147					run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric);
148				let result = future_result.await;
149
150				match result {
151					Ok(()) => break,
152					Err(failed_client) => {
153						log::debug!(target: "bridge", "Restarting relay loop");
154
155						reconnect_failed_client(
156							failed_client,
157							self.reconnect_delay,
158							&mut self.source_client,
159							&mut self.target_client,
160						)
161						.await
162					},
163				}
164			}
165			Ok(())
166		};
167
168		async_std::task::spawn(run_loop_task).await
169	}
170}
171
172impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
173	/// Add relay loop metrics.
174	///
175	/// Loop metrics will be passed to the loop callback.
176	pub fn loop_metric<NewLM: Metric>(
177		self,
178		metric: NewLM,
179	) -> Result<LoopMetrics<SC, TC, NewLM>, Error> {
180		metric.register(&self.registry)?;
181
182		Ok(LoopMetrics {
183			relay_loop: self.relay_loop,
184			address: self.address,
185			registry: self.registry,
186			loop_metric: Some(metric),
187		})
188	}
189
190	/// Convert into `MetricsParams` structure so that metrics registry may be extended later.
191	pub fn into_params(self) -> MetricsParams {
192		MetricsParams { address: self.address, registry: self.registry }
193	}
194
195	/// Expose metrics using address passed at creation.
196	///
197	/// If passed `address` is `None`, metrics are not exposed.
198	pub async fn expose(self) -> Result<Loop<SC, TC, LM>, Error> {
199		if let Some(address) = self.address {
200			let socket_addr = SocketAddr::new(
201				address
202					.host
203					.parse()
204					.map_err(|err| Error::ExposingMetricsInvalidHost(address.host.clone(), err))?,
205				address.port,
206			);
207
208			let registry = self.registry;
209			async_std::task::spawn(async move {
210				let runtime =
211					match tokio::runtime::Builder::new_current_thread().enable_all().build() {
212						Ok(runtime) => runtime,
213						Err(err) => {
214							log::trace!(
215								target: "bridge-metrics",
216								"Failed to create tokio runtime. Prometheus metrics are not available: {:?}",
217								err,
218							);
219							return
220						},
221					};
222
223				runtime.block_on(async move {
224					log::trace!(
225						target: "bridge-metrics",
226						"Starting prometheus endpoint at: {:?}",
227						socket_addr,
228					);
229					let result = init_prometheus(socket_addr, registry).await;
230					log::trace!(
231						target: "bridge-metrics",
232						"Prometheus endpoint has exited with result: {:?}",
233						result,
234					);
235				});
236			});
237		}
238
239		Ok(Loop {
240			reconnect_delay: self.relay_loop.reconnect_delay,
241			source_client: self.relay_loop.source_client,
242			target_client: self.relay_loop.target_client,
243			loop_metric: self.loop_metric,
244		})
245	}
246}
247
248/// Deal with the clients that have returned connection error.
249pub async fn reconnect_failed_client(
250	failed_client: FailedClient,
251	reconnect_delay: Duration,
252	source_client: &mut impl Client,
253	target_client: &mut impl Client,
254) {
255	if failed_client == FailedClient::Source || failed_client == FailedClient::Both {
256		source_client.reconnect_until_success(reconnect_delay).await;
257	}
258
259	if failed_client == FailedClient::Target || failed_client == FailedClient::Both {
260		target_client.reconnect_until_success(reconnect_delay).await;
261	}
262}