zombienet_orchestrator/
network.rs

1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{
7    cell::RefCell,
8    collections::HashMap,
9    path::PathBuf,
10    rc::Rc,
11    sync::Arc,
12    time::{Duration, SystemTime},
13};
14
15use configuration::{
16    para_states::{Initial, Running},
17    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
18    types::{Arg, Command, Image, Port, ValidationContext},
19    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
20};
21use provider::{types::TransferedFile, DynNamespace, ProviderError};
22use serde::Serialize;
23use support::fs::FileSystem;
24use tokio::sync::RwLock;
25use tracing::{error, warn};
26
27use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
28use crate::{
29    generators::chain_spec::ChainSpec,
30    network_spec::{self, NetworkSpec},
31    shared::{
32        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
33        macros,
34        types::{ChainDefaultContext, RegisterParachainOptions},
35    },
36    spawner::{self, SpawnNodeCtx},
37    utils::write_zombie_json,
38    ScopedFilesystem, ZombieRole,
39};
40
/// A spawned network: one relaychain, its parachains and the handles needed to
/// manage them.
///
/// The struct is serialized when `zombie.json` is written; fields marked with
/// `#[serde(skip)]` are left out of that output.
#[derive(Serialize)]
pub struct Network<T: FileSystem> {
    /// Namespace handle: source of the network name and base directory, and the
    /// object used to destroy or detach the network.
    #[serde(skip)]
    ns: DynNamespace,
    /// Filesystem implementation used to build the scoped filesystem rooted at
    /// the namespace base directory.
    #[serde(skip)]
    filesystem: T,
    /// The relaychain and its running nodes.
    relay: Relaychain,
    /// The network spec this network was created from.
    initial_spec: NetworkSpec,
    /// Parachains keyed by para id. The value is a list because several
    /// parachains may share the same id (rare corner case).
    parachains: HashMap<u32, Vec<Parachain>>,
    /// Index of nodes by node name.
    #[serde(skip)]
    nodes_by_name: HashMap<String, NetworkNode>,
    /// Nodes checked periodically by the task started in `spawn_watching_task`.
    #[serde(skip)]
    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
    /// Start time in milliseconds since the Unix epoch, stored as a string;
    /// omitted from the serialized output while unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    start_time_ts: Option<String>,
}
57
58impl<T: FileSystem> std::fmt::Debug for Network<T> {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("Network")
61            .field("ns", &"ns_skipped")
62            .field("relay", &self.relay)
63            .field("initial_spec", &self.initial_spec)
64            .field("parachains", &self.parachains)
65            .field("nodes_by_name", &self.nodes_by_name)
66            .finish()
67    }
68}
69
// Options accepted by `Network::add_node`, declared via `create_add_options!`.
// Only the fields listed here are specific to this type; the other option
// fields (e.g. `rpc_port`, `is_validator`, as used in the `add_node` doc
// example) are presumably generated by the macro — confirm in `shared::macros`.
macros::create_add_options!(AddNodeOptions {
    // Custom chain-spec file to inject instead of the default
    // `<base_dir>/<chain>.json`.
    chain_spec: Option<PathBuf>,
    // NOTE(review): not read in this file; presumably overrides the node's
    // generated eth key — confirm where the options are converted.
    override_eth_key: Option<String>
});
74
// Options accepted by `Network::add_collator`, declared via
// `create_add_options!`. Only the fields listed here are specific to this type;
// the other option fields (e.g. `command`, as used in the `add_collator` doc
// example) are presumably generated by the macro — confirm in `shared::macros`.
macros::create_add_options!(AddCollatorOptions {
    // Custom parachain chain-spec file to inject instead of the parachain's own.
    chain_spec: Option<PathBuf>,
    // Custom relaychain chain-spec file to inject instead of the default
    // `<base_dir>/<chain>.json`.
    chain_spec_relay: Option<PathBuf>,
    // NOTE(review): not read in this file; presumably overrides the node's
    // generated eth key — confirm where the options are converted.
    override_eth_key: Option<String>
});
80
81impl<T: FileSystem> Network<T> {
82    pub(crate) fn new_with_relay(
83        relay: Relaychain,
84        ns: DynNamespace,
85        fs: T,
86        initial_spec: NetworkSpec,
87    ) -> Self {
88        Self {
89            ns,
90            filesystem: fs,
91            relay,
92            initial_spec,
93            parachains: Default::default(),
94            nodes_by_name: Default::default(),
95            nodes_to_watch: Default::default(),
96            start_time_ts: Default::default(),
97        }
98    }
99
    // Public API
101    pub fn ns_name(&self) -> String {
102        self.ns.name().to_string()
103    }
104
105    pub fn base_dir(&self) -> Option<&str> {
106        self.ns.base_dir().to_str()
107    }
108
109    pub fn relaychain(&self) -> &Relaychain {
110        &self.relay
111    }
112
113    // Teardown the network
114    pub async fn destroy(self) -> Result<(), ProviderError> {
115        self.ns.destroy().await
116    }
117
118    /// Add a node to the relaychain
119    // The new node is added to the running network instance.
120    /// # Example:
121    /// ```rust
122    /// # use provider::NativeProvider;
123    /// # use support::{fs::local::LocalFileSystem};
124    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
125    /// # use configuration::NetworkConfig;
126    /// # async fn example() -> Result<(), errors::OrchestratorError> {
127    /// #   let provider = NativeProvider::new(LocalFileSystem {});
128    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
129    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
130    /// let mut network = orchestrator.spawn(config).await?;
131    ///
132    /// // Create the options to add the new node
133    /// let opts = AddNodeOptions {
134    ///     rpc_port: Some(9444),
135    ///     is_validator: true,
136    ///     ..Default::default()
137    /// };
138    ///
139    /// network.add_node("new-node", opts).await?;
140    /// #   Ok(())
141    /// # }
142    /// ```
143    pub async fn add_node(
144        &mut self,
145        name: impl Into<String>,
146        options: AddNodeOptions,
147    ) -> Result<(), anyhow::Error> {
148        let name = generate_unique_node_name_from_names(
149            name,
150            &mut self.nodes_by_name.keys().cloned().collect(),
151        );
152
153        let relaychain = self.relaychain();
154
155        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
156            chain_spec_custom_path.clone()
157        } else {
158            PathBuf::from(format!(
159                "{}/{}.json",
160                self.ns.base_dir().to_string_lossy(),
161                relaychain.chain
162            ))
163        };
164
165        let chain_context = ChainDefaultContext {
166            default_command: self.initial_spec.relaychain.default_command.as_ref(),
167            default_image: self.initial_spec.relaychain.default_image.as_ref(),
168            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
169            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
170            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
171        };
172
173        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
174            &name,
175            options.into(),
176            &chain_context,
177            false,
178            false,
179        )?;
180
181        node_spec.available_args_output = Some(
182            self.initial_spec
183                .node_available_args_output(&node_spec, self.ns.clone())
184                .await?,
185        );
186
187        let base_dir = self.ns.base_dir().to_string_lossy();
188        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
189
190        let ctx = SpawnNodeCtx {
191            chain_id: &relaychain.chain_id,
192            parachain_id: None,
193            chain: &relaychain.chain,
194            role: ZombieRole::Node,
195            ns: &self.ns,
196            scoped_fs: &scoped_fs,
197            parachain: None,
198            bootnodes_addr: &vec![],
199            wait_ready: true,
200            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
201            global_settings: &self.initial_spec.global_settings,
202        };
203
204        let global_files_to_inject = vec![TransferedFile::new(
205            chain_spec_path,
206            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
207        )];
208
209        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
210
211        // TODO: register the new node as validator in the relaychain
212        // STEPS:
213        //  - check balance of `stash` derivation for validator account
214        //  - call rotate_keys on the new validator
215        //  - call setKeys on the new validator
216        // if node_spec.is_validator {
217        //     let running_node = self.relay.nodes.first().unwrap();
218        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
219        // }
220
221        // Let's make sure node is up before adding
222        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
223            .await?;
224
225        // Add node to relaychain data
226        self.add_running_node(node.clone(), None).await;
227
228        // Dump zombie.json
229        self.write_zombie_json().await?;
230
231        Ok(())
232    }
233
234    /// Add a new collator to a parachain
235    ///
236    /// NOTE: if more parachains with given id available (rare corner case)
237    /// then it adds collator to the first parachain
238    ///
239    /// # Example:
240    /// ```rust
241    /// # use provider::NativeProvider;
242    /// # use support::{fs::local::LocalFileSystem};
243    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
244    /// # use configuration::NetworkConfig;
245    /// # async fn example() -> Result<(), anyhow::Error> {
246    /// #   let provider = NativeProvider::new(LocalFileSystem {});
247    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
248    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
249    /// let mut network = orchestrator.spawn(config).await?;
250    ///
251    /// let col_opts = AddCollatorOptions {
252    ///     command: Some("polkadot-parachain".try_into()?),
253    ///     ..Default::default()
254    /// };
255    ///
256    /// network.add_collator("new-col-1", col_opts, 100).await?;
257    /// #   Ok(())
258    /// # }
259    /// ```
260    pub async fn add_collator(
261        &mut self,
262        name: impl Into<String>,
263        options: AddCollatorOptions,
264        para_id: u32,
265    ) -> Result<(), anyhow::Error> {
266        let name = generate_unique_node_name_from_names(
267            name,
268            &mut self.nodes_by_name.keys().cloned().collect(),
269        );
270        let spec = self
271            .initial_spec
272            .parachains
273            .iter()
274            .find(|para| para.id == para_id)
275            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
276        let role = if spec.is_cumulus_based {
277            ZombieRole::CumulusCollator
278        } else {
279            ZombieRole::Collator
280        };
281        let chain_context = ChainDefaultContext {
282            default_command: spec.default_command.as_ref(),
283            default_image: spec.default_image.as_ref(),
284            default_resources: spec.default_resources.as_ref(),
285            default_db_snapshot: spec.default_db_snapshot.as_ref(),
286            default_args: spec.default_args.iter().collect(),
287        };
288
289        let parachain = self
290            .parachains
291            .get_mut(&para_id)
292            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
293            .get_mut(0)
294            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
295
296        let base_dir = self.ns.base_dir().to_string_lossy();
297        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
298
299        // TODO: we want to still supporting spawn a dedicated bootnode??
300        let ctx = SpawnNodeCtx {
301            chain_id: &self.relay.chain_id,
302            parachain_id: parachain.chain_id.as_deref(),
303            chain: &self.relay.chain,
304            role,
305            ns: &self.ns,
306            scoped_fs: &scoped_fs,
307            parachain: Some(spec),
308            bootnodes_addr: &vec![],
309            wait_ready: true,
310            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
311            global_settings: &self.initial_spec.global_settings,
312        };
313
314        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
315            chain_spec_custom_path.clone()
316        } else {
317            PathBuf::from(format!(
318                "{}/{}.json",
319                self.ns.base_dir().to_string_lossy(),
320                self.relay.chain
321            ))
322        };
323
324        let mut global_files_to_inject = vec![TransferedFile::new(
325            relaychain_spec_path,
326            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
327        )];
328
329        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
330            Some(para_chain_spec_custom.clone())
331        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
332            Some(PathBuf::from(format!(
333                "{}/{}",
334                self.ns.base_dir().to_string_lossy(),
335                para_spec_path.to_string_lossy()
336            )))
337        } else {
338            None
339        };
340
341        if let Some(para_spec_path) = para_chain_spec_local_path {
342            global_files_to_inject.push(TransferedFile::new(
343                para_spec_path,
344                PathBuf::from(format!("/cfg/{para_id}.json")),
345            ));
346        }
347
348        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
349            name,
350            options.into(),
351            &chain_context,
352            true,
353            spec.is_evm_based,
354        )?;
355
356        node_spec.available_args_output = Some(
357            self.initial_spec
358                .node_available_args_output(&node_spec, self.ns.clone())
359                .await?,
360        );
361
362        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
363
364        // Let's make sure node is up before adding
365        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
366            .await?;
367
368        parachain.collators.push(node.clone());
369        self.add_running_node(node, None).await;
370
371        // Dump zombie.json
372        self.write_zombie_json().await?;
373
374        Ok(())
375    }
376
377    /// Get a parachain config builder from a running network
378    ///
379    /// This allow you to build a new parachain config to be deployed into
380    /// the running network.
381    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
382        let used_ports = self
383            .nodes_iter()
384            .map(|node| node.spec())
385            .flat_map(|spec| {
386                [
387                    spec.ws_port.0,
388                    spec.rpc_port.0,
389                    spec.prometheus_port.0,
390                    spec.p2p_port.0,
391                ]
392            })
393            .collect();
394
395        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
396
397        // need to inverse logic of generate_unique_para_id
398        let used_para_ids = self
399            .parachains
400            .iter()
401            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
402            .collect();
403
404        let context = ValidationContext {
405            used_ports,
406            used_nodes_names,
407            used_para_ids,
408        };
409        let context = Rc::new(RefCell::new(context));
410
411        ParachainConfigBuilder::new_with_running(context)
412    }
413
414    /// Add a new parachain to the running network
415    ///
416    /// # Arguments
417    /// * `para_config` - Parachain configuration to deploy
418    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
419    /// * `custom_parchain_fs_prefix` - Optional prefix to use when artifacts are created
420    ///
421    ///
422    /// # Example:
423    /// ```rust
424    /// # use anyhow::anyhow;
425    /// # use provider::NativeProvider;
426    /// # use support::{fs::local::LocalFileSystem};
427    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
428    /// # use configuration::NetworkConfig;
429    /// # async fn example() -> Result<(), anyhow::Error> {
430    /// #   let provider = NativeProvider::new(LocalFileSystem {});
431    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
432    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
433    /// let mut network = orchestrator.spawn(config).await?;
434    /// let para_config = network
435    ///     .para_config_builder()
436    ///     .with_id(100)
437    ///     .with_default_command("polkadot-parachain")
438    ///     .with_collator(|c| c.with_name("col-100-1"))
439    ///     .build()
440    ///     .map_err(|_e| anyhow!("Building config"))?;
441    ///
442    /// network.add_parachain(&para_config, None, None).await?;
443    ///
444    /// #   Ok(())
445    /// # }
446    /// ```
447    pub async fn add_parachain(
448        &mut self,
449        para_config: &ParachainConfig,
450        custom_relaychain_spec: Option<PathBuf>,
451        custom_parchain_fs_prefix: Option<String>,
452    ) -> Result<(), anyhow::Error> {
453        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
454        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
455
456        let mut global_files_to_inject = vec![];
457
458        // get relaychain id
459        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
460            // use this file as relaychain spec
461            global_files_to_inject.push(TransferedFile::new(
462                custom_path.clone(),
463                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
464            ));
465            let content = std::fs::read_to_string(custom_path)?;
466            ChainSpec::chain_id_from_spec(&content)?
467        } else {
468            global_files_to_inject.push(TransferedFile::new(
469                PathBuf::from(format!(
470                    "{}/{}",
471                    scoped_fs.base_dir,
472                    self.relaychain().chain_spec_path.to_string_lossy()
473                )),
474                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
475            ));
476            self.relay.chain_id.clone()
477        };
478
479        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
480            para_config,
481            relay_chain_id.as_str().try_into()?,
482        )?;
483
484        let chain_spec_raw_path = para_spec
485            .build_chain_spec(&relay_chain_id, &self.ns, &scoped_fs)
486            .await?;
487
488        // Para artifacts
489        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
490            custom_prefix
491        } else {
492            para_spec.id.to_string()
493        };
494
495        scoped_fs.create_dir(&para_path_prefix).await?;
496        // create wasm/state
497        para_spec
498            .genesis_state
499            .build(
500                chain_spec_raw_path.as_ref(),
501                format!("{}/genesis-state", &para_path_prefix),
502                &self.ns,
503                &scoped_fs,
504                None,
505            )
506            .await?;
507        para_spec
508            .genesis_wasm
509            .build(
510                chain_spec_raw_path.as_ref(),
511                format!("{}/para_spec-wasm", &para_path_prefix),
512                &self.ns,
513                &scoped_fs,
514                None,
515            )
516            .await?;
517
518        let parachain =
519            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
520        let parachain_id = parachain.chain_id.clone();
521
522        // Create `ctx` for spawn the nodes
523        let ctx_para = SpawnNodeCtx {
524            parachain: Some(&para_spec),
525            parachain_id: parachain_id.as_deref(),
526            role: if para_spec.is_cumulus_based {
527                ZombieRole::CumulusCollator
528            } else {
529                ZombieRole::Collator
530            },
531            bootnodes_addr: &para_config
532                .bootnodes_addresses()
533                .iter()
534                .map(|&a| a.to_string())
535                .collect(),
536            chain_id: &self.relaychain().chain_id,
537            chain: &self.relaychain().chain,
538            ns: &self.ns,
539            scoped_fs: &scoped_fs,
540            wait_ready: false,
541            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
542            global_settings: &self.initial_spec.global_settings,
543        };
544
545        // Register the parachain to the running network
546        let first_node_url = self
547            .relaychain()
548            .nodes
549            .first()
550            .ok_or(anyhow::anyhow!(
551                "At least one node of the relaychain should be running"
552            ))?
553            .ws_uri();
554
555        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
556            let register_para_options = RegisterParachainOptions {
557                id: parachain.para_id,
558                // This needs to resolve correctly
559                wasm_path: para_spec
560                    .genesis_wasm
561                    .artifact_path()
562                    .ok_or(anyhow::anyhow!(
563                        "artifact path for wasm must be set at this point",
564                    ))?
565                    .to_path_buf(),
566                state_path: para_spec
567                    .genesis_state
568                    .artifact_path()
569                    .ok_or(anyhow::anyhow!(
570                        "artifact path for state must be set at this point",
571                    ))?
572                    .to_path_buf(),
573                node_ws_url: first_node_url.to_string(),
574                onboard_as_para: para_spec.onboard_as_parachain,
575                seed: None, // TODO: Seed is passed by?
576                finalization: false,
577            };
578
579            Parachain::register(register_para_options, &scoped_fs).await?;
580        }
581
582        // Spawn the nodes
583        let spawning_tasks = para_spec
584            .collators
585            .iter()
586            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
587
588        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
589
590        // Let's make sure nodes are up before adding them
591        let waiting_tasks = running_nodes.iter().map(|node| {
592            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
593        });
594
595        let _ = futures::future::try_join_all(waiting_tasks).await?;
596
597        let running_para_id = parachain.para_id;
598        self.add_para(parachain);
599        for node in running_nodes {
600            self.add_running_node(node, Some(running_para_id)).await;
601        }
602
603        // Dump zombie.json
604        self.write_zombie_json().await?;
605
606        Ok(())
607    }
608
609    /// Register a parachain, which has already been added to the network (with manual registration
610    /// strategy)
611    ///
612    /// # Arguments
613    /// * `para_id` - Parachain Id
614    ///
615    ///
616    /// # Example:
617    /// ```rust
618    /// # use anyhow::anyhow;
619    /// # use provider::NativeProvider;
620    /// # use support::{fs::local::LocalFileSystem};
621    /// # use zombienet_orchestrator::Orchestrator;
622    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
623    /// # async fn example() -> Result<(), anyhow::Error> {
624    /// #   let provider = NativeProvider::new(LocalFileSystem {});
625    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
626    /// #   let config = NetworkConfigBuilder::new()
627    /// #     .with_relaychain(|r| {
628    /// #       r.with_chain("rococo-local")
629    /// #         .with_default_command("polkadot")
630    /// #         .with_node(|node| node.with_name("alice"))
631    /// #     })
632    /// #     .with_parachain(|p| {
633    /// #       p.with_id(100)
634    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
635    /// #         .with_default_command("test-parachain")
636    /// #         .with_collator(|n| n.with_name("dave").validator(false))
637    /// #     })
638    /// #     .build()
639    /// #     .map_err(|_e| anyhow!("Building config"))?;
640    /// let mut network = orchestrator.spawn(config).await?;
641    ///
642    /// network.register_parachain(100).await?;
643    ///
644    /// #   Ok(())
645    /// # }
646    /// ```
647    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
648        let para = self
649            .initial_spec
650            .parachains
651            .iter()
652            .find(|p| p.id == para_id)
653            .ok_or(anyhow::anyhow!(
654                "no parachain with id = {para_id} available",
655            ))?;
656        let para_genesis_config = para.get_genesis_config()?;
657        let first_node_url = self
658            .relaychain()
659            .nodes
660            .first()
661            .ok_or(anyhow::anyhow!(
662                "At least one node of the relaychain should be running"
663            ))?
664            .ws_uri();
665        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
666            id: para_id,
667            // This needs to resolve correctly
668            wasm_path: para_genesis_config.wasm_path.clone(),
669            state_path: para_genesis_config.state_path.clone(),
670            node_ws_url: first_node_url.to_string(),
671            onboard_as_para: para_genesis_config.as_parachain,
672            seed: None, // TODO: Seed is passed by?
673            finalization: false,
674        };
675        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
676        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
677        Parachain::register(register_para_options, &scoped_fs).await?;
678
679        Ok(())
680    }
681
682    // deregister and stop the collator?
683    // remove_parachain()
684
685    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
686        let name = name.into();
687        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
688            return Ok(node);
689        }
690
691        let list = self
692            .nodes_iter()
693            .map(|n| &n.name)
694            .cloned()
695            .collect::<Vec<_>>()
696            .join(", ");
697
698        Err(anyhow::anyhow!(
699            "can't find node with name: {name:?}, should be one of {list}"
700        ))
701    }
702
703    pub fn get_node_mut(
704        &mut self,
705        name: impl Into<String>,
706    ) -> Result<&mut NetworkNode, anyhow::Error> {
707        let name = name.into();
708        self.nodes_iter_mut()
709            .find(|n| n.name == name)
710            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
711    }
712
713    pub fn nodes(&self) -> Vec<&NetworkNode> {
714        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
715    }
716
717    pub async fn detach(&self) {
718        self.ns.detach().await
719    }
720
721    // Internal API
722    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
723        if let Some(para_id) = para_id {
724            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
725                para.collators.push(node.clone());
726            } else {
727                // is the first node of the para, let create the entry
728                unreachable!()
729            }
730        } else {
731            self.relay.nodes.push(node.clone());
732        }
733        // TODO: we should hold a ref to the node in the vec in the future.
734        node.set_is_running(true);
735        let node_name = node.name.clone();
736        self.nodes_by_name.insert(node_name, node.clone());
737        self.nodes_to_watch.write().await.push(node);
738    }
739
740    pub(crate) fn add_para(&mut self, para: Parachain) {
741        self.parachains.entry(para.para_id).or_default().push(para);
742    }
743
744    pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
745        let base_dir = self.ns.base_dir().to_string_lossy();
746        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
747        let ns_name = self.ns.name();
748
749        write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
750        Ok(())
751    }
752
753    pub fn name(&self) -> &str {
754        self.ns.name()
755    }
756
757    /// Get a first parachain from the list of the parachains with specified id.
758    /// NOTE!
759    /// Usually the list will contain only one parachain.
760    /// Multiple parachains with the same id is a corner case.
761    /// If this is the case then one can get such parachain with
762    /// `parachain_by_unique_id()` method
763    ///
764    /// # Arguments
765    /// * `para_id` - Parachain Id
766    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
767        self.parachains.get(&para_id)?.first()
768    }
769
770    /// Get a parachain by its unique id.
771    ///
772    /// This is particularly useful if there are multiple parachains
773    /// with the same id (this is a rare corner case).
774    ///
775    /// # Arguments
776    /// * `unique_id` - unique id of the parachain
777    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
778        self.parachains
779            .values()
780            .flat_map(|p| p.iter())
781            .find(|p| p.unique_id == unique_id.as_ref())
782    }
783
784    pub fn parachains(&self) -> Vec<&Parachain> {
785        self.parachains.values().flatten().collect()
786    }
787
788    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
789        self.relay.nodes.iter().chain(
790            self.parachains
791                .values()
792                .flat_map(|p| p.iter())
793                .flat_map(|p| &p.collators),
794        )
795    }
796
797    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
798        self.relay.nodes.iter_mut().chain(
799            self.parachains
800                .values_mut()
801                .flat_map(|p| p.iter_mut())
802                .flat_map(|p| &mut p.collators),
803        )
804    }
805
806    /// Waits given number of seconds until all nodes in the network report that they are
807    /// up and running.
808    ///
809    /// # Arguments
810    /// * `timeout_secs` - The number of seconds to wait.
811    ///
812    /// # Returns
813    /// * `Ok()` if the node is up before timeout occured.
814    /// * `Err(e)` if timeout or other error occurred while waiting.
815    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
816        let handles = self
817            .nodes_iter()
818            .map(|node| node.wait_until_is_up(timeout_secs));
819
820        futures::future::try_join_all(handles).await?;
821
822        Ok(())
823    }
824
825    pub(crate) fn spawn_watching_task(&self) {
826        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
827        let ns = Arc::clone(&self.ns);
828
829        tokio::spawn(async move {
830            loop {
831                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
832
833                let all_running = {
834                    let guard = nodes_to_watch.read().await;
835                    let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
836
837                    let all_running =
838                        futures::future::try_join_all(nodes.iter().map(|n| {
839                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
840                        }))
841                        .await;
842
843                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
844                    if nodes.iter().any(|n| !n.is_running()) {
845                        continue;
846                    } else {
847                        all_running
848                    }
849                };
850
851                if let Err(e) = all_running {
852                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
853
854                    if let Err(e) = ns.destroy().await {
855                        error!("an error occurred during network teardown: {}", e);
856                    }
857
858                    std::process::exit(1);
859                }
860            }
861        });
862    }
863
864    pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
865        self.parachains = parachains;
866    }
867
868    pub(crate) fn insert_node(&mut self, node: NetworkNode) {
869        self.nodes_by_name.insert(node.name.clone(), node);
870    }
871
872    pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
873        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
874            self.start_time_ts = Some(start_time_ts.as_millis().to_string());
875        } else {
876            // Just warn, do not propagate the err (this should not happens)
877            warn!("⚠️ Error getting start_time timestamp");
878        }
879    }
880}