//! zombienet_orchestrator/network.rs
1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{
7    cell::RefCell,
8    collections::HashMap,
9    path::PathBuf,
10    rc::Rc,
11    sync::Arc,
12    time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use configuration::{
16    para_states::{Initial, Running},
17    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
18    types::{Arg, Command, Image, Port, ValidationContext},
19    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
20};
21use provider::{types::TransferedFile, DynNamespace, ProviderError};
22use serde::Serialize;
23use support::fs::FileSystem;
24use tokio::sync::RwLock;
25use tracing::{error, warn};
26
27use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
28use crate::{
29    generators::chain_spec::ChainSpec,
30    network_spec::{self, NetworkSpec},
31    observability::{self, ObservabilityInfo, ObservabilityState},
32    shared::{
33        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
34        macros,
35        types::{ChainDefaultContext, RegisterParachainOptions},
36    },
37    spawner::{self, SpawnNodeCtx},
38    utils::write_zombie_json,
39    ScopedFilesystem, ZombieRole,
40};
41
/// A running network instance: the relaychain, its parachains and every
/// spawned node, backed by a provider namespace and a filesystem handle.
///
/// Serialized (minus the `#[serde(skip)]` fields) into `zombie.json` by
/// `write_zombie_json`.
#[derive(Serialize)]
pub struct Network<T: FileSystem> {
    // Provider namespace that owns every spawned resource.
    #[serde(skip)]
    ns: DynNamespace,
    // Filesystem implementation used to read/write network artifacts.
    #[serde(skip)]
    filesystem: T,
    relay: Relaychain,
    // The spec the network was originally spawned from; used as the source of
    // defaults when adding nodes/collators at runtime.
    initial_spec: NetworkSpec,
    // Parachains keyed by para id; more than one entry per id is a rare
    // corner case (see `parachain_by_unique_id`).
    parachains: HashMap<u32, Vec<Parachain>>,
    // Fast by-name lookup across relaychain nodes and collators.
    #[serde(skip)]
    nodes_by_name: HashMap<String, NetworkNode>,
    // Nodes queued for liveness monitoring.
    #[serde(skip)]
    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
    // Spawn timestamp; only serialized when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    start_time_ts: Option<String>,
    // State of the optional Prometheus/Grafana add-on.
    #[serde(skip)]
    observability: ObservabilityState,
}
60
// Manual `Debug` impl: `ns` and `filesystem` are not `Debug`, so `ns` is
// replaced by a placeholder and `filesystem`, `nodes_to_watch` and
// `start_time_ts` are intentionally omitted from the output.
impl<T: FileSystem> std::fmt::Debug for Network<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Network")
            .field("ns", &"ns_skipped")
            .field("relay", &self.relay)
            .field("initial_spec", &self.initial_spec)
            .field("parachains", &self.parachains)
            .field("nodes_by_name", &self.nodes_by_name)
            .field("observability", &self.observability)
            .finish()
    }
}
73
// Options accepted by `Network::add_node`. The `create_add_options!` macro
// also generates additional common fields (e.g. `rpc_port`, `is_validator` —
// see the usage example on `add_node`) plus a `Default` impl.
macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>,
    override_eth_key: Option<String>
});
78
// Options accepted by `Network::add_collator`. Like `AddNodeOptions`, the
// macro adds the common ad-hoc node fields; `chain_spec_relay` optionally
// overrides the relaychain spec injected into the collator.
macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>,
    override_eth_key: Option<String>
});
84
85impl<T: FileSystem> Network<T> {
86    pub(crate) fn new_with_relay(
87        relay: Relaychain,
88        ns: DynNamespace,
89        fs: T,
90        initial_spec: NetworkSpec,
91    ) -> Self {
92        Self {
93            ns,
94            filesystem: fs,
95            relay,
96            initial_spec,
97            parachains: Default::default(),
98            nodes_by_name: Default::default(),
99            nodes_to_watch: Default::default(),
100            start_time_ts: Default::default(),
101            observability: ObservabilityState::default(),
102        }
103    }
104
    // Public API
106    pub fn ns_name(&self) -> String {
107        self.ns.name().to_string()
108    }
109
    /// Base directory of the provider namespace, or `None` if the path is not
    /// valid UTF-8.
    pub fn base_dir(&self) -> Option<&str> {
        self.ns.base_dir().to_str()
    }
113
    /// Reference to the relaychain of this network.
    pub fn relaychain(&self) -> &Relaychain {
        &self.relay
    }
117
118    // Teardown the network
119    pub async fn destroy(mut self) -> Result<(), ProviderError> {
120        if let Err(e) = self.stop_observability().await {
121            warn!("⚠️  Failed to cleanup observability stack: {e}");
122        }
123        self.ns.destroy().await
124    }
125
    /// Info about the observability stack if it is currently running,
    /// `None` otherwise.
    // NOTE(review): `as_runnnig` carries a typo from `ObservabilityState`'s
    // API (defined in the `observability` module); fixing it is a
    // cross-module rename.
    pub fn observability(&self) -> Option<&ObservabilityInfo> {
        self.observability.as_runnnig()
    }
129
    /// Current [`ObservabilityState`] of the network.
    pub fn observability_state(&self) -> &ObservabilityState {
        &self.observability
    }
133
    /// Add a node to the relaychain
    ///
    /// The new node is added to the running network instance.
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), errors::OrchestratorError> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// // Create the options to add the new node
    /// let opts = AddNodeOptions {
    ///     rpc_port: Some(9444),
    ///     is_validator: true,
    ///     ..Default::default()
    /// };
    ///
    /// network.add_node("new-node", opts).await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_node(
        &mut self,
        name: impl Into<String>,
        options: AddNodeOptions,
    ) -> Result<(), anyhow::Error> {
        // De-duplicate against every node name already registered.
        let name = generate_unique_node_name_from_names(
            name,
            &mut self.nodes_by_name.keys().cloned().collect(),
        );

        let relaychain = self.relaychain();

        // Use the caller-provided chain spec, or default to the one generated
        // at spawn time in the namespace base dir (`<base>/<chain>.json`).
        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
            chain_spec_custom_path.clone()
        } else {
            PathBuf::from(format!(
                "{}/{}.json",
                self.ns.base_dir().to_string_lossy(),
                relaychain.chain
            ))
        };

        // Defaults inherited from the relaychain section of the initial spec.
        let chain_context = ChainDefaultContext {
            default_command: self.initial_spec.relaychain.default_command.as_ref(),
            default_image: self.initial_spec.relaychain.default_image.as_ref(),
            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
        };

        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
            &name,
            options.into(),
            &chain_context,
            false,
            false,
        )?;

        // Probe which CLI args the node binary actually supports.
        node_spec.available_args_output = Some(
            self.initial_spec
                .node_available_args_output(&node_spec, self.ns.clone())
                .await?,
        );

        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let ctx = SpawnNodeCtx {
            chain_id: &relaychain.chain_id,
            parachain_id: None,
            chain: &relaychain.chain,
            role: ZombieRole::Node,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: true,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // The relaychain spec is injected into the node's /cfg directory.
        let global_files_to_inject = vec![TransferedFile::new(
            chain_spec_path,
            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
        )];

        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;

        // TODO: register the new node as validator in the relaychain
        // STEPS:
        //  - check balance of `stash` derivation for validator account
        //  - call rotate_keys on the new validator
        //  - call setKeys on the new validator
        // if node_spec.is_validator {
        //     let running_node = self.relay.nodes.first().unwrap();
        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
        // }

        // Let's make sure node is up before adding
        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
            .await?;

        // Add node to relaychain data
        self.add_running_node(node.clone(), None).await;

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }
249
    /// Add a new collator to a parachain
    ///
    /// NOTE: if more parachains with given id available (rare corner case)
    /// then it adds collator to the first parachain
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// let col_opts = AddCollatorOptions {
    ///     command: Some("polkadot-parachain".try_into()?),
    ///     ..Default::default()
    /// };
    ///
    /// network.add_collator("new-col-1", col_opts, 100).await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_collator(
        &mut self,
        name: impl Into<String>,
        options: AddCollatorOptions,
        para_id: u32,
    ) -> Result<(), anyhow::Error> {
        // De-duplicate against every node name already registered.
        let name = generate_unique_node_name_from_names(
            name,
            &mut self.nodes_by_name.keys().cloned().collect(),
        );
        // Spec of the parachain in the *initial* network spec; provides the
        // per-para defaults for the new collator.
        let spec = self
            .initial_spec
            .parachains
            .iter()
            .find(|para| para.id == para_id)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
        let role = if spec.is_cumulus_based {
            ZombieRole::CumulusCollator
        } else {
            ZombieRole::Collator
        };
        let chain_context = ChainDefaultContext {
            default_command: spec.default_command.as_ref(),
            default_image: spec.default_image.as_ref(),
            default_resources: spec.default_resources.as_ref(),
            default_db_snapshot: spec.default_db_snapshot.as_ref(),
            default_args: spec.default_args.iter().collect(),
        };

        // Running parachain instance; if several share the id, the collator
        // is added to the first one (see the NOTE above).
        let parachain = self
            .parachains
            .get_mut(&para_id)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
            .get_mut(0)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;

        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        // TODO: do we still want to support spawning a dedicated bootnode?
        let ctx = SpawnNodeCtx {
            chain_id: &self.relay.chain_id,
            parachain_id: parachain.chain_id.as_deref(),
            chain: &self.relay.chain,
            role,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            parachain: Some(spec),
            bootnodes_addr: &vec![],
            wait_ready: true,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // Relaychain spec: caller override or the default generated one.
        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
            chain_spec_custom_path.clone()
        } else {
            PathBuf::from(format!(
                "{}/{}.json",
                self.ns.base_dir().to_string_lossy(),
                self.relay.chain
            ))
        };

        let mut global_files_to_inject = vec![TransferedFile::new(
            relaychain_spec_path,
            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
        )];

        // Parachain spec: caller override, else the one recorded on the
        // running parachain (if any).
        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
            Some(para_chain_spec_custom.clone())
        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
            Some(PathBuf::from(format!(
                "{}/{}",
                self.ns.base_dir().to_string_lossy(),
                para_spec_path.to_string_lossy()
            )))
        } else {
            None
        };

        if let Some(para_spec_path) = para_chain_spec_local_path {
            global_files_to_inject.push(TransferedFile::new(
                para_spec_path,
                PathBuf::from(format!("/cfg/{para_id}.json")),
            ));
        }

        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
            name,
            options.into(),
            &chain_context,
            true,
            spec.is_evm_based,
        )?;

        // Probe which CLI args the collator binary actually supports.
        node_spec.available_args_output = Some(
            self.initial_spec
                .node_available_args_output(&node_spec, self.ns.clone())
                .await?,
        );

        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;

        // Let's make sure node is up before adding
        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
            .await?;

        self.add_running_node(node, Some(para_id)).await;

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }
391
392    /// Get a parachain config builder from a running network
393    ///
394    /// This allow you to build a new parachain config to be deployed into
395    /// the running network.
396    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
397        let used_ports = self
398            .nodes_iter()
399            .map(|node| node.spec())
400            .flat_map(|spec| {
401                [
402                    spec.ws_port.0,
403                    spec.rpc_port.0,
404                    spec.prometheus_port.0,
405                    spec.p2p_port.0,
406                ]
407            })
408            .collect();
409
410        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
411
412        // need to inverse logic of generate_unique_para_id
413        let used_para_ids = self
414            .parachains
415            .iter()
416            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
417            .collect();
418
419        let context = ValidationContext {
420            used_ports,
421            used_nodes_names,
422            used_para_ids,
423        };
424        let context = Rc::new(RefCell::new(context));
425
426        ParachainConfigBuilder::new_with_running(context)
427    }
428
    /// Add a new parachain to the running network
    ///
    /// # Arguments
    /// * `para_config` - Parachain configuration to deploy
    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
    /// * `custom_parchain_fs_prefix` - Optional prefix to use when artifacts are created
    ///   (NOTE(review): parameter name keeps an existing "parchain" typo; renaming
    ///   it is a doc-visible change best done across the crate at once)
    ///
    /// # Example:
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    /// let para_config = network
    ///     .para_config_builder()
    ///     .with_id(100)
    ///     .with_default_command("polkadot-parachain")
    ///     .with_collator(|c| c.with_name("col-100-1"))
    ///     .build()
    ///     .map_err(|_e| anyhow!("Building config"))?;
    ///
    /// network.add_parachain(&para_config, None, None).await?;
    ///
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_parachain(
        &mut self,
        para_config: &ParachainConfig,
        custom_relaychain_spec: Option<PathBuf>,
        custom_parchain_fs_prefix: Option<String>,
    ) -> Result<(), anyhow::Error> {
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let mut global_files_to_inject = vec![];

        // get relaychain id
        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
            // use this file as relaychain spec
            global_files_to_inject.push(TransferedFile::new(
                custom_path.clone(),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            let content = std::fs::read_to_string(custom_path)?;
            ChainSpec::chain_id_from_spec(&content)?
        } else {
            // default: inject the spec generated when the network was spawned
            global_files_to_inject.push(TransferedFile::new(
                PathBuf::from(format!(
                    "{}/{}",
                    scoped_fs.base_dir,
                    self.relaychain().chain_spec_path.to_string_lossy()
                )),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            self.relay.chain_id.clone()
        };

        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
            para_config,
            relay_chain_id.as_str().try_into()?,
        )?;

        // Build (and optionally post-process) the parachain chain spec.
        let chain_spec_raw_path = para_spec
            .build_chain_spec(
                &relay_chain_id,
                &self.ns,
                &scoped_fs,
                para_spec.post_process_script.clone().as_deref(),
            )
            .await?;

        // Para artifacts
        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
            custom_prefix
        } else {
            para_spec.id.to_string()
        };

        scoped_fs.create_dir(&para_path_prefix).await?;
        // create wasm/state
        para_spec
            .genesis_state
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/genesis-state", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;
        para_spec
            .genesis_wasm
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/para_spec-wasm", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;

        let parachain =
            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
        let parachain_id = parachain.chain_id.clone();

        // Create `ctx` for spawn the nodes
        let ctx_para = SpawnNodeCtx {
            parachain: Some(&para_spec),
            parachain_id: parachain_id.as_deref(),
            role: if para_spec.is_cumulus_based {
                ZombieRole::CumulusCollator
            } else {
                ZombieRole::Collator
            },
            bootnodes_addr: &para_config
                .bootnodes_addresses()
                .iter()
                .map(|&a| a.to_string())
                .collect(),
            chain_id: &self.relaychain().chain_id,
            chain: &self.relaychain().chain,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            wait_ready: false,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // Register the parachain to the running network
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();

        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
            let register_para_options = RegisterParachainOptions {
                id: parachain.para_id,
                // This needs to resolve correctly
                wasm_path: para_spec
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para_spec
                    .genesis_state
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: first_node_url.to_string(),
                onboard_as_para: para_spec.onboard_as_parachain,
                seed: None, // TODO: Seed is passed by?
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // Spawn the nodes (concurrently)
        let spawning_tasks = para_spec
            .collators
            .iter()
            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));

        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;

        // Let's make sure nodes are up before adding them
        let waiting_tasks = running_nodes.iter().map(|node| {
            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
        });

        let _ = futures::future::try_join_all(waiting_tasks).await?;

        // The parachain entry must exist before `add_running_node` is called
        // for its collators (see `add_running_node`).
        let running_para_id = parachain.para_id;
        self.add_para(parachain);
        for node in running_nodes {
            self.add_running_node(node, Some(running_para_id)).await;
        }

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }
628
    /// Register a parachain, which has already been added to the network (with manual registration
    /// strategy)
    ///
    /// # Arguments
    /// * `para_id` - Parachain Id
    ///
    /// # Example:
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::Orchestrator;
    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfigBuilder::new()
    /// #     .with_relaychain(|r| {
    /// #       r.with_chain("rococo-local")
    /// #         .with_default_command("polkadot")
    /// #         .with_node(|node| node.with_name("alice"))
    /// #     })
    /// #     .with_parachain(|p| {
    /// #       p.with_id(100)
    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
    /// #         .with_default_command("test-parachain")
    /// #         .with_collator(|n| n.with_name("dave").validator(false))
    /// #     })
    /// #     .build()
    /// #     .map_err(|_e| anyhow!("Building config"))?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// network.register_parachain(100).await?;
    ///
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
        // Look up the parachain in the initial spec; the genesis artifacts
        // (wasm/state paths) were recorded there at spawn time.
        let para = self
            .initial_spec
            .parachains
            .iter()
            .find(|p| p.id == para_id)
            .ok_or(anyhow::anyhow!(
                "no parachain with id = {para_id} available",
            ))?;
        let para_genesis_config = para.get_genesis_config()?;
        // Registration extrinsic is submitted through the first relaychain node.
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();
        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
            id: para_id,
            // This needs to resolve correctly
            wasm_path: para_genesis_config.wasm_path.clone(),
            state_path: para_genesis_config.state_path.clone(),
            node_ws_url: first_node_url.to_string(),
            onboard_as_para: para_genesis_config.as_parachain,
            seed: None, // TODO: Seed is passed by?
            finalization: false,
        };
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        Parachain::register(register_para_options, &scoped_fs).await?;

        Ok(())
    }
701
702    // deregister and stop the collator?
703    // remove_parachain()
704
705    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
706        let name = name.into();
707        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
708            return Ok(node);
709        }
710
711        let list = self
712            .nodes_iter()
713            .map(|n| &n.name)
714            .cloned()
715            .collect::<Vec<_>>()
716            .join(", ");
717
718        Err(anyhow::anyhow!(
719            "can't find node with name: {name:?}, should be one of {list}"
720        ))
721    }
722
723    pub fn get_node_mut(
724        &mut self,
725        name: impl Into<String>,
726    ) -> Result<&mut NetworkNode, anyhow::Error> {
727        let name = name.into();
728        self.nodes_iter_mut()
729            .find(|n| n.name == name)
730            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
731    }
732
733    pub fn nodes(&self) -> Vec<&NetworkNode> {
734        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
735    }
736
    /// Detach from the provider namespace (delegates to the namespace
    /// implementation — presumably leaves the spawned processes running;
    /// confirm against the provider's `detach` semantics).
    pub async fn detach(&self) {
        self.ns.detach().await
    }
740
741    // Internal API
742    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
743        if let Some(para_id) = para_id {
744            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
745                para.collators.push(node.clone());
746            } else {
747                // is the first node of the para, let create the entry
748                unreachable!()
749            }
750        } else {
751            self.relay.nodes.push(node.clone());
752        }
753        // TODO: we should hold a ref to the node in the vec in the future.
754        node.set_is_running(true);
755        node.set_last_start_ts(
756            SystemTime::now()
757                .duration_since(UNIX_EPOCH)
758                .expect("Timestamp should be valid")
759                .as_secs(),
760        );
761        let node_name = node.name.clone();
762        self.nodes_by_name.insert(node_name, node.clone());
763        self.nodes_to_watch.write().await.push(node);
764    }
765
766    pub(crate) fn add_para(&mut self, para: Parachain) {
767        self.parachains.entry(para.para_id).or_default().push(para);
768    }
769
770    pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
771        let base_dir = self.ns.base_dir().to_string_lossy();
772        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
773        let ns_name = self.ns.name();
774
775        write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
776        Ok(())
777    }
778
    /// Name of the underlying provider namespace.
    pub fn name(&self) -> &str {
        self.ns.name()
    }
782
783    /// Get a first parachain from the list of the parachains with specified id.
784    /// NOTE!
785    /// Usually the list will contain only one parachain.
786    /// Multiple parachains with the same id is a corner case.
787    /// If this is the case then one can get such parachain with
788    /// `parachain_by_unique_id()` method
789    ///
790    /// # Arguments
791    /// * `para_id` - Parachain Id
792    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
793        self.parachains.get(&para_id)?.first()
794    }
795
796    /// Get a parachain by its unique id.
797    ///
798    /// This is particularly useful if there are multiple parachains
799    /// with the same id (this is a rare corner case).
800    ///
801    /// # Arguments
802    /// * `unique_id` - unique id of the parachain
803    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
804        self.parachains
805            .values()
806            .flat_map(|p| p.iter())
807            .find(|p| p.unique_id == unique_id.as_ref())
808    }
809
810    pub fn parachains(&self) -> Vec<&Parachain> {
811        self.parachains.values().flatten().collect()
812    }
813
814    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
815        self.relay.nodes.iter().chain(
816            self.parachains
817                .values()
818                .flat_map(|p| p.iter())
819                .flat_map(|p| &p.collators),
820        )
821    }
822
823    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
824        self.relay.nodes.iter_mut().chain(
825            self.parachains
826                .values_mut()
827                .flat_map(|p| p.iter_mut())
828                .flat_map(|p| &mut p.collators),
829        )
830    }
831
832    /// Waits given number of seconds until all nodes in the network report that they are
833    /// up and running.
834    ///
835    /// # Arguments
836    /// * `timeout_secs` - The number of seconds to wait.
837    ///
838    /// # Returns
839    /// * `Ok()` if the node is up before timeout occured.
840    /// * `Err(e)` if timeout or other error occurred while waiting.
841    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
842        let handles = self
843            .nodes_iter()
844            .map(|node| node.wait_until_is_up(timeout_secs));
845
846        futures::future::try_join_all(handles).await?;
847
848        Ok(())
849    }
850
851    /// Start the observability stack (Prometheus + Grafana) as an add-on
852    ///
853    /// This can be called on any running network — whether freshly spawned or
854    /// re-attached via [`Orchestrator::attach_to_live`]. If observability is
855    /// already running, it will be stopped first
856    ///
857    /// # Example:
858    /// ```rust
859    /// # use provider::NativeProvider;
860    /// # use support::{fs::local::LocalFileSystem};
861    /// # use zombienet_orchestrator::{errors, Orchestrator};
862    /// # use configuration::{NetworkConfig, ObservabilityConfigBuilder};
863    /// # async fn example() -> Result<(), errors::OrchestratorError> {
864    /// #   let provider = NativeProvider::new(LocalFileSystem {});
865    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
866    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
867    /// let mut network = orchestrator.spawn(config).await?;
868    ///
869    /// let obs_config = ObservabilityConfigBuilder::new()
870    ///     .with_enabled(true)
871    ///     .with_grafana_port(3000)
872    ///     .build();
873    ///
874    /// let info = network.start_observability(&obs_config).await?;
875    /// println!("Grafana: {}", info.grafana_url);
876    /// #   Ok(())
877    /// # }
878    /// ```
879    pub async fn start_observability(
880        &mut self,
881        config: &configuration::ObservabilityConfig,
882    ) -> Result<&ObservabilityInfo, anyhow::Error> {
883        if self.observability().is_some() {
884            self.stop_observability().await?;
885        }
886
887        let nodes = self.nodes();
888        let info = observability::spawn_observability_stack(
889            config,
890            &nodes,
891            self.ns.name(),
892            self.ns.base_dir(),
893            &self.filesystem,
894        )
895        .await?;
896
897        self.observability = ObservabilityState::Running(info);
898        self.observability()
899            .ok_or_else(|| anyhow::anyhow!("observability state was just set but is not running"))
900    }
901
902    /// Stop the observability stack if running
903    ///
904    /// Removes the Prometheus and Grafana containers. This is safe to call
905    /// even if no observability stack is running (it will be a no-op)
906    pub async fn stop_observability(&mut self) -> Result<(), anyhow::Error> {
907        if let ObservabilityState::Running(info) =
908            std::mem::replace(&mut self.observability, ObservabilityState::Stopped)
909        {
910            observability::cleanup_observability_stack(&info).await?;
911        }
912        Ok(())
913    }
914
    /// Spawn a detached background task that periodically checks liveness of
    /// the watched nodes and tears the whole network down (destroying the
    /// namespace and exiting the process) if a node is detected as crashed.
    pub(crate) fn spawn_watching_task(&self) {
        // Clone the shared handles so the task owns everything it needs.
        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
        let ns = Arc::clone(&self.ns);
        let node_bootstrap_timeout = self.initial_spec.global_settings.node_spawn_timeout();

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;

                // NOTE(review): the read guard is held for the remainder of the
                // loop iteration, including across the awaits below — writers to
                // `nodes_to_watch` are blocked for that whole window.
                let guard = nodes_to_watch.read().await;
                // Only probe nodes that are currently marked as running.
                let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();

                let all_running = {
                    // Probe all candidate nodes concurrently; fails fast on the
                    // first node that does not respond within the threshold.
                    let all_running =
                        futures::future::try_join_all(nodes.iter().map(|n| {
                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
                        }))
                        .await;

                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
                    if nodes.iter().any(|n| !n.is_running()) {
                        continue;
                    } else {
                        all_running
                    }
                };

                if let Err(e) = all_running {
                    // check if the node was restarted and we need to give it more time
                    // NOTE(review): this assumes the error's Display output is exactly
                    // the failing node's name — confirm against `wait_until_is_up`.
                    if let Some(node) = nodes.iter().find(|n| n.name() == e.to_string()) {
                        let now = SystemTime::now()
                            .duration_since(UNIX_EPOCH)
                            .expect("get current ts should work.")
                            .as_secs();
                        // Still within the spawn-timeout window since its last
                        // (re)start: give the node more time before acting.
                        if node_bootstrap_timeout as u64 > (now - node.last_start_ts()) {
                            warn!("[{}] still in bootstrap window from last starting ({}), continue waiting...", node.name(), node.last_start_ts());
                            continue;
                        }
                    }

                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");

                    // Best-effort teardown of the whole namespace before exiting.
                    if let Err(e) = ns.destroy().await {
                        error!("an error occurred during network teardown: {}", e);
                    }

                    // A crashed node is treated as fatal for the entire process.
                    std::process::exit(1);
                }
            }
        });
    }
966
    /// Replace the network's parachain map (keyed by para id; multiple
    /// parachains may share one id, hence the `Vec` values).
    pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
        self.parachains = parachains;
    }
970
    /// Register a node in the by-name index, keyed by its name.
    /// An existing entry with the same name is silently replaced.
    pub(crate) fn insert_node(&mut self, node: NetworkNode) {
        self.nodes_by_name.insert(node.name.clone(), node);
    }
974
975    pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
976        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
977            self.start_time_ts = Some(start_time_ts.as_millis().to_string());
978        } else {
979            // Just warn, do not propagate the err (this should not happens)
980            warn!("⚠️ Error getting start_time timestamp");
981        }
982    }
983}