1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{
7 cell::RefCell,
8 collections::HashMap,
9 path::PathBuf,
10 rc::Rc,
11 sync::Arc,
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use configuration::{
16 para_states::{Initial, Running},
17 shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
18 types::{Arg, Command, Image, ParaId, Port, ValidationContext},
19 ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
20};
21use provider::{types::TransferedFile, DynNamespace, ProviderError};
22use serde::{Deserialize, Serialize};
23use support::fs::FileSystem;
24use tokio::sync::RwLock;
25use tracing::{error, warn};
26
27use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
28use crate::{
29 generators::{self, chain_spec::ChainSpec},
30 network_spec::{self, NetworkSpec},
31 observability::{self, ObservabilityInfo, ObservabilityState},
32 shared::{
33 constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
34 macros,
35 types::{ChainDefaultContext, RegisterParachainOptions},
36 },
37 spawner::{self, SpawnNodeCtx},
38 utils::write_zombie_json,
39 ScopedFilesystem, ZombieRole,
40};
41
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
45pub enum NodeContext {
46 Rc,
47 Para {
48 para_id: ParaId,
49 is_cumulus_based: bool,
50 },
51}
52
53#[derive(Serialize)]
54pub struct Network<T: FileSystem> {
55 #[serde(skip)]
56 ns: DynNamespace,
57 #[serde(skip)]
58 filesystem: T,
59 relay: Relaychain,
60 initial_spec: NetworkSpec,
61 parachains: HashMap<u32, Vec<Parachain>>,
62 #[serde(skip)]
63 nodes_by_name: HashMap<String, NetworkNode>,
64 #[serde(skip)]
65 nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 start_time_ts: Option<String>,
68 #[serde(skip)]
69 observability: ObservabilityState,
70}
71
72impl<T: FileSystem> std::fmt::Debug for Network<T> {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("Network")
75 .field("ns", &"ns_skipped")
76 .field("relay", &self.relay)
77 .field("initial_spec", &self.initial_spec)
78 .field("parachains", &self.parachains)
79 .field("nodes_by_name", &self.nodes_by_name)
80 .field("observability", &self.observability)
81 .finish()
82 }
83}
84
85macros::create_add_options!(AddNodeOptions {
86 chain_spec: Option<PathBuf>,
87 override_eth_key: Option<String>
88});
89
90macros::create_add_options!(AddCollatorOptions {
91 chain_spec: Option<PathBuf>,
92 chain_spec_relay: Option<PathBuf>,
93 override_eth_key: Option<String>
94});
95
96impl<T: FileSystem> Network<T> {
97 pub(crate) fn new_with_relay(
98 relay: Relaychain,
99 ns: DynNamespace,
100 fs: T,
101 initial_spec: NetworkSpec,
102 ) -> Self {
103 Self {
104 ns,
105 filesystem: fs,
106 relay,
107 initial_spec,
108 parachains: Default::default(),
109 nodes_by_name: Default::default(),
110 nodes_to_watch: Default::default(),
111 start_time_ts: Default::default(),
112 observability: ObservabilityState::default(),
113 }
114 }
115
116 pub fn ns_name(&self) -> String {
118 self.ns.name().to_string()
119 }
120
121 pub fn base_dir(&self) -> Option<&str> {
122 self.ns.base_dir().to_str()
123 }
124
125 pub fn relaychain(&self) -> &Relaychain {
126 &self.relay
127 }
128
129 pub async fn destroy(mut self) -> Result<(), ProviderError> {
131 if let Err(e) = self.stop_observability().await {
132 warn!("⚠️ Failed to cleanup observability stack: {e}");
133 }
134 self.ns.destroy().await
135 }
136
137 pub fn observability(&self) -> Option<&ObservabilityInfo> {
138 self.observability.as_runnnig()
139 }
140
141 pub fn observability_state(&self) -> &ObservabilityState {
142 &self.observability
143 }
144
145 pub async fn add_node(
171 &mut self,
172 name: impl Into<String>,
173 options: AddNodeOptions,
174 ) -> Result<(), anyhow::Error> {
175 let name = generate_unique_node_name_from_names(
176 name,
177 &mut self.nodes_by_name.keys().cloned().collect(),
178 );
179
180 let relaychain = self.relaychain();
181
182 let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
183 chain_spec_custom_path.clone()
184 } else {
185 PathBuf::from(format!(
186 "{}/{}.json",
187 self.ns.base_dir().to_string_lossy(),
188 relaychain.chain
189 ))
190 };
191
192 let chain_context = ChainDefaultContext {
193 default_command: self.initial_spec.relaychain.default_command.as_ref(),
194 default_image: self.initial_spec.relaychain.default_image.as_ref(),
195 default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
196 default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
197 default_args: self.initial_spec.relaychain.default_args.iter().collect(),
198 };
199
200 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
201 &name,
202 options.into(),
203 &chain_context,
204 false,
205 false,
206 )?;
207
208 node_spec.available_args_output = Some(
209 self.initial_spec
210 .node_available_args_output(&node_spec, self.ns.clone())
211 .await?,
212 );
213
214 let base_dir = self.ns.base_dir().to_string_lossy();
215 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
216
217 let resolved_db_snapshots = generators::resolve_db_snapshots(
218 std::iter::once(&node_spec),
219 &self.ns,
220 &self.filesystem,
221 )
222 .await?;
223
224 let ctx = SpawnNodeCtx {
225 chain_id: &relaychain.chain_id,
226 parachain_id: None,
227 chain: &relaychain.chain,
228 role: ZombieRole::Node,
229 ns: &self.ns,
230 scoped_fs: &scoped_fs,
231 parachain: None,
232 bootnodes_addr: &vec![],
233 wait_ready: true,
234 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
235 global_settings: &self.initial_spec.global_settings,
236 resolved_db_snapshots: &resolved_db_snapshots,
237 };
238
239 let global_files_to_inject = vec![TransferedFile::new(
240 chain_spec_path,
241 PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
242 )];
243
244 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
245
246 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
258 .await?;
259
260 self.add_running_node(node.clone(), None).await;
262
263 self.write_zombie_json().await?;
265
266 generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
267
268 Ok(())
269 }
270
271 pub async fn add_collator(
298 &mut self,
299 name: impl Into<String>,
300 options: AddCollatorOptions,
301 para_id: u32,
302 ) -> Result<(), anyhow::Error> {
303 let name = generate_unique_node_name_from_names(
304 name,
305 &mut self.nodes_by_name.keys().cloned().collect(),
306 );
307 let spec = self
308 .initial_spec
309 .parachains
310 .iter()
311 .find(|para| para.id == para_id)
312 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
313 let role = if spec.is_cumulus_based {
314 ZombieRole::CumulusCollator
315 } else {
316 ZombieRole::Collator
317 };
318 let chain_context = ChainDefaultContext {
319 default_command: spec.default_command.as_ref(),
320 default_image: spec.default_image.as_ref(),
321 default_resources: spec.default_resources.as_ref(),
322 default_db_snapshot: spec.default_db_snapshot.as_ref(),
323 default_args: spec.default_args.iter().collect(),
324 };
325
326 let parachain = self
327 .parachains
328 .get_mut(¶_id)
329 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
330 .get_mut(0)
331 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
332
333 let base_dir = self.ns.base_dir().to_string_lossy();
334 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
335
336 let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
337 chain_spec_custom_path.clone()
338 } else {
339 PathBuf::from(format!(
340 "{}/{}.json",
341 self.ns.base_dir().to_string_lossy(),
342 self.relay.chain
343 ))
344 };
345
346 let mut global_files_to_inject = vec![TransferedFile::new(
347 relaychain_spec_path,
348 PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
349 )];
350
351 let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
352 Some(para_chain_spec_custom.clone())
353 } else if let Some(para_spec_path) = ¶chain.chain_spec_path {
354 Some(PathBuf::from(format!(
355 "{}/{}",
356 self.ns.base_dir().to_string_lossy(),
357 para_spec_path.to_string_lossy()
358 )))
359 } else {
360 None
361 };
362
363 if let Some(para_spec_path) = para_chain_spec_local_path {
364 global_files_to_inject.push(TransferedFile::new(
365 para_spec_path,
366 PathBuf::from(format!("/cfg/{para_id}.json")),
367 ));
368 }
369
370 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
371 name,
372 options.into(),
373 &chain_context,
374 true,
375 spec.is_evm_based,
376 )?;
377
378 node_spec.available_args_output = Some(
379 self.initial_spec
380 .node_available_args_output(&node_spec, self.ns.clone())
381 .await?,
382 );
383
384 let resolved_db_snapshots = generators::resolve_db_snapshots(
385 std::iter::once(&node_spec),
386 &self.ns,
387 &self.filesystem,
388 )
389 .await?;
390
391 let ctx = SpawnNodeCtx {
393 chain_id: &self.relay.chain_id,
394 parachain_id: parachain.chain_id.as_deref(),
395 chain: &self.relay.chain,
396 role,
397 ns: &self.ns,
398 scoped_fs: &scoped_fs,
399 parachain: Some(spec),
400 bootnodes_addr: &vec![],
401 wait_ready: true,
402 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
403 global_settings: &self.initial_spec.global_settings,
404 resolved_db_snapshots: &resolved_db_snapshots,
405 };
406
407 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
408
409 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
411 .await?;
412
413 self.add_running_node(node, Some(para_id)).await;
414
415 self.write_zombie_json().await?;
417
418 generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
419
420 Ok(())
421 }
422
423 pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
428 let used_ports = self
429 .nodes_iter()
430 .map(|node| node.spec())
431 .flat_map(|spec| {
432 [
433 spec.ws_port.0,
434 spec.rpc_port.0,
435 spec.prometheus_port.0,
436 spec.p2p_port.0,
437 ]
438 })
439 .collect();
440
441 let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
442
443 let used_para_ids = self
445 .parachains
446 .iter()
447 .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
448 .collect();
449
450 let context = ValidationContext {
451 used_ports,
452 used_nodes_names,
453 used_para_ids,
454 };
455 let context = Rc::new(RefCell::new(context));
456
457 ParachainConfigBuilder::new_with_running(context)
458 }
459
460 pub async fn add_parachain(
494 &mut self,
495 para_config: &ParachainConfig,
496 custom_relaychain_spec: Option<PathBuf>,
497 custom_parchain_fs_prefix: Option<String>,
498 ) -> Result<(), anyhow::Error> {
499 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
500 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
501
502 let mut global_files_to_inject = vec![];
503
504 let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
506 global_files_to_inject.push(TransferedFile::new(
508 custom_path.clone(),
509 PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
510 ));
511 let content = std::fs::read_to_string(custom_path)?;
512 ChainSpec::chain_id_from_spec(&content)?
513 } else {
514 global_files_to_inject.push(TransferedFile::new(
515 PathBuf::from(format!(
516 "{}/{}",
517 scoped_fs.base_dir,
518 self.relaychain().chain_spec_path.to_string_lossy()
519 )),
520 PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
521 ));
522 self.relay.chain_id.clone()
523 };
524
525 let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
526 para_config,
527 relay_chain_id.as_str().try_into()?,
528 )?;
529
530 let chain_spec_raw_path = para_spec
531 .build_chain_spec(
532 &relay_chain_id,
533 &self.ns,
534 &scoped_fs,
535 para_spec.post_process_script.clone().as_deref(),
536 )
537 .await?;
538
539 let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
541 custom_prefix
542 } else {
543 para_spec.id.to_string()
544 };
545
546 scoped_fs.create_dir(¶_path_prefix).await?;
547 para_spec
549 .genesis_state
550 .build(
551 chain_spec_raw_path.as_ref(),
552 format!("{}/genesis-state", ¶_path_prefix),
553 &self.ns,
554 &scoped_fs,
555 None,
556 )
557 .await?;
558 para_spec
559 .genesis_wasm
560 .build(
561 chain_spec_raw_path.as_ref(),
562 format!("{}/para_spec-wasm", ¶_path_prefix),
563 &self.ns,
564 &scoped_fs,
565 None,
566 )
567 .await?;
568
569 let parachain =
570 Parachain::from_spec(¶_spec, &global_files_to_inject, &scoped_fs).await?;
571 let parachain_id = parachain.chain_id.clone();
572
573 let resolved_db_snapshots = generators::resolve_db_snapshots(
574 para_spec.collators.iter(),
575 &self.ns,
576 &self.filesystem,
577 )
578 .await?;
579
580 let ctx_para = SpawnNodeCtx {
582 parachain: Some(¶_spec),
583 parachain_id: parachain_id.as_deref(),
584 role: if para_spec.is_cumulus_based {
585 ZombieRole::CumulusCollator
586 } else {
587 ZombieRole::Collator
588 },
589 bootnodes_addr: ¶_config
590 .bootnodes_addresses()
591 .iter()
592 .map(|&a| a.to_string())
593 .collect(),
594 chain_id: &self.relaychain().chain_id,
595 chain: &self.relaychain().chain,
596 ns: &self.ns,
597 scoped_fs: &scoped_fs,
598 wait_ready: false,
599 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
600 global_settings: &self.initial_spec.global_settings,
601 resolved_db_snapshots: &resolved_db_snapshots,
602 };
603
604 let first_node_url = self
606 .relaychain()
607 .nodes
608 .first()
609 .ok_or(anyhow::anyhow!(
610 "At least one node of the relaychain should be running"
611 ))?
612 .ws_uri();
613
614 if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
615 let register_para_options = RegisterParachainOptions {
616 id: parachain.para_id,
617 wasm_path: para_spec
619 .genesis_wasm
620 .artifact_path()
621 .ok_or(anyhow::anyhow!(
622 "artifact path for wasm must be set at this point",
623 ))?
624 .to_path_buf(),
625 state_path: para_spec
626 .genesis_state
627 .artifact_path()
628 .ok_or(anyhow::anyhow!(
629 "artifact path for state must be set at this point",
630 ))?
631 .to_path_buf(),
632 node_ws_url: first_node_url.to_string(),
633 onboard_as_para: para_spec.onboard_as_parachain,
634 seed: None, finalization: false,
636 };
637
638 Parachain::register(register_para_options, &scoped_fs).await?;
639 }
640
641 let spawning_tasks = para_spec
643 .collators
644 .iter()
645 .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
646
647 let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
648
649 let waiting_tasks = running_nodes.iter().map(|node| {
651 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
652 });
653
654 let _ = futures::future::try_join_all(waiting_tasks).await?;
655
656 let running_para_id = parachain.para_id;
657 self.add_para(parachain);
658 for node in running_nodes {
659 self.add_running_node(node, Some(running_para_id)).await;
660 }
661
662 self.write_zombie_json().await?;
664
665 generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
666
667 Ok(())
668 }
669
670 pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
709 let para = self
710 .initial_spec
711 .parachains
712 .iter()
713 .find(|p| p.id == para_id)
714 .ok_or(anyhow::anyhow!(
715 "no parachain with id = {para_id} available",
716 ))?;
717 let para_genesis_config = para.get_genesis_config()?;
718 let first_node_url = self
719 .relaychain()
720 .nodes
721 .first()
722 .ok_or(anyhow::anyhow!(
723 "At least one node of the relaychain should be running"
724 ))?
725 .ws_uri();
726 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
727 id: para_id,
728 wasm_path: para_genesis_config.wasm_path.clone(),
730 state_path: para_genesis_config.state_path.clone(),
731 node_ws_url: first_node_url.to_string(),
732 onboard_as_para: para_genesis_config.as_parachain,
733 seed: None, finalization: false,
735 };
736 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
737 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
738 Parachain::register(register_para_options, &scoped_fs).await?;
739
740 Ok(())
741 }
742
743 pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
747 let name = name.into();
748 if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
749 return Ok(node);
750 }
751
752 let list = self.node_names().join(", ");
753
754 Err(anyhow::anyhow!(
755 "can't find node with name: {name:?}, should be one of {list}"
756 ))
757 }
758
759 pub fn get_node_mut(
760 &mut self,
761 name: impl Into<String>,
762 ) -> Result<&mut NetworkNode, anyhow::Error> {
763 let name = name.into();
764 self.nodes_iter_mut()
765 .find(|n| n.name == name)
766 .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
767 }
768
769 pub fn node_names(&self) -> Vec<String> {
770 self.nodes_iter()
771 .map(|n| &n.name)
772 .cloned()
773 .collect::<Vec<_>>()
774 }
775
776 pub fn nodes(&self) -> Vec<&NetworkNode> {
777 self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
778 }
779
780 pub async fn detach(&self) {
781 self.ns.detach().await
782 }
783
784 pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
786 if let Some(para_id) = para_id {
787 if let Some(para) = self.parachains.get_mut(¶_id).and_then(|p| p.get_mut(0)) {
788 para.collators.push(node.clone());
789 } else {
790 unreachable!()
792 }
793 } else {
794 self.relay.nodes.push(node.clone());
795 }
796 node.set_is_running(true);
798 node.set_last_start_ts(
799 SystemTime::now()
800 .duration_since(UNIX_EPOCH)
801 .expect("Timestamp should be valid")
802 .as_secs(),
803 );
804 let node_name = node.name.clone();
805 self.nodes_by_name.insert(node_name, node.clone());
806 self.nodes_to_watch.write().await.push(node);
807 }
808
809 pub(crate) fn add_para(&mut self, para: Parachain) {
810 self.parachains.entry(para.para_id).or_default().push(para);
811 }
812
813 pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
814 let base_dir = self.ns.base_dir().to_string_lossy();
815 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
816 let ns_name = self.ns.name();
817
818 write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
819 Ok(())
820 }
821
822 pub fn name(&self) -> &str {
823 self.ns.name()
824 }
825
826 pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
836 self.parachains.get(¶_id)?.first()
837 }
838
839 pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
847 self.parachains
848 .values()
849 .flat_map(|p| p.iter())
850 .find(|p| p.unique_id == unique_id.as_ref())
851 }
852
853 pub fn parachains(&self) -> Vec<&Parachain> {
854 self.parachains.values().flatten().collect()
855 }
856
857 pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
858 self.relay.nodes.iter().chain(
859 self.parachains
860 .values()
861 .flat_map(|p| p.iter())
862 .flat_map(|p| &p.collators),
863 )
864 }
865
866 pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
867 self.relay.nodes.iter_mut().chain(
868 self.parachains
869 .values_mut()
870 .flat_map(|p| p.iter_mut())
871 .flat_map(|p| &mut p.collators),
872 )
873 }
874
875 pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
885 let handles = self
886 .nodes_iter()
887 .map(|node| node.wait_until_is_up(timeout_secs));
888
889 futures::future::try_join_all(handles).await?;
890
891 Ok(())
892 }
893
894 pub async fn pause(&self) -> Result<(), anyhow::Error> {
900 futures::future::try_join_all(self.nodes_iter().map(|n| n.pause())).await?;
901 Ok(())
902 }
903
904 pub async fn resume(&self) -> Result<(), anyhow::Error> {
910 futures::future::try_join_all(self.nodes_iter().map(|n| n.resume())).await?;
911 Ok(())
912 }
913
914 pub async fn start_observability(
943 &mut self,
944 config: &configuration::ObservabilityConfig,
945 ) -> Result<&ObservabilityInfo, anyhow::Error> {
946 if self.observability().is_some() {
947 self.stop_observability().await?;
948 }
949
950 let nodes = self.nodes();
951 let info = observability::spawn_observability_stack(
952 config,
953 &nodes,
954 self.ns.name(),
955 self.ns.base_dir(),
956 &self.filesystem,
957 )
958 .await?;
959
960 self.observability = ObservabilityState::Running(info);
961 self.observability()
962 .ok_or_else(|| anyhow::anyhow!("observability state was just set but is not running"))
963 }
964
965 pub async fn stop_observability(&mut self) -> Result<(), anyhow::Error> {
970 if let ObservabilityState::Running(info) =
971 std::mem::replace(&mut self.observability, ObservabilityState::Stopped)
972 {
973 observability::cleanup_observability_stack(&info).await?;
974 }
975 Ok(())
976 }
977
978 pub(crate) fn spawn_watching_task(&self) {
979 let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
980 let ns = Arc::clone(&self.ns);
981 let node_bootstrap_timeout = self.initial_spec.global_settings.node_spawn_timeout();
982
983 tokio::spawn(async move {
984 loop {
985 tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
986
987 let guard = nodes_to_watch.read().await;
988 let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
989
990 let all_running = {
991 let all_running =
992 futures::future::try_join_all(nodes.iter().map(|n| {
993 n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
994 }))
995 .await;
996
997 if nodes.iter().any(|n| !n.is_running()) {
999 continue;
1000 } else {
1001 all_running
1002 }
1003 };
1004
1005 if let Err(e) = all_running {
1006 if let Some(node) = nodes.iter().find(|n| n.name() == e.to_string()) {
1008 let now = SystemTime::now()
1009 .duration_since(UNIX_EPOCH)
1010 .expect("get current ts should work.")
1011 .as_secs();
1012 if node_bootstrap_timeout as u64 > (now - node.last_start_ts()) {
1013 warn!("[{}] still in bootstrap window from last starting ({}), continue waiting...", node.name(), node.last_start_ts());
1014 continue;
1015 }
1016 }
1017
1018 warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
1019
1020 if let Err(e) = ns.destroy().await {
1021 error!("an error occurred during network teardown: {}", e);
1022 }
1023
1024 std::process::exit(1);
1025 }
1026 }
1027 });
1028 }
1029
1030 pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
1031 self.parachains = parachains;
1032 }
1033
1034 pub(crate) fn insert_node(&mut self, node: NetworkNode) {
1035 self.nodes_by_name.insert(node.name.clone(), node);
1036 }
1037
1038 pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
1039 if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
1040 self.start_time_ts = Some(start_time_ts.as_millis().to_string());
1041 } else {
1042 warn!("⚠️ Error getting start_time timestamp");
1044 }
1045 }
1046}