relay_utils/metrics/
float_json_value.rs1use crate::{
18 error::{self, Error},
19 metrics::{
20 metric_name, register, F64SharedRef, Gauge, Metric, PrometheusError, Registry,
21 StandaloneMetric, F64,
22 },
23};
24
25use async_std::sync::{Arc, RwLock};
26use async_trait::async_trait;
27use std::time::Duration;
28
/// How often the metric value is refreshed: five minutes.
///
/// This is the value returned by `StandaloneMetric::update_interval` for
/// `FloatJsonValueMetric`.
const UPDATE_INTERVAL: Duration = Duration::from_secs(5 * 60);
31
32#[derive(Debug, Clone)]
37pub struct FloatJsonValueMetric {
38 url: String,
39 json_path: String,
40 metric: Gauge<F64>,
41 shared_value_ref: F64SharedRef,
42}
43
44impl FloatJsonValueMetric {
45 pub fn new(
47 url: String,
48 json_path: String,
49 name: String,
50 help: String,
51 ) -> Result<Self, PrometheusError> {
52 let shared_value_ref = Arc::new(RwLock::new(None));
53 Ok(FloatJsonValueMetric {
54 url,
55 json_path,
56 metric: Gauge::new(metric_name(None, &name), help)?,
57 shared_value_ref,
58 })
59 }
60
61 pub fn shared_value_ref(&self) -> F64SharedRef {
63 self.shared_value_ref.clone()
64 }
65
66 async fn request_value(&self) -> anyhow::Result<String> {
68 use isahc::{AsyncReadResponseExt, HttpClient, Request};
69
70 let request = Request::get(&self.url).header("Accept", "application/json").body(())?;
71 let raw_response = HttpClient::new()?.send_async(request).await?.text().await?;
72 Ok(raw_response)
73 }
74
75 async fn read_value(&self) -> error::Result<f64> {
77 let raw_response = self.request_value().await.map_err(Error::FetchTokenPrice)?;
78 parse_service_response(&self.json_path, &raw_response)
79 }
80}
81
82impl Metric for FloatJsonValueMetric {
83 fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
84 register(self.metric.clone(), registry).map(drop)
85 }
86}
87
88#[async_trait]
89impl StandaloneMetric for FloatJsonValueMetric {
90 fn update_interval(&self) -> Duration {
91 UPDATE_INTERVAL
92 }
93
94 async fn update(&self) {
95 let value = self.read_value().await;
96 let maybe_ok = value.as_ref().ok().copied();
97 crate::metrics::set_gauge_value(&self.metric, value.map(Some));
98 *self.shared_value_ref.write().await = maybe_ok;
99 }
100}
101
102fn parse_service_response(json_path: &str, response: &str) -> error::Result<f64> {
104 let json =
105 serde_json::from_str(response).map_err(|err| Error::ParseHttp(err, response.to_owned()))?;
106
107 let mut selector = jsonpath_lib::selector(&json);
108 let maybe_selected_value =
109 selector(json_path).map_err(|err| Error::SelectResponseValue(err, response.to_owned()))?;
110 let selected_value = maybe_selected_value
111 .first()
112 .and_then(|v| v.as_f64())
113 .ok_or_else(|| Error::MissingResponseValue(response.to_owned()))?;
114 if !selected_value.is_normal() || selected_value < 0.0 {
115 return Err(Error::ParseFloat(selected_value))
116 }
117
118 Ok(selected_value)
119}
120
#[cfg(test)]
mod tests {
    use super::*;

    /// JSON path shared by all test cases.
    const PATH: &str = "$.kusama.usd";

    /// A positive, normal value is extracted as-is.
    #[test]
    fn parse_service_response_works() {
        let parsed = parse_service_response(PATH, r#"{"kusama":{"usd":433.05}}"#);
        assert_eq!(parsed.ok(), Some(433.05));
    }

    /// Negative values are refused.
    #[test]
    fn parse_service_response_rejects_negative_numbers() {
        let parsed = parse_service_response(PATH, r#"{"kusama":{"usd":-433.05}}"#);
        assert!(parsed.is_err());
    }

    /// Zero is refused.
    #[test]
    fn parse_service_response_rejects_zero_numbers() {
        let parsed = parse_service_response(PATH, r#"{"kusama":{"usd":0.0}}"#);
        assert!(parsed.is_err());
    }

    /// A `NaN` literal in the response is refused.
    ///
    /// NOTE(review): `NaN` is not a valid JSON literal, so this input is presumably
    /// rejected while parsing the document rather than by the float check.
    #[test]
    fn parse_service_response_rejects_nan() {
        let parsed = parse_service_response(PATH, r#"{"kusama":{"usd":NaN}}"#);
        assert!(parsed.is_err());
    }
}