zombienet_orchestrator/network/
node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15    types::{ExecutionResult, RunScriptOptions},
16    DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
26
/// Type-erased line matcher used by the log-waiting helpers.
///
/// Returns `Ok(true)` when a log line matches, `Ok(false)` when it does not, or an error
/// if the matching itself fails (the regex backend's `is_match` is fallible).
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
28
/// Errors specific to querying a [`NetworkNode`].
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested Prometheus metric is not present in the scraped metrics
    /// (only raised when "not found" is not being treated as zero).
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
34
/// A node of a spawned network, as seen by the orchestrator.
///
/// Wraps the provider-level node handle together with its spec and the endpoints used to
/// talk to it. Clones share the metrics cache and the running flag, since both are held
/// behind an `Arc`.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-level node handle; serialized through `serialize_provider_node`.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduced set of values.
    pub(crate) spec: NodeSpec,
    pub(crate) name: String,
    /// WebSocket endpoint used for RPC / subxt clients and readiness probes.
    pub(crate) ws_uri: String,
    /// Multiaddress of the node (can be replaced via `set_multiaddr`).
    pub(crate) multiaddr: String,
    /// Prometheus endpoint scraped by the metric helpers.
    pub(crate) prometheus_uri: String,
    /// Last scraped metrics; not serialized.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Internally tracked running state (see `is_running`); not serialized.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
51
/// Deserializable counterpart of the serialized form of [`NetworkNode`].
///
/// Carries the same plain fields; the provider handle is kept as raw JSON in `inner`
/// and defaults to `serde_json::Value::Null` when absent.
/// NOTE(review): presumably used to rebuild nodes from a persisted network description —
/// confirm against callers.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    // Raw provider-node payload; missing in the input => `Value::Null`.
    #[serde(default)]
    pub(crate) inner: serde_json::Value,
}
62
/// Outcome of waiting for the number of matching log lines to satisfy a predicate.
///
/// # Variants
/// - `TargetReached(count)` – the predicate held before the timeout expired;
///   `count` is the number of matching lines at that moment.
/// - `TargetFailed(count)` – the predicate never held within the timeout;
///   `count` is the number of matching lines observed when the timeout expired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl LogLineCount {
    /// `true` for [`LogLineCount::TargetReached`], `false` for [`LogLineCount::TargetFailed`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(_))
    }
}
86
/// Configuration for controlling log line count waiting behavior.
///
/// Combines a predicate on the number of matching log lines, a timeout, and whether the
/// wait must always last for the whole timeout.
///
/// # Fields
/// - `predicate`: A function that takes the current number of matching lines and
///   returns `true` if the condition is satisfied.
/// - `timeout`: Maximum time to wait.
/// - `wait_until_timeout_elapses`: If `true`, the full timeout always elapses and the
///   predicate is evaluated on the final count only, even if it would have been satisfied
///   earlier. Useful when you need to verify sustained absence or stability
///   (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct LogLineCountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Build options from an arbitrary predicate on the number of matching lines.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        Self {
            predicate: Arc::new(predicate),
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Expect no matching line at all: predicate `n == 0`, always waiting for the full
    /// `timeout` so that late occurrences are also caught.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|n| n == 0, timeout, true)
    }

    /// Correctly spelled alias of [`LogLineCountOptions::no_occurences_within_timeout`].
    pub fn no_occurrences_within_timeout(timeout: Duration) -> Self {
        Self::no_occurences_within_timeout(timeout)
    }

    /// Expect at least one matching line within `timeout` (returns as soon as one is seen).
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::new(|count| count >= 1, timeout, false)
    }

    /// Expect a count of exactly one matching line within `timeout`
    /// (without waiting for the full timeout).
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| count == 1, timeout, false)
    }
}
131
132// #[derive(Clone, Debug)]
133// pub struct QueryMetricOptions {
134//     use_cache: bool,
135//     treat_not_found_as_zero: bool,
136// }
137
138// impl Default for QueryMetricOptions {
139//     fn default() -> Self {
140//         Self { use_cache: false, treat_not_found_as_zero: true }
141//     }
142// }
143
144impl NetworkNode {
145    /// Create a new NetworkNode
146    pub(crate) fn new<T: Into<String>>(
147        name: T,
148        ws_uri: T,
149        prometheus_uri: T,
150        multiaddr: T,
151        spec: NodeSpec,
152        inner: DynNode,
153    ) -> Self {
154        Self {
155            name: name.into(),
156            ws_uri: ws_uri.into(),
157            prometheus_uri: prometheus_uri.into(),
158            inner,
159            spec,
160            multiaddr: multiaddr.into(),
161            metrics_cache: Arc::new(Default::default()),
162            is_running: Arc::new(AtomicBool::new(false)),
163        }
164    }
165
166    /// Check if the node is currently running (not paused).
167    ///
168    /// This returns the internal running state.
169    pub fn is_running(&self) -> bool {
170        self.is_running.load(Ordering::Acquire)
171    }
172
173    /// Check if the node is responsive by attempting to connect to its WebSocket endpoint.
174    ///
175    /// This performs an actual connection attempt with a short timeout (2 seconds).
176    /// Returns `true` if the node is reachable and responding, `false` otherwise.
177    ///
178    /// This is more robust than `is_running()` as it verifies the node is actually alive.
179    pub async fn is_responsive(&self) -> bool {
180        tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
181            .await
182            .is_ok()
183    }
184
185    pub(crate) fn set_is_running(&self, is_running: bool) {
186        self.is_running.store(is_running, Ordering::Release);
187    }
188
189    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
190        self.multiaddr = multiaddr.into();
191    }
192
193    pub fn name(&self) -> &str {
194        &self.name
195    }
196
197    pub fn args(&self) -> Vec<&str> {
198        self.inner.args()
199    }
200
201    pub fn spec(&self) -> &NodeSpec {
202        &self.spec
203    }
204
205    pub fn ws_uri(&self) -> &str {
206        &self.ws_uri
207    }
208
209    pub fn multiaddr(&self) -> &str {
210        self.multiaddr.as_ref()
211    }
212
213    // Subxt
214
215    /// Get the rpc client for the node
216    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
217        get_client_from_url(&self.ws_uri).await
218    }
219
220    /// Get the [online client](subxt::client::OnlineClient) for the node
221    #[deprecated = "Use `wait_client` instead."]
222    pub async fn client<Config: subxt::Config>(
223        &self,
224    ) -> Result<OnlineClient<Config>, subxt::Error> {
225        self.try_client().await
226    }
227
228    /// Try to connect to the node.
229    ///
230    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
231    /// the node to appear before it connects to it. This function directly tries
232    /// to connect to the node and returns an error if the node is not yet available
233    /// at that point in time.
234    ///
235    /// Returns a [`OnlineClient`] on success.
236    pub async fn try_client<Config: subxt::Config>(
237        &self,
238    ) -> Result<OnlineClient<Config>, subxt::Error> {
239        get_client_from_url(&self.ws_uri).await
240    }
241
242    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
243    pub async fn wait_client<Config: subxt::Config>(
244        &self,
245    ) -> Result<OnlineClient<Config>, anyhow::Error> {
246        debug!("wait_client ws_uri: {}", self.ws_uri());
247        wait_ws_ready(self.ws_uri())
248            .await
249            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
250
251        self.try_client()
252            .await
253            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
254    }
255
256    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
257    pub async fn wait_client_with_timeout<Config: subxt::Config>(
258        &self,
259        timeout_secs: impl Into<u64>,
260    ) -> Result<OnlineClient<Config>, anyhow::Error> {
261        debug!("waiting until subxt client is ready");
262        tokio::time::timeout(
263            Duration::from_secs(timeout_secs.into()),
264            self.wait_client::<Config>(),
265        )
266        .await?
267    }
268
269    // Commands
270
271    /// Pause the node, this is implemented by pausing the
272    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
273    ///
274    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
275    /// with global setting `teardown_on_failure` disabled.
276    pub async fn pause(&self) -> Result<(), anyhow::Error> {
277        self.set_is_running(false);
278        self.inner.pause().await?;
279        Ok(())
280    }
281
282    /// Resume the node, this is implemented by resuming the
283    /// actual process (e.g polkadot) with sending `SIGCONT` signal
284    ///
285    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
286    /// with global setting `teardown_on_failure` disabled.
287    pub async fn resume(&self) -> Result<(), anyhow::Error> {
288        self.set_is_running(true);
289        self.inner.resume().await?;
290        Ok(())
291    }
292
293    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
294    ///
295    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
296    /// with global setting `teardown_on_failure` disabled.
297    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
298        self.set_is_running(false);
299        self.inner.restart(after).await?;
300        self.set_is_running(true);
301        Ok(())
302    }
303
304    /// Run a script inside the node's container/environment
305    ///
306    /// The script will be uploaded to the node, made executable, and executed with
307    /// the provided arguments and environment variables.
308    ///
309    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
310    pub async fn run_script(
311        &self,
312        options: RunScriptOptions,
313    ) -> Result<ExecutionResult, anyhow::Error> {
314        self.inner
315            .run_script(options)
316            .await
317            .map_err(|e| anyhow!("Failed to run script: {e}"))
318    }
319
320    // Metrics assertions
321
322    /// Get metric value 'by name' from Prometheus (exposed by the node)
323    /// metric name can be:
324    /// with prefix (e.g: 'polkadot_')
325    /// with chain attribute (e.g: 'chain=rococo-local')
326    /// without prefix and/or without chain attribute
327    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
328        let metric_name = metric_name.into();
329        // force cache reload
330        self.fetch_metrics().await?;
331        // by default we treat not found as 0 (same in v1)
332        self.metric(&metric_name, true).await
333    }
334
335    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
336    /// metric name can be:
337    /// with prefix (e.g: 'polkadot_')
338    /// with chain attribute (e.g: 'chain=rococo-local')
339    /// without prefix and/or without chain attribute
340    ///
341    /// We first try to assert on the value using the cached metrics and
342    /// if not meet the criteria we reload the cache and check again
343    pub async fn assert(
344        &self,
345        metric_name: impl Into<String>,
346        value: impl Into<f64>,
347    ) -> Result<bool, anyhow::Error> {
348        let value: f64 = value.into();
349        self.assert_with(metric_name, |v| v == value).await
350    }
351
352    /// Assert on a metric value using a given predicate.
353    /// See [`NetworkNode::reports`] description for details on metric name.
354    pub async fn assert_with(
355        &self,
356        metric_name: impl Into<String>,
357        predicate: impl Fn(f64) -> bool,
358    ) -> Result<bool, anyhow::Error> {
359        let metric_name = metric_name.into();
360        // reload metrics
361        self.fetch_metrics().await?;
362        let val = self.metric(&metric_name, true).await?;
363        trace!("🔎 Current value {val} passed to the predicated?");
364        Ok(predicate(val))
365    }
366
367    // Wait methods for metrics
368
369    /// Wait until a metric value pass the `predicate`
370    pub async fn wait_metric(
371        &self,
372        metric_name: impl Into<String>,
373        predicate: impl Fn(f64) -> bool,
374    ) -> Result<(), anyhow::Error> {
375        let metric_name = metric_name.into();
376        debug!("waiting until metric {metric_name} pass the predicate");
377        loop {
378            let res = self.assert_with(&metric_name, &predicate).await;
379            match res {
380                Ok(res) => {
381                    if res {
382                        return Ok(());
383                    }
384                },
385                Err(e) => match e.downcast::<reqwest::Error>() {
386                    Ok(io_err) => {
387                        if !skip_err_while_waiting(&io_err) {
388                            return Err(io_err.into());
389                        }
390                    },
391                    Err(other) => {
392                        match other.downcast::<NetworkNodeError>() {
393                            Ok(node_err) => {
394                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
395                                    return Err(node_err.into());
396                                }
397                            },
398                            Err(other) => return Err(other),
399                        };
400                    },
401                },
402            }
403
404            // sleep to not spam prometheus
405            tokio::time::sleep(Duration::from_secs(1)).await;
406        }
407    }
408
409    /// Wait until a metric value pass the `predicate`
410    /// with a timeout (secs)
411    pub async fn wait_metric_with_timeout(
412        &self,
413        metric_name: impl Into<String>,
414        predicate: impl Fn(f64) -> bool,
415        timeout_secs: impl Into<u64>,
416    ) -> Result<(), anyhow::Error> {
417        let metric_name = metric_name.into();
418        let secs = timeout_secs.into();
419        debug!("waiting until metric {metric_name} pass the predicate");
420        let res = tokio::time::timeout(
421            Duration::from_secs(secs),
422            self.wait_metric(&metric_name, predicate),
423        )
424        .await;
425
426        if let Ok(inner_res) = res {
427            match inner_res {
428                Ok(_) => Ok(()),
429                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
430            }
431        } else {
432            // timeout
433            Err(anyhow!(
434                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
435            ))
436        }
437    }
438
439    // Logs
440
441    /// Get the logs of the node
442    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
443    pub async fn logs(&self) -> Result<String, anyhow::Error> {
444        Ok(self.inner.logs().await?)
445    }
446
447    /// Wait until a the number of matching log lines is reach
448    pub async fn wait_log_line_count(
449        &self,
450        pattern: impl Into<String>,
451        is_glob: bool,
452        count: usize,
453    ) -> Result<(), anyhow::Error> {
454        let pattern = pattern.into();
455        let pattern_clone = pattern.clone();
456        debug!("waiting until we find pattern {pattern} {count} times");
457        let match_fn: BoxedClosure = if is_glob {
458            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
459        } else {
460            let re = Regex::new(&pattern)?;
461            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
462        };
463
464        loop {
465            let mut q = 0_usize;
466            let logs = self.logs().await?;
467            for line in logs.lines() {
468                trace!("line is {line}");
469                if match_fn(line)? {
470                    trace!("pattern {pattern_clone} match in line {line}");
471                    q += 1;
472                    if q >= count {
473                        return Ok(());
474                    }
475                }
476            }
477
478            tokio::time::sleep(Duration::from_secs(2)).await;
479        }
480    }
481
482    /// Waits until the number of matching log lines satisfies a custom condition,
483    /// optionally waiting for the entire duration of the timeout.
484    ///
485    /// This method searches log lines for a given substring or glob pattern,
486    /// and evaluates the number of matching lines using a user-provided predicate function.
487    /// Optionally, it can wait for the full timeout duration to ensure the condition
488    /// holds consistently (e.g., for verifying absence of logs).
489    ///
490    /// # Arguments
491    /// * `substring` - The substring or pattern to match within log lines.
492    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
493    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
494    ///
495    /// # Returns
496    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
497    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
498    /// * `Err(e)` if an error occurred during log retrieval or matching.
499    ///
500    /// # Example
501    /// ```rust
502    /// # use std::{sync::Arc, time::Duration};
503    /// # use provider::NativeProvider;
504    /// # use support::{fs::local::LocalFileSystem};
505    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
506    /// # use configuration::NetworkConfig;
507    /// # async fn example() -> Result<(), anyhow::Error> {
508    /// #   let provider = NativeProvider::new(LocalFileSystem {});
509    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
510    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
511    /// #   let network = orchestrator.spawn(config).await?;
512    /// let node = network.get_node("alice")?;
513    /// // Wait (up to 10 seconds) until pattern occurs once
514    /// let options = LogLineCountOptions {
515    ///     predicate: Arc::new(|count| count == 1),
516    ///     timeout: Duration::from_secs(10),
517    ///     wait_until_timeout_elapses: false,
518    /// };
519    /// let result = node
520    ///     .wait_log_line_count_with_timeout("error", false, options)
521    ///     .await?;
522    /// #   Ok(())
523    /// # }
524    /// ```
525    pub async fn wait_log_line_count_with_timeout(
526        &self,
527        substring: impl Into<String>,
528        is_glob: bool,
529        options: LogLineCountOptions,
530    ) -> Result<LogLineCount, anyhow::Error> {
531        let substring = substring.into();
532        debug!(
533            "waiting until match lines count within {} seconds",
534            options.timeout.as_secs_f64()
535        );
536
537        let start = tokio::time::Instant::now();
538
539        let match_fn: BoxedClosure = if is_glob {
540            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
541        } else {
542            let re = Regex::new(&substring)?;
543            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
544        };
545
546        if options.wait_until_timeout_elapses {
547            tokio::time::sleep(options.timeout).await;
548        }
549
550        let mut q;
551        loop {
552            q = 0_u32;
553            let logs = self.logs().await?;
554            for line in logs.lines() {
555                if match_fn(line)? {
556                    q += 1;
557
558                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
559                    // end after the whole log file is processed. This is to address the cases when the
560                    // predicate becomes true and false again.
561                    // eg. expected exactly 2 matching lines are expected but 3 are present
562                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
563                        return Ok(LogLineCount::TargetReached(q));
564                    }
565                }
566            }
567
568            if start.elapsed() >= options.timeout {
569                break;
570            }
571
572            tokio::time::sleep(Duration::from_secs(2)).await;
573        }
574
575        if (options.predicate)(q) {
576            Ok(LogLineCount::TargetReached(q))
577        } else {
578            Ok(LogLineCount::TargetFailed(q))
579        }
580    }
581
582    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
583        let response = reqwest::get(&self.prometheus_uri).await?;
584        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
585        let mut cache = self.metrics_cache.write().await;
586        *cache = metrics;
587        Ok(())
588    }
589
590    /// Query individual metric by name
591    async fn metric(
592        &self,
593        metric_name: &str,
594        treat_not_found_as_zero: bool,
595    ) -> Result<f64, anyhow::Error> {
596        let mut metrics_map = self.metrics_cache.read().await;
597        if metrics_map.is_empty() {
598            // reload metrics
599            drop(metrics_map);
600            self.fetch_metrics().await?;
601            metrics_map = self.metrics_cache.read().await;
602        }
603
604        if let Some(val) = metrics_map.get(metric_name) {
605            Ok(*val)
606        } else if treat_not_found_as_zero {
607            Ok(0_f64)
608        } else {
609            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
610        }
611    }
612
613    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
614    ///
615    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
616    /// and calculating the count of observations in each bucket. It automatically appends
617    /// `_bucket` suffix to the metric name if not already present.
618    ///
619    /// # Arguments
620    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
621    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
622    ///
623    /// # Returns
624    /// A HashMap where keys are the `le` bucket boundaries as strings,
625    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
626    ///
627    /// # Example
628    /// ```ignore
629    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
630    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
631    /// ```
632    pub async fn get_histogram_buckets(
633        &self,
634        metric_name: impl AsRef<str>,
635        label_filters: Option<HashMap<String, String>>,
636    ) -> Result<HashMap<String, u64>, anyhow::Error> {
637        let metric_name = metric_name.as_ref();
638
639        // Fetch and parse metrics using the existing parser
640        let response = reqwest::get(&self.prometheus_uri).await?;
641        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
642
643        // Ensure metric name has _bucket suffix
644        let resolved_metric_name = if metric_name.contains("_bucket") {
645            metric_name.to_string()
646        } else {
647            format!("{}_bucket", metric_name)
648        };
649
650        // First pass: collect all matching metrics with their label counts
651        // to identify which ones have the most complete label sets
652        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
653        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
654
655        for (key, &value) in metrics.iter() {
656            if !key.starts_with(&resolved_metric_name) {
657                continue;
658            }
659
660            let remaining = &key[resolved_metric_name.len()..];
661
662            let labels_str = &remaining[1..remaining.len() - 1];
663            let parsed_labels = Self::parse_label_string(labels_str);
664
665            // Must have "le" label
666            if !parsed_labels.contains_key("le") {
667                continue;
668            }
669
670            // Check if label filters match
671            if let Some(ref filters) = label_filters {
672                let mut all_match = true;
673                for (filter_key, filter_value) in filters {
674                    if parsed_labels.get(filter_key) != Some(filter_value) {
675                        all_match = false;
676                        break;
677                    }
678                }
679                if !all_match {
680                    continue;
681                }
682            }
683
684            metric_entries.push((key.clone(), parsed_labels, value as u64));
685        }
686
687        // Find the maximum number of labels (excluding "le") across all entries
688        // This helps us identify the "fullest" version of each metric
689        let max_label_count = metric_entries
690            .iter()
691            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
692            .max()
693            .unwrap_or(0);
694
695        // Second pass: collect buckets, deduplicating and preferring entries with more labels
696        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
697        let mut seen_le_values = HashSet::new();
698        let mut active_series: Option<Vec<(String, String)>> = None;
699
700        for (_, parsed_labels, value) in metric_entries {
701            let le_value = parsed_labels.get("le").unwrap().clone();
702
703            // Get non-"le" labels
704            let mut non_le_labels: Vec<(String, String)> = parsed_labels
705                .iter()
706                .filter(|(k, _)| k.as_str() != "le")
707                .map(|(k, v)| (k.clone(), v.clone()))
708                .collect();
709            non_le_labels.sort();
710
711            // Only process entries that have the maximum number of labels
712            // (this filters out the parser's duplicate keys with fewer labels)
713            if non_le_labels.len() < max_label_count {
714                continue;
715            }
716
717            // Detect series changes
718            if let Some(ref prev_series) = active_series {
719                if prev_series != &non_le_labels {
720                    if !raw_buckets.is_empty() {
721                        break; // Stop at first series change
722                    }
723                    active_series = Some(non_le_labels.clone());
724                    seen_le_values.clear();
725                }
726            } else {
727                active_series = Some(non_le_labels.clone());
728            }
729
730            // Deduplicate by le value within this series
731            if !seen_le_values.insert(le_value.clone()) {
732                continue;
733            }
734
735            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
736            raw_buckets.push((le_value, value));
737        }
738
739        // Sort buckets by their "le" values
740        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
741
742        // Calculate deltas between cumulative buckets
743        let mut buckets = HashMap::new();
744        let mut previous_value = 0_u64;
745        for (le, cumulative_count) in raw_buckets {
746            if cumulative_count < previous_value {
747                warn!(
748                    "Warning: bucket count decreased from {} to {} at le={}",
749                    previous_value, cumulative_count, le
750                );
751            }
752            let delta = cumulative_count.saturating_sub(previous_value);
753            buckets.insert(le, delta);
754            previous_value = cumulative_count;
755        }
756
757        Ok(buckets)
758    }
759
760    /// Parse label string from parsed metric key.
761    ///
762    /// Takes a label string in the format `key1="value1",key2="value2"`
763    /// and returns a HashMap of key-value pairs.
764    /// Handles commas inside quoted values correctly.
765    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
766        let mut labels = HashMap::new();
767        let mut current_key = String::new();
768        let mut current_value = String::new();
769        let mut in_value = false;
770        let mut in_quotes = false;
771
772        for ch in labels_str.chars() {
773            match ch {
774                '=' if !in_quotes && !in_value => {
775                    in_value = true;
776                },
777                '"' if in_value => {
778                    in_quotes = !in_quotes;
779                },
780                ',' if !in_quotes => {
781                    // End of key-value pair
782                    if !current_key.is_empty() {
783                        labels.insert(
784                            current_key.trim().to_string(),
785                            current_value.trim().to_string(),
786                        );
787                        current_key.clear();
788                        current_value.clear();
789                        in_value = false;
790                    }
791                },
792                _ => {
793                    if in_value {
794                        current_value.push(ch);
795                    } else {
796                        current_key.push(ch);
797                    }
798                },
799            }
800        }
801
802        // Insert last pair
803        if !current_key.is_empty() {
804            labels.insert(
805                current_key.trim().to_string(),
806                current_value.trim().to_string(),
807            );
808        }
809
810        labels
811    }
812
813    /// Compare two histogram bucket boundary values for sorting.
814    ///
815    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
816    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
817        use std::cmp::Ordering;
818
819        // Handle +Inf specially
820        match (a, b) {
821            ("+Inf", "+Inf") => Ordering::Equal,
822            ("+Inf", _) => Ordering::Greater,
823            (_, "+Inf") => Ordering::Less,
824            _ => {
825                // Try to parse as f64 for numeric comparison
826                match (a.parse::<f64>(), b.parse::<f64>()) {
827                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
828                    // Fallback to string comparison if parsing fails
829                    _ => a.cmp(b),
830                }
831            },
832        }
833    }
834
835    /// Waits given number of seconds until node reports that it is up and running, which
836    /// is determined by metric 'process_start_time_seconds', which should appear,
837    /// when node finished booting up.
838    ///
839    ///
840    /// # Arguments
841    /// * `timeout_secs` - The number of seconds to wait.
842    ///
843    /// # Returns
844    /// * `Ok()` if the node is up before timeout occured.
845    /// * `Err(e)` if timeout or other error occurred while waiting.
846    pub async fn wait_until_is_up(
847        &self,
848        timeout_secs: impl Into<u64>,
849    ) -> Result<(), anyhow::Error> {
850        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
851            .await
852            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
853    }
854}
855
856impl std::fmt::Debug for NetworkNode {
857    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
858        f.debug_struct("NetworkNode")
859            .field("inner", &"inner_skipped")
860            .field("spec", &self.spec)
861            .field("name", &self.name)
862            .field("ws_uri", &self.ws_uri)
863            .field("prometheus_uri", &self.prometheus_uri)
864            .finish()
865    }
866}
867
868fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
869where
870    S: Serializer,
871{
872    erased_serde::serialize(node.as_ref(), serializer)
873}
874
// TODO: mock and impl more unit tests
#[cfg(test)]
mod tests {
    use std::{
        net::SocketAddr,
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
    };

    use async_trait::async_trait;
    use provider::{types::*, ProviderError, ProviderNode};

    use super::*;

    /// In-memory provider node. The only functional method is `logs`, which returns every
    /// line pushed through `logs_push`, joined with `\n`; everything else is `todo!()`.
    #[derive(Serialize)]
    struct MockNode {
        logs: Arc<Mutex<Vec<String>>>,
    }

    impl MockNode {
        fn new() -> Self {
            Self {
                logs: Arc::new(Mutex::new(vec![])),
            }
        }

        fn logs_push(&self, lines: Vec<impl Into<String>>) {
            self.logs
                .lock()
                .unwrap()
                .extend(lines.into_iter().map(|l| l.into()));
        }
    }

    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }

    /// Creates a `MockNode` and a `NetworkNode` wrapping it.
    ///
    /// Both are returned so a test can keep appending log lines after the `NetworkNode` has
    /// been moved into a spawned task.
    fn mock_log_node() -> (Arc<MockNode>, NetworkNode) {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );
        (mock_provider, mock_node)
    }

    /// Spawns a minimal HTTP server on an ephemeral `127.0.0.1` port and returns its address.
    ///
    /// Every accepted connection gets a `200 OK` response whose body is `body`; the request
    /// itself is read (up to 1 KiB) and ignored.
    async fn spawn_mock_metrics_server(
        body: &'static str,
    ) -> Result<SocketAddr, anyhow::Error> {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;

        tokio::spawn(async move {
            loop {
                if let Ok((mut socket, _)) = listener.accept().await {
                    tokio::spawn(async move {
                        use tokio::io::{AsyncReadExt, AsyncWriteExt};

                        let mut buffer = [0; 1024];
                        let _ = socket.read(&mut buffer).await;

                        let response = format!(
                            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
                            body.len(),
                            body
                        );
                        let _ = socket.write_all(response.as_bytes()).await;
                    });
                }
            }
        });

        Ok(addr)
    }

    /// Builds a `NetworkNode` whose Prometheus URI points at the mock server listening on
    /// `addr`.
    fn mock_metrics_node(addr: SocketAddr) -> NetworkNode {
        let mock_provider = Arc::new(MockNode::new());
        NetworkNode::new(
            "test_node",
            "ws://localhost:9944",
            &format!("http://127.0.0.1:{}/metrics", addr.port()),
            "/ip4/127.0.0.1/tcp/30333",
            NodeSpec::default(),
            mock_provider,
        )
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 10 seconds) until pattern occurs once
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(10),
            wait_until_timeout_elapses: false,
        };

        let log_line_count = mock_node
            .wait_log_line_count_with_timeout("system ready", false, options)
            .await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 4 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(4),
            wait_until_timeout_elapses: false,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(2)).await;

        // The second occurrence arrives while the wait is in progress.
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 2 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: false,
        };

        let log_line_count = mock_node
            .wait_log_line_count_with_timeout("system ready", false, options)
            .await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait until timeout and check if pattern occurs exactly twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Two more occurrences push the total to 3, overshooting the expected 2.
        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "system ready",
                    false,
                    // Wait until timeout and make sure pattern occurred zero times
                    LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| (2..=5).contains(&n)),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(3),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    options,
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(6),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    options,
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    1,
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        // The spawned task unwraps the wait result, so a join error means the wait failed.
        assert!(task.await.is_ok());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_log_node();

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|count| count == 1),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        // we expect no match, thus wait with timeout
        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    options,
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
        // A single histogram series with proper HELP and TYPE comments.
        let mock_metrics = concat!(
            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
            "# TYPE substrate_block_verification_time histogram\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
        );

        let addr = spawn_mock_metrics_server(mock_metrics).await?;
        let mock_node = mock_metrics_node(addr);

        // Test 1: no label filter. The exposed bucket counts are cumulative; the returned
        // values are the per-bucket deltas.
        let buckets = mock_node
            .get_histogram_buckets("substrate_block_verification_time", None)
            .await?;

        assert_eq!(buckets.get("0.1"), Some(&10));
        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40

        // Test 2: a label filter matching the only series yields the same buckets.
        let label_filters = HashMap::from([(
            "chain".to_string(),
            "rococo_local_testnet".to_string(),
        )]);
        let buckets_filtered = mock_node
            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
            .await?;

        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
        assert_eq!(buckets_filtered.get("1.0"), Some(&10));
        assert_eq!(buckets_filtered.get("2.5"), Some(&5));
        assert_eq!(buckets_filtered.get("+Inf"), Some(&2));

        // Test 3: the metric name may already carry the `_bucket` suffix.
        let buckets_with_suffix = mock_node
            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
            .await?;

        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));

        Ok(())
    }

    #[tokio::test]
    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
        // Buckets must be sorted by boundary before deltas are computed, even when they are
        // received out of order.
        let mock_metrics = concat!(
            "# HELP test_metric A test metric\n",
            "# TYPE test_metric histogram\n",
            "test_metric_bucket{le=\"2.5\"} 40\n",
            "test_metric_bucket{le=\"0.1\"} 10\n",
            "test_metric_bucket{le=\"+Inf\"} 42\n",
            "test_metric_bucket{le=\"1.0\"} 35\n",
            "test_metric_bucket{le=\"0.5\"} 25\n",
        );

        let addr = spawn_mock_metrics_server(mock_metrics).await?;
        let mock_node = mock_metrics_node(addr);

        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;

        // Verify deltas are calculated correctly after sorting
        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40

        Ok(())
    }

    #[tokio::test]
    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
        // Label parsing must cope with commas and special characters inside quoted values.
        let mock_metrics = concat!(
            "# HELP test_metric A test metric\n",
            "# TYPE test_metric histogram\n",
            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
        );

        let addr = spawn_mock_metrics_server(mock_metrics).await?;
        let mock_node = mock_metrics_node(addr);

        // Test without filter
        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
        assert_eq!(buckets.get("0.1"), Some(&5));
        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15

        // Test with filter containing comma in value
        let label_filters = HashMap::from([("method".to_string(), "GET,POST".to_string())]);

        let buckets_filtered = mock_node
            .get_histogram_buckets("test_metric", Some(label_filters))
            .await?;

        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
        assert_eq!(buckets_filtered.get("0.5"), Some(&10));

        Ok(())
    }

    #[test]
    fn test_compare_le_values() {
        // Explicit import shadows the atomic `Ordering` brought in by `use super::*`.
        use std::cmp::Ordering;

        // Numeric comparison
        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
        assert_eq!(
            NetworkNode::compare_le_values("1.0", "0.5"),
            Ordering::Greater
        );
        assert_eq!(
            NetworkNode::compare_le_values("1.0", "1.0"),
            Ordering::Equal
        );

        // +Inf handling
        assert_eq!(
            NetworkNode::compare_le_values("+Inf", "999"),
            Ordering::Greater
        );
        assert_eq!(
            NetworkNode::compare_le_values("0.1", "+Inf"),
            Ordering::Less
        );
        assert_eq!(
            NetworkNode::compare_le_values("+Inf", "+Inf"),
            Ordering::Equal
        );

        // Large numbers
        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
        assert_eq!(
            NetworkNode::compare_le_values("1000", "999"),
            Ordering::Greater
        );
    }
}