zombienet_orchestrator/network/
node.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::Serialize;
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
22
23type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
24
25#[derive(Error, Debug)]
26pub enum NetworkNodeError {
27    #[error("metric '{0}' not found!")]
28    MetricNotFound(String),
29}
30
31#[derive(Clone, Serialize)]
32pub struct NetworkNode {
33    #[serde(skip)]
34    pub(crate) inner: DynNode,
35    // TODO: do we need the full spec here?
36    // Maybe a reduce set of values.
37    pub(crate) spec: NodeSpec,
38    pub(crate) name: String,
39    pub(crate) ws_uri: String,
40    pub(crate) multiaddr: String,
41    pub(crate) prometheus_uri: String,
42    #[serde(skip)]
43    metrics_cache: Arc<RwLock<MetricMap>>,
44    #[serde(skip)]
45    is_running: Arc<AtomicBool>,
46}
47
48/// Result of waiting for a certain number of log lines to appear.
49///
50/// Indicates whether the log line count condition was met within the timeout period.
51///
52/// # Variants
53/// - `TargetReached(count)` – The predicate condition was satisfied within the timeout.
54///     * `count`: The number of matching log lines at the time of satisfaction.
55/// - `TargetFailed(count)` – The condition was not met within the timeout.
56///     * `count`: The final number of matching log lines at timeout expiration.
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum LogLineCount {
59    TargetReached(u32),
60    TargetFailed(u32),
61}
62
63impl LogLineCount {
64    pub fn success(&self) -> bool {
65        match self {
66            Self::TargetReached(..) => true,
67            Self::TargetFailed(..) => false,
68        }
69    }
70}
71
72/// Configuration for controlling log line count waiting behavior.
73///
74/// Allows specifying a custom predicate on the number of matching log lines,
75/// a timeout in seconds, and whether the system should wait the entire timeout duration.
76///
77/// # Fields
78/// - `predicate`: A function that takes the current number of matching lines and
79///   returns `true` if the condition is satisfied.
80/// - `timeout_secs`: Maximum number of seconds to wait.
81/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
82///   for the full timeout duration, even if the condition is already met early.
83///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
84#[derive(Clone)]
85pub struct LogLineCountOptions {
86    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
87    pub timeout: Duration,
88    pub wait_until_timeout_elapses: bool,
89}
90
91impl LogLineCountOptions {
92    pub fn new(
93        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
94        timeout: Duration,
95        wait_until_timeout_elapses: bool,
96    ) -> Self {
97        Self {
98            predicate: Arc::new(predicate),
99            timeout,
100            wait_until_timeout_elapses,
101        }
102    }
103
104    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
105        Self::new(|n| n == 0, timeout, true)
106    }
107}
108
109// #[derive(Clone, Debug)]
110// pub struct QueryMetricOptions {
111//     use_cache: bool,
112//     treat_not_found_as_zero: bool,
113// }
114
115// impl Default for QueryMetricOptions {
116//     fn default() -> Self {
117//         Self { use_cache: false, treat_not_found_as_zero: true }
118//     }
119// }
120
121impl NetworkNode {
122    /// Create a new NetworkNode
123    pub(crate) fn new<T: Into<String>>(
124        name: T,
125        ws_uri: T,
126        prometheus_uri: T,
127        multiaddr: T,
128        spec: NodeSpec,
129        inner: DynNode,
130    ) -> Self {
131        Self {
132            name: name.into(),
133            ws_uri: ws_uri.into(),
134            prometheus_uri: prometheus_uri.into(),
135            inner,
136            spec,
137            multiaddr: multiaddr.into(),
138            metrics_cache: Arc::new(Default::default()),
139            is_running: Arc::new(AtomicBool::new(false)),
140        }
141    }
142
143    pub(crate) fn is_running(&self) -> bool {
144        self.is_running.load(Ordering::Acquire)
145    }
146
147    pub(crate) fn set_is_running(&self, is_running: bool) {
148        self.is_running.store(is_running, Ordering::Release);
149    }
150
151    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
152        self.multiaddr = multiaddr.into();
153    }
154
155    pub fn name(&self) -> &str {
156        &self.name
157    }
158
159    pub fn args(&self) -> Vec<&str> {
160        self.inner.args()
161    }
162
163    pub fn spec(&self) -> &NodeSpec {
164        &self.spec
165    }
166
167    pub fn ws_uri(&self) -> &str {
168        &self.ws_uri
169    }
170
171    pub fn multiaddr(&self) -> &str {
172        self.multiaddr.as_ref()
173    }
174
175    // Subxt
176
177    /// Get the rpc client for the node
178    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
179        get_client_from_url(&self.ws_uri).await
180    }
181
182    /// Get the [online client](subxt::client::OnlineClient) for the node
183    #[deprecated = "Use `wait_client` instead."]
184    pub async fn client<Config: subxt::Config>(
185        &self,
186    ) -> Result<OnlineClient<Config>, subxt::Error> {
187        self.try_client().await
188    }
189
190    /// Try to connect to the node.
191    ///
192    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
193    /// the node to appear before it connects to it. This function directly tries
194    /// to connect to the node and returns an error if the node is not yet available
195    /// at that point in time.
196    ///
197    /// Returns a [`OnlineClient`] on success.
198    pub async fn try_client<Config: subxt::Config>(
199        &self,
200    ) -> Result<OnlineClient<Config>, subxt::Error> {
201        get_client_from_url(&self.ws_uri).await
202    }
203
204    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
205    pub async fn wait_client<Config: subxt::Config>(
206        &self,
207    ) -> Result<OnlineClient<Config>, anyhow::Error> {
208        debug!("wait_client ws_uri: {}", self.ws_uri());
209        wait_ws_ready(self.ws_uri())
210            .await
211            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {}", e))?;
212
213        self.try_client()
214            .await
215            .map_err(|e| anyhow!("Can't create a subxt client, err: {}", e))
216    }
217
218    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
219    pub async fn wait_client_with_timeout<Config: subxt::Config>(
220        &self,
221        timeout_secs: impl Into<u64>,
222    ) -> Result<OnlineClient<Config>, anyhow::Error> {
223        debug!("waiting until subxt client is ready");
224        tokio::time::timeout(
225            Duration::from_secs(timeout_secs.into()),
226            self.wait_client::<Config>(),
227        )
228        .await?
229    }
230
231    // Commands
232
233    /// Pause the node, this is implemented by pausing the
234    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
235    pub async fn pause(&self) -> Result<(), anyhow::Error> {
236        self.set_is_running(false);
237        self.inner.pause().await?;
238        Ok(())
239    }
240
241    /// Resume the node, this is implemented by resuming the
242    /// actual process (e.g polkadot) with sending `SIGCONT` signal
243    pub async fn resume(&self) -> Result<(), anyhow::Error> {
244        self.set_is_running(true);
245        self.inner.resume().await?;
246        Ok(())
247    }
248
249    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
250    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
251        self.set_is_running(false);
252        self.inner.restart(after).await?;
253        self.set_is_running(true);
254        Ok(())
255    }
256
257    // Metrics assertions
258
259    /// Get metric value 'by name' from Prometheus (exposed by the node)
260    /// metric name can be:
261    /// with prefix (e.g: 'polkadot_')
262    /// with chain attribute (e.g: 'chain=rococo-local')
263    /// without prefix and/or without chain attribute
264    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
265        let metric_name = metric_name.into();
266        // force cache reload
267        self.fetch_metrics().await?;
268        // by default we treat not found as 0 (same in v1)
269        self.metric(&metric_name, true).await
270    }
271
272    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
273    /// metric name can be:
274    /// with prefix (e.g: 'polkadot_')
275    /// with chain attribute (e.g: 'chain=rococo-local')
276    /// without prefix and/or without chain attribute
277    ///
278    /// We first try to assert on the value using the cached metrics and
279    /// if not meet the criteria we reload the cache and check again
280    pub async fn assert(
281        &self,
282        metric_name: impl Into<String>,
283        value: impl Into<f64>,
284    ) -> Result<bool, anyhow::Error> {
285        let value: f64 = value.into();
286        self.assert_with(metric_name, |v| v == value).await
287    }
288
289    /// Assert on a metric value using a given predicate.
290    /// See [`NetworkNode::reports`] description for details on metric name.
291    pub async fn assert_with(
292        &self,
293        metric_name: impl Into<String>,
294        predicate: impl Fn(f64) -> bool,
295    ) -> Result<bool, anyhow::Error> {
296        let metric_name = metric_name.into();
297        // reload metrics
298        self.fetch_metrics().await?;
299        let val = self.metric(&metric_name, true).await?;
300        trace!("🔎 Current value {val} passed to the predicated?");
301        Ok(predicate(val))
302    }
303
304    // Wait methods for metrics
305
306    /// Wait until a metric value pass the `predicate`
307    pub async fn wait_metric(
308        &self,
309        metric_name: impl Into<String>,
310        predicate: impl Fn(f64) -> bool,
311    ) -> Result<(), anyhow::Error> {
312        let metric_name = metric_name.into();
313        debug!("waiting until metric {metric_name} pass the predicate");
314        loop {
315            let res = self.assert_with(&metric_name, &predicate).await;
316            match res {
317                Ok(res) => {
318                    if res {
319                        return Ok(());
320                    }
321                },
322                Err(e) => match e.downcast::<reqwest::Error>() {
323                    Ok(io_err) => {
324                        if !skip_err_while_waiting(&io_err) {
325                            return Err(io_err.into());
326                        }
327                    },
328                    Err(other) => {
329                        match other.downcast::<NetworkNodeError>() {
330                            Ok(node_err) => {
331                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
332                                    return Err(node_err.into());
333                                }
334                            },
335                            Err(other) => return Err(other),
336                        };
337                    },
338                },
339            }
340
341            // sleep to not spam prometheus
342            tokio::time::sleep(Duration::from_secs(1)).await;
343        }
344    }
345
346    /// Wait until a metric value pass the `predicate`
347    /// with a timeout (secs)
348    pub async fn wait_metric_with_timeout(
349        &self,
350        metric_name: impl Into<String>,
351        predicate: impl Fn(f64) -> bool,
352        timeout_secs: impl Into<u64>,
353    ) -> Result<(), anyhow::Error> {
354        let metric_name = metric_name.into();
355        let secs = timeout_secs.into();
356        debug!("waiting until metric {metric_name} pass the predicate");
357        let res = tokio::time::timeout(
358            Duration::from_secs(secs),
359            self.wait_metric(&metric_name, predicate),
360        )
361        .await;
362
363        if let Ok(inner_res) = res {
364            match inner_res {
365                Ok(_) => Ok(()),
366                Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
367            }
368        } else {
369            // timeout
370            Err(anyhow!(
371                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
372            ))
373        }
374    }
375
376    // Logs
377
378    /// Get the logs of the node
379    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
380    pub async fn logs(&self) -> Result<String, anyhow::Error> {
381        Ok(self.inner.logs().await?)
382    }
383
384    /// Wait until a the number of matching log lines is reach
385    pub async fn wait_log_line_count(
386        &self,
387        pattern: impl Into<String>,
388        is_glob: bool,
389        count: usize,
390    ) -> Result<(), anyhow::Error> {
391        let pattern = pattern.into();
392        let pattern_clone = pattern.clone();
393        debug!("waiting until we find pattern {pattern} {count} times");
394        let match_fn: BoxedClosure = if is_glob {
395            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
396        } else {
397            let re = Regex::new(&pattern)?;
398            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
399        };
400
401        loop {
402            let mut q = 0_usize;
403            let logs = self.logs().await?;
404            for line in logs.lines() {
405                trace!("line is {line}");
406                if match_fn(line)? {
407                    trace!("pattern {pattern_clone} match in line {line}");
408                    q += 1;
409                    if q >= count {
410                        return Ok(());
411                    }
412                }
413            }
414
415            tokio::time::sleep(Duration::from_secs(2)).await;
416        }
417    }
418
419    /// Waits until the number of matching log lines satisfies a custom condition,
420    /// optionally waiting for the entire duration of the timeout.
421    ///
422    /// This method searches log lines for a given substring or glob pattern,
423    /// and evaluates the number of matching lines using a user-provided predicate function.
424    /// Optionally, it can wait for the full timeout duration to ensure the condition
425    /// holds consistently (e.g., for verifying absence of logs).
426    ///
427    /// # Arguments
428    /// * `substring` - The substring or pattern to match within log lines.
429    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
430    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
431    ///
432    /// # Returns
433    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
434    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
435    /// * `Err(e)` if an error occurred during log retrieval or matching.
436    ///
437    /// # Example
438    /// ```rust
439    /// # use std::{sync::Arc, time::Duration};
440    /// # use provider::NativeProvider;
441    /// # use support::{fs::local::LocalFileSystem};
442    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
443    /// # use configuration::NetworkConfig;
444    /// # async fn example() -> Result<(), anyhow::Error> {
445    /// #   let provider = NativeProvider::new(LocalFileSystem {});
446    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
447    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
448    /// #   let network = orchestrator.spawn(config).await?;
449    /// let node = network.get_node("alice")?;
450    /// // Wait (up to 10 seconds) until pattern occurs once
451    /// let options = LogLineCountOptions {
452    ///     predicate: Arc::new(|count| count == 1),
453    ///     timeout: Duration::from_secs(10),
454    ///     wait_until_timeout_elapses: false,
455    /// };
456    /// let result = node
457    ///     .wait_log_line_count_with_timeout("error", false, options)
458    ///     .await?;
459    /// #   Ok(())
460    /// # }
461    /// ```
462    pub async fn wait_log_line_count_with_timeout(
463        &self,
464        substring: impl Into<String>,
465        is_glob: bool,
466        options: LogLineCountOptions,
467    ) -> Result<LogLineCount, anyhow::Error> {
468        let substring = substring.into();
469        debug!(
470            "waiting until match lines count within {} seconds",
471            options.timeout.as_secs_f64()
472        );
473
474        let start = tokio::time::Instant::now();
475
476        let match_fn: BoxedClosure = if is_glob {
477            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
478        } else {
479            let re = Regex::new(&substring)?;
480            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
481        };
482
483        if options.wait_until_timeout_elapses {
484            tokio::time::sleep(options.timeout).await;
485        }
486
487        let mut q;
488        loop {
489            q = 0_u32;
490            let logs = self.logs().await?;
491            for line in logs.lines() {
492                if match_fn(line)? {
493                    q += 1;
494
495                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
496                    // end after the whole log file is processed. This is to address the cases when the
497                    // predicate becomes true and false again.
498                    // eg. expected exactly 2 matching lines are expected but 3 are present
499                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
500                        return Ok(LogLineCount::TargetReached(q));
501                    }
502                }
503            }
504
505            if start.elapsed() >= options.timeout {
506                break;
507            }
508
509            tokio::time::sleep(Duration::from_secs(2)).await;
510        }
511
512        if (options.predicate)(q) {
513            Ok(LogLineCount::TargetReached(q))
514        } else {
515            Ok(LogLineCount::TargetFailed(q))
516        }
517    }
518
519    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
520        let response = reqwest::get(&self.prometheus_uri).await?;
521        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
522        let mut cache = self.metrics_cache.write().await;
523        *cache = metrics;
524        Ok(())
525    }
526
527    /// Query individual metric by name
528    async fn metric(
529        &self,
530        metric_name: &str,
531        treat_not_found_as_zero: bool,
532    ) -> Result<f64, anyhow::Error> {
533        let mut metrics_map = self.metrics_cache.read().await;
534        if metrics_map.is_empty() {
535            // reload metrics
536            drop(metrics_map);
537            self.fetch_metrics().await?;
538            metrics_map = self.metrics_cache.read().await;
539        }
540
541        if let Some(val) = metrics_map.get(metric_name) {
542            Ok(*val)
543        } else if treat_not_found_as_zero {
544            Ok(0_f64)
545        } else {
546            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
547        }
548    }
549
550    /// Waits given number of seconds until node reports that it is up and running, which
551    /// is determined by metric 'process_start_time_seconds', which should appear,
552    /// when node finished booting up.
553    ///
554    ///
555    /// # Arguments
556    /// * `timeout_secs` - The number of seconds to wait.
557    ///
558    /// # Returns
559    /// * `Ok()` if the node is up before timeout occured.
560    /// * `Err(e)` if timeout or other error occurred while waiting.
561    pub async fn wait_until_is_up(
562        &self,
563        timeout_secs: impl Into<u64>,
564    ) -> Result<(), anyhow::Error> {
565        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
566            .await
567            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
568    }
569}
570
571impl std::fmt::Debug for NetworkNode {
572    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573        f.debug_struct("NetworkNode")
574            .field("inner", &"inner_skipped")
575            .field("spec", &self.spec)
576            .field("name", &self.name)
577            .field("ws_uri", &self.ws_uri)
578            .field("prometheus_uri", &self.prometheus_uri)
579            .finish()
580    }
581}
582
583// TODO: mock and impl more unit tests
584#[cfg(test)]
585mod tests {
586    use std::{
587        path::{Path, PathBuf},
588        sync::{Arc, Mutex},
589    };
590
591    use async_trait::async_trait;
592    use provider::{types::*, ProviderError, ProviderNode};
593
594    use super::*;
595
596    struct MockNode {
597        logs: Arc<Mutex<Vec<String>>>,
598    }
599
600    impl MockNode {
601        fn new() -> Self {
602            Self {
603                logs: Arc::new(Mutex::new(vec![])),
604            }
605        }
606
607        fn logs_push(&self, lines: Vec<impl Into<String>>) {
608            self.logs
609                .lock()
610                .unwrap()
611                .extend(lines.into_iter().map(|l| l.into()));
612        }
613    }
614
615    #[async_trait]
616    impl ProviderNode for MockNode {
617        fn name(&self) -> &str {
618            todo!()
619        }
620
621        fn args(&self) -> Vec<&str> {
622            todo!()
623        }
624
625        fn base_dir(&self) -> &PathBuf {
626            todo!()
627        }
628
629        fn config_dir(&self) -> &PathBuf {
630            todo!()
631        }
632
633        fn data_dir(&self) -> &PathBuf {
634            todo!()
635        }
636
637        fn relay_data_dir(&self) -> &PathBuf {
638            todo!()
639        }
640
641        fn scripts_dir(&self) -> &PathBuf {
642            todo!()
643        }
644
645        fn log_path(&self) -> &PathBuf {
646            todo!()
647        }
648
649        fn log_cmd(&self) -> String {
650            todo!()
651        }
652
653        fn path_in_node(&self, _file: &Path) -> PathBuf {
654            todo!()
655        }
656
657        async fn logs(&self) -> Result<String, ProviderError> {
658            Ok(self.logs.lock().unwrap().join("\n"))
659        }
660
661        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
662            todo!()
663        }
664
665        async fn run_command(
666            &self,
667            _options: RunCommandOptions,
668        ) -> Result<ExecutionResult, ProviderError> {
669            todo!()
670        }
671
672        async fn run_script(
673            &self,
674            _options: RunScriptOptions,
675        ) -> Result<ExecutionResult, ProviderError> {
676            todo!()
677        }
678
679        async fn send_file(
680            &self,
681            _local_file_path: &Path,
682            _remote_file_path: &Path,
683            _mode: &str,
684        ) -> Result<(), ProviderError> {
685            todo!()
686        }
687
688        async fn receive_file(
689            &self,
690            _remote_file_path: &Path,
691            _local_file_path: &Path,
692        ) -> Result<(), ProviderError> {
693            todo!()
694        }
695
696        async fn pause(&self) -> Result<(), ProviderError> {
697            todo!()
698        }
699
700        async fn resume(&self) -> Result<(), ProviderError> {
701            todo!()
702        }
703
704        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
705            todo!()
706        }
707
708        async fn destroy(&self) -> Result<(), ProviderError> {
709            todo!()
710        }
711    }
712
713    #[tokio::test(flavor = "multi_thread")]
714    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
715        let mock_provider = Arc::new(MockNode::new());
716        let mock_node = NetworkNode::new(
717            "node1",
718            "ws_uri",
719            "prometheus_uri",
720            "multiaddr",
721            NodeSpec::default(),
722            mock_provider.clone(),
723        );
724
725        mock_provider.logs_push(vec![
726            "system booting",
727            "stub line 1",
728            "stub line 2",
729            "system ready",
730        ]);
731
732        // Wait (up to 10 seconds) until pattern occurs once
733        let options = LogLineCountOptions {
734            predicate: Arc::new(|n| n == 1),
735            timeout: Duration::from_secs(10),
736            wait_until_timeout_elapses: false,
737        };
738
739        let log_line_count = mock_node
740            .wait_log_line_count_with_timeout("system ready", false, options)
741            .await?;
742
743        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
744
745        Ok(())
746    }
747
748    #[tokio::test(flavor = "multi_thread")]
749    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
750        let mock_provider = Arc::new(MockNode::new());
751        let mock_node = NetworkNode::new(
752            "node1",
753            "ws_uri",
754            "prometheus_uri",
755            "multiaddr",
756            NodeSpec::default(),
757            mock_provider.clone(),
758        );
759
760        mock_provider.logs_push(vec![
761            "system booting",
762            "stub line 1",
763            "stub line 2",
764            "system ready",
765        ]);
766
767        // Wait (up to 4 seconds) until pattern occurs twice
768        let options = LogLineCountOptions {
769            predicate: Arc::new(|n| n == 2),
770            timeout: Duration::from_secs(4),
771            wait_until_timeout_elapses: false,
772        };
773
774        let task = tokio::spawn({
775            async move {
776                mock_node
777                    .wait_log_line_count_with_timeout("system ready", false, options)
778                    .await
779                    .unwrap()
780            }
781        });
782
783        tokio::time::sleep(Duration::from_secs(2)).await;
784
785        mock_provider.logs_push(vec!["system ready"]);
786
787        let log_line_count = task.await?;
788
789        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
790
791        Ok(())
792    }
793
794    #[tokio::test(flavor = "multi_thread")]
795    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
796        let mock_provider = Arc::new(MockNode::new());
797        let mock_node = NetworkNode::new(
798            "node1",
799            "ws_uri",
800            "prometheus_uri",
801            "multiaddr",
802            NodeSpec::default(),
803            mock_provider.clone(),
804        );
805
806        mock_provider.logs_push(vec![
807            "system booting",
808            "stub line 1",
809            "stub line 2",
810            "system ready",
811        ]);
812
813        // Wait (up to 2 seconds) until pattern occurs twice
814        let options = LogLineCountOptions {
815            predicate: Arc::new(|n| n == 2),
816            timeout: Duration::from_secs(2),
817            wait_until_timeout_elapses: false,
818        };
819
820        let log_line_count = mock_node
821            .wait_log_line_count_with_timeout("system ready", false, options)
822            .await?;
823
824        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
825
826        Ok(())
827    }
828
829    #[tokio::test(flavor = "multi_thread")]
830    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
831        let mock_provider = Arc::new(MockNode::new());
832        let mock_node = NetworkNode::new(
833            "node1",
834            "ws_uri",
835            "prometheus_uri",
836            "multiaddr",
837            NodeSpec::default(),
838            mock_provider.clone(),
839        );
840
841        mock_provider.logs_push(vec![
842            "system booting",
843            "stub line 1",
844            "stub line 2",
845            "system ready",
846        ]);
847
848        // Wait until timeout and check if pattern occurs exactly twice
849        let options = LogLineCountOptions {
850            predicate: Arc::new(|n| n == 2),
851            timeout: Duration::from_secs(2),
852            wait_until_timeout_elapses: true,
853        };
854
855        let task = tokio::spawn({
856            async move {
857                mock_node
858                    .wait_log_line_count_with_timeout("system ready", false, options)
859                    .await
860                    .unwrap()
861            }
862        });
863
864        tokio::time::sleep(Duration::from_secs(1)).await;
865
866        mock_provider.logs_push(vec!["system ready"]);
867        mock_provider.logs_push(vec!["system ready"]);
868
869        let log_line_count = task.await?;
870
871        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
872
873        Ok(())
874    }
875
876    #[tokio::test(flavor = "multi_thread")]
877    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
878        let mock_provider = Arc::new(MockNode::new());
879        let mock_node = NetworkNode::new(
880            "node1",
881            "ws_uri",
882            "prometheus_uri",
883            "multiaddr",
884            NodeSpec::default(),
885            mock_provider.clone(),
886        );
887
888        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
889
890        let task = tokio::spawn({
891            async move {
892                mock_node
893                    .wait_log_line_count_with_timeout(
894                        "system ready",
895                        false,
896                        // Wait until timeout and make sure pattern occurred zero times
897                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
898                    )
899                    .await
900                    .unwrap()
901            }
902        });
903
904        tokio::time::sleep(Duration::from_secs(1)).await;
905
906        mock_provider.logs_push(vec!["stub line 3"]);
907
908        assert!(task.await?.success());
909
910        Ok(())
911    }
912
913    #[tokio::test(flavor = "multi_thread")]
914    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
915        let mock_provider = Arc::new(MockNode::new());
916        let mock_node = NetworkNode::new(
917            "node1",
918            "ws_uri",
919            "prometheus_uri",
920            "multiaddr",
921            NodeSpec::default(),
922            mock_provider.clone(),
923        );
924
925        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
926
927        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
928        let options = LogLineCountOptions {
929            predicate: Arc::new(|n| (2..=5).contains(&n)),
930            timeout: Duration::from_secs(2),
931            wait_until_timeout_elapses: true,
932        };
933
934        let task = tokio::spawn({
935            async move {
936                mock_node
937                    .wait_log_line_count_with_timeout("system ready", false, options)
938                    .await
939                    .unwrap()
940            }
941        });
942
943        tokio::time::sleep(Duration::from_secs(1)).await;
944
945        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
946
947        assert!(task.await?.success());
948
949        Ok(())
950    }
951
952    #[tokio::test(flavor = "multi_thread")]
953    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
954        let mock_provider = Arc::new(MockNode::new());
955        let mock_node = NetworkNode::new(
956            "node1",
957            "ws_uri",
958            "prometheus_uri",
959            "multiaddr",
960            NodeSpec::default(),
961            mock_provider.clone(),
962        );
963
964        mock_provider.logs_push(vec![
965            "system booting",
966            "stub line 1",
967            // this line should not match
968            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
969            "stub line 2"
970        ]);
971
972        let options = LogLineCountOptions {
973            predicate: Arc::new(|n| n == 1),
974            timeout: Duration::from_secs(3),
975            wait_until_timeout_elapses: true,
976        };
977
978        let task = tokio::spawn({
979            async move {
980                mock_node
981                    .wait_log_line_count_with_timeout(
982                        "error(?! importing block .*: block has an unknown parent)",
983                        false,
984                        options,
985                    )
986                    .await
987                    .unwrap()
988            }
989        });
990
991        tokio::time::sleep(Duration::from_secs(1)).await;
992
993        mock_provider.logs_push(vec![
994            "system ready",
995            // this line should match
996            "system error",
997            "system ready",
998        ]);
999
1000        assert!(task.await?.success());
1001
1002        Ok(())
1003    }
1004
1005    #[tokio::test(flavor = "multi_thread")]
1006    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1007    ) -> Result<(), anyhow::Error> {
1008        let mock_provider = Arc::new(MockNode::new());
1009        let mock_node = NetworkNode::new(
1010            "node1",
1011            "ws_uri",
1012            "prometheus_uri",
1013            "multiaddr",
1014            NodeSpec::default(),
1015            mock_provider.clone(),
1016        );
1017
1018        mock_provider.logs_push(vec![
1019            "system booting",
1020            "stub line 1",
1021            // this line should not match
1022            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1023            "stub line 2"
1024        ]);
1025
1026        let options = LogLineCountOptions {
1027            predicate: Arc::new(|n| n == 1),
1028            timeout: Duration::from_secs(6),
1029            wait_until_timeout_elapses: true,
1030        };
1031
1032        let task = tokio::spawn({
1033            async move {
1034                mock_node
1035                    .wait_log_line_count_with_timeout(
1036                        "error(?! importing block .*: block has an unknown parent)",
1037                        false,
1038                        options,
1039                    )
1040                    .await
1041                    .unwrap()
1042            }
1043        });
1044
1045        tokio::time::sleep(Duration::from_secs(1)).await;
1046
1047        mock_provider.logs_push(vec!["system ready", "system ready"]);
1048
1049        assert!(!task.await?.success());
1050
1051        Ok(())
1052    }
1053
1054    #[tokio::test(flavor = "multi_thread")]
1055    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1056        let mock_provider = Arc::new(MockNode::new());
1057        let mock_node = NetworkNode::new(
1058            "node1",
1059            "ws_uri",
1060            "prometheus_uri",
1061            "multiaddr",
1062            NodeSpec::default(),
1063            mock_provider.clone(),
1064        );
1065
1066        mock_provider.logs_push(vec![
1067            "system booting",
1068            "stub line 1",
1069            // this line should not match
1070            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1071            "stub line 2"
1072        ]);
1073
1074        let task = tokio::spawn({
1075            async move {
1076                mock_node
1077                    .wait_log_line_count(
1078                        "error(?! importing block .*: block has an unknown parent)",
1079                        false,
1080                        1,
1081                    )
1082                    .await
1083                    .unwrap()
1084            }
1085        });
1086
1087        tokio::time::sleep(Duration::from_secs(1)).await;
1088
1089        mock_provider.logs_push(vec![
1090            "system ready",
1091            // this line should match
1092            "system error",
1093            "system ready",
1094        ]);
1095
1096        assert!(task.await.is_ok());
1097
1098        Ok(())
1099    }
1100
1101    #[tokio::test(flavor = "multi_thread")]
1102    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1103        let mock_provider = Arc::new(MockNode::new());
1104        let mock_node = NetworkNode::new(
1105            "node1",
1106            "ws_uri",
1107            "prometheus_uri",
1108            "multiaddr",
1109            NodeSpec::default(),
1110            mock_provider.clone(),
1111        );
1112
1113        mock_provider.logs_push(vec![
1114            "system booting",
1115            "stub line 1",
1116            // this line should not match
1117            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1118            "stub line 2"
1119        ]);
1120
1121        let options = LogLineCountOptions {
1122            predicate: Arc::new(|count| count == 1),
1123            timeout: Duration::from_secs(2),
1124            wait_until_timeout_elapses: true,
1125        };
1126
1127        let task = tokio::spawn({
1128            async move {
1129                // we expect no match, thus wait with timeout
1130                mock_node
1131                    .wait_log_line_count_with_timeout(
1132                        "error(?! importing block .*: block has an unknown parent)",
1133                        false,
1134                        options,
1135                    )
1136                    .await
1137                    .unwrap()
1138            }
1139        });
1140
1141        tokio::time::sleep(Duration::from_secs(1)).await;
1142
1143        mock_provider.logs_push(vec!["system ready", "system ready"]);
1144
1145        assert!(!task.await?.success());
1146
1147        Ok(())
1148    }
1149}