zombienet_orchestrator/
network.rs

1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{
7    cell::RefCell,
8    collections::HashMap,
9    path::PathBuf,
10    rc::Rc,
11    sync::Arc,
12    time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use configuration::{
16    para_states::{Initial, Running},
17    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
18    types::{Arg, Command, Image, ParaId, Port, ValidationContext},
19    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
20};
21use provider::{types::TransferedFile, DynNamespace, ProviderError};
22use serde::{Deserialize, Serialize};
23use support::fs::FileSystem;
24use tokio::sync::RwLock;
25use tracing::{error, warn};
26
27use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
28use crate::{
29    generators::chain_spec::ChainSpec,
30    network_spec::{self, NetworkSpec},
31    observability::{self, ObservabilityInfo, ObservabilityState},
32    shared::{
33        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
34        macros,
35        types::{ChainDefaultContext, RegisterParachainOptions},
36    },
37    spawner::{self, SpawnNodeCtx},
38    utils::write_zombie_json,
39    ScopedFilesystem, ZombieRole,
40};
41
/// Context where the node is running
/// RC or Para
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum NodeContext {
    /// The node runs as part of the relaychain.
    Rc,
    /// The node runs as part of a parachain.
    Para {
        /// Id of the parachain the node belongs to.
        para_id: ParaId,
        /// Whether the parachain is cumulus-based (affects how the collator is spawned).
        is_cumulus_based: bool,
    },
}
52
/// A running (or re-attached) zombienet network.
///
/// Holds the provider namespace, the relaychain, the parachains and the
/// bookkeeping maps used to look nodes up by name. Only the serializable
/// parts are dumped to `zombie.json` (see `write_zombie_json`).
#[derive(Serialize)]
pub struct Network<T: FileSystem> {
    // Provider namespace the network runs in (not serializable).
    #[serde(skip)]
    ns: DynNamespace,
    // Filesystem implementation used for all artifact I/O (not serializable).
    #[serde(skip)]
    filesystem: T,
    // The relaychain and its running nodes.
    relay: Relaychain,
    // The network spec the network was originally spawned from.
    initial_spec: NetworkSpec,
    // Parachains grouped by para_id; more than one entry per id is a rare
    // corner case (see `parachain_by_unique_id`).
    parachains: HashMap<u32, Vec<Parachain>>,
    // Fast lookup of any node (relay or para) by its unique name.
    #[serde(skip)]
    nodes_by_name: HashMap<String, NetworkNode>,
    // Nodes registered for monitoring (shared with the watcher task).
    #[serde(skip)]
    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
    // Network start timestamp; only serialized when set.
    // NOTE(review): not populated by `new_with_relay` — presumably set elsewhere.
    #[serde(skip_serializing_if = "Option::is_none")]
    start_time_ts: Option<String>,
    // State of the optional observability stack (Prometheus/Grafana add-on).
    #[serde(skip)]
    observability: ObservabilityState,
}
71
72impl<T: FileSystem> std::fmt::Debug for Network<T> {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("Network")
75            .field("ns", &"ns_skipped")
76            .field("relay", &self.relay)
77            .field("initial_spec", &self.initial_spec)
78            .field("parachains", &self.parachains)
79            .field("nodes_by_name", &self.nodes_by_name)
80            .field("observability", &self.observability)
81            .finish()
82    }
83}
84
// Options accepted by `Network::add_node` (struct generated by the
// project-local `create_add_options!` macro).
macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>,
    override_eth_key: Option<String>
});
89
// Options accepted by `Network::add_collator`; also allows overriding the
// relaychain spec used by the new collator (struct generated by the
// project-local `create_add_options!` macro).
macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>,
    override_eth_key: Option<String>
});
95
96impl<T: FileSystem> Network<T> {
97    pub(crate) fn new_with_relay(
98        relay: Relaychain,
99        ns: DynNamespace,
100        fs: T,
101        initial_spec: NetworkSpec,
102    ) -> Self {
103        Self {
104            ns,
105            filesystem: fs,
106            relay,
107            initial_spec,
108            parachains: Default::default(),
109            nodes_by_name: Default::default(),
110            nodes_to_watch: Default::default(),
111            start_time_ts: Default::default(),
112            observability: ObservabilityState::default(),
113        }
114    }
115
    // Public API

    /// Name of the provider namespace the network runs in.
    pub fn ns_name(&self) -> String {
        self.ns.name().to_string()
    }
120
    /// Base directory of the namespace, or `None` if the path is not valid UTF-8.
    pub fn base_dir(&self) -> Option<&str> {
        self.ns.base_dir().to_str()
    }
124
    /// The relaychain of this network.
    pub fn relaychain(&self) -> &Relaychain {
        &self.relay
    }
128
    /// Teardown the network, consuming it.
    ///
    /// Stops the observability stack first (best-effort: failures are only
    /// logged) and then destroys the provider namespace.
    pub async fn destroy(mut self) -> Result<(), ProviderError> {
        if let Err(e) = self.stop_observability().await {
            warn!("⚠️  Failed to cleanup observability stack: {e}");
        }
        self.ns.destroy().await
    }
136
    /// Observability info, presumably `Some` only while the stack is running.
    // NOTE(review): the helper name `as_runnnig` is misspelled in the
    // observability module; renaming it belongs there, not here.
    pub fn observability(&self) -> Option<&ObservabilityInfo> {
        self.observability.as_runnnig()
    }
140
    /// Full state of the observability stack (running, stopped, …).
    pub fn observability_state(&self) -> &ObservabilityState {
        &self.observability
    }
144
    /// Add a node to the relaychain
    ///
    /// The new node is added to the running network instance.
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), errors::OrchestratorError> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// // Create the options to add the new node
    /// let opts = AddNodeOptions {
    ///     rpc_port: Some(9444),
    ///     is_validator: true,
    ///     ..Default::default()
    /// };
    ///
    /// network.add_node("new-node", opts).await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_node(
        &mut self,
        name: impl Into<String>,
        options: AddNodeOptions,
    ) -> Result<(), anyhow::Error> {
        // De-duplicate against the names already in use in the network.
        let name = generate_unique_node_name_from_names(
            name,
            &mut self.nodes_by_name.keys().cloned().collect(),
        );

        let relaychain = self.relaychain();

        // Use the caller-provided chain spec if any, otherwise the one the
        // network was spawned with (`<base_dir>/<chain>.json`).
        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
            chain_spec_custom_path.clone()
        } else {
            PathBuf::from(format!(
                "{}/{}.json",
                self.ns.base_dir().to_string_lossy(),
                relaychain.chain
            ))
        };

        // Defaults (command/image/resources/db snapshot/args) are inherited
        // from the relaychain section of the original network spec.
        let chain_context = ChainDefaultContext {
            default_command: self.initial_spec.relaychain.default_command.as_ref(),
            default_image: self.initial_spec.relaychain.default_image.as_ref(),
            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
        };

        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
            &name,
            options.into(),
            &chain_context,
            false,
            false,
        )?;

        node_spec.available_args_output = Some(
            self.initial_spec
                .node_available_args_output(&node_spec, self.ns.clone())
                .await?,
        );

        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let ctx = SpawnNodeCtx {
            chain_id: &relaychain.chain_id,
            parachain_id: None,
            chain: &relaychain.chain,
            role: ZombieRole::Node,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: true,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // The relaychain spec is injected into the node as `/cfg/<chain>.json`.
        let global_files_to_inject = vec![TransferedFile::new(
            chain_spec_path,
            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
        )];

        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;

        // TODO: register the new node as validator in the relaychain
        // STEPS:
        //  - check balance of `stash` derivation for validator account
        //  - call rotate_keys on the new validator
        //  - call setKeys on the new validator
        // if node_spec.is_validator {
        //     let running_node = self.relay.nodes.first().unwrap();
        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
        // }

        // Let's make sure node is up before adding
        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
            .await?;

        // Add node to relaychain data
        self.add_running_node(node.clone(), None).await;

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }
260
    /// Add a new collator to a parachain
    ///
    /// NOTE: if more parachains with given id available (rare corner case)
    /// then it adds collator to the first parachain
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// let col_opts = AddCollatorOptions {
    ///     command: Some("polkadot-parachain".try_into()?),
    ///     ..Default::default()
    /// };
    ///
    /// network.add_collator("new-col-1", col_opts, 100).await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_collator(
        &mut self,
        name: impl Into<String>,
        options: AddCollatorOptions,
        para_id: u32,
    ) -> Result<(), anyhow::Error> {
        // De-duplicate against the names already in use in the network.
        let name = generate_unique_node_name_from_names(
            name,
            &mut self.nodes_by_name.keys().cloned().collect(),
        );
        // The parachain must exist in the spec the network was spawned from.
        let spec = self
            .initial_spec
            .parachains
            .iter()
            .find(|para| para.id == para_id)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
        let role = if spec.is_cumulus_based {
            ZombieRole::CumulusCollator
        } else {
            ZombieRole::Collator
        };
        // Defaults are inherited from the parachain section of the spec.
        let chain_context = ChainDefaultContext {
            default_command: spec.default_command.as_ref(),
            default_image: spec.default_image.as_ref(),
            default_resources: spec.default_resources.as_ref(),
            default_db_snapshot: spec.default_db_snapshot.as_ref(),
            default_args: spec.default_args.iter().collect(),
        };

        // First running parachain instance with this id (see NOTE above).
        let parachain = self
            .parachains
            .get_mut(&para_id)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
            .get_mut(0)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;

        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        // TODO: do we still want to support spawning a dedicated bootnode?
        let ctx = SpawnNodeCtx {
            chain_id: &self.relay.chain_id,
            parachain_id: parachain.chain_id.as_deref(),
            chain: &self.relay.chain,
            role,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            parachain: Some(spec),
            bootnodes_addr: &vec![],
            wait_ready: true,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // Relaychain spec: caller override or the one from the spawn dir.
        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
            chain_spec_custom_path.clone()
        } else {
            PathBuf::from(format!(
                "{}/{}.json",
                self.ns.base_dir().to_string_lossy(),
                self.relay.chain
            ))
        };

        let mut global_files_to_inject = vec![TransferedFile::new(
            relaychain_spec_path,
            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
        )];

        // Parachain spec: caller override, else the running parachain's spec
        // (if it has one); injected as `/cfg/<para_id>.json`.
        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
            Some(para_chain_spec_custom.clone())
        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
            Some(PathBuf::from(format!(
                "{}/{}",
                self.ns.base_dir().to_string_lossy(),
                para_spec_path.to_string_lossy()
            )))
        } else {
            None
        };

        if let Some(para_spec_path) = para_chain_spec_local_path {
            global_files_to_inject.push(TransferedFile::new(
                para_spec_path,
                PathBuf::from(format!("/cfg/{para_id}.json")),
            ));
        }

        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
            name,
            options.into(),
            &chain_context,
            true,
            spec.is_evm_based,
        )?;

        node_spec.available_args_output = Some(
            self.initial_spec
                .node_available_args_output(&node_spec, self.ns.clone())
                .await?,
        );

        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;

        // Let's make sure node is up before adding
        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
            .await?;

        self.add_running_node(node, Some(para_id)).await;

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }
402
403    /// Get a parachain config builder from a running network
404    ///
405    /// This allow you to build a new parachain config to be deployed into
406    /// the running network.
407    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
408        let used_ports = self
409            .nodes_iter()
410            .map(|node| node.spec())
411            .flat_map(|spec| {
412                [
413                    spec.ws_port.0,
414                    spec.rpc_port.0,
415                    spec.prometheus_port.0,
416                    spec.p2p_port.0,
417                ]
418            })
419            .collect();
420
421        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
422
423        // need to inverse logic of generate_unique_para_id
424        let used_para_ids = self
425            .parachains
426            .iter()
427            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
428            .collect();
429
430        let context = ValidationContext {
431            used_ports,
432            used_nodes_names,
433            used_para_ids,
434        };
435        let context = Rc::new(RefCell::new(context));
436
437        ParachainConfigBuilder::new_with_running(context)
438    }
439
    /// Add a new parachain to the running network
    ///
    /// # Arguments
    /// * `para_config` - Parachain configuration to deploy
    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
    /// * `custom_parchain_fs_prefix` - Optional prefix to use when artifacts are created
    ///
    ///
    /// # Example:
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    /// let para_config = network
    ///     .para_config_builder()
    ///     .with_id(100)
    ///     .with_default_command("polkadot-parachain")
    ///     .with_collator(|c| c.with_name("col-100-1"))
    ///     .build()
    ///     .map_err(|_e| anyhow!("Building config"))?;
    ///
    /// network.add_parachain(&para_config, None, None).await?;
    ///
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_parachain(
        &mut self,
        para_config: &ParachainConfig,
        custom_relaychain_spec: Option<PathBuf>,
        custom_parchain_fs_prefix: Option<String>,
    ) -> Result<(), anyhow::Error> {
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let mut global_files_to_inject = vec![];

        // get relaychain id
        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
            // use this file as relaychain spec
            global_files_to_inject.push(TransferedFile::new(
                custom_path.clone(),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            let content = std::fs::read_to_string(custom_path)?;
            ChainSpec::chain_id_from_spec(&content)?
        } else {
            global_files_to_inject.push(TransferedFile::new(
                PathBuf::from(format!(
                    "{}/{}",
                    scoped_fs.base_dir,
                    self.relaychain().chain_spec_path.to_string_lossy()
                )),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            self.relay.chain_id.clone()
        };

        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
            para_config,
            relay_chain_id.as_str().try_into()?,
        )?;

        // Build (and optionally post-process) the parachain chain spec.
        let chain_spec_raw_path = para_spec
            .build_chain_spec(
                &relay_chain_id,
                &self.ns,
                &scoped_fs,
                para_spec.post_process_script.clone().as_deref(),
            )
            .await?;

        // Para artifacts
        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
            custom_prefix
        } else {
            para_spec.id.to_string()
        };

        scoped_fs.create_dir(&para_path_prefix).await?;
        // create wasm/state
        para_spec
            .genesis_state
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/genesis-state", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;
        para_spec
            .genesis_wasm
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/para_spec-wasm", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;

        let parachain =
            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
        let parachain_id = parachain.chain_id.clone();

        // Create `ctx` for spawn the nodes
        let ctx_para = SpawnNodeCtx {
            parachain: Some(&para_spec),
            parachain_id: parachain_id.as_deref(),
            role: if para_spec.is_cumulus_based {
                ZombieRole::CumulusCollator
            } else {
                ZombieRole::Collator
            },
            bootnodes_addr: &para_config
                .bootnodes_addresses()
                .iter()
                .map(|&a| a.to_string())
                .collect(),
            chain_id: &self.relaychain().chain_id,
            chain: &self.relaychain().chain,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            wait_ready: false,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // Register the parachain to the running network
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();

        // Registration via extrinsic happens before the collators are spawned;
        // other strategies are handled elsewhere (or manually).
        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
            let register_para_options = RegisterParachainOptions {
                id: parachain.para_id,
                // This needs to resolve correctly
                wasm_path: para_spec
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para_spec
                    .genesis_state
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: first_node_url.to_string(),
                onboard_as_para: para_spec.onboard_as_parachain,
                seed: None, // TODO: Seed is passed by?
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // Spawn the nodes
        let spawning_tasks = para_spec
            .collators
            .iter()
            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));

        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;

        // Let's make sure nodes are up before adding them
        let waiting_tasks = running_nodes.iter().map(|node| {
            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
        });

        let _ = futures::future::try_join_all(waiting_tasks).await?;

        // Register the para first so `add_running_node` finds its entry.
        let running_para_id = parachain.para_id;
        self.add_para(parachain);
        for node in running_nodes {
            self.add_running_node(node, Some(running_para_id)).await;
        }

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }
639
    /// Register a parachain, which has already been added to the network (with manual registration
    /// strategy)
    ///
    /// # Arguments
    /// * `para_id` - Parachain Id
    ///
    ///
    /// # Example:
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::Orchestrator;
    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfigBuilder::new()
    /// #     .with_relaychain(|r| {
    /// #       r.with_chain("rococo-local")
    /// #         .with_default_command("polkadot")
    /// #         .with_node(|node| node.with_name("alice"))
    /// #     })
    /// #     .with_parachain(|p| {
    /// #       p.with_id(100)
    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
    /// #         .with_default_command("test-parachain")
    /// #         .with_collator(|n| n.with_name("dave").validator(false))
    /// #     })
    /// #     .build()
    /// #     .map_err(|_e| anyhow!("Building config"))?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// network.register_parachain(100).await?;
    ///
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
        // The parachain must be part of the spec the network was spawned from.
        let para = self
            .initial_spec
            .parachains
            .iter()
            .find(|p| p.id == para_id)
            .ok_or(anyhow::anyhow!(
                "no parachain with id = {para_id} available",
            ))?;
        let para_genesis_config = para.get_genesis_config()?;
        // The registration extrinsic is submitted through the first relay node.
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();
        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
            id: para_id,
            // This needs to resolve correctly
            wasm_path: para_genesis_config.wasm_path.clone(),
            state_path: para_genesis_config.state_path.clone(),
            node_ws_url: first_node_url.to_string(),
            onboard_as_para: para_genesis_config.as_parachain,
            seed: None, // TODO: Seed is passed by?
            finalization: false,
        };
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        Parachain::register(register_para_options, &scoped_fs).await?;

        Ok(())
    }
712
713    // deregister and stop the collator?
714    // remove_parachain()
715
716    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
717        let name = name.into();
718        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
719            return Ok(node);
720        }
721
722        let list = self.node_names().join(", ");
723
724        Err(anyhow::anyhow!(
725            "can't find node with name: {name:?}, should be one of {list}"
726        ))
727    }
728
729    pub fn get_node_mut(
730        &mut self,
731        name: impl Into<String>,
732    ) -> Result<&mut NetworkNode, anyhow::Error> {
733        let name = name.into();
734        self.nodes_iter_mut()
735            .find(|n| n.name == name)
736            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
737    }
738
739    pub fn node_names(&self) -> Vec<String> {
740        self.nodes_iter()
741            .map(|n| &n.name)
742            .cloned()
743            .collect::<Vec<_>>()
744    }
745
    /// All nodes registered in the by-name map.
    // NOTE(review): this reads `nodes_by_name` (populated by
    // `add_running_node`) rather than `nodes_iter()`; confirm both views stay
    // in sync for networks attached to rather than spawned.
    pub fn nodes(&self) -> Vec<&NetworkNode> {
        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
    }
749
    /// Detach from the namespace, leaving the network processes running.
    pub async fn detach(&self) {
        self.ns.detach().await
    }
753
    // Internal API

    /// Register a freshly spawned node in the network bookkeeping: the relay
    /// (or parachain) node list, the by-name map and the watch list; also
    /// marks it as running and stamps its start time.
    ///
    /// NOTE(review): when `para_id` is `Some`, the parachain entry must
    /// already exist (created via `add_para`) — otherwise this panics on the
    /// `unreachable!()` below, despite the comment suggesting the entry
    /// should be created here.
    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
        if let Some(para_id) = para_id {
            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
                para.collators.push(node.clone());
            } else {
                // is the first node of the para, let create the entry
                unreachable!()
            }
        } else {
            self.relay.nodes.push(node.clone());
        }
        // TODO: we should hold a ref to the node in the vec in the future.
        node.set_is_running(true);
        node.set_last_start_ts(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Timestamp should be valid")
                .as_secs(),
        );
        let node_name = node.name.clone();
        self.nodes_by_name.insert(node_name, node.clone());
        self.nodes_to_watch.write().await.push(node);
    }
778
779    pub(crate) fn add_para(&mut self, para: Parachain) {
780        self.parachains.entry(para.para_id).or_default().push(para);
781    }
782
    /// Serialize the network state and dump it as `zombie.json` inside the
    /// namespace base dir (fields marked `#[serde(skip)]` are omitted).
    pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        let ns_name = self.ns.name();

        write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
        Ok(())
    }
791
    /// Name of the network (the namespace name).
    pub fn name(&self) -> &str {
        self.ns.name()
    }
795
796    /// Get a first parachain from the list of the parachains with specified id.
797    /// NOTE!
798    /// Usually the list will contain only one parachain.
799    /// Multiple parachains with the same id is a corner case.
800    /// If this is the case then one can get such parachain with
801    /// `parachain_by_unique_id()` method
802    ///
803    /// # Arguments
804    /// * `para_id` - Parachain Id
805    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
806        self.parachains.get(&para_id)?.first()
807    }
808
809    /// Get a parachain by its unique id.
810    ///
811    /// This is particularly useful if there are multiple parachains
812    /// with the same id (this is a rare corner case).
813    ///
814    /// # Arguments
815    /// * `unique_id` - unique id of the parachain
816    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
817        self.parachains
818            .values()
819            .flat_map(|p| p.iter())
820            .find(|p| p.unique_id == unique_id.as_ref())
821    }
822
823    pub fn parachains(&self) -> Vec<&Parachain> {
824        self.parachains.values().flatten().collect()
825    }
826
827    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
828        self.relay.nodes.iter().chain(
829            self.parachains
830                .values()
831                .flat_map(|p| p.iter())
832                .flat_map(|p| &p.collators),
833        )
834    }
835
836    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
837        self.relay.nodes.iter_mut().chain(
838            self.parachains
839                .values_mut()
840                .flat_map(|p| p.iter_mut())
841                .flat_map(|p| &mut p.collators),
842        )
843    }
844
845    /// Waits given number of seconds until all nodes in the network report that they are
846    /// up and running.
847    ///
848    /// # Arguments
849    /// * `timeout_secs` - The number of seconds to wait.
850    ///
851    /// # Returns
852    /// * `Ok()` if the node is up before timeout occured.
853    /// * `Err(e)` if timeout or other error occurred while waiting.
854    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
855        let handles = self
856            .nodes_iter()
857            .map(|node| node.wait_until_is_up(timeout_secs));
858
859        futures::future::try_join_all(handles).await?;
860
861        Ok(())
862    }
863
    /// Start the observability stack (Prometheus + Grafana) as an add-on
    ///
    /// This can be called on any running network — whether freshly spawned or
    /// re-attached via [`Orchestrator::attach_to_live`]. If observability is
    /// already running, it will be stopped first
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, Orchestrator};
    /// # use configuration::{NetworkConfig, ObservabilityConfigBuilder};
    /// # async fn example() -> Result<(), errors::OrchestratorError> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// let obs_config = ObservabilityConfigBuilder::new()
    ///     .with_enabled(true)
    ///     .with_grafana_port(3000)
    ///     .build();
    ///
    /// let info = network.start_observability(&obs_config).await?;
    /// println!("Grafana: {}", info.grafana_url);
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn start_observability(
        &mut self,
        config: &configuration::ObservabilityConfig,
    ) -> Result<&ObservabilityInfo, anyhow::Error> {
        // Only one stack at a time: tear down any previous instance first.
        if self.observability().is_some() {
            self.stop_observability().await?;
        }

        // Snapshot the current node set — presumably so the stack can be
        // pointed at each node's endpoints; confirm in `spawn_observability_stack`.
        let nodes = self.nodes();
        let info = observability::spawn_observability_stack(
            config,
            &nodes,
            self.ns.name(),
            self.ns.base_dir(),
            &self.filesystem,
        )
        .await?;

        self.observability = ObservabilityState::Running(info);
        // Re-borrow through the accessor so the returned reference is tied to
        // `self`; the error arm is effectively unreachable since we just set
        // the state to `Running` above.
        self.observability()
            .ok_or_else(|| anyhow::anyhow!("observability state was just set but is not running"))
    }
914
915    /// Stop the observability stack if running
916    ///
917    /// Removes the Prometheus and Grafana containers. This is safe to call
918    /// even if no observability stack is running (it will be a no-op)
919    pub async fn stop_observability(&mut self) -> Result<(), anyhow::Error> {
920        if let ObservabilityState::Running(info) =
921            std::mem::replace(&mut self.observability, ObservabilityState::Stopped)
922        {
923            observability::cleanup_observability_stack(&info).await?;
924        }
925        Ok(())
926    }
927
    /// Spawn a detached background task that periodically checks the liveness
    /// of the nodes in `nodes_to_watch` and, if one of them has crashed
    /// outside its bootstrap window, destroys the namespace and exits the
    /// whole process with status 1.
    pub(crate) fn spawn_watching_task(&self) {
        // Clone the shared handles the task needs; the task outlives `&self`.
        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
        let ns = Arc::clone(&self.ns);
        let node_bootstrap_timeout = self.initial_spec.global_settings.node_spawn_timeout();

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;

                // Only probe nodes currently flagged as running; deliberately
                // paused/stopped nodes are skipped.
                let guard = nodes_to_watch.read().await;
                let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();

                let all_running = {
                    let all_running =
                        futures::future::try_join_all(nodes.iter().map(|n| {
                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
                        }))
                        .await;

                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
                    // (a node may have been stopped on purpose while we were probing).
                    // Note: `continue` diverges, so this block still evaluates to `all_running`.
                    if nodes.iter().any(|n| !n.is_running()) {
                        continue;
                    } else {
                        all_running
                    }
                };

                if let Err(e) = all_running {
                    // check if the node was restarted and we need to give it more time
                    // NOTE(review): this assumes the error's Display output is exactly
                    // the failing node's name — fragile; confirm the error produced by
                    // `wait_until_is_up` actually has that shape.
                    if let Some(node) = nodes.iter().find(|n| n.name() == e.to_string()) {
                        let now = SystemTime::now()
                            .duration_since(UNIX_EPOCH)
                            .expect("get current ts should work.")
                            .as_secs();
                        // Still inside the spawn-timeout window since the node's last
                        // (re)start: give it more time instead of declaring it dead.
                        if node_bootstrap_timeout as u64 > (now - node.last_start_ts()) {
                            warn!("[{}] still in bootstrap window from last starting ({}), continue waiting...", node.name(), node.last_start_ts());
                            continue;
                        }
                    }

                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");

                    if let Err(e) = ns.destroy().await {
                        error!("an error occurred during network teardown: {}", e);
                    }

                    // A crashed node invalidates the whole test network: exit hard.
                    std::process::exit(1);
                }
            }
        });
    }
979
    /// Replace the whole parachain registry (para id -> spawned parachains).
    pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
        self.parachains = parachains;
    }
983
984    pub(crate) fn insert_node(&mut self, node: NetworkNode) {
985        self.nodes_by_name.insert(node.name.clone(), node);
986    }
987
988    pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
989        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
990            self.start_time_ts = Some(start_time_ts.as_millis().to_string());
991        } else {
992            // Just warn, do not propagate the err (this should not happens)
993            warn!("⚠️ Error getting start_time timestamp");
994        }
995    }
996}