zombienet_orchestrator/network/
node.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::{Deserialize, Serialize, Serializer};
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
22
/// Boxed line matcher used by the log-waiting helpers: it receives a single log line and
/// returns whether the line matches, or an error if the match itself failed.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
24
/// Errors produced by [`NetworkNode`] itself (as opposed to provider or transport errors).
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric is not present in the metrics map; carries the metric name.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
30
/// A node of a spawned network: wraps the provider node handle together with the
/// endpoints used to talk to it (websocket RPC, prometheus) and some shared runtime state.
///
/// The `metrics_cache` and `is_running` fields are behind `Arc`s and are skipped when
/// serializing.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Handle to the provider node; serialized through `serialize_provider_node`.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduced set of values.
    /// Spec this node was created with.
    pub(crate) spec: NodeSpec,
    /// Node name.
    pub(crate) name: String,
    /// Websocket uri used to build the rpc / subxt clients.
    pub(crate) ws_uri: String,
    /// Multiaddr of the node.
    pub(crate) multiaddr: String,
    /// Uri the prometheus metrics are fetched from.
    pub(crate) prometheus_uri: String,
    /// Last metrics fetched from `prometheus_uri` (replaced wholesale on every fetch).
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag, toggled by `pause` / `resume` / `restart` and `set_is_running`.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
47
/// Deserializable form of a network node.
///
/// Carries the same serialized fields as [`NetworkNode`], but keeps `inner` as an
/// untyped JSON value instead of a provider node handle.
// NOTE(review): presumably used to rebuild a `NetworkNode` from serialized state — the
// conversion is not visible in this file section, confirm against the caller.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    /// Raw (not yet interpreted) JSON of the provider node.
    pub(crate) inner: serde_json::Value,
}
57
/// Outcome of waiting for a number of matching log lines.
///
/// Tells whether the log line count condition was satisfied before the timeout expired.
///
/// # Variants
/// - `TargetReached(count)` – the predicate was satisfied within the timeout.
///     * `count`: number of matching log lines when the predicate was satisfied.
/// - `TargetFailed(count)` – the predicate was not satisfied within the timeout.
///     * `count`: number of matching log lines when the timeout expired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` for [`LogLineCount::TargetReached`] and `false` for
    /// [`LogLineCount::TargetFailed`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(_))
    }
}
81
/// Options controlling how a log line count wait behaves.
///
/// Bundles a predicate over the number of matching log lines, the maximum time to wait
/// and whether the whole timeout must elapse before the predicate is evaluated.
///
/// # Fields
/// - `predicate`: receives the current number of matching lines and returns `true`
///   when the condition is satisfied.
/// - `timeout`: maximum [`Duration`] to wait.
/// - `wait_until_timeout_elapses`: if `true`, the wait always lasts for the full timeout,
///   even if the condition is already met early.
///   Useful to verify sustained absence or stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct LogLineCountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Build options from a plain closure; the closure is wrapped in an `Arc` so the
    /// options stay cheaply cloneable.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Options asserting that no matching line shows up: waits for the full `timeout`
    /// and succeeds only when the final count is zero.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|matches: u32| matches == 0),
            timeout,
            wait_until_timeout_elapses: true,
        }
    }
}
118
119// #[derive(Clone, Debug)]
120// pub struct QueryMetricOptions {
121//     use_cache: bool,
122//     treat_not_found_as_zero: bool,
123// }
124
125// impl Default for QueryMetricOptions {
126//     fn default() -> Self {
127//         Self { use_cache: false, treat_not_found_as_zero: true }
128//     }
129// }
130
131impl NetworkNode {
132    /// Create a new NetworkNode
133    pub(crate) fn new<T: Into<String>>(
134        name: T,
135        ws_uri: T,
136        prometheus_uri: T,
137        multiaddr: T,
138        spec: NodeSpec,
139        inner: DynNode,
140    ) -> Self {
141        Self {
142            name: name.into(),
143            ws_uri: ws_uri.into(),
144            prometheus_uri: prometheus_uri.into(),
145            inner,
146            spec,
147            multiaddr: multiaddr.into(),
148            metrics_cache: Arc::new(Default::default()),
149            is_running: Arc::new(AtomicBool::new(false)),
150        }
151    }
152
    /// Current value of the shared running flag (atomic load with `Ordering::Acquire`).
    pub(crate) fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Acquire)
    }
156
    /// Store `is_running` into the shared running flag (atomic store with `Ordering::Release`).
    ///
    /// Only needs `&self` because the flag is an `Arc<AtomicBool>`.
    pub(crate) fn set_is_running(&self, is_running: bool) {
        self.is_running.store(is_running, Ordering::Release);
    }
160
    /// Replace the stored multiaddr with `multiaddr`.
    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
        self.multiaddr = multiaddr.into();
    }
164
165    pub fn name(&self) -> &str {
166        &self.name
167    }
168
    /// Arguments reported by the underlying provider node (delegates to `inner.args()`).
    pub fn args(&self) -> Vec<&str> {
        self.inner.args()
    }
172
    /// The [`NodeSpec`] stored in this node.
    pub fn spec(&self) -> &NodeSpec {
        &self.spec
    }
176
177    pub fn ws_uri(&self) -> &str {
178        &self.ws_uri
179    }
180
181    pub fn multiaddr(&self) -> &str {
182        self.multiaddr.as_ref()
183    }
184
185    // Subxt
186
    /// Get the rpc client for the node.
    ///
    /// Connects to the node's `ws_uri` through `get_client_from_url`; this method itself
    /// does not wait for the node to be ready, the connection result is returned as-is.
    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
191
192    /// Get the [online client](subxt::client::OnlineClient) for the node
193    #[deprecated = "Use `wait_client` instead."]
194    pub async fn client<Config: subxt::Config>(
195        &self,
196    ) -> Result<OnlineClient<Config>, subxt::Error> {
197        self.try_client().await
198    }
199
    /// Try to connect to the node.
    ///
    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
    /// the node to appear before it connects to it. This function directly tries
    /// to connect to the node's `ws_uri` and returns an error if the node is not yet
    /// available at that point in time.
    ///
    /// Returns an [`OnlineClient`] on success.
    pub async fn try_client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
213
214    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
215    pub async fn wait_client<Config: subxt::Config>(
216        &self,
217    ) -> Result<OnlineClient<Config>, anyhow::Error> {
218        debug!("wait_client ws_uri: {}", self.ws_uri());
219        wait_ws_ready(self.ws_uri())
220            .await
221            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
222
223        self.try_client()
224            .await
225            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
226    }
227
228    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
229    pub async fn wait_client_with_timeout<Config: subxt::Config>(
230        &self,
231        timeout_secs: impl Into<u64>,
232    ) -> Result<OnlineClient<Config>, anyhow::Error> {
233        debug!("waiting until subxt client is ready");
234        tokio::time::timeout(
235            Duration::from_secs(timeout_secs.into()),
236            self.wait_client::<Config>(),
237        )
238        .await?
239    }
240
241    // Commands
242
243    /// Pause the node, this is implemented by pausing the
244    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
245    ///
246    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
247    /// with global setting `teardown_on_failure` disabled.
248    pub async fn pause(&self) -> Result<(), anyhow::Error> {
249        self.set_is_running(false);
250        self.inner.pause().await?;
251        Ok(())
252    }
253
254    /// Resume the node, this is implemented by resuming the
255    /// actual process (e.g polkadot) with sending `SIGCONT` signal
256    ///
257    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
258    /// with global setting `teardown_on_failure` disabled.
259    pub async fn resume(&self) -> Result<(), anyhow::Error> {
260        self.set_is_running(true);
261        self.inner.resume().await?;
262        Ok(())
263    }
264
265    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
266    ///
267    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
268    /// with global setting `teardown_on_failure` disabled.
269    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
270        self.set_is_running(false);
271        self.inner.restart(after).await?;
272        self.set_is_running(true);
273        Ok(())
274    }
275
276    // Metrics assertions
277
278    /// Get metric value 'by name' from Prometheus (exposed by the node)
279    /// metric name can be:
280    /// with prefix (e.g: 'polkadot_')
281    /// with chain attribute (e.g: 'chain=rococo-local')
282    /// without prefix and/or without chain attribute
283    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
284        let metric_name = metric_name.into();
285        // force cache reload
286        self.fetch_metrics().await?;
287        // by default we treat not found as 0 (same in v1)
288        self.metric(&metric_name, true).await
289    }
290
291    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
292    /// metric name can be:
293    /// with prefix (e.g: 'polkadot_')
294    /// with chain attribute (e.g: 'chain=rococo-local')
295    /// without prefix and/or without chain attribute
296    ///
297    /// We first try to assert on the value using the cached metrics and
298    /// if not meet the criteria we reload the cache and check again
299    pub async fn assert(
300        &self,
301        metric_name: impl Into<String>,
302        value: impl Into<f64>,
303    ) -> Result<bool, anyhow::Error> {
304        let value: f64 = value.into();
305        self.assert_with(metric_name, |v| v == value).await
306    }
307
308    /// Assert on a metric value using a given predicate.
309    /// See [`NetworkNode::reports`] description for details on metric name.
310    pub async fn assert_with(
311        &self,
312        metric_name: impl Into<String>,
313        predicate: impl Fn(f64) -> bool,
314    ) -> Result<bool, anyhow::Error> {
315        let metric_name = metric_name.into();
316        // reload metrics
317        self.fetch_metrics().await?;
318        let val = self.metric(&metric_name, true).await?;
319        trace!("🔎 Current value {val} passed to the predicated?");
320        Ok(predicate(val))
321    }
322
323    // Wait methods for metrics
324
325    /// Wait until a metric value pass the `predicate`
326    pub async fn wait_metric(
327        &self,
328        metric_name: impl Into<String>,
329        predicate: impl Fn(f64) -> bool,
330    ) -> Result<(), anyhow::Error> {
331        let metric_name = metric_name.into();
332        debug!("waiting until metric {metric_name} pass the predicate");
333        loop {
334            let res = self.assert_with(&metric_name, &predicate).await;
335            match res {
336                Ok(res) => {
337                    if res {
338                        return Ok(());
339                    }
340                },
341                Err(e) => match e.downcast::<reqwest::Error>() {
342                    Ok(io_err) => {
343                        if !skip_err_while_waiting(&io_err) {
344                            return Err(io_err.into());
345                        }
346                    },
347                    Err(other) => {
348                        match other.downcast::<NetworkNodeError>() {
349                            Ok(node_err) => {
350                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
351                                    return Err(node_err.into());
352                                }
353                            },
354                            Err(other) => return Err(other),
355                        };
356                    },
357                },
358            }
359
360            // sleep to not spam prometheus
361            tokio::time::sleep(Duration::from_secs(1)).await;
362        }
363    }
364
365    /// Wait until a metric value pass the `predicate`
366    /// with a timeout (secs)
367    pub async fn wait_metric_with_timeout(
368        &self,
369        metric_name: impl Into<String>,
370        predicate: impl Fn(f64) -> bool,
371        timeout_secs: impl Into<u64>,
372    ) -> Result<(), anyhow::Error> {
373        let metric_name = metric_name.into();
374        let secs = timeout_secs.into();
375        debug!("waiting until metric {metric_name} pass the predicate");
376        let res = tokio::time::timeout(
377            Duration::from_secs(secs),
378            self.wait_metric(&metric_name, predicate),
379        )
380        .await;
381
382        if let Ok(inner_res) = res {
383            match inner_res {
384                Ok(_) => Ok(()),
385                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
386            }
387        } else {
388            // timeout
389            Err(anyhow!(
390                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
391            ))
392        }
393    }
394
395    // Logs
396
397    /// Get the logs of the node
398    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
399    pub async fn logs(&self) -> Result<String, anyhow::Error> {
400        Ok(self.inner.logs().await?)
401    }
402
403    /// Wait until a the number of matching log lines is reach
404    pub async fn wait_log_line_count(
405        &self,
406        pattern: impl Into<String>,
407        is_glob: bool,
408        count: usize,
409    ) -> Result<(), anyhow::Error> {
410        let pattern = pattern.into();
411        let pattern_clone = pattern.clone();
412        debug!("waiting until we find pattern {pattern} {count} times");
413        let match_fn: BoxedClosure = if is_glob {
414            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
415        } else {
416            let re = Regex::new(&pattern)?;
417            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
418        };
419
420        loop {
421            let mut q = 0_usize;
422            let logs = self.logs().await?;
423            for line in logs.lines() {
424                trace!("line is {line}");
425                if match_fn(line)? {
426                    trace!("pattern {pattern_clone} match in line {line}");
427                    q += 1;
428                    if q >= count {
429                        return Ok(());
430                    }
431                }
432            }
433
434            tokio::time::sleep(Duration::from_secs(2)).await;
435        }
436    }
437
438    /// Waits until the number of matching log lines satisfies a custom condition,
439    /// optionally waiting for the entire duration of the timeout.
440    ///
441    /// This method searches log lines for a given substring or glob pattern,
442    /// and evaluates the number of matching lines using a user-provided predicate function.
443    /// Optionally, it can wait for the full timeout duration to ensure the condition
444    /// holds consistently (e.g., for verifying absence of logs).
445    ///
446    /// # Arguments
447    /// * `substring` - The substring or pattern to match within log lines.
448    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
449    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
450    ///
451    /// # Returns
452    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
453    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
454    /// * `Err(e)` if an error occurred during log retrieval or matching.
455    ///
456    /// # Example
457    /// ```rust
458    /// # use std::{sync::Arc, time::Duration};
459    /// # use provider::NativeProvider;
460    /// # use support::{fs::local::LocalFileSystem};
461    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
462    /// # use configuration::NetworkConfig;
463    /// # async fn example() -> Result<(), anyhow::Error> {
464    /// #   let provider = NativeProvider::new(LocalFileSystem {});
465    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
466    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
467    /// #   let network = orchestrator.spawn(config).await?;
468    /// let node = network.get_node("alice")?;
469    /// // Wait (up to 10 seconds) until pattern occurs once
470    /// let options = LogLineCountOptions {
471    ///     predicate: Arc::new(|count| count == 1),
472    ///     timeout: Duration::from_secs(10),
473    ///     wait_until_timeout_elapses: false,
474    /// };
475    /// let result = node
476    ///     .wait_log_line_count_with_timeout("error", false, options)
477    ///     .await?;
478    /// #   Ok(())
479    /// # }
480    /// ```
481    pub async fn wait_log_line_count_with_timeout(
482        &self,
483        substring: impl Into<String>,
484        is_glob: bool,
485        options: LogLineCountOptions,
486    ) -> Result<LogLineCount, anyhow::Error> {
487        let substring = substring.into();
488        debug!(
489            "waiting until match lines count within {} seconds",
490            options.timeout.as_secs_f64()
491        );
492
493        let start = tokio::time::Instant::now();
494
495        let match_fn: BoxedClosure = if is_glob {
496            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
497        } else {
498            let re = Regex::new(&substring)?;
499            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
500        };
501
502        if options.wait_until_timeout_elapses {
503            tokio::time::sleep(options.timeout).await;
504        }
505
506        let mut q;
507        loop {
508            q = 0_u32;
509            let logs = self.logs().await?;
510            for line in logs.lines() {
511                if match_fn(line)? {
512                    q += 1;
513
514                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
515                    // end after the whole log file is processed. This is to address the cases when the
516                    // predicate becomes true and false again.
517                    // eg. expected exactly 2 matching lines are expected but 3 are present
518                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
519                        return Ok(LogLineCount::TargetReached(q));
520                    }
521                }
522            }
523
524            if start.elapsed() >= options.timeout {
525                break;
526            }
527
528            tokio::time::sleep(Duration::from_secs(2)).await;
529        }
530
531        if (options.predicate)(q) {
532            Ok(LogLineCount::TargetReached(q))
533        } else {
534            Ok(LogLineCount::TargetFailed(q))
535        }
536    }
537
538    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
539        let response = reqwest::get(&self.prometheus_uri).await?;
540        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
541        let mut cache = self.metrics_cache.write().await;
542        *cache = metrics;
543        Ok(())
544    }
545
546    /// Query individual metric by name
547    async fn metric(
548        &self,
549        metric_name: &str,
550        treat_not_found_as_zero: bool,
551    ) -> Result<f64, anyhow::Error> {
552        let mut metrics_map = self.metrics_cache.read().await;
553        if metrics_map.is_empty() {
554            // reload metrics
555            drop(metrics_map);
556            self.fetch_metrics().await?;
557            metrics_map = self.metrics_cache.read().await;
558        }
559
560        if let Some(val) = metrics_map.get(metric_name) {
561            Ok(*val)
562        } else if treat_not_found_as_zero {
563            Ok(0_f64)
564        } else {
565            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
566        }
567    }
568
569    /// Waits given number of seconds until node reports that it is up and running, which
570    /// is determined by metric 'process_start_time_seconds', which should appear,
571    /// when node finished booting up.
572    ///
573    ///
574    /// # Arguments
575    /// * `timeout_secs` - The number of seconds to wait.
576    ///
577    /// # Returns
578    /// * `Ok()` if the node is up before timeout occured.
579    /// * `Err(e)` if timeout or other error occurred while waiting.
580    pub async fn wait_until_is_up(
581        &self,
582        timeout_secs: impl Into<u64>,
583    ) -> Result<(), anyhow::Error> {
584        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
585            .await
586            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
587    }
588}
589
590impl std::fmt::Debug for NetworkNode {
591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592        f.debug_struct("NetworkNode")
593            .field("inner", &"inner_skipped")
594            .field("spec", &self.spec)
595            .field("name", &self.name)
596            .field("ws_uri", &self.ws_uri)
597            .field("prometheus_uri", &self.prometheus_uri)
598            .finish()
599    }
600}
601
/// `serialize_with` helper for the `inner` field of [`NetworkNode`]: serializes the value
/// behind the `DynNode` handle through `erased_serde::serialize`.
fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    erased_serde::serialize(node.as_ref(), serializer)
}
608
609// TODO: mock and impl more unit tests
610#[cfg(test)]
611mod tests {
612    use std::{
613        path::{Path, PathBuf},
614        sync::{Arc, Mutex},
615    };
616
617    use async_trait::async_trait;
618    use provider::{types::*, ProviderError, ProviderNode};
619
620    use super::*;
621
622    #[derive(Serialize)]
623    struct MockNode {
624        logs: Arc<Mutex<Vec<String>>>,
625    }
626
627    impl MockNode {
628        fn new() -> Self {
629            Self {
630                logs: Arc::new(Mutex::new(vec![])),
631            }
632        }
633
634        fn logs_push(&self, lines: Vec<impl Into<String>>) {
635            self.logs
636                .lock()
637                .unwrap()
638                .extend(lines.into_iter().map(|l| l.into()));
639        }
640    }
641
    // Test double for `ProviderNode`: only `logs` is implemented (it joins the stored
    // lines with '\n'); every other method is a `todo!()` stub and panics if called.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // The only method the tests rely on: returns all pushed lines, one per line.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
739
740    #[tokio::test(flavor = "multi_thread")]
741    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
742        let mock_provider = Arc::new(MockNode::new());
743        let mock_node = NetworkNode::new(
744            "node1",
745            "ws_uri",
746            "prometheus_uri",
747            "multiaddr",
748            NodeSpec::default(),
749            mock_provider.clone(),
750        );
751
752        mock_provider.logs_push(vec![
753            "system booting",
754            "stub line 1",
755            "stub line 2",
756            "system ready",
757        ]);
758
759        // Wait (up to 10 seconds) until pattern occurs once
760        let options = LogLineCountOptions {
761            predicate: Arc::new(|n| n == 1),
762            timeout: Duration::from_secs(10),
763            wait_until_timeout_elapses: false,
764        };
765
766        let log_line_count = mock_node
767            .wait_log_line_count_with_timeout("system ready", false, options)
768            .await?;
769
770        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
771
772        Ok(())
773    }
774
775    #[tokio::test(flavor = "multi_thread")]
776    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
777        let mock_provider = Arc::new(MockNode::new());
778        let mock_node = NetworkNode::new(
779            "node1",
780            "ws_uri",
781            "prometheus_uri",
782            "multiaddr",
783            NodeSpec::default(),
784            mock_provider.clone(),
785        );
786
787        mock_provider.logs_push(vec![
788            "system booting",
789            "stub line 1",
790            "stub line 2",
791            "system ready",
792        ]);
793
794        // Wait (up to 4 seconds) until pattern occurs twice
795        let options = LogLineCountOptions {
796            predicate: Arc::new(|n| n == 2),
797            timeout: Duration::from_secs(4),
798            wait_until_timeout_elapses: false,
799        };
800
801        let task = tokio::spawn({
802            async move {
803                mock_node
804                    .wait_log_line_count_with_timeout("system ready", false, options)
805                    .await
806                    .unwrap()
807            }
808        });
809
810        tokio::time::sleep(Duration::from_secs(2)).await;
811
812        mock_provider.logs_push(vec!["system ready"]);
813
814        let log_line_count = task.await?;
815
816        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
817
818        Ok(())
819    }
820
821    #[tokio::test(flavor = "multi_thread")]
822    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
823        let mock_provider = Arc::new(MockNode::new());
824        let mock_node = NetworkNode::new(
825            "node1",
826            "ws_uri",
827            "prometheus_uri",
828            "multiaddr",
829            NodeSpec::default(),
830            mock_provider.clone(),
831        );
832
833        mock_provider.logs_push(vec![
834            "system booting",
835            "stub line 1",
836            "stub line 2",
837            "system ready",
838        ]);
839
840        // Wait (up to 2 seconds) until pattern occurs twice
841        let options = LogLineCountOptions {
842            predicate: Arc::new(|n| n == 2),
843            timeout: Duration::from_secs(2),
844            wait_until_timeout_elapses: false,
845        };
846
847        let log_line_count = mock_node
848            .wait_log_line_count_with_timeout("system ready", false, options)
849            .await?;
850
851        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
852
853        Ok(())
854    }
855
856    #[tokio::test(flavor = "multi_thread")]
857    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
858        let mock_provider = Arc::new(MockNode::new());
859        let mock_node = NetworkNode::new(
860            "node1",
861            "ws_uri",
862            "prometheus_uri",
863            "multiaddr",
864            NodeSpec::default(),
865            mock_provider.clone(),
866        );
867
868        mock_provider.logs_push(vec![
869            "system booting",
870            "stub line 1",
871            "stub line 2",
872            "system ready",
873        ]);
874
875        // Wait until timeout and check if pattern occurs exactly twice
876        let options = LogLineCountOptions {
877            predicate: Arc::new(|n| n == 2),
878            timeout: Duration::from_secs(2),
879            wait_until_timeout_elapses: true,
880        };
881
882        let task = tokio::spawn({
883            async move {
884                mock_node
885                    .wait_log_line_count_with_timeout("system ready", false, options)
886                    .await
887                    .unwrap()
888            }
889        });
890
891        tokio::time::sleep(Duration::from_secs(1)).await;
892
893        mock_provider.logs_push(vec!["system ready"]);
894        mock_provider.logs_push(vec!["system ready"]);
895
896        let log_line_count = task.await?;
897
898        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
899
900        Ok(())
901    }
902
903    #[tokio::test(flavor = "multi_thread")]
904    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
905        let mock_provider = Arc::new(MockNode::new());
906        let mock_node = NetworkNode::new(
907            "node1",
908            "ws_uri",
909            "prometheus_uri",
910            "multiaddr",
911            NodeSpec::default(),
912            mock_provider.clone(),
913        );
914
915        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
916
917        let task = tokio::spawn({
918            async move {
919                mock_node
920                    .wait_log_line_count_with_timeout(
921                        "system ready",
922                        false,
923                        // Wait until timeout and make sure pattern occurred zero times
924                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
925                    )
926                    .await
927                    .unwrap()
928            }
929        });
930
931        tokio::time::sleep(Duration::from_secs(1)).await;
932
933        mock_provider.logs_push(vec!["stub line 3"]);
934
935        assert!(task.await?.success());
936
937        Ok(())
938    }
939
940    #[tokio::test(flavor = "multi_thread")]
941    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
942        let mock_provider = Arc::new(MockNode::new());
943        let mock_node = NetworkNode::new(
944            "node1",
945            "ws_uri",
946            "prometheus_uri",
947            "multiaddr",
948            NodeSpec::default(),
949            mock_provider.clone(),
950        );
951
952        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
953
954        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
955        let options = LogLineCountOptions {
956            predicate: Arc::new(|n| (2..=5).contains(&n)),
957            timeout: Duration::from_secs(2),
958            wait_until_timeout_elapses: true,
959        };
960
961        let task = tokio::spawn({
962            async move {
963                mock_node
964                    .wait_log_line_count_with_timeout("system ready", false, options)
965                    .await
966                    .unwrap()
967            }
968        });
969
970        tokio::time::sleep(Duration::from_secs(1)).await;
971
972        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
973
974        assert!(task.await?.success());
975
976        Ok(())
977    }
978
979    #[tokio::test(flavor = "multi_thread")]
980    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
981        let mock_provider = Arc::new(MockNode::new());
982        let mock_node = NetworkNode::new(
983            "node1",
984            "ws_uri",
985            "prometheus_uri",
986            "multiaddr",
987            NodeSpec::default(),
988            mock_provider.clone(),
989        );
990
991        mock_provider.logs_push(vec![
992            "system booting",
993            "stub line 1",
994            // this line should not match
995            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
996            "stub line 2"
997        ]);
998
999        let options = LogLineCountOptions {
1000            predicate: Arc::new(|n| n == 1),
1001            timeout: Duration::from_secs(3),
1002            wait_until_timeout_elapses: true,
1003        };
1004
1005        let task = tokio::spawn({
1006            async move {
1007                mock_node
1008                    .wait_log_line_count_with_timeout(
1009                        "error(?! importing block .*: block has an unknown parent)",
1010                        false,
1011                        options,
1012                    )
1013                    .await
1014                    .unwrap()
1015            }
1016        });
1017
1018        tokio::time::sleep(Duration::from_secs(1)).await;
1019
1020        mock_provider.logs_push(vec![
1021            "system ready",
1022            // this line should match
1023            "system error",
1024            "system ready",
1025        ]);
1026
1027        assert!(task.await?.success());
1028
1029        Ok(())
1030    }
1031
1032    #[tokio::test(flavor = "multi_thread")]
1033    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1034    ) -> Result<(), anyhow::Error> {
1035        let mock_provider = Arc::new(MockNode::new());
1036        let mock_node = NetworkNode::new(
1037            "node1",
1038            "ws_uri",
1039            "prometheus_uri",
1040            "multiaddr",
1041            NodeSpec::default(),
1042            mock_provider.clone(),
1043        );
1044
1045        mock_provider.logs_push(vec![
1046            "system booting",
1047            "stub line 1",
1048            // this line should not match
1049            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1050            "stub line 2"
1051        ]);
1052
1053        let options = LogLineCountOptions {
1054            predicate: Arc::new(|n| n == 1),
1055            timeout: Duration::from_secs(6),
1056            wait_until_timeout_elapses: true,
1057        };
1058
1059        let task = tokio::spawn({
1060            async move {
1061                mock_node
1062                    .wait_log_line_count_with_timeout(
1063                        "error(?! importing block .*: block has an unknown parent)",
1064                        false,
1065                        options,
1066                    )
1067                    .await
1068                    .unwrap()
1069            }
1070        });
1071
1072        tokio::time::sleep(Duration::from_secs(1)).await;
1073
1074        mock_provider.logs_push(vec!["system ready", "system ready"]);
1075
1076        assert!(!task.await?.success());
1077
1078        Ok(())
1079    }
1080
1081    #[tokio::test(flavor = "multi_thread")]
1082    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1083        let mock_provider = Arc::new(MockNode::new());
1084        let mock_node = NetworkNode::new(
1085            "node1",
1086            "ws_uri",
1087            "prometheus_uri",
1088            "multiaddr",
1089            NodeSpec::default(),
1090            mock_provider.clone(),
1091        );
1092
1093        mock_provider.logs_push(vec![
1094            "system booting",
1095            "stub line 1",
1096            // this line should not match
1097            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1098            "stub line 2"
1099        ]);
1100
1101        let task = tokio::spawn({
1102            async move {
1103                mock_node
1104                    .wait_log_line_count(
1105                        "error(?! importing block .*: block has an unknown parent)",
1106                        false,
1107                        1,
1108                    )
1109                    .await
1110                    .unwrap()
1111            }
1112        });
1113
1114        tokio::time::sleep(Duration::from_secs(1)).await;
1115
1116        mock_provider.logs_push(vec![
1117            "system ready",
1118            // this line should match
1119            "system error",
1120            "system ready",
1121        ]);
1122
1123        assert!(task.await.is_ok());
1124
1125        Ok(())
1126    }
1127
1128    #[tokio::test(flavor = "multi_thread")]
1129    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1130        let mock_provider = Arc::new(MockNode::new());
1131        let mock_node = NetworkNode::new(
1132            "node1",
1133            "ws_uri",
1134            "prometheus_uri",
1135            "multiaddr",
1136            NodeSpec::default(),
1137            mock_provider.clone(),
1138        );
1139
1140        mock_provider.logs_push(vec![
1141            "system booting",
1142            "stub line 1",
1143            // this line should not match
1144            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1145            "stub line 2"
1146        ]);
1147
1148        let options = LogLineCountOptions {
1149            predicate: Arc::new(|count| count == 1),
1150            timeout: Duration::from_secs(2),
1151            wait_until_timeout_elapses: true,
1152        };
1153
1154        let task = tokio::spawn({
1155            async move {
1156                // we expect no match, thus wait with timeout
1157                mock_node
1158                    .wait_log_line_count_with_timeout(
1159                        "error(?! importing block .*: block has an unknown parent)",
1160                        false,
1161                        options,
1162                    )
1163                    .await
1164                    .unwrap()
1165            }
1166        });
1167
1168        tokio::time::sleep(Duration::from_secs(1)).await;
1169
1170        mock_provider.logs_push(vec!["system ready", "system ready"]);
1171
1172        assert!(!task.await?.success());
1173
1174        Ok(())
1175    }
1176}