zombienet_orchestrator/
network.rs

1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{cell::RefCell, collections::HashMap, path::PathBuf, rc::Rc, sync::Arc, time::Duration};
7
8use configuration::{
9    para_states::{Initial, Running},
10    shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
11    types::{Arg, Command, Image, Port, ValidationContext},
12    ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
13};
14use provider::{types::TransferedFile, DynNamespace, ProviderError};
15use serde::Serialize;
16use support::fs::FileSystem;
17use tokio::sync::RwLock;
18use tracing::{error, warn};
19
20use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
21use crate::{
22    generators::chain_spec::ChainSpec,
23    network_spec::{self, NetworkSpec},
24    shared::{
25        constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
26        macros,
27        types::{ChainDefaultContext, RegisterParachainOptions},
28    },
29    spawner::{self, SpawnNodeCtx},
30    ScopedFilesystem, ZombieRole,
31};
32
33#[derive(Serialize)]
34pub struct Network<T: FileSystem> {
35    #[serde(skip)]
36    ns: DynNamespace,
37    #[serde(skip)]
38    filesystem: T,
39    relay: Relaychain,
40    initial_spec: NetworkSpec,
41    parachains: HashMap<u32, Vec<Parachain>>,
42    #[serde(skip)]
43    nodes_by_name: HashMap<String, NetworkNode>,
44    #[serde(skip)]
45    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
46}
47
48impl<T: FileSystem> std::fmt::Debug for Network<T> {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("Network")
51            .field("ns", &"ns_skipped")
52            .field("relay", &self.relay)
53            .field("initial_spec", &self.initial_spec)
54            .field("parachains", &self.parachains)
55            .field("nodes_by_name", &self.nodes_by_name)
56            .finish()
57    }
58}
59
60macros::create_add_options!(AddNodeOptions {
61    chain_spec: Option<PathBuf>,
62    override_eth_key: Option<String>
63});
64
65macros::create_add_options!(AddCollatorOptions {
66    chain_spec: Option<PathBuf>,
67    chain_spec_relay: Option<PathBuf>,
68    override_eth_key: Option<String>
69});
70
71impl<T: FileSystem> Network<T> {
72    pub(crate) fn new_with_relay(
73        relay: Relaychain,
74        ns: DynNamespace,
75        fs: T,
76        initial_spec: NetworkSpec,
77    ) -> Self {
78        Self {
79            ns,
80            filesystem: fs,
81            relay,
82            initial_spec,
83            parachains: Default::default(),
84            nodes_by_name: Default::default(),
85            nodes_to_watch: Default::default(),
86        }
87    }
88
89    // Pubic API
90    pub fn ns_name(&self) -> String {
91        self.ns.name().to_string()
92    }
93
94    pub fn base_dir(&self) -> Option<&str> {
95        self.ns.base_dir().to_str()
96    }
97
98    pub fn relaychain(&self) -> &Relaychain {
99        &self.relay
100    }
101
102    // Teardown the network
103    pub async fn destroy(self) -> Result<(), ProviderError> {
104        self.ns.destroy().await
105    }
106
107    /// Add a node to the relaychain
108    // The new node is added to the running network instance.
109    /// # Example:
110    /// ```rust
111    /// # use provider::NativeProvider;
112    /// # use support::{fs::local::LocalFileSystem};
113    /// # use zombienet_orchestrator::{errors, AddNodeOptions, Orchestrator};
114    /// # use configuration::NetworkConfig;
115    /// # async fn example() -> Result<(), errors::OrchestratorError> {
116    /// #   let provider = NativeProvider::new(LocalFileSystem {});
117    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
118    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
119    /// let mut network = orchestrator.spawn(config).await?;
120    ///
121    /// // Create the options to add the new node
122    /// let opts = AddNodeOptions {
123    ///     rpc_port: Some(9444),
124    ///     is_validator: true,
125    ///     ..Default::default()
126    /// };
127    ///
128    /// network.add_node("new-node", opts).await?;
129    /// #   Ok(())
130    /// # }
131    /// ```
132    pub async fn add_node(
133        &mut self,
134        name: impl Into<String>,
135        options: AddNodeOptions,
136    ) -> Result<(), anyhow::Error> {
137        let name = generate_unique_node_name_from_names(
138            name,
139            &mut self.nodes_by_name.keys().cloned().collect(),
140        );
141
142        let relaychain = self.relaychain();
143
144        let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
145            chain_spec_custom_path.clone()
146        } else {
147            PathBuf::from(format!(
148                "{}/{}.json",
149                self.ns.base_dir().to_string_lossy(),
150                relaychain.chain
151            ))
152        };
153
154        let chain_context = ChainDefaultContext {
155            default_command: self.initial_spec.relaychain.default_command.as_ref(),
156            default_image: self.initial_spec.relaychain.default_image.as_ref(),
157            default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
158            default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
159            default_args: self.initial_spec.relaychain.default_args.iter().collect(),
160        };
161
162        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
163            &name,
164            options.into(),
165            &chain_context,
166            false,
167            false,
168        )?;
169
170        node_spec.available_args_output = Some(
171            self.initial_spec
172                .node_available_args_output(&node_spec, self.ns.clone())
173                .await?,
174        );
175
176        let base_dir = self.ns.base_dir().to_string_lossy();
177        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
178
179        let ctx = SpawnNodeCtx {
180            chain_id: &relaychain.chain_id,
181            parachain_id: None,
182            chain: &relaychain.chain,
183            role: ZombieRole::Node,
184            ns: &self.ns,
185            scoped_fs: &scoped_fs,
186            parachain: None,
187            bootnodes_addr: &vec![],
188            wait_ready: true,
189            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
190            global_settings: &self.initial_spec.global_settings,
191        };
192
193        let global_files_to_inject = vec![TransferedFile::new(
194            chain_spec_path,
195            PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
196        )];
197
198        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
199
200        // TODO: register the new node as validator in the relaychain
201        // STEPS:
202        //  - check balance of `stash` derivation for validator account
203        //  - call rotate_keys on the new validator
204        //  - call setKeys on the new validator
205        // if node_spec.is_validator {
206        //     let running_node = self.relay.nodes.first().unwrap();
207        //     // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
208        // }
209
210        // Let's make sure node is up before adding
211        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
212            .await?;
213
214        // Add node to relaychain data
215        self.add_running_node(node.clone(), None).await;
216
217        Ok(())
218    }
219
220    /// Add a new collator to a parachain
221    ///
222    /// NOTE: if more parachains with given id available (rare corner case)
223    /// then it adds collator to the first parachain
224    ///
225    /// # Example:
226    /// ```rust
227    /// # use provider::NativeProvider;
228    /// # use support::{fs::local::LocalFileSystem};
229    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
230    /// # use configuration::NetworkConfig;
231    /// # async fn example() -> Result<(), anyhow::Error> {
232    /// #   let provider = NativeProvider::new(LocalFileSystem {});
233    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
234    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
235    /// let mut network = orchestrator.spawn(config).await?;
236    ///
237    /// let col_opts = AddCollatorOptions {
238    ///     command: Some("polkadot-parachain".try_into()?),
239    ///     ..Default::default()
240    /// };
241    ///
242    /// network.add_collator("new-col-1", col_opts, 100).await?;
243    /// #   Ok(())
244    /// # }
245    /// ```
246    pub async fn add_collator(
247        &mut self,
248        name: impl Into<String>,
249        options: AddCollatorOptions,
250        para_id: u32,
251    ) -> Result<(), anyhow::Error> {
252        let name = generate_unique_node_name_from_names(
253            name,
254            &mut self.nodes_by_name.keys().cloned().collect(),
255        );
256        let spec = self
257            .initial_spec
258            .parachains
259            .iter()
260            .find(|para| para.id == para_id)
261            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
262        let role = if spec.is_cumulus_based {
263            ZombieRole::CumulusCollator
264        } else {
265            ZombieRole::Collator
266        };
267        let chain_context = ChainDefaultContext {
268            default_command: spec.default_command.as_ref(),
269            default_image: spec.default_image.as_ref(),
270            default_resources: spec.default_resources.as_ref(),
271            default_db_snapshot: spec.default_db_snapshot.as_ref(),
272            default_args: spec.default_args.iter().collect(),
273        };
274
275        let parachain = self
276            .parachains
277            .get_mut(&para_id)
278            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
279            .get_mut(0)
280            .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
281
282        let base_dir = self.ns.base_dir().to_string_lossy();
283        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
284
285        // TODO: we want to still supporting spawn a dedicated bootnode??
286        let ctx = SpawnNodeCtx {
287            chain_id: &self.relay.chain_id,
288            parachain_id: parachain.chain_id.as_deref(),
289            chain: &self.relay.chain,
290            role,
291            ns: &self.ns,
292            scoped_fs: &scoped_fs,
293            parachain: Some(spec),
294            bootnodes_addr: &vec![],
295            wait_ready: true,
296            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
297            global_settings: &self.initial_spec.global_settings,
298        };
299
300        let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
301            chain_spec_custom_path.clone()
302        } else {
303            PathBuf::from(format!(
304                "{}/{}.json",
305                self.ns.base_dir().to_string_lossy(),
306                self.relay.chain
307            ))
308        };
309
310        let mut global_files_to_inject = vec![TransferedFile::new(
311            relaychain_spec_path,
312            PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
313        )];
314
315        let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
316            Some(para_chain_spec_custom.clone())
317        } else if let Some(para_spec_path) = &parachain.chain_spec_path {
318            Some(PathBuf::from(format!(
319                "{}/{}",
320                self.ns.base_dir().to_string_lossy(),
321                para_spec_path.to_string_lossy()
322            )))
323        } else {
324            None
325        };
326
327        if let Some(para_spec_path) = para_chain_spec_local_path {
328            global_files_to_inject.push(TransferedFile::new(
329                para_spec_path,
330                PathBuf::from(format!("/cfg/{para_id}.json")),
331            ));
332        }
333
334        let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
335            name,
336            options.into(),
337            &chain_context,
338            true,
339            spec.is_evm_based,
340        )?;
341
342        node_spec.available_args_output = Some(
343            self.initial_spec
344                .node_available_args_output(&node_spec, self.ns.clone())
345                .await?,
346        );
347
348        let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
349
350        // Let's make sure node is up before adding
351        node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
352            .await?;
353
354        parachain.collators.push(node.clone());
355        self.add_running_node(node, None).await;
356
357        Ok(())
358    }
359
360    /// Get a parachain config builder from a running network
361    ///
362    /// This allow you to build a new parachain config to be deployed into
363    /// the running network.
364    pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
365        let used_ports = self
366            .nodes_iter()
367            .map(|node| node.spec())
368            .flat_map(|spec| {
369                [
370                    spec.ws_port.0,
371                    spec.rpc_port.0,
372                    spec.prometheus_port.0,
373                    spec.p2p_port.0,
374                ]
375            })
376            .collect();
377
378        let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
379
380        // need to inverse logic of generate_unique_para_id
381        let used_para_ids = self
382            .parachains
383            .iter()
384            .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
385            .collect();
386
387        let context = ValidationContext {
388            used_ports,
389            used_nodes_names,
390            used_para_ids,
391        };
392        let context = Rc::new(RefCell::new(context));
393
394        ParachainConfigBuilder::new_with_running(context)
395    }
396
397    /// Add a new parachain to the running network
398    ///
399    /// # Arguments
400    /// * `para_config` - Parachain configuration to deploy
401    /// * `custom_relaychain_spec` - Optional path to a custom relaychain spec to use
402    /// * `custom_parchain_fs_prefix` - Optional prefix to use when artifacts are created
403    ///
404    ///
405    /// # Example:
406    /// ```rust
407    /// # use anyhow::anyhow;
408    /// # use provider::NativeProvider;
409    /// # use support::{fs::local::LocalFileSystem};
410    /// # use zombienet_orchestrator::{errors, AddCollatorOptions, Orchestrator};
411    /// # use configuration::NetworkConfig;
412    /// # async fn example() -> Result<(), anyhow::Error> {
413    /// #   let provider = NativeProvider::new(LocalFileSystem {});
414    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
415    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
416    /// let mut network = orchestrator.spawn(config).await?;
417    /// let para_config = network
418    ///     .para_config_builder()
419    ///     .with_id(100)
420    ///     .with_default_command("polkadot-parachain")
421    ///     .with_collator(|c| c.with_name("col-100-1"))
422    ///     .build()
423    ///     .map_err(|_e| anyhow!("Building config"))?;
424    ///
425    /// network.add_parachain(&para_config, None, None).await?;
426    ///
427    /// #   Ok(())
428    /// # }
429    /// ```
430    pub async fn add_parachain(
431        &mut self,
432        para_config: &ParachainConfig,
433        custom_relaychain_spec: Option<PathBuf>,
434        custom_parchain_fs_prefix: Option<String>,
435    ) -> Result<(), anyhow::Error> {
436        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
437        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
438
439        let mut global_files_to_inject = vec![];
440
441        // get relaychain id
442        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
443            // use this file as relaychain spec
444            global_files_to_inject.push(TransferedFile::new(
445                custom_path.clone(),
446                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
447            ));
448            let content = std::fs::read_to_string(custom_path)?;
449            ChainSpec::chain_id_from_spec(&content)?
450        } else {
451            global_files_to_inject.push(TransferedFile::new(
452                PathBuf::from(format!(
453                    "{}/{}",
454                    scoped_fs.base_dir,
455                    self.relaychain().chain_spec_path.to_string_lossy()
456                )),
457                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
458            ));
459            self.relay.chain_id.clone()
460        };
461
462        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
463            para_config,
464            relay_chain_id.as_str().try_into()?,
465        )?;
466
467        let chain_spec_raw_path = para_spec
468            .build_chain_spec(&relay_chain_id, &self.ns, &scoped_fs)
469            .await?;
470
471        // Para artifacts
472        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
473            custom_prefix
474        } else {
475            para_spec.id.to_string()
476        };
477
478        scoped_fs.create_dir(&para_path_prefix).await?;
479        // create wasm/state
480        para_spec
481            .genesis_state
482            .build(
483                chain_spec_raw_path.as_ref(),
484                format!("{}/genesis-state", &para_path_prefix),
485                &self.ns,
486                &scoped_fs,
487                None,
488            )
489            .await?;
490        para_spec
491            .genesis_wasm
492            .build(
493                chain_spec_raw_path.as_ref(),
494                format!("{}/para_spec-wasm", &para_path_prefix),
495                &self.ns,
496                &scoped_fs,
497                None,
498            )
499            .await?;
500
501        let parachain =
502            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
503        let parachain_id = parachain.chain_id.clone();
504
505        // Create `ctx` for spawn the nodes
506        let ctx_para = SpawnNodeCtx {
507            parachain: Some(&para_spec),
508            parachain_id: parachain_id.as_deref(),
509            role: if para_spec.is_cumulus_based {
510                ZombieRole::CumulusCollator
511            } else {
512                ZombieRole::Collator
513            },
514            bootnodes_addr: &para_config
515                .bootnodes_addresses()
516                .iter()
517                .map(|&a| a.to_string())
518                .collect(),
519            chain_id: &self.relaychain().chain_id,
520            chain: &self.relaychain().chain,
521            ns: &self.ns,
522            scoped_fs: &scoped_fs,
523            wait_ready: false,
524            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
525            global_settings: &self.initial_spec.global_settings,
526        };
527
528        // Register the parachain to the running network
529        let first_node_url = self
530            .relaychain()
531            .nodes
532            .first()
533            .ok_or(anyhow::anyhow!(
534                "At least one node of the relaychain should be running"
535            ))?
536            .ws_uri();
537
538        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
539            let register_para_options = RegisterParachainOptions {
540                id: parachain.para_id,
541                // This needs to resolve correctly
542                wasm_path: para_spec
543                    .genesis_wasm
544                    .artifact_path()
545                    .ok_or(anyhow::anyhow!(
546                        "artifact path for wasm must be set at this point",
547                    ))?
548                    .to_path_buf(),
549                state_path: para_spec
550                    .genesis_state
551                    .artifact_path()
552                    .ok_or(anyhow::anyhow!(
553                        "artifact path for state must be set at this point",
554                    ))?
555                    .to_path_buf(),
556                node_ws_url: first_node_url.to_string(),
557                onboard_as_para: para_spec.onboard_as_parachain,
558                seed: None, // TODO: Seed is passed by?
559                finalization: false,
560            };
561
562            Parachain::register(register_para_options, &scoped_fs).await?;
563        }
564
565        // Spawn the nodes
566        let spawning_tasks = para_spec
567            .collators
568            .iter()
569            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
570
571        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
572
573        // Let's make sure nodes are up before adding them
574        let waiting_tasks = running_nodes.iter().map(|node| {
575            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
576        });
577
578        let _ = futures::future::try_join_all(waiting_tasks).await?;
579
580        let running_para_id = parachain.para_id;
581        self.add_para(parachain);
582        for node in running_nodes {
583            self.add_running_node(node, Some(running_para_id)).await;
584        }
585
586        Ok(())
587    }
588
589    /// Register a parachain, which has already been added to the network (with manual registration
590    /// strategy)
591    ///
592    /// # Arguments
593    /// * `para_id` - Parachain Id
594    ///
595    ///
596    /// # Example:
597    /// ```rust
598    /// # use anyhow::anyhow;
599    /// # use provider::NativeProvider;
600    /// # use support::{fs::local::LocalFileSystem};
601    /// # use zombienet_orchestrator::Orchestrator;
602    /// # use configuration::{NetworkConfig, NetworkConfigBuilder, RegistrationStrategy};
603    /// # async fn example() -> Result<(), anyhow::Error> {
604    /// #   let provider = NativeProvider::new(LocalFileSystem {});
605    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
606    /// #   let config = NetworkConfigBuilder::new()
607    /// #     .with_relaychain(|r| {
608    /// #       r.with_chain("rococo-local")
609    /// #         .with_default_command("polkadot")
610    /// #         .with_node(|node| node.with_name("alice"))
611    /// #     })
612    /// #     .with_parachain(|p| {
613    /// #       p.with_id(100)
614    /// #         .with_registration_strategy(RegistrationStrategy::Manual)
615    /// #         .with_default_command("test-parachain")
616    /// #         .with_collator(|n| n.with_name("dave").validator(false))
617    /// #     })
618    /// #     .build()
619    /// #     .map_err(|_e| anyhow!("Building config"))?;
620    /// let mut network = orchestrator.spawn(config).await?;
621    ///
622    /// network.register_parachain(100).await?;
623    ///
624    /// #   Ok(())
625    /// # }
626    /// ```
627    pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
628        let para = self
629            .initial_spec
630            .parachains
631            .iter()
632            .find(|p| p.id == para_id)
633            .ok_or(anyhow::anyhow!(
634                "no parachain with id = {para_id} available",
635            ))?;
636        let para_genesis_config = para.get_genesis_config()?;
637        let first_node_url = self
638            .relaychain()
639            .nodes
640            .first()
641            .ok_or(anyhow::anyhow!(
642                "At least one node of the relaychain should be running"
643            ))?
644            .ws_uri();
645        let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
646            id: para_id,
647            // This needs to resolve correctly
648            wasm_path: para_genesis_config.wasm_path.clone(),
649            state_path: para_genesis_config.state_path.clone(),
650            node_ws_url: first_node_url.to_string(),
651            onboard_as_para: para_genesis_config.as_parachain,
652            seed: None, // TODO: Seed is passed by?
653            finalization: false,
654        };
655        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
656        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
657        Parachain::register(register_para_options, &scoped_fs).await?;
658
659        Ok(())
660    }
661
662    // deregister and stop the collator?
663    // remove_parachain()
664
665    pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
666        let name = name.into();
667        if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
668            return Ok(node);
669        }
670
671        let list = self
672            .nodes_iter()
673            .map(|n| &n.name)
674            .cloned()
675            .collect::<Vec<_>>()
676            .join(", ");
677
678        Err(anyhow::anyhow!(
679            "can't find node with name: {name:?}, should be one of {list}"
680        ))
681    }
682
683    pub fn get_node_mut(
684        &mut self,
685        name: impl Into<String>,
686    ) -> Result<&mut NetworkNode, anyhow::Error> {
687        let name = name.into();
688        self.nodes_iter_mut()
689            .find(|n| n.name == name)
690            .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
691    }
692
693    pub fn nodes(&self) -> Vec<&NetworkNode> {
694        self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
695    }
696
697    pub async fn detach(&self) {
698        self.ns.detach().await
699    }
700
701    // Internal API
702    pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
703        if let Some(para_id) = para_id {
704            if let Some(para) = self.parachains.get_mut(&para_id).and_then(|p| p.get_mut(0)) {
705                para.collators.push(node.clone());
706            } else {
707                // is the first node of the para, let create the entry
708                unreachable!()
709            }
710        } else {
711            self.relay.nodes.push(node.clone());
712        }
713        // TODO: we should hold a ref to the node in the vec in the future.
714        node.set_is_running(true);
715        let node_name = node.name.clone();
716        self.nodes_by_name.insert(node_name, node.clone());
717        self.nodes_to_watch.write().await.push(node);
718    }
719
720    pub(crate) fn add_para(&mut self, para: Parachain) {
721        self.parachains.entry(para.para_id).or_default().push(para);
722    }
723
724    pub fn name(&self) -> &str {
725        self.ns.name()
726    }
727
728    /// Get a first parachain from the list of the parachains with specified id.
729    /// NOTE!
730    /// Usually the list will contain only one parachain.
731    /// Multiple parachains with the same id is a corner case.
732    /// If this is the case then one can get such parachain with
733    /// `parachain_by_unique_id()` method
734    ///
735    /// # Arguments
736    /// * `para_id` - Parachain Id
737    pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
738        self.parachains.get(&para_id)?.first()
739    }
740
741    /// Get a parachain by its unique id.
742    ///
743    /// This is particularly useful if there are multiple parachains
744    /// with the same id (this is a rare corner case).
745    ///
746    /// # Arguments
747    /// * `unique_id` - unique id of the parachain
748    pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
749        self.parachains
750            .values()
751            .flat_map(|p| p.iter())
752            .find(|p| p.unique_id == unique_id.as_ref())
753    }
754
755    pub fn parachains(&self) -> Vec<&Parachain> {
756        self.parachains.values().flatten().collect()
757    }
758
759    pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
760        self.relay.nodes.iter().chain(
761            self.parachains
762                .values()
763                .flat_map(|p| p.iter())
764                .flat_map(|p| &p.collators),
765        )
766    }
767
768    pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
769        self.relay.nodes.iter_mut().chain(
770            self.parachains
771                .values_mut()
772                .flat_map(|p| p.iter_mut())
773                .flat_map(|p| &mut p.collators),
774        )
775    }
776
777    /// Waits given number of seconds until all nodes in the network report that they are
778    /// up and running.
779    ///
780    /// # Arguments
781    /// * `timeout_secs` - The number of seconds to wait.
782    ///
783    /// # Returns
784    /// * `Ok()` if the node is up before timeout occured.
785    /// * `Err(e)` if timeout or other error occurred while waiting.
786    pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
787        let handles = self
788            .nodes_iter()
789            .map(|node| node.wait_until_is_up(timeout_secs));
790
791        futures::future::try_join_all(handles).await?;
792
793        Ok(())
794    }
795
796    pub(crate) fn spawn_watching_task(&self) {
797        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
798        let ns = Arc::clone(&self.ns);
799
800        tokio::spawn(async move {
801            loop {
802                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
803
804                let all_running = {
805                    let guard = nodes_to_watch.read().await;
806                    let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
807
808                    let all_running =
809                        futures::future::try_join_all(nodes.iter().map(|n| {
810                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
811                        }))
812                        .await;
813
814                    // Re-check `is_running` to make sure we don't kill the network unnecessarily
815                    if nodes.iter().any(|n| !n.is_running()) {
816                        continue;
817                    } else {
818                        all_running
819                    }
820                };
821
822                if let Err(e) = all_running {
823                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
824
825                    if let Err(e) = ns.destroy().await {
826                        error!("an error occurred during network teardown: {}", e);
827                    }
828
829                    std::process::exit(1);
830                }
831            }
832        });
833    }
834
835    pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
836        self.parachains = parachains;
837    }
838
839    pub(crate) fn insert_node(&mut self, node: NetworkNode) {
840        self.nodes_by_name.insert(node.name.clone(), node);
841    }
842}