referrerpolicy=no-referrer-when-downgrade

substrate_prometheus_endpoint/
lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
17
18mod sourced;
19
20use hyper::{http::StatusCode, Request, Response};
21use prometheus::{core::Collector, Encoder, TextEncoder};
22use std::net::SocketAddr;
23
24pub use prometheus::{
25	self,
26	core::{
27		AtomicF64 as F64, AtomicI64 as I64, AtomicU64 as U64, GenericCounter as Counter,
28		GenericCounterVec as CounterVec, GenericGauge as Gauge, GenericGaugeVec as GaugeVec,
29	},
30	exponential_buckets, histogram_opts, linear_buckets, Error as PrometheusError, Histogram,
31	HistogramOpts, HistogramVec, Opts, Registry,
32};
33pub use sourced::{MetricSource, SourcedCounter, SourcedGauge, SourcedMetric};
34
35type Body = http_body_util::Full<hyper::body::Bytes>;
36
37pub fn register<T: Clone + Collector + 'static>(
38	metric: T,
39	registry: &Registry,
40) -> Result<T, PrometheusError> {
41	registry.register(Box::new(metric.clone()))?;
42	Ok(metric)
43}
44
45#[derive(Debug, thiserror::Error)]
46pub enum Error {
47	/// Hyper internal error.
48	#[error(transparent)]
49	Hyper(#[from] hyper::Error),
50
51	/// Http request error.
52	#[error(transparent)]
53	Http(#[from] hyper::http::Error),
54
55	/// i/o error.
56	#[error(transparent)]
57	Io(#[from] std::io::Error),
58
59	#[error("Prometheus port {0} already in use.")]
60	PortInUse(SocketAddr),
61}
62
63async fn request_metrics(
64	req: Request<hyper::body::Incoming>,
65	registry: Registry,
66) -> Result<Response<Body>, Error> {
67	if req.uri().path() == "/metrics" {
68		let metric_families = registry.gather();
69		let mut buffer = vec![];
70		let encoder = TextEncoder::new();
71		encoder.encode(&metric_families, &mut buffer).unwrap();
72
73		Response::builder()
74			.status(StatusCode::OK)
75			.header("Content-Type", encoder.format_type())
76			.body(Body::from(buffer))
77			.map_err(Error::Http)
78	} else {
79		Response::builder()
80			.status(StatusCode::NOT_FOUND)
81			.body(Body::from("Not found."))
82			.map_err(Error::Http)
83	}
84}
85
86/// Initializes the metrics context, and starts an HTTP server
87/// to serve metrics.
88pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error> {
89	let listener = tokio::net::TcpListener::bind(&prometheus_addr).await.map_err(|e| {
90		log::error!(target: "prometheus", "Error binding to '{prometheus_addr:?}': {e:?}");
91		Error::PortInUse(prometheus_addr)
92	})?;
93
94	init_prometheus_with_listener(listener, registry).await
95}
96
97/// Init prometheus using the given listener.
98async fn init_prometheus_with_listener(
99	listener: tokio::net::TcpListener,
100	registry: Registry,
101) -> Result<(), Error> {
102	log::info!(target: "prometheus", "〽️ Prometheus exporter started at {}", listener.local_addr()?);
103
104	let server = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());
105	let graceful = hyper_util::server::graceful::GracefulShutdown::new();
106
107	loop {
108		let io = match listener.accept().await {
109			Ok((sock, _)) => hyper_util::rt::TokioIo::new(sock),
110			Err(e) => {
111				log::debug!(target: "prometheus", "Error accepting connection: {:?}", e);
112				continue;
113			},
114		};
115
116		let registry = registry.clone();
117
118		let conn = server
119			.serve_connection_with_upgrades(
120				io,
121				hyper::service::service_fn(move |req| request_metrics(req, registry.clone())),
122			)
123			.into_owned();
124		let conn = graceful.watch(conn);
125
126		tokio::spawn(async move {
127			if let Err(err) = conn.await {
128				log::debug!(target: "prometheus", "connection error: {:?}", err);
129			}
130		});
131	}
132}
133
#[cfg(test)]
mod tests {
	use super::*;
	use http_body_util::BodyExt;
	use hyper::Uri;
	use hyper_util::{client::legacy::Client, rt::TokioExecutor};

	const METRIC_NAME: &str = "test_test_metric_name_test_test";

	/// Starts the exporter on a loopback listener, requests `/metrics` over
	/// HTTP and checks that the registered counter is reported with value `0`.
	#[tokio::test]
	async fn prometheus_works() {
		// Registry holding a single, untouched counter.
		let registry = Registry::default();
		let counter = prometheus::Counter::new(METRIC_NAME, "yeah").expect("Creates test counter");
		register(counter, &registry).expect("Registers the test metric");

		// Port 0 lets the OS choose a free port; read it back for the request.
		let listener =
			tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("Creates listener");
		let local_addr = listener.local_addr().expect("Returns the local addr");

		tokio::spawn(init_prometheus_with_listener(listener, registry));

		let url = format!("http://{}/metrics", local_addr);
		let uri = Uri::try_from(&url).expect("Parses URI");

		let client = Client::builder(TokioExecutor::new()).build_http::<Body>();
		let res = client.get(uri).await.expect("Requests metrics");

		assert!(res.status().is_success());

		let buf = res.into_body().collect().await.expect("Failed to read HTTP body").to_bytes();
		let body = String::from_utf8(buf.to_vec()).expect("Converts body to String");

		let expected_line = format!("{} 0", METRIC_NAME);
		assert!(body.contains(&expected_line));
	}
}