1use std::{
2 collections::{HashMap, HashSet},
3 path::{Path, PathBuf},
4 sync::{
5 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
6 Arc,
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::anyhow;
12use configuration::types::{Arg, AssetLocation};
13use fancy_regex::Regex;
14use glob_match::glob_match;
15use prom_metrics_parser::MetricMap;
16use provider::{
17 types::{ExecutionResult, InnerSnapshotDb, RunScriptOptions},
18 DynNode,
19};
20use serde::{Deserialize, Serialize, Serializer};
21use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
22use support::net::{skip_err_while_waiting, wait_ws_ready};
23use thiserror::Error;
24use tokio::sync::RwLock;
25use tracing::{debug, trace, warn};
26
27use crate::{
28 generators::{generate_node_command, generate_node_command_cumulus, GenCmdOptions},
29 network::NodeContext,
30 network_spec::node::NodeSpec,
31 shared::{constants::PROCESS_START_TIME_METRIC, types::NodeSnapshot},
32 tx_helper::client::get_client_from_url,
33};
34
35type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
36
37#[derive(Error, Debug)]
38pub enum NetworkNodeError {
39 #[error("metric '{0}' not found!")]
40 MetricNotFound(String),
41}
42
43const MONITOR_TARGET: &str = "zombie_monitor";
45#[derive(Clone, Serialize)]
46pub struct NetworkNode {
47 #[serde(serialize_with = "serialize_provider_node")]
48 pub(crate) inner: DynNode,
49 pub(crate) spec: NodeSpec,
52 pub(crate) name: String,
53 pub(crate) ws_uri: String,
54 pub(crate) multiaddr: String,
55 pub(crate) prometheus_uri: String,
56 pub(crate) cmd_generator_opts: GenCmdOptions,
60 pub(crate) context: NodeContext,
62 #[serde(skip)]
63 metrics_cache: Arc<RwLock<MetricMap>>,
64 #[serde(skip)]
65 is_running: Arc<AtomicBool>,
66 #[serde(skip)]
68 last_start_ts: Arc<AtomicU64>,
69}
70
71#[derive(Deserialize)]
72pub(crate) struct RawNetworkNode {
73 pub(crate) name: String,
74 pub(crate) ws_uri: String,
75 pub(crate) prometheus_uri: String,
76 pub(crate) multiaddr: String,
77 pub(crate) spec: NodeSpec,
78 pub(crate) cmd_generator_opts: GenCmdOptions,
79 pub(crate) context: NodeContext,
80 #[serde(default)]
81 pub(crate) inner: serde_json::Value,
82}
83
/// Outcome of a counting wait (matching log lines or chain events).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitCount {
    /// The predicate was satisfied; carries the count it was satisfied with.
    TargetReached(u32),
    /// The predicate was not satisfied; carries the last observed count.
    TargetFailed(u32),
}

impl WaitCount {
    /// `true` for [`WaitCount::TargetReached`], `false` for [`WaitCount::TargetFailed`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(_))
    }
}

/// Name used by the log-line waiting helpers for [`WaitCount`].
pub type LogLineCount = WaitCount;
109
/// Configuration for counting waits (log lines, chain events).
#[derive(Clone)]
pub struct CountOptions {
    /// Decides whether an observed count satisfies the wait.
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    /// Upper bound on how long to wait.
    pub timeout: Duration,
    /// When `true`, the whole timeout is waited out and only the final count is judged, rather
    /// than stopping as soon as the predicate holds.
    pub wait_until_timeout_elapses: bool,
}

impl CountOptions {
    /// Builds options from an arbitrary predicate.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let shared: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            timeout,
            wait_until_timeout_elapses,
            predicate: shared,
        }
    }

    /// Succeeds only if nothing matched once the whole `timeout` has elapsed.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|count| count == 0, timeout, true)
    }

    /// Succeeds as soon as at least one match is observed.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::at_least(1, timeout)
    }

    /// Succeeds as soon as at least `target` matches are observed.
    pub fn at_least(target: u32, timeout: Duration) -> Self {
        Self::new(move |count| count >= target, timeout, false)
    }

    /// Succeeds when the observed count is exactly one (without waiting out the timeout).
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| count == 1, timeout, false)
    }
}

/// Name used by the log-line waiting helpers for [`CountOptions`].
pub type LogLineCountOptions = CountOptions;
160
161impl NetworkNode {
162 #[allow(clippy::too_many_arguments)]
164 pub(crate) fn new<T: Into<String>>(
165 name: T,
166 ws_uri: T,
167 prometheus_uri: T,
168 multiaddr: T,
169 spec: NodeSpec,
170 inner: DynNode,
171 cmd_generator_opts: GenCmdOptions,
172 context: NodeContext,
173 ) -> Self {
174 Self {
175 name: name.into(),
176 ws_uri: ws_uri.into(),
177 prometheus_uri: prometheus_uri.into(),
178 inner,
179 spec,
180 cmd_generator_opts,
181 context,
182 multiaddr: multiaddr.into(),
183 metrics_cache: Arc::new(Default::default()),
184 is_running: Arc::new(AtomicBool::new(false)),
185 last_start_ts: Arc::new(AtomicU64::new(0)),
186 }
187 }
188
189 pub fn is_running(&self) -> bool {
193 self.is_running.load(Ordering::Acquire)
194 }
195
196 pub fn last_start_ts(&self) -> u64 {
198 self.last_start_ts.load(Ordering::Acquire)
199 }
200
201 pub async fn is_responsive(&self) -> bool {
208 tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
209 .await
210 .is_ok()
211 }
212
213 pub(crate) fn set_is_running(&self, is_running: bool) {
214 self.is_running.store(is_running, Ordering::Release);
215 }
216
217 pub(crate) fn set_last_start_ts(&self, ts: u64) {
219 self.last_start_ts.store(ts, Ordering::Release);
220 }
221
222 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
223 self.multiaddr = multiaddr.into();
224 }
225
226 pub fn name(&self) -> &str {
227 &self.name
228 }
229
230 pub fn args(&self) -> Vec<&str> {
233 self.inner.args()
234 }
235
236 pub fn user_args(&self) -> Vec<String> {
239 self.spec
240 .args
241 .iter()
242 .fold(vec![], |acc, arg| [acc, arg.to_vec()].concat())
243 }
244
245 pub fn spec(&self) -> &NodeSpec {
246 &self.spec
247 }
248
249 pub fn ws_uri(&self) -> &str {
250 &self.ws_uri
251 }
252
253 pub fn multiaddr(&self) -> &str {
254 self.multiaddr.as_ref()
255 }
256
257 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
261 get_client_from_url(&self.ws_uri).await
262 }
263
264 #[deprecated = "Use `wait_client` instead."]
266 pub async fn client<Config: subxt::Config>(
267 &self,
268 ) -> Result<OnlineClient<Config>, subxt::Error> {
269 self.try_client().await
270 }
271
272 pub async fn try_client<Config: subxt::Config>(
281 &self,
282 ) -> Result<OnlineClient<Config>, subxt::Error> {
283 get_client_from_url(&self.ws_uri).await
284 }
285
286 pub async fn wait_client<Config: subxt::Config>(
288 &self,
289 ) -> Result<OnlineClient<Config>, anyhow::Error> {
290 debug!("wait_client ws_uri: {}", self.ws_uri());
291 wait_ws_ready(self.ws_uri())
292 .await
293 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
294
295 self.try_client()
296 .await
297 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
298 }
299
300 pub async fn wait_client_with_timeout<Config: subxt::Config>(
302 &self,
303 timeout_secs: impl Into<u64>,
304 ) -> Result<OnlineClient<Config>, anyhow::Error> {
305 debug!("waiting until subxt client is ready");
306 tokio::time::timeout(
307 Duration::from_secs(timeout_secs.into()),
308 self.wait_client::<Config>(),
309 )
310 .await?
311 }
312
313 pub async fn pause(&self) -> Result<(), anyhow::Error> {
321 self.set_is_running(false);
322 self.inner.pause().await?;
323 Ok(())
324 }
325
326 pub async fn resume(&self) -> Result<(), anyhow::Error> {
332 self.set_is_running(true);
333 self.inner.resume().await?;
334 Ok(())
335 }
336
337 pub fn base_dir(&self) -> &PathBuf {
341 self.inner.base_dir()
342 }
343
344 pub async fn snapshot_db(
350 &self,
351 out_path: impl AsRef<Path>,
352 ) -> Result<NodeSnapshot, anyhow::Error> {
353 let out_path = out_path.as_ref().to_path_buf();
354 let is_cumulus_based = matches!(
355 self.context,
356 NodeContext::Para {
357 is_cumulus_based: true,
358 ..
359 }
360 );
361
362 let InnerSnapshotDb {
363 filename,
364 sha256,
365 size,
366 } = self.inner.snapshot_db(is_cumulus_based).await?;
367
368 let remote_file_path = PathBuf::from(&filename);
370 self.inner
371 .receive_file(remote_file_path.as_ref(), out_path.as_ref())
372 .await?;
373
374 Ok(NodeSnapshot {
375 path: out_path,
376 sha256,
377 size,
378 node_name: self.name().into(),
379 })
380 }
381
382 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
387 self.set_is_running(false);
388 self.inner.restart(after).await?;
389 self.set_is_running(true);
390 self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
391 Ok(())
392 }
393
394 pub async fn restart_with(
406 &self,
407 assets: Vec<AssetLocation>,
408 program: Option<String>,
409 args: Option<Vec<Arg>>,
410 after: Option<Duration>,
411 ) -> Result<(), anyhow::Error> {
412 self.set_is_running(false);
413 let mut spec_cloned = self.spec.clone();
414
415 if let Some(args) = args {
416 spec_cloned.args = args;
417 }
418 if let Some(program) = program {
419 spec_cloned.command = program.as_str().try_into()?;
420 }
421
422 let (program, args) = match self.context {
423 NodeContext::Rc
424 | NodeContext::Para {
425 is_cumulus_based: false,
426 ..
427 } => generate_node_command(&spec_cloned, self.cmd_generator_opts.clone(), None),
428 NodeContext::Para {
429 para_id,
430 is_cumulus_based: true,
431 } => generate_node_command_cumulus(
432 &spec_cloned,
433 self.cmd_generator_opts.clone(),
434 para_id,
435 ),
436 };
437
438 self.inner
439 .restart_with(&assets, &program, &args, after)
440 .await?;
441 self.set_is_running(true);
442 self.set_last_start_ts(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs());
443 Ok(())
444 }
445
446 pub async fn run_script(
453 &self,
454 options: RunScriptOptions,
455 ) -> Result<ExecutionResult, anyhow::Error> {
456 self.inner
457 .run_script(options)
458 .await
459 .map_err(|e| anyhow!("Failed to run script: {e}"))
460 }
461
462 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
470 let metric_name = metric_name.into();
471 self.fetch_metrics().await?;
473 self.metric(&metric_name, true).await
475 }
476
477 pub async fn assert(
486 &self,
487 metric_name: impl Into<String>,
488 value: impl Into<f64>,
489 ) -> Result<bool, anyhow::Error> {
490 let value: f64 = value.into();
491 self.assert_with(metric_name, |v| v == value).await
492 }
493
494 pub async fn assert_with(
497 &self,
498 metric_name: impl Into<String>,
499 predicate: impl Fn(f64) -> bool,
500 ) -> Result<bool, anyhow::Error> {
501 let metric_name = metric_name.into();
502 self.fetch_metrics().await?;
504 let val = self.metric(&metric_name, true).await?;
505 let log_msg = format!("🔎 Current value {val} passed to the predicated?");
506 if metric_name == PROCESS_START_TIME_METRIC {
507 trace!(target: MONITOR_TARGET, "{log_msg}");
508 } else {
509 trace!("{log_msg}");
510 }
511 Ok(predicate(val))
512 }
513
514 pub async fn wait_metric(
518 &self,
519 metric_name: impl Into<String>,
520 predicate: impl Fn(f64) -> bool,
521 ) -> Result<(), anyhow::Error> {
522 let metric_name = metric_name.into();
523 let log_msg = format!(
524 "[{}] waiting until metric {metric_name} pass the predicate",
525 self.name()
526 );
527 if metric_name == PROCESS_START_TIME_METRIC {
528 trace!(target: MONITOR_TARGET, "{log_msg}");
529 } else {
530 trace!("{log_msg}");
531 }
532
533 loop {
534 let res = self.assert_with(&metric_name, &predicate).await;
535 let log_msg = format!("res: {res:?}");
536 match res {
537 Ok(res) => {
538 if res {
539 return Ok(());
540 }
541 },
542 Err(e) => match e.downcast::<reqwest::Error>() {
543 Ok(io_err) => {
544 if !skip_err_while_waiting(&io_err) {
545 return Err(io_err.into());
546 }
547 },
548 Err(other) => {
549 match other.downcast::<NetworkNodeError>() {
550 Ok(node_err) => {
551 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
552 return Err(node_err.into());
553 }
554 },
555 Err(other) => return Err(other),
556 };
557 },
558 },
559 }
560
561 if metric_name == PROCESS_START_TIME_METRIC {
562 trace!(target: MONITOR_TARGET, "{log_msg}");
563 } else {
564 trace!("{log_msg}");
565 }
566
567 tokio::time::sleep(Duration::from_secs(1)).await;
569 }
570 }
571
572 pub async fn wait_metric_with_timeout(
575 &self,
576 metric_name: impl Into<String>,
577 predicate: impl Fn(f64) -> bool,
578 timeout_secs: impl Into<u64>,
579 ) -> Result<(), anyhow::Error> {
580 let metric_name = metric_name.into();
581 let secs = timeout_secs.into();
582 let log_msg = format!(
583 "[{}] waiting until metric {metric_name} pass the predicate for {secs}s",
584 self.name()
585 );
586
587 if metric_name == PROCESS_START_TIME_METRIC {
588 trace!(target: MONITOR_TARGET, "{log_msg}");
589 } else {
590 debug!("{log_msg}");
591 }
592
593 let res = tokio::time::timeout(
594 Duration::from_secs(secs),
595 self.wait_metric(&metric_name, predicate),
596 )
597 .await;
598
599 if let Ok(inner_res) = res {
600 match inner_res {
601 Ok(_) => Ok(()),
602 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
603 }
604 } else {
605 Err(anyhow!(
607 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
608 ))
609 }
610 }
611
612 pub async fn logs(&self) -> Result<String, anyhow::Error> {
617 Ok(self.inner.logs().await?)
618 }
619
620 pub async fn wait_log_line_count(
622 &self,
623 pattern: impl Into<String>,
624 is_glob: bool,
625 count: usize,
626 ) -> Result<(), anyhow::Error> {
627 let pattern = pattern.into();
628 let pattern_clone = pattern.clone();
629 debug!("waiting until we find pattern {pattern} {count} times");
630 let match_fn: BoxedClosure = if is_glob {
631 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
632 } else {
633 let re = Regex::new(&pattern)?;
634 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
635 };
636
637 loop {
638 let mut q = 0_usize;
639 let logs = self.logs().await?;
640 for line in logs.lines() {
641 trace!("line is {line}");
642 if match_fn(line)? {
643 trace!("pattern {pattern_clone} match in line {line}");
644 q += 1;
645 if q >= count {
646 return Ok(());
647 }
648 }
649 }
650
651 tokio::time::sleep(Duration::from_secs(2)).await;
652 }
653 }
654
655 pub async fn wait_log_line_count_with_timeout(
699 &self,
700 substring: impl Into<String>,
701 is_glob: bool,
702 options: LogLineCountOptions,
703 ) -> Result<LogLineCount, anyhow::Error> {
704 let substring = substring.into();
705 debug!(
706 "waiting until match lines count within {} seconds",
707 options.timeout.as_secs_f64()
708 );
709
710 let start = tokio::time::Instant::now();
711
712 let match_fn: BoxedClosure = if is_glob {
713 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
714 } else {
715 let re = Regex::new(&substring)?;
716 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
717 };
718
719 if options.wait_until_timeout_elapses {
720 tokio::time::sleep(options.timeout).await;
721 }
722
723 let mut q;
724 loop {
725 q = 0_u32;
726 let logs = self.logs().await?;
727 for line in logs.lines() {
728 if match_fn(line)? {
729 q += 1;
730
731 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
736 return Ok(LogLineCount::TargetReached(q));
737 }
738 }
739 }
740
741 if start.elapsed() >= options.timeout {
742 break;
743 }
744
745 tokio::time::sleep(Duration::from_secs(2)).await;
746 }
747
748 if (options.predicate)(q) {
749 Ok(LogLineCount::TargetReached(q))
750 } else {
751 Ok(LogLineCount::TargetFailed(q))
752 }
753 }
754
755 pub async fn wait_event_count_with_timeout(
799 &self,
800 pallet: impl Into<String>,
801 variant: impl Into<String>,
802 options: CountOptions,
803 ) -> Result<WaitCount, anyhow::Error> {
804 let pallet = pallet.into();
805 let variant = variant.into();
806 debug!(
807 "waiting until match event ({pallet} {variant}) count within {} seconds",
808 options.timeout.as_secs_f64()
809 );
810
811 let init_value = Arc::new(AtomicU32::new(0));
812
813 let res = tokio::time::timeout(
814 options.timeout,
815 self.wait_event_count(&pallet, &variant, &options, init_value.clone()),
816 )
817 .await;
818
819 let q = init_value.load(Ordering::Relaxed);
820 if let Ok(inner_res) = res {
821 match inner_res {
822 Ok(_) => Ok(WaitCount::TargetReached(q)),
823 Err(e) => Err(anyhow!("Error waiting for counter: {e}")),
824 }
825 } else {
826 if options.wait_until_timeout_elapses {
828 let q = init_value.load(Ordering::Relaxed);
829 if (options.predicate)(q) {
830 Ok(LogLineCount::TargetReached(q))
831 } else {
832 Ok(LogLineCount::TargetFailed(q))
833 }
834 } else {
835 Err(anyhow!(
836 "Timeout ({}), waiting for counter",
837 options.timeout.as_secs()
838 ))
839 }
840 }
841 }
842
843 async fn wait_event_count(
845 &self,
846 pallet: &str,
847 variant: &str,
848 options: &CountOptions,
849 init_count: Arc<AtomicU32>,
850 ) -> Result<(), anyhow::Error> {
851 let client: OnlineClient<PolkadotConfig> = self.wait_client().await?;
852 let mut blocks_sub: subxt::backend::StreamOf<
853 Result<
854 subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
855 subxt::Error,
856 >,
857 > = client.blocks().subscribe_finalized().await?;
858 while let Some(block) = blocks_sub.next().await {
859 let events = block?.events().await?;
860 for event in events.iter() {
861 let evt = event?;
862 if evt.pallet_name() == pallet && evt.variant_name() == variant {
863 let old_value = init_count.fetch_add(1, Ordering::Relaxed);
864 if !options.wait_until_timeout_elapses && (options.predicate)(old_value + 1) {
865 return Ok(());
866 }
867 }
868 }
869 }
870
871 Ok(())
872 }
873
874 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
875 let response = reqwest::get(&self.prometheus_uri).await?;
876 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
877 let mut cache = self.metrics_cache.write().await;
878 *cache = metrics;
879 Ok(())
880 }
881
882 async fn metric(
884 &self,
885 metric_name: &str,
886 treat_not_found_as_zero: bool,
887 ) -> Result<f64, anyhow::Error> {
888 let mut metrics_map = self.metrics_cache.read().await;
889 if metrics_map.is_empty() {
890 drop(metrics_map);
892 self.fetch_metrics().await?;
893 metrics_map = self.metrics_cache.read().await;
894 }
895
896 if let Some(val) = metrics_map.get(metric_name) {
897 Ok(*val)
898 } else if treat_not_found_as_zero {
899 Ok(0_f64)
900 } else {
901 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
902 }
903 }
904
905 pub async fn get_histogram_buckets(
925 &self,
926 metric_name: impl AsRef<str>,
927 label_filters: Option<HashMap<String, String>>,
928 ) -> Result<HashMap<String, u64>, anyhow::Error> {
929 let metric_name = metric_name.as_ref();
930
931 let response = reqwest::get(&self.prometheus_uri).await?;
933 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
934
935 let resolved_metric_name = if metric_name.contains("_bucket") {
937 metric_name.to_string()
938 } else {
939 format!("{metric_name}_bucket")
940 };
941
942 let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
946
947 for (key, &value) in metrics.iter() {
948 if !key.starts_with(&resolved_metric_name) {
949 continue;
950 }
951
952 let remaining = &key[resolved_metric_name.len()..];
953
954 let labels_str = &remaining[1..remaining.len() - 1];
955 let parsed_labels = Self::parse_label_string(labels_str);
956
957 if !parsed_labels.contains_key("le") {
959 continue;
960 }
961
962 if let Some(ref filters) = label_filters {
964 let mut all_match = true;
965 for (filter_key, filter_value) in filters {
966 if parsed_labels.get(filter_key) != Some(filter_value) {
967 all_match = false;
968 break;
969 }
970 }
971 if !all_match {
972 continue;
973 }
974 }
975
976 metric_entries.push((key.clone(), parsed_labels, value as u64));
977 }
978
979 let max_label_count = metric_entries
982 .iter()
983 .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
984 .max()
985 .unwrap_or(0);
986
987 let mut raw_buckets: Vec<(String, u64)> = Vec::new();
989 let mut seen_le_values = HashSet::new();
990 let mut active_series: Option<Vec<(String, String)>> = None;
991
992 for (_, parsed_labels, value) in metric_entries {
993 let le_value = parsed_labels.get("le").unwrap().clone();
994
995 let mut non_le_labels: Vec<(String, String)> = parsed_labels
997 .iter()
998 .filter(|(k, _)| k.as_str() != "le")
999 .map(|(k, v)| (k.clone(), v.clone()))
1000 .collect();
1001 non_le_labels.sort();
1002
1003 if non_le_labels.len() < max_label_count {
1006 continue;
1007 }
1008
1009 if let Some(ref prev_series) = active_series {
1011 if prev_series != &non_le_labels {
1012 if !raw_buckets.is_empty() {
1013 break; }
1015 active_series = Some(non_le_labels.clone());
1016 seen_le_values.clear();
1017 }
1018 } else {
1019 active_series = Some(non_le_labels.clone());
1020 }
1021
1022 if !seen_le_values.insert(le_value.clone()) {
1024 continue;
1025 }
1026
1027 trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
1028 raw_buckets.push((le_value, value));
1029 }
1030
1031 raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
1033
1034 let mut buckets = HashMap::new();
1036 let mut previous_value = 0_u64;
1037 for (le, cumulative_count) in raw_buckets {
1038 if cumulative_count < previous_value {
1039 warn!(
1040 "Warning: bucket count decreased from {} to {} at le={}",
1041 previous_value, cumulative_count, le
1042 );
1043 }
1044 let delta = cumulative_count.saturating_sub(previous_value);
1045 buckets.insert(le, delta);
1046 previous_value = cumulative_count;
1047 }
1048
1049 Ok(buckets)
1050 }
1051
1052 fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
1058 let mut labels = HashMap::new();
1059 let mut current_key = String::new();
1060 let mut current_value = String::new();
1061 let mut in_value = false;
1062 let mut in_quotes = false;
1063
1064 for ch in labels_str.chars() {
1065 match ch {
1066 '=' if !in_quotes && !in_value => {
1067 in_value = true;
1068 },
1069 '"' if in_value => {
1070 in_quotes = !in_quotes;
1071 },
1072 ',' if !in_quotes => {
1073 if !current_key.is_empty() {
1075 labels.insert(
1076 current_key.trim().to_string(),
1077 current_value.trim().to_string(),
1078 );
1079 current_key.clear();
1080 current_value.clear();
1081 in_value = false;
1082 }
1083 },
1084 _ => {
1085 if in_value {
1086 current_value.push(ch);
1087 } else {
1088 current_key.push(ch);
1089 }
1090 },
1091 }
1092 }
1093
1094 if !current_key.is_empty() {
1096 labels.insert(
1097 current_key.trim().to_string(),
1098 current_value.trim().to_string(),
1099 );
1100 }
1101
1102 labels
1103 }
1104
1105 fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
1109 use std::cmp::Ordering;
1110
1111 match (a, b) {
1113 ("+Inf", "+Inf") => Ordering::Equal,
1114 ("+Inf", _) => Ordering::Greater,
1115 (_, "+Inf") => Ordering::Less,
1116 _ => {
1117 match (a.parse::<f64>(), b.parse::<f64>()) {
1119 (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
1120 _ => a.cmp(b),
1122 }
1123 },
1124 }
1125 }
1126
1127 pub async fn wait_until_is_up(
1139 &self,
1140 timeout_secs: impl Into<u64>,
1141 ) -> Result<(), anyhow::Error> {
1142 self.wait_metric_with_timeout(PROCESS_START_TIME_METRIC, |b| b >= 1.0, timeout_secs)
1143 .await
1144 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
1145 }
1146}
1147
1148impl std::fmt::Debug for NetworkNode {
1149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1150 f.debug_struct("NetworkNode")
1151 .field("inner", &"inner_skipped")
1152 .field("spec", &self.spec)
1153 .field("name", &self.name)
1154 .field("ws_uri", &self.ws_uri)
1155 .field("prometheus_uri", &self.prometheus_uri)
1156 .finish()
1157 }
1158}
1159
1160fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
1161where
1162 S: Serializer,
1163{
1164 erased_serde::serialize(node.as_ref(), serializer)
1165}
1166
1167#[cfg(test)]
1169mod tests {
1170 use std::{
1171 path::{Path, PathBuf},
1172 sync::{Arc, Mutex},
1173 };
1174
1175 use async_trait::async_trait;
1176 use provider::{types::*, ProviderError, ProviderNode};
1177
1178 use super::*;
1179
1180 #[derive(Serialize)]
1181 struct MockNode {
1182 logs: Arc<Mutex<Vec<String>>>,
1183 }
1184
1185 impl MockNode {
1186 fn new() -> Self {
1187 Self {
1188 logs: Arc::new(Mutex::new(vec![])),
1189 }
1190 }
1191
1192 fn logs_push(&self, lines: Vec<impl Into<String>>) {
1193 self.logs
1194 .lock()
1195 .unwrap()
1196 .extend(lines.into_iter().map(|l| l.into()));
1197 }
1198 }
1199
1200 #[async_trait]
1201 impl ProviderNode for MockNode {
1202 fn name(&self) -> &str {
1203 todo!()
1204 }
1205
1206 fn args(&self) -> Vec<&str> {
1207 todo!()
1208 }
1209
1210 fn base_dir(&self) -> &PathBuf {
1211 todo!()
1212 }
1213
1214 fn config_dir(&self) -> &PathBuf {
1215 todo!()
1216 }
1217
1218 fn data_dir(&self) -> &PathBuf {
1219 todo!()
1220 }
1221
1222 fn relay_data_dir(&self) -> &PathBuf {
1223 todo!()
1224 }
1225
1226 fn scripts_dir(&self) -> &PathBuf {
1227 todo!()
1228 }
1229
1230 fn log_path(&self) -> &PathBuf {
1231 todo!()
1232 }
1233
1234 fn log_cmd(&self) -> String {
1235 todo!()
1236 }
1237
1238 fn path_in_node(&self, _file: &Path) -> PathBuf {
1239 todo!()
1240 }
1241
1242 async fn logs(&self) -> Result<String, ProviderError> {
1243 Ok(self.logs.lock().unwrap().join("\n"))
1244 }
1245
1246 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
1247 todo!()
1248 }
1249
1250 async fn run_command(
1251 &self,
1252 _options: RunCommandOptions,
1253 ) -> Result<ExecutionResult, ProviderError> {
1254 todo!()
1255 }
1256
1257 async fn run_script(
1258 &self,
1259 _options: RunScriptOptions,
1260 ) -> Result<ExecutionResult, ProviderError> {
1261 todo!()
1262 }
1263
1264 async fn send_file(
1265 &self,
1266 _local_file_path: &Path,
1267 _remote_file_path: &Path,
1268 _mode: &str,
1269 ) -> Result<(), ProviderError> {
1270 todo!()
1271 }
1272
1273 async fn receive_file(
1274 &self,
1275 _remote_file_path: &Path,
1276 _local_file_path: &Path,
1277 ) -> Result<(), ProviderError> {
1278 todo!()
1279 }
1280
1281 async fn pause(&self) -> Result<(), ProviderError> {
1282 todo!()
1283 }
1284
1285 async fn resume(&self) -> Result<(), ProviderError> {
1286 todo!()
1287 }
1288
1289 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
1290 todo!()
1291 }
1292
1293 async fn restart_with(
1294 &self,
1295 _assets: &[AssetLocation],
1296 _cmd: &str,
1297 _args: &[String],
1298 _after: Option<Duration>,
1299 ) -> Result<(), ProviderError> {
1300 todo!()
1301 }
1302
1303 async fn destroy(&self) -> Result<(), ProviderError> {
1304 todo!()
1305 }
1306
1307 async fn snapshot_db(&self, _: bool) -> Result<InnerSnapshotDb, ProviderError> {
1308 todo!()
1309 }
1310 }
1311
1312 #[tokio::test(flavor = "multi_thread")]
1313 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1314 let mock_provider = Arc::new(MockNode::new());
1315 let mock_node = NetworkNode::new(
1316 "node1",
1317 "ws_uri",
1318 "prometheus_uri",
1319 "multiaddr",
1320 NodeSpec::default(),
1321 mock_provider.clone(),
1322 GenCmdOptions::default(),
1323 NodeContext::Rc,
1324 );
1325
1326 mock_provider.logs_push(vec![
1327 "system booting",
1328 "stub line 1",
1329 "stub line 2",
1330 "system ready",
1331 ]);
1332
1333 let options = LogLineCountOptions {
1335 predicate: Arc::new(|n| n == 1),
1336 timeout: Duration::from_secs(10),
1337 wait_until_timeout_elapses: false,
1338 };
1339
1340 let log_line_count = mock_node
1341 .wait_log_line_count_with_timeout("system ready", false, options)
1342 .await?;
1343
1344 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1345
1346 Ok(())
1347 }
1348
1349 #[tokio::test(flavor = "multi_thread")]
1350 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
1351 let mock_provider = Arc::new(MockNode::new());
1352 let mock_node = NetworkNode::new(
1353 "node1",
1354 "ws_uri",
1355 "prometheus_uri",
1356 "multiaddr",
1357 NodeSpec::default(),
1358 mock_provider.clone(),
1359 GenCmdOptions::default(),
1360 NodeContext::Rc,
1361 );
1362
1363 mock_provider.logs_push(vec![
1364 "system booting",
1365 "stub line 1",
1366 "stub line 2",
1367 "system ready",
1368 ]);
1369
1370 let options = LogLineCountOptions {
1372 predicate: Arc::new(|n| n == 2),
1373 timeout: Duration::from_secs(4),
1374 wait_until_timeout_elapses: false,
1375 };
1376
1377 let task = tokio::spawn({
1378 async move {
1379 mock_node
1380 .wait_log_line_count_with_timeout("system ready", false, options)
1381 .await
1382 .unwrap()
1383 }
1384 });
1385
1386 tokio::time::sleep(Duration::from_secs(2)).await;
1387
1388 mock_provider.logs_push(vec!["system ready"]);
1389
1390 let log_line_count = task.await?;
1391
1392 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
1393
1394 Ok(())
1395 }
1396
1397 #[tokio::test(flavor = "multi_thread")]
1398 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1399 let mock_provider = Arc::new(MockNode::new());
1400 let mock_node = NetworkNode::new(
1401 "node1",
1402 "ws_uri",
1403 "prometheus_uri",
1404 "multiaddr",
1405 NodeSpec::default(),
1406 mock_provider.clone(),
1407 GenCmdOptions::default(),
1408 NodeContext::Rc,
1409 );
1410
1411 mock_provider.logs_push(vec![
1412 "system booting",
1413 "stub line 1",
1414 "stub line 2",
1415 "system ready",
1416 ]);
1417
1418 let options = LogLineCountOptions {
1420 predicate: Arc::new(|n| n == 2),
1421 timeout: Duration::from_secs(2),
1422 wait_until_timeout_elapses: false,
1423 };
1424
1425 let log_line_count = mock_node
1426 .wait_log_line_count_with_timeout("system ready", false, options)
1427 .await?;
1428
1429 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1430
1431 Ok(())
1432 }
1433
1434 #[tokio::test(flavor = "multi_thread")]
1435 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
1436 let mock_provider = Arc::new(MockNode::new());
1437 let mock_node = NetworkNode::new(
1438 "node1",
1439 "ws_uri",
1440 "prometheus_uri",
1441 "multiaddr",
1442 NodeSpec::default(),
1443 mock_provider.clone(),
1444 GenCmdOptions::default(),
1445 NodeContext::Rc,
1446 );
1447
1448 mock_provider.logs_push(vec![
1449 "system booting",
1450 "stub line 1",
1451 "stub line 2",
1452 "system ready",
1453 ]);
1454
1455 let options = LogLineCountOptions {
1457 predicate: Arc::new(|n| n == 2),
1458 timeout: Duration::from_secs(2),
1459 wait_until_timeout_elapses: true,
1460 };
1461
1462 let task = tokio::spawn({
1463 async move {
1464 mock_node
1465 .wait_log_line_count_with_timeout("system ready", false, options)
1466 .await
1467 .unwrap()
1468 }
1469 });
1470
1471 tokio::time::sleep(Duration::from_secs(1)).await;
1472
1473 mock_provider.logs_push(vec!["system ready"]);
1474 mock_provider.logs_push(vec!["system ready"]);
1475
1476 let log_line_count = task.await?;
1477
1478 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
1479
1480 Ok(())
1481 }
1482
1483 #[tokio::test(flavor = "multi_thread")]
1484 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
1485 let mock_provider = Arc::new(MockNode::new());
1486 let mock_node = NetworkNode::new(
1487 "node1",
1488 "ws_uri",
1489 "prometheus_uri",
1490 "multiaddr",
1491 NodeSpec::default(),
1492 mock_provider.clone(),
1493 GenCmdOptions::default(),
1494 NodeContext::Rc,
1495 );
1496
1497 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1498
1499 let task = tokio::spawn({
1500 async move {
1501 mock_node
1502 .wait_log_line_count_with_timeout(
1503 "system ready",
1504 false,
1505 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
1507 )
1508 .await
1509 .unwrap()
1510 }
1511 });
1512
1513 tokio::time::sleep(Duration::from_secs(1)).await;
1514
1515 mock_provider.logs_push(vec!["stub line 3"]);
1516
1517 assert!(task.await?.success());
1518
1519 Ok(())
1520 }
1521
1522 #[tokio::test(flavor = "multi_thread")]
1523 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
1524 let mock_provider = Arc::new(MockNode::new());
1525 let mock_node = NetworkNode::new(
1526 "node1",
1527 "ws_uri",
1528 "prometheus_uri",
1529 "multiaddr",
1530 NodeSpec::default(),
1531 mock_provider.clone(),
1532 GenCmdOptions::default(),
1533 NodeContext::Rc,
1534 );
1535
1536 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1537
1538 let options = LogLineCountOptions {
1540 predicate: Arc::new(|n| (2..=5).contains(&n)),
1541 timeout: Duration::from_secs(2),
1542 wait_until_timeout_elapses: true,
1543 };
1544
1545 let task = tokio::spawn({
1546 async move {
1547 mock_node
1548 .wait_log_line_count_with_timeout("system ready", false, options)
1549 .await
1550 .unwrap()
1551 }
1552 });
1553
1554 tokio::time::sleep(Duration::from_secs(1)).await;
1555
1556 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
1557
1558 assert!(task.await?.success());
1559
1560 Ok(())
1561 }
1562
1563 #[tokio::test(flavor = "multi_thread")]
1564 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
1565 let mock_provider = Arc::new(MockNode::new());
1566 let mock_node = NetworkNode::new(
1567 "node1",
1568 "ws_uri",
1569 "prometheus_uri",
1570 "multiaddr",
1571 NodeSpec::default(),
1572 mock_provider.clone(),
1573 GenCmdOptions::default(),
1574 NodeContext::Rc,
1575 );
1576
1577 mock_provider.logs_push(vec![
1578 "system booting",
1579 "stub line 1",
1580 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1582 "stub line 2"
1583 ]);
1584
1585 let options = LogLineCountOptions {
1586 predicate: Arc::new(|n| n == 1),
1587 timeout: Duration::from_secs(3),
1588 wait_until_timeout_elapses: true,
1589 };
1590
1591 let task = tokio::spawn({
1592 async move {
1593 mock_node
1594 .wait_log_line_count_with_timeout(
1595 "error(?! importing block .*: block has an unknown parent)",
1596 false,
1597 options,
1598 )
1599 .await
1600 .unwrap()
1601 }
1602 });
1603
1604 tokio::time::sleep(Duration::from_secs(1)).await;
1605
1606 mock_provider.logs_push(vec![
1607 "system ready",
1608 "system error",
1610 "system ready",
1611 ]);
1612
1613 assert!(task.await?.success());
1614
1615 Ok(())
1616 }
1617
1618 #[tokio::test(flavor = "multi_thread")]
1619 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1620 ) -> Result<(), anyhow::Error> {
1621 let mock_provider = Arc::new(MockNode::new());
1622 let mock_node = NetworkNode::new(
1623 "node1",
1624 "ws_uri",
1625 "prometheus_uri",
1626 "multiaddr",
1627 NodeSpec::default(),
1628 mock_provider.clone(),
1629 GenCmdOptions::default(),
1630 NodeContext::Rc,
1631 );
1632
1633 mock_provider.logs_push(vec![
1634 "system booting",
1635 "stub line 1",
1636 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1638 "stub line 2"
1639 ]);
1640
1641 let options = LogLineCountOptions {
1642 predicate: Arc::new(|n| n == 1),
1643 timeout: Duration::from_secs(6),
1644 wait_until_timeout_elapses: true,
1645 };
1646
1647 let task = tokio::spawn({
1648 async move {
1649 mock_node
1650 .wait_log_line_count_with_timeout(
1651 "error(?! importing block .*: block has an unknown parent)",
1652 false,
1653 options,
1654 )
1655 .await
1656 .unwrap()
1657 }
1658 });
1659
1660 tokio::time::sleep(Duration::from_secs(1)).await;
1661
1662 mock_provider.logs_push(vec!["system ready", "system ready"]);
1663
1664 assert!(!task.await?.success());
1665
1666 Ok(())
1667 }
1668
1669 #[tokio::test(flavor = "multi_thread")]
1670 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1671 let mock_provider = Arc::new(MockNode::new());
1672 let mock_node = NetworkNode::new(
1673 "node1",
1674 "ws_uri",
1675 "prometheus_uri",
1676 "multiaddr",
1677 NodeSpec::default(),
1678 mock_provider.clone(),
1679 GenCmdOptions::default(),
1680 NodeContext::Rc,
1681 );
1682
1683 mock_provider.logs_push(vec![
1684 "system booting",
1685 "stub line 1",
1686 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1688 "stub line 2"
1689 ]);
1690
1691 let task = tokio::spawn({
1692 async move {
1693 mock_node
1694 .wait_log_line_count(
1695 "error(?! importing block .*: block has an unknown parent)",
1696 false,
1697 1,
1698 )
1699 .await
1700 .unwrap()
1701 }
1702 });
1703
1704 tokio::time::sleep(Duration::from_secs(1)).await;
1705
1706 mock_provider.logs_push(vec![
1707 "system ready",
1708 "system error",
1710 "system ready",
1711 ]);
1712
1713 assert!(task.await.is_ok());
1714
1715 Ok(())
1716 }
1717
1718 #[tokio::test(flavor = "multi_thread")]
1719 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1720 let mock_provider = Arc::new(MockNode::new());
1721 let mock_node = NetworkNode::new(
1722 "node1",
1723 "ws_uri",
1724 "prometheus_uri",
1725 "multiaddr",
1726 NodeSpec::default(),
1727 mock_provider.clone(),
1728 GenCmdOptions::default(),
1729 NodeContext::Rc,
1730 );
1731
1732 mock_provider.logs_push(vec![
1733 "system booting",
1734 "stub line 1",
1735 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1737 "stub line 2"
1738 ]);
1739
1740 let options = LogLineCountOptions {
1741 predicate: Arc::new(|count| count == 1),
1742 timeout: Duration::from_secs(2),
1743 wait_until_timeout_elapses: true,
1744 };
1745
1746 let task = tokio::spawn({
1747 async move {
1748 mock_node
1750 .wait_log_line_count_with_timeout(
1751 "error(?! importing block .*: block has an unknown parent)",
1752 false,
1753 options,
1754 )
1755 .await
1756 .unwrap()
1757 }
1758 });
1759
1760 tokio::time::sleep(Duration::from_secs(1)).await;
1761
1762 mock_provider.logs_push(vec!["system ready", "system ready"]);
1763
1764 assert!(!task.await?.success());
1765
1766 Ok(())
1767 }
1768
1769 #[tokio::test]
1770 async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1771 use std::sync::Arc;
1773
1774 let mock_metrics = concat!(
1776 "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1777 "# TYPE substrate_block_verification_time histogram\n",
1778 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1779 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1780 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1781 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1782 "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1783 "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1784 "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1785 );
1786
1787 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1789 let addr = listener.local_addr()?;
1790 let metrics = Arc::new(mock_metrics.to_string());
1791
1792 tokio::spawn({
1793 let metrics = metrics.clone();
1794 async move {
1795 loop {
1796 if let Ok((mut socket, _)) = listener.accept().await {
1797 let metrics = metrics.clone();
1798 tokio::spawn(async move {
1799 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1800 let mut buffer = [0; 1024];
1801 let _ = socket.read(&mut buffer).await;
1802
1803 let response = format!(
1804 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1805 metrics.len(),
1806 metrics
1807 );
1808 let _ = socket.write_all(response.as_bytes()).await;
1809 });
1810 }
1811 }
1812 }
1813 });
1814
1815 let mock_provider = Arc::new(MockNode::new());
1817 let mock_node = NetworkNode::new(
1818 "test_node",
1819 "ws://localhost:9944",
1820 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1821 "/ip4/127.0.0.1/tcp/30333",
1822 NodeSpec::default(),
1823 mock_provider,
1824 GenCmdOptions::default(),
1825 NodeContext::Rc,
1826 );
1827
1828 let mut label_filters = HashMap::new();
1830 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1831 let buckets = mock_node
1832 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1833 .await?;
1834
1835 assert_eq!(buckets.get("0.1"), Some(&10));
1837 assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); let mut label_filters = std::collections::HashMap::new();
1844 label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1845
1846 let buckets_filtered = mock_node
1847 .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1848 .await?;
1849
1850 assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1851 assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1852
1853 let buckets_with_suffix = mock_node
1855 .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1856 .await?;
1857
1858 assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1859
1860 Ok(())
1861 }
1862
1863 #[tokio::test]
1864 async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1865 use std::sync::Arc;
1867
1868 let mock_metrics = concat!(
1869 "# HELP test_metric A test metric\n",
1870 "# TYPE test_metric histogram\n",
1871 "test_metric_bucket{le=\"2.5\"} 40\n",
1872 "test_metric_bucket{le=\"0.1\"} 10\n",
1873 "test_metric_bucket{le=\"+Inf\"} 42\n",
1874 "test_metric_bucket{le=\"1.0\"} 35\n",
1875 "test_metric_bucket{le=\"0.5\"} 25\n",
1876 );
1877
1878 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1879 let addr = listener.local_addr()?;
1880 let metrics = Arc::new(mock_metrics.to_string());
1881
1882 tokio::spawn({
1883 let metrics = metrics.clone();
1884 async move {
1885 loop {
1886 if let Ok((mut socket, _)) = listener.accept().await {
1887 let metrics = metrics.clone();
1888 tokio::spawn(async move {
1889 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1890 let mut buffer = [0; 1024];
1891 let _ = socket.read(&mut buffer).await;
1892 let response = format!(
1893 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1894 metrics.len(),
1895 metrics
1896 );
1897 let _ = socket.write_all(response.as_bytes()).await;
1898 });
1899 }
1900 }
1901 }
1902 });
1903
1904 let mock_provider = Arc::new(MockNode::new());
1905 let mock_node = NetworkNode::new(
1906 "test_node",
1907 "ws://localhost:9944",
1908 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1909 "/ip4/127.0.0.1/tcp/30333",
1910 NodeSpec::default(),
1911 mock_provider,
1912 GenCmdOptions::default(),
1913 NodeContext::Rc,
1914 );
1915
1916 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1917
1918 assert_eq!(buckets.get("0.1"), Some(&10)); assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); Ok(())
1926 }
1927
1928 #[tokio::test]
1929 async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1930 use std::sync::Arc;
1932
1933 let mock_metrics = concat!(
1934 "# HELP test_metric A test metric\n",
1935 "# TYPE test_metric histogram\n",
1936 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1937 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1938 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1939 );
1940
1941 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1942 let addr = listener.local_addr()?;
1943 let metrics = Arc::new(mock_metrics.to_string());
1944
1945 tokio::spawn({
1946 let metrics = metrics.clone();
1947 async move {
1948 loop {
1949 if let Ok((mut socket, _)) = listener.accept().await {
1950 let metrics = metrics.clone();
1951 tokio::spawn(async move {
1952 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953 let mut buffer = [0; 1024];
1954 let _ = socket.read(&mut buffer).await;
1955 let response = format!(
1956 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1957 metrics.len(),
1958 metrics
1959 );
1960 let _ = socket.write_all(response.as_bytes()).await;
1961 });
1962 }
1963 }
1964 }
1965 });
1966
1967 let mock_provider = Arc::new(MockNode::new());
1968 let mock_node = NetworkNode::new(
1969 "test_node",
1970 "ws://localhost:9944",
1971 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1972 "/ip4/127.0.0.1/tcp/30333",
1973 NodeSpec::default(),
1974 mock_provider,
1975 GenCmdOptions::default(),
1976 NodeContext::Rc,
1977 );
1978
1979 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1981 assert_eq!(buckets.get("0.1"), Some(&5));
1982 assert_eq!(buckets.get("0.5"), Some(&10)); assert_eq!(buckets.get("+Inf"), Some(&5)); let mut label_filters = std::collections::HashMap::new();
1987 label_filters.insert("method".to_string(), "GET,POST".to_string());
1988
1989 let buckets_filtered = mock_node
1990 .get_histogram_buckets("test_metric", Some(label_filters))
1991 .await?;
1992
1993 assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1994 assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1995
1996 Ok(())
1997 }
1998
1999 #[test]
2000 fn test_compare_le_values() {
2001 use std::cmp::Ordering;
2002
2003 use crate::network::node::NetworkNode;
2004
2005 assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
2007 assert_eq!(
2008 NetworkNode::compare_le_values("1.0", "0.5"),
2009 Ordering::Greater
2010 );
2011 assert_eq!(
2012 NetworkNode::compare_le_values("1.0", "1.0"),
2013 Ordering::Equal
2014 );
2015
2016 assert_eq!(
2018 NetworkNode::compare_le_values("+Inf", "999"),
2019 Ordering::Greater
2020 );
2021 assert_eq!(
2022 NetworkNode::compare_le_values("0.1", "+Inf"),
2023 Ordering::Less
2024 );
2025 assert_eq!(
2026 NetworkNode::compare_le_values("+Inf", "+Inf"),
2027 Ordering::Equal
2028 );
2029
2030 assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
2032 assert_eq!(
2033 NetworkNode::compare_le_values("1000", "999"),
2034 Ordering::Greater
2035 );
2036 }
2037}