zombienet_orchestrator/network/
node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
5        Arc,
6    },
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use anyhow::anyhow;
11use configuration::types::{Arg, AssetLocation};
12use fancy_regex::Regex;
13use glob_match::glob_match;
14use prom_metrics_parser::MetricMap;
15use provider::{
16    types::{ExecutionResult, RunScriptOptions},
17    DynNode,
18};
19use serde::{Deserialize, Serialize, Serializer};
20use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
21use support::net::{skip_err_while_waiting, wait_ws_ready};
22use thiserror::Error;
23use tokio::sync::RwLock;
24use tracing::{debug, trace, warn};
25
26use crate::{
27    generators::{generate_node_command, generate_node_command_cumulus, GenCmdOptions},
28    network::NodeContext,
29    network_spec::node::NodeSpec,
30    shared::constants::PROCESS_START_TIME_METRIC,
31    tx_helper::client::get_client_from_url,
32};
33
34type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
35
/// Errors specific to [`NetworkNode`] operations.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric was not present in the node's Prometheus output.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
41
// Log target for the internal monitor
const MONITOR_TARGET: &str = "zombie_monitor";

/// A node that is part of a spawned network.
///
/// Wraps the provider-level handle ([`DynNode`]) together with the spec used to
/// build it, its connection endpoints, and cached runtime state (metrics,
/// running flag, last start timestamp).
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    // Provider-level node handle; serialized through a custom serializer.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduce set of values.
    pub(crate) spec: NodeSpec,
    pub(crate) name: String,
    // WebSocket RPC endpoint used by the subxt helpers below.
    pub(crate) ws_uri: String,
    pub(crate) multiaddr: String,
    // Prometheus endpoint scraped by `fetch_metrics`.
    pub(crate) prometheus_uri: String,
    // Store the option used to generate the cmd,
    // since we can use it later for recalculate the (cmd, args) from a
    // modified spec.
    pub(crate) cmd_generator_opts: GenCmdOptions,
    // Store the context used to generate this node.
    pub(crate) context: NodeContext,
    // Cached copy of the last parsed Prometheus scrape (not serialized).
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    // Tracks the pause/resume/restart state; see `is_running`/`set_is_running`.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
    // Store the last timestamp when we start the node
    #[serde(skip)]
    last_start_ts: Arc<AtomicU64>,
}
69
/// Deserialization-side counterpart of [`NetworkNode`], used when loading a
/// previously serialized network state.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    pub(crate) cmd_generator_opts: GenCmdOptions,
    pub(crate) context: NodeContext,
    // Raw provider-node payload; defaults (to `null`) when absent.
    #[serde(default)]
    pub(crate) inner: serde_json::Value,
}
82
/// Result of waiting for a certain counter (number of log lines or events).
///
/// Indicates whether the count condition was met within the timeout period.
///
/// # Variants
/// - `TargetReached(count)` – The predicate condition was satisfied within the timeout.
///     * `count`: The number of matching instances at the time of satisfaction.
/// - `TargetFailed(count)` – The condition was not met within the timeout.
///     * `count`: The final number of matching instances at timeout expiration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl WaitCount {
    /// `true` when the awaited condition was satisfied (i.e. `TargetReached`).
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}

/// Alias kept for the log-line waiting APIs.
pub type LogLineCount = WaitCount;
108
/// Configuration for controlling count waiting behavior.
///
/// Allows specifying a custom predicate on the number of matches,
/// a timeout, and whether the system should wait the entire timeout duration.
///
/// # Fields
/// - `predicate`: A function that takes the current value (metric, log lines) and
///   returns `true` if the condition is satisfied.
/// - `timeout`: Maximum duration to wait.
/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
///   for the full timeout duration, even if the condition is already met early.
///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct CountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl CountOptions {
    /// Build options from an arbitrary predicate over the observed count.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        Self {
            predicate: Arc::new(predicate),
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Succeeds only if there are zero occurrences for the whole timeout window.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|count| count == 0, timeout, true)
    }

    /// Succeeds as soon as at least one occurrence is observed.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::at_least(1, timeout)
    }

    /// Succeeds as soon as `target` or more occurrences are observed.
    pub fn at_least(target: u32, timeout: Duration) -> Self {
        Self::new(move |count| count >= target, timeout, false)
    }

    /// Succeeds the moment the count is exactly one.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| count == 1, timeout, false)
    }
}

/// Alias kept for the log-line waiting APIs.
pub type LogLineCountOptions = CountOptions;
159
160impl NetworkNode {
161    /// Create a new NetworkNode
162    #[allow(clippy::too_many_arguments)]
163    pub(crate) fn new<T: Into<String>>(
164        name: T,
165        ws_uri: T,
166        prometheus_uri: T,
167        multiaddr: T,
168        spec: NodeSpec,
169        inner: DynNode,
170        cmd_generator_opts: GenCmdOptions,
171        context: NodeContext,
172    ) -> Self {
173        Self {
174            name: name.into(),
175            ws_uri: ws_uri.into(),
176            prometheus_uri: prometheus_uri.into(),
177            inner,
178            spec,
179            cmd_generator_opts,
180            context,
181            multiaddr: multiaddr.into(),
182            metrics_cache: Arc::new(Default::default()),
183            is_running: Arc::new(AtomicBool::new(false)),
184            last_start_ts: Arc::new(AtomicU64::new(0)),
185        }
186    }
187
    /// Check if the node is currently running (not paused).
    ///
    /// This returns the internal running state.
    pub fn is_running(&self) -> bool {
        // Acquire pairs with the Release store in `set_is_running`.
        self.is_running.load(Ordering::Acquire)
    }
194
    /// Get the last recorded timestamp (unix seconds) when the node was started.
    pub fn last_start_ts(&self) -> u64 {
        self.last_start_ts.load(Ordering::Acquire)
    }
199
    /// Check if the node is responsive by attempting to connect to its WebSocket endpoint.
    ///
    /// This performs an actual connection attempt with a short timeout (2 seconds).
    /// Returns `true` if the node is reachable and responding, `false` otherwise.
    ///
    /// This is more robust than `is_running()` as it verifies the node is actually alive.
    pub async fn is_responsive(&self) -> bool {
        // `is_ok` means the readiness check completed before the 2s timeout.
        tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
            .await
            .is_ok()
    }
211
    /// Update the cached running flag (Release pairs with the Acquire load in `is_running`).
    pub(crate) fn set_is_running(&self, is_running: bool) {
        self.is_running.store(is_running, Ordering::Release);
    }
215
    /// Set the timestamp when the node was started (unix seconds; see `restart`).
    pub(crate) fn set_last_start_ts(&self, ts: u64) {
        self.last_start_ts.store(ts, Ordering::Release);
    }
220
    /// Overwrite the stored multiaddr for this node.
    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
        self.multiaddr = multiaddr.into();
    }
224
    /// Node name.
    pub fn name(&self) -> &str {
        &self.name
    }
228
    /// Args used for bootstrap the node.
    /// NOTE: this may not be in sync if you restart the node with [`NetworkNode::restart_with`].
    pub fn args(&self) -> Vec<&str> {
        self.inner.args()
    }
234
235    /// Only include the args provided by the user, filtering all
236    /// autogenerated by zombienet.
237    pub fn user_args(&self) -> Vec<String> {
238        self.spec
239            .args
240            .iter()
241            .fold(vec![], |acc, arg| [acc, arg.to_vec()].concat())
242    }
243
    /// The spec used to build this node.
    pub fn spec(&self) -> &NodeSpec {
        &self.spec
    }
247
    /// WebSocket RPC endpoint of the node.
    pub fn ws_uri(&self) -> &str {
        &self.ws_uri
    }
251
252    pub fn multiaddr(&self) -> &str {
253        self.multiaddr.as_ref()
254    }
255
256    // Subxt
257
    /// Get the rpc client for the node.
    ///
    /// Connects directly to `ws_uri`; fails if the node is not reachable.
    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
262
    /// Get the [online client](subxt::client::OnlineClient) for the node.
    ///
    /// Deprecated thin forwarder to [`NetworkNode::try_client`].
    #[deprecated = "Use `wait_client` instead."]
    pub async fn client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        self.try_client().await
    }
270
    /// Try to connect to the node.
    ///
    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
    /// the node to appear before it connects to it. This function directly tries
    /// to connect to the node and returns an error if the node is not yet available
    /// at that point in time.
    ///
    /// Returns a [`OnlineClient`] on success.
    pub async fn try_client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
284
285    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
286    pub async fn wait_client<Config: subxt::Config>(
287        &self,
288    ) -> Result<OnlineClient<Config>, anyhow::Error> {
289        debug!("wait_client ws_uri: {}", self.ws_uri());
290        wait_ws_ready(self.ws_uri())
291            .await
292            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
293
294        self.try_client()
295            .await
296            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
297    }
298
    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
    pub async fn wait_client_with_timeout<Config: subxt::Config>(
        &self,
        timeout_secs: impl Into<u64>,
    ) -> Result<OnlineClient<Config>, anyhow::Error> {
        debug!("waiting until subxt client is ready");
        // The outer `?` converts a timeout (Elapsed) into anyhow::Error;
        // on success the inner Result from `wait_client` is returned as-is.
        tokio::time::timeout(
            Duration::from_secs(timeout_secs.into()),
            self.wait_client::<Config>(),
        )
        .await?
    }
311
312    // Commands
313
    /// Pause the node, this is implemented by pausing the
    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn pause(&self) -> Result<(), anyhow::Error> {
        // Clear the flag before signalling; if `pause` errors the flag stays
        // false (the error is propagated before any reset).
        self.set_is_running(false);
        self.inner.pause().await?;
        Ok(())
    }
324
    /// Resume the node, this is implemented by resuming the
    /// actual process (e.g polkadot) with sending `SIGCONT` signal
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn resume(&self) -> Result<(), anyhow::Error> {
        // Set the flag before signalling; mirrors the ordering used in `pause`.
        self.set_is_running(true);
        self.inner.resume().await?;
        Ok(())
    }
335
    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
        // Mark not-running across the restart window, then record the new
        // start timestamp (unix seconds) once the restart succeeded.
        self.set_is_running(false);
        self.inner.restart(after).await?;
        self.set_is_running(true);
        self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
        Ok(())
    }
347
    /// Restart the node using the optional provided:
    /// - Assets: binaries to download
    ///   NOTE: if you want to use a diff version of polkadot you need both the
    ///   main bin (polkadot) and the workers (polkadot-execute-worker/polkadot-prepare-worker)
    /// - cmd: program to exec.
    /// - args: Arguments to override the ones provided in config.
    ///
    /// The node will be restarted with the same `env` and isolated directory (e.g same database).
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn restart_with(
        &self,
        assets: Vec<AssetLocation>,
        program: Option<String>,
        args: Option<Vec<Arg>>,
        after: Option<Duration>,
    ) -> Result<(), anyhow::Error> {
        self.set_is_running(false);
        // Work on a clone of the spec so overrides don't mutate `self.spec`.
        let mut spec_cloned = self.spec.clone();

        if let Some(args) = args {
            spec_cloned.args = args;
        }
        if let Some(program) = program {
            spec_cloned.command = program.as_str().try_into()?;
        }

        // Regenerate (cmd, args) from the modified spec, using the same
        // generator options stored when the node was first created.
        let (program, args) = match self.context {
            // Relaychain nodes and non-cumulus parachain nodes share the
            // plain command generator.
            NodeContext::Rc
            | NodeContext::Para {
                is_cumulus_based: false,
                ..
            } => generate_node_command(&spec_cloned, self.cmd_generator_opts.clone(), None),
            // Cumulus-based collators need the para_id baked into the command.
            NodeContext::Para {
                para_id,
                is_cumulus_based: true,
            } => generate_node_command_cumulus(
                &spec_cloned,
                self.cmd_generator_opts.clone(),
                para_id,
            ),
        };

        self.inner
            .restart_with(&assets, &program, &args, after)
            .await?;
        self.set_is_running(true);
        self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
        Ok(())
    }
399
    /// Run a script inside the node's container/environment
    ///
    /// The script will be uploaded to the node, made executable, and executed with
    /// the provided arguments and environment variables.
    ///
    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
    pub async fn run_script(
        &self,
        options: RunScriptOptions,
    ) -> Result<ExecutionResult, anyhow::Error> {
        self.inner
            .run_script(options)
            .await
            .map_err(|e| anyhow!("Failed to run script: {e}"))
    }
415
416    // Metrics assertions
417
    /// Get metric value 'by name' from Prometheus (exposed by the node)
    /// metric name can be:
    /// with prefix (e.g: 'polkadot_')
    /// with chain attribute (e.g: 'chain=rococo-local')
    /// without prefix and/or without chain attribute
    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
        let metric_name = metric_name.into();
        // force cache reload
        self.fetch_metrics().await?;
        // by default we treat not found as 0 (same in v1)
        self.metric(&metric_name, true).await
    }
430
431    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
432    /// metric name can be:
433    /// with prefix (e.g: 'polkadot_')
434    /// with chain attribute (e.g: 'chain=rococo-local')
435    /// without prefix and/or without chain attribute
436    ///
437    /// We first try to assert on the value using the cached metrics and
438    /// if not meet the criteria we reload the cache and check again
439    pub async fn assert(
440        &self,
441        metric_name: impl Into<String>,
442        value: impl Into<f64>,
443    ) -> Result<bool, anyhow::Error> {
444        let value: f64 = value.into();
445        self.assert_with(metric_name, |v| v == value).await
446    }
447
448    /// Assert on a metric value using a given predicate.
449    /// See [`NetworkNode::reports`] description for details on metric name.
450    pub async fn assert_with(
451        &self,
452        metric_name: impl Into<String>,
453        predicate: impl Fn(f64) -> bool,
454    ) -> Result<bool, anyhow::Error> {
455        let metric_name = metric_name.into();
456        // reload metrics
457        self.fetch_metrics().await?;
458        let val = self.metric(&metric_name, true).await?;
459        let log_msg = format!("🔎 Current value {val} passed to the predicated?");
460        if metric_name == PROCESS_START_TIME_METRIC {
461            trace!(target: MONITOR_TARGET, "{log_msg}");
462        } else {
463            trace!("{log_msg}");
464        }
465        Ok(predicate(val))
466    }
467
468    // Wait methods for metrics
469
    /// Wait until a metric value pass the `predicate`
    ///
    /// Polls once per second, forever; use
    /// [`NetworkNode::wait_metric_with_timeout`] for a bounded wait.
    pub async fn wait_metric(
        &self,
        metric_name: impl Into<String>,
        predicate: impl Fn(f64) -> bool,
    ) -> Result<(), anyhow::Error> {
        let metric_name = metric_name.into();
        let log_msg = format!(
            "[{}] waiting until metric {metric_name} pass the predicate",
            self.name()
        );
        if metric_name == PROCESS_START_TIME_METRIC {
            trace!(target: MONITOR_TARGET, "{log_msg}");
        } else {
            trace!("{log_msg}");
        }

        loop {
            let res = self.assert_with(&metric_name, &predicate).await;
            let log_msg = format!("res: {res:?}");
            match res {
                Ok(res) => {
                    if res {
                        return Ok(());
                    }
                },
                // Error triage: transient fetch errors and "metric not found"
                // are expected while the node warms up, so keep polling;
                // anything else is propagated to the caller.
                Err(e) => match e.downcast::<reqwest::Error>() {
                    Ok(io_err) => {
                        // Transient HTTP error: only bail if it's not one of
                        // the skippable-while-waiting kinds.
                        if !skip_err_while_waiting(&io_err) {
                            return Err(io_err.into());
                        }
                    },
                    Err(other) => {
                        match other.downcast::<NetworkNodeError>() {
                            Ok(node_err) => {
                                // MetricNotFound just means "not exposed yet".
                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
                                    return Err(node_err.into());
                                }
                            },
                            Err(other) => return Err(other),
                        };
                    },
                },
            }

            if metric_name == PROCESS_START_TIME_METRIC {
                trace!(target: MONITOR_TARGET, "{log_msg}");
            } else {
                trace!("{log_msg}");
            }

            // sleep to not spam prometheus
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
525
526    /// Wait until a metric value pass the `predicate`
527    /// with a timeout (secs)
528    pub async fn wait_metric_with_timeout(
529        &self,
530        metric_name: impl Into<String>,
531        predicate: impl Fn(f64) -> bool,
532        timeout_secs: impl Into<u64>,
533    ) -> Result<(), anyhow::Error> {
534        let metric_name = metric_name.into();
535        let secs = timeout_secs.into();
536        let log_msg = format!(
537            "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
538            self.name()
539        );
540
541        if metric_name == PROCESS_START_TIME_METRIC {
542            trace!(target: MONITOR_TARGET, "{log_msg}");
543        } else {
544            debug!("{log_msg}");
545        }
546
547        let res = tokio::time::timeout(
548            Duration::from_secs(secs),
549            self.wait_metric(&metric_name, predicate),
550        )
551        .await;
552
553        if let Ok(inner_res) = res {
554            match inner_res {
555                Ok(_) => Ok(()),
556                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
557            }
558        } else {
559            // timeout
560            Err(anyhow!(
561                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
562            ))
563        }
564    }
565
566    // Logs
567
    /// Get the logs of the node
    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
    pub async fn logs(&self) -> Result<String, anyhow::Error> {
        Ok(self.inner.logs().await?)
    }
573
574    /// Wait until a the number of matching log lines is reach
575    pub async fn wait_log_line_count(
576        &self,
577        pattern: impl Into<String>,
578        is_glob: bool,
579        count: usize,
580    ) -> Result<(), anyhow::Error> {
581        let pattern = pattern.into();
582        let pattern_clone = pattern.clone();
583        debug!("waiting until we find pattern {pattern} {count} times");
584        let match_fn: BoxedClosure = if is_glob {
585            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
586        } else {
587            let re = Regex::new(&pattern)?;
588            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
589        };
590
591        loop {
592            let mut q = 0_usize;
593            let logs = self.logs().await?;
594            for line in logs.lines() {
595                trace!("line is {line}");
596                if match_fn(line)? {
597                    trace!("pattern {pattern_clone} match in line {line}");
598                    q += 1;
599                    if q >= count {
600                        return Ok(());
601                    }
602                }
603            }
604
605            tokio::time::sleep(Duration::from_secs(2)).await;
606        }
607    }
608
    /// Waits until the number of matching log lines satisfies a custom condition,
    /// optionally waiting for the entire duration of the timeout.
    ///
    /// This method searches log lines for a given substring or glob pattern,
    /// and evaluates the number of matching lines using a user-provided predicate function.
    /// Optionally, it can wait for the full timeout duration to ensure the condition
    /// holds consistently (e.g., for verifying absence of logs).
    ///
    /// # Arguments
    /// * `substring` - The substring or pattern to match within log lines.
    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
    ///
    /// # Returns
    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
    /// * `Ok(LogLineCount::TargetFailed(n))` if the predicate was not satisfied in time,
    /// * `Err(e)` if an error occurred during log retrieval or matching.
    ///
    /// # Example
    /// ```rust
    /// # use std::{sync::Arc, time::Duration};
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// #   let network = orchestrator.spawn(config).await?;
    /// let node = network.get_node("alice")?;
    /// // Wait (up to 10 seconds) until pattern occurs once
    /// let options = LogLineCountOptions {
    ///     predicate: Arc::new(|count| count == 1),
    ///     timeout: Duration::from_secs(10),
    ///     wait_until_timeout_elapses: false,
    /// };
    /// let result = node
    ///     .wait_log_line_count_with_timeout("error", false, options)
    ///     .await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn wait_log_line_count_with_timeout(
        &self,
        substring: impl Into<String>,
        is_glob: bool,
        options: LogLineCountOptions,
    ) -> Result<LogLineCount, anyhow::Error> {
        let substring = substring.into();
        debug!(
            "waiting until match lines count within {} seconds",
            options.timeout.as_secs_f64()
        );

        let start = tokio::time::Instant::now();

        let match_fn: BoxedClosure = if is_glob {
            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
        } else {
            let re = Regex::new(&substring)?;
            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
        };

        // When asked to hold for the whole window, sleep the full timeout
        // first; the loop below then performs exactly one final scan
        // (since `start.elapsed() >= options.timeout` breaks immediately).
        if options.wait_until_timeout_elapses {
            tokio::time::sleep(options.timeout).await;
        }

        let mut q;
        loop {
            q = 0_u32;
            let logs = self.logs().await?;
            for line in logs.lines() {
                if match_fn(line)? {
                    q += 1;

                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
                    // end after the whole log file is processed. This is to address the cases when the
                    // predicate becomes true and false again.
                    // eg. expected exactly 2 matching lines are expected but 3 are present
                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
                        return Ok(LogLineCount::TargetReached(q));
                    }
                }
            }

            if start.elapsed() >= options.timeout {
                break;
            }

            tokio::time::sleep(Duration::from_secs(2)).await;
        }

        // Final verdict based on the count from the last full scan.
        if (options.predicate)(q) {
            Ok(LogLineCount::TargetReached(q))
        } else {
            Ok(LogLineCount::TargetFailed(q))
        }
    }
708
    /// Waits until the number of matching on-chain events satisfies a custom
    /// condition, optionally waiting for the entire duration of the timeout.
    ///
    /// Subscribes to finalized blocks (via the node's subxt client) and counts
    /// events whose pallet and variant names match the given ones, evaluating
    /// the running count with the user-provided predicate from `options`.
    ///
    /// # Arguments
    /// * `pallet` - The pallet name of the event to match (e.g. "Balances").
    /// * `variant` - The event variant name to match (e.g. "Transfer").
    /// * `options` - Configuration for timeout, count predicate, and full-duration waiting.
    ///
    /// # Returns
    /// * `Ok(WaitCount::TargetReached(n))` if the predicate was satisfied within the timeout,
    /// * `Ok(WaitCount::TargetFailed(n))` if `wait_until_timeout_elapses` is set and the
    ///   predicate was not satisfied when the timeout expired,
    /// * `Err(e)` if the subscription/decoding failed, or on timeout when
    ///   `wait_until_timeout_elapses` is `false`.
    pub async fn wait_event_count_with_timeout(
        &self,
        pallet: impl Into<String>,
        variant: impl Into<String>,
        options: CountOptions,
    ) -> Result<WaitCount, anyhow::Error> {
        let pallet = pallet.into();
        let variant = variant.into();
        debug!(
            "waiting until match event ({pallet} {variant}) count within {} seconds",
            options.timeout.as_secs_f64()
        );

        // Shared counter so the final count is readable even after a timeout.
        let init_value = Arc::new(AtomicU32::new(0));

        let res = tokio::time::timeout(
            options.timeout,
            self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
        )
        .await;

        let q = init_value.load(Ordering::Relaxed);
        if let Ok(inner_res) = res {
            match inner_res {
                Ok(_) => Ok(WaitCount::TargetReached(q)),
                Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
            }
        } else {
            // timeout
            if options.wait_until_timeout_elapses {
                // Evaluate the predicate against the final observed count.
                let q = init_value.load(Ordering::Relaxed);
                if (options.predicate)(q) {
                    Ok(LogLineCount::TargetReached(q))
                } else {
                    Ok(LogLineCount::TargetFailed(q))
                }
            } else {
                Err(anyhow!(
                    "Timeout ({}), waiting for counter",
                    options.timeout.as_secs()
                ))
            }
        }
    }
796
797    //
798    async fn wait_event_count(
799        &self,
800        pallet: &str,
801        variant: &str,
802        options: &CountOptions,
803        init_count: Arc<AtomicU32>,
804    ) -> Result<(), anyhow::Error> {
805        let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
806        let mut blocks_sub: subxt::backend::StreamOf<
807            Result<
808                subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
809                subxt::Error,
810            >,
811        > = client.blocks().subscribe_finalized().await?;
812        while let Some(block) = blocks_sub.next().await {
813            let events = block?.events().await?;
814            for event in events.iter() {
815                let evt = event?;
816                if evt.pallet_name() == pallet && evt.variant_name() == variant {
817                    let old_value = init_count.fetch_add(1, Ordering::Relaxed);
818                    if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
819                        return Ok(());
820                    }
821                }
822            }
823        }
824
825        Ok(())
826    }
827
828    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
829        let response = reqwest::get(&self.prometheus_uri).await?;
830        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
831        let mut cache = self.metrics_cache.write().await;
832        *cache = metrics;
833        Ok(())
834    }
835
836    /// Query individual metric by name
837    async fn metric(
838        &self,
839        metric_name: &str,
840        treat_not_found_as_zero: bool,
841    ) -> Result<f64, anyhow::Error> {
842        let mut metrics_map = self.metrics_cache.read().await;
843        if metrics_map.is_empty() {
844            // reload metrics
845            drop(metrics_map);
846            self.fetch_metrics().await?;
847            metrics_map = self.metrics_cache.read().await;
848        }
849
850        if let Some(val) = metrics_map.get(metric_name) {
851            Ok(*val)
852        } else if treat_not_found_as_zero {
853            Ok(0_f64)
854        } else {
855            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
856        }
857    }
858
859    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
860    ///
861    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
862    /// and calculating the count of observations in each bucket. It automatically appends
863    /// `_bucket` suffix to the metric name if not already present.
864    ///
865    /// # Arguments
866    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
867    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
868    ///
869    /// # Returns
870    /// A HashMap where keys are the `le` bucket boundaries as strings,
871    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
872    ///
873    /// # Example
874    /// ```ignore
875    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
876    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
877    /// ```
878    pub async fn get_histogram_buckets(
879        &self,
880        metric_name: impl AsRef<str>,
881        label_filters: Option<HashMap<String, String>>,
882    ) -> Result<HashMap<String, u64>, anyhow::Error> {
883        let metric_name = metric_name.as_ref();
884
885        // Fetch and parse metrics using the existing parser
886        let response = reqwest::get(&self.prometheus_uri).await?;
887        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
888
889        // Ensure metric name has _bucket suffix
890        let resolved_metric_name = if metric_name.contains("_bucket") {
891            metric_name.to_string()
892        } else {
893            format!("{}_bucket", metric_name)
894        };
895
896        // First pass: collect all matching metrics with their label counts
897        // to identify which ones have the most complete label sets
898        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
899        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
900
901        for (key, &value) in metrics.iter() {
902            if !key.starts_with(&resolved_metric_name) {
903                continue;
904            }
905
906            let remaining = &key[resolved_metric_name.len()..];
907
908            let labels_str = &remaining[1..remaining.len() - 1];
909            let parsed_labels = Self::parse_label_string(labels_str);
910
911            // Must have "le" label
912            if !parsed_labels.contains_key("le") {
913                continue;
914            }
915
916            // Check if label filters match
917            if let Some(ref filters) = label_filters {
918                let mut all_match = true;
919                for (filter_key, filter_value) in filters {
920                    if parsed_labels.get(filter_key) != Some(filter_value) {
921                        all_match = false;
922                        break;
923                    }
924                }
925                if !all_match {
926                    continue;
927                }
928            }
929
930            metric_entries.push((key.clone(), parsed_labels, value as u64));
931        }
932
933        // Find the maximum number of labels (excluding "le") across all entries
934        // This helps us identify the "fullest" version of each metric
935        let max_label_count = metric_entries
936            .iter()
937            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
938            .max()
939            .unwrap_or(0);
940
941        // Second pass: collect buckets, deduplicating and preferring entries with more labels
942        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
943        let mut seen_le_values = HashSet::new();
944        let mut active_series: Option<Vec<(String, String)>> = None;
945
946        for (_, parsed_labels, value) in metric_entries {
947            let le_value = parsed_labels.get("le").unwrap().clone();
948
949            // Get non-"le" labels
950            let mut non_le_labels: Vec<(String, String)> = parsed_labels
951                .iter()
952                .filter(|(k, _)| k.as_str() != "le")
953                .map(|(k, v)| (k.clone(), v.clone()))
954                .collect();
955            non_le_labels.sort();
956
957            // Only process entries that have the maximum number of labels
958            // (this filters out the parser's duplicate keys with fewer labels)
959            if non_le_labels.len() < max_label_count {
960                continue;
961            }
962
963            // Detect series changes
964            if let Some(ref prev_series) = active_series {
965                if prev_series != &non_le_labels {
966                    if !raw_buckets.is_empty() {
967                        break; // Stop at first series change
968                    }
969                    active_series = Some(non_le_labels.clone());
970                    seen_le_values.clear();
971                }
972            } else {
973                active_series = Some(non_le_labels.clone());
974            }
975
976            // Deduplicate by le value within this series
977            if !seen_le_values.insert(le_value.clone()) {
978                continue;
979            }
980
981            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
982            raw_buckets.push((le_value, value));
983        }
984
985        // Sort buckets by their "le" values
986        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
987
988        // Calculate deltas between cumulative buckets
989        let mut buckets = HashMap::new();
990        let mut previous_value = 0_u64;
991        for (le, cumulative_count) in raw_buckets {
992            if cumulative_count < previous_value {
993                warn!(
994                    "Warning: bucket count decreased from {} to {} at le={}",
995                    previous_value, cumulative_count, le
996                );
997            }
998            let delta = cumulative_count.saturating_sub(previous_value);
999            buckets.insert(le, delta);
1000            previous_value = cumulative_count;
1001        }
1002
1003        Ok(buckets)
1004    }
1005
1006    /// Parse label string from parsed metric key.
1007    ///
1008    /// Takes a label string in the format `key1="value1",key2="value2"`
1009    /// and returns a HashMap of key-value pairs.
1010    /// Handles commas inside quoted values correctly.
1011    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
1012        let mut labels = HashMap::new();
1013        let mut current_key = String::new();
1014        let mut current_value = String::new();
1015        let mut in_value = false;
1016        let mut in_quotes = false;
1017
1018        for ch in labels_str.chars() {
1019            match ch {
1020                '=' if !in_quotes && !in_value => {
1021                    in_value = true;
1022                },
1023                '"' if in_value => {
1024                    in_quotes = !in_quotes;
1025                },
1026                ',' if !in_quotes => {
1027                    // End of key-value pair
1028                    if !current_key.is_empty() {
1029                        labels.insert(
1030                            current_key.trim().to_string(),
1031                            current_value.trim().to_string(),
1032                        );
1033                        current_key.clear();
1034                        current_value.clear();
1035                        in_value = false;
1036                    }
1037                },
1038                _ => {
1039                    if in_value {
1040                        current_value.push(ch);
1041                    } else {
1042                        current_key.push(ch);
1043                    }
1044                },
1045            }
1046        }
1047
1048        // Insert last pair
1049        if !current_key.is_empty() {
1050            labels.insert(
1051                current_key.trim().to_string(),
1052                current_value.trim().to_string(),
1053            );
1054        }
1055
1056        labels
1057    }
1058
1059    /// Compare two histogram bucket boundary values for sorting.
1060    ///
1061    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
1062    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
1063        use std::cmp::Ordering;
1064
1065        // Handle +Inf specially
1066        match (a, b) {
1067            ("+Inf", "+Inf") => Ordering::Equal,
1068            ("+Inf", _) => Ordering::Greater,
1069            (_, "+Inf") => Ordering::Less,
1070            _ => {
1071                // Try to parse as f64 for numeric comparison
1072                match (a.parse::<f64>(), b.parse::<f64>()) {
1073                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
1074                    // Fallback to string comparison if parsing fails
1075                    _ => a.cmp(b),
1076                }
1077            },
1078        }
1079    }
1080
1081    /// Waits given number of seconds until node reports that it is up and running, which
1082    /// is determined by metric 'process_start_time_seconds', which should appear,
1083    /// when node finished booting up.
1084    ///
1085    ///
1086    /// # Arguments
1087    /// * `timeout_secs` - The number of seconds to wait.
1088    ///
1089    /// # Returns
1090    /// * `Ok()` if the node is up before timeout occured.
1091    /// * `Err(e)` if timeout or other error occurred while waiting.
1092    pub async fn wait_until_is_up(
1093        &self,
1094        timeout_secs: impl Into<u64>,
1095    ) -> Result<(), anyhow::Error> {
1096        self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
1097            .await
1098            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
1099    }
1100}
1101
1102impl std::fmt::Debug for NetworkNode {
1103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104        f.debug_struct("NetworkNode")
1105            .field("inner", &"inner_skipped")
1106            .field("spec", &self.spec)
1107            .field("name", &self.name)
1108            .field("ws_uri", &self.ws_uri)
1109            .field("prometheus_uri", &self.prometheus_uri)
1110            .finish()
1111    }
1112}
1113
// Serde helper for `NetworkNode::inner`: delegates to `erased_serde` so the
// trait object behind `DynNode` can be serialized without naming its concrete
// provider type.
fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    erased_serde::serialize(node.as_ref(), serializer)
}
1120
1121// TODO: mock and impl more unit tests
1122#[cfg(test)]
1123mod tests {
1124    use std::{
1125        path::{Path, PathBuf},
1126        sync::{Arc, Mutex},
1127    };
1128
1129    use async_trait::async_trait;
1130    use provider::{types::*, ProviderError, ProviderNode};
1131
1132    use super::*;
1133
    // Test double standing in for a real provider node: the tests only need a
    // mutable, shared log buffer that a waiter task can poll while the test
    // body appends lines.
    #[derive(Serialize)]
    struct MockNode {
        logs: Arc<Mutex<Vec<String>>>,
    }
1138
1139    impl MockNode {
1140        fn new() -> Self {
1141            Self {
1142                logs: Arc::new(Mutex::new(vec![])),
1143            }
1144        }
1145
1146        fn logs_push(&self, lines: Vec<impl Into<String>>) {
1147            self.logs
1148                .lock()
1149                .unwrap()
1150                .extend(lines.into_iter().map(|l| l.into()));
1151        }
1152    }
1153
    // Minimal `ProviderNode` implementation for the log-waiting tests: only
    // `logs()` has a real body (returning the shared buffer joined with
    // newlines); every other method is a `todo!()` stub the tests never call.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // The only implemented method: snapshots the buffer as a single
        // newline-joined string, mirroring how a real node's log dump reads.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart_with(
            &self,
            _assets: &[AssetLocation],
            _cmd: &str,
            _args: &[String],
            _after: Option<Duration>,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
1261
1262    #[tokio::test(flavor = "multi_thread")]
1263    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1264        let mock_provider = Arc::new(MockNode::new());
1265        let mock_node = NetworkNode::new(
1266            "node1",
1267            "ws_uri",
1268            "prometheus_uri",
1269            "multiaddr",
1270            NodeSpec::default(),
1271            mock_provider.clone(),
1272            GenCmdOptions::default(),
1273            NodeContext::Rc,
1274        );
1275
1276        mock_provider.logs_push(vec![
1277            "system booting",
1278            "stub line 1",
1279            "stub line 2",
1280            "system ready",
1281        ]);
1282
1283        // Wait (up to 10 seconds) until pattern occurs once
1284        let options = LogLineCountOptions {
1285            predicate: Arc::new(|n| n == 1),
1286            timeout: Duration::from_secs(10),
1287            wait_until_timeout_elapses: false,
1288        };
1289
1290        let log_line_count = mock_node
1291            .wait_log_line_count_with_timeout("system ready", false, options)
1292            .await?;
1293
1294        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1295
1296        Ok(())
1297    }
1298
    // The waiter starts with only one occurrence present; the second is pushed
    // ~2s later and must be picked up before the 4s timeout. The spawn/sleep/
    // push ordering below is the substance of the test.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 4 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(4),
            wait_until_timeout_elapses: false,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(2)).await;

        // Second occurrence arrives while the waiter is already polling.
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }
1346
1347    #[tokio::test(flavor = "multi_thread")]
1348    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1349        let mock_provider = Arc::new(MockNode::new());
1350        let mock_node = NetworkNode::new(
1351            "node1",
1352            "ws_uri",
1353            "prometheus_uri",
1354            "multiaddr",
1355            NodeSpec::default(),
1356            mock_provider.clone(),
1357            GenCmdOptions::default(),
1358            NodeContext::Rc,
1359        );
1360
1361        mock_provider.logs_push(vec![
1362            "system booting",
1363            "stub line 1",
1364            "stub line 2",
1365            "system ready",
1366        ]);
1367
1368        // Wait (up to 2 seconds) until pattern occurs twice
1369        let options = LogLineCountOptions {
1370            predicate: Arc::new(|n| n == 2),
1371            timeout: Duration::from_secs(2),
1372            wait_until_timeout_elapses: false,
1373        };
1374
1375        let log_line_count = mock_node
1376            .wait_log_line_count_with_timeout("system ready", false, options)
1377            .await?;
1378
1379        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1380
1381        Ok(())
1382    }
1383
    // With `wait_until_timeout_elapses` the waiter keeps counting for the whole
    // window; two extra pushes bring the total to 3, so the `== 2` predicate
    // fails even though it was momentarily true.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait until timeout and check if pattern occurs exactly twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Two more occurrences overshoot the target count of 2.
        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }
1432
    // The pattern never appears; `no_occurences_within_timeout` succeeds only
    // if the count stays at zero for the entire 2s window (the unrelated line
    // pushed mid-wait must not be counted).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "system ready",
                        false,
                        // Wait until timeout and make sure pattern occurred zero times
                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // A non-matching line pushed while waiting must not affect the count.
        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }
1471
    // Range predicate: after waiting the full window, three matches were
    // pushed, which lands inside the accepted 2..=5 range.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| (2..=5).contains(&n)),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Three matches pushed mid-wait: within the 2..=5 range.
        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }
1512
    // Negative-lookahead regex: the known-benign "Error importing block …"
    // line must NOT match, while the later "system error" line must. Exercises
    // the fancy_regex lookahead support in the timeout-based waiter.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        // Exactly one matching line expected over the full 3s window.
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(3),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        assert!(task.await?.success());

        Ok(())
    }
1567
    // Counterpart to the lookahead test above: no line matching the lookahead
    // pattern is ever pushed, so after the window elapses the count is 0 and
    // the `== 1` predicate fails.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(6),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Neither of these lines matches the lookahead pattern.
        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1618
    // Same lookahead pattern through the non-timeout `wait_log_line_count`
    // entry point; the test only asserts the waiter task completes once the
    // single matching line arrives.
    // NOTE(review): "lockahead" in the fn name is presumably a typo for
    // "lookahead"; left as-is since renaming would break test name filters.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        1,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        assert!(task.await.is_ok());

        Ok(())
    }
1667
    // Lookahead pattern never matched: with `wait_until_timeout_elapses` the
    // waiter runs out the 2s window and the `count == 1` predicate fails,
    // so `success()` is false.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
            GenCmdOptions::default(),
            NodeContext::Rc,
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|count| count == 1),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                // we expect no match, thus wait with timeout
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Neither line matches the lookahead pattern.
        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1718
1719    #[tokio::test]
1720    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1721        // This test uses a mock HTTP server to simulate Prometheus metrics
1722        use std::sync::Arc;
1723
1724        // Create a mock metrics response with proper HELP and TYPE comments
1725        let mock_metrics = concat!(
1726            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1727            "# TYPE substrate_block_verification_time histogram\n",
1728            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1729            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1730            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1731            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1732            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1733            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1734            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1735        );
1736
1737        // Start a mock HTTP server
1738        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1739        let addr = listener.local_addr()?;
1740        let metrics = Arc::new(mock_metrics.to_string());
1741
1742        tokio::spawn({
1743            let metrics = metrics.clone();
1744            async move {
1745                loop {
1746                    if let Ok((mut socket, _)) = listener.accept().await {
1747                        let metrics = metrics.clone();
1748                        tokio::spawn(async move {
1749                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1750                            let mut buffer = [0; 1024];
1751                            let _ = socket.read(&mut buffer).await;
1752
1753                            let response = format!(
1754                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1755                                metrics.len(),
1756                                metrics
1757                            );
1758                            let _ = socket.write_all(response.as_bytes()).await;
1759                        });
1760                    }
1761                }
1762            }
1763        });
1764
1765        // Create a NetworkNode with the mock prometheus URI
1766        let mock_provider = Arc::new(MockNode::new());
1767        let mock_node = NetworkNode::new(
1768            "test_node",
1769            "ws://localhost:9944",
1770            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1771            "/ip4/127.0.0.1/tcp/30333",
1772            NodeSpec::default(),
1773            mock_provider,
1774            GenCmdOptions::default(),
1775            NodeContext::Rc,
1776        );
1777
1778        // Get buckets with label filter
1779        let mut label_filters = HashMap::new();
1780        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1781        let buckets = mock_node
1782            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1783            .await?;
1784
1785        // Should get the rococo_local_testnet chain's buckets
1786        assert_eq!(buckets.get("0.1"), Some(&10));
1787        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1788        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1789        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1790        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1791
1792        // Get buckets with label filter for rococo
1793        let mut label_filters = std::collections::HashMap::new();
1794        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1795
1796        let buckets_filtered = mock_node
1797            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1798            .await?;
1799
1800        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1801        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1802
1803        // Test 3: Get buckets with _bucket suffix already present
1804        let buckets_with_suffix = mock_node
1805            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1806            .await?;
1807
1808        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1809
1810        Ok(())
1811    }
1812
1813    #[tokio::test]
1814    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1815        // Test that buckets are correctly sorted even when received out of order
1816        use std::sync::Arc;
1817
1818        let mock_metrics = concat!(
1819            "# HELP test_metric A test metric\n",
1820            "# TYPE test_metric histogram\n",
1821            "test_metric_bucket{le=\"2.5\"} 40\n",
1822            "test_metric_bucket{le=\"0.1\"} 10\n",
1823            "test_metric_bucket{le=\"+Inf\"} 42\n",
1824            "test_metric_bucket{le=\"1.0\"} 35\n",
1825            "test_metric_bucket{le=\"0.5\"} 25\n",
1826        );
1827
1828        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1829        let addr = listener.local_addr()?;
1830        let metrics = Arc::new(mock_metrics.to_string());
1831
1832        tokio::spawn({
1833            let metrics = metrics.clone();
1834            async move {
1835                loop {
1836                    if let Ok((mut socket, _)) = listener.accept().await {
1837                        let metrics = metrics.clone();
1838                        tokio::spawn(async move {
1839                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1840                            let mut buffer = [0; 1024];
1841                            let _ = socket.read(&mut buffer).await;
1842                            let response = format!(
1843                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1844                                metrics.len(),
1845                                metrics
1846                            );
1847                            let _ = socket.write_all(response.as_bytes()).await;
1848                        });
1849                    }
1850                }
1851            }
1852        });
1853
1854        let mock_provider = Arc::new(MockNode::new());
1855        let mock_node = NetworkNode::new(
1856            "test_node",
1857            "ws://localhost:9944",
1858            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1859            "/ip4/127.0.0.1/tcp/30333",
1860            NodeSpec::default(),
1861            mock_provider,
1862            GenCmdOptions::default(),
1863            NodeContext::Rc,
1864        );
1865
1866        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1867
1868        // Verify deltas are calculated correctly after sorting
1869        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
1870        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1871        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1872        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1873        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1874
1875        Ok(())
1876    }
1877
1878    #[tokio::test]
1879    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1880        // Test label parsing with commas and special characters in values
1881        use std::sync::Arc;
1882
1883        let mock_metrics = concat!(
1884            "# HELP test_metric A test metric\n",
1885            "# TYPE test_metric histogram\n",
1886            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1887            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1888            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1889        );
1890
1891        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1892        let addr = listener.local_addr()?;
1893        let metrics = Arc::new(mock_metrics.to_string());
1894
1895        tokio::spawn({
1896            let metrics = metrics.clone();
1897            async move {
1898                loop {
1899                    if let Ok((mut socket, _)) = listener.accept().await {
1900                        let metrics = metrics.clone();
1901                        tokio::spawn(async move {
1902                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1903                            let mut buffer = [0; 1024];
1904                            let _ = socket.read(&mut buffer).await;
1905                            let response = format!(
1906                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1907                                metrics.len(),
1908                                metrics
1909                            );
1910                            let _ = socket.write_all(response.as_bytes()).await;
1911                        });
1912                    }
1913                }
1914            }
1915        });
1916
1917        let mock_provider = Arc::new(MockNode::new());
1918        let mock_node = NetworkNode::new(
1919            "test_node",
1920            "ws://localhost:9944",
1921            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1922            "/ip4/127.0.0.1/tcp/30333",
1923            NodeSpec::default(),
1924            mock_provider,
1925            GenCmdOptions::default(),
1926            NodeContext::Rc,
1927        );
1928
1929        // Test without filter
1930        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1931        assert_eq!(buckets.get("0.1"), Some(&5));
1932        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
1933        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15
1934
1935        // Test with filter containing comma in value
1936        let mut label_filters = std::collections::HashMap::new();
1937        label_filters.insert("method".to_string(), "GET,POST".to_string());
1938
1939        let buckets_filtered = mock_node
1940            .get_histogram_buckets("test_metric", Some(label_filters))
1941            .await?;
1942
1943        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1944        assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1945
1946        Ok(())
1947    }
1948
1949    #[test]
1950    fn test_compare_le_values() {
1951        use std::cmp::Ordering;
1952
1953        use crate::network::node::NetworkNode;
1954
1955        // Numeric comparison
1956        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1957        assert_eq!(
1958            NetworkNode::compare_le_values("1.0", "0.5"),
1959            Ordering::Greater
1960        );
1961        assert_eq!(
1962            NetworkNode::compare_le_values("1.0", "1.0"),
1963            Ordering::Equal
1964        );
1965
1966        // +Inf handling
1967        assert_eq!(
1968            NetworkNode::compare_le_values("+Inf", "999"),
1969            Ordering::Greater
1970        );
1971        assert_eq!(
1972            NetworkNode::compare_le_values("0.1", "+Inf"),
1973            Ordering::Less
1974        );
1975        assert_eq!(
1976            NetworkNode::compare_le_values("+Inf", "+Inf"),
1977            Ordering::Equal
1978        );
1979
1980        // Large numbers
1981        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1982        assert_eq!(
1983            NetworkNode::compare_le_values("1000", "999"),
1984            Ordering::Greater
1985        );
1986    }
1987}