zombienet_orchestrator/network/node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
5        Arc,
6    },
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15    types::{ExecutionResult, RunScriptOptions},
16    DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{
26    network_spec::node::NodeSpec, shared::constants::PROCESS_START_TIME_METRIC,
27    tx_helper::client::get_client_from_url,
28};
29
/// Line matcher used by the log-waiting helpers: returns `Ok(true)` when a
/// single log line matches the configured glob or regex pattern.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
31
/// Errors specific to querying a [`NetworkNode`]'s Prometheus metrics.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric name was not present in the node's metrics cache.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
37
// Log target for the internal monitor
const MONITOR_TARGET: &str = "zombie_monitor";

/// A node of the spawned network: wraps the provider-specific handle together
/// with the endpoints and cached state used to observe and control it.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    // Provider-specific handle used to control the underlying process/container.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduce set of values.
    pub(crate) spec: NodeSpec,
    pub(crate) name: String,
    // WebSocket RPC endpoint of the node.
    pub(crate) ws_uri: String,
    pub(crate) multiaddr: String,
    // Prometheus endpoint, scraped by `fetch_metrics`.
    pub(crate) prometheus_uri: String,
    // Last parsed metrics scrape; lazily (re)loaded by `metric`/`fetch_metrics`.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    // Flag maintained by pause/resume/restart; not a liveness probe.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
    // Store the last timestamp when we start the node
    #[serde(skip)]
    last_start_ts: Arc<AtomicU64>,
}
59
/// Deserialized shape of a [`NetworkNode`] as read back from persisted state;
/// `inner` is kept as raw JSON.
// NOTE(review): presumably re-attached to a provider handle elsewhere — confirm.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    // Provider-specific payload; defaults to JSON `null` when absent.
    #[serde(default)]
    pub(crate) inner: serde_json::Value,
}
70
/// Result of waiting for a certain counter (number of log lines or events).
///
/// Indicates whether the log line count condition was met within the timeout period.
///
/// # Variants
/// - `TargetReached(count)` – The predicate condition was satisfied within the timeout.
///     * `count`: The number of matching instances at the time of satisfaction.
/// - `TargetFailed(count)` – The condition was not met within the timeout.
///     * `count`: The final number of matching instances at timeout expiration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl WaitCount {
    /// `true` when the wait ended with the target condition satisfied.
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}

pub type LogLineCount = WaitCount;
96
/// Configuration for controlling count waiting behavior.
///
/// Allows specifying a custom predicate on the number of matches,
/// a timeout, and whether the system should wait the entire timeout duration.
///
/// # Fields
/// - `predicate`: A function that takes the current value (metric, log lines) and
///   returns `true` if the condition is satisfied.
/// - `timeout`: Maximum duration to wait.
/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
///   for the full timeout duration, even if the condition is already met early.
///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct CountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl CountOptions {
    /// Build options from a raw predicate closure.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        Self {
            predicate: Arc::new(predicate),
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Succeeds only if there are zero occurrences during the whole `timeout` window.
    pub fn no_occurrences_within_timeout(timeout: Duration) -> Self {
        Self::new(|n| n == 0, timeout, true)
    }

    /// Misspelled variant kept for backward compatibility; prefer
    /// [`CountOptions::no_occurrences_within_timeout`].
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::no_occurrences_within_timeout(timeout)
    }

    /// Succeeds as soon as at least one occurrence is seen.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::new(|count| count >= 1, timeout, false)
    }

    /// Succeeds as soon as at least `target` occurrences are seen.
    pub fn at_least(target: u32, timeout: Duration) -> Self {
        Self::new(move |count| count >= target, timeout, false)
    }

    /// Succeeds as soon as exactly one occurrence is seen.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| count == 1, timeout, false)
    }
}

pub type LogLineCountOptions = CountOptions;
147
148impl NetworkNode {
    /// Create a new NetworkNode
    ///
    /// `ws_uri`/`prometheus_uri` are the endpoints exposed by the provider and
    /// `inner` is the provider-specific handle used to control the process.
    /// The node starts with an empty metrics cache, `is_running = false` and
    /// `last_start_ts = 0`; callers update these via the `set_*` helpers once
    /// the node is actually up.
    pub(crate) fn new<T: Into<String>>(
        name: T,
        ws_uri: T,
        prometheus_uri: T,
        multiaddr: T,
        spec: NodeSpec,
        inner: DynNode,
    ) -> Self {
        Self {
            name: name.into(),
            ws_uri: ws_uri.into(),
            prometheus_uri: prometheus_uri.into(),
            inner,
            spec,
            multiaddr: multiaddr.into(),
            metrics_cache: Arc::new(Default::default()),
            is_running: Arc::new(AtomicBool::new(false)),
            last_start_ts: Arc::new(AtomicU64::new(0)),
        }
    }
170
    /// Check if the node is currently running (not paused).
    ///
    /// This returns the internal running state.
    ///
    /// Note: this flag is only maintained by `pause`/`resume`/`restart`; it does
    /// not probe the process. Use [`NetworkNode::is_responsive`] for a live check.
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Acquire)
    }
177
    /// Get the last timestamp (unix seconds) when the node was started.
    pub fn last_start_ts(&self) -> u64 {
        self.last_start_ts.load(Ordering::Acquire)
    }
182
183    /// Check if the node is responsive by attempting to connect to its WebSocket endpoint.
184    ///
185    /// This performs an actual connection attempt with a short timeout (2 seconds).
186    /// Returns `true` if the node is reachable and responding, `false` otherwise.
187    ///
188    /// This is more robust than `is_running()` as it verifies the node is actually alive.
189    pub async fn is_responsive(&self) -> bool {
190        tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
191            .await
192            .is_ok()
193    }
194
    /// Set the internal running flag (used by pause/resume/restart).
    pub(crate) fn set_is_running(&self, is_running: bool) {
        self.is_running.store(is_running, Ordering::Release);
    }
198
    /// Set the timestamp (unix seconds) when the node was started.
    pub(crate) fn set_last_start_ts(&self, ts: u64) {
        self.last_start_ts.store(ts, Ordering::Release);
    }
203
    /// Replace the stored multiaddr for this node.
    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
        self.multiaddr = multiaddr.into();
    }
207
    /// The node's name.
    pub fn name(&self) -> &str {
        &self.name
    }
211
    /// Command-line arguments reported by the underlying provider node.
    pub fn args(&self) -> Vec<&str> {
        self.inner.args()
    }
215
    /// The full [`NodeSpec`] this node was created from.
    pub fn spec(&self) -> &NodeSpec {
        &self.spec
    }
219
    /// The node's WebSocket RPC endpoint URI.
    pub fn ws_uri(&self) -> &str {
        &self.ws_uri
    }
223
224    pub fn multiaddr(&self) -> &str {
225        self.multiaddr.as_ref()
226    }
227
228    // Subxt
229
    /// Get the rpc client for the node
    ///
    /// Establishes a fresh [`RpcClient`] connection to the node's ws endpoint
    /// on every call; fails immediately if the node is not reachable.
    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
234
    /// Get the [online client](subxt::client::OnlineClient) for the node
    ///
    /// Deprecated thin wrapper around [`NetworkNode::try_client`], kept for
    /// backward compatibility.
    #[deprecated = "Use `wait_client` instead."]
    pub async fn client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        self.try_client().await
    }
242
    /// Try to connect to the node.
    ///
    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
    /// the node to appear before it connects to it. This function directly tries
    /// to connect to the node and returns an error if the node is not yet available
    /// at that point in time.
    ///
    /// Returns a [`OnlineClient`] on success.
    ///
    /// # Errors
    /// Returns the underlying [`subxt::Error`] if the connection cannot be established.
    pub async fn try_client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
256
257    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
258    pub async fn wait_client<Config: subxt::Config>(
259        &self,
260    ) -> Result<OnlineClient<Config>, anyhow::Error> {
261        debug!("wait_client ws_uri: {}", self.ws_uri());
262        wait_ws_ready(self.ws_uri())
263            .await
264            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
265
266        self.try_client()
267            .await
268            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
269    }
270
    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
    ///
    /// # Errors
    /// Fails if `timeout_secs` elapses before the node is reachable, or if the
    /// client cannot be created once the endpoint is up.
    pub async fn wait_client_with_timeout<Config: subxt::Config>(
        &self,
        timeout_secs: impl Into<u64>,
    ) -> Result<OnlineClient<Config>, anyhow::Error> {
        debug!("waiting until subxt client is ready");
        // `timeout` yields `Result<Result<Client, _>, Elapsed>`; `?` converts an
        // elapsed timeout into `anyhow::Error` and the inner result is returned as-is.
        tokio::time::timeout(
            Duration::from_secs(timeout_secs.into()),
            self.wait_client::<Config>(),
        )
        .await?
    }
283
284    // Commands
285
    /// Pause the node, this is implemented by pausing the
    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn pause(&self) -> Result<(), anyhow::Error> {
        // NOTE(review): the running flag is cleared *before* the provider call, so
        // if `inner.pause()` fails the node keeps running but is marked paused.
        // Presumably intentional (avoids observers acting on a node mid-pause) — confirm.
        self.set_is_running(false);
        self.inner.pause().await?;
        Ok(())
    }
296
    /// Resume the node, this is implemented by resuming the
    /// actual process (e.g polkadot) with sending `SIGCONT` signal
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn resume(&self) -> Result<(), anyhow::Error> {
        // NOTE(review): the running flag is set *before* the provider call, so if
        // `inner.resume()` fails the node stays paused but is marked running — confirm.
        self.set_is_running(true);
        self.inner.resume().await?;
        Ok(())
    }
307
    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    ///
    /// `after` is forwarded to the provider; presumably a delay applied before
    /// restarting — confirm against the provider implementation.
    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
        self.set_is_running(false);
        self.inner.restart(after).await?;
        self.set_is_running(true);
        // Record the restart time (unix seconds) so callers can reason about uptime.
        self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
        Ok(())
    }
319
320    /// Run a script inside the node's container/environment
321    ///
322    /// The script will be uploaded to the node, made executable, and executed with
323    /// the provided arguments and environment variables.
324    ///
325    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
326    pub async fn run_script(
327        &self,
328        options: RunScriptOptions,
329    ) -> Result<ExecutionResult, anyhow::Error> {
330        self.inner
331            .run_script(options)
332            .await
333            .map_err(|e| anyhow!("Failed to run script: {e}"))
334    }
335
336    // Metrics assertions
337
    /// Get metric value 'by name' from Prometheus (exposed by the node)
    /// metric name can be:
    /// with prefix (e.g: 'polkadot_')
    /// with chain attribute (e.g: 'chain=rococo-local')
    /// without prefix and/or without chain attribute
    ///
    /// Always performs a fresh scrape before reading; a missing metric is
    /// reported as `0` rather than an error.
    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
        let metric_name = metric_name.into();
        // force cache reload
        self.fetch_metrics().await?;
        // by default we treat not found as 0 (same in v1)
        self.metric(&metric_name, true).await
    }
350
    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
    /// metric name can be:
    /// with prefix (e.g: 'polkadot_')
    /// with chain attribute (e.g: 'chain=rococo-local')
    /// without prefix and/or without chain attribute
    ///
    /// Delegates to [`NetworkNode::assert_with`] with an equality predicate;
    /// the metrics cache is reloaded before the comparison.
    pub async fn assert(
        &self,
        metric_name: impl Into<String>,
        value: impl Into<f64>,
    ) -> Result<bool, anyhow::Error> {
        let value: f64 = value.into();
        self.assert_with(metric_name, |v| v == value).await
    }
367
368    /// Assert on a metric value using a given predicate.
369    /// See [`NetworkNode::reports`] description for details on metric name.
370    pub async fn assert_with(
371        &self,
372        metric_name: impl Into<String>,
373        predicate: impl Fn(f64) -> bool,
374    ) -> Result<bool, anyhow::Error> {
375        let metric_name = metric_name.into();
376        // reload metrics
377        self.fetch_metrics().await?;
378        let val = self.metric(&metric_name, true).await?;
379        let log_msg = format!("🔎 Current value {val} passed to the predicated?");
380        if metric_name == PROCESS_START_TIME_METRIC {
381            trace!(target: MONITOR_TARGET, "{log_msg}");
382        } else {
383            trace!("{log_msg}");
384        }
385        Ok(predicate(val))
386    }
387
388    // Wait methods for metrics
389
    /// Wait until a metric value pass the `predicate`
    ///
    /// Polls once per second and never times out on its own; use
    /// [`NetworkNode::wait_metric_with_timeout`] to bound the wait.
    pub async fn wait_metric(
        &self,
        metric_name: impl Into<String>,
        predicate: impl Fn(f64) -> bool,
    ) -> Result<(), anyhow::Error> {
        let metric_name = metric_name.into();
        let log_msg = format!(
            "[{}] waiting until metric {metric_name} pass the predicate",
            self.name()
        );
        if metric_name == PROCESS_START_TIME_METRIC {
            trace!(target: MONITOR_TARGET, "{log_msg}");
        } else {
            trace!("{log_msg}");
        }

        loop {
            let res = self.assert_with(&metric_name, &predicate).await;
            let log_msg = format!("res: {res:?}");
            match res {
                Ok(res) => {
                    if res {
                        return Ok(());
                    }
                },
                // Scrape failures: tolerate the transient ones (per
                // `skip_err_while_waiting`), propagate anything else.
                Err(e) => match e.downcast::<reqwest::Error>() {
                    Ok(io_err) => {
                        if !skip_err_while_waiting(&io_err) {
                            return Err(io_err.into());
                        }
                    },
                    Err(other) => {
                        // A missing metric just means it isn't exposed yet: keep polling.
                        match other.downcast::<NetworkNodeError>() {
                            Ok(node_err) => {
                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
                                    return Err(node_err.into());
                                }
                            },
                            Err(other) => return Err(other),
                        };
                    },
                },
            }

            if metric_name == PROCESS_START_TIME_METRIC {
                trace!(target: MONITOR_TARGET, "{log_msg}");
            } else {
                trace!("{log_msg}");
            }

            // sleep to not spam prometheus
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
445
446    /// Wait until a metric value pass the `predicate`
447    /// with a timeout (secs)
448    pub async fn wait_metric_with_timeout(
449        &self,
450        metric_name: impl Into<String>,
451        predicate: impl Fn(f64) -> bool,
452        timeout_secs: impl Into<u64>,
453    ) -> Result<(), anyhow::Error> {
454        let metric_name = metric_name.into();
455        let secs = timeout_secs.into();
456        let log_msg = format!(
457            "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
458            self.name()
459        );
460
461        if metric_name == PROCESS_START_TIME_METRIC {
462            trace!(target: MONITOR_TARGET, "{log_msg}");
463        } else {
464            debug!("{log_msg}");
465        }
466
467        let res = tokio::time::timeout(
468            Duration::from_secs(secs),
469            self.wait_metric(&metric_name, predicate),
470        )
471        .await;
472
473        if let Ok(inner_res) = res {
474            match inner_res {
475                Ok(_) => Ok(()),
476                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
477            }
478        } else {
479            // timeout
480            Err(anyhow!(
481                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
482            ))
483        }
484    }
485
486    // Logs
487
    /// Get the logs of the node
    ///
    /// Returns the full log output accumulated so far as a single string.
    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
    pub async fn logs(&self) -> Result<String, anyhow::Error> {
        Ok(self.inner.logs().await?)
    }
493
    /// Wait until the number of matching log lines is reached
    ///
    /// `pattern` is a glob when `is_glob` is `true`, otherwise a regex.
    /// Re-reads the full log every 2 seconds and never times out on its own; see
    /// [`NetworkNode::wait_log_line_count_with_timeout`] for a bounded variant.
    pub async fn wait_log_line_count(
        &self,
        pattern: impl Into<String>,
        is_glob: bool,
        count: usize,
    ) -> Result<(), anyhow::Error> {
        let pattern = pattern.into();
        let pattern_clone = pattern.clone();
        debug!("waiting until we find pattern {pattern} {count} times");
        // Build the matcher once up front; `pattern` is moved into the closure.
        let match_fn: BoxedClosure = if is_glob {
            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
        } else {
            let re = Regex::new(&pattern)?;
            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
        };

        loop {
            let mut q = 0_usize;
            let logs = self.logs().await?;
            for line in logs.lines() {
                trace!("line is {line}");
                if match_fn(line)? {
                    trace!("pattern {pattern_clone} match in line {line}");
                    q += 1;
                    if q >= count {
                        return Ok(());
                    }
                }
            }

            tokio::time::sleep(Duration::from_secs(2)).await;
        }
    }
528
    /// Waits until the number of matching log lines satisfies a custom condition,
    /// optionally waiting for the entire duration of the timeout.
    ///
    /// This method searches log lines for a given substring or glob pattern,
    /// and evaluates the number of matching lines using a user-provided predicate function.
    /// Optionally, it can wait for the full timeout duration to ensure the condition
    /// holds consistently (e.g., for verifying absence of logs).
    ///
    /// # Arguments
    /// * `substring` - The substring or pattern to match within log lines.
    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
    ///
    /// # Returns
    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
    /// * `Ok(LogLineCount::TargetFailed(n))` if the predicate was not satisfied in time,
    /// * `Err(e)` if an error occurred during log retrieval or matching.
    ///
    /// # Example
    /// ```rust
    /// # use std::{sync::Arc, time::Duration};
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// #   let network = orchestrator.spawn(config).await?;
    /// let node = network.get_node("alice")?;
    /// // Wait (up to 10 seconds) until pattern occurs once
    /// let options = LogLineCountOptions {
    ///     predicate: Arc::new(|count| count == 1),
    ///     timeout: Duration::from_secs(10),
    ///     wait_until_timeout_elapses: false,
    /// };
    /// let result = node
    ///     .wait_log_line_count_with_timeout("error", false, options)
    ///     .await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn wait_log_line_count_with_timeout(
        &self,
        substring: impl Into<String>,
        is_glob: bool,
        options: LogLineCountOptions,
    ) -> Result<LogLineCount, anyhow::Error> {
        let substring = substring.into();
        debug!(
            "waiting until match lines count within {} seconds",
            options.timeout.as_secs_f64()
        );

        let start = tokio::time::Instant::now();

        let match_fn: BoxedClosure = if is_glob {
            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
        } else {
            let re = Regex::new(&substring)?;
            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
        };

        // When verifying the condition over the whole window, sleep the full
        // timeout first: the loop below then evaluates the logs exactly once.
        if options.wait_until_timeout_elapses {
            tokio::time::sleep(options.timeout).await;
        }

        let mut q;
        loop {
            q = 0_u32;
            let logs = self.logs().await?;
            for line in logs.lines() {
                if match_fn(line)? {
                    q += 1;

                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
                    // end after the whole log file is processed. This is to address the cases when the
                    // predicate becomes true and false again.
                    // eg. expected exactly 2 matching lines are expected but 3 are present
                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
                        return Ok(LogLineCount::TargetReached(q));
                    }
                }
            }

            if start.elapsed() >= options.timeout {
                break;
            }

            tokio::time::sleep(Duration::from_secs(2)).await;
        }

        if (options.predicate)(q) {
            Ok(LogLineCount::TargetReached(q))
        } else {
            Ok(LogLineCount::TargetFailed(q))
        }
    }
628
629    /// Waits until the number of matching log lines satisfies a custom condition,
630    /// optionally waiting for the entire duration of the timeout.
631    ///
632    /// This method searches log lines for a given substring or glob pattern,
633    /// and evaluates the number of matching lines using a user-provided predicate function.
634    /// Optionally, it can wait for the full timeout duration to ensure the condition
635    /// holds consistently (e.g., for verifying absence of logs).
636    ///
637    /// # Arguments
638    /// * `substring` - The substring or pattern to match within log lines.
639    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
640    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
641    ///
642    /// # Returns
643    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
644    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
645    /// * `Err(e)` if an error occurred during log retrieval or matching.
646    ///
647    /// # Example
648    /// ```rust
649    /// # use std::{sync::Arc, time::Duration};
650    /// # use provider::NativeProvider;
651    /// # use support::{fs::local::LocalFileSystem};
652    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
653    /// # use configuration::NetworkConfig;
654    /// # async fn example() -> Result<(), anyhow::Error> {
655    /// #   let provider = NativeProvider::new(LocalFileSystem {});
656    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
657    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
658    /// #   let network = orchestrator.spawn(config).await?;
659    /// let node = network.get_node("alice")?;
660    /// // Wait (up to 10 seconds) until pattern occurs once
661    /// let options = LogLineCountOptions {
662    ///     predicate: Arc::new(|count| count == 1),
663    ///     timeout: Duration::from_secs(10),
664    ///     wait_until_timeout_elapses: false,
665    /// };
666    /// let result = node
667    ///     .wait_log_line_count_with_timeout("error", false, options)
668    ///     .await?;
669    /// #   Ok(())
670    /// # }
671    /// ```
672    pub async fn wait_event_count_with_timeout(
673        &self,
674        pallet: impl Into<String>,
675        variant: impl Into<String>,
676        options: CountOptions,
677    ) -> Result<WaitCount, anyhow::Error> {
678        let pallet = pallet.into();
679        let variant = variant.into();
680        debug!(
681            "waiting until match event ({pallet} {variant}) count within {} seconds",
682            options.timeout.as_secs_f64()
683        );
684
685        let init_value = Arc::new(AtomicU32::new(0));
686
687        let res = tokio::time::timeout(
688            options.timeout,
689            self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
690        )
691        .await;
692
693        let q = init_value.load(Ordering::Relaxed);
694        if let Ok(inner_res) = res {
695            match inner_res {
696                Ok(_) => Ok(WaitCount::TargetReached(q)),
697                Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
698            }
699        } else {
700            // timeout
701            if options.wait_until_timeout_elapses {
702                let q = init_value.load(Ordering::Relaxed);
703                if (options.predicate)(q) {
704                    Ok(LogLineCount::TargetReached(q))
705                } else {
706                    Ok(LogLineCount::TargetFailed(q))
707                }
708            } else {
709                Err(anyhow!(
710                    "Timeout ({}), waiting for counter",
711                    options.timeout.as_secs()
712                ))
713            }
714        }
715    }
716
    /// Internal helper: subscribe to finalized blocks and count events matching
    /// `pallet`/`variant`, accumulating into `init_count` (shared with the caller
    /// so the tally survives a timeout cancellation).
    ///
    /// Returns early with `Ok(())` as soon as the predicate is satisfied, unless
    /// `wait_until_timeout_elapses` is set — then it only stops when the caller's
    /// timeout cancels this future or the block stream ends.
    async fn wait_event_count(
        &self,
        pallet: &str,
        variant: &str,
        options: &CountOptions,
        init_count: Arc<AtomicU32>,
    ) -> Result<(), anyhow::Error> {
        let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
        let mut blocks_sub: subxt::backend::StreamOf<
            Result<
                subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
                subxt::Error,
            >,
        > = client.blocks().subscribe_finalized().await?;
        while let Some(block) = blocks_sub.next().await {
            let events = block?.events().await?;
            for event in events.iter() {
                let evt = event?;
                if evt.pallet_name() == pallet && evt.variant_name() == variant {
                    // `fetch_add` returns the previous value; pass the new count.
                    let old_value = init_count.fetch_add(1, Ordering::Relaxed);
                    if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
                        return Ok(());
                    }
                }
            }
        }

        Ok(())
    }
747
    /// Scrape the node's Prometheus endpoint and replace the metrics cache with
    /// the freshly parsed values.
    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
        let response = reqwest::get(&self.prometheus_uri).await?;
        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
        let mut cache = self.metrics_cache.write().await;
        *cache = metrics;
        Ok(())
    }
755
756    /// Query individual metric by name
757    async fn metric(
758        &self,
759        metric_name: &str,
760        treat_not_found_as_zero: bool,
761    ) -> Result<f64, anyhow::Error> {
762        let mut metrics_map = self.metrics_cache.read().await;
763        if metrics_map.is_empty() {
764            // reload metrics
765            drop(metrics_map);
766            self.fetch_metrics().await?;
767            metrics_map = self.metrics_cache.read().await;
768        }
769
770        if let Some(val) = metrics_map.get(metric_name) {
771            Ok(*val)
772        } else if treat_not_found_as_zero {
773            Ok(0_f64)
774        } else {
775            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
776        }
777    }
778
779    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
780    ///
781    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
782    /// and calculating the count of observations in each bucket. It automatically appends
783    /// `_bucket` suffix to the metric name if not already present.
784    ///
785    /// # Arguments
786    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
787    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
788    ///
789    /// # Returns
790    /// A HashMap where keys are the `le` bucket boundaries as strings,
791    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
792    ///
793    /// # Example
794    /// ```ignore
795    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
796    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
797    /// ```
798    pub async fn get_histogram_buckets(
799        &self,
800        metric_name: impl AsRef<str>,
801        label_filters: Option<HashMap<String, String>>,
802    ) -> Result<HashMap<String, u64>, anyhow::Error> {
803        let metric_name = metric_name.as_ref();
804
805        // Fetch and parse metrics using the existing parser
806        let response = reqwest::get(&self.prometheus_uri).await?;
807        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
808
809        // Ensure metric name has _bucket suffix
810        let resolved_metric_name = if metric_name.contains("_bucket") {
811            metric_name.to_string()
812        } else {
813            format!("{}_bucket", metric_name)
814        };
815
816        // First pass: collect all matching metrics with their label counts
817        // to identify which ones have the most complete label sets
818        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
819        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
820
821        for (key, &value) in metrics.iter() {
822            if !key.starts_with(&resolved_metric_name) {
823                continue;
824            }
825
826            let remaining = &key[resolved_metric_name.len()..];
827
828            let labels_str = &remaining[1..remaining.len() - 1];
829            let parsed_labels = Self::parse_label_string(labels_str);
830
831            // Must have "le" label
832            if !parsed_labels.contains_key("le") {
833                continue;
834            }
835
836            // Check if label filters match
837            if let Some(ref filters) = label_filters {
838                let mut all_match = true;
839                for (filter_key, filter_value) in filters {
840                    if parsed_labels.get(filter_key) != Some(filter_value) {
841                        all_match = false;
842                        break;
843                    }
844                }
845                if !all_match {
846                    continue;
847                }
848            }
849
850            metric_entries.push((key.clone(), parsed_labels, value as u64));
851        }
852
853        // Find the maximum number of labels (excluding "le") across all entries
854        // This helps us identify the "fullest" version of each metric
855        let max_label_count = metric_entries
856            .iter()
857            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
858            .max()
859            .unwrap_or(0);
860
861        // Second pass: collect buckets, deduplicating and preferring entries with more labels
862        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
863        let mut seen_le_values = HashSet::new();
864        let mut active_series: Option<Vec<(String, String)>> = None;
865
866        for (_, parsed_labels, value) in metric_entries {
867            let le_value = parsed_labels.get("le").unwrap().clone();
868
869            // Get non-"le" labels
870            let mut non_le_labels: Vec<(String, String)> = parsed_labels
871                .iter()
872                .filter(|(k, _)| k.as_str() != "le")
873                .map(|(k, v)| (k.clone(), v.clone()))
874                .collect();
875            non_le_labels.sort();
876
877            // Only process entries that have the maximum number of labels
878            // (this filters out the parser's duplicate keys with fewer labels)
879            if non_le_labels.len() < max_label_count {
880                continue;
881            }
882
883            // Detect series changes
884            if let Some(ref prev_series) = active_series {
885                if prev_series != &non_le_labels {
886                    if !raw_buckets.is_empty() {
887                        break; // Stop at first series change
888                    }
889                    active_series = Some(non_le_labels.clone());
890                    seen_le_values.clear();
891                }
892            } else {
893                active_series = Some(non_le_labels.clone());
894            }
895
896            // Deduplicate by le value within this series
897            if !seen_le_values.insert(le_value.clone()) {
898                continue;
899            }
900
901            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
902            raw_buckets.push((le_value, value));
903        }
904
905        // Sort buckets by their "le" values
906        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
907
908        // Calculate deltas between cumulative buckets
909        let mut buckets = HashMap::new();
910        let mut previous_value = 0_u64;
911        for (le, cumulative_count) in raw_buckets {
912            if cumulative_count < previous_value {
913                warn!(
914                    "Warning: bucket count decreased from {} to {} at le={}",
915                    previous_value, cumulative_count, le
916                );
917            }
918            let delta = cumulative_count.saturating_sub(previous_value);
919            buckets.insert(le, delta);
920            previous_value = cumulative_count;
921        }
922
923        Ok(buckets)
924    }
925
926    /// Parse label string from parsed metric key.
927    ///
928    /// Takes a label string in the format `key1="value1",key2="value2"`
929    /// and returns a HashMap of key-value pairs.
930    /// Handles commas inside quoted values correctly.
931    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
932        let mut labels = HashMap::new();
933        let mut current_key = String::new();
934        let mut current_value = String::new();
935        let mut in_value = false;
936        let mut in_quotes = false;
937
938        for ch in labels_str.chars() {
939            match ch {
940                '=' if !in_quotes && !in_value => {
941                    in_value = true;
942                },
943                '"' if in_value => {
944                    in_quotes = !in_quotes;
945                },
946                ',' if !in_quotes => {
947                    // End of key-value pair
948                    if !current_key.is_empty() {
949                        labels.insert(
950                            current_key.trim().to_string(),
951                            current_value.trim().to_string(),
952                        );
953                        current_key.clear();
954                        current_value.clear();
955                        in_value = false;
956                    }
957                },
958                _ => {
959                    if in_value {
960                        current_value.push(ch);
961                    } else {
962                        current_key.push(ch);
963                    }
964                },
965            }
966        }
967
968        // Insert last pair
969        if !current_key.is_empty() {
970            labels.insert(
971                current_key.trim().to_string(),
972                current_value.trim().to_string(),
973            );
974        }
975
976        labels
977    }
978
979    /// Compare two histogram bucket boundary values for sorting.
980    ///
981    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
982    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
983        use std::cmp::Ordering;
984
985        // Handle +Inf specially
986        match (a, b) {
987            ("+Inf", "+Inf") => Ordering::Equal,
988            ("+Inf", _) => Ordering::Greater,
989            (_, "+Inf") => Ordering::Less,
990            _ => {
991                // Try to parse as f64 for numeric comparison
992                match (a.parse::<f64>(), b.parse::<f64>()) {
993                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
994                    // Fallback to string comparison if parsing fails
995                    _ => a.cmp(b),
996                }
997            },
998        }
999    }
1000
    /// Waits given number of seconds until node reports that it is up and running, which
    /// is determined by metric 'process_start_time_seconds', which should appear
    /// when node finished booting up.
    ///
    /// # Arguments
    /// * `timeout_secs` - The number of seconds to wait.
    ///
    /// # Returns
    /// * `Ok(())` if the node is up before the timeout occurred.
    /// * `Err(e)` if a timeout or other error occurred while waiting.
    pub async fn wait_until_is_up(
        &self,
        timeout_secs: impl Into<u64>,
    ) -> Result<(), anyhow::Error> {
        // A value >= 1.0 means the metric has been published with a non-zero
        // start time, i.e. the node's metrics endpoint is serving data.
        // The node name is prepended to the error to identify which node
        // failed when several are awaited concurrently.
        self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
            .await
            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
    }
1020}
1021
1022impl std::fmt::Debug for NetworkNode {
1023    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1024        f.debug_struct("NetworkNode")
1025            .field("inner", &"inner_skipped")
1026            .field("spec", &self.spec)
1027            .field("name", &self.name)
1028            .field("ws_uri", &self.ws_uri)
1029            .field("prometheus_uri", &self.prometheus_uri)
1030            .finish()
1031    }
1032}
1033
/// Serde helper referenced by the `#[serde(serialize_with = ...)]` attribute on
/// `NetworkNode::inner`: serializes the trait object behind `DynNode` through
/// `erased_serde`, which allows serializing without a concrete `Serialize`
/// impl on the boxed type.
fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    erased_serde::serialize(node.as_ref(), serializer)
}
1040
1041// TODO: mock and impl more unit tests
1042#[cfg(test)]
1043mod tests {
1044    use std::{
1045        path::{Path, PathBuf},
1046        sync::{Arc, Mutex},
1047    };
1048
1049    use async_trait::async_trait;
1050    use provider::{types::*, ProviderError, ProviderNode};
1051
1052    use super::*;
1053
    /// Minimal `ProviderNode` stand-in for tests. Only log retrieval is backed
    /// by real state (an in-memory, thread-safe line buffer); every other
    /// trait method is left as `todo!()`.
    #[derive(Serialize)]
    struct MockNode {
        // Shared log lines: tests append via `logs_push` while the code under
        // test reads them back through `ProviderNode::logs`.
        logs: Arc<Mutex<Vec<String>>>,
    }
1058
1059    impl MockNode {
1060        fn new() -> Self {
1061            Self {
1062                logs: Arc::new(Mutex::new(vec![])),
1063            }
1064        }
1065
1066        fn logs_push(&self, lines: Vec<impl Into<String>>) {
1067            self.logs
1068                .lock()
1069                .unwrap()
1070                .extend(lines.into_iter().map(|l| l.into()));
1071        }
1072    }
1073
    // Stub `ProviderNode` implementation: only `logs` is functional (it joins
    // the in-memory buffer); all other methods panic via `todo!()` because the
    // tests in this module never invoke them.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // Returns the buffered lines joined with '\n', mimicking a real node's
        // log output.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
1171
1172    #[tokio::test(flavor = "multi_thread")]
1173    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1174        let mock_provider = Arc::new(MockNode::new());
1175        let mock_node = NetworkNode::new(
1176            "node1",
1177            "ws_uri",
1178            "prometheus_uri",
1179            "multiaddr",
1180            NodeSpec::default(),
1181            mock_provider.clone(),
1182        );
1183
1184        mock_provider.logs_push(vec![
1185            "system booting",
1186            "stub line 1",
1187            "stub line 2",
1188            "system ready",
1189        ]);
1190
1191        // Wait (up to 10 seconds) until pattern occurs once
1192        let options = LogLineCountOptions {
1193            predicate: Arc::new(|n| n == 1),
1194            timeout: Duration::from_secs(10),
1195            wait_until_timeout_elapses: false,
1196        };
1197
1198        let log_line_count = mock_node
1199            .wait_log_line_count_with_timeout("system ready", false, options)
1200            .await?;
1201
1202        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1203
1204        Ok(())
1205    }
1206
    // Verifies the waiter keeps polling: the second occurrence only appears
    // after the wait has started, well within the timeout.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 4 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(4),
            wait_until_timeout_elapses: false,
        };

        // Start waiting in the background while only one occurrence exists.
        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        // Push the second occurrence 2s in — inside the 4s timeout window.
        tokio::time::sleep(Duration::from_secs(2)).await;

        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }
1252
1253    #[tokio::test(flavor = "multi_thread")]
1254    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1255        let mock_provider = Arc::new(MockNode::new());
1256        let mock_node = NetworkNode::new(
1257            "node1",
1258            "ws_uri",
1259            "prometheus_uri",
1260            "multiaddr",
1261            NodeSpec::default(),
1262            mock_provider.clone(),
1263        );
1264
1265        mock_provider.logs_push(vec![
1266            "system booting",
1267            "stub line 1",
1268            "stub line 2",
1269            "system ready",
1270        ]);
1271
1272        // Wait (up to 2 seconds) until pattern occurs twice
1273        let options = LogLineCountOptions {
1274            predicate: Arc::new(|n| n == 2),
1275            timeout: Duration::from_secs(2),
1276            wait_until_timeout_elapses: false,
1277        };
1278
1279        let log_line_count = mock_node
1280            .wait_log_line_count_with_timeout("system ready", false, options)
1281            .await?;
1282
1283        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1284
1285        Ok(())
1286    }
1287
    // With `wait_until_timeout_elapses: true` the count is evaluated at the
    // end of the window, so three total occurrences fail the `n == 2` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait until timeout and check if pattern occurs exactly twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Two more occurrences arrive before the timeout, bringing the total
        // to 3 and exceeding the exact-2 target.
        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }
1334
    // Verifies the "pattern must NOT appear" mode: only non-matching lines are
    // pushed during the window, so the zero-occurrence expectation holds.
    // (The "occurences" spelling mirrors the project helper's name.)
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "system ready",
                        false,
                        // Wait until timeout and make sure pattern occurred zero times
                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // A non-matching line mid-window must not affect the zero count.
        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }
1371
    // Verifies a range predicate: three occurrences arrive mid-window, which
    // satisfies the 2..=5 expectation evaluated once the timeout elapses.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| (2..=5).contains(&n)),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }
1410
    // Uses a negative-lookahead pattern (supported via `fancy_regex`) so the
    // known-benign "Error importing block ..." line is excluded while a plain
    // "system error" line still counts as one match.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(3),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        assert!(task.await?.success());

        Ok(())
    }
1463
    // Counterpart to the lookahead test above: no matching "error" line is
    // ever pushed, so after the timeout the count stays 0 and the predicate
    // (`n == 1`) fails.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(6),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1512
1513    #[tokio::test(flavor = "multi_thread")]
1514    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1515        let mock_provider = Arc::new(MockNode::new());
1516        let mock_node = NetworkNode::new(
1517            "node1",
1518            "ws_uri",
1519            "prometheus_uri",
1520            "multiaddr",
1521            NodeSpec::default(),
1522            mock_provider.clone(),
1523        );
1524
1525        mock_provider.logs_push(vec![
1526            "system booting",
1527            "stub line 1",
1528            // this line should not match
1529            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1530            "stub line 2"
1531        ]);
1532
1533        let task = tokio::spawn({
1534            async move {
1535                mock_node
1536                    .wait_log_line_count(
1537                        "error(?! importing block .*: block has an unknown parent)",
1538                        false,
1539                        1,
1540                    )
1541                    .await
1542                    .unwrap()
1543            }
1544        });
1545
1546        tokio::time::sleep(Duration::from_secs(1)).await;
1547
1548        mock_provider.logs_push(vec![
1549            "system ready",
1550            // this line should match
1551            "system error",
1552            "system ready",
1553        ]);
1554
1555        assert!(task.await.is_ok());
1556
1557        Ok(())
1558    }
1559
    // Negative case for the non-matching lookahead: nothing matching "error"
    // (outside the excluded import-block line) arrives, so after the 2-second
    // window the `n == 1` predicate fails.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|count| count == 1),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                // we expect no match, thus wait with timeout
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1608
1609    #[tokio::test]
1610    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1611        // This test uses a mock HTTP server to simulate Prometheus metrics
1612        use std::sync::Arc;
1613
1614        // Create a mock metrics response with proper HELP and TYPE comments
1615        let mock_metrics = concat!(
1616            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1617            "# TYPE substrate_block_verification_time histogram\n",
1618            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1619            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1620            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1621            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1622            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1623            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1624            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1625        );
1626
1627        // Start a mock HTTP server
1628        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1629        let addr = listener.local_addr()?;
1630        let metrics = Arc::new(mock_metrics.to_string());
1631
1632        tokio::spawn({
1633            let metrics = metrics.clone();
1634            async move {
1635                loop {
1636                    if let Ok((mut socket, _)) = listener.accept().await {
1637                        let metrics = metrics.clone();
1638                        tokio::spawn(async move {
1639                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1640                            let mut buffer = [0; 1024];
1641                            let _ = socket.read(&mut buffer).await;
1642
1643                            let response = format!(
1644                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1645                                metrics.len(),
1646                                metrics
1647                            );
1648                            let _ = socket.write_all(response.as_bytes()).await;
1649                        });
1650                    }
1651                }
1652            }
1653        });
1654
1655        // Create a NetworkNode with the mock prometheus URI
1656        let mock_provider = Arc::new(MockNode::new());
1657        let mock_node = NetworkNode::new(
1658            "test_node",
1659            "ws://localhost:9944",
1660            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1661            "/ip4/127.0.0.1/tcp/30333",
1662            NodeSpec::default(),
1663            mock_provider,
1664        );
1665
1666        // Get buckets with label filter
1667        let mut label_filters = HashMap::new();
1668        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1669        let buckets = mock_node
1670            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1671            .await?;
1672
1673        // Should get the rococo_local_testnet chain's buckets
1674        assert_eq!(buckets.get("0.1"), Some(&10));
1675        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1676        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1677        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1678        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1679
1680        // Get buckets with label filter for rococo
1681        let mut label_filters = std::collections::HashMap::new();
1682        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1683
1684        let buckets_filtered = mock_node
1685            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1686            .await?;
1687
1688        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1689        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1690
1691        // Test 3: Get buckets with _bucket suffix already present
1692        let buckets_with_suffix = mock_node
1693            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1694            .await?;
1695
1696        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1697
1698        Ok(())
1699    }
1700
1701    #[tokio::test]
1702    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1703        // Test that buckets are correctly sorted even when received out of order
1704        use std::sync::Arc;
1705
1706        let mock_metrics = concat!(
1707            "# HELP test_metric A test metric\n",
1708            "# TYPE test_metric histogram\n",
1709            "test_metric_bucket{le=\"2.5\"} 40\n",
1710            "test_metric_bucket{le=\"0.1\"} 10\n",
1711            "test_metric_bucket{le=\"+Inf\"} 42\n",
1712            "test_metric_bucket{le=\"1.0\"} 35\n",
1713            "test_metric_bucket{le=\"0.5\"} 25\n",
1714        );
1715
1716        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1717        let addr = listener.local_addr()?;
1718        let metrics = Arc::new(mock_metrics.to_string());
1719
1720        tokio::spawn({
1721            let metrics = metrics.clone();
1722            async move {
1723                loop {
1724                    if let Ok((mut socket, _)) = listener.accept().await {
1725                        let metrics = metrics.clone();
1726                        tokio::spawn(async move {
1727                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1728                            let mut buffer = [0; 1024];
1729                            let _ = socket.read(&mut buffer).await;
1730                            let response = format!(
1731                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1732                                metrics.len(),
1733                                metrics
1734                            );
1735                            let _ = socket.write_all(response.as_bytes()).await;
1736                        });
1737                    }
1738                }
1739            }
1740        });
1741
1742        let mock_provider = Arc::new(MockNode::new());
1743        let mock_node = NetworkNode::new(
1744            "test_node",
1745            "ws://localhost:9944",
1746            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1747            "/ip4/127.0.0.1/tcp/30333",
1748            NodeSpec::default(),
1749            mock_provider,
1750        );
1751
1752        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1753
1754        // Verify deltas are calculated correctly after sorting
1755        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
1756        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1757        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1758        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1759        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1760
1761        Ok(())
1762    }
1763
1764    #[tokio::test]
1765    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1766        // Test label parsing with commas and special characters in values
1767        use std::sync::Arc;
1768
1769        let mock_metrics = concat!(
1770            "# HELP test_metric A test metric\n",
1771            "# TYPE test_metric histogram\n",
1772            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1773            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1774            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1775        );
1776
1777        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1778        let addr = listener.local_addr()?;
1779        let metrics = Arc::new(mock_metrics.to_string());
1780
1781        tokio::spawn({
1782            let metrics = metrics.clone();
1783            async move {
1784                loop {
1785                    if let Ok((mut socket, _)) = listener.accept().await {
1786                        let metrics = metrics.clone();
1787                        tokio::spawn(async move {
1788                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1789                            let mut buffer = [0; 1024];
1790                            let _ = socket.read(&mut buffer).await;
1791                            let response = format!(
1792                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1793                                metrics.len(),
1794                                metrics
1795                            );
1796                            let _ = socket.write_all(response.as_bytes()).await;
1797                        });
1798                    }
1799                }
1800            }
1801        });
1802
1803        let mock_provider = Arc::new(MockNode::new());
1804        let mock_node = NetworkNode::new(
1805            "test_node",
1806            "ws://localhost:9944",
1807            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1808            "/ip4/127.0.0.1/tcp/30333",
1809            NodeSpec::default(),
1810            mock_provider,
1811        );
1812
1813        // Test without filter
1814        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1815        assert_eq!(buckets.get("0.1"), Some(&5));
1816        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
1817        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15
1818
1819        // Test with filter containing comma in value
1820        let mut label_filters = std::collections::HashMap::new();
1821        label_filters.insert("method".to_string(), "GET,POST".to_string());
1822
1823        let buckets_filtered = mock_node
1824            .get_histogram_buckets("test_metric", Some(label_filters))
1825            .await?;
1826
1827        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1828        assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1829
1830        Ok(())
1831    }
1832
1833    #[test]
1834    fn test_compare_le_values() {
1835        use std::cmp::Ordering;
1836
1837        use crate::network::node::NetworkNode;
1838
1839        // Numeric comparison
1840        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1841        assert_eq!(
1842            NetworkNode::compare_le_values("1.0", "0.5"),
1843            Ordering::Greater
1844        );
1845        assert_eq!(
1846            NetworkNode::compare_le_values("1.0", "1.0"),
1847            Ordering::Equal
1848        );
1849
1850        // +Inf handling
1851        assert_eq!(
1852            NetworkNode::compare_le_values("+Inf", "999"),
1853            Ordering::Greater
1854        );
1855        assert_eq!(
1856            NetworkNode::compare_le_values("0.1", "+Inf"),
1857            Ordering::Less
1858        );
1859        assert_eq!(
1860            NetworkNode::compare_le_values("+Inf", "+Inf"),
1861            Ordering::Equal
1862        );
1863
1864        // Large numbers
1865        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1866        assert_eq!(
1867            NetworkNode::compare_le_values("1000", "999"),
1868            Ordering::Greater
1869        );
1870    }
1871}