1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11pub mod shared;
12mod spawner;
13
14use std::{
15 collections::{HashMap, HashSet, VecDeque},
16 env,
17 net::IpAddr,
18 path::{Path, PathBuf},
19 time::{Duration, SystemTime},
20};
21
22use configuration::{NetworkConfig, RegistrationStrategy};
23use errors::OrchestratorError;
24use generators::errors::GeneratorError;
25use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
26pub use network_spec::NetworkSpec;
28use network_spec::{node::NodeSpec, parachain::ParachainSpec};
29use provider::{
30 types::{ProviderCapabilities, TransferedFile},
31 DynProvider,
32};
33use serde_json::json;
34use support::{
35 constants::{
36 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
37 THIS_IS_A_BUG,
38 },
39 fs::{FileSystem, FileSystemError},
40 replacer::{get_tokens_to_replace, has_tokens},
41};
42use tokio::time::timeout;
43use tracing::{debug, info, trace, warn};
44
45use crate::{shared::types::RegisterParachainOptions, spawner::SpawnNodeCtx};
46pub struct Orchestrator<T>
47where
48 T: FileSystem + Sync + Send,
49{
50 filesystem: T,
51 provider: DynProvider,
52}
53
54impl<T> Orchestrator<T>
55where
56 T: FileSystem + Sync + Send + Clone,
57{
58 pub fn new(filesystem: T, provider: DynProvider) -> Self {
59 Self {
60 filesystem,
61 provider,
62 }
63 }
64
65 pub async fn spawn(
66 &self,
67 network_config: NetworkConfig,
68 ) -> Result<Network<T>, OrchestratorError> {
69 let global_timeout = network_config.global_settings().network_spawn_timeout();
70 let network_spec = NetworkSpec::from_config(&network_config).await?;
71
72 let res = timeout(
73 Duration::from_secs(global_timeout.into()),
74 self.spawn_inner(network_spec),
75 )
76 .await
77 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
78 res?
79 }
80
81 pub async fn spawn_from_spec(
82 &self,
83 network_spec: NetworkSpec,
84 ) -> Result<Network<T>, OrchestratorError> {
85 let global_timeout = network_spec.global_settings.network_spawn_timeout();
86 let res = timeout(
87 Duration::from_secs(global_timeout as u64),
88 self.spawn_inner(network_spec),
89 )
90 .await
91 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
92 res?
93 }
94
95 async fn spawn_inner(
96 &self,
97 mut network_spec: NetworkSpec,
98 ) -> Result<Network<T>, OrchestratorError> {
99 debug!(network_spec = ?network_spec,"Network spec to spawn");
101
102 validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
104 .map_err(|err| {
105 OrchestratorError::InvalidConfigForProvider(
106 self.provider.name().into(),
107 err.to_string(),
108 )
109 })?;
110
111 let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
113 self.provider
114 .create_namespace_with_base_dir(base_dir)
115 .await?
116 } else {
117 self.provider.create_namespace().await?
118 };
119
120 let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
122
123 let start_time = SystemTime::now();
124 info!("🧰 ns: {}", ns.name());
125 info!("🧰 base_dir: {:?}", ns.base_dir());
126 info!("🕰 start time: {:?}", start_time);
127 info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
128
129 network_spec
130 .populate_nodes_available_args(ns.clone())
131 .await?;
132
133 let base_dir = ns.base_dir().to_string_lossy();
134 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
135 network_spec
137 .relaychain
138 .chain_spec
139 .build(&ns, &scoped_fs)
140 .await?;
141
142 debug!("relaychain spec built!");
143 let relay_chain_id = network_spec
145 .relaychain
146 .chain_spec
147 .read_chain_id(&scoped_fs)
148 .await?;
149
150 let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
151 let base_dir_exists = network_spec.global_settings.base_dir().is_some();
152 network_spec
153 .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
154 .await?;
155
156 let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
158 Vec<&ParachainSpec>,
159 Vec<&ParachainSpec>,
160 ) = network_spec
161 .parachains
162 .iter()
163 .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
164 .partition(|para| {
165 matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
166 });
167
168 let mut para_artifacts = vec![];
169 for para in para_to_register_in_genesis {
170 let genesis_config = para.get_genesis_config()?;
171 para_artifacts.push(genesis_config)
172 }
173
174 network_spec
176 .relaychain
177 .chain_spec
178 .customize_relay(
179 &network_spec.relaychain,
180 &network_spec.hrmp_channels,
181 para_artifacts,
182 &scoped_fs,
183 )
184 .await?;
185
186 network_spec
188 .relaychain
189 .chain_spec
190 .build_raw(&ns, &scoped_fs)
191 .await?;
192
193 if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
195 network_spec
196 .relaychain
197 .chain_spec
198 .override_code(&scoped_fs, wasm_override)
199 .await?;
200 }
201
202 if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
204 network_spec
205 .relaychain
206 .chain_spec
207 .override_raw_spec(&scoped_fs, raw_spec_override)
208 .await?;
209 }
210
211 let (bootnodes, relaynodes) =
212 split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
213
214 let mut ctx = SpawnNodeCtx {
216 chain_id: &relay_chain_id,
217 parachain_id: None,
218 chain: relay_chain_name.as_str(),
219 role: ZombieRole::Node,
220 ns: &ns,
221 scoped_fs: &scoped_fs,
222 parachain: None,
223 bootnodes_addr: &vec![],
224 wait_ready: false,
225 nodes_by_name: json!({}),
226 };
227
228 let global_files_to_inject = vec![TransferedFile::new(
229 PathBuf::from(format!(
230 "{}/{relay_chain_name}.json",
231 ns.base_dir().to_string_lossy()
232 )),
233 PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
234 )];
235
236 let r = Relaychain::new(
237 relay_chain_name.to_string(),
238 relay_chain_id.clone(),
239 PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
240 OrchestratorError::InvariantError("chain-spec raw path should be set now"),
241 )?),
242 );
243 let mut network =
244 Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
245
246 let mut node_ws_url: String = "".to_string();
248
249 let mut bootnodes_addr: Vec<String> = vec![];
251
252 for level in dependency_levels_among(&bootnodes)? {
253 let mut running_nodes_per_level = vec![];
254 for chunk in level.chunks(spawn_concurrency) {
255 let spawning_tasks = chunk
256 .iter()
257 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
258
259 for node in futures::future::try_join_all(spawning_tasks).await? {
260 let bootnode_multiaddr = node.multiaddr();
261
262 bootnodes_addr.push(bootnode_multiaddr.to_string());
263
264 if node_ws_url.is_empty() {
266 node_ws_url.clone_from(&node.ws_uri)
267 }
268
269 running_nodes_per_level.push(node);
270 }
271 }
272 info!(
273 "🕰 waiting for level: {:?} to be up...",
274 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
275 );
276
277 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
279 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
280 });
281
282 let _ = futures::future::try_join_all(waiting_tasks).await?;
283
284 for node in running_nodes_per_level {
285 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
287 network.add_running_node(node, None).await;
288 }
289 }
290
291 network_spec
293 .relaychain
294 .chain_spec
295 .add_bootnodes(&scoped_fs, &bootnodes_addr)
296 .await?;
297
298 ctx.bootnodes_addr = &bootnodes_addr;
299
300 for level in dependency_levels_among(&relaynodes)? {
301 let mut running_nodes_per_level = vec![];
302 for chunk in level.chunks(spawn_concurrency) {
303 let spawning_tasks = chunk
304 .iter()
305 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
306
307 for node in futures::future::try_join_all(spawning_tasks).await? {
308 running_nodes_per_level.push(node);
309 }
310 }
311 info!(
312 "🕰 waiting for level: {:?} to be up...",
313 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
314 );
315
316 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
318 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
319 });
320
321 let _ = futures::future::try_join_all(waiting_tasks).await?;
322
323 for node in running_nodes_per_level {
324 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
325 network.add_running_node(node, None).await;
326 }
327 }
328
329 for para in network_spec.parachains.iter() {
331 let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
333 let parachain_id = parachain.chain_id.clone();
334
335 let (bootnodes, collators) =
336 split_nodes_by_bootnodes(¶.collators, para.no_default_bootnodes);
337
338 let mut ctx_para = SpawnNodeCtx {
340 parachain: Some(para),
341 parachain_id: parachain_id.as_deref(),
342 role: if para.is_cumulus_based {
343 ZombieRole::CumulusCollator
344 } else {
345 ZombieRole::Collator
346 },
347 bootnodes_addr: &vec![],
348 ..ctx.clone()
349 };
350
351 let mut bootnodes_addr: Vec<String> = vec![];
353 let mut running_nodes: Vec<NetworkNode> = vec![];
354
355 for level in dependency_levels_among(&bootnodes)? {
356 let mut running_nodes_per_level = vec![];
357 for chunk in level.chunks(spawn_concurrency) {
358 let spawning_tasks = chunk.iter().map(|node| {
359 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
360 });
361
362 for node in futures::future::try_join_all(spawning_tasks).await? {
363 let bootnode_multiaddr = node.multiaddr();
364
365 bootnodes_addr.push(bootnode_multiaddr.to_string());
366
367 running_nodes_per_level.push(node);
368 }
369 }
370 info!(
371 "🕰 waiting for level: {:?} to be up...",
372 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
373 );
374
375 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
377 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
378 });
379
380 let _ = futures::future::try_join_all(waiting_tasks).await?;
381
382 for node in running_nodes_per_level {
383 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
384 running_nodes.push(node);
385 }
386 }
387
388 if let Some(para_chain_spec) = para.chain_spec.as_ref() {
389 para_chain_spec
390 .add_bootnodes(&scoped_fs, &bootnodes_addr)
391 .await?;
392 }
393
394 ctx_para.bootnodes_addr = &bootnodes_addr;
395
396 for level in dependency_levels_among(&collators)? {
398 let mut running_nodes_per_level = vec![];
399 for chunk in level.chunks(spawn_concurrency) {
400 let spawning_tasks = chunk.iter().map(|node| {
401 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
402 });
403
404 for node in futures::future::try_join_all(spawning_tasks).await? {
405 running_nodes_per_level.push(node);
406 }
407 }
408 info!(
409 "🕰 waiting for level: {:?} to be up...",
410 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
411 );
412
413 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
415 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
416 });
417
418 let _ = futures::future::try_join_all(waiting_tasks).await?;
419
420 for node in running_nodes_per_level {
421 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
422 running_nodes.push(node);
423 }
424 }
425
426 let running_para_id = parachain.para_id;
427 network.add_para(parachain);
428 for node in running_nodes {
429 network.add_running_node(node, Some(running_para_id)).await;
430 }
431 }
432
433 for para in para_to_register_with_extrinsic {
441 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
442 id: para.id,
443 wasm_path: para
445 .genesis_wasm
446 .artifact_path()
447 .ok_or(OrchestratorError::InvariantError(
448 "artifact path for wasm must be set at this point",
449 ))?
450 .to_path_buf(),
451 state_path: para
452 .genesis_state
453 .artifact_path()
454 .ok_or(OrchestratorError::InvariantError(
455 "artifact path for state must be set at this point",
456 ))?
457 .to_path_buf(),
458 node_ws_url: node_ws_url.clone(),
459 onboard_as_para: para.onboard_as_parachain,
460 seed: None, finalization: false,
462 };
463
464 Parachain::register(register_para_options, &scoped_fs).await?;
465 }
466
467 let mut zombie_json = serde_json::to_value(&network)?;
469 zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
470 zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());
471
472 if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
473 zombie_json["start_time_ts"] =
474 serde_json::value::Value::String(start_time_ts.as_millis().to_string());
475 } else {
476 warn!("⚠️ Error getting start_time timestamp");
478 }
479
480 scoped_fs
481 .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
482 .await?;
483
484 if network_spec.global_settings.tear_down_on_failure() {
485 network.spawn_watching_task();
486 }
487
488 Ok(network)
489 }
490}
491
492fn split_nodes_by_bootnodes(
497 nodes: &[NodeSpec],
498 no_default_bootnodes: bool,
499) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
500 let mut bootnodes = vec![];
502 let mut other_nodes = vec![];
503 nodes.iter().for_each(|node| {
504 if node.is_bootnode {
505 bootnodes.push(node)
506 } else {
507 other_nodes.push(node)
508 }
509 });
510
511 if bootnodes.is_empty() && !no_default_bootnodes {
512 bootnodes.push(other_nodes.remove(0))
513 }
514
515 (bootnodes, other_nodes)
516}
517
518fn generate_bootnode_addr(
520 node: &NetworkNode,
521 ip: &IpAddr,
522 port: u16,
523) -> Result<String, GeneratorError> {
524 generators::generate_node_bootnode_addr(
525 &node.spec.peer_id,
526 ip,
527 port,
528 node.inner.args().as_ref(),
529 &node.spec.p2p_cert_hash,
530 )
531}
532fn validate_spec_with_provider_capabilities(
534 network_spec: &NetworkSpec,
535 capabilities: &ProviderCapabilities,
536) -> Result<(), anyhow::Error> {
537 let mut errs: Vec<String> = vec![];
538
539 if capabilities.requires_image {
540 if network_spec.relaychain.default_image.is_none() {
542 let nodes = &network_spec.relaychain.nodes;
544 if nodes.iter().any(|node| node.image.is_none()) {
545 errs.push(String::from(
546 "Missing image for node, and not default is set at relaychain",
547 ));
548 }
549 };
550
551 for para in &network_spec.parachains {
553 if para.default_image.is_none() {
554 let nodes = ¶.collators;
555 if nodes.iter().any(|node| node.image.is_none()) {
556 errs.push(format!(
557 "Missing image for node, and not default is set at parachain {}",
558 para.id
559 ));
560 }
561 }
562 }
563 } else {
564 let mut cmds: HashSet<&str> = Default::default();
567 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
568 cmds.insert(cmd.as_str());
569 }
570 for node in network_spec.relaychain().nodes.iter() {
571 cmds.insert(node.command());
572 }
573
574 for para in &network_spec.parachains {
576 if let Some(cmd) = para.default_command.as_ref() {
577 cmds.insert(cmd.as_str());
578 }
579
580 for node in para.collators.iter() {
581 cmds.insert(node.command());
582 }
583 }
584
585 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
588 let parts: Vec<_> = path.split(":").collect();
589 for cmd in cmds {
590 let missing = if cmd.contains('/') {
591 trace!("checking {cmd}");
592 if std::fs::metadata(cmd).is_err() {
593 true
594 } else {
595 info!("🔎 We will use the full path {cmd} to spawn nodes.");
596 false
597 }
598 } else {
599 !parts.iter().any(|part| {
601 let path_to = format!("{part}/{cmd}");
602 trace!("checking {path_to}");
603 let check_result = std::fs::metadata(&path_to);
604 trace!("result {:?}", check_result);
605 if check_result.is_ok() {
606 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
607 true
608 } else {
609 false
610 }
611 })
612 };
613
614 if missing {
615 errs.push(help_msg(cmd));
616 }
617 }
618 }
619
620 if !errs.is_empty() {
621 let msg = errs.join("\n");
622 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
623 }
624
625 Ok(())
626}
627
/// Returns the "missing binary" message for `cmd`, including the cargo
/// command that builds it when `cmd` is one of the known binaries.
fn help_msg(cmd: &str) -> String {
    // Template nodes are all built the same way, by package name.
    const TEMPLATE_NODES: [&str; 3] = [
        "parachain-template-node",
        "solochain-template-node",
        "minimal-template-node",
    ];

    if TEMPLATE_NODES.contains(&cmd) {
        return format!(
            "Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release"
        );
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    // Unknown binary: no build hint available.
    format!("Missing binary {cmd}, please compile it.")
}
644
/// Reads the spawn concurrency from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable is unset or is not a valid `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
653
654fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
655 let desired_spawn_concurrency = match (
656 spawn_concurrency_from_env(),
657 spec.global_settings.spawn_concurrency(),
658 ) {
659 (Some(n), _) => Some(n),
660 (None, Some(n)) => Some(n),
661 _ => None,
662 };
663
664 let (spawn_concurrency, limited_by_tokens) =
665 if let Some(spawn_concurrency) = desired_spawn_concurrency {
666 if spawn_concurrency == 1 {
667 (1, false)
668 } else if has_tokens(&serde_json::to_string(spec)?) {
669 (1, true)
670 } else {
671 (spawn_concurrency, false)
672 }
673 } else {
674 if has_tokens(&serde_json::to_string(spec)?) {
676 (1, true)
677 } else {
678 (100, false)
680 }
681 };
682
683 Ok((spawn_concurrency, limited_by_tokens))
684}
685
686fn dependency_levels_among<'a>(
691 nodes: &'a [&'a NodeSpec],
692) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
693 let by_name = nodes
694 .iter()
695 .map(|n| (n.name.as_str(), *n))
696 .collect::<HashMap<_, _>>();
697
698 let mut graph = HashMap::with_capacity(nodes.len());
699 let mut indegree = HashMap::with_capacity(nodes.len());
700
701 for node in nodes {
702 graph.insert(node.name.as_str(), Vec::new());
703 indegree.insert(node.name.as_str(), 0);
704 }
705
706 for &node in nodes {
708 if let Ok(args_json) = serde_json::to_string(&node.args) {
709 let unique_deps = get_tokens_to_replace(&args_json)
711 .into_iter()
712 .filter(|dep| dep != &node.name)
713 .filter_map(|dep| by_name.get(dep.as_str()))
714 .map(|&dep_node| dep_node.name.as_str())
715 .collect::<HashSet<_>>();
716
717 for dep_name in unique_deps {
718 graph
719 .get_mut(dep_name)
720 .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
721 .push(node);
722 *indegree
723 .get_mut(node.name.as_str())
724 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
725 }
726 }
727 }
728
729 let mut queue = nodes
731 .iter()
732 .filter(|n| {
733 *indegree
734 .get(n.name.as_str())
735 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
736 == 0
737 })
738 .copied()
739 .collect::<VecDeque<_>>();
740
741 let mut processed_count = 0;
742 let mut levels = Vec::new();
743
744 while !queue.is_empty() {
746 let level_size = queue.len();
747 let mut current_level = Vec::with_capacity(level_size);
748
749 for _ in 0..level_size {
750 let n = queue
751 .pop_front()
752 .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
753 current_level.push(n);
754 processed_count += 1;
755
756 for &neighbour in graph
757 .get(n.name.as_str())
758 .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
759 {
760 let neighbour_indegree = indegree
761 .get_mut(neighbour.name.as_str())
762 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
763 *neighbour_indegree -= 1;
764
765 if *neighbour_indegree == 0 {
766 queue.push_back(neighbour);
767 }
768 }
769 }
770
771 current_level.sort_by_key(|n| &n.name);
772 levels.push(current_level);
773 }
774
775 if processed_count != nodes.len() {
777 return Err(OrchestratorError::InvalidConfig(
778 "Tokens have cyclical dependencies".to_string(),
779 ));
780 }
781
782 Ok(levels)
783}
784
785#[derive(Clone, Debug)]
793pub struct ScopedFilesystem<'a, FS: FileSystem> {
794 fs: &'a FS,
795 base_dir: &'a str,
796}
797
798impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
799 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
800 Self { fs, base_dir }
801 }
802
803 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
804 for file in files {
805 let full_remote_path = PathBuf::from(format!(
806 "{}/{}",
807 self.base_dir,
808 file.remote_path.to_string_lossy()
809 ));
810 trace!("coping file: {file}");
811 self.fs
812 .copy(file.local_path.as_path(), full_remote_path)
813 .await?;
814 }
815 Ok(())
816 }
817
818 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
819 let file = file.as_ref();
820
821 let full_path = if file.is_absolute() {
822 file.to_owned()
823 } else {
824 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
825 };
826 let content = self.fs.read_to_string(full_path).await?;
827 Ok(content)
828 }
829
830 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
831 let path = PathBuf::from(format!(
832 "{}/{}",
833 self.base_dir,
834 path.as_ref().to_string_lossy()
835 ));
836 self.fs.create_dir(path).await
837 }
838
839 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
840 let path = PathBuf::from(format!(
841 "{}/{}",
842 self.base_dir,
843 path.as_ref().to_string_lossy()
844 ));
845 self.fs.create_dir_all(path).await
846 }
847
848 async fn write(
849 &self,
850 path: impl AsRef<Path>,
851 contents: impl AsRef<[u8]> + Send,
852 ) -> Result<(), FileSystemError> {
853 let path = path.as_ref();
854
855 let full_path = if path.is_absolute() {
856 path.to_owned()
857 } else {
858 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
859 };
860
861 self.fs.write(full_path, contents).await
862 }
863}
864
/// Role assigned to a node when it is spawned.
#[derive(Debug, Clone)]
pub enum ZombieRole {
    // NOTE(review): not used in this file; semantics to be confirmed.
    Temp,
    /// Used in the spawn context of relaychain nodes.
    Node,
    // NOTE(review): not used in this file; semantics to be confirmed.
    Bootnode,
    /// Collator of a parachain that is not cumulus based.
    Collator,
    /// Collator of a cumulus based parachain.
    CumulusCollator,
    // NOTE(review): not used in this file; semantics to be confirmed.
    Companion,
}
874
875pub use network::{AddCollatorOptions, AddNodeOptions};
877pub use network_helper::metrics;
878
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    lazy_static! {
        // Serializes the tests that read or write `ENV_KEY`, since the
        // environment is shared by the whole test process.
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Sets `ENV_KEY` to `concurrency`, or removes it when `None`.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Builds a config with two validators and one cumulus based parachain
    /// with a single collator. `with_image` sets images on the relaychain and
    /// the collator; `with_cmd` replaces the default commands.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Provider capabilities that differ only in `requires_image`.
    fn caps(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Builds a node named `name` whose args reference each of `dependencies`
    /// through a replacement token.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Asserts that `actual_levels` holds exactly the node names listed in
    /// `expected_levels`, level by level and in order.
    ///
    /// The whole name matrix is compared, so a missing or extra level fails
    /// the assertion instead of being silently truncated by `zip`.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        let actual_names: Vec<Vec<&str>> = actual_levels
            .iter()
            .map(|level| level.iter().map(|node| node.name.as_str()).collect())
            .collect();

        assert_eq!(actual_names, expected_levels);
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(true));
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(true));
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(false));
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        // `cargo` is expected to be in PATH wherever the tests run.
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(false));
        println!("{valid:?}");
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        // A token in any arg forces sequential spawning.
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // no nodes
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // a single node
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // two independent nodes
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // chain: alice <- bob <- charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        // fan-out: bob and charlie both depend on alice
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        // diamond: dave depends on both bob and charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        // alice and bob reference each other
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}