1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5 },
6 time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::Serialize;
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21#[cfg(feature = "pjs")]
22use crate::pjs_helper::{pjs_build_template, pjs_exec, PjsResult, ReturnValue};
23use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
24
/// Boxed, thread-safe line matcher used by the log-waiting helpers: it receives one log line
/// and returns whether the line matches, or an error if the matcher itself fails.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
26
/// Errors raised by [`NetworkNode`] itself.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric is not present in the metrics map; carries the metric name.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
32
/// Handle to a single node of a network.
///
/// The handle is cheap to clone: the metrics cache and the running flag are behind `Arc`s.
/// Fields marked `#[serde(skip)]` are left out of the serialized form.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-level node; log retrieval and pause/resume/restart are forwarded to it.
    #[serde(skip)]
    pub(crate) inner: DynNode,
    /// Specification of this node, exposed through `spec()`.
    pub(crate) spec: NodeSpec,
    /// Node name, exposed through `name()`.
    pub(crate) name: String,
    /// URI used to build RPC / subxt clients and to wait for the ws endpoint.
    pub(crate) ws_uri: String,
    /// Multiaddress of the node (presumably its p2p address; confirm against the caller).
    pub(crate) multiaddr: String,
    /// URL fetched to obtain the node's metrics.
    pub(crate) prometheus_uri: String,
    /// Last parsed metrics; replaced as a whole on every metrics fetch.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag toggled by pause/resume/restart.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
49
/// Outcome of a log line count check. Both variants carry the number of matching lines counted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    /// The count satisfied the caller's predicate.
    TargetReached(u32),
    /// The count did not satisfy the caller's predicate.
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` for [`LogLineCount::TargetReached`] and `false` for
    /// [`LogLineCount::TargetFailed`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}
73
/// Options controlling how matching log lines are counted and judged.
#[derive(Clone)]
pub struct LogLineCountOptions {
    /// Decides whether a given number of matching lines is acceptable.
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    /// Maximum time to spend waiting.
    pub timeout: Duration,
    /// When `true`, the whole timeout is waited out before the count is judged.
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Builds the options from a predicate, a timeout and the "wait for the full timeout" flag.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Options that succeed only when zero matching lines are counted, judged after the full
    /// `timeout` has elapsed.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|count: u32| count == 0),
            timeout,
            wait_until_timeout_elapses: true,
        }
    }
}
110
111impl NetworkNode {
124 pub(crate) fn new<T: Into<String>>(
126 name: T,
127 ws_uri: T,
128 prometheus_uri: T,
129 multiaddr: T,
130 spec: NodeSpec,
131 inner: DynNode,
132 ) -> Self {
133 Self {
134 name: name.into(),
135 ws_uri: ws_uri.into(),
136 prometheus_uri: prometheus_uri.into(),
137 inner,
138 spec,
139 multiaddr: multiaddr.into(),
140 metrics_cache: Arc::new(Default::default()),
141 is_running: Arc::new(AtomicBool::new(false)),
142 }
143 }
144
145 pub(crate) fn is_running(&self) -> bool {
146 self.is_running.load(Ordering::Acquire)
147 }
148
149 pub(crate) fn set_is_running(&self, is_running: bool) {
150 self.is_running.store(is_running, Ordering::Release);
151 }
152
153 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
154 self.multiaddr = multiaddr.into();
155 }
156
157 pub fn name(&self) -> &str {
158 &self.name
159 }
160
161 pub fn args(&self) -> Vec<&str> {
162 self.inner.args()
163 }
164
165 pub fn spec(&self) -> &NodeSpec {
166 &self.spec
167 }
168
169 pub fn ws_uri(&self) -> &str {
170 &self.ws_uri
171 }
172
173 pub fn multiaddr(&self) -> &str {
174 self.multiaddr.as_ref()
175 }
176
177 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
181 get_client_from_url(&self.ws_uri).await
182 }
183
184 #[deprecated = "Use `wait_client` instead."]
186 pub async fn client<Config: subxt::Config>(
187 &self,
188 ) -> Result<OnlineClient<Config>, subxt::Error> {
189 self.try_client().await
190 }
191
192 pub async fn try_client<Config: subxt::Config>(
201 &self,
202 ) -> Result<OnlineClient<Config>, subxt::Error> {
203 get_client_from_url(&self.ws_uri).await
204 }
205
206 pub async fn wait_client<Config: subxt::Config>(
208 &self,
209 ) -> Result<OnlineClient<Config>, anyhow::Error> {
210 debug!("wait_client ws_uri: {}", self.ws_uri());
211 wait_ws_ready(self.ws_uri())
212 .await
213 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {}", e))?;
214
215 self.try_client()
216 .await
217 .map_err(|e| anyhow!("Can't create a subxt client, err: {}", e))
218 }
219
220 pub async fn wait_client_with_timeout<Config: subxt::Config>(
222 &self,
223 timeout_secs: impl Into<u64>,
224 ) -> Result<OnlineClient<Config>, anyhow::Error> {
225 debug!("waiting until subxt client is ready");
226 tokio::time::timeout(
227 Duration::from_secs(timeout_secs.into()),
228 self.wait_client::<Config>(),
229 )
230 .await?
231 }
232
233 pub async fn pause(&self) -> Result<(), anyhow::Error> {
238 self.set_is_running(false);
239 self.inner.pause().await?;
240 Ok(())
241 }
242
243 pub async fn resume(&self) -> Result<(), anyhow::Error> {
246 self.set_is_running(true);
247 self.inner.resume().await?;
248 Ok(())
249 }
250
251 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
253 self.set_is_running(false);
254 self.inner.restart(after).await?;
255 self.set_is_running(true);
256 Ok(())
257 }
258
259 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
267 let metric_name = metric_name.into();
268 self.fetch_metrics().await?;
270 self.metric(&metric_name, true).await
272 }
273
274 pub async fn assert(
283 &self,
284 metric_name: impl Into<String>,
285 value: impl Into<f64>,
286 ) -> Result<bool, anyhow::Error> {
287 let value: f64 = value.into();
288 self.assert_with(metric_name, |v| v == value).await
289 }
290
291 pub async fn assert_with(
294 &self,
295 metric_name: impl Into<String>,
296 predicate: impl Fn(f64) -> bool,
297 ) -> Result<bool, anyhow::Error> {
298 let metric_name = metric_name.into();
299 self.fetch_metrics().await?;
301 let val = self.metric(&metric_name, true).await?;
302 trace!("🔎 Current value {val} passed to the predicated?");
303 Ok(predicate(val))
304 }
305
306 pub async fn wait_metric(
310 &self,
311 metric_name: impl Into<String>,
312 predicate: impl Fn(f64) -> bool,
313 ) -> Result<(), anyhow::Error> {
314 let metric_name = metric_name.into();
315 debug!("waiting until metric {metric_name} pass the predicate");
316 loop {
317 let res = self.assert_with(&metric_name, &predicate).await;
318 match res {
319 Ok(res) => {
320 if res {
321 return Ok(());
322 }
323 },
324 Err(e) => match e.downcast::<reqwest::Error>() {
325 Ok(io_err) => {
326 if !skip_err_while_waiting(&io_err) {
327 return Err(io_err.into());
328 }
329 },
330 Err(other) => {
331 match other.downcast::<NetworkNodeError>() {
332 Ok(node_err) => {
333 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
334 return Err(node_err.into());
335 }
336 },
337 Err(other) => return Err(other),
338 };
339 },
340 },
341 }
342
343 tokio::time::sleep(Duration::from_secs(1)).await;
345 }
346 }
347
348 pub async fn wait_metric_with_timeout(
351 &self,
352 metric_name: impl Into<String>,
353 predicate: impl Fn(f64) -> bool,
354 timeout_secs: impl Into<u64>,
355 ) -> Result<(), anyhow::Error> {
356 let metric_name = metric_name.into();
357 let secs = timeout_secs.into();
358 debug!("waiting until metric {metric_name} pass the predicate");
359 let res = tokio::time::timeout(
360 Duration::from_secs(secs),
361 self.wait_metric(&metric_name, predicate),
362 )
363 .await;
364
365 if let Ok(inner_res) = res {
366 match inner_res {
367 Ok(_) => Ok(()),
368 Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
369 }
370 } else {
371 Err(anyhow!(
373 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
374 ))
375 }
376 }
377
378 pub async fn logs(&self) -> Result<String, anyhow::Error> {
383 Ok(self.inner.logs().await?)
384 }
385
386 pub async fn wait_log_line_count(
388 &self,
389 pattern: impl Into<String>,
390 is_glob: bool,
391 count: usize,
392 ) -> Result<(), anyhow::Error> {
393 let pattern = pattern.into();
394 let pattern_clone = pattern.clone();
395 debug!("waiting until we find pattern {pattern} {count} times");
396 let match_fn: BoxedClosure = if is_glob {
397 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
398 } else {
399 let re = Regex::new(&pattern)?;
400 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
401 };
402
403 loop {
404 let mut q = 0_usize;
405 let logs = self.logs().await?;
406 for line in logs.lines() {
407 trace!("line is {line}");
408 if match_fn(line)? {
409 trace!("pattern {pattern_clone} match in line {line}");
410 q += 1;
411 if q >= count {
412 return Ok(());
413 }
414 }
415 }
416
417 tokio::time::sleep(Duration::from_secs(2)).await;
418 }
419 }
420
421 pub async fn wait_log_line_count_with_timeout(
465 &self,
466 substring: impl Into<String>,
467 is_glob: bool,
468 options: LogLineCountOptions,
469 ) -> Result<LogLineCount, anyhow::Error> {
470 let substring = substring.into();
471 debug!(
472 "waiting until match lines count within {} seconds",
473 options.timeout.as_secs_f64()
474 );
475
476 let start = tokio::time::Instant::now();
477
478 let match_fn: BoxedClosure = if is_glob {
479 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
480 } else {
481 let re = Regex::new(&substring)?;
482 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
483 };
484
485 if options.wait_until_timeout_elapses {
486 tokio::time::sleep(options.timeout).await;
487 }
488
489 let mut q;
490 loop {
491 q = 0_u32;
492 let logs = self.logs().await?;
493 for line in logs.lines() {
494 if match_fn(line)? {
495 q += 1;
496
497 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
502 return Ok(LogLineCount::TargetReached(q));
503 }
504 }
505 }
506
507 if start.elapsed() >= options.timeout {
508 break;
509 }
510
511 tokio::time::sleep(Duration::from_secs(2)).await;
512 }
513
514 if (options.predicate)(q) {
515 Ok(LogLineCount::TargetReached(q))
516 } else {
517 Ok(LogLineCount::TargetFailed(q))
518 }
519 }
520
521 #[cfg(feature = "pjs")]
526 pub async fn pjs(
532 &self,
533 code: impl AsRef<str>,
534 args: Vec<serde_json::Value>,
535 user_types: Option<serde_json::Value>,
536 ) -> Result<PjsResult, anyhow::Error> {
537 let code = pjs_build_template(self.ws_uri(), code.as_ref(), args, user_types);
538 tracing::trace!("Code to execute: {code}");
539 let value = match pjs_exec(code)? {
540 ReturnValue::Deserialized(val) => Ok(val),
541 ReturnValue::CantDeserialize(msg) => Err(msg),
542 };
543
544 Ok(value)
545 }
546
547 #[cfg(feature = "pjs")]
548 pub async fn pjs_file(
554 &self,
555 file: impl AsRef<std::path::Path>,
556 args: Vec<serde_json::Value>,
557 user_types: Option<serde_json::Value>,
558 ) -> Result<PjsResult, anyhow::Error> {
559 let content = std::fs::read_to_string(file)?;
560 self.pjs(content, args, user_types).await
561 }
562
563 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
564 let response = reqwest::get(&self.prometheus_uri).await?;
565 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
566 let mut cache = self.metrics_cache.write().await;
567 *cache = metrics;
568 Ok(())
569 }
570
571 async fn metric(
573 &self,
574 metric_name: &str,
575 treat_not_found_as_zero: bool,
576 ) -> Result<f64, anyhow::Error> {
577 let mut metrics_map = self.metrics_cache.read().await;
578 if metrics_map.is_empty() {
579 drop(metrics_map);
581 self.fetch_metrics().await?;
582 metrics_map = self.metrics_cache.read().await;
583 }
584
585 if let Some(val) = metrics_map.get(metric_name) {
586 Ok(*val)
587 } else if treat_not_found_as_zero {
588 Ok(0_f64)
589 } else {
590 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
591 }
592 }
593
594 pub async fn wait_until_is_up(
606 &self,
607 timeout_secs: impl Into<u64>,
608 ) -> Result<(), anyhow::Error> {
609 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
610 .await
611 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
612 }
613}
614
615impl std::fmt::Debug for NetworkNode {
616 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
617 f.debug_struct("NetworkNode")
618 .field("inner", &"inner_skipped")
619 .field("spec", &self.spec)
620 .field("name", &self.name)
621 .field("ws_uri", &self.ws_uri)
622 .field("prometheus_uri", &self.prometheus_uri)
623 .finish()
624 }
625}
626
#[cfg(test)]
mod tests {
    use std::{
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
    };

    use async_trait::async_trait;
    use provider::{types::*, ProviderError, ProviderNode};

    use super::*;

    /// Initial logs containing a single "system ready" line.
    const READY_LOGS: &[&str] = &[
        "system booting",
        "stub line 1",
        "stub line 2",
        "system ready",
    ];

    /// Initial logs without any "system ready" line.
    const BOOT_LOGS: &[&str] = &["system booting", "stub line 1", "stub line 2"];

    /// Initial logs containing an "unknown parent" import error (capitalised "Error").
    const UNKNOWN_PARENT_LOGS: &[&str] = &[
        "system booting",
        "stub line 1",
        "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
        "stub line 2",
    ];

    /// Regex matching "error" unless it is followed by the "unknown parent" import message.
    const ERROR_PATTERN: &str = "error(?! importing block .*: block has an unknown parent)";

    /// Provider node stub: only `logs` is implemented, backed by an in-memory list of lines.
    struct MockNode {
        logs: Arc<Mutex<Vec<String>>>,
    }

    impl MockNode {
        fn new() -> Self {
            Self {
                logs: Arc::new(Mutex::new(vec![])),
            }
        }

        /// Appends `lines` to the in-memory log.
        fn logs_push(&self, lines: Vec<impl Into<String>>) {
            self.logs
                .lock()
                .unwrap()
                .extend(lines.into_iter().map(|l| l.into()));
        }
    }

    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }

    /// Builds a `NetworkNode` backed by a fresh `MockNode` whose log already holds `lines`.
    ///
    /// The provider is returned as well so tests can append more lines later.
    fn mock_node_with_logs(lines: &[&str]) -> (Arc<MockNode>, NetworkNode) {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(lines.to_vec());

        (mock_provider, mock_node)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
        let (_mock_provider, mock_node) = mock_node_with_logs(READY_LOGS);

        let options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(10), false);

        let log_line_count = mock_node
            .wait_log_line_count_with_timeout("system ready", false, options)
            .await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(READY_LOGS);

        let options = LogLineCountOptions::new(|n| n == 2, Duration::from_secs(4), false);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(2)).await;

        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
        let (_mock_provider, mock_node) = mock_node_with_logs(READY_LOGS);

        let options = LogLineCountOptions::new(|n| n == 2, Duration::from_secs(2), false);

        let log_line_count = mock_node
            .wait_log_line_count_with_timeout("system ready", false, options)
            .await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(READY_LOGS);

        let options = LogLineCountOptions::new(|n| n == 2, Duration::from_secs(2), true);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(BOOT_LOGS);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "system ready",
                    false,
                    LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                )
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(BOOT_LOGS);

        let options =
            LogLineCountOptions::new(|n| (2..=5).contains(&n), Duration::from_secs(2), true);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(UNKNOWN_PARENT_LOGS);

        let options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(3), true);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(ERROR_PATTERN, false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system error", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(UNKNOWN_PARENT_LOGS);

        let options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(6), true);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(ERROR_PATTERN, false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(UNKNOWN_PARENT_LOGS);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count(ERROR_PATTERN, false, 1)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system error", "system ready"]);

        assert!(task.await.is_ok());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let (mock_provider, mock_node) = mock_node_with_logs(UNKNOWN_PARENT_LOGS);

        let options = LogLineCountOptions::new(|count| count == 1, Duration::from_secs(2), true);

        let task = tokio::spawn(async move {
            mock_node
                .wait_log_line_count_with_timeout(ERROR_PATTERN, false, options)
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
}
1193}