1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15 types::{ExecutionResult, RunScriptOptions},
16 DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
26
/// Boxed line matcher used by the log-scanning helpers: it receives one log
/// line and reports whether the line matches, or an error if evaluating the
/// pattern failed.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
28
/// Errors raised by [`NetworkNode`] metric lookups.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The named metric is not present in the metrics map; carries the name.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
34
/// Handle to a single node of a network.
///
/// Bundles the provider-level node (`inner`), the spec it was created from and
/// the endpoints used to reach it, plus runtime-only state that is excluded
/// from serialization.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-backed node; serialized through `serialize_provider_node`.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    /// Spec this node was created from.
    pub(crate) spec: NodeSpec,
    /// Node name.
    pub(crate) name: String,
    /// WebSocket endpoint used to build RPC / subxt clients.
    pub(crate) ws_uri: String,
    /// Multiaddress of the node.
    pub(crate) multiaddr: String,
    /// Prometheus endpoint the metrics are fetched from.
    pub(crate) prometheus_uri: String,
    /// Most recently fetched metrics; the `Arc` is shared between clones.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag; the `Arc` is shared between clones.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
51
/// Deserializable mirror of the serialized [`NetworkNode`] fields, keeping the
/// provider node (`inner`) as untyped JSON.
// NOTE(review): presumably used to rebuild a `NetworkNode` from persisted
// state — confirm against the callers, which are outside this file section.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    /// Node name.
    pub(crate) name: String,
    /// WebSocket endpoint.
    pub(crate) ws_uri: String,
    /// Prometheus endpoint.
    pub(crate) prometheus_uri: String,
    /// Multiaddress of the node.
    pub(crate) multiaddr: String,
    /// Spec the node was created from.
    pub(crate) spec: NodeSpec,
    /// Provider node, left as raw JSON.
    pub(crate) inner: serde_json::Value,
}
61
/// Outcome of counting matching log lines against a caller-supplied target.
///
/// Both variants carry the number of matching lines that was counted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    /// The count satisfied the target.
    TargetReached(u32),
    /// The count did not satisfy the target.
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` for [`LogLineCount::TargetReached`], `false` otherwise.
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}
85
/// Options controlling how matching log lines are counted and judged.
#[derive(Clone)]
pub struct LogLineCountOptions {
    /// Decides whether a given number of matching lines is acceptable.
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    /// Time budget for the wait.
    pub timeout: Duration,
    /// When `true`, the predicate is only evaluated once the timeout elapsed.
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Builds options from a predicate, a timeout and the "wait for the whole
    /// timeout" flag.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Accepts only a count of zero, evaluated after the full timeout.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|seen| seen == 0, timeout, true)
    }

    /// Accepts any count of one or more, without waiting for the full timeout.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::new(|seen| seen > 0, timeout, false)
    }

    /// Accepts a count of exactly one, without waiting for the full timeout.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|seen| seen == 1, timeout, false)
    }
}
130
131impl NetworkNode {
144 pub(crate) fn new<T: Into<String>>(
146 name: T,
147 ws_uri: T,
148 prometheus_uri: T,
149 multiaddr: T,
150 spec: NodeSpec,
151 inner: DynNode,
152 ) -> Self {
153 Self {
154 name: name.into(),
155 ws_uri: ws_uri.into(),
156 prometheus_uri: prometheus_uri.into(),
157 inner,
158 spec,
159 multiaddr: multiaddr.into(),
160 metrics_cache: Arc::new(Default::default()),
161 is_running: Arc::new(AtomicBool::new(false)),
162 }
163 }
164
165 pub(crate) fn is_running(&self) -> bool {
166 self.is_running.load(Ordering::Acquire)
167 }
168
169 pub(crate) fn set_is_running(&self, is_running: bool) {
170 self.is_running.store(is_running, Ordering::Release);
171 }
172
173 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
174 self.multiaddr = multiaddr.into();
175 }
176
177 pub fn name(&self) -> &str {
178 &self.name
179 }
180
181 pub fn args(&self) -> Vec<&str> {
182 self.inner.args()
183 }
184
185 pub fn spec(&self) -> &NodeSpec {
186 &self.spec
187 }
188
189 pub fn ws_uri(&self) -> &str {
190 &self.ws_uri
191 }
192
193 pub fn multiaddr(&self) -> &str {
194 self.multiaddr.as_ref()
195 }
196
197 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
201 get_client_from_url(&self.ws_uri).await
202 }
203
204 #[deprecated = "Use `wait_client` instead."]
206 pub async fn client<Config: subxt::Config>(
207 &self,
208 ) -> Result<OnlineClient<Config>, subxt::Error> {
209 self.try_client().await
210 }
211
212 pub async fn try_client<Config: subxt::Config>(
221 &self,
222 ) -> Result<OnlineClient<Config>, subxt::Error> {
223 get_client_from_url(&self.ws_uri).await
224 }
225
226 pub async fn wait_client<Config: subxt::Config>(
228 &self,
229 ) -> Result<OnlineClient<Config>, anyhow::Error> {
230 debug!("wait_client ws_uri: {}", self.ws_uri());
231 wait_ws_ready(self.ws_uri())
232 .await
233 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
234
235 self.try_client()
236 .await
237 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
238 }
239
240 pub async fn wait_client_with_timeout<Config: subxt::Config>(
242 &self,
243 timeout_secs: impl Into<u64>,
244 ) -> Result<OnlineClient<Config>, anyhow::Error> {
245 debug!("waiting until subxt client is ready");
246 tokio::time::timeout(
247 Duration::from_secs(timeout_secs.into()),
248 self.wait_client::<Config>(),
249 )
250 .await?
251 }
252
253 pub async fn pause(&self) -> Result<(), anyhow::Error> {
261 self.set_is_running(false);
262 self.inner.pause().await?;
263 Ok(())
264 }
265
266 pub async fn resume(&self) -> Result<(), anyhow::Error> {
272 self.set_is_running(true);
273 self.inner.resume().await?;
274 Ok(())
275 }
276
277 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
282 self.set_is_running(false);
283 self.inner.restart(after).await?;
284 self.set_is_running(true);
285 Ok(())
286 }
287
288 pub async fn run_script(
295 &self,
296 options: RunScriptOptions,
297 ) -> Result<ExecutionResult, anyhow::Error> {
298 self.inner
299 .run_script(options)
300 .await
301 .map_err(|e| anyhow!("Failed to run script: {e}"))
302 }
303
304 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
312 let metric_name = metric_name.into();
313 self.fetch_metrics().await?;
315 self.metric(&metric_name, true).await
317 }
318
319 pub async fn assert(
328 &self,
329 metric_name: impl Into<String>,
330 value: impl Into<f64>,
331 ) -> Result<bool, anyhow::Error> {
332 let value: f64 = value.into();
333 self.assert_with(metric_name, |v| v == value).await
334 }
335
336 pub async fn assert_with(
339 &self,
340 metric_name: impl Into<String>,
341 predicate: impl Fn(f64) -> bool,
342 ) -> Result<bool, anyhow::Error> {
343 let metric_name = metric_name.into();
344 self.fetch_metrics().await?;
346 let val = self.metric(&metric_name, true).await?;
347 trace!("🔎 Current value {val} passed to the predicated?");
348 Ok(predicate(val))
349 }
350
351 pub async fn wait_metric(
355 &self,
356 metric_name: impl Into<String>,
357 predicate: impl Fn(f64) -> bool,
358 ) -> Result<(), anyhow::Error> {
359 let metric_name = metric_name.into();
360 debug!("waiting until metric {metric_name} pass the predicate");
361 loop {
362 let res = self.assert_with(&metric_name, &predicate).await;
363 match res {
364 Ok(res) => {
365 if res {
366 return Ok(());
367 }
368 },
369 Err(e) => match e.downcast::<reqwest::Error>() {
370 Ok(io_err) => {
371 if !skip_err_while_waiting(&io_err) {
372 return Err(io_err.into());
373 }
374 },
375 Err(other) => {
376 match other.downcast::<NetworkNodeError>() {
377 Ok(node_err) => {
378 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
379 return Err(node_err.into());
380 }
381 },
382 Err(other) => return Err(other),
383 };
384 },
385 },
386 }
387
388 tokio::time::sleep(Duration::from_secs(1)).await;
390 }
391 }
392
393 pub async fn wait_metric_with_timeout(
396 &self,
397 metric_name: impl Into<String>,
398 predicate: impl Fn(f64) -> bool,
399 timeout_secs: impl Into<u64>,
400 ) -> Result<(), anyhow::Error> {
401 let metric_name = metric_name.into();
402 let secs = timeout_secs.into();
403 debug!("waiting until metric {metric_name} pass the predicate");
404 let res = tokio::time::timeout(
405 Duration::from_secs(secs),
406 self.wait_metric(&metric_name, predicate),
407 )
408 .await;
409
410 if let Ok(inner_res) = res {
411 match inner_res {
412 Ok(_) => Ok(()),
413 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
414 }
415 } else {
416 Err(anyhow!(
418 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
419 ))
420 }
421 }
422
423 pub async fn logs(&self) -> Result<String, anyhow::Error> {
428 Ok(self.inner.logs().await?)
429 }
430
431 pub async fn wait_log_line_count(
433 &self,
434 pattern: impl Into<String>,
435 is_glob: bool,
436 count: usize,
437 ) -> Result<(), anyhow::Error> {
438 let pattern = pattern.into();
439 let pattern_clone = pattern.clone();
440 debug!("waiting until we find pattern {pattern} {count} times");
441 let match_fn: BoxedClosure = if is_glob {
442 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
443 } else {
444 let re = Regex::new(&pattern)?;
445 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
446 };
447
448 loop {
449 let mut q = 0_usize;
450 let logs = self.logs().await?;
451 for line in logs.lines() {
452 trace!("line is {line}");
453 if match_fn(line)? {
454 trace!("pattern {pattern_clone} match in line {line}");
455 q += 1;
456 if q >= count {
457 return Ok(());
458 }
459 }
460 }
461
462 tokio::time::sleep(Duration::from_secs(2)).await;
463 }
464 }
465
466 pub async fn wait_log_line_count_with_timeout(
510 &self,
511 substring: impl Into<String>,
512 is_glob: bool,
513 options: LogLineCountOptions,
514 ) -> Result<LogLineCount, anyhow::Error> {
515 let substring = substring.into();
516 debug!(
517 "waiting until match lines count within {} seconds",
518 options.timeout.as_secs_f64()
519 );
520
521 let start = tokio::time::Instant::now();
522
523 let match_fn: BoxedClosure = if is_glob {
524 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
525 } else {
526 let re = Regex::new(&substring)?;
527 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
528 };
529
530 if options.wait_until_timeout_elapses {
531 tokio::time::sleep(options.timeout).await;
532 }
533
534 let mut q;
535 loop {
536 q = 0_u32;
537 let logs = self.logs().await?;
538 for line in logs.lines() {
539 if match_fn(line)? {
540 q += 1;
541
542 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
547 return Ok(LogLineCount::TargetReached(q));
548 }
549 }
550 }
551
552 if start.elapsed() >= options.timeout {
553 break;
554 }
555
556 tokio::time::sleep(Duration::from_secs(2)).await;
557 }
558
559 if (options.predicate)(q) {
560 Ok(LogLineCount::TargetReached(q))
561 } else {
562 Ok(LogLineCount::TargetFailed(q))
563 }
564 }
565
566 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
567 let response = reqwest::get(&self.prometheus_uri).await?;
568 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
569 let mut cache = self.metrics_cache.write().await;
570 *cache = metrics;
571 Ok(())
572 }
573
574 async fn metric(
576 &self,
577 metric_name: &str,
578 treat_not_found_as_zero: bool,
579 ) -> Result<f64, anyhow::Error> {
580 let mut metrics_map = self.metrics_cache.read().await;
581 if metrics_map.is_empty() {
582 drop(metrics_map);
584 self.fetch_metrics().await?;
585 metrics_map = self.metrics_cache.read().await;
586 }
587
588 if let Some(val) = metrics_map.get(metric_name) {
589 Ok(*val)
590 } else if treat_not_found_as_zero {
591 Ok(0_f64)
592 } else {
593 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
594 }
595 }
596
597 pub async fn get_histogram_buckets(
617 &self,
618 metric_name: impl AsRef<str>,
619 label_filters: Option<HashMap<String, String>>,
620 ) -> Result<HashMap<String, u64>, anyhow::Error> {
621 let metric_name = metric_name.as_ref();
622
623 let response = reqwest::get(&self.prometheus_uri).await?;
625 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
626
627 let resolved_metric_name = if metric_name.contains("_bucket") {
629 metric_name.to_string()
630 } else {
631 format!("{}_bucket", metric_name)
632 };
633
634 let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
638
639 for (key, &value) in metrics.iter() {
640 if !key.starts_with(&resolved_metric_name) {
641 continue;
642 }
643
644 let remaining = &key[resolved_metric_name.len()..];
645
646 let labels_str = &remaining[1..remaining.len() - 1];
647 let parsed_labels = Self::parse_label_string(labels_str);
648
649 if !parsed_labels.contains_key("le") {
651 continue;
652 }
653
654 if let Some(ref filters) = label_filters {
656 let mut all_match = true;
657 for (filter_key, filter_value) in filters {
658 if parsed_labels.get(filter_key) != Some(filter_value) {
659 all_match = false;
660 break;
661 }
662 }
663 if !all_match {
664 continue;
665 }
666 }
667
668 metric_entries.push((key.clone(), parsed_labels, value as u64));
669 }
670
671 let max_label_count = metric_entries
674 .iter()
675 .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
676 .max()
677 .unwrap_or(0);
678
679 let mut raw_buckets: Vec<(String, u64)> = Vec::new();
681 let mut seen_le_values = HashSet::new();
682 let mut active_series: Option<Vec<(String, String)>> = None;
683
684 for (_, parsed_labels, value) in metric_entries {
685 let le_value = parsed_labels.get("le").unwrap().clone();
686
687 let mut non_le_labels: Vec<(String, String)> = parsed_labels
689 .iter()
690 .filter(|(k, _)| k.as_str() != "le")
691 .map(|(k, v)| (k.clone(), v.clone()))
692 .collect();
693 non_le_labels.sort();
694
695 if non_le_labels.len() < max_label_count {
698 continue;
699 }
700
701 if let Some(ref prev_series) = active_series {
703 if prev_series != &non_le_labels {
704 if !raw_buckets.is_empty() {
705 break; }
707 active_series = Some(non_le_labels.clone());
708 seen_le_values.clear();
709 }
710 } else {
711 active_series = Some(non_le_labels.clone());
712 }
713
714 if !seen_le_values.insert(le_value.clone()) {
716 continue;
717 }
718
719 trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
720 raw_buckets.push((le_value, value));
721 }
722
723 raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
725
726 let mut buckets = HashMap::new();
728 let mut previous_value = 0_u64;
729 for (le, cumulative_count) in raw_buckets {
730 if cumulative_count < previous_value {
731 warn!(
732 "Warning: bucket count decreased from {} to {} at le={}",
733 previous_value, cumulative_count, le
734 );
735 }
736 let delta = cumulative_count.saturating_sub(previous_value);
737 buckets.insert(le, delta);
738 previous_value = cumulative_count;
739 }
740
741 Ok(buckets)
742 }
743
744 fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
750 let mut labels = HashMap::new();
751 let mut current_key = String::new();
752 let mut current_value = String::new();
753 let mut in_value = false;
754 let mut in_quotes = false;
755
756 for ch in labels_str.chars() {
757 match ch {
758 '=' if !in_quotes && !in_value => {
759 in_value = true;
760 },
761 '"' if in_value => {
762 in_quotes = !in_quotes;
763 },
764 ',' if !in_quotes => {
765 if !current_key.is_empty() {
767 labels.insert(
768 current_key.trim().to_string(),
769 current_value.trim().to_string(),
770 );
771 current_key.clear();
772 current_value.clear();
773 in_value = false;
774 }
775 },
776 _ => {
777 if in_value {
778 current_value.push(ch);
779 } else {
780 current_key.push(ch);
781 }
782 },
783 }
784 }
785
786 if !current_key.is_empty() {
788 labels.insert(
789 current_key.trim().to_string(),
790 current_value.trim().to_string(),
791 );
792 }
793
794 labels
795 }
796
797 fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
801 use std::cmp::Ordering;
802
803 match (a, b) {
805 ("+Inf", "+Inf") => Ordering::Equal,
806 ("+Inf", _) => Ordering::Greater,
807 (_, "+Inf") => Ordering::Less,
808 _ => {
809 match (a.parse::<f64>(), b.parse::<f64>()) {
811 (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
812 _ => a.cmp(b),
814 }
815 },
816 }
817 }
818
819 pub async fn wait_until_is_up(
831 &self,
832 timeout_secs: impl Into<u64>,
833 ) -> Result<(), anyhow::Error> {
834 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
835 .await
836 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
837 }
838}
839
840impl std::fmt::Debug for NetworkNode {
841 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
842 f.debug_struct("NetworkNode")
843 .field("inner", &"inner_skipped")
844 .field("spec", &self.spec)
845 .field("name", &self.name)
846 .field("ws_uri", &self.ws_uri)
847 .field("prometheus_uri", &self.prometheus_uri)
848 .finish()
849 }
850}
851
852fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
853where
854 S: Serializer,
855{
856 erased_serde::serialize(node.as_ref(), serializer)
857}
858
859#[cfg(test)]
861mod tests {
862 use std::{
863 path::{Path, PathBuf},
864 sync::{Arc, Mutex},
865 };
866
867 use async_trait::async_trait;
868 use provider::{types::*, ProviderError, ProviderNode};
869
870 use super::*;
871
872 #[derive(Serialize)]
873 struct MockNode {
874 logs: Arc<Mutex<Vec<String>>>,
875 }
876
877 impl MockNode {
878 fn new() -> Self {
879 Self {
880 logs: Arc::new(Mutex::new(vec![])),
881 }
882 }
883
884 fn logs_push(&self, lines: Vec<impl Into<String>>) {
885 self.logs
886 .lock()
887 .unwrap()
888 .extend(lines.into_iter().map(|l| l.into()));
889 }
890 }
891
    /// `ProviderNode` implementation for the mock.
    ///
    /// Only `logs` is functional; every other method is a `todo!()` stub and
    /// panics if a test reaches it.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        /// Returns all pushed lines joined with `\n`.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
989
990 #[tokio::test(flavor = "multi_thread")]
991 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
992 let mock_provider = Arc::new(MockNode::new());
993 let mock_node = NetworkNode::new(
994 "node1",
995 "ws_uri",
996 "prometheus_uri",
997 "multiaddr",
998 NodeSpec::default(),
999 mock_provider.clone(),
1000 );
1001
1002 mock_provider.logs_push(vec![
1003 "system booting",
1004 "stub line 1",
1005 "stub line 2",
1006 "system ready",
1007 ]);
1008
1009 let options = LogLineCountOptions {
1011 predicate: Arc::new(|n| n == 1),
1012 timeout: Duration::from_secs(10),
1013 wait_until_timeout_elapses: false,
1014 };
1015
1016 let log_line_count = mock_node
1017 .wait_log_line_count_with_timeout("system ready", false, options)
1018 .await?;
1019
1020 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1021
1022 Ok(())
1023 }
1024
1025 #[tokio::test(flavor = "multi_thread")]
1026 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
1027 let mock_provider = Arc::new(MockNode::new());
1028 let mock_node = NetworkNode::new(
1029 "node1",
1030 "ws_uri",
1031 "prometheus_uri",
1032 "multiaddr",
1033 NodeSpec::default(),
1034 mock_provider.clone(),
1035 );
1036
1037 mock_provider.logs_push(vec![
1038 "system booting",
1039 "stub line 1",
1040 "stub line 2",
1041 "system ready",
1042 ]);
1043
1044 let options = LogLineCountOptions {
1046 predicate: Arc::new(|n| n == 2),
1047 timeout: Duration::from_secs(4),
1048 wait_until_timeout_elapses: false,
1049 };
1050
1051 let task = tokio::spawn({
1052 async move {
1053 mock_node
1054 .wait_log_line_count_with_timeout("system ready", false, options)
1055 .await
1056 .unwrap()
1057 }
1058 });
1059
1060 tokio::time::sleep(Duration::from_secs(2)).await;
1061
1062 mock_provider.logs_push(vec!["system ready"]);
1063
1064 let log_line_count = task.await?;
1065
1066 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
1067
1068 Ok(())
1069 }
1070
1071 #[tokio::test(flavor = "multi_thread")]
1072 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1073 let mock_provider = Arc::new(MockNode::new());
1074 let mock_node = NetworkNode::new(
1075 "node1",
1076 "ws_uri",
1077 "prometheus_uri",
1078 "multiaddr",
1079 NodeSpec::default(),
1080 mock_provider.clone(),
1081 );
1082
1083 mock_provider.logs_push(vec![
1084 "system booting",
1085 "stub line 1",
1086 "stub line 2",
1087 "system ready",
1088 ]);
1089
1090 let options = LogLineCountOptions {
1092 predicate: Arc::new(|n| n == 2),
1093 timeout: Duration::from_secs(2),
1094 wait_until_timeout_elapses: false,
1095 };
1096
1097 let log_line_count = mock_node
1098 .wait_log_line_count_with_timeout("system ready", false, options)
1099 .await?;
1100
1101 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1102
1103 Ok(())
1104 }
1105
1106 #[tokio::test(flavor = "multi_thread")]
1107 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
1108 let mock_provider = Arc::new(MockNode::new());
1109 let mock_node = NetworkNode::new(
1110 "node1",
1111 "ws_uri",
1112 "prometheus_uri",
1113 "multiaddr",
1114 NodeSpec::default(),
1115 mock_provider.clone(),
1116 );
1117
1118 mock_provider.logs_push(vec![
1119 "system booting",
1120 "stub line 1",
1121 "stub line 2",
1122 "system ready",
1123 ]);
1124
1125 let options = LogLineCountOptions {
1127 predicate: Arc::new(|n| n == 2),
1128 timeout: Duration::from_secs(2),
1129 wait_until_timeout_elapses: true,
1130 };
1131
1132 let task = tokio::spawn({
1133 async move {
1134 mock_node
1135 .wait_log_line_count_with_timeout("system ready", false, options)
1136 .await
1137 .unwrap()
1138 }
1139 });
1140
1141 tokio::time::sleep(Duration::from_secs(1)).await;
1142
1143 mock_provider.logs_push(vec!["system ready"]);
1144 mock_provider.logs_push(vec!["system ready"]);
1145
1146 let log_line_count = task.await?;
1147
1148 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
1149
1150 Ok(())
1151 }
1152
1153 #[tokio::test(flavor = "multi_thread")]
1154 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
1155 let mock_provider = Arc::new(MockNode::new());
1156 let mock_node = NetworkNode::new(
1157 "node1",
1158 "ws_uri",
1159 "prometheus_uri",
1160 "multiaddr",
1161 NodeSpec::default(),
1162 mock_provider.clone(),
1163 );
1164
1165 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1166
1167 let task = tokio::spawn({
1168 async move {
1169 mock_node
1170 .wait_log_line_count_with_timeout(
1171 "system ready",
1172 false,
1173 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
1175 )
1176 .await
1177 .unwrap()
1178 }
1179 });
1180
1181 tokio::time::sleep(Duration::from_secs(1)).await;
1182
1183 mock_provider.logs_push(vec!["stub line 3"]);
1184
1185 assert!(task.await?.success());
1186
1187 Ok(())
1188 }
1189
1190 #[tokio::test(flavor = "multi_thread")]
1191 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
1192 let mock_provider = Arc::new(MockNode::new());
1193 let mock_node = NetworkNode::new(
1194 "node1",
1195 "ws_uri",
1196 "prometheus_uri",
1197 "multiaddr",
1198 NodeSpec::default(),
1199 mock_provider.clone(),
1200 );
1201
1202 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1203
1204 let options = LogLineCountOptions {
1206 predicate: Arc::new(|n| (2..=5).contains(&n)),
1207 timeout: Duration::from_secs(2),
1208 wait_until_timeout_elapses: true,
1209 };
1210
1211 let task = tokio::spawn({
1212 async move {
1213 mock_node
1214 .wait_log_line_count_with_timeout("system ready", false, options)
1215 .await
1216 .unwrap()
1217 }
1218 });
1219
1220 tokio::time::sleep(Duration::from_secs(1)).await;
1221
1222 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
1223
1224 assert!(task.await?.success());
1225
1226 Ok(())
1227 }
1228
1229 #[tokio::test(flavor = "multi_thread")]
1230 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
1231 let mock_provider = Arc::new(MockNode::new());
1232 let mock_node = NetworkNode::new(
1233 "node1",
1234 "ws_uri",
1235 "prometheus_uri",
1236 "multiaddr",
1237 NodeSpec::default(),
1238 mock_provider.clone(),
1239 );
1240
1241 mock_provider.logs_push(vec![
1242 "system booting",
1243 "stub line 1",
1244 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1246 "stub line 2"
1247 ]);
1248
1249 let options = LogLineCountOptions {
1250 predicate: Arc::new(|n| n == 1),
1251 timeout: Duration::from_secs(3),
1252 wait_until_timeout_elapses: true,
1253 };
1254
1255 let task = tokio::spawn({
1256 async move {
1257 mock_node
1258 .wait_log_line_count_with_timeout(
1259 "error(?! importing block .*: block has an unknown parent)",
1260 false,
1261 options,
1262 )
1263 .await
1264 .unwrap()
1265 }
1266 });
1267
1268 tokio::time::sleep(Duration::from_secs(1)).await;
1269
1270 mock_provider.logs_push(vec![
1271 "system ready",
1272 "system error",
1274 "system ready",
1275 ]);
1276
1277 assert!(task.await?.success());
1278
1279 Ok(())
1280 }
1281
1282 #[tokio::test(flavor = "multi_thread")]
1283 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1284 ) -> Result<(), anyhow::Error> {
1285 let mock_provider = Arc::new(MockNode::new());
1286 let mock_node = NetworkNode::new(
1287 "node1",
1288 "ws_uri",
1289 "prometheus_uri",
1290 "multiaddr",
1291 NodeSpec::default(),
1292 mock_provider.clone(),
1293 );
1294
1295 mock_provider.logs_push(vec![
1296 "system booting",
1297 "stub line 1",
1298 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1300 "stub line 2"
1301 ]);
1302
1303 let options = LogLineCountOptions {
1304 predicate: Arc::new(|n| n == 1),
1305 timeout: Duration::from_secs(6),
1306 wait_until_timeout_elapses: true,
1307 };
1308
1309 let task = tokio::spawn({
1310 async move {
1311 mock_node
1312 .wait_log_line_count_with_timeout(
1313 "error(?! importing block .*: block has an unknown parent)",
1314 false,
1315 options,
1316 )
1317 .await
1318 .unwrap()
1319 }
1320 });
1321
1322 tokio::time::sleep(Duration::from_secs(1)).await;
1323
1324 mock_provider.logs_push(vec!["system ready", "system ready"]);
1325
1326 assert!(!task.await?.success());
1327
1328 Ok(())
1329 }
1330
1331 #[tokio::test(flavor = "multi_thread")]
1332 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1333 let mock_provider = Arc::new(MockNode::new());
1334 let mock_node = NetworkNode::new(
1335 "node1",
1336 "ws_uri",
1337 "prometheus_uri",
1338 "multiaddr",
1339 NodeSpec::default(),
1340 mock_provider.clone(),
1341 );
1342
1343 mock_provider.logs_push(vec![
1344 "system booting",
1345 "stub line 1",
1346 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1348 "stub line 2"
1349 ]);
1350
1351 let task = tokio::spawn({
1352 async move {
1353 mock_node
1354 .wait_log_line_count(
1355 "error(?! importing block .*: block has an unknown parent)",
1356 false,
1357 1,
1358 )
1359 .await
1360 .unwrap()
1361 }
1362 });
1363
1364 tokio::time::sleep(Duration::from_secs(1)).await;
1365
1366 mock_provider.logs_push(vec![
1367 "system ready",
1368 "system error",
1370 "system ready",
1371 ]);
1372
1373 assert!(task.await.is_ok());
1374
1375 Ok(())
1376 }
1377
1378 #[tokio::test(flavor = "multi_thread")]
1379 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1380 let mock_provider = Arc::new(MockNode::new());
1381 let mock_node = NetworkNode::new(
1382 "node1",
1383 "ws_uri",
1384 "prometheus_uri",
1385 "multiaddr",
1386 NodeSpec::default(),
1387 mock_provider.clone(),
1388 );
1389
1390 mock_provider.logs_push(vec![
1391 "system booting",
1392 "stub line 1",
1393 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1395 "stub line 2"
1396 ]);
1397
1398 let options = LogLineCountOptions {
1399 predicate: Arc::new(|count| count == 1),
1400 timeout: Duration::from_secs(2),
1401 wait_until_timeout_elapses: true,
1402 };
1403
1404 let task = tokio::spawn({
1405 async move {
1406 mock_node
1408 .wait_log_line_count_with_timeout(
1409 "error(?! importing block .*: block has an unknown parent)",
1410 false,
1411 options,
1412 )
1413 .await
1414 .unwrap()
1415 }
1416 });
1417
1418 tokio::time::sleep(Duration::from_secs(1)).await;
1419
1420 mock_provider.logs_push(vec!["system ready", "system ready"]);
1421
1422 assert!(!task.await?.success());
1423
1424 Ok(())
1425 }
1426
1427 #[tokio::test]
1428 async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1429 use std::sync::Arc;
1431
1432 let mock_metrics = concat!(
1434 "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1435 "# TYPE substrate_block_verification_time histogram\n",
1436 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1437 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1438 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1439 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1440 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1441 "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1442 "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1443 );
1444
1445 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1447 let addr = listener.local_addr()?;
1448 let metrics = Arc::new(mock_metrics.to_string());
1449
1450 tokio::spawn({
1451 let metrics = metrics.clone();
1452 async move {
1453 loop {
1454 if let Ok((mut socket, _)) = listener.accept().await {
1455 let metrics = metrics.clone();
1456 tokio::spawn(async move {
1457 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1458 let mut buffer = [0; 1024];
1459 let _ = socket.read(&mut buffer).await;
1460
1461 let response = format!(
1462 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1463 metrics.len(),
1464 metrics
1465 );
1466 let _ = socket.write_all(response.as_bytes()).await;
1467 });
1468 }
1469 }
1470 }
1471 });
1472
1473 let mock_provider = Arc::new(MockNode::new());
1475 let mock_node = NetworkNode::new(
1476 "test_node",
1477 "ws://localhost:9944",
1478 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1479 "/ip4/127.0.0.1/tcp/30333",
1480 NodeSpec::default(),
1481 mock_provider,
1482 );
1483
1484 let mut label_filters = HashMap::new();
1486 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1487 let buckets = mock_node
1488 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1489 .await?;
1490
1491 assert_eq!(buckets.get("0.1"), Some(&10));
1493 assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); let mut label_filters = std::collections::HashMap::new();
1500 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1501
1502 let buckets_filtered = mock_node
1503 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1504 .await?;
1505
1506 assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1507 assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1508
1509 let buckets_with_suffix = mock_node
1511 .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1512 .await?;
1513
1514 assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1515
1516 Ok(())
1517 }
1518
1519 #[tokio::test]
1520 async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1521 use std::sync::Arc;
1523
1524 let mock_metrics = concat!(
1525 "# HELP test_metric A test metric\n",
1526 "# TYPE test_metric histogram\n",
1527 "test_metric_bucket{le=\"2.5\"} 40\n",
1528 "test_metric_bucket{le=\"0.1\"} 10\n",
1529 "test_metric_bucket{le=\"+Inf\"} 42\n",
1530 "test_metric_bucket{le=\"1.0\"} 35\n",
1531 "test_metric_bucket{le=\"0.5\"} 25\n",
1532 );
1533
1534 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1535 let addr = listener.local_addr()?;
1536 let metrics = Arc::new(mock_metrics.to_string());
1537
1538 tokio::spawn({
1539 let metrics = metrics.clone();
1540 async move {
1541 loop {
1542 if let Ok((mut socket, _)) = listener.accept().await {
1543 let metrics = metrics.clone();
1544 tokio::spawn(async move {
1545 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1546 let mut buffer = [0; 1024];
1547 let _ = socket.read(&mut buffer).await;
1548 let response = format!(
1549 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1550 metrics.len(),
1551 metrics
1552 );
1553 let _ = socket.write_all(response.as_bytes()).await;
1554 });
1555 }
1556 }
1557 }
1558 });
1559
1560 let mock_provider = Arc::new(MockNode::new());
1561 let mock_node = NetworkNode::new(
1562 "test_node",
1563 "ws://localhost:9944",
1564 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1565 "/ip4/127.0.0.1/tcp/30333",
1566 NodeSpec::default(),
1567 mock_provider,
1568 );
1569
1570 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1571
1572 assert_eq!(buckets.get("0.1"), Some(&10)); assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); Ok(())
1580 }
1581
1582 #[tokio::test]
1583 async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1584 use std::sync::Arc;
1586
1587 let mock_metrics = concat!(
1588 "# HELP test_metric A test metric\n",
1589 "# TYPE test_metric histogram\n",
1590 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1591 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1592 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1593 );
1594
1595 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1596 let addr = listener.local_addr()?;
1597 let metrics = Arc::new(mock_metrics.to_string());
1598
1599 tokio::spawn({
1600 let metrics = metrics.clone();
1601 async move {
1602 loop {
1603 if let Ok((mut socket, _)) = listener.accept().await {
1604 let metrics = metrics.clone();
1605 tokio::spawn(async move {
1606 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1607 let mut buffer = [0; 1024];
1608 let _ = socket.read(&mut buffer).await;
1609 let response = format!(
1610 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1611 metrics.len(),
1612 metrics
1613 );
1614 let _ = socket.write_all(response.as_bytes()).await;
1615 });
1616 }
1617 }
1618 }
1619 });
1620
1621 let mock_provider = Arc::new(MockNode::new());
1622 let mock_node = NetworkNode::new(
1623 "test_node",
1624 "ws://localhost:9944",
1625 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1626 "/ip4/127.0.0.1/tcp/30333",
1627 NodeSpec::default(),
1628 mock_provider,
1629 );
1630
1631 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1633 assert_eq!(buckets.get("0.1"), Some(&5));
1634 assert_eq!(buckets.get("0.5"), Some(&10)); assert_eq!(buckets.get("+Inf"), Some(&5)); let mut label_filters = std::collections::HashMap::new();
1639 label_filters.insert("method".to_string(), "GET,POST".to_string());
1640
1641 let buckets_filtered = mock_node
1642 .get_histogram_buckets("test_metric", Some(label_filters))
1643 .await?;
1644
1645 assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1646 assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1647
1648 Ok(())
1649 }
1650
1651 #[test]
1652 fn test_compare_le_values() {
1653 use std::cmp::Ordering;
1654
1655 use crate::network::node::NetworkNode;
1656
1657 assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1659 assert_eq!(
1660 NetworkNode::compare_le_values("1.0", "0.5"),
1661 Ordering::Greater
1662 );
1663 assert_eq!(
1664 NetworkNode::compare_le_values("1.0", "1.0"),
1665 Ordering::Equal
1666 );
1667
1668 assert_eq!(
1670 NetworkNode::compare_le_values("+Inf", "999"),
1671 Ordering::Greater
1672 );
1673 assert_eq!(
1674 NetworkNode::compare_le_values("0.1", "+Inf"),
1675 Ordering::Less
1676 );
1677 assert_eq!(
1678 NetworkNode::compare_le_values("+Inf", "+Inf"),
1679 Ordering::Equal
1680 );
1681
1682 assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1684 assert_eq!(
1685 NetworkNode::compare_le_values("1000", "999"),
1686 Ordering::Greater
1687 );
1688 }
1689}