1use std::{sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use fancy_regex::Regex;
5use glob_match::glob_match;
6use prom_metrics_parser::MetricMap;
7use provider::DynNode;
8use serde::Serialize;
9use subxt::{backend::rpc::RpcClient, OnlineClient};
10use support::net::{skip_err_while_waiting, wait_ws_ready};
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::{debug, trace};
14
15#[cfg(feature = "pjs")]
16use crate::pjs_helper::{pjs_build_template, pjs_exec, PjsResult, ReturnValue};
17use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
18
19type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
20
21#[derive(Error, Debug)]
22pub enum NetworkNodeError {
23 #[error("metric '{0}' not found!")]
24 MetricNotFound(String),
25}
26
27#[derive(Clone, Serialize)]
28pub struct NetworkNode {
29 #[serde(skip)]
30 pub(crate) inner: DynNode,
31 pub(crate) spec: NodeSpec,
34 pub(crate) name: String,
35 pub(crate) ws_uri: String,
36 pub(crate) multiaddr: String,
37 pub(crate) prometheus_uri: String,
38 #[serde(skip)]
39 metrics_cache: Arc<RwLock<MetricMap>>,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum LogLineCount {
53 TargetReached(u32),
54 TargetFailed(u32),
55}
56
57impl LogLineCount {
58 pub fn success(&self) -> bool {
59 match self {
60 Self::TargetReached(..) => true,
61 Self::TargetFailed(..) => false,
62 }
63 }
64}
65
66#[derive(Clone)]
79pub struct LogLineCountOptions {
80 pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
81 pub timeout: Duration,
82 pub wait_until_timeout_elapses: bool,
83}
84
85impl LogLineCountOptions {
86 pub fn new(
87 predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
88 timeout: Duration,
89 wait_until_timeout_elapses: bool,
90 ) -> Self {
91 Self {
92 predicate: Arc::new(predicate),
93 timeout,
94 wait_until_timeout_elapses,
95 }
96 }
97
98 pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
99 Self::new(|n| n == 0, timeout, true)
100 }
101}
102
103impl NetworkNode {
116 pub(crate) fn new<T: Into<String>>(
118 name: T,
119 ws_uri: T,
120 prometheus_uri: T,
121 multiaddr: T,
122 spec: NodeSpec,
123 inner: DynNode,
124 ) -> Self {
125 Self {
126 name: name.into(),
127 ws_uri: ws_uri.into(),
128 prometheus_uri: prometheus_uri.into(),
129 inner,
130 spec,
131 multiaddr: multiaddr.into(),
132 metrics_cache: Arc::new(Default::default()),
133 }
134 }
135
136 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
137 self.multiaddr = multiaddr.into();
138 }
139
140 pub fn name(&self) -> &str {
141 &self.name
142 }
143
144 pub fn args(&self) -> Vec<&str> {
145 self.inner.args()
146 }
147
148 pub fn spec(&self) -> &NodeSpec {
149 &self.spec
150 }
151
152 pub fn ws_uri(&self) -> &str {
153 &self.ws_uri
154 }
155
156 pub fn multiaddr(&self) -> &str {
157 self.multiaddr.as_ref()
158 }
159
160 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
164 get_client_from_url(&self.ws_uri).await
165 }
166
167 #[deprecated = "Use `wait_client` instead."]
169 pub async fn client<Config: subxt::Config>(
170 &self,
171 ) -> Result<OnlineClient<Config>, subxt::Error> {
172 self.try_client().await
173 }
174
175 pub async fn try_client<Config: subxt::Config>(
184 &self,
185 ) -> Result<OnlineClient<Config>, subxt::Error> {
186 get_client_from_url(&self.ws_uri).await
187 }
188
189 pub async fn wait_client<Config: subxt::Config>(
191 &self,
192 ) -> Result<OnlineClient<Config>, anyhow::Error> {
193 debug!("wait_client ws_uri: {}", self.ws_uri());
194 wait_ws_ready(self.ws_uri())
195 .await
196 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {}", e))?;
197
198 self.try_client()
199 .await
200 .map_err(|e| anyhow!("Can't create a subxt client, err: {}", e))
201 }
202
203 pub async fn wait_client_with_timeout<Config: subxt::Config>(
205 &self,
206 timeout_secs: impl Into<u64>,
207 ) -> Result<OnlineClient<Config>, anyhow::Error> {
208 debug!("waiting until subxt client is ready");
209 tokio::time::timeout(
210 Duration::from_secs(timeout_secs.into()),
211 self.wait_client::<Config>(),
212 )
213 .await?
214 }
215
216 pub async fn pause(&self) -> Result<(), anyhow::Error> {
221 self.inner.pause().await?;
222 Ok(())
223 }
224
225 pub async fn resume(&self) -> Result<(), anyhow::Error> {
228 self.inner.resume().await?;
229 Ok(())
230 }
231
232 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
234 self.inner.restart(after).await?;
235 Ok(())
236 }
237
238 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
246 let metric_name = metric_name.into();
247 self.fetch_metrics().await?;
249 self.metric(&metric_name, true).await
251 }
252
253 pub async fn assert(
262 &self,
263 metric_name: impl Into<String>,
264 value: impl Into<f64>,
265 ) -> Result<bool, anyhow::Error> {
266 let value: f64 = value.into();
267 self.assert_with(metric_name, |v| v == value).await
268 }
269
270 pub async fn assert_with(
273 &self,
274 metric_name: impl Into<String>,
275 predicate: impl Fn(f64) -> bool,
276 ) -> Result<bool, anyhow::Error> {
277 let metric_name = metric_name.into();
278 self.fetch_metrics().await?;
280 let val = self.metric(&metric_name, true).await?;
281 trace!("🔎 Current value {val} passed to the predicated?");
282 Ok(predicate(val))
283 }
284
285 pub async fn wait_metric(
289 &self,
290 metric_name: impl Into<String>,
291 predicate: impl Fn(f64) -> bool,
292 ) -> Result<(), anyhow::Error> {
293 let metric_name = metric_name.into();
294 debug!("waiting until metric {metric_name} pass the predicate");
295 loop {
296 let res = self.assert_with(&metric_name, &predicate).await;
297 match res {
298 Ok(res) => {
299 if res {
300 return Ok(());
301 }
302 },
303 Err(e) => match e.downcast::<reqwest::Error>() {
304 Ok(io_err) => {
305 if !skip_err_while_waiting(&io_err) {
306 return Err(io_err.into());
307 }
308 },
309 Err(other) => {
310 match other.downcast::<NetworkNodeError>() {
311 Ok(node_err) => {
312 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
313 return Err(node_err.into());
314 }
315 },
316 Err(other) => return Err(other),
317 };
318 },
319 },
320 }
321
322 tokio::time::sleep(Duration::from_secs(1)).await;
324 }
325 }
326
327 pub async fn wait_metric_with_timeout(
330 &self,
331 metric_name: impl Into<String>,
332 predicate: impl Fn(f64) -> bool,
333 timeout_secs: impl Into<u64>,
334 ) -> Result<(), anyhow::Error> {
335 let metric_name = metric_name.into();
336 let secs = timeout_secs.into();
337 debug!("waiting until metric {metric_name} pass the predicate");
338 let res = tokio::time::timeout(
339 Duration::from_secs(secs),
340 self.wait_metric(&metric_name, predicate),
341 )
342 .await;
343
344 if let Ok(inner_res) = res {
345 match inner_res {
346 Ok(_) => Ok(()),
347 Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
348 }
349 } else {
350 Err(anyhow!(
352 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
353 ))
354 }
355 }
356
357 pub async fn logs(&self) -> Result<String, anyhow::Error> {
362 Ok(self.inner.logs().await?)
363 }
364
365 pub async fn wait_log_line_count(
367 &self,
368 pattern: impl Into<String>,
369 is_glob: bool,
370 count: usize,
371 ) -> Result<(), anyhow::Error> {
372 let pattern = pattern.into();
373 let pattern_clone = pattern.clone();
374 debug!("waiting until we find pattern {pattern} {count} times");
375 let match_fn: BoxedClosure = if is_glob {
376 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
377 } else {
378 let re = Regex::new(&pattern)?;
379 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
380 };
381
382 loop {
383 let mut q = 0_usize;
384 let logs = self.logs().await?;
385 for line in logs.lines() {
386 trace!("line is {line}");
387 if match_fn(line)? {
388 trace!("pattern {pattern_clone} match in line {line}");
389 q += 1;
390 if q >= count {
391 return Ok(());
392 }
393 }
394 }
395
396 tokio::time::sleep(Duration::from_secs(2)).await;
397 }
398 }
399
400 pub async fn wait_log_line_count_with_timeout(
444 &self,
445 substring: impl Into<String>,
446 is_glob: bool,
447 options: LogLineCountOptions,
448 ) -> Result<LogLineCount, anyhow::Error> {
449 let substring = substring.into();
450 debug!(
451 "waiting until match lines count within {} seconds",
452 options.timeout.as_secs_f64()
453 );
454
455 let start = tokio::time::Instant::now();
456
457 let match_fn: BoxedClosure = if is_glob {
458 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
459 } else {
460 let re = Regex::new(&substring)?;
461 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
462 };
463
464 if options.wait_until_timeout_elapses {
465 tokio::time::sleep(options.timeout).await;
466 }
467
468 let mut q;
469 loop {
470 q = 0_u32;
471 let logs = self.logs().await?;
472 for line in logs.lines() {
473 if match_fn(line)? {
474 q += 1;
475
476 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
481 return Ok(LogLineCount::TargetReached(q));
482 }
483 }
484 }
485
486 if start.elapsed() >= options.timeout {
487 break;
488 }
489
490 tokio::time::sleep(Duration::from_secs(2)).await;
491 }
492
493 if (options.predicate)(q) {
494 Ok(LogLineCount::TargetReached(q))
495 } else {
496 Ok(LogLineCount::TargetFailed(q))
497 }
498 }
499
500 #[cfg(feature = "pjs")]
505 pub async fn pjs(
511 &self,
512 code: impl AsRef<str>,
513 args: Vec<serde_json::Value>,
514 user_types: Option<serde_json::Value>,
515 ) -> Result<PjsResult, anyhow::Error> {
516 let code = pjs_build_template(self.ws_uri(), code.as_ref(), args, user_types);
517 tracing::trace!("Code to execute: {code}");
518 let value = match pjs_exec(code)? {
519 ReturnValue::Deserialized(val) => Ok(val),
520 ReturnValue::CantDeserialize(msg) => Err(msg),
521 };
522
523 Ok(value)
524 }
525
526 #[cfg(feature = "pjs")]
527 pub async fn pjs_file(
533 &self,
534 file: impl AsRef<std::path::Path>,
535 args: Vec<serde_json::Value>,
536 user_types: Option<serde_json::Value>,
537 ) -> Result<PjsResult, anyhow::Error> {
538 let content = std::fs::read_to_string(file)?;
539 self.pjs(content, args, user_types).await
540 }
541
542 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
543 let response = reqwest::get(&self.prometheus_uri).await?;
544 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
545 let mut cache = self.metrics_cache.write().await;
546 *cache = metrics;
547 Ok(())
548 }
549
550 async fn metric(
552 &self,
553 metric_name: &str,
554 treat_not_found_as_zero: bool,
555 ) -> Result<f64, anyhow::Error> {
556 let mut metrics_map = self.metrics_cache.read().await;
557 if metrics_map.is_empty() {
558 drop(metrics_map);
560 self.fetch_metrics().await?;
561 metrics_map = self.metrics_cache.read().await;
562 }
563
564 if let Some(val) = metrics_map.get(metric_name) {
565 Ok(*val)
566 } else if treat_not_found_as_zero {
567 Ok(0_f64)
568 } else {
569 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
570 }
571 }
572
573 pub async fn wait_until_is_up(
585 &self,
586 timeout_secs: impl Into<u64>,
587 ) -> Result<(), anyhow::Error> {
588 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
589 .await
590 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
591 }
592}
593
594impl std::fmt::Debug for NetworkNode {
595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596 f.debug_struct("NetworkNode")
597 .field("inner", &"inner_skipped")
598 .field("spec", &self.spec)
599 .field("name", &self.name)
600 .field("ws_uri", &self.ws_uri)
601 .field("prometheus_uri", &self.prometheus_uri)
602 .finish()
603 }
604}
605
606#[cfg(test)]
608mod tests {
609 use std::{
610 path::{Path, PathBuf},
611 sync::{Arc, Mutex},
612 };
613
614 use async_trait::async_trait;
615 use provider::{types::*, ProviderError, ProviderNode};
616
617 use super::*;
618
619 struct MockNode {
620 logs: Arc<Mutex<Vec<String>>>,
621 }
622
623 impl MockNode {
624 fn new() -> Self {
625 Self {
626 logs: Arc::new(Mutex::new(vec![])),
627 }
628 }
629
630 fn logs_push(&self, lines: Vec<impl Into<String>>) {
631 self.logs
632 .lock()
633 .unwrap()
634 .extend(lines.into_iter().map(|l| l.into()));
635 }
636 }
637
638 #[async_trait]
639 impl ProviderNode for MockNode {
640 fn name(&self) -> &str {
641 todo!()
642 }
643
644 fn args(&self) -> Vec<&str> {
645 todo!()
646 }
647
648 fn base_dir(&self) -> &PathBuf {
649 todo!()
650 }
651
652 fn config_dir(&self) -> &PathBuf {
653 todo!()
654 }
655
656 fn data_dir(&self) -> &PathBuf {
657 todo!()
658 }
659
660 fn relay_data_dir(&self) -> &PathBuf {
661 todo!()
662 }
663
664 fn scripts_dir(&self) -> &PathBuf {
665 todo!()
666 }
667
668 fn log_path(&self) -> &PathBuf {
669 todo!()
670 }
671
672 fn log_cmd(&self) -> String {
673 todo!()
674 }
675
676 fn path_in_node(&self, _file: &Path) -> PathBuf {
677 todo!()
678 }
679
680 async fn logs(&self) -> Result<String, ProviderError> {
681 Ok(self.logs.lock().unwrap().join("\n"))
682 }
683
684 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
685 todo!()
686 }
687
688 async fn run_command(
689 &self,
690 _options: RunCommandOptions,
691 ) -> Result<ExecutionResult, ProviderError> {
692 todo!()
693 }
694
695 async fn run_script(
696 &self,
697 _options: RunScriptOptions,
698 ) -> Result<ExecutionResult, ProviderError> {
699 todo!()
700 }
701
702 async fn send_file(
703 &self,
704 _local_file_path: &Path,
705 _remote_file_path: &Path,
706 _mode: &str,
707 ) -> Result<(), ProviderError> {
708 todo!()
709 }
710
711 async fn receive_file(
712 &self,
713 _remote_file_path: &Path,
714 _local_file_path: &Path,
715 ) -> Result<(), ProviderError> {
716 todo!()
717 }
718
719 async fn pause(&self) -> Result<(), ProviderError> {
720 todo!()
721 }
722
723 async fn resume(&self) -> Result<(), ProviderError> {
724 todo!()
725 }
726
727 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
728 todo!()
729 }
730
731 async fn destroy(&self) -> Result<(), ProviderError> {
732 todo!()
733 }
734 }
735
736 #[tokio::test(flavor = "multi_thread")]
737 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
738 let mock_provider = Arc::new(MockNode::new());
739 let mock_node = NetworkNode::new(
740 "node1",
741 "ws_uri",
742 "prometheus_uri",
743 "multiaddr",
744 NodeSpec::default(),
745 mock_provider.clone(),
746 );
747
748 mock_provider.logs_push(vec![
749 "system booting",
750 "stub line 1",
751 "stub line 2",
752 "system ready",
753 ]);
754
755 let options = LogLineCountOptions {
757 predicate: Arc::new(|n| n == 1),
758 timeout: Duration::from_secs(10),
759 wait_until_timeout_elapses: false,
760 };
761
762 let log_line_count = mock_node
763 .wait_log_line_count_with_timeout("system ready", false, options)
764 .await?;
765
766 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
767
768 Ok(())
769 }
770
771 #[tokio::test(flavor = "multi_thread")]
772 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
773 let mock_provider = Arc::new(MockNode::new());
774 let mock_node = NetworkNode::new(
775 "node1",
776 "ws_uri",
777 "prometheus_uri",
778 "multiaddr",
779 NodeSpec::default(),
780 mock_provider.clone(),
781 );
782
783 mock_provider.logs_push(vec![
784 "system booting",
785 "stub line 1",
786 "stub line 2",
787 "system ready",
788 ]);
789
790 let options = LogLineCountOptions {
792 predicate: Arc::new(|n| n == 2),
793 timeout: Duration::from_secs(4),
794 wait_until_timeout_elapses: false,
795 };
796
797 let task = tokio::spawn({
798 async move {
799 mock_node
800 .wait_log_line_count_with_timeout("system ready", false, options)
801 .await
802 .unwrap()
803 }
804 });
805
806 tokio::time::sleep(Duration::from_secs(2)).await;
807
808 mock_provider.logs_push(vec!["system ready"]);
809
810 let log_line_count = task.await?;
811
812 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
813
814 Ok(())
815 }
816
817 #[tokio::test(flavor = "multi_thread")]
818 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
819 let mock_provider = Arc::new(MockNode::new());
820 let mock_node = NetworkNode::new(
821 "node1",
822 "ws_uri",
823 "prometheus_uri",
824 "multiaddr",
825 NodeSpec::default(),
826 mock_provider.clone(),
827 );
828
829 mock_provider.logs_push(vec![
830 "system booting",
831 "stub line 1",
832 "stub line 2",
833 "system ready",
834 ]);
835
836 let options = LogLineCountOptions {
838 predicate: Arc::new(|n| n == 2),
839 timeout: Duration::from_secs(2),
840 wait_until_timeout_elapses: false,
841 };
842
843 let log_line_count = mock_node
844 .wait_log_line_count_with_timeout("system ready", false, options)
845 .await?;
846
847 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
848
849 Ok(())
850 }
851
852 #[tokio::test(flavor = "multi_thread")]
853 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
854 let mock_provider = Arc::new(MockNode::new());
855 let mock_node = NetworkNode::new(
856 "node1",
857 "ws_uri",
858 "prometheus_uri",
859 "multiaddr",
860 NodeSpec::default(),
861 mock_provider.clone(),
862 );
863
864 mock_provider.logs_push(vec![
865 "system booting",
866 "stub line 1",
867 "stub line 2",
868 "system ready",
869 ]);
870
871 let options = LogLineCountOptions {
873 predicate: Arc::new(|n| n == 2),
874 timeout: Duration::from_secs(2),
875 wait_until_timeout_elapses: true,
876 };
877
878 let task = tokio::spawn({
879 async move {
880 mock_node
881 .wait_log_line_count_with_timeout("system ready", false, options)
882 .await
883 .unwrap()
884 }
885 });
886
887 tokio::time::sleep(Duration::from_secs(1)).await;
888
889 mock_provider.logs_push(vec!["system ready"]);
890 mock_provider.logs_push(vec!["system ready"]);
891
892 let log_line_count = task.await?;
893
894 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
895
896 Ok(())
897 }
898
899 #[tokio::test(flavor = "multi_thread")]
900 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
901 let mock_provider = Arc::new(MockNode::new());
902 let mock_node = NetworkNode::new(
903 "node1",
904 "ws_uri",
905 "prometheus_uri",
906 "multiaddr",
907 NodeSpec::default(),
908 mock_provider.clone(),
909 );
910
911 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
912
913 let task = tokio::spawn({
914 async move {
915 mock_node
916 .wait_log_line_count_with_timeout(
917 "system ready",
918 false,
919 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
921 )
922 .await
923 .unwrap()
924 }
925 });
926
927 tokio::time::sleep(Duration::from_secs(1)).await;
928
929 mock_provider.logs_push(vec!["stub line 3"]);
930
931 assert!(task.await?.success());
932
933 Ok(())
934 }
935
936 #[tokio::test(flavor = "multi_thread")]
937 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
938 let mock_provider = Arc::new(MockNode::new());
939 let mock_node = NetworkNode::new(
940 "node1",
941 "ws_uri",
942 "prometheus_uri",
943 "multiaddr",
944 NodeSpec::default(),
945 mock_provider.clone(),
946 );
947
948 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
949
950 let options = LogLineCountOptions {
952 predicate: Arc::new(|n| (2..=5).contains(&n)),
953 timeout: Duration::from_secs(2),
954 wait_until_timeout_elapses: true,
955 };
956
957 let task = tokio::spawn({
958 async move {
959 mock_node
960 .wait_log_line_count_with_timeout("system ready", false, options)
961 .await
962 .unwrap()
963 }
964 });
965
966 tokio::time::sleep(Duration::from_secs(1)).await;
967
968 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
969
970 assert!(task.await?.success());
971
972 Ok(())
973 }
974
975 #[tokio::test(flavor = "multi_thread")]
976 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
977 let mock_provider = Arc::new(MockNode::new());
978 let mock_node = NetworkNode::new(
979 "node1",
980 "ws_uri",
981 "prometheus_uri",
982 "multiaddr",
983 NodeSpec::default(),
984 mock_provider.clone(),
985 );
986
987 mock_provider.logs_push(vec![
988 "system booting",
989 "stub line 1",
990 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
992 "stub line 2"
993 ]);
994
995 let options = LogLineCountOptions {
996 predicate: Arc::new(|n| n == 1),
997 timeout: Duration::from_secs(3),
998 wait_until_timeout_elapses: true,
999 };
1000
1001 let task = tokio::spawn({
1002 async move {
1003 mock_node
1004 .wait_log_line_count_with_timeout(
1005 "error(?! importing block .*: block has an unknown parent)",
1006 false,
1007 options,
1008 )
1009 .await
1010 .unwrap()
1011 }
1012 });
1013
1014 tokio::time::sleep(Duration::from_secs(1)).await;
1015
1016 mock_provider.logs_push(vec![
1017 "system ready",
1018 "system error",
1020 "system ready",
1021 ]);
1022
1023 assert!(task.await?.success());
1024
1025 Ok(())
1026 }
1027
1028 #[tokio::test(flavor = "multi_thread")]
1029 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1030 ) -> Result<(), anyhow::Error> {
1031 let mock_provider = Arc::new(MockNode::new());
1032 let mock_node = NetworkNode::new(
1033 "node1",
1034 "ws_uri",
1035 "prometheus_uri",
1036 "multiaddr",
1037 NodeSpec::default(),
1038 mock_provider.clone(),
1039 );
1040
1041 mock_provider.logs_push(vec![
1042 "system booting",
1043 "stub line 1",
1044 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1046 "stub line 2"
1047 ]);
1048
1049 let options = LogLineCountOptions {
1050 predicate: Arc::new(|n| n == 1),
1051 timeout: Duration::from_secs(6),
1052 wait_until_timeout_elapses: true,
1053 };
1054
1055 let task = tokio::spawn({
1056 async move {
1057 mock_node
1058 .wait_log_line_count_with_timeout(
1059 "error(?! importing block .*: block has an unknown parent)",
1060 false,
1061 options,
1062 )
1063 .await
1064 .unwrap()
1065 }
1066 });
1067
1068 tokio::time::sleep(Duration::from_secs(1)).await;
1069
1070 mock_provider.logs_push(vec!["system ready", "system ready"]);
1071
1072 assert!(!task.await?.success());
1073
1074 Ok(())
1075 }
1076
1077 #[tokio::test(flavor = "multi_thread")]
1078 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1079 let mock_provider = Arc::new(MockNode::new());
1080 let mock_node = NetworkNode::new(
1081 "node1",
1082 "ws_uri",
1083 "prometheus_uri",
1084 "multiaddr",
1085 NodeSpec::default(),
1086 mock_provider.clone(),
1087 );
1088
1089 mock_provider.logs_push(vec![
1090 "system booting",
1091 "stub line 1",
1092 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1094 "stub line 2"
1095 ]);
1096
1097 let task = tokio::spawn({
1098 async move {
1099 mock_node
1100 .wait_log_line_count(
1101 "error(?! importing block .*: block has an unknown parent)",
1102 false,
1103 1,
1104 )
1105 .await
1106 .unwrap()
1107 }
1108 });
1109
1110 tokio::time::sleep(Duration::from_secs(1)).await;
1111
1112 mock_provider.logs_push(vec![
1113 "system ready",
1114 "system error",
1116 "system ready",
1117 ]);
1118
1119 assert!(task.await.is_ok());
1120
1121 Ok(())
1122 }
1123
1124 #[tokio::test(flavor = "multi_thread")]
1125 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1126 let mock_provider = Arc::new(MockNode::new());
1127 let mock_node = NetworkNode::new(
1128 "node1",
1129 "ws_uri",
1130 "prometheus_uri",
1131 "multiaddr",
1132 NodeSpec::default(),
1133 mock_provider.clone(),
1134 );
1135
1136 mock_provider.logs_push(vec![
1137 "system booting",
1138 "stub line 1",
1139 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1141 "stub line 2"
1142 ]);
1143
1144 let options = LogLineCountOptions {
1145 predicate: Arc::new(|count| count == 1),
1146 timeout: Duration::from_secs(2),
1147 wait_until_timeout_elapses: true,
1148 };
1149
1150 let task = tokio::spawn({
1151 async move {
1152 mock_node
1154 .wait_log_line_count_with_timeout(
1155 "error(?! importing block .*: block has an unknown parent)",
1156 false,
1157 options,
1158 )
1159 .await
1160 .unwrap()
1161 }
1162 });
1163
1164 tokio::time::sleep(Duration::from_secs(1)).await;
1165
1166 mock_provider.logs_push(vec!["system ready", "system ready"]);
1167
1168 assert!(!task.await?.success());
1169
1170 Ok(())
1171 }
1172}