zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11#[cfg(feature = "pjs")]
12pub mod pjs_helper;
13pub mod shared;
14mod spawner;
15
16use std::{
17    collections::{HashMap, HashSet, VecDeque},
18    env,
19    net::IpAddr,
20    path::{Path, PathBuf},
21    time::{Duration, SystemTime},
22};
23
24use configuration::{NetworkConfig, RegistrationStrategy};
25use errors::OrchestratorError;
26use generators::errors::GeneratorError;
27use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
28// re-exported
29pub use network_spec::NetworkSpec;
30use network_spec::{node::NodeSpec, parachain::ParachainSpec};
31use provider::{
32    types::{ProviderCapabilities, TransferedFile},
33    DynProvider,
34};
35use serde_json::json;
36use support::{
37    constants::{
38        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
39        THIS_IS_A_BUG,
40    },
41    fs::{FileSystem, FileSystemError},
42    replacer::{get_tokens_to_replace, has_tokens},
43};
44use tokio::time::timeout;
45use tracing::{debug, info, trace, warn};
46
47use crate::{
48    shared::{constants::DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS, types::RegisterParachainOptions},
49    spawner::SpawnNodeCtx,
50};
51pub struct Orchestrator<T>
52where
53    T: FileSystem + Sync + Send,
54{
55    filesystem: T,
56    provider: DynProvider,
57}
58
59impl<T> Orchestrator<T>
60where
61    T: FileSystem + Sync + Send + Clone,
62{
63    pub fn new(filesystem: T, provider: DynProvider) -> Self {
64        Self {
65            filesystem,
66            provider,
67        }
68    }
69
70    pub async fn spawn(
71        &self,
72        network_config: NetworkConfig,
73    ) -> Result<Network<T>, OrchestratorError> {
74        let global_timeout = network_config.global_settings().network_spawn_timeout();
75        let network_spec = NetworkSpec::from_config(&network_config).await?;
76
77        let res = timeout(
78            Duration::from_secs(global_timeout.into()),
79            self.spawn_inner(network_spec),
80        )
81        .await
82        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
83        res?
84    }
85
86    pub async fn spawn_from_spec(
87        &self,
88        network_spec: NetworkSpec,
89    ) -> Result<Network<T>, OrchestratorError> {
90        let global_timeout = network_spec.global_settings.network_spawn_timeout();
91        let res = timeout(
92            Duration::from_secs(global_timeout as u64),
93            self.spawn_inner(network_spec),
94        )
95        .await
96        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
97        res?
98    }
99
    /// Main driver that spawns the whole network described by `network_spec`.
    ///
    /// In order, it:
    /// 1. validates the spec against the provider capabilities,
    /// 2. creates the namespace (using `base_dir` from the global settings when set),
    /// 3. builds and customizes the relaychain chain-spec and the parachain artifacts,
    /// 4. spawns the relaychain bootnodes and then the remaining relaychain nodes,
    /// 5. spawns, for every parachain, its bootnodes and then its remaining collators,
    /// 6. registers the parachains collected for registration with extrinsic,
    /// 7. writes the `zombie.json` state file and returns the running `Network`.
    ///
    /// Nodes are spawned level by level (see `dependency_levels_among`), in chunks of at
    /// most `spawn_concurrency` nodes; every node of a level must be up before the next
    /// level is spawned.
    ///
    /// # Errors
    /// Every failing step is propagated with `?` as an `OrchestratorError`.
    async fn spawn_inner(
        &self,
        mut network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        // main driver to spawn the network
        debug!(network_spec = ?network_spec,"Network spec to spawn");

        // TODO: move to Provider trait
        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
            .map_err(|err| {
                OrchestratorError::InvalidConfigForProvider(
                    self.provider.name().into(),
                    err.to_string(),
                )
            })?;

        // create namespace (in the user supplied base dir when there is one)
        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
            self.provider
                .create_namespace_with_base_dir(base_dir)
                .await?
        } else {
            self.provider.create_namespace().await?
        };

        // set the spawn_concurrency
        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;

        let start_time = SystemTime::now();
        info!("🧰 ns: {}", ns.name());
        info!("🧰 base_dir: {:?}", ns.base_dir());
        info!("🕰 start time: {:?}", start_time);
        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");

        network_spec
            .populate_nodes_available_args(ns.clone())
            .await?;

        // All the file operations below are scoped to the namespace base dir.
        let base_dir = ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        // Create chain-spec for relaychain
        network_spec
            .relaychain
            .chain_spec
            .build(&ns, &scoped_fs)
            .await?;

        debug!("relaychain spec built!");
        // The relaychain id is needed to build the parachain artifacts.
        let relay_chain_id = network_spec
            .relaychain
            .chain_spec
            .read_chain_id(&scoped_fs)
            .await?;

        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
        // `true` when the user set `base_dir` in the global settings.
        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
        // Create parachain artifacts (chain-spec, wasm, state)
        network_spec
            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
            .await?;

        // Gather the parachains to register in genesis and the ones to register with extrinsic
        // (parachains with the `Manual` strategy are in neither list).
        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
            Vec<&ParachainSpec>,
            Vec<&ParachainSpec>,
        ) = network_spec
            .parachains
            .iter()
            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
            .partition(|para| {
                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
            });

        // Genesis config of every parachain registered in genesis, passed to `customize_relay`.
        let mut para_artifacts = vec![];
        for para in para_to_register_in_genesis {
            let genesis_config = para.get_genesis_config()?;
            para_artifacts.push(genesis_config)
        }

        // Customize relaychain
        network_spec
            .relaychain
            .chain_spec
            .customize_relay(
                &network_spec.relaychain,
                &network_spec.hrmp_channels,
                para_artifacts,
                &scoped_fs,
            )
            .await?;

        // Build raw version
        network_spec
            .relaychain
            .chain_spec
            .build_raw(&ns, &scoped_fs)
            .await?;

        // override wasm if needed
        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
            network_spec
                .relaychain
                .chain_spec
                .override_code(&scoped_fs, wasm_override)
                .await?;
        }

        // `false`: when no relaychain node is flagged as bootnode, the first one is used.
        let (bootnodes, relaynodes) =
            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);

        // TODO: do we still want to support spawning a dedicated bootnode??
        // Context shared by the relaychain nodes; the bootnodes are spawned with an empty
        // bootnodes list, the real one is set once they are up (see below).
        let mut ctx = SpawnNodeCtx {
            chain_id: &relay_chain_id,
            parachain_id: None,
            chain: relay_chain_name.as_str(),
            role: ZombieRole::Node,
            ns: &ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: false,
            nodes_by_name: json!({}),
        };

        // The relaychain chain-spec is passed to every relaychain node spawned below
        // and to `Parachain::from_spec`.
        let global_files_to_inject = vec![TransferedFile::new(
            PathBuf::from(format!(
                "{}/{relay_chain_name}.json",
                ns.base_dir().to_string_lossy()
            )),
            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
        )];

        let r = Relaychain::new(
            relay_chain_name.to_string(),
            relay_chain_id.clone(),
            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
            )?),
        );
        let mut network =
            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
        let mut node_ws_url: String = "".to_string();

        // Calculate the bootnodes addr from the running nodes
        let mut bootnodes_addr: Vec<String> = vec![];

        // Spawn the relaychain bootnodes, one dependency level at a time.
        for level in dependency_levels_among(&bootnodes)? {
            let mut running_nodes_per_level = vec![];
            // At most `spawn_concurrency` nodes are spawned concurrently.
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    let bootnode_multiaddr = node.multiaddr();

                    bootnodes_addr.push(bootnode_multiaddr.to_string());

                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
                    // Only the ws uri of the first spawned bootnode is kept.
                    if node_ws_url.is_empty() {
                        node_ws_url.clone_from(&node.ws_uri)
                    }

                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for all nodes in the current level to be up
            let waiting_tasks = running_nodes_per_level
                .iter()
                .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                // Add the node to the context and `Network` instance
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None);
            }
        }

        // Add the bootnodes to the relaychain spec file and ctx
        network_spec
            .relaychain
            .chain_spec
            .add_bootnodes(&scoped_fs, &bootnodes_addr)
            .await?;

        ctx.bootnodes_addr = &bootnodes_addr;

        // Spawn the rest of the relaychain nodes, now with the bootnodes addresses in `ctx`.
        for level in dependency_levels_among(&relaynodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for all nodes in the current level to be up
            let waiting_tasks = running_nodes_per_level
                .iter()
                .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                // Add the node to the context and `Network` instance
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None);
            }
        }

        // spawn paras
        for para in network_spec.parachains.iter() {
            // Create parachain (in the context of the running network)
            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
            let parachain_id = parachain.chain_id.clone();

            let (bootnodes, collators) =
                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);

            // Create `ctx` for spawn parachain nodes; it starts from a copy of the relaychain
            // `ctx` (so it already contains the relaychain nodes in `nodes_by_name`).
            let mut ctx_para = SpawnNodeCtx {
                parachain: Some(para),
                parachain_id: parachain_id.as_deref(),
                role: if para.is_cumulus_based {
                    ZombieRole::CumulusCollator
                } else {
                    ZombieRole::Collator
                },
                bootnodes_addr: &vec![],
                ..ctx.clone()
            };

            // Calculate the bootnodes addr from the running nodes
            let mut bootnodes_addr: Vec<String> = vec![];
            // Nodes of this parachain, added to `network` only after the parachain itself.
            let mut running_nodes: Vec<NetworkNode> = vec![];

            // Spawn the parachain bootnodes first
            for level in dependency_levels_among(&bootnodes)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        let bootnode_multiaddr = node.multiaddr();

                        bootnodes_addr.push(bootnode_multiaddr.to_string());

                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                // Wait for all nodes in the current level to be up
                let waiting_tasks = running_nodes_per_level
                    .iter()
                    .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Add the bootnodes to the parachain spec file (when there is one) and ctx
            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
                para_chain_spec
                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
                    .await?;
            }

            ctx_para.bootnodes_addr = &bootnodes_addr;

            // Spawn the rest of the nodes
            for level in dependency_levels_among(&collators)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                // Wait for all nodes in the current level to be up
                let waiting_tasks = running_nodes_per_level
                    .iter()
                    .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Register the parachain in `network` and then attach its running nodes to it.
            let running_para_id = parachain.para_id;
            network.add_para(parachain);
            for node in running_nodes {
                network.add_running_node(node, Some(running_para_id));
            }
        }

        // TODO:
        // - add-ons (introspector/tracing/etc)

        // verify nodes
        // network_helper::verifier::verify_nodes(&network.nodes()).await?;

        // Now we need to register the paras with extrinsic from the Vec collected before;
        for para in para_to_register_with_extrinsic {
            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
                id: para.id,
                // This needs to resolve correctly
                wasm_path: para
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para
                    .genesis_state
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                // ws uri of the first relaychain bootnode spawned above
                node_ws_url: node_ws_url.clone(),
                onboard_as_para: para.onboard_as_parachain,
                seed: None, // TODO: Seed is passed by?
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // - write zombie.json state file: the serialized network plus the local base dir,
        //   the namespace name and (when available) the start time in milliseconds.
        let mut zombie_json = serde_json::to_value(&network)?;
        zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
        zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());

        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
            zombie_json["start_time_ts"] =
                serde_json::value::Value::String(start_time_ts.as_millis().to_string());
        } else {
            // Just warn, do not propagate the error (this should not happen)
            warn!("⚠️ Error getting start_time timestamp");
        }

        scoped_fs
            .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
            .await?;
        Ok(network)
    }
481}
482
483// Helpers
484
485// Split the node list depending if it's bootnode or not
486// NOTE: if there isn't a bootnode declared we use the first one
487fn split_nodes_by_bootnodes(
488    nodes: &[NodeSpec],
489    no_default_bootnodes: bool,
490) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
491    // get the bootnodes to spawn first and calculate the bootnode string for use later
492    let mut bootnodes = vec![];
493    let mut other_nodes = vec![];
494    nodes.iter().for_each(|node| {
495        if node.is_bootnode {
496            bootnodes.push(node)
497        } else {
498            other_nodes.push(node)
499        }
500    });
501
502    if bootnodes.is_empty() && !no_default_bootnodes {
503        bootnodes.push(other_nodes.remove(0))
504    }
505
506    (bootnodes, other_nodes)
507}
508
509// Generate a bootnode multiaddress and return as string
510fn generate_bootnode_addr(
511    node: &NetworkNode,
512    ip: &IpAddr,
513    port: u16,
514) -> Result<String, GeneratorError> {
515    generators::generate_node_bootnode_addr(
516        &node.spec.peer_id,
517        ip,
518        port,
519        node.inner.args().as_ref(),
520        &node.spec.p2p_cert_hash,
521    )
522}
523// Validate that the config fulfill all the requirements of the provider
524fn validate_spec_with_provider_capabilities(
525    network_spec: &NetworkSpec,
526    capabilities: &ProviderCapabilities,
527) -> Result<(), anyhow::Error> {
528    let mut errs: Vec<String> = vec![];
529
530    if capabilities.requires_image {
531        // Relaychain
532        if network_spec.relaychain.default_image.is_none() {
533            // we should check if each node have an image
534            let nodes = &network_spec.relaychain.nodes;
535            if nodes.iter().any(|node| node.image.is_none()) {
536                errs.push(String::from(
537                    "Missing image for node, and not default is set at relaychain",
538                ));
539            }
540        };
541
542        // Paras
543        for para in &network_spec.parachains {
544            if para.default_image.is_none() {
545                let nodes = &para.collators;
546                if nodes.iter().any(|node| node.image.is_none()) {
547                    errs.push(format!(
548                        "Missing image for node, and not default is set at parachain {}",
549                        para.id
550                    ));
551                }
552            }
553        }
554    } else {
555        // native
556        // We need to get all the `cmds` and verify if are part of the path
557        let mut cmds: HashSet<&str> = Default::default();
558        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
559            cmds.insert(cmd.as_str());
560        }
561        for node in network_spec.relaychain().nodes.iter() {
562            cmds.insert(node.command());
563        }
564
565        // Paras
566        for para in &network_spec.parachains {
567            if let Some(cmd) = para.default_command.as_ref() {
568                cmds.insert(cmd.as_str());
569            }
570
571            for node in para.collators.iter() {
572                cmds.insert(node.command());
573            }
574        }
575
576        // now check the binaries
577        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
578        trace!("current PATH: {path}");
579        let parts: Vec<_> = path.split(":").collect();
580        for cmd in cmds {
581            let missing = if cmd.contains('/') {
582                trace!("checking {cmd}");
583                if std::fs::metadata(cmd).is_err() {
584                    true
585                } else {
586                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
587                    false
588                }
589            } else {
590                // should be in the PATH
591                !parts.iter().any(|part| {
592                    let path_to = format!("{part}/{cmd}");
593                    trace!("checking {path_to}");
594                    let check_result = std::fs::metadata(&path_to);
595                    trace!("result {:?}", check_result);
596                    if check_result.is_ok() {
597                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
598                        true
599                    } else {
600                        false
601                    }
602                })
603            };
604
605            if missing {
606                errs.push(help_msg(cmd));
607            }
608        }
609    }
610
611    if !errs.is_empty() {
612        let msg = errs.join("\n");
613        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
614    }
615
616    Ok(())
617}
618
/// Returns the message reported when the binary `cmd` is missing, including the build
/// command for the binaries we know how to compile.
fn help_msg(cmd: &str) -> String {
    let is_template_node = matches!(
        cmd,
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node"
    );

    if is_template_node {
        return format!(
            "Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release"
        );
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    // Unknown binary: we can't suggest a build command.
    format!("Missing binary {cmd}, please compile it.")
}
635
/// Reads the default spawn concurrency from the env var `ZOMBIE_SPAWN_CONCURRENCY`.
///
/// Returns `None` when the variable is not set (or not valid unicode) or when its value
/// can't be parsed as `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    let raw = env::var("ZOMBIE_SPAWN_CONCURRENCY").ok()?;
    raw.parse::<usize>().ok()
}
644
645fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
646    let desired_spawn_concurrency = match (
647        spawn_concurrency_from_env(),
648        spec.global_settings.spawn_concurrency(),
649    ) {
650        (Some(n), _) => Some(n),
651        (None, Some(n)) => Some(n),
652        _ => None,
653    };
654
655    let (spawn_concurrency, limited_by_tokens) =
656        if let Some(spawn_concurrency) = desired_spawn_concurrency {
657            if spawn_concurrency == 1 {
658                (1, false)
659            } else if has_tokens(&serde_json::to_string(spec)?) {
660                (1, true)
661            } else {
662                (spawn_concurrency, false)
663            }
664        } else {
665            // not set
666            if has_tokens(&serde_json::to_string(spec)?) {
667                (1, true)
668            } else {
669                // use 100 as max concurrency, we can set a max by provider later
670                (100, false)
671            }
672        };
673
674    Ok((spawn_concurrency, limited_by_tokens))
675}
676
677/// Build deterministic dependency **levels** among the given nodes.
678/// - Only dependencies **between nodes in `nodes`** are considered.
679/// - Unknown/out-of-scope references are ignored.
680/// - Self-dependencies are ignored.
681fn dependency_levels_among<'a>(
682    nodes: &'a [&'a NodeSpec],
683) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
684    let by_name = nodes
685        .iter()
686        .map(|n| (n.name.as_str(), *n))
687        .collect::<HashMap<_, _>>();
688
689    let mut graph = HashMap::with_capacity(nodes.len());
690    let mut indegree = HashMap::with_capacity(nodes.len());
691
692    for node in nodes {
693        graph.insert(node.name.as_str(), Vec::new());
694        indegree.insert(node.name.as_str(), 0);
695    }
696
697    // build dependency graph
698    for &node in nodes {
699        if let Ok(args_json) = serde_json::to_string(&node.args) {
700            // collect dependencies
701            let unique_deps = get_tokens_to_replace(&args_json)
702                .into_iter()
703                .filter(|dep| dep != &node.name)
704                .filter_map(|dep| by_name.get(dep.as_str()))
705                .map(|&dep_node| dep_node.name.as_str())
706                .collect::<HashSet<_>>();
707
708            for dep_name in unique_deps {
709                graph
710                    .get_mut(dep_name)
711                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
712                    .push(node);
713                *indegree
714                    .get_mut(node.name.as_str())
715                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
716            }
717        }
718    }
719
720    // find all nodes with no dependencies
721    let mut queue = nodes
722        .iter()
723        .filter(|n| {
724            *indegree
725                .get(n.name.as_str())
726                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
727                == 0
728        })
729        .copied()
730        .collect::<VecDeque<_>>();
731
732    let mut processed_count = 0;
733    let mut levels = Vec::new();
734
735    // Kahn's algorithm
736    while !queue.is_empty() {
737        let level_size = queue.len();
738        let mut current_level = Vec::with_capacity(level_size);
739
740        for _ in 0..level_size {
741            let n = queue
742                .pop_front()
743                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
744            current_level.push(n);
745            processed_count += 1;
746
747            for &neighbour in graph
748                .get(n.name.as_str())
749                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
750            {
751                let neighbour_indegree = indegree
752                    .get_mut(neighbour.name.as_str())
753                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
754                *neighbour_indegree -= 1;
755
756                if *neighbour_indegree == 0 {
757                    queue.push_back(neighbour);
758                }
759            }
760        }
761
762        current_level.sort_by_key(|n| &n.name);
763        levels.push(current_level);
764    }
765
766    // cycles detected, e.g A -> B -> A
767    if processed_count != nodes.len() {
768        return Err(OrchestratorError::InvalidConfig(
769            "Tokens have cyclical dependencies".to_string(),
770        ));
771    }
772
773    Ok(levels)
774}
775
// TODO: getting the fs from `DynNamespace` would make this struct unnecessary,
// but the `FileSystem` trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We can refactor, or use some type-erasure technique,
// to resolve this and remove this struct.
// TODO (Loris): We could probably have a `.scoped(base_dir)` method on the
// filesystem trait itself that returns this struct; then we could move it
// directly to the support crate, where it can be useful in the future.
783#[derive(Clone, Debug)]
784pub struct ScopedFilesystem<'a, FS: FileSystem> {
785    fs: &'a FS,
786    base_dir: &'a str,
787}
788
789impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
790    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
791        Self { fs, base_dir }
792    }
793
794    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
795        for file in files {
796            let full_remote_path = PathBuf::from(format!(
797                "{}/{}",
798                self.base_dir,
799                file.remote_path.to_string_lossy()
800            ));
801            trace!("coping file: {file}");
802            self.fs
803                .copy(file.local_path.as_path(), full_remote_path)
804                .await?;
805        }
806        Ok(())
807    }
808
809    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
810        let file = file.as_ref();
811
812        let full_path = if file.is_absolute() {
813            file.to_owned()
814        } else {
815            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
816        };
817        let content = self.fs.read_to_string(full_path).await?;
818        Ok(content)
819    }
820
821    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
822        let path = PathBuf::from(format!(
823            "{}/{}",
824            self.base_dir,
825            path.as_ref().to_string_lossy()
826        ));
827        self.fs.create_dir(path).await
828    }
829
830    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
831        let path = PathBuf::from(format!(
832            "{}/{}",
833            self.base_dir,
834            path.as_ref().to_string_lossy()
835        ));
836        self.fs.create_dir_all(path).await
837    }
838
839    async fn write(
840        &self,
841        path: impl AsRef<Path>,
842        contents: impl AsRef<[u8]> + Send,
843    ) -> Result<(), FileSystemError> {
844        let path = path.as_ref();
845
846        let full_path = if path.is_absolute() {
847            path.to_owned()
848        } else {
849            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
850        };
851
852        self.fs.write(full_path, contents).await
853    }
854}
855
/// Roles that can be assigned within a spawned network.
///
/// NOTE(review): the code consuming these variants is outside this section,
/// so the exact semantics of each role should be confirmed at the call sites.
#[derive(Debug, Clone)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
865
866// re-exports
867pub use network::{AddCollatorOptions, AddNodeOptions};
868pub use network_helper::metrics;
869#[cfg(feature = "pjs")]
870pub use pjs_helper::PjsResult;
871
872#[cfg(test)]
873mod tests {
874    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
875    use lazy_static::lazy_static;
876    use tokio::sync::Mutex;
877
878    use super::*;
879
880    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
881    // mutex for test that use env
882    lazy_static! {
883        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
884    }
885
886    fn set_env(concurrency: Option<u32>) {
887        if let Some(value) = concurrency {
888            env::set_var(ENV_KEY, value.to_string());
889        } else {
890            env::remove_var(ENV_KEY);
891        }
892    }
893
894    fn generate(
895        with_image: bool,
896        with_cmd: Option<&'static str>,
897    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
898        NetworkConfigBuilder::new()
899            .with_relaychain(|r| {
900                let mut relay = r
901                    .with_chain("rococo-local")
902                    .with_default_command(with_cmd.unwrap_or("polkadot"));
903                if with_image {
904                    relay = relay.with_default_image("docker.io/parity/polkadot")
905                }
906
907                relay
908                    .with_node(|node| node.with_name("alice"))
909                    .with_node(|node| node.with_name("bob"))
910            })
911            .with_parachain(|p| {
912                p.with_id(2000).cumulus_based(true).with_collator(|n| {
913                    let node = n
914                        .with_name("collator")
915                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
916                    if with_image {
917                        node.with_image("docker.io/paritypr/test-parachain")
918                    } else {
919                        node
920                    }
921                })
922            })
923            .build()
924    }
925
926    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
927        let mut spec = NodeSpec {
928            name: name.to_string(),
929            ..Default::default()
930        };
931        if let Some(dependencies) = dependencies {
932            for node in dependencies {
933                spec.args.push(
934                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
935                        .as_str()
936                        .into(),
937                );
938            }
939        }
940        spec
941    }
942
943    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
944        actual_levels
945            .iter()
946            .zip(expected_levels)
947            .for_each(|(actual_level, expected_level)| {
948                assert_eq!(actual_level.len(), expected_level.len());
949                actual_level
950                    .iter()
951                    .zip(expected_level.iter())
952                    .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
953            });
954    }
955
956    #[tokio::test]
957    async fn valid_config_with_image() {
958        let network_config = generate(true, None).unwrap();
959        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
960        let caps = ProviderCapabilities {
961            requires_image: true,
962            has_resources: false,
963            prefix_with_full_path: false,
964            use_default_ports_in_cmd: false,
965        };
966
967        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
968        assert!(valid.is_ok())
969    }
970
971    #[tokio::test]
972    async fn invalid_config_without_image() {
973        let network_config = generate(false, None).unwrap();
974        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
975        let caps = ProviderCapabilities {
976            requires_image: true,
977            has_resources: false,
978            prefix_with_full_path: false,
979            use_default_ports_in_cmd: false,
980        };
981
982        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
983        assert!(valid.is_err())
984    }
985
986    #[tokio::test]
987    async fn invalid_config_missing_cmd() {
988        let network_config = generate(false, Some("other")).unwrap();
989        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
990        let caps = ProviderCapabilities {
991            requires_image: false,
992            has_resources: false,
993            prefix_with_full_path: false,
994            use_default_ports_in_cmd: false,
995        };
996
997        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
998        assert!(valid.is_err())
999    }
1000
1001    #[tokio::test]
1002    async fn valid_config_present_cmd() {
1003        let network_config = generate(false, Some("cargo")).unwrap();
1004        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1005        let caps = ProviderCapabilities {
1006            requires_image: false,
1007            has_resources: false,
1008            prefix_with_full_path: false,
1009            use_default_ports_in_cmd: false,
1010        };
1011
1012        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1013        println!("{valid:?}");
1014        assert!(valid.is_ok())
1015    }
1016
1017    #[tokio::test]
1018    async fn default_spawn_concurrency() {
1019        let _g = ENV_MUTEX.lock().await;
1020        set_env(None);
1021        let network_config = generate(false, Some("cargo")).unwrap();
1022        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1023        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1024        assert_eq!(concurrency, 100);
1025    }
1026
1027    #[tokio::test]
1028    async fn set_spawn_concurrency() {
1029        let _g = ENV_MUTEX.lock().await;
1030        set_env(None);
1031
1032        let network_config = generate(false, Some("cargo")).unwrap();
1033        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1034
1035        let global_settings = GlobalSettingsBuilder::new()
1036            .with_spawn_concurrency(4)
1037            .build()
1038            .unwrap();
1039
1040        spec.set_global_settings(global_settings);
1041        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1042        assert_eq!(concurrency, 4);
1043        assert!(!limited);
1044    }
1045
1046    #[tokio::test]
1047    async fn set_spawn_concurrency_but_limited() {
1048        let _g = ENV_MUTEX.lock().await;
1049        set_env(None);
1050
1051        let network_config = generate(false, Some("cargo")).unwrap();
1052        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1053
1054        let global_settings = GlobalSettingsBuilder::new()
1055            .with_spawn_concurrency(4)
1056            .build()
1057            .unwrap();
1058
1059        spec.set_global_settings(global_settings);
1060        let node = spec.relaychain.nodes.first_mut().unwrap();
1061        node.args
1062            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1063        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1064        assert_eq!(concurrency, 1);
1065        assert!(limited);
1066    }
1067
1068    #[tokio::test]
1069    async fn set_spawn_concurrency_from_env() {
1070        let _g = ENV_MUTEX.lock().await;
1071        set_env(Some(10));
1072
1073        let network_config = generate(false, Some("cargo")).unwrap();
1074        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1075        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1076        assert_eq!(concurrency, 10);
1077        assert!(!limited);
1078    }
1079
1080    #[tokio::test]
1081    async fn set_spawn_concurrency_from_env_but_limited() {
1082        let _g = ENV_MUTEX.lock().await;
1083        set_env(Some(12));
1084
1085        let network_config = generate(false, Some("cargo")).unwrap();
1086        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1087        let node = spec.relaychain.nodes.first_mut().unwrap();
1088        node.args
1089            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1090        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1091        assert_eq!(concurrency, 1);
1092        assert!(limited);
1093    }
1094
1095    #[test]
1096    fn dependency_levels_among_should_work() {
1097        // no nodes
1098        assert!(dependency_levels_among(&[]).unwrap().is_empty());
1099
1100        // one node
1101        let alice = get_node_with_dependencies("alice", None);
1102        let nodes = [&alice];
1103
1104        let levels = dependency_levels_among(&nodes).unwrap();
1105        let expected = vec![vec!["alice"]];
1106
1107        verify_levels(levels, expected);
1108
1109        // two independent nodes
1110        let alice = get_node_with_dependencies("alice", None);
1111        let bob = get_node_with_dependencies("bob", None);
1112        let nodes = [&alice, &bob];
1113
1114        let levels = dependency_levels_among(&nodes).unwrap();
1115        let expected = vec![vec!["alice", "bob"]];
1116
1117        verify_levels(levels, expected);
1118
1119        // alice -> bob -> charlie
1120        let alice = get_node_with_dependencies("alice", None);
1121        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1122        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
1123        let nodes = [&alice, &bob, &charlie];
1124
1125        let levels = dependency_levels_among(&nodes).unwrap();
1126        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];
1127
1128        verify_levels(levels, expected);
1129
1130        //         ┌─> bob
1131        // alice ──|
1132        //         └─> charlie
1133        let alice = get_node_with_dependencies("alice", None);
1134        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1135        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1136        let nodes = [&alice, &bob, &charlie];
1137
1138        let levels = dependency_levels_among(&nodes).unwrap();
1139        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];
1140
1141        verify_levels(levels, expected);
1142
1143        //         ┌─>   bob  ──┐
1144        // alice ──|            ├─> dave
1145        //         └─> charlie  ┘
1146        let alice = get_node_with_dependencies("alice", None);
1147        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1148        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1149        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
1150        let nodes = [&alice, &bob, &charlie, &dave];
1151
1152        let levels = dependency_levels_among(&nodes).unwrap();
1153        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];
1154
1155        verify_levels(levels, expected);
1156    }
1157
1158    #[test]
1159    fn dependency_levels_among_should_detect_cycles() {
1160        let mut alice = get_node_with_dependencies("alice", None);
1161        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1162        alice.args.push("{{ZOMBIE:bob:someField}}".into());
1163
1164        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
1165    }
1166}