// zombienet_orchestrator/network.rs

1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{
7    cell::RefCell,
8    collections::HashMap,
9    path::PathBuf,
10    rc::Rc,
11    sync::Arc,
12    time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use configuration::{
16    para_states::{Initial, Running},
17    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
18    types::{Arg, Command, Image, ParaId, Port, ValidationContext},
19    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
20};
21use provider::{types::TransferedFile, DynNamespace, ProviderError};
22use serde::{Deserialize, Serialize};
23use support::fs::FileSystem;
24use tokio::sync::RwLock;
25use tracing::{error, warn};
26
27use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
28use crate::{
29    generators::{self, chain_spec::ChainSpec},
30    network_spec::{self, NetworkSpec},
31    observability::{self, ObservabilityInfo, ObservabilityState},
32    shared::{
33        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
34        macros,
35        types::{ChainDefaultContext, RegisterParachainOptions},
36    },
37    spawner::{self, SpawnNodeCtx},
38    utils::write_zombie_json,
39    ScopedFilesystem, ZombieRole,
40};
41
42/// Context where the node is running
43/// RC or Para
44#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
45pub enum NodeContext {
46    Rc,
47    Para {
48        para_id: ParaId,
49        is_cumulus_based: bool,
50    },
51}
52
53#[derive(Serialize)]
54pub struct Network<T: FileSystem> {
55    #[serde(skip)]
56    ns: DynNamespace,
57    #[serde(skip)]
58    filesystem: T,
59    relay: Relaychain,
60    initial_spec: NetworkSpec,
61    parachains: HashMap<u32, Vec<Parachain>>,
62    #[serde(skip)]
63    nodes_by_name: HashMap<String, NetworkNode>,
64    #[serde(skip)]
65    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    start_time_ts: Option<String>,
68    #[serde(skip)]
69    observability: ObservabilityState,
70}
71
72impl<T: FileSystem> std::fmt::Debug for Network<T> {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("Network")
75            .field("ns", &"ns_skipped")
76            .field("relay", &self.relay)
77            .field("initial_spec", &self.initial_spec)
78            .field("parachains", &self.parachains)
79            .field("nodes_by_name", &self.nodes_by_name)
80            .field("observability", &self.observability)
81            .finish()
82    }
83}
84
// Options for `Network::add_node`. Besides the fields listed here, the macro
// presumably adds the common node fields used in the doc examples
// (e.g. `rpc_port`, `is_validator`, `command`) — confirm in `shared::macros`.
// `chain_spec` overrides the relaychain spec file injected into the node;
// `override_eth_key` presumably overrides the node's eth key.
macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>,
    override_eth_key: Option<String>
});

// Options for `Network::add_collator`: `chain_spec` overrides the parachain
// spec file and `chain_spec_relay` the relaychain spec file injected into the
// collator.
macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>,
    override_eth_key: Option<String>
});
95
96impl<T: FileSystem> Network<T> {
97    pub(crate) fn new_with_relay(
98        relay: Relaychain,
99        ns: DynNamespace,
100        fs: T,
101        initial_spec: NetworkSpec,
102    ) -> Self {
103        Self {
104            ns,
105            filesystem: fs,
106            relay,
107            initial_spec,
108            parachains: Default::default(),
109            nodes_by_name: Default::default(),
110            nodes_to_watch: Default::default(),
111            start_time_ts: Default::default(),
112            observability: ObservabilityState::default(),
113        }
114    }
115
116    // Pubic API
117    pub fn ns_name(&self) -> String {
118        self.ns.name().to_string()
119    }
120
121    pub fn base_dir(&self) -> Option<&str> {
122        self.ns.base_dir().to_str()
123    }
124
125    pub fn relaychain(&self) -> &Relaychain {
126        &self.relay
127    }
128
129    // Teardown the network
130    pub async fn destroy(mut self) -> Result<(), ProviderError> {
131        if let Err(e) = self.stop_observability().await {
132            warn!("⚠️  Failed to cleanup observability stack: {e}");
133        }
134        self.ns.destroy().await
135    }
136
137    pub fn observability(&self) -> Option<&ObservabilityInfo> {
138        self.observability.as_runnnig()
139    }
140
141    pub fn observability_state(&self) -> &ObservabilityState {
142        &self.observability
143    }
144
145    /// Add a node to the relaychain
146    // The new node is added to the running network instance.
147    /// # Example:
148    /// ```rust
149    /// # use provider::NativeProvider;
150    /// # use support::{fs::local::LocalFileSystem};
151    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
152    /// # use configuration::NetworkConfig;
153    /// # async fn example() -> Result<(), errors::OrchestratorError> {
154    /// #   let provider = NativeProvider::new(LocalFileSystem {});
155    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
156    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
157    /// let mut network = orchestrator.spawn(config).await?;
158    ///
159    /// // Create the options to add the new node
160    /// let opts = AddNodeOptions {
161    ///     rpc_port: Some(9444),
162    ///     is_validator: true,
163    ///     ..Default::default()
164    /// };
165    ///
166    /// network.add_node("new-node", opts).await?;
167    /// #   Ok(())
168    /// # }
169    /// ```
170    pub async fn add_node(
171        &mut self,
172        name: impl Into<String>,
173        options: AddNodeOptions,
174    ) -> Result<(), anyhow::Error> {
175        let name = generate_unique_node_name_from_names(
176            name,
177            &mut self.nodes_by_name.keys().cloned().collect(),
178        );
179
180        let relaychain = self.relaychain();
181
182        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
183            chain_spec_custom_path.clone()
184        } else {
185            PathBuf::from(format!(
186                "{}/{}.json",
187                self.ns.base_dir().to_string_lossy(),
188                relaychain.chain
189            ))
190        };
191
192        let chain_context = ChainDefaultContext {
193            default_command: self.initial_spec.relaychain.default_command.as_ref(),
194            default_image: self.initial_spec.relaychain.default_image.as_ref(),
195            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
196            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
197            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
198        };
199
200        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
201            &name,
202            options.into(),
203            &chain_context,
204            false,
205            false,
206        )?;
207
208        node_spec.available_args_output = Some(
209            self.initial_spec
210                .node_available_args_output(&node_spec, self.ns.clone())
211                .await?,
212        );
213
214        let base_dir = self.ns.base_dir().to_string_lossy();
215        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
216
217        let resolved_db_snapshots = generators::resolve_db_snapshots(
218            std::iter::once(&node_spec),
219            &self.ns,
220            &self.filesystem,
221        )
222        .await?;
223
224        let ctx = SpawnNodeCtx {
225            chain_id: &relaychain.chain_id,
226            parachain_id: None,
227            chain: &relaychain.chain,
228            role: ZombieRole::Node,
229            ns: &self.ns,
230            scoped_fs: &scoped_fs,
231            parachain: None,
232            bootnodes_addr: &vec![],
233            wait_ready: true,
234            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
235            global_settings: &self.initial_spec.global_settings,
236            resolved_db_snapshots: &resolved_db_snapshots,
237        };
238
239        let global_files_to_inject = vec![TransferedFile::new(
240            chain_spec_path,
241            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
242        )];
243
244        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
245
246        // TODO: register the new node as validator in the relaychain
247        // STEPS:
248        //  - check balance of `stash` derivation for validator account
249        //  - call rotate_keys on the new validator
250        //  - call setKeys on the new validator
251        // if node_spec.is_validator {
252        //     let running_node = self.relay.nodes.first().unwrap();
253        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
254        // }
255
256        // Let's make sure node is up before adding
257        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
258            .await?;
259
260        // Add node to relaychain data
261        self.add_running_node(node.clone(), None).await;
262
263        // Dump zombie.json
264        self.write_zombie_json().await?;
265
266        generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
267
268        Ok(())
269    }
270
271    /// Add a new collator to a parachain
272    ///
273    /// NOTE: if more parachains with given id available (rare corner case)
274    /// then it adds collator to the first parachain
275    ///
276    /// # Example:
277    /// ```rust
278    /// # use provider::NativeProvider;
279    /// # use support::{fs::local::LocalFileSystem};
280    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
281    /// # use configuration::NetworkConfig;
282    /// # async fn example() -> Result<(), anyhow::Error> {
283    /// #   let provider = NativeProvider::new(LocalFileSystem {});
284    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
285    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
286    /// let mut network = orchestrator.spawn(config).await?;
287    ///
288    /// let col_opts = AddCollatorOptions {
289    ///     command: Some("polkadot-parachain".try_into()?),
290    ///     ..Default::default()
291    /// };
292    ///
293    /// network.add_collator("new-col-1", col_opts, 100).await?;
294    /// #   Ok(())
295    /// # }
296    /// ```
297    pub async fn add_collator(
298        &mut self,
299        name: impl Into<String>,
300        options: AddCollatorOptions,
301        para_id: u32,
302    ) -> Result<(), anyhow::Error> {
303        let name = generate_unique_node_name_from_names(
304            name,
305            &mut self.nodes_by_name.keys().cloned().collect(),
306        );
307        let spec = self
308            .initial_spec
309            .parachains
310            .iter()
311            .find(|para| para.id == para_id)
312            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
313        let role = if spec.is_cumulus_based {
314            ZombieRole::CumulusCollator
315        } else {
316            ZombieRole::Collator
317        };
318        let chain_context = ChainDefaultContext {
319            default_command: spec.default_command.as_ref(),
320            default_image: spec.default_image.as_ref(),
321            default_resources: spec.default_resources.as_ref(),
322            default_db_snapshot: spec.default_db_snapshot.as_ref(),
323            default_args: spec.default_args.iter().collect(),
324        };
325
326        let parachain = self
327            .parachains
328            .get_mut(&para_id)
329            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
330            .get_mut(0)
331            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
332
333        let base_dir = self.ns.base_dir().to_string_lossy();
334        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
335
336        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
337            chain_spec_custom_path.clone()
338        } else {
339            PathBuf::from(format!(
340                "{}/{}.json",
341                self.ns.base_dir().to_string_lossy(),
342                self.relay.chain
343            ))
344        };
345
346        let mut global_files_to_inject = vec![TransferedFile::new(
347            relaychain_spec_path,
348            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
349        )];
350
351        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
352            Some(para_chain_spec_custom.clone())
353        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
354            Some(PathBuf::from(format!(
355                "{}/{}",
356                self.ns.base_dir().to_string_lossy(),
357                para_spec_path.to_string_lossy()
358            )))
359        } else {
360            None
361        };
362
363        if let Some(para_spec_path) = para_chain_spec_local_path {
364            global_files_to_inject.push(TransferedFile::new(
365                para_spec_path,
366                PathBuf::from(format!("/cfg/{para_id}.json")),
367            ));
368        }
369
370        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
371            name,
372            options.into(),
373            &chain_context,
374            true,
375            spec.is_evm_based,
376        )?;
377
378        node_spec.available_args_output = Some(
379            self.initial_spec
380                .node_available_args_output(&node_spec, self.ns.clone())
381                .await?,
382        );
383
384        let resolved_db_snapshots = generators::resolve_db_snapshots(
385            std::iter::once(&node_spec),
386            &self.ns,
387            &self.filesystem,
388        )
389        .await?;
390
391        // TODO: we want to still supporting spawn a dedicated bootnode??
392        let ctx = SpawnNodeCtx {
393            chain_id: &self.relay.chain_id,
394            parachain_id: parachain.chain_id.as_deref(),
395            chain: &self.relay.chain,
396            role,
397            ns: &self.ns,
398            scoped_fs: &scoped_fs,
399            parachain: Some(spec),
400            bootnodes_addr: &vec![],
401            wait_ready: true,
402            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
403            global_settings: &self.initial_spec.global_settings,
404            resolved_db_snapshots: &resolved_db_snapshots,
405        };
406
407        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
408
409        // Let's make sure node is up before adding
410        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
411            .await?;
412
413        self.add_running_node(node, Some(para_id)).await;
414
415        // Dump zombie.json
416        self.write_zombie_json().await?;
417
418        generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
419
420        Ok(())
421    }
422
423    /// Get a parachain config builder from a running network
424    ///
425    /// This allow you to build a new parachain config to be deployed into
426    /// the running network.
427    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
428        let used_ports = self
429            .nodes_iter()
430            .map(|node| node.spec())
431            .flat_map(|spec| {
432                [
433                    spec.ws_port.0,
434                    spec.rpc_port.0,
435                    spec.prometheus_port.0,
436                    spec.p2p_port.0,
437                ]
438            })
439            .collect();
440
441        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
442
443        // need to inverse logic of generate_unique_para_id
444        let used_para_ids = self
445            .parachains
446            .iter()
447            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
448            .collect();
449
450        let context = ValidationContext {
451            used_ports,
452            used_nodes_names,
453            used_para_ids,
454        };
455        let context = Rc::new(RefCell::new(context));
456
457        ParachainConfigBuilder::new_with_running(context)
458    }
459
460    /// Add a new parachain to the running network
461    ///
462    /// # Arguments
463    /// * `para_config` - Parachain configuration to deploy
464    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
465    /// * `custom_parchain_fs_prefix` - Optional prefix to use when artifacts are created
466    ///
467    ///
468    /// # Example:
469    /// ```rust
470    /// # use anyhow::anyhow;
471    /// # use provider::NativeProvider;
472    /// # use support::{fs::local::LocalFileSystem};
473    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
474    /// # use configuration::NetworkConfig;
475    /// # async fn example() -> Result<(), anyhow::Error> {
476    /// #   let provider = NativeProvider::new(LocalFileSystem {});
477    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
478    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
479    /// let mut network = orchestrator.spawn(config).await?;
480    /// let para_config = network
481    ///     .para_config_builder()
482    ///     .with_id(100)
483    ///     .with_default_command("polkadot-parachain")
484    ///     .with_collator(|c| c.with_name("col-100-1"))
485    ///     .build()
486    ///     .map_err(|_e| anyhow!("Building config"))?;
487    ///
488    /// network.add_parachain(&para_config, None, None).await?;
489    ///
490    /// #   Ok(())
491    /// # }
492    /// ```
493    pub async fn add_parachain(
494        &mut self,
495        para_config: &ParachainConfig,
496        custom_relaychain_spec: Option<PathBuf>,
497        custom_parchain_fs_prefix: Option<String>,
498    ) -> Result<(), anyhow::Error> {
499        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
500        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
501
502        let mut global_files_to_inject = vec![];
503
504        // get relaychain id
505        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
506            // use this file as relaychain spec
507            global_files_to_inject.push(TransferedFile::new(
508                custom_path.clone(),
509                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
510            ));
511            let content = std::fs::read_to_string(custom_path)?;
512            ChainSpec::chain_id_from_spec(&content)?
513        } else {
514            global_files_to_inject.push(TransferedFile::new(
515                PathBuf::from(format!(
516                    "{}/{}",
517                    scoped_fs.base_dir,
518                    self.relaychain().chain_spec_path.to_string_lossy()
519                )),
520                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
521            ));
522            self.relay.chain_id.clone()
523        };
524
525        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
526            para_config,
527            relay_chain_id.as_str().try_into()?,
528        )?;
529
530        let chain_spec_raw_path = para_spec
531            .build_chain_spec(
532                &relay_chain_id,
533                &self.ns,
534                &scoped_fs,
535                para_spec.post_process_script.clone().as_deref(),
536            )
537            .await?;
538
539        // Para artifacts
540        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
541            custom_prefix
542        } else {
543            para_spec.id.to_string()
544        };
545
546        scoped_fs.create_dir(&para_path_prefix).await?;
547        // create wasm/state
548        para_spec
549            .genesis_state
550            .build(
551                chain_spec_raw_path.as_ref(),
552                format!("{}/genesis-state", &para_path_prefix),
553                &self.ns,
554                &scoped_fs,
555                None,
556            )
557            .await?;
558        para_spec
559            .genesis_wasm
560            .build(
561                chain_spec_raw_path.as_ref(),
562                format!("{}/para_spec-wasm", &para_path_prefix),
563                &self.ns,
564                &scoped_fs,
565                None,
566            )
567            .await?;
568
569        let parachain =
570            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
571        let parachain_id = parachain.chain_id.clone();
572
573        let resolved_db_snapshots = generators::resolve_db_snapshots(
574            para_spec.collators.iter(),
575            &self.ns,
576            &self.filesystem,
577        )
578        .await?;
579
580        // Create `ctx` for spawn the nodes
581        let ctx_para = SpawnNodeCtx {
582            parachain: Some(&para_spec),
583            parachain_id: parachain_id.as_deref(),
584            role: if para_spec.is_cumulus_based {
585                ZombieRole::CumulusCollator
586            } else {
587                ZombieRole::Collator
588            },
589            bootnodes_addr: &para_config
590                .bootnodes_addresses()
591                .iter()
592                .map(|&a| a.to_string())
593                .collect(),
594            chain_id: &self.relaychain().chain_id,
595            chain: &self.relaychain().chain,
596            ns: &self.ns,
597            scoped_fs: &scoped_fs,
598            wait_ready: false,
599            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
600            global_settings: &self.initial_spec.global_settings,
601            resolved_db_snapshots: &resolved_db_snapshots,
602        };
603
604        // Register the parachain to the running network
605        let first_node_url = self
606            .relaychain()
607            .nodes
608            .first()
609            .ok_or(anyhow::anyhow!(
610                "At least one node of the relaychain should be running"
611            ))?
612            .ws_uri();
613
614        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
615            let register_para_options = RegisterParachainOptions {
616                id: parachain.para_id,
617                // This needs to resolve correctly
618                wasm_path: para_spec
619                    .genesis_wasm
620                    .artifact_path()
621                    .ok_or(anyhow::anyhow!(
622                        "artifact path for wasm must be set at this point",
623                    ))?
624                    .to_path_buf(),
625                state_path: para_spec
626                    .genesis_state
627                    .artifact_path()
628                    .ok_or(anyhow::anyhow!(
629                        "artifact path for state must be set at this point",
630                    ))?
631                    .to_path_buf(),
632                node_ws_url: first_node_url.to_string(),
633                onboard_as_para: para_spec.onboard_as_parachain,
634                seed: None, // TODO: Seed is passed by?
635                finalization: false,
636            };
637
638            Parachain::register(register_para_options, &scoped_fs).await?;
639        }
640
641        // Spawn the nodes
642        let spawning_tasks = para_spec
643            .collators
644            .iter()
645            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
646
647        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
648
649        // Let's make sure nodes are up before adding them
650        let waiting_tasks = running_nodes.iter().map(|node| {
651            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
652        });
653
654        let _ = futures::future::try_join_all(waiting_tasks).await?;
655
656        let running_para_id = parachain.para_id;
657        self.add_para(parachain);
658        for node in running_nodes {
659            self.add_running_node(node, Some(running_para_id)).await;
660        }
661
662        // Dump zombie.json
663        self.write_zombie_json().await?;
664
665        generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
666
667        Ok(())
668    }
669
670    /// Register a parachain, which has already been added to the network (with manual registration
671    /// strategy)
672    ///
673    /// # Arguments
674    /// * `para_id` - Parachain Id
675    ///
676    ///
677    /// # Example:
678    /// ```rust
679    /// # use anyhow::anyhow;
680    /// # use provider::NativeProvider;
681    /// # use support::{fs::local::LocalFileSystem};
682    /// # use zombienet_orchestrator::Orchestrator;
683    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
684    /// # async fn example() -> Result<(), anyhow::Error> {
685    /// #   let provider = NativeProvider::new(LocalFileSystem {});
686    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
687    /// #   let config = NetworkConfigBuilder::new()
688    /// #     .with_relaychain(|r| {
689    /// #       r.with_chain("rococo-local")
690    /// #         .with_default_command("polkadot")
691    /// #         .with_node(|node| node.with_name("alice"))
692    /// #     })
693    /// #     .with_parachain(|p| {
694    /// #       p.with_id(100)
695    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
696    /// #         .with_default_command("test-parachain")
697    /// #         .with_collator(|n| n.with_name("dave").validator(false))
698    /// #     })
699    /// #     .build()
700    /// #     .map_err(|_e| anyhow!("Building config"))?;
701    /// let mut network = orchestrator.spawn(config).await?;
702    ///
703    /// network.register_parachain(100).await?;
704    ///
705    /// #   Ok(())
706    /// # }
707    /// ```
708    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
709        let para = self
710            .initial_spec
711            .parachains
712            .iter()
713            .find(|p| p.id == para_id)
714            .ok_or(anyhow::anyhow!(
715                "no parachain with id = {para_id} available",
716            ))?;
717        let para_genesis_config = para.get_genesis_config()?;
718        let first_node_url = self
719            .relaychain()
720            .nodes
721            .first()
722            .ok_or(anyhow::anyhow!(
723                "At least one node of the relaychain should be running"
724            ))?
725            .ws_uri();
726        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
727            id: para_id,
728            // This needs to resolve correctly
729            wasm_path: para_genesis_config.wasm_path.clone(),
730            state_path: para_genesis_config.state_path.clone(),
731            node_ws_url: first_node_url.to_string(),
732            onboard_as_para: para_genesis_config.as_parachain,
733            seed: None, // TODO: Seed is passed by?
734            finalization: false,
735        };
736        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
737        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
738        Parachain::register(register_para_options, &scoped_fs).await?;
739
740        Ok(())
741    }
742
    // TODO (open question): add a `remove_parachain()` that deregisters the
    // parachain and stops its collator(s)?
745
746    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
747        let name = name.into();
748        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
749            return Ok(node);
750        }
751
752        let list = self.node_names().join(", ");
753
754        Err(anyhow::anyhow!(
755            "can't find node with name: {name:?}, should be one of {list}"
756        ))
757    }
758
759    pub fn get_node_mut(
760        &mut self,
761        name: impl Into<String>,
762    ) -> Result<&mut NetworkNode, anyhow::Error> {
763        let name = name.into();
764        self.nodes_iter_mut()
765            .find(|n| n.name == name)
766            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
767    }
768
769    pub fn node_names(&self) -> Vec<String> {
770        self.nodes_iter()
771            .map(|n| &n.name)
772            .cloned()
773            .collect::<Vec<_>>()
774    }
775
776    pub fn nodes(&self) -> Vec<&NetworkNode> {
777        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
778    }
779
780    pub async fn detach(&self) {
781        self.ns.detach().await
782    }
783
784    // Internal API
785    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
786        if let Some(para_id) = para_id {
787            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
788                para.collators.push(node.clone());
789            } else {
790                // is the first node of the para, let create the entry
791                unreachable!()
792            }
793        } else {
794            self.relay.nodes.push(node.clone());
795        }
796        // TODO: we should hold a ref to the node in the vec in the future.
797        node.set_is_running(true);
798        node.set_last_start_ts(
799            SystemTime::now()
800                .duration_since(UNIX_EPOCH)
801                .expect("Timestamp should be valid")
802                .as_secs(),
803        );
804        let node_name = node.name.clone();
805        self.nodes_by_name.insert(node_name, node.clone());
806        self.nodes_to_watch.write().await.push(node);
807    }
808
809    pub(crate) fn add_para(&mut self, para: Parachain) {
810        self.parachains.entry(para.para_id).or_default().push(para);
811    }
812
813    pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
814        let base_dir = self.ns.base_dir().to_string_lossy();
815        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
816        let ns_name = self.ns.name();
817
818        write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
819        Ok(())
820    }
821
822    pub fn name(&self) -> &str {
823        self.ns.name()
824    }
825
826    /// Get a first parachain from the list of the parachains with specified id.
827    /// NOTE!
828    /// Usually the list will contain only one parachain.
829    /// Multiple parachains with the same id is a corner case.
830    /// If this is the case then one can get such parachain with
831    /// `parachain_by_unique_id()` method
832    ///
833    /// # Arguments
834    /// * `para_id` - Parachain Id
835    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
836        self.parachains.get(&para_id)?.first()
837    }
838
839    /// Get a parachain by its unique id.
840    ///
841    /// This is particularly useful if there are multiple parachains
842    /// with the same id (this is a rare corner case).
843    ///
844    /// # Arguments
845    /// * `unique_id` - unique id of the parachain
846    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
847        self.parachains
848            .values()
849            .flat_map(|p| p.iter())
850            .find(|p| p.unique_id == unique_id.as_ref())
851    }
852
853    pub fn parachains(&self) -> Vec<&Parachain> {
854        self.parachains.values().flatten().collect()
855    }
856
857    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
858        self.relay.nodes.iter().chain(
859            self.parachains
860                .values()
861                .flat_map(|p| p.iter())
862                .flat_map(|p| &p.collators),
863        )
864    }
865
866    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
867        self.relay.nodes.iter_mut().chain(
868            self.parachains
869                .values_mut()
870                .flat_map(|p| p.iter_mut())
871                .flat_map(|p| &mut p.collators),
872        )
873    }
874
875    /// Waits given number of seconds until all nodes in the network report that they are
876    /// up and running.
877    ///
878    /// # Arguments
879    /// * `timeout_secs` - The number of seconds to wait.
880    ///
881    /// # Returns
882    /// * `Ok()` if the node is up before timeout occured.
883    /// * `Err(e)` if timeout or other error occurred while waiting.
884    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
885        let handles = self
886            .nodes_iter()
887            .map(|node| node.wait_until_is_up(timeout_secs));
888
889        futures::future::try_join_all(handles).await?;
890
891        Ok(())
892    }
893
894    /// Pause every node in the network (SIGSTOP). Issued in parallel.
895    ///
896    /// Note: with the native provider on an attached network, the live
897    /// network has to be running with global setting `teardown_on_failure`
898    /// disabled.
899    pub async fn pause(&self) -> Result<(), anyhow::Error> {
900        futures::future::try_join_all(self.nodes_iter().map(|n| n.pause())).await?;
901        Ok(())
902    }
903
904    /// Resume every node in the network (SIGCONT). Issued in parallel.
905    ///
906    /// Note: with the native provider on an attached network, the live
907    /// network has to be running with global setting `teardown_on_failure`
908    /// disabled.
909    pub async fn resume(&self) -> Result<(), anyhow::Error> {
910        futures::future::try_join_all(self.nodes_iter().map(|n| n.resume())).await?;
911        Ok(())
912    }
913
914    /// Start the observability stack (Prometheus + Grafana) as an add-on
915    ///
916    /// This can be called on any running network — whether freshly spawned or
917    /// re-attached via [`Orchestrator::attach_to_live`]. If observability is
918    /// already running, it will be stopped first
919    ///
920    /// # Example:
921    /// ```rust
922    /// # use provider::NativeProvider;
923    /// # use support::{fs::local::LocalFileSystem};
924    /// # use zombienet_orchestrator::{errors, Orchestrator};
925    /// # use configuration::{NetworkConfig, ObservabilityConfigBuilder};
926    /// # async fn example() -> Result<(), errors::OrchestratorError> {
927    /// #   let provider = NativeProvider::new(LocalFileSystem {});
928    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
929    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
930    /// let mut network = orchestrator.spawn(config).await?;
931    ///
932    /// let obs_config = ObservabilityConfigBuilder::new()
933    ///     .with_enabled(true)
934    ///     .with_grafana_port(3000)
935    ///     .build();
936    ///
937    /// let info = network.start_observability(&obs_config).await?;
938    /// println!("Grafana: {}", info.grafana_url);
939    /// #   Ok(())
940    /// # }
941    /// ```
942    pub async fn start_observability(
943        &mut self,
944        config: &configuration::ObservabilityConfig,
945    ) -> Result<&ObservabilityInfo, anyhow::Error> {
946        if self.observability().is_some() {
947            self.stop_observability().await?;
948        }
949
950        let nodes = self.nodes();
951        let info = observability::spawn_observability_stack(
952            config,
953            &nodes,
954            self.ns.name(),
955            self.ns.base_dir(),
956            &self.filesystem,
957        )
958        .await?;
959
960        self.observability = ObservabilityState::Running(info);
961        self.observability()
962            .ok_or_else(|| anyhow::anyhow!("observability state was just set but is not running"))
963    }
964
965    /// Stop the observability stack if running
966    ///
967    /// Removes the Prometheus and Grafana containers. This is safe to call
968    /// even if no observability stack is running (it will be a no-op)
969    pub async fn stop_observability(&mut self) -> Result<(), anyhow::Error> {
970        if let ObservabilityState::Running(info) =
971            std::mem::replace(&mut self.observability, ObservabilityState::Stopped)
972        {
973            observability::cleanup_observability_stack(&info).await?;
974        }
975        Ok(())
976    }
977
978    pub(crate) fn spawn_watching_task(&self) {
979        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
980        let ns = Arc::clone(&self.ns);
981        let node_bootstrap_timeout = self.initial_spec.global_settings.node_spawn_timeout();
982
983        tokio::spawn(async move {
984            loop {
985                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
986
987                let guard = nodes_to_watch.read().await;
988                let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
989
990                let all_running = {
991                    let all_running =
992                        futures::future::try_join_all(nodes.iter().map(|n| {
993                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
994                        }))
995                        .await;
996
997                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
998                    if nodes.iter().any(|n| !n.is_running()) {
999                        continue;
1000                    } else {
1001                        all_running
1002                    }
1003                };
1004
1005                if let Err(e) = all_running {
1006                    // check if the node was restarted and we need to give it more time
1007                    if let Some(node) = nodes.iter().find(|n| n.name() == e.to_string()) {
1008                        let now = SystemTime::now()
1009                            .duration_since(UNIX_EPOCH)
1010                            .expect("get current ts should work.")
1011                            .as_secs();
1012                        if node_bootstrap_timeout as u64 > (now - node.last_start_ts()) {
1013                            warn!("[{}] still in bootstrap window from last starting ({}), continue waiting...", node.name(), node.last_start_ts());
1014                            continue;
1015                        }
1016                    }
1017
1018                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
1019
1020                    if let Err(e) = ns.destroy().await {
1021                        error!("an error occurred during network teardown: {}", e);
1022                    }
1023
1024                    std::process::exit(1);
1025                }
1026            }
1027        });
1028    }
1029
1030    pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
1031        self.parachains = parachains;
1032    }
1033
1034    pub(crate) fn insert_node(&mut self, node: NetworkNode) {
1035        self.nodes_by_name.insert(node.name.clone(), node);
1036    }
1037
1038    pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
1039        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
1040            self.start_time_ts = Some(start_time_ts.as_millis().to_string());
1041        } else {
1042            // Just warn, do not propagate the err (this should not happens)
1043            warn!("⚠️ Error getting start_time timestamp");
1044        }
1045    }
1046}