zombienet_orchestrator/network/
node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, AtomicU32, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15    types::{ExecutionResult, RunScriptOptions},
16    DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{
26    network_spec::node::NodeSpec, shared::constants::PROCESS_START_TIME_METRIC,
27    tx_helper::client::get_client_from_url,
28};
29
/// Per-line matching predicate (glob- or regex-backed) used by the log-waiting helpers.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
31
/// Errors raised by [`NetworkNode`] metric queries.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric is not present in the node's Prometheus output.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
37
/// A node of the running network: the provider handle (`inner`) plus the spec
/// and endpoints (ws / prometheus / multiaddr) used to interact with it.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduce set of values.
    pub(crate) spec: NodeSpec,
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) prometheus_uri: String,
    // Last fetched Prometheus metrics; reloaded on demand by `fetch_metrics`.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    // Locally tracked lifecycle flag, updated by pause/resume/restart.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
54
/// Deserialization-only counterpart of [`NetworkNode`].
/// NOTE(review): presumably used to rehydrate nodes from persisted network
/// state (the provider handle is kept as raw JSON) — confirm against callers.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    // Raw provider payload; defaults to JSON `null` when absent.
    #[serde(default)]
    pub(crate) inner: serde_json::Value,
}
65
/// Result of waiting for a certain counter (number of log lines or events).
///
/// Indicates whether the counting condition was met within the timeout period.
///
/// # Variants
/// - `TargetReached(count)` – The predicate condition was satisfied within the timeout.
///     * `count`: The number of matching instances at the time of satisfaction.
/// - `TargetFailed(count)` – The condition was not met within the timeout.
///     * `count`: The final number of matching instances at timeout expiration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl WaitCount {
    /// `true` when the wait ended because the predicate was satisfied,
    /// i.e. the value is [`WaitCount::TargetReached`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}
89
/// Alias used by the log-line waiting APIs (see `wait_log_line_count_with_timeout`).
pub type LogLineCount = WaitCount;
91
/// Configuration for controlling count waiting behavior.
///
/// Allows specifying a custom predicate on the number of matches,
/// a timeout, and whether the system should wait the entire timeout duration.
///
/// # Fields
/// - `predicate`: A function that takes the current value (metric, log lines) and
///   returns `true` if the condition is satisfied.
/// - `timeout`: Maximum duration to wait.
/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
///   for the full timeout duration, even if the condition is already met early.
///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct CountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl CountOptions {
    /// Build options from a raw predicate, a timeout and the wait-mode flag.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        Self {
            wait_until_timeout_elapses,
            timeout,
            predicate: Arc::new(predicate),
        }
    }

    /// Succeed only if zero occurrences are observed for the whole timeout window.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|occurrences| occurrences == 0, timeout, true)
    }

    /// Succeed as soon as at least one occurrence is observed.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::at_least(1, timeout)
    }

    /// Succeed as soon as `target` or more occurrences are observed.
    pub fn at_least(target: u32, timeout: Duration) -> Self {
        Self::new(move |occurrences| occurrences >= target, timeout, false)
    }

    /// Succeed as soon as exactly one occurrence is observed.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|occurrences| occurrences == 1, timeout, false)
    }
}
140
/// Alias used by the log-line waiting APIs (see `wait_log_line_count_with_timeout`).
pub type LogLineCountOptions = CountOptions;
142
143impl NetworkNode {
144    /// Create a new NetworkNode
145    pub(crate) fn new<T: Into<String>>(
146        name: T,
147        ws_uri: T,
148        prometheus_uri: T,
149        multiaddr: T,
150        spec: NodeSpec,
151        inner: DynNode,
152    ) -> Self {
153        Self {
154            name: name.into(),
155            ws_uri: ws_uri.into(),
156            prometheus_uri: prometheus_uri.into(),
157            inner,
158            spec,
159            multiaddr: multiaddr.into(),
160            metrics_cache: Arc::new(Default::default()),
161            is_running: Arc::new(AtomicBool::new(false)),
162        }
163    }
164
    /// Check if the node is currently running (not paused).
    ///
    /// This returns the internal running state.
    /// Note: this is a locally tracked flag (updated by pause/resume/restart),
    /// not a liveness probe — see [`NetworkNode::is_responsive`] for that.
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Acquire)
    }
171
172    /// Check if the node is responsive by attempting to connect to its WebSocket endpoint.
173    ///
174    /// This performs an actual connection attempt with a short timeout (2 seconds).
175    /// Returns `true` if the node is reachable and responding, `false` otherwise.
176    ///
177    /// This is more robust than `is_running()` as it verifies the node is actually alive.
178    pub async fn is_responsive(&self) -> bool {
179        tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
180            .await
181            .is_ok()
182    }
183
    /// Update the locally tracked running flag (used by pause/resume/restart).
    pub(crate) fn set_is_running(&self, is_running: bool) {
        self.is_running.store(is_running, Ordering::Release);
    }

    /// Replace the stored multiaddr for this node.
    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
        self.multiaddr = multiaddr.into();
    }

    /// Node name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Command-line arguments of the underlying provider node.
    pub fn args(&self) -> Vec<&str> {
        self.inner.args()
    }

    /// Spec used to spawn this node.
    pub fn spec(&self) -> &NodeSpec {
        &self.spec
    }

    /// WebSocket RPC endpoint of the node.
    pub fn ws_uri(&self) -> &str {
        &self.ws_uri
    }

    /// Multiaddr of the node.
    pub fn multiaddr(&self) -> &str {
        self.multiaddr.as_ref()
    }
211
    // Subxt

    /// Get the rpc client for the node
    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }

    /// Get the [online client](subxt::client::OnlineClient) for the node
    #[deprecated = "Use `wait_client` instead."]
    pub async fn client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        // Kept for backwards compatibility; forwards directly to `try_client`.
        self.try_client().await
    }
226
    /// Try to connect to the node.
    ///
    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
    /// the node to appear before it connects to it. This function directly tries
    /// to connect to the node and returns an error if the node is not yet available
    /// at that point in time.
    ///
    /// Returns a [`OnlineClient`] on success.
    pub async fn try_client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        // Single connection attempt, no retries.
        get_client_from_url(&self.ws_uri).await
    }
240
241    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
242    pub async fn wait_client<Config: subxt::Config>(
243        &self,
244    ) -> Result<OnlineClient<Config>, anyhow::Error> {
245        debug!("wait_client ws_uri: {}", self.ws_uri());
246        wait_ws_ready(self.ws_uri())
247            .await
248            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
249
250        self.try_client()
251            .await
252            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
253    }
254
    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
    pub async fn wait_client_with_timeout<Config: subxt::Config>(
        &self,
        timeout_secs: impl Into<u64>,
    ) -> Result<OnlineClient<Config>, anyhow::Error> {
        debug!("waiting until subxt client is ready");
        // The trailing `?` converts a timeout (Elapsed) into anyhow::Error;
        // the inner connection result is then returned as-is.
        tokio::time::timeout(
            Duration::from_secs(timeout_secs.into()),
            self.wait_client::<Config>(),
        )
        .await?
    }
267
268    // Commands
269
270    /// Pause the node, this is implemented by pausing the
271    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
272    ///
273    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
274    /// with global setting `teardown_on_failure` disabled.
275    pub async fn pause(&self) -> Result<(), anyhow::Error> {
276        self.set_is_running(false);
277        self.inner.pause().await?;
278        Ok(())
279    }
280
281    /// Resume the node, this is implemented by resuming the
282    /// actual process (e.g polkadot) with sending `SIGCONT` signal
283    ///
284    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
285    /// with global setting `teardown_on_failure` disabled.
286    pub async fn resume(&self) -> Result<(), anyhow::Error> {
287        self.set_is_running(true);
288        self.inner.resume().await?;
289        Ok(())
290    }
291
    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
    ///
    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
    /// with global setting `teardown_on_failure` disabled.
    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
        // Mark as not running for the duration of the restart; the flag is only
        // set back to true if the restart succeeds.
        self.set_is_running(false);
        self.inner.restart(after).await?;
        self.set_is_running(true);
        Ok(())
    }
302
303    /// Run a script inside the node's container/environment
304    ///
305    /// The script will be uploaded to the node, made executable, and executed with
306    /// the provided arguments and environment variables.
307    ///
308    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
309    pub async fn run_script(
310        &self,
311        options: RunScriptOptions,
312    ) -> Result<ExecutionResult, anyhow::Error> {
313        self.inner
314            .run_script(options)
315            .await
316            .map_err(|e| anyhow!("Failed to run script: {e}"))
317    }
318
319    // Metrics assertions
320
    /// Get metric value 'by name' from Prometheus (exposed by the node)
    /// metric name can be:
    /// with prefix (e.g: 'polkadot_')
    /// with chain attribute (e.g: 'chain=rococo-local')
    /// without prefix and/or without chain attribute
    ///
    /// Always refreshes the metrics cache first; a missing metric is reported as `0`.
    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
        let metric_name = metric_name.into();
        // force cache reload
        self.fetch_metrics().await?;
        // by default we treat not found as 0 (same in v1)
        self.metric(&metric_name, true).await
    }
333
334    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
335    /// metric name can be:
336    /// with prefix (e.g: 'polkadot_')
337    /// with chain attribute (e.g: 'chain=rococo-local')
338    /// without prefix and/or without chain attribute
339    ///
340    /// We first try to assert on the value using the cached metrics and
341    /// if not meet the criteria we reload the cache and check again
342    pub async fn assert(
343        &self,
344        metric_name: impl Into<String>,
345        value: impl Into<f64>,
346    ) -> Result<bool, anyhow::Error> {
347        let value: f64 = value.into();
348        self.assert_with(metric_name, |v| v == value).await
349    }
350
351    /// Assert on a metric value using a given predicate.
352    /// See [`NetworkNode::reports`] description for details on metric name.
353    pub async fn assert_with(
354        &self,
355        metric_name: impl Into<String>,
356        predicate: impl Fn(f64) -> bool,
357    ) -> Result<bool, anyhow::Error> {
358        let metric_name = metric_name.into();
359        // reload metrics
360        self.fetch_metrics().await?;
361        let val = self.metric(&metric_name, true).await?;
362        trace!("🔎 Current value {val} passed to the predicated?");
363        Ok(predicate(val))
364    }
365
366    // Wait methods for metrics
367
    /// Wait until a metric value pass the `predicate`
    ///
    /// Polls once per second, forever; use
    /// [`NetworkNode::wait_metric_with_timeout`] for a bounded wait.
    pub async fn wait_metric(
        &self,
        metric_name: impl Into<String>,
        predicate: impl Fn(f64) -> bool,
    ) -> Result<(), anyhow::Error> {
        let metric_name = metric_name.into();
        trace!(
            "[{}] waiting until metric {metric_name} pass the predicate",
            self.name()
        );
        loop {
            let res = self.assert_with(&metric_name, &predicate).await;
            trace!("res: {res:?}");
            match res {
                Ok(res) => {
                    if res {
                        return Ok(());
                    }
                },
                // Errors are filtered: transient transport errors and
                // "metric not found" keep the wait alive; anything else aborts.
                Err(e) => match e.downcast::<reqwest::Error>() {
                    Ok(io_err) => {
                        // Skippable network errors happen while the node is coming up.
                        if !skip_err_while_waiting(&io_err) {
                            return Err(io_err.into());
                        }
                    },
                    Err(other) => {
                        match other.downcast::<NetworkNodeError>() {
                            Ok(node_err) => {
                                // The metric may simply not be exported yet; keep polling.
                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
                                    return Err(node_err.into());
                                }
                            },
                            Err(other) => return Err(other),
                        };
                    },
                },
            }

            // sleep to not spam prometheus
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
411
412    /// Wait until a metric value pass the `predicate`
413    /// with a timeout (secs)
414    pub async fn wait_metric_with_timeout(
415        &self,
416        metric_name: impl Into<String>,
417        predicate: impl Fn(f64) -> bool,
418        timeout_secs: impl Into<u64>,
419    ) -> Result<(), anyhow::Error> {
420        let metric_name = metric_name.into();
421        let secs = timeout_secs.into();
422        let log_msg = format!(
423            "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
424            self.name()
425        );
426        if metric_name == PROCESS_START_TIME_METRIC {
427            trace!("{log_msg}");
428        } else {
429            debug!("{log_msg}");
430        }
431        let res = tokio::time::timeout(
432            Duration::from_secs(secs),
433            self.wait_metric(&metric_name, predicate),
434        )
435        .await;
436
437        if let Ok(inner_res) = res {
438            match inner_res {
439                Ok(_) => Ok(()),
440                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
441            }
442        } else {
443            // timeout
444            Err(anyhow!(
445                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
446            ))
447        }
448    }
449
450    // Logs
451
452    /// Get the logs of the node
453    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
454    pub async fn logs(&self) -> Result<String, anyhow::Error> {
455        Ok(self.inner.logs().await?)
456    }
457
458    /// Wait until a the number of matching log lines is reach
459    pub async fn wait_log_line_count(
460        &self,
461        pattern: impl Into<String>,
462        is_glob: bool,
463        count: usize,
464    ) -> Result<(), anyhow::Error> {
465        let pattern = pattern.into();
466        let pattern_clone = pattern.clone();
467        debug!("waiting until we find pattern {pattern} {count} times");
468        let match_fn: BoxedClosure = if is_glob {
469            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
470        } else {
471            let re = Regex::new(&pattern)?;
472            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
473        };
474
475        loop {
476            let mut q = 0_usize;
477            let logs = self.logs().await?;
478            for line in logs.lines() {
479                trace!("line is {line}");
480                if match_fn(line)? {
481                    trace!("pattern {pattern_clone} match in line {line}");
482                    q += 1;
483                    if q >= count {
484                        return Ok(());
485                    }
486                }
487            }
488
489            tokio::time::sleep(Duration::from_secs(2)).await;
490        }
491    }
492
493    /// Waits until the number of matching log lines satisfies a custom condition,
494    /// optionally waiting for the entire duration of the timeout.
495    ///
496    /// This method searches log lines for a given substring or glob pattern,
497    /// and evaluates the number of matching lines using a user-provided predicate function.
498    /// Optionally, it can wait for the full timeout duration to ensure the condition
499    /// holds consistently (e.g., for verifying absence of logs).
500    ///
501    /// # Arguments
502    /// * `substring` - The substring or pattern to match within log lines.
503    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
504    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
505    ///
506    /// # Returns
507    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
    /// * `Ok(LogLineCount::TargetFailed(n))` if the predicate was not satisfied in time,
509    /// * `Err(e)` if an error occurred during log retrieval or matching.
510    ///
511    /// # Example
512    /// ```rust
513    /// # use std::{sync::Arc, time::Duration};
514    /// # use provider::NativeProvider;
515    /// # use support::{fs::local::LocalFileSystem};
516    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
517    /// # use configuration::NetworkConfig;
518    /// # async fn example() -> Result<(), anyhow::Error> {
519    /// #   let provider = NativeProvider::new(LocalFileSystem {});
520    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
521    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
522    /// #   let network = orchestrator.spawn(config).await?;
523    /// let node = network.get_node("alice")?;
524    /// // Wait (up to 10 seconds) until pattern occurs once
525    /// let options = LogLineCountOptions {
526    ///     predicate: Arc::new(|count| count == 1),
527    ///     timeout: Duration::from_secs(10),
528    ///     wait_until_timeout_elapses: false,
529    /// };
530    /// let result = node
531    ///     .wait_log_line_count_with_timeout("error", false, options)
532    ///     .await?;
533    /// #   Ok(())
534    /// # }
535    /// ```
    pub async fn wait_log_line_count_with_timeout(
        &self,
        substring: impl Into<String>,
        is_glob: bool,
        options: LogLineCountOptions,
    ) -> Result<LogLineCount, anyhow::Error> {
        let substring = substring.into();
        debug!(
            "waiting until match lines count within {} seconds",
            options.timeout.as_secs_f64()
        );

        let start = tokio::time::Instant::now();

        // Build the line matcher once: glob match or (fancy) regex.
        let match_fn: BoxedClosure = if is_glob {
            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
        } else {
            let re = Regex::new(&substring)?;
            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
        };

        // In full-timeout mode, sleep the whole window first and then evaluate
        // the logs exactly once below.
        if options.wait_until_timeout_elapses {
            tokio::time::sleep(options.timeout).await;
        }

        let mut q;
        loop {
            // recount from scratch on every poll (the whole log is re-fetched)
            q = 0_u32;
            let logs = self.logs().await?;
            for line in logs.lines() {
                if match_fn(line)? {
                    q += 1;

                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
                    // end after the whole log file is processed. This is to address the cases when the
                    // predicate becomes true and false again.
                    // eg. expected exactly 2 matching lines are expected but 3 are present
                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
                        return Ok(LogLineCount::TargetReached(q));
                    }
                }
            }

            if start.elapsed() >= options.timeout {
                break;
            }

            tokio::time::sleep(Duration::from_secs(2)).await;
        }

        // Final verdict on the last observed count (also covers full-timeout mode).
        if (options.predicate)(q) {
            Ok(LogLineCount::TargetReached(q))
        } else {
            Ok(LogLineCount::TargetFailed(q))
        }
    }
592
    /// Waits until the number of matching on-chain events satisfies a custom condition,
    /// optionally waiting for the entire duration of the timeout.
    ///
    /// This method subscribes to finalized blocks and counts events whose pallet and
    /// variant names match the given values, evaluating the running count with the
    /// user-provided predicate function. Optionally, it can wait for the full timeout
    /// duration to ensure the condition holds consistently (e.g., for verifying the
    /// absence of a given event).
    ///
    /// # Arguments
    /// * `pallet` - The pallet name the event must belong to.
    /// * `variant` - The event variant name to match within that pallet.
    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
    ///
    /// # Returns
    /// * `Ok(WaitCount::TargetReached(n))` if the predicate was satisfied within the timeout,
    /// * `Ok(WaitCount::TargetFailed(n))` if the predicate was not satisfied in time,
    /// * `Err(e)` if an error occurred while subscribing to blocks or decoding events.
    pub async fn wait_event_count_with_timeout(
        &self,
        pallet: impl Into<String>,
        variant: impl Into<String>,
        options: CountOptions,
    ) -> Result<WaitCount, anyhow::Error> {
        let pallet = pallet.into();
        let variant = variant.into();
        debug!(
            "waiting until match event ({pallet} {variant}) count within {} seconds",
            options.timeout.as_secs_f64()
        );

        // Shared counter, incremented by `wait_event_count` as it scans blocks.
        let init_value = Arc::new(AtomicU32::new(0));

        let res = tokio::time::timeout(
            options.timeout,
            self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
        )
        .await;

        let q = init_value.load(Ordering::Relaxed);
        if let Ok(inner_res) = res {
            match inner_res {
                Ok(_) => Ok(WaitCount::TargetReached(q)),
                Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
            }
        } else {
            // timeout
            if options.wait_until_timeout_elapses {
                // In full-timeout mode hitting the deadline is expected: evaluate
                // the predicate against the final count instead of erroring out.
                let q = init_value.load(Ordering::Relaxed);
                if (options.predicate)(q) {
                    Ok(LogLineCount::TargetReached(q))
                } else {
                    Ok(LogLineCount::TargetFailed(q))
                }
            } else {
                Err(anyhow!(
                    "Timeout ({}), waiting for counter",
                    options.timeout.as_secs()
                ))
            }
        }
    }
680
681    //
    /// Subscribe to finalized blocks and count events matching `pallet`/`variant`,
    /// incrementing `init_count` for each match.
    ///
    /// Returns early once the predicate is satisfied, unless
    /// `wait_until_timeout_elapses` is set, in which case it keeps counting
    /// until the caller's timeout cancels this future.
    async fn wait_event_count(
        &self,
        pallet: &str,
        variant: &str,
        options: &CountOptions,
        init_count: Arc<AtomicU32>,
    ) -> Result<(), anyhow::Error> {
        let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
        let mut blocks_sub: subxt::backend::StreamOf<
            Result<
                subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
                subxt::Error,
            >,
        > = client.blocks().subscribe_finalized().await?;
        while let Some(block) = blocks_sub.next().await {
            let events = block?.events().await?;
            for event in events.iter() {
                let evt = event?;
                if evt.pallet_name() == pallet && evt.variant_name() == variant {
                    // fetch_add returns the previous value, hence the +1 below.
                    let old_value = init_count.fetch_add(1, Ordering::Relaxed);
                    if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
                        return Ok(());
                    }
                }
            }
        }

        Ok(())
    }
711
712    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
713        let response = reqwest::get(&self.prometheus_uri).await?;
714        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
715        let mut cache = self.metrics_cache.write().await;
716        *cache = metrics;
717        Ok(())
718    }
719
    /// Query individual metric by name
    ///
    /// Reads from the cache, reloading it once from Prometheus if it's empty.
    /// With `treat_not_found_as_zero` a missing metric yields `0.0` instead of
    /// [`NetworkNodeError::MetricNotFound`].
    async fn metric(
        &self,
        metric_name: &str,
        treat_not_found_as_zero: bool,
    ) -> Result<f64, anyhow::Error> {
        let mut metrics_map = self.metrics_cache.read().await;
        if metrics_map.is_empty() {
            // reload metrics
            // (drop the read guard first so `fetch_metrics` can take the write lock)
            drop(metrics_map);
            self.fetch_metrics().await?;
            metrics_map = self.metrics_cache.read().await;
        }

        if let Some(val) = metrics_map.get(metric_name) {
            Ok(*val)
        } else if treat_not_found_as_zero {
            Ok(0_f64)
        } else {
            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
        }
    }
742
743    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
744    ///
745    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
746    /// and calculating the count of observations in each bucket. It automatically appends
747    /// `_bucket` suffix to the metric name if not already present.
748    ///
749    /// # Arguments
750    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
751    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
752    ///
753    /// # Returns
754    /// A HashMap where keys are the `le` bucket boundaries as strings,
755    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
756    ///
757    /// # Example
758    /// ```ignore
759    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
760    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
761    /// ```
762    pub async fn get_histogram_buckets(
763        &self,
764        metric_name: impl AsRef<str>,
765        label_filters: Option<HashMap<String, String>>,
766    ) -> Result<HashMap<String, u64>, anyhow::Error> {
767        let metric_name = metric_name.as_ref();
768
769        // Fetch and parse metrics using the existing parser
770        let response = reqwest::get(&self.prometheus_uri).await?;
771        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
772
773        // Ensure metric name has _bucket suffix
774        let resolved_metric_name = if metric_name.contains("_bucket") {
775            metric_name.to_string()
776        } else {
777            format!("{}_bucket", metric_name)
778        };
779
780        // First pass: collect all matching metrics with their label counts
781        // to identify which ones have the most complete label sets
782        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
783        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
784
785        for (key, &value) in metrics.iter() {
786            if !key.starts_with(&resolved_metric_name) {
787                continue;
788            }
789
790            let remaining = &key[resolved_metric_name.len()..];
791
792            let labels_str = &remaining[1..remaining.len() - 1];
793            let parsed_labels = Self::parse_label_string(labels_str);
794
795            // Must have "le" label
796            if !parsed_labels.contains_key("le") {
797                continue;
798            }
799
800            // Check if label filters match
801            if let Some(ref filters) = label_filters {
802                let mut all_match = true;
803                for (filter_key, filter_value) in filters {
804                    if parsed_labels.get(filter_key) != Some(filter_value) {
805                        all_match = false;
806                        break;
807                    }
808                }
809                if !all_match {
810                    continue;
811                }
812            }
813
814            metric_entries.push((key.clone(), parsed_labels, value as u64));
815        }
816
817        // Find the maximum number of labels (excluding "le") across all entries
818        // This helps us identify the "fullest" version of each metric
819        let max_label_count = metric_entries
820            .iter()
821            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
822            .max()
823            .unwrap_or(0);
824
825        // Second pass: collect buckets, deduplicating and preferring entries with more labels
826        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
827        let mut seen_le_values = HashSet::new();
828        let mut active_series: Option<Vec<(String, String)>> = None;
829
830        for (_, parsed_labels, value) in metric_entries {
831            let le_value = parsed_labels.get("le").unwrap().clone();
832
833            // Get non-"le" labels
834            let mut non_le_labels: Vec<(String, String)> = parsed_labels
835                .iter()
836                .filter(|(k, _)| k.as_str() != "le")
837                .map(|(k, v)| (k.clone(), v.clone()))
838                .collect();
839            non_le_labels.sort();
840
841            // Only process entries that have the maximum number of labels
842            // (this filters out the parser's duplicate keys with fewer labels)
843            if non_le_labels.len() < max_label_count {
844                continue;
845            }
846
847            // Detect series changes
848            if let Some(ref prev_series) = active_series {
849                if prev_series != &non_le_labels {
850                    if !raw_buckets.is_empty() {
851                        break; // Stop at first series change
852                    }
853                    active_series = Some(non_le_labels.clone());
854                    seen_le_values.clear();
855                }
856            } else {
857                active_series = Some(non_le_labels.clone());
858            }
859
860            // Deduplicate by le value within this series
861            if !seen_le_values.insert(le_value.clone()) {
862                continue;
863            }
864
865            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
866            raw_buckets.push((le_value, value));
867        }
868
869        // Sort buckets by their "le" values
870        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
871
872        // Calculate deltas between cumulative buckets
873        let mut buckets = HashMap::new();
874        let mut previous_value = 0_u64;
875        for (le, cumulative_count) in raw_buckets {
876            if cumulative_count < previous_value {
877                warn!(
878                    "Warning: bucket count decreased from {} to {} at le={}",
879                    previous_value, cumulative_count, le
880                );
881            }
882            let delta = cumulative_count.saturating_sub(previous_value);
883            buckets.insert(le, delta);
884            previous_value = cumulative_count;
885        }
886
887        Ok(buckets)
888    }
889
890    /// Parse label string from parsed metric key.
891    ///
892    /// Takes a label string in the format `key1="value1",key2="value2"`
893    /// and returns a HashMap of key-value pairs.
894    /// Handles commas inside quoted values correctly.
895    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
896        let mut labels = HashMap::new();
897        let mut current_key = String::new();
898        let mut current_value = String::new();
899        let mut in_value = false;
900        let mut in_quotes = false;
901
902        for ch in labels_str.chars() {
903            match ch {
904                '=' if !in_quotes && !in_value => {
905                    in_value = true;
906                },
907                '"' if in_value => {
908                    in_quotes = !in_quotes;
909                },
910                ',' if !in_quotes => {
911                    // End of key-value pair
912                    if !current_key.is_empty() {
913                        labels.insert(
914                            current_key.trim().to_string(),
915                            current_value.trim().to_string(),
916                        );
917                        current_key.clear();
918                        current_value.clear();
919                        in_value = false;
920                    }
921                },
922                _ => {
923                    if in_value {
924                        current_value.push(ch);
925                    } else {
926                        current_key.push(ch);
927                    }
928                },
929            }
930        }
931
932        // Insert last pair
933        if !current_key.is_empty() {
934            labels.insert(
935                current_key.trim().to_string(),
936                current_value.trim().to_string(),
937            );
938        }
939
940        labels
941    }
942
943    /// Compare two histogram bucket boundary values for sorting.
944    ///
945    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
946    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
947        use std::cmp::Ordering;
948
949        // Handle +Inf specially
950        match (a, b) {
951            ("+Inf", "+Inf") => Ordering::Equal,
952            ("+Inf", _) => Ordering::Greater,
953            (_, "+Inf") => Ordering::Less,
954            _ => {
955                // Try to parse as f64 for numeric comparison
956                match (a.parse::<f64>(), b.parse::<f64>()) {
957                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
958                    // Fallback to string comparison if parsing fails
959                    _ => a.cmp(b),
960                }
961            },
962        }
963    }
964
965    /// Waits given number of seconds until node reports that it is up and running, which
966    /// is determined by metric 'process_start_time_seconds', which should appear,
967    /// when node finished booting up.
968    ///
969    ///
970    /// # Arguments
971    /// * `timeout_secs` - The number of seconds to wait.
972    ///
973    /// # Returns
974    /// * `Ok()` if the node is up before timeout occured.
975    /// * `Err(e)` if timeout or other error occurred while waiting.
976    pub async fn wait_until_is_up(
977        &self,
978        timeout_secs: impl Into<u64>,
979    ) -> Result<(), anyhow::Error> {
980        self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
981            .await
982            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
983    }
984}
985
986impl std::fmt::Debug for NetworkNode {
987    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
988        f.debug_struct("NetworkNode")
989            .field("inner", &"inner_skipped")
990            .field("spec", &self.spec)
991            .field("name", &self.name)
992            .field("ws_uri", &self.ws_uri)
993            .field("prometheus_uri", &self.prometheus_uri)
994            .finish()
995    }
996}
997
998fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
999where
1000    S: Serializer,
1001{
1002    erased_serde::serialize(node.as_ref(), serializer)
1003}
1004
1005// TODO: mock and impl more unit tests
1006#[cfg(test)]
1007mod tests {
1008    use std::{
1009        path::{Path, PathBuf},
1010        sync::{Arc, Mutex},
1011    };
1012
1013    use async_trait::async_trait;
1014    use provider::{types::*, ProviderError, ProviderNode};
1015
1016    use super::*;
1017
    // In-memory stand-in for a provider node, used by the tests below.
    // `Serialize` is derived so the mock can be used where provider nodes
    // are serialized (see `serialize_provider_node`).
    #[derive(Serialize)]
    struct MockNode {
        // Shared, thread-safe buffer of log lines; returned by `logs()`.
        logs: Arc<Mutex<Vec<String>>>,
    }
1022
1023    impl MockNode {
1024        fn new() -> Self {
1025            Self {
1026                logs: Arc::new(Mutex::new(vec![])),
1027            }
1028        }
1029
1030        fn logs_push(&self, lines: Vec<impl Into<String>>) {
1031            self.logs
1032                .lock()
1033                .unwrap()
1034                .extend(lines.into_iter().map(|l| l.into()));
1035        }
1036    }
1037
    // Minimal `ProviderNode` stub for the tests below: only `logs()` is
    // implemented (backed by the shared in-memory vector); every other
    // trait method is a `todo!()` stub and panics if called.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // The only functional method: returns the pushed lines joined with
        // newlines, mimicking the content of a node's log output.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
1135
1136    #[tokio::test(flavor = "multi_thread")]
1137    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1138        let mock_provider = Arc::new(MockNode::new());
1139        let mock_node = NetworkNode::new(
1140            "node1",
1141            "ws_uri",
1142            "prometheus_uri",
1143            "multiaddr",
1144            NodeSpec::default(),
1145            mock_provider.clone(),
1146        );
1147
1148        mock_provider.logs_push(vec![
1149            "system booting",
1150            "stub line 1",
1151            "stub line 2",
1152            "system ready",
1153        ]);
1154
1155        // Wait (up to 10 seconds) until pattern occurs once
1156        let options = LogLineCountOptions {
1157            predicate: Arc::new(|n| n == 1),
1158            timeout: Duration::from_secs(10),
1159            wait_until_timeout_elapses: false,
1160        };
1161
1162        let log_line_count = mock_node
1163            .wait_log_line_count_with_timeout("system ready", false, options)
1164            .await?;
1165
1166        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1167
1168        Ok(())
1169    }
1170
    // The target count of 2 is reached only after a second matching line is
    // pushed while the waiter is already running.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 4 seconds) until pattern occurs twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(4),
            wait_until_timeout_elapses: false,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        // Give the waiter time to start before producing the second match.
        tokio::time::sleep(Duration::from_secs(2)).await;

        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }
1216
1217    #[tokio::test(flavor = "multi_thread")]
1218    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1219        let mock_provider = Arc::new(MockNode::new());
1220        let mock_node = NetworkNode::new(
1221            "node1",
1222            "ws_uri",
1223            "prometheus_uri",
1224            "multiaddr",
1225            NodeSpec::default(),
1226            mock_provider.clone(),
1227        );
1228
1229        mock_provider.logs_push(vec![
1230            "system booting",
1231            "stub line 1",
1232            "stub line 2",
1233            "system ready",
1234        ]);
1235
1236        // Wait (up to 2 seconds) until pattern occurs twice
1237        let options = LogLineCountOptions {
1238            predicate: Arc::new(|n| n == 2),
1239            timeout: Duration::from_secs(2),
1240            wait_until_timeout_elapses: false,
1241        };
1242
1243        let log_line_count = mock_node
1244            .wait_log_line_count_with_timeout("system ready", false, options)
1245            .await?;
1246
1247        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1248
1249        Ok(())
1250    }
1251
    // With `wait_until_timeout_elapses: true` the waiter keeps counting for
    // the whole window; two extra matches arrive, so the final count of 3
    // no longer satisfies the `n == 2` predicate.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait until timeout and check if pattern occurs exactly twice
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Push two more matches while the waiter is still within its window.
        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }
1298
    // Verifies the "pattern never occurs" helper: a non-matching line pushed
    // mid-wait must not affect the zero-occurrence check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "system ready",
                        false,
                        // Wait until timeout and make sure pattern occurred zero times
                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // A line that does NOT match the pattern.
        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }
1335
    // The predicate accepts any occurrence count in 2..=5; three matches are
    // pushed mid-wait, so the final count of 3 lands inside the range.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| (2..=5).contains(&n)),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }
1374
    // A fancy-regex negative lookahead must exclude the "Error importing
    // block ..." line while a later plain "system error" line is counted,
    // giving exactly one match.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(3),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // this line should match
            "system error",
            "system ready",
        ]);

        assert!(task.await?.success());

        Ok(())
    }
1427
    // Counterpart of the lookahead test above: no plain error line is ever
    // pushed, so the lookahead pattern never matches and the `n == 1`
    // predicate fails after the timeout.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(6),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Neither of these lines matches the pattern.
        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1476
1477    #[tokio::test(flavor = "multi_thread")]
1478    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1479        let mock_provider = Arc::new(MockNode::new());
1480        let mock_node = NetworkNode::new(
1481            "node1",
1482            "ws_uri",
1483            "prometheus_uri",
1484            "multiaddr",
1485            NodeSpec::default(),
1486            mock_provider.clone(),
1487        );
1488
1489        mock_provider.logs_push(vec![
1490            "system booting",
1491            "stub line 1",
1492            // this line should not match
1493            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1494            "stub line 2"
1495        ]);
1496
1497        let task = tokio::spawn({
1498            async move {
1499                mock_node
1500                    .wait_log_line_count(
1501                        "error(?! importing block .*: block has an unknown parent)",
1502                        false,
1503                        1,
1504                    )
1505                    .await
1506                    .unwrap()
1507            }
1508        });
1509
1510        tokio::time::sleep(Duration::from_secs(1)).await;
1511
1512        mock_provider.logs_push(vec![
1513            "system ready",
1514            // this line should match
1515            "system error",
1516            "system ready",
1517        ]);
1518
1519        assert!(task.await.is_ok());
1520
1521        Ok(())
1522    }
1523
    // Negative case for the lookahead pattern: no plain error line is ever
    // pushed, so the count stays at 0 and the `count == 1` predicate fails.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // this line should not match
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let options = LogLineCountOptions {
            predicate: Arc::new(|count| count == 1),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                // we expect no match, thus wait with timeout
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Neither of these lines matches the pattern.
        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1572
1573    #[tokio::test]
1574    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1575        // This test uses a mock HTTP server to simulate Prometheus metrics
1576        use std::sync::Arc;
1577
1578        // Create a mock metrics response with proper HELP and TYPE comments
1579        let mock_metrics = concat!(
1580            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1581            "# TYPE substrate_block_verification_time histogram\n",
1582            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1583            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1584            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1585            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1586            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1587            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1588            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1589        );
1590
1591        // Start a mock HTTP server
1592        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1593        let addr = listener.local_addr()?;
1594        let metrics = Arc::new(mock_metrics.to_string());
1595
1596        tokio::spawn({
1597            let metrics = metrics.clone();
1598            async move {
1599                loop {
1600                    if let Ok((mut socket, _)) = listener.accept().await {
1601                        let metrics = metrics.clone();
1602                        tokio::spawn(async move {
1603                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1604                            let mut buffer = [0; 1024];
1605                            let _ = socket.read(&mut buffer).await;
1606
1607                            let response = format!(
1608                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1609                                metrics.len(),
1610                                metrics
1611                            );
1612                            let _ = socket.write_all(response.as_bytes()).await;
1613                        });
1614                    }
1615                }
1616            }
1617        });
1618
1619        // Create a NetworkNode with the mock prometheus URI
1620        let mock_provider = Arc::new(MockNode::new());
1621        let mock_node = NetworkNode::new(
1622            "test_node",
1623            "ws://localhost:9944",
1624            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1625            "/ip4/127.0.0.1/tcp/30333",
1626            NodeSpec::default(),
1627            mock_provider,
1628        );
1629
1630        // Get buckets with label filter
1631        let mut label_filters = HashMap::new();
1632        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1633        let buckets = mock_node
1634            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1635            .await?;
1636
1637        // Should get the rococo_local_testnet chain's buckets
1638        assert_eq!(buckets.get("0.1"), Some(&10));
1639        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1640        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1641        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1642        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1643
1644        // Get buckets with label filter for rococo
1645        let mut label_filters = std::collections::HashMap::new();
1646        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1647
1648        let buckets_filtered = mock_node
1649            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1650            .await?;
1651
1652        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1653        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1654
1655        // Test 3: Get buckets with _bucket suffix already present
1656        let buckets_with_suffix = mock_node
1657            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1658            .await?;
1659
1660        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1661
1662        Ok(())
1663    }
1664
    // Buckets are served deliberately out of order; `get_histogram_buckets`
    // must sort them by their "le" boundary before computing deltas.
    #[tokio::test]
    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
        // Test that buckets are correctly sorted even when received out of order
        use std::sync::Arc;

        let mock_metrics = concat!(
            "# HELP test_metric A test metric\n",
            "# TYPE test_metric histogram\n",
            "test_metric_bucket{le=\"2.5\"} 40\n",
            "test_metric_bucket{le=\"0.1\"} 10\n",
            "test_metric_bucket{le=\"+Inf\"} 42\n",
            "test_metric_bucket{le=\"1.0\"} 35\n",
            "test_metric_bucket{le=\"0.5\"} 25\n",
        );

        // Tiny HTTP server on an ephemeral port serving the payload above.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;
        let metrics = Arc::new(mock_metrics.to_string());

        tokio::spawn({
            let metrics = metrics.clone();
            async move {
                loop {
                    if let Ok((mut socket, _)) = listener.accept().await {
                        let metrics = metrics.clone();
                        tokio::spawn(async move {
                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
                            // Drain the request bytes; the content is irrelevant.
                            let mut buffer = [0; 1024];
                            let _ = socket.read(&mut buffer).await;
                            let response = format!(
                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
                                metrics.len(),
                                metrics
                            );
                            let _ = socket.write_all(response.as_bytes()).await;
                        });
                    }
                }
            }
        });

        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "test_node",
            "ws://localhost:9944",
            &format!("http://127.0.0.1:{}/metrics", addr.port()),
            "/ip4/127.0.0.1/tcp/30333",
            NodeSpec::default(),
            mock_provider,
        );

        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;

        // Verify deltas are calculated correctly after sorting
        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40

        Ok(())
    }
1727
1728    #[tokio::test]
1729    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1730        // Test label parsing with commas and special characters in values
1731        use std::sync::Arc;
1732
1733        let mock_metrics = concat!(
1734            "# HELP test_metric A test metric\n",
1735            "# TYPE test_metric histogram\n",
1736            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1737            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1738            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1739        );
1740
1741        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1742        let addr = listener.local_addr()?;
1743        let metrics = Arc::new(mock_metrics.to_string());
1744
1745        tokio::spawn({
1746            let metrics = metrics.clone();
1747            async move {
1748                loop {
1749                    if let Ok((mut socket, _)) = listener.accept().await {
1750                        let metrics = metrics.clone();
1751                        tokio::spawn(async move {
1752                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1753                            let mut buffer = [0; 1024];
1754                            let _ = socket.read(&mut buffer).await;
1755                            let response = format!(
1756                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1757                                metrics.len(),
1758                                metrics
1759                            );
1760                            let _ = socket.write_all(response.as_bytes()).await;
1761                        });
1762                    }
1763                }
1764            }
1765        });
1766
1767        let mock_provider = Arc::new(MockNode::new());
1768        let mock_node = NetworkNode::new(
1769            "test_node",
1770            "ws://localhost:9944",
1771            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1772            "/ip4/127.0.0.1/tcp/30333",
1773            NodeSpec::default(),
1774            mock_provider,
1775        );
1776
1777        // Test without filter
1778        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1779        assert_eq!(buckets.get("0.1"), Some(&5));
1780        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
1781        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15
1782
1783        // Test with filter containing comma in value
1784        let mut label_filters = std::collections::HashMap::new();
1785        label_filters.insert("method".to_string(), "GET,POST".to_string());
1786
1787        let buckets_filtered = mock_node
1788            .get_histogram_buckets("test_metric", Some(label_filters))
1789            .await?;
1790
1791        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1792        assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1793
1794        Ok(())
1795    }
1796
1797    #[test]
1798    fn test_compare_le_values() {
1799        use std::cmp::Ordering;
1800
1801        use crate::network::node::NetworkNode;
1802
1803        // Numeric comparison
1804        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1805        assert_eq!(
1806            NetworkNode::compare_le_values("1.0", "0.5"),
1807            Ordering::Greater
1808        );
1809        assert_eq!(
1810            NetworkNode::compare_le_values("1.0", "1.0"),
1811            Ordering::Equal
1812        );
1813
1814        // +Inf handling
1815        assert_eq!(
1816            NetworkNode::compare_le_values("+Inf", "999"),
1817            Ordering::Greater
1818        );
1819        assert_eq!(
1820            NetworkNode::compare_le_values("0.1", "+Inf"),
1821            Ordering::Less
1822        );
1823        assert_eq!(
1824            NetworkNode::compare_le_values("+Inf", "+Inf"),
1825            Ordering::Equal
1826        );
1827
1828        // Large numbers
1829        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1830        assert_eq!(
1831            NetworkNode::compare_le_values("1000", "999"),
1832            Ordering::Greater
1833        );
1834    }
1835}