zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11pub mod shared;
12mod spawner;
13
14use std::{
15    collections::{HashMap, HashSet, VecDeque},
16    env,
17    net::IpAddr,
18    path::{Path, PathBuf},
19    time::{Duration, SystemTime},
20};
21
22use configuration::{NetworkConfig, RegistrationStrategy};
23use errors::OrchestratorError;
24use generators::errors::GeneratorError;
25use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
26// re-exported
27pub use network_spec::NetworkSpec;
28use network_spec::{node::NodeSpec, parachain::ParachainSpec};
29use provider::{
30    types::{ProviderCapabilities, TransferedFile},
31    DynProvider,
32};
33use serde_json::json;
34use support::{
35    constants::{
36        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
37        THIS_IS_A_BUG,
38    },
39    fs::{FileSystem, FileSystemError},
40    replacer::{get_tokens_to_replace, has_tokens},
41};
42use tokio::time::timeout;
43use tracing::{debug, info, trace, warn};
44
45use crate::{shared::types::RegisterParachainOptions, spawner::SpawnNodeCtx};
/// Entry point used to spawn a network: it pairs a filesystem implementation
/// with the provider that validates the spec and creates the namespace.
pub struct Orchestrator<T>
where
    T: FileSystem + Sync + Send,
{
    /// Filesystem wrapped in a [`ScopedFilesystem`] while spawning and cloned
    /// into the resulting [`Network`].
    filesystem: T,
    /// Provider whose capabilities are checked against the spec and which
    /// creates the namespace the network runs in.
    provider: DynProvider,
}
53
54impl<T> Orchestrator<T>
55where
56    T: FileSystem + Sync + Send + Clone,
57{
58    pub fn new(filesystem: T, provider: DynProvider) -> Self {
59        Self {
60            filesystem,
61            provider,
62        }
63    }
64
65    pub async fn spawn(
66        &self,
67        network_config: NetworkConfig,
68    ) -> Result<Network<T>, OrchestratorError> {
69        let global_timeout = network_config.global_settings().network_spawn_timeout();
70        let network_spec = NetworkSpec::from_config(&network_config).await?;
71
72        let res = timeout(
73            Duration::from_secs(global_timeout.into()),
74            self.spawn_inner(network_spec),
75        )
76        .await
77        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
78        res?
79    }
80
81    pub async fn spawn_from_spec(
82        &self,
83        network_spec: NetworkSpec,
84    ) -> Result<Network<T>, OrchestratorError> {
85        let global_timeout = network_spec.global_settings.network_spawn_timeout();
86        let res = timeout(
87            Duration::from_secs(global_timeout as u64),
88            self.spawn_inner(network_spec),
89        )
90        .await
91        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
92        res?
93    }
94
    /// Main driver to spawn the network described by `network_spec`.
    ///
    /// Steps, in order:
    /// 1. validate the spec against the provider capabilities and create the namespace;
    /// 2. build the relaychain chain-spec and the parachain artifacts, customize the
    ///    relaychain spec (including the parachains registered in genesis), build the
    ///    raw version and apply the wasm / raw-spec overrides;
    /// 3. spawn the relaychain bootnodes and then the rest of the relaychain nodes,
    ///    level by level (see `dependency_levels_among`) and in chunks of at most
    ///    `spawn_concurrency` nodes;
    /// 4. do the same for the bootnodes and collators of every parachain;
    /// 5. register through extrinsic the parachains that are neither `Manual` nor `InGenesis`;
    /// 6. write the `zombie.json` state file and, if enabled in the global settings,
    ///    start the watching task.
    ///
    /// # Errors
    /// Returns the first [`OrchestratorError`] produced by any of the steps above.
    async fn spawn_inner(
        &self,
        mut network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        // main driver to spawn the network
        debug!(network_spec = ?network_spec,"Network spec to spawn");

        // TODO: move to Provider trait
        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
            .map_err(|err| {
                OrchestratorError::InvalidConfigForProvider(
                    self.provider.name().into(),
                    err.to_string(),
                )
            })?;

        // create namespace (in the configured base dir if there is one)
        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
            self.provider
                .create_namespace_with_base_dir(base_dir)
                .await?
        } else {
            self.provider.create_namespace().await?
        };

        // set the spawn_concurrency
        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;

        let start_time = SystemTime::now();
        info!("🧰 ns: {}", ns.name());
        info!("🧰 base_dir: {:?}", ns.base_dir());
        info!("🕰 start time: {:?}", start_time);
        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");

        network_spec
            .populate_nodes_available_args(ns.clone())
            .await?;

        // All the relative paths used from here on are resolved inside the namespace base dir
        let base_dir = ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        // Create chain-spec for relaychain
        network_spec
            .relaychain
            .chain_spec
            .build(&ns, &scoped_fs)
            .await?;

        debug!("relaychain spec built!");
        // Create parachain artifacts (chain-spec, wasm, state)
        let relay_chain_id = network_spec
            .relaychain
            .chain_spec
            .read_chain_id(&scoped_fs)
            .await?;

        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
        network_spec
            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
            .await?;

        // Gather the parachains to register in genesis and the ones to register with extrinsic
        // (parachains with the `Manual` strategy are left out of both lists)
        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
            Vec<&ParachainSpec>,
            Vec<&ParachainSpec>,
        ) = network_spec
            .parachains
            .iter()
            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
            .partition(|para| {
                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
            });

        let mut para_artifacts = vec![];
        for para in para_to_register_in_genesis {
            let genesis_config = para.get_genesis_config()?;
            para_artifacts.push(genesis_config)
        }

        // Customize relaychain
        network_spec
            .relaychain
            .chain_spec
            .customize_relay(
                &network_spec.relaychain,
                &network_spec.hrmp_channels,
                para_artifacts,
                &scoped_fs,
            )
            .await?;

        // Build raw version
        network_spec
            .relaychain
            .chain_spec
            .build_raw(&ns, &scoped_fs)
            .await?;

        // override wasm if needed
        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
            network_spec
                .relaychain
                .chain_spec
                .override_code(&scoped_fs, wasm_override)
                .await?;
        }

        // override raw spec if needed
        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
            network_spec
                .relaychain
                .chain_spec
                .override_raw_spec(&scoped_fs, raw_spec_override)
                .await?;
        }

        // For the relaychain a default bootnode is always allowed (`no_default_bootnodes = false`)
        let (bootnodes, relaynodes) =
            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);

        // TODO: we want to still supporting spawn a dedicated bootnode??
        // Context shared by the relaychain nodes; the bootnodes are spawned with an
        // empty list of bootnode addresses, which is filled in once they are running.
        let mut ctx = SpawnNodeCtx {
            chain_id: &relay_chain_id,
            parachain_id: None,
            chain: relay_chain_name.as_str(),
            role: ZombieRole::Node,
            ns: &ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: false,
            nodes_by_name: json!({}),
        };

        // The relaychain chain-spec is injected into every node as `/cfg/<chain>.json`
        let global_files_to_inject = vec![TransferedFile::new(
            PathBuf::from(format!(
                "{}/{relay_chain_name}.json",
                ns.base_dir().to_string_lossy()
            )),
            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
        )];

        let r = Relaychain::new(
            relay_chain_name.to_string(),
            relay_chain_id.clone(),
            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
            )?),
        );
        let mut network =
            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
        let mut node_ws_url: String = "".to_string();

        // Calculate the bootnodes addr from the running nodes
        let mut bootnodes_addr: Vec<String> = vec![];

        // Spawn the relaychain bootnodes, one dependency level at a time
        for level in dependency_levels_among(&bootnodes)? {
            let mut running_nodes_per_level = vec![];
            // At most `spawn_concurrency` nodes are spawned concurrently
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    let bootnode_multiaddr = node.multiaddr();

                    bootnodes_addr.push(bootnode_multiaddr.to_string());

                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
                    // Only the ws uri of the first spawned bootnode is kept
                    if node_ws_url.is_empty() {
                        node_ws_url.clone_from(&node.ws_uri)
                    }

                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for all nodes in the current level to be up
            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                // Add the node to the context and `Network` instance
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // Add the bootnodes to the relaychain spec file and ctx
        network_spec
            .relaychain
            .chain_spec
            .add_bootnodes(&scoped_fs, &bootnodes_addr)
            .await?;

        ctx.bootnodes_addr = &bootnodes_addr;

        // Spawn the rest of the relaychain nodes, now with the bootnodes addresses in the ctx
        for level in dependency_levels_among(&relaynodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for all nodes in the current level to be up
            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // spawn paras
        for para in network_spec.parachains.iter() {
            // Create parachain (in the context of the running network)
            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
            let parachain_id = parachain.chain_id.clone();

            let (bootnodes, collators) =
                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);

            // Create `ctx` for spawn parachain nodes
            // (starts from a copy of the relaychain ctx, with no parachain bootnodes yet)
            let mut ctx_para = SpawnNodeCtx {
                parachain: Some(para),
                parachain_id: parachain_id.as_deref(),
                role: if para.is_cumulus_based {
                    ZombieRole::CumulusCollator
                } else {
                    ZombieRole::Collator
                },
                bootnodes_addr: &vec![],
                ..ctx.clone()
            };

            // Calculate the bootnodes addr from the running nodes
            let mut bootnodes_addr: Vec<String> = vec![];
            let mut running_nodes: Vec<NetworkNode> = vec![];

            // Spawn the parachain bootnodes first
            for level in dependency_levels_among(&bootnodes)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        let bootnode_multiaddr = node.multiaddr();

                        bootnodes_addr.push(bootnode_multiaddr.to_string());

                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                // Wait for all nodes in the current level to be up
                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Add the bootnodes to the parachain spec file (if the parachain has one) and ctx
            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
                para_chain_spec
                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
                    .await?;
            }

            ctx_para.bootnodes_addr = &bootnodes_addr;

            // Spawn the rest of the nodes
            for level in dependency_levels_among(&collators)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                // Wait for all nodes in the current level to be up
                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // The parachain is added to the network before its running nodes
            let running_para_id = parachain.para_id;
            network.add_para(parachain);
            for node in running_nodes {
                network.add_running_node(node, Some(running_para_id)).await;
            }
        }

        // TODO:
        // - add-ons (introspector/tracing/etc)

        // verify nodes
        // network_helper::verifier::verify_nodes(&network.nodes()).await?;

        // Now we need to register the paras with extrinsic from the Vec collected before;
        for para in para_to_register_with_extrinsic {
            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
                id: para.id,
                // This needs to resolve correctly
                wasm_path: para
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para
                    .genesis_state
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: node_ws_url.clone(),
                onboard_as_para: para.onboard_as_parachain,
                seed: None, // TODO: Seed is passed by?
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // - write zombie.json state file
        let mut zombie_json = serde_json::to_value(&network)?;
        zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
        zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());

        // The start time is stored as milliseconds since the unix epoch (as a string)
        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
            zombie_json["start_time_ts"] =
                serde_json::value::Value::String(start_time_ts.as_millis().to_string());
        } else {
            // Just warn, do not propagate the err (this should not happen)
            warn!("⚠️ Error getting start_time timestamp");
        }

        scoped_fs
            .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
            .await?;

        if network_spec.global_settings.tear_down_on_failure() {
            network.spawn_watching_task();
        }

        Ok(network)
    }
490}
491
492// Helpers
493
494// Split the node list depending if it's bootnode or not
495// NOTE: if there isn't a bootnode declared we use the first one
496fn split_nodes_by_bootnodes(
497    nodes: &[NodeSpec],
498    no_default_bootnodes: bool,
499) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
500    // get the bootnodes to spawn first and calculate the bootnode string for use later
501    let mut bootnodes = vec![];
502    let mut other_nodes = vec![];
503    nodes.iter().for_each(|node| {
504        if node.is_bootnode {
505            bootnodes.push(node)
506        } else {
507            other_nodes.push(node)
508        }
509    });
510
511    if bootnodes.is_empty() && !no_default_bootnodes {
512        bootnodes.push(other_nodes.remove(0))
513    }
514
515    (bootnodes, other_nodes)
516}
517
518// Generate a bootnode multiaddress and return as string
519fn generate_bootnode_addr(
520    node: &NetworkNode,
521    ip: &IpAddr,
522    port: u16,
523) -> Result<String, GeneratorError> {
524    generators::generate_node_bootnode_addr(
525        &node.spec.peer_id,
526        ip,
527        port,
528        node.inner.args().as_ref(),
529        &node.spec.p2p_cert_hash,
530    )
531}
532// Validate that the config fulfill all the requirements of the provider
533fn validate_spec_with_provider_capabilities(
534    network_spec: &NetworkSpec,
535    capabilities: &ProviderCapabilities,
536) -> Result<(), anyhow::Error> {
537    let mut errs: Vec<String> = vec![];
538
539    if capabilities.requires_image {
540        // Relaychain
541        if network_spec.relaychain.default_image.is_none() {
542            // we should check if each node have an image
543            let nodes = &network_spec.relaychain.nodes;
544            if nodes.iter().any(|node| node.image.is_none()) {
545                errs.push(String::from(
546                    "Missing image for node, and not default is set at relaychain",
547                ));
548            }
549        };
550
551        // Paras
552        for para in &network_spec.parachains {
553            if para.default_image.is_none() {
554                let nodes = &para.collators;
555                if nodes.iter().any(|node| node.image.is_none()) {
556                    errs.push(format!(
557                        "Missing image for node, and not default is set at parachain {}",
558                        para.id
559                    ));
560                }
561            }
562        }
563    } else {
564        // native
565        // We need to get all the `cmds` and verify if are part of the path
566        let mut cmds: HashSet<&str> = Default::default();
567        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
568            cmds.insert(cmd.as_str());
569        }
570        for node in network_spec.relaychain().nodes.iter() {
571            cmds.insert(node.command());
572        }
573
574        // Paras
575        for para in &network_spec.parachains {
576            if let Some(cmd) = para.default_command.as_ref() {
577                cmds.insert(cmd.as_str());
578            }
579
580            for node in para.collators.iter() {
581                cmds.insert(node.command());
582            }
583        }
584
585        // now check the binaries
586        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
587        trace!("current PATH: {path}");
588        let parts: Vec<_> = path.split(":").collect();
589        for cmd in cmds {
590            let missing = if cmd.contains('/') {
591                trace!("checking {cmd}");
592                if std::fs::metadata(cmd).is_err() {
593                    true
594                } else {
595                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
596                    false
597                }
598            } else {
599                // should be in the PATH
600                !parts.iter().any(|part| {
601                    let path_to = format!("{part}/{cmd}");
602                    trace!("checking {path_to}");
603                    let check_result = std::fs::metadata(&path_to);
604                    trace!("result {:?}", check_result);
605                    if check_result.is_ok() {
606                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
607                        true
608                    } else {
609                        false
610                    }
611                })
612            };
613
614            if missing {
615                errs.push(help_msg(cmd));
616            }
617        }
618    }
619
620    if !errs.is_empty() {
621        let msg = errs.join("\n");
622        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
623    }
624
625    Ok(())
626}
627
/// Build the error message for a missing binary `cmd`, including the build
/// command for the binaries we know how to compile.
fn help_msg(cmd: &str) -> String {
    // Template nodes all share the same build instructions
    const TEMPLATE_NODES: [&str; 3] = [
        "parachain-template-node",
        "solochain-template-node",
        "minimal-template-node",
    ];

    if TEMPLATE_NODES.contains(&cmd) {
        return format!(
            "Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release"
        );
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    // Unknown binary, we can't suggest how to build it
    format!("Missing binary {cmd}, please compile it.")
}
644
/// Read the default spawn concurrency from the env var `ZOMBIE_SPAWN_CONCURRENCY`.
///
/// Returns `None` when the variable can't be read or its value can't be parsed as `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
653
654fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
655    let desired_spawn_concurrency = match (
656        spawn_concurrency_from_env(),
657        spec.global_settings.spawn_concurrency(),
658    ) {
659        (Some(n), _) => Some(n),
660        (None, Some(n)) => Some(n),
661        _ => None,
662    };
663
664    let (spawn_concurrency, limited_by_tokens) =
665        if let Some(spawn_concurrency) = desired_spawn_concurrency {
666            if spawn_concurrency == 1 {
667                (1, false)
668            } else if has_tokens(&serde_json::to_string(spec)?) {
669                (1, true)
670            } else {
671                (spawn_concurrency, false)
672            }
673        } else {
674            // not set
675            if has_tokens(&serde_json::to_string(spec)?) {
676                (1, true)
677            } else {
678                // use 100 as max concurrency, we can set a max by provider later
679                (100, false)
680            }
681        };
682
683    Ok((spawn_concurrency, limited_by_tokens))
684}
685
686/// Build deterministic dependency **levels** among the given nodes.
687/// - Only dependencies **between nodes in `nodes`** are considered.
688/// - Unknown/out-of-scope references are ignored.
689/// - Self-dependencies are ignored.
690fn dependency_levels_among<'a>(
691    nodes: &'a [&'a NodeSpec],
692) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
693    let by_name = nodes
694        .iter()
695        .map(|n| (n.name.as_str(), *n))
696        .collect::<HashMap<_, _>>();
697
698    let mut graph = HashMap::with_capacity(nodes.len());
699    let mut indegree = HashMap::with_capacity(nodes.len());
700
701    for node in nodes {
702        graph.insert(node.name.as_str(), Vec::new());
703        indegree.insert(node.name.as_str(), 0);
704    }
705
706    // build dependency graph
707    for &node in nodes {
708        if let Ok(args_json) = serde_json::to_string(&node.args) {
709            // collect dependencies
710            let unique_deps = get_tokens_to_replace(&args_json)
711                .into_iter()
712                .filter(|dep| dep != &node.name)
713                .filter_map(|dep| by_name.get(dep.as_str()))
714                .map(|&dep_node| dep_node.name.as_str())
715                .collect::<HashSet<_>>();
716
717            for dep_name in unique_deps {
718                graph
719                    .get_mut(dep_name)
720                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
721                    .push(node);
722                *indegree
723                    .get_mut(node.name.as_str())
724                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
725            }
726        }
727    }
728
729    // find all nodes with no dependencies
730    let mut queue = nodes
731        .iter()
732        .filter(|n| {
733            *indegree
734                .get(n.name.as_str())
735                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
736                == 0
737        })
738        .copied()
739        .collect::<VecDeque<_>>();
740
741    let mut processed_count = 0;
742    let mut levels = Vec::new();
743
744    // Kahn's algorithm
745    while !queue.is_empty() {
746        let level_size = queue.len();
747        let mut current_level = Vec::with_capacity(level_size);
748
749        for _ in 0..level_size {
750            let n = queue
751                .pop_front()
752                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
753            current_level.push(n);
754            processed_count += 1;
755
756            for &neighbour in graph
757                .get(n.name.as_str())
758                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
759            {
760                let neighbour_indegree = indegree
761                    .get_mut(neighbour.name.as_str())
762                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
763                *neighbour_indegree -= 1;
764
765                if *neighbour_indegree == 0 {
766                    queue.push_back(neighbour);
767                }
768            }
769        }
770
771        current_level.sort_by_key(|n| &n.name);
772        levels.push(current_level);
773    }
774
775    // cycles detected, e.g A -> B -> A
776    if processed_count != nodes.len() {
777        return Err(OrchestratorError::InvalidConfig(
778            "Tokens have cyclical dependencies".to_string(),
779        ));
780    }
781
782    Ok(levels)
783}
784
// TODO: getting the fs from `DynNamespace` would make this unnecessary,
// but the FileSystem trait isn't object-safe so we can't pass it around
// as `dyn FileSystem`. We can refactor or use some `erase` techniques
// to resolve this and remove this struct
// TODO (Loris): Probably we could have a .scoped(base_dir) method on the
// filesystem itself (the trait), so it would return this and we could move this
// directly to the support crate; it can be useful in the future
/// View over a [`FileSystem`] that resolves paths inside `base_dir`.
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    /// Underlying filesystem every operation is delegated to.
    fs: &'a FS,
    /// Directory prepended to the paths handled by this view.
    base_dir: &'a str,
}
797
798impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
799    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
800        Self { fs, base_dir }
801    }
802
803    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
804        for file in files {
805            let full_remote_path = PathBuf::from(format!(
806                "{}/{}",
807                self.base_dir,
808                file.remote_path.to_string_lossy()
809            ));
810            trace!("coping file: {file}");
811            self.fs
812                .copy(file.local_path.as_path(), full_remote_path)
813                .await?;
814        }
815        Ok(())
816    }
817
818    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
819        let file = file.as_ref();
820
821        let full_path = if file.is_absolute() {
822            file.to_owned()
823        } else {
824            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
825        };
826        let content = self.fs.read_to_string(full_path).await?;
827        Ok(content)
828    }
829
830    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
831        let path = PathBuf::from(format!(
832            "{}/{}",
833            self.base_dir,
834            path.as_ref().to_string_lossy()
835        ));
836        self.fs.create_dir(path).await
837    }
838
839    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
840        let path = PathBuf::from(format!(
841            "{}/{}",
842            self.base_dir,
843            path.as_ref().to_string_lossy()
844        ));
845        self.fs.create_dir_all(path).await
846    }
847
848    async fn write(
849        &self,
850        path: impl AsRef<Path>,
851        contents: impl AsRef<[u8]> + Send,
852    ) -> Result<(), FileSystemError> {
853        let path = path.as_ref();
854
855        let full_path = if path.is_absolute() {
856            path.to_owned()
857        } else {
858            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
859        };
860
861        self.fs.write(full_path, contents).await
862    }
863}
864
/// Role tag with one data-less variant per kind of role.
///
/// `PartialEq`/`Eq` are derived so that roles can be compared directly with
/// `==` / `assert_eq!` instead of requiring a `match` or `matches!`.
///
/// NOTE(review): the exact meaning of each role is defined by the code that
/// matches on this enum, which is not visible here.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
874
875// re-exports
876pub use network::{AddCollatorOptions, AddNodeOptions};
877pub use network_helper::metrics;
878
879#[cfg(test)]
880mod tests {
881    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
882    use lazy_static::lazy_static;
883    use tokio::sync::Mutex;
884
885    use super::*;
886
    // Name of the environment variable that `set_env` sets or removes.
    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    // Async mutex locked by every test that calls `set_env`. The environment is
    // process-wide, so holding the guard keeps those tests from changing
    // `ENV_KEY` underneath each other.
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
892
893    fn set_env(concurrency: Option<u32>) {
894        if let Some(value) = concurrency {
895            env::set_var(ENV_KEY, value.to_string());
896        } else {
897            env::remove_var(ENV_KEY);
898        }
899    }
900
901    fn generate(
902        with_image: bool,
903        with_cmd: Option<&'static str>,
904    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
905        NetworkConfigBuilder::new()
906            .with_relaychain(|r| {
907                let mut relay = r
908                    .with_chain("rococo-local")
909                    .with_default_command(with_cmd.unwrap_or("polkadot"));
910                if with_image {
911                    relay = relay.with_default_image("docker.io/parity/polkadot")
912                }
913
914                relay
915                    .with_validator(|node| node.with_name("alice"))
916                    .with_validator(|node| node.with_name("bob"))
917            })
918            .with_parachain(|p| {
919                p.with_id(2000).cumulus_based(true).with_collator(|n| {
920                    let node = n
921                        .with_name("collator")
922                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
923                    if with_image {
924                        node.with_image("docker.io/paritypr/test-parachain")
925                    } else {
926                        node
927                    }
928                })
929            })
930            .build()
931    }
932
933    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
934        let mut spec = NodeSpec {
935            name: name.to_string(),
936            ..Default::default()
937        };
938        if let Some(dependencies) = dependencies {
939            for node in dependencies {
940                spec.args.push(
941                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
942                        .as_str()
943                        .into(),
944                );
945            }
946        }
947        spec
948    }
949
950    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
951        actual_levels
952            .iter()
953            .zip(expected_levels)
954            .for_each(|(actual_level, expected_level)| {
955                assert_eq!(actual_level.len(), expected_level.len());
956                actual_level
957                    .iter()
958                    .zip(expected_level.iter())
959                    .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
960            });
961    }
962
963    #[tokio::test]
964    async fn valid_config_with_image() {
965        let network_config = generate(true, None).unwrap();
966        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
967        let caps = ProviderCapabilities {
968            requires_image: true,
969            has_resources: false,
970            prefix_with_full_path: false,
971            use_default_ports_in_cmd: false,
972        };
973
974        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
975        assert!(valid.is_ok())
976    }
977
978    #[tokio::test]
979    async fn invalid_config_without_image() {
980        let network_config = generate(false, None).unwrap();
981        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
982        let caps = ProviderCapabilities {
983            requires_image: true,
984            has_resources: false,
985            prefix_with_full_path: false,
986            use_default_ports_in_cmd: false,
987        };
988
989        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
990        assert!(valid.is_err())
991    }
992
993    #[tokio::test]
994    async fn invalid_config_missing_cmd() {
995        let network_config = generate(false, Some("other")).unwrap();
996        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
997        let caps = ProviderCapabilities {
998            requires_image: false,
999            has_resources: false,
1000            prefix_with_full_path: false,
1001            use_default_ports_in_cmd: false,
1002        };
1003
1004        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1005        assert!(valid.is_err())
1006    }
1007
1008    #[tokio::test]
1009    async fn valid_config_present_cmd() {
1010        let network_config = generate(false, Some("cargo")).unwrap();
1011        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1012        let caps = ProviderCapabilities {
1013            requires_image: false,
1014            has_resources: false,
1015            prefix_with_full_path: false,
1016            use_default_ports_in_cmd: false,
1017        };
1018
1019        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1020        println!("{valid:?}");
1021        assert!(valid.is_ok())
1022    }
1023
1024    #[tokio::test]
1025    async fn default_spawn_concurrency() {
1026        let _g = ENV_MUTEX.lock().await;
1027        set_env(None);
1028        let network_config = generate(false, Some("cargo")).unwrap();
1029        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1030        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1031        assert_eq!(concurrency, 100);
1032    }
1033
1034    #[tokio::test]
1035    async fn set_spawn_concurrency() {
1036        let _g = ENV_MUTEX.lock().await;
1037        set_env(None);
1038
1039        let network_config = generate(false, Some("cargo")).unwrap();
1040        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1041
1042        let global_settings = GlobalSettingsBuilder::new()
1043            .with_spawn_concurrency(4)
1044            .build()
1045            .unwrap();
1046
1047        spec.set_global_settings(global_settings);
1048        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1049        assert_eq!(concurrency, 4);
1050        assert!(!limited);
1051    }
1052
1053    #[tokio::test]
1054    async fn set_spawn_concurrency_but_limited() {
1055        let _g = ENV_MUTEX.lock().await;
1056        set_env(None);
1057
1058        let network_config = generate(false, Some("cargo")).unwrap();
1059        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1060
1061        let global_settings = GlobalSettingsBuilder::new()
1062            .with_spawn_concurrency(4)
1063            .build()
1064            .unwrap();
1065
1066        spec.set_global_settings(global_settings);
1067        let node = spec.relaychain.nodes.first_mut().unwrap();
1068        node.args
1069            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1070        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1071        assert_eq!(concurrency, 1);
1072        assert!(limited);
1073    }
1074
1075    #[tokio::test]
1076    async fn set_spawn_concurrency_from_env() {
1077        let _g = ENV_MUTEX.lock().await;
1078        set_env(Some(10));
1079
1080        let network_config = generate(false, Some("cargo")).unwrap();
1081        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1082        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1083        assert_eq!(concurrency, 10);
1084        assert!(!limited);
1085    }
1086
1087    #[tokio::test]
1088    async fn set_spawn_concurrency_from_env_but_limited() {
1089        let _g = ENV_MUTEX.lock().await;
1090        set_env(Some(12));
1091
1092        let network_config = generate(false, Some("cargo")).unwrap();
1093        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1094        let node = spec.relaychain.nodes.first_mut().unwrap();
1095        node.args
1096            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1097        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1098        assert_eq!(concurrency, 1);
1099        assert!(limited);
1100    }
1101
1102    #[test]
1103    fn dependency_levels_among_should_work() {
1104        // no nodes
1105        assert!(dependency_levels_among(&[]).unwrap().is_empty());
1106
1107        // one node
1108        let alice = get_node_with_dependencies("alice", None);
1109        let nodes = [&alice];
1110
1111        let levels = dependency_levels_among(&nodes).unwrap();
1112        let expected = vec![vec!["alice"]];
1113
1114        verify_levels(levels, expected);
1115
1116        // two independent nodes
1117        let alice = get_node_with_dependencies("alice", None);
1118        let bob = get_node_with_dependencies("bob", None);
1119        let nodes = [&alice, &bob];
1120
1121        let levels = dependency_levels_among(&nodes).unwrap();
1122        let expected = vec![vec!["alice", "bob"]];
1123
1124        verify_levels(levels, expected);
1125
1126        // alice -> bob -> charlie
1127        let alice = get_node_with_dependencies("alice", None);
1128        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1129        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
1130        let nodes = [&alice, &bob, &charlie];
1131
1132        let levels = dependency_levels_among(&nodes).unwrap();
1133        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];
1134
1135        verify_levels(levels, expected);
1136
1137        //         ┌─> bob
1138        // alice ──|
1139        //         └─> charlie
1140        let alice = get_node_with_dependencies("alice", None);
1141        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1142        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1143        let nodes = [&alice, &bob, &charlie];
1144
1145        let levels = dependency_levels_among(&nodes).unwrap();
1146        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];
1147
1148        verify_levels(levels, expected);
1149
1150        //         ┌─>   bob  ──┐
1151        // alice ──|            ├─> dave
1152        //         └─> charlie  ┘
1153        let alice = get_node_with_dependencies("alice", None);
1154        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1155        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1156        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
1157        let nodes = [&alice, &bob, &charlie, &dave];
1158
1159        let levels = dependency_levels_among(&nodes).unwrap();
1160        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];
1161
1162        verify_levels(levels, expected);
1163    }
1164
1165    #[test]
1166    fn dependency_levels_among_should_detect_cycles() {
1167        let mut alice = get_node_with_dependencies("alice", None);
1168        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1169        alice.args.push("{{ZOMBIE:bob:someField}}".into());
1170
1171        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
1172    }
1173}