zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod observability;
9pub mod tx_helper;
10
11mod network_spec;
12pub mod shared;
13mod spawner;
14mod utils;
15
16use std::{
17    collections::{HashMap, HashSet, VecDeque},
18    env,
19    net::IpAddr,
20    path::{Path, PathBuf},
21    time::{Duration, SystemTime},
22};
23
24use anyhow::anyhow;
25use configuration::{types::JsonOverrides, NetworkConfig, RegistrationStrategy};
26use errors::OrchestratorError;
27use generators::{core_assignment, errors::GeneratorError};
28use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
29// re-exported
30pub use network_spec::NetworkSpec;
31use network_spec::{node::NodeSpec, parachain::ParachainSpec};
32use provider::{
33    types::{ProviderCapabilities, TransferedFile},
34    DynNamespace, DynProvider,
35};
36use serde_json::json;
37use support::{
38    constants::{
39        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
40        THIS_IS_A_BUG,
41    },
42    fs::{FileSystem, FileSystemError},
43    replacer::{get_tokens_to_replace, has_tokens},
44};
45use tokio::time::timeout;
46use tracing::{debug, info, trace, warn};
47
48use crate::{
49    network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
50    shared::types::RegisterParachainOptions,
51    spawner::SpawnNodeCtx,
52    utils::write_zombie_json,
53};
54pub struct Orchestrator<T>
55where
56    T: FileSystem + Sync + Send,
57{
58    filesystem: T,
59    provider: DynProvider,
60}
61
62impl<T> Orchestrator<T>
63where
64    T: FileSystem + Sync + Send + Clone,
65{
66    pub fn new(filesystem: T, provider: DynProvider) -> Self {
67        Self {
68            filesystem,
69            provider,
70        }
71    }
72
73    pub async fn spawn(
74        &self,
75        network_config: NetworkConfig,
76    ) -> Result<Network<T>, OrchestratorError> {
77        let global_timeout = network_config.global_settings().network_spawn_timeout();
78        let network_spec = NetworkSpec::from_config(&network_config).await?;
79
80        let res = timeout(
81            Duration::from_secs(global_timeout.into()),
82            self.spawn_inner(network_spec),
83        )
84        .await
85        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
86        res?
87    }
88
89    pub async fn spawn_from_spec(
90        &self,
91        network_spec: NetworkSpec,
92    ) -> Result<Network<T>, OrchestratorError> {
93        let global_timeout = network_spec.global_settings.network_spawn_timeout();
94        let res = timeout(
95            Duration::from_secs(global_timeout as u64),
96            self.spawn_inner(network_spec),
97        )
98        .await
99        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
100        res?
101    }
102
103    pub async fn attach_to_live(
104        &self,
105        zombie_json_path: &Path,
106    ) -> Result<Network<T>, OrchestratorError> {
107        info!("attaching to live network...");
108        info!("reading zombie.json from {:?}", zombie_json_path);
109
110        let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
111        let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
112
113        info!("recreating namespace...");
114        let ns: DynNamespace = self
115            .provider
116            .create_namespace_from_json(&zombie_json)
117            .await?;
118
119        info!("recreating relaychain...");
120        let (relay, initial_spec) =
121            recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
122        let relay_nodes = relay.nodes.clone();
123
124        let mut network =
125            Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
126
127        for node in relay_nodes {
128            if node.is_responsive().await {
129                node.set_is_running(true);
130            }
131            network.insert_node(node);
132        }
133
134        info!("recreating parachains...");
135        let parachains_map =
136            recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
137        let para_nodes = parachains_map
138            .values()
139            .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
140            .collect::<Vec<NetworkNode>>();
141
142        network.set_parachains(parachains_map);
143        for node in para_nodes {
144            if node.is_responsive().await {
145                node.set_is_running(true);
146            }
147            network.insert_node(node);
148        }
149
150        Ok(network)
151    }
152
153    async fn spawn_inner(
154        &self,
155        mut network_spec: NetworkSpec,
156    ) -> Result<Network<T>, OrchestratorError> {
157        // main driver for spawn the network
158        debug!(network_spec = ?network_spec,"Network spec to spawn");
159
160        // TODO: move to Provider trait
161        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
162            .map_err(|err| {
163                OrchestratorError::InvalidConfigForProvider(
164                    self.provider.name().into(),
165                    err.to_string(),
166                )
167            })?;
168
169        // create namespace
170        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
171            self.provider
172                .create_namespace_with_base_dir(base_dir)
173                .await?
174        } else {
175            self.provider.create_namespace().await?
176        };
177
178        // set the spawn_concurrency
179        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
180
181        let start_time = SystemTime::now();
182        info!("🧰 ns: {}", ns.name());
183        info!("🧰 base_dir: {:?}", ns.base_dir());
184        info!("🕰 start time: {:?}", start_time);
185        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
186
187        network_spec
188            .populate_nodes_available_args(ns.clone())
189            .await?;
190
191        let base_dir = ns.base_dir().to_string_lossy();
192        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
193        // Create chain-spec for relaychain
194        network_spec
195            .relaychain
196            .chain_spec
197            .build(&ns, &scoped_fs)
198            .await?;
199
200        debug!("relaychain spec built!");
201        // Create parachain artifacts (chain-spec, wasm, state)
202        let relay_chain_id = network_spec
203            .relaychain
204            .chain_spec
205            .read_chain_id(&scoped_fs)
206            .await?;
207
208        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
209        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
210        network_spec
211            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
212            .await?;
213
214        // Gather the parachains to register in genesis and the ones to register with extrinsic
215        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
216            Vec<&ParachainSpec>,
217            Vec<&ParachainSpec>,
218        ) = network_spec
219            .parachains
220            .iter()
221            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
222            .partition(|para| {
223                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
224            });
225
226        let mut para_artifacts = vec![];
227        for para in &para_to_register_in_genesis {
228            let genesis_config = para.get_genesis_config()?;
229            para_artifacts.push(genesis_config)
230        }
231
232        // Customize relaychain
233        network_spec
234            .relaychain
235            .chain_spec
236            .customize_relay(
237                &network_spec.relaychain,
238                &network_spec.hrmp_channels,
239                para_artifacts,
240                &scoped_fs,
241            )
242            .await?;
243
244        // Run post-process script if configured for the relaychain (run against plain spec before building raw)
245        if let Some(script_cmd) = network_spec.relaychain.post_process_script.as_deref() {
246            network_spec
247                .relaychain
248                .chain_spec
249                .run_post_process_script(script_cmd, &scoped_fs)
250                .await?;
251        }
252
253        // Override cores if needed
254        let num_cores = network_spec.parachains_iter().fold(0u32, |mut acc, para| {
255            if let Some(cores) = para.num_cores {
256                acc += cores;
257            } else if &RegistrationStrategy::InGenesis == para.registration_strategy() {
258                // add 1 by default
259                acc += 1;
260            }
261
262            acc
263        });
264
265        if num_cores > para_to_register_in_genesis.len() as u32 {
266            let num_cores_to_set = num_cores - para_to_register_in_genesis.len() as u32;
267            // we should set the correct core config
268            let overrides = json!({
269                "configuration": {
270                    "config": {
271                        "scheduler_params": {
272                            "num_cores": num_cores_to_set,
273                            "max_validators_per_core": 1
274                        },
275                    }
276                }
277            });
278
279            network_spec
280                .relaychain
281                .chain_spec
282                .apply_genesis_override(&scoped_fs, &overrides)
283                .await?;
284        }
285
286        // Build raw version (after any post-processing of the plain spec)
287        network_spec
288            .relaychain
289            .chain_spec
290            .build_raw(&ns, &scoped_fs, None)
291            .await?;
292
293        // override wasm if needed
294        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
295            network_spec
296                .relaychain
297                .chain_spec
298                .override_code(&scoped_fs, wasm_override)
299                .await?;
300        }
301
302        // custom override raw spec if needed
303        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
304            network_spec
305                .relaychain
306                .chain_spec
307                .override_raw_spec(&scoped_fs, raw_spec_override)
308                .await?;
309        }
310
311        // assign extra cores if needed
312        debug!("Raw overrides info: num_cores: {}, para_to_register_in_genesis_len: {:?}, override_session_0: {}", num_cores, para_to_register_in_genesis.len(), network_spec.relaychain().override_session_0);
313        if num_cores > para_to_register_in_genesis.len() as u32
314            || network_spec.relaychain().override_session_0
315        {
316            let mut core_index = 0u32;
317            // we should check with version the runtime is using
318            // could be ParaScheduler or CoretimeAssignmentProvider
319            let scheduler_key = core_assignment::get_parascheduler_storage_key();
320            let is_old = !network_spec
321                .relaychain
322                .chain_spec
323                .find_raw_key(&scoped_fs, &scheduler_key)
324                .await?;
325
326            let mut para_scheduler_value_parts: Vec<String> = vec![];
327            let mut raw_json_overrides = json!({});
328            // loop over para and assign cores from 0..
329            for para in &network_spec.parachains {
330                // cores we need to assign
331                let mut cores_for_para = 0_u32;
332                if let Some(cores) = para.num_cores {
333                    if &RegistrationStrategy::InGenesis == para.registration_strategy() {
334                        cores_for_para = cores;
335                    }
336                } else {
337                    // no num_cores set but we need to check if `override_session_0` is true
338                    // to assign the first core.
339                    if &RegistrationStrategy::InGenesis == para.registration_strategy()
340                        && network_spec.relaychain().override_session_0
341                    {
342                        cores_for_para += 1;
343                    }
344                }
345
346                debug!(
347                    "Assigning {cores_for_para} cores in raw spec for para {}. Using pallet {}",
348                    para.id,
349                    if is_old {
350                        "CoretimeAssignmentProvider"
351                    } else {
352                        "ParaScheduler"
353                    }
354                );
355                for _core in 0..cores_for_para {
356                    if is_old {
357                        let (core_assign_key, core_assign_value) =
358                            core_assignment::generate_old(core_index, para.id);
359                        raw_json_overrides[core_assign_key] = json!(core_assign_value);
360                    } else {
361                        let part = core_assignment::generate(core_index, para.id);
362                        para_scheduler_value_parts.push(part);
363                    }
364                    core_index += 1;
365                }
366            }
367
368            // if not old we need to store the k/v to override
369            if !is_old {
370                let count_prefix = format!("{:02x}", para_scheduler_value_parts.len() * 4);
371                let core_assign_value =
372                    format!("{count_prefix}{}", para_scheduler_value_parts.join(""));
373                raw_json_overrides[scheduler_key] = json!(core_assign_value);
374            }
375
376            // extra check to ensure we need to override session 0
377            if network_spec.relaychain().override_session_0 {
378                trace!("Overriding pallet ParaSessionInfo.session (0) to allow paras to produce blocks at first session.");
379                let raw_spec = network_spec
380                    .relaychain
381                    .chain_spec
382                    .read_raw_spec(&scoped_fs)
383                    .await?;
384                let overrides = generators::generate_session_0_overrides(&raw_spec, num_cores)?;
385
386                for (k, v) in overrides.as_object().ok_or(anyhow!(
387                    "'generate_session_0_overrides' should be a valid json Object."
388                ))? {
389                    raw_json_overrides[k] = v.clone();
390                }
391            }
392
393            debug!("Raw overrides keys: {:?}", raw_json_overrides);
394
395            network_spec
396                .relaychain
397                .chain_spec
398                .override_raw_spec(
399                    &scoped_fs,
400                    &JsonOverrides::Json(json!({
401                        "genesis": {
402                            "raw": {
403                                "top": raw_json_overrides
404                            }
405                        }
406                    })),
407                )
408                .await?;
409        }
410
411        let (bootnodes, relaynodes) =
412            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
413
414        // TODO: we want to still supporting spawn a dedicated bootnode??
415        let mut ctx = SpawnNodeCtx {
416            chain_id: &relay_chain_id,
417            parachain_id: None,
418            chain: relay_chain_name.as_str(),
419            role: ZombieRole::Node,
420            ns: &ns,
421            scoped_fs: &scoped_fs,
422            parachain: None,
423            bootnodes_addr: &vec![],
424            wait_ready: false,
425            nodes_by_name: json!({}),
426            global_settings: &network_spec.global_settings,
427        };
428
429        let global_files_to_inject = vec![TransferedFile::new(
430            PathBuf::from(format!(
431                "{}/{relay_chain_name}.json",
432                ns.base_dir().to_string_lossy()
433            )),
434            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
435        )];
436
437        let r = Relaychain::new(
438            relay_chain_name.to_string(),
439            relay_chain_id.clone(),
440            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
441                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
442            )?),
443        );
444        let mut network =
445            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
446
447        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
448        let mut node_ws_url: String = "".to_string();
449
450        // Calculate the bootnodes addr from the running nodes
451        let mut bootnodes_addr: Vec<String> = vec![];
452
453        for level in dependency_levels_among(&bootnodes)? {
454            let mut running_nodes_per_level = vec![];
455            for chunk in level.chunks(spawn_concurrency) {
456                let spawning_tasks = chunk
457                    .iter()
458                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
459
460                for node in futures::future::try_join_all(spawning_tasks).await? {
461                    let bootnode_multiaddr = node.multiaddr();
462
463                    bootnodes_addr.push(bootnode_multiaddr.to_string());
464
465                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
466                    if node_ws_url.is_empty() {
467                        node_ws_url.clone_from(&node.ws_uri)
468                    }
469
470                    running_nodes_per_level.push(node);
471                }
472            }
473            info!(
474                "🕰  waiting for level: {:?} to be up...",
475                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
476            );
477
478            // Wait for all nodes in the current level to be up
479            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
480                node.wait_until_is_up(network_spec.global_settings.node_spawn_timeout())
481            });
482
483            let _ = futures::future::try_join_all(waiting_tasks).await?;
484
485            for node in running_nodes_per_level {
486                // Add the node to the  context and `Network` instance
487                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
488                network.add_running_node(node, None).await;
489            }
490        }
491
492        // Add the bootnodes to the relaychain spec file and ctx
493        network_spec
494            .relaychain
495            .chain_spec
496            .add_bootnodes(&scoped_fs, &bootnodes_addr)
497            .await?;
498
499        ctx.bootnodes_addr = &bootnodes_addr;
500
501        for level in dependency_levels_among(&relaynodes)? {
502            let mut running_nodes_per_level = vec![];
503            for chunk in level.chunks(spawn_concurrency) {
504                let spawning_tasks = chunk
505                    .iter()
506                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
507
508                for node in futures::future::try_join_all(spawning_tasks).await? {
509                    running_nodes_per_level.push(node);
510                }
511            }
512            info!(
513                "🕰 waiting for level: {:?} to be up...",
514                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
515            );
516
517            // Wait for all nodes in the current level to be up
518            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
519                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
520            });
521
522            let _ = futures::future::try_join_all(waiting_tasks).await?;
523
524            for node in running_nodes_per_level {
525                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
526                network.add_running_node(node, None).await;
527            }
528        }
529
530        // spawn paras
531        for para in network_spec.parachains.iter() {
532            // Create parachain (in the context of the running network)
533            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
534            let parachain_id = parachain.chain_id.clone();
535
536            let (bootnodes, collators) =
537                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);
538
539            // Create `ctx` for spawn parachain nodes
540            let mut ctx_para = SpawnNodeCtx {
541                parachain: Some(para),
542                parachain_id: parachain_id.as_deref(),
543                role: if para.is_cumulus_based {
544                    ZombieRole::CumulusCollator
545                } else {
546                    ZombieRole::Collator
547                },
548                bootnodes_addr: &vec![],
549                ..ctx.clone()
550            };
551
552            // Calculate the bootnodes addr from the running nodes
553            let mut bootnodes_addr: Vec<String> = vec![];
554            let mut running_nodes: Vec<NetworkNode> = vec![];
555
556            for level in dependency_levels_among(&bootnodes)? {
557                let mut running_nodes_per_level = vec![];
558                for chunk in level.chunks(spawn_concurrency) {
559                    let spawning_tasks = chunk.iter().map(|node| {
560                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
561                    });
562
563                    for node in futures::future::try_join_all(spawning_tasks).await? {
564                        let bootnode_multiaddr = node.multiaddr();
565
566                        bootnodes_addr.push(bootnode_multiaddr.to_string());
567
568                        running_nodes_per_level.push(node);
569                    }
570                }
571                info!(
572                    "🕰  waiting for level: {:?} to be up...",
573                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
574                );
575
576                // Wait for all nodes in the current level to be up
577                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
578                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
579                });
580
581                let _ = futures::future::try_join_all(waiting_tasks).await?;
582
583                for node in running_nodes_per_level {
584                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
585                    running_nodes.push(node);
586                }
587            }
588
589            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
590                para_chain_spec
591                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
592                    .await?;
593            }
594
595            ctx_para.bootnodes_addr = &bootnodes_addr;
596
597            // Spawn the rest of the nodes
598            for level in dependency_levels_among(&collators)? {
599                let mut running_nodes_per_level = vec![];
600                for chunk in level.chunks(spawn_concurrency) {
601                    let spawning_tasks = chunk.iter().map(|node| {
602                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
603                    });
604
605                    for node in futures::future::try_join_all(spawning_tasks).await? {
606                        running_nodes_per_level.push(node);
607                    }
608                }
609                info!(
610                    "🕰  waiting for level: {:?} to be up...",
611                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
612                );
613
614                // Wait for all nodes in the current level to be up
615                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
616                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
617                });
618
619                let _ = futures::future::try_join_all(waiting_tasks).await?;
620
621                for node in running_nodes_per_level {
622                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
623                    running_nodes.push(node);
624                }
625            }
626
627            let running_para_id = parachain.para_id;
628            network.add_para(parachain);
629            for node in running_nodes {
630                network.add_running_node(node, Some(running_para_id)).await;
631            }
632        }
633
634        // Now we need to register the paras with extrinsic from the Vec collected before;
635        for para in para_to_register_with_extrinsic {
636            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
637                id: para.id,
638                // This needs to resolve correctly
639                wasm_path: para
640                    .genesis_wasm
641                    .artifact_path()
642                    .ok_or(OrchestratorError::InvariantError(
643                        "artifact path for wasm must be set at this point",
644                    ))?
645                    .to_path_buf(),
646                state_path: para
647                    .genesis_state
648                    .artifact_path()
649                    .ok_or(OrchestratorError::InvariantError(
650                        "artifact path for state must be set at this point",
651                    ))?
652                    .to_path_buf(),
653                node_ws_url: node_ws_url.clone(),
654                onboard_as_para: para.onboard_as_parachain,
655                seed: None, // TODO: Seed is passed by?
656                finalization: false,
657            };
658
659            Parachain::register(register_para_options, &scoped_fs).await?;
660        }
661
662        if network_spec.global_settings.observability().enabled() {
663            match network
664                .start_observability(network_spec.global_settings.observability())
665                .await
666            {
667                Ok(obs) => {
668                    info!("📊 Prometheus URL: {}", obs.prometheus_url);
669                    info!("📊 Grafana URL: {}", obs.grafana_url);
670                },
671                Err(e) => {
672                    warn!("⚠️  Failed to spawn observability stack: {e}");
673                },
674            }
675        }
676
677        // start custom processes if needed
678        for cp in &network_spec.custom_processes {
679            if let Err(e) = spawner::spawn_process(cp, ns.clone()).await {
680                warn!("⚠️  Failed to spawn custom process {}, err: {e}", cp.name())
681            }
682        }
683
684        network.set_start_time_ts(start_time);
685
686        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;
687
688        if network_spec.global_settings.tear_down_on_failure() {
689            network.spawn_watching_task();
690        }
691
692        Ok(network)
693    }
694}
695
696// Helpers
697
698async fn recreate_network_nodes_from_json(
699    nodes_json: &serde_json::Value,
700    ns: DynNamespace,
701    provider_name: &str,
702) -> Result<Vec<NetworkNode>, OrchestratorError> {
703    let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
704
705    let mut nodes = Vec::with_capacity(raw_nodes.len());
706    for raw in raw_nodes {
707        // validate provider tag
708        let provider_tag = raw
709            .inner
710            .get("provider_tag")
711            .and_then(|v| v.as_str())
712            .ok_or_else(|| {
713                OrchestratorError::InvalidConfig(format!(
714                    "Node '{}' is missing `provider_tag` in inner node JSON",
715                    raw.name
716                ))
717            })?;
718
719        if provider_tag != provider_name {
720            return Err(OrchestratorError::InvalidConfigForProvider(
721                provider_name.to_string(),
722                provider_tag.to_string(),
723            ));
724        }
725        let inner = ns.spawn_node_from_json(&raw.inner).await?;
726        let relay_node = NetworkNode::new(
727            raw.name,
728            raw.ws_uri,
729            raw.prometheus_uri,
730            raw.multiaddr,
731            raw.spec,
732            inner,
733            raw.cmd_generator_opts,
734            raw.context,
735        );
736        nodes.push(relay_node);
737    }
738
739    Ok(nodes)
740}
741
742async fn recreate_relaychain_from_json(
743    zombie_json: &serde_json::Value,
744    ns: DynNamespace,
745    provider_name: &str,
746) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
747    let relay_json = zombie_json
748        .get("relay")
749        .ok_or(OrchestratorError::InvalidConfig(
750            "Missing `relay` field in zombie.json".into(),
751        ))?
752        .clone();
753
754    let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
755
756    let initial_spec: NetworkSpec = serde_json::from_value(
757        zombie_json
758            .get("initial_spec")
759            .ok_or(OrchestratorError::InvalidConfig(
760                "Missing `initial_spec` field in zombie.json".into(),
761            ))?
762            .clone(),
763    )?;
764
765    // Populate relay nodes
766    let nodes =
767        recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
768    relay_raw.inner.nodes = nodes;
769
770    Ok((relay_raw.inner, initial_spec))
771}
772
773async fn recreate_parachains_from_json(
774    zombie_json: &serde_json::Value,
775    ns: DynNamespace,
776    provider_name: &str,
777) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
778    let paras_json = zombie_json
779        .get("parachains")
780        .ok_or(OrchestratorError::InvalidConfig(
781            "Missing `parachains` field in zombie.json".into(),
782        ))?
783        .clone();
784
785    let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
786
787    let mut parachains_map = HashMap::new();
788
789    for (id, parachain_entries) in raw_paras {
790        let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
791
792        for raw_para in parachain_entries {
793            let mut para = raw_para.inner;
794            para.collators =
795                recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
796                    .await?;
797            parsed_vec.push(para);
798        }
799
800        parachains_map.insert(id, parsed_vec);
801    }
802
803    Ok(parachains_map)
804}
805
806// Split the node list depending if it's bootnode or not
807// NOTE: if there isn't a bootnode declared we use the first one
808fn split_nodes_by_bootnodes(
809    nodes: &[NodeSpec],
810    no_default_bootnodes: bool,
811) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
812    // get the bootnodes to spawn first and calculate the bootnode string for use later
813    let mut bootnodes = vec![];
814    let mut other_nodes = vec![];
815    nodes.iter().for_each(|node| {
816        if node.is_bootnode {
817            bootnodes.push(node)
818        } else {
819            other_nodes.push(node)
820        }
821    });
822
823    if bootnodes.is_empty() && !no_default_bootnodes {
824        bootnodes.push(other_nodes.remove(0))
825    }
826
827    (bootnodes, other_nodes)
828}
829
830// Generate a bootnode multiaddress and return as string
831fn generate_bootnode_addr(
832    node: &NetworkNode,
833    ip: &IpAddr,
834    port: u16,
835) -> Result<String, GeneratorError> {
836    generators::generate_node_bootnode_addr(
837        &node.spec.peer_id,
838        ip,
839        port,
840        node.inner.args().as_ref(),
841        &node.spec.p2p_cert_hash,
842    )
843}
844// Validate that the config fulfill all the requirements of the provider
845fn validate_spec_with_provider_capabilities(
846    network_spec: &NetworkSpec,
847    capabilities: &ProviderCapabilities,
848) -> Result<(), anyhow::Error> {
849    let mut errs: Vec<String> = vec![];
850
851    if capabilities.requires_image {
852        // Relaychain
853        if network_spec.relaychain.default_image.is_none() {
854            // we should check if each node have an image
855            let nodes = &network_spec.relaychain.nodes;
856            if nodes.iter().any(|node| node.image.is_none()) {
857                errs.push(String::from(
858                    "Missing image for node, and not default is set at relaychain",
859                ));
860            }
861        };
862
863        // Paras
864        for para in &network_spec.parachains {
865            if para.default_image.is_none() {
866                let nodes = &para.collators;
867                if nodes.iter().any(|node| node.image.is_none()) {
868                    errs.push(format!(
869                        "Missing image for node, and not default is set at parachain {}",
870                        para.id
871                    ));
872                }
873            }
874        }
875    } else {
876        // native
877        // We need to get all the `cmds` and verify if are part of the path
878        let mut cmds: HashSet<&str> = Default::default();
879        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
880            cmds.insert(cmd.as_str());
881        }
882        for node in network_spec.relaychain().nodes.iter() {
883            cmds.insert(node.command());
884        }
885
886        // Paras
887        for para in &network_spec.parachains {
888            if let Some(cmd) = para.default_command.as_ref() {
889                cmds.insert(cmd.as_str());
890            }
891
892            for node in para.collators.iter() {
893                cmds.insert(node.command());
894            }
895        }
896
897        // now check the binaries
898        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
899        trace!("current PATH: {path}");
900        let parts: Vec<_> = path.split(":").collect();
901        for cmd in cmds {
902            let missing = if cmd.contains('/') {
903                trace!("checking {cmd}");
904                if std::fs::metadata(cmd).is_err() {
905                    true
906                } else {
907                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
908                    false
909                }
910            } else {
911                // should be in the PATH
912                !parts.iter().any(|part| {
913                    let path_to = format!("{part}/{cmd}");
914                    trace!("checking {path_to}");
915                    let check_result = std::fs::metadata(&path_to);
916                    trace!("result {:?}", check_result);
917                    if check_result.is_ok() {
918                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
919                        true
920                    } else {
921                        false
922                    }
923                })
924            };
925
926            if missing {
927                errs.push(help_msg(cmd));
928            }
929        }
930    }
931
932    if !errs.is_empty() {
933        let msg = errs.join("\n");
934        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
935    }
936
937    Ok(())
938}
939
940fn help_msg(cmd: &str) -> String {
941    match cmd {
942        "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
943            format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
944        },
945        "polkadot" => {
946            format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
947        },
948        "polkadot-parachain" => {
949            format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
950        },
951        _ => {
952            format!("Missing binary {cmd}, please compile it.")
953        },
954    }
955}
956
957/// Allow to set the default concurrency through env var `ZOMBIE_SPAWN_CONCURRENCY`
958fn spawn_concurrency_from_env() -> Option<usize> {
959    if let Ok(concurrency) = env::var("ZOMBIE_SPAWN_CONCURRENCY") {
960        concurrency.parse::<usize>().ok()
961    } else {
962        None
963    }
964}
965
966fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
967    let desired_spawn_concurrency = match (
968        spawn_concurrency_from_env(),
969        spec.global_settings.spawn_concurrency(),
970    ) {
971        (Some(n), _) => Some(n),
972        (None, Some(n)) => Some(n),
973        _ => None,
974    };
975
976    let (spawn_concurrency, limited_by_tokens) =
977        if let Some(spawn_concurrency) = desired_spawn_concurrency {
978            if spawn_concurrency == 1 {
979                (1, false)
980            } else if has_tokens(&serde_json::to_string(spec)?) {
981                (1, true)
982            } else {
983                (spawn_concurrency, false)
984            }
985        } else {
986            // not set
987            if has_tokens(&serde_json::to_string(spec)?) {
988                (1, true)
989            } else {
990                // use 100 as max concurrency, we can set a max by provider later
991                (100, false)
992            }
993        };
994
995    Ok((spawn_concurrency, limited_by_tokens))
996}
997
998/// Build deterministic dependency **levels** among the given nodes.
999/// - Only dependencies **between nodes in `nodes`** are considered.
1000/// - Unknown/out-of-scope references are ignored.
1001/// - Self-dependencies are ignored.
1002fn dependency_levels_among<'a>(
1003    nodes: &'a [&'a NodeSpec],
1004) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
1005    let by_name = nodes
1006        .iter()
1007        .map(|n| (n.name.as_str(), *n))
1008        .collect::<HashMap<_, _>>();
1009
1010    let mut graph = HashMap::with_capacity(nodes.len());
1011    let mut indegree = HashMap::with_capacity(nodes.len());
1012
1013    for node in nodes {
1014        graph.insert(node.name.as_str(), Vec::new());
1015        indegree.insert(node.name.as_str(), 0);
1016    }
1017
1018    // build dependency graph
1019    for &node in nodes {
1020        if let Ok(args_json) = serde_json::to_string(&node.args) {
1021            // collect dependencies
1022            let unique_deps = get_tokens_to_replace(&args_json)
1023                .into_iter()
1024                .filter(|dep| dep != &node.name)
1025                .filter_map(|dep| by_name.get(dep.as_str()))
1026                .map(|&dep_node| dep_node.name.as_str())
1027                .collect::<HashSet<_>>();
1028
1029            for dep_name in unique_deps {
1030                graph
1031                    .get_mut(dep_name)
1032                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
1033                    .push(node);
1034                *indegree
1035                    .get_mut(node.name.as_str())
1036                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
1037            }
1038        }
1039    }
1040
1041    // find all nodes with no dependencies
1042    let mut queue = nodes
1043        .iter()
1044        .filter(|n| {
1045            *indegree
1046                .get(n.name.as_str())
1047                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
1048                == 0
1049        })
1050        .copied()
1051        .collect::<VecDeque<_>>();
1052
1053    let mut processed_count = 0;
1054    let mut levels = Vec::new();
1055
1056    // Kahn's algorithm
1057    while !queue.is_empty() {
1058        let level_size = queue.len();
1059        let mut current_level = Vec::with_capacity(level_size);
1060
1061        for _ in 0..level_size {
1062            let n = queue
1063                .pop_front()
1064                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
1065            current_level.push(n);
1066            processed_count += 1;
1067
1068            for &neighbour in graph
1069                .get(n.name.as_str())
1070                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
1071            {
1072                let neighbour_indegree = indegree
1073                    .get_mut(neighbour.name.as_str())
1074                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
1075                *neighbour_indegree -= 1;
1076
1077                if *neighbour_indegree == 0 {
1078                    queue.push_back(neighbour);
1079                }
1080            }
1081        }
1082
1083        current_level.sort_by_key(|n| &n.name);
1084        levels.push(current_level);
1085    }
1086
1087    // cycles detected, e.g A -> B -> A
1088    if processed_count != nodes.len() {
1089        return Err(OrchestratorError::InvalidConfig(
1090            "Tokens have cyclical dependencies".to_string(),
1091        ));
1092    }
1093
1094    Ok(levels)
1095}
1096
1097// TODO: get the fs from `DynNamespace` will make this not needed
1098// but the FileSystem trait isn't object-safe so we can't pass around
1099// as `dyn FileSystem`. We can refactor or using some `erase` techniques
1100// to resolve this and remove this struct
1101// TODO (Loris): Probably we could have a .scoped(base_dir) method on the
1102// filesystem itself (the trait), so it will return this and we can move this
1103// directly to the support crate, it can be useful in the future
1104#[derive(Clone, Debug)]
1105pub struct ScopedFilesystem<'a, FS: FileSystem> {
1106    fs: &'a FS,
1107    base_dir: &'a str,
1108}
1109
1110impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
1111    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
1112        Self { fs, base_dir }
1113    }
1114
1115    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
1116        for file in files {
1117            let full_remote_path = PathBuf::from(format!(
1118                "{}/{}",
1119                self.base_dir,
1120                file.remote_path.to_string_lossy()
1121            ));
1122            trace!("coping file: {file}");
1123            self.fs
1124                .copy(file.local_path.as_path(), full_remote_path)
1125                .await?;
1126        }
1127        Ok(())
1128    }
1129
1130    async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
1131        let file = file.as_ref();
1132
1133        let full_path = if file.is_absolute() {
1134            file.to_owned()
1135        } else {
1136            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1137        };
1138        let content = self.fs.read(full_path).await?;
1139        Ok(content)
1140    }
1141
1142    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
1143        let file = file.as_ref();
1144
1145        let full_path = if file.is_absolute() {
1146            file.to_owned()
1147        } else {
1148            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1149        };
1150        let content = self.fs.read_to_string(full_path).await?;
1151        Ok(content)
1152    }
1153
1154    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1155        let path = PathBuf::from(format!(
1156            "{}/{}",
1157            self.base_dir,
1158            path.as_ref().to_string_lossy()
1159        ));
1160        self.fs.create_dir(path).await
1161    }
1162
1163    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1164        let path = PathBuf::from(format!(
1165            "{}/{}",
1166            self.base_dir,
1167            path.as_ref().to_string_lossy()
1168        ));
1169        self.fs.create_dir_all(path).await
1170    }
1171
1172    async fn write(
1173        &self,
1174        path: impl AsRef<Path>,
1175        contents: impl AsRef<[u8]> + Send,
1176    ) -> Result<(), FileSystemError> {
1177        let path = path.as_ref();
1178
1179        let full_path = if path.is_absolute() {
1180            path.to_owned()
1181        } else {
1182            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1183        };
1184
1185        self.fs.write(full_path, contents).await
1186    }
1187
1188    /// Get the full_path in the scoped FS
1189    fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1190        let path = path.as_ref();
1191
1192        let full_path = if path.is_absolute() {
1193            path.to_owned()
1194        } else {
1195            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1196        };
1197
1198        full_path
1199    }
1200
1201    /// Get the base_dir in the scoped FS
1202    fn base_dir(&self) -> &str {
1203        self.base_dir
1204    }
1205}
1206
1207#[derive(Clone, Debug)]
1208pub enum ZombieRole {
1209    Temp,
1210    Node,
1211    Bootnode,
1212    Collator,
1213    CumulusCollator,
1214    Companion,
1215}
1216
1217// re-exports
1218pub use network::{AddCollatorOptions, AddNodeOptions};
1219pub use network_helper::metrics;
1220pub use sc_chain_spec;
1221
1222#[cfg(test)]
1223mod tests {
1224    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
1225    use lazy_static::lazy_static;
1226    use tokio::sync::Mutex;
1227
1228    use super::*;
1229
1230    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
1231    // mutex for test that use env
1232    lazy_static! {
1233        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1234    }
1235
1236    fn set_env(concurrency: Option<u32>) {
1237        if let Some(value) = concurrency {
1238            env::set_var(ENV_KEY, value.to_string());
1239        } else {
1240            env::remove_var(ENV_KEY);
1241        }
1242    }
1243
1244    fn generate(
1245        with_image: bool,
1246        with_cmd: Option<&'static str>,
1247    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
1248        NetworkConfigBuilder::new()
1249            .with_relaychain(|r| {
1250                let mut relay = r
1251                    .with_chain("rococo-local")
1252                    .with_default_command(with_cmd.unwrap_or("polkadot"));
1253                if with_image {
1254                    relay = relay.with_default_image("docker.io/parity/polkadot")
1255                }
1256
1257                relay
1258                    .with_validator(|node| node.with_name("alice"))
1259                    .with_validator(|node| node.with_name("bob"))
1260            })
1261            .with_parachain(|p| {
1262                p.with_id(2000).cumulus_based(true).with_collator(|n| {
1263                    let node = n
1264                        .with_name("collator")
1265                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
1266                    if with_image {
1267                        node.with_image("docker.io/paritypr/test-parachain")
1268                    } else {
1269                        node
1270                    }
1271                })
1272            })
1273            .build()
1274    }
1275
1276    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
1277        let mut spec = NodeSpec {
1278            name: name.to_string(),
1279            ..Default::default()
1280        };
1281        if let Some(dependencies) = dependencies {
1282            for node in dependencies {
1283                spec.args.push(
1284                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
1285                        .as_str()
1286                        .into(),
1287                );
1288            }
1289        }
1290        spec
1291    }
1292
1293    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
1294        actual_levels
1295            .iter()
1296            .zip(expected_levels)
1297            .for_each(|(actual_level, expected_level)| {
1298                assert_eq!(actual_level.len(), expected_level.len());
1299                actual_level
1300                    .iter()
1301                    .zip(expected_level.iter())
1302                    .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
1303            });
1304    }
1305
1306    #[tokio::test]
1307    async fn valid_config_with_image() {
1308        let network_config = generate(true, None).unwrap();
1309        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1310        let caps = ProviderCapabilities {
1311            requires_image: true,
1312            has_resources: false,
1313            prefix_with_full_path: false,
1314            use_default_ports_in_cmd: false,
1315        };
1316
1317        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1318        assert!(valid.is_ok())
1319    }
1320
1321    #[tokio::test]
1322    async fn invalid_config_without_image() {
1323        let network_config = generate(false, None).unwrap();
1324        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1325        let caps = ProviderCapabilities {
1326            requires_image: true,
1327            has_resources: false,
1328            prefix_with_full_path: false,
1329            use_default_ports_in_cmd: false,
1330        };
1331
1332        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1333        assert!(valid.is_err())
1334    }
1335
1336    #[tokio::test]
1337    async fn invalid_config_missing_cmd() {
1338        let network_config = generate(false, Some("other")).unwrap();
1339        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1340        let caps = ProviderCapabilities {
1341            requires_image: false,
1342            has_resources: false,
1343            prefix_with_full_path: false,
1344            use_default_ports_in_cmd: false,
1345        };
1346
1347        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1348        assert!(valid.is_err())
1349    }
1350
1351    #[tokio::test]
1352    async fn valid_config_present_cmd() {
1353        let network_config = generate(false, Some("cargo")).unwrap();
1354        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1355        let caps = ProviderCapabilities {
1356            requires_image: false,
1357            has_resources: false,
1358            prefix_with_full_path: false,
1359            use_default_ports_in_cmd: false,
1360        };
1361
1362        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1363        println!("{valid:?}");
1364        assert!(valid.is_ok())
1365    }
1366
1367    #[tokio::test]
1368    async fn default_spawn_concurrency() {
1369        let _g = ENV_MUTEX.lock().await;
1370        set_env(None);
1371        let network_config = generate(false, Some("cargo")).unwrap();
1372        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1373        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1374        assert_eq!(concurrency, 100);
1375    }
1376
1377    #[tokio::test]
1378    async fn set_spawn_concurrency() {
1379        let _g = ENV_MUTEX.lock().await;
1380        set_env(None);
1381
1382        let network_config = generate(false, Some("cargo")).unwrap();
1383        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1384
1385        let global_settings = GlobalSettingsBuilder::new()
1386            .with_spawn_concurrency(4)
1387            .build()
1388            .unwrap();
1389
1390        spec.set_global_settings(global_settings);
1391        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1392        assert_eq!(concurrency, 4);
1393        assert!(!limited);
1394    }
1395
1396    #[tokio::test]
1397    async fn set_spawn_concurrency_but_limited() {
1398        let _g = ENV_MUTEX.lock().await;
1399        set_env(None);
1400
1401        let network_config = generate(false, Some("cargo")).unwrap();
1402        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1403
1404        let global_settings = GlobalSettingsBuilder::new()
1405            .with_spawn_concurrency(4)
1406            .build()
1407            .unwrap();
1408
1409        spec.set_global_settings(global_settings);
1410        let node = spec.relaychain.nodes.first_mut().unwrap();
1411        node.args
1412            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1413        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1414        assert_eq!(concurrency, 1);
1415        assert!(limited);
1416    }
1417
1418    #[tokio::test]
1419    async fn set_spawn_concurrency_from_env() {
1420        let _g = ENV_MUTEX.lock().await;
1421        set_env(Some(10));
1422
1423        let network_config = generate(false, Some("cargo")).unwrap();
1424        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1425        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1426        assert_eq!(concurrency, 10);
1427        assert!(!limited);
1428    }
1429
1430    #[tokio::test]
1431    async fn set_spawn_concurrency_from_env_but_limited() {
1432        let _g = ENV_MUTEX.lock().await;
1433        set_env(Some(12));
1434
1435        let network_config = generate(false, Some("cargo")).unwrap();
1436        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1437        let node = spec.relaychain.nodes.first_mut().unwrap();
1438        node.args
1439            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1440        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1441        assert_eq!(concurrency, 1);
1442        assert!(limited);
1443    }
1444
1445    #[test]
1446    fn dependency_levels_among_should_work() {
1447        // no nodes
1448        assert!(dependency_levels_among(&[]).unwrap().is_empty());
1449
1450        // one node
1451        let alice = get_node_with_dependencies("alice", None);
1452        let nodes = [&alice];
1453
1454        let levels = dependency_levels_among(&nodes).unwrap();
1455        let expected = vec![vec!["alice"]];
1456
1457        verify_levels(levels, expected);
1458
1459        // two independent nodes
1460        let alice = get_node_with_dependencies("alice", None);
1461        let bob = get_node_with_dependencies("bob", None);
1462        let nodes = [&alice, &bob];
1463
1464        let levels = dependency_levels_among(&nodes).unwrap();
1465        let expected = vec![vec!["alice", "bob"]];
1466
1467        verify_levels(levels, expected);
1468
1469        // alice -> bob -> charlie
1470        let alice = get_node_with_dependencies("alice", None);
1471        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1472        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
1473        let nodes = [&alice, &bob, &charlie];
1474
1475        let levels = dependency_levels_among(&nodes).unwrap();
1476        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];
1477
1478        verify_levels(levels, expected);
1479
1480        //         ┌─> bob
1481        // alice ──|
1482        //         └─> charlie
1483        let alice = get_node_with_dependencies("alice", None);
1484        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1485        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1486        let nodes = [&alice, &bob, &charlie];
1487
1488        let levels = dependency_levels_among(&nodes).unwrap();
1489        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];
1490
1491        verify_levels(levels, expected);
1492
1493        //         ┌─>   bob  ──┐
1494        // alice ──|            ├─> dave
1495        //         └─> charlie  ┘
1496        let alice = get_node_with_dependencies("alice", None);
1497        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1498        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1499        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
1500        let nodes = [&alice, &bob, &charlie, &dave];
1501
1502        let levels = dependency_levels_among(&nodes).unwrap();
1503        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];
1504
1505        verify_levels(levels, expected);
1506    }
1507
1508    #[test]
1509    fn dependency_levels_among_should_detect_cycles() {
1510        let mut alice = get_node_with_dependencies("alice", None);
1511        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1512        alice.args.push("{{ZOMBIE:bob:someField}}".into());
1513
1514        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
1515    }
1516}