1use futures_timer::Delay;
20use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
21use sc_client_api::{ClientInfo, UsageProvider};
22use sc_network::{config::Role, NetworkStatus, NetworkStatusProvider};
23use sc_network_sync::{SyncStatus, SyncStatusProvider};
24use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO};
25use sc_transaction_pool_api::{MaintainedTransactionPool, PoolStatus};
26use sc_utils::metrics::register_globals;
27use sp_api::ProvideRuntimeApi;
28use sp_runtime::traits::{Block, NumberFor, SaturatedConversion, UniqueSaturatedInto};
29use std::{
30 sync::Arc,
31 time::{Duration, Instant, SystemTime},
32};
33
34struct PrometheusMetrics {
35 block_height: GaugeVec<U64>,
37 number_leaves: Gauge<U64>,
38 ready_transactions_number: Gauge<U64>,
39
40 database_cache: Gauge<U64>,
42 state_cache: Gauge<U64>,
43}
44
45impl PrometheusMetrics {
46 fn setup(
47 registry: &Registry,
48 name: &str,
49 version: &str,
50 roles: u64,
51 ) -> Result<Self, PrometheusError> {
52 register(
53 Gauge::<U64>::with_opts(
54 Opts::new(
55 "substrate_build_info",
56 "A metric with a constant '1' value labeled by name, version",
57 )
58 .const_label("name", name)
59 .const_label("version", version),
60 )?,
61 registry,
62 )?
63 .set(1);
64
65 register(
66 Gauge::<U64>::new("substrate_node_roles", "The roles the node is running as")?,
67 registry,
68 )?
69 .set(roles);
70
71 register_globals(registry)?;
72
73 let start_time_since_epoch =
74 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default();
75 register(
76 Gauge::<U64>::new(
77 "substrate_process_start_time_seconds",
78 "Number of seconds between the UNIX epoch and the moment the process started",
79 )?,
80 registry,
81 )?
82 .set(start_time_since_epoch.as_secs());
83
84 Ok(Self {
85 block_height: register(
87 GaugeVec::new(
88 Opts::new("substrate_block_height", "Block height info of the chain"),
89 &["status"],
90 )?,
91 registry,
92 )?,
93
94 number_leaves: register(
95 Gauge::new("substrate_number_leaves", "Number of known chain leaves (aka forks)")?,
96 registry,
97 )?,
98
99 ready_transactions_number: register(
100 Gauge::new(
101 "substrate_ready_transactions_number",
102 "Number of transactions in the ready queue",
103 )?,
104 registry,
105 )?,
106
107 database_cache: register(
109 Gauge::new("substrate_database_cache_bytes", "RocksDB cache size in bytes")?,
110 registry,
111 )?,
112 state_cache: register(
113 Gauge::new("substrate_state_cache_bytes", "State cache size in bytes")?,
114 registry,
115 )?,
116 })
117 }
118}
119
120pub struct MetricsService {
124 metrics: Option<PrometheusMetrics>,
125 last_update: Instant,
126 last_total_bytes_inbound: u64,
127 last_total_bytes_outbound: u64,
128 telemetry: Option<TelemetryHandle>,
129}
130
131impl MetricsService {
132 pub fn new(telemetry: Option<TelemetryHandle>) -> Self {
135 MetricsService {
136 metrics: None,
137 last_total_bytes_inbound: 0,
138 last_total_bytes_outbound: 0,
139 last_update: Instant::now(),
140 telemetry,
141 }
142 }
143
144 pub fn with_prometheus(
147 telemetry: Option<TelemetryHandle>,
148 registry: &Registry,
149 role: Role,
150 node_name: &str,
151 impl_version: &str,
152 ) -> Result<Self, PrometheusError> {
153 let role_bits = match role {
154 Role::Full => 1u64,
155 Role::Authority { .. } => 4u64,
157 };
158
159 PrometheusMetrics::setup(registry, node_name, impl_version, role_bits).map(|p| {
160 MetricsService {
161 metrics: Some(p),
162 last_total_bytes_inbound: 0,
163 last_total_bytes_outbound: 0,
164 last_update: Instant::now(),
165 telemetry,
166 }
167 })
168 }
169
170 pub async fn run<TBl, TExPool, TCl, TNet, TSync>(
174 mut self,
175 client: Arc<TCl>,
176 transactions: Arc<TExPool>,
177 network: TNet,
178 syncing: TSync,
179 ) where
180 TBl: Block,
181 TCl: ProvideRuntimeApi<TBl> + UsageProvider<TBl>,
182 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as Block>::Hash>,
183 TNet: NetworkStatusProvider,
184 TSync: SyncStatusProvider<TBl>,
185 {
186 let mut timer = Delay::new(Duration::from_secs(0));
187 let timer_interval = Duration::from_secs(5);
188
189 loop {
190 (&mut timer).await;
192
193 let net_status = network.status().await.ok();
195
196 let sync_status = syncing.status().await.ok();
198
199 self.update(&client.usage_info(), &transactions.status(), net_status, sync_status);
201
202 timer.reset(timer_interval);
204 }
205 }
206
207 fn update<T: Block>(
208 &mut self,
209 info: &ClientInfo<T>,
210 txpool_status: &PoolStatus,
211 net_status: Option<NetworkStatus>,
212 sync_status: Option<SyncStatus<T>>,
213 ) {
214 let now = Instant::now();
215 let elapsed = (now - self.last_update).as_secs();
216 self.last_update = now;
217
218 let best_number = info.chain.best_number.saturated_into::<u64>();
219 let best_hash = info.chain.best_hash;
220 let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
221
222 telemetry!(
224 self.telemetry;
225 SUBSTRATE_INFO;
226 "system.interval";
227 "height" => best_number,
228 "best" => ?best_hash,
229 "txcount" => txpool_status.ready,
230 "finalized_height" => finalized_number,
231 "finalized_hash" => ?info.chain.finalized_hash,
232 "used_state_cache_size" => info.usage.as_ref()
233 .map(|usage| usage.memory.state_cache.as_bytes())
234 .unwrap_or(0),
235 );
236
237 if let Some(metrics) = self.metrics.as_ref() {
238 metrics.block_height.with_label_values(&["finalized"]).set(finalized_number);
239 metrics.block_height.with_label_values(&["best"]).set(best_number);
240
241 if let Ok(leaves) = u64::try_from(info.chain.number_leaves) {
242 metrics.number_leaves.set(leaves);
243 }
244
245 metrics.ready_transactions_number.set(txpool_status.ready as u64);
246
247 if let Some(info) = info.usage.as_ref() {
248 metrics.database_cache.set(info.memory.database_cache.as_bytes() as u64);
249 metrics.state_cache.set(info.memory.state_cache.as_bytes() as u64);
250 }
251 }
252
253 if let Some(net_status) = net_status {
255 let num_peers = net_status.num_connected_peers;
256 let total_bytes_inbound = net_status.total_bytes_inbound;
257 let total_bytes_outbound = net_status.total_bytes_outbound;
258
259 let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound;
260 let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound;
261 let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) = if elapsed > 0 {
262 self.last_total_bytes_inbound = total_bytes_inbound;
263 self.last_total_bytes_outbound = total_bytes_outbound;
264 (diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed)
265 } else {
266 (diff_bytes_inbound, diff_bytes_outbound)
267 };
268
269 telemetry!(
270 self.telemetry;
271 SUBSTRATE_INFO;
272 "system.interval";
273 "peers" => num_peers,
274 "bandwidth_download" => avg_bytes_per_sec_inbound,
275 "bandwidth_upload" => avg_bytes_per_sec_outbound,
276 );
277 }
278
279 if let Some(sync_status) = sync_status {
280 if let Some(metrics) = self.metrics.as_ref() {
281 let best_seen_block: Option<u64> =
282 sync_status.best_seen_block.map(|num: NumberFor<T>| {
283 UniqueSaturatedInto::<u64>::unique_saturated_into(num)
284 });
285
286 metrics
287 .block_height
288 .with_label_values(&["sync_target"])
289 .set(best_seen_block.unwrap_or(best_number));
290 }
291 }
292 }
293}