zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11pub mod shared;
12mod spawner;
13mod utils;
14
15use std::{
16    collections::{HashMap, HashSet, VecDeque},
17    env,
18    net::IpAddr,
19    path::{Path, PathBuf},
20    time::{Duration, SystemTime},
21};
22
23use configuration::{NetworkConfig, RegistrationStrategy};
24use errors::OrchestratorError;
25use generators::errors::GeneratorError;
26use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
27// re-exported
28pub use network_spec::NetworkSpec;
29use network_spec::{node::NodeSpec, parachain::ParachainSpec};
30use provider::{
31    types::{ProviderCapabilities, TransferedFile},
32    DynNamespace, DynProvider,
33};
34use serde_json::json;
35use support::{
36    constants::{
37        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
38        THIS_IS_A_BUG,
39    },
40    fs::{FileSystem, FileSystemError},
41    replacer::{get_tokens_to_replace, has_tokens},
42};
43use tokio::time::timeout;
44use tracing::{debug, info, trace, warn};
45
46use crate::{
47    network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
48    shared::types::RegisterParachainOptions,
49    spawner::SpawnNodeCtx,
50};
51pub struct Orchestrator<T>
52where
53    T: FileSystem + Sync + Send,
54{
55    filesystem: T,
56    provider: DynProvider,
57}
58
59impl<T> Orchestrator<T>
60where
61    T: FileSystem + Sync + Send + Clone,
62{
63    pub fn new(filesystem: T, provider: DynProvider) -> Self {
64        Self {
65            filesystem,
66            provider,
67        }
68    }
69
70    pub async fn spawn(
71        &self,
72        network_config: NetworkConfig,
73    ) -> Result<Network<T>, OrchestratorError> {
74        let global_timeout = network_config.global_settings().network_spawn_timeout();
75        let network_spec = NetworkSpec::from_config(&network_config).await?;
76
77        let res = timeout(
78            Duration::from_secs(global_timeout.into()),
79            self.spawn_inner(network_spec),
80        )
81        .await
82        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
83        res?
84    }
85
86    pub async fn spawn_from_spec(
87        &self,
88        network_spec: NetworkSpec,
89    ) -> Result<Network<T>, OrchestratorError> {
90        let global_timeout = network_spec.global_settings.network_spawn_timeout();
91        let res = timeout(
92            Duration::from_secs(global_timeout as u64),
93            self.spawn_inner(network_spec),
94        )
95        .await
96        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
97        res?
98    }
99
100    pub async fn attach_to_live(
101        &self,
102        zombie_json_path: &Path,
103    ) -> Result<Network<T>, OrchestratorError> {
104        info!("attaching to live network...");
105        info!("reading zombie.json from {:?}", zombie_json_path);
106
107        let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
108        let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
109
110        info!("recreating namespace...");
111        let ns: DynNamespace = self
112            .provider
113            .create_namespace_from_json(&zombie_json)
114            .await?;
115
116        info!("recreating relaychain...");
117        let (relay, initial_spec) =
118            recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
119        let relay_nodes = relay.nodes.clone();
120
121        let mut network =
122            Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
123
124        for node in relay_nodes {
125            network.insert_node(node);
126        }
127
128        info!("recreating parachains...");
129        let parachains_map =
130            recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
131        let para_nodes = parachains_map
132            .values()
133            .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
134            .collect::<Vec<NetworkNode>>();
135
136        network.set_parachains(parachains_map);
137        for node in para_nodes {
138            network.insert_node(node);
139        }
140
141        Ok(network)
142    }
143
144    async fn spawn_inner(
145        &self,
146        mut network_spec: NetworkSpec,
147    ) -> Result<Network<T>, OrchestratorError> {
148        // main driver for spawn the network
149        debug!(network_spec = ?network_spec,"Network spec to spawn");
150
151        // TODO: move to Provider trait
152        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
153            .map_err(|err| {
154                OrchestratorError::InvalidConfigForProvider(
155                    self.provider.name().into(),
156                    err.to_string(),
157                )
158            })?;
159
160        // create namespace
161        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
162            self.provider
163                .create_namespace_with_base_dir(base_dir)
164                .await?
165        } else {
166            self.provider.create_namespace().await?
167        };
168
169        // set the spawn_concurrency
170        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
171
172        let start_time = SystemTime::now();
173        info!("🧰 ns: {}", ns.name());
174        info!("🧰 base_dir: {:?}", ns.base_dir());
175        info!("🕰 start time: {:?}", start_time);
176        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
177
178        network_spec
179            .populate_nodes_available_args(ns.clone())
180            .await?;
181
182        let base_dir = ns.base_dir().to_string_lossy();
183        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
184        // Create chain-spec for relaychain
185        network_spec
186            .relaychain
187            .chain_spec
188            .build(&ns, &scoped_fs)
189            .await?;
190
191        debug!("relaychain spec built!");
192        // Create parachain artifacts (chain-spec, wasm, state)
193        let relay_chain_id = network_spec
194            .relaychain
195            .chain_spec
196            .read_chain_id(&scoped_fs)
197            .await?;
198
199        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
200        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
201        network_spec
202            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
203            .await?;
204
205        // Gather the parachains to register in genesis and the ones to register with extrinsic
206        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
207            Vec<&ParachainSpec>,
208            Vec<&ParachainSpec>,
209        ) = network_spec
210            .parachains
211            .iter()
212            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
213            .partition(|para| {
214                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
215            });
216
217        let mut para_artifacts = vec![];
218        for para in para_to_register_in_genesis {
219            let genesis_config = para.get_genesis_config()?;
220            para_artifacts.push(genesis_config)
221        }
222
223        // Customize relaychain
224        network_spec
225            .relaychain
226            .chain_spec
227            .customize_relay(
228                &network_spec.relaychain,
229                &network_spec.hrmp_channels,
230                para_artifacts,
231                &scoped_fs,
232            )
233            .await?;
234
235        // Build raw version
236        network_spec
237            .relaychain
238            .chain_spec
239            .build_raw(&ns, &scoped_fs, None)
240            .await?;
241
242        // override wasm if needed
243        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
244            network_spec
245                .relaychain
246                .chain_spec
247                .override_code(&scoped_fs, wasm_override)
248                .await?;
249        }
250
251        // override raw spec if needed
252        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
253            network_spec
254                .relaychain
255                .chain_spec
256                .override_raw_spec(&scoped_fs, raw_spec_override)
257                .await?;
258        }
259
260        let (bootnodes, relaynodes) =
261            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
262
263        // TODO: we want to still supporting spawn a dedicated bootnode??
264        let mut ctx = SpawnNodeCtx {
265            chain_id: &relay_chain_id,
266            parachain_id: None,
267            chain: relay_chain_name.as_str(),
268            role: ZombieRole::Node,
269            ns: &ns,
270            scoped_fs: &scoped_fs,
271            parachain: None,
272            bootnodes_addr: &vec![],
273            wait_ready: false,
274            nodes_by_name: json!({}),
275            global_settings: &network_spec.global_settings,
276        };
277
278        let global_files_to_inject = vec![TransferedFile::new(
279            PathBuf::from(format!(
280                "{}/{relay_chain_name}.json",
281                ns.base_dir().to_string_lossy()
282            )),
283            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
284        )];
285
286        let r = Relaychain::new(
287            relay_chain_name.to_string(),
288            relay_chain_id.clone(),
289            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
290                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
291            )?),
292        );
293        let mut network =
294            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
295
296        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
297        let mut node_ws_url: String = "".to_string();
298
299        // Calculate the bootnodes addr from the running nodes
300        let mut bootnodes_addr: Vec<String> = vec![];
301
302        for level in dependency_levels_among(&bootnodes)? {
303            let mut running_nodes_per_level = vec![];
304            for chunk in level.chunks(spawn_concurrency) {
305                let spawning_tasks = chunk
306                    .iter()
307                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
308
309                for node in futures::future::try_join_all(spawning_tasks).await? {
310                    let bootnode_multiaddr = node.multiaddr();
311
312                    bootnodes_addr.push(bootnode_multiaddr.to_string());
313
314                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
315                    if node_ws_url.is_empty() {
316                        node_ws_url.clone_from(&node.ws_uri)
317                    }
318
319                    running_nodes_per_level.push(node);
320                }
321            }
322            info!(
323                "🕰 waiting for level: {:?} to be up...",
324                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
325            );
326
327            // Wait for all nodes in the current level to be up
328            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
329                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
330            });
331
332            let _ = futures::future::try_join_all(waiting_tasks).await?;
333
334            for node in running_nodes_per_level {
335                // Add the node to the  context and `Network` instance
336                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
337                network.add_running_node(node, None).await;
338            }
339        }
340
341        // Add the bootnodes to the relaychain spec file and ctx
342        network_spec
343            .relaychain
344            .chain_spec
345            .add_bootnodes(&scoped_fs, &bootnodes_addr)
346            .await?;
347
348        ctx.bootnodes_addr = &bootnodes_addr;
349
350        for level in dependency_levels_among(&relaynodes)? {
351            let mut running_nodes_per_level = vec![];
352            for chunk in level.chunks(spawn_concurrency) {
353                let spawning_tasks = chunk
354                    .iter()
355                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
356
357                for node in futures::future::try_join_all(spawning_tasks).await? {
358                    running_nodes_per_level.push(node);
359                }
360            }
361            info!(
362                "🕰 waiting for level: {:?} to be up...",
363                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
364            );
365
366            // Wait for all nodes in the current level to be up
367            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
368                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
369            });
370
371            let _ = futures::future::try_join_all(waiting_tasks).await?;
372
373            for node in running_nodes_per_level {
374                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
375                network.add_running_node(node, None).await;
376            }
377        }
378
379        // spawn paras
380        for para in network_spec.parachains.iter() {
381            // Create parachain (in the context of the running network)
382            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
383            let parachain_id = parachain.chain_id.clone();
384
385            let (bootnodes, collators) =
386                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);
387
388            // Create `ctx` for spawn parachain nodes
389            let mut ctx_para = SpawnNodeCtx {
390                parachain: Some(para),
391                parachain_id: parachain_id.as_deref(),
392                role: if para.is_cumulus_based {
393                    ZombieRole::CumulusCollator
394                } else {
395                    ZombieRole::Collator
396                },
397                bootnodes_addr: &vec![],
398                ..ctx.clone()
399            };
400
401            // Calculate the bootnodes addr from the running nodes
402            let mut bootnodes_addr: Vec<String> = vec![];
403            let mut running_nodes: Vec<NetworkNode> = vec![];
404
405            for level in dependency_levels_among(&bootnodes)? {
406                let mut running_nodes_per_level = vec![];
407                for chunk in level.chunks(spawn_concurrency) {
408                    let spawning_tasks = chunk.iter().map(|node| {
409                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
410                    });
411
412                    for node in futures::future::try_join_all(spawning_tasks).await? {
413                        let bootnode_multiaddr = node.multiaddr();
414
415                        bootnodes_addr.push(bootnode_multiaddr.to_string());
416
417                        running_nodes_per_level.push(node);
418                    }
419                }
420                info!(
421                    "🕰 waiting for level: {:?} to be up...",
422                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
423                );
424
425                // Wait for all nodes in the current level to be up
426                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
427                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
428                });
429
430                let _ = futures::future::try_join_all(waiting_tasks).await?;
431
432                for node in running_nodes_per_level {
433                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
434                    running_nodes.push(node);
435                }
436            }
437
438            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
439                para_chain_spec
440                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
441                    .await?;
442            }
443
444            ctx_para.bootnodes_addr = &bootnodes_addr;
445
446            // Spawn the rest of the nodes
447            for level in dependency_levels_among(&collators)? {
448                let mut running_nodes_per_level = vec![];
449                for chunk in level.chunks(spawn_concurrency) {
450                    let spawning_tasks = chunk.iter().map(|node| {
451                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
452                    });
453
454                    for node in futures::future::try_join_all(spawning_tasks).await? {
455                        running_nodes_per_level.push(node);
456                    }
457                }
458                info!(
459                    "🕰 waiting for level: {:?} to be up...",
460                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
461                );
462
463                // Wait for all nodes in the current level to be up
464                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
465                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
466                });
467
468                let _ = futures::future::try_join_all(waiting_tasks).await?;
469
470                for node in running_nodes_per_level {
471                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
472                    running_nodes.push(node);
473                }
474            }
475
476            let running_para_id = parachain.para_id;
477            network.add_para(parachain);
478            for node in running_nodes {
479                network.add_running_node(node, Some(running_para_id)).await;
480            }
481        }
482
483        // TODO:
484        // - add-ons (introspector/tracing/etc)
485
486        // verify nodes
487        // network_helper::verifier::verify_nodes(&network.nodes()).await?;
488
489        // Now we need to register the paras with extrinsic from the Vec collected before;
490        for para in para_to_register_with_extrinsic {
491            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
492                id: para.id,
493                // This needs to resolve correctly
494                wasm_path: para
495                    .genesis_wasm
496                    .artifact_path()
497                    .ok_or(OrchestratorError::InvariantError(
498                        "artifact path for wasm must be set at this point",
499                    ))?
500                    .to_path_buf(),
501                state_path: para
502                    .genesis_state
503                    .artifact_path()
504                    .ok_or(OrchestratorError::InvariantError(
505                        "artifact path for state must be set at this point",
506                    ))?
507                    .to_path_buf(),
508                node_ws_url: node_ws_url.clone(),
509                onboard_as_para: para.onboard_as_parachain,
510                seed: None, // TODO: Seed is passed by?
511                finalization: false,
512            };
513
514            Parachain::register(register_para_options, &scoped_fs).await?;
515        }
516
517        // - write zombie.json state file
518        let mut zombie_json = serde_json::to_value(&network)?;
519        zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
520        zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());
521
522        if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
523            zombie_json["start_time_ts"] =
524                serde_json::value::Value::String(start_time_ts.as_millis().to_string());
525        } else {
526            // Just warn, do not propagate the err (this should not happens)
527            warn!("⚠️ Error getting start_time timestamp");
528        }
529
530        scoped_fs
531            .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
532            .await?;
533
534        if network_spec.global_settings.tear_down_on_failure() {
535            network.spawn_watching_task();
536        }
537
538        Ok(network)
539    }
540}
541
542// Helpers
543
544async fn recreate_network_nodes_from_json(
545    nodes_json: &serde_json::Value,
546    ns: DynNamespace,
547    provider_name: &str,
548) -> Result<Vec<NetworkNode>, OrchestratorError> {
549    let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
550
551    let mut nodes = Vec::with_capacity(raw_nodes.len());
552    for raw in raw_nodes {
553        // validate provider tag
554        let provider_tag = raw
555            .inner
556            .get("provider_tag")
557            .and_then(|v| v.as_str())
558            .ok_or_else(|| {
559                OrchestratorError::InvalidConfig("Missing `provider_tag` in inner node JSON".into())
560            })?;
561
562        if provider_tag != provider_name {
563            return Err(OrchestratorError::InvalidConfigForProvider(
564                provider_name.to_string(),
565                provider_tag.to_string(),
566            ));
567        }
568        let inner = ns.spawn_node_from_json(&raw.inner).await?;
569        let relay_node = NetworkNode::new(
570            raw.name,
571            raw.ws_uri,
572            raw.prometheus_uri,
573            raw.multiaddr,
574            raw.spec,
575            inner,
576        );
577        nodes.push(relay_node);
578    }
579
580    Ok(nodes)
581}
582
583async fn recreate_relaychain_from_json(
584    zombie_json: &serde_json::Value,
585    ns: DynNamespace,
586    provider_name: &str,
587) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
588    let relay_json = zombie_json
589        .get("relay")
590        .ok_or(OrchestratorError::InvalidConfig(
591            "Missing `relay` field in zombie.json".into(),
592        ))?
593        .clone();
594
595    let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
596
597    let initial_spec: NetworkSpec = serde_json::from_value(
598        zombie_json
599            .get("initial_spec")
600            .ok_or(OrchestratorError::InvalidConfig(
601                "Missing `initial_spec` field in zombie.json".into(),
602            ))?
603            .clone(),
604    )?;
605
606    // Populate relay nodes
607    let nodes =
608        recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
609    relay_raw.inner.nodes = nodes;
610
611    Ok((relay_raw.inner, initial_spec))
612}
613
614async fn recreate_parachains_from_json(
615    zombie_json: &serde_json::Value,
616    ns: DynNamespace,
617    provider_name: &str,
618) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
619    let paras_json = zombie_json
620        .get("parachains")
621        .ok_or(OrchestratorError::InvalidConfig(
622            "Missing `parachains` field in zombie.json".into(),
623        ))?
624        .clone();
625
626    let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
627
628    let mut parachains_map = HashMap::new();
629
630    for (id, parachain_entries) in raw_paras {
631        let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
632
633        for raw_para in parachain_entries {
634            let mut para = raw_para.inner;
635            para.collators =
636                recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
637                    .await?;
638            parsed_vec.push(para);
639        }
640
641        parachains_map.insert(id, parsed_vec);
642    }
643
644    Ok(parachains_map)
645}
646
647// Split the node list depending if it's bootnode or not
648// NOTE: if there isn't a bootnode declared we use the first one
649fn split_nodes_by_bootnodes(
650    nodes: &[NodeSpec],
651    no_default_bootnodes: bool,
652) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
653    // get the bootnodes to spawn first and calculate the bootnode string for use later
654    let mut bootnodes = vec![];
655    let mut other_nodes = vec![];
656    nodes.iter().for_each(|node| {
657        if node.is_bootnode {
658            bootnodes.push(node)
659        } else {
660            other_nodes.push(node)
661        }
662    });
663
664    if bootnodes.is_empty() && !no_default_bootnodes {
665        bootnodes.push(other_nodes.remove(0))
666    }
667
668    (bootnodes, other_nodes)
669}
670
671// Generate a bootnode multiaddress and return as string
672fn generate_bootnode_addr(
673    node: &NetworkNode,
674    ip: &IpAddr,
675    port: u16,
676) -> Result<String, GeneratorError> {
677    generators::generate_node_bootnode_addr(
678        &node.spec.peer_id,
679        ip,
680        port,
681        node.inner.args().as_ref(),
682        &node.spec.p2p_cert_hash,
683    )
684}
685// Validate that the config fulfill all the requirements of the provider
686fn validate_spec_with_provider_capabilities(
687    network_spec: &NetworkSpec,
688    capabilities: &ProviderCapabilities,
689) -> Result<(), anyhow::Error> {
690    let mut errs: Vec<String> = vec![];
691
692    if capabilities.requires_image {
693        // Relaychain
694        if network_spec.relaychain.default_image.is_none() {
695            // we should check if each node have an image
696            let nodes = &network_spec.relaychain.nodes;
697            if nodes.iter().any(|node| node.image.is_none()) {
698                errs.push(String::from(
699                    "Missing image for node, and not default is set at relaychain",
700                ));
701            }
702        };
703
704        // Paras
705        for para in &network_spec.parachains {
706            if para.default_image.is_none() {
707                let nodes = &para.collators;
708                if nodes.iter().any(|node| node.image.is_none()) {
709                    errs.push(format!(
710                        "Missing image for node, and not default is set at parachain {}",
711                        para.id
712                    ));
713                }
714            }
715        }
716    } else {
717        // native
718        // We need to get all the `cmds` and verify if are part of the path
719        let mut cmds: HashSet<&str> = Default::default();
720        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
721            cmds.insert(cmd.as_str());
722        }
723        for node in network_spec.relaychain().nodes.iter() {
724            cmds.insert(node.command());
725        }
726
727        // Paras
728        for para in &network_spec.parachains {
729            if let Some(cmd) = para.default_command.as_ref() {
730                cmds.insert(cmd.as_str());
731            }
732
733            for node in para.collators.iter() {
734                cmds.insert(node.command());
735            }
736        }
737
738        // now check the binaries
739        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
740        trace!("current PATH: {path}");
741        let parts: Vec<_> = path.split(":").collect();
742        for cmd in cmds {
743            let missing = if cmd.contains('/') {
744                trace!("checking {cmd}");
745                if std::fs::metadata(cmd).is_err() {
746                    true
747                } else {
748                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
749                    false
750                }
751            } else {
752                // should be in the PATH
753                !parts.iter().any(|part| {
754                    let path_to = format!("{part}/{cmd}");
755                    trace!("checking {path_to}");
756                    let check_result = std::fs::metadata(&path_to);
757                    trace!("result {:?}", check_result);
758                    if check_result.is_ok() {
759                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
760                        true
761                    } else {
762                        false
763                    }
764                })
765            };
766
767            if missing {
768                errs.push(help_msg(cmd));
769            }
770        }
771    }
772
773    if !errs.is_empty() {
774        let msg = errs.join("\n");
775        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
776    }
777
778    Ok(())
779}
780
/// Builds the error message for a missing binary `cmd`, including the command to compile
/// it when `cmd` is one of the well known binaries.
fn help_msg(cmd: &str) -> String {
    // Template nodes are compiled by package name.
    const TEMPLATE_NODES: [&str; 3] = [
        "parachain-template-node",
        "solochain-template-node",
        "minimal-template-node",
    ];

    if TEMPLATE_NODES.contains(&cmd) {
        return format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release");
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    format!("Missing binary {cmd}, please compile it.")
}
797
/// Reads the default spawn concurrency from the env var `ZOMBIE_SPAWN_CONCURRENCY`.
///
/// Returns `None` if the var is not set or its value can't be parsed as `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
806
807fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
808    let desired_spawn_concurrency = match (
809        spawn_concurrency_from_env(),
810        spec.global_settings.spawn_concurrency(),
811    ) {
812        (Some(n), _) => Some(n),
813        (None, Some(n)) => Some(n),
814        _ => None,
815    };
816
817    let (spawn_concurrency, limited_by_tokens) =
818        if let Some(spawn_concurrency) = desired_spawn_concurrency {
819            if spawn_concurrency == 1 {
820                (1, false)
821            } else if has_tokens(&serde_json::to_string(spec)?) {
822                (1, true)
823            } else {
824                (spawn_concurrency, false)
825            }
826        } else {
827            // not set
828            if has_tokens(&serde_json::to_string(spec)?) {
829                (1, true)
830            } else {
831                // use 100 as max concurrency, we can set a max by provider later
832                (100, false)
833            }
834        };
835
836    Ok((spawn_concurrency, limited_by_tokens))
837}
838
839/// Build deterministic dependency **levels** among the given nodes.
840/// - Only dependencies **between nodes in `nodes`** are considered.
841/// - Unknown/out-of-scope references are ignored.
842/// - Self-dependencies are ignored.
843fn dependency_levels_among<'a>(
844    nodes: &'a [&'a NodeSpec],
845) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
846    let by_name = nodes
847        .iter()
848        .map(|n| (n.name.as_str(), *n))
849        .collect::<HashMap<_, _>>();
850
851    let mut graph = HashMap::with_capacity(nodes.len());
852    let mut indegree = HashMap::with_capacity(nodes.len());
853
854    for node in nodes {
855        graph.insert(node.name.as_str(), Vec::new());
856        indegree.insert(node.name.as_str(), 0);
857    }
858
859    // build dependency graph
860    for &node in nodes {
861        if let Ok(args_json) = serde_json::to_string(&node.args) {
862            // collect dependencies
863            let unique_deps = get_tokens_to_replace(&args_json)
864                .into_iter()
865                .filter(|dep| dep != &node.name)
866                .filter_map(|dep| by_name.get(dep.as_str()))
867                .map(|&dep_node| dep_node.name.as_str())
868                .collect::<HashSet<_>>();
869
870            for dep_name in unique_deps {
871                graph
872                    .get_mut(dep_name)
873                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
874                    .push(node);
875                *indegree
876                    .get_mut(node.name.as_str())
877                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
878            }
879        }
880    }
881
882    // find all nodes with no dependencies
883    let mut queue = nodes
884        .iter()
885        .filter(|n| {
886            *indegree
887                .get(n.name.as_str())
888                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
889                == 0
890        })
891        .copied()
892        .collect::<VecDeque<_>>();
893
894    let mut processed_count = 0;
895    let mut levels = Vec::new();
896
897    // Kahn's algorithm
898    while !queue.is_empty() {
899        let level_size = queue.len();
900        let mut current_level = Vec::with_capacity(level_size);
901
902        for _ in 0..level_size {
903            let n = queue
904                .pop_front()
905                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
906            current_level.push(n);
907            processed_count += 1;
908
909            for &neighbour in graph
910                .get(n.name.as_str())
911                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
912            {
913                let neighbour_indegree = indegree
914                    .get_mut(neighbour.name.as_str())
915                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
916                *neighbour_indegree -= 1;
917
918                if *neighbour_indegree == 0 {
919                    queue.push_back(neighbour);
920                }
921            }
922        }
923
924        current_level.sort_by_key(|n| &n.name);
925        levels.push(current_level);
926    }
927
928    // cycles detected, e.g A -> B -> A
929    if processed_count != nodes.len() {
930        return Err(OrchestratorError::InvalidConfig(
931            "Tokens have cyclical dependencies".to_string(),
932        ));
933    }
934
935    Ok(levels)
936}
937
// TODO: getting the fs from `DynNamespace` would make this struct unnecessary,
// but the `FileSystem` trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We can refactor or use some type-erasure technique
// to resolve this and remove this struct.
// TODO (Loris): We could probably have a `.scoped(base_dir)` method on the
// filesystem itself (the trait) that returns this struct, so we can move it
// directly to the support crate; it could be useful in the future.
/// View over a [`FileSystem`] that resolves the paths it is given against a
/// base directory.
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    /// Underlying filesystem the operations are forwarded to.
    fs: &'a FS,
    /// Directory prepended to the paths handled by this scoped filesystem.
    base_dir: &'a str,
}
950
951impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
952    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
953        Self { fs, base_dir }
954    }
955
956    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
957        for file in files {
958            let full_remote_path = PathBuf::from(format!(
959                "{}/{}",
960                self.base_dir,
961                file.remote_path.to_string_lossy()
962            ));
963            trace!("coping file: {file}");
964            self.fs
965                .copy(file.local_path.as_path(), full_remote_path)
966                .await?;
967        }
968        Ok(())
969    }
970
971    async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
972        let file = file.as_ref();
973
974        let full_path = if file.is_absolute() {
975            file.to_owned()
976        } else {
977            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
978        };
979        let content = self.fs.read(full_path).await?;
980        Ok(content)
981    }
982
983    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
984        let file = file.as_ref();
985
986        let full_path = if file.is_absolute() {
987            file.to_owned()
988        } else {
989            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
990        };
991        let content = self.fs.read_to_string(full_path).await?;
992        Ok(content)
993    }
994
995    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
996        let path = PathBuf::from(format!(
997            "{}/{}",
998            self.base_dir,
999            path.as_ref().to_string_lossy()
1000        ));
1001        self.fs.create_dir(path).await
1002    }
1003
1004    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1005        let path = PathBuf::from(format!(
1006            "{}/{}",
1007            self.base_dir,
1008            path.as_ref().to_string_lossy()
1009        ));
1010        self.fs.create_dir_all(path).await
1011    }
1012
1013    async fn write(
1014        &self,
1015        path: impl AsRef<Path>,
1016        contents: impl AsRef<[u8]> + Send,
1017    ) -> Result<(), FileSystemError> {
1018        let path = path.as_ref();
1019
1020        let full_path = if path.is_absolute() {
1021            path.to_owned()
1022        } else {
1023            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1024        };
1025
1026        self.fs.write(full_path, contents).await
1027    }
1028
1029    /// Get the full_path in the scoped FS
1030    fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1031        let path = path.as_ref();
1032
1033        let full_path = if path.is_absolute() {
1034            path.to_owned()
1035        } else {
1036            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1037        };
1038
1039        full_path
1040    }
1041}
1042
/// Role tag, with one variant per kind of role (temp, node, bootnode,
/// collator, cumulus collator and companion).
// NOTE(review): the exact meaning of each role and where it is consumed is
// not visible in this part of the file — confirm before relying on it.
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
1052
1053// re-exports
1054pub use network::{AddCollatorOptions, AddNodeOptions};
1055pub use network_helper::metrics;
1056pub use sc_chain_spec;
1057
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    // mutex for the tests that use the env, since the environment is shared
    // by all the tests of the process
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Set `ZOMBIE_SPAWN_CONCURRENCY` to `concurrency`, or remove it when `None`.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Build a config with a relaychain (validators `alice` and `bob`) and a
    /// cumulus based parachain with one collator.
    ///
    /// `with_image` sets the images and `with_cmd` overrides the command used
    /// by both the relaychain and the collator.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Provider capabilities used by the validation tests, only
    /// `requires_image` varies between them.
    fn capabilities(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Build a node named `name` with one token arg per node in `dependencies`.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Assert that `actual_levels` has exactly the levels, and the node names
    /// within each level, listed in `expected_levels`.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        // Compare the whole structure at once: zipping both sides would
        // silently ignore missing or extra levels.
        let actual_names = actual_levels
            .iter()
            .map(|level| {
                level
                    .iter()
                    .map(|node| node.name.as_str())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        assert_eq!(actual_names, expected_levels);
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(true);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(true);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(false);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(false);

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        // show the validation error (if any) as part of the failure message
        assert!(valid.is_ok(), "{:?}", valid)
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        // a token in the args must limit the concurrency to 1
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        // a token in the args must limit the concurrency to 1
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // no nodes
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // one node
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // two independent nodes
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // alice -> bob -> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        //         ┌─> bob
        // alice ──|
        //         └─> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        //         ┌─>   bob  ──┐
        // alice ──|            ├─> dave
        //         └─> charlie  ┘
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        // alice <-> bob
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}