zombienet_orchestrator/network.rs

pub mod chain_upgrade;
pub mod node;
pub mod parachain;
pub mod relaychain;

use std::{
    cell::RefCell,
    collections::HashMap,
    path::PathBuf,
    rc::Rc,
    sync::Arc,
    time::{Duration, SystemTime},
};

use configuration::{
    para_states::{Initial, Running},
    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
    types::{Arg, Command, Image, Port, ValidationContext},
    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
};
use provider::{types::TransferedFile, DynNamespace, ProviderError};
use serde::Serialize;
use support::fs::FileSystem;
use tokio::sync::RwLock;
use tracing::{error, warn};

use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
use crate::{
    generators::chain_spec::ChainSpec,
    network_spec::{self, NetworkSpec},
    observability::{self, ObservabilityInfo, ObservabilityState},
    shared::{
        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
        macros,
        types::{ChainDefaultContext, RegisterParachainOptions},
    },
    spawner::{self, SpawnNodeCtx},
    utils::write_zombie_json,
    ScopedFilesystem, ZombieRole,
};

#[derive(Serialize)]
pub struct Network<T: FileSystem> {
    #[serde(skip)]
    ns: DynNamespace,
    #[serde(skip)]
    filesystem: T,
    relay: Relaychain,
    initial_spec: NetworkSpec,
    parachains: HashMap<u32, Vec<Parachain>>,
    #[serde(skip)]
    nodes_by_name: HashMap<String, NetworkNode>,
    #[serde(skip)]
    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    start_time_ts: Option<String>,
    #[serde(skip)]
    observability: ObservabilityState,
}

impl<T: FileSystem> std::fmt::Debug for Network<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Network")
            .field("ns", &"ns_skipped")
            .field("relay", &self.relay)
            .field("initial_spec", &self.initial_spec)
            .field("parachains", &self.parachains)
            .field("nodes_by_name", &self.nodes_by_name)
            .field("observability", &self.observability)
            .finish()
    }
}

macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>,
    override_eth_key: Option<String>
});

macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>,
    override_eth_key: Option<String>
});

impl<T: FileSystem> Network<T> {
    pub(crate) fn new_with_relay(
        relay: Relaychain,
        ns: DynNamespace,
        fs: T,
        initial_spec: NetworkSpec,
    ) -> Self {
        Self {
            ns,
            filesystem: fs,
            relay,
            initial_spec,
            parachains: Default::default(),
            nodes_by_name: Default::default(),
            nodes_to_watch: Default::default(),
            start_time_ts: Default::default(),
            observability: ObservabilityState::default(),
        }
    }

    // Public API
    pub fn ns_name(&self) -> String {
        self.ns.name().to_string()
    }

    pub fn base_dir(&self) -> Option<&str> {
        self.ns.base_dir().to_str()
    }

    pub fn relaychain(&self) -> &Relaychain {
        &self.relay
    }

    // Teardown the network
    pub async fn destroy(mut self) -> Result<(), ProviderError> {
        if let Err(e) = self.stop_observability().await {
            warn!("⚠️  Failed to cleanup observability stack: {e}");
        }
        self.ns.destroy().await
    }

    pub fn observability(&self) -> Option<&ObservabilityInfo> {
        self.observability.as_runnnig()
    }

    pub fn observability_state(&self) -> &ObservabilityState {
        &self.observability
    }

    /// Add a node to the relaychain
    /// The new node is added to the running network instance.
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), errors::OrchestratorError> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// // Create the options to add the new node
    /// let opts = AddNodeOptions {
    ///     rpc_port: Some(9444),
    ///     is_validator: true,
    ///     ..Default::default()
    /// };
    ///
    /// network.add_node("new-node", opts).await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_node(
        &mut self,
        name: impl Into<String>,
        options: AddNodeOptions,
    ) -> Result<(), anyhow::Error> {
        let name = generate_unique_node_name_from_names(
            name,
            &mut self.nodes_by_name.keys().cloned().collect(),
        );

        let relaychain = self.relaychain();

        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
            chain_spec_custom_path.clone()
        } else {
            PathBuf::from(format!(
                "{}/{}.json",
                self.ns.base_dir().to_string_lossy(),
                relaychain.chain
            ))
        };

        let chain_context = ChainDefaultContext {
            default_command: self.initial_spec.relaychain.default_command.as_ref(),
            default_image: self.initial_spec.relaychain.default_image.as_ref(),
            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
        };

        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
            &name,
            options.into(),
            &chain_context,
            false,
            false,
        )?;

        node_spec.available_args_output = Some(
            self.initial_spec
                .node_available_args_output(&node_spec, self.ns.clone())
                .await?,
        );

        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let ctx = SpawnNodeCtx {
            chain_id: &relaychain.chain_id,
            parachain_id: None,
            chain: &relaychain.chain,
            role: ZombieRole::Node,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: true,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        let global_files_to_inject = vec![TransferedFile::new(
            chain_spec_path,
            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
        )];

        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;

        // TODO: register the new node as validator in the relaychain
        // STEPS:
        //  - check balance of `stash` derivation for validator account
        //  - call rotate_keys on the new validator
        //  - call setKeys on the new validator
        // if node_spec.is_validator {
        //     let running_node = self.relay.nodes.first().unwrap();
        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
        // }

        // Let's make sure node is up before adding
        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
            .await?;

        // Add node to relaychain data
        self.add_running_node(node.clone(), None).await;

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }

    /// Add a new collator to a parachain
    ///
    /// NOTE: if more than one parachain with the given id is available (a rare corner case),
    /// then the collator is added to the first one.
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// let col_opts = AddCollatorOptions {
    ///     command: Some("polkadot-parachain".try_into()?),
    ///     ..Default::default()
    /// };
    ///
    /// network.add_collator("new-col-1", col_opts, 100).await?;
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_collator(
        &mut self,
        name: impl Into<String>,
        options: AddCollatorOptions,
        para_id: u32,
    ) -> Result<(), anyhow::Error> {
        let name = generate_unique_node_name_from_names(
            name,
            &mut self.nodes_by_name.keys().cloned().collect(),
        );
        let spec = self
            .initial_spec
            .parachains
            .iter()
            .find(|para| para.id == para_id)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
        let role = if spec.is_cumulus_based {
            ZombieRole::CumulusCollator
        } else {
            ZombieRole::Collator
        };
        let chain_context = ChainDefaultContext {
            default_command: spec.default_command.as_ref(),
            default_image: spec.default_image.as_ref(),
            default_resources: spec.default_resources.as_ref(),
            default_db_snapshot: spec.default_db_snapshot.as_ref(),
            default_args: spec.default_args.iter().collect(),
        };

        let parachain = self
            .parachains
            .get_mut(&para_id)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
            .get_mut(0)
            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;

        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        // TODO: do we still want to support spawning a dedicated bootnode?
        let ctx = SpawnNodeCtx {
            chain_id: &self.relay.chain_id,
            parachain_id: parachain.chain_id.as_deref(),
            chain: &self.relay.chain,
            role,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            parachain: Some(spec),
            bootnodes_addr: &vec![],
            wait_ready: true,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
            chain_spec_custom_path.clone()
        } else {
            PathBuf::from(format!(
                "{}/{}.json",
                self.ns.base_dir().to_string_lossy(),
                self.relay.chain
            ))
        };

        let mut global_files_to_inject = vec![TransferedFile::new(
            relaychain_spec_path,
            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
        )];

        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
            Some(para_chain_spec_custom.clone())
        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
            Some(PathBuf::from(format!(
                "{}/{}",
                self.ns.base_dir().to_string_lossy(),
                para_spec_path.to_string_lossy()
            )))
        } else {
            None
        };

        if let Some(para_spec_path) = para_chain_spec_local_path {
            global_files_to_inject.push(TransferedFile::new(
                para_spec_path,
                PathBuf::from(format!("/cfg/{para_id}.json")),
            ));
        }

        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
            name,
            options.into(),
            &chain_context,
            true,
            spec.is_evm_based,
        )?;

        node_spec.available_args_output = Some(
            self.initial_spec
                .node_available_args_output(&node_spec, self.ns.clone())
                .await?,
        );

        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;

        // Let's make sure node is up before adding
        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
            .await?;

        self.add_running_node(node, Some(para_id)).await;

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }

    /// Get a parachain config builder from a running network
    ///
    /// This allows you to build a new parachain config to be deployed into
    /// the running network.
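    ///
    /// # Example:
    /// A minimal sketch (assuming a network spawned from a `config.toml`, as in the
    /// other examples in this module):
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::Orchestrator;
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let network = orchestrator.spawn(config).await?;
    ///
    /// // The builder is seeded with the ports, node names and para ids already
    /// // in use by the running network, so validation can take them into account.
    /// let para_config = network
    ///     .para_config_builder()
    ///     .with_id(100)
    ///     .with_default_command("polkadot-parachain")
    ///     .with_collator(|c| c.with_name("col-100-1"))
    ///     .build()
    ///     .map_err(|_e| anyhow!("Building config"))?;
    /// #   Ok(())
    /// # }
    /// ```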
    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
        let used_ports = self
            .nodes_iter()
            .map(|node| node.spec())
            .flat_map(|spec| {
                [
                    spec.ws_port.0,
                    spec.rpc_port.0,
                    spec.prometheus_port.0,
                    spec.p2p_port.0,
                ]
            })
            .collect();

        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();

        // need to invert the logic of generate_unique_para_id
        let used_para_ids = self
            .parachains
            .iter()
            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
            .collect();

        let context = ValidationContext {
            used_ports,
            used_nodes_names,
            used_para_ids,
        };
        let context = Rc::new(RefCell::new(context));

        ParachainConfigBuilder::new_with_running(context)
    }

    /// Add a new parachain to the running network
    ///
    /// # Arguments
    /// * `para_config` - Parachain configuration to deploy
    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
    /// * `custom_parachain_fs_prefix` - Optional prefix to use when artifacts are created
    ///
    ///
    /// # Example:
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    /// let para_config = network
    ///     .para_config_builder()
    ///     .with_id(100)
    ///     .with_default_command("polkadot-parachain")
    ///     .with_collator(|c| c.with_name("col-100-1"))
    ///     .build()
    ///     .map_err(|_e| anyhow!("Building config"))?;
    ///
    /// network.add_parachain(&para_config, None, None).await?;
    ///
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn add_parachain(
        &mut self,
        para_config: &ParachainConfig,
        custom_relaychain_spec: Option<PathBuf>,
        custom_parachain_fs_prefix: Option<String>,
    ) -> Result<(), anyhow::Error> {
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let mut global_files_to_inject = vec![];

        // get relaychain id
        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
            // use this file as relaychain spec
            global_files_to_inject.push(TransferedFile::new(
                custom_path.clone(),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            let content = std::fs::read_to_string(custom_path)?;
            ChainSpec::chain_id_from_spec(&content)?
        } else {
            global_files_to_inject.push(TransferedFile::new(
                PathBuf::from(format!(
                    "{}/{}",
                    scoped_fs.base_dir,
                    self.relaychain().chain_spec_path.to_string_lossy()
                )),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            self.relay.chain_id.clone()
        };

        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
            para_config,
            relay_chain_id.as_str().try_into()?,
        )?;

        let chain_spec_raw_path = para_spec
            .build_chain_spec(
                &relay_chain_id,
                &self.ns,
                &scoped_fs,
                para_spec.post_process_script.clone().as_deref(),
            )
            .await?;

        // Para artifacts
        let para_path_prefix = if let Some(custom_prefix) = custom_parachain_fs_prefix {
            custom_prefix
        } else {
            para_spec.id.to_string()
        };

        scoped_fs.create_dir(&para_path_prefix).await?;
        // create wasm/state
        para_spec
            .genesis_state
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/genesis-state", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;
        para_spec
            .genesis_wasm
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/para_spec-wasm", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;

        let parachain =
            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
        let parachain_id = parachain.chain_id.clone();

        // Create `ctx` for spawning the nodes
        let ctx_para = SpawnNodeCtx {
            parachain: Some(&para_spec),
            parachain_id: parachain_id.as_deref(),
            role: if para_spec.is_cumulus_based {
                ZombieRole::CumulusCollator
            } else {
                ZombieRole::Collator
            },
            bootnodes_addr: &para_config
                .bootnodes_addresses()
                .iter()
                .map(|&a| a.to_string())
                .collect(),
            chain_id: &self.relaychain().chain_id,
            chain: &self.relaychain().chain,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            wait_ready: false,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // Register the parachain to the running network
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();

        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
            let register_para_options = RegisterParachainOptions {
                id: parachain.para_id,
                // This needs to resolve correctly
                wasm_path: para_spec
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para_spec
                    .genesis_state
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: first_node_url.to_string(),
                onboard_as_para: para_spec.onboard_as_parachain,
                seed: None, // TODO: Seed is passed by?
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // Spawn the nodes
        let spawning_tasks = para_spec
            .collators
            .iter()
            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));

        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;

        // Let's make sure nodes are up before adding them
        let waiting_tasks = running_nodes.iter().map(|node| {
            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
        });

        let _ = futures::future::try_join_all(waiting_tasks).await?;

        let running_para_id = parachain.para_id;
        self.add_para(parachain);
        for node in running_nodes {
            self.add_running_node(node, Some(running_para_id)).await;
        }

        // Dump zombie.json
        self.write_zombie_json().await?;

        Ok(())
    }

    /// Register a parachain that has already been added to the network (with the manual
    /// registration strategy)
    ///
    /// # Arguments
    /// * `para_id` - Parachain Id
    ///
    ///
    /// # Example:
    /// ```rust
    /// # use anyhow::anyhow;
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::Orchestrator;
    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfigBuilder::new()
    /// #     .with_relaychain(|r| {
    /// #       r.with_chain("rococo-local")
    /// #         .with_default_command("polkadot")
    /// #         .with_node(|node| node.with_name("alice"))
    /// #     })
    /// #     .with_parachain(|p| {
    /// #       p.with_id(100)
    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
    /// #         .with_default_command("test-parachain")
    /// #         .with_collator(|n| n.with_name("dave").validator(false))
    /// #     })
    /// #     .build()
    /// #     .map_err(|_e| anyhow!("Building config"))?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// network.register_parachain(100).await?;
    ///
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
        let para = self
            .initial_spec
            .parachains
            .iter()
            .find(|p| p.id == para_id)
            .ok_or(anyhow::anyhow!(
                "no parachain with id = {para_id} available",
            ))?;
        let para_genesis_config = para.get_genesis_config()?;
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();
        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
            id: para_id,
            // This needs to resolve correctly
            wasm_path: para_genesis_config.wasm_path.clone(),
            state_path: para_genesis_config.state_path.clone(),
            node_ws_url: first_node_url.to_string(),
            onboard_as_para: para_genesis_config.as_parachain,
            seed: None, // TODO: Seed is passed by?
            finalization: false,
        };
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        Parachain::register(register_para_options, &scoped_fs).await?;

        Ok(())
    }

    // deregister and stop the collator?
    // remove_parachain()

    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
        let name = name.into();
        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
            return Ok(node);
        }

        let list = self
            .nodes_iter()
            .map(|n| &n.name)
            .cloned()
            .collect::<Vec<_>>()
            .join(", ");

        Err(anyhow::anyhow!(
            "can't find node with name: {name:?}, should be one of {list}"
        ))
    }

    pub fn get_node_mut(
        &mut self,
        name: impl Into<String>,
    ) -> Result<&mut NetworkNode, anyhow::Error> {
        let name = name.into();
        self.nodes_iter_mut()
            .find(|n| n.name == name)
            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
    }

    pub fn nodes(&self) -> Vec<&NetworkNode> {
        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
    }

    pub async fn detach(&self) {
        self.ns.detach().await
    }

    // Internal API
    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
        if let Some(para_id) = para_id {
            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
                para.collators.push(node.clone());
            } else {
                // the parachain entry is created before its first node is added, so this should not happen
                unreachable!()
            }
        } else {
            self.relay.nodes.push(node.clone());
        }
        // TODO: we should hold a ref to the node in the vec in the future.
        node.set_is_running(true);
        let node_name = node.name.clone();
        self.nodes_by_name.insert(node_name, node.clone());
        self.nodes_to_watch.write().await.push(node);
    }

    pub(crate) fn add_para(&mut self, para: Parachain) {
        self.parachains.entry(para.para_id).or_default().push(para);
    }

    pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
        let base_dir = self.ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        let ns_name = self.ns.name();

        write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
        Ok(())
    }

    pub fn name(&self) -> &str {
        self.ns.name()
    }

    /// Get the first parachain from the list of parachains with the specified id.
    /// NOTE!
    /// Usually the list will contain only one parachain.
    /// Multiple parachains with the same id are a rare corner case.
    /// If this is the case, a specific parachain can be retrieved with the
    /// `parachain_by_unique_id()` method.
    ///
    /// # Arguments
    /// * `para_id` - Parachain Id
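    ///
    /// # Example:
    /// A minimal sketch (assuming the spawned network includes a parachain with id `100`):
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, Orchestrator};
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), errors::OrchestratorError> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let network = orchestrator.spawn(config).await?;
    ///
    /// // Returns the first (and usually only) parachain registered with this id.
    /// assert!(network.parachain(100).is_some());
    /// #   Ok(())
    /// # }
    /// ```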
    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
        self.parachains.get(&para_id)?.first()
    }

    /// Get a parachain by its unique id.
    ///
    /// This is particularly useful if there are multiple parachains
    /// with the same id (this is a rare corner case).
    ///
    /// # Arguments
    /// * `unique_id` - unique id of the parachain
    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
        self.parachains
            .values()
            .flat_map(|p| p.iter())
            .find(|p| p.unique_id == unique_id.as_ref())
    }

    pub fn parachains(&self) -> Vec<&Parachain> {
        self.parachains.values().flatten().collect()
    }

    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
        self.relay.nodes.iter().chain(
            self.parachains
                .values()
                .flat_map(|p| p.iter())
                .flat_map(|p| &p.collators),
        )
    }

    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
        self.relay.nodes.iter_mut().chain(
            self.parachains
                .values_mut()
                .flat_map(|p| p.iter_mut())
                .flat_map(|p| &mut p.collators),
        )
    }

    /// Waits the given number of seconds until all nodes in the network report that they are
    /// up and running.
    ///
    /// # Arguments
    /// * `timeout_secs` - The number of seconds to wait.
    ///
    /// # Returns
    /// * `Ok(())` if all nodes are up before the timeout occurred.
    /// * `Err(e)` if a timeout or other error occurred while waiting.
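    ///
    /// # Example:
    /// A minimal sketch (assuming a network spawned from a `config.toml`, as in the
    /// other examples in this module):
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::Orchestrator;
    /// # use configuration::NetworkConfig;
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let network = orchestrator.spawn(config).await?;
    ///
    /// // Give every node up to 60 seconds to report as up.
    /// network.wait_until_is_up(60).await?;
    /// #   Ok(())
    /// # }
    /// ```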
    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
        let handles = self
            .nodes_iter()
            .map(|node| node.wait_until_is_up(timeout_secs));

        futures::future::try_join_all(handles).await?;

        Ok(())
    }

    /// Start the observability stack (Prometheus + Grafana) as an add-on
    ///
    /// This can be called on any running network, whether freshly spawned or
    /// re-attached via [`Orchestrator::attach_to_live`]. If observability is
    /// already running, it will be stopped first.
    ///
    /// # Example:
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::{errors, Orchestrator};
    /// # use configuration::{NetworkConfig, ObservabilityConfigBuilder};
    /// # async fn example() -> Result<(), errors::OrchestratorError> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    ///
    /// let obs_config = ObservabilityConfigBuilder::new()
    ///     .with_enabled(true)
    ///     .with_grafana_port(3000)
    ///     .build();
    ///
    /// let info = network.start_observability(&obs_config).await?;
    /// println!("Grafana: {}", info.grafana_url);
    /// #   Ok(())
    /// # }
    /// ```
    pub async fn start_observability(
        &mut self,
        config: &configuration::ObservabilityConfig,
    ) -> Result<&ObservabilityInfo, anyhow::Error> {
        if self.observability().is_some() {
            self.stop_observability().await?;
        }

        let nodes = self.nodes();
        let info = observability::spawn_observability_stack(
            config,
            &nodes,
            self.ns.name(),
            self.ns.base_dir(),
            &self.filesystem,
        )
        .await?;

        self.observability = ObservabilityState::Running(info);
        self.observability()
            .ok_or_else(|| anyhow::anyhow!("observability state was just set but is not running"))
    }

    /// Stop the observability stack if running
    ///
    /// Removes the Prometheus and Grafana containers. This is safe to call
    /// even if no observability stack is running (it will be a no-op).
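    ///
    /// # Example:
    /// A minimal sketch (assuming an observability stack was started as in
    /// [`Network::start_observability`]):
    /// ```rust
    /// # use provider::NativeProvider;
    /// # use support::{fs::local::LocalFileSystem};
    /// # use zombienet_orchestrator::Orchestrator;
    /// # use configuration::{NetworkConfig, ObservabilityConfigBuilder};
    /// # async fn example() -> Result<(), anyhow::Error> {
    /// #   let provider = NativeProvider::new(LocalFileSystem {});
    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
    /// let mut network = orchestrator.spawn(config).await?;
    /// #   let obs_config = ObservabilityConfigBuilder::new()
    /// #       .with_enabled(true)
    /// #       .with_grafana_port(3000)
    /// #       .build();
    /// network.start_observability(&obs_config).await?;
    ///
    /// // Remove the Prometheus/Grafana containers; a no-op if nothing is running.
    /// network.stop_observability().await?;
    /// #   Ok(())
    /// # }
    /// ```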
    pub async fn stop_observability(&mut self) -> Result<(), anyhow::Error> {
        if let ObservabilityState::Running(info) =
            std::mem::replace(&mut self.observability, ObservabilityState::Stopped)
        {
            observability::cleanup_observability_stack(&info).await?;
        }
        Ok(())
    }

    pub(crate) fn spawn_watching_task(&self) {
        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
        let ns = Arc::clone(&self.ns);

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;

                let all_running = {
                    let guard = nodes_to_watch.read().await;
                    let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();

                    let all_running =
                        futures::future::try_join_all(nodes.iter().map(|n| {
                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
                        }))
                        .await;

                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
                    if nodes.iter().any(|n| !n.is_running()) {
                        continue;
                    } else {
                        all_running
                    }
                };

                if let Err(e) = all_running {
                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");

                    if let Err(e) = ns.destroy().await {
                        error!("an error occurred during network teardown: {}", e);
                    }

                    std::process::exit(1);
                }
            }
        });
    }

    pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
        self.parachains = parachains;
    }

    pub(crate) fn insert_node(&mut self, node: NetworkNode) {
        self.nodes_by_name.insert(node.name.clone(), node);
    }

    pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
            self.start_time_ts = Some(start_time_ts.as_millis().to_string());
        } else {
            // Just warn, do not propagate the err (this should not happen)
            warn!("⚠️ Error getting start_time timestamp");
        }
    }
}