zombienet_orchestrator/network/
node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15    types::{ExecutionResult, RunScriptOptions},
16    DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
26
27type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
28
29#[derive(Error, Debug)]
30pub enum NetworkNodeError {
31    #[error("metric '{0}' not found!")]
32    MetricNotFound(String),
33}
34
35#[derive(Clone, Serialize)]
36pub struct NetworkNode {
37    #[serde(serialize_with = "serialize_provider_node")]
38    pub(crate) inner: DynNode,
39    // TODO: do we need the full spec here?
40    // Maybe a reduce set of values.
41    pub(crate) spec: NodeSpec,
42    pub(crate) name: String,
43    pub(crate) ws_uri: String,
44    pub(crate) multiaddr: String,
45    pub(crate) prometheus_uri: String,
46    #[serde(skip)]
47    metrics_cache: Arc<RwLock<MetricMap>>,
48    #[serde(skip)]
49    is_running: Arc<AtomicBool>,
50}
51
52#[derive(Deserialize)]
53pub(crate) struct RawNetworkNode {
54    pub(crate) name: String,
55    pub(crate) ws_uri: String,
56    pub(crate) prometheus_uri: String,
57    pub(crate) multiaddr: String,
58    pub(crate) spec: NodeSpec,
59    pub(crate) inner: serde_json::Value,
60}
61
/// Outcome of waiting for matching log lines to satisfy a predicate.
///
/// Both variants carry the number of matching log lines observed when the wait ended.
///
/// # Variants
/// - `TargetReached(count)` – the predicate was satisfied within the timeout;
///   `count` is the number of matching log lines at the time of satisfaction.
/// - `TargetFailed(count)` – the predicate was not satisfied within the timeout;
///   `count` is the final number of matching log lines when the timeout expired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` if the wait ended with [`LogLineCount::TargetReached`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(_))
    }
}
85
/// Configuration for controlling log line count waiting behavior.
///
/// Allows specifying a custom predicate on the number of matching log lines,
/// a timeout, and whether the system should wait the entire timeout duration.
///
/// # Fields
/// - `predicate`: A function that takes the current number of matching lines and
///   returns `true` if the condition is satisfied.
/// - `timeout`: Maximum amount of time to wait.
/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
///   for the full timeout duration, even if the condition is already met early,
///   and only evaluates the predicate once at the end.
///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct LogLineCountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Build options from an arbitrary `predicate`, a `timeout`, and the
    /// `wait_until_timeout_elapses` flag (see the type-level docs for their meaning).
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        Self {
            predicate: Arc::new(predicate),
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Options whose predicate accepts only a count of zero matching lines.
    ///
    /// The whole `timeout` is always awaited (`wait_until_timeout_elapses == true`), so this
    /// checks that no matching line shows up during that window.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|count| count == 0, timeout, true)
    }

    /// Options whose predicate accepts any count of at least one matching line,
    /// returning as soon as it holds (`wait_until_timeout_elapses == false`).
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::new(|count| count >= 1, timeout, false)
    }

    /// Options whose predicate accepts a count of exactly one matching line,
    /// returning as soon as it holds (`wait_until_timeout_elapses == false`).
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| count == 1, timeout, false)
    }
}
130
131// #[derive(Clone, Debug)]
132// pub struct QueryMetricOptions {
133//     use_cache: bool,
134//     treat_not_found_as_zero: bool,
135// }
136
137// impl Default for QueryMetricOptions {
138//     fn default() -> Self {
139//         Self { use_cache: false, treat_not_found_as_zero: true }
140//     }
141// }
142
143impl NetworkNode {
144    /// Create a new NetworkNode
145    pub(crate) fn new<T: Into<String>>(
146        name: T,
147        ws_uri: T,
148        prometheus_uri: T,
149        multiaddr: T,
150        spec: NodeSpec,
151        inner: DynNode,
152    ) -> Self {
153        Self {
154            name: name.into(),
155            ws_uri: ws_uri.into(),
156            prometheus_uri: prometheus_uri.into(),
157            inner,
158            spec,
159            multiaddr: multiaddr.into(),
160            metrics_cache: Arc::new(Default::default()),
161            is_running: Arc::new(AtomicBool::new(false)),
162        }
163    }
164
165    pub(crate) fn is_running(&self) -> bool {
166        self.is_running.load(Ordering::Acquire)
167    }
168
169    pub(crate) fn set_is_running(&self, is_running: bool) {
170        self.is_running.store(is_running, Ordering::Release);
171    }
172
173    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
174        self.multiaddr = multiaddr.into();
175    }
176
177    pub fn name(&self) -> &str {
178        &self.name
179    }
180
181    pub fn args(&self) -> Vec<&str> {
182        self.inner.args()
183    }
184
185    pub fn spec(&self) -> &NodeSpec {
186        &self.spec
187    }
188
189    pub fn ws_uri(&self) -> &str {
190        &self.ws_uri
191    }
192
193    pub fn multiaddr(&self) -> &str {
194        self.multiaddr.as_ref()
195    }
196
197    // Subxt
198
199    /// Get the rpc client for the node
200    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
201        get_client_from_url(&self.ws_uri).await
202    }
203
204    /// Get the [online client](subxt::client::OnlineClient) for the node
205    #[deprecated = "Use `wait_client` instead."]
206    pub async fn client<Config: subxt::Config>(
207        &self,
208    ) -> Result<OnlineClient<Config>, subxt::Error> {
209        self.try_client().await
210    }
211
212    /// Try to connect to the node.
213    ///
214    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
215    /// the node to appear before it connects to it. This function directly tries
216    /// to connect to the node and returns an error if the node is not yet available
217    /// at that point in time.
218    ///
219    /// Returns a [`OnlineClient`] on success.
220    pub async fn try_client<Config: subxt::Config>(
221        &self,
222    ) -> Result<OnlineClient<Config>, subxt::Error> {
223        get_client_from_url(&self.ws_uri).await
224    }
225
226    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
227    pub async fn wait_client<Config: subxt::Config>(
228        &self,
229    ) -> Result<OnlineClient<Config>, anyhow::Error> {
230        debug!("wait_client ws_uri: {}", self.ws_uri());
231        wait_ws_ready(self.ws_uri())
232            .await
233            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
234
235        self.try_client()
236            .await
237            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
238    }
239
240    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
241    pub async fn wait_client_with_timeout<Config: subxt::Config>(
242        &self,
243        timeout_secs: impl Into<u64>,
244    ) -> Result<OnlineClient<Config>, anyhow::Error> {
245        debug!("waiting until subxt client is ready");
246        tokio::time::timeout(
247            Duration::from_secs(timeout_secs.into()),
248            self.wait_client::<Config>(),
249        )
250        .await?
251    }
252
253    // Commands
254
255    /// Pause the node, this is implemented by pausing the
256    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
257    ///
258    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
259    /// with global setting `teardown_on_failure` disabled.
260    pub async fn pause(&self) -> Result<(), anyhow::Error> {
261        self.set_is_running(false);
262        self.inner.pause().await?;
263        Ok(())
264    }
265
266    /// Resume the node, this is implemented by resuming the
267    /// actual process (e.g polkadot) with sending `SIGCONT` signal
268    ///
269    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
270    /// with global setting `teardown_on_failure` disabled.
271    pub async fn resume(&self) -> Result<(), anyhow::Error> {
272        self.set_is_running(true);
273        self.inner.resume().await?;
274        Ok(())
275    }
276
277    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
278    ///
279    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
280    /// with global setting `teardown_on_failure` disabled.
281    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
282        self.set_is_running(false);
283        self.inner.restart(after).await?;
284        self.set_is_running(true);
285        Ok(())
286    }
287
288    /// Run a script inside the node's container/environment
289    ///
290    /// The script will be uploaded to the node, made executable, and executed with
291    /// the provided arguments and environment variables.
292    ///
293    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
294    pub async fn run_script(
295        &self,
296        options: RunScriptOptions,
297    ) -> Result<ExecutionResult, anyhow::Error> {
298        self.inner
299            .run_script(options)
300            .await
301            .map_err(|e| anyhow!("Failed to run script: {e}"))
302    }
303
304    // Metrics assertions
305
306    /// Get metric value 'by name' from Prometheus (exposed by the node)
307    /// metric name can be:
308    /// with prefix (e.g: 'polkadot_')
309    /// with chain attribute (e.g: 'chain=rococo-local')
310    /// without prefix and/or without chain attribute
311    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
312        let metric_name = metric_name.into();
313        // force cache reload
314        self.fetch_metrics().await?;
315        // by default we treat not found as 0 (same in v1)
316        self.metric(&metric_name, true).await
317    }
318
319    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
320    /// metric name can be:
321    /// with prefix (e.g: 'polkadot_')
322    /// with chain attribute (e.g: 'chain=rococo-local')
323    /// without prefix and/or without chain attribute
324    ///
325    /// We first try to assert on the value using the cached metrics and
326    /// if not meet the criteria we reload the cache and check again
327    pub async fn assert(
328        &self,
329        metric_name: impl Into<String>,
330        value: impl Into<f64>,
331    ) -> Result<bool, anyhow::Error> {
332        let value: f64 = value.into();
333        self.assert_with(metric_name, |v| v == value).await
334    }
335
336    /// Assert on a metric value using a given predicate.
337    /// See [`NetworkNode::reports`] description for details on metric name.
338    pub async fn assert_with(
339        &self,
340        metric_name: impl Into<String>,
341        predicate: impl Fn(f64) -> bool,
342    ) -> Result<bool, anyhow::Error> {
343        let metric_name = metric_name.into();
344        // reload metrics
345        self.fetch_metrics().await?;
346        let val = self.metric(&metric_name, true).await?;
347        trace!("🔎 Current value {val} passed to the predicated?");
348        Ok(predicate(val))
349    }
350
351    // Wait methods for metrics
352
353    /// Wait until a metric value pass the `predicate`
354    pub async fn wait_metric(
355        &self,
356        metric_name: impl Into<String>,
357        predicate: impl Fn(f64) -> bool,
358    ) -> Result<(), anyhow::Error> {
359        let metric_name = metric_name.into();
360        debug!("waiting until metric {metric_name} pass the predicate");
361        loop {
362            let res = self.assert_with(&metric_name, &predicate).await;
363            match res {
364                Ok(res) => {
365                    if res {
366                        return Ok(());
367                    }
368                },
369                Err(e) => match e.downcast::<reqwest::Error>() {
370                    Ok(io_err) => {
371                        if !skip_err_while_waiting(&io_err) {
372                            return Err(io_err.into());
373                        }
374                    },
375                    Err(other) => {
376                        match other.downcast::<NetworkNodeError>() {
377                            Ok(node_err) => {
378                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
379                                    return Err(node_err.into());
380                                }
381                            },
382                            Err(other) => return Err(other),
383                        };
384                    },
385                },
386            }
387
388            // sleep to not spam prometheus
389            tokio::time::sleep(Duration::from_secs(1)).await;
390        }
391    }
392
393    /// Wait until a metric value pass the `predicate`
394    /// with a timeout (secs)
395    pub async fn wait_metric_with_timeout(
396        &self,
397        metric_name: impl Into<String>,
398        predicate: impl Fn(f64) -> bool,
399        timeout_secs: impl Into<u64>,
400    ) -> Result<(), anyhow::Error> {
401        let metric_name = metric_name.into();
402        let secs = timeout_secs.into();
403        debug!("waiting until metric {metric_name} pass the predicate");
404        let res = tokio::time::timeout(
405            Duration::from_secs(secs),
406            self.wait_metric(&metric_name, predicate),
407        )
408        .await;
409
410        if let Ok(inner_res) = res {
411            match inner_res {
412                Ok(_) => Ok(()),
413                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
414            }
415        } else {
416            // timeout
417            Err(anyhow!(
418                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
419            ))
420        }
421    }
422
423    // Logs
424
425    /// Get the logs of the node
426    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
427    pub async fn logs(&self) -> Result<String, anyhow::Error> {
428        Ok(self.inner.logs().await?)
429    }
430
431    /// Wait until a the number of matching log lines is reach
432    pub async fn wait_log_line_count(
433        &self,
434        pattern: impl Into<String>,
435        is_glob: bool,
436        count: usize,
437    ) -> Result<(), anyhow::Error> {
438        let pattern = pattern.into();
439        let pattern_clone = pattern.clone();
440        debug!("waiting until we find pattern {pattern} {count} times");
441        let match_fn: BoxedClosure = if is_glob {
442            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
443        } else {
444            let re = Regex::new(&pattern)?;
445            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
446        };
447
448        loop {
449            let mut q = 0_usize;
450            let logs = self.logs().await?;
451            for line in logs.lines() {
452                trace!("line is {line}");
453                if match_fn(line)? {
454                    trace!("pattern {pattern_clone} match in line {line}");
455                    q += 1;
456                    if q >= count {
457                        return Ok(());
458                    }
459                }
460            }
461
462            tokio::time::sleep(Duration::from_secs(2)).await;
463        }
464    }
465
466    /// Waits until the number of matching log lines satisfies a custom condition,
467    /// optionally waiting for the entire duration of the timeout.
468    ///
469    /// This method searches log lines for a given substring or glob pattern,
470    /// and evaluates the number of matching lines using a user-provided predicate function.
471    /// Optionally, it can wait for the full timeout duration to ensure the condition
472    /// holds consistently (e.g., for verifying absence of logs).
473    ///
474    /// # Arguments
475    /// * `substring` - The substring or pattern to match within log lines.
476    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
477    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
478    ///
479    /// # Returns
480    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
481    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
482    /// * `Err(e)` if an error occurred during log retrieval or matching.
483    ///
484    /// # Example
485    /// ```rust
486    /// # use std::{sync::Arc, time::Duration};
487    /// # use provider::NativeProvider;
488    /// # use support::{fs::local::LocalFileSystem};
489    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
490    /// # use configuration::NetworkConfig;
491    /// # async fn example() -> Result<(), anyhow::Error> {
492    /// #   let provider = NativeProvider::new(LocalFileSystem {});
493    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
494    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
495    /// #   let network = orchestrator.spawn(config).await?;
496    /// let node = network.get_node("alice")?;
497    /// // Wait (up to 10 seconds) until pattern occurs once
498    /// let options = LogLineCountOptions {
499    ///     predicate: Arc::new(|count| count == 1),
500    ///     timeout: Duration::from_secs(10),
501    ///     wait_until_timeout_elapses: false,
502    /// };
503    /// let result = node
504    ///     .wait_log_line_count_with_timeout("error", false, options)
505    ///     .await?;
506    /// #   Ok(())
507    /// # }
508    /// ```
509    pub async fn wait_log_line_count_with_timeout(
510        &self,
511        substring: impl Into<String>,
512        is_glob: bool,
513        options: LogLineCountOptions,
514    ) -> Result<LogLineCount, anyhow::Error> {
515        let substring = substring.into();
516        debug!(
517            "waiting until match lines count within {} seconds",
518            options.timeout.as_secs_f64()
519        );
520
521        let start = tokio::time::Instant::now();
522
523        let match_fn: BoxedClosure = if is_glob {
524            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
525        } else {
526            let re = Regex::new(&substring)?;
527            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
528        };
529
530        if options.wait_until_timeout_elapses {
531            tokio::time::sleep(options.timeout).await;
532        }
533
534        let mut q;
535        loop {
536            q = 0_u32;
537            let logs = self.logs().await?;
538            for line in logs.lines() {
539                if match_fn(line)? {
540                    q += 1;
541
542                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
543                    // end after the whole log file is processed. This is to address the cases when the
544                    // predicate becomes true and false again.
545                    // eg. expected exactly 2 matching lines are expected but 3 are present
546                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
547                        return Ok(LogLineCount::TargetReached(q));
548                    }
549                }
550            }
551
552            if start.elapsed() >= options.timeout {
553                break;
554            }
555
556            tokio::time::sleep(Duration::from_secs(2)).await;
557        }
558
559        if (options.predicate)(q) {
560            Ok(LogLineCount::TargetReached(q))
561        } else {
562            Ok(LogLineCount::TargetFailed(q))
563        }
564    }
565
566    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
567        let response = reqwest::get(&self.prometheus_uri).await?;
568        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
569        let mut cache = self.metrics_cache.write().await;
570        *cache = metrics;
571        Ok(())
572    }
573
574    /// Query individual metric by name
575    async fn metric(
576        &self,
577        metric_name: &str,
578        treat_not_found_as_zero: bool,
579    ) -> Result<f64, anyhow::Error> {
580        let mut metrics_map = self.metrics_cache.read().await;
581        if metrics_map.is_empty() {
582            // reload metrics
583            drop(metrics_map);
584            self.fetch_metrics().await?;
585            metrics_map = self.metrics_cache.read().await;
586        }
587
588        if let Some(val) = metrics_map.get(metric_name) {
589            Ok(*val)
590        } else if treat_not_found_as_zero {
591            Ok(0_f64)
592        } else {
593            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
594        }
595    }
596
597    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
598    ///
599    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
600    /// and calculating the count of observations in each bucket. It automatically appends
601    /// `_bucket` suffix to the metric name if not already present.
602    ///
603    /// # Arguments
604    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
605    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
606    ///
607    /// # Returns
608    /// A HashMap where keys are the `le` bucket boundaries as strings,
609    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
610    ///
611    /// # Example
612    /// ```ignore
613    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
614    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
615    /// ```
616    pub async fn get_histogram_buckets(
617        &self,
618        metric_name: impl AsRef<str>,
619        label_filters: Option<HashMap<String, String>>,
620    ) -> Result<HashMap<String, u64>, anyhow::Error> {
621        let metric_name = metric_name.as_ref();
622
623        // Fetch and parse metrics using the existing parser
624        let response = reqwest::get(&self.prometheus_uri).await?;
625        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
626
627        // Ensure metric name has _bucket suffix
628        let resolved_metric_name = if metric_name.contains("_bucket") {
629            metric_name.to_string()
630        } else {
631            format!("{}_bucket", metric_name)
632        };
633
634        // First pass: collect all matching metrics with their label counts
635        // to identify which ones have the most complete label sets
636        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
637        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
638
639        for (key, &value) in metrics.iter() {
640            if !key.starts_with(&resolved_metric_name) {
641                continue;
642            }
643
644            let remaining = &key[resolved_metric_name.len()..];
645
646            let labels_str = &remaining[1..remaining.len() - 1];
647            let parsed_labels = Self::parse_label_string(labels_str);
648
649            // Must have "le" label
650            if !parsed_labels.contains_key("le") {
651                continue;
652            }
653
654            // Check if label filters match
655            if let Some(ref filters) = label_filters {
656                let mut all_match = true;
657                for (filter_key, filter_value) in filters {
658                    if parsed_labels.get(filter_key) != Some(filter_value) {
659                        all_match = false;
660                        break;
661                    }
662                }
663                if !all_match {
664                    continue;
665                }
666            }
667
668            metric_entries.push((key.clone(), parsed_labels, value as u64));
669        }
670
671        // Find the maximum number of labels (excluding "le") across all entries
672        // This helps us identify the "fullest" version of each metric
673        let max_label_count = metric_entries
674            .iter()
675            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
676            .max()
677            .unwrap_or(0);
678
679        // Second pass: collect buckets, deduplicating and preferring entries with more labels
680        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
681        let mut seen_le_values = HashSet::new();
682        let mut active_series: Option<Vec<(String, String)>> = None;
683
684        for (_, parsed_labels, value) in metric_entries {
685            let le_value = parsed_labels.get("le").unwrap().clone();
686
687            // Get non-"le" labels
688            let mut non_le_labels: Vec<(String, String)> = parsed_labels
689                .iter()
690                .filter(|(k, _)| k.as_str() != "le")
691                .map(|(k, v)| (k.clone(), v.clone()))
692                .collect();
693            non_le_labels.sort();
694
695            // Only process entries that have the maximum number of labels
696            // (this filters out the parser's duplicate keys with fewer labels)
697            if non_le_labels.len() < max_label_count {
698                continue;
699            }
700
701            // Detect series changes
702            if let Some(ref prev_series) = active_series {
703                if prev_series != &non_le_labels {
704                    if !raw_buckets.is_empty() {
705                        break; // Stop at first series change
706                    }
707                    active_series = Some(non_le_labels.clone());
708                    seen_le_values.clear();
709                }
710            } else {
711                active_series = Some(non_le_labels.clone());
712            }
713
714            // Deduplicate by le value within this series
715            if !seen_le_values.insert(le_value.clone()) {
716                continue;
717            }
718
719            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
720            raw_buckets.push((le_value, value));
721        }
722
723        // Sort buckets by their "le" values
724        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
725
726        // Calculate deltas between cumulative buckets
727        let mut buckets = HashMap::new();
728        let mut previous_value = 0_u64;
729        for (le, cumulative_count) in raw_buckets {
730            if cumulative_count < previous_value {
731                warn!(
732                    "Warning: bucket count decreased from {} to {} at le={}",
733                    previous_value, cumulative_count, le
734                );
735            }
736            let delta = cumulative_count.saturating_sub(previous_value);
737            buckets.insert(le, delta);
738            previous_value = cumulative_count;
739        }
740
741        Ok(buckets)
742    }
743
744    /// Parse label string from parsed metric key.
745    ///
746    /// Takes a label string in the format `key1="value1",key2="value2"`
747    /// and returns a HashMap of key-value pairs.
748    /// Handles commas inside quoted values correctly.
749    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
750        let mut labels = HashMap::new();
751        let mut current_key = String::new();
752        let mut current_value = String::new();
753        let mut in_value = false;
754        let mut in_quotes = false;
755
756        for ch in labels_str.chars() {
757            match ch {
758                '=' if !in_quotes && !in_value => {
759                    in_value = true;
760                },
761                '"' if in_value => {
762                    in_quotes = !in_quotes;
763                },
764                ',' if !in_quotes => {
765                    // End of key-value pair
766                    if !current_key.is_empty() {
767                        labels.insert(
768                            current_key.trim().to_string(),
769                            current_value.trim().to_string(),
770                        );
771                        current_key.clear();
772                        current_value.clear();
773                        in_value = false;
774                    }
775                },
776                _ => {
777                    if in_value {
778                        current_value.push(ch);
779                    } else {
780                        current_key.push(ch);
781                    }
782                },
783            }
784        }
785
786        // Insert last pair
787        if !current_key.is_empty() {
788            labels.insert(
789                current_key.trim().to_string(),
790                current_value.trim().to_string(),
791            );
792        }
793
794        labels
795    }
796
797    /// Compare two histogram bucket boundary values for sorting.
798    ///
799    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
800    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
801        use std::cmp::Ordering;
802
803        // Handle +Inf specially
804        match (a, b) {
805            ("+Inf", "+Inf") => Ordering::Equal,
806            ("+Inf", _) => Ordering::Greater,
807            (_, "+Inf") => Ordering::Less,
808            _ => {
809                // Try to parse as f64 for numeric comparison
810                match (a.parse::<f64>(), b.parse::<f64>()) {
811                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
812                    // Fallback to string comparison if parsing fails
813                    _ => a.cmp(b),
814                }
815            },
816        }
817    }
818
819    /// Waits given number of seconds until node reports that it is up and running, which
820    /// is determined by metric 'process_start_time_seconds', which should appear,
821    /// when node finished booting up.
822    ///
823    ///
824    /// # Arguments
825    /// * `timeout_secs` - The number of seconds to wait.
826    ///
827    /// # Returns
828    /// * `Ok()` if the node is up before timeout occured.
829    /// * `Err(e)` if timeout or other error occurred while waiting.
830    pub async fn wait_until_is_up(
831        &self,
832        timeout_secs: impl Into<u64>,
833    ) -> Result<(), anyhow::Error> {
834        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
835            .await
836            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
837    }
838}
839
840impl std::fmt::Debug for NetworkNode {
841    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
842        f.debug_struct("NetworkNode")
843            .field("inner", &"inner_skipped")
844            .field("spec", &self.spec)
845            .field("name", &self.name)
846            .field("ws_uri", &self.ws_uri)
847            .field("prometheus_uri", &self.prometheus_uri)
848            .finish()
849    }
850}
851
852fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
853where
854    S: Serializer,
855{
856    erased_serde::serialize(node.as_ref(), serializer)
857}
858
// TODO: mock and impl more unit tests
#[cfg(test)]
mod tests {
    use std::{
        net::SocketAddr,
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
    };

    use async_trait::async_trait;
    use provider::{types::*, ProviderError, ProviderNode};

    use super::*;

    /// Log lines of a node that has finished booting.
    const BOOT_LOGS_READY: [&str; 4] = [
        "system booting",
        "stub line 1",
        "stub line 2",
        "system ready",
    ];

    /// Log lines of a node that has not yet logged "system ready".
    const BOOT_LOGS: [&str; 3] = ["system booting", "stub line 1", "stub line 2"];

    /// Boot logs containing an "unknown parent" import error line, which
    /// `LOOKAHEAD_ERROR_PATTERN` is expected NOT to match.
    const BOOT_LOGS_UNKNOWN_PARENT: [&str; 4] = [
        "system booting",
        "stub line 1",
        "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
        "stub line 2",
    ];

    /// Matches `error` unless it is followed by the "unknown parent" import message.
    const LOOKAHEAD_ERROR_PATTERN: &str =
        "error(?! importing block .*: block has an unknown parent)";

    #[derive(Serialize)]
    struct MockNode {
        logs: Arc<Mutex<Vec<String>>>,
    }

    impl MockNode {
        fn new() -> Self {
            Self {
                logs: Arc::new(Mutex::new(vec![])),
            }
        }

        fn logs_push(&self, lines: Vec<impl Into<String>>) {
            self.logs
                .lock()
                .unwrap()
                .extend(lines.into_iter().map(|l| l.into()));
        }
    }

    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }

    /// Builds a `NetworkNode` backed by a fresh `MockNode` whose log buffer is
    /// pre-filled with `initial_logs`.
    ///
    /// The provider handle is returned as well, so tests can append lines while a
    /// wait is in progress.
    fn log_test_node(initial_logs: &[&str]) -> (Arc<MockNode>, NetworkNode) {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );
        mock_provider.logs_push(initial_logs.to_vec());
        (mock_provider, mock_node)
    }

    /// Builds a `NetworkNode` whose Prometheus endpoint is `http://<addr>/metrics`.
    fn metrics_test_node(addr: SocketAddr) -> NetworkNode {
        let mock_provider = Arc::new(MockNode::new());
        NetworkNode::new(
            "test_node",
            "ws://localhost:9944",
            &format!("http://127.0.0.1:{}/metrics", addr.port()),
            "/ip4/127.0.0.1/tcp/30333",
            NodeSpec::default(),
            mock_provider,
        )
    }

    /// Starts a minimal HTTP server on an ephemeral localhost port.
    ///
    /// Every accepted connection gets the same `200 OK` response carrying `body`.
    /// The request itself is read (up to 1 KiB) and ignored. Returns the bound address.
    async fn spawn_metrics_server(body: &'static str) -> Result<SocketAddr, anyhow::Error> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;
        let response = Arc::new(format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
            body.len(),
            body
        ));

        tokio::spawn(async move {
            loop {
                if let Ok((mut socket, _)) = listener.accept().await {
                    let response = response.clone();
                    tokio::spawn(async move {
                        let mut buffer = [0; 1024];
                        let _ = socket.read(&mut buffer).await;
                        let _ = socket.write_all(response.as_bytes()).await;
                    });
                }
            }
        });

        Ok(addr)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
        let (_mock_provider, mock_node) = log_test_node(&BOOT_LOGS_READY);

        // Wait (up to 10 seconds) until pattern occurs once
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(10),
            wait_until_timeout_elapses: false,
        };

        let log_line_count = mock_node
            .wait_log_line_count_with_timeout("system ready", false, options)
            .await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS_READY);

        // Wait (up to 4 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(4),
            wait_until_timeout_elapses: false,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(2)).await;

        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
        let (_mock_provider, mock_node) = log_test_node(&BOOT_LOGS_READY);

        // Wait (up to 2 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: false,
        };

        let log_line_count = mock_node
            .wait_log_line_count_with_timeout("system ready", false, options)
            .await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS_READY);

        // Wait until timeout and check if pattern occurs exactly twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurrences() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "system ready",
                    false,
                    // Wait until timeout and make sure pattern occurred zero times
                    LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS);

        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| (2..=5).contains(&n)),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS_UNKNOWN_PARENT);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(3),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(LOOKAHEAD_ERROR_PATTERN, false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS_UNKNOWN_PARENT);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(6),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(LOOKAHEAD_ERROR_PATTERN, false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS_UNKNOWN_PARENT);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count(LOOKAHEAD_ERROR_PATTERN, false, 1)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        // The spawned task unwraps the wait result, so a join error means the wait failed.
        assert!(task.await.is_ok());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = log_test_node(&BOOT_LOGS_UNKNOWN_PARENT);

        let options = LogLineCountOptions {
            predicate: Arc::new(|count| count == 1),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn(async move {
            // we expect no match, thus wait with timeout
            mock_node
                .wait_log_line_count_with_timeout(LOOKAHEAD_ERROR_PATTERN, false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }

    #[tokio::test]
    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
        // Mock Prometheus exposition with HELP/TYPE comments; bucket counts are cumulative.
        let addr = spawn_metrics_server(concat!(
            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
            "# TYPE substrate_block_verification_time histogram\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
        ))
        .await?;
        let mock_node = metrics_test_node(addr);

        // Test 1: no label filter; the only series is the rococo_local_testnet one
        let buckets = mock_node
            .get_histogram_buckets("substrate_block_verification_time", None)
            .await?;

        assert_eq!(buckets.get("0.1"), Some(&10));
        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40

        // Test 2: filter on the chain label
        let mut label_filters = HashMap::new();
        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());

        let buckets_filtered = mock_node
            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
            .await?;

        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
        assert_eq!(buckets_filtered.get("1.0"), Some(&10));
        assert_eq!(buckets_filtered.get("2.5"), Some(&5));
        assert_eq!(buckets_filtered.get("+Inf"), Some(&2));

        // Test 3: Get buckets with _bucket suffix already present
        let buckets_with_suffix = mock_node
            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
            .await?;

        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));

        Ok(())
    }

    #[tokio::test]
    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
        // Test that buckets are correctly sorted even when received out of order
        let addr = spawn_metrics_server(concat!(
            "# HELP test_metric A test metric\n",
            "# TYPE test_metric histogram\n",
            "test_metric_bucket{le=\"2.5\"} 40\n",
            "test_metric_bucket{le=\"0.1\"} 10\n",
            "test_metric_bucket{le=\"+Inf\"} 42\n",
            "test_metric_bucket{le=\"1.0\"} 35\n",
            "test_metric_bucket{le=\"0.5\"} 25\n",
        ))
        .await?;
        let mock_node = metrics_test_node(addr);

        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;

        // Verify deltas are calculated correctly after sorting
        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40

        Ok(())
    }

    #[tokio::test]
    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
        // Test label parsing with commas and special characters in values
        let addr = spawn_metrics_server(concat!(
            "# HELP test_metric A test metric\n",
            "# TYPE test_metric histogram\n",
            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
        ))
        .await?;
        let mock_node = metrics_test_node(addr);

        // Test without filter
        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
        assert_eq!(buckets.get("0.1"), Some(&5));
        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15

        // Test with filter containing comma in value
        let mut label_filters = HashMap::new();
        label_filters.insert("method".to_string(), "GET,POST".to_string());

        let buckets_filtered = mock_node
            .get_histogram_buckets("test_metric", Some(label_filters))
            .await?;

        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
        assert_eq!(buckets_filtered.get("0.5"), Some(&10));

        Ok(())
    }

    #[test]
    fn test_compare_le_values() {
        use std::cmp::Ordering;

        // Numeric comparison
        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
        assert_eq!(
            NetworkNode::compare_le_values("1.0", "0.5"),
            Ordering::Greater
        );
        assert_eq!(
            NetworkNode::compare_le_values("1.0", "1.0"),
            Ordering::Equal
        );

        // +Inf handling
        assert_eq!(
            NetworkNode::compare_le_values("+Inf", "999"),
            Ordering::Greater
        );
        assert_eq!(
            NetworkNode::compare_le_values("0.1", "+Inf"),
            Ordering::Less
        );
        assert_eq!(
            NetworkNode::compare_le_values("+Inf", "+Inf"),
            Ordering::Equal
        );

        // Large numbers
        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
        assert_eq!(
            NetworkNode::compare_le_values("1000", "999"),
            Ordering::Greater
        );
    }
}