1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
5 Arc,
6 },
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use anyhow::anyhow;
11use configuration::types::{Arg, AssetLocation};
12use fancy_regex::Regex;
13use glob_match::glob_match;
14use prom_metrics_parser::MetricMap;
15use provider::{
16 types::{ExecutionResult, RunScriptOptions},
17 DynNode,
18};
19use serde::{Deserialize, Serialize, Serializer};
20use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
21use support::net::{skip_err_while_waiting, wait_ws_ready};
22use thiserror::Error;
23use tokio::sync::RwLock;
24use tracing::{debug, trace, warn};
25
26use crate::{
27 generators::{generate_node_command, generate_node_command_cumulus, GenCmdOptions},
28 network::NodeContext,
29 network_spec::node::NodeSpec,
30 shared::constants::PROCESS_START_TIME_METRIC,
31 tx_helper::client::get_client_from_url,
32};
33
34type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
35
36#[derive(Error, Debug)]
37pub enum NetworkNodeError {
38 #[error("metric '{0}' not found!")]
39 MetricNotFound(String),
40}
41
42const MONITOR_TARGET: &str = "zombie_monitor";
44#[derive(Clone, Serialize)]
45pub struct NetworkNode {
46 #[serde(serialize_with = "serialize_provider_node")]
47 pub(crate) inner: DynNode,
48 pub(crate) spec: NodeSpec,
51 pub(crate) name: String,
52 pub(crate) ws_uri: String,
53 pub(crate) multiaddr: String,
54 pub(crate) prometheus_uri: String,
55 pub(crate) cmd_generator_opts: GenCmdOptions,
59 pub(crate) context: NodeContext,
61 #[serde(skip)]
62 metrics_cache: Arc<RwLock<MetricMap>>,
63 #[serde(skip)]
64 is_running: Arc<AtomicBool>,
65 #[serde(skip)]
67 last_start_ts: Arc<AtomicU64>,
68}
69
70#[derive(Deserialize)]
71pub(crate) struct RawNetworkNode {
72 pub(crate) name: String,
73 pub(crate) ws_uri: String,
74 pub(crate) prometheus_uri: String,
75 pub(crate) multiaddr: String,
76 pub(crate) spec: NodeSpec,
77 pub(crate) cmd_generator_opts: GenCmdOptions,
78 pub(crate) context: NodeContext,
79 #[serde(default)]
80 pub(crate) inner: serde_json::Value,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
93pub enum WaitCount {
94 TargetReached(u32),
95 TargetFailed(u32),
96}
97
98impl WaitCount {
99 pub fn success(&self) -> bool {
100 match self {
101 Self::TargetReached(..) => true,
102 Self::TargetFailed(..) => false,
103 }
104 }
105}
106
107pub type LogLineCount = WaitCount;
108
109#[derive(Clone)]
122pub struct CountOptions {
123 pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
124 pub timeout: Duration,
125 pub wait_until_timeout_elapses: bool,
126}
127
128impl CountOptions {
129 pub fn new(
130 predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
131 timeout: Duration,
132 wait_until_timeout_elapses: bool,
133 ) -> Self {
134 Self {
135 predicate: Arc::new(predicate),
136 timeout,
137 wait_until_timeout_elapses,
138 }
139 }
140
141 pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
142 Self::new(|n| n == 0, timeout, true)
143 }
144
145 pub fn at_least_once(timeout: Duration) -> Self {
146 Self::new(|count| count >= 1, timeout, false)
147 }
148
149 pub fn at_least(target: u32, timeout: Duration) -> Self {
150 Self::new(move |count| count >= target, timeout, false)
151 }
152
153 pub fn exactly_once(timeout: Duration) -> Self {
154 Self::new(|count| count == 1, timeout, false)
155 }
156}
157
158pub type LogLineCountOptions = CountOptions;
159
160impl NetworkNode {
161 #[allow(clippy::too_many_arguments)]
163 pub(crate) fn new<T: Into<String>>(
164 name: T,
165 ws_uri: T,
166 prometheus_uri: T,
167 multiaddr: T,
168 spec: NodeSpec,
169 inner: DynNode,
170 cmd_generator_opts: GenCmdOptions,
171 context: NodeContext,
172 ) -> Self {
173 Self {
174 name: name.into(),
175 ws_uri: ws_uri.into(),
176 prometheus_uri: prometheus_uri.into(),
177 inner,
178 spec,
179 cmd_generator_opts,
180 context,
181 multiaddr: multiaddr.into(),
182 metrics_cache: Arc::new(Default::default()),
183 is_running: Arc::new(AtomicBool::new(false)),
184 last_start_ts: Arc::new(AtomicU64::new(0)),
185 }
186 }
187
188 pub fn is_running(&self) -> bool {
192 self.is_running.load(Ordering::Acquire)
193 }
194
195 pub fn last_start_ts(&self) -> u64 {
197 self.last_start_ts.load(Ordering::Acquire)
198 }
199
200 pub async fn is_responsive(&self) -> bool {
207 tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
208 .await
209 .is_ok()
210 }
211
212 pub(crate) fn set_is_running(&self, is_running: bool) {
213 self.is_running.store(is_running, Ordering::Release);
214 }
215
216 pub(crate) fn set_last_start_ts(&self, ts: u64) {
218 self.last_start_ts.store(ts, Ordering::Release);
219 }
220
221 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
222 self.multiaddr = multiaddr.into();
223 }
224
225 pub fn name(&self) -> &str {
226 &self.name
227 }
228
229 pub fn args(&self) -> Vec<&str> {
232 self.inner.args()
233 }
234
235 pub fn user_args(&self) -> Vec<String> {
238 self.spec
239 .args
240 .iter()
241 .fold(vec![], |acc, arg| [acc, arg.to_vec()].concat())
242 }
243
244 pub fn spec(&self) -> &NodeSpec {
245 &self.spec
246 }
247
248 pub fn ws_uri(&self) -> &str {
249 &self.ws_uri
250 }
251
252 pub fn multiaddr(&self) -> &str {
253 self.multiaddr.as_ref()
254 }
255
256 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
260 get_client_from_url(&self.ws_uri).await
261 }
262
263 #[deprecated = "Use `wait_client` instead."]
265 pub async fn client<Config: subxt::Config>(
266 &self,
267 ) -> Result<OnlineClient<Config>, subxt::Error> {
268 self.try_client().await
269 }
270
271 pub async fn try_client<Config: subxt::Config>(
280 &self,
281 ) -> Result<OnlineClient<Config>, subxt::Error> {
282 get_client_from_url(&self.ws_uri).await
283 }
284
285 pub async fn wait_client<Config: subxt::Config>(
287 &self,
288 ) -> Result<OnlineClient<Config>, anyhow::Error> {
289 debug!("wait_client ws_uri: {}", self.ws_uri());
290 wait_ws_ready(self.ws_uri())
291 .await
292 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
293
294 self.try_client()
295 .await
296 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
297 }
298
299 pub async fn wait_client_with_timeout<Config: subxt::Config>(
301 &self,
302 timeout_secs: impl Into<u64>,
303 ) -> Result<OnlineClient<Config>, anyhow::Error> {
304 debug!("waiting until subxt client is ready");
305 tokio::time::timeout(
306 Duration::from_secs(timeout_secs.into()),
307 self.wait_client::<Config>(),
308 )
309 .await?
310 }
311
312 pub async fn pause(&self) -> Result<(), anyhow::Error> {
320 self.set_is_running(false);
321 self.inner.pause().await?;
322 Ok(())
323 }
324
325 pub async fn resume(&self) -> Result<(), anyhow::Error> {
331 self.set_is_running(true);
332 self.inner.resume().await?;
333 Ok(())
334 }
335
336 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
341 self.set_is_running(false);
342 self.inner.restart(after).await?;
343 self.set_is_running(true);
344 self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
345 Ok(())
346 }
347
348 pub async fn restart_with(
360 &self,
361 assets: Vec<AssetLocation>,
362 program: Option<String>,
363 args: Option<Vec<Arg>>,
364 after: Option<Duration>,
365 ) -> Result<(), anyhow::Error> {
366 self.set_is_running(false);
367 let mut spec_cloned = self.spec.clone();
368
369 if let Some(args) = args {
370 spec_cloned.args = args;
371 }
372 if let Some(program) = program {
373 spec_cloned.command = program.as_str().try_into()?;
374 }
375
376 let (program, args) = match self.context {
377 NodeContext::Rc
378 | NodeContext::Para {
379 is_cumulus_based: false,
380 ..
381 } => generate_node_command(&spec_cloned, self.cmd_generator_opts.clone(), None),
382 NodeContext::Para {
383 para_id,
384 is_cumulus_based: true,
385 } => generate_node_command_cumulus(
386 &spec_cloned,
387 self.cmd_generator_opts.clone(),
388 para_id,
389 ),
390 };
391
392 self.inner
393 .restart_with(&assets, &program, &args, after)
394 .await?;
395 self.set_is_running(true);
396 self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
397 Ok(())
398 }
399
400 pub async fn run_script(
407 &self,
408 options: RunScriptOptions,
409 ) -> Result<ExecutionResult, anyhow::Error> {
410 self.inner
411 .run_script(options)
412 .await
413 .map_err(|e| anyhow!("Failed to run script: {e}"))
414 }
415
416 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
424 let metric_name = metric_name.into();
425 self.fetch_metrics().await?;
427 self.metric(&metric_name, true).await
429 }
430
431 pub async fn assert(
440 &self,
441 metric_name: impl Into<String>,
442 value: impl Into<f64>,
443 ) -> Result<bool, anyhow::Error> {
444 let value: f64 = value.into();
445 self.assert_with(metric_name, |v| v == value).await
446 }
447
448 pub async fn assert_with(
451 &self,
452 metric_name: impl Into<String>,
453 predicate: impl Fn(f64) -> bool,
454 ) -> Result<bool, anyhow::Error> {
455 let metric_name = metric_name.into();
456 self.fetch_metrics().await?;
458 let val = self.metric(&metric_name, true).await?;
459 let log_msg = format!("🔎 Current value {val} passed to the predicated?");
460 if metric_name == PROCESS_START_TIME_METRIC {
461 trace!(target: MONITOR_TARGET, "{log_msg}");
462 } else {
463 trace!("{log_msg}");
464 }
465 Ok(predicate(val))
466 }
467
468 pub async fn wait_metric(
472 &self,
473 metric_name: impl Into<String>,
474 predicate: impl Fn(f64) -> bool,
475 ) -> Result<(), anyhow::Error> {
476 let metric_name = metric_name.into();
477 let log_msg = format!(
478 "[{}] waiting until metric {metric_name} pass the predicate",
479 self.name()
480 );
481 if metric_name == PROCESS_START_TIME_METRIC {
482 trace!(target: MONITOR_TARGET, "{log_msg}");
483 } else {
484 trace!("{log_msg}");
485 }
486
487 loop {
488 let res = self.assert_with(&metric_name, &predicate).await;
489 let log_msg = format!("res: {res:?}");
490 match res {
491 Ok(res) => {
492 if res {
493 return Ok(());
494 }
495 },
496 Err(e) => match e.downcast::<reqwest::Error>() {
497 Ok(io_err) => {
498 if !skip_err_while_waiting(&io_err) {
499 return Err(io_err.into());
500 }
501 },
502 Err(other) => {
503 match other.downcast::<NetworkNodeError>() {
504 Ok(node_err) => {
505 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
506 return Err(node_err.into());
507 }
508 },
509 Err(other) => return Err(other),
510 };
511 },
512 },
513 }
514
515 if metric_name == PROCESS_START_TIME_METRIC {
516 trace!(target: MONITOR_TARGET, "{log_msg}");
517 } else {
518 trace!("{log_msg}");
519 }
520
521 tokio::time::sleep(Duration::from_secs(1)).await;
523 }
524 }
525
526 pub async fn wait_metric_with_timeout(
529 &self,
530 metric_name: impl Into<String>,
531 predicate: impl Fn(f64) -> bool,
532 timeout_secs: impl Into<u64>,
533 ) -> Result<(), anyhow::Error> {
534 let metric_name = metric_name.into();
535 let secs = timeout_secs.into();
536 let log_msg = format!(
537 "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
538 self.name()
539 );
540
541 if metric_name == PROCESS_START_TIME_METRIC {
542 trace!(target: MONITOR_TARGET, "{log_msg}");
543 } else {
544 debug!("{log_msg}");
545 }
546
547 let res = tokio::time::timeout(
548 Duration::from_secs(secs),
549 self.wait_metric(&metric_name, predicate),
550 )
551 .await;
552
553 if let Ok(inner_res) = res {
554 match inner_res {
555 Ok(_) => Ok(()),
556 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
557 }
558 } else {
559 Err(anyhow!(
561 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
562 ))
563 }
564 }
565
566 pub async fn logs(&self) -> Result<String, anyhow::Error> {
571 Ok(self.inner.logs().await?)
572 }
573
574 pub async fn wait_log_line_count(
576 &self,
577 pattern: impl Into<String>,
578 is_glob: bool,
579 count: usize,
580 ) -> Result<(), anyhow::Error> {
581 let pattern = pattern.into();
582 let pattern_clone = pattern.clone();
583 debug!("waiting until we find pattern {pattern} {count} times");
584 let match_fn: BoxedClosure = if is_glob {
585 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
586 } else {
587 let re = Regex::new(&pattern)?;
588 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
589 };
590
591 loop {
592 let mut q = 0_usize;
593 let logs = self.logs().await?;
594 for line in logs.lines() {
595 trace!("line is {line}");
596 if match_fn(line)? {
597 trace!("pattern {pattern_clone} match in line {line}");
598 q += 1;
599 if q >= count {
600 return Ok(());
601 }
602 }
603 }
604
605 tokio::time::sleep(Duration::from_secs(2)).await;
606 }
607 }
608
609 pub async fn wait_log_line_count_with_timeout(
653 &self,
654 substring: impl Into<String>,
655 is_glob: bool,
656 options: LogLineCountOptions,
657 ) -> Result<LogLineCount, anyhow::Error> {
658 let substring = substring.into();
659 debug!(
660 "waiting until match lines count within {} seconds",
661 options.timeout.as_secs_f64()
662 );
663
664 let start = tokio::time::Instant::now();
665
666 let match_fn: BoxedClosure = if is_glob {
667 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
668 } else {
669 let re = Regex::new(&substring)?;
670 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
671 };
672
673 if options.wait_until_timeout_elapses {
674 tokio::time::sleep(options.timeout).await;
675 }
676
677 let mut q;
678 loop {
679 q = 0_u32;
680 let logs = self.logs().await?;
681 for line in logs.lines() {
682 if match_fn(line)? {
683 q += 1;
684
685 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
690 return Ok(LogLineCount::TargetReached(q));
691 }
692 }
693 }
694
695 if start.elapsed() >= options.timeout {
696 break;
697 }
698
699 tokio::time::sleep(Duration::from_secs(2)).await;
700 }
701
702 if (options.predicate)(q) {
703 Ok(LogLineCount::TargetReached(q))
704 } else {
705 Ok(LogLineCount::TargetFailed(q))
706 }
707 }
708
709 pub async fn wait_event_count_with_timeout(
753 &self,
754 pallet: impl Into<String>,
755 variant: impl Into<String>,
756 options: CountOptions,
757 ) -> Result<WaitCount, anyhow::Error> {
758 let pallet = pallet.into();
759 let variant = variant.into();
760 debug!(
761 "waiting until match event ({pallet} {variant}) count within {} seconds",
762 options.timeout.as_secs_f64()
763 );
764
765 let init_value = Arc::new(AtomicU32::new(0));
766
767 let res = tokio::time::timeout(
768 options.timeout,
769 self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
770 )
771 .await;
772
773 let q = init_value.load(Ordering::Relaxed);
774 if let Ok(inner_res) = res {
775 match inner_res {
776 Ok(_) => Ok(WaitCount::TargetReached(q)),
777 Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
778 }
779 } else {
780 if options.wait_until_timeout_elapses {
782 let q = init_value.load(Ordering::Relaxed);
783 if (options.predicate)(q) {
784 Ok(LogLineCount::TargetReached(q))
785 } else {
786 Ok(LogLineCount::TargetFailed(q))
787 }
788 } else {
789 Err(anyhow!(
790 "Timeout ({}), waiting for counter",
791 options.timeout.as_secs()
792 ))
793 }
794 }
795 }
796
797 async fn wait_event_count(
799 &self,
800 pallet: &str,
801 variant: &str,
802 options: &CountOptions,
803 init_count: Arc<AtomicU32>,
804 ) -> Result<(), anyhow::Error> {
805 let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
806 let mut blocks_sub: subxt::backend::StreamOf<
807 Result<
808 subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
809 subxt::Error,
810 >,
811 > = client.blocks().subscribe_finalized().await?;
812 while let Some(block) = blocks_sub.next().await {
813 let events = block?.events().await?;
814 for event in events.iter() {
815 let evt = event?;
816 if evt.pallet_name() == pallet && evt.variant_name() == variant {
817 let old_value = init_count.fetch_add(1, Ordering::Relaxed);
818 if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
819 return Ok(());
820 }
821 }
822 }
823 }
824
825 Ok(())
826 }
827
828 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
829 let response = reqwest::get(&self.prometheus_uri).await?;
830 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
831 let mut cache = self.metrics_cache.write().await;
832 *cache = metrics;
833 Ok(())
834 }
835
836 async fn metric(
838 &self,
839 metric_name: &str,
840 treat_not_found_as_zero: bool,
841 ) -> Result<f64, anyhow::Error> {
842 let mut metrics_map = self.metrics_cache.read().await;
843 if metrics_map.is_empty() {
844 drop(metrics_map);
846 self.fetch_metrics().await?;
847 metrics_map = self.metrics_cache.read().await;
848 }
849
850 if let Some(val) = metrics_map.get(metric_name) {
851 Ok(*val)
852 } else if treat_not_found_as_zero {
853 Ok(0_f64)
854 } else {
855 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
856 }
857 }
858
859 pub async fn get_histogram_buckets(
879 &self,
880 metric_name: impl AsRef<str>,
881 label_filters: Option<HashMap<String, String>>,
882 ) -> Result<HashMap<String, u64>, anyhow::Error> {
883 let metric_name = metric_name.as_ref();
884
885 let response = reqwest::get(&self.prometheus_uri).await?;
887 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
888
889 let resolved_metric_name = if metric_name.contains("_bucket") {
891 metric_name.to_string()
892 } else {
893 format!("{}_bucket", metric_name)
894 };
895
896 let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
900
901 for (key, &value) in metrics.iter() {
902 if !key.starts_with(&resolved_metric_name) {
903 continue;
904 }
905
906 let remaining = &key[resolved_metric_name.len()..];
907
908 let labels_str = &remaining[1..remaining.len() - 1];
909 let parsed_labels = Self::parse_label_string(labels_str);
910
911 if !parsed_labels.contains_key("le") {
913 continue;
914 }
915
916 if let Some(ref filters) = label_filters {
918 let mut all_match = true;
919 for (filter_key, filter_value) in filters {
920 if parsed_labels.get(filter_key) != Some(filter_value) {
921 all_match = false;
922 break;
923 }
924 }
925 if !all_match {
926 continue;
927 }
928 }
929
930 metric_entries.push((key.clone(), parsed_labels, value as u64));
931 }
932
933 let max_label_count = metric_entries
936 .iter()
937 .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
938 .max()
939 .unwrap_or(0);
940
941 let mut raw_buckets: Vec<(String, u64)> = Vec::new();
943 let mut seen_le_values = HashSet::new();
944 let mut active_series: Option<Vec<(String, String)>> = None;
945
946 for (_, parsed_labels, value) in metric_entries {
947 let le_value = parsed_labels.get("le").unwrap().clone();
948
949 let mut non_le_labels: Vec<(String, String)> = parsed_labels
951 .iter()
952 .filter(|(k, _)| k.as_str() != "le")
953 .map(|(k, v)| (k.clone(), v.clone()))
954 .collect();
955 non_le_labels.sort();
956
957 if non_le_labels.len() < max_label_count {
960 continue;
961 }
962
963 if let Some(ref prev_series) = active_series {
965 if prev_series != &non_le_labels {
966 if !raw_buckets.is_empty() {
967 break; }
969 active_series = Some(non_le_labels.clone());
970 seen_le_values.clear();
971 }
972 } else {
973 active_series = Some(non_le_labels.clone());
974 }
975
976 if !seen_le_values.insert(le_value.clone()) {
978 continue;
979 }
980
981 trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
982 raw_buckets.push((le_value, value));
983 }
984
985 raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
987
988 let mut buckets = HashMap::new();
990 let mut previous_value = 0_u64;
991 for (le, cumulative_count) in raw_buckets {
992 if cumulative_count < previous_value {
993 warn!(
994 "Warning: bucket count decreased from {} to {} at le={}",
995 previous_value, cumulative_count, le
996 );
997 }
998 let delta = cumulative_count.saturating_sub(previous_value);
999 buckets.insert(le, delta);
1000 previous_value = cumulative_count;
1001 }
1002
1003 Ok(buckets)
1004 }
1005
1006 fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
1012 let mut labels = HashMap::new();
1013 let mut current_key = String::new();
1014 let mut current_value = String::new();
1015 let mut in_value = false;
1016 let mut in_quotes = false;
1017
1018 for ch in labels_str.chars() {
1019 match ch {
1020 '=' if !in_quotes && !in_value => {
1021 in_value = true;
1022 },
1023 '"' if in_value => {
1024 in_quotes = !in_quotes;
1025 },
1026 ',' if !in_quotes => {
1027 if !current_key.is_empty() {
1029 labels.insert(
1030 current_key.trim().to_string(),
1031 current_value.trim().to_string(),
1032 );
1033 current_key.clear();
1034 current_value.clear();
1035 in_value = false;
1036 }
1037 },
1038 _ => {
1039 if in_value {
1040 current_value.push(ch);
1041 } else {
1042 current_key.push(ch);
1043 }
1044 },
1045 }
1046 }
1047
1048 if !current_key.is_empty() {
1050 labels.insert(
1051 current_key.trim().to_string(),
1052 current_value.trim().to_string(),
1053 );
1054 }
1055
1056 labels
1057 }
1058
1059 fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
1063 use std::cmp::Ordering;
1064
1065 match (a, b) {
1067 ("+Inf", "+Inf") => Ordering::Equal,
1068 ("+Inf", _) => Ordering::Greater,
1069 (_, "+Inf") => Ordering::Less,
1070 _ => {
1071 match (a.parse::<f64>(), b.parse::<f64>()) {
1073 (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
1074 _ => a.cmp(b),
1076 }
1077 },
1078 }
1079 }
1080
1081 pub async fn wait_until_is_up(
1093 &self,
1094 timeout_secs: impl Into<u64>,
1095 ) -> Result<(), anyhow::Error> {
1096 self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
1097 .await
1098 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
1099 }
1100}
1101
1102impl std::fmt::Debug for NetworkNode {
1103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104 f.debug_struct("NetworkNode")
1105 .field("inner", &"inner_skipped")
1106 .field("spec", &self.spec)
1107 .field("name", &self.name)
1108 .field("ws_uri", &self.ws_uri)
1109 .field("prometheus_uri", &self.prometheus_uri)
1110 .finish()
1111 }
1112}
1113
1114fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
1115where
1116 S: Serializer,
1117{
1118 erased_serde::serialize(node.as_ref(), serializer)
1119}
1120
1121#[cfg(test)]
1123mod tests {
1124 use std::{
1125 path::{Path, PathBuf},
1126 sync::{Arc, Mutex},
1127 };
1128
1129 use async_trait::async_trait;
1130 use provider::{types::*, ProviderError, ProviderNode};
1131
1132 use super::*;
1133
1134 #[derive(Serialize)]
1135 struct MockNode {
1136 logs: Arc<Mutex<Vec<String>>>,
1137 }
1138
1139 impl MockNode {
1140 fn new() -> Self {
1141 Self {
1142 logs: Arc::new(Mutex::new(vec![])),
1143 }
1144 }
1145
1146 fn logs_push(&self, lines: Vec<impl Into<String>>) {
1147 self.logs
1148 .lock()
1149 .unwrap()
1150 .extend(lines.into_iter().map(|l| l.into()));
1151 }
1152 }
1153
1154 #[async_trait]
1155 impl ProviderNode for MockNode {
1156 fn name(&self) -> &str {
1157 todo!()
1158 }
1159
1160 fn args(&self) -> Vec<&str> {
1161 todo!()
1162 }
1163
1164 fn base_dir(&self) -> &PathBuf {
1165 todo!()
1166 }
1167
1168 fn config_dir(&self) -> &PathBuf {
1169 todo!()
1170 }
1171
1172 fn data_dir(&self) -> &PathBuf {
1173 todo!()
1174 }
1175
1176 fn relay_data_dir(&self) -> &PathBuf {
1177 todo!()
1178 }
1179
1180 fn scripts_dir(&self) -> &PathBuf {
1181 todo!()
1182 }
1183
1184 fn log_path(&self) -> &PathBuf {
1185 todo!()
1186 }
1187
1188 fn log_cmd(&self) -> String {
1189 todo!()
1190 }
1191
1192 fn path_in_node(&self, _file: &Path) -> PathBuf {
1193 todo!()
1194 }
1195
1196 async fn logs(&self) -> Result<String, ProviderError> {
1197 Ok(self.logs.lock().unwrap().join("\n"))
1198 }
1199
1200 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
1201 todo!()
1202 }
1203
1204 async fn run_command(
1205 &self,
1206 _options: RunCommandOptions,
1207 ) -> Result<ExecutionResult, ProviderError> {
1208 todo!()
1209 }
1210
1211 async fn run_script(
1212 &self,
1213 _options: RunScriptOptions,
1214 ) -> Result<ExecutionResult, ProviderError> {
1215 todo!()
1216 }
1217
1218 async fn send_file(
1219 &self,
1220 _local_file_path: &Path,
1221 _remote_file_path: &Path,
1222 _mode: &str,
1223 ) -> Result<(), ProviderError> {
1224 todo!()
1225 }
1226
1227 async fn receive_file(
1228 &self,
1229 _remote_file_path: &Path,
1230 _local_file_path: &Path,
1231 ) -> Result<(), ProviderError> {
1232 todo!()
1233 }
1234
1235 async fn pause(&self) -> Result<(), ProviderError> {
1236 todo!()
1237 }
1238
1239 async fn resume(&self) -> Result<(), ProviderError> {
1240 todo!()
1241 }
1242
1243 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
1244 todo!()
1245 }
1246
1247 async fn restart_with(
1248 &self,
1249 _assets: &[AssetLocation],
1250 _cmd: &str,
1251 _args: &[String],
1252 _after: Option<Duration>,
1253 ) -> Result<(), ProviderError> {
1254 todo!()
1255 }
1256
1257 async fn destroy(&self) -> Result<(), ProviderError> {
1258 todo!()
1259 }
1260 }
1261
1262 #[tokio::test(flavor = "multi_thread")]
1263 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1264 let mock_provider = Arc::new(MockNode::new());
1265 let mock_node = NetworkNode::new(
1266 "node1",
1267 "ws_uri",
1268 "prometheus_uri",
1269 "multiaddr",
1270 NodeSpec::default(),
1271 mock_provider.clone(),
1272 GenCmdOptions::default(),
1273 NodeContext::Rc,
1274 );
1275
1276 mock_provider.logs_push(vec![
1277 "system booting",
1278 "stub line 1",
1279 "stub line 2",
1280 "system ready",
1281 ]);
1282
1283 let options = LogLineCountOptions {
1285 predicate: Arc::new(|n| n == 1),
1286 timeout: Duration::from_secs(10),
1287 wait_until_timeout_elapses: false,
1288 };
1289
1290 let log_line_count = mock_node
1291 .wait_log_line_count_with_timeout("system ready", false, options)
1292 .await?;
1293
1294 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1295
1296 Ok(())
1297 }
1298
1299 #[tokio::test(flavor = "multi_thread")]
1300 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
1301 let mock_provider = Arc::new(MockNode::new());
1302 let mock_node = NetworkNode::new(
1303 "node1",
1304 "ws_uri",
1305 "prometheus_uri",
1306 "multiaddr",
1307 NodeSpec::default(),
1308 mock_provider.clone(),
1309 GenCmdOptions::default(),
1310 NodeContext::Rc,
1311 );
1312
1313 mock_provider.logs_push(vec![
1314 "system booting",
1315 "stub line 1",
1316 "stub line 2",
1317 "system ready",
1318 ]);
1319
1320 let options = LogLineCountOptions {
1322 predicate: Arc::new(|n| n == 2),
1323 timeout: Duration::from_secs(4),
1324 wait_until_timeout_elapses: false,
1325 };
1326
1327 let task = tokio::spawn({
1328 async move {
1329 mock_node
1330 .wait_log_line_count_with_timeout("system ready", false, options)
1331 .await
1332 .unwrap()
1333 }
1334 });
1335
1336 tokio::time::sleep(Duration::from_secs(2)).await;
1337
1338 mock_provider.logs_push(vec!["system ready"]);
1339
1340 let log_line_count = task.await?;
1341
1342 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
1343
1344 Ok(())
1345 }
1346
1347 #[tokio::test(flavor = "multi_thread")]
1348 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1349 let mock_provider = Arc::new(MockNode::new());
1350 let mock_node = NetworkNode::new(
1351 "node1",
1352 "ws_uri",
1353 "prometheus_uri",
1354 "multiaddr",
1355 NodeSpec::default(),
1356 mock_provider.clone(),
1357 GenCmdOptions::default(),
1358 NodeContext::Rc,
1359 );
1360
1361 mock_provider.logs_push(vec![
1362 "system booting",
1363 "stub line 1",
1364 "stub line 2",
1365 "system ready",
1366 ]);
1367
1368 let options = LogLineCountOptions {
1370 predicate: Arc::new(|n| n == 2),
1371 timeout: Duration::from_secs(2),
1372 wait_until_timeout_elapses: false,
1373 };
1374
1375 let log_line_count = mock_node
1376 .wait_log_line_count_with_timeout("system ready", false, options)
1377 .await?;
1378
1379 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1380
1381 Ok(())
1382 }
1383
1384 #[tokio::test(flavor = "multi_thread")]
1385 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
1386 let mock_provider = Arc::new(MockNode::new());
1387 let mock_node = NetworkNode::new(
1388 "node1",
1389 "ws_uri",
1390 "prometheus_uri",
1391 "multiaddr",
1392 NodeSpec::default(),
1393 mock_provider.clone(),
1394 GenCmdOptions::default(),
1395 NodeContext::Rc,
1396 );
1397
1398 mock_provider.logs_push(vec![
1399 "system booting",
1400 "stub line 1",
1401 "stub line 2",
1402 "system ready",
1403 ]);
1404
1405 let options = LogLineCountOptions {
1407 predicate: Arc::new(|n| n == 2),
1408 timeout: Duration::from_secs(2),
1409 wait_until_timeout_elapses: true,
1410 };
1411
1412 let task = tokio::spawn({
1413 async move {
1414 mock_node
1415 .wait_log_line_count_with_timeout("system ready", false, options)
1416 .await
1417 .unwrap()
1418 }
1419 });
1420
1421 tokio::time::sleep(Duration::from_secs(1)).await;
1422
1423 mock_provider.logs_push(vec!["system ready"]);
1424 mock_provider.logs_push(vec!["system ready"]);
1425
1426 let log_line_count = task.await?;
1427
1428 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
1429
1430 Ok(())
1431 }
1432
1433 #[tokio::test(flavor = "multi_thread")]
1434 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
1435 let mock_provider = Arc::new(MockNode::new());
1436 let mock_node = NetworkNode::new(
1437 "node1",
1438 "ws_uri",
1439 "prometheus_uri",
1440 "multiaddr",
1441 NodeSpec::default(),
1442 mock_provider.clone(),
1443 GenCmdOptions::default(),
1444 NodeContext::Rc,
1445 );
1446
1447 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1448
1449 let task = tokio::spawn({
1450 async move {
1451 mock_node
1452 .wait_log_line_count_with_timeout(
1453 "system ready",
1454 false,
1455 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
1457 )
1458 .await
1459 .unwrap()
1460 }
1461 });
1462
1463 tokio::time::sleep(Duration::from_secs(1)).await;
1464
1465 mock_provider.logs_push(vec!["stub line 3"]);
1466
1467 assert!(task.await?.success());
1468
1469 Ok(())
1470 }
1471
1472 #[tokio::test(flavor = "multi_thread")]
1473 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
1474 let mock_provider = Arc::new(MockNode::new());
1475 let mock_node = NetworkNode::new(
1476 "node1",
1477 "ws_uri",
1478 "prometheus_uri",
1479 "multiaddr",
1480 NodeSpec::default(),
1481 mock_provider.clone(),
1482 GenCmdOptions::default(),
1483 NodeContext::Rc,
1484 );
1485
1486 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1487
1488 let options = LogLineCountOptions {
1490 predicate: Arc::new(|n| (2..=5).contains(&n)),
1491 timeout: Duration::from_secs(2),
1492 wait_until_timeout_elapses: true,
1493 };
1494
1495 let task = tokio::spawn({
1496 async move {
1497 mock_node
1498 .wait_log_line_count_with_timeout("system ready", false, options)
1499 .await
1500 .unwrap()
1501 }
1502 });
1503
1504 tokio::time::sleep(Duration::from_secs(1)).await;
1505
1506 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
1507
1508 assert!(task.await?.success());
1509
1510 Ok(())
1511 }
1512
1513 #[tokio::test(flavor = "multi_thread")]
1514 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
1515 let mock_provider = Arc::new(MockNode::new());
1516 let mock_node = NetworkNode::new(
1517 "node1",
1518 "ws_uri",
1519 "prometheus_uri",
1520 "multiaddr",
1521 NodeSpec::default(),
1522 mock_provider.clone(),
1523 GenCmdOptions::default(),
1524 NodeContext::Rc,
1525 );
1526
1527 mock_provider.logs_push(vec![
1528 "system booting",
1529 "stub line 1",
1530 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1532 "stub line 2"
1533 ]);
1534
1535 let options = LogLineCountOptions {
1536 predicate: Arc::new(|n| n == 1),
1537 timeout: Duration::from_secs(3),
1538 wait_until_timeout_elapses: true,
1539 };
1540
1541 let task = tokio::spawn({
1542 async move {
1543 mock_node
1544 .wait_log_line_count_with_timeout(
1545 "error(?! importing block .*: block has an unknown parent)",
1546 false,
1547 options,
1548 )
1549 .await
1550 .unwrap()
1551 }
1552 });
1553
1554 tokio::time::sleep(Duration::from_secs(1)).await;
1555
1556 mock_provider.logs_push(vec![
1557 "system ready",
1558 "system error",
1560 "system ready",
1561 ]);
1562
1563 assert!(task.await?.success());
1564
1565 Ok(())
1566 }
1567
1568 #[tokio::test(flavor = "multi_thread")]
1569 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1570 ) -> Result<(), anyhow::Error> {
1571 let mock_provider = Arc::new(MockNode::new());
1572 let mock_node = NetworkNode::new(
1573 "node1",
1574 "ws_uri",
1575 "prometheus_uri",
1576 "multiaddr",
1577 NodeSpec::default(),
1578 mock_provider.clone(),
1579 GenCmdOptions::default(),
1580 NodeContext::Rc,
1581 );
1582
1583 mock_provider.logs_push(vec![
1584 "system booting",
1585 "stub line 1",
1586 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1588 "stub line 2"
1589 ]);
1590
1591 let options = LogLineCountOptions {
1592 predicate: Arc::new(|n| n == 1),
1593 timeout: Duration::from_secs(6),
1594 wait_until_timeout_elapses: true,
1595 };
1596
1597 let task = tokio::spawn({
1598 async move {
1599 mock_node
1600 .wait_log_line_count_with_timeout(
1601 "error(?! importing block .*: block has an unknown parent)",
1602 false,
1603 options,
1604 )
1605 .await
1606 .unwrap()
1607 }
1608 });
1609
1610 tokio::time::sleep(Duration::from_secs(1)).await;
1611
1612 mock_provider.logs_push(vec!["system ready", "system ready"]);
1613
1614 assert!(!task.await?.success());
1615
1616 Ok(())
1617 }
1618
1619 #[tokio::test(flavor = "multi_thread")]
1620 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1621 let mock_provider = Arc::new(MockNode::new());
1622 let mock_node = NetworkNode::new(
1623 "node1",
1624 "ws_uri",
1625 "prometheus_uri",
1626 "multiaddr",
1627 NodeSpec::default(),
1628 mock_provider.clone(),
1629 GenCmdOptions::default(),
1630 NodeContext::Rc,
1631 );
1632
1633 mock_provider.logs_push(vec![
1634 "system booting",
1635 "stub line 1",
1636 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1638 "stub line 2"
1639 ]);
1640
1641 let task = tokio::spawn({
1642 async move {
1643 mock_node
1644 .wait_log_line_count(
1645 "error(?! importing block .*: block has an unknown parent)",
1646 false,
1647 1,
1648 )
1649 .await
1650 .unwrap()
1651 }
1652 });
1653
1654 tokio::time::sleep(Duration::from_secs(1)).await;
1655
1656 mock_provider.logs_push(vec![
1657 "system ready",
1658 "system error",
1660 "system ready",
1661 ]);
1662
1663 assert!(task.await.is_ok());
1664
1665 Ok(())
1666 }
1667
1668 #[tokio::test(flavor = "multi_thread")]
1669 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1670 let mock_provider = Arc::new(MockNode::new());
1671 let mock_node = NetworkNode::new(
1672 "node1",
1673 "ws_uri",
1674 "prometheus_uri",
1675 "multiaddr",
1676 NodeSpec::default(),
1677 mock_provider.clone(),
1678 GenCmdOptions::default(),
1679 NodeContext::Rc,
1680 );
1681
1682 mock_provider.logs_push(vec![
1683 "system booting",
1684 "stub line 1",
1685 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1687 "stub line 2"
1688 ]);
1689
1690 let options = LogLineCountOptions {
1691 predicate: Arc::new(|count| count == 1),
1692 timeout: Duration::from_secs(2),
1693 wait_until_timeout_elapses: true,
1694 };
1695
1696 let task = tokio::spawn({
1697 async move {
1698 mock_node
1700 .wait_log_line_count_with_timeout(
1701 "error(?! importing block .*: block has an unknown parent)",
1702 false,
1703 options,
1704 )
1705 .await
1706 .unwrap()
1707 }
1708 });
1709
1710 tokio::time::sleep(Duration::from_secs(1)).await;
1711
1712 mock_provider.logs_push(vec!["system ready", "system ready"]);
1713
1714 assert!(!task.await?.success());
1715
1716 Ok(())
1717 }
1718
1719 #[tokio::test]
1720 async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1721 use std::sync::Arc;
1723
1724 let mock_metrics = concat!(
1726 "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1727 "# TYPE substrate_block_verification_time histogram\n",
1728 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1729 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1730 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1731 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1732 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1733 "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1734 "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1735 );
1736
1737 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1739 let addr = listener.local_addr()?;
1740 let metrics = Arc::new(mock_metrics.to_string());
1741
1742 tokio::spawn({
1743 let metrics = metrics.clone();
1744 async move {
1745 loop {
1746 if let Ok((mut socket, _)) = listener.accept().await {
1747 let metrics = metrics.clone();
1748 tokio::spawn(async move {
1749 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1750 let mut buffer = [0; 1024];
1751 let _ = socket.read(&mut buffer).await;
1752
1753 let response = format!(
1754 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1755 metrics.len(),
1756 metrics
1757 );
1758 let _ = socket.write_all(response.as_bytes()).await;
1759 });
1760 }
1761 }
1762 }
1763 });
1764
1765 let mock_provider = Arc::new(MockNode::new());
1767 let mock_node = NetworkNode::new(
1768 "test_node",
1769 "ws://localhost:9944",
1770 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1771 "/ip4/127.0.0.1/tcp/30333",
1772 NodeSpec::default(),
1773 mock_provider,
1774 GenCmdOptions::default(),
1775 NodeContext::Rc,
1776 );
1777
1778 let mut label_filters = HashMap::new();
1780 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1781 let buckets = mock_node
1782 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1783 .await?;
1784
1785 assert_eq!(buckets.get("0.1"), Some(&10));
1787 assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); let mut label_filters = std::collections::HashMap::new();
1794 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1795
1796 let buckets_filtered = mock_node
1797 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1798 .await?;
1799
1800 assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1801 assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1802
1803 let buckets_with_suffix = mock_node
1805 .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1806 .await?;
1807
1808 assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1809
1810 Ok(())
1811 }
1812
1813 #[tokio::test]
1814 async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1815 use std::sync::Arc;
1817
1818 let mock_metrics = concat!(
1819 "# HELP test_metric A test metric\n",
1820 "# TYPE test_metric histogram\n",
1821 "test_metric_bucket{le=\"2.5\"} 40\n",
1822 "test_metric_bucket{le=\"0.1\"} 10\n",
1823 "test_metric_bucket{le=\"+Inf\"} 42\n",
1824 "test_metric_bucket{le=\"1.0\"} 35\n",
1825 "test_metric_bucket{le=\"0.5\"} 25\n",
1826 );
1827
1828 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1829 let addr = listener.local_addr()?;
1830 let metrics = Arc::new(mock_metrics.to_string());
1831
1832 tokio::spawn({
1833 let metrics = metrics.clone();
1834 async move {
1835 loop {
1836 if let Ok((mut socket, _)) = listener.accept().await {
1837 let metrics = metrics.clone();
1838 tokio::spawn(async move {
1839 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1840 let mut buffer = [0; 1024];
1841 let _ = socket.read(&mut buffer).await;
1842 let response = format!(
1843 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1844 metrics.len(),
1845 metrics
1846 );
1847 let _ = socket.write_all(response.as_bytes()).await;
1848 });
1849 }
1850 }
1851 }
1852 });
1853
1854 let mock_provider = Arc::new(MockNode::new());
1855 let mock_node = NetworkNode::new(
1856 "test_node",
1857 "ws://localhost:9944",
1858 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1859 "/ip4/127.0.0.1/tcp/30333",
1860 NodeSpec::default(),
1861 mock_provider,
1862 GenCmdOptions::default(),
1863 NodeContext::Rc,
1864 );
1865
1866 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1867
1868 assert_eq!(buckets.get("0.1"), Some(&10)); assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); Ok(())
1876 }
1877
1878 #[tokio::test]
1879 async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1880 use std::sync::Arc;
1882
1883 let mock_metrics = concat!(
1884 "# HELP test_metric A test metric\n",
1885 "# TYPE test_metric histogram\n",
1886 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1887 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1888 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1889 );
1890
1891 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1892 let addr = listener.local_addr()?;
1893 let metrics = Arc::new(mock_metrics.to_string());
1894
1895 tokio::spawn({
1896 let metrics = metrics.clone();
1897 async move {
1898 loop {
1899 if let Ok((mut socket, _)) = listener.accept().await {
1900 let metrics = metrics.clone();
1901 tokio::spawn(async move {
1902 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1903 let mut buffer = [0; 1024];
1904 let _ = socket.read(&mut buffer).await;
1905 let response = format!(
1906 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1907 metrics.len(),
1908 metrics
1909 );
1910 let _ = socket.write_all(response.as_bytes()).await;
1911 });
1912 }
1913 }
1914 }
1915 });
1916
1917 let mock_provider = Arc::new(MockNode::new());
1918 let mock_node = NetworkNode::new(
1919 "test_node",
1920 "ws://localhost:9944",
1921 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1922 "/ip4/127.0.0.1/tcp/30333",
1923 NodeSpec::default(),
1924 mock_provider,
1925 GenCmdOptions::default(),
1926 NodeContext::Rc,
1927 );
1928
1929 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1931 assert_eq!(buckets.get("0.1"), Some(&5));
1932 assert_eq!(buckets.get("0.5"), Some(&10)); assert_eq!(buckets.get("+Inf"), Some(&5)); let mut label_filters = std::collections::HashMap::new();
1937 label_filters.insert("method".to_string(), "GET,POST".to_string());
1938
1939 let buckets_filtered = mock_node
1940 .get_histogram_buckets("test_metric", Some(label_filters))
1941 .await?;
1942
1943 assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1944 assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1945
1946 Ok(())
1947 }
1948
1949 #[test]
1950 fn test_compare_le_values() {
1951 use std::cmp::Ordering;
1952
1953 use crate::network::node::NetworkNode;
1954
1955 assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1957 assert_eq!(
1958 NetworkNode::compare_le_values("1.0", "0.5"),
1959 Ordering::Greater
1960 );
1961 assert_eq!(
1962 NetworkNode::compare_le_values("1.0", "1.0"),
1963 Ordering::Equal
1964 );
1965
1966 assert_eq!(
1968 NetworkNode::compare_le_values("+Inf", "999"),
1969 Ordering::Greater
1970 );
1971 assert_eq!(
1972 NetworkNode::compare_le_values("0.1", "+Inf"),
1973 Ordering::Less
1974 );
1975 assert_eq!(
1976 NetworkNode::compare_le_values("+Inf", "+Inf"),
1977 Ordering::Equal
1978 );
1979
1980 assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1982 assert_eq!(
1983 NetworkNode::compare_le_values("1000", "999"),
1984 Ordering::Greater
1985 );
1986 }
1987}