zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod observability;
9pub mod tx_helper;
10
11mod network_spec;
12pub mod shared;
13mod spawner;
14mod utils;
15
16use std::{
17    collections::{HashMap, HashSet, VecDeque},
18    env,
19    net::IpAddr,
20    path::{Path, PathBuf},
21    time::{Duration, SystemTime},
22};
23
24use configuration::{NetworkConfig, RegistrationStrategy};
25use errors::OrchestratorError;
26use generators::errors::GeneratorError;
27use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
28// re-exported
29pub use network_spec::NetworkSpec;
30use network_spec::{node::NodeSpec, parachain::ParachainSpec};
31use provider::{
32    types::{ProviderCapabilities, TransferedFile},
33    DynNamespace, DynProvider,
34};
35use serde_json::json;
36use support::{
37    constants::{
38        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
39        THIS_IS_A_BUG,
40    },
41    fs::{FileSystem, FileSystemError},
42    replacer::{get_tokens_to_replace, has_tokens},
43};
44use tokio::time::timeout;
45use tracing::{debug, info, trace, warn};
46
47use crate::{
48    network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
49    shared::types::RegisterParachainOptions,
50    spawner::SpawnNodeCtx,
51    utils::write_zombie_json,
52};
53pub struct Orchestrator<T>
54where
55    T: FileSystem + Sync + Send,
56{
57    filesystem: T,
58    provider: DynProvider,
59}
60
61impl<T> Orchestrator<T>
62where
63    T: FileSystem + Sync + Send + Clone,
64{
65    pub fn new(filesystem: T, provider: DynProvider) -> Self {
66        Self {
67            filesystem,
68            provider,
69        }
70    }
71
72    pub async fn spawn(
73        &self,
74        network_config: NetworkConfig,
75    ) -> Result<Network<T>, OrchestratorError> {
76        let global_timeout = network_config.global_settings().network_spawn_timeout();
77        let network_spec = NetworkSpec::from_config(&network_config).await?;
78
79        let res = timeout(
80            Duration::from_secs(global_timeout.into()),
81            self.spawn_inner(network_spec),
82        )
83        .await
84        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
85        res?
86    }
87
88    pub async fn spawn_from_spec(
89        &self,
90        network_spec: NetworkSpec,
91    ) -> Result<Network<T>, OrchestratorError> {
92        let global_timeout = network_spec.global_settings.network_spawn_timeout();
93        let res = timeout(
94            Duration::from_secs(global_timeout as u64),
95            self.spawn_inner(network_spec),
96        )
97        .await
98        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
99        res?
100    }
101
102    pub async fn attach_to_live(
103        &self,
104        zombie_json_path: &Path,
105    ) -> Result<Network<T>, OrchestratorError> {
106        info!("attaching to live network...");
107        info!("reading zombie.json from {:?}", zombie_json_path);
108
109        let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
110        let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
111
112        info!("recreating namespace...");
113        let ns: DynNamespace = self
114            .provider
115            .create_namespace_from_json(&zombie_json)
116            .await?;
117
118        info!("recreating relaychain...");
119        let (relay, initial_spec) =
120            recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
121        let relay_nodes = relay.nodes.clone();
122
123        let mut network =
124            Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
125
126        for node in relay_nodes {
127            if node.is_responsive().await {
128                node.set_is_running(true);
129            }
130            network.insert_node(node);
131        }
132
133        info!("recreating parachains...");
134        let parachains_map =
135            recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
136        let para_nodes = parachains_map
137            .values()
138            .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
139            .collect::<Vec<NetworkNode>>();
140
141        network.set_parachains(parachains_map);
142        for node in para_nodes {
143            if node.is_responsive().await {
144                node.set_is_running(true);
145            }
146            network.insert_node(node);
147        }
148
149        Ok(network)
150    }
151
152    async fn spawn_inner(
153        &self,
154        mut network_spec: NetworkSpec,
155    ) -> Result<Network<T>, OrchestratorError> {
156        // main driver for spawn the network
157        debug!(network_spec = ?network_spec,"Network spec to spawn");
158
159        // TODO: move to Provider trait
160        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
161            .map_err(|err| {
162                OrchestratorError::InvalidConfigForProvider(
163                    self.provider.name().into(),
164                    err.to_string(),
165                )
166            })?;
167
168        // create namespace
169        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
170            self.provider
171                .create_namespace_with_base_dir(base_dir)
172                .await?
173        } else {
174            self.provider.create_namespace().await?
175        };
176
177        // set the spawn_concurrency
178        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
179
180        let start_time = SystemTime::now();
181        info!("🧰 ns: {}", ns.name());
182        info!("🧰 base_dir: {:?}", ns.base_dir());
183        info!("🕰 start time: {:?}", start_time);
184        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
185
186        network_spec
187            .populate_nodes_available_args(ns.clone())
188            .await?;
189
190        let base_dir = ns.base_dir().to_string_lossy();
191        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
192        // Create chain-spec for relaychain
193        network_spec
194            .relaychain
195            .chain_spec
196            .build(&ns, &scoped_fs)
197            .await?;
198
199        debug!("relaychain spec built!");
200        // Create parachain artifacts (chain-spec, wasm, state)
201        let relay_chain_id = network_spec
202            .relaychain
203            .chain_spec
204            .read_chain_id(&scoped_fs)
205            .await?;
206
207        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
208        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
209        network_spec
210            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
211            .await?;
212
213        // Gather the parachains to register in genesis and the ones to register with extrinsic
214        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
215            Vec<&ParachainSpec>,
216            Vec<&ParachainSpec>,
217        ) = network_spec
218            .parachains
219            .iter()
220            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
221            .partition(|para| {
222                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
223            });
224
225        let mut para_artifacts = vec![];
226        for para in para_to_register_in_genesis {
227            let genesis_config = para.get_genesis_config()?;
228            para_artifacts.push(genesis_config)
229        }
230
231        // Customize relaychain
232        network_spec
233            .relaychain
234            .chain_spec
235            .customize_relay(
236                &network_spec.relaychain,
237                &network_spec.hrmp_channels,
238                para_artifacts,
239                &scoped_fs,
240            )
241            .await?;
242
243        // Run post-process script if configured for the relaychain (run against plain spec before building raw)
244        if let Some(script_cmd) = network_spec.relaychain.post_process_script.as_deref() {
245            network_spec
246                .relaychain
247                .chain_spec
248                .run_post_process_script(script_cmd, &scoped_fs)
249                .await?;
250        }
251
252        // Build raw version (after any post-processing of the plain spec)
253        network_spec
254            .relaychain
255            .chain_spec
256            .build_raw(&ns, &scoped_fs, None)
257            .await?;
258
259        // override wasm if needed
260        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
261            network_spec
262                .relaychain
263                .chain_spec
264                .override_code(&scoped_fs, wasm_override)
265                .await?;
266        }
267
268        // override raw spec if needed
269        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
270            network_spec
271                .relaychain
272                .chain_spec
273                .override_raw_spec(&scoped_fs, raw_spec_override)
274                .await?;
275        }
276
277        let (bootnodes, relaynodes) =
278            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
279
280        // TODO: we want to still supporting spawn a dedicated bootnode??
281        let mut ctx = SpawnNodeCtx {
282            chain_id: &relay_chain_id,
283            parachain_id: None,
284            chain: relay_chain_name.as_str(),
285            role: ZombieRole::Node,
286            ns: &ns,
287            scoped_fs: &scoped_fs,
288            parachain: None,
289            bootnodes_addr: &vec![],
290            wait_ready: false,
291            nodes_by_name: json!({}),
292            global_settings: &network_spec.global_settings,
293        };
294
295        let global_files_to_inject = vec![TransferedFile::new(
296            PathBuf::from(format!(
297                "{}/{relay_chain_name}.json",
298                ns.base_dir().to_string_lossy()
299            )),
300            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
301        )];
302
303        let r = Relaychain::new(
304            relay_chain_name.to_string(),
305            relay_chain_id.clone(),
306            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
307                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
308            )?),
309        );
310        let mut network =
311            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
312
313        // Initiate the node_ws_url which will be later used in the Parachain_with_extrinsic config
314        let mut node_ws_url: String = "".to_string();
315
316        // Calculate the bootnodes addr from the running nodes
317        let mut bootnodes_addr: Vec<String> = vec![];
318
319        for level in dependency_levels_among(&bootnodes)? {
320            let mut running_nodes_per_level = vec![];
321            for chunk in level.chunks(spawn_concurrency) {
322                let spawning_tasks = chunk
323                    .iter()
324                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
325
326                for node in futures::future::try_join_all(spawning_tasks).await? {
327                    let bootnode_multiaddr = node.multiaddr();
328
329                    bootnodes_addr.push(bootnode_multiaddr.to_string());
330
331                    // Is used in the register_para_options (We need to get this from the relay and not the collators)
332                    if node_ws_url.is_empty() {
333                        node_ws_url.clone_from(&node.ws_uri)
334                    }
335
336                    running_nodes_per_level.push(node);
337                }
338            }
339            info!(
340                "🕰  waiting for level: {:?} to be up...",
341                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
342            );
343
344            // Wait for all nodes in the current level to be up
345            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
346                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
347            });
348
349            let _ = futures::future::try_join_all(waiting_tasks).await?;
350
351            for node in running_nodes_per_level {
352                // Add the node to the  context and `Network` instance
353                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
354                network.add_running_node(node, None).await;
355            }
356        }
357
358        // Add the bootnodes to the relaychain spec file and ctx
359        network_spec
360            .relaychain
361            .chain_spec
362            .add_bootnodes(&scoped_fs, &bootnodes_addr)
363            .await?;
364
365        ctx.bootnodes_addr = &bootnodes_addr;
366
367        for level in dependency_levels_among(&relaynodes)? {
368            let mut running_nodes_per_level = vec![];
369            for chunk in level.chunks(spawn_concurrency) {
370                let spawning_tasks = chunk
371                    .iter()
372                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
373
374                for node in futures::future::try_join_all(spawning_tasks).await? {
375                    running_nodes_per_level.push(node);
376                }
377            }
378            info!(
379                "🕰 waiting for level: {:?} to be up...",
380                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
381            );
382
383            // Wait for all nodes in the current level to be up
384            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
385                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
386            });
387
388            let _ = futures::future::try_join_all(waiting_tasks).await?;
389
390            for node in running_nodes_per_level {
391                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
392                network.add_running_node(node, None).await;
393            }
394        }
395
396        // spawn paras
397        for para in network_spec.parachains.iter() {
398            // Create parachain (in the context of the running network)
399            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
400            let parachain_id = parachain.chain_id.clone();
401
402            let (bootnodes, collators) =
403                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);
404
405            // Create `ctx` for spawn parachain nodes
406            let mut ctx_para = SpawnNodeCtx {
407                parachain: Some(para),
408                parachain_id: parachain_id.as_deref(),
409                role: if para.is_cumulus_based {
410                    ZombieRole::CumulusCollator
411                } else {
412                    ZombieRole::Collator
413                },
414                bootnodes_addr: &vec![],
415                ..ctx.clone()
416            };
417
418            // Calculate the bootnodes addr from the running nodes
419            let mut bootnodes_addr: Vec<String> = vec![];
420            let mut running_nodes: Vec<NetworkNode> = vec![];
421
422            for level in dependency_levels_among(&bootnodes)? {
423                let mut running_nodes_per_level = vec![];
424                for chunk in level.chunks(spawn_concurrency) {
425                    let spawning_tasks = chunk.iter().map(|node| {
426                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
427                    });
428
429                    for node in futures::future::try_join_all(spawning_tasks).await? {
430                        let bootnode_multiaddr = node.multiaddr();
431
432                        bootnodes_addr.push(bootnode_multiaddr.to_string());
433
434                        running_nodes_per_level.push(node);
435                    }
436                }
437                info!(
438                    "🕰  waiting for level: {:?} to be up...",
439                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
440                );
441
442                // Wait for all nodes in the current level to be up
443                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
444                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
445                });
446
447                let _ = futures::future::try_join_all(waiting_tasks).await?;
448
449                for node in running_nodes_per_level {
450                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
451                    running_nodes.push(node);
452                }
453            }
454
455            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
456                para_chain_spec
457                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
458                    .await?;
459            }
460
461            ctx_para.bootnodes_addr = &bootnodes_addr;
462
463            // Spawn the rest of the nodes
464            for level in dependency_levels_among(&collators)? {
465                let mut running_nodes_per_level = vec![];
466                for chunk in level.chunks(spawn_concurrency) {
467                    let spawning_tasks = chunk.iter().map(|node| {
468                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
469                    });
470
471                    for node in futures::future::try_join_all(spawning_tasks).await? {
472                        running_nodes_per_level.push(node);
473                    }
474                }
475                info!(
476                    "🕰  waiting for level: {:?} to be up...",
477                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
478                );
479
480                // Wait for all nodes in the current level to be up
481                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
482                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
483                });
484
485                let _ = futures::future::try_join_all(waiting_tasks).await?;
486
487                for node in running_nodes_per_level {
488                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
489                    running_nodes.push(node);
490                }
491            }
492
493            let running_para_id = parachain.para_id;
494            network.add_para(parachain);
495            for node in running_nodes {
496                network.add_running_node(node, Some(running_para_id)).await;
497            }
498        }
499
500        // Now we need to register the paras with extrinsic from the Vec collected before;
501        for para in para_to_register_with_extrinsic {
502            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
503                id: para.id,
504                // This needs to resolve correctly
505                wasm_path: para
506                    .genesis_wasm
507                    .artifact_path()
508                    .ok_or(OrchestratorError::InvariantError(
509                        "artifact path for wasm must be set at this point",
510                    ))?
511                    .to_path_buf(),
512                state_path: para
513                    .genesis_state
514                    .artifact_path()
515                    .ok_or(OrchestratorError::InvariantError(
516                        "artifact path for state must be set at this point",
517                    ))?
518                    .to_path_buf(),
519                node_ws_url: node_ws_url.clone(),
520                onboard_as_para: para.onboard_as_parachain,
521                seed: None, // TODO: Seed is passed by?
522                finalization: false,
523            };
524
525            Parachain::register(register_para_options, &scoped_fs).await?;
526        }
527
528        if network_spec.global_settings.observability().enabled() {
529            match network
530                .start_observability(network_spec.global_settings.observability())
531                .await
532            {
533                Ok(obs) => {
534                    info!("📊 Prometheus URL: {}", obs.prometheus_url);
535                    info!("📊 Grafana URL: {}", obs.grafana_url);
536                },
537                Err(e) => {
538                    warn!("⚠️  Failed to spawn observability stack: {e}");
539                },
540            }
541        }
542
543        // start custom processes if needed
544        for cp in &network_spec.custom_processes {
545            if let Err(e) = spawner::spawn_process(cp, ns.clone()).await {
546                warn!("⚠️  Failed to spawn custom process {}, err: {e}", cp.name())
547            }
548        }
549
550        network.set_start_time_ts(start_time);
551
552        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;
553
554        if network_spec.global_settings.tear_down_on_failure() {
555            network.spawn_watching_task();
556        }
557
558        Ok(network)
559    }
560}
561
562// Helpers
563
564async fn recreate_network_nodes_from_json(
565    nodes_json: &serde_json::Value,
566    ns: DynNamespace,
567    provider_name: &str,
568) -> Result<Vec<NetworkNode>, OrchestratorError> {
569    let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
570
571    let mut nodes = Vec::with_capacity(raw_nodes.len());
572    for raw in raw_nodes {
573        // validate provider tag
574        let provider_tag = raw
575            .inner
576            .get("provider_tag")
577            .and_then(|v| v.as_str())
578            .ok_or_else(|| {
579                OrchestratorError::InvalidConfig(format!(
580                    "Node '{}' is missing `provider_tag` in inner node JSON",
581                    raw.name
582                ))
583            })?;
584
585        if provider_tag != provider_name {
586            return Err(OrchestratorError::InvalidConfigForProvider(
587                provider_name.to_string(),
588                provider_tag.to_string(),
589            ));
590        }
591        let inner = ns.spawn_node_from_json(&raw.inner).await?;
592        let relay_node = NetworkNode::new(
593            raw.name,
594            raw.ws_uri,
595            raw.prometheus_uri,
596            raw.multiaddr,
597            raw.spec,
598            inner,
599        );
600        nodes.push(relay_node);
601    }
602
603    Ok(nodes)
604}
605
606async fn recreate_relaychain_from_json(
607    zombie_json: &serde_json::Value,
608    ns: DynNamespace,
609    provider_name: &str,
610) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
611    let relay_json = zombie_json
612        .get("relay")
613        .ok_or(OrchestratorError::InvalidConfig(
614            "Missing `relay` field in zombie.json".into(),
615        ))?
616        .clone();
617
618    let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
619
620    let initial_spec: NetworkSpec = serde_json::from_value(
621        zombie_json
622            .get("initial_spec")
623            .ok_or(OrchestratorError::InvalidConfig(
624                "Missing `initial_spec` field in zombie.json".into(),
625            ))?
626            .clone(),
627    )?;
628
629    // Populate relay nodes
630    let nodes =
631        recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
632    relay_raw.inner.nodes = nodes;
633
634    Ok((relay_raw.inner, initial_spec))
635}
636
637async fn recreate_parachains_from_json(
638    zombie_json: &serde_json::Value,
639    ns: DynNamespace,
640    provider_name: &str,
641) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
642    let paras_json = zombie_json
643        .get("parachains")
644        .ok_or(OrchestratorError::InvalidConfig(
645            "Missing `parachains` field in zombie.json".into(),
646        ))?
647        .clone();
648
649    let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
650
651    let mut parachains_map = HashMap::new();
652
653    for (id, parachain_entries) in raw_paras {
654        let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
655
656        for raw_para in parachain_entries {
657            let mut para = raw_para.inner;
658            para.collators =
659                recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
660                    .await?;
661            parsed_vec.push(para);
662        }
663
664        parachains_map.insert(id, parsed_vec);
665    }
666
667    Ok(parachains_map)
668}
669
670// Split the node list depending if it's bootnode or not
671// NOTE: if there isn't a bootnode declared we use the first one
672fn split_nodes_by_bootnodes(
673    nodes: &[NodeSpec],
674    no_default_bootnodes: bool,
675) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
676    // get the bootnodes to spawn first and calculate the bootnode string for use later
677    let mut bootnodes = vec![];
678    let mut other_nodes = vec![];
679    nodes.iter().for_each(|node| {
680        if node.is_bootnode {
681            bootnodes.push(node)
682        } else {
683            other_nodes.push(node)
684        }
685    });
686
687    if bootnodes.is_empty() && !no_default_bootnodes {
688        bootnodes.push(other_nodes.remove(0))
689    }
690
691    (bootnodes, other_nodes)
692}
693
694// Generate a bootnode multiaddress and return as string
695fn generate_bootnode_addr(
696    node: &NetworkNode,
697    ip: &IpAddr,
698    port: u16,
699) -> Result<String, GeneratorError> {
700    generators::generate_node_bootnode_addr(
701        &node.spec.peer_id,
702        ip,
703        port,
704        node.inner.args().as_ref(),
705        &node.spec.p2p_cert_hash,
706    )
707}
708// Validate that the config fulfill all the requirements of the provider
709fn validate_spec_with_provider_capabilities(
710    network_spec: &NetworkSpec,
711    capabilities: &ProviderCapabilities,
712) -> Result<(), anyhow::Error> {
713    let mut errs: Vec<String> = vec![];
714
715    if capabilities.requires_image {
716        // Relaychain
717        if network_spec.relaychain.default_image.is_none() {
718            // we should check if each node have an image
719            let nodes = &network_spec.relaychain.nodes;
720            if nodes.iter().any(|node| node.image.is_none()) {
721                errs.push(String::from(
722                    "Missing image for node, and not default is set at relaychain",
723                ));
724            }
725        };
726
727        // Paras
728        for para in &network_spec.parachains {
729            if para.default_image.is_none() {
730                let nodes = &para.collators;
731                if nodes.iter().any(|node| node.image.is_none()) {
732                    errs.push(format!(
733                        "Missing image for node, and not default is set at parachain {}",
734                        para.id
735                    ));
736                }
737            }
738        }
739    } else {
740        // native
741        // We need to get all the `cmds` and verify if are part of the path
742        let mut cmds: HashSet<&str> = Default::default();
743        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
744            cmds.insert(cmd.as_str());
745        }
746        for node in network_spec.relaychain().nodes.iter() {
747            cmds.insert(node.command());
748        }
749
750        // Paras
751        for para in &network_spec.parachains {
752            if let Some(cmd) = para.default_command.as_ref() {
753                cmds.insert(cmd.as_str());
754            }
755
756            for node in para.collators.iter() {
757                cmds.insert(node.command());
758            }
759        }
760
761        // now check the binaries
762        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
763        trace!("current PATH: {path}");
764        let parts: Vec<_> = path.split(":").collect();
765        for cmd in cmds {
766            let missing = if cmd.contains('/') {
767                trace!("checking {cmd}");
768                if std::fs::metadata(cmd).is_err() {
769                    true
770                } else {
771                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
772                    false
773                }
774            } else {
775                // should be in the PATH
776                !parts.iter().any(|part| {
777                    let path_to = format!("{part}/{cmd}");
778                    trace!("checking {path_to}");
779                    let check_result = std::fs::metadata(&path_to);
780                    trace!("result {:?}", check_result);
781                    if check_result.is_ok() {
782                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
783                        true
784                    } else {
785                        false
786                    }
787                })
788            };
789
790            if missing {
791                errs.push(help_msg(cmd));
792            }
793        }
794    }
795
796    if !errs.is_empty() {
797        let msg = errs.join("\n");
798        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
799    }
800
801    Ok(())
802}
803
804fn help_msg(cmd: &str) -> String {
805    match cmd {
806        "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
807            format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
808        },
809        "polkadot" => {
810            format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
811        },
812        "polkadot-parachain" => {
813            format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
814        },
815        _ => {
816            format!("Missing binary {cmd}, please compile it.")
817        },
818    }
819}
820
821/// Allow to set the default concurrency through env var `ZOMBIE_SPAWN_CONCURRENCY`
822fn spawn_concurrency_from_env() -> Option<usize> {
823    if let Ok(concurrency) = env::var("ZOMBIE_SPAWN_CONCURRENCY") {
824        concurrency.parse::<usize>().ok()
825    } else {
826        None
827    }
828}
829
830fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
831    let desired_spawn_concurrency = match (
832        spawn_concurrency_from_env(),
833        spec.global_settings.spawn_concurrency(),
834    ) {
835        (Some(n), _) => Some(n),
836        (None, Some(n)) => Some(n),
837        _ => None,
838    };
839
840    let (spawn_concurrency, limited_by_tokens) =
841        if let Some(spawn_concurrency) = desired_spawn_concurrency {
842            if spawn_concurrency == 1 {
843                (1, false)
844            } else if has_tokens(&serde_json::to_string(spec)?) {
845                (1, true)
846            } else {
847                (spawn_concurrency, false)
848            }
849        } else {
850            // not set
851            if has_tokens(&serde_json::to_string(spec)?) {
852                (1, true)
853            } else {
854                // use 100 as max concurrency, we can set a max by provider later
855                (100, false)
856            }
857        };
858
859    Ok((spawn_concurrency, limited_by_tokens))
860}
861
862/// Build deterministic dependency **levels** among the given nodes.
863/// - Only dependencies **between nodes in `nodes`** are considered.
864/// - Unknown/out-of-scope references are ignored.
865/// - Self-dependencies are ignored.
866fn dependency_levels_among<'a>(
867    nodes: &'a [&'a NodeSpec],
868) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
869    let by_name = nodes
870        .iter()
871        .map(|n| (n.name.as_str(), *n))
872        .collect::<HashMap<_, _>>();
873
874    let mut graph = HashMap::with_capacity(nodes.len());
875    let mut indegree = HashMap::with_capacity(nodes.len());
876
877    for node in nodes {
878        graph.insert(node.name.as_str(), Vec::new());
879        indegree.insert(node.name.as_str(), 0);
880    }
881
882    // build dependency graph
883    for &node in nodes {
884        if let Ok(args_json) = serde_json::to_string(&node.args) {
885            // collect dependencies
886            let unique_deps = get_tokens_to_replace(&args_json)
887                .into_iter()
888                .filter(|dep| dep != &node.name)
889                .filter_map(|dep| by_name.get(dep.as_str()))
890                .map(|&dep_node| dep_node.name.as_str())
891                .collect::<HashSet<_>>();
892
893            for dep_name in unique_deps {
894                graph
895                    .get_mut(dep_name)
896                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
897                    .push(node);
898                *indegree
899                    .get_mut(node.name.as_str())
900                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
901            }
902        }
903    }
904
905    // find all nodes with no dependencies
906    let mut queue = nodes
907        .iter()
908        .filter(|n| {
909            *indegree
910                .get(n.name.as_str())
911                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
912                == 0
913        })
914        .copied()
915        .collect::<VecDeque<_>>();
916
917    let mut processed_count = 0;
918    let mut levels = Vec::new();
919
920    // Kahn's algorithm
921    while !queue.is_empty() {
922        let level_size = queue.len();
923        let mut current_level = Vec::with_capacity(level_size);
924
925        for _ in 0..level_size {
926            let n = queue
927                .pop_front()
928                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
929            current_level.push(n);
930            processed_count += 1;
931
932            for &neighbour in graph
933                .get(n.name.as_str())
934                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
935            {
936                let neighbour_indegree = indegree
937                    .get_mut(neighbour.name.as_str())
938                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
939                *neighbour_indegree -= 1;
940
941                if *neighbour_indegree == 0 {
942                    queue.push_back(neighbour);
943                }
944            }
945        }
946
947        current_level.sort_by_key(|n| &n.name);
948        levels.push(current_level);
949    }
950
951    // cycles detected, e.g A -> B -> A
952    if processed_count != nodes.len() {
953        return Err(OrchestratorError::InvalidConfig(
954            "Tokens have cyclical dependencies".to_string(),
955        ));
956    }
957
958    Ok(levels)
959}
960
// TODO: getting the fs from `DynNamespace` would make this struct unnecessary,
// but the FileSystem trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We could refactor the trait or use some type-erasure
// technique to resolve this and remove this struct.
// TODO (Loris): We could probably add a .scoped(base_dir) method to the
// filesystem itself (the trait) that returns this, and then move this
// directly to the support crate; it could be useful in the future.
/// A view over a [`FileSystem`] whose operations resolve relative paths against `base_dir`.
///
/// Paths are combined by plain string formatting as `"{base_dir}/{path}"`; whether an
/// absolute `path` is kept as-is or still prefixed depends on the individual method.
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    /// Underlying filesystem that every operation is delegated to.
    fs: &'a FS,
    /// Directory that scoped paths are prefixed with.
    base_dir: &'a str,
}
973
974impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
975    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
976        Self { fs, base_dir }
977    }
978
979    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
980        for file in files {
981            let full_remote_path = PathBuf::from(format!(
982                "{}/{}",
983                self.base_dir,
984                file.remote_path.to_string_lossy()
985            ));
986            trace!("coping file: {file}");
987            self.fs
988                .copy(file.local_path.as_path(), full_remote_path)
989                .await?;
990        }
991        Ok(())
992    }
993
994    async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
995        let file = file.as_ref();
996
997        let full_path = if file.is_absolute() {
998            file.to_owned()
999        } else {
1000            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1001        };
1002        let content = self.fs.read(full_path).await?;
1003        Ok(content)
1004    }
1005
1006    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
1007        let file = file.as_ref();
1008
1009        let full_path = if file.is_absolute() {
1010            file.to_owned()
1011        } else {
1012            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1013        };
1014        let content = self.fs.read_to_string(full_path).await?;
1015        Ok(content)
1016    }
1017
1018    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1019        let path = PathBuf::from(format!(
1020            "{}/{}",
1021            self.base_dir,
1022            path.as_ref().to_string_lossy()
1023        ));
1024        self.fs.create_dir(path).await
1025    }
1026
1027    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1028        let path = PathBuf::from(format!(
1029            "{}/{}",
1030            self.base_dir,
1031            path.as_ref().to_string_lossy()
1032        ));
1033        self.fs.create_dir_all(path).await
1034    }
1035
1036    async fn write(
1037        &self,
1038        path: impl AsRef<Path>,
1039        contents: impl AsRef<[u8]> + Send,
1040    ) -> Result<(), FileSystemError> {
1041        let path = path.as_ref();
1042
1043        let full_path = if path.is_absolute() {
1044            path.to_owned()
1045        } else {
1046            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1047        };
1048
1049        self.fs.write(full_path, contents).await
1050    }
1051
1052    /// Get the full_path in the scoped FS
1053    fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1054        let path = path.as_ref();
1055
1056        let full_path = if path.is_absolute() {
1057            path.to_owned()
1058        } else {
1059            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1060        };
1061
1062        full_path
1063    }
1064
1065    /// Get the base_dir in the scoped FS
1066    fn base_dir(&self) -> &str {
1067        self.base_dir
1068    }
1069}
1070
/// Role tag for a zombienet process.
///
/// NOTE(review): the per-variant semantics (e.g. what distinguishes `Temp`, `Collator` and
/// `CumulusCollator`) are not established in this part of the file; presumably the spawner
/// uses them to pick role-specific behavior — confirm with its callers before relying on it.
/// Variant order determines the implicit discriminants, so avoid reordering.
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
1080
1081// re-exports
1082pub use network::{AddCollatorOptions, AddNodeOptions};
1083pub use network_helper::metrics;
1084pub use sc_chain_spec;
1085
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    // mutex for test that use env: the process environment is shared by all tests,
    // so tests that set or read `ENV_KEY` must not run concurrently.
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Sets (`Some`) or clears (`None`) the spawn-concurrency env var.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Builds a `rococo-local` relaychain with validators `alice` and `bob`, plus a
    /// cumulus-based parachain `2000` with a single `collator`.
    ///
    /// `with_image` attaches docker images; `with_cmd` overrides both the relaychain
    /// default command and the collator command.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Builds a `NodeSpec` named `name` whose args reference every node in `dependencies`
    /// through a `{{ZOMBIE:<name>:someField}}` token.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Asserts that `actual_levels` matches `expected_levels` exactly: same number of
    /// levels, and the same node names in the same order inside each level.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        // Compare the whole structure rather than `zip`-ing, which silently ignored
        // missing or extra levels.
        let actual_names = actual_levels
            .iter()
            .map(|level| {
                level
                    .iter()
                    .map(|node| node.name.as_str())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();
        assert_eq!(actual_names, expected_levels);
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: true,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok(), "{:?}", valid)
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: true,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: false,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: false,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        // Print the validation error only when the assertion fails.
        assert!(valid.is_ok(), "{:?}", valid)
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        // Restore the environment before asserting so later tests start clean.
        set_env(None);
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        // Restore the environment before asserting so later tests start clean.
        set_env(None);
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // no nodes
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // one node
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // two independent nodes
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // alice -> bob -> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        //         ┌─> bob
        // alice ──|
        //         └─> charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        //         ┌─>   bob  ──┐
        // alice ──|            ├─> dave
        //         └─> charlie  ┘
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}