1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5 },
6 time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::{Deserialize, Serialize, Serializer};
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
22
23type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
24
25#[derive(Error, Debug)]
26pub enum NetworkNodeError {
27 #[error("metric '{0}' not found!")]
28 MetricNotFound(String),
29}
30
31#[derive(Clone, Serialize)]
32pub struct NetworkNode {
33 #[serde(serialize_with = "serialize_provider_node")]
34 pub(crate) inner: DynNode,
35 pub(crate) spec: NodeSpec,
38 pub(crate) name: String,
39 pub(crate) ws_uri: String,
40 pub(crate) multiaddr: String,
41 pub(crate) prometheus_uri: String,
42 #[serde(skip)]
43 metrics_cache: Arc<RwLock<MetricMap>>,
44 #[serde(skip)]
45 is_running: Arc<AtomicBool>,
46}
47
48#[derive(Deserialize)]
49pub(crate) struct RawNetworkNode {
50 pub(crate) name: String,
51 pub(crate) ws_uri: String,
52 pub(crate) prometheus_uri: String,
53 pub(crate) multiaddr: String,
54 pub(crate) spec: NodeSpec,
55 pub(crate) inner: serde_json::Value,
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub enum LogLineCount {
69 TargetReached(u32),
70 TargetFailed(u32),
71}
72
73impl LogLineCount {
74 pub fn success(&self) -> bool {
75 match self {
76 Self::TargetReached(..) => true,
77 Self::TargetFailed(..) => false,
78 }
79 }
80}
81
82#[derive(Clone)]
95pub struct LogLineCountOptions {
96 pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
97 pub timeout: Duration,
98 pub wait_until_timeout_elapses: bool,
99}
100
101impl LogLineCountOptions {
102 pub fn new(
103 predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
104 timeout: Duration,
105 wait_until_timeout_elapses: bool,
106 ) -> Self {
107 Self {
108 predicate: Arc::new(predicate),
109 timeout,
110 wait_until_timeout_elapses,
111 }
112 }
113
114 pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
115 Self::new(|n| n == 0, timeout, true)
116 }
117}
118
119impl NetworkNode {
132 pub(crate) fn new<T: Into<String>>(
134 name: T,
135 ws_uri: T,
136 prometheus_uri: T,
137 multiaddr: T,
138 spec: NodeSpec,
139 inner: DynNode,
140 ) -> Self {
141 Self {
142 name: name.into(),
143 ws_uri: ws_uri.into(),
144 prometheus_uri: prometheus_uri.into(),
145 inner,
146 spec,
147 multiaddr: multiaddr.into(),
148 metrics_cache: Arc::new(Default::default()),
149 is_running: Arc::new(AtomicBool::new(false)),
150 }
151 }
152
153 pub(crate) fn is_running(&self) -> bool {
154 self.is_running.load(Ordering::Acquire)
155 }
156
157 pub(crate) fn set_is_running(&self, is_running: bool) {
158 self.is_running.store(is_running, Ordering::Release);
159 }
160
161 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
162 self.multiaddr = multiaddr.into();
163 }
164
165 pub fn name(&self) -> &str {
166 &self.name
167 }
168
169 pub fn args(&self) -> Vec<&str> {
170 self.inner.args()
171 }
172
173 pub fn spec(&self) -> &NodeSpec {
174 &self.spec
175 }
176
177 pub fn ws_uri(&self) -> &str {
178 &self.ws_uri
179 }
180
181 pub fn multiaddr(&self) -> &str {
182 self.multiaddr.as_ref()
183 }
184
185 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
189 get_client_from_url(&self.ws_uri).await
190 }
191
192 #[deprecated = "Use `wait_client` instead."]
194 pub async fn client<Config: subxt::Config>(
195 &self,
196 ) -> Result<OnlineClient<Config>, subxt::Error> {
197 self.try_client().await
198 }
199
200 pub async fn try_client<Config: subxt::Config>(
209 &self,
210 ) -> Result<OnlineClient<Config>, subxt::Error> {
211 get_client_from_url(&self.ws_uri).await
212 }
213
214 pub async fn wait_client<Config: subxt::Config>(
216 &self,
217 ) -> Result<OnlineClient<Config>, anyhow::Error> {
218 debug!("wait_client ws_uri: {}", self.ws_uri());
219 wait_ws_ready(self.ws_uri())
220 .await
221 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
222
223 self.try_client()
224 .await
225 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
226 }
227
228 pub async fn wait_client_with_timeout<Config: subxt::Config>(
230 &self,
231 timeout_secs: impl Into<u64>,
232 ) -> Result<OnlineClient<Config>, anyhow::Error> {
233 debug!("waiting until subxt client is ready");
234 tokio::time::timeout(
235 Duration::from_secs(timeout_secs.into()),
236 self.wait_client::<Config>(),
237 )
238 .await?
239 }
240
241 pub async fn pause(&self) -> Result<(), anyhow::Error> {
249 self.set_is_running(false);
250 self.inner.pause().await?;
251 Ok(())
252 }
253
254 pub async fn resume(&self) -> Result<(), anyhow::Error> {
260 self.set_is_running(true);
261 self.inner.resume().await?;
262 Ok(())
263 }
264
265 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
270 self.set_is_running(false);
271 self.inner.restart(after).await?;
272 self.set_is_running(true);
273 Ok(())
274 }
275
276 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
284 let metric_name = metric_name.into();
285 self.fetch_metrics().await?;
287 self.metric(&metric_name, true).await
289 }
290
291 pub async fn assert(
300 &self,
301 metric_name: impl Into<String>,
302 value: impl Into<f64>,
303 ) -> Result<bool, anyhow::Error> {
304 let value: f64 = value.into();
305 self.assert_with(metric_name, |v| v == value).await
306 }
307
308 pub async fn assert_with(
311 &self,
312 metric_name: impl Into<String>,
313 predicate: impl Fn(f64) -> bool,
314 ) -> Result<bool, anyhow::Error> {
315 let metric_name = metric_name.into();
316 self.fetch_metrics().await?;
318 let val = self.metric(&metric_name, true).await?;
319 trace!("🔎 Current value {val} passed to the predicated?");
320 Ok(predicate(val))
321 }
322
323 pub async fn wait_metric(
327 &self,
328 metric_name: impl Into<String>,
329 predicate: impl Fn(f64) -> bool,
330 ) -> Result<(), anyhow::Error> {
331 let metric_name = metric_name.into();
332 debug!("waiting until metric {metric_name} pass the predicate");
333 loop {
334 let res = self.assert_with(&metric_name, &predicate).await;
335 match res {
336 Ok(res) => {
337 if res {
338 return Ok(());
339 }
340 },
341 Err(e) => match e.downcast::<reqwest::Error>() {
342 Ok(io_err) => {
343 if !skip_err_while_waiting(&io_err) {
344 return Err(io_err.into());
345 }
346 },
347 Err(other) => {
348 match other.downcast::<NetworkNodeError>() {
349 Ok(node_err) => {
350 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
351 return Err(node_err.into());
352 }
353 },
354 Err(other) => return Err(other),
355 };
356 },
357 },
358 }
359
360 tokio::time::sleep(Duration::from_secs(1)).await;
362 }
363 }
364
365 pub async fn wait_metric_with_timeout(
368 &self,
369 metric_name: impl Into<String>,
370 predicate: impl Fn(f64) -> bool,
371 timeout_secs: impl Into<u64>,
372 ) -> Result<(), anyhow::Error> {
373 let metric_name = metric_name.into();
374 let secs = timeout_secs.into();
375 debug!("waiting until metric {metric_name} pass the predicate");
376 let res = tokio::time::timeout(
377 Duration::from_secs(secs),
378 self.wait_metric(&metric_name, predicate),
379 )
380 .await;
381
382 if let Ok(inner_res) = res {
383 match inner_res {
384 Ok(_) => Ok(()),
385 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
386 }
387 } else {
388 Err(anyhow!(
390 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
391 ))
392 }
393 }
394
395 pub async fn logs(&self) -> Result<String, anyhow::Error> {
400 Ok(self.inner.logs().await?)
401 }
402
403 pub async fn wait_log_line_count(
405 &self,
406 pattern: impl Into<String>,
407 is_glob: bool,
408 count: usize,
409 ) -> Result<(), anyhow::Error> {
410 let pattern = pattern.into();
411 let pattern_clone = pattern.clone();
412 debug!("waiting until we find pattern {pattern} {count} times");
413 let match_fn: BoxedClosure = if is_glob {
414 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
415 } else {
416 let re = Regex::new(&pattern)?;
417 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
418 };
419
420 loop {
421 let mut q = 0_usize;
422 let logs = self.logs().await?;
423 for line in logs.lines() {
424 trace!("line is {line}");
425 if match_fn(line)? {
426 trace!("pattern {pattern_clone} match in line {line}");
427 q += 1;
428 if q >= count {
429 return Ok(());
430 }
431 }
432 }
433
434 tokio::time::sleep(Duration::from_secs(2)).await;
435 }
436 }
437
438 pub async fn wait_log_line_count_with_timeout(
482 &self,
483 substring: impl Into<String>,
484 is_glob: bool,
485 options: LogLineCountOptions,
486 ) -> Result<LogLineCount, anyhow::Error> {
487 let substring = substring.into();
488 debug!(
489 "waiting until match lines count within {} seconds",
490 options.timeout.as_secs_f64()
491 );
492
493 let start = tokio::time::Instant::now();
494
495 let match_fn: BoxedClosure = if is_glob {
496 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
497 } else {
498 let re = Regex::new(&substring)?;
499 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
500 };
501
502 if options.wait_until_timeout_elapses {
503 tokio::time::sleep(options.timeout).await;
504 }
505
506 let mut q;
507 loop {
508 q = 0_u32;
509 let logs = self.logs().await?;
510 for line in logs.lines() {
511 if match_fn(line)? {
512 q += 1;
513
514 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
519 return Ok(LogLineCount::TargetReached(q));
520 }
521 }
522 }
523
524 if start.elapsed() >= options.timeout {
525 break;
526 }
527
528 tokio::time::sleep(Duration::from_secs(2)).await;
529 }
530
531 if (options.predicate)(q) {
532 Ok(LogLineCount::TargetReached(q))
533 } else {
534 Ok(LogLineCount::TargetFailed(q))
535 }
536 }
537
538 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
539 let response = reqwest::get(&self.prometheus_uri).await?;
540 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
541 let mut cache = self.metrics_cache.write().await;
542 *cache = metrics;
543 Ok(())
544 }
545
546 async fn metric(
548 &self,
549 metric_name: &str,
550 treat_not_found_as_zero: bool,
551 ) -> Result<f64, anyhow::Error> {
552 let mut metrics_map = self.metrics_cache.read().await;
553 if metrics_map.is_empty() {
554 drop(metrics_map);
556 self.fetch_metrics().await?;
557 metrics_map = self.metrics_cache.read().await;
558 }
559
560 if let Some(val) = metrics_map.get(metric_name) {
561 Ok(*val)
562 } else if treat_not_found_as_zero {
563 Ok(0_f64)
564 } else {
565 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
566 }
567 }
568
569 pub async fn wait_until_is_up(
581 &self,
582 timeout_secs: impl Into<u64>,
583 ) -> Result<(), anyhow::Error> {
584 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
585 .await
586 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
587 }
588}
589
590impl std::fmt::Debug for NetworkNode {
591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592 f.debug_struct("NetworkNode")
593 .field("inner", &"inner_skipped")
594 .field("spec", &self.spec)
595 .field("name", &self.name)
596 .field("ws_uri", &self.ws_uri)
597 .field("prometheus_uri", &self.prometheus_uri)
598 .finish()
599 }
600}
601
602fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
603where
604 S: Serializer,
605{
606 erased_serde::serialize(node.as_ref(), serializer)
607}
608
609#[cfg(test)]
611mod tests {
612 use std::{
613 path::{Path, PathBuf},
614 sync::{Arc, Mutex},
615 };
616
617 use async_trait::async_trait;
618 use provider::{types::*, ProviderError, ProviderNode};
619
620 use super::*;
621
622 #[derive(Serialize)]
623 struct MockNode {
624 logs: Arc<Mutex<Vec<String>>>,
625 }
626
627 impl MockNode {
628 fn new() -> Self {
629 Self {
630 logs: Arc::new(Mutex::new(vec![])),
631 }
632 }
633
634 fn logs_push(&self, lines: Vec<impl Into<String>>) {
635 self.logs
636 .lock()
637 .unwrap()
638 .extend(lines.into_iter().map(|l| l.into()));
639 }
640 }
641
642 #[async_trait]
643 impl ProviderNode for MockNode {
644 fn name(&self) -> &str {
645 todo!()
646 }
647
648 fn args(&self) -> Vec<&str> {
649 todo!()
650 }
651
652 fn base_dir(&self) -> &PathBuf {
653 todo!()
654 }
655
656 fn config_dir(&self) -> &PathBuf {
657 todo!()
658 }
659
660 fn data_dir(&self) -> &PathBuf {
661 todo!()
662 }
663
664 fn relay_data_dir(&self) -> &PathBuf {
665 todo!()
666 }
667
668 fn scripts_dir(&self) -> &PathBuf {
669 todo!()
670 }
671
672 fn log_path(&self) -> &PathBuf {
673 todo!()
674 }
675
676 fn log_cmd(&self) -> String {
677 todo!()
678 }
679
680 fn path_in_node(&self, _file: &Path) -> PathBuf {
681 todo!()
682 }
683
684 async fn logs(&self) -> Result<String, ProviderError> {
685 Ok(self.logs.lock().unwrap().join("\n"))
686 }
687
688 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
689 todo!()
690 }
691
692 async fn run_command(
693 &self,
694 _options: RunCommandOptions,
695 ) -> Result<ExecutionResult, ProviderError> {
696 todo!()
697 }
698
699 async fn run_script(
700 &self,
701 _options: RunScriptOptions,
702 ) -> Result<ExecutionResult, ProviderError> {
703 todo!()
704 }
705
706 async fn send_file(
707 &self,
708 _local_file_path: &Path,
709 _remote_file_path: &Path,
710 _mode: &str,
711 ) -> Result<(), ProviderError> {
712 todo!()
713 }
714
715 async fn receive_file(
716 &self,
717 _remote_file_path: &Path,
718 _local_file_path: &Path,
719 ) -> Result<(), ProviderError> {
720 todo!()
721 }
722
723 async fn pause(&self) -> Result<(), ProviderError> {
724 todo!()
725 }
726
727 async fn resume(&self) -> Result<(), ProviderError> {
728 todo!()
729 }
730
731 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
732 todo!()
733 }
734
735 async fn destroy(&self) -> Result<(), ProviderError> {
736 todo!()
737 }
738 }
739
740 #[tokio::test(flavor = "multi_thread")]
741 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
742 let mock_provider = Arc::new(MockNode::new());
743 let mock_node = NetworkNode::new(
744 "node1",
745 "ws_uri",
746 "prometheus_uri",
747 "multiaddr",
748 NodeSpec::default(),
749 mock_provider.clone(),
750 );
751
752 mock_provider.logs_push(vec![
753 "system booting",
754 "stub line 1",
755 "stub line 2",
756 "system ready",
757 ]);
758
759 let options = LogLineCountOptions {
761 predicate: Arc::new(|n| n == 1),
762 timeout: Duration::from_secs(10),
763 wait_until_timeout_elapses: false,
764 };
765
766 let log_line_count = mock_node
767 .wait_log_line_count_with_timeout("system ready", false, options)
768 .await?;
769
770 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
771
772 Ok(())
773 }
774
775 #[tokio::test(flavor = "multi_thread")]
776 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
777 let mock_provider = Arc::new(MockNode::new());
778 let mock_node = NetworkNode::new(
779 "node1",
780 "ws_uri",
781 "prometheus_uri",
782 "multiaddr",
783 NodeSpec::default(),
784 mock_provider.clone(),
785 );
786
787 mock_provider.logs_push(vec![
788 "system booting",
789 "stub line 1",
790 "stub line 2",
791 "system ready",
792 ]);
793
794 let options = LogLineCountOptions {
796 predicate: Arc::new(|n| n == 2),
797 timeout: Duration::from_secs(4),
798 wait_until_timeout_elapses: false,
799 };
800
801 let task = tokio::spawn({
802 async move {
803 mock_node
804 .wait_log_line_count_with_timeout("system ready", false, options)
805 .await
806 .unwrap()
807 }
808 });
809
810 tokio::time::sleep(Duration::from_secs(2)).await;
811
812 mock_provider.logs_push(vec!["system ready"]);
813
814 let log_line_count = task.await?;
815
816 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
817
818 Ok(())
819 }
820
821 #[tokio::test(flavor = "multi_thread")]
822 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
823 let mock_provider = Arc::new(MockNode::new());
824 let mock_node = NetworkNode::new(
825 "node1",
826 "ws_uri",
827 "prometheus_uri",
828 "multiaddr",
829 NodeSpec::default(),
830 mock_provider.clone(),
831 );
832
833 mock_provider.logs_push(vec![
834 "system booting",
835 "stub line 1",
836 "stub line 2",
837 "system ready",
838 ]);
839
840 let options = LogLineCountOptions {
842 predicate: Arc::new(|n| n == 2),
843 timeout: Duration::from_secs(2),
844 wait_until_timeout_elapses: false,
845 };
846
847 let log_line_count = mock_node
848 .wait_log_line_count_with_timeout("system ready", false, options)
849 .await?;
850
851 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
852
853 Ok(())
854 }
855
856 #[tokio::test(flavor = "multi_thread")]
857 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
858 let mock_provider = Arc::new(MockNode::new());
859 let mock_node = NetworkNode::new(
860 "node1",
861 "ws_uri",
862 "prometheus_uri",
863 "multiaddr",
864 NodeSpec::default(),
865 mock_provider.clone(),
866 );
867
868 mock_provider.logs_push(vec![
869 "system booting",
870 "stub line 1",
871 "stub line 2",
872 "system ready",
873 ]);
874
875 let options = LogLineCountOptions {
877 predicate: Arc::new(|n| n == 2),
878 timeout: Duration::from_secs(2),
879 wait_until_timeout_elapses: true,
880 };
881
882 let task = tokio::spawn({
883 async move {
884 mock_node
885 .wait_log_line_count_with_timeout("system ready", false, options)
886 .await
887 .unwrap()
888 }
889 });
890
891 tokio::time::sleep(Duration::from_secs(1)).await;
892
893 mock_provider.logs_push(vec!["system ready"]);
894 mock_provider.logs_push(vec!["system ready"]);
895
896 let log_line_count = task.await?;
897
898 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
899
900 Ok(())
901 }
902
903 #[tokio::test(flavor = "multi_thread")]
904 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
905 let mock_provider = Arc::new(MockNode::new());
906 let mock_node = NetworkNode::new(
907 "node1",
908 "ws_uri",
909 "prometheus_uri",
910 "multiaddr",
911 NodeSpec::default(),
912 mock_provider.clone(),
913 );
914
915 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
916
917 let task = tokio::spawn({
918 async move {
919 mock_node
920 .wait_log_line_count_with_timeout(
921 "system ready",
922 false,
923 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
925 )
926 .await
927 .unwrap()
928 }
929 });
930
931 tokio::time::sleep(Duration::from_secs(1)).await;
932
933 mock_provider.logs_push(vec!["stub line 3"]);
934
935 assert!(task.await?.success());
936
937 Ok(())
938 }
939
940 #[tokio::test(flavor = "multi_thread")]
941 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
942 let mock_provider = Arc::new(MockNode::new());
943 let mock_node = NetworkNode::new(
944 "node1",
945 "ws_uri",
946 "prometheus_uri",
947 "multiaddr",
948 NodeSpec::default(),
949 mock_provider.clone(),
950 );
951
952 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
953
954 let options = LogLineCountOptions {
956 predicate: Arc::new(|n| (2..=5).contains(&n)),
957 timeout: Duration::from_secs(2),
958 wait_until_timeout_elapses: true,
959 };
960
961 let task = tokio::spawn({
962 async move {
963 mock_node
964 .wait_log_line_count_with_timeout("system ready", false, options)
965 .await
966 .unwrap()
967 }
968 });
969
970 tokio::time::sleep(Duration::from_secs(1)).await;
971
972 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
973
974 assert!(task.await?.success());
975
976 Ok(())
977 }
978
979 #[tokio::test(flavor = "multi_thread")]
980 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
981 let mock_provider = Arc::new(MockNode::new());
982 let mock_node = NetworkNode::new(
983 "node1",
984 "ws_uri",
985 "prometheus_uri",
986 "multiaddr",
987 NodeSpec::default(),
988 mock_provider.clone(),
989 );
990
991 mock_provider.logs_push(vec![
992 "system booting",
993 "stub line 1",
994 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
996 "stub line 2"
997 ]);
998
999 let options = LogLineCountOptions {
1000 predicate: Arc::new(|n| n == 1),
1001 timeout: Duration::from_secs(3),
1002 wait_until_timeout_elapses: true,
1003 };
1004
1005 let task = tokio::spawn({
1006 async move {
1007 mock_node
1008 .wait_log_line_count_with_timeout(
1009 "error(?! importing block .*: block has an unknown parent)",
1010 false,
1011 options,
1012 )
1013 .await
1014 .unwrap()
1015 }
1016 });
1017
1018 tokio::time::sleep(Duration::from_secs(1)).await;
1019
1020 mock_provider.logs_push(vec![
1021 "system ready",
1022 "system error",
1024 "system ready",
1025 ]);
1026
1027 assert!(task.await?.success());
1028
1029 Ok(())
1030 }
1031
1032 #[tokio::test(flavor = "multi_thread")]
1033 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1034 ) -> Result<(), anyhow::Error> {
1035 let mock_provider = Arc::new(MockNode::new());
1036 let mock_node = NetworkNode::new(
1037 "node1",
1038 "ws_uri",
1039 "prometheus_uri",
1040 "multiaddr",
1041 NodeSpec::default(),
1042 mock_provider.clone(),
1043 );
1044
1045 mock_provider.logs_push(vec![
1046 "system booting",
1047 "stub line 1",
1048 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1050 "stub line 2"
1051 ]);
1052
1053 let options = LogLineCountOptions {
1054 predicate: Arc::new(|n| n == 1),
1055 timeout: Duration::from_secs(6),
1056 wait_until_timeout_elapses: true,
1057 };
1058
1059 let task = tokio::spawn({
1060 async move {
1061 mock_node
1062 .wait_log_line_count_with_timeout(
1063 "error(?! importing block .*: block has an unknown parent)",
1064 false,
1065 options,
1066 )
1067 .await
1068 .unwrap()
1069 }
1070 });
1071
1072 tokio::time::sleep(Duration::from_secs(1)).await;
1073
1074 mock_provider.logs_push(vec!["system ready", "system ready"]);
1075
1076 assert!(!task.await?.success());
1077
1078 Ok(())
1079 }
1080
1081 #[tokio::test(flavor = "multi_thread")]
1082 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1083 let mock_provider = Arc::new(MockNode::new());
1084 let mock_node = NetworkNode::new(
1085 "node1",
1086 "ws_uri",
1087 "prometheus_uri",
1088 "multiaddr",
1089 NodeSpec::default(),
1090 mock_provider.clone(),
1091 );
1092
1093 mock_provider.logs_push(vec![
1094 "system booting",
1095 "stub line 1",
1096 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1098 "stub line 2"
1099 ]);
1100
1101 let task = tokio::spawn({
1102 async move {
1103 mock_node
1104 .wait_log_line_count(
1105 "error(?! importing block .*: block has an unknown parent)",
1106 false,
1107 1,
1108 )
1109 .await
1110 .unwrap()
1111 }
1112 });
1113
1114 tokio::time::sleep(Duration::from_secs(1)).await;
1115
1116 mock_provider.logs_push(vec![
1117 "system ready",
1118 "system error",
1120 "system ready",
1121 ]);
1122
1123 assert!(task.await.is_ok());
1124
1125 Ok(())
1126 }
1127
1128 #[tokio::test(flavor = "multi_thread")]
1129 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1130 let mock_provider = Arc::new(MockNode::new());
1131 let mock_node = NetworkNode::new(
1132 "node1",
1133 "ws_uri",
1134 "prometheus_uri",
1135 "multiaddr",
1136 NodeSpec::default(),
1137 mock_provider.clone(),
1138 );
1139
1140 mock_provider.logs_push(vec![
1141 "system booting",
1142 "stub line 1",
1143 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1145 "stub line 2"
1146 ]);
1147
1148 let options = LogLineCountOptions {
1149 predicate: Arc::new(|count| count == 1),
1150 timeout: Duration::from_secs(2),
1151 wait_until_timeout_elapses: true,
1152 };
1153
1154 let task = tokio::spawn({
1155 async move {
1156 mock_node
1158 .wait_log_line_count_with_timeout(
1159 "error(?! importing block .*: block has an unknown parent)",
1160 false,
1161 options,
1162 )
1163 .await
1164 .unwrap()
1165 }
1166 });
1167
1168 tokio::time::sleep(Duration::from_secs(1)).await;
1169
1170 mock_provider.logs_push(vec!["system ready", "system ready"]);
1171
1172 assert!(!task.await?.success());
1173
1174 Ok(())
1175 }
1176}