zombienet_orchestrator/network/
node.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::Serialize;
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21#[cfg(feature = "pjs")]
22use crate::pjs_helper::{pjs_build_template, pjs_exec, PjsResult, ReturnValue};
23use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
24
/// Boxed line matcher used by the log-waiting helpers: receives one log line and
/// returns whether it matches, or an error if the match could not be evaluated.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
26
/// Errors produced by [`NetworkNode`] when querying metrics.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric name (carried in the variant) is not in the metrics map.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
32
/// Handle to a single node of a spawned network.
///
/// Cloning is cheap for the shared parts: the metrics cache and the running flag
/// live behind `Arc`s, so every clone observes the same cache and flag.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-level node handle used for logs, pause/resume/restart, etc.
    /// Not serialized.
    #[serde(skip)]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduced set of values.
    /// Spec this node was created from.
    pub(crate) spec: NodeSpec,
    /// Node name.
    pub(crate) name: String,
    /// Websocket endpoint used to build rpc/subxt clients.
    pub(crate) ws_uri: String,
    /// Multiaddress of the node.
    pub(crate) multiaddr: String,
    /// Prometheus endpoint the metrics are fetched from.
    pub(crate) prometheus_uri: String,
    /// Last metrics map parsed from `prometheus_uri`. Not serialized.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag, toggled by `pause`/`resume`/`restart`. Not serialized.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
49
/// Outcome of waiting on the number of matching log lines.
///
/// Tells whether the count predicate held, together with the count that was observed.
///
/// # Variants
/// - `TargetReached(count)` – the predicate was satisfied within the timeout;
///   `count` is the number of matching log lines at that moment.
/// - `TargetFailed(count)` – the predicate was not satisfied within the timeout;
///   `count` is the number of matching log lines when the timeout expired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` only for [`LogLineCount::TargetReached`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(_))
    }
}
73
/// Options controlling how the log line count wait behaves.
///
/// Bundles a predicate over the number of matching lines, a timeout, and a flag
/// telling whether the whole timeout must elapse before the count is judged.
///
/// # Fields
/// - `predicate`: receives the current number of matching lines and returns
///   `true` when the condition is satisfied.
/// - `timeout`: maximum time to wait.
/// - `wait_until_timeout_elapses`: when `true`, the wait lasts for the full
///   timeout even if the condition is already met earlier. Useful to verify
///   sustained absence or stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct LogLineCountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Builds the options from a plain closure, wrapping it in an `Arc`.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Options that wait for the whole `timeout` and succeed only if the
    /// number of matching lines is zero.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|n| n == 0),
            timeout,
            wait_until_timeout_elapses: true,
        }
    }
}
110
111// #[derive(Clone, Debug)]
112// pub struct QueryMetricOptions {
113//     use_cache: bool,
114//     treat_not_found_as_zero: bool,
115// }
116
117// impl Default for QueryMetricOptions {
118//     fn default() -> Self {
119//         Self { use_cache: false, treat_not_found_as_zero: true }
120//     }
121// }
122
123impl NetworkNode {
124    /// Create a new NetworkNode
125    pub(crate) fn new<T: Into<String>>(
126        name: T,
127        ws_uri: T,
128        prometheus_uri: T,
129        multiaddr: T,
130        spec: NodeSpec,
131        inner: DynNode,
132    ) -> Self {
133        Self {
134            name: name.into(),
135            ws_uri: ws_uri.into(),
136            prometheus_uri: prometheus_uri.into(),
137            inner,
138            spec,
139            multiaddr: multiaddr.into(),
140            metrics_cache: Arc::new(Default::default()),
141            is_running: Arc::new(AtomicBool::new(false)),
142        }
143    }
144
145    pub(crate) fn is_running(&self) -> bool {
146        self.is_running.load(Ordering::Acquire)
147    }
148
149    pub(crate) fn set_is_running(&self, is_running: bool) {
150        self.is_running.store(is_running, Ordering::Release);
151    }
152
153    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
154        self.multiaddr = multiaddr.into();
155    }
156
157    pub fn name(&self) -> &str {
158        &self.name
159    }
160
161    pub fn args(&self) -> Vec<&str> {
162        self.inner.args()
163    }
164
165    pub fn spec(&self) -> &NodeSpec {
166        &self.spec
167    }
168
169    pub fn ws_uri(&self) -> &str {
170        &self.ws_uri
171    }
172
173    pub fn multiaddr(&self) -> &str {
174        self.multiaddr.as_ref()
175    }
176
177    // Subxt
178
179    /// Get the rpc client for the node
180    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
181        get_client_from_url(&self.ws_uri).await
182    }
183
184    /// Get the [online client](subxt::client::OnlineClient) for the node
185    #[deprecated = "Use `wait_client` instead."]
186    pub async fn client<Config: subxt::Config>(
187        &self,
188    ) -> Result<OnlineClient<Config>, subxt::Error> {
189        self.try_client().await
190    }
191
192    /// Try to connect to the node.
193    ///
194    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
195    /// the node to appear before it connects to it. This function directly tries
196    /// to connect to the node and returns an error if the node is not yet available
197    /// at that point in time.
198    ///
199    /// Returns a [`OnlineClient`] on success.
200    pub async fn try_client<Config: subxt::Config>(
201        &self,
202    ) -> Result<OnlineClient<Config>, subxt::Error> {
203        get_client_from_url(&self.ws_uri).await
204    }
205
206    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
207    pub async fn wait_client<Config: subxt::Config>(
208        &self,
209    ) -> Result<OnlineClient<Config>, anyhow::Error> {
210        debug!("wait_client ws_uri: {}", self.ws_uri());
211        wait_ws_ready(self.ws_uri())
212            .await
213            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {}", e))?;
214
215        self.try_client()
216            .await
217            .map_err(|e| anyhow!("Can't create a subxt client, err: {}", e))
218    }
219
220    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
221    pub async fn wait_client_with_timeout<Config: subxt::Config>(
222        &self,
223        timeout_secs: impl Into<u64>,
224    ) -> Result<OnlineClient<Config>, anyhow::Error> {
225        debug!("waiting until subxt client is ready");
226        tokio::time::timeout(
227            Duration::from_secs(timeout_secs.into()),
228            self.wait_client::<Config>(),
229        )
230        .await?
231    }
232
233    // Commands
234
235    /// Pause the node, this is implemented by pausing the
236    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
237    pub async fn pause(&self) -> Result<(), anyhow::Error> {
238        self.set_is_running(false);
239        self.inner.pause().await?;
240        Ok(())
241    }
242
243    /// Resume the node, this is implemented by resuming the
244    /// actual process (e.g polkadot) with sending `SIGCONT` signal
245    pub async fn resume(&self) -> Result<(), anyhow::Error> {
246        self.set_is_running(true);
247        self.inner.resume().await?;
248        Ok(())
249    }
250
251    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
252    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
253        self.set_is_running(false);
254        self.inner.restart(after).await?;
255        self.set_is_running(true);
256        Ok(())
257    }
258
259    // Metrics assertions
260
261    /// Get metric value 'by name' from Prometheus (exposed by the node)
262    /// metric name can be:
263    /// with prefix (e.g: 'polkadot_')
264    /// with chain attribute (e.g: 'chain=rococo-local')
265    /// without prefix and/or without chain attribute
266    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
267        let metric_name = metric_name.into();
268        // force cache reload
269        self.fetch_metrics().await?;
270        // by default we treat not found as 0 (same in v1)
271        self.metric(&metric_name, true).await
272    }
273
274    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
275    /// metric name can be:
276    /// with prefix (e.g: 'polkadot_')
277    /// with chain attribute (e.g: 'chain=rococo-local')
278    /// without prefix and/or without chain attribute
279    ///
280    /// We first try to assert on the value using the cached metrics and
281    /// if not meet the criteria we reload the cache and check again
282    pub async fn assert(
283        &self,
284        metric_name: impl Into<String>,
285        value: impl Into<f64>,
286    ) -> Result<bool, anyhow::Error> {
287        let value: f64 = value.into();
288        self.assert_with(metric_name, |v| v == value).await
289    }
290
291    /// Assert on a metric value using a given predicate.
292    /// See [`NetworkNode::reports`] description for details on metric name.
293    pub async fn assert_with(
294        &self,
295        metric_name: impl Into<String>,
296        predicate: impl Fn(f64) -> bool,
297    ) -> Result<bool, anyhow::Error> {
298        let metric_name = metric_name.into();
299        // reload metrics
300        self.fetch_metrics().await?;
301        let val = self.metric(&metric_name, true).await?;
302        trace!("🔎 Current value {val} passed to the predicated?");
303        Ok(predicate(val))
304    }
305
306    // Wait methods for metrics
307
308    /// Wait until a metric value pass the `predicate`
309    pub async fn wait_metric(
310        &self,
311        metric_name: impl Into<String>,
312        predicate: impl Fn(f64) -> bool,
313    ) -> Result<(), anyhow::Error> {
314        let metric_name = metric_name.into();
315        debug!("waiting until metric {metric_name} pass the predicate");
316        loop {
317            let res = self.assert_with(&metric_name, &predicate).await;
318            match res {
319                Ok(res) => {
320                    if res {
321                        return Ok(());
322                    }
323                },
324                Err(e) => match e.downcast::<reqwest::Error>() {
325                    Ok(io_err) => {
326                        if !skip_err_while_waiting(&io_err) {
327                            return Err(io_err.into());
328                        }
329                    },
330                    Err(other) => {
331                        match other.downcast::<NetworkNodeError>() {
332                            Ok(node_err) => {
333                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
334                                    return Err(node_err.into());
335                                }
336                            },
337                            Err(other) => return Err(other),
338                        };
339                    },
340                },
341            }
342
343            // sleep to not spam prometheus
344            tokio::time::sleep(Duration::from_secs(1)).await;
345        }
346    }
347
348    /// Wait until a metric value pass the `predicate`
349    /// with a timeout (secs)
350    pub async fn wait_metric_with_timeout(
351        &self,
352        metric_name: impl Into<String>,
353        predicate: impl Fn(f64) -> bool,
354        timeout_secs: impl Into<u64>,
355    ) -> Result<(), anyhow::Error> {
356        let metric_name = metric_name.into();
357        let secs = timeout_secs.into();
358        debug!("waiting until metric {metric_name} pass the predicate");
359        let res = tokio::time::timeout(
360            Duration::from_secs(secs),
361            self.wait_metric(&metric_name, predicate),
362        )
363        .await;
364
365        if let Ok(inner_res) = res {
366            match inner_res {
367                Ok(_) => Ok(()),
368                Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
369            }
370        } else {
371            // timeout
372            Err(anyhow!(
373                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
374            ))
375        }
376    }
377
378    // Logs
379
380    /// Get the logs of the node
381    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
382    pub async fn logs(&self) -> Result<String, anyhow::Error> {
383        Ok(self.inner.logs().await?)
384    }
385
386    /// Wait until a the number of matching log lines is reach
387    pub async fn wait_log_line_count(
388        &self,
389        pattern: impl Into<String>,
390        is_glob: bool,
391        count: usize,
392    ) -> Result<(), anyhow::Error> {
393        let pattern = pattern.into();
394        let pattern_clone = pattern.clone();
395        debug!("waiting until we find pattern {pattern} {count} times");
396        let match_fn: BoxedClosure = if is_glob {
397            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
398        } else {
399            let re = Regex::new(&pattern)?;
400            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
401        };
402
403        loop {
404            let mut q = 0_usize;
405            let logs = self.logs().await?;
406            for line in logs.lines() {
407                trace!("line is {line}");
408                if match_fn(line)? {
409                    trace!("pattern {pattern_clone} match in line {line}");
410                    q += 1;
411                    if q >= count {
412                        return Ok(());
413                    }
414                }
415            }
416
417            tokio::time::sleep(Duration::from_secs(2)).await;
418        }
419    }
420
421    /// Waits until the number of matching log lines satisfies a custom condition,
422    /// optionally waiting for the entire duration of the timeout.
423    ///
424    /// This method searches log lines for a given substring or glob pattern,
425    /// and evaluates the number of matching lines using a user-provided predicate function.
426    /// Optionally, it can wait for the full timeout duration to ensure the condition
427    /// holds consistently (e.g., for verifying absence of logs).
428    ///
429    /// # Arguments
430    /// * `substring` - The substring or pattern to match within log lines.
431    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
432    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
433    ///
434    /// # Returns
435    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
436    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
437    /// * `Err(e)` if an error occurred during log retrieval or matching.
438    ///
439    /// # Example
440    /// ```rust
441    /// # use std::{sync::Arc, time::Duration};
442    /// # use provider::NativeProvider;
443    /// # use support::{fs::local::LocalFileSystem};
444    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
445    /// # use configuration::NetworkConfig;
446    /// # async fn example() -> Result<(), anyhow::Error> {
447    /// #   let provider = NativeProvider::new(LocalFileSystem {});
448    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
449    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
450    /// #   let network = orchestrator.spawn(config).await?;
451    /// let node = network.get_node("alice")?;
452    /// // Wait (up to 10 seconds) until pattern occurs once
453    /// let options = LogLineCountOptions {
454    ///     predicate: Arc::new(|count| count == 1),
455    ///     timeout: Duration::from_secs(10),
456    ///     wait_until_timeout_elapses: false,
457    /// };
458    /// let result = node
459    ///     .wait_log_line_count_with_timeout("error", false, options)
460    ///     .await?;
461    /// #   Ok(())
462    /// # }
463    /// ```
464    pub async fn wait_log_line_count_with_timeout(
465        &self,
466        substring: impl Into<String>,
467        is_glob: bool,
468        options: LogLineCountOptions,
469    ) -> Result<LogLineCount, anyhow::Error> {
470        let substring = substring.into();
471        debug!(
472            "waiting until match lines count within {} seconds",
473            options.timeout.as_secs_f64()
474        );
475
476        let start = tokio::time::Instant::now();
477
478        let match_fn: BoxedClosure = if is_glob {
479            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
480        } else {
481            let re = Regex::new(&substring)?;
482            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
483        };
484
485        if options.wait_until_timeout_elapses {
486            tokio::time::sleep(options.timeout).await;
487        }
488
489        let mut q;
490        loop {
491            q = 0_u32;
492            let logs = self.logs().await?;
493            for line in logs.lines() {
494                if match_fn(line)? {
495                    q += 1;
496
497                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
498                    // end after the whole log file is processed. This is to address the cases when the
499                    // predicate becomes true and false again.
500                    // eg. expected exactly 2 matching lines are expected but 3 are present
501                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
502                        return Ok(LogLineCount::TargetReached(q));
503                    }
504                }
505            }
506
507            if start.elapsed() >= options.timeout {
508                break;
509            }
510
511            tokio::time::sleep(Duration::from_secs(2)).await;
512        }
513
514        if (options.predicate)(q) {
515            Ok(LogLineCount::TargetReached(q))
516        } else {
517            Ok(LogLineCount::TargetFailed(q))
518        }
519    }
520
521    // TODO: impl
522    // wait_event_count
523    // wait_event_count_with_timeout
524
525    #[cfg(feature = "pjs")]
526    /// Execute js/ts code inside [pjs_rs] custom runtime.
527    ///
528    /// The code will be run in a wrapper similar to the `javascript` developer tab
529    /// of polkadot.js apps. The returning value is represented as [PjsResult] enum, to allow
530    /// to communicate that the execution was successful but the returning value can be deserialized as [serde_json::Value].
531    pub async fn pjs(
532        &self,
533        code: impl AsRef<str>,
534        args: Vec<serde_json::Value>,
535        user_types: Option<serde_json::Value>,
536    ) -> Result<PjsResult, anyhow::Error> {
537        let code = pjs_build_template(self.ws_uri(), code.as_ref(), args, user_types);
538        tracing::trace!("Code to execute: {code}");
539        let value = match pjs_exec(code)? {
540            ReturnValue::Deserialized(val) => Ok(val),
541            ReturnValue::CantDeserialize(msg) => Err(msg),
542        };
543
544        Ok(value)
545    }
546
547    #[cfg(feature = "pjs")]
548    /// Execute js/ts file  inside [pjs_rs] custom runtime.
549    ///
550    /// The content of the file will be run in a wrapper similar to the `javascript` developer tab
551    /// of polkadot.js apps. The returning value is represented as [PjsResult] enum, to allow
552    /// to communicate that the execution was successful but the returning value can be deserialized as [serde_json::Value].
553    pub async fn pjs_file(
554        &self,
555        file: impl AsRef<std::path::Path>,
556        args: Vec<serde_json::Value>,
557        user_types: Option<serde_json::Value>,
558    ) -> Result<PjsResult, anyhow::Error> {
559        let content = std::fs::read_to_string(file)?;
560        self.pjs(content, args, user_types).await
561    }
562
563    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
564        let response = reqwest::get(&self.prometheus_uri).await?;
565        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
566        let mut cache = self.metrics_cache.write().await;
567        *cache = metrics;
568        Ok(())
569    }
570
571    /// Query individual metric by name
572    async fn metric(
573        &self,
574        metric_name: &str,
575        treat_not_found_as_zero: bool,
576    ) -> Result<f64, anyhow::Error> {
577        let mut metrics_map = self.metrics_cache.read().await;
578        if metrics_map.is_empty() {
579            // reload metrics
580            drop(metrics_map);
581            self.fetch_metrics().await?;
582            metrics_map = self.metrics_cache.read().await;
583        }
584
585        if let Some(val) = metrics_map.get(metric_name) {
586            Ok(*val)
587        } else if treat_not_found_as_zero {
588            Ok(0_f64)
589        } else {
590            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
591        }
592    }
593
594    /// Waits given number of seconds until node reports that it is up and running, which
595    /// is determined by metric 'process_start_time_seconds', which should appear,
596    /// when node finished booting up.
597    ///
598    ///
599    /// # Arguments
600    /// * `timeout_secs` - The number of seconds to wait.
601    ///
602    /// # Returns
603    /// * `Ok()` if the node is up before timeout occured.
604    /// * `Err(e)` if timeout or other error occurred while waiting.
605    pub async fn wait_until_is_up(
606        &self,
607        timeout_secs: impl Into<u64>,
608    ) -> Result<(), anyhow::Error> {
609        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
610            .await
611            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
612    }
613}
614
615impl std::fmt::Debug for NetworkNode {
616    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
617        f.debug_struct("NetworkNode")
618            .field("inner", &"inner_skipped")
619            .field("spec", &self.spec)
620            .field("name", &self.name)
621            .field("ws_uri", &self.ws_uri)
622            .field("prometheus_uri", &self.prometheus_uri)
623            .finish()
624    }
625}
626
627// TODO: mock and impl more unit tests
628#[cfg(test)]
629mod tests {
630    use std::{
631        path::{Path, PathBuf},
632        sync::{Arc, Mutex},
633    };
634
635    use async_trait::async_trait;
636    use provider::{types::*, ProviderError, ProviderNode};
637
638    use super::*;
639
640    struct MockNode {
641        logs: Arc<Mutex<Vec<String>>>,
642    }
643
644    impl MockNode {
645        fn new() -> Self {
646            Self {
647                logs: Arc::new(Mutex::new(vec![])),
648            }
649        }
650
651        fn logs_push(&self, lines: Vec<impl Into<String>>) {
652            self.logs
653                .lock()
654                .unwrap()
655                .extend(lines.into_iter().map(|l| l.into()));
656        }
657    }
658
    // Provider implementation for the mock: only `logs` is functional, every other
    // method is a `todo!()` stub and panics if a test ends up calling it.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // Returns all pushed lines joined with '\n' (a snapshot taken at call time).
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
756
757    #[tokio::test(flavor = "multi_thread")]
758    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
759        let mock_provider = Arc::new(MockNode::new());
760        let mock_node = NetworkNode::new(
761            "node1",
762            "ws_uri",
763            "prometheus_uri",
764            "multiaddr",
765            NodeSpec::default(),
766            mock_provider.clone(),
767        );
768
769        mock_provider.logs_push(vec![
770            "system booting",
771            "stub line 1",
772            "stub line 2",
773            "system ready",
774        ]);
775
776        // Wait (up to 10 seconds) until pattern occurs once
777        let options = LogLineCountOptions {
778            predicate: Arc::new(|n| n == 1),
779            timeout: Duration::from_secs(10),
780            wait_until_timeout_elapses: false,
781        };
782
783        let log_line_count = mock_node
784            .wait_log_line_count_with_timeout("system ready", false, options)
785            .await?;
786
787        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
788
789        Ok(())
790    }
791
792    #[tokio::test(flavor = "multi_thread")]
793    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
794        let mock_provider = Arc::new(MockNode::new());
795        let mock_node = NetworkNode::new(
796            "node1",
797            "ws_uri",
798            "prometheus_uri",
799            "multiaddr",
800            NodeSpec::default(),
801            mock_provider.clone(),
802        );
803
804        mock_provider.logs_push(vec![
805            "system booting",
806            "stub line 1",
807            "stub line 2",
808            "system ready",
809        ]);
810
811        // Wait (up to 4 seconds) until pattern occurs twice
812        let options = LogLineCountOptions {
813            predicate: Arc::new(|n| n == 2),
814            timeout: Duration::from_secs(4),
815            wait_until_timeout_elapses: false,
816        };
817
818        let task = tokio::spawn({
819            async move {
820                mock_node
821                    .wait_log_line_count_with_timeout("system ready", false, options)
822                    .await
823                    .unwrap()
824            }
825        });
826
827        tokio::time::sleep(Duration::from_secs(2)).await;
828
829        mock_provider.logs_push(vec!["system ready"]);
830
831        let log_line_count = task.await?;
832
833        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
834
835        Ok(())
836    }
837
838    #[tokio::test(flavor = "multi_thread")]
839    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
840        let mock_provider = Arc::new(MockNode::new());
841        let mock_node = NetworkNode::new(
842            "node1",
843            "ws_uri",
844            "prometheus_uri",
845            "multiaddr",
846            NodeSpec::default(),
847            mock_provider.clone(),
848        );
849
850        mock_provider.logs_push(vec![
851            "system booting",
852            "stub line 1",
853            "stub line 2",
854            "system ready",
855        ]);
856
857        // Wait (up to 2 seconds) until pattern occurs twice
858        let options = LogLineCountOptions {
859            predicate: Arc::new(|n| n == 2),
860            timeout: Duration::from_secs(2),
861            wait_until_timeout_elapses: false,
862        };
863
864        let log_line_count = mock_node
865            .wait_log_line_count_with_timeout("system ready", false, options)
866            .await?;
867
868        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
869
870        Ok(())
871    }
872
873    #[tokio::test(flavor = "multi_thread")]
874    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
875        let mock_provider = Arc::new(MockNode::new());
876        let mock_node = NetworkNode::new(
877            "node1",
878            "ws_uri",
879            "prometheus_uri",
880            "multiaddr",
881            NodeSpec::default(),
882            mock_provider.clone(),
883        );
884
885        mock_provider.logs_push(vec![
886            "system booting",
887            "stub line 1",
888            "stub line 2",
889            "system ready",
890        ]);
891
892        // Wait until timeout and check if pattern occurs exactly twice
893        let options = LogLineCountOptions {
894            predicate: Arc::new(|n| n == 2),
895            timeout: Duration::from_secs(2),
896            wait_until_timeout_elapses: true,
897        };
898
899        let task = tokio::spawn({
900            async move {
901                mock_node
902                    .wait_log_line_count_with_timeout("system ready", false, options)
903                    .await
904                    .unwrap()
905            }
906        });
907
908        tokio::time::sleep(Duration::from_secs(1)).await;
909
910        mock_provider.logs_push(vec!["system ready"]);
911        mock_provider.logs_push(vec!["system ready"]);
912
913        let log_line_count = task.await?;
914
915        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
916
917        Ok(())
918    }
919
920    #[tokio::test(flavor = "multi_thread")]
921    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
922        let mock_provider = Arc::new(MockNode::new());
923        let mock_node = NetworkNode::new(
924            "node1",
925            "ws_uri",
926            "prometheus_uri",
927            "multiaddr",
928            NodeSpec::default(),
929            mock_provider.clone(),
930        );
931
932        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
933
934        let task = tokio::spawn({
935            async move {
936                mock_node
937                    .wait_log_line_count_with_timeout(
938                        "system ready",
939                        false,
940                        // Wait until timeout and make sure pattern occurred zero times
941                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
942                    )
943                    .await
944                    .unwrap()
945            }
946        });
947
948        tokio::time::sleep(Duration::from_secs(1)).await;
949
950        mock_provider.logs_push(vec!["stub line 3"]);
951
952        assert!(task.await?.success());
953
954        Ok(())
955    }
956
957    #[tokio::test(flavor = "multi_thread")]
958    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
959        let mock_provider = Arc::new(MockNode::new());
960        let mock_node = NetworkNode::new(
961            "node1",
962            "ws_uri",
963            "prometheus_uri",
964            "multiaddr",
965            NodeSpec::default(),
966            mock_provider.clone(),
967        );
968
969        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
970
971        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
972        let options = LogLineCountOptions {
973            predicate: Arc::new(|n| (2..=5).contains(&n)),
974            timeout: Duration::from_secs(2),
975            wait_until_timeout_elapses: true,
976        };
977
978        let task = tokio::spawn({
979            async move {
980                mock_node
981                    .wait_log_line_count_with_timeout("system ready", false, options)
982                    .await
983                    .unwrap()
984            }
985        });
986
987        tokio::time::sleep(Duration::from_secs(1)).await;
988
989        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
990
991        assert!(task.await?.success());
992
993        Ok(())
994    }
995
996    #[tokio::test(flavor = "multi_thread")]
997    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
998        let mock_provider = Arc::new(MockNode::new());
999        let mock_node = NetworkNode::new(
1000            "node1",
1001            "ws_uri",
1002            "prometheus_uri",
1003            "multiaddr",
1004            NodeSpec::default(),
1005            mock_provider.clone(),
1006        );
1007
1008        mock_provider.logs_push(vec![
1009            "system booting",
1010            "stub line 1",
1011            // this line should not match
1012            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1013            "stub line 2"
1014        ]);
1015
1016        let options = LogLineCountOptions {
1017            predicate: Arc::new(|n| n == 1),
1018            timeout: Duration::from_secs(3),
1019            wait_until_timeout_elapses: true,
1020        };
1021
1022        let task = tokio::spawn({
1023            async move {
1024                mock_node
1025                    .wait_log_line_count_with_timeout(
1026                        "error(?! importing block .*: block has an unknown parent)",
1027                        false,
1028                        options,
1029                    )
1030                    .await
1031                    .unwrap()
1032            }
1033        });
1034
1035        tokio::time::sleep(Duration::from_secs(1)).await;
1036
1037        mock_provider.logs_push(vec![
1038            "system ready",
1039            // this line should match
1040            "system error",
1041            "system ready",
1042        ]);
1043
1044        assert!(task.await?.success());
1045
1046        Ok(())
1047    }
1048
1049    #[tokio::test(flavor = "multi_thread")]
1050    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1051    ) -> Result<(), anyhow::Error> {
1052        let mock_provider = Arc::new(MockNode::new());
1053        let mock_node = NetworkNode::new(
1054            "node1",
1055            "ws_uri",
1056            "prometheus_uri",
1057            "multiaddr",
1058            NodeSpec::default(),
1059            mock_provider.clone(),
1060        );
1061
1062        mock_provider.logs_push(vec![
1063            "system booting",
1064            "stub line 1",
1065            // this line should not match
1066            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1067            "stub line 2"
1068        ]);
1069
1070        let options = LogLineCountOptions {
1071            predicate: Arc::new(|n| n == 1),
1072            timeout: Duration::from_secs(6),
1073            wait_until_timeout_elapses: true,
1074        };
1075
1076        let task = tokio::spawn({
1077            async move {
1078                mock_node
1079                    .wait_log_line_count_with_timeout(
1080                        "error(?! importing block .*: block has an unknown parent)",
1081                        false,
1082                        options,
1083                    )
1084                    .await
1085                    .unwrap()
1086            }
1087        });
1088
1089        tokio::time::sleep(Duration::from_secs(1)).await;
1090
1091        mock_provider.logs_push(vec!["system ready", "system ready"]);
1092
1093        assert!(!task.await?.success());
1094
1095        Ok(())
1096    }
1097
1098    #[tokio::test(flavor = "multi_thread")]
1099    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1100        let mock_provider = Arc::new(MockNode::new());
1101        let mock_node = NetworkNode::new(
1102            "node1",
1103            "ws_uri",
1104            "prometheus_uri",
1105            "multiaddr",
1106            NodeSpec::default(),
1107            mock_provider.clone(),
1108        );
1109
1110        mock_provider.logs_push(vec![
1111            "system booting",
1112            "stub line 1",
1113            // this line should not match
1114            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1115            "stub line 2"
1116        ]);
1117
1118        let task = tokio::spawn({
1119            async move {
1120                mock_node
1121                    .wait_log_line_count(
1122                        "error(?! importing block .*: block has an unknown parent)",
1123                        false,
1124                        1,
1125                    )
1126                    .await
1127                    .unwrap()
1128            }
1129        });
1130
1131        tokio::time::sleep(Duration::from_secs(1)).await;
1132
1133        mock_provider.logs_push(vec![
1134            "system ready",
1135            // this line should match
1136            "system error",
1137            "system ready",
1138        ]);
1139
1140        assert!(task.await.is_ok());
1141
1142        Ok(())
1143    }
1144
1145    #[tokio::test(flavor = "multi_thread")]
1146    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1147        let mock_provider = Arc::new(MockNode::new());
1148        let mock_node = NetworkNode::new(
1149            "node1",
1150            "ws_uri",
1151            "prometheus_uri",
1152            "multiaddr",
1153            NodeSpec::default(),
1154            mock_provider.clone(),
1155        );
1156
1157        mock_provider.logs_push(vec![
1158            "system booting",
1159            "stub line 1",
1160            // this line should not match
1161            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1162            "stub line 2"
1163        ]);
1164
1165        let options = LogLineCountOptions {
1166            predicate: Arc::new(|count| count == 1),
1167            timeout: Duration::from_secs(2),
1168            wait_until_timeout_elapses: true,
1169        };
1170
1171        let task = tokio::spawn({
1172            async move {
1173                // we expect no match, thus wait with timeout
1174                mock_node
1175                    .wait_log_line_count_with_timeout(
1176                        "error(?! importing block .*: block has an unknown parent)",
1177                        false,
1178                        options,
1179                    )
1180                    .await
1181                    .unwrap()
1182            }
1183        });
1184
1185        tokio::time::sleep(Duration::from_secs(1)).await;
1186
1187        mock_provider.logs_push(vec!["system ready", "system ready"]);
1188
1189        assert!(!task.await?.success());
1190
1191        Ok(())
1192    }
1193}