Skip to main content

zombienet_orchestrator/network/
node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::{Path, PathBuf},
4    sync::{
5        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
6        Arc,
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::anyhow;
12use configuration::types::{Arg, AssetLocation};
13use fancy_regex::Regex;
14use glob_match::glob_match;
15use prom_metrics_parser::MetricMap;
16use provider::{
17    types::{ExecutionResult, InnerSnapshotDb, RunScriptOptions},
18    DynNode,
19};
20use serde::{Deserialize, Serialize, Serializer};
21use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
22use support::net::{skip_err_while_waiting, wait_ws_ready};
23use thiserror::Error;
24use tokio::sync::RwLock;
25use tracing::{debug, trace, warn};
26
27use crate::{
28    generators::{generate_node_command, generate_node_command_cumulus, GenCmdOptions},
29    network::NodeContext,
30    network_spec::node::NodeSpec,
31    shared::{constants::PROCESS_START_TIME_METRIC, types::NodeSnapshot},
32    tx_helper::client::get_client_from_url,
33};
34
/// Boxed, thread-safe line matcher: receives a single log line and returns
/// whether it matches, or an error if the match could not be evaluated.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
36
/// Errors specific to [`NetworkNode`] operations.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// A metric could not be found; the payload is the metric name shown in the message.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
42
// Log target for the internal monitor: trace output about
// `PROCESS_START_TIME_METRIC` is emitted under this target instead of the default one.
const MONITOR_TARGET: &str = "zombie_monitor";
/// Handle to a single node of the network.
///
/// Bundles the provider-level node (`inner`) with the spec, endpoints and command
/// generation options it was created from. Runtime-only state (metrics cache,
/// running flag, last start timestamp) is skipped during serialization and, being
/// stored behind `Arc`, is shared between clones of the handle.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    // Provider node; serialized through the custom `serialize_provider_node` function.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduced set of values.
    pub(crate) spec: NodeSpec,
    // Node name, as returned by `name()`.
    pub(crate) name: String,
    // WebSocket endpoint used to build the rpc/subxt clients.
    pub(crate) ws_uri: String,
    // Multiaddress of the node, as returned by `multiaddr()`.
    pub(crate) multiaddr: String,
    // Prometheus endpoint the metrics are fetched from.
    pub(crate) prometheus_uri: String,
    // Store the options used to generate the cmd,
    // since we can use them later to recalculate the (cmd, args) from a
    // modified spec.
    pub(crate) cmd_generator_opts: GenCmdOptions,
    // Store the context used to generate this node.
    pub(crate) context: NodeContext,
    // Cache of the metrics fetched from prometheus (not serialized).
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    // Internal running flag, see `is_running()` (not serialized).
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
    // Store the last timestamp when we started the node (not serialized).
    #[serde(skip)]
    last_start_ts: Arc<AtomicU64>,
}
70
/// Deserializable mirror of the serialized fields of [`NetworkNode`].
///
/// The provider node is kept as untyped JSON (`inner`) and defaults when the key
/// is absent; presumably it is turned back into a provider node elsewhere —
/// TODO confirm against the deserialization code.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    pub(crate) cmd_generator_opts: GenCmdOptions,
    pub(crate) context: NodeContext,
    // Raw JSON of the provider node; falls back to the default value when missing.
    #[serde(default)]
    pub(crate) inner: serde_json::Value,
}
83
/// Outcome of waiting for a counter (matching log lines or events).
///
/// Both variants carry the number of matches observed when the wait finished.
///
/// # Variants
/// - `TargetReached(count)` – the predicate was satisfied within the timeout.
/// - `TargetFailed(count)` – the predicate was not satisfied within the timeout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl WaitCount {
    /// Returns `true` only for [`WaitCount::TargetReached`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(_))
    }
}

/// Alias kept for the log-line oriented API.
pub type LogLineCount = WaitCount;
109
/// Options controlling how a count-based wait behaves.
///
/// Combines a predicate over the number of matches with a timeout and a flag
/// that forces the wait to last for the whole timeout.
///
/// # Fields
/// - `predicate`: receives the current count (log lines, events) and returns
///   `true` when the condition is satisfied.
/// - `timeout`: maximum time to wait.
/// - `wait_until_timeout_elapses`: when `true`, the wait lasts for the full
///   timeout even if the condition is met earlier. Useful to verify sustained
///   absence or stability (e.g. "ensure no new logs appear").
#[derive(Clone)]
pub struct CountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl CountOptions {
    /// Build options from an arbitrary predicate.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Expect zero matches, waiting for the whole `timeout`.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|count| count == 0, timeout, true)
    }

    /// Expect one or more matches; does not wait for the full `timeout`.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::at_least(1, timeout)
    }

    /// Expect `target` or more matches; does not wait for the full `timeout`.
    pub fn at_least(target: u32, timeout: Duration) -> Self {
        let reached = move |count: u32| count >= target;
        Self::new(reached, timeout, false)
    }

    /// Expect a count of exactly one; does not wait for the full `timeout`.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| count == 1, timeout, false)
    }
}

/// Alias kept for the log-line oriented API.
pub type LogLineCountOptions = CountOptions;
160
161impl NetworkNode {
162    /// Create a new NetworkNode
163    #[allow(clippy::too_many_arguments)]
164    pub(crate) fn new<T: Into<String>>(
165        name: T,
166        ws_uri: T,
167        prometheus_uri: T,
168        multiaddr: T,
169        spec: NodeSpec,
170        inner: DynNode,
171        cmd_generator_opts: GenCmdOptions,
172        context: NodeContext,
173    ) -> Self {
174        Self {
175            name: name.into(),
176            ws_uri: ws_uri.into(),
177            prometheus_uri: prometheus_uri.into(),
178            inner,
179            spec,
180            cmd_generator_opts,
181            context,
182            multiaddr: multiaddr.into(),
183            metrics_cache: Arc::new(Default::default()),
184            is_running: Arc::new(AtomicBool::new(false)),
185            last_start_ts: Arc::new(AtomicU64::new(0)),
186        }
187    }
188
189    /// Check if the node is currently running (not paused).
190    ///
191    /// This returns the internal running state.
192    pub fn is_running(&self) -> bool {
193        self.is_running.load(Ordering::Acquire)
194    }
195
196    /// Get the last timestamp when the node start.
197    pub fn last_start_ts(&self) -> u64 {
198        self.last_start_ts.load(Ordering::Acquire)
199    }
200
201    /// Check if the node is responsive by attempting to connect to its WebSocket endpoint.
202    ///
203    /// This performs an actual connection attempt with a short timeout (2 seconds).
204    /// Returns `true` if the node is reachable and responding, `false` otherwise.
205    ///
206    /// This is more robust than `is_running()` as it verifies the node is actually alive.
207    pub async fn is_responsive(&self) -> bool {
208        tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
209            .await
210            .is_ok()
211    }
212
213    pub(crate) fn set_is_running(&self, is_running: bool) {
214        self.is_running.store(is_running, Ordering::Release);
215    }
216
217    /// Set the timestamp when the node was started
218    pub(crate) fn set_last_start_ts(&self, ts: u64) {
219        self.last_start_ts.store(ts, Ordering::Release);
220    }
221
222    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
223        self.multiaddr = multiaddr.into();
224    }
225
226    pub fn name(&self) -> &str {
227        &self.name
228    }
229
230    /// Args used for bootstrap the node.
231    /// NOTE: this may not be in sync if you restart the node with [`NetworkNode::restart_with`].
232    pub fn args(&self) -> Vec<&str> {
233        self.inner.args()
234    }
235
236    /// Only include the args provided by the user, filtering all
237    /// autogenerated by zombienet.
238    pub fn user_args(&self) -> Vec<String> {
239        self.spec
240            .args
241            .iter()
242            .fold(vec![], |acc, arg| [acc, arg.to_vec()].concat())
243    }
244
245    pub fn spec(&self) -> &NodeSpec {
246        &self.spec
247    }
248
249    pub fn ws_uri(&self) -> &str {
250        &self.ws_uri
251    }
252
253    pub fn multiaddr(&self) -> &str {
254        self.multiaddr.as_ref()
255    }
256
257    // Subxt
258
259    /// Get the rpc client for the node
260    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
261        get_client_from_url(&self.ws_uri).await
262    }
263
264    /// Get the [online client](subxt::client::OnlineClient) for the node
265    #[deprecated = "Use `wait_client` instead."]
266    pub async fn client<Config: subxt::Config>(
267        &self,
268    ) -> Result<OnlineClient<Config>, subxt::Error> {
269        self.try_client().await
270    }
271
272    /// Try to connect to the node.
273    ///
274    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
275    /// the node to appear before it connects to it. This function directly tries
276    /// to connect to the node and returns an error if the node is not yet available
277    /// at that point in time.
278    ///
279    /// Returns a [`OnlineClient`] on success.
280    pub async fn try_client<Config: subxt::Config>(
281        &self,
282    ) -> Result<OnlineClient<Config>, subxt::Error> {
283        get_client_from_url(&self.ws_uri).await
284    }
285
286    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
287    pub async fn wait_client<Config: subxt::Config>(
288        &self,
289    ) -> Result<OnlineClient<Config>, anyhow::Error> {
290        debug!("wait_client ws_uri: {}", self.ws_uri());
291        wait_ws_ready(self.ws_uri())
292            .await
293            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
294
295        self.try_client()
296            .await
297            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
298    }
299
300    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
301    pub async fn wait_client_with_timeout<Config: subxt::Config>(
302        &self,
303        timeout_secs: impl Into<u64>,
304    ) -> Result<OnlineClient<Config>, anyhow::Error> {
305        debug!("waiting until subxt client is ready");
306        tokio::time::timeout(
307            Duration::from_secs(timeout_secs.into()),
308            self.wait_client::<Config>(),
309        )
310        .await?
311    }
312
313    // Commands
314
315    /// Pause the node, this is implemented by pausing the
316    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
317    ///
318    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
319    /// with global setting `teardown_on_failure` disabled.
320    pub async fn pause(&self) -> Result<(), anyhow::Error> {
321        self.set_is_running(false);
322        self.inner.pause().await?;
323        Ok(())
324    }
325
326    /// Resume the node, this is implemented by resuming the
327    /// actual process (e.g polkadot) with sending `SIGCONT` signal
328    ///
329    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
330    /// with global setting `teardown_on_failure` disabled.
331    pub async fn resume(&self) -> Result<(), anyhow::Error> {
332        self.set_is_running(true);
333        self.inner.resume().await?;
334        Ok(())
335    }
336
337    /// On-disk base directory of the node — root of `data/`, `relay-data/`,
338    /// `cfg/`, etc.
339    /// This will be the _base directory_ of the inner (provider) node.
340    pub fn base_dir(&self) -> &PathBuf {
341        self.inner.base_dir()
342    }
343
344    /// Tar the node's database into `out_path` (gzipped).
345    ///
346    /// NOTE: Currently __only__ implemented in native provider. Also,
347    /// the caller is responsible for pausing the node first;
348    /// snapshotting a running node risks a torn RocksDB state.
349    pub async fn snapshot_db(
350        &self,
351        out_path: impl AsRef<Path>,
352    ) -> Result<NodeSnapshot, anyhow::Error> {
353        let out_path = out_path.as_ref().to_path_buf();
354        let is_cumulus_based = matches!(
355            self.context,
356            NodeContext::Para {
357                is_cumulus_based: true,
358                ..
359            }
360        );
361
362        let InnerSnapshotDb {
363            filename,
364            sha256,
365            size,
366        } = self.inner.snapshot_db(is_cumulus_based).await?;
367
368        // now we need to _move_ the inner file to the out_path
369        let remote_file_path = PathBuf::from(&filename);
370        self.inner
371            .receive_file(remote_file_path.as_ref(), out_path.as_ref())
372            .await?;
373
374        Ok(NodeSnapshot {
375            path: out_path,
376            sha256,
377            size,
378            node_name: self.name().into(),
379        })
380    }
381
382    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
383    ///
384    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
385    /// with global setting `teardown_on_failure` disabled.
386    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
387        self.set_is_running(false);
388        self.inner.restart(after).await?;
389        self.set_is_running(true);
390        self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
391        Ok(())
392    }
393
394    /// Restart the node using the optional provided:
395    /// - Assets: binaries to download
396    ///   NOTE: if you want to use a diff version of polkadot you need both the
397    ///   main bin (polkadot) and the workers (polkadot-execute-worker/polkadot-prepare-worker)
398    /// - cmd: program to exec.
399    /// - args: Arguments to override the ones provided in config.
400    ///
401    /// The node will be restarted with the same `env` and isolated dirrectory (e.g same database).
402    ///
403    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
404    /// with global setting `teardown_on_failure` disabled.
405    pub async fn restart_with(
406        &self,
407        assets: Vec<AssetLocation>,
408        program: Option<String>,
409        args: Option<Vec<Arg>>,
410        after: Option<Duration>,
411    ) -> Result<(), anyhow::Error> {
412        self.set_is_running(false);
413        let mut spec_cloned = self.spec.clone();
414
415        if let Some(args) = args {
416            spec_cloned.args = args;
417        }
418        if let Some(program) = program {
419            spec_cloned.command = program.as_str().try_into()?;
420        }
421
422        let (program, args) = match self.context {
423            NodeContext::Rc
424            | NodeContext::Para {
425                is_cumulus_based: false,
426                ..
427            } => generate_node_command(&spec_cloned, self.cmd_generator_opts.clone(), None),
428            NodeContext::Para {
429                para_id,
430                is_cumulus_based: true,
431            } => generate_node_command_cumulus(
432                &spec_cloned,
433                self.cmd_generator_opts.clone(),
434                para_id,
435            ),
436        };
437
438        self.inner
439            .restart_with(&assets, &program, &args, after)
440            .await?;
441        self.set_is_running(true);
442        self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
443        Ok(())
444    }
445
446    /// Run a script inside the node's container/environment
447    ///
448    /// The script will be uploaded to the node, made executable, and executed with
449    /// the provided arguments and environment variables.
450    ///
451    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
452    pub async fn run_script(
453        &self,
454        options: RunScriptOptions,
455    ) -> Result<ExecutionResult, anyhow::Error> {
456        self.inner
457            .run_script(options)
458            .await
459            .map_err(|e| anyhow!("Failed to run script: {e}"))
460    }
461
462    // Metrics assertions
463
464    /// Get metric value 'by name' from Prometheus (exposed by the node)
465    /// metric name can be:
466    /// with prefix (e.g: 'polkadot_')
467    /// with chain attribute (e.g: 'chain=rococo-local')
468    /// without prefix and/or without chain attribute
469    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
470        let metric_name = metric_name.into();
471        // force cache reload
472        self.fetch_metrics().await?;
473        // by default we treat not found as 0 (same in v1)
474        self.metric(&metric_name, true).await
475    }
476
477    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
478    /// metric name can be:
479    /// with prefix (e.g: 'polkadot_')
480    /// with chain attribute (e.g: 'chain=rococo-local')
481    /// without prefix and/or without chain attribute
482    ///
483    /// We first try to assert on the value using the cached metrics and
484    /// if not meet the criteria we reload the cache and check again
485    pub async fn assert(
486        &self,
487        metric_name: impl Into<String>,
488        value: impl Into<f64>,
489    ) -> Result<bool, anyhow::Error> {
490        let value: f64 = value.into();
491        self.assert_with(metric_name, |v| v == value).await
492    }
493
494    /// Assert on a metric value using a given predicate.
495    /// See [`NetworkNode::reports`] description for details on metric name.
496    pub async fn assert_with(
497        &self,
498        metric_name: impl Into<String>,
499        predicate: impl Fn(f64) -> bool,
500    ) -> Result<bool, anyhow::Error> {
501        let metric_name = metric_name.into();
502        // reload metrics
503        self.fetch_metrics().await?;
504        let val = self.metric(&metric_name, true).await?;
505        let log_msg = format!("🔎 Current value {val} passed to the predicated?");
506        if metric_name == PROCESS_START_TIME_METRIC {
507            trace!(target: MONITOR_TARGET, "{log_msg}");
508        } else {
509            trace!("{log_msg}");
510        }
511        Ok(predicate(val))
512    }
513
514    // Wait methods for metrics
515
516    /// Wait until a metric value pass the `predicate`
517    pub async fn wait_metric(
518        &self,
519        metric_name: impl Into<String>,
520        predicate: impl Fn(f64) -> bool,
521    ) -> Result<(), anyhow::Error> {
522        let metric_name = metric_name.into();
523        let log_msg = format!(
524            "[{}] waiting until metric {metric_name} pass the predicate",
525            self.name()
526        );
527        if metric_name == PROCESS_START_TIME_METRIC {
528            trace!(target: MONITOR_TARGET, "{log_msg}");
529        } else {
530            trace!("{log_msg}");
531        }
532
533        loop {
534            let res = self.assert_with(&metric_name, &predicate).await;
535            let log_msg = format!("res: {res:?}");
536            match res {
537                Ok(res) => {
538                    if res {
539                        return Ok(());
540                    }
541                },
542                Err(e) => match e.downcast::<reqwest::Error>() {
543                    Ok(io_err) => {
544                        if !skip_err_while_waiting(&io_err) {
545                            return Err(io_err.into());
546                        }
547                    },
548                    Err(other) => {
549                        match other.downcast::<NetworkNodeError>() {
550                            Ok(node_err) => {
551                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
552                                    return Err(node_err.into());
553                                }
554                            },
555                            Err(other) => return Err(other),
556                        };
557                    },
558                },
559            }
560
561            if metric_name == PROCESS_START_TIME_METRIC {
562                trace!(target: MONITOR_TARGET, "{log_msg}");
563            } else {
564                trace!("{log_msg}");
565            }
566
567            // sleep to not spam prometheus
568            tokio::time::sleep(Duration::from_secs(1)).await;
569        }
570    }
571
572    /// Wait until a metric value pass the `predicate`
573    /// with a timeout (secs)
574    pub async fn wait_metric_with_timeout(
575        &self,
576        metric_name: impl Into<String>,
577        predicate: impl Fn(f64) -> bool,
578        timeout_secs: impl Into<u64>,
579    ) -> Result<(), anyhow::Error> {
580        let metric_name = metric_name.into();
581        let secs = timeout_secs.into();
582        let log_msg = format!(
583            "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
584            self.name()
585        );
586
587        if metric_name == PROCESS_START_TIME_METRIC {
588            trace!(target: MONITOR_TARGET, "{log_msg}");
589        } else {
590            debug!("{log_msg}");
591        }
592
593        let res = tokio::time::timeout(
594            Duration::from_secs(secs),
595            self.wait_metric(&metric_name, predicate),
596        )
597        .await;
598
599        if let Ok(inner_res) = res {
600            match inner_res {
601                Ok(_) => Ok(()),
602                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
603            }
604        } else {
605            // timeout
606            Err(anyhow!(
607                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
608            ))
609        }
610    }
611
612    // Logs
613
614    /// Get the logs of the node
615    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
616    pub async fn logs(&self) -> Result<String, anyhow::Error> {
617        Ok(self.inner.logs().await?)
618    }
619
620    /// Wait until a the number of matching log lines is reach
621    pub async fn wait_log_line_count(
622        &self,
623        pattern: impl Into<String>,
624        is_glob: bool,
625        count: usize,
626    ) -> Result<(), anyhow::Error> {
627        let pattern = pattern.into();
628        let pattern_clone = pattern.clone();
629        debug!("waiting until we find pattern {pattern} {count} times");
630        let match_fn: BoxedClosure = if is_glob {
631            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
632        } else {
633            let re = Regex::new(&pattern)?;
634            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
635        };
636
637        loop {
638            let mut q = 0_usize;
639            let logs = self.logs().await?;
640            for line in logs.lines() {
641                trace!("line is {line}");
642                if match_fn(line)? {
643                    trace!("pattern {pattern_clone} match in line {line}");
644                    q += 1;
645                    if q >= count {
646                        return Ok(());
647                    }
648                }
649            }
650
651            tokio::time::sleep(Duration::from_secs(2)).await;
652        }
653    }
654
655    /// Waits until the number of matching log lines satisfies a custom condition,
656    /// optionally waiting for the entire duration of the timeout.
657    ///
658    /// This method searches log lines for a given substring or glob pattern,
659    /// and evaluates the number of matching lines using a user-provided predicate function.
660    /// Optionally, it can wait for the full timeout duration to ensure the condition
661    /// holds consistently (e.g., for verifying absence of logs).
662    ///
663    /// # Arguments
664    /// * `substring` - The substring or pattern to match within log lines.
665    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
666    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
667    ///
668    /// # Returns
669    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
670    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
671    /// * `Err(e)` if an error occurred during log retrieval or matching.
672    ///
673    /// # Example
674    /// ```rust
675    /// # use std::{sync::Arc, time::Duration};
676    /// # use provider::NativeProvider;
677    /// # use support::{fs::local::LocalFileSystem};
678    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
679    /// # use configuration::NetworkConfig;
680    /// # async fn example() -> Result<(), anyhow::Error> {
681    /// #   let provider = NativeProvider::new(LocalFileSystem {});
682    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
683    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
684    /// #   let network = orchestrator.spawn(config).await?;
685    /// let node = network.get_node("alice")?;
686    /// // Wait (up to 10 seconds) until pattern occurs once
687    /// let options = LogLineCountOptions {
688    ///     predicate: Arc::new(|count| count == 1),
689    ///     timeout: Duration::from_secs(10),
690    ///     wait_until_timeout_elapses: false,
691    /// };
692    /// let result = node
693    ///     .wait_log_line_count_with_timeout("error", false, options)
694    ///     .await?;
695    /// #   Ok(())
696    /// # }
697    /// ```
698    pub async fn wait_log_line_count_with_timeout(
699        &self,
700        substring: impl Into<String>,
701        is_glob: bool,
702        options: LogLineCountOptions,
703    ) -> Result<LogLineCount, anyhow::Error> {
704        let substring = substring.into();
705        debug!(
706            "waiting until match lines count within {} seconds",
707            options.timeout.as_secs_f64()
708        );
709
710        let start = tokio::time::Instant::now();
711
712        let match_fn: BoxedClosure = if is_glob {
713            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
714        } else {
715            let re = Regex::new(&substring)?;
716            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
717        };
718
719        if options.wait_until_timeout_elapses {
720            tokio::time::sleep(options.timeout).await;
721        }
722
723        let mut q;
724        loop {
725            q = 0_u32;
726            let logs = self.logs().await?;
727            for line in logs.lines() {
728                if match_fn(line)? {
729                    q += 1;
730
731                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
732                    // end after the whole log file is processed. This is to address the cases when the
733                    // predicate becomes true and false again.
734                    // eg. expected exactly 2 matching lines are expected but 3 are present
735                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
736                        return Ok(LogLineCount::TargetReached(q));
737                    }
738                }
739            }
740
741            if start.elapsed() >= options.timeout {
742                break;
743            }
744
745            tokio::time::sleep(Duration::from_secs(2)).await;
746        }
747
748        if (options.predicate)(q) {
749            Ok(LogLineCount::TargetReached(q))
750        } else {
751            Ok(LogLineCount::TargetFailed(q))
752        }
753    }
754
755    /// Waits until the number of matching log lines satisfies a custom condition,
756    /// optionally waiting for the entire duration of the timeout.
757    ///
758    /// This method searches log lines for a given substring or glob pattern,
759    /// and evaluates the number of matching lines using a user-provided predicate function.
760    /// Optionally, it can wait for the full timeout duration to ensure the condition
761    /// holds consistently (e.g., for verifying absence of logs).
762    ///
763    /// # Arguments
764    /// * `substring` - The substring or pattern to match within log lines.
765    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
766    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
767    ///
768    /// # Returns
769    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
770    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
771    /// * `Err(e)` if an error occurred during log retrieval or matching.
772    ///
773    /// # Example
774    /// ```rust
775    /// # use std::{sync::Arc, time::Duration};
776    /// # use provider::NativeProvider;
777    /// # use support::{fs::local::LocalFileSystem};
778    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
779    /// # use configuration::NetworkConfig;
780    /// # async fn example() -> Result<(), anyhow::Error> {
781    /// #   let provider = NativeProvider::new(LocalFileSystem {});
782    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
783    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
784    /// #   let network = orchestrator.spawn(config).await?;
785    /// let node = network.get_node("alice")?;
786    /// // Wait (up to 10 seconds) until pattern occurs once
787    /// let options = LogLineCountOptions {
788    ///     predicate: Arc::new(|count| count == 1),
789    ///     timeout: Duration::from_secs(10),
790    ///     wait_until_timeout_elapses: false,
791    /// };
792    /// let result = node
793    ///     .wait_log_line_count_with_timeout("error", false, options)
794    ///     .await?;
795    /// #   Ok(())
796    /// # }
797    /// ```
798    pub async fn wait_event_count_with_timeout(
799        &self,
800        pallet: impl Into<String>,
801        variant: impl Into<String>,
802        options: CountOptions,
803    ) -> Result<WaitCount, anyhow::Error> {
804        let pallet = pallet.into();
805        let variant = variant.into();
806        debug!(
807            "waiting until match event ({pallet} {variant}) count within {} seconds",
808            options.timeout.as_secs_f64()
809        );
810
811        let init_value = Arc::new(AtomicU32::new(0));
812
813        let res = tokio::time::timeout(
814            options.timeout,
815            self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
816        )
817        .await;
818
819        let q = init_value.load(Ordering::Relaxed);
820        if let Ok(inner_res) = res {
821            match inner_res {
822                Ok(_) => Ok(WaitCount::TargetReached(q)),
823                Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
824            }
825        } else {
826            // timeout
827            if options.wait_until_timeout_elapses {
828                let q = init_value.load(Ordering::Relaxed);
829                if (options.predicate)(q) {
830                    Ok(LogLineCount::TargetReached(q))
831                } else {
832                    Ok(LogLineCount::TargetFailed(q))
833                }
834            } else {
835                Err(anyhow!(
836                    "Timeout ({}), waiting for counter",
837                    options.timeout.as_secs()
838                ))
839            }
840        }
841    }
842
843    //
844    async fn wait_event_count(
845        &self,
846        pallet: &str,
847        variant: &str,
848        options: &CountOptions,
849        init_count: Arc<AtomicU32>,
850    ) -> Result<(), anyhow::Error> {
851        let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
852        let mut blocks_sub: subxt::backend::StreamOf<
853            Result<
854                subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
855                subxt::Error,
856            >,
857        > = client.blocks().subscribe_finalized().await?;
858        while let Some(block) = blocks_sub.next().await {
859            let events = block?.events().await?;
860            for event in events.iter() {
861                let evt = event?;
862                if evt.pallet_name() == pallet && evt.variant_name() == variant {
863                    let old_value = init_count.fetch_add(1, Ordering::Relaxed);
864                    if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
865                        return Ok(());
866                    }
867                }
868            }
869        }
870
871        Ok(())
872    }
873
874    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
875        let response = reqwest::get(&self.prometheus_uri).await?;
876        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
877        let mut cache = self.metrics_cache.write().await;
878        *cache = metrics;
879        Ok(())
880    }
881
882    /// Query individual metric by name
883    async fn metric(
884        &self,
885        metric_name: &str,
886        treat_not_found_as_zero: bool,
887    ) -> Result<f64, anyhow::Error> {
888        let mut metrics_map = self.metrics_cache.read().await;
889        if metrics_map.is_empty() {
890            // reload metrics
891            drop(metrics_map);
892            self.fetch_metrics().await?;
893            metrics_map = self.metrics_cache.read().await;
894        }
895
896        if let Some(val) = metrics_map.get(metric_name) {
897            Ok(*val)
898        } else if treat_not_found_as_zero {
899            Ok(0_f64)
900        } else {
901            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
902        }
903    }
904
905    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
906    ///
907    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
908    /// and calculating the count of observations in each bucket. It automatically appends
909    /// `_bucket` suffix to the metric name if not already present.
910    ///
911    /// # Arguments
912    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
913    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
914    ///
915    /// # Returns
916    /// A HashMap where keys are the `le` bucket boundaries as strings,
917    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
918    ///
919    /// # Example
920    /// ```ignore
921    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
922    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
923    /// ```
924    pub async fn get_histogram_buckets(
925        &self,
926        metric_name: impl AsRef<str>,
927        label_filters: Option<HashMap<String, String>>,
928    ) -> Result<HashMap<String, u64>, anyhow::Error> {
929        let metric_name = metric_name.as_ref();
930
931        // Fetch and parse metrics using the existing parser
932        let response = reqwest::get(&self.prometheus_uri).await?;
933        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
934
935        // Ensure metric name has _bucket suffix
936        let resolved_metric_name = if metric_name.contains("_bucket") {
937            metric_name.to_string()
938        } else {
939            format!("{metric_name}_bucket")
940        };
941
942        // First pass: collect all matching metrics with their label counts
943        // to identify which ones have the most complete label sets
944        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
945        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
946
947        for (key, &value) in metrics.iter() {
948            if !key.starts_with(&resolved_metric_name) {
949                continue;
950            }
951
952            let remaining = &key[resolved_metric_name.len()..];
953
954            let labels_str = &remaining[1..remaining.len() - 1];
955            let parsed_labels = Self::parse_label_string(labels_str);
956
957            // Must have "le" label
958            if !parsed_labels.contains_key("le") {
959                continue;
960            }
961
962            // Check if label filters match
963            if let Some(ref filters) = label_filters {
964                let mut all_match = true;
965                for (filter_key, filter_value) in filters {
966                    if parsed_labels.get(filter_key) != Some(filter_value) {
967                        all_match = false;
968                        break;
969                    }
970                }
971                if !all_match {
972                    continue;
973                }
974            }
975
976            metric_entries.push((key.clone(), parsed_labels, value as u64));
977        }
978
979        // Find the maximum number of labels (excluding "le") across all entries
980        // This helps us identify the "fullest" version of each metric
981        let max_label_count = metric_entries
982            .iter()
983            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
984            .max()
985            .unwrap_or(0);
986
987        // Second pass: collect buckets, deduplicating and preferring entries with more labels
988        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
989        let mut seen_le_values = HashSet::new();
990        let mut active_series: Option<Vec<(String, String)>> = None;
991
992        for (_, parsed_labels, value) in metric_entries {
993            let le_value = parsed_labels.get("le").unwrap().clone();
994
995            // Get non-"le" labels
996            let mut non_le_labels: Vec<(String, String)> = parsed_labels
997                .iter()
998                .filter(|(k, _)| k.as_str() != "le")
999                .map(|(k, v)| (k.clone(), v.clone()))
1000                .collect();
1001            non_le_labels.sort();
1002
1003            // Only process entries that have the maximum number of labels
1004            // (this filters out the parser's duplicate keys with fewer labels)
1005            if non_le_labels.len() < max_label_count {
1006                continue;
1007            }
1008
1009            // Detect series changes
1010            if let Some(ref prev_series) = active_series {
1011                if prev_series != &non_le_labels {
1012                    if !raw_buckets.is_empty() {
1013                        break; // Stop at first series change
1014                    }
1015                    active_series = Some(non_le_labels.clone());
1016                    seen_le_values.clear();
1017                }
1018            } else {
1019                active_series = Some(non_le_labels.clone());
1020            }
1021
1022            // Deduplicate by le value within this series
1023            if !seen_le_values.insert(le_value.clone()) {
1024                continue;
1025            }
1026
1027            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
1028            raw_buckets.push((le_value, value));
1029        }
1030
1031        // Sort buckets by their "le" values
1032        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
1033
1034        // Calculate deltas between cumulative buckets
1035        let mut buckets = HashMap::new();
1036        let mut previous_value = 0_u64;
1037        for (le, cumulative_count) in raw_buckets {
1038            if cumulative_count < previous_value {
1039                warn!(
1040                    "Warning: bucket count decreased from {} to {} at le={}",
1041                    previous_value, cumulative_count, le
1042                );
1043            }
1044            let delta = cumulative_count.saturating_sub(previous_value);
1045            buckets.insert(le, delta);
1046            previous_value = cumulative_count;
1047        }
1048
1049        Ok(buckets)
1050    }
1051
1052    /// Parse label string from parsed metric key.
1053    ///
1054    /// Takes a label string in the format `key1="value1",key2="value2"`
1055    /// and returns a HashMap of key-value pairs.
1056    /// Handles commas inside quoted values correctly.
1057    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
1058        let mut labels = HashMap::new();
1059        let mut current_key = String::new();
1060        let mut current_value = String::new();
1061        let mut in_value = false;
1062        let mut in_quotes = false;
1063
1064        for ch in labels_str.chars() {
1065            match ch {
1066                '=' if !in_quotes && !in_value => {
1067                    in_value = true;
1068                },
1069                '"' if in_value => {
1070                    in_quotes = !in_quotes;
1071                },
1072                ',' if !in_quotes => {
1073                    // End of key-value pair
1074                    if !current_key.is_empty() {
1075                        labels.insert(
1076                            current_key.trim().to_string(),
1077                            current_value.trim().to_string(),
1078                        );
1079                        current_key.clear();
1080                        current_value.clear();
1081                        in_value = false;
1082                    }
1083                },
1084                _ => {
1085                    if in_value {
1086                        current_value.push(ch);
1087                    } else {
1088                        current_key.push(ch);
1089                    }
1090                },
1091            }
1092        }
1093
1094        // Insert last pair
1095        if !current_key.is_empty() {
1096            labels.insert(
1097                current_key.trim().to_string(),
1098                current_value.trim().to_string(),
1099            );
1100        }
1101
1102        labels
1103    }
1104
1105    /// Compare two histogram bucket boundary values for sorting.
1106    ///
1107    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
1108    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
1109        use std::cmp::Ordering;
1110
1111        // Handle +Inf specially
1112        match (a, b) {
1113            ("+Inf", "+Inf") => Ordering::Equal,
1114            ("+Inf", _) => Ordering::Greater,
1115            (_, "+Inf") => Ordering::Less,
1116            _ => {
1117                // Try to parse as f64 for numeric comparison
1118                match (a.parse::<f64>(), b.parse::<f64>()) {
1119                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
1120                    // Fallback to string comparison if parsing fails
1121                    _ => a.cmp(b),
1122                }
1123            },
1124        }
1125    }
1126
1127    /// Waits given number of seconds until node reports that it is up and running, which
1128    /// is determined by metric 'process_start_time_seconds', which should appear,
1129    /// when node finished booting up.
1130    ///
1131    ///
1132    /// # Arguments
1133    /// * `timeout_secs` - The number of seconds to wait.
1134    ///
1135    /// # Returns
1136    /// * `Ok()` if the node is up before timeout occured.
1137    /// * `Err(e)` if timeout or other error occurred while waiting.
1138    pub async fn wait_until_is_up(
1139        &self,
1140        timeout_secs: impl Into<u64>,
1141    ) -> Result<(), anyhow::Error> {
1142        self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
1143            .await
1144            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
1145    }
1146}
1147
1148impl std::fmt::Debug for NetworkNode {
1149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1150        f.debug_struct("NetworkNode")
1151            .field("inner", &"inner_skipped")
1152            .field("spec", &self.spec)
1153            .field("name", &self.name)
1154            .field("ws_uri", &self.ws_uri)
1155            .field("prometheus_uri", &self.prometheus_uri)
1156            .finish()
1157    }
1158}
1159
1160fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
1161where
1162    S: Serializer,
1163{
1164    erased_serde::serialize(node.as_ref(), serializer)
1165}
1166
1167// TODO: mock and impl more unit tests
1168#[cfg(test)]
1169mod tests {
1170    use std::{
1171        path::{Path, PathBuf},
1172        sync::{Arc, Mutex},
1173    };
1174
1175    use async_trait::async_trait;
1176    use provider::{types::*, ProviderError, ProviderNode};
1177
1178    use super::*;
1179
    /// In-memory stand-in for a provider node, used by the tests in this module.
    #[derive(Serialize)]
    struct MockNode {
        // Log lines accumulated so far. Shared behind a mutex so a test can keep
        // appending lines while another task reads them.
        logs: Arc<Mutex<Vec<String>>>,
    }
1184
1185    impl MockNode {
1186        fn new() -> Self {
1187            Self {
1188                logs: Arc::new(Mutex::new(vec![])),
1189            }
1190        }
1191
1192        fn logs_push(&self, lines: Vec<impl Into<String>>) {
1193            self.logs
1194                .lock()
1195                .unwrap()
1196                .extend(lines.into_iter().map(|l| l.into()));
1197        }
1198    }
1199
    // `ProviderNode` implementation for the mock. Only `logs` has a real body;
    // every other method is a `todo!()` stub and panics if it is ever called.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // Returns every line pushed so far, joined with '\n' into one string.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart_with(
            &self,
            _assets: &[AssetLocation],
            _cmd: &str,
            _args: &[String],
            _after: Option<Duration>,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn snapshot_db(&self, _: bool) -> Result<InnerSnapshotDb, ProviderError> {
            todo!()
        }
    }
1311
1312    #[tokio::test(flavor = "multi_thread")]
1313    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1314        let mock_provider = Arc::new(MockNode::new());
1315        let mock_node = NetworkNode::new(
1316            "node1",
1317            "ws_uri",
1318            "prometheus_uri",
1319            "multiaddr",
1320            NodeSpec::default(),
1321            mock_provider.clone(),
1322            GenCmdOptions::default(),
1323            NodeContext::Rc,
1324        );
1325
1326        mock_provider.logs_push(vec![
1327            "system booting",
1328            "stub line 1",
1329            "stub line 2",
1330            "system ready",
1331        ]);
1332
1333        // Wait (up to 10 seconds) until pattern occurs once
1334        let options = LogLineCountOptions {
1335            predicate: Arc::new(|n| n == 1),
1336            timeout: Duration::from_secs(10),
1337            wait_until_timeout_elapses: false,
1338        };
1339
1340        let log_line_count = mock_node
1341            .wait_log_line_count_with_timeout("system ready", false, options)
1342            .await?;
1343
1344        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1345
1346        Ok(())
1347    }
1348
1349    #[tokio::test(flavor = "multi_thread")]
1350    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
1351        let mock_provider = Arc::new(MockNode::new());
1352        let mock_node = NetworkNode::new(
1353            "node1",
1354            "ws_uri",
1355            "prometheus_uri",
1356            "multiaddr",
1357            NodeSpec::default(),
1358            mock_provider.clone(),
1359            GenCmdOptions::default(),
1360            NodeContext::Rc,
1361        );
1362
1363        mock_provider.logs_push(vec![
1364            "system booting",
1365            "stub line 1",
1366            "stub line 2",
1367            "system ready",
1368        ]);
1369
1370        // Wait (up to 4 seconds) until pattern occurs twice
1371        let options = LogLineCountOptions {
1372            predicate: Arc::new(|n| n == 2),
1373            timeout: Duration::from_secs(4),
1374            wait_until_timeout_elapses: false,
1375        };
1376
1377        let task = tokio::spawn({
1378            async move {
1379                mock_node
1380                    .wait_log_line_count_with_timeout("system ready", false, options)
1381                    .await
1382                    .unwrap()
1383            }
1384        });
1385
1386        tokio::time::sleep(Duration::from_secs(2)).await;
1387
1388        mock_provider.logs_push(vec!["system ready"]);
1389
1390        let log_line_count = task.await?;
1391
1392        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
1393
1394        Ok(())
1395    }
1396
1397    #[tokio::test(flavor = "multi_thread")]
1398    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1399        let mock_provider = Arc::new(MockNode::new());
1400        let mock_node = NetworkNode::new(
1401            "node1",
1402            "ws_uri",
1403            "prometheus_uri",
1404            "multiaddr",
1405            NodeSpec::default(),
1406            mock_provider.clone(),
1407            GenCmdOptions::default(),
1408            NodeContext::Rc,
1409        );
1410
1411        mock_provider.logs_push(vec![
1412            "system booting",
1413            "stub line 1",
1414            "stub line 2",
1415            "system ready",
1416        ]);
1417
1418        // Wait (up to 2 seconds) until pattern occurs twice
1419        let options = LogLineCountOptions {
1420            predicate: Arc::new(|n| n == 2),
1421            timeout: Duration::from_secs(2),
1422            wait_until_timeout_elapses: false,
1423        };
1424
1425        let log_line_count = mock_node
1426            .wait_log_line_count_with_timeout("system ready", false, options)
1427            .await?;
1428
1429        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1430
1431        Ok(())
1432    }
1433
1434    #[tokio::test(flavor = "multi_thread")]
1435    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
1436        let mock_provider = Arc::new(MockNode::new());
1437        let mock_node = NetworkNode::new(
1438            "node1",
1439            "ws_uri",
1440            "prometheus_uri",
1441            "multiaddr",
1442            NodeSpec::default(),
1443            mock_provider.clone(),
1444            GenCmdOptions::default(),
1445            NodeContext::Rc,
1446        );
1447
1448        mock_provider.logs_push(vec![
1449            "system booting",
1450            "stub line 1",
1451            "stub line 2",
1452            "system ready",
1453        ]);
1454
1455        // Wait until timeout and check if pattern occurs exactly twice
1456        let options = LogLineCountOptions {
1457            predicate: Arc::new(|n| n == 2),
1458            timeout: Duration::from_secs(2),
1459            wait_until_timeout_elapses: true,
1460        };
1461
1462        let task = tokio::spawn({
1463            async move {
1464                mock_node
1465                    .wait_log_line_count_with_timeout("system ready", false, options)
1466                    .await
1467                    .unwrap()
1468            }
1469        });
1470
1471        tokio::time::sleep(Duration::from_secs(1)).await;
1472
1473        mock_provider.logs_push(vec!["system ready"]);
1474        mock_provider.logs_push(vec!["system ready"]);
1475
1476        let log_line_count = task.await?;
1477
1478        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
1479
1480        Ok(())
1481    }
1482
1483    #[tokio::test(flavor = "multi_thread")]
1484    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
1485        let mock_provider = Arc::new(MockNode::new());
1486        let mock_node = NetworkNode::new(
1487            "node1",
1488            "ws_uri",
1489            "prometheus_uri",
1490            "multiaddr",
1491            NodeSpec::default(),
1492            mock_provider.clone(),
1493            GenCmdOptions::default(),
1494            NodeContext::Rc,
1495        );
1496
1497        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1498
1499        let task = tokio::spawn({
1500            async move {
1501                mock_node
1502                    .wait_log_line_count_with_timeout(
1503                        "system ready",
1504                        false,
1505                        // Wait until timeout and make sure pattern occurred zero times
1506                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
1507                    )
1508                    .await
1509                    .unwrap()
1510            }
1511        });
1512
1513        tokio::time::sleep(Duration::from_secs(1)).await;
1514
1515        mock_provider.logs_push(vec!["stub line 3"]);
1516
1517        assert!(task.await?.success());
1518
1519        Ok(())
1520    }
1521
1522    #[tokio::test(flavor = "multi_thread")]
1523    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
1524        let mock_provider = Arc::new(MockNode::new());
1525        let mock_node = NetworkNode::new(
1526            "node1",
1527            "ws_uri",
1528            "prometheus_uri",
1529            "multiaddr",
1530            NodeSpec::default(),
1531            mock_provider.clone(),
1532            GenCmdOptions::default(),
1533            NodeContext::Rc,
1534        );
1535
1536        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1537
1538        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
1539        let options = LogLineCountOptions {
1540            predicate: Arc::new(|n| (2..=5).contains(&n)),
1541            timeout: Duration::from_secs(2),
1542            wait_until_timeout_elapses: true,
1543        };
1544
1545        let task = tokio::spawn({
1546            async move {
1547                mock_node
1548                    .wait_log_line_count_with_timeout("system ready", false, options)
1549                    .await
1550                    .unwrap()
1551            }
1552        });
1553
1554        tokio::time::sleep(Duration::from_secs(1)).await;
1555
1556        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
1557
1558        assert!(task.await?.success());
1559
1560        Ok(())
1561    }
1562
1563    #[tokio::test(flavor = "multi_thread")]
1564    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
1565        let mock_provider = Arc::new(MockNode::new());
1566        let mock_node = NetworkNode::new(
1567            "node1",
1568            "ws_uri",
1569            "prometheus_uri",
1570            "multiaddr",
1571            NodeSpec::default(),
1572            mock_provider.clone(),
1573            GenCmdOptions::default(),
1574            NodeContext::Rc,
1575        );
1576
1577        mock_provider.logs_push(vec![
1578            "system booting",
1579            "stub line 1",
1580            // this line should not match
1581            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1582            "stub line 2"
1583        ]);
1584
1585        let options = LogLineCountOptions {
1586            predicate: Arc::new(|n| n == 1),
1587            timeout: Duration::from_secs(3),
1588            wait_until_timeout_elapses: true,
1589        };
1590
1591        let task = tokio::spawn({
1592            async move {
1593                mock_node
1594                    .wait_log_line_count_with_timeout(
1595                        "error(?! importing block .*: block has an unknown parent)",
1596                        false,
1597                        options,
1598                    )
1599                    .await
1600                    .unwrap()
1601            }
1602        });
1603
1604        tokio::time::sleep(Duration::from_secs(1)).await;
1605
1606        mock_provider.logs_push(vec![
1607            "system ready",
1608            // this line should match
1609            "system error",
1610            "system ready",
1611        ]);
1612
1613        assert!(task.await?.success());
1614
1615        Ok(())
1616    }
1617
1618    #[tokio::test(flavor = "multi_thread")]
1619    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1620    ) -> Result<(), anyhow::Error> {
1621        let mock_provider = Arc::new(MockNode::new());
1622        let mock_node = NetworkNode::new(
1623            "node1",
1624            "ws_uri",
1625            "prometheus_uri",
1626            "multiaddr",
1627            NodeSpec::default(),
1628            mock_provider.clone(),
1629            GenCmdOptions::default(),
1630            NodeContext::Rc,
1631        );
1632
1633        mock_provider.logs_push(vec![
1634            "system booting",
1635            "stub line 1",
1636            // this line should not match
1637            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1638            "stub line 2"
1639        ]);
1640
1641        let options = LogLineCountOptions {
1642            predicate: Arc::new(|n| n == 1),
1643            timeout: Duration::from_secs(6),
1644            wait_until_timeout_elapses: true,
1645        };
1646
1647        let task = tokio::spawn({
1648            async move {
1649                mock_node
1650                    .wait_log_line_count_with_timeout(
1651                        "error(?! importing block .*: block has an unknown parent)",
1652                        false,
1653                        options,
1654                    )
1655                    .await
1656                    .unwrap()
1657            }
1658        });
1659
1660        tokio::time::sleep(Duration::from_secs(1)).await;
1661
1662        mock_provider.logs_push(vec!["system ready", "system ready"]);
1663
1664        assert!(!task.await?.success());
1665
1666        Ok(())
1667    }
1668
1669    #[tokio::test(flavor = "multi_thread")]
1670    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1671        let mock_provider = Arc::new(MockNode::new());
1672        let mock_node = NetworkNode::new(
1673            "node1",
1674            "ws_uri",
1675            "prometheus_uri",
1676            "multiaddr",
1677            NodeSpec::default(),
1678            mock_provider.clone(),
1679            GenCmdOptions::default(),
1680            NodeContext::Rc,
1681        );
1682
1683        mock_provider.logs_push(vec![
1684            "system booting",
1685            "stub line 1",
1686            // this line should not match
1687            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1688            "stub line 2"
1689        ]);
1690
1691        let task = tokio::spawn({
1692            async move {
1693                mock_node
1694                    .wait_log_line_count(
1695                        "error(?! importing block .*: block has an unknown parent)",
1696                        false,
1697                        1,
1698                    )
1699                    .await
1700                    .unwrap()
1701            }
1702        });
1703
1704        tokio::time::sleep(Duration::from_secs(1)).await;
1705
1706        mock_provider.logs_push(vec![
1707            "system ready",
1708            // this line should match
1709            "system error",
1710            "system ready",
1711        ]);
1712
1713        assert!(task.await.is_ok());
1714
1715        Ok(())
1716    }
1717
1718    #[tokio::test(flavor = "multi_thread")]
1719    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1720        let mock_provider = Arc::new(MockNode::new());
1721        let mock_node = NetworkNode::new(
1722            "node1",
1723            "ws_uri",
1724            "prometheus_uri",
1725            "multiaddr",
1726            NodeSpec::default(),
1727            mock_provider.clone(),
1728            GenCmdOptions::default(),
1729            NodeContext::Rc,
1730        );
1731
1732        mock_provider.logs_push(vec![
1733            "system booting",
1734            "stub line 1",
1735            // this line should not match
1736            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1737            "stub line 2"
1738        ]);
1739
1740        let options = LogLineCountOptions {
1741            predicate: Arc::new(|count| count == 1),
1742            timeout: Duration::from_secs(2),
1743            wait_until_timeout_elapses: true,
1744        };
1745
1746        let task = tokio::spawn({
1747            async move {
1748                // we expect no match, thus wait with timeout
1749                mock_node
1750                    .wait_log_line_count_with_timeout(
1751                        "error(?! importing block .*: block has an unknown parent)",
1752                        false,
1753                        options,
1754                    )
1755                    .await
1756                    .unwrap()
1757            }
1758        });
1759
1760        tokio::time::sleep(Duration::from_secs(1)).await;
1761
1762        mock_provider.logs_push(vec!["system ready", "system ready"]);
1763
1764        assert!(!task.await?.success());
1765
1766        Ok(())
1767    }
1768
1769    #[tokio::test]
1770    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1771        // This test uses a mock HTTP server to simulate Prometheus metrics
1772        use std::sync::Arc;
1773
1774        // Create a mock metrics response with proper HELP and TYPE comments
1775        let mock_metrics = concat!(
1776            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1777            "# TYPE substrate_block_verification_time histogram\n",
1778            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1779            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1780            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1781            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1782            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1783            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1784            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1785        );
1786
1787        // Start a mock HTTP server
1788        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1789        let addr = listener.local_addr()?;
1790        let metrics = Arc::new(mock_metrics.to_string());
1791
1792        tokio::spawn({
1793            let metrics = metrics.clone();
1794            async move {
1795                loop {
1796                    if let Ok((mut socket, _)) = listener.accept().await {
1797                        let metrics = metrics.clone();
1798                        tokio::spawn(async move {
1799                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1800                            let mut buffer = [0; 1024];
1801                            let _ = socket.read(&mut buffer).await;
1802
1803                            let response = format!(
1804                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1805                                metrics.len(),
1806                                metrics
1807                            );
1808                            let _ = socket.write_all(response.as_bytes()).await;
1809                        });
1810                    }
1811                }
1812            }
1813        });
1814
1815        // Create a NetworkNode with the mock prometheus URI
1816        let mock_provider = Arc::new(MockNode::new());
1817        let mock_node = NetworkNode::new(
1818            "test_node",
1819            "ws://localhost:9944",
1820            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1821            "/ip4/127.0.0.1/tcp/30333",
1822            NodeSpec::default(),
1823            mock_provider,
1824            GenCmdOptions::default(),
1825            NodeContext::Rc,
1826        );
1827
1828        // Get buckets with label filter
1829        let mut label_filters = HashMap::new();
1830        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1831        let buckets = mock_node
1832            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1833            .await?;
1834
1835        // Should get the rococo_local_testnet chain's buckets
1836        assert_eq!(buckets.get("0.1"), Some(&10));
1837        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1838        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1839        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1840        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1841
1842        // Get buckets with label filter for rococo
1843        let mut label_filters = std::collections::HashMap::new();
1844        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1845
1846        let buckets_filtered = mock_node
1847            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1848            .await?;
1849
1850        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1851        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1852
1853        // Test 3: Get buckets with _bucket suffix already present
1854        let buckets_with_suffix = mock_node
1855            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1856            .await?;
1857
1858        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1859
1860        Ok(())
1861    }
1862
1863    #[tokio::test]
1864    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1865        // Test that buckets are correctly sorted even when received out of order
1866        use std::sync::Arc;
1867
1868        let mock_metrics = concat!(
1869            "# HELP test_metric A test metric\n",
1870            "# TYPE test_metric histogram\n",
1871            "test_metric_bucket{le=\"2.5\"} 40\n",
1872            "test_metric_bucket{le=\"0.1\"} 10\n",
1873            "test_metric_bucket{le=\"+Inf\"} 42\n",
1874            "test_metric_bucket{le=\"1.0\"} 35\n",
1875            "test_metric_bucket{le=\"0.5\"} 25\n",
1876        );
1877
1878        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1879        let addr = listener.local_addr()?;
1880        let metrics = Arc::new(mock_metrics.to_string());
1881
1882        tokio::spawn({
1883            let metrics = metrics.clone();
1884            async move {
1885                loop {
1886                    if let Ok((mut socket, _)) = listener.accept().await {
1887                        let metrics = metrics.clone();
1888                        tokio::spawn(async move {
1889                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1890                            let mut buffer = [0; 1024];
1891                            let _ = socket.read(&mut buffer).await;
1892                            let response = format!(
1893                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1894                                metrics.len(),
1895                                metrics
1896                            );
1897                            let _ = socket.write_all(response.as_bytes()).await;
1898                        });
1899                    }
1900                }
1901            }
1902        });
1903
1904        let mock_provider = Arc::new(MockNode::new());
1905        let mock_node = NetworkNode::new(
1906            "test_node",
1907            "ws://localhost:9944",
1908            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1909            "/ip4/127.0.0.1/tcp/30333",
1910            NodeSpec::default(),
1911            mock_provider,
1912            GenCmdOptions::default(),
1913            NodeContext::Rc,
1914        );
1915
1916        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1917
1918        // Verify deltas are calculated correctly after sorting
1919        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
1920        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1921        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1922        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1923        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1924
1925        Ok(())
1926    }
1927
1928    #[tokio::test]
1929    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1930        // Test label parsing with commas and special characters in values
1931        use std::sync::Arc;
1932
1933        let mock_metrics = concat!(
1934            "# HELP test_metric A test metric\n",
1935            "# TYPE test_metric histogram\n",
1936            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1937            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1938            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1939        );
1940
1941        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1942        let addr = listener.local_addr()?;
1943        let metrics = Arc::new(mock_metrics.to_string());
1944
1945        tokio::spawn({
1946            let metrics = metrics.clone();
1947            async move {
1948                loop {
1949                    if let Ok((mut socket, _)) = listener.accept().await {
1950                        let metrics = metrics.clone();
1951                        tokio::spawn(async move {
1952                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953                            let mut buffer = [0; 1024];
1954                            let _ = socket.read(&mut buffer).await;
1955                            let response = format!(
1956                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1957                                metrics.len(),
1958                                metrics
1959                            );
1960                            let _ = socket.write_all(response.as_bytes()).await;
1961                        });
1962                    }
1963                }
1964            }
1965        });
1966
1967        let mock_provider = Arc::new(MockNode::new());
1968        let mock_node = NetworkNode::new(
1969            "test_node",
1970            "ws://localhost:9944",
1971            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1972            "/ip4/127.0.0.1/tcp/30333",
1973            NodeSpec::default(),
1974            mock_provider,
1975            GenCmdOptions::default(),
1976            NodeContext::Rc,
1977        );
1978
1979        // Test without filter
1980        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1981        assert_eq!(buckets.get("0.1"), Some(&5));
1982        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
1983        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15
1984
1985        // Test with filter containing comma in value
1986        let mut label_filters = std::collections::HashMap::new();
1987        label_filters.insert("method".to_string(), "GET,POST".to_string());
1988
1989        let buckets_filtered = mock_node
1990            .get_histogram_buckets("test_metric", Some(label_filters))
1991            .await?;
1992
1993        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1994        assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1995
1996        Ok(())
1997    }
1998
1999    #[test]
2000    fn test_compare_le_values() {
2001        use std::cmp::Ordering;
2002
2003        use crate::network::node::NetworkNode;
2004
2005        // Numeric comparison
2006        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
2007        assert_eq!(
2008            NetworkNode::compare_le_values("1.0", "0.5"),
2009            Ordering::Greater
2010        );
2011        assert_eq!(
2012            NetworkNode::compare_le_values("1.0", "1.0"),
2013            Ordering::Equal
2014        );
2015
2016        // +Inf handling
2017        assert_eq!(
2018            NetworkNode::compare_le_values("+Inf", "999"),
2019            Ordering::Greater
2020        );
2021        assert_eq!(
2022            NetworkNode::compare_le_values("0.1", "+Inf"),
2023            Ordering::Less
2024        );
2025        assert_eq!(
2026            NetworkNode::compare_le_values("+Inf", "+Inf"),
2027            Ordering::Equal
2028        );
2029
2030        // Large numbers
2031        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
2032        assert_eq!(
2033            NetworkNode::compare_le_values("1000", "999"),
2034            Ordering::Greater
2035        );
2036    }
2037}