1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5 },
6 time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::Serialize;
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
22
23type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
24
25#[derive(Error, Debug)]
26pub enum NetworkNodeError {
27 #[error("metric '{0}' not found!")]
28 MetricNotFound(String),
29}
30
31#[derive(Clone, Serialize)]
32pub struct NetworkNode {
33 #[serde(skip)]
34 pub(crate) inner: DynNode,
35 pub(crate) spec: NodeSpec,
38 pub(crate) name: String,
39 pub(crate) ws_uri: String,
40 pub(crate) multiaddr: String,
41 pub(crate) prometheus_uri: String,
42 #[serde(skip)]
43 metrics_cache: Arc<RwLock<MetricMap>>,
44 #[serde(skip)]
45 is_running: Arc<AtomicBool>,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum LogLineCount {
59 TargetReached(u32),
60 TargetFailed(u32),
61}
62
63impl LogLineCount {
64 pub fn success(&self) -> bool {
65 match self {
66 Self::TargetReached(..) => true,
67 Self::TargetFailed(..) => false,
68 }
69 }
70}
71
72#[derive(Clone)]
85pub struct LogLineCountOptions {
86 pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
87 pub timeout: Duration,
88 pub wait_until_timeout_elapses: bool,
89}
90
91impl LogLineCountOptions {
92 pub fn new(
93 predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
94 timeout: Duration,
95 wait_until_timeout_elapses: bool,
96 ) -> Self {
97 Self {
98 predicate: Arc::new(predicate),
99 timeout,
100 wait_until_timeout_elapses,
101 }
102 }
103
104 pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
105 Self::new(|n| n == 0, timeout, true)
106 }
107}
108
109impl NetworkNode {
122 pub(crate) fn new<T: Into<String>>(
124 name: T,
125 ws_uri: T,
126 prometheus_uri: T,
127 multiaddr: T,
128 spec: NodeSpec,
129 inner: DynNode,
130 ) -> Self {
131 Self {
132 name: name.into(),
133 ws_uri: ws_uri.into(),
134 prometheus_uri: prometheus_uri.into(),
135 inner,
136 spec,
137 multiaddr: multiaddr.into(),
138 metrics_cache: Arc::new(Default::default()),
139 is_running: Arc::new(AtomicBool::new(false)),
140 }
141 }
142
143 pub(crate) fn is_running(&self) -> bool {
144 self.is_running.load(Ordering::Acquire)
145 }
146
147 pub(crate) fn set_is_running(&self, is_running: bool) {
148 self.is_running.store(is_running, Ordering::Release);
149 }
150
151 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
152 self.multiaddr = multiaddr.into();
153 }
154
155 pub fn name(&self) -> &str {
156 &self.name
157 }
158
159 pub fn args(&self) -> Vec<&str> {
160 self.inner.args()
161 }
162
163 pub fn spec(&self) -> &NodeSpec {
164 &self.spec
165 }
166
167 pub fn ws_uri(&self) -> &str {
168 &self.ws_uri
169 }
170
171 pub fn multiaddr(&self) -> &str {
172 self.multiaddr.as_ref()
173 }
174
175 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
179 get_client_from_url(&self.ws_uri).await
180 }
181
182 #[deprecated = "Use `wait_client` instead."]
184 pub async fn client<Config: subxt::Config>(
185 &self,
186 ) -> Result<OnlineClient<Config>, subxt::Error> {
187 self.try_client().await
188 }
189
190 pub async fn try_client<Config: subxt::Config>(
199 &self,
200 ) -> Result<OnlineClient<Config>, subxt::Error> {
201 get_client_from_url(&self.ws_uri).await
202 }
203
204 pub async fn wait_client<Config: subxt::Config>(
206 &self,
207 ) -> Result<OnlineClient<Config>, anyhow::Error> {
208 debug!("wait_client ws_uri: {}", self.ws_uri());
209 wait_ws_ready(self.ws_uri())
210 .await
211 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {}", e))?;
212
213 self.try_client()
214 .await
215 .map_err(|e| anyhow!("Can't create a subxt client, err: {}", e))
216 }
217
218 pub async fn wait_client_with_timeout<Config: subxt::Config>(
220 &self,
221 timeout_secs: impl Into<u64>,
222 ) -> Result<OnlineClient<Config>, anyhow::Error> {
223 debug!("waiting until subxt client is ready");
224 tokio::time::timeout(
225 Duration::from_secs(timeout_secs.into()),
226 self.wait_client::<Config>(),
227 )
228 .await?
229 }
230
231 pub async fn pause(&self) -> Result<(), anyhow::Error> {
236 self.set_is_running(false);
237 self.inner.pause().await?;
238 Ok(())
239 }
240
241 pub async fn resume(&self) -> Result<(), anyhow::Error> {
244 self.set_is_running(true);
245 self.inner.resume().await?;
246 Ok(())
247 }
248
249 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
251 self.set_is_running(false);
252 self.inner.restart(after).await?;
253 self.set_is_running(true);
254 Ok(())
255 }
256
257 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
265 let metric_name = metric_name.into();
266 self.fetch_metrics().await?;
268 self.metric(&metric_name, true).await
270 }
271
272 pub async fn assert(
281 &self,
282 metric_name: impl Into<String>,
283 value: impl Into<f64>,
284 ) -> Result<bool, anyhow::Error> {
285 let value: f64 = value.into();
286 self.assert_with(metric_name, |v| v == value).await
287 }
288
289 pub async fn assert_with(
292 &self,
293 metric_name: impl Into<String>,
294 predicate: impl Fn(f64) -> bool,
295 ) -> Result<bool, anyhow::Error> {
296 let metric_name = metric_name.into();
297 self.fetch_metrics().await?;
299 let val = self.metric(&metric_name, true).await?;
300 trace!("🔎 Current value {val} passed to the predicated?");
301 Ok(predicate(val))
302 }
303
304 pub async fn wait_metric(
308 &self,
309 metric_name: impl Into<String>,
310 predicate: impl Fn(f64) -> bool,
311 ) -> Result<(), anyhow::Error> {
312 let metric_name = metric_name.into();
313 debug!("waiting until metric {metric_name} pass the predicate");
314 loop {
315 let res = self.assert_with(&metric_name, &predicate).await;
316 match res {
317 Ok(res) => {
318 if res {
319 return Ok(());
320 }
321 },
322 Err(e) => match e.downcast::<reqwest::Error>() {
323 Ok(io_err) => {
324 if !skip_err_while_waiting(&io_err) {
325 return Err(io_err.into());
326 }
327 },
328 Err(other) => {
329 match other.downcast::<NetworkNodeError>() {
330 Ok(node_err) => {
331 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
332 return Err(node_err.into());
333 }
334 },
335 Err(other) => return Err(other),
336 };
337 },
338 },
339 }
340
341 tokio::time::sleep(Duration::from_secs(1)).await;
343 }
344 }
345
346 pub async fn wait_metric_with_timeout(
349 &self,
350 metric_name: impl Into<String>,
351 predicate: impl Fn(f64) -> bool,
352 timeout_secs: impl Into<u64>,
353 ) -> Result<(), anyhow::Error> {
354 let metric_name = metric_name.into();
355 let secs = timeout_secs.into();
356 debug!("waiting until metric {metric_name} pass the predicate");
357 let res = tokio::time::timeout(
358 Duration::from_secs(secs),
359 self.wait_metric(&metric_name, predicate),
360 )
361 .await;
362
363 if let Ok(inner_res) = res {
364 match inner_res {
365 Ok(_) => Ok(()),
366 Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
367 }
368 } else {
369 Err(anyhow!(
371 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
372 ))
373 }
374 }
375
376 pub async fn logs(&self) -> Result<String, anyhow::Error> {
381 Ok(self.inner.logs().await?)
382 }
383
384 pub async fn wait_log_line_count(
386 &self,
387 pattern: impl Into<String>,
388 is_glob: bool,
389 count: usize,
390 ) -> Result<(), anyhow::Error> {
391 let pattern = pattern.into();
392 let pattern_clone = pattern.clone();
393 debug!("waiting until we find pattern {pattern} {count} times");
394 let match_fn: BoxedClosure = if is_glob {
395 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
396 } else {
397 let re = Regex::new(&pattern)?;
398 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
399 };
400
401 loop {
402 let mut q = 0_usize;
403 let logs = self.logs().await?;
404 for line in logs.lines() {
405 trace!("line is {line}");
406 if match_fn(line)? {
407 trace!("pattern {pattern_clone} match in line {line}");
408 q += 1;
409 if q >= count {
410 return Ok(());
411 }
412 }
413 }
414
415 tokio::time::sleep(Duration::from_secs(2)).await;
416 }
417 }
418
419 pub async fn wait_log_line_count_with_timeout(
463 &self,
464 substring: impl Into<String>,
465 is_glob: bool,
466 options: LogLineCountOptions,
467 ) -> Result<LogLineCount, anyhow::Error> {
468 let substring = substring.into();
469 debug!(
470 "waiting until match lines count within {} seconds",
471 options.timeout.as_secs_f64()
472 );
473
474 let start = tokio::time::Instant::now();
475
476 let match_fn: BoxedClosure = if is_glob {
477 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
478 } else {
479 let re = Regex::new(&substring)?;
480 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
481 };
482
483 if options.wait_until_timeout_elapses {
484 tokio::time::sleep(options.timeout).await;
485 }
486
487 let mut q;
488 loop {
489 q = 0_u32;
490 let logs = self.logs().await?;
491 for line in logs.lines() {
492 if match_fn(line)? {
493 q += 1;
494
495 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
500 return Ok(LogLineCount::TargetReached(q));
501 }
502 }
503 }
504
505 if start.elapsed() >= options.timeout {
506 break;
507 }
508
509 tokio::time::sleep(Duration::from_secs(2)).await;
510 }
511
512 if (options.predicate)(q) {
513 Ok(LogLineCount::TargetReached(q))
514 } else {
515 Ok(LogLineCount::TargetFailed(q))
516 }
517 }
518
519 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
520 let response = reqwest::get(&self.prometheus_uri).await?;
521 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
522 let mut cache = self.metrics_cache.write().await;
523 *cache = metrics;
524 Ok(())
525 }
526
527 async fn metric(
529 &self,
530 metric_name: &str,
531 treat_not_found_as_zero: bool,
532 ) -> Result<f64, anyhow::Error> {
533 let mut metrics_map = self.metrics_cache.read().await;
534 if metrics_map.is_empty() {
535 drop(metrics_map);
537 self.fetch_metrics().await?;
538 metrics_map = self.metrics_cache.read().await;
539 }
540
541 if let Some(val) = metrics_map.get(metric_name) {
542 Ok(*val)
543 } else if treat_not_found_as_zero {
544 Ok(0_f64)
545 } else {
546 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
547 }
548 }
549
550 pub async fn wait_until_is_up(
562 &self,
563 timeout_secs: impl Into<u64>,
564 ) -> Result<(), anyhow::Error> {
565 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
566 .await
567 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
568 }
569}
570
571impl std::fmt::Debug for NetworkNode {
572 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573 f.debug_struct("NetworkNode")
574 .field("inner", &"inner_skipped")
575 .field("spec", &self.spec)
576 .field("name", &self.name)
577 .field("ws_uri", &self.ws_uri)
578 .field("prometheus_uri", &self.prometheus_uri)
579 .finish()
580 }
581}
582
583#[cfg(test)]
585mod tests {
586 use std::{
587 path::{Path, PathBuf},
588 sync::{Arc, Mutex},
589 };
590
591 use async_trait::async_trait;
592 use provider::{types::*, ProviderError, ProviderNode};
593
594 use super::*;
595
596 struct MockNode {
597 logs: Arc<Mutex<Vec<String>>>,
598 }
599
600 impl MockNode {
601 fn new() -> Self {
602 Self {
603 logs: Arc::new(Mutex::new(vec![])),
604 }
605 }
606
607 fn logs_push(&self, lines: Vec<impl Into<String>>) {
608 self.logs
609 .lock()
610 .unwrap()
611 .extend(lines.into_iter().map(|l| l.into()));
612 }
613 }
614
615 #[async_trait]
616 impl ProviderNode for MockNode {
617 fn name(&self) -> &str {
618 todo!()
619 }
620
621 fn args(&self) -> Vec<&str> {
622 todo!()
623 }
624
625 fn base_dir(&self) -> &PathBuf {
626 todo!()
627 }
628
629 fn config_dir(&self) -> &PathBuf {
630 todo!()
631 }
632
633 fn data_dir(&self) -> &PathBuf {
634 todo!()
635 }
636
637 fn relay_data_dir(&self) -> &PathBuf {
638 todo!()
639 }
640
641 fn scripts_dir(&self) -> &PathBuf {
642 todo!()
643 }
644
645 fn log_path(&self) -> &PathBuf {
646 todo!()
647 }
648
649 fn log_cmd(&self) -> String {
650 todo!()
651 }
652
653 fn path_in_node(&self, _file: &Path) -> PathBuf {
654 todo!()
655 }
656
657 async fn logs(&self) -> Result<String, ProviderError> {
658 Ok(self.logs.lock().unwrap().join("\n"))
659 }
660
661 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
662 todo!()
663 }
664
665 async fn run_command(
666 &self,
667 _options: RunCommandOptions,
668 ) -> Result<ExecutionResult, ProviderError> {
669 todo!()
670 }
671
672 async fn run_script(
673 &self,
674 _options: RunScriptOptions,
675 ) -> Result<ExecutionResult, ProviderError> {
676 todo!()
677 }
678
679 async fn send_file(
680 &self,
681 _local_file_path: &Path,
682 _remote_file_path: &Path,
683 _mode: &str,
684 ) -> Result<(), ProviderError> {
685 todo!()
686 }
687
688 async fn receive_file(
689 &self,
690 _remote_file_path: &Path,
691 _local_file_path: &Path,
692 ) -> Result<(), ProviderError> {
693 todo!()
694 }
695
696 async fn pause(&self) -> Result<(), ProviderError> {
697 todo!()
698 }
699
700 async fn resume(&self) -> Result<(), ProviderError> {
701 todo!()
702 }
703
704 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
705 todo!()
706 }
707
708 async fn destroy(&self) -> Result<(), ProviderError> {
709 todo!()
710 }
711 }
712
713 #[tokio::test(flavor = "multi_thread")]
714 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
715 let mock_provider = Arc::new(MockNode::new());
716 let mock_node = NetworkNode::new(
717 "node1",
718 "ws_uri",
719 "prometheus_uri",
720 "multiaddr",
721 NodeSpec::default(),
722 mock_provider.clone(),
723 );
724
725 mock_provider.logs_push(vec![
726 "system booting",
727 "stub line 1",
728 "stub line 2",
729 "system ready",
730 ]);
731
732 let options = LogLineCountOptions {
734 predicate: Arc::new(|n| n == 1),
735 timeout: Duration::from_secs(10),
736 wait_until_timeout_elapses: false,
737 };
738
739 let log_line_count = mock_node
740 .wait_log_line_count_with_timeout("system ready", false, options)
741 .await?;
742
743 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
744
745 Ok(())
746 }
747
748 #[tokio::test(flavor = "multi_thread")]
749 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
750 let mock_provider = Arc::new(MockNode::new());
751 let mock_node = NetworkNode::new(
752 "node1",
753 "ws_uri",
754 "prometheus_uri",
755 "multiaddr",
756 NodeSpec::default(),
757 mock_provider.clone(),
758 );
759
760 mock_provider.logs_push(vec![
761 "system booting",
762 "stub line 1",
763 "stub line 2",
764 "system ready",
765 ]);
766
767 let options = LogLineCountOptions {
769 predicate: Arc::new(|n| n == 2),
770 timeout: Duration::from_secs(4),
771 wait_until_timeout_elapses: false,
772 };
773
774 let task = tokio::spawn({
775 async move {
776 mock_node
777 .wait_log_line_count_with_timeout("system ready", false, options)
778 .await
779 .unwrap()
780 }
781 });
782
783 tokio::time::sleep(Duration::from_secs(2)).await;
784
785 mock_provider.logs_push(vec!["system ready"]);
786
787 let log_line_count = task.await?;
788
789 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
790
791 Ok(())
792 }
793
794 #[tokio::test(flavor = "multi_thread")]
795 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
796 let mock_provider = Arc::new(MockNode::new());
797 let mock_node = NetworkNode::new(
798 "node1",
799 "ws_uri",
800 "prometheus_uri",
801 "multiaddr",
802 NodeSpec::default(),
803 mock_provider.clone(),
804 );
805
806 mock_provider.logs_push(vec![
807 "system booting",
808 "stub line 1",
809 "stub line 2",
810 "system ready",
811 ]);
812
813 let options = LogLineCountOptions {
815 predicate: Arc::new(|n| n == 2),
816 timeout: Duration::from_secs(2),
817 wait_until_timeout_elapses: false,
818 };
819
820 let log_line_count = mock_node
821 .wait_log_line_count_with_timeout("system ready", false, options)
822 .await?;
823
824 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
825
826 Ok(())
827 }
828
829 #[tokio::test(flavor = "multi_thread")]
830 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
831 let mock_provider = Arc::new(MockNode::new());
832 let mock_node = NetworkNode::new(
833 "node1",
834 "ws_uri",
835 "prometheus_uri",
836 "multiaddr",
837 NodeSpec::default(),
838 mock_provider.clone(),
839 );
840
841 mock_provider.logs_push(vec![
842 "system booting",
843 "stub line 1",
844 "stub line 2",
845 "system ready",
846 ]);
847
848 let options = LogLineCountOptions {
850 predicate: Arc::new(|n| n == 2),
851 timeout: Duration::from_secs(2),
852 wait_until_timeout_elapses: true,
853 };
854
855 let task = tokio::spawn({
856 async move {
857 mock_node
858 .wait_log_line_count_with_timeout("system ready", false, options)
859 .await
860 .unwrap()
861 }
862 });
863
864 tokio::time::sleep(Duration::from_secs(1)).await;
865
866 mock_provider.logs_push(vec!["system ready"]);
867 mock_provider.logs_push(vec!["system ready"]);
868
869 let log_line_count = task.await?;
870
871 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
872
873 Ok(())
874 }
875
876 #[tokio::test(flavor = "multi_thread")]
877 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
878 let mock_provider = Arc::new(MockNode::new());
879 let mock_node = NetworkNode::new(
880 "node1",
881 "ws_uri",
882 "prometheus_uri",
883 "multiaddr",
884 NodeSpec::default(),
885 mock_provider.clone(),
886 );
887
888 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
889
890 let task = tokio::spawn({
891 async move {
892 mock_node
893 .wait_log_line_count_with_timeout(
894 "system ready",
895 false,
896 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
898 )
899 .await
900 .unwrap()
901 }
902 });
903
904 tokio::time::sleep(Duration::from_secs(1)).await;
905
906 mock_provider.logs_push(vec!["stub line 3"]);
907
908 assert!(task.await?.success());
909
910 Ok(())
911 }
912
913 #[tokio::test(flavor = "multi_thread")]
914 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
915 let mock_provider = Arc::new(MockNode::new());
916 let mock_node = NetworkNode::new(
917 "node1",
918 "ws_uri",
919 "prometheus_uri",
920 "multiaddr",
921 NodeSpec::default(),
922 mock_provider.clone(),
923 );
924
925 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
926
927 let options = LogLineCountOptions {
929 predicate: Arc::new(|n| (2..=5).contains(&n)),
930 timeout: Duration::from_secs(2),
931 wait_until_timeout_elapses: true,
932 };
933
934 let task = tokio::spawn({
935 async move {
936 mock_node
937 .wait_log_line_count_with_timeout("system ready", false, options)
938 .await
939 .unwrap()
940 }
941 });
942
943 tokio::time::sleep(Duration::from_secs(1)).await;
944
945 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
946
947 assert!(task.await?.success());
948
949 Ok(())
950 }
951
952 #[tokio::test(flavor = "multi_thread")]
953 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
954 let mock_provider = Arc::new(MockNode::new());
955 let mock_node = NetworkNode::new(
956 "node1",
957 "ws_uri",
958 "prometheus_uri",
959 "multiaddr",
960 NodeSpec::default(),
961 mock_provider.clone(),
962 );
963
964 mock_provider.logs_push(vec![
965 "system booting",
966 "stub line 1",
967 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
969 "stub line 2"
970 ]);
971
972 let options = LogLineCountOptions {
973 predicate: Arc::new(|n| n == 1),
974 timeout: Duration::from_secs(3),
975 wait_until_timeout_elapses: true,
976 };
977
978 let task = tokio::spawn({
979 async move {
980 mock_node
981 .wait_log_line_count_with_timeout(
982 "error(?! importing block .*: block has an unknown parent)",
983 false,
984 options,
985 )
986 .await
987 .unwrap()
988 }
989 });
990
991 tokio::time::sleep(Duration::from_secs(1)).await;
992
993 mock_provider.logs_push(vec![
994 "system ready",
995 "system error",
997 "system ready",
998 ]);
999
1000 assert!(task.await?.success());
1001
1002 Ok(())
1003 }
1004
1005 #[tokio::test(flavor = "multi_thread")]
1006 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1007 ) -> Result<(), anyhow::Error> {
1008 let mock_provider = Arc::new(MockNode::new());
1009 let mock_node = NetworkNode::new(
1010 "node1",
1011 "ws_uri",
1012 "prometheus_uri",
1013 "multiaddr",
1014 NodeSpec::default(),
1015 mock_provider.clone(),
1016 );
1017
1018 mock_provider.logs_push(vec![
1019 "system booting",
1020 "stub line 1",
1021 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1023 "stub line 2"
1024 ]);
1025
1026 let options = LogLineCountOptions {
1027 predicate: Arc::new(|n| n == 1),
1028 timeout: Duration::from_secs(6),
1029 wait_until_timeout_elapses: true,
1030 };
1031
1032 let task = tokio::spawn({
1033 async move {
1034 mock_node
1035 .wait_log_line_count_with_timeout(
1036 "error(?! importing block .*: block has an unknown parent)",
1037 false,
1038 options,
1039 )
1040 .await
1041 .unwrap()
1042 }
1043 });
1044
1045 tokio::time::sleep(Duration::from_secs(1)).await;
1046
1047 mock_provider.logs_push(vec!["system ready", "system ready"]);
1048
1049 assert!(!task.await?.success());
1050
1051 Ok(())
1052 }
1053
1054 #[tokio::test(flavor = "multi_thread")]
1055 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1056 let mock_provider = Arc::new(MockNode::new());
1057 let mock_node = NetworkNode::new(
1058 "node1",
1059 "ws_uri",
1060 "prometheus_uri",
1061 "multiaddr",
1062 NodeSpec::default(),
1063 mock_provider.clone(),
1064 );
1065
1066 mock_provider.logs_push(vec![
1067 "system booting",
1068 "stub line 1",
1069 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1071 "stub line 2"
1072 ]);
1073
1074 let task = tokio::spawn({
1075 async move {
1076 mock_node
1077 .wait_log_line_count(
1078 "error(?! importing block .*: block has an unknown parent)",
1079 false,
1080 1,
1081 )
1082 .await
1083 .unwrap()
1084 }
1085 });
1086
1087 tokio::time::sleep(Duration::from_secs(1)).await;
1088
1089 mock_provider.logs_push(vec![
1090 "system ready",
1091 "system error",
1093 "system ready",
1094 ]);
1095
1096 assert!(task.await.is_ok());
1097
1098 Ok(())
1099 }
1100
1101 #[tokio::test(flavor = "multi_thread")]
1102 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1103 let mock_provider = Arc::new(MockNode::new());
1104 let mock_node = NetworkNode::new(
1105 "node1",
1106 "ws_uri",
1107 "prometheus_uri",
1108 "multiaddr",
1109 NodeSpec::default(),
1110 mock_provider.clone(),
1111 );
1112
1113 mock_provider.logs_push(vec![
1114 "system booting",
1115 "stub line 1",
1116 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1118 "stub line 2"
1119 ]);
1120
1121 let options = LogLineCountOptions {
1122 predicate: Arc::new(|count| count == 1),
1123 timeout: Duration::from_secs(2),
1124 wait_until_timeout_elapses: true,
1125 };
1126
1127 let task = tokio::spawn({
1128 async move {
1129 mock_node
1131 .wait_log_line_count_with_timeout(
1132 "error(?! importing block .*: block has an unknown parent)",
1133 false,
1134 options,
1135 )
1136 .await
1137 .unwrap()
1138 }
1139 });
1140
1141 tokio::time::sleep(Duration::from_secs(1)).await;
1142
1143 mock_provider.logs_push(vec!["system ready", "system ready"]);
1144
1145 assert!(!task.await?.success());
1146
1147 Ok(())
1148 }
1149}