zombienet_orchestrator/
lib.rs

1// TODO(Javier): Remove when we implement the logic in the orchestrator to spawn with the provider.
2#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11#[cfg(feature = "pjs")]
12pub mod pjs_helper;
13pub mod shared;
14mod spawner;
15
16use std::{
17    collections::HashSet,
18    net::IpAddr,
19    path::{Path, PathBuf},
20    time::Duration,
21};
22
23use configuration::{NetworkConfig, RegistrationStrategy};
24use errors::OrchestratorError;
25use generators::errors::GeneratorError;
26use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
27// re-exported
28pub use network_spec::NetworkSpec;
29use network_spec::{node::NodeSpec, parachain::ParachainSpec};
30use provider::{
31    types::{ProviderCapabilities, TransferedFile},
32    DynProvider,
33};
34use serde_json::json;
35use support::fs::{FileSystem, FileSystemError};
36use tokio::time::timeout;
37use tracing::{debug, info, trace};
38
39use crate::{shared::types::RegisterParachainOptions, spawner::SpawnNodeCtx};
40pub struct Orchestrator<T>
41where
42    T: FileSystem + Sync + Send,
43{
44    filesystem: T,
45    provider: DynProvider,
46}
47
48impl<T> Orchestrator<T>
49where
50    T: FileSystem + Sync + Send + Clone,
51{
52    pub fn new(filesystem: T, provider: DynProvider) -> Self {
53        Self {
54            filesystem,
55            provider,
56        }
57    }
58
59    pub async fn spawn(
60        &self,
61        network_config: NetworkConfig,
62    ) -> Result<Network<T>, OrchestratorError> {
63        let global_timeout = network_config.global_settings().network_spawn_timeout();
64        let network_spec = NetworkSpec::from_config(&network_config).await?;
65
66        let res = timeout(
67            Duration::from_secs(global_timeout.into()),
68            self.spawn_inner(network_spec),
69        )
70        .await
71        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
72        res?
73    }
74
75    pub async fn spawn_from_spec(
76        &self,
77        network_spec: NetworkSpec,
78    ) -> Result<Network<T>, OrchestratorError> {
79        let global_timeout = network_spec.global_settings.network_spawn_timeout();
80        let res = timeout(
81            Duration::from_secs(global_timeout as u64),
82            self.spawn_inner(network_spec),
83        )
84        .await
85        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
86        res?
87    }
88
89    async fn spawn_inner(
90        &self,
91        mut network_spec: NetworkSpec,
92    ) -> Result<Network<T>, OrchestratorError> {
93        // main driver for spawn the network
94        debug!(network_spec = ?network_spec,"Network spec to spawn");
95
96        // TODO: move to Provider trait
97        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
98            .map_err(|err| {
99                OrchestratorError::InvalidConfigForProvider(
100                    self.provider.name().into(),
101                    err.to_string(),
102                )
103            })?;
104
105        // create namespace
106        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
107            self.provider
108                .create_namespace_with_base_dir(base_dir)
109                .await?
110        } else {
111            self.provider.create_namespace().await?
112        };
113
114        info!("🧰 ns: {}", ns.name());
115        info!("🧰 base_dir: {:?}", ns.base_dir());
116
117        network_spec
118            .populate_nodes_available_args(ns.clone())
119            .await?;
120
121        let base_dir = ns.base_dir().to_string_lossy();
122        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
123        // Create chain-spec for relaychain
124        network_spec
125            .relaychain
126            .chain_spec
127            .build(&ns, &scoped_fs)
128            .await?;
129
130        debug!("relaychain spec built!");
131        // Create parachain artifacts (chain-spec, wasm, state)
132        let relay_chain_id = network_spec
133            .relaychain
134            .chain_spec
135            .read_chain_id(&scoped_fs)
136            .await?;
137
138        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
139        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
140        network_spec
141            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
142            .await?;
143
144        // Gather the parachains to register in genesis and the ones to register with extrinsic
145        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
146            Vec<&ParachainSpec>,
147            Vec<&ParachainSpec>,
148        ) = network_spec
149            .parachains
150            .iter()
151            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
152            .partition(|para| {
153                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
154            });
155
156        let mut para_artifacts = vec![];
157        for para in para_to_register_in_genesis {
158            let genesis_config = para.get_genesis_config()?;
159            para_artifacts.push(genesis_config)
160        }
161
162        // Customize relaychain
163        network_spec
164            .relaychain
165            .chain_spec
166            .customize_relay(
167                &network_spec.relaychain,
168                &network_spec.hrmp_channels,
169                para_artifacts,
170                &scoped_fs,
171            )
172            .await?;
173
174        // Build raw version
175        network_spec
176            .relaychain
177            .chain_spec
178            .build_raw(&ns, &scoped_fs)
179            .await?;
180
181        // override wasm if needed
182        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
183            network_spec
184                .relaychain
185                .chain_spec
186                .override_code(&scoped_fs, wasm_override)
187                .await?;
188        }
189
190        let (bootnodes, relaynodes) =
191            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
192
193        // TODO: we want to still supporting spawn a dedicated bootnode??
194        let mut ctx = SpawnNodeCtx {
195            chain_id: &relay_chain_id,
196            parachain_id: None,
197            chain: relay_chain_name.as_str(),
198            role: ZombieRole::Node,
199            ns: &ns,
200            scoped_fs: &scoped_fs,
201            parachain: None,
202            bootnodes_addr: &vec![],
203            wait_ready: false,
204            nodes_by_name: json!({}),
205        };
206
207        let global_files_to_inject = vec![TransferedFile::new(
208            PathBuf::from(format!(
209                "{}/{relay_chain_name}.json",
210                ns.base_dir().to_string_lossy()
211            )),
212            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
213        )];
214
215        let r = Relaychain::new(
216            relay_chain_name.to_string(),
217            relay_chain_id.clone(),
218            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
219                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
220            )?),
221        );
222        let mut network =
223            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
224
225        let spawning_tasks = bootnodes
226            .iter()
227            .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
228
229        // Initiate the node_ws_uel which will be later used in the Parachain_with_extrinsic config
230        let mut node_ws_url: String = "".to_string();
231
232        // Calculate the bootnodes addr from the running nodes
233        let mut bootnodes_addr: Vec<String> = vec![];
234        for node in futures::future::try_join_all(spawning_tasks).await? {
235            let bootnode_multiaddr = node.multiaddr();
236
237            bootnodes_addr.push(bootnode_multiaddr.to_string());
238
239            // Is used in the register_para_options (We need to get this from the relay and not the collators)
240            if node_ws_url.is_empty() {
241                node_ws_url.clone_from(&node.ws_uri)
242            }
243
244            // Add the node to the  context and `Network` instance
245            ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
246            network.add_running_node(node, None);
247        }
248
249        // Add the bootnodes to the relaychain spec file and ctx
250        network_spec
251            .relaychain
252            .chain_spec
253            .add_bootnodes(&scoped_fs, &bootnodes_addr)
254            .await?;
255
256        ctx.bootnodes_addr = &bootnodes_addr;
257
258        // spawn the rest of the nodes (TODO: in batches)
259        let spawning_tasks = relaynodes
260            .iter()
261            .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
262
263        for node in futures::future::try_join_all(spawning_tasks).await? {
264            // Add the node to the  context and `Network` instance
265            ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
266            network.add_running_node(node, None);
267        }
268
269        // spawn paras
270        for para in network_spec.parachains.iter() {
271            // Create parachain (in the context of the running network)
272            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
273            let parachain_id = parachain.chain_id.clone();
274
275            let (bootnodes, collators) =
276                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);
277
278            // Create `ctx` for spawn parachain nodes
279            let mut ctx_para = SpawnNodeCtx {
280                parachain: Some(para),
281                parachain_id: parachain_id.as_deref(),
282                role: if para.is_cumulus_based {
283                    ZombieRole::CumulusCollator
284                } else {
285                    ZombieRole::Collator
286                },
287                bootnodes_addr: &vec![],
288                ..ctx.clone()
289            };
290
291            let spawning_tasks = bootnodes.iter().map(|node| {
292                spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
293            });
294
295            // Calculate the bootnodes addr from the running nodes
296            let mut bootnodes_addr: Vec<String> = vec![];
297            let mut running_nodes: Vec<NetworkNode> = vec![];
298            for node in futures::future::try_join_all(spawning_tasks).await? {
299                let bootnode_multiaddr = node.multiaddr();
300
301                bootnodes_addr.push(bootnode_multiaddr.to_string());
302                ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
303                running_nodes.push(node);
304            }
305
306            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
307                para_chain_spec
308                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
309                    .await?;
310            }
311
312            ctx_para.bootnodes_addr = &bootnodes_addr;
313
314            // Spawn the rest of the nodes
315            let spawning_tasks = collators.iter().map(|node| {
316                spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
317            });
318
319            // join all the running nodes
320            running_nodes.extend_from_slice(
321                futures::future::try_join_all(spawning_tasks)
322                    .await?
323                    .as_slice(),
324            );
325
326            let running_para_id = parachain.para_id;
327            network.add_para(parachain);
328            for node in running_nodes {
329                network.add_running_node(node, Some(running_para_id));
330            }
331        }
332
333        // TODO:
334        // - add-ons (introspector/tracing/etc)
335
336        // verify nodes
337        // network_helper::verifier::verify_nodes(&network.nodes()).await?;
338
339        // Now we need to register the paras with extrinsic from the Vec collected before;
340        for para in para_to_register_with_extrinsic {
341            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
342                id: para.id,
343                // This needs to resolve correctly
344                wasm_path: para
345                    .genesis_wasm
346                    .artifact_path()
347                    .ok_or(OrchestratorError::InvariantError(
348                        "artifact path for wasm must be set at this point",
349                    ))?
350                    .to_path_buf(),
351                state_path: para
352                    .genesis_state
353                    .artifact_path()
354                    .ok_or(OrchestratorError::InvariantError(
355                        "artifact path for state must be set at this point",
356                    ))?
357                    .to_path_buf(),
358                node_ws_url: node_ws_url.clone(),
359                onboard_as_para: para.onboard_as_parachain,
360                seed: None, // TODO: Seed is passed by?
361                finalization: false,
362            };
363
364            Parachain::register(register_para_options, &scoped_fs).await?;
365        }
366
367        // - write zombie.json state file
368        let mut zombie_json = serde_json::to_value(&network)?;
369        zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
370
371        scoped_fs
372            .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
373            .await?;
374        Ok(network)
375    }
376}
377
378// Helpers
379
380// Split the node list depending if it's bootnode or not
381// NOTE: if there isn't a bootnode declared we use the first one
382fn split_nodes_by_bootnodes(
383    nodes: &[NodeSpec],
384    no_default_bootnodes: bool,
385) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
386    // get the bootnodes to spawn first and calculate the bootnode string for use later
387    let mut bootnodes = vec![];
388    let mut other_nodes = vec![];
389    nodes.iter().for_each(|node| {
390        if node.is_bootnode {
391            bootnodes.push(node)
392        } else {
393            other_nodes.push(node)
394        }
395    });
396
397    if bootnodes.is_empty() && !no_default_bootnodes {
398        bootnodes.push(other_nodes.remove(0))
399    }
400
401    (bootnodes, other_nodes)
402}
403
404// Generate a bootnode multiaddress and return as string
405fn generate_bootnode_addr(
406    node: &NetworkNode,
407    ip: &IpAddr,
408    port: u16,
409) -> Result<String, GeneratorError> {
410    generators::generate_node_bootnode_addr(
411        &node.spec.peer_id,
412        ip,
413        port,
414        node.inner.args().as_ref(),
415        &node.spec.p2p_cert_hash,
416    )
417}
418// Validate that the config fulfill all the requirements of the provider
419fn validate_spec_with_provider_capabilities(
420    network_spec: &NetworkSpec,
421    capabilities: &ProviderCapabilities,
422) -> Result<(), anyhow::Error> {
423    let mut errs: Vec<String> = vec![];
424
425    if capabilities.requires_image {
426        // Relaychain
427        if network_spec.relaychain.default_image.is_none() {
428            // we should check if each node have an image
429            let nodes = &network_spec.relaychain.nodes;
430            if nodes.iter().any(|node| node.image.is_none()) {
431                errs.push(String::from(
432                    "Missing image for node, and not default is set at relaychain",
433                ));
434            }
435        };
436
437        // Paras
438        for para in &network_spec.parachains {
439            if para.default_image.is_none() {
440                let nodes = &para.collators;
441                if nodes.iter().any(|node| node.image.is_none()) {
442                    errs.push(format!(
443                        "Missing image for node, and not default is set at parachain {}",
444                        para.id
445                    ));
446                }
447            }
448        }
449    } else {
450        // native
451        // We need to get all the `cmds` and verify if are part of the path
452        let mut cmds: HashSet<&str> = Default::default();
453        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
454            cmds.insert(cmd.as_str());
455        }
456        for node in network_spec.relaychain().nodes.iter() {
457            cmds.insert(node.command());
458        }
459
460        // Paras
461        for para in &network_spec.parachains {
462            if let Some(cmd) = para.default_command.as_ref() {
463                cmds.insert(cmd.as_str());
464            }
465
466            for node in para.collators.iter() {
467                cmds.insert(node.command());
468            }
469        }
470
471        // now check the binaries
472        let path = std::env::var("PATH").unwrap_or_default(); // path should always be set
473        trace!("current PATH: {path}");
474        let parts: Vec<_> = path.split(":").collect();
475        for cmd in cmds {
476            let missing = if cmd.contains('/') {
477                trace!("checking {cmd}");
478                if std::fs::metadata(cmd).is_err() {
479                    true
480                } else {
481                    info!("🔎  We will use the full path {cmd} to spawn nodes.");
482                    false
483                }
484            } else {
485                // should be in the PATH
486                !parts.iter().any(|part| {
487                    let path_to = format!("{}/{}", part, cmd);
488                    trace!("checking {path_to}");
489                    let check_result = std::fs::metadata(&path_to);
490                    trace!("result {:?}", check_result);
491                    if check_result.is_ok() {
492                        info!("🔎  We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
493                        true
494                    } else {
495                        false
496                    }
497                })
498            };
499
500            if missing {
501                errs.push(help_msg(cmd));
502            }
503        }
504    }
505
506    if !errs.is_empty() {
507        let msg = errs.join("\n");
508        return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
509    }
510
511    Ok(())
512}
513
/// Build the "missing binary" message for `cmd`, including build instructions
/// for the binaries whose build command is known, or a generic hint otherwise.
fn help_msg(cmd: &str) -> String {
    let is_template_node = matches!(
        cmd,
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node"
    );

    if is_template_node {
        format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
    } else if cmd == "polkadot" {
        format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
    } else if cmd == "polkadot-parachain" {
        format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
    } else {
        format!("Missing binary {cmd}, please compile it.")
    }
}
530
// TODO: getting the fs from `DynNamespace` would make this struct unnecessary,
// but the FileSystem trait isn't object-safe, so we can't pass it around
// as `dyn FileSystem`. We could refactor, or use some type-erasure techniques,
// to resolve this and remove this struct.
// TODO (Loris): Probably we could have a .scoped(base_dir) method on the
// filesystem itself (the trait) that returns this, and then move this
// directly to the support crate; it could be useful in the future.
538#[derive(Clone, Debug)]
539pub struct ScopedFilesystem<'a, FS: FileSystem> {
540    fs: &'a FS,
541    base_dir: &'a str,
542}
543
544impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
545    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
546        Self { fs, base_dir }
547    }
548
549    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
550        for file in files {
551            let full_remote_path = PathBuf::from(format!(
552                "{}/{}",
553                self.base_dir,
554                file.remote_path.to_string_lossy()
555            ));
556            trace!("coping file: {file}");
557            self.fs
558                .copy(file.local_path.as_path(), full_remote_path)
559                .await?;
560        }
561        Ok(())
562    }
563
564    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
565        let file = file.as_ref();
566
567        let full_path = if file.is_absolute() {
568            file.to_owned()
569        } else {
570            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
571        };
572        let content = self.fs.read_to_string(full_path).await?;
573        Ok(content)
574    }
575
576    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
577        let path = PathBuf::from(format!(
578            "{}/{}",
579            self.base_dir,
580            path.as_ref().to_string_lossy()
581        ));
582        self.fs.create_dir(path).await
583    }
584
585    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
586        let path = PathBuf::from(format!(
587            "{}/{}",
588            self.base_dir,
589            path.as_ref().to_string_lossy()
590        ));
591        self.fs.create_dir_all(path).await
592    }
593
594    async fn write(
595        &self,
596        path: impl AsRef<Path>,
597        contents: impl AsRef<[u8]> + Send,
598    ) -> Result<(), FileSystemError> {
599        let path = path.as_ref();
600
601        let full_path = if path.is_absolute() {
602            path.to_owned()
603        } else {
604            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
605        };
606
607        self.fs.write(full_path, contents).await
608    }
609}
610
/// Role a spawned node plays in the network.
///
/// In this crate's `spawn_inner`, relaychain nodes are spawned as `Node`, and
/// parachain collators as `CumulusCollator` or `Collator` depending on whether
/// the parachain is cumulus-based.
#[derive(Debug, Clone)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
620
621// re-exports
622pub use network::{AddCollatorOptions, AddNodeOptions};
623pub use network_helper::metrics;
624#[cfg(feature = "pjs")]
625pub use pjs_helper::PjsResult;
626
#[cfg(test)]
mod tests {
    use configuration::NetworkConfigBuilder;

    use super::*;

    /// Build a `rococo-local` relaychain (`alice`, `bob`) plus one cumulus-based
    /// parachain (id 2000) with a single `collator`.
    ///
    /// `with_cmd` overrides both the relaychain default command (otherwise
    /// `polkadot`) and the collator command (otherwise `polkadot-parachain`);
    /// `with_image` sets a relaychain default image and a collator image.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        let relay_cmd = with_cmd.unwrap_or("polkadot");
        let collator_cmd = with_cmd.unwrap_or("polkadot-parachain");

        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let relay = r
                    .with_chain("rococo-local")
                    .with_default_command(relay_cmd);
                let relay = if with_image {
                    relay.with_default_image("docker.io/parity/polkadot")
                } else {
                    relay
                };

                relay
                    .with_node(|node| node.with_name("alice"))
                    .with_node(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n.with_name("collator").with_command(collator_cmd);
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Provider capabilities that only differ in `requires_image`.
    fn caps(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Generate a config, turn it into a spec and validate it against `caps(requires_image)`.
    async fn check(
        with_image: bool,
        with_cmd: Option<&'static str>,
        requires_image: bool,
    ) -> Result<(), anyhow::Error> {
        let network_config = generate(with_image, with_cmd).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        validate_spec_with_provider_capabilities(&spec, &caps(requires_image))
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        assert!(check(true, None, true).await.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        assert!(check(false, None, true).await.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        assert!(check(false, Some("other"), false).await.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        let valid = check(false, Some("cargo"), false).await;
        println!("{:?}", valid);
        assert!(valid.is_ok())
    }
}