zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11#[cfg(feature = "pjs")]
12pub mod pjs_helper;
13pub mod shared;
14mod spawner;
15
16use std::{
17    collections::{HashMap, HashSet, VecDeque},
18    env,
19    net::IpAddr,
20    path::{Path, PathBuf},
21    time::{Duration, SystemTime},
22};
23
24use configuration::{NetworkConfig, RegistrationStrategy};
25use errors::OrchestratorError;
26use generators::errors::GeneratorError;
27use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
28// re-exported
29pub use network_spec::NetworkSpec;
30use network_spec::{node::NodeSpec, parachain::ParachainSpec};
31use provider::{
32    types::{ProviderCapabilities, TransferedFile},
33    DynProvider,
34};
35use serde_json::json;
36use support::{
37    constants::{
38        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
39        THIS_IS_A_BUG,
40    },
41    fs::{FileSystem, FileSystemError},
42    replacer::{get_tokens_to_replace, has_tokens},
43};
44use tokio::time::timeout;
45use tracing::{debug, info, trace, warn};
46
47use crate::{shared::types::RegisterParachainOptions, spawner::SpawnNodeCtx};
48pub struct Orchestrator<T>
49where
50    T: FileSystem + Sync + Send,
51{
52    filesystem: T,
53    provider: DynProvider,
54}
55
56impl<T> Orchestrator<T>
57where
58    T: FileSystem + Sync + Send + Clone,
59{
60    pub fn new(filesystem: T, provider: DynProvider) -> Self {
61        Self {
62            filesystem,
63            provider,
64        }
65    }
66
67    pub async fn spawn(
68        &self,
69        network_config: NetworkConfig,
70    ) -> Result<Network<T>, OrchestratorError> {
71        let global_timeout = network_config.global_settings().network_spawn_timeout();
72        let network_spec = NetworkSpec::from_config(&network_config).await?;
73
74        let res = timeout(
75            Duration::from_secs(global_timeout.into()),
76            self.spawn_inner(network_spec),
77        )
78        .await
79        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
80        res?
81    }
82
83    pub async fn spawn_from_spec(
84        &self,
85        network_spec: NetworkSpec,
86    ) -> Result<Network<T>, OrchestratorError> {
87        let global_timeout = network_spec.global_settings.network_spawn_timeout();
88        let res = timeout(
89            Duration::from_secs(global_timeout as u64),
90            self.spawn_inner(network_spec),
91        )
92        .await
93        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
94        res?
95    }
96
    /// Drives the whole spawn sequence for an already-built [`NetworkSpec`].
    ///
    /// Steps, in order:
    /// 1. Validate the spec against the provider capabilities.
    /// 2. Create the namespace (using the configured `base_dir` when there is one).
    /// 3. Build and customize the relaychain chain-spec and build the parachain artifacts.
    /// 4. Spawn the relaychain bootnodes, then the remaining relaychain nodes.
    /// 5. For every parachain, spawn its bootnodes and then the rest of its collators.
    /// 6. Register with an extrinsic the parachains that are neither `Manual` nor `InGenesis`.
    /// 7. Write the `zombie.json` state file and, if configured, start the watching task.
    ///
    /// Nodes are spawned by dependency level (see `dependency_levels_among`), each level in
    /// chunks of at most `spawn_concurrency` nodes; all the nodes of a level must be up
    /// before the next level is spawned.
    ///
    /// # Errors
    /// Returns the first error produced by any of the steps above.
    async fn spawn_inner(
        &self,
        mut network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        // main driver to spawn the network
        debug!(network_spec = ?network_spec,"Network spec to spawn");

        // TODO: move to Provider trait
        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
            .map_err(|err| {
                OrchestratorError::InvalidConfigForProvider(
                    self.provider.name().into(),
                    err.to_string(),
                )
            })?;

        // create namespace
        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
            self.provider
                .create_namespace_with_base_dir(base_dir)
                .await?
        } else {
            self.provider.create_namespace().await?
        };

        // set the spawn_concurrency
        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;

        let start_time = SystemTime::now();
        info!("🧰 ns: {}", ns.name());
        info!("🧰 base_dir: {:?}", ns.base_dir());
        info!("🕰 start time: {:?}", start_time);
        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");

        network_spec
            .populate_nodes_available_args(ns.clone())
            .await?;

        // From here on, file operations go through a filesystem scoped to the namespace base dir
        let base_dir = ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        // Create chain-spec for relaychain
        network_spec
            .relaychain
            .chain_spec
            .build(&ns, &scoped_fs)
            .await?;

        debug!("relaychain spec built!");
        // Create parachain artifacts (chain-spec, wasm, state)
        let relay_chain_id = network_spec
            .relaychain
            .chain_spec
            .read_chain_id(&scoped_fs)
            .await?;

        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
        network_spec
            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
            .await?;

        // Gather the parachains to register in genesis and the ones to register with extrinsic
        // (parachains with `Manual` registration are left out of both lists)
        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
            Vec<&ParachainSpec>,
            Vec<&ParachainSpec>,
        ) = network_spec
            .parachains
            .iter()
            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
            .partition(|para| {
                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
            });

        // Genesis configs of the parachains registered in genesis, passed to `customize_relay`
        let mut para_artifacts = vec![];
        for para in para_to_register_in_genesis {
            let genesis_config = para.get_genesis_config()?;
            para_artifacts.push(genesis_config)
        }

        // Customize relaychain
        network_spec
            .relaychain
            .chain_spec
            .customize_relay(
                &network_spec.relaychain,
                &network_spec.hrmp_channels,
                para_artifacts,
                &scoped_fs,
            )
            .await?;

        // Build raw version
        network_spec
            .relaychain
            .chain_spec
            .build_raw(&ns, &scoped_fs)
            .await?;

        // override wasm if needed
        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
            network_spec
                .relaychain
                .chain_spec
                .override_code(&scoped_fs, wasm_override)
                .await?;
        }

        // override raw spec if needed
        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
            network_spec
                .relaychain
                .chain_spec
                .override_raw_spec(&scoped_fs, raw_spec_override)
                .await?;
        }

        // Relaychain always gets a default bootnode (the first node) when none is declared
        let (bootnodes, relaynodes) =
            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);

        // TODO: do we still want to support spawning a dedicated bootnode?
        // Context shared by all the relaychain nodes; `bootnodes_addr` and `nodes_by_name`
        // are filled in as nodes come up.
        let mut ctx = SpawnNodeCtx {
            chain_id: &relay_chain_id,
            parachain_id: None,
            chain: relay_chain_name.as_str(),
            role: ZombieRole::Node,
            ns: &ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: false,
            nodes_by_name: json!({}),
        };

        // The relaychain spec file (under the namespace base dir) paired with `/cfg/<chain>.json`
        let global_files_to_inject = vec![TransferedFile::new(
            PathBuf::from(format!(
                "{}/{relay_chain_name}.json",
                ns.base_dir().to_string_lossy()
            )),
            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
        )];

        let r = Relaychain::new(
            relay_chain_name.to_string(),
            relay_chain_id.clone(),
            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
            )?),
        );
        let mut network =
            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
        let mut node_ws_url: String = "".to_string();

        // Calculate the bootnodes addr from the running nodes
        let mut bootnodes_addr: Vec<String> = vec![];

        // Spawn the relaychain bootnodes, one dependency level at a time
        for level in dependency_levels_among(&bootnodes)? {
            let mut running_nodes_per_level = vec![];
            // At most `spawn_concurrency` nodes are spawned concurrently
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    let bootnode_multiaddr = node.multiaddr();

                    bootnodes_addr.push(bootnode_multiaddr.to_string());

                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
                    // Only the first spawned bootnode sets it.
                    if node_ws_url.is_empty() {
                        node_ws_url.clone_from(&node.ws_uri)
                    }

                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for all nodes in the current level to be up
            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                // Add the node to the context and `Network` instance
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // Add the bootnodes to the relaychain spec file and ctx
        network_spec
            .relaychain
            .chain_spec
            .add_bootnodes(&scoped_fs, &bootnodes_addr)
            .await?;

        ctx.bootnodes_addr = &bootnodes_addr;

        // Spawn the rest of the relaychain nodes, now that the bootnodes addresses are known
        for level in dependency_levels_among(&relaynodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for all nodes in the current level to be up
            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // spawn paras
        for para in network_spec.parachains.iter() {
            // Create parachain (in the context of the running network)
            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
            let parachain_id = parachain.chain_id.clone();

            let (bootnodes, collators) =
                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);

            // Create `ctx` for spawn parachain nodes
            // (starts from the relaychain `ctx`, so it already knows the relaychain nodes)
            let mut ctx_para = SpawnNodeCtx {
                parachain: Some(para),
                parachain_id: parachain_id.as_deref(),
                role: if para.is_cumulus_based {
                    ZombieRole::CumulusCollator
                } else {
                    ZombieRole::Collator
                },
                bootnodes_addr: &vec![],
                ..ctx.clone()
            };

            // Calculate the bootnodes addr from the running nodes
            let mut bootnodes_addr: Vec<String> = vec![];
            // Nodes of this parachain, added to `network` once the parachain itself is added
            let mut running_nodes: Vec<NetworkNode> = vec![];

            for level in dependency_levels_among(&bootnodes)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        let bootnode_multiaddr = node.multiaddr();

                        bootnodes_addr.push(bootnode_multiaddr.to_string());

                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                // Wait for all nodes in the current level to be up
                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Add the bootnodes to the parachain spec file (when the parachain has one) and ctx
            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
                para_chain_spec
                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
                    .await?;
            }

            ctx_para.bootnodes_addr = &bootnodes_addr;

            // Spawn the rest of the nodes
            for level in dependency_levels_among(&collators)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                // Wait for all nodes in the current level to be up
                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Read the id before `parachain` is moved into the network
            let running_para_id = parachain.para_id;
            network.add_para(parachain);
            for node in running_nodes {
                network.add_running_node(node, Some(running_para_id)).await;
            }
        }

        // TODO:
        // - add-ons (introspector/tracing/etc)

        // verify nodes
        // network_helper::verifier::verify_nodes(&network.nodes()).await?;

        // Now we need to register the paras with extrinsic from the Vec collected before;
        for para in para_to_register_with_extrinsic {
            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
                id: para.id,
                // This needs to resolve correctly
                wasm_path: para
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para
                    .genesis_state
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                // ws uri of the first relaychain bootnode spawned above
                node_ws_url: node_ws_url.clone(),
                onboard_as_para: para.onboard_as_parachain,
                seed: None, // TODO: Seed is passed by?
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // - write zombie.json state file
        // (the serialized network plus `local_base_dir`, `ns` and `start_time_ts`)
        let mut zombie_json = serde_json::to_value(&network)?;
        zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
        zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());

        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
            // Stored as a string holding the milliseconds since the unix epoch
            zombie_json["start_time_ts"] =
                serde_json::value::Value::String(start_time_ts.as_millis().to_string());
        } else {
            // Just warn, do not propagate the error (this should not happen)
            warn!("⚠️ Error getting start_time timestamp");
        }

        scoped_fs
            .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
            .await?;

        if network_spec.global_settings.tear_down_on_failure() {
            network.spawn_watching_task();
        }

        Ok(network)
    }
492}
493
494// Helpers
495
496// Split the node list depending if it's bootnode or not
497// NOTE: if there isn't a bootnode declared we use the first one
498fn split_nodes_by_bootnodes(
499    nodes: &[NodeSpec],
500    no_default_bootnodes: bool,
501) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
502    // get the bootnodes to spawn first and calculate the bootnode string for use later
503    let mut bootnodes = vec![];
504    let mut other_nodes = vec![];
505    nodes.iter().for_each(|node| {
506        if node.is_bootnode {
507            bootnodes.push(node)
508        } else {
509            other_nodes.push(node)
510        }
511    });
512
513    if bootnodes.is_empty() && !no_default_bootnodes {
514        bootnodes.push(other_nodes.remove(0))
515    }
516
517    (bootnodes, other_nodes)
518}
519
520// Generate a bootnode multiaddress and return as string
521fn generate_bootnode_addr(
522    node: &NetworkNode,
523    ip: &IpAddr,
524    port: u16,
525) -> Result<String, GeneratorError> {
526    generators::generate_node_bootnode_addr(
527        &node.spec.peer_id,
528        ip,
529        port,
530        node.inner.args().as_ref(),
531        &node.spec.p2p_cert_hash,
532    )
533}
534// Validate that the config fulfill all the requirements of the provider
535fn validate_spec_with_provider_capabilities(
536    network_spec: &NetworkSpec,
537    capabilities: &ProviderCapabilities,
538) -> Result<(), anyhow::Error> {
539    let mut errs: Vec<String> = vec![];
540
541    if capabilities.requires_image {
542        // Relaychain
543        if network_spec.relaychain.default_image.is_none() {
544            // we should check if each node have an image
545            let nodes = &network_spec.relaychain.nodes;
546            if nodes.iter().any(|node| node.image.is_none()) {
547                errs.push(String::from(
548                    "Missing image for node, and not default is set at relaychain",
549                ));
550            }
551        };
552
553        // Paras
554        for para in &network_spec.parachains {
555            if para.default_image.is_none() {
556                let nodes = &para.collators;
557                if nodes.iter().any(|node| node.image.is_none()) {
558                    errs.push(format!(
559                        "Missing image for node, and not default is set at parachain {}",
560                        para.id
561                    ));
562                }
563            }
564        }
565    } else {
566        // native
567        // We need to get all the `cmds` and verify if are part of the path
568        let mut cmds: HashSet<&str> = Default::default();
569        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
570            cmds.insert(cmd.as_str());
571        }
572        for node in network_spec.relaychain().nodes.iter() {
573            cmds.insert(node.command());
574        }
575
576        // Paras
577        for para in &network_spec.parachains {
578            if let Some(cmd) = para.default_command.as_ref() {
579                cmds.insert(cmd.as_str());
580            }
581
582            for node in para.collators.iter() {
583                cmds.insert(node.command());
584            }
585        }
586
587        // now check the binaries
588        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
589        trace!("current PATH: {path}");
590        let parts: Vec<_> = path.split(":").collect();
591        for cmd in cmds {
592            let missing = if cmd.contains('/') {
593                trace!("checking {cmd}");
594                if std::fs::metadata(cmd).is_err() {
595                    true
596                } else {
597                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
598                    false
599                }
600            } else {
601                // should be in the PATH
602                !parts.iter().any(|part| {
603                    let path_to = format!("{part}/{cmd}");
604                    trace!("checking {path_to}");
605                    let check_result = std::fs::metadata(&path_to);
606                    trace!("result {:?}", check_result);
607                    if check_result.is_ok() {
608                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
609                        true
610                    } else {
611                        false
612                    }
613                })
614            };
615
616            if missing {
617                errs.push(help_msg(cmd));
618            }
619        }
620    }
621
622    if !errs.is_empty() {
623        let msg = errs.join("\n");
624        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
625    }
626
627    Ok(())
628}
629
/// Builds the "missing binary" message for `cmd`.
///
/// Well-known binaries get the cargo command that compiles them; any other command
/// gets a generic hint.
fn help_msg(cmd: &str) -> String {
    const TEMPLATE_NODES: [&str; 3] = [
        "parachain-template-node",
        "solochain-template-node",
        "minimal-template-node",
    ];

    let hint = if TEMPLATE_NODES.contains(&cmd) {
        format!("compile by running: \n\tcargo build --package {cmd} --release")
    } else if cmd == "polkadot" {
        format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
    } else if cmd == "polkadot-parachain" {
        format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
    } else {
        String::from("please compile it.")
    };

    format!("Missing binary {cmd}, {hint}")
}
646
/// Reads the spawn concurrency override from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable can't be read or does not parse as `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
655
656fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
657    let desired_spawn_concurrency = match (
658        spawn_concurrency_from_env(),
659        spec.global_settings.spawn_concurrency(),
660    ) {
661        (Some(n), _) => Some(n),
662        (None, Some(n)) => Some(n),
663        _ => None,
664    };
665
666    let (spawn_concurrency, limited_by_tokens) =
667        if let Some(spawn_concurrency) = desired_spawn_concurrency {
668            if spawn_concurrency == 1 {
669                (1, false)
670            } else if has_tokens(&serde_json::to_string(spec)?) {
671                (1, true)
672            } else {
673                (spawn_concurrency, false)
674            }
675        } else {
676            // not set
677            if has_tokens(&serde_json::to_string(spec)?) {
678                (1, true)
679            } else {
680                // use 100 as max concurrency, we can set a max by provider later
681                (100, false)
682            }
683        };
684
685    Ok((spawn_concurrency, limited_by_tokens))
686}
687
688/// Build deterministic dependency **levels** among the given nodes.
689/// - Only dependencies **between nodes in `nodes`** are considered.
690/// - Unknown/out-of-scope references are ignored.
691/// - Self-dependencies are ignored.
692fn dependency_levels_among<'a>(
693    nodes: &'a [&'a NodeSpec],
694) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
695    let by_name = nodes
696        .iter()
697        .map(|n| (n.name.as_str(), *n))
698        .collect::<HashMap<_, _>>();
699
700    let mut graph = HashMap::with_capacity(nodes.len());
701    let mut indegree = HashMap::with_capacity(nodes.len());
702
703    for node in nodes {
704        graph.insert(node.name.as_str(), Vec::new());
705        indegree.insert(node.name.as_str(), 0);
706    }
707
708    // build dependency graph
709    for &node in nodes {
710        if let Ok(args_json) = serde_json::to_string(&node.args) {
711            // collect dependencies
712            let unique_deps = get_tokens_to_replace(&args_json)
713                .into_iter()
714                .filter(|dep| dep != &node.name)
715                .filter_map(|dep| by_name.get(dep.as_str()))
716                .map(|&dep_node| dep_node.name.as_str())
717                .collect::<HashSet<_>>();
718
719            for dep_name in unique_deps {
720                graph
721                    .get_mut(dep_name)
722                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
723                    .push(node);
724                *indegree
725                    .get_mut(node.name.as_str())
726                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
727            }
728        }
729    }
730
731    // find all nodes with no dependencies
732    let mut queue = nodes
733        .iter()
734        .filter(|n| {
735            *indegree
736                .get(n.name.as_str())
737                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
738                == 0
739        })
740        .copied()
741        .collect::<VecDeque<_>>();
742
743    let mut processed_count = 0;
744    let mut levels = Vec::new();
745
746    // Kahn's algorithm
747    while !queue.is_empty() {
748        let level_size = queue.len();
749        let mut current_level = Vec::with_capacity(level_size);
750
751        for _ in 0..level_size {
752            let n = queue
753                .pop_front()
754                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
755            current_level.push(n);
756            processed_count += 1;
757
758            for &neighbour in graph
759                .get(n.name.as_str())
760                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
761            {
762                let neighbour_indegree = indegree
763                    .get_mut(neighbour.name.as_str())
764                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
765                *neighbour_indegree -= 1;
766
767                if *neighbour_indegree == 0 {
768                    queue.push_back(neighbour);
769                }
770            }
771        }
772
773        current_level.sort_by_key(|n| &n.name);
774        levels.push(current_level);
775    }
776
777    // cycles detected, e.g A -> B -> A
778    if processed_count != nodes.len() {
779        return Err(OrchestratorError::InvalidConfig(
780            "Tokens have cyclical dependencies".to_string(),
781        ));
782    }
783
784    Ok(levels)
785}
786
// TODO: getting the fs from `DynNamespace` would make this struct unnecessary,
// but the `FileSystem` trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We could refactor or use some type-erasure technique
// to resolve this and remove this struct.
// TODO (Loris): We could probably have a `.scoped(base_dir)` method on the
// filesystem itself (the trait) that returns this struct; it could then move
// directly to the support crate, where it can be useful in the future.
794#[derive(Clone, Debug)]
795pub struct ScopedFilesystem<'a, FS: FileSystem> {
796    fs: &'a FS,
797    base_dir: &'a str,
798}
799
800impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
801    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
802        Self { fs, base_dir }
803    }
804
805    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
806        for file in files {
807            let full_remote_path = PathBuf::from(format!(
808                "{}/{}",
809                self.base_dir,
810                file.remote_path.to_string_lossy()
811            ));
812            trace!("coping file: {file}");
813            self.fs
814                .copy(file.local_path.as_path(), full_remote_path)
815                .await?;
816        }
817        Ok(())
818    }
819
820    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
821        let file = file.as_ref();
822
823        let full_path = if file.is_absolute() {
824            file.to_owned()
825        } else {
826            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
827        };
828        let content = self.fs.read_to_string(full_path).await?;
829        Ok(content)
830    }
831
832    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
833        let path = PathBuf::from(format!(
834            "{}/{}",
835            self.base_dir,
836            path.as_ref().to_string_lossy()
837        ));
838        self.fs.create_dir(path).await
839    }
840
841    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
842        let path = PathBuf::from(format!(
843            "{}/{}",
844            self.base_dir,
845            path.as_ref().to_string_lossy()
846        ));
847        self.fs.create_dir_all(path).await
848    }
849
850    async fn write(
851        &self,
852        path: impl AsRef<Path>,
853        contents: impl AsRef<[u8]> + Send,
854    ) -> Result<(), FileSystemError> {
855        let path = path.as_ref();
856
857        let full_path = if path.is_absolute() {
858            path.to_owned()
859        } else {
860            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
861        };
862
863        self.fs.write(full_path, contents).await
864    }
865}
866
/// Enumerates the roles known to the orchestrator.
///
/// NOTE(review): how each role is interpreted is decided by code outside this
/// section of the file — confirm against the call sites before relying on the
/// variant names alone.
#[derive(Debug, Clone)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
876
877// re-exports
878pub use network::{AddCollatorOptions, AddNodeOptions};
879pub use network_helper::metrics;
880#[cfg(feature = "pjs")]
881pub use pjs_helper::PjsResult;
882
883#[cfg(test)]
884mod tests {
885    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
886    use lazy_static::lazy_static;
887    use tokio::sync::Mutex;
888
889    use super::*;
890
891    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
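    // Serializes the tests that set or clear `ENV_KEY`: the process environment
    // is shared by every test thread, so those tests must not run concurrently.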
893    lazy_static! {
894        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
895    }
896
897    fn set_env(concurrency: Option<u32>) {
898        if let Some(value) = concurrency {
899            env::set_var(ENV_KEY, value.to_string());
900        } else {
901            env::remove_var(ENV_KEY);
902        }
903    }
904
905    fn generate(
906        with_image: bool,
907        with_cmd: Option<&'static str>,
908    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
909        NetworkConfigBuilder::new()
910            .with_relaychain(|r| {
911                let mut relay = r
912                    .with_chain("rococo-local")
913                    .with_default_command(with_cmd.unwrap_or("polkadot"));
914                if with_image {
915                    relay = relay.with_default_image("docker.io/parity/polkadot")
916                }
917
918                relay
919                    .with_validator(|node| node.with_name("alice"))
920                    .with_validator(|node| node.with_name("bob"))
921            })
922            .with_parachain(|p| {
923                p.with_id(2000).cumulus_based(true).with_collator(|n| {
924                    let node = n
925                        .with_name("collator")
926                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
927                    if with_image {
928                        node.with_image("docker.io/paritypr/test-parachain")
929                    } else {
930                        node
931                    }
932                })
933            })
934            .build()
935    }
936
937    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
938        let mut spec = NodeSpec {
939            name: name.to_string(),
940            ..Default::default()
941        };
942        if let Some(dependencies) = dependencies {
943            for node in dependencies {
944                spec.args.push(
945                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
946                        .as_str()
947                        .into(),
948                );
949            }
950        }
951        spec
952    }
953
954    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
955        actual_levels
956            .iter()
957            .zip(expected_levels)
958            .for_each(|(actual_level, expected_level)| {
959                assert_eq!(actual_level.len(), expected_level.len());
960                actual_level
961                    .iter()
962                    .zip(expected_level.iter())
963                    .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
964            });
965    }
966
967    #[tokio::test]
968    async fn valid_config_with_image() {
969        let network_config = generate(true, None).unwrap();
970        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
971        let caps = ProviderCapabilities {
972            requires_image: true,
973            has_resources: false,
974            prefix_with_full_path: false,
975            use_default_ports_in_cmd: false,
976        };
977
978        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
979        assert!(valid.is_ok())
980    }
981
982    #[tokio::test]
983    async fn invalid_config_without_image() {
984        let network_config = generate(false, None).unwrap();
985        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
986        let caps = ProviderCapabilities {
987            requires_image: true,
988            has_resources: false,
989            prefix_with_full_path: false,
990            use_default_ports_in_cmd: false,
991        };
992
993        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
994        assert!(valid.is_err())
995    }
996
997    #[tokio::test]
998    async fn invalid_config_missing_cmd() {
999        let network_config = generate(false, Some("other")).unwrap();
1000        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1001        let caps = ProviderCapabilities {
1002            requires_image: false,
1003            has_resources: false,
1004            prefix_with_full_path: false,
1005            use_default_ports_in_cmd: false,
1006        };
1007
1008        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1009        assert!(valid.is_err())
1010    }
1011
1012    #[tokio::test]
1013    async fn valid_config_present_cmd() {
1014        let network_config = generate(false, Some("cargo")).unwrap();
1015        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1016        let caps = ProviderCapabilities {
1017            requires_image: false,
1018            has_resources: false,
1019            prefix_with_full_path: false,
1020            use_default_ports_in_cmd: false,
1021        };
1022
1023        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1024        println!("{valid:?}");
1025        assert!(valid.is_ok())
1026    }
1027
1028    #[tokio::test]
1029    async fn default_spawn_concurrency() {
1030        let _g = ENV_MUTEX.lock().await;
1031        set_env(None);
1032        let network_config = generate(false, Some("cargo")).unwrap();
1033        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1034        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1035        assert_eq!(concurrency, 100);
1036    }
1037
1038    #[tokio::test]
1039    async fn set_spawn_concurrency() {
1040        let _g = ENV_MUTEX.lock().await;
1041        set_env(None);
1042
1043        let network_config = generate(false, Some("cargo")).unwrap();
1044        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1045
1046        let global_settings = GlobalSettingsBuilder::new()
1047            .with_spawn_concurrency(4)
1048            .build()
1049            .unwrap();
1050
1051        spec.set_global_settings(global_settings);
1052        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1053        assert_eq!(concurrency, 4);
1054        assert!(!limited);
1055    }
1056
1057    #[tokio::test]
1058    async fn set_spawn_concurrency_but_limited() {
1059        let _g = ENV_MUTEX.lock().await;
1060        set_env(None);
1061
1062        let network_config = generate(false, Some("cargo")).unwrap();
1063        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1064
1065        let global_settings = GlobalSettingsBuilder::new()
1066            .with_spawn_concurrency(4)
1067            .build()
1068            .unwrap();
1069
1070        spec.set_global_settings(global_settings);
1071        let node = spec.relaychain.nodes.first_mut().unwrap();
1072        node.args
1073            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1074        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1075        assert_eq!(concurrency, 1);
1076        assert!(limited);
1077    }
1078
1079    #[tokio::test]
1080    async fn set_spawn_concurrency_from_env() {
1081        let _g = ENV_MUTEX.lock().await;
1082        set_env(Some(10));
1083
1084        let network_config = generate(false, Some("cargo")).unwrap();
1085        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1086        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1087        assert_eq!(concurrency, 10);
1088        assert!(!limited);
1089    }
1090
1091    #[tokio::test]
1092    async fn set_spawn_concurrency_from_env_but_limited() {
1093        let _g = ENV_MUTEX.lock().await;
1094        set_env(Some(12));
1095
1096        let network_config = generate(false, Some("cargo")).unwrap();
1097        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1098        let node = spec.relaychain.nodes.first_mut().unwrap();
1099        node.args
1100            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1101        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1102        assert_eq!(concurrency, 1);
1103        assert!(limited);
1104    }
1105
1106    #[test]
1107    fn dependency_levels_among_should_work() {
1108        // no nodes
1109        assert!(dependency_levels_among(&[]).unwrap().is_empty());
1110
1111        // one node
1112        let alice = get_node_with_dependencies("alice", None);
1113        let nodes = [&alice];
1114
1115        let levels = dependency_levels_among(&nodes).unwrap();
1116        let expected = vec![vec!["alice"]];
1117
1118        verify_levels(levels, expected);
1119
1120        // two independent nodes
1121        let alice = get_node_with_dependencies("alice", None);
1122        let bob = get_node_with_dependencies("bob", None);
1123        let nodes = [&alice, &bob];
1124
1125        let levels = dependency_levels_among(&nodes).unwrap();
1126        let expected = vec![vec!["alice", "bob"]];
1127
1128        verify_levels(levels, expected);
1129
1130        // alice -> bob -> charlie
1131        let alice = get_node_with_dependencies("alice", None);
1132        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1133        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
1134        let nodes = [&alice, &bob, &charlie];
1135
1136        let levels = dependency_levels_among(&nodes).unwrap();
1137        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];
1138
1139        verify_levels(levels, expected);
1140
1141        //         ┌─> bob
1142        // alice ──|
1143        //         └─> charlie
1144        let alice = get_node_with_dependencies("alice", None);
1145        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1146        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1147        let nodes = [&alice, &bob, &charlie];
1148
1149        let levels = dependency_levels_among(&nodes).unwrap();
1150        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];
1151
1152        verify_levels(levels, expected);
1153
1154        //         ┌─>   bob  ──┐
1155        // alice ──|            ├─> dave
1156        //         └─> charlie  ┘
1157        let alice = get_node_with_dependencies("alice", None);
1158        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1159        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1160        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
1161        let nodes = [&alice, &bob, &charlie, &dave];
1162
1163        let levels = dependency_levels_among(&nodes).unwrap();
1164        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];
1165
1166        verify_levels(levels, expected);
1167    }
1168
1169    #[test]
1170    fn dependency_levels_among_should_detect_cycles() {
1171        let mut alice = get_node_with_dependencies("alice", None);
1172        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1173        alice.args.push("{{ZOMBIE:bob:someField}}".into());
1174
1175        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
1176    }
1177}