relay_substrate_client/metrics/
float_storage_value.rs1use crate::{Chain, Client, Error as SubstrateError};
18
19use async_std::sync::{Arc, RwLock};
20use async_trait::async_trait;
21use codec::Decode;
22use num_traits::One;
23use relay_utils::metrics::{
24 metric_name, register, F64SharedRef, Gauge, Metric, PrometheusError, Registry,
25 StandaloneMetric, F64,
26};
27use sp_core::storage::{StorageData, StorageKey};
28use sp_runtime::{traits::UniqueSaturatedInto, FixedPointNumber, FixedU128};
29use std::{marker::PhantomData, time::Duration};
30
31const UPDATE_INTERVAL_IN_BLOCKS: u32 = 5;
33
34pub trait FloatStorageValue: 'static + Clone + Send + Sync {
36 type Value: FixedPointNumber;
38 fn decode(
40 &self,
41 maybe_raw_value: Option<StorageData>,
42 ) -> Result<Option<Self::Value>, SubstrateError>;
43}
44
45#[derive(Clone, Debug, Default)]
48pub struct FixedU128OrOne;
49
50impl FloatStorageValue for FixedU128OrOne {
51 type Value = FixedU128;
52
53 fn decode(
54 &self,
55 maybe_raw_value: Option<StorageData>,
56 ) -> Result<Option<Self::Value>, SubstrateError> {
57 maybe_raw_value
58 .map(|raw_value| {
59 FixedU128::decode(&mut &raw_value.0[..])
60 .map_err(SubstrateError::ResponseParseFailed)
61 .map(Some)
62 })
63 .unwrap_or_else(|| Ok(Some(FixedU128::one())))
64 }
65}
66
67#[derive(Clone, Debug)]
69pub struct FloatStorageValueMetric<C, Clnt, V> {
70 value_converter: V,
71 client: Clnt,
72 storage_key: StorageKey,
73 metric: Gauge<F64>,
74 shared_value_ref: F64SharedRef,
75 _phantom: PhantomData<(C, V)>,
76}
77
78impl<C, Clnt, V> FloatStorageValueMetric<C, Clnt, V> {
79 pub fn new(
81 value_converter: V,
82 client: Clnt,
83 storage_key: StorageKey,
84 name: String,
85 help: String,
86 ) -> Result<Self, PrometheusError> {
87 let shared_value_ref = Arc::new(RwLock::new(None));
88 Ok(FloatStorageValueMetric {
89 value_converter,
90 client,
91 storage_key,
92 metric: Gauge::new(metric_name(None, &name), help)?,
93 shared_value_ref,
94 _phantom: Default::default(),
95 })
96 }
97
98 pub fn shared_value_ref(&self) -> F64SharedRef {
100 self.shared_value_ref.clone()
101 }
102}
103
104impl<C: Chain, Clnt: Client<C>, V: FloatStorageValue> Metric
105 for FloatStorageValueMetric<C, Clnt, V>
106{
107 fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
108 register(self.metric.clone(), registry).map(drop)
109 }
110}
111
112#[async_trait]
113impl<C: Chain, Clnt: Client<C>, V: FloatStorageValue> StandaloneMetric
114 for FloatStorageValueMetric<C, Clnt, V>
115{
116 fn update_interval(&self) -> Duration {
117 C::AVERAGE_BLOCK_INTERVAL * UPDATE_INTERVAL_IN_BLOCKS
118 }
119
120 async fn update(&self) {
121 let value = async move {
122 let best_header_hash = self.client.best_header_hash().await?;
123 let maybe_storage_value = self
124 .client
125 .raw_storage_value(best_header_hash, self.storage_key.clone())
126 .await?;
127 self.value_converter.decode(maybe_storage_value).map(|maybe_fixed_point_value| {
128 maybe_fixed_point_value.map(|fixed_point_value| {
129 fixed_point_value.into_inner().unique_saturated_into() as f64 /
130 V::Value::DIV.unique_saturated_into() as f64
131 })
132 })
133 }
134 .await
135 .map_err(|e| e.to_string());
136
137 relay_utils::metrics::set_gauge_value(&self.metric, value.clone());
138 *self.shared_value_ref.write().await = value.ok().and_then(|x| x);
139 }
140}