Skip to main content

zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod observability;
9pub mod tx_helper;
10
11mod network_spec;
12pub mod shared;
13mod spawner;
14mod utils;
15
16use std::{
17    collections::{HashMap, HashSet, VecDeque},
18    env,
19    net::IpAddr,
20    path::{Path, PathBuf},
21    time::{Duration, SystemTime},
22};
23
24use anyhow::anyhow;
25use configuration::{types::JsonOverrides, NetworkConfig, RegistrationStrategy};
26use errors::OrchestratorError;
27use generators::{core_assignment, errors::GeneratorError};
28use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
29// re-exported
30pub use network_spec::NetworkSpec;
31use network_spec::{node::NodeSpec, parachain::ParachainSpec};
32use provider::{
33    types::{ProviderCapabilities, TransferedFile},
34    DynNamespace, DynProvider,
35};
36use serde_json::json;
37use support::{
38    constants::{
39        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
40        THIS_IS_A_BUG,
41    },
42    fs::{FileSystem, FileSystemError},
43    replacer::{get_tokens_to_replace, has_tokens},
44};
45use tokio::time::timeout;
46use tracing::{debug, info, trace, warn};
47
48use crate::{
49    network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
50    shared::types::RegisterParachainOptions,
51    spawner::SpawnNodeCtx,
52    utils::write_zombie_json,
53};
54pub struct Orchestrator<T>
55where
56    T: FileSystem + Sync + Send,
57{
58    filesystem: T,
59    provider: DynProvider,
60}
61
62impl<T> Orchestrator<T>
63where
64    T: FileSystem + Sync + Send + Clone,
65{
66    pub fn new(filesystem: T, provider: DynProvider) -> Self {
67        Self {
68            filesystem,
69            provider,
70        }
71    }
72
73    pub async fn spawn(
74        &self,
75        network_config: NetworkConfig,
76    ) -> Result<Network<T>, OrchestratorError> {
77        let global_timeout = network_config.global_settings().network_spawn_timeout();
78        let network_spec = NetworkSpec::from_config(&network_config).await?;
79
80        let res = timeout(
81            Duration::from_secs(global_timeout.into()),
82            self.spawn_inner(network_spec),
83        )
84        .await
85        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
86        res?
87    }
88
89    pub async fn spawn_from_spec(
90        &self,
91        network_spec: NetworkSpec,
92    ) -> Result<Network<T>, OrchestratorError> {
93        let global_timeout = network_spec.global_settings.network_spawn_timeout();
94        let res = timeout(
95            Duration::from_secs(global_timeout as u64),
96            self.spawn_inner(network_spec),
97        )
98        .await
99        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
100        res?
101    }
102
103    pub async fn attach_to_live(
104        &self,
105        zombie_json_path: &Path,
106    ) -> Result<Network<T>, OrchestratorError> {
107        info!("attaching to live network...");
108        info!("reading zombie.json from {:?}", zombie_json_path);
109
110        let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
111        let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
112
113        info!("recreating namespace...");
114        let ns: DynNamespace = self
115            .provider
116            .create_namespace_from_json(&zombie_json)
117            .await?;
118
119        info!("recreating relaychain...");
120        let (relay, initial_spec) =
121            recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
122        let relay_nodes = relay.nodes.clone();
123
124        let mut network =
125            Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
126
127        for node in relay_nodes {
128            if node.is_responsive().await {
129                node.set_is_running(true);
130            }
131            network.insert_node(node);
132        }
133
134        info!("recreating parachains...");
135        let parachains_map =
136            recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
137        let para_nodes = parachains_map
138            .values()
139            .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
140            .collect::<Vec<NetworkNode>>();
141
142        network.set_parachains(parachains_map);
143        for node in para_nodes {
144            if node.is_responsive().await {
145                node.set_is_running(true);
146            }
147            network.insert_node(node);
148        }
149
150        Ok(network)
151    }
152
153    async fn spawn_inner(
154        &self,
155        mut network_spec: NetworkSpec,
156    ) -> Result<Network<T>, OrchestratorError> {
157        // main driver for spawn the network
158        debug!(network_spec = ?network_spec,"Network spec to spawn");
159
160        // TODO: move to Provider trait
161        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
162            .map_err(|err| {
163                OrchestratorError::InvalidConfigForProvider(
164                    self.provider.name().into(),
165                    err.to_string(),
166                )
167            })?;
168
169        // create namespace
170        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
171            self.provider
172                .create_namespace_with_base_dir(base_dir)
173                .await?
174        } else {
175            self.provider.create_namespace().await?
176        };
177
178        // set the spawn_concurrency
179        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
180
181        let start_time = SystemTime::now();
182        info!("🧰 ns: {}", ns.name());
183        info!("🧰 base_dir: {:?}", ns.base_dir());
184        info!("🕰 start time: {:?}", start_time);
185        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
186
187        network_spec
188            .populate_nodes_available_args(ns.clone())
189            .await?;
190
191        // Resolve every node's `db_snapshot` AssetLocation into a local
192        // cache file once, serially, before any parallel spawn.
193        let all_nodes: Vec<&NodeSpec> = network_spec
194            .relaychain()
195            .nodes
196            .iter()
197            .chain(
198                network_spec
199                    .parachains_iter()
200                    .flat_map(|p| p.collators.iter()),
201            )
202            .collect();
203        let resolved_db_snapshots =
204            generators::resolve_db_snapshots(all_nodes, &ns, &self.filesystem).await?;
205
206        let base_dir = ns.base_dir().to_string_lossy();
207        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
208        // Create chain-spec for relaychain
209        network_spec
210            .relaychain
211            .chain_spec
212            .build(&ns, &scoped_fs)
213            .await?;
214
215        debug!("relaychain spec built!");
216        // Create parachain artifacts (chain-spec, wasm, state)
217        let relay_chain_id = network_spec
218            .relaychain
219            .chain_spec
220            .read_chain_id(&scoped_fs)
221            .await?;
222
223        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
224        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
225        network_spec
226            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
227            .await?;
228
229        // Gather the parachains to register in genesis and the ones to register with extrinsic
230        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
231            Vec<&ParachainSpec>,
232            Vec<&ParachainSpec>,
233        ) = network_spec
234            .parachains
235            .iter()
236            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
237            .partition(|para| {
238                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
239            });
240
241        let mut para_artifacts = vec![];
242        for para in &para_to_register_in_genesis {
243            let genesis_config = para.get_genesis_config()?;
244            para_artifacts.push(genesis_config)
245        }
246
247        // Customize relaychain
248        network_spec
249            .relaychain
250            .chain_spec
251            .customize_relay(
252                &network_spec.relaychain,
253                &network_spec.hrmp_channels,
254                para_artifacts,
255                &scoped_fs,
256            )
257            .await?;
258
259        // Run post-process script if configured for the relaychain (run against plain spec before building raw)
260        if let Some(script_cmd) = network_spec.relaychain.post_process_script.as_deref() {
261            network_spec
262                .relaychain
263                .chain_spec
264                .run_post_process_script(script_cmd, &scoped_fs)
265                .await?;
266        }
267
268        // Override cores if needed
269        let num_cores = network_spec.parachains_iter().fold(0u32, |mut acc, para| {
270            if let Some(cores) = para.num_cores {
271                acc += cores;
272            } else if &RegistrationStrategy::InGenesis == para.registration_strategy() {
273                // add 1 by default
274                acc += 1;
275            }
276
277            acc
278        });
279
280        if num_cores > para_to_register_in_genesis.len() as u32 {
281            let num_cores_to_set = num_cores - para_to_register_in_genesis.len() as u32;
282            // we should set the correct core config
283            let overrides = json!({
284                "configuration": {
285                    "config": {
286                        "scheduler_params": {
287                            "num_cores": num_cores_to_set,
288                            "max_validators_per_core": 1
289                        },
290                    }
291                }
292            });
293
294            network_spec
295                .relaychain
296                .chain_spec
297                .apply_genesis_override(&scoped_fs, &overrides)
298                .await?;
299        }
300
301        // Build raw version (after any post-processing of the plain spec)
302        network_spec
303            .relaychain
304            .chain_spec
305            .build_raw(&ns, &scoped_fs, None)
306            .await?;
307
308        // override wasm if needed
309        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
310            network_spec
311                .relaychain
312                .chain_spec
313                .override_code(&scoped_fs, wasm_override)
314                .await?;
315        }
316
317        // custom override raw spec if needed
318        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
319            network_spec
320                .relaychain
321                .chain_spec
322                .override_raw_spec(&scoped_fs, raw_spec_override)
323                .await?;
324        }
325
326        // assign extra cores if needed
327        debug!("Raw overrides info: num_cores: {}, para_to_register_in_genesis_len: {:?}, override_session_0: {}", num_cores, para_to_register_in_genesis.len(), network_spec.relaychain().override_session_0);
328        if num_cores > para_to_register_in_genesis.len() as u32
329            || network_spec.relaychain().override_session_0
330        {
331            let mut core_index = 0u32;
332            // we should check with version the runtime is using
333            // could be ParaScheduler or CoretimeAssignmentProvider
334            let scheduler_key = core_assignment::get_parascheduler_storage_key();
335            let is_old = !network_spec
336                .relaychain
337                .chain_spec
338                .find_raw_key(&scoped_fs, &scheduler_key)
339                .await?;
340
341            let mut para_scheduler_value_parts: Vec<String> = vec![];
342            let mut raw_json_overrides = json!({});
343            // loop over para and assign cores from 0..
344            for para in &network_spec.parachains {
345                // cores we need to assign
346                let mut cores_for_para = 0_u32;
347                if let Some(cores) = para.num_cores {
348                    if &RegistrationStrategy::InGenesis == para.registration_strategy() {
349                        cores_for_para = cores;
350                    }
351                } else {
352                    // no num_cores set but we need to check if `override_session_0` is true
353                    // to assign the first core.
354                    if &RegistrationStrategy::InGenesis == para.registration_strategy()
355                        && network_spec.relaychain().override_session_0
356                    {
357                        cores_for_para += 1;
358                    }
359                }
360
361                debug!(
362                    "Assigning {cores_for_para} cores in raw spec for para {}. Using pallet {}",
363                    para.id,
364                    if is_old {
365                        "CoretimeAssignmentProvider"
366                    } else {
367                        "ParaScheduler"
368                    }
369                );
370                for _core in 0..cores_for_para {
371                    if is_old {
372                        let (core_assign_key, core_assign_value) =
373                            core_assignment::generate_old(core_index, para.id);
374                        raw_json_overrides[core_assign_key] = json!(core_assign_value);
375                    } else {
376                        let part = core_assignment::generate(core_index, para.id);
377                        para_scheduler_value_parts.push(part);
378                    }
379                    core_index += 1;
380                }
381            }
382
383            // if not old we need to store the k/v to override
384            if !is_old {
385                let count_prefix = format!("{:02x}", para_scheduler_value_parts.len() * 4);
386                let core_assign_value =
387                    format!("{count_prefix}{}", para_scheduler_value_parts.join(""));
388                raw_json_overrides[scheduler_key] = json!(core_assign_value);
389            }
390
391            // extra check to ensure we need to override session 0
392            if network_spec.relaychain().override_session_0 {
393                trace!("Overriding pallet ParaSessionInfo.session (0) to allow paras to produce blocks at first session.");
394                let raw_spec = network_spec
395                    .relaychain
396                    .chain_spec
397                    .read_raw_spec(&scoped_fs)
398                    .await?;
399                let overrides = generators::generate_session_0_overrides(&raw_spec, num_cores)?;
400
401                for (k, v) in overrides.as_object().ok_or(anyhow!(
402                    "'generate_session_0_overrides' should be a valid json Object."
403                ))? {
404                    raw_json_overrides[k] = v.clone();
405                }
406            }
407
408            debug!("Raw overrides keys: {:?}", raw_json_overrides);
409
410            network_spec
411                .relaychain
412                .chain_spec
413                .override_raw_spec(
414                    &scoped_fs,
415                    &JsonOverrides::Json(json!({
416                        "genesis": {
417                            "raw": {
418                                "top": raw_json_overrides
419                            }
420                        }
421                    })),
422                )
423                .await?;
424        }
425
426        let (bootnodes, relaynodes) =
427            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
428
429        // TODO: we want to still supporting spawn a dedicated bootnode??
430        let mut ctx = SpawnNodeCtx {
431            chain_id: &relay_chain_id,
432            parachain_id: None,
433            chain: relay_chain_name.as_str(),
434            role: ZombieRole::Node,
435            ns: &ns,
436            scoped_fs: &scoped_fs,
437            parachain: None,
438            bootnodes_addr: &vec![],
439            wait_ready: false,
440            nodes_by_name: json!({}),
441            global_settings: &network_spec.global_settings,
442            resolved_db_snapshots: &resolved_db_snapshots,
443        };
444
445        let global_files_to_inject = vec![TransferedFile::new(
446            PathBuf::from(format!(
447                "{}/{relay_chain_name}.json",
448                ns.base_dir().to_string_lossy()
449            )),
450            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
451        )];
452
453        let r = Relaychain::new(
454            relay_chain_name.to_string(),
455            relay_chain_id.clone(),
456            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
457                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
458            )?),
459        );
460        let mut network =
461            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
462
463        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
464        let mut node_ws_url: String = "".to_string();
465
466        // Calculate the bootnodes addr from the running nodes
467        let mut bootnodes_addr: Vec<String> = vec![];
468
469        for level in dependency_levels_among(&bootnodes)? {
470            let mut running_nodes_per_level = vec![];
471            for chunk in level.chunks(spawn_concurrency) {
472                let spawning_tasks = chunk
473                    .iter()
474                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
475
476                for node in futures::future::try_join_all(spawning_tasks).await? {
477                    let bootnode_multiaddr = node.multiaddr();
478
479                    bootnodes_addr.push(bootnode_multiaddr.to_string());
480
481                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
482                    if node_ws_url.is_empty() {
483                        node_ws_url.clone_from(&node.ws_uri)
484                    }
485
486                    running_nodes_per_level.push(node);
487                }
488            }
489            info!(
490                "🕰  waiting for level: {:?} to be up...",
491                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
492            );
493
494            // Wait for all nodes in the current level to be up
495            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
496                node.wait_until_is_up(network_spec.global_settings.node_spawn_timeout())
497            });
498
499            let _ = futures::future::try_join_all(waiting_tasks).await?;
500
501            for node in running_nodes_per_level {
502                // Add the node to the  context and `Network` instance
503                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
504                network.add_running_node(node, None).await;
505            }
506        }
507
508        // Add the bootnodes to the relaychain spec file and ctx
509        network_spec
510            .relaychain
511            .chain_spec
512            .add_bootnodes(&scoped_fs, &bootnodes_addr)
513            .await?;
514
515        ctx.bootnodes_addr = &bootnodes_addr;
516
517        for level in dependency_levels_among(&relaynodes)? {
518            let mut running_nodes_per_level = vec![];
519            for chunk in level.chunks(spawn_concurrency) {
520                let spawning_tasks = chunk
521                    .iter()
522                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
523
524                for node in futures::future::try_join_all(spawning_tasks).await? {
525                    running_nodes_per_level.push(node);
526                }
527            }
528            info!(
529                "🕰 waiting for level: {:?} to be up...",
530                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
531            );
532
533            // Wait for all nodes in the current level to be up
534            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
535                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
536            });
537
538            let _ = futures::future::try_join_all(waiting_tasks).await?;
539
540            for node in running_nodes_per_level {
541                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
542                network.add_running_node(node, None).await;
543            }
544        }
545
546        // spawn paras
547        for para in network_spec.parachains.iter() {
548            // Create parachain (in the context of the running network)
549            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
550            let parachain_id = parachain.chain_id.clone();
551
552            let (bootnodes, collators) =
553                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);
554
555            // Create `ctx` for spawn parachain nodes
556            let mut ctx_para = SpawnNodeCtx {
557                parachain: Some(para),
558                parachain_id: parachain_id.as_deref(),
559                role: if para.is_cumulus_based {
560                    ZombieRole::CumulusCollator
561                } else {
562                    ZombieRole::Collator
563                },
564                bootnodes_addr: &vec![],
565                ..ctx.clone()
566            };
567
568            // Calculate the bootnodes addr from the running nodes
569            let mut bootnodes_addr: Vec<String> = vec![];
570            let mut running_nodes: Vec<NetworkNode> = vec![];
571
572            for level in dependency_levels_among(&bootnodes)? {
573                let mut running_nodes_per_level = vec![];
574                for chunk in level.chunks(spawn_concurrency) {
575                    let spawning_tasks = chunk.iter().map(|node| {
576                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
577                    });
578
579                    for node in futures::future::try_join_all(spawning_tasks).await? {
580                        let bootnode_multiaddr = node.multiaddr();
581
582                        bootnodes_addr.push(bootnode_multiaddr.to_string());
583
584                        running_nodes_per_level.push(node);
585                    }
586                }
587                info!(
588                    "🕰  waiting for level: {:?} to be up...",
589                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
590                );
591
592                // Wait for all nodes in the current level to be up
593                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
594                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
595                });
596
597                let _ = futures::future::try_join_all(waiting_tasks).await?;
598
599                for node in running_nodes_per_level {
600                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
601                    running_nodes.push(node);
602                }
603            }
604
605            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
606                para_chain_spec
607                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
608                    .await?;
609            }
610
611            ctx_para.bootnodes_addr = &bootnodes_addr;
612
613            // Spawn the rest of the nodes
614            for level in dependency_levels_among(&collators)? {
615                let mut running_nodes_per_level = vec![];
616                for chunk in level.chunks(spawn_concurrency) {
617                    let spawning_tasks = chunk.iter().map(|node| {
618                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
619                    });
620
621                    for node in futures::future::try_join_all(spawning_tasks).await? {
622                        running_nodes_per_level.push(node);
623                    }
624                }
625                info!(
626                    "🕰  waiting for level: {:?} to be up...",
627                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
628                );
629
630                // Wait for all nodes in the current level to be up
631                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
632                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
633                });
634
635                let _ = futures::future::try_join_all(waiting_tasks).await?;
636
637                for node in running_nodes_per_level {
638                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
639                    running_nodes.push(node);
640                }
641            }
642
643            let running_para_id = parachain.para_id;
644            network.add_para(parachain);
645            for node in running_nodes {
646                network.add_running_node(node, Some(running_para_id)).await;
647            }
648        }
649
650        // Now we need to register the paras with extrinsic from the Vec collected before;
651        for para in para_to_register_with_extrinsic {
652            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
653                id: para.id,
654                // This needs to resolve correctly
655                wasm_path: para
656                    .genesis_wasm
657                    .artifact_path()
658                    .ok_or(OrchestratorError::InvariantError(
659                        "artifact path for wasm must be set at this point",
660                    ))?
661                    .to_path_buf(),
662                state_path: para
663                    .genesis_state
664                    .artifact_path()
665                    .ok_or(OrchestratorError::InvariantError(
666                        "artifact path for state must be set at this point",
667                    ))?
668                    .to_path_buf(),
669                node_ws_url: node_ws_url.clone(),
670                onboard_as_para: para.onboard_as_parachain,
671                seed: None, // TODO: Seed is passed by?
672                finalization: false,
673            };
674
675            Parachain::register(register_para_options, &scoped_fs).await?;
676        }
677
678        if network_spec.global_settings.observability().enabled() {
679            match network
680                .start_observability(network_spec.global_settings.observability())
681                .await
682            {
683                Ok(obs) => {
684                    info!("📊 Prometheus URL: {}", obs.prometheus_url);
685                    info!("📊 Grafana URL: {}", obs.grafana_url);
686                },
687                Err(e) => {
688                    warn!("⚠️  Failed to spawn observability stack: {e}");
689                },
690            }
691        }
692
693        // start custom processes if needed
694        for cp in &network_spec.custom_processes {
695            if let Err(e) = spawner::spawn_process(cp, ns.clone()).await {
696                warn!("⚠️  Failed to spawn custom process {}, err: {e}", cp.name())
697            }
698        }
699
700        network.set_start_time_ts(start_time);
701
702        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;
703
704        if network_spec.global_settings.tear_down_on_failure() {
705            network.spawn_watching_task();
706        }
707
708        generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
709
710        Ok(network)
711    }
712}
713
714// Helpers
715
716async fn recreate_network_nodes_from_json(
717    nodes_json: &serde_json::Value,
718    ns: DynNamespace,
719    provider_name: &str,
720) -> Result<Vec<NetworkNode>, OrchestratorError> {
721    let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
722
723    let mut nodes = Vec::with_capacity(raw_nodes.len());
724    for raw in raw_nodes {
725        // validate provider tag
726        let provider_tag = raw
727            .inner
728            .get("provider_tag")
729            .and_then(|v| v.as_str())
730            .ok_or_else(|| {
731                OrchestratorError::InvalidConfig(format!(
732                    "Node '{}' is missing `provider_tag` in inner node JSON",
733                    raw.name
734                ))
735            })?;
736
737        if provider_tag != provider_name {
738            return Err(OrchestratorError::InvalidConfigForProvider(
739                provider_name.to_string(),
740                provider_tag.to_string(),
741            ));
742        }
743        let inner = ns.spawn_node_from_json(&raw.inner).await?;
744        let relay_node = NetworkNode::new(
745            raw.name,
746            raw.ws_uri,
747            raw.prometheus_uri,
748            raw.multiaddr,
749            raw.spec,
750            inner,
751            raw.cmd_generator_opts,
752            raw.context,
753        );
754        nodes.push(relay_node);
755    }
756
757    Ok(nodes)
758}
759
760async fn recreate_relaychain_from_json(
761    zombie_json: &serde_json::Value,
762    ns: DynNamespace,
763    provider_name: &str,
764) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
765    let relay_json = zombie_json
766        .get("relay")
767        .ok_or(OrchestratorError::InvalidConfig(
768            "Missing `relay` field in zombie.json".into(),
769        ))?
770        .clone();
771
772    let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
773
774    let initial_spec: NetworkSpec = serde_json::from_value(
775        zombie_json
776            .get("initial_spec")
777            .ok_or(OrchestratorError::InvalidConfig(
778                "Missing `initial_spec` field in zombie.json".into(),
779            ))?
780            .clone(),
781    )?;
782
783    // Populate relay nodes
784    let nodes =
785        recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
786    relay_raw.inner.nodes = nodes;
787
788    Ok((relay_raw.inner, initial_spec))
789}
790
791async fn recreate_parachains_from_json(
792    zombie_json: &serde_json::Value,
793    ns: DynNamespace,
794    provider_name: &str,
795) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
796    let paras_json = zombie_json
797        .get("parachains")
798        .ok_or(OrchestratorError::InvalidConfig(
799            "Missing `parachains` field in zombie.json".into(),
800        ))?
801        .clone();
802
803    let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
804
805    let mut parachains_map = HashMap::new();
806
807    for (id, parachain_entries) in raw_paras {
808        let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
809
810        for raw_para in parachain_entries {
811            let mut para = raw_para.inner;
812            para.collators =
813                recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
814                    .await?;
815            parsed_vec.push(para);
816        }
817
818        parachains_map.insert(id, parsed_vec);
819    }
820
821    Ok(parachains_map)
822}
823
824// Split the node list depending if it's bootnode or not
825// NOTE: if there isn't a bootnode declared we use the first one
826fn split_nodes_by_bootnodes(
827    nodes: &[NodeSpec],
828    no_default_bootnodes: bool,
829) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
830    // get the bootnodes to spawn first and calculate the bootnode string for use later
831    let mut bootnodes = vec![];
832    let mut other_nodes = vec![];
833    nodes.iter().for_each(|node| {
834        if node.is_bootnode {
835            bootnodes.push(node)
836        } else {
837            other_nodes.push(node)
838        }
839    });
840
841    if bootnodes.is_empty() && !no_default_bootnodes {
842        bootnodes.push(other_nodes.remove(0))
843    }
844
845    (bootnodes, other_nodes)
846}
847
848// Generate a bootnode multiaddress and return as string
849fn generate_bootnode_addr(
850    node: &NetworkNode,
851    ip: &IpAddr,
852    port: u16,
853) -> Result<String, GeneratorError> {
854    generators::generate_node_bootnode_addr(
855        &node.spec.peer_id,
856        ip,
857        port,
858        node.inner.args().as_ref(),
859        &node.spec.p2p_cert_hash,
860    )
861}
862// Validate that the config fulfill all the requirements of the provider
863fn validate_spec_with_provider_capabilities(
864    network_spec: &NetworkSpec,
865    capabilities: &ProviderCapabilities,
866) -> Result<(), anyhow::Error> {
867    let mut errs: Vec<String> = vec![];
868
869    if capabilities.requires_image {
870        // Relaychain
871        if network_spec.relaychain.default_image.is_none() {
872            // we should check if each node have an image
873            let nodes = &network_spec.relaychain.nodes;
874            if nodes.iter().any(|node| node.image.is_none()) {
875                errs.push(String::from(
876                    "Missing image for node, and not default is set at relaychain",
877                ));
878            }
879        };
880
881        // Paras
882        for para in &network_spec.parachains {
883            if para.default_image.is_none() {
884                let nodes = &para.collators;
885                if nodes.iter().any(|node| node.image.is_none()) {
886                    errs.push(format!(
887                        "Missing image for node, and not default is set at parachain {}",
888                        para.id
889                    ));
890                }
891            }
892        }
893    } else {
894        // native
895        // We need to get all the `cmds` and verify if are part of the path
896        let mut cmds: HashSet<&str> = Default::default();
897        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
898            cmds.insert(cmd.as_str());
899        }
900        for node in network_spec.relaychain().nodes.iter() {
901            cmds.insert(node.command());
902        }
903
904        // Paras
905        for para in &network_spec.parachains {
906            if let Some(cmd) = para.default_command.as_ref() {
907                cmds.insert(cmd.as_str());
908            }
909
910            for node in para.collators.iter() {
911                cmds.insert(node.command());
912            }
913        }
914
915        // now check the binaries
916        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
917        trace!("current PATH: {path}");
918        let parts: Vec<_> = path.split(":").collect();
919        for cmd in cmds {
920            let missing = if cmd.contains('/') {
921                trace!("checking {cmd}");
922                if std::fs::metadata(cmd).is_err() {
923                    true
924                } else {
925                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
926                    false
927                }
928            } else {
929                // should be in the PATH
930                !parts.iter().any(|part| {
931                    let path_to = format!("{part}/{cmd}");
932                    trace!("checking {path_to}");
933                    let check_result = std::fs::metadata(&path_to);
934                    trace!("result {:?}", check_result);
935                    if check_result.is_ok() {
936                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
937                        true
938                    } else {
939                        false
940                    }
941                })
942            };
943
944            if missing {
945                errs.push(help_msg(cmd));
946            }
947        }
948    }
949
950    if !errs.is_empty() {
951        let msg = errs.join("\n");
952        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
953    }
954
955    Ok(())
956}
957
/// Build the "missing binary" message for `cmd`, including the build instructions
/// for the binaries we know how to compile.
fn help_msg(cmd: &str) -> String {
    // Every message shares the same prefix; only the trailing hint depends on the binary.
    let hint = match cmd {
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
            format!("compile by running: \n\tcargo build --package {cmd} --release")
        },
        "polkadot" => {
            format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
        },
        "polkadot-parachain" => {
            format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
        },
        _ => String::from("please compile it."),
    };

    format!("Missing binary {cmd}, {hint}")
}
974
/// Read the default spawn concurrency from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable is unset or can't be parsed as `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
983
984fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
985    let desired_spawn_concurrency = match (
986        spawn_concurrency_from_env(),
987        spec.global_settings.spawn_concurrency(),
988    ) {
989        (Some(n), _) => Some(n),
990        (None, Some(n)) => Some(n),
991        _ => None,
992    };
993
994    let (spawn_concurrency, limited_by_tokens) =
995        if let Some(spawn_concurrency) = desired_spawn_concurrency {
996            if spawn_concurrency == 1 {
997                (1, false)
998            } else if has_tokens(&serde_json::to_string(spec)?) {
999                (1, true)
1000            } else {
1001                (spawn_concurrency, false)
1002            }
1003        } else {
1004            // not set
1005            if has_tokens(&serde_json::to_string(spec)?) {
1006                (1, true)
1007            } else {
1008                // use 100 as max concurrency, we can set a max by provider later
1009                (100, false)
1010            }
1011        };
1012
1013    Ok((spawn_concurrency, limited_by_tokens))
1014}
1015
1016/// Build deterministic dependency **levels** among the given nodes.
1017/// - Only dependencies **between nodes in `nodes`** are considered.
1018/// - Unknown/out-of-scope references are ignored.
1019/// - Self-dependencies are ignored.
1020fn dependency_levels_among<'a>(
1021    nodes: &'a [&'a NodeSpec],
1022) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
1023    let by_name = nodes
1024        .iter()
1025        .map(|n| (n.name.as_str(), *n))
1026        .collect::<HashMap<_, _>>();
1027
1028    let mut graph = HashMap::with_capacity(nodes.len());
1029    let mut indegree = HashMap::with_capacity(nodes.len());
1030
1031    for node in nodes {
1032        graph.insert(node.name.as_str(), Vec::new());
1033        indegree.insert(node.name.as_str(), 0);
1034    }
1035
1036    // build dependency graph
1037    for &node in nodes {
1038        if let Ok(args_json) = serde_json::to_string(&node.args) {
1039            // collect dependencies
1040            let unique_deps = get_tokens_to_replace(&args_json)
1041                .into_iter()
1042                .filter(|dep| dep != &node.name)
1043                .filter_map(|dep| by_name.get(dep.as_str()))
1044                .map(|&dep_node| dep_node.name.as_str())
1045                .collect::<HashSet<_>>();
1046
1047            for dep_name in unique_deps {
1048                graph
1049                    .get_mut(dep_name)
1050                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
1051                    .push(node);
1052                *indegree
1053                    .get_mut(node.name.as_str())
1054                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
1055            }
1056        }
1057    }
1058
1059    // find all nodes with no dependencies
1060    let mut queue = nodes
1061        .iter()
1062        .filter(|n| {
1063            *indegree
1064                .get(n.name.as_str())
1065                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
1066                == 0
1067        })
1068        .copied()
1069        .collect::<VecDeque<_>>();
1070
1071    let mut processed_count = 0;
1072    let mut levels = Vec::new();
1073
1074    // Kahn's algorithm
1075    while !queue.is_empty() {
1076        let level_size = queue.len();
1077        let mut current_level = Vec::with_capacity(level_size);
1078
1079        for _ in 0..level_size {
1080            let n = queue
1081                .pop_front()
1082                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
1083            current_level.push(n);
1084            processed_count += 1;
1085
1086            for &neighbour in graph
1087                .get(n.name.as_str())
1088                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
1089            {
1090                let neighbour_indegree = indegree
1091                    .get_mut(neighbour.name.as_str())
1092                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
1093                *neighbour_indegree -= 1;
1094
1095                if *neighbour_indegree == 0 {
1096                    queue.push_back(neighbour);
1097                }
1098            }
1099        }
1100
1101        current_level.sort_by_key(|n| &n.name);
1102        levels.push(current_level);
1103    }
1104
1105    // cycles detected, e.g A -> B -> A
1106    if processed_count != nodes.len() {
1107        return Err(OrchestratorError::InvalidConfig(
1108            "Tokens have cyclical dependencies".to_string(),
1109        ));
1110    }
1111
1112    Ok(levels)
1113}
1114
// TODO: getting the fs from `DynNamespace` would make this struct unnecessary,
// but the `FileSystem` trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We could refactor it, or use some type-erasure technique,
// to resolve this and remove this struct.
// TODO (Loris): We could probably have a `.scoped(base_dir)` method on the
// filesystem trait itself that returns this struct; then it could move
// directly to the support crate, where it may be useful in the future.
1122#[derive(Clone, Debug)]
1123pub struct ScopedFilesystem<'a, FS: FileSystem> {
1124    fs: &'a FS,
1125    base_dir: &'a str,
1126}
1127
1128impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
1129    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
1130        Self { fs, base_dir }
1131    }
1132
1133    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
1134        for file in files {
1135            let full_remote_path = PathBuf::from(format!(
1136                "{}/{}",
1137                self.base_dir,
1138                file.remote_path.to_string_lossy()
1139            ));
1140            trace!("coping file: {file}");
1141            self.fs
1142                .copy(file.local_path.as_path(), full_remote_path)
1143                .await?;
1144        }
1145        Ok(())
1146    }
1147
1148    async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
1149        let file = file.as_ref();
1150
1151        let full_path = if file.is_absolute() {
1152            file.to_owned()
1153        } else {
1154            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1155        };
1156        let content = self.fs.read(full_path).await?;
1157        Ok(content)
1158    }
1159
1160    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
1161        let file = file.as_ref();
1162
1163        let full_path = if file.is_absolute() {
1164            file.to_owned()
1165        } else {
1166            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1167        };
1168        let content = self.fs.read_to_string(full_path).await?;
1169        Ok(content)
1170    }
1171
1172    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1173        let path = PathBuf::from(format!(
1174            "{}/{}",
1175            self.base_dir,
1176            path.as_ref().to_string_lossy()
1177        ));
1178        self.fs.create_dir(path).await
1179    }
1180
1181    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1182        let path = PathBuf::from(format!(
1183            "{}/{}",
1184            self.base_dir,
1185            path.as_ref().to_string_lossy()
1186        ));
1187        self.fs.create_dir_all(path).await
1188    }
1189
1190    async fn write(
1191        &self,
1192        path: impl AsRef<Path>,
1193        contents: impl AsRef<[u8]> + Send,
1194    ) -> Result<(), FileSystemError> {
1195        let path = path.as_ref();
1196
1197        let full_path = if path.is_absolute() {
1198            path.to_owned()
1199        } else {
1200            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1201        };
1202
1203        self.fs.write(full_path, contents).await
1204    }
1205
1206    /// Get the full_path in the scoped FS
1207    fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1208        let path = path.as_ref();
1209
1210        let full_path = if path.is_absolute() {
1211            path.to_owned()
1212        } else {
1213            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1214        };
1215
1216        full_path
1217    }
1218
1219    /// Get the base_dir in the scoped FS
1220    fn base_dir(&self) -> &str {
1221        self.base_dir
1222    }
1223}
1224
/// Role tag for the entities handled by the orchestrator.
// NOTE(review): nothing in this part of the file consumes the enum, so the meaning of
// each variant is only what its name suggests — confirm against the call sites.
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
1234
1235// re-exports
1236pub use network::{AddCollatorOptions, AddNodeOptions};
1237pub use network_helper::metrics;
1238pub use sc_chain_spec;
1239
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    // mutex for test that use env
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Set (or remove, with `None`) the spawn concurrency env var.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Build a small network config: two relay validators plus one collator.
    ///
    /// `with_image` sets images on the relaychain/collator, `with_cmd` overrides the
    /// command used by all the nodes.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Provider capabilities where only `requires_image` varies between tests.
    fn capabilities(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Build a node named `name` whose args hold one token per dependency.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Assert that the levels hold exactly the expected node names, in order.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        // `zip` stops at the shorter side, so a missing/extra level must be caught here.
        assert_eq!(
            actual_levels.len(),
            expected_levels.len(),
            "unexpected number of levels"
        );

        for (actual_level, expected_level) in actual_levels.iter().zip(expected_levels) {
            let actual_names: Vec<&str> = actual_level
                .iter()
                .map(|node| node.name.as_str())
                .collect();
            assert_eq!(actual_names, expected_level);
        }
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(true);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(true);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(false);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        // `cargo` has to be in the PATH to run the tests, so it works as a present cmd
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(false);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok(), "{:?}", valid)
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);

        // don't leak the override to whatever runs next in this process
        set_env(None);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);

        // don't leak the override to whatever runs next in this process
        set_env(None);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // no nodes
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // one node
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // two independent nodes
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // alice -> bob -> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        //         ┌─> bob
        // alice ──|
        //         └─> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        //         ┌─>   bob  ──┐
        // alice ──|            ├─> dave
        //         └─> charlie  ┘
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        // alice <-> bob
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}