//! zombienet_orchestrator/spawner.rs

1use std::{collections::HashMap, path::PathBuf};
2
3use anyhow::Context;
4use configuration::GlobalSettings;
5use provider::{
6    constants::{LOCALHOST, NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, P2P_PORT},
7    shared::helpers::running_in_ci,
8    types::{SpawnNodeOptions, TransferedFile},
9    DynNamespace,
10};
11use support::{
12    constants::THIS_IS_A_BUG, fs::FileSystem, replacer::apply_running_network_replacements,
13};
14use tracing::info;
15
16use crate::{
17    generators,
18    network::node::NetworkNode,
19    network_spec::{node::NodeSpec, parachain::ParachainSpec},
20    shared::constants::{FULL_NODE_PROMETHEUS_PORT, PROMETHEUS_PORT, RPC_PORT},
21    ScopedFilesystem, ZombieRole,
22};
23
24#[derive(Clone)]
25pub struct SpawnNodeCtx<'a, T: FileSystem> {
26    /// Relaychain id, from the chain-spec (e.g rococo_local_testnet)
27    pub(crate) chain_id: &'a str,
28    // Parachain id, from the chain-spec (e.g local_testnet)
29    pub(crate) parachain_id: Option<&'a str>,
30    /// Relaychain chain name (e.g rococo-local)
31    pub(crate) chain: &'a str,
32    /// Role of the node in the network
33    pub(crate) role: ZombieRole,
34    /// Ref to the namespace
35    pub(crate) ns: &'a DynNamespace,
36    /// Ref to an scoped filesystem (encapsulate fs actions inside the ns directory)
37    pub(crate) scoped_fs: &'a ScopedFilesystem<'a, T>,
38    /// Ref to a parachain (used to spawn collators)
39    pub(crate) parachain: Option<&'a ParachainSpec>,
40    /// The string representation of the bootnode address to pass to nodes
41    pub(crate) bootnodes_addr: &'a Vec<String>,
42    /// Flag to wait node is ready or not
43    /// Ready state means we can query Prometheus internal server
44    pub(crate) wait_ready: bool,
45    /// A json representation of the running nodes with their names as 'key'
46    pub(crate) nodes_by_name: serde_json::Value,
47    /// A ref to the global settings
48    pub(crate) global_settings: &'a GlobalSettings,
49}
50
51pub async fn spawn_node<'a, T>(
52    node: &NodeSpec,
53    mut files_to_inject: Vec<TransferedFile>,
54    ctx: &SpawnNodeCtx<'a, T>,
55) -> Result<NetworkNode, anyhow::Error>
56where
57    T: FileSystem,
58{
59    let mut created_paths = vec![];
60    // Create and inject the keystore IFF
61    // - The node is validator in the relaychain
62    // - The node is collator (encoded as validator) and the parachain is cumulus_based
63    // (parachain_id) should be set then.
64    if node.is_validator && (ctx.parachain.is_none() || ctx.parachain_id.is_some()) {
65        // Generate keystore for node
66        let node_files_path = if let Some(para) = ctx.parachain {
67            para.id.to_string()
68        } else {
69            node.name.clone()
70        };
71        let asset_hub_polkadot = ctx
72            .parachain_id
73            .map(|id| id.starts_with("asset-hub-polkadot"))
74            .unwrap_or_default();
75        let keystore_key_types = node.keystore_key_types.iter().map(String::as_str).collect();
76        let key_filenames = generators::generate_node_keystore(
77            &node.accounts,
78            &node_files_path,
79            ctx.scoped_fs,
80            asset_hub_polkadot,
81            keystore_key_types,
82        )
83        .await
84        .unwrap();
85
86        // Paths returned are relative to the base dir, we need to convert into
87        // fullpaths to inject them in the nodes.
88        let remote_keystore_chain_id = if let Some(id) = ctx.parachain_id {
89            id
90        } else {
91            ctx.chain_id
92        };
93
94        let keystore_path = node.keystore_path.clone().unwrap_or(PathBuf::from(format!(
95            "/data/chains/{remote_keystore_chain_id}/keystore",
96        )));
97
98        for key_filename in key_filenames {
99            let f = TransferedFile::new(
100                PathBuf::from(format!(
101                    "{}/{}/{}",
102                    ctx.ns.base_dir().to_string_lossy(),
103                    node_files_path,
104                    key_filename.to_string_lossy()
105                )),
106                keystore_path.join(key_filename),
107            );
108            files_to_inject.push(f);
109        }
110        created_paths.push(keystore_path);
111    }
112
113    let base_dir = format!("{}/{}", ctx.ns.base_dir().to_string_lossy(), &node.name);
114
115    let (cfg_path, data_path, relay_data_path) = if !ctx.ns.capabilities().prefix_with_full_path {
116        (
117            NODE_CONFIG_DIR.into(),
118            NODE_DATA_DIR.into(),
119            NODE_RELAY_DATA_DIR.into(),
120        )
121    } else {
122        let cfg_path = format!("{}{NODE_CONFIG_DIR}", &base_dir);
123        let data_path = format!("{}{NODE_DATA_DIR}", &base_dir);
124        let relay_data_path = format!("{}{NODE_RELAY_DATA_DIR}", &base_dir);
125        (cfg_path, data_path, relay_data_path)
126    };
127
128    let gen_opts = generators::GenCmdOptions {
129        relay_chain_name: ctx.chain,
130        cfg_path: &cfg_path,               // TODO: get from provider/ns
131        data_path: &data_path,             // TODO: get from provider
132        relay_data_path: &relay_data_path, // TODO: get from provider
133        use_wrapper: false,                // TODO: get from provider
134        bootnode_addr: ctx.bootnodes_addr.clone(),
135        use_default_ports_in_cmd: ctx.ns.capabilities().use_default_ports_in_cmd,
136        // IFF the provider require an image (e.g k8s) we know this is not native
137        is_native: !ctx.ns.capabilities().requires_image,
138    };
139
140    let mut collator_full_node_prom_port: Option<u16> = None;
141    let mut collator_full_node_prom_port_external: Option<u16> = None;
142
143    let (program, args) = match ctx.role {
144        // Collator should be `non-cumulus` one (e.g adder/undying)
145        ZombieRole::Node | ZombieRole::Collator => {
146            let maybe_para_id = ctx.parachain.map(|para| para.id);
147
148            generators::generate_node_command(node, gen_opts, maybe_para_id)
149        },
150        ZombieRole::CumulusCollator => {
151            let para = ctx.parachain.expect(&format!(
152                "parachain must be part of the context {THIS_IS_A_BUG}"
153            ));
154            collator_full_node_prom_port = node.full_node_prometheus_port.as_ref().map(|p| p.0);
155
156            generators::generate_node_command_cumulus(node, gen_opts, para.id)
157        },
158        _ => unreachable!(), /* TODO: do we need those?
159                              * ZombieRole::Bootnode => todo!(),
160                              * ZombieRole::Companion => todo!(), */
161    };
162
163    // apply running networ replacements
164    let args: Vec<String> = args
165        .iter()
166        .map(|arg| apply_running_network_replacements(arg, &ctx.nodes_by_name))
167        .collect();
168
169    info!(
170        "🚀 {}, spawning.... with command: {} {}",
171        node.name,
172        program,
173        args.join(" ")
174    );
175
176    let ports = if ctx.ns.capabilities().use_default_ports_in_cmd {
177        // should use default ports to as internal
178        [
179            (P2P_PORT, node.p2p_port.0),
180            (RPC_PORT, node.rpc_port.0),
181            (PROMETHEUS_PORT, node.prometheus_port.0),
182        ]
183    } else {
184        [
185            (P2P_PORT, P2P_PORT),
186            (RPC_PORT, RPC_PORT),
187            (PROMETHEUS_PORT, PROMETHEUS_PORT),
188        ]
189    };
190
191    let spawn_ops = SpawnNodeOptions::new(node.name.clone(), program)
192        .args(args)
193        .env(
194            node.env
195                .iter()
196                .map(|var| (var.name.clone(), var.value.clone())),
197        )
198        .injected_files(files_to_inject)
199        .created_paths(created_paths)
200        .db_snapshot(node.db_snapshot.clone())
201        .port_mapping(HashMap::from(ports))
202        .node_log_path(node.node_log_path.clone());
203
204    let spawn_ops = if let Some(image) = node.image.as_ref() {
205        spawn_ops.image(image.as_str())
206    } else {
207        spawn_ops
208    };
209
210    // Drops the port parking listeners before spawn
211    node.ws_port.drop_listener();
212    node.p2p_port.drop_listener();
213    node.rpc_port.drop_listener();
214    node.prometheus_port.drop_listener();
215    if let Some(port) = &node.full_node_p2p_port {
216        port.drop_listener();
217    }
218    if let Some(port) = &node.full_node_prometheus_port {
219        port.drop_listener();
220    }
221
222    let running_node = ctx.ns.spawn_node(&spawn_ops).await.with_context(|| {
223        format!(
224            "Failed to spawn node: {} with opts: {:#?}",
225            node.name, spawn_ops
226        )
227    })?;
228
229    let mut ip_to_use = if let Some(local_ip) = ctx.global_settings.local_ip() {
230        *local_ip
231    } else {
232        LOCALHOST
233    };
234
235    let (rpc_port_external, prometheus_port_external, p2p_external);
236
237    if running_in_ci() && ctx.ns.provider_name() == "k8s" {
238        // running kubernets in ci require to use ip and default port
239        (rpc_port_external, prometheus_port_external, p2p_external) =
240            (RPC_PORT, PROMETHEUS_PORT, P2P_PORT);
241        collator_full_node_prom_port_external = Some(FULL_NODE_PROMETHEUS_PORT);
242        ip_to_use = running_node.ip().await?;
243    } else {
244        // Create port-forward iff we are not in CI or provider doesn't use the default ports (native)
245        let ports = futures::future::try_join_all(vec![
246            running_node.create_port_forward(node.rpc_port.0, RPC_PORT),
247            running_node.create_port_forward(node.prometheus_port.0, PROMETHEUS_PORT),
248        ])
249        .await?;
250
251        (rpc_port_external, prometheus_port_external, p2p_external) = (
252            ports[0].unwrap_or(node.rpc_port.0),
253            ports[1].unwrap_or(node.prometheus_port.0),
254            // p2p don't need port-fwd
255            node.p2p_port.0,
256        );
257
258        if let Some(full_node_prom_port) = collator_full_node_prom_port {
259            let port_fwd = running_node
260                .create_port_forward(full_node_prom_port, FULL_NODE_PROMETHEUS_PORT)
261                .await?;
262            collator_full_node_prom_port_external = Some(port_fwd.unwrap_or(full_node_prom_port));
263        }
264    }
265
266    let multiaddr = generators::generate_node_bootnode_addr(
267        &node.peer_id,
268        &running_node.ip().await?,
269        p2p_external,
270        running_node.args().as_ref(),
271        &node.p2p_cert_hash,
272    )?;
273
274    let ws_uri = format!("ws://{ip_to_use}:{rpc_port_external}");
275    let prometheus_uri = format!("http://{ip_to_use}:{prometheus_port_external}/metrics");
276    info!("🚀 {}, should be running now", node.name);
277    info!(
278        "💻 {}: direct link (pjs) https://polkadot.js.org/apps/?rpc={ws_uri}#/explorer",
279        node.name
280    );
281    info!(
282        "💻 {}: direct link (papi) https://dev.papi.how/explorer#networkId=custom&endpoint={ws_uri}",
283        node.name
284    );
285
286    info!("📊 {}: metrics link {prometheus_uri}", node.name);
287
288    if let Some(full_node_prom_port) = collator_full_node_prom_port_external {
289        info!(
290            "📊 {}: collator full-node metrics link http://{}:{}/metrics",
291            node.name, ip_to_use, full_node_prom_port
292        );
293    }
294
295    info!("📓 logs cmd: {}", running_node.log_cmd());
296
297    Ok(NetworkNode::new(
298        node.name.clone(),
299        ws_uri,
300        prometheus_uri,
301        multiaddr,
302        node.clone(),
303        running_node,
304    ))
305}