1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
5 Arc,
6 },
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15 types::{ExecutionResult, RunScriptOptions},
16 DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{
26 network_spec::node::NodeSpec, shared::constants::PROCESS_START_TIME_METRIC,
27 tx_helper::client::get_client_from_url,
28};
29
30type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
31
32#[derive(Error, Debug)]
33pub enum NetworkNodeError {
34 #[error("metric '{0}' not found!")]
35 MetricNotFound(String),
36}
37
38const MONITOR_TARGET: &str = "zombie_monitor";
40#[derive(Clone, Serialize)]
41pub struct NetworkNode {
42 #[serde(serialize_with = "serialize_provider_node")]
43 pub(crate) inner: DynNode,
44 pub(crate) spec: NodeSpec,
47 pub(crate) name: String,
48 pub(crate) ws_uri: String,
49 pub(crate) multiaddr: String,
50 pub(crate) prometheus_uri: String,
51 #[serde(skip)]
52 metrics_cache: Arc<RwLock<MetricMap>>,
53 #[serde(skip)]
54 is_running: Arc<AtomicBool>,
55 #[serde(skip)]
57 last_start_ts: Arc<AtomicU64>,
58}
59
60#[derive(Deserialize)]
61pub(crate) struct RawNetworkNode {
62 pub(crate) name: String,
63 pub(crate) ws_uri: String,
64 pub(crate) prometheus_uri: String,
65 pub(crate) multiaddr: String,
66 pub(crate) spec: NodeSpec,
67 #[serde(default)]
68 pub(crate) inner: serde_json::Value,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum WaitCount {
82 TargetReached(u32),
83 TargetFailed(u32),
84}
85
86impl WaitCount {
87 pub fn success(&self) -> bool {
88 match self {
89 Self::TargetReached(..) => true,
90 Self::TargetFailed(..) => false,
91 }
92 }
93}
94
95pub type LogLineCount = WaitCount;
96
97#[derive(Clone)]
110pub struct CountOptions {
111 pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
112 pub timeout: Duration,
113 pub wait_until_timeout_elapses: bool,
114}
115
116impl CountOptions {
117 pub fn new(
118 predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
119 timeout: Duration,
120 wait_until_timeout_elapses: bool,
121 ) -> Self {
122 Self {
123 predicate: Arc::new(predicate),
124 timeout,
125 wait_until_timeout_elapses,
126 }
127 }
128
129 pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
130 Self::new(|n| n == 0, timeout, true)
131 }
132
133 pub fn at_least_once(timeout: Duration) -> Self {
134 Self::new(|count| count >= 1, timeout, false)
135 }
136
137 pub fn at_least(target: u32, timeout: Duration) -> Self {
138 Self::new(move |count| count >= target, timeout, false)
139 }
140
141 pub fn exactly_once(timeout: Duration) -> Self {
142 Self::new(|count| count == 1, timeout, false)
143 }
144}
145
146pub type LogLineCountOptions = CountOptions;
147
148impl NetworkNode {
149 pub(crate) fn new<T: Into<String>>(
151 name: T,
152 ws_uri: T,
153 prometheus_uri: T,
154 multiaddr: T,
155 spec: NodeSpec,
156 inner: DynNode,
157 ) -> Self {
158 Self {
159 name: name.into(),
160 ws_uri: ws_uri.into(),
161 prometheus_uri: prometheus_uri.into(),
162 inner,
163 spec,
164 multiaddr: multiaddr.into(),
165 metrics_cache: Arc::new(Default::default()),
166 is_running: Arc::new(AtomicBool::new(false)),
167 last_start_ts: Arc::new(AtomicU64::new(0)),
168 }
169 }
170
171 pub fn is_running(&self) -> bool {
175 self.is_running.load(Ordering::Acquire)
176 }
177
178 pub fn last_start_ts(&self) -> u64 {
180 self.last_start_ts.load(Ordering::Acquire)
181 }
182
183 pub async fn is_responsive(&self) -> bool {
190 tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
191 .await
192 .is_ok()
193 }
194
195 pub(crate) fn set_is_running(&self, is_running: bool) {
196 self.is_running.store(is_running, Ordering::Release);
197 }
198
199 pub(crate) fn set_last_start_ts(&self, ts: u64) {
201 self.last_start_ts.store(ts, Ordering::Release);
202 }
203
204 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
205 self.multiaddr = multiaddr.into();
206 }
207
208 pub fn name(&self) -> &str {
209 &self.name
210 }
211
212 pub fn args(&self) -> Vec<&str> {
213 self.inner.args()
214 }
215
216 pub fn spec(&self) -> &NodeSpec {
217 &self.spec
218 }
219
220 pub fn ws_uri(&self) -> &str {
221 &self.ws_uri
222 }
223
224 pub fn multiaddr(&self) -> &str {
225 self.multiaddr.as_ref()
226 }
227
228 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
232 get_client_from_url(&self.ws_uri).await
233 }
234
235 #[deprecated = "Use `wait_client` instead."]
237 pub async fn client<Config: subxt::Config>(
238 &self,
239 ) -> Result<OnlineClient<Config>, subxt::Error> {
240 self.try_client().await
241 }
242
243 pub async fn try_client<Config: subxt::Config>(
252 &self,
253 ) -> Result<OnlineClient<Config>, subxt::Error> {
254 get_client_from_url(&self.ws_uri).await
255 }
256
257 pub async fn wait_client<Config: subxt::Config>(
259 &self,
260 ) -> Result<OnlineClient<Config>, anyhow::Error> {
261 debug!("wait_client ws_uri: {}", self.ws_uri());
262 wait_ws_ready(self.ws_uri())
263 .await
264 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
265
266 self.try_client()
267 .await
268 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
269 }
270
271 pub async fn wait_client_with_timeout<Config: subxt::Config>(
273 &self,
274 timeout_secs: impl Into<u64>,
275 ) -> Result<OnlineClient<Config>, anyhow::Error> {
276 debug!("waiting until subxt client is ready");
277 tokio::time::timeout(
278 Duration::from_secs(timeout_secs.into()),
279 self.wait_client::<Config>(),
280 )
281 .await?
282 }
283
284 pub async fn pause(&self) -> Result<(), anyhow::Error> {
292 self.set_is_running(false);
293 self.inner.pause().await?;
294 Ok(())
295 }
296
297 pub async fn resume(&self) -> Result<(), anyhow::Error> {
303 self.set_is_running(true);
304 self.inner.resume().await?;
305 Ok(())
306 }
307
308 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
313 self.set_is_running(false);
314 self.inner.restart(after).await?;
315 self.set_is_running(true);
316 self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
317 Ok(())
318 }
319
320 pub async fn run_script(
327 &self,
328 options: RunScriptOptions,
329 ) -> Result<ExecutionResult, anyhow::Error> {
330 self.inner
331 .run_script(options)
332 .await
333 .map_err(|e| anyhow!("Failed to run script: {e}"))
334 }
335
336 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
344 let metric_name = metric_name.into();
345 self.fetch_metrics().await?;
347 self.metric(&metric_name, true).await
349 }
350
351 pub async fn assert(
360 &self,
361 metric_name: impl Into<String>,
362 value: impl Into<f64>,
363 ) -> Result<bool, anyhow::Error> {
364 let value: f64 = value.into();
365 self.assert_with(metric_name, |v| v == value).await
366 }
367
368 pub async fn assert_with(
371 &self,
372 metric_name: impl Into<String>,
373 predicate: impl Fn(f64) -> bool,
374 ) -> Result<bool, anyhow::Error> {
375 let metric_name = metric_name.into();
376 self.fetch_metrics().await?;
378 let val = self.metric(&metric_name, true).await?;
379 let log_msg = format!("🔎 Current value {val} passed to the predicated?");
380 if metric_name == PROCESS_START_TIME_METRIC {
381 trace!(target: MONITOR_TARGET, "{log_msg}");
382 } else {
383 trace!("{log_msg}");
384 }
385 Ok(predicate(val))
386 }
387
388 pub async fn wait_metric(
392 &self,
393 metric_name: impl Into<String>,
394 predicate: impl Fn(f64) -> bool,
395 ) -> Result<(), anyhow::Error> {
396 let metric_name = metric_name.into();
397 let log_msg = format!(
398 "[{}] waiting until metric {metric_name} pass the predicate",
399 self.name()
400 );
401 if metric_name == PROCESS_START_TIME_METRIC {
402 trace!(target: MONITOR_TARGET, "{log_msg}");
403 } else {
404 trace!("{log_msg}");
405 }
406
407 loop {
408 let res = self.assert_with(&metric_name, &predicate).await;
409 let log_msg = format!("res: {res:?}");
410 match res {
411 Ok(res) => {
412 if res {
413 return Ok(());
414 }
415 },
416 Err(e) => match e.downcast::<reqwest::Error>() {
417 Ok(io_err) => {
418 if !skip_err_while_waiting(&io_err) {
419 return Err(io_err.into());
420 }
421 },
422 Err(other) => {
423 match other.downcast::<NetworkNodeError>() {
424 Ok(node_err) => {
425 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
426 return Err(node_err.into());
427 }
428 },
429 Err(other) => return Err(other),
430 };
431 },
432 },
433 }
434
435 if metric_name == PROCESS_START_TIME_METRIC {
436 trace!(target: MONITOR_TARGET, "{log_msg}");
437 } else {
438 trace!("{log_msg}");
439 }
440
441 tokio::time::sleep(Duration::from_secs(1)).await;
443 }
444 }
445
446 pub async fn wait_metric_with_timeout(
449 &self,
450 metric_name: impl Into<String>,
451 predicate: impl Fn(f64) -> bool,
452 timeout_secs: impl Into<u64>,
453 ) -> Result<(), anyhow::Error> {
454 let metric_name = metric_name.into();
455 let secs = timeout_secs.into();
456 let log_msg = format!(
457 "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
458 self.name()
459 );
460
461 if metric_name == PROCESS_START_TIME_METRIC {
462 trace!(target: MONITOR_TARGET, "{log_msg}");
463 } else {
464 debug!("{log_msg}");
465 }
466
467 let res = tokio::time::timeout(
468 Duration::from_secs(secs),
469 self.wait_metric(&metric_name, predicate),
470 )
471 .await;
472
473 if let Ok(inner_res) = res {
474 match inner_res {
475 Ok(_) => Ok(()),
476 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
477 }
478 } else {
479 Err(anyhow!(
481 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
482 ))
483 }
484 }
485
486 pub async fn logs(&self) -> Result<String, anyhow::Error> {
491 Ok(self.inner.logs().await?)
492 }
493
494 pub async fn wait_log_line_count(
496 &self,
497 pattern: impl Into<String>,
498 is_glob: bool,
499 count: usize,
500 ) -> Result<(), anyhow::Error> {
501 let pattern = pattern.into();
502 let pattern_clone = pattern.clone();
503 debug!("waiting until we find pattern {pattern} {count} times");
504 let match_fn: BoxedClosure = if is_glob {
505 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
506 } else {
507 let re = Regex::new(&pattern)?;
508 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
509 };
510
511 loop {
512 let mut q = 0_usize;
513 let logs = self.logs().await?;
514 for line in logs.lines() {
515 trace!("line is {line}");
516 if match_fn(line)? {
517 trace!("pattern {pattern_clone} match in line {line}");
518 q += 1;
519 if q >= count {
520 return Ok(());
521 }
522 }
523 }
524
525 tokio::time::sleep(Duration::from_secs(2)).await;
526 }
527 }
528
529 pub async fn wait_log_line_count_with_timeout(
573 &self,
574 substring: impl Into<String>,
575 is_glob: bool,
576 options: LogLineCountOptions,
577 ) -> Result<LogLineCount, anyhow::Error> {
578 let substring = substring.into();
579 debug!(
580 "waiting until match lines count within {} seconds",
581 options.timeout.as_secs_f64()
582 );
583
584 let start = tokio::time::Instant::now();
585
586 let match_fn: BoxedClosure = if is_glob {
587 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
588 } else {
589 let re = Regex::new(&substring)?;
590 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
591 };
592
593 if options.wait_until_timeout_elapses {
594 tokio::time::sleep(options.timeout).await;
595 }
596
597 let mut q;
598 loop {
599 q = 0_u32;
600 let logs = self.logs().await?;
601 for line in logs.lines() {
602 if match_fn(line)? {
603 q += 1;
604
605 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
610 return Ok(LogLineCount::TargetReached(q));
611 }
612 }
613 }
614
615 if start.elapsed() >= options.timeout {
616 break;
617 }
618
619 tokio::time::sleep(Duration::from_secs(2)).await;
620 }
621
622 if (options.predicate)(q) {
623 Ok(LogLineCount::TargetReached(q))
624 } else {
625 Ok(LogLineCount::TargetFailed(q))
626 }
627 }
628
629 pub async fn wait_event_count_with_timeout(
673 &self,
674 pallet: impl Into<String>,
675 variant: impl Into<String>,
676 options: CountOptions,
677 ) -> Result<WaitCount, anyhow::Error> {
678 let pallet = pallet.into();
679 let variant = variant.into();
680 debug!(
681 "waiting until match event ({pallet} {variant}) count within {} seconds",
682 options.timeout.as_secs_f64()
683 );
684
685 let init_value = Arc::new(AtomicU32::new(0));
686
687 let res = tokio::time::timeout(
688 options.timeout,
689 self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
690 )
691 .await;
692
693 let q = init_value.load(Ordering::Relaxed);
694 if let Ok(inner_res) = res {
695 match inner_res {
696 Ok(_) => Ok(WaitCount::TargetReached(q)),
697 Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
698 }
699 } else {
700 if options.wait_until_timeout_elapses {
702 let q = init_value.load(Ordering::Relaxed);
703 if (options.predicate)(q) {
704 Ok(LogLineCount::TargetReached(q))
705 } else {
706 Ok(LogLineCount::TargetFailed(q))
707 }
708 } else {
709 Err(anyhow!(
710 "Timeout ({}), waiting for counter",
711 options.timeout.as_secs()
712 ))
713 }
714 }
715 }
716
717 async fn wait_event_count(
719 &self,
720 pallet: &str,
721 variant: &str,
722 options: &CountOptions,
723 init_count: Arc<AtomicU32>,
724 ) -> Result<(), anyhow::Error> {
725 let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
726 let mut blocks_sub: subxt::backend::StreamOf<
727 Result<
728 subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
729 subxt::Error,
730 >,
731 > = client.blocks().subscribe_finalized().await?;
732 while let Some(block) = blocks_sub.next().await {
733 let events = block?.events().await?;
734 for event in events.iter() {
735 let evt = event?;
736 if evt.pallet_name() == pallet && evt.variant_name() == variant {
737 let old_value = init_count.fetch_add(1, Ordering::Relaxed);
738 if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
739 return Ok(());
740 }
741 }
742 }
743 }
744
745 Ok(())
746 }
747
748 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
749 let response = reqwest::get(&self.prometheus_uri).await?;
750 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
751 let mut cache = self.metrics_cache.write().await;
752 *cache = metrics;
753 Ok(())
754 }
755
756 async fn metric(
758 &self,
759 metric_name: &str,
760 treat_not_found_as_zero: bool,
761 ) -> Result<f64, anyhow::Error> {
762 let mut metrics_map = self.metrics_cache.read().await;
763 if metrics_map.is_empty() {
764 drop(metrics_map);
766 self.fetch_metrics().await?;
767 metrics_map = self.metrics_cache.read().await;
768 }
769
770 if let Some(val) = metrics_map.get(metric_name) {
771 Ok(*val)
772 } else if treat_not_found_as_zero {
773 Ok(0_f64)
774 } else {
775 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
776 }
777 }
778
779 pub async fn get_histogram_buckets(
799 &self,
800 metric_name: impl AsRef<str>,
801 label_filters: Option<HashMap<String, String>>,
802 ) -> Result<HashMap<String, u64>, anyhow::Error> {
803 let metric_name = metric_name.as_ref();
804
805 let response = reqwest::get(&self.prometheus_uri).await?;
807 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
808
809 let resolved_metric_name = if metric_name.contains("_bucket") {
811 metric_name.to_string()
812 } else {
813 format!("{}_bucket", metric_name)
814 };
815
816 let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
820
821 for (key, &value) in metrics.iter() {
822 if !key.starts_with(&resolved_metric_name) {
823 continue;
824 }
825
826 let remaining = &key[resolved_metric_name.len()..];
827
828 let labels_str = &remaining[1..remaining.len() - 1];
829 let parsed_labels = Self::parse_label_string(labels_str);
830
831 if !parsed_labels.contains_key("le") {
833 continue;
834 }
835
836 if let Some(ref filters) = label_filters {
838 let mut all_match = true;
839 for (filter_key, filter_value) in filters {
840 if parsed_labels.get(filter_key) != Some(filter_value) {
841 all_match = false;
842 break;
843 }
844 }
845 if !all_match {
846 continue;
847 }
848 }
849
850 metric_entries.push((key.clone(), parsed_labels, value as u64));
851 }
852
853 let max_label_count = metric_entries
856 .iter()
857 .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
858 .max()
859 .unwrap_or(0);
860
861 let mut raw_buckets: Vec<(String, u64)> = Vec::new();
863 let mut seen_le_values = HashSet::new();
864 let mut active_series: Option<Vec<(String, String)>> = None;
865
866 for (_, parsed_labels, value) in metric_entries {
867 let le_value = parsed_labels.get("le").unwrap().clone();
868
869 let mut non_le_labels: Vec<(String, String)> = parsed_labels
871 .iter()
872 .filter(|(k, _)| k.as_str() != "le")
873 .map(|(k, v)| (k.clone(), v.clone()))
874 .collect();
875 non_le_labels.sort();
876
877 if non_le_labels.len() < max_label_count {
880 continue;
881 }
882
883 if let Some(ref prev_series) = active_series {
885 if prev_series != &non_le_labels {
886 if !raw_buckets.is_empty() {
887 break; }
889 active_series = Some(non_le_labels.clone());
890 seen_le_values.clear();
891 }
892 } else {
893 active_series = Some(non_le_labels.clone());
894 }
895
896 if !seen_le_values.insert(le_value.clone()) {
898 continue;
899 }
900
901 trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
902 raw_buckets.push((le_value, value));
903 }
904
905 raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
907
908 let mut buckets = HashMap::new();
910 let mut previous_value = 0_u64;
911 for (le, cumulative_count) in raw_buckets {
912 if cumulative_count < previous_value {
913 warn!(
914 "Warning: bucket count decreased from {} to {} at le={}",
915 previous_value, cumulative_count, le
916 );
917 }
918 let delta = cumulative_count.saturating_sub(previous_value);
919 buckets.insert(le, delta);
920 previous_value = cumulative_count;
921 }
922
923 Ok(buckets)
924 }
925
926 fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
932 let mut labels = HashMap::new();
933 let mut current_key = String::new();
934 let mut current_value = String::new();
935 let mut in_value = false;
936 let mut in_quotes = false;
937
938 for ch in labels_str.chars() {
939 match ch {
940 '=' if !in_quotes && !in_value => {
941 in_value = true;
942 },
943 '"' if in_value => {
944 in_quotes = !in_quotes;
945 },
946 ',' if !in_quotes => {
947 if !current_key.is_empty() {
949 labels.insert(
950 current_key.trim().to_string(),
951 current_value.trim().to_string(),
952 );
953 current_key.clear();
954 current_value.clear();
955 in_value = false;
956 }
957 },
958 _ => {
959 if in_value {
960 current_value.push(ch);
961 } else {
962 current_key.push(ch);
963 }
964 },
965 }
966 }
967
968 if !current_key.is_empty() {
970 labels.insert(
971 current_key.trim().to_string(),
972 current_value.trim().to_string(),
973 );
974 }
975
976 labels
977 }
978
979 fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
983 use std::cmp::Ordering;
984
985 match (a, b) {
987 ("+Inf", "+Inf") => Ordering::Equal,
988 ("+Inf", _) => Ordering::Greater,
989 (_, "+Inf") => Ordering::Less,
990 _ => {
991 match (a.parse::<f64>(), b.parse::<f64>()) {
993 (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
994 _ => a.cmp(b),
996 }
997 },
998 }
999 }
1000
1001 pub async fn wait_until_is_up(
1013 &self,
1014 timeout_secs: impl Into<u64>,
1015 ) -> Result<(), anyhow::Error> {
1016 self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
1017 .await
1018 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
1019 }
1020}
1021
1022impl std::fmt::Debug for NetworkNode {
1023 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1024 f.debug_struct("NetworkNode")
1025 .field("inner", &"inner_skipped")
1026 .field("spec", &self.spec)
1027 .field("name", &self.name)
1028 .field("ws_uri", &self.ws_uri)
1029 .field("prometheus_uri", &self.prometheus_uri)
1030 .finish()
1031 }
1032}
1033
1034fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
1035where
1036 S: Serializer,
1037{
1038 erased_serde::serialize(node.as_ref(), serializer)
1039}
1040
1041#[cfg(test)]
1043mod tests {
1044 use std::{
1045 path::{Path, PathBuf},
1046 sync::{Arc, Mutex},
1047 };
1048
1049 use async_trait::async_trait;
1050 use provider::{types::*, ProviderError, ProviderNode};
1051
1052 use super::*;
1053
1054 #[derive(Serialize)]
1055 struct MockNode {
1056 logs: Arc<Mutex<Vec<String>>>,
1057 }
1058
1059 impl MockNode {
1060 fn new() -> Self {
1061 Self {
1062 logs: Arc::new(Mutex::new(vec![])),
1063 }
1064 }
1065
1066 fn logs_push(&self, lines: Vec<impl Into<String>>) {
1067 self.logs
1068 .lock()
1069 .unwrap()
1070 .extend(lines.into_iter().map(|l| l.into()));
1071 }
1072 }
1073
1074 #[async_trait]
1075 impl ProviderNode for MockNode {
1076 fn name(&self) -> &str {
1077 todo!()
1078 }
1079
1080 fn args(&self) -> Vec<&str> {
1081 todo!()
1082 }
1083
1084 fn base_dir(&self) -> &PathBuf {
1085 todo!()
1086 }
1087
1088 fn config_dir(&self) -> &PathBuf {
1089 todo!()
1090 }
1091
1092 fn data_dir(&self) -> &PathBuf {
1093 todo!()
1094 }
1095
1096 fn relay_data_dir(&self) -> &PathBuf {
1097 todo!()
1098 }
1099
1100 fn scripts_dir(&self) -> &PathBuf {
1101 todo!()
1102 }
1103
1104 fn log_path(&self) -> &PathBuf {
1105 todo!()
1106 }
1107
1108 fn log_cmd(&self) -> String {
1109 todo!()
1110 }
1111
1112 fn path_in_node(&self, _file: &Path) -> PathBuf {
1113 todo!()
1114 }
1115
1116 async fn logs(&self) -> Result<String, ProviderError> {
1117 Ok(self.logs.lock().unwrap().join("\n"))
1118 }
1119
1120 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
1121 todo!()
1122 }
1123
1124 async fn run_command(
1125 &self,
1126 _options: RunCommandOptions,
1127 ) -> Result<ExecutionResult, ProviderError> {
1128 todo!()
1129 }
1130
1131 async fn run_script(
1132 &self,
1133 _options: RunScriptOptions,
1134 ) -> Result<ExecutionResult, ProviderError> {
1135 todo!()
1136 }
1137
1138 async fn send_file(
1139 &self,
1140 _local_file_path: &Path,
1141 _remote_file_path: &Path,
1142 _mode: &str,
1143 ) -> Result<(), ProviderError> {
1144 todo!()
1145 }
1146
1147 async fn receive_file(
1148 &self,
1149 _remote_file_path: &Path,
1150 _local_file_path: &Path,
1151 ) -> Result<(), ProviderError> {
1152 todo!()
1153 }
1154
1155 async fn pause(&self) -> Result<(), ProviderError> {
1156 todo!()
1157 }
1158
1159 async fn resume(&self) -> Result<(), ProviderError> {
1160 todo!()
1161 }
1162
1163 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
1164 todo!()
1165 }
1166
1167 async fn destroy(&self) -> Result<(), ProviderError> {
1168 todo!()
1169 }
1170 }
1171
1172 #[tokio::test(flavor = "multi_thread")]
1173 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1174 let mock_provider = Arc::new(MockNode::new());
1175 let mock_node = NetworkNode::new(
1176 "node1",
1177 "ws_uri",
1178 "prometheus_uri",
1179 "multiaddr",
1180 NodeSpec::default(),
1181 mock_provider.clone(),
1182 );
1183
1184 mock_provider.logs_push(vec![
1185 "system booting",
1186 "stub line 1",
1187 "stub line 2",
1188 "system ready",
1189 ]);
1190
1191 let options = LogLineCountOptions {
1193 predicate: Arc::new(|n| n == 1),
1194 timeout: Duration::from_secs(10),
1195 wait_until_timeout_elapses: false,
1196 };
1197
1198 let log_line_count = mock_node
1199 .wait_log_line_count_with_timeout("system ready", false, options)
1200 .await?;
1201
1202 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1203
1204 Ok(())
1205 }
1206
1207 #[tokio::test(flavor = "multi_thread")]
1208 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
1209 let mock_provider = Arc::new(MockNode::new());
1210 let mock_node = NetworkNode::new(
1211 "node1",
1212 "ws_uri",
1213 "prometheus_uri",
1214 "multiaddr",
1215 NodeSpec::default(),
1216 mock_provider.clone(),
1217 );
1218
1219 mock_provider.logs_push(vec![
1220 "system booting",
1221 "stub line 1",
1222 "stub line 2",
1223 "system ready",
1224 ]);
1225
1226 let options = LogLineCountOptions {
1228 predicate: Arc::new(|n| n == 2),
1229 timeout: Duration::from_secs(4),
1230 wait_until_timeout_elapses: false,
1231 };
1232
1233 let task = tokio::spawn({
1234 async move {
1235 mock_node
1236 .wait_log_line_count_with_timeout("system ready", false, options)
1237 .await
1238 .unwrap()
1239 }
1240 });
1241
1242 tokio::time::sleep(Duration::from_secs(2)).await;
1243
1244 mock_provider.logs_push(vec!["system ready"]);
1245
1246 let log_line_count = task.await?;
1247
1248 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
1249
1250 Ok(())
1251 }
1252
1253 #[tokio::test(flavor = "multi_thread")]
1254 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1255 let mock_provider = Arc::new(MockNode::new());
1256 let mock_node = NetworkNode::new(
1257 "node1",
1258 "ws_uri",
1259 "prometheus_uri",
1260 "multiaddr",
1261 NodeSpec::default(),
1262 mock_provider.clone(),
1263 );
1264
1265 mock_provider.logs_push(vec![
1266 "system booting",
1267 "stub line 1",
1268 "stub line 2",
1269 "system ready",
1270 ]);
1271
1272 let options = LogLineCountOptions {
1274 predicate: Arc::new(|n| n == 2),
1275 timeout: Duration::from_secs(2),
1276 wait_until_timeout_elapses: false,
1277 };
1278
1279 let log_line_count = mock_node
1280 .wait_log_line_count_with_timeout("system ready", false, options)
1281 .await?;
1282
1283 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1284
1285 Ok(())
1286 }
1287
1288 #[tokio::test(flavor = "multi_thread")]
1289 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
1290 let mock_provider = Arc::new(MockNode::new());
1291 let mock_node = NetworkNode::new(
1292 "node1",
1293 "ws_uri",
1294 "prometheus_uri",
1295 "multiaddr",
1296 NodeSpec::default(),
1297 mock_provider.clone(),
1298 );
1299
1300 mock_provider.logs_push(vec![
1301 "system booting",
1302 "stub line 1",
1303 "stub line 2",
1304 "system ready",
1305 ]);
1306
1307 let options = LogLineCountOptions {
1309 predicate: Arc::new(|n| n == 2),
1310 timeout: Duration::from_secs(2),
1311 wait_until_timeout_elapses: true,
1312 };
1313
1314 let task = tokio::spawn({
1315 async move {
1316 mock_node
1317 .wait_log_line_count_with_timeout("system ready", false, options)
1318 .await
1319 .unwrap()
1320 }
1321 });
1322
1323 tokio::time::sleep(Duration::from_secs(1)).await;
1324
1325 mock_provider.logs_push(vec!["system ready"]);
1326 mock_provider.logs_push(vec!["system ready"]);
1327
1328 let log_line_count = task.await?;
1329
1330 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
1331
1332 Ok(())
1333 }
1334
1335 #[tokio::test(flavor = "multi_thread")]
1336 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
1337 let mock_provider = Arc::new(MockNode::new());
1338 let mock_node = NetworkNode::new(
1339 "node1",
1340 "ws_uri",
1341 "prometheus_uri",
1342 "multiaddr",
1343 NodeSpec::default(),
1344 mock_provider.clone(),
1345 );
1346
1347 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1348
1349 let task = tokio::spawn({
1350 async move {
1351 mock_node
1352 .wait_log_line_count_with_timeout(
1353 "system ready",
1354 false,
1355 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
1357 )
1358 .await
1359 .unwrap()
1360 }
1361 });
1362
1363 tokio::time::sleep(Duration::from_secs(1)).await;
1364
1365 mock_provider.logs_push(vec!["stub line 3"]);
1366
1367 assert!(task.await?.success());
1368
1369 Ok(())
1370 }
1371
1372 #[tokio::test(flavor = "multi_thread")]
1373 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
1374 let mock_provider = Arc::new(MockNode::new());
1375 let mock_node = NetworkNode::new(
1376 "node1",
1377 "ws_uri",
1378 "prometheus_uri",
1379 "multiaddr",
1380 NodeSpec::default(),
1381 mock_provider.clone(),
1382 );
1383
1384 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1385
1386 let options = LogLineCountOptions {
1388 predicate: Arc::new(|n| (2..=5).contains(&n)),
1389 timeout: Duration::from_secs(2),
1390 wait_until_timeout_elapses: true,
1391 };
1392
1393 let task = tokio::spawn({
1394 async move {
1395 mock_node
1396 .wait_log_line_count_with_timeout("system ready", false, options)
1397 .await
1398 .unwrap()
1399 }
1400 });
1401
1402 tokio::time::sleep(Duration::from_secs(1)).await;
1403
1404 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
1405
1406 assert!(task.await?.success());
1407
1408 Ok(())
1409 }
1410
1411 #[tokio::test(flavor = "multi_thread")]
1412 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
1413 let mock_provider = Arc::new(MockNode::new());
1414 let mock_node = NetworkNode::new(
1415 "node1",
1416 "ws_uri",
1417 "prometheus_uri",
1418 "multiaddr",
1419 NodeSpec::default(),
1420 mock_provider.clone(),
1421 );
1422
1423 mock_provider.logs_push(vec![
1424 "system booting",
1425 "stub line 1",
1426 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1428 "stub line 2"
1429 ]);
1430
1431 let options = LogLineCountOptions {
1432 predicate: Arc::new(|n| n == 1),
1433 timeout: Duration::from_secs(3),
1434 wait_until_timeout_elapses: true,
1435 };
1436
1437 let task = tokio::spawn({
1438 async move {
1439 mock_node
1440 .wait_log_line_count_with_timeout(
1441 "error(?! importing block .*: block has an unknown parent)",
1442 false,
1443 options,
1444 )
1445 .await
1446 .unwrap()
1447 }
1448 });
1449
1450 tokio::time::sleep(Duration::from_secs(1)).await;
1451
1452 mock_provider.logs_push(vec![
1453 "system ready",
1454 "system error",
1456 "system ready",
1457 ]);
1458
1459 assert!(task.await?.success());
1460
1461 Ok(())
1462 }
1463
1464 #[tokio::test(flavor = "multi_thread")]
1465 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1466 ) -> Result<(), anyhow::Error> {
1467 let mock_provider = Arc::new(MockNode::new());
1468 let mock_node = NetworkNode::new(
1469 "node1",
1470 "ws_uri",
1471 "prometheus_uri",
1472 "multiaddr",
1473 NodeSpec::default(),
1474 mock_provider.clone(),
1475 );
1476
1477 mock_provider.logs_push(vec![
1478 "system booting",
1479 "stub line 1",
1480 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1482 "stub line 2"
1483 ]);
1484
1485 let options = LogLineCountOptions {
1486 predicate: Arc::new(|n| n == 1),
1487 timeout: Duration::from_secs(6),
1488 wait_until_timeout_elapses: true,
1489 };
1490
1491 let task = tokio::spawn({
1492 async move {
1493 mock_node
1494 .wait_log_line_count_with_timeout(
1495 "error(?! importing block .*: block has an unknown parent)",
1496 false,
1497 options,
1498 )
1499 .await
1500 .unwrap()
1501 }
1502 });
1503
1504 tokio::time::sleep(Duration::from_secs(1)).await;
1505
1506 mock_provider.logs_push(vec!["system ready", "system ready"]);
1507
1508 assert!(!task.await?.success());
1509
1510 Ok(())
1511 }
1512
1513 #[tokio::test(flavor = "multi_thread")]
1514 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1515 let mock_provider = Arc::new(MockNode::new());
1516 let mock_node = NetworkNode::new(
1517 "node1",
1518 "ws_uri",
1519 "prometheus_uri",
1520 "multiaddr",
1521 NodeSpec::default(),
1522 mock_provider.clone(),
1523 );
1524
1525 mock_provider.logs_push(vec![
1526 "system booting",
1527 "stub line 1",
1528 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1530 "stub line 2"
1531 ]);
1532
1533 let task = tokio::spawn({
1534 async move {
1535 mock_node
1536 .wait_log_line_count(
1537 "error(?! importing block .*: block has an unknown parent)",
1538 false,
1539 1,
1540 )
1541 .await
1542 .unwrap()
1543 }
1544 });
1545
1546 tokio::time::sleep(Duration::from_secs(1)).await;
1547
1548 mock_provider.logs_push(vec![
1549 "system ready",
1550 "system error",
1552 "system ready",
1553 ]);
1554
1555 assert!(task.await.is_ok());
1556
1557 Ok(())
1558 }
1559
1560 #[tokio::test(flavor = "multi_thread")]
1561 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1562 let mock_provider = Arc::new(MockNode::new());
1563 let mock_node = NetworkNode::new(
1564 "node1",
1565 "ws_uri",
1566 "prometheus_uri",
1567 "multiaddr",
1568 NodeSpec::default(),
1569 mock_provider.clone(),
1570 );
1571
1572 mock_provider.logs_push(vec![
1573 "system booting",
1574 "stub line 1",
1575 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1577 "stub line 2"
1578 ]);
1579
1580 let options = LogLineCountOptions {
1581 predicate: Arc::new(|count| count == 1),
1582 timeout: Duration::from_secs(2),
1583 wait_until_timeout_elapses: true,
1584 };
1585
1586 let task = tokio::spawn({
1587 async move {
1588 mock_node
1590 .wait_log_line_count_with_timeout(
1591 "error(?! importing block .*: block has an unknown parent)",
1592 false,
1593 options,
1594 )
1595 .await
1596 .unwrap()
1597 }
1598 });
1599
1600 tokio::time::sleep(Duration::from_secs(1)).await;
1601
1602 mock_provider.logs_push(vec!["system ready", "system ready"]);
1603
1604 assert!(!task.await?.success());
1605
1606 Ok(())
1607 }
1608
1609 #[tokio::test]
1610 async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1611 use std::sync::Arc;
1613
1614 let mock_metrics = concat!(
1616 "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1617 "# TYPE substrate_block_verification_time histogram\n",
1618 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1619 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1620 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1621 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1622 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1623 "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1624 "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1625 );
1626
1627 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1629 let addr = listener.local_addr()?;
1630 let metrics = Arc::new(mock_metrics.to_string());
1631
1632 tokio::spawn({
1633 let metrics = metrics.clone();
1634 async move {
1635 loop {
1636 if let Ok((mut socket, _)) = listener.accept().await {
1637 let metrics = metrics.clone();
1638 tokio::spawn(async move {
1639 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1640 let mut buffer = [0; 1024];
1641 let _ = socket.read(&mut buffer).await;
1642
1643 let response = format!(
1644 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1645 metrics.len(),
1646 metrics
1647 );
1648 let _ = socket.write_all(response.as_bytes()).await;
1649 });
1650 }
1651 }
1652 }
1653 });
1654
1655 let mock_provider = Arc::new(MockNode::new());
1657 let mock_node = NetworkNode::new(
1658 "test_node",
1659 "ws://localhost:9944",
1660 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1661 "/ip4/127.0.0.1/tcp/30333",
1662 NodeSpec::default(),
1663 mock_provider,
1664 );
1665
1666 let mut label_filters = HashMap::new();
1668 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1669 let buckets = mock_node
1670 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1671 .await?;
1672
1673 assert_eq!(buckets.get("0.1"), Some(&10));
1675 assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); let mut label_filters = std::collections::HashMap::new();
1682 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1683
1684 let buckets_filtered = mock_node
1685 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1686 .await?;
1687
1688 assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1689 assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1690
1691 let buckets_with_suffix = mock_node
1693 .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1694 .await?;
1695
1696 assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1697
1698 Ok(())
1699 }
1700
1701 #[tokio::test]
1702 async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1703 use std::sync::Arc;
1705
1706 let mock_metrics = concat!(
1707 "# HELP test_metric A test metric\n",
1708 "# TYPE test_metric histogram\n",
1709 "test_metric_bucket{le=\"2.5\"} 40\n",
1710 "test_metric_bucket{le=\"0.1\"} 10\n",
1711 "test_metric_bucket{le=\"+Inf\"} 42\n",
1712 "test_metric_bucket{le=\"1.0\"} 35\n",
1713 "test_metric_bucket{le=\"0.5\"} 25\n",
1714 );
1715
1716 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1717 let addr = listener.local_addr()?;
1718 let metrics = Arc::new(mock_metrics.to_string());
1719
1720 tokio::spawn({
1721 let metrics = metrics.clone();
1722 async move {
1723 loop {
1724 if let Ok((mut socket, _)) = listener.accept().await {
1725 let metrics = metrics.clone();
1726 tokio::spawn(async move {
1727 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1728 let mut buffer = [0; 1024];
1729 let _ = socket.read(&mut buffer).await;
1730 let response = format!(
1731 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1732 metrics.len(),
1733 metrics
1734 );
1735 let _ = socket.write_all(response.as_bytes()).await;
1736 });
1737 }
1738 }
1739 }
1740 });
1741
1742 let mock_provider = Arc::new(MockNode::new());
1743 let mock_node = NetworkNode::new(
1744 "test_node",
1745 "ws://localhost:9944",
1746 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1747 "/ip4/127.0.0.1/tcp/30333",
1748 NodeSpec::default(),
1749 mock_provider,
1750 );
1751
1752 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1753
1754 assert_eq!(buckets.get("0.1"), Some(&10)); assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); Ok(())
1762 }
1763
1764 #[tokio::test]
1765 async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1766 use std::sync::Arc;
1768
1769 let mock_metrics = concat!(
1770 "# HELP test_metric A test metric\n",
1771 "# TYPE test_metric histogram\n",
1772 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1773 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1774 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1775 );
1776
1777 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1778 let addr = listener.local_addr()?;
1779 let metrics = Arc::new(mock_metrics.to_string());
1780
1781 tokio::spawn({
1782 let metrics = metrics.clone();
1783 async move {
1784 loop {
1785 if let Ok((mut socket, _)) = listener.accept().await {
1786 let metrics = metrics.clone();
1787 tokio::spawn(async move {
1788 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1789 let mut buffer = [0; 1024];
1790 let _ = socket.read(&mut buffer).await;
1791 let response = format!(
1792 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1793 metrics.len(),
1794 metrics
1795 );
1796 let _ = socket.write_all(response.as_bytes()).await;
1797 });
1798 }
1799 }
1800 }
1801 });
1802
1803 let mock_provider = Arc::new(MockNode::new());
1804 let mock_node = NetworkNode::new(
1805 "test_node",
1806 "ws://localhost:9944",
1807 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1808 "/ip4/127.0.0.1/tcp/30333",
1809 NodeSpec::default(),
1810 mock_provider,
1811 );
1812
1813 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1815 assert_eq!(buckets.get("0.1"), Some(&5));
1816 assert_eq!(buckets.get("0.5"), Some(&10)); assert_eq!(buckets.get("+Inf"), Some(&5)); let mut label_filters = std::collections::HashMap::new();
1821 label_filters.insert("method".to_string(), "GET,POST".to_string());
1822
1823 let buckets_filtered = mock_node
1824 .get_histogram_buckets("test_metric", Some(label_filters))
1825 .await?;
1826
1827 assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1828 assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1829
1830 Ok(())
1831 }
1832
1833 #[test]
1834 fn test_compare_le_values() {
1835 use std::cmp::Ordering;
1836
1837 use crate::network::node::NetworkNode;
1838
1839 assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1841 assert_eq!(
1842 NetworkNode::compare_le_values("1.0", "0.5"),
1843 Ordering::Greater
1844 );
1845 assert_eq!(
1846 NetworkNode::compare_le_values("1.0", "1.0"),
1847 Ordering::Equal
1848 );
1849
1850 assert_eq!(
1852 NetworkNode::compare_le_values("+Inf", "999"),
1853 Ordering::Greater
1854 );
1855 assert_eq!(
1856 NetworkNode::compare_le_values("0.1", "+Inf"),
1857 Ordering::Less
1858 );
1859 assert_eq!(
1860 NetworkNode::compare_le_values("+Inf", "+Inf"),
1861 Ordering::Equal
1862 );
1863
1864 assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1866 assert_eq!(
1867 NetworkNode::compare_le_values("1000", "999"),
1868 Ordering::Greater
1869 );
1870 }
1871}