1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod observability;
9pub mod tx_helper;
10
11mod network_spec;
12pub mod shared;
13mod spawner;
14mod utils;
15
16use std::{
17 collections::{HashMap, HashSet, VecDeque},
18 env,
19 net::IpAddr,
20 path::{Path, PathBuf},
21 time::{Duration, SystemTime},
22};
23
24use configuration::{NetworkConfig, RegistrationStrategy};
25use errors::OrchestratorError;
26use generators::errors::GeneratorError;
27use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
28pub use network_spec::NetworkSpec;
30use network_spec::{node::NodeSpec, parachain::ParachainSpec};
31use provider::{
32 types::{ProviderCapabilities, TransferedFile},
33 DynNamespace, DynProvider,
34};
35use serde_json::json;
36use support::{
37 constants::{
38 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
39 THIS_IS_A_BUG,
40 },
41 fs::{FileSystem, FileSystemError},
42 replacer::{get_tokens_to_replace, has_tokens},
43};
44use tokio::time::timeout;
45use tracing::{debug, info, trace, warn};
46
47use crate::{
48 network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
49 shared::types::RegisterParachainOptions,
50 spawner::SpawnNodeCtx,
51 utils::write_zombie_json,
52};
/// Entry point used to spawn a new network or to attach to an already running one.
///
/// It pairs a [`FileSystem`] implementation with the provider that creates the
/// namespaces in which nodes run.
pub struct Orchestrator<T>
where
    T: FileSystem + Sync + Send,
{
    // Filesystem handle; a clone is handed to every `Network` created by this orchestrator.
    filesystem: T,
    // Provider used to create namespaces and to query its name and capabilities.
    provider: DynProvider,
}
60
61impl<T> Orchestrator<T>
62where
63 T: FileSystem + Sync + Send + Clone,
64{
65 pub fn new(filesystem: T, provider: DynProvider) -> Self {
66 Self {
67 filesystem,
68 provider,
69 }
70 }
71
72 pub async fn spawn(
73 &self,
74 network_config: NetworkConfig,
75 ) -> Result<Network<T>, OrchestratorError> {
76 let global_timeout = network_config.global_settings().network_spawn_timeout();
77 let network_spec = NetworkSpec::from_config(&network_config).await?;
78
79 let res = timeout(
80 Duration::from_secs(global_timeout.into()),
81 self.spawn_inner(network_spec),
82 )
83 .await
84 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
85 res?
86 }
87
88 pub async fn spawn_from_spec(
89 &self,
90 network_spec: NetworkSpec,
91 ) -> Result<Network<T>, OrchestratorError> {
92 let global_timeout = network_spec.global_settings.network_spawn_timeout();
93 let res = timeout(
94 Duration::from_secs(global_timeout as u64),
95 self.spawn_inner(network_spec),
96 )
97 .await
98 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
99 res?
100 }
101
102 pub async fn attach_to_live(
103 &self,
104 zombie_json_path: &Path,
105 ) -> Result<Network<T>, OrchestratorError> {
106 info!("attaching to live network...");
107 info!("reading zombie.json from {:?}", zombie_json_path);
108
109 let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
110 let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
111
112 info!("recreating namespace...");
113 let ns: DynNamespace = self
114 .provider
115 .create_namespace_from_json(&zombie_json)
116 .await?;
117
118 info!("recreating relaychain...");
119 let (relay, initial_spec) =
120 recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
121 let relay_nodes = relay.nodes.clone();
122
123 let mut network =
124 Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
125
126 for node in relay_nodes {
127 if node.is_responsive().await {
128 node.set_is_running(true);
129 }
130 network.insert_node(node);
131 }
132
133 info!("recreating parachains...");
134 let parachains_map =
135 recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
136 let para_nodes = parachains_map
137 .values()
138 .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
139 .collect::<Vec<NetworkNode>>();
140
141 network.set_parachains(parachains_map);
142 for node in para_nodes {
143 if node.is_responsive().await {
144 node.set_is_running(true);
145 }
146 network.insert_node(node);
147 }
148
149 Ok(network)
150 }
151
    /// Main driver that spawns the network described by `network_spec`.
    ///
    /// In order: validates the spec against the provider capabilities, creates the namespace,
    /// builds and customizes the relaychain chain-spec, spawns the relaychain nodes (bootnodes
    /// first), spawns the nodes of every parachain, registers the parachains that use an
    /// extrinsic, optionally starts the observability stack and the custom processes, and
    /// finally writes `zombie.json`.
    ///
    /// # Errors
    /// Returns the first error raised by any of the steps above. Failures while starting the
    /// observability stack or a custom process are only logged as warnings.
    async fn spawn_inner(
        &self,
        mut network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        debug!(network_spec = ?network_spec,"Network spec to spawn");

        // Reject specs the provider can not run (missing images or missing binaries).
        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
            .map_err(|err| {
                OrchestratorError::InvalidConfigForProvider(
                    self.provider.name().into(),
                    err.to_string(),
                )
            })?;

        // Create the namespace, using the configured base dir when one is set.
        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
            self.provider
                .create_namespace_with_base_dir(base_dir)
                .await?
        } else {
            self.provider.create_namespace().await?
        };

        // Number of nodes spawned at once (forced to 1 when the spec contains tokens).
        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;

        let start_time = SystemTime::now();
        info!("🧰 ns: {}", ns.name());
        info!("🧰 base_dir: {:?}", ns.base_dir());
        info!("🕰 start time: {:?}", start_time);
        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");

        network_spec
            .populate_nodes_available_args(ns.clone())
            .await?;

        // All file operations below are resolved against the namespace base dir.
        let base_dir = ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        // Build the (plain) relaychain chain-spec.
        network_spec
            .relaychain
            .chain_spec
            .build(&ns, &scoped_fs)
            .await?;

        debug!("relaychain spec built!");
        // The chain id is read back from the spec that was just built.
        let relay_chain_id = network_spec
            .relaychain
            .chain_spec
            .read_chain_id(&scoped_fs)
            .await?;

        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
        network_spec
            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
            .await?;

        // Parachains with `Manual` registration are skipped; the rest are split into the ones
        // registered in genesis and the ones registered later through an extrinsic.
        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
            Vec<&ParachainSpec>,
            Vec<&ParachainSpec>,
        ) = network_spec
            .parachains
            .iter()
            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
            .partition(|para| {
                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
            });

        // Genesis config of every parachain that must be part of the relaychain genesis.
        let mut para_artifacts = vec![];
        for para in para_to_register_in_genesis {
            let genesis_config = para.get_genesis_config()?;
            para_artifacts.push(genesis_config)
        }

        // Customize the relaychain spec with the relaychain config, the HRMP channels and the
        // in-genesis parachains.
        network_spec
            .relaychain
            .chain_spec
            .customize_relay(
                &network_spec.relaychain,
                &network_spec.hrmp_channels,
                para_artifacts,
                &scoped_fs,
            )
            .await?;

        // Optional user script, run before the raw spec is built.
        if let Some(script_cmd) = network_spec.relaychain.post_process_script.as_deref() {
            network_spec
                .relaychain
                .chain_spec
                .run_post_process_script(script_cmd, &scoped_fs)
                .await?;
        }

        // Build the raw version of the relaychain spec.
        network_spec
            .relaychain
            .chain_spec
            .build_raw(&ns, &scoped_fs, None)
            .await?;

        // Optional overrides, applied after the raw spec has been built.
        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
            network_spec
                .relaychain
                .chain_spec
                .override_code(&scoped_fs, wasm_override)
                .await?;
        }

        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
            network_spec
                .relaychain
                .chain_spec
                .override_raw_spec(&scoped_fs, raw_spec_override)
                .await?;
        }

        // Bootnodes are spawned first; when none is flagged the first node takes that role.
        let (bootnodes, relaynodes) =
            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);

        // Context shared by every relaychain node spawn; `bootnodes_addr` and
        // `nodes_by_name` are filled in as nodes come up.
        let mut ctx = SpawnNodeCtx {
            chain_id: &relay_chain_id,
            parachain_id: None,
            chain: relay_chain_name.as_str(),
            role: ZombieRole::Node,
            ns: &ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: false,
            nodes_by_name: json!({}),
            global_settings: &network_spec.global_settings,
        };

        // The relaychain spec file, handed to every relaychain node spawn and to
        // `Parachain::from_spec`; its remote path is `/cfg/<chain>.json`.
        let global_files_to_inject = vec![TransferedFile::new(
            PathBuf::from(format!(
                "{}/{relay_chain_name}.json",
                ns.base_dir().to_string_lossy()
            )),
            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
        )];

        let r = Relaychain::new(
            relay_chain_name.to_string(),
            relay_chain_id.clone(),
            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
            )?),
        );
        let mut network =
            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

        // ws uri of the first spawned bootnode; used later to register parachains.
        let mut node_ws_url: String = "".to_string();

        // Multiaddr of every relaychain bootnode.
        let mut bootnodes_addr: Vec<String> = vec![];

        // Spawn the bootnodes level by level, so a node only starts once the nodes its args
        // reference are up. Inside a level, nodes are spawned in chunks of `spawn_concurrency`.
        for level in dependency_levels_among(&bootnodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    let bootnode_multiaddr = node.multiaddr();

                    bootnodes_addr.push(bootnode_multiaddr.to_string());

                    // Only the first bootnode's ws uri is kept.
                    if node_ws_url.is_empty() {
                        node_ws_url.clone_from(&node.ws_uri)
                    }

                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // Wait for the whole level before moving to the next one.
            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                // Record the node in the ctx so that later spawns can see it by name.
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // Add the bootnodes to the relaychain spec.
        network_spec
            .relaychain
            .chain_spec
            .add_bootnodes(&scoped_fs, &bootnodes_addr)
            .await?;

        // The remaining relaychain nodes are spawned knowing the bootnode addresses.
        ctx.bootnodes_addr = &bootnodes_addr;

        for level in dependency_levels_among(&relaynodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // Spawn the nodes of every parachain, following the same bootnodes-first scheme.
        for para in network_spec.parachains.iter() {
            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
            let parachain_id = parachain.chain_id.clone();

            let (bootnodes, collators) =
                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);

            // Parachain flavour of the ctx: starts from a clone of the relaychain one
            // (including the nodes recorded so far), with no bootnode addresses yet.
            let mut ctx_para = SpawnNodeCtx {
                parachain: Some(para),
                parachain_id: parachain_id.as_deref(),
                role: if para.is_cumulus_based {
                    ZombieRole::CumulusCollator
                } else {
                    ZombieRole::Collator
                },
                bootnodes_addr: &vec![],
                ..ctx.clone()
            };

            // Multiaddr of the bootnodes of this parachain.
            let mut bootnodes_addr: Vec<String> = vec![];
            // Nodes are only added to the network once the parachain itself is registered.
            let mut running_nodes: Vec<NetworkNode> = vec![];

            for level in dependency_levels_among(&bootnodes)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        let bootnode_multiaddr = node.multiaddr();

                        bootnodes_addr.push(bootnode_multiaddr.to_string());

                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Add the bootnodes to the parachain spec, when the parachain has one.
            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
                para_chain_spec
                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
                    .await?;
            }

            ctx_para.bootnodes_addr = &bootnodes_addr;

            // Spawn the rest of the collators.
            for level in dependency_levels_among(&collators)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            // Register the parachain first, then attach its running nodes to it.
            let running_para_id = parachain.para_id;
            network.add_para(parachain);
            for node in running_nodes {
                network.add_running_node(node, Some(running_para_id)).await;
            }
        }

        // Register through an extrinsic the parachains that are not part of genesis,
        // using the ws uri captured from the first relaychain bootnode.
        for para in para_to_register_with_extrinsic {
            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
                id: para.id,
                wasm_path: para
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para
                    .genesis_state
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: node_ws_url.clone(),
                onboard_as_para: para.onboard_as_parachain,
                seed: None,
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // Observability is best effort: a failure is logged and the spawn goes on.
        if network_spec.global_settings.observability().enabled() {
            match network
                .start_observability(network_spec.global_settings.observability())
                .await
            {
                Ok(obs) => {
                    info!("📊 Prometheus URL: {}", obs.prometheus_url);
                    info!("📊 Grafana URL: {}", obs.grafana_url);
                },
                Err(e) => {
                    warn!("⚠️ Failed to spawn observability stack: {e}");
                },
            }
        }

        // Custom processes are best effort too.
        for cp in &network_spec.custom_processes {
            if let Err(e) = spawner::spawn_process(cp, ns.clone()).await {
                warn!("⚠️ Failed to spawn custom process {}, err: {e}", cp.name())
            }
        }

        network.set_start_time_ts(start_time);

        // Persist the serialized network through `write_zombie_json`.
        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;

        // NOTE(review): presumably the watching task tears the network down when a node
        // fails — confirm against `Network::spawn_watching_task`.
        if network_spec.global_settings.tear_down_on_failure() {
            network.spawn_watching_task();
        }

        Ok(network)
    }
560}
561
562async fn recreate_network_nodes_from_json(
565 nodes_json: &serde_json::Value,
566 ns: DynNamespace,
567 provider_name: &str,
568) -> Result<Vec<NetworkNode>, OrchestratorError> {
569 let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
570
571 let mut nodes = Vec::with_capacity(raw_nodes.len());
572 for raw in raw_nodes {
573 let provider_tag = raw
575 .inner
576 .get("provider_tag")
577 .and_then(|v| v.as_str())
578 .ok_or_else(|| {
579 OrchestratorError::InvalidConfig(format!(
580 "Node '{}' is missing `provider_tag` in inner node JSON",
581 raw.name
582 ))
583 })?;
584
585 if provider_tag != provider_name {
586 return Err(OrchestratorError::InvalidConfigForProvider(
587 provider_name.to_string(),
588 provider_tag.to_string(),
589 ));
590 }
591 let inner = ns.spawn_node_from_json(&raw.inner).await?;
592 let relay_node = NetworkNode::new(
593 raw.name,
594 raw.ws_uri,
595 raw.prometheus_uri,
596 raw.multiaddr,
597 raw.spec,
598 inner,
599 );
600 nodes.push(relay_node);
601 }
602
603 Ok(nodes)
604}
605
606async fn recreate_relaychain_from_json(
607 zombie_json: &serde_json::Value,
608 ns: DynNamespace,
609 provider_name: &str,
610) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
611 let relay_json = zombie_json
612 .get("relay")
613 .ok_or(OrchestratorError::InvalidConfig(
614 "Missing `relay` field in zombie.json".into(),
615 ))?
616 .clone();
617
618 let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
619
620 let initial_spec: NetworkSpec = serde_json::from_value(
621 zombie_json
622 .get("initial_spec")
623 .ok_or(OrchestratorError::InvalidConfig(
624 "Missing `initial_spec` field in zombie.json".into(),
625 ))?
626 .clone(),
627 )?;
628
629 let nodes =
631 recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
632 relay_raw.inner.nodes = nodes;
633
634 Ok((relay_raw.inner, initial_spec))
635}
636
637async fn recreate_parachains_from_json(
638 zombie_json: &serde_json::Value,
639 ns: DynNamespace,
640 provider_name: &str,
641) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
642 let paras_json = zombie_json
643 .get("parachains")
644 .ok_or(OrchestratorError::InvalidConfig(
645 "Missing `parachains` field in zombie.json".into(),
646 ))?
647 .clone();
648
649 let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
650
651 let mut parachains_map = HashMap::new();
652
653 for (id, parachain_entries) in raw_paras {
654 let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
655
656 for raw_para in parachain_entries {
657 let mut para = raw_para.inner;
658 para.collators =
659 recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
660 .await?;
661 parsed_vec.push(para);
662 }
663
664 parachains_map.insert(id, parsed_vec);
665 }
666
667 Ok(parachains_map)
668}
669
670fn split_nodes_by_bootnodes(
673 nodes: &[NodeSpec],
674 no_default_bootnodes: bool,
675) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
676 let mut bootnodes = vec![];
678 let mut other_nodes = vec![];
679 nodes.iter().for_each(|node| {
680 if node.is_bootnode {
681 bootnodes.push(node)
682 } else {
683 other_nodes.push(node)
684 }
685 });
686
687 if bootnodes.is_empty() && !no_default_bootnodes {
688 bootnodes.push(other_nodes.remove(0))
689 }
690
691 (bootnodes, other_nodes)
692}
693
694fn generate_bootnode_addr(
696 node: &NetworkNode,
697 ip: &IpAddr,
698 port: u16,
699) -> Result<String, GeneratorError> {
700 generators::generate_node_bootnode_addr(
701 &node.spec.peer_id,
702 ip,
703 port,
704 node.inner.args().as_ref(),
705 &node.spec.p2p_cert_hash,
706 )
707}
708fn validate_spec_with_provider_capabilities(
710 network_spec: &NetworkSpec,
711 capabilities: &ProviderCapabilities,
712) -> Result<(), anyhow::Error> {
713 let mut errs: Vec<String> = vec![];
714
715 if capabilities.requires_image {
716 if network_spec.relaychain.default_image.is_none() {
718 let nodes = &network_spec.relaychain.nodes;
720 if nodes.iter().any(|node| node.image.is_none()) {
721 errs.push(String::from(
722 "Missing image for node, and not default is set at relaychain",
723 ));
724 }
725 };
726
727 for para in &network_spec.parachains {
729 if para.default_image.is_none() {
730 let nodes = ¶.collators;
731 if nodes.iter().any(|node| node.image.is_none()) {
732 errs.push(format!(
733 "Missing image for node, and not default is set at parachain {}",
734 para.id
735 ));
736 }
737 }
738 }
739 } else {
740 let mut cmds: HashSet<&str> = Default::default();
743 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
744 cmds.insert(cmd.as_str());
745 }
746 for node in network_spec.relaychain().nodes.iter() {
747 cmds.insert(node.command());
748 }
749
750 for para in &network_spec.parachains {
752 if let Some(cmd) = para.default_command.as_ref() {
753 cmds.insert(cmd.as_str());
754 }
755
756 for node in para.collators.iter() {
757 cmds.insert(node.command());
758 }
759 }
760
761 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
764 let parts: Vec<_> = path.split(":").collect();
765 for cmd in cmds {
766 let missing = if cmd.contains('/') {
767 trace!("checking {cmd}");
768 if std::fs::metadata(cmd).is_err() {
769 true
770 } else {
771 info!("🔎 We will use the full path {cmd} to spawn nodes.");
772 false
773 }
774 } else {
775 !parts.iter().any(|part| {
777 let path_to = format!("{part}/{cmd}");
778 trace!("checking {path_to}");
779 let check_result = std::fs::metadata(&path_to);
780 trace!("result {:?}", check_result);
781 if check_result.is_ok() {
782 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
783 true
784 } else {
785 false
786 }
787 })
788 };
789
790 if missing {
791 errs.push(help_msg(cmd));
792 }
793 }
794 }
795
796 if !errs.is_empty() {
797 let msg = errs.join("\n");
798 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
799 }
800
801 Ok(())
802}
803
/// Builds the "missing binary" message for `cmd`.
///
/// Well-known binaries get the exact `cargo build` invocation that produces them;
/// any other command gets a generic hint.
fn help_msg(cmd: &str) -> String {
    let build_hint = match cmd {
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
            format!("compile by running: \n\tcargo build --package {cmd} --release")
        },
        "polkadot" => {
            format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
        },
        "polkadot-parachain" => {
            format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
        },
        _ => String::from("please compile it."),
    };

    format!("Missing binary {cmd}, {build_hint}")
}
820
/// Reads the spawn concurrency override from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable is unset, is not valid unicode, does not parse as a
/// `usize` (surrounding whitespace is ignored) or is `0`. Zero is rejected because the
/// value is used as a chunk size, and `slice::chunks(0)` panics.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&concurrency| concurrency > 0)
}
829
830fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
831 let desired_spawn_concurrency = match (
832 spawn_concurrency_from_env(),
833 spec.global_settings.spawn_concurrency(),
834 ) {
835 (Some(n), _) => Some(n),
836 (None, Some(n)) => Some(n),
837 _ => None,
838 };
839
840 let (spawn_concurrency, limited_by_tokens) =
841 if let Some(spawn_concurrency) = desired_spawn_concurrency {
842 if spawn_concurrency == 1 {
843 (1, false)
844 } else if has_tokens(&serde_json::to_string(spec)?) {
845 (1, true)
846 } else {
847 (spawn_concurrency, false)
848 }
849 } else {
850 if has_tokens(&serde_json::to_string(spec)?) {
852 (1, true)
853 } else {
854 (100, false)
856 }
857 };
858
859 Ok((spawn_concurrency, limited_by_tokens))
860}
861
862fn dependency_levels_among<'a>(
867 nodes: &'a [&'a NodeSpec],
868) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
869 let by_name = nodes
870 .iter()
871 .map(|n| (n.name.as_str(), *n))
872 .collect::<HashMap<_, _>>();
873
874 let mut graph = HashMap::with_capacity(nodes.len());
875 let mut indegree = HashMap::with_capacity(nodes.len());
876
877 for node in nodes {
878 graph.insert(node.name.as_str(), Vec::new());
879 indegree.insert(node.name.as_str(), 0);
880 }
881
882 for &node in nodes {
884 if let Ok(args_json) = serde_json::to_string(&node.args) {
885 let unique_deps = get_tokens_to_replace(&args_json)
887 .into_iter()
888 .filter(|dep| dep != &node.name)
889 .filter_map(|dep| by_name.get(dep.as_str()))
890 .map(|&dep_node| dep_node.name.as_str())
891 .collect::<HashSet<_>>();
892
893 for dep_name in unique_deps {
894 graph
895 .get_mut(dep_name)
896 .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
897 .push(node);
898 *indegree
899 .get_mut(node.name.as_str())
900 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
901 }
902 }
903 }
904
905 let mut queue = nodes
907 .iter()
908 .filter(|n| {
909 *indegree
910 .get(n.name.as_str())
911 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
912 == 0
913 })
914 .copied()
915 .collect::<VecDeque<_>>();
916
917 let mut processed_count = 0;
918 let mut levels = Vec::new();
919
920 while !queue.is_empty() {
922 let level_size = queue.len();
923 let mut current_level = Vec::with_capacity(level_size);
924
925 for _ in 0..level_size {
926 let n = queue
927 .pop_front()
928 .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
929 current_level.push(n);
930 processed_count += 1;
931
932 for &neighbour in graph
933 .get(n.name.as_str())
934 .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
935 {
936 let neighbour_indegree = indegree
937 .get_mut(neighbour.name.as_str())
938 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
939 *neighbour_indegree -= 1;
940
941 if *neighbour_indegree == 0 {
942 queue.push_back(neighbour);
943 }
944 }
945 }
946
947 current_level.sort_by_key(|n| &n.name);
948 levels.push(current_level);
949 }
950
951 if processed_count != nodes.len() {
953 return Err(OrchestratorError::InvalidConfig(
954 "Tokens have cyclical dependencies".to_string(),
955 ));
956 }
957
958 Ok(levels)
959}
960
/// A view over a [`FileSystem`] whose operations are resolved against a fixed `base_dir`.
///
/// Relative paths are always prefixed with `base_dir`. How absolute paths are treated
/// depends on the operation: `read`, `read_to_string`, `write` and `full_path` use them
/// as-is, while `copy_files`, `create_dir` and `create_dir_all` prefix them as well.
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    // Underlying filesystem that performs the actual operations.
    fs: &'a FS,
    // Directory that paths are resolved against.
    base_dir: &'a str,
}
973
974impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
975 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
976 Self { fs, base_dir }
977 }
978
979 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
980 for file in files {
981 let full_remote_path = PathBuf::from(format!(
982 "{}/{}",
983 self.base_dir,
984 file.remote_path.to_string_lossy()
985 ));
986 trace!("coping file: {file}");
987 self.fs
988 .copy(file.local_path.as_path(), full_remote_path)
989 .await?;
990 }
991 Ok(())
992 }
993
994 async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
995 let file = file.as_ref();
996
997 let full_path = if file.is_absolute() {
998 file.to_owned()
999 } else {
1000 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1001 };
1002 let content = self.fs.read(full_path).await?;
1003 Ok(content)
1004 }
1005
1006 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
1007 let file = file.as_ref();
1008
1009 let full_path = if file.is_absolute() {
1010 file.to_owned()
1011 } else {
1012 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1013 };
1014 let content = self.fs.read_to_string(full_path).await?;
1015 Ok(content)
1016 }
1017
1018 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1019 let path = PathBuf::from(format!(
1020 "{}/{}",
1021 self.base_dir,
1022 path.as_ref().to_string_lossy()
1023 ));
1024 self.fs.create_dir(path).await
1025 }
1026
1027 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1028 let path = PathBuf::from(format!(
1029 "{}/{}",
1030 self.base_dir,
1031 path.as_ref().to_string_lossy()
1032 ));
1033 self.fs.create_dir_all(path).await
1034 }
1035
1036 async fn write(
1037 &self,
1038 path: impl AsRef<Path>,
1039 contents: impl AsRef<[u8]> + Send,
1040 ) -> Result<(), FileSystemError> {
1041 let path = path.as_ref();
1042
1043 let full_path = if path.is_absolute() {
1044 path.to_owned()
1045 } else {
1046 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1047 };
1048
1049 self.fs.write(full_path, contents).await
1050 }
1051
1052 fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1054 let path = path.as_ref();
1055
1056 let full_path = if path.is_absolute() {
1057 path.to_owned()
1058 } else {
1059 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1060 };
1061
1062 full_path
1063 }
1064
1065 fn base_dir(&self) -> &str {
1067 self.base_dir
1068 }
1069}
1070
/// Role a node plays in the network; it is part of the context handed to the spawner.
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    // Role used for the relaychain nodes spawned by the orchestrator.
    Node,
    Bootnode,
    // Role used for the nodes of a parachain that is not cumulus based.
    Collator,
    // Role used for the nodes of a cumulus based parachain.
    CumulusCollator,
    Companion,
}
1080
1081pub use network::{AddCollatorOptions, AddNodeOptions};
1083pub use network_helper::metrics;
1084pub use sc_chain_spec;
1085
1086#[cfg(test)]
1087mod tests {
1088 use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
1089 use lazy_static::lazy_static;
1090 use tokio::sync::Mutex;
1091
1092 use super::*;
1093
1094 const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
1095 lazy_static! {
1097 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1098 }
1099
1100 fn set_env(concurrency: Option<u32>) {
1101 if let Some(value) = concurrency {
1102 env::set_var(ENV_KEY, value.to_string());
1103 } else {
1104 env::remove_var(ENV_KEY);
1105 }
1106 }
1107
1108 fn generate(
1109 with_image: bool,
1110 with_cmd: Option<&'static str>,
1111 ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
1112 NetworkConfigBuilder::new()
1113 .with_relaychain(|r| {
1114 let mut relay = r
1115 .with_chain("rococo-local")
1116 .with_default_command(with_cmd.unwrap_or("polkadot"));
1117 if with_image {
1118 relay = relay.with_default_image("docker.io/parity/polkadot")
1119 }
1120
1121 relay
1122 .with_validator(|node| node.with_name("alice"))
1123 .with_validator(|node| node.with_name("bob"))
1124 })
1125 .with_parachain(|p| {
1126 p.with_id(2000).cumulus_based(true).with_collator(|n| {
1127 let node = n
1128 .with_name("collator")
1129 .with_command(with_cmd.unwrap_or("polkadot-parachain"));
1130 if with_image {
1131 node.with_image("docker.io/paritypr/test-parachain")
1132 } else {
1133 node
1134 }
1135 })
1136 })
1137 .build()
1138 }
1139
1140 fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
1141 let mut spec = NodeSpec {
1142 name: name.to_string(),
1143 ..Default::default()
1144 };
1145 if let Some(dependencies) = dependencies {
1146 for node in dependencies {
1147 spec.args.push(
1148 format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
1149 .as_str()
1150 .into(),
1151 );
1152 }
1153 }
1154 spec
1155 }
1156
1157 fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
1158 actual_levels
1159 .iter()
1160 .zip(expected_levels)
1161 .for_each(|(actual_level, expected_level)| {
1162 assert_eq!(actual_level.len(), expected_level.len());
1163 actual_level
1164 .iter()
1165 .zip(expected_level.iter())
1166 .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
1167 });
1168 }
1169
1170 #[tokio::test]
1171 async fn valid_config_with_image() {
1172 let network_config = generate(true, None).unwrap();
1173 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1174 let caps = ProviderCapabilities {
1175 requires_image: true,
1176 has_resources: false,
1177 prefix_with_full_path: false,
1178 use_default_ports_in_cmd: false,
1179 };
1180
1181 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1182 assert!(valid.is_ok())
1183 }
1184
1185 #[tokio::test]
1186 async fn invalid_config_without_image() {
1187 let network_config = generate(false, None).unwrap();
1188 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1189 let caps = ProviderCapabilities {
1190 requires_image: true,
1191 has_resources: false,
1192 prefix_with_full_path: false,
1193 use_default_ports_in_cmd: false,
1194 };
1195
1196 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1197 assert!(valid.is_err())
1198 }
1199
1200 #[tokio::test]
1201 async fn invalid_config_missing_cmd() {
1202 let network_config = generate(false, Some("other")).unwrap();
1203 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1204 let caps = ProviderCapabilities {
1205 requires_image: false,
1206 has_resources: false,
1207 prefix_with_full_path: false,
1208 use_default_ports_in_cmd: false,
1209 };
1210
1211 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1212 assert!(valid.is_err())
1213 }
1214
1215 #[tokio::test]
1216 async fn valid_config_present_cmd() {
1217 let network_config = generate(false, Some("cargo")).unwrap();
1218 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1219 let caps = ProviderCapabilities {
1220 requires_image: false,
1221 has_resources: false,
1222 prefix_with_full_path: false,
1223 use_default_ports_in_cmd: false,
1224 };
1225
1226 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1227 println!("{valid:?}");
1228 assert!(valid.is_ok())
1229 }
1230
1231 #[tokio::test]
1232 async fn default_spawn_concurrency() {
1233 let _g = ENV_MUTEX.lock().await;
1234 set_env(None);
1235 let network_config = generate(false, Some("cargo")).unwrap();
1236 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1237 let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1238 assert_eq!(concurrency, 100);
1239 }
1240
1241 #[tokio::test]
1242 async fn set_spawn_concurrency() {
1243 let _g = ENV_MUTEX.lock().await;
1244 set_env(None);
1245
1246 let network_config = generate(false, Some("cargo")).unwrap();
1247 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1248
1249 let global_settings = GlobalSettingsBuilder::new()
1250 .with_spawn_concurrency(4)
1251 .build()
1252 .unwrap();
1253
1254 spec.set_global_settings(global_settings);
1255 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1256 assert_eq!(concurrency, 4);
1257 assert!(!limited);
1258 }
1259
1260 #[tokio::test]
1261 async fn set_spawn_concurrency_but_limited() {
1262 let _g = ENV_MUTEX.lock().await;
1263 set_env(None);
1264
1265 let network_config = generate(false, Some("cargo")).unwrap();
1266 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1267
1268 let global_settings = GlobalSettingsBuilder::new()
1269 .with_spawn_concurrency(4)
1270 .build()
1271 .unwrap();
1272
1273 spec.set_global_settings(global_settings);
1274 let node = spec.relaychain.nodes.first_mut().unwrap();
1275 node.args
1276 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1277 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1278 assert_eq!(concurrency, 1);
1279 assert!(limited);
1280 }
1281
1282 #[tokio::test]
1283 async fn set_spawn_concurrency_from_env() {
1284 let _g = ENV_MUTEX.lock().await;
1285 set_env(Some(10));
1286
1287 let network_config = generate(false, Some("cargo")).unwrap();
1288 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1289 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1290 assert_eq!(concurrency, 10);
1291 assert!(!limited);
1292 }
1293
1294 #[tokio::test]
1295 async fn set_spawn_concurrency_from_env_but_limited() {
1296 let _g = ENV_MUTEX.lock().await;
1297 set_env(Some(12));
1298
1299 let network_config = generate(false, Some("cargo")).unwrap();
1300 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1301 let node = spec.relaychain.nodes.first_mut().unwrap();
1302 node.args
1303 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1304 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1305 assert_eq!(concurrency, 1);
1306 assert!(limited);
1307 }
1308
/// Checks the levels returned by `dependency_levels_among` for an empty input
/// and for four small graphs of increasing complexity.
#[test]
fn dependency_levels_among_should_work() {
    // No nodes at all: no levels.
    assert!(dependency_levels_among(&[]).unwrap().is_empty());

    // A single node without dependencies forms a single level.
    {
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        verify_levels(levels, vec![vec!["alice"]]);
    }

    // Two independent nodes share one level.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        verify_levels(levels, vec![vec!["alice", "bob"]]);
    }

    // Chain: charlie references bob, bob references alice.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        verify_levels(
            levels,
            vec![vec!["alice"], vec!["bob"], vec!["charlie"]],
        );
    }

    // Fan-out: bob and charlie both reference alice.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        verify_levels(levels, vec![vec!["alice"], vec!["bob", "charlie"]]);
    }

    // Diamond: dave references both charlie and bob, which reference alice.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        verify_levels(
            levels,
            vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]],
        );
    }
}
1371
/// Two nodes that reference each other must make `dependency_levels_among`
/// return an error.
#[test]
fn dependency_levels_among_should_detect_cycles() {
    let mut alice = get_node_with_dependencies("alice", None);
    let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
    // Close the loop: alice now references bob, which already references alice.
    alice.args.push("{{ZOMBIE:bob:someField}}".into());

    let nodes = [&alice, &bob];
    let outcome = dependency_levels_among(&nodes);
    assert!(outcome.is_err());
}
1379 }
1380}