zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11pub mod shared;
12mod spawner;
13mod utils;
14
15use std::{
16    collections::{HashMap, HashSet, VecDeque},
17    env,
18    net::IpAddr,
19    path::{Path, PathBuf},
20    time::{Duration, SystemTime},
21};
22
23use configuration::{NetworkConfig, RegistrationStrategy};
24use errors::OrchestratorError;
25use generators::errors::GeneratorError;
26use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
27// re-exported
28pub use network_spec::NetworkSpec;
29use network_spec::{node::NodeSpec, parachain::ParachainSpec};
30use provider::{
31    types::{ProviderCapabilities, TransferedFile},
32    DynNamespace, DynProvider,
33};
34use serde_json::json;
35use support::{
36    constants::{
37        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
38        THIS_IS_A_BUG,
39    },
40    fs::{FileSystem, FileSystemError},
41    replacer::{get_tokens_to_replace, has_tokens},
42};
43use tokio::time::timeout;
44use tracing::{debug, info, trace};
45
46use crate::{
47    network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
48    shared::types::RegisterParachainOptions,
49    spawner::SpawnNodeCtx,
50    utils::write_zombie_json,
51};
52pub struct Orchestrator<T>
53where
54    T: FileSystem + Sync + Send,
55{
56    filesystem: T,
57    provider: DynProvider,
58}
59
60impl<T> Orchestrator<T>
61where
62    T: FileSystem + Sync + Send + Clone,
63{
64    pub fn new(filesystem: T, provider: DynProvider) -> Self {
65        Self {
66            filesystem,
67            provider,
68        }
69    }
70
71    pub async fn spawn(
72        &self,
73        network_config: NetworkConfig,
74    ) -> Result<Network<T>, OrchestratorError> {
75        let global_timeout = network_config.global_settings().network_spawn_timeout();
76        let network_spec = NetworkSpec::from_config(&network_config).await?;
77
78        let res = timeout(
79            Duration::from_secs(global_timeout.into()),
80            self.spawn_inner(network_spec),
81        )
82        .await
83        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
84        res?
85    }
86
87    pub async fn spawn_from_spec(
88        &self,
89        network_spec: NetworkSpec,
90    ) -> Result<Network<T>, OrchestratorError> {
91        let global_timeout = network_spec.global_settings.network_spawn_timeout();
92        let res = timeout(
93            Duration::from_secs(global_timeout as u64),
94            self.spawn_inner(network_spec),
95        )
96        .await
97        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
98        res?
99    }
100
101    pub async fn attach_to_live(
102        &self,
103        zombie_json_path: &Path,
104    ) -> Result<Network<T>, OrchestratorError> {
105        info!("attaching to live network...");
106        info!("reading zombie.json from {:?}", zombie_json_path);
107
108        let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
109        let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
110
111        info!("recreating namespace...");
112        let ns: DynNamespace = self
113            .provider
114            .create_namespace_from_json(&zombie_json)
115            .await?;
116
117        info!("recreating relaychain...");
118        let (relay, initial_spec) =
119            recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
120        let relay_nodes = relay.nodes.clone();
121
122        let mut network =
123            Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
124
125        for node in relay_nodes {
126            network.insert_node(node);
127        }
128
129        info!("recreating parachains...");
130        let parachains_map =
131            recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
132        let para_nodes = parachains_map
133            .values()
134            .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
135            .collect::<Vec<NetworkNode>>();
136
137        network.set_parachains(parachains_map);
138        for node in para_nodes {
139            network.insert_node(node);
140        }
141
142        Ok(network)
143    }
144
145    async fn spawn_inner(
146        &self,
147        mut network_spec: NetworkSpec,
148    ) -> Result<Network<T>, OrchestratorError> {
149        // main driver for spawn the network
150        debug!(network_spec = ?network_spec,"Network spec to spawn");
151
152        // TODO: move to Provider trait
153        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
154            .map_err(|err| {
155                OrchestratorError::InvalidConfigForProvider(
156                    self.provider.name().into(),
157                    err.to_string(),
158                )
159            })?;
160
161        // create namespace
162        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
163            self.provider
164                .create_namespace_with_base_dir(base_dir)
165                .await?
166        } else {
167            self.provider.create_namespace().await?
168        };
169
170        // set the spawn_concurrency
171        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
172
173        let start_time = SystemTime::now();
174        info!("🧰 ns: {}", ns.name());
175        info!("🧰 base_dir: {:?}", ns.base_dir());
176        info!("🕰 start time: {:?}", start_time);
177        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
178
179        network_spec
180            .populate_nodes_available_args(ns.clone())
181            .await?;
182
183        let base_dir = ns.base_dir().to_string_lossy();
184        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
185        // Create chain-spec for relaychain
186        network_spec
187            .relaychain
188            .chain_spec
189            .build(&ns, &scoped_fs)
190            .await?;
191
192        debug!("relaychain spec built!");
193        // Create parachain artifacts (chain-spec, wasm, state)
194        let relay_chain_id = network_spec
195            .relaychain
196            .chain_spec
197            .read_chain_id(&scoped_fs)
198            .await?;
199
200        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
201        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
202        network_spec
203            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
204            .await?;
205
206        // Gather the parachains to register in genesis and the ones to register with extrinsic
207        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
208            Vec<&ParachainSpec>,
209            Vec<&ParachainSpec>,
210        ) = network_spec
211            .parachains
212            .iter()
213            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
214            .partition(|para| {
215                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
216            });
217
218        let mut para_artifacts = vec![];
219        for para in para_to_register_in_genesis {
220            let genesis_config = para.get_genesis_config()?;
221            para_artifacts.push(genesis_config)
222        }
223
224        // Customize relaychain
225        network_spec
226            .relaychain
227            .chain_spec
228            .customize_relay(
229                &network_spec.relaychain,
230                &network_spec.hrmp_channels,
231                para_artifacts,
232                &scoped_fs,
233            )
234            .await?;
235
236        // Build raw version
237        network_spec
238            .relaychain
239            .chain_spec
240            .build_raw(&ns, &scoped_fs, None)
241            .await?;
242
243        // override wasm if needed
244        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
245            network_spec
246                .relaychain
247                .chain_spec
248                .override_code(&scoped_fs, wasm_override)
249                .await?;
250        }
251
252        // override raw spec if needed
253        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
254            network_spec
255                .relaychain
256                .chain_spec
257                .override_raw_spec(&scoped_fs, raw_spec_override)
258                .await?;
259        }
260
261        let (bootnodes, relaynodes) =
262            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
263
264        // TODO: we want to still supporting spawn a dedicated bootnode??
265        let mut ctx = SpawnNodeCtx {
266            chain_id: &relay_chain_id,
267            parachain_id: None,
268            chain: relay_chain_name.as_str(),
269            role: ZombieRole::Node,
270            ns: &ns,
271            scoped_fs: &scoped_fs,
272            parachain: None,
273            bootnodes_addr: &vec![],
274            wait_ready: false,
275            nodes_by_name: json!({}),
276            global_settings: &network_spec.global_settings,
277        };
278
279        let global_files_to_inject = vec![TransferedFile::new(
280            PathBuf::from(format!(
281                "{}/{relay_chain_name}.json",
282                ns.base_dir().to_string_lossy()
283            )),
284            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
285        )];
286
287        let r = Relaychain::new(
288            relay_chain_name.to_string(),
289            relay_chain_id.clone(),
290            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
291                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
292            )?),
293        );
294        let mut network =
295            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
296
297        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
298        let mut node_ws_url: String = "".to_string();
299
300        // Calculate the bootnodes addr from the running nodes
301        let mut bootnodes_addr: Vec<String> = vec![];
302
303        for level in dependency_levels_among(&bootnodes)? {
304            let mut running_nodes_per_level = vec![];
305            for chunk in level.chunks(spawn_concurrency) {
306                let spawning_tasks = chunk
307                    .iter()
308                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
309
310                for node in futures::future::try_join_all(spawning_tasks).await? {
311                    let bootnode_multiaddr = node.multiaddr();
312
313                    bootnodes_addr.push(bootnode_multiaddr.to_string());
314
315                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
316                    if node_ws_url.is_empty() {
317                        node_ws_url.clone_from(&node.ws_uri)
318                    }
319
320                    running_nodes_per_level.push(node);
321                }
322            }
323            info!(
324                "🕰 waiting for level: {:?} to be up...",
325                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
326            );
327
328            // Wait for all nodes in the current level to be up
329            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
330                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
331            });
332
333            let _ = futures::future::try_join_all(waiting_tasks).await?;
334
335            for node in running_nodes_per_level {
336                // Add the node to the  context and `Network` instance
337                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
338                network.add_running_node(node, None).await;
339            }
340        }
341
342        // Add the bootnodes to the relaychain spec file and ctx
343        network_spec
344            .relaychain
345            .chain_spec
346            .add_bootnodes(&scoped_fs, &bootnodes_addr)
347            .await?;
348
349        ctx.bootnodes_addr = &bootnodes_addr;
350
351        for level in dependency_levels_among(&relaynodes)? {
352            let mut running_nodes_per_level = vec![];
353            for chunk in level.chunks(spawn_concurrency) {
354                let spawning_tasks = chunk
355                    .iter()
356                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
357
358                for node in futures::future::try_join_all(spawning_tasks).await? {
359                    running_nodes_per_level.push(node);
360                }
361            }
362            info!(
363                "🕰 waiting for level: {:?} to be up...",
364                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
365            );
366
367            // Wait for all nodes in the current level to be up
368            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
369                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
370            });
371
372            let _ = futures::future::try_join_all(waiting_tasks).await?;
373
374            for node in running_nodes_per_level {
375                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
376                network.add_running_node(node, None).await;
377            }
378        }
379
380        // spawn paras
381        for para in network_spec.parachains.iter() {
382            // Create parachain (in the context of the running network)
383            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
384            let parachain_id = parachain.chain_id.clone();
385
386            let (bootnodes, collators) =
387                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);
388
389            // Create `ctx` for spawn parachain nodes
390            let mut ctx_para = SpawnNodeCtx {
391                parachain: Some(para),
392                parachain_id: parachain_id.as_deref(),
393                role: if para.is_cumulus_based {
394                    ZombieRole::CumulusCollator
395                } else {
396                    ZombieRole::Collator
397                },
398                bootnodes_addr: &vec![],
399                ..ctx.clone()
400            };
401
402            // Calculate the bootnodes addr from the running nodes
403            let mut bootnodes_addr: Vec<String> = vec![];
404            let mut running_nodes: Vec<NetworkNode> = vec![];
405
406            for level in dependency_levels_among(&bootnodes)? {
407                let mut running_nodes_per_level = vec![];
408                for chunk in level.chunks(spawn_concurrency) {
409                    let spawning_tasks = chunk.iter().map(|node| {
410                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
411                    });
412
413                    for node in futures::future::try_join_all(spawning_tasks).await? {
414                        let bootnode_multiaddr = node.multiaddr();
415
416                        bootnodes_addr.push(bootnode_multiaddr.to_string());
417
418                        running_nodes_per_level.push(node);
419                    }
420                }
421                info!(
422                    "🕰 waiting for level: {:?} to be up...",
423                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
424                );
425
426                // Wait for all nodes in the current level to be up
427                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
428                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
429                });
430
431                let _ = futures::future::try_join_all(waiting_tasks).await?;
432
433                for node in running_nodes_per_level {
434                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
435                    running_nodes.push(node);
436                }
437            }
438
439            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
440                para_chain_spec
441                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
442                    .await?;
443            }
444
445            ctx_para.bootnodes_addr = &bootnodes_addr;
446
447            // Spawn the rest of the nodes
448            for level in dependency_levels_among(&collators)? {
449                let mut running_nodes_per_level = vec![];
450                for chunk in level.chunks(spawn_concurrency) {
451                    let spawning_tasks = chunk.iter().map(|node| {
452                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
453                    });
454
455                    for node in futures::future::try_join_all(spawning_tasks).await? {
456                        running_nodes_per_level.push(node);
457                    }
458                }
459                info!(
460                    "🕰 waiting for level: {:?} to be up...",
461                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
462                );
463
464                // Wait for all nodes in the current level to be up
465                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
466                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
467                });
468
469                let _ = futures::future::try_join_all(waiting_tasks).await?;
470
471                for node in running_nodes_per_level {
472                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
473                    running_nodes.push(node);
474                }
475            }
476
477            let running_para_id = parachain.para_id;
478            network.add_para(parachain);
479            for node in running_nodes {
480                network.add_running_node(node, Some(running_para_id)).await;
481            }
482        }
483
484        // TODO:
485        // - add-ons (introspector/tracing/etc)
486
487        // verify nodes
488        // network_helper::verifier::verify_nodes(&network.nodes()).await?;
489
490        // Now we need to register the paras with extrinsic from the Vec collected before;
491        for para in para_to_register_with_extrinsic {
492            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
493                id: para.id,
494                // This needs to resolve correctly
495                wasm_path: para
496                    .genesis_wasm
497                    .artifact_path()
498                    .ok_or(OrchestratorError::InvariantError(
499                        "artifact path for wasm must be set at this point",
500                    ))?
501                    .to_path_buf(),
502                state_path: para
503                    .genesis_state
504                    .artifact_path()
505                    .ok_or(OrchestratorError::InvariantError(
506                        "artifact path for state must be set at this point",
507                    ))?
508                    .to_path_buf(),
509                node_ws_url: node_ws_url.clone(),
510                onboard_as_para: para.onboard_as_parachain,
511                seed: None, // TODO: Seed is passed by?
512                finalization: false,
513            };
514
515            Parachain::register(register_para_options, &scoped_fs).await?;
516        }
517
518        network.set_start_time_ts(start_time);
519
520        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;
521
522        if network_spec.global_settings.tear_down_on_failure() {
523            network.spawn_watching_task();
524        }
525
526        Ok(network)
527    }
528}
529
530// Helpers
531
532async fn recreate_network_nodes_from_json(
533    nodes_json: &serde_json::Value,
534    ns: DynNamespace,
535    provider_name: &str,
536) -> Result<Vec<NetworkNode>, OrchestratorError> {
537    let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
538
539    let mut nodes = Vec::with_capacity(raw_nodes.len());
540    for raw in raw_nodes {
541        // validate provider tag
542        let provider_tag = raw
543            .inner
544            .get("provider_tag")
545            .and_then(|v| v.as_str())
546            .ok_or_else(|| {
547                OrchestratorError::InvalidConfig("Missing `provider_tag` in inner node JSON".into())
548            })?;
549
550        if provider_tag != provider_name {
551            return Err(OrchestratorError::InvalidConfigForProvider(
552                provider_name.to_string(),
553                provider_tag.to_string(),
554            ));
555        }
556        let inner = ns.spawn_node_from_json(&raw.inner).await?;
557        let relay_node = NetworkNode::new(
558            raw.name,
559            raw.ws_uri,
560            raw.prometheus_uri,
561            raw.multiaddr,
562            raw.spec,
563            inner,
564        );
565        nodes.push(relay_node);
566    }
567
568    Ok(nodes)
569}
570
571async fn recreate_relaychain_from_json(
572    zombie_json: &serde_json::Value,
573    ns: DynNamespace,
574    provider_name: &str,
575) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
576    let relay_json = zombie_json
577        .get("relay")
578        .ok_or(OrchestratorError::InvalidConfig(
579            "Missing `relay` field in zombie.json".into(),
580        ))?
581        .clone();
582
583    let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
584
585    let initial_spec: NetworkSpec = serde_json::from_value(
586        zombie_json
587            .get("initial_spec")
588            .ok_or(OrchestratorError::InvalidConfig(
589                "Missing `initial_spec` field in zombie.json".into(),
590            ))?
591            .clone(),
592    )?;
593
594    // Populate relay nodes
595    let nodes =
596        recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
597    relay_raw.inner.nodes = nodes;
598
599    Ok((relay_raw.inner, initial_spec))
600}
601
602async fn recreate_parachains_from_json(
603    zombie_json: &serde_json::Value,
604    ns: DynNamespace,
605    provider_name: &str,
606) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
607    let paras_json = zombie_json
608        .get("parachains")
609        .ok_or(OrchestratorError::InvalidConfig(
610            "Missing `parachains` field in zombie.json".into(),
611        ))?
612        .clone();
613
614    let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
615
616    let mut parachains_map = HashMap::new();
617
618    for (id, parachain_entries) in raw_paras {
619        let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
620
621        for raw_para in parachain_entries {
622            let mut para = raw_para.inner;
623            para.collators =
624                recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
625                    .await?;
626            parsed_vec.push(para);
627        }
628
629        parachains_map.insert(id, parsed_vec);
630    }
631
632    Ok(parachains_map)
633}
634
635// Split the node list depending if it's bootnode or not
636// NOTE: if there isn't a bootnode declared we use the first one
637fn split_nodes_by_bootnodes(
638    nodes: &[NodeSpec],
639    no_default_bootnodes: bool,
640) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
641    // get the bootnodes to spawn first and calculate the bootnode string for use later
642    let mut bootnodes = vec![];
643    let mut other_nodes = vec![];
644    nodes.iter().for_each(|node| {
645        if node.is_bootnode {
646            bootnodes.push(node)
647        } else {
648            other_nodes.push(node)
649        }
650    });
651
652    if bootnodes.is_empty() && !no_default_bootnodes {
653        bootnodes.push(other_nodes.remove(0))
654    }
655
656    (bootnodes, other_nodes)
657}
658
659// Generate a bootnode multiaddress and return as string
660fn generate_bootnode_addr(
661    node: &NetworkNode,
662    ip: &IpAddr,
663    port: u16,
664) -> Result<String, GeneratorError> {
665    generators::generate_node_bootnode_addr(
666        &node.spec.peer_id,
667        ip,
668        port,
669        node.inner.args().as_ref(),
670        &node.spec.p2p_cert_hash,
671    )
672}
673// Validate that the config fulfill all the requirements of the provider
674fn validate_spec_with_provider_capabilities(
675    network_spec: &NetworkSpec,
676    capabilities: &ProviderCapabilities,
677) -> Result<(), anyhow::Error> {
678    let mut errs: Vec<String> = vec![];
679
680    if capabilities.requires_image {
681        // Relaychain
682        if network_spec.relaychain.default_image.is_none() {
683            // we should check if each node have an image
684            let nodes = &network_spec.relaychain.nodes;
685            if nodes.iter().any(|node| node.image.is_none()) {
686                errs.push(String::from(
687                    "Missing image for node, and not default is set at relaychain",
688                ));
689            }
690        };
691
692        // Paras
693        for para in &network_spec.parachains {
694            if para.default_image.is_none() {
695                let nodes = &para.collators;
696                if nodes.iter().any(|node| node.image.is_none()) {
697                    errs.push(format!(
698                        "Missing image for node, and not default is set at parachain {}",
699                        para.id
700                    ));
701                }
702            }
703        }
704    } else {
705        // native
706        // We need to get all the `cmds` and verify if are part of the path
707        let mut cmds: HashSet<&str> = Default::default();
708        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
709            cmds.insert(cmd.as_str());
710        }
711        for node in network_spec.relaychain().nodes.iter() {
712            cmds.insert(node.command());
713        }
714
715        // Paras
716        for para in &network_spec.parachains {
717            if let Some(cmd) = para.default_command.as_ref() {
718                cmds.insert(cmd.as_str());
719            }
720
721            for node in para.collators.iter() {
722                cmds.insert(node.command());
723            }
724        }
725
726        // now check the binaries
727        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
728        trace!("current PATH: {path}");
729        let parts: Vec<_> = path.split(":").collect();
730        for cmd in cmds {
731            let missing = if cmd.contains('/') {
732                trace!("checking {cmd}");
733                if std::fs::metadata(cmd).is_err() {
734                    true
735                } else {
736                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
737                    false
738                }
739            } else {
740                // should be in the PATH
741                !parts.iter().any(|part| {
742                    let path_to = format!("{part}/{cmd}");
743                    trace!("checking {path_to}");
744                    let check_result = std::fs::metadata(&path_to);
745                    trace!("result {:?}", check_result);
746                    if check_result.is_ok() {
747                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
748                        true
749                    } else {
750                        false
751                    }
752                })
753            };
754
755            if missing {
756                errs.push(help_msg(cmd));
757            }
758        }
759    }
760
761    if !errs.is_empty() {
762        let msg = errs.join("\n");
763        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
764    }
765
766    Ok(())
767}
768
/// Builds the error message for a missing binary `cmd`, including build instructions
/// for the well-known polkadot-sdk binaries.
fn help_msg(cmd: &str) -> String {
    let is_template_node = matches!(
        cmd,
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node"
    );
    if is_template_node {
        return format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release");
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    // Unknown binary: no specific build hint available.
    format!("Missing binary {cmd}, please compile it.")
}
785
/// Reads the default spawn concurrency from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable is unset, is not valid unicode, or does not parse
/// as a `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
794
795fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
796    let desired_spawn_concurrency = match (
797        spawn_concurrency_from_env(),
798        spec.global_settings.spawn_concurrency(),
799    ) {
800        (Some(n), _) => Some(n),
801        (None, Some(n)) => Some(n),
802        _ => None,
803    };
804
805    let (spawn_concurrency, limited_by_tokens) =
806        if let Some(spawn_concurrency) = desired_spawn_concurrency {
807            if spawn_concurrency == 1 {
808                (1, false)
809            } else if has_tokens(&serde_json::to_string(spec)?) {
810                (1, true)
811            } else {
812                (spawn_concurrency, false)
813            }
814        } else {
815            // not set
816            if has_tokens(&serde_json::to_string(spec)?) {
817                (1, true)
818            } else {
819                // use 100 as max concurrency, we can set a max by provider later
820                (100, false)
821            }
822        };
823
824    Ok((spawn_concurrency, limited_by_tokens))
825}
826
/// Build deterministic dependency **levels** among the given nodes.
///
/// A node depends on another when its serialized `args` contain a token naming that
/// node, e.g. `{{ZOMBIE:bob:someField}}` makes the node depend on `bob`. Names are
/// extracted with `get_tokens_to_replace`.
///
/// Rules:
/// - Only dependencies **between nodes in `nodes`** are considered.
/// - Unknown/out-of-scope references are ignored.
/// - Self-dependencies are ignored.
///
/// Level `0` holds every node with no in-scope dependency. Level `k + 1` holds the
/// nodes whose dependencies are all in levels `0..=k`. Every level is sorted by node
/// name, so the output does not depend on hash-map / hash-set iteration order.
///
/// NOTE(review): graph bookkeeping is keyed by node name, so this assumes names are
/// unique within `nodes`. Duplicates would share one entry and can surface as a
/// spurious cycle error. TODO confirm callers guarantee uniqueness.
///
/// # Errors
/// Returns `OrchestratorError::InvalidConfig` when the dependencies contain a cycle
/// (e.g. `A -> B -> A`). Nodes that depend on a cycle also cannot be placed, so they
/// trigger the same error.
///
/// # Panics
/// The `expect` calls only guard internal invariants: every node name is inserted
/// into `graph` and `indegree` before use. They are not expected to fire.
fn dependency_levels_among<'a>(
    nodes: &'a [&'a NodeSpec],
) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
    // Lookup used to drop token references that don't name a node in `nodes`.
    let by_name = nodes
        .iter()
        .map(|n| (n.name.as_str(), *n))
        .collect::<HashMap<_, _>>();

    // graph: dependency name -> nodes that depend on it (edges point "dependency -> dependent").
    // indegree: node name -> number of distinct in-scope dependencies not yet processed.
    let mut graph = HashMap::with_capacity(nodes.len());
    let mut indegree = HashMap::with_capacity(nodes.len());

    for node in nodes {
        graph.insert(node.name.as_str(), Vec::new());
        indegree.insert(node.name.as_str(), 0);
    }

    // build dependency graph
    for &node in nodes {
        // If the args can't be serialized, this node's dependencies are silently skipped.
        if let Ok(args_json) = serde_json::to_string(&node.args) {
            // collect dependencies (deduplicated, so a node referenced twice counts once)
            let unique_deps = get_tokens_to_replace(&args_json)
                .into_iter()
                .filter(|dep| dep != &node.name)
                .filter_map(|dep| by_name.get(dep.as_str()))
                .map(|&dep_node| dep_node.name.as_str())
                .collect::<HashSet<_>>();

            for dep_name in unique_deps {
                graph
                    .get_mut(dep_name)
                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
                    .push(node);
                *indegree
                    .get_mut(node.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
            }
        }
    }

    // find all nodes with no dependencies
    let mut queue = nodes
        .iter()
        .filter(|n| {
            *indegree
                .get(n.name.as_str())
                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
                == 0
        })
        .copied()
        .collect::<VecDeque<_>>();

    let mut processed_count = 0;
    let mut levels = Vec::new();

    // Kahn's algorithm, processed one level at a time
    while !queue.is_empty() {
        // Snapshot the queue length: nodes that become ready while this level is being
        // processed are pushed to the back and belong to the *next* level.
        let level_size = queue.len();
        let mut current_level = Vec::with_capacity(level_size);

        for _ in 0..level_size {
            let n = queue
                .pop_front()
                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
            current_level.push(n);
            processed_count += 1;

            // `n` is now placed: release one pending dependency of each dependent.
            for &neighbour in graph
                .get(n.name.as_str())
                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
            {
                let neighbour_indegree = indegree
                    .get_mut(neighbour.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
                *neighbour_indegree -= 1;

                if *neighbour_indegree == 0 {
                    queue.push_back(neighbour);
                }
            }
        }

        // Sort for a deterministic order within the level.
        current_level.sort_by_key(|n| &n.name);
        levels.push(current_level);
    }

    // cycles detected, e.g A -> B -> A: nodes on (or behind) a cycle never reach
    // indegree 0, so they are never processed.
    if processed_count != nodes.len() {
        return Err(OrchestratorError::InvalidConfig(
            "Tokens have cyclical dependencies".to_string(),
        ));
    }

    Ok(levels)
}
925
// TODO: getting the fs from `DynNamespace` would make this unnecessary,
// but the FileSystem trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We could refactor, or use some type-erasure techniques,
// to resolve this and remove this struct.
// TODO (Loris): Probably we could have a .scoped(base_dir) method on the
// filesystem itself (the trait), so it will return this and we can move this
// directly to the support crate, it can be useful in the future
/// A borrowed [`FileSystem`] view rooted at `base_dir`.
///
/// Relative paths passed to its helpers are joined as `"{base_dir}/{path}"`
/// before the call is delegated to the wrapped filesystem.
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    /// Underlying filesystem that every operation is delegated to.
    fs: &'a FS,
    /// Directory prefix prepended to paths.
    base_dir: &'a str,
}
938
939impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
940    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
941        Self { fs, base_dir }
942    }
943
944    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
945        for file in files {
946            let full_remote_path = PathBuf::from(format!(
947                "{}/{}",
948                self.base_dir,
949                file.remote_path.to_string_lossy()
950            ));
951            trace!("coping file: {file}");
952            self.fs
953                .copy(file.local_path.as_path(), full_remote_path)
954                .await?;
955        }
956        Ok(())
957    }
958
959    async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
960        let file = file.as_ref();
961
962        let full_path = if file.is_absolute() {
963            file.to_owned()
964        } else {
965            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
966        };
967        let content = self.fs.read(full_path).await?;
968        Ok(content)
969    }
970
971    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
972        let file = file.as_ref();
973
974        let full_path = if file.is_absolute() {
975            file.to_owned()
976        } else {
977            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
978        };
979        let content = self.fs.read_to_string(full_path).await?;
980        Ok(content)
981    }
982
983    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
984        let path = PathBuf::from(format!(
985            "{}/{}",
986            self.base_dir,
987            path.as_ref().to_string_lossy()
988        ));
989        self.fs.create_dir(path).await
990    }
991
992    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
993        let path = PathBuf::from(format!(
994            "{}/{}",
995            self.base_dir,
996            path.as_ref().to_string_lossy()
997        ));
998        self.fs.create_dir_all(path).await
999    }
1000
1001    async fn write(
1002        &self,
1003        path: impl AsRef<Path>,
1004        contents: impl AsRef<[u8]> + Send,
1005    ) -> Result<(), FileSystemError> {
1006        let path = path.as_ref();
1007
1008        let full_path = if path.is_absolute() {
1009            path.to_owned()
1010        } else {
1011            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1012        };
1013
1014        self.fs.write(full_path, contents).await
1015    }
1016
1017    /// Get the full_path in the scoped FS
1018    fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1019        let path = path.as_ref();
1020
1021        let full_path = if path.is_absolute() {
1022            path.to_owned()
1023        } else {
1024            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1025        };
1026
1027        full_path
1028    }
1029
1030    /// Get the base_dir in the scoped FS
1031    fn base_dir(&self) -> &str {
1032        self.base_dir
1033    }
1034}
1035
/// Role tag for a process handled by the orchestrator.
///
/// NOTE(review): variant semantics are not defined in this part of the file. The names
/// suggest the node's function in the network (`Bootnode`, `Collator`,
/// `CumulusCollator`, ...), with `Temp` presumably a short-lived helper process.
/// TODO confirm against the usages.
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
1045
1046// re-exports
1047pub use network::{AddCollatorOptions, AddNodeOptions};
1048pub use network_helper::metrics;
1049pub use sc_chain_spec;
1050
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    // Tests that touch `ENV_KEY` must hold this lock: the process environment is
    // shared by every test running in parallel.
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Set `ENV_KEY` to `concurrency`, or remove it when `None`.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Build a small network config: relay chain `rococo-local` with validators
    /// `alice` and `bob`, plus cumulus-based parachain 2000 with one `collator`.
    ///
    /// `with_image` adds container images. `with_cmd` overrides both the relay default
    /// command and the collator command.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Build a `NodeSpec` named `name` whose args reference every node in
    /// `dependencies` through a `{{ZOMBIE:<name>:someField}}` token.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Assert that `actual_levels` matches `expected_levels` exactly: same number of
    /// levels, and the same node names in the same order within each level.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        // `zip` stops at the shorter side, so a missing or extra level would otherwise
        // go unnoticed.
        assert_eq!(
            actual_levels.len(),
            expected_levels.len(),
            "unexpected number of dependency levels"
        );
        actual_levels
            .iter()
            .zip(expected_levels)
            .for_each(|(actual_level, expected_level)| {
                let actual_names = actual_level
                    .iter()
                    .map(|node| node.name.as_str())
                    .collect::<Vec<_>>();
                assert_eq!(actual_names, expected_level);
            });
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: true,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: true,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: false,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: false,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok(), "{valid:?}")
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // no nodes
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // one node
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // two independent nodes
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // alice -> bob -> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        //         ┌─> bob
        // alice ──|
        //         └─> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        //         ┌─>   bob  ──┐
        // alice ──|            ├─> dave
        //         └─> charlie  ┘
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_ignore_self_and_unknown_dependencies() {
        // alice references herself and a node (`eve`) that is not part of the set;
        // neither reference may create a dependency.
        let mut alice = get_node_with_dependencies("alice", None);
        alice.args.push("{{ZOMBIE:alice:someField}}".into());
        alice.args.push("{{ZOMBIE:eve:someField}}".into());
        let bob = get_node_with_dependencies("bob", None);

        let levels = dependency_levels_among(&[&alice, &bob]).unwrap();
        verify_levels(levels, vec![vec!["alice", "bob"]]);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}