1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15 types::{ExecutionResult, RunScriptOptions},
16 DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
26
/// Boxed, thread-safe line matcher: receives one log line and returns whether
/// it matches, or an error if the underlying matcher failed.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
28
/// Errors raised by [`NetworkNode`] metric lookups.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric name (carried in the variant) is not present in
    /// the metrics cache.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
34
/// Handle to a single node of a spawned network.
///
/// Cloning is cheap for the shared parts: the metrics cache and the running
/// flag are behind `Arc`, so clones observe the same state.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-level node handle; serialized through `serialize_provider_node`.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    /// Specification this node was created from.
    pub(crate) spec: NodeSpec,
    /// Node name.
    pub(crate) name: String,
    /// Websocket endpoint used to build RPC / subxt clients.
    pub(crate) ws_uri: String,
    /// Multiaddress of the node.
    pub(crate) multiaddr: String,
    /// Endpoint the Prometheus metrics are fetched from.
    pub(crate) prometheus_uri: String,
    /// Metrics map last stored by `fetch_metrics`; not serialized.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag read by `is_running()` and written by `set_is_running()`;
    /// not serialized.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
51
/// Deserializable mirror of the serializable fields of [`NetworkNode`].
///
/// NOTE(review): presumably used to rebuild a `NetworkNode` from previously
/// serialized state — confirm against the callers outside this file.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    /// Node name.
    pub(crate) name: String,
    /// Websocket endpoint.
    pub(crate) ws_uri: String,
    /// Prometheus metrics endpoint.
    pub(crate) prometheus_uri: String,
    /// Multiaddress of the node.
    pub(crate) multiaddr: String,
    /// Node specification.
    pub(crate) spec: NodeSpec,
    /// Provider-specific payload kept as untyped JSON; falls back to the
    /// `serde_json::Value` default when the field is absent.
    #[serde(default)]
    pub(crate) inner: serde_json::Value,
}
62
/// Outcome of a log-line counting wait; both variants carry the number of
/// matching lines that were counted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    /// The count satisfied the caller's predicate.
    TargetReached(u32),
    /// The count did not satisfy the caller's predicate.
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` for [`LogLineCount::TargetReached`], `false` otherwise.
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}
86
/// Options controlling how matching log lines are counted and judged.
#[derive(Clone)]
pub struct LogLineCountOptions {
    /// Decides whether a given number of matching lines is acceptable.
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    /// Maximum time to wait.
    pub timeout: Duration,
    /// When `true`, the whole timeout is waited out before the count is
    /// judged, even if the predicate already holds earlier.
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Builds options from an arbitrary predicate.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Succeeds only if no line matched once the full timeout has elapsed.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|matches| matches == 0),
            timeout,
            wait_until_timeout_elapses: true,
        }
    }

    /// Succeeds as soon as one or more lines matched.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|matches| matches >= 1),
            timeout,
            wait_until_timeout_elapses: false,
        }
    }

    /// Succeeds when the count equals one; does not wait out the timeout.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|matches| matches == 1),
            timeout,
            wait_until_timeout_elapses: false,
        }
    }
}
131
132impl NetworkNode {
145 pub(crate) fn new<T: Into<String>>(
147 name: T,
148 ws_uri: T,
149 prometheus_uri: T,
150 multiaddr: T,
151 spec: NodeSpec,
152 inner: DynNode,
153 ) -> Self {
154 Self {
155 name: name.into(),
156 ws_uri: ws_uri.into(),
157 prometheus_uri: prometheus_uri.into(),
158 inner,
159 spec,
160 multiaddr: multiaddr.into(),
161 metrics_cache: Arc::new(Default::default()),
162 is_running: Arc::new(AtomicBool::new(false)),
163 }
164 }
165
/// Returns the current value of the running flag (as last written by
/// `set_is_running`); it does not probe the node itself.
pub fn is_running(&self) -> bool {
    self.is_running.load(Ordering::Acquire)
}
172
173 pub async fn is_responsive(&self) -> bool {
180 tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
181 .await
182 .is_ok()
183 }
184
/// Stores `is_running` into the shared running flag (visible to all clones
/// of this node, since the flag lives behind an `Arc`).
pub(crate) fn set_is_running(&self, is_running: bool) {
    self.is_running.store(is_running, Ordering::Release);
}
188
/// Replaces the stored multiaddress of this node.
pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
    self.multiaddr = multiaddr.into();
}
192
193 pub fn name(&self) -> &str {
194 &self.name
195 }
196
/// Returns the arguments reported by the underlying provider node.
pub fn args(&self) -> Vec<&str> {
    self.inner.args()
}
200
/// Returns the specification this node was created from.
pub fn spec(&self) -> &NodeSpec {
    &self.spec
}
204
205 pub fn ws_uri(&self) -> &str {
206 &self.ws_uri
207 }
208
209 pub fn multiaddr(&self) -> &str {
210 self.multiaddr.as_ref()
211 }
212
/// Builds an RPC client from the node's websocket endpoint.
///
/// # Errors
/// Returns the `subxt::Error` produced by `get_client_from_url`.
pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
    get_client_from_url(&self.ws_uri).await
}
219
/// Deprecated alias that simply forwards to [`Self::try_client`].
#[deprecated = "Use `wait_client` instead."]
pub async fn client<Config: subxt::Config>(
    &self,
) -> Result<OnlineClient<Config>, subxt::Error> {
    self.try_client().await
}
227
/// Tries to build a subxt `OnlineClient` from the websocket endpoint right
/// away, without first waiting for the endpoint to be ready (see
/// [`Self::wait_client`] for the waiting variant).
///
/// # Errors
/// Returns the `subxt::Error` produced by `get_client_from_url`.
pub async fn try_client<Config: subxt::Config>(
    &self,
) -> Result<OnlineClient<Config>, subxt::Error> {
    get_client_from_url(&self.ws_uri).await
}
241
242 pub async fn wait_client<Config: subxt::Config>(
244 &self,
245 ) -> Result<OnlineClient<Config>, anyhow::Error> {
246 debug!("wait_client ws_uri: {}", self.ws_uri());
247 wait_ws_ready(self.ws_uri())
248 .await
249 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
250
251 self.try_client()
252 .await
253 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
254 }
255
256 pub async fn wait_client_with_timeout<Config: subxt::Config>(
258 &self,
259 timeout_secs: impl Into<u64>,
260 ) -> Result<OnlineClient<Config>, anyhow::Error> {
261 debug!("waiting until subxt client is ready");
262 tokio::time::timeout(
263 Duration::from_secs(timeout_secs.into()),
264 self.wait_client::<Config>(),
265 )
266 .await?
267 }
268
269 pub async fn pause(&self) -> Result<(), anyhow::Error> {
277 self.set_is_running(false);
278 self.inner.pause().await?;
279 Ok(())
280 }
281
282 pub async fn resume(&self) -> Result<(), anyhow::Error> {
288 self.set_is_running(true);
289 self.inner.resume().await?;
290 Ok(())
291 }
292
293 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
298 self.set_is_running(false);
299 self.inner.restart(after).await?;
300 self.set_is_running(true);
301 Ok(())
302 }
303
304 pub async fn run_script(
311 &self,
312 options: RunScriptOptions,
313 ) -> Result<ExecutionResult, anyhow::Error> {
314 self.inner
315 .run_script(options)
316 .await
317 .map_err(|e| anyhow!("Failed to run script: {e}"))
318 }
319
320 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
328 let metric_name = metric_name.into();
329 self.fetch_metrics().await?;
331 self.metric(&metric_name, true).await
333 }
334
335 pub async fn assert(
344 &self,
345 metric_name: impl Into<String>,
346 value: impl Into<f64>,
347 ) -> Result<bool, anyhow::Error> {
348 let value: f64 = value.into();
349 self.assert_with(metric_name, |v| v == value).await
350 }
351
352 pub async fn assert_with(
355 &self,
356 metric_name: impl Into<String>,
357 predicate: impl Fn(f64) -> bool,
358 ) -> Result<bool, anyhow::Error> {
359 let metric_name = metric_name.into();
360 self.fetch_metrics().await?;
362 let val = self.metric(&metric_name, true).await?;
363 trace!("🔎 Current value {val} passed to the predicated?");
364 Ok(predicate(val))
365 }
366
367 pub async fn wait_metric(
371 &self,
372 metric_name: impl Into<String>,
373 predicate: impl Fn(f64) -> bool,
374 ) -> Result<(), anyhow::Error> {
375 let metric_name = metric_name.into();
376 debug!("waiting until metric {metric_name} pass the predicate");
377 loop {
378 let res = self.assert_with(&metric_name, &predicate).await;
379 match res {
380 Ok(res) => {
381 if res {
382 return Ok(());
383 }
384 },
385 Err(e) => match e.downcast::<reqwest::Error>() {
386 Ok(io_err) => {
387 if !skip_err_while_waiting(&io_err) {
388 return Err(io_err.into());
389 }
390 },
391 Err(other) => {
392 match other.downcast::<NetworkNodeError>() {
393 Ok(node_err) => {
394 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
395 return Err(node_err.into());
396 }
397 },
398 Err(other) => return Err(other),
399 };
400 },
401 },
402 }
403
404 tokio::time::sleep(Duration::from_secs(1)).await;
406 }
407 }
408
409 pub async fn wait_metric_with_timeout(
412 &self,
413 metric_name: impl Into<String>,
414 predicate: impl Fn(f64) -> bool,
415 timeout_secs: impl Into<u64>,
416 ) -> Result<(), anyhow::Error> {
417 let metric_name = metric_name.into();
418 let secs = timeout_secs.into();
419 debug!("waiting until metric {metric_name} pass the predicate");
420 let res = tokio::time::timeout(
421 Duration::from_secs(secs),
422 self.wait_metric(&metric_name, predicate),
423 )
424 .await;
425
426 if let Ok(inner_res) = res {
427 match inner_res {
428 Ok(_) => Ok(()),
429 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
430 }
431 } else {
432 Err(anyhow!(
434 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
435 ))
436 }
437 }
438
439 pub async fn logs(&self) -> Result<String, anyhow::Error> {
444 Ok(self.inner.logs().await?)
445 }
446
447 pub async fn wait_log_line_count(
449 &self,
450 pattern: impl Into<String>,
451 is_glob: bool,
452 count: usize,
453 ) -> Result<(), anyhow::Error> {
454 let pattern = pattern.into();
455 let pattern_clone = pattern.clone();
456 debug!("waiting until we find pattern {pattern} {count} times");
457 let match_fn: BoxedClosure = if is_glob {
458 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
459 } else {
460 let re = Regex::new(&pattern)?;
461 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
462 };
463
464 loop {
465 let mut q = 0_usize;
466 let logs = self.logs().await?;
467 for line in logs.lines() {
468 trace!("line is {line}");
469 if match_fn(line)? {
470 trace!("pattern {pattern_clone} match in line {line}");
471 q += 1;
472 if q >= count {
473 return Ok(());
474 }
475 }
476 }
477
478 tokio::time::sleep(Duration::from_secs(2)).await;
479 }
480 }
481
482 pub async fn wait_log_line_count_with_timeout(
526 &self,
527 substring: impl Into<String>,
528 is_glob: bool,
529 options: LogLineCountOptions,
530 ) -> Result<LogLineCount, anyhow::Error> {
531 let substring = substring.into();
532 debug!(
533 "waiting until match lines count within {} seconds",
534 options.timeout.as_secs_f64()
535 );
536
537 let start = tokio::time::Instant::now();
538
539 let match_fn: BoxedClosure = if is_glob {
540 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
541 } else {
542 let re = Regex::new(&substring)?;
543 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
544 };
545
546 if options.wait_until_timeout_elapses {
547 tokio::time::sleep(options.timeout).await;
548 }
549
550 let mut q;
551 loop {
552 q = 0_u32;
553 let logs = self.logs().await?;
554 for line in logs.lines() {
555 if match_fn(line)? {
556 q += 1;
557
558 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
563 return Ok(LogLineCount::TargetReached(q));
564 }
565 }
566 }
567
568 if start.elapsed() >= options.timeout {
569 break;
570 }
571
572 tokio::time::sleep(Duration::from_secs(2)).await;
573 }
574
575 if (options.predicate)(q) {
576 Ok(LogLineCount::TargetReached(q))
577 } else {
578 Ok(LogLineCount::TargetFailed(q))
579 }
580 }
581
582 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
583 let response = reqwest::get(&self.prometheus_uri).await?;
584 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
585 let mut cache = self.metrics_cache.write().await;
586 *cache = metrics;
587 Ok(())
588 }
589
590 async fn metric(
592 &self,
593 metric_name: &str,
594 treat_not_found_as_zero: bool,
595 ) -> Result<f64, anyhow::Error> {
596 let mut metrics_map = self.metrics_cache.read().await;
597 if metrics_map.is_empty() {
598 drop(metrics_map);
600 self.fetch_metrics().await?;
601 metrics_map = self.metrics_cache.read().await;
602 }
603
604 if let Some(val) = metrics_map.get(metric_name) {
605 Ok(*val)
606 } else if treat_not_found_as_zero {
607 Ok(0_f64)
608 } else {
609 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
610 }
611 }
612
613 pub async fn get_histogram_buckets(
633 &self,
634 metric_name: impl AsRef<str>,
635 label_filters: Option<HashMap<String, String>>,
636 ) -> Result<HashMap<String, u64>, anyhow::Error> {
637 let metric_name = metric_name.as_ref();
638
639 let response = reqwest::get(&self.prometheus_uri).await?;
641 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
642
643 let resolved_metric_name = if metric_name.contains("_bucket") {
645 metric_name.to_string()
646 } else {
647 format!("{}_bucket", metric_name)
648 };
649
650 let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
654
655 for (key, &value) in metrics.iter() {
656 if !key.starts_with(&resolved_metric_name) {
657 continue;
658 }
659
660 let remaining = &key[resolved_metric_name.len()..];
661
662 let labels_str = &remaining[1..remaining.len() - 1];
663 let parsed_labels = Self::parse_label_string(labels_str);
664
665 if !parsed_labels.contains_key("le") {
667 continue;
668 }
669
670 if let Some(ref filters) = label_filters {
672 let mut all_match = true;
673 for (filter_key, filter_value) in filters {
674 if parsed_labels.get(filter_key) != Some(filter_value) {
675 all_match = false;
676 break;
677 }
678 }
679 if !all_match {
680 continue;
681 }
682 }
683
684 metric_entries.push((key.clone(), parsed_labels, value as u64));
685 }
686
687 let max_label_count = metric_entries
690 .iter()
691 .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
692 .max()
693 .unwrap_or(0);
694
695 let mut raw_buckets: Vec<(String, u64)> = Vec::new();
697 let mut seen_le_values = HashSet::new();
698 let mut active_series: Option<Vec<(String, String)>> = None;
699
700 for (_, parsed_labels, value) in metric_entries {
701 let le_value = parsed_labels.get("le").unwrap().clone();
702
703 let mut non_le_labels: Vec<(String, String)> = parsed_labels
705 .iter()
706 .filter(|(k, _)| k.as_str() != "le")
707 .map(|(k, v)| (k.clone(), v.clone()))
708 .collect();
709 non_le_labels.sort();
710
711 if non_le_labels.len() < max_label_count {
714 continue;
715 }
716
717 if let Some(ref prev_series) = active_series {
719 if prev_series != &non_le_labels {
720 if !raw_buckets.is_empty() {
721 break; }
723 active_series = Some(non_le_labels.clone());
724 seen_le_values.clear();
725 }
726 } else {
727 active_series = Some(non_le_labels.clone());
728 }
729
730 if !seen_le_values.insert(le_value.clone()) {
732 continue;
733 }
734
735 trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
736 raw_buckets.push((le_value, value));
737 }
738
739 raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
741
742 let mut buckets = HashMap::new();
744 let mut previous_value = 0_u64;
745 for (le, cumulative_count) in raw_buckets {
746 if cumulative_count < previous_value {
747 warn!(
748 "Warning: bucket count decreased from {} to {} at le={}",
749 previous_value, cumulative_count, le
750 );
751 }
752 let delta = cumulative_count.saturating_sub(previous_value);
753 buckets.insert(le, delta);
754 previous_value = cumulative_count;
755 }
756
757 Ok(buckets)
758 }
759
/// Parses the inside of a Prometheus label block (`k1="v1",k2="v2"`) into a
/// key/value map.
///
/// Quote characters around values are dropped, commas and `=` inside quotes
/// are kept as part of the value, and keys/values are trimmed. Backslash
/// escapes are not interpreted.
fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
    let mut parsed = HashMap::new();
    let mut key_buf = String::new();
    let mut value_buf = String::new();
    let mut reading_value = false;
    let mut quoted = false;

    for c in labels_str.chars() {
        if c == '=' && !quoted && !reading_value {
            // First unquoted `=` switches from key to value.
            reading_value = true;
        } else if c == '"' && reading_value {
            quoted = !quoted;
        } else if c == ',' && !quoted {
            // Unquoted comma ends the current pair (ignored while no key was read).
            if !key_buf.is_empty() {
                parsed.insert(key_buf.trim().to_string(), value_buf.trim().to_string());
                key_buf.clear();
                value_buf.clear();
                reading_value = false;
            }
        } else if reading_value {
            value_buf.push(c);
        } else {
            key_buf.push(c);
        }
    }

    // Flush the trailing pair, which has no comma after it.
    if !key_buf.is_empty() {
        parsed.insert(key_buf.trim().to_string(), value_buf.trim().to_string());
    }

    parsed
}
812
/// Orders two `le` bucket bounds: `+Inf` sorts last, numeric bounds compare
/// by value, and anything that does not parse as a number falls back to
/// plain string comparison.
fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    const POS_INF: &str = "+Inf";

    let a_is_inf = a == POS_INF;
    let b_is_inf = b == POS_INF;
    if a_is_inf && b_is_inf {
        return Ordering::Equal;
    }
    if a_is_inf {
        return Ordering::Greater;
    }
    if b_is_inf {
        return Ordering::Less;
    }

    if let (Ok(left), Ok(right)) = (a.parse::<f64>(), b.parse::<f64>()) {
        // Incomparable values (NaN) are treated as equal.
        left.partial_cmp(&right).unwrap_or(Ordering::Equal)
    } else {
        a.cmp(b)
    }
}
834
835 pub async fn wait_until_is_up(
847 &self,
848 timeout_secs: impl Into<u64>,
849 ) -> Result<(), anyhow::Error> {
850 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
851 .await
852 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
853 }
854}
855
856impl std::fmt::Debug for NetworkNode {
857 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
858 f.debug_struct("NetworkNode")
859 .field("inner", &"inner_skipped")
860 .field("spec", &self.spec)
861 .field("name", &self.name)
862 .field("ws_uri", &self.ws_uri)
863 .field("prometheus_uri", &self.prometheus_uri)
864 .finish()
865 }
866}
867
/// Serde helper for the `inner` field of [`NetworkNode`]: serializes the
/// provider node trait object through `erased_serde`.
fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    erased_serde::serialize(node.as_ref(), serializer)
}
874
875#[cfg(test)]
877mod tests {
878 use std::{
879 path::{Path, PathBuf},
880 sync::{Arc, Mutex},
881 };
882
883 use async_trait::async_trait;
884 use provider::{types::*, ProviderError, ProviderNode};
885
886 use super::*;
887
/// Test double for a provider node: only `logs()` is functional and returns
/// the lines stored here.
#[derive(Serialize)]
struct MockNode {
    /// Log lines shared between the test body and the node under test.
    logs: Arc<Mutex<Vec<String>>>,
}
892
893 impl MockNode {
894 fn new() -> Self {
895 Self {
896 logs: Arc::new(Mutex::new(vec![])),
897 }
898 }
899
900 fn logs_push(&self, lines: Vec<impl Into<String>>) {
901 self.logs
902 .lock()
903 .unwrap()
904 .extend(lines.into_iter().map(|l| l.into()));
905 }
906 }
907
/// `ProviderNode` implementation for the mock: every method except `logs`
/// is a `todo!()` stub and panics if called.
#[async_trait]
impl ProviderNode for MockNode {
    fn name(&self) -> &str {
        todo!()
    }

    fn args(&self) -> Vec<&str> {
        todo!()
    }

    fn base_dir(&self) -> &PathBuf {
        todo!()
    }

    fn config_dir(&self) -> &PathBuf {
        todo!()
    }

    fn data_dir(&self) -> &PathBuf {
        todo!()
    }

    fn relay_data_dir(&self) -> &PathBuf {
        todo!()
    }

    fn scripts_dir(&self) -> &PathBuf {
        todo!()
    }

    fn log_path(&self) -> &PathBuf {
        todo!()
    }

    fn log_cmd(&self) -> String {
        todo!()
    }

    fn path_in_node(&self, _file: &Path) -> PathBuf {
        todo!()
    }

    /// Returns the stored lines joined with newlines.
    async fn logs(&self) -> Result<String, ProviderError> {
        Ok(self.logs.lock().unwrap().join("\n"))
    }

    async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
        todo!()
    }

    async fn run_command(
        &self,
        _options: RunCommandOptions,
    ) -> Result<ExecutionResult, ProviderError> {
        todo!()
    }

    async fn run_script(
        &self,
        _options: RunScriptOptions,
    ) -> Result<ExecutionResult, ProviderError> {
        todo!()
    }

    async fn send_file(
        &self,
        _local_file_path: &Path,
        _remote_file_path: &Path,
        _mode: &str,
    ) -> Result<(), ProviderError> {
        todo!()
    }

    async fn receive_file(
        &self,
        _remote_file_path: &Path,
        _local_file_path: &Path,
    ) -> Result<(), ProviderError> {
        todo!()
    }

    async fn pause(&self) -> Result<(), ProviderError> {
        todo!()
    }

    async fn resume(&self) -> Result<(), ProviderError> {
        todo!()
    }

    async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
        todo!()
    }

    async fn destroy(&self) -> Result<(), ProviderError> {
        todo!()
    }
}
1005
/// The log already holds exactly one matching line and the wait may return
/// early, so the very first scan yields `TargetReached(1)`.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "stub line 2",
        "system ready",
    ]);

    // Early return allowed: succeed as soon as the count hits one.
    let options = LogLineCountOptions {
        predicate: Arc::new(|n| n == 1),
        timeout: Duration::from_secs(10),
        wait_until_timeout_elapses: false,
    };

    let log_line_count = mock_node
        .wait_log_line_count_with_timeout("system ready", false, options)
        .await?;

    assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));

    Ok(())
}
1040
/// Starts with one matching line and pushes a second one two seconds into a
/// four-second wait; the wait must end with `TargetReached(2)`.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "stub line 2",
        "system ready",
    ]);

    // Early return allowed once two lines matched.
    let options = LogLineCountOptions {
        predicate: Arc::new(|n| n == 2),
        timeout: Duration::from_secs(4),
        wait_until_timeout_elapses: false,
    };

    // Run the wait concurrently so the test body can append log lines.
    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(2)).await;

    mock_provider.logs_push(vec!["system ready"]);

    let log_line_count = task.await?;

    assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

    Ok(())
}
1086
/// Only one matching line ever appears while two are required, so the wait
/// runs into its two-second timeout and reports `TargetFailed(1)`.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "stub line 2",
        "system ready",
    ]);

    let options = LogLineCountOptions {
        predicate: Arc::new(|n| n == 2),
        timeout: Duration::from_secs(2),
        wait_until_timeout_elapses: false,
    };

    let log_line_count = mock_node
        .wait_log_line_count_with_timeout("system ready", false, options)
        .await?;

    assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));

    Ok(())
}
1121
/// The full timeout is waited out; by then three lines match while exactly
/// two are required, so the result is `TargetFailed(3)`.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "stub line 2",
        "system ready",
    ]);

    // No early return: the count is judged only after the timeout elapsed.
    let options = LogLineCountOptions {
        predicate: Arc::new(|n| n == 2),
        timeout: Duration::from_secs(2),
        wait_until_timeout_elapses: true,
    };

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    // Two more matches while the waiter is still sleeping: total becomes three.
    mock_provider.logs_push(vec!["system ready"]);
    mock_provider.logs_push(vec!["system ready"]);

    let log_line_count = task.await?;

    assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

    Ok(())
}
1168
/// With `no_occurences_within_timeout`, a log that never contains the
/// pattern (even after an unrelated line is appended) counts as success.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "system ready",
                    false,
                    LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                )
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    // A non-matching line must not affect the outcome.
    mock_provider.logs_push(vec!["stub line 3"]);

    assert!(task.await?.success());

    Ok(())
}
1205
/// A range predicate (2..=5) succeeds when three matching lines are present
/// once the full timeout has elapsed.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

    let options = LogLineCountOptions {
        predicate: Arc::new(|n| (2..=5).contains(&n)),
        timeout: Duration::from_secs(2),
        wait_until_timeout_elapses: true,
    };

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout("system ready", false, options)
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

    assert!(task.await?.success());

    Ok(())
}
1244
/// A negative-lookahead regex is accepted by the timeout variant and counts
/// exactly one matching line (`system error`).
///
/// NOTE(review): the "unknown parent" fixture line starts with an uppercase
/// `Error` while the pattern is lowercase `error`; unless matching is
/// case-insensitive, that line is rejected before the lookahead is ever
/// evaluated — confirm this is the intended coverage.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
        "stub line 2"
    ]);

    let options = LogLineCountOptions {
        predicate: Arc::new(|n| n == 1),
        timeout: Duration::from_secs(3),
        wait_until_timeout_elapses: true,
    };

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    options,
                )
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    // Only "system error" is expected to match the pattern.
    mock_provider.logs_push(vec![
        "system ready",
        "system error",
        "system ready",
    ]);

    assert!(task.await?.success());

    Ok(())
}
1297
/// With the same negative-lookahead regex but no matching line ever logged,
/// the `n == 1` predicate fails after the six-second timeout.
///
/// NOTE(review): as in the sibling test, the fixture line starts with an
/// uppercase `Error`, so a case-sensitive match rejects it before the
/// lookahead is evaluated — confirm intent.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
) -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
        "stub line 2"
    ]);

    let options = LogLineCountOptions {
        predicate: Arc::new(|n| n == 1),
        timeout: Duration::from_secs(6),
        wait_until_timeout_elapses: true,
    };

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    options,
                )
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    // None of the appended lines contains "error".
    mock_provider.logs_push(vec!["system ready", "system ready"]);

    assert!(!task.await?.success());

    Ok(())
}
1346
/// The unbounded `wait_log_line_count` accepts a negative-lookahead regex
/// and returns once one matching line (`system error`) has been logged.
///
/// NOTE(review): the function name says "lockahead"; presumably "lookahead"
/// was meant.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
        "stub line 2"
    ]);

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    1,
                )
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    // "system error" is the line the waiter is expected to pick up.
    mock_provider.logs_push(vec![
        "system ready",
        "system error",
        "system ready",
    ]);

    // The spawned task unwraps internally, so a join error means it panicked.
    assert!(task.await.is_ok());

    Ok(())
}
1393
/// Negative-lookahead regex with a two-second timeout and no matching line:
/// the `count == 1` predicate must fail.
///
/// NOTE(review): despite its name, this test exercises
/// `wait_log_line_count_with_timeout`, not `wait_log_line_count` — confirm
/// this is intended.
#[tokio::test(flavor = "multi_thread")]
async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "node1",
        "ws_uri",
        "prometheus_uri",
        "multiaddr",
        NodeSpec::default(),
        mock_provider.clone(),
    );

    mock_provider.logs_push(vec![
        "system booting",
        "stub line 1",
        "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
        "stub line 2"
    ]);

    let options = LogLineCountOptions {
        predicate: Arc::new(|count| count == 1),
        timeout: Duration::from_secs(2),
        wait_until_timeout_elapses: true,
    };

    let task = tokio::spawn({
        async move {
            mock_node
                .wait_log_line_count_with_timeout(
                    "error(?! importing block .*: block has an unknown parent)",
                    false,
                    options,
                )
                .await
                .unwrap()
        }
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    mock_provider.logs_push(vec!["system ready", "system ready"]);

    assert!(!task.await?.success());

    Ok(())
}
1442
/// Serves a fixed Prometheus payload from a local TCP listener and checks
/// that `get_histogram_buckets` turns the cumulative bucket counts into
/// per-bucket deltas, with and without label filters and with the `_bucket`
/// suffix already present in the metric name.
#[tokio::test]
async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
    use std::sync::Arc;

    // Cumulative counts: 10, 25, 35, 40, 42.
    let mock_metrics = concat!(
        "# HELP substrate_block_verification_time Time taken to verify blocks\n",
        "# TYPE substrate_block_verification_time histogram\n",
        "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
        "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
        "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
        "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
        "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
        "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
        "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
    );

    // Port 0 lets the OS pick a free port.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;
    let metrics = Arc::new(mock_metrics.to_string());

    // Minimal HTTP responder: reads the request and always answers with the payload.
    tokio::spawn({
        let metrics = metrics.clone();
        async move {
            loop {
                if let Ok((mut socket, _)) = listener.accept().await {
                    let metrics = metrics.clone();
                    tokio::spawn(async move {
                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
                        let mut buffer = [0; 1024];
                        let _ = socket.read(&mut buffer).await;

                        let response = format!(
                            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
                            metrics.len(),
                            metrics
                        );
                        let _ = socket.write_all(response.as_bytes()).await;
                    });
                }
            }
        }
    });

    let mock_provider = Arc::new(MockNode::new());
    let mock_node = NetworkNode::new(
        "test_node",
        "ws://localhost:9944",
        &format!("http://127.0.0.1:{}/metrics", addr.port()),
        "/ip4/127.0.0.1/tcp/30333",
        NodeSpec::default(),
        mock_provider,
    );

    let mut label_filters = HashMap::new();
    label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
    let buckets = mock_node
        .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
        .await?;

    // Expected deltas: 10, 25-10, 35-25, 40-35, 42-40.
    assert_eq!(buckets.get("0.1"), Some(&10));
    assert_eq!(buckets.get("0.5"), Some(&15));
    assert_eq!(buckets.get("1.0"), Some(&10));
    assert_eq!(buckets.get("2.5"), Some(&5));
    assert_eq!(buckets.get("+Inf"), Some(&2));

    let mut label_filters = std::collections::HashMap::new();
    label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());

    let buckets_filtered = mock_node
        .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
        .await?;

    assert_eq!(buckets_filtered.get("0.1"), Some(&10));
    assert_eq!(buckets_filtered.get("0.5"), Some(&15));

    // The metric name may already carry the `_bucket` suffix.
    let buckets_with_suffix = mock_node
        .get_histogram_buckets("substrate_block_verification_time_bucket", None)
        .await?;

    assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));

    Ok(())
}
1534
1535 #[tokio::test]
1536 async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1537 use std::sync::Arc;
1539
1540 let mock_metrics = concat!(
1541 "# HELP test_metric A test metric\n",
1542 "# TYPE test_metric histogram\n",
1543 "test_metric_bucket{le=\"2.5\"} 40\n",
1544 "test_metric_bucket{le=\"0.1\"} 10\n",
1545 "test_metric_bucket{le=\"+Inf\"} 42\n",
1546 "test_metric_bucket{le=\"1.0\"} 35\n",
1547 "test_metric_bucket{le=\"0.5\"} 25\n",
1548 );
1549
1550 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1551 let addr = listener.local_addr()?;
1552 let metrics = Arc::new(mock_metrics.to_string());
1553
1554 tokio::spawn({
1555 let metrics = metrics.clone();
1556 async move {
1557 loop {
1558 if let Ok((mut socket, _)) = listener.accept().await {
1559 let metrics = metrics.clone();
1560 tokio::spawn(async move {
1561 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1562 let mut buffer = [0; 1024];
1563 let _ = socket.read(&mut buffer).await;
1564 let response = format!(
1565 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1566 metrics.len(),
1567 metrics
1568 );
1569 let _ = socket.write_all(response.as_bytes()).await;
1570 });
1571 }
1572 }
1573 }
1574 });
1575
1576 let mock_provider = Arc::new(MockNode::new());
1577 let mock_node = NetworkNode::new(
1578 "test_node",
1579 "ws://localhost:9944",
1580 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1581 "/ip4/127.0.0.1/tcp/30333",
1582 NodeSpec::default(),
1583 mock_provider,
1584 );
1585
1586 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1587
1588 assert_eq!(buckets.get("0.1"), Some(&10)); assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); Ok(())
1596 }
1597
1598 #[tokio::test]
1599 async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1600 use std::sync::Arc;
1602
1603 let mock_metrics = concat!(
1604 "# HELP test_metric A test metric\n",
1605 "# TYPE test_metric histogram\n",
1606 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1607 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1608 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1609 );
1610
1611 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1612 let addr = listener.local_addr()?;
1613 let metrics = Arc::new(mock_metrics.to_string());
1614
1615 tokio::spawn({
1616 let metrics = metrics.clone();
1617 async move {
1618 loop {
1619 if let Ok((mut socket, _)) = listener.accept().await {
1620 let metrics = metrics.clone();
1621 tokio::spawn(async move {
1622 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1623 let mut buffer = [0; 1024];
1624 let _ = socket.read(&mut buffer).await;
1625 let response = format!(
1626 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1627 metrics.len(),
1628 metrics
1629 );
1630 let _ = socket.write_all(response.as_bytes()).await;
1631 });
1632 }
1633 }
1634 }
1635 });
1636
1637 let mock_provider = Arc::new(MockNode::new());
1638 let mock_node = NetworkNode::new(
1639 "test_node",
1640 "ws://localhost:9944",
1641 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1642 "/ip4/127.0.0.1/tcp/30333",
1643 NodeSpec::default(),
1644 mock_provider,
1645 );
1646
1647 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1649 assert_eq!(buckets.get("0.1"), Some(&5));
1650 assert_eq!(buckets.get("0.5"), Some(&10)); assert_eq!(buckets.get("+Inf"), Some(&5)); let mut label_filters = std::collections::HashMap::new();
1655 label_filters.insert("method".to_string(), "GET,POST".to_string());
1656
1657 let buckets_filtered = mock_node
1658 .get_histogram_buckets("test_metric", Some(label_filters))
1659 .await?;
1660
1661 assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1662 assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1663
1664 Ok(())
1665 }
1666
/// Pins the ordering `NetworkNode::compare_le_values` yields for pairs of histogram `le`
/// bounds: decimal values, the `+Inf` bound, and integer-looking values.
#[test]
fn test_compare_le_values() {
    use std::cmp::Ordering;

    use crate::network::node::NetworkNode;

    // Thin wrapper so that each case reads as a single line.
    macro_rules! assert_le_order {
        ($lhs:expr, $rhs:expr, $expected:expr) => {
            assert_eq!(NetworkNode::compare_le_values($lhs, $rhs), $expected)
        };
    }

    // Decimal bounds.
    assert_le_order!("0.1", "0.5", Ordering::Less);
    assert_le_order!("1.0", "0.5", Ordering::Greater);
    assert_le_order!("1.0", "1.0", Ordering::Equal);

    // `+Inf` ranks above finite bounds and equal to itself.
    assert_le_order!("+Inf", "999", Ordering::Greater);
    assert_le_order!("0.1", "+Inf", Ordering::Less);
    assert_le_order!("+Inf", "+Inf", Ordering::Equal);

    // Integer-looking bounds; "1000" vs "999" would order the other way as plain strings.
    assert_le_order!("10", "100", Ordering::Less);
    assert_le_order!("1000", "999", Ordering::Greater);
}
1705}