zombienet_orchestrator/
network.rs

1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{cell::RefCell, collections::HashMap, path::PathBuf, rc::Rc, sync::Arc, time::Duration};
7
8use configuration::{
9    para_states::{Initial, Running},
10    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
11    types::{Arg, Command, Image, Port, ValidationContext},
12    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
13};
14use provider::{types::TransferedFile, DynNamespace, ProviderError};
15use serde::Serialize;
16use support::fs::FileSystem;
17use tokio::sync::RwLock;
18use tracing::{error, warn};
19
20use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
21use crate::{
22    generators::chain_spec::ChainSpec,
23    network_spec::{self, NetworkSpec},
24    shared::{
25        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
26        macros,
27        types::{ChainDefaultContext, RegisterParachainOptions},
28    },
29    spawner::{self, SpawnNodeCtx},
30    ScopedFilesystem, ZombieRole,
31};
32
/// Handle to a spawned network: the provider namespace it runs in, the
/// relaychain, the parachains and the running nodes.
///
/// Fields marked `#[serde(skip)]` are left out of the serialized output.
#[derive(Serialize)]
pub struct Network<T: FileSystem> {
    // Provider namespace used to spawn nodes and to tear the network down.
    #[serde(skip)]
    ns: DynNamespace,
    // Filesystem implementation; wrapped in a `ScopedFilesystem` when artifacts are built.
    #[serde(skip)]
    filesystem: T,
    // The relaychain and its running nodes.
    relay: Relaychain,
    // Spec the network was created from; supplies the defaults for nodes added later.
    initial_spec: NetworkSpec,
    // Parachains keyed by para id; several parachains may share the same id.
    parachains: HashMap<u32, Vec<Parachain>>,
    // Running nodes indexed by their name.
    #[serde(skip)]
    nodes_by_name: HashMap<String, NetworkNode>,
    // Nodes polled by the background task started in `spawn_watching_task`.
    #[serde(skip)]
    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
}
47
48impl<T: FileSystem> std::fmt::Debug for Network<T> {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("Network")
51            .field("ns", &"ns_skipped")
52            .field("relay", &self.relay)
53            .field("initial_spec", &self.initial_spec)
54            .field("parachains", &self.parachains)
55            .field("nodes_by_name", &self.nodes_by_name)
56            .finish()
57    }
58}
59
// Options accepted by `Network::add_node`.
// The struct is generated by `create_add_options!`; only the extra field is
// declared here. NOTE(review): the macro presumably adds the common node
// options (e.g. `rpc_port`, `is_validator` used in the `add_node` example) —
// confirm against the macro definition.
// `chain_spec`: custom relaychain chain-spec path; when `None`, `add_node`
// uses `<base_dir>/<chain>.json`.
macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>
});
63
// Options accepted by `Network::add_collator`.
// The struct is generated by `create_add_options!`; only the extra fields are
// declared here. NOTE(review): the macro presumably adds the common node
// options (e.g. `command` used in the `add_collator` example) — confirm
// against the macro definition.
// `chain_spec`: custom parachain chain-spec path (injected as `/cfg/<para_id>.json`).
// `chain_spec_relay`: custom relaychain chain-spec path; when `None`,
// `add_collator` uses `<base_dir>/<relay chain>.json`.
macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>
});
68
69impl<T: FileSystem> Network<T> {
70    pub(crate) fn new_with_relay(
71        relay: Relaychain,
72        ns: DynNamespace,
73        fs: T,
74        initial_spec: NetworkSpec,
75    ) -> Self {
76        Self {
77            ns,
78            filesystem: fs,
79            relay,
80            initial_spec,
81            parachains: Default::default(),
82            nodes_by_name: Default::default(),
83            nodes_to_watch: Default::default(),
84        }
85    }
86
    // Public API
88    pub fn ns_name(&self) -> String {
89        self.ns.name().to_string()
90    }
91
92    pub fn base_dir(&self) -> Option<&str> {
93        self.ns.base_dir().to_str()
94    }
95
96    pub fn relaychain(&self) -> &Relaychain {
97        &self.relay
98    }
99
100    // Teardown the network
101    pub async fn destroy(self) -> Result<(), ProviderError> {
102        self.ns.destroy().await
103    }
104
105    /// Add a node to the relaychain
106    // The new node is added to the running network instance.
107    /// # Example:
108    /// ```rust
109    /// # use provider::NativeProvider;
110    /// # use support::{fs::local::LocalFileSystem};
111    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
112    /// # use configuration::NetworkConfig;
113    /// # async fn example() -> Result<(), errors::OrchestratorError> {
114    /// #   let provider = NativeProvider::new(LocalFileSystem {});
115    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
116    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
117    /// let mut network = orchestrator.spawn(config).await?;
118    ///
119    /// // Create the options to add the new node
120    /// let opts = AddNodeOptions {
121    ///     rpc_port: Some(9444),
122    ///     is_validator: true,
123    ///     ..Default::default()
124    /// };
125    ///
126    /// network.add_node("new-node", opts).await?;
127    /// #   Ok(())
128    /// # }
129    /// ```
130    pub async fn add_node(
131        &mut self,
132        name: impl Into<String>,
133        options: AddNodeOptions,
134    ) -> Result<(), anyhow::Error> {
135        let name = generate_unique_node_name_from_names(
136            name,
137            &mut self.nodes_by_name.keys().cloned().collect(),
138        );
139
140        let relaychain = self.relaychain();
141
142        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
143            chain_spec_custom_path.clone()
144        } else {
145            PathBuf::from(format!(
146                "{}/{}.json",
147                self.ns.base_dir().to_string_lossy(),
148                relaychain.chain
149            ))
150        };
151
152        let chain_context = ChainDefaultContext {
153            default_command: self.initial_spec.relaychain.default_command.as_ref(),
154            default_image: self.initial_spec.relaychain.default_image.as_ref(),
155            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
156            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
157            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
158        };
159
160        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
161            &name,
162            options.into(),
163            &chain_context,
164            false,
165        )?;
166
167        node_spec.available_args_output = Some(
168            self.initial_spec
169                .node_available_args_output(&node_spec, self.ns.clone())
170                .await?,
171        );
172
173        let base_dir = self.ns.base_dir().to_string_lossy();
174        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
175
176        let ctx = SpawnNodeCtx {
177            chain_id: &relaychain.chain_id,
178            parachain_id: None,
179            chain: &relaychain.chain,
180            role: ZombieRole::Node,
181            ns: &self.ns,
182            scoped_fs: &scoped_fs,
183            parachain: None,
184            bootnodes_addr: &vec![],
185            wait_ready: true,
186            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
187        };
188
189        let global_files_to_inject = vec![TransferedFile::new(
190            chain_spec_path,
191            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
192        )];
193
194        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
195
196        // TODO: register the new node as validator in the relaychain
197        // STEPS:
198        //  - check balance of `stash` derivation for validator account
199        //  - call rotate_keys on the new validator
200        //  - call setKeys on the new validator
201        // if node_spec.is_validator {
202        //     let running_node = self.relay.nodes.first().unwrap();
203        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
204        // }
205
206        // Let's make sure node is up before adding
207        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
208            .await?;
209
210        // Add node to relaychain data
211        self.add_running_node(node.clone(), None).await;
212
213        Ok(())
214    }
215
216    /// Add a new collator to a parachain
217    ///
218    /// NOTE: if more parachains with given id available (rare corner case)
219    /// then it adds collator to the first parachain
220    ///
221    /// # Example:
222    /// ```rust
223    /// # use provider::NativeProvider;
224    /// # use support::{fs::local::LocalFileSystem};
225    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
226    /// # use configuration::NetworkConfig;
227    /// # async fn example() -> Result<(), anyhow::Error> {
228    /// #   let provider = NativeProvider::new(LocalFileSystem {});
229    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
230    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
231    /// let mut network = orchestrator.spawn(config).await?;
232    ///
233    /// let col_opts = AddCollatorOptions {
234    ///     command: Some("polkadot-parachain".try_into()?),
235    ///     ..Default::default()
236    /// };
237    ///
238    /// network.add_collator("new-col-1", col_opts, 100).await?;
239    /// #   Ok(())
240    /// # }
241    /// ```
242    pub async fn add_collator(
243        &mut self,
244        name: impl Into<String>,
245        options: AddCollatorOptions,
246        para_id: u32,
247    ) -> Result<(), anyhow::Error> {
248        let name = generate_unique_node_name_from_names(
249            name,
250            &mut self.nodes_by_name.keys().cloned().collect(),
251        );
252        let spec = self
253            .initial_spec
254            .parachains
255            .iter()
256            .find(|para| para.id == para_id)
257            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
258        let role = if spec.is_cumulus_based {
259            ZombieRole::CumulusCollator
260        } else {
261            ZombieRole::Collator
262        };
263        let chain_context = ChainDefaultContext {
264            default_command: spec.default_command.as_ref(),
265            default_image: spec.default_image.as_ref(),
266            default_resources: spec.default_resources.as_ref(),
267            default_db_snapshot: spec.default_db_snapshot.as_ref(),
268            default_args: spec.default_args.iter().collect(),
269        };
270
271        let parachain = self
272            .parachains
273            .get_mut(&para_id)
274            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
275            .get_mut(0)
276            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
277
278        let base_dir = self.ns.base_dir().to_string_lossy();
279        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
280
281        // TODO: we want to still supporting spawn a dedicated bootnode??
282        let ctx = SpawnNodeCtx {
283            chain_id: &self.relay.chain_id,
284            parachain_id: parachain.chain_id.as_deref(),
285            chain: &self.relay.chain,
286            role,
287            ns: &self.ns,
288            scoped_fs: &scoped_fs,
289            parachain: Some(spec),
290            bootnodes_addr: &vec![],
291            wait_ready: true,
292            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
293        };
294
295        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
296            chain_spec_custom_path.clone()
297        } else {
298            PathBuf::from(format!(
299                "{}/{}.json",
300                self.ns.base_dir().to_string_lossy(),
301                self.relay.chain
302            ))
303        };
304
305        let mut global_files_to_inject = vec![TransferedFile::new(
306            relaychain_spec_path,
307            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
308        )];
309
310        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
311            Some(para_chain_spec_custom.clone())
312        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
313            Some(PathBuf::from(format!(
314                "{}/{}",
315                self.ns.base_dir().to_string_lossy(),
316                para_spec_path.to_string_lossy()
317            )))
318        } else {
319            None
320        };
321
322        if let Some(para_spec_path) = para_chain_spec_local_path {
323            global_files_to_inject.push(TransferedFile::new(
324                para_spec_path,
325                PathBuf::from(format!("/cfg/{para_id}.json")),
326            ));
327        }
328
329        let mut node_spec =
330            network_spec::node::NodeSpec::from_ad_hoc(name, options.into(), &chain_context, true)?;
331
332        node_spec.available_args_output = Some(
333            self.initial_spec
334                .node_available_args_output(&node_spec, self.ns.clone())
335                .await?,
336        );
337
338        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
339
340        // Let's make sure node is up before adding
341        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
342            .await?;
343
344        parachain.collators.push(node.clone());
345        self.add_running_node(node, None).await;
346
347        Ok(())
348    }
349
350    /// Get a parachain config builder from a running network
351    ///
352    /// This allow you to build a new parachain config to be deployed into
353    /// the running network.
354    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
355        let used_ports = self
356            .nodes_iter()
357            .map(|node| node.spec())
358            .flat_map(|spec| {
359                [
360                    spec.ws_port.0,
361                    spec.rpc_port.0,
362                    spec.prometheus_port.0,
363                    spec.p2p_port.0,
364                ]
365            })
366            .collect();
367
368        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
369
370        // need to inverse logic of generate_unique_para_id
371        let used_para_ids = self
372            .parachains
373            .iter()
374            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
375            .collect();
376
377        let context = ValidationContext {
378            used_ports,
379            used_nodes_names,
380            used_para_ids,
381        };
382        let context = Rc::new(RefCell::new(context));
383
384        ParachainConfigBuilder::new_with_running(context)
385    }
386
387    /// Add a new parachain to the running network
388    ///
389    /// # Arguments
390    /// * `para_config` - Parachain configuration to deploy
391    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
392    /// * `custom_parchain_fs_prefix` - Optional prefix to use when artifacts are created
393    ///
394    ///
395    /// # Example:
396    /// ```rust
397    /// # use anyhow::anyhow;
398    /// # use provider::NativeProvider;
399    /// # use support::{fs::local::LocalFileSystem};
400    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
401    /// # use configuration::NetworkConfig;
402    /// # async fn example() -> Result<(), anyhow::Error> {
403    /// #   let provider = NativeProvider::new(LocalFileSystem {});
404    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
405    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
406    /// let mut network = orchestrator.spawn(config).await?;
407    /// let para_config = network
408    ///     .para_config_builder()
409    ///     .with_id(100)
410    ///     .with_default_command("polkadot-parachain")
411    ///     .with_collator(|c| c.with_name("col-100-1"))
412    ///     .build()
413    ///     .map_err(|_e| anyhow!("Building config"))?;
414    ///
415    /// network.add_parachain(&para_config, None, None).await?;
416    ///
417    /// #   Ok(())
418    /// # }
419    /// ```
420    pub async fn add_parachain(
421        &mut self,
422        para_config: &ParachainConfig,
423        custom_relaychain_spec: Option<PathBuf>,
424        custom_parchain_fs_prefix: Option<String>,
425    ) -> Result<(), anyhow::Error> {
426        // build
427        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(para_config)?;
428        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
429        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
430
431        let mut global_files_to_inject = vec![];
432
433        // get relaychain id
434        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
435            // use this file as relaychain spec
436            global_files_to_inject.push(TransferedFile::new(
437                custom_path.clone(),
438                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
439            ));
440            let content = std::fs::read_to_string(custom_path)?;
441            ChainSpec::chain_id_from_spec(&content)?
442        } else {
443            global_files_to_inject.push(TransferedFile::new(
444                PathBuf::from(format!(
445                    "{}/{}",
446                    scoped_fs.base_dir,
447                    self.relaychain().chain_spec_path.to_string_lossy()
448                )),
449                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
450            ));
451            self.relay.chain_id.clone()
452        };
453
454        let chain_spec_raw_path = para_spec
455            .build_chain_spec(&relay_chain_id, &self.ns, &scoped_fs)
456            .await?;
457
458        // Para artifacts
459        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
460            custom_prefix
461        } else {
462            para_spec.id.to_string()
463        };
464
465        scoped_fs.create_dir(&para_path_prefix).await?;
466        // create wasm/state
467        para_spec
468            .genesis_state
469            .build(
470                chain_spec_raw_path.as_ref(),
471                format!("{}/genesis-state", &para_path_prefix),
472                &self.ns,
473                &scoped_fs,
474            )
475            .await?;
476        para_spec
477            .genesis_wasm
478            .build(
479                chain_spec_raw_path.as_ref(),
480                format!("{}/para_spec-wasm", &para_path_prefix),
481                &self.ns,
482                &scoped_fs,
483            )
484            .await?;
485
486        let parachain =
487            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
488        let parachain_id = parachain.chain_id.clone();
489
490        // Create `ctx` for spawn the nodes
491        let ctx_para = SpawnNodeCtx {
492            parachain: Some(&para_spec),
493            parachain_id: parachain_id.as_deref(),
494            role: if para_spec.is_cumulus_based {
495                ZombieRole::CumulusCollator
496            } else {
497                ZombieRole::Collator
498            },
499            bootnodes_addr: &vec![],
500            chain_id: &self.relaychain().chain_id,
501            chain: &self.relaychain().chain,
502            ns: &self.ns,
503            scoped_fs: &scoped_fs,
504            wait_ready: false,
505            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
506        };
507
508        // Register the parachain to the running network
509        let first_node_url = self
510            .relaychain()
511            .nodes
512            .first()
513            .ok_or(anyhow::anyhow!(
514                "At least one node of the relaychain should be running"
515            ))?
516            .ws_uri();
517
518        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
519            let register_para_options = RegisterParachainOptions {
520                id: parachain.para_id,
521                // This needs to resolve correctly
522                wasm_path: para_spec
523                    .genesis_wasm
524                    .artifact_path()
525                    .ok_or(anyhow::anyhow!(
526                        "artifact path for wasm must be set at this point",
527                    ))?
528                    .to_path_buf(),
529                state_path: para_spec
530                    .genesis_state
531                    .artifact_path()
532                    .ok_or(anyhow::anyhow!(
533                        "artifact path for state must be set at this point",
534                    ))?
535                    .to_path_buf(),
536                node_ws_url: first_node_url.to_string(),
537                onboard_as_para: para_spec.onboard_as_parachain,
538                seed: None, // TODO: Seed is passed by?
539                finalization: false,
540            };
541
542            Parachain::register(register_para_options, &scoped_fs).await?;
543        }
544
545        // Spawn the nodes
546        let spawning_tasks = para_spec
547            .collators
548            .iter()
549            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
550
551        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
552
553        // Let's make sure nodes are up before adding them
554        let waiting_tasks = running_nodes.iter().map(|node| {
555            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
556        });
557
558        let _ = futures::future::try_join_all(waiting_tasks).await?;
559
560        let running_para_id = parachain.para_id;
561        self.add_para(parachain);
562        for node in running_nodes {
563            self.add_running_node(node, Some(running_para_id)).await;
564        }
565
566        Ok(())
567    }
568
569    /// Register a parachain, which has already been added to the network (with manual registration
570    /// strategy)
571    ///
572    /// # Arguments
573    /// * `para_id` - Parachain Id
574    ///
575    ///
576    /// # Example:
577    /// ```rust
578    /// # use anyhow::anyhow;
579    /// # use provider::NativeProvider;
580    /// # use support::{fs::local::LocalFileSystem};
581    /// # use zombienet_orchestrator::Orchestrator;
582    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
583    /// # async fn example() -> Result<(), anyhow::Error> {
584    /// #   let provider = NativeProvider::new(LocalFileSystem {});
585    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
586    /// #   let config = NetworkConfigBuilder::new()
587    /// #     .with_relaychain(|r| {
588    /// #       r.with_chain("rococo-local")
589    /// #         .with_default_command("polkadot")
590    /// #         .with_node(|node| node.with_name("alice"))
591    /// #     })
592    /// #     .with_parachain(|p| {
593    /// #       p.with_id(100)
594    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
595    /// #         .with_default_command("test-parachain")
596    /// #         .with_collator(|n| n.with_name("dave").validator(false))
597    /// #     })
598    /// #     .build()
599    /// #     .map_err(|_e| anyhow!("Building config"))?;
600    /// let mut network = orchestrator.spawn(config).await?;
601    ///
602    /// network.register_parachain(100).await?;
603    ///
604    /// #   Ok(())
605    /// # }
606    /// ```
607    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
608        let para = self
609            .initial_spec
610            .parachains
611            .iter()
612            .find(|p| p.id == para_id)
613            .ok_or(anyhow::anyhow!(
614                "no parachain with id = {para_id} available",
615            ))?;
616        let para_genesis_config = para.get_genesis_config()?;
617        let first_node_url = self
618            .relaychain()
619            .nodes
620            .first()
621            .ok_or(anyhow::anyhow!(
622                "At least one node of the relaychain should be running"
623            ))?
624            .ws_uri();
625        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
626            id: para_id,
627            // This needs to resolve correctly
628            wasm_path: para_genesis_config.wasm_path.clone(),
629            state_path: para_genesis_config.state_path.clone(),
630            node_ws_url: first_node_url.to_string(),
631            onboard_as_para: para_genesis_config.as_parachain,
632            seed: None, // TODO: Seed is passed by?
633            finalization: false,
634        };
635        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
636        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
637        Parachain::register(register_para_options, &scoped_fs).await?;
638
639        Ok(())
640    }
641
642    // deregister and stop the collator?
643    // remove_parachain()
644
645    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
646        let name = name.into();
647        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
648            return Ok(node);
649        }
650
651        let list = self
652            .nodes_iter()
653            .map(|n| &n.name)
654            .cloned()
655            .collect::<Vec<_>>()
656            .join(", ");
657
658        Err(anyhow::anyhow!(
659            "can't find node with name: {name:?}, should be one of {list}"
660        ))
661    }
662
663    pub fn get_node_mut(
664        &mut self,
665        name: impl Into<String>,
666    ) -> Result<&mut NetworkNode, anyhow::Error> {
667        let name = name.into();
668        self.nodes_iter_mut()
669            .find(|n| n.name == name)
670            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
671    }
672
673    pub fn nodes(&self) -> Vec<&NetworkNode> {
674        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
675    }
676
677    pub async fn detach(&self) {
678        self.ns.detach().await
679    }
680
681    // Internal API
682    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
683        if let Some(para_id) = para_id {
684            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
685                para.collators.push(node.clone());
686            } else {
687                // is the first node of the para, let create the entry
688                unreachable!()
689            }
690        } else {
691            self.relay.nodes.push(node.clone());
692        }
693        // TODO: we should hold a ref to the node in the vec in the future.
694        node.set_is_running(true);
695        let node_name = node.name.clone();
696        self.nodes_by_name.insert(node_name, node.clone());
697        self.nodes_to_watch.write().await.push(node);
698    }
699
700    pub(crate) fn add_para(&mut self, para: Parachain) {
701        self.parachains.entry(para.para_id).or_default().push(para);
702    }
703
704    pub fn name(&self) -> &str {
705        self.ns.name()
706    }
707
708    /// Get a first parachain from the list of the parachains with specified id.
709    /// NOTE!
710    /// Usually the list will contain only one parachain.
711    /// Multiple parachains with the same id is a corner case.
712    /// If this is the case then one can get such parachain with
713    /// `parachain_by_unique_id()` method
714    ///
715    /// # Arguments
716    /// * `para_id` - Parachain Id
717    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
718        self.parachains.get(&para_id)?.first()
719    }
720
721    /// Get a parachain by its unique id.
722    ///
723    /// This is particularly useful if there are multiple parachains
724    /// with the same id (this is a rare corner case).
725    ///
726    /// # Arguments
727    /// * `unique_id` - unique id of the parachain
728    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
729        self.parachains
730            .values()
731            .flat_map(|p| p.iter())
732            .find(|p| p.unique_id == unique_id.as_ref())
733    }
734
735    pub fn parachains(&self) -> Vec<&Parachain> {
736        self.parachains.values().flatten().collect()
737    }
738
739    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
740        self.relay.nodes.iter().chain(
741            self.parachains
742                .values()
743                .flat_map(|p| p.iter())
744                .flat_map(|p| &p.collators),
745        )
746    }
747
748    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
749        self.relay.nodes.iter_mut().chain(
750            self.parachains
751                .values_mut()
752                .flat_map(|p| p.iter_mut())
753                .flat_map(|p| &mut p.collators),
754        )
755    }
756
757    /// Waits given number of seconds until all nodes in the network report that they are
758    /// up and running.
759    ///
760    /// # Arguments
761    /// * `timeout_secs` - The number of seconds to wait.
762    ///
763    /// # Returns
764    /// * `Ok()` if the node is up before timeout occured.
765    /// * `Err(e)` if timeout or other error occurred while waiting.
766    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
767        let handles = self
768            .nodes_iter()
769            .map(|node| node.wait_until_is_up(timeout_secs));
770
771        futures::future::try_join_all(handles).await?;
772
773        Ok(())
774    }
775
776    pub(crate) fn spawn_watching_task(&self) {
777        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
778        let ns = Arc::clone(&self.ns);
779
780        tokio::spawn(async move {
781            loop {
782                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
783
784                let all_running = {
785                    let guard = nodes_to_watch.read().await;
786                    let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
787
788                    let all_running =
789                        futures::future::try_join_all(nodes.iter().map(|n| {
790                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
791                        }))
792                        .await;
793
794                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
795                    if nodes.iter().any(|n| !n.is_running()) {
796                        continue;
797                    } else {
798                        all_running
799                    }
800                };
801
802                if let Err(e) = all_running {
803                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
804
805                    if let Err(e) = ns.destroy().await {
806                        error!("an error occurred during network teardown: {}", e);
807                    }
808
809                    std::process::exit(1);
810                }
811            }
812        });
813    }
814}