zombienet_orchestrator/network/
node.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use fancy_regex::Regex;
5use glob_match::glob_match;
6use prom_metrics_parser::MetricMap;
7use provider::DynNode;
8use serde::Serialize;
9use subxt::{backend::rpc::RpcClient, OnlineClient};
10use support::net::{skip_err_while_waiting, wait_ws_ready};
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::{debug, trace};
14
15#[cfg(feature = "pjs")]
16use crate::pjs_helper::{pjs_build_template, pjs_exec, PjsResult, ReturnValue};
17use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
18
19type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
20
21#[derive(Error, Debug)]
22pub enum NetworkNodeError {
23    #[error("metric '{0}' not found!")]
24    MetricNotFound(String),
25}
26
27#[derive(Clone, Serialize)]
28pub struct NetworkNode {
29    #[serde(skip)]
30    pub(crate) inner: DynNode,
31    // TODO: do we need the full spec here?
32    // Maybe a reduce set of values.
33    pub(crate) spec: NodeSpec,
34    pub(crate) name: String,
35    pub(crate) ws_uri: String,
36    pub(crate) multiaddr: String,
37    pub(crate) prometheus_uri: String,
38    #[serde(skip)]
39    metrics_cache: Arc<RwLock<MetricMap>>,
40}
41
42/// Result of waiting for a certain number of log lines to appear.
43///
44/// Indicates whether the log line count condition was met within the timeout period.
45///
46/// # Variants
47/// - `TargetReached(count)` – The predicate condition was satisfied within the timeout.
48///     * `count`: The number of matching log lines at the time of satisfaction.
49/// - `TargetFailed(count)` – The condition was not met within the timeout.
50///     * `count`: The final number of matching log lines at timeout expiration.
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum LogLineCount {
53    TargetReached(u32),
54    TargetFailed(u32),
55}
56
57impl LogLineCount {
58    pub fn success(&self) -> bool {
59        match self {
60            Self::TargetReached(..) => true,
61            Self::TargetFailed(..) => false,
62        }
63    }
64}
65
66/// Configuration for controlling log line count waiting behavior.
67///
68/// Allows specifying a custom predicate on the number of matching log lines,
69/// a timeout in seconds, and whether the system should wait the entire timeout duration.
70///
71/// # Fields
72/// - `predicate`: A function that takes the current number of matching lines and
73///   returns `true` if the condition is satisfied.
74/// - `timeout_secs`: Maximum number of seconds to wait.
75/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
76///   for the full timeout duration, even if the condition is already met early.
77///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
78#[derive(Clone)]
79pub struct LogLineCountOptions {
80    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
81    pub timeout: Duration,
82    pub wait_until_timeout_elapses: bool,
83}
84
85impl LogLineCountOptions {
86    pub fn new(
87        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
88        timeout: Duration,
89        wait_until_timeout_elapses: bool,
90    ) -> Self {
91        Self {
92            predicate: Arc::new(predicate),
93            timeout,
94            wait_until_timeout_elapses,
95        }
96    }
97
98    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
99        Self::new(|n| n == 0, timeout, true)
100    }
101}
102
103// #[derive(Clone, Debug)]
104// pub struct QueryMetricOptions {
105//     use_cache: bool,
106//     treat_not_found_as_zero: bool,
107// }
108
109// impl Default for QueryMetricOptions {
110//     fn default() -> Self {
111//         Self { use_cache: false, treat_not_found_as_zero: true }
112//     }
113// }
114
115impl NetworkNode {
116    /// Create a new NetworkNode
117    pub(crate) fn new<T: Into<String>>(
118        name: T,
119        ws_uri: T,
120        prometheus_uri: T,
121        multiaddr: T,
122        spec: NodeSpec,
123        inner: DynNode,
124    ) -> Self {
125        Self {
126            name: name.into(),
127            ws_uri: ws_uri.into(),
128            prometheus_uri: prometheus_uri.into(),
129            inner,
130            spec,
131            multiaddr: multiaddr.into(),
132            metrics_cache: Arc::new(Default::default()),
133        }
134    }
135
136    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
137        self.multiaddr = multiaddr.into();
138    }
139
140    pub fn name(&self) -> &str {
141        &self.name
142    }
143
144    pub fn args(&self) -> Vec<&str> {
145        self.inner.args()
146    }
147
148    pub fn spec(&self) -> &NodeSpec {
149        &self.spec
150    }
151
152    pub fn ws_uri(&self) -> &str {
153        &self.ws_uri
154    }
155
156    pub fn multiaddr(&self) -> &str {
157        self.multiaddr.as_ref()
158    }
159
160    // Subxt
161
162    /// Get the rpc client for the node
163    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
164        get_client_from_url(&self.ws_uri).await
165    }
166
167    /// Get the [online client](subxt::client::OnlineClient) for the node
168    #[deprecated = "Use `wait_client` instead."]
169    pub async fn client<Config: subxt::Config>(
170        &self,
171    ) -> Result<OnlineClient<Config>, subxt::Error> {
172        self.try_client().await
173    }
174
175    /// Try to connect to the node.
176    ///
177    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
178    /// the node to appear before it connects to it. This function directly tries
179    /// to connect to the node and returns an error if the node is not yet available
180    /// at that point in time.
181    ///
182    /// Returns a [`OnlineClient`] on success.
183    pub async fn try_client<Config: subxt::Config>(
184        &self,
185    ) -> Result<OnlineClient<Config>, subxt::Error> {
186        get_client_from_url(&self.ws_uri).await
187    }
188
189    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
190    pub async fn wait_client<Config: subxt::Config>(
191        &self,
192    ) -> Result<OnlineClient<Config>, anyhow::Error> {
193        debug!("wait_client ws_uri: {}", self.ws_uri());
194        wait_ws_ready(self.ws_uri())
195            .await
196            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {}", e))?;
197
198        self.try_client()
199            .await
200            .map_err(|e| anyhow!("Can't create a subxt client, err: {}", e))
201    }
202
203    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
204    pub async fn wait_client_with_timeout<Config: subxt::Config>(
205        &self,
206        timeout_secs: impl Into<u64>,
207    ) -> Result<OnlineClient<Config>, anyhow::Error> {
208        debug!("waiting until subxt client is ready");
209        tokio::time::timeout(
210            Duration::from_secs(timeout_secs.into()),
211            self.wait_client::<Config>(),
212        )
213        .await?
214    }
215
216    // Commands
217
218    /// Pause the node, this is implemented by pausing the
219    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
220    pub async fn pause(&self) -> Result<(), anyhow::Error> {
221        self.inner.pause().await?;
222        Ok(())
223    }
224
225    /// Resume the node, this is implemented by resuming the
226    /// actual process (e.g polkadot) with sending `SIGCONT` signal
227    pub async fn resume(&self) -> Result<(), anyhow::Error> {
228        self.inner.resume().await?;
229        Ok(())
230    }
231
232    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
233    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
234        self.inner.restart(after).await?;
235        Ok(())
236    }
237
238    // Metrics assertions
239
240    /// Get metric value 'by name' from Prometheus (exposed by the node)
241    /// metric name can be:
242    /// with prefix (e.g: 'polkadot_')
243    /// with chain attribute (e.g: 'chain=rococo-local')
244    /// without prefix and/or without chain attribute
245    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
246        let metric_name = metric_name.into();
247        // force cache reload
248        self.fetch_metrics().await?;
249        // by default we treat not found as 0 (same in v1)
250        self.metric(&metric_name, true).await
251    }
252
253    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
254    /// metric name can be:
255    /// with prefix (e.g: 'polkadot_')
256    /// with chain attribute (e.g: 'chain=rococo-local')
257    /// without prefix and/or without chain attribute
258    ///
259    /// We first try to assert on the value using the cached metrics and
260    /// if not meet the criteria we reload the cache and check again
261    pub async fn assert(
262        &self,
263        metric_name: impl Into<String>,
264        value: impl Into<f64>,
265    ) -> Result<bool, anyhow::Error> {
266        let value: f64 = value.into();
267        self.assert_with(metric_name, |v| v == value).await
268    }
269
270    /// Assert on a metric value using a given predicate.
271    /// See [`NetworkNode::reports`] description for details on metric name.
272    pub async fn assert_with(
273        &self,
274        metric_name: impl Into<String>,
275        predicate: impl Fn(f64) -> bool,
276    ) -> Result<bool, anyhow::Error> {
277        let metric_name = metric_name.into();
278        // reload metrics
279        self.fetch_metrics().await?;
280        let val = self.metric(&metric_name, true).await?;
281        trace!("🔎 Current value {val} passed to the predicated?");
282        Ok(predicate(val))
283    }
284
285    // Wait methods for metrics
286
287    /// Wait until a metric value pass the `predicate`
288    pub async fn wait_metric(
289        &self,
290        metric_name: impl Into<String>,
291        predicate: impl Fn(f64) -> bool,
292    ) -> Result<(), anyhow::Error> {
293        let metric_name = metric_name.into();
294        debug!("waiting until metric {metric_name} pass the predicate");
295        loop {
296            let res = self.assert_with(&metric_name, &predicate).await;
297            match res {
298                Ok(res) => {
299                    if res {
300                        return Ok(());
301                    }
302                },
303                Err(e) => match e.downcast::<reqwest::Error>() {
304                    Ok(io_err) => {
305                        if !skip_err_while_waiting(&io_err) {
306                            return Err(io_err.into());
307                        }
308                    },
309                    Err(other) => {
310                        match other.downcast::<NetworkNodeError>() {
311                            Ok(node_err) => {
312                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
313                                    return Err(node_err.into());
314                                }
315                            },
316                            Err(other) => return Err(other),
317                        };
318                    },
319                },
320            }
321
322            // sleep to not spam prometheus
323            tokio::time::sleep(Duration::from_secs(1)).await;
324        }
325    }
326
327    /// Wait until a metric value pass the `predicate`
328    /// with a timeout (secs)
329    pub async fn wait_metric_with_timeout(
330        &self,
331        metric_name: impl Into<String>,
332        predicate: impl Fn(f64) -> bool,
333        timeout_secs: impl Into<u64>,
334    ) -> Result<(), anyhow::Error> {
335        let metric_name = metric_name.into();
336        let secs = timeout_secs.into();
337        debug!("waiting until metric {metric_name} pass the predicate");
338        let res = tokio::time::timeout(
339            Duration::from_secs(secs),
340            self.wait_metric(&metric_name, predicate),
341        )
342        .await;
343
344        if let Ok(inner_res) = res {
345            match inner_res {
346                Ok(_) => Ok(()),
347                Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
348            }
349        } else {
350            // timeout
351            Err(anyhow!(
352                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
353            ))
354        }
355    }
356
357    // Logs
358
359    /// Get the logs of the node
360    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
361    pub async fn logs(&self) -> Result<String, anyhow::Error> {
362        Ok(self.inner.logs().await?)
363    }
364
365    /// Wait until a the number of matching log lines is reach
366    pub async fn wait_log_line_count(
367        &self,
368        pattern: impl Into<String>,
369        is_glob: bool,
370        count: usize,
371    ) -> Result<(), anyhow::Error> {
372        let pattern = pattern.into();
373        let pattern_clone = pattern.clone();
374        debug!("waiting until we find pattern {pattern} {count} times");
375        let match_fn: BoxedClosure = if is_glob {
376            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
377        } else {
378            let re = Regex::new(&pattern)?;
379            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
380        };
381
382        loop {
383            let mut q = 0_usize;
384            let logs = self.logs().await?;
385            for line in logs.lines() {
386                trace!("line is {line}");
387                if match_fn(line)? {
388                    trace!("pattern {pattern_clone} match in line {line}");
389                    q += 1;
390                    if q >= count {
391                        return Ok(());
392                    }
393                }
394            }
395
396            tokio::time::sleep(Duration::from_secs(2)).await;
397        }
398    }
399
400    /// Waits until the number of matching log lines satisfies a custom condition,
401    /// optionally waiting for the entire duration of the timeout.
402    ///
403    /// This method searches log lines for a given substring or glob pattern,
404    /// and evaluates the number of matching lines using a user-provided predicate function.
405    /// Optionally, it can wait for the full timeout duration to ensure the condition
406    /// holds consistently (e.g., for verifying absence of logs).
407    ///
408    /// # Arguments
409    /// * `substring` - The substring or pattern to match within log lines.
410    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
411    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
412    ///
413    /// # Returns
414    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
415    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
416    /// * `Err(e)` if an error occurred during log retrieval or matching.
417    ///
418    /// # Example
419    /// ```rust
420    /// # use std::{sync::Arc, time::Duration};
421    /// # use provider::NativeProvider;
422    /// # use support::{fs::local::LocalFileSystem};
423    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
424    /// # use configuration::NetworkConfig;
425    /// # async fn example() -> Result<(), anyhow::Error> {
426    /// #   let provider = NativeProvider::new(LocalFileSystem {});
427    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
428    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
429    /// #   let network = orchestrator.spawn(config).await?;
430    /// let node = network.get_node("alice")?;
431    /// // Wait (up to 10 seconds) until pattern occurs once
432    /// let options = LogLineCountOptions {
433    ///     predicate: Arc::new(|count| count == 1),
434    ///     timeout: Duration::from_secs(10),
435    ///     wait_until_timeout_elapses: false,
436    /// };
437    /// let result = node
438    ///     .wait_log_line_count_with_timeout("error", false, options)
439    ///     .await?;
440    /// #   Ok(())
441    /// # }
442    /// ```
443    pub async fn wait_log_line_count_with_timeout(
444        &self,
445        substring: impl Into<String>,
446        is_glob: bool,
447        options: LogLineCountOptions,
448    ) -> Result<LogLineCount, anyhow::Error> {
449        let substring = substring.into();
450        debug!(
451            "waiting until match lines count within {} seconds",
452            options.timeout.as_secs_f64()
453        );
454
455        let start = tokio::time::Instant::now();
456
457        let match_fn: BoxedClosure = if is_glob {
458            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
459        } else {
460            let re = Regex::new(&substring)?;
461            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
462        };
463
464        if options.wait_until_timeout_elapses {
465            tokio::time::sleep(options.timeout).await;
466        }
467
468        let mut q;
469        loop {
470            q = 0_u32;
471            let logs = self.logs().await?;
472            for line in logs.lines() {
473                if match_fn(line)? {
474                    q += 1;
475
476                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
477                    // end after the whole log file is processed. This is to address the cases when the
478                    // predicate becomes true and false again.
479                    // eg. expected exactly 2 matching lines are expected but 3 are present
480                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
481                        return Ok(LogLineCount::TargetReached(q));
482                    }
483                }
484            }
485
486            if start.elapsed() >= options.timeout {
487                break;
488            }
489
490            tokio::time::sleep(Duration::from_secs(2)).await;
491        }
492
493        if (options.predicate)(q) {
494            Ok(LogLineCount::TargetReached(q))
495        } else {
496            Ok(LogLineCount::TargetFailed(q))
497        }
498    }
499
500    // TODO: impl
501    // wait_event_count
502    // wait_event_count_with_timeout
503
504    #[cfg(feature = "pjs")]
505    /// Execute js/ts code inside [pjs_rs] custom runtime.
506    ///
507    /// The code will be run in a wrapper similar to the `javascript` developer tab
508    /// of polkadot.js apps. The returning value is represented as [PjsResult] enum, to allow
509    /// to communicate that the execution was successful but the returning value can be deserialized as [serde_json::Value].
510    pub async fn pjs(
511        &self,
512        code: impl AsRef<str>,
513        args: Vec<serde_json::Value>,
514        user_types: Option<serde_json::Value>,
515    ) -> Result<PjsResult, anyhow::Error> {
516        let code = pjs_build_template(self.ws_uri(), code.as_ref(), args, user_types);
517        tracing::trace!("Code to execute: {code}");
518        let value = match pjs_exec(code)? {
519            ReturnValue::Deserialized(val) => Ok(val),
520            ReturnValue::CantDeserialize(msg) => Err(msg),
521        };
522
523        Ok(value)
524    }
525
526    #[cfg(feature = "pjs")]
527    /// Execute js/ts file  inside [pjs_rs] custom runtime.
528    ///
529    /// The content of the file will be run in a wrapper similar to the `javascript` developer tab
530    /// of polkadot.js apps. The returning value is represented as [PjsResult] enum, to allow
531    /// to communicate that the execution was successful but the returning value can be deserialized as [serde_json::Value].
532    pub async fn pjs_file(
533        &self,
534        file: impl AsRef<std::path::Path>,
535        args: Vec<serde_json::Value>,
536        user_types: Option<serde_json::Value>,
537    ) -> Result<PjsResult, anyhow::Error> {
538        let content = std::fs::read_to_string(file)?;
539        self.pjs(content, args, user_types).await
540    }
541
542    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
543        let response = reqwest::get(&self.prometheus_uri).await?;
544        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
545        let mut cache = self.metrics_cache.write().await;
546        *cache = metrics;
547        Ok(())
548    }
549
550    /// Query individual metric by name
551    async fn metric(
552        &self,
553        metric_name: &str,
554        treat_not_found_as_zero: bool,
555    ) -> Result<f64, anyhow::Error> {
556        let mut metrics_map = self.metrics_cache.read().await;
557        if metrics_map.is_empty() {
558            // reload metrics
559            drop(metrics_map);
560            self.fetch_metrics().await?;
561            metrics_map = self.metrics_cache.read().await;
562        }
563
564        if let Some(val) = metrics_map.get(metric_name) {
565            Ok(*val)
566        } else if treat_not_found_as_zero {
567            Ok(0_f64)
568        } else {
569            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
570        }
571    }
572
573    /// Waits given number of seconds until node reports that it is up and running, which
574    /// is determined by metric 'process_start_time_seconds', which should appear,
575    /// when node finished booting up.
576    ///
577    ///
578    /// # Arguments
579    /// * `timeout_secs` - The number of seconds to wait.
580    ///
581    /// # Returns
582    /// * `Ok()` if the node is up before timeout occured.
583    /// * `Err(e)` if timeout or other error occurred while waiting.
584    pub async fn wait_until_is_up(
585        &self,
586        timeout_secs: impl Into<u64>,
587    ) -> Result<(), anyhow::Error> {
588        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
589            .await
590            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
591    }
592}
593
594impl std::fmt::Debug for NetworkNode {
595    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596        f.debug_struct("NetworkNode")
597            .field("inner", &"inner_skipped")
598            .field("spec", &self.spec)
599            .field("name", &self.name)
600            .field("ws_uri", &self.ws_uri)
601            .field("prometheus_uri", &self.prometheus_uri)
602            .finish()
603    }
604}
605
606// TODO: mock and impl more unit tests
607#[cfg(test)]
608mod tests {
609    use std::{
610        path::{Path, PathBuf},
611        sync::{Arc, Mutex},
612    };
613
614    use async_trait::async_trait;
615    use provider::{types::*, ProviderError, ProviderNode};
616
617    use super::*;
618
619    struct MockNode {
620        logs: Arc<Mutex<Vec<String>>>,
621    }
622
623    impl MockNode {
624        fn new() -> Self {
625            Self {
626                logs: Arc::new(Mutex::new(vec![])),
627            }
628        }
629
630        fn logs_push(&self, lines: Vec<impl Into<String>>) {
631            self.logs
632                .lock()
633                .unwrap()
634                .extend(lines.into_iter().map(|l| l.into()));
635        }
636    }
637
638    #[async_trait]
639    impl ProviderNode for MockNode {
640        fn name(&self) -> &str {
641            todo!()
642        }
643
644        fn args(&self) -> Vec<&str> {
645            todo!()
646        }
647
648        fn base_dir(&self) -> &PathBuf {
649            todo!()
650        }
651
652        fn config_dir(&self) -> &PathBuf {
653            todo!()
654        }
655
656        fn data_dir(&self) -> &PathBuf {
657            todo!()
658        }
659
660        fn relay_data_dir(&self) -> &PathBuf {
661            todo!()
662        }
663
664        fn scripts_dir(&self) -> &PathBuf {
665            todo!()
666        }
667
668        fn log_path(&self) -> &PathBuf {
669            todo!()
670        }
671
672        fn log_cmd(&self) -> String {
673            todo!()
674        }
675
676        fn path_in_node(&self, _file: &Path) -> PathBuf {
677            todo!()
678        }
679
680        async fn logs(&self) -> Result<String, ProviderError> {
681            Ok(self.logs.lock().unwrap().join("\n"))
682        }
683
684        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
685            todo!()
686        }
687
688        async fn run_command(
689            &self,
690            _options: RunCommandOptions,
691        ) -> Result<ExecutionResult, ProviderError> {
692            todo!()
693        }
694
695        async fn run_script(
696            &self,
697            _options: RunScriptOptions,
698        ) -> Result<ExecutionResult, ProviderError> {
699            todo!()
700        }
701
702        async fn send_file(
703            &self,
704            _local_file_path: &Path,
705            _remote_file_path: &Path,
706            _mode: &str,
707        ) -> Result<(), ProviderError> {
708            todo!()
709        }
710
711        async fn receive_file(
712            &self,
713            _remote_file_path: &Path,
714            _local_file_path: &Path,
715        ) -> Result<(), ProviderError> {
716            todo!()
717        }
718
719        async fn pause(&self) -> Result<(), ProviderError> {
720            todo!()
721        }
722
723        async fn resume(&self) -> Result<(), ProviderError> {
724            todo!()
725        }
726
727        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
728            todo!()
729        }
730
731        async fn destroy(&self) -> Result<(), ProviderError> {
732            todo!()
733        }
734    }
735
736    #[tokio::test(flavor = "multi_thread")]
737    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
738        let mock_provider = Arc::new(MockNode::new());
739        let mock_node = NetworkNode::new(
740            "node1",
741            "ws_uri",
742            "prometheus_uri",
743            "multiaddr",
744            NodeSpec::default(),
745            mock_provider.clone(),
746        );
747
748        mock_provider.logs_push(vec![
749            "system booting",
750            "stub line 1",
751            "stub line 2",
752            "system ready",
753        ]);
754
755        // Wait (up to 10 seconds) until pattern occurs once
756        let options = LogLineCountOptions {
757            predicate: Arc::new(|n| n == 1),
758            timeout: Duration::from_secs(10),
759            wait_until_timeout_elapses: false,
760        };
761
762        let log_line_count = mock_node
763            .wait_log_line_count_with_timeout("system ready", false, options)
764            .await?;
765
766        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
767
768        Ok(())
769    }
770
771    #[tokio::test(flavor = "multi_thread")]
772    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
773        let mock_provider = Arc::new(MockNode::new());
774        let mock_node = NetworkNode::new(
775            "node1",
776            "ws_uri",
777            "prometheus_uri",
778            "multiaddr",
779            NodeSpec::default(),
780            mock_provider.clone(),
781        );
782
783        mock_provider.logs_push(vec![
784            "system booting",
785            "stub line 1",
786            "stub line 2",
787            "system ready",
788        ]);
789
790        // Wait (up to 4 seconds) until pattern occurs twice
791        let options = LogLineCountOptions {
792            predicate: Arc::new(|n| n == 2),
793            timeout: Duration::from_secs(4),
794            wait_until_timeout_elapses: false,
795        };
796
797        let task = tokio::spawn({
798            async move {
799                mock_node
800                    .wait_log_line_count_with_timeout("system ready", false, options)
801                    .await
802                    .unwrap()
803            }
804        });
805
806        tokio::time::sleep(Duration::from_secs(2)).await;
807
808        mock_provider.logs_push(vec!["system ready"]);
809
810        let log_line_count = task.await?;
811
812        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
813
814        Ok(())
815    }
816
817    #[tokio::test(flavor = "multi_thread")]
818    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
819        let mock_provider = Arc::new(MockNode::new());
820        let mock_node = NetworkNode::new(
821            "node1",
822            "ws_uri",
823            "prometheus_uri",
824            "multiaddr",
825            NodeSpec::default(),
826            mock_provider.clone(),
827        );
828
829        mock_provider.logs_push(vec![
830            "system booting",
831            "stub line 1",
832            "stub line 2",
833            "system ready",
834        ]);
835
836        // Wait (up to 2 seconds) until pattern occurs twice
837        let options = LogLineCountOptions {
838            predicate: Arc::new(|n| n == 2),
839            timeout: Duration::from_secs(2),
840            wait_until_timeout_elapses: false,
841        };
842
843        let log_line_count = mock_node
844            .wait_log_line_count_with_timeout("system ready", false, options)
845            .await?;
846
847        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
848
849        Ok(())
850    }
851
852    #[tokio::test(flavor = "multi_thread")]
853    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
854        let mock_provider = Arc::new(MockNode::new());
855        let mock_node = NetworkNode::new(
856            "node1",
857            "ws_uri",
858            "prometheus_uri",
859            "multiaddr",
860            NodeSpec::default(),
861            mock_provider.clone(),
862        );
863
864        mock_provider.logs_push(vec![
865            "system booting",
866            "stub line 1",
867            "stub line 2",
868            "system ready",
869        ]);
870
871        // Wait until timeout and check if pattern occurs exactly twice
872        let options = LogLineCountOptions {
873            predicate: Arc::new(|n| n == 2),
874            timeout: Duration::from_secs(2),
875            wait_until_timeout_elapses: true,
876        };
877
878        let task = tokio::spawn({
879            async move {
880                mock_node
881                    .wait_log_line_count_with_timeout("system ready", false, options)
882                    .await
883                    .unwrap()
884            }
885        });
886
887        tokio::time::sleep(Duration::from_secs(1)).await;
888
889        mock_provider.logs_push(vec!["system ready"]);
890        mock_provider.logs_push(vec!["system ready"]);
891
892        let log_line_count = task.await?;
893
894        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
895
896        Ok(())
897    }
898
899    #[tokio::test(flavor = "multi_thread")]
900    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
901        let mock_provider = Arc::new(MockNode::new());
902        let mock_node = NetworkNode::new(
903            "node1",
904            "ws_uri",
905            "prometheus_uri",
906            "multiaddr",
907            NodeSpec::default(),
908            mock_provider.clone(),
909        );
910
911        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
912
913        let task = tokio::spawn({
914            async move {
915                mock_node
916                    .wait_log_line_count_with_timeout(
917                        "system ready",
918                        false,
919                        // Wait until timeout and make sure pattern occurred zero times
920                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
921                    )
922                    .await
923                    .unwrap()
924            }
925        });
926
927        tokio::time::sleep(Duration::from_secs(1)).await;
928
929        mock_provider.logs_push(vec!["stub line 3"]);
930
931        assert!(task.await?.success());
932
933        Ok(())
934    }
935
936    #[tokio::test(flavor = "multi_thread")]
937    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
938        let mock_provider = Arc::new(MockNode::new());
939        let mock_node = NetworkNode::new(
940            "node1",
941            "ws_uri",
942            "prometheus_uri",
943            "multiaddr",
944            NodeSpec::default(),
945            mock_provider.clone(),
946        );
947
948        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
949
950        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
951        let options = LogLineCountOptions {
952            predicate: Arc::new(|n| (2..=5).contains(&n)),
953            timeout: Duration::from_secs(2),
954            wait_until_timeout_elapses: true,
955        };
956
957        let task = tokio::spawn({
958            async move {
959                mock_node
960                    .wait_log_line_count_with_timeout("system ready", false, options)
961                    .await
962                    .unwrap()
963            }
964        });
965
966        tokio::time::sleep(Duration::from_secs(1)).await;
967
968        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
969
970        assert!(task.await?.success());
971
972        Ok(())
973    }
974
975    #[tokio::test(flavor = "multi_thread")]
976    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
977        let mock_provider = Arc::new(MockNode::new());
978        let mock_node = NetworkNode::new(
979            "node1",
980            "ws_uri",
981            "prometheus_uri",
982            "multiaddr",
983            NodeSpec::default(),
984            mock_provider.clone(),
985        );
986
987        mock_provider.logs_push(vec![
988            "system booting",
989            "stub line 1",
990            // this line should not match
991            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
992            "stub line 2"
993        ]);
994
995        let options = LogLineCountOptions {
996            predicate: Arc::new(|n| n == 1),
997            timeout: Duration::from_secs(3),
998            wait_until_timeout_elapses: true,
999        };
1000
1001        let task = tokio::spawn({
1002            async move {
1003                mock_node
1004                    .wait_log_line_count_with_timeout(
1005                        "error(?! importing block .*: block has an unknown parent)",
1006                        false,
1007                        options,
1008                    )
1009                    .await
1010                    .unwrap()
1011            }
1012        });
1013
1014        tokio::time::sleep(Duration::from_secs(1)).await;
1015
1016        mock_provider.logs_push(vec![
1017            "system ready",
1018            // this line should match
1019            "system error",
1020            "system ready",
1021        ]);
1022
1023        assert!(task.await?.success());
1024
1025        Ok(())
1026    }
1027
1028    #[tokio::test(flavor = "multi_thread")]
1029    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1030    ) -> Result<(), anyhow::Error> {
1031        let mock_provider = Arc::new(MockNode::new());
1032        let mock_node = NetworkNode::new(
1033            "node1",
1034            "ws_uri",
1035            "prometheus_uri",
1036            "multiaddr",
1037            NodeSpec::default(),
1038            mock_provider.clone(),
1039        );
1040
1041        mock_provider.logs_push(vec![
1042            "system booting",
1043            "stub line 1",
1044            // this line should not match
1045            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1046            "stub line 2"
1047        ]);
1048
1049        let options = LogLineCountOptions {
1050            predicate: Arc::new(|n| n == 1),
1051            timeout: Duration::from_secs(6),
1052            wait_until_timeout_elapses: true,
1053        };
1054
1055        let task = tokio::spawn({
1056            async move {
1057                mock_node
1058                    .wait_log_line_count_with_timeout(
1059                        "error(?! importing block .*: block has an unknown parent)",
1060                        false,
1061                        options,
1062                    )
1063                    .await
1064                    .unwrap()
1065            }
1066        });
1067
1068        tokio::time::sleep(Duration::from_secs(1)).await;
1069
1070        mock_provider.logs_push(vec!["system ready", "system ready"]);
1071
1072        assert!(!task.await?.success());
1073
1074        Ok(())
1075    }
1076
1077    #[tokio::test(flavor = "multi_thread")]
1078    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1079        let mock_provider = Arc::new(MockNode::new());
1080        let mock_node = NetworkNode::new(
1081            "node1",
1082            "ws_uri",
1083            "prometheus_uri",
1084            "multiaddr",
1085            NodeSpec::default(),
1086            mock_provider.clone(),
1087        );
1088
1089        mock_provider.logs_push(vec![
1090            "system booting",
1091            "stub line 1",
1092            // this line should not match
1093            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1094            "stub line 2"
1095        ]);
1096
1097        let task = tokio::spawn({
1098            async move {
1099                mock_node
1100                    .wait_log_line_count(
1101                        "error(?! importing block .*: block has an unknown parent)",
1102                        false,
1103                        1,
1104                    )
1105                    .await
1106                    .unwrap()
1107            }
1108        });
1109
1110        tokio::time::sleep(Duration::from_secs(1)).await;
1111
1112        mock_provider.logs_push(vec![
1113            "system ready",
1114            // this line should match
1115            "system error",
1116            "system ready",
1117        ]);
1118
1119        assert!(task.await.is_ok());
1120
1121        Ok(())
1122    }
1123
1124    #[tokio::test(flavor = "multi_thread")]
1125    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1126        let mock_provider = Arc::new(MockNode::new());
1127        let mock_node = NetworkNode::new(
1128            "node1",
1129            "ws_uri",
1130            "prometheus_uri",
1131            "multiaddr",
1132            NodeSpec::default(),
1133            mock_provider.clone(),
1134        );
1135
1136        mock_provider.logs_push(vec![
1137            "system booting",
1138            "stub line 1",
1139            // this line should not match
1140            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1141            "stub line 2"
1142        ]);
1143
1144        let options = LogLineCountOptions {
1145            predicate: Arc::new(|count| count == 1),
1146            timeout: Duration::from_secs(2),
1147            wait_until_timeout_elapses: true,
1148        };
1149
1150        let task = tokio::spawn({
1151            async move {
1152                // we expect no match, thus wait with timeout
1153                mock_node
1154                    .wait_log_line_count_with_timeout(
1155                        "error(?! importing block .*: block has an unknown parent)",
1156                        false,
1157                        options,
1158                    )
1159                    .await
1160                    .unwrap()
1161            }
1162        });
1163
1164        tokio::time::sleep(Duration::from_secs(1)).await;
1165
1166        mock_provider.logs_push(vec!["system ready", "system ready"]);
1167
1168        assert!(!task.await?.success());
1169
1170        Ok(())
1171    }
1172}