1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11#[cfg(feature = "pjs")]
12pub mod pjs_helper;
13pub mod shared;
14mod spawner;
15
16use std::{
17 collections::{HashMap, HashSet, VecDeque},
18 env,
19 net::IpAddr,
20 path::{Path, PathBuf},
21 time::{Duration, SystemTime},
22};
23
24use configuration::{NetworkConfig, RegistrationStrategy};
25use errors::OrchestratorError;
26use generators::errors::GeneratorError;
27use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
28pub use network_spec::NetworkSpec;
30use network_spec::{node::NodeSpec, parachain::ParachainSpec};
31use provider::{
32 types::{ProviderCapabilities, TransferedFile},
33 DynProvider,
34};
35use serde_json::json;
36use support::{
37 constants::{
38 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
39 THIS_IS_A_BUG,
40 },
41 fs::{FileSystem, FileSystemError},
42 replacer::{get_tokens_to_replace, has_tokens},
43};
44use tokio::time::timeout;
45use tracing::{debug, info, trace, warn};
46
47use crate::{shared::types::RegisterParachainOptions, spawner::SpawnNodeCtx};
48pub struct Orchestrator<T>
49where
50 T: FileSystem + Sync + Send,
51{
52 filesystem: T,
53 provider: DynProvider,
54}
55
56impl<T> Orchestrator<T>
57where
58 T: FileSystem + Sync + Send + Clone,
59{
60 pub fn new(filesystem: T, provider: DynProvider) -> Self {
61 Self {
62 filesystem,
63 provider,
64 }
65 }
66
67 pub async fn spawn(
68 &self,
69 network_config: NetworkConfig,
70 ) -> Result<Network<T>, OrchestratorError> {
71 let global_timeout = network_config.global_settings().network_spawn_timeout();
72 let network_spec = NetworkSpec::from_config(&network_config).await?;
73
74 let res = timeout(
75 Duration::from_secs(global_timeout.into()),
76 self.spawn_inner(network_spec),
77 )
78 .await
79 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
80 res?
81 }
82
83 pub async fn spawn_from_spec(
84 &self,
85 network_spec: NetworkSpec,
86 ) -> Result<Network<T>, OrchestratorError> {
87 let global_timeout = network_spec.global_settings.network_spawn_timeout();
88 let res = timeout(
89 Duration::from_secs(global_timeout as u64),
90 self.spawn_inner(network_spec),
91 )
92 .await
93 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
94 res?
95 }
96
97 async fn spawn_inner(
98 &self,
99 mut network_spec: NetworkSpec,
100 ) -> Result<Network<T>, OrchestratorError> {
101 debug!(network_spec = ?network_spec,"Network spec to spawn");
103
104 validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
106 .map_err(|err| {
107 OrchestratorError::InvalidConfigForProvider(
108 self.provider.name().into(),
109 err.to_string(),
110 )
111 })?;
112
113 let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
115 self.provider
116 .create_namespace_with_base_dir(base_dir)
117 .await?
118 } else {
119 self.provider.create_namespace().await?
120 };
121
122 let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
124
125 let start_time = SystemTime::now();
126 info!("🧰 ns: {}", ns.name());
127 info!("🧰 base_dir: {:?}", ns.base_dir());
128 info!("🕰 start time: {:?}", start_time);
129 info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
130
131 network_spec
132 .populate_nodes_available_args(ns.clone())
133 .await?;
134
135 let base_dir = ns.base_dir().to_string_lossy();
136 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
137 network_spec
139 .relaychain
140 .chain_spec
141 .build(&ns, &scoped_fs)
142 .await?;
143
144 debug!("relaychain spec built!");
145 let relay_chain_id = network_spec
147 .relaychain
148 .chain_spec
149 .read_chain_id(&scoped_fs)
150 .await?;
151
152 let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
153 let base_dir_exists = network_spec.global_settings.base_dir().is_some();
154 network_spec
155 .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
156 .await?;
157
158 let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
160 Vec<&ParachainSpec>,
161 Vec<&ParachainSpec>,
162 ) = network_spec
163 .parachains
164 .iter()
165 .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
166 .partition(|para| {
167 matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
168 });
169
170 let mut para_artifacts = vec![];
171 for para in para_to_register_in_genesis {
172 let genesis_config = para.get_genesis_config()?;
173 para_artifacts.push(genesis_config)
174 }
175
176 network_spec
178 .relaychain
179 .chain_spec
180 .customize_relay(
181 &network_spec.relaychain,
182 &network_spec.hrmp_channels,
183 para_artifacts,
184 &scoped_fs,
185 )
186 .await?;
187
188 network_spec
190 .relaychain
191 .chain_spec
192 .build_raw(&ns, &scoped_fs)
193 .await?;
194
195 if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
197 network_spec
198 .relaychain
199 .chain_spec
200 .override_code(&scoped_fs, wasm_override)
201 .await?;
202 }
203
204 if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
206 network_spec
207 .relaychain
208 .chain_spec
209 .override_raw_spec(&scoped_fs, raw_spec_override)
210 .await?;
211 }
212
213 let (bootnodes, relaynodes) =
214 split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
215
216 let mut ctx = SpawnNodeCtx {
218 chain_id: &relay_chain_id,
219 parachain_id: None,
220 chain: relay_chain_name.as_str(),
221 role: ZombieRole::Node,
222 ns: &ns,
223 scoped_fs: &scoped_fs,
224 parachain: None,
225 bootnodes_addr: &vec![],
226 wait_ready: false,
227 nodes_by_name: json!({}),
228 };
229
230 let global_files_to_inject = vec![TransferedFile::new(
231 PathBuf::from(format!(
232 "{}/{relay_chain_name}.json",
233 ns.base_dir().to_string_lossy()
234 )),
235 PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
236 )];
237
238 let r = Relaychain::new(
239 relay_chain_name.to_string(),
240 relay_chain_id.clone(),
241 PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
242 OrchestratorError::InvariantError("chain-spec raw path should be set now"),
243 )?),
244 );
245 let mut network =
246 Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
247
248 let mut node_ws_url: String = "".to_string();
250
251 let mut bootnodes_addr: Vec<String> = vec![];
253
254 for level in dependency_levels_among(&bootnodes)? {
255 let mut running_nodes_per_level = vec![];
256 for chunk in level.chunks(spawn_concurrency) {
257 let spawning_tasks = chunk
258 .iter()
259 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
260
261 for node in futures::future::try_join_all(spawning_tasks).await? {
262 let bootnode_multiaddr = node.multiaddr();
263
264 bootnodes_addr.push(bootnode_multiaddr.to_string());
265
266 if node_ws_url.is_empty() {
268 node_ws_url.clone_from(&node.ws_uri)
269 }
270
271 running_nodes_per_level.push(node);
272 }
273 }
274 info!(
275 "🕰 waiting for level: {:?} to be up...",
276 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
277 );
278
279 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
281 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
282 });
283
284 let _ = futures::future::try_join_all(waiting_tasks).await?;
285
286 for node in running_nodes_per_level {
287 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
289 network.add_running_node(node, None).await;
290 }
291 }
292
293 network_spec
295 .relaychain
296 .chain_spec
297 .add_bootnodes(&scoped_fs, &bootnodes_addr)
298 .await?;
299
300 ctx.bootnodes_addr = &bootnodes_addr;
301
302 for level in dependency_levels_among(&relaynodes)? {
303 let mut running_nodes_per_level = vec![];
304 for chunk in level.chunks(spawn_concurrency) {
305 let spawning_tasks = chunk
306 .iter()
307 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
308
309 for node in futures::future::try_join_all(spawning_tasks).await? {
310 running_nodes_per_level.push(node);
311 }
312 }
313 info!(
314 "🕰 waiting for level: {:?} to be up...",
315 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
316 );
317
318 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
320 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
321 });
322
323 let _ = futures::future::try_join_all(waiting_tasks).await?;
324
325 for node in running_nodes_per_level {
326 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
327 network.add_running_node(node, None).await;
328 }
329 }
330
331 for para in network_spec.parachains.iter() {
333 let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
335 let parachain_id = parachain.chain_id.clone();
336
337 let (bootnodes, collators) =
338 split_nodes_by_bootnodes(¶.collators, para.no_default_bootnodes);
339
340 let mut ctx_para = SpawnNodeCtx {
342 parachain: Some(para),
343 parachain_id: parachain_id.as_deref(),
344 role: if para.is_cumulus_based {
345 ZombieRole::CumulusCollator
346 } else {
347 ZombieRole::Collator
348 },
349 bootnodes_addr: &vec![],
350 ..ctx.clone()
351 };
352
353 let mut bootnodes_addr: Vec<String> = vec![];
355 let mut running_nodes: Vec<NetworkNode> = vec![];
356
357 for level in dependency_levels_among(&bootnodes)? {
358 let mut running_nodes_per_level = vec![];
359 for chunk in level.chunks(spawn_concurrency) {
360 let spawning_tasks = chunk.iter().map(|node| {
361 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
362 });
363
364 for node in futures::future::try_join_all(spawning_tasks).await? {
365 let bootnode_multiaddr = node.multiaddr();
366
367 bootnodes_addr.push(bootnode_multiaddr.to_string());
368
369 running_nodes_per_level.push(node);
370 }
371 }
372 info!(
373 "🕰 waiting for level: {:?} to be up...",
374 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
375 );
376
377 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
379 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
380 });
381
382 let _ = futures::future::try_join_all(waiting_tasks).await?;
383
384 for node in running_nodes_per_level {
385 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
386 running_nodes.push(node);
387 }
388 }
389
390 if let Some(para_chain_spec) = para.chain_spec.as_ref() {
391 para_chain_spec
392 .add_bootnodes(&scoped_fs, &bootnodes_addr)
393 .await?;
394 }
395
396 ctx_para.bootnodes_addr = &bootnodes_addr;
397
398 for level in dependency_levels_among(&collators)? {
400 let mut running_nodes_per_level = vec![];
401 for chunk in level.chunks(spawn_concurrency) {
402 let spawning_tasks = chunk.iter().map(|node| {
403 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
404 });
405
406 for node in futures::future::try_join_all(spawning_tasks).await? {
407 running_nodes_per_level.push(node);
408 }
409 }
410 info!(
411 "🕰 waiting for level: {:?} to be up...",
412 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
413 );
414
415 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
417 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
418 });
419
420 let _ = futures::future::try_join_all(waiting_tasks).await?;
421
422 for node in running_nodes_per_level {
423 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
424 running_nodes.push(node);
425 }
426 }
427
428 let running_para_id = parachain.para_id;
429 network.add_para(parachain);
430 for node in running_nodes {
431 network.add_running_node(node, Some(running_para_id)).await;
432 }
433 }
434
435 for para in para_to_register_with_extrinsic {
443 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
444 id: para.id,
445 wasm_path: para
447 .genesis_wasm
448 .artifact_path()
449 .ok_or(OrchestratorError::InvariantError(
450 "artifact path for wasm must be set at this point",
451 ))?
452 .to_path_buf(),
453 state_path: para
454 .genesis_state
455 .artifact_path()
456 .ok_or(OrchestratorError::InvariantError(
457 "artifact path for state must be set at this point",
458 ))?
459 .to_path_buf(),
460 node_ws_url: node_ws_url.clone(),
461 onboard_as_para: para.onboard_as_parachain,
462 seed: None, finalization: false,
464 };
465
466 Parachain::register(register_para_options, &scoped_fs).await?;
467 }
468
469 let mut zombie_json = serde_json::to_value(&network)?;
471 zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
472 zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());
473
474 if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
475 zombie_json["start_time_ts"] =
476 serde_json::value::Value::String(start_time_ts.as_millis().to_string());
477 } else {
478 warn!("⚠️ Error getting start_time timestamp");
480 }
481
482 scoped_fs
483 .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
484 .await?;
485
486 if network_spec.global_settings.tear_down_on_failure() {
487 network.spawn_watching_task();
488 }
489
490 Ok(network)
491 }
492}
493
494fn split_nodes_by_bootnodes(
499 nodes: &[NodeSpec],
500 no_default_bootnodes: bool,
501) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
502 let mut bootnodes = vec![];
504 let mut other_nodes = vec![];
505 nodes.iter().for_each(|node| {
506 if node.is_bootnode {
507 bootnodes.push(node)
508 } else {
509 other_nodes.push(node)
510 }
511 });
512
513 if bootnodes.is_empty() && !no_default_bootnodes {
514 bootnodes.push(other_nodes.remove(0))
515 }
516
517 (bootnodes, other_nodes)
518}
519
520fn generate_bootnode_addr(
522 node: &NetworkNode,
523 ip: &IpAddr,
524 port: u16,
525) -> Result<String, GeneratorError> {
526 generators::generate_node_bootnode_addr(
527 &node.spec.peer_id,
528 ip,
529 port,
530 node.inner.args().as_ref(),
531 &node.spec.p2p_cert_hash,
532 )
533}
534fn validate_spec_with_provider_capabilities(
536 network_spec: &NetworkSpec,
537 capabilities: &ProviderCapabilities,
538) -> Result<(), anyhow::Error> {
539 let mut errs: Vec<String> = vec![];
540
541 if capabilities.requires_image {
542 if network_spec.relaychain.default_image.is_none() {
544 let nodes = &network_spec.relaychain.nodes;
546 if nodes.iter().any(|node| node.image.is_none()) {
547 errs.push(String::from(
548 "Missing image for node, and not default is set at relaychain",
549 ));
550 }
551 };
552
553 for para in &network_spec.parachains {
555 if para.default_image.is_none() {
556 let nodes = ¶.collators;
557 if nodes.iter().any(|node| node.image.is_none()) {
558 errs.push(format!(
559 "Missing image for node, and not default is set at parachain {}",
560 para.id
561 ));
562 }
563 }
564 }
565 } else {
566 let mut cmds: HashSet<&str> = Default::default();
569 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
570 cmds.insert(cmd.as_str());
571 }
572 for node in network_spec.relaychain().nodes.iter() {
573 cmds.insert(node.command());
574 }
575
576 for para in &network_spec.parachains {
578 if let Some(cmd) = para.default_command.as_ref() {
579 cmds.insert(cmd.as_str());
580 }
581
582 for node in para.collators.iter() {
583 cmds.insert(node.command());
584 }
585 }
586
587 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
590 let parts: Vec<_> = path.split(":").collect();
591 for cmd in cmds {
592 let missing = if cmd.contains('/') {
593 trace!("checking {cmd}");
594 if std::fs::metadata(cmd).is_err() {
595 true
596 } else {
597 info!("🔎 We will use the full path {cmd} to spawn nodes.");
598 false
599 }
600 } else {
601 !parts.iter().any(|part| {
603 let path_to = format!("{part}/{cmd}");
604 trace!("checking {path_to}");
605 let check_result = std::fs::metadata(&path_to);
606 trace!("result {:?}", check_result);
607 if check_result.is_ok() {
608 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
609 true
610 } else {
611 false
612 }
613 })
614 };
615
616 if missing {
617 errs.push(help_msg(cmd));
618 }
619 }
620 }
621
622 if !errs.is_empty() {
623 let msg = errs.join("\n");
624 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
625 }
626
627 Ok(())
628}
629
630fn help_msg(cmd: &str) -> String {
631 match cmd {
632 "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
633 format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
634 },
635 "polkadot" => {
636 format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
637 },
638 "polkadot-parachain" => {
639 format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
640 },
641 _ => {
642 format!("Missing binary {cmd}, please compile it.")
643 },
644 }
645}
646
647fn spawn_concurrency_from_env() -> Option<usize> {
649 if let Ok(concurrency) = env::var("ZOMBIE_SPAWN_CONCURRENCY") {
650 concurrency.parse::<usize>().ok()
651 } else {
652 None
653 }
654}
655
656fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
657 let desired_spawn_concurrency = match (
658 spawn_concurrency_from_env(),
659 spec.global_settings.spawn_concurrency(),
660 ) {
661 (Some(n), _) => Some(n),
662 (None, Some(n)) => Some(n),
663 _ => None,
664 };
665
666 let (spawn_concurrency, limited_by_tokens) =
667 if let Some(spawn_concurrency) = desired_spawn_concurrency {
668 if spawn_concurrency == 1 {
669 (1, false)
670 } else if has_tokens(&serde_json::to_string(spec)?) {
671 (1, true)
672 } else {
673 (spawn_concurrency, false)
674 }
675 } else {
676 if has_tokens(&serde_json::to_string(spec)?) {
678 (1, true)
679 } else {
680 (100, false)
682 }
683 };
684
685 Ok((spawn_concurrency, limited_by_tokens))
686}
687
688fn dependency_levels_among<'a>(
693 nodes: &'a [&'a NodeSpec],
694) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
695 let by_name = nodes
696 .iter()
697 .map(|n| (n.name.as_str(), *n))
698 .collect::<HashMap<_, _>>();
699
700 let mut graph = HashMap::with_capacity(nodes.len());
701 let mut indegree = HashMap::with_capacity(nodes.len());
702
703 for node in nodes {
704 graph.insert(node.name.as_str(), Vec::new());
705 indegree.insert(node.name.as_str(), 0);
706 }
707
708 for &node in nodes {
710 if let Ok(args_json) = serde_json::to_string(&node.args) {
711 let unique_deps = get_tokens_to_replace(&args_json)
713 .into_iter()
714 .filter(|dep| dep != &node.name)
715 .filter_map(|dep| by_name.get(dep.as_str()))
716 .map(|&dep_node| dep_node.name.as_str())
717 .collect::<HashSet<_>>();
718
719 for dep_name in unique_deps {
720 graph
721 .get_mut(dep_name)
722 .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
723 .push(node);
724 *indegree
725 .get_mut(node.name.as_str())
726 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
727 }
728 }
729 }
730
731 let mut queue = nodes
733 .iter()
734 .filter(|n| {
735 *indegree
736 .get(n.name.as_str())
737 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
738 == 0
739 })
740 .copied()
741 .collect::<VecDeque<_>>();
742
743 let mut processed_count = 0;
744 let mut levels = Vec::new();
745
746 while !queue.is_empty() {
748 let level_size = queue.len();
749 let mut current_level = Vec::with_capacity(level_size);
750
751 for _ in 0..level_size {
752 let n = queue
753 .pop_front()
754 .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
755 current_level.push(n);
756 processed_count += 1;
757
758 for &neighbour in graph
759 .get(n.name.as_str())
760 .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
761 {
762 let neighbour_indegree = indegree
763 .get_mut(neighbour.name.as_str())
764 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
765 *neighbour_indegree -= 1;
766
767 if *neighbour_indegree == 0 {
768 queue.push_back(neighbour);
769 }
770 }
771 }
772
773 current_level.sort_by_key(|n| &n.name);
774 levels.push(current_level);
775 }
776
777 if processed_count != nodes.len() {
779 return Err(OrchestratorError::InvalidConfig(
780 "Tokens have cyclical dependencies".to_string(),
781 ));
782 }
783
784 Ok(levels)
785}
786
787#[derive(Clone, Debug)]
795pub struct ScopedFilesystem<'a, FS: FileSystem> {
796 fs: &'a FS,
797 base_dir: &'a str,
798}
799
800impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
801 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
802 Self { fs, base_dir }
803 }
804
805 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
806 for file in files {
807 let full_remote_path = PathBuf::from(format!(
808 "{}/{}",
809 self.base_dir,
810 file.remote_path.to_string_lossy()
811 ));
812 trace!("coping file: {file}");
813 self.fs
814 .copy(file.local_path.as_path(), full_remote_path)
815 .await?;
816 }
817 Ok(())
818 }
819
820 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
821 let file = file.as_ref();
822
823 let full_path = if file.is_absolute() {
824 file.to_owned()
825 } else {
826 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
827 };
828 let content = self.fs.read_to_string(full_path).await?;
829 Ok(content)
830 }
831
832 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
833 let path = PathBuf::from(format!(
834 "{}/{}",
835 self.base_dir,
836 path.as_ref().to_string_lossy()
837 ));
838 self.fs.create_dir(path).await
839 }
840
841 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
842 let path = PathBuf::from(format!(
843 "{}/{}",
844 self.base_dir,
845 path.as_ref().to_string_lossy()
846 ));
847 self.fs.create_dir_all(path).await
848 }
849
850 async fn write(
851 &self,
852 path: impl AsRef<Path>,
853 contents: impl AsRef<[u8]> + Send,
854 ) -> Result<(), FileSystemError> {
855 let path = path.as_ref();
856
857 let full_path = if path.is_absolute() {
858 path.to_owned()
859 } else {
860 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
861 };
862
863 self.fs.write(full_path, contents).await
864 }
865}
866
867#[derive(Clone, Debug)]
868pub enum ZombieRole {
869 Temp,
870 Node,
871 Bootnode,
872 Collator,
873 CumulusCollator,
874 Companion,
875}
876
877pub use network::{AddCollatorOptions, AddNodeOptions};
879pub use network_helper::metrics;
880#[cfg(feature = "pjs")]
881pub use pjs_helper::PjsResult;
882
883#[cfg(test)]
884mod tests {
885 use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
886 use lazy_static::lazy_static;
887 use tokio::sync::Mutex;
888
889 use super::*;
890
891 const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
892 lazy_static! {
894 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
895 }
896
897 fn set_env(concurrency: Option<u32>) {
898 if let Some(value) = concurrency {
899 env::set_var(ENV_KEY, value.to_string());
900 } else {
901 env::remove_var(ENV_KEY);
902 }
903 }
904
905 fn generate(
906 with_image: bool,
907 with_cmd: Option<&'static str>,
908 ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
909 NetworkConfigBuilder::new()
910 .with_relaychain(|r| {
911 let mut relay = r
912 .with_chain("rococo-local")
913 .with_default_command(with_cmd.unwrap_or("polkadot"));
914 if with_image {
915 relay = relay.with_default_image("docker.io/parity/polkadot")
916 }
917
918 relay
919 .with_validator(|node| node.with_name("alice"))
920 .with_validator(|node| node.with_name("bob"))
921 })
922 .with_parachain(|p| {
923 p.with_id(2000).cumulus_based(true).with_collator(|n| {
924 let node = n
925 .with_name("collator")
926 .with_command(with_cmd.unwrap_or("polkadot-parachain"));
927 if with_image {
928 node.with_image("docker.io/paritypr/test-parachain")
929 } else {
930 node
931 }
932 })
933 })
934 .build()
935 }
936
937 fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
938 let mut spec = NodeSpec {
939 name: name.to_string(),
940 ..Default::default()
941 };
942 if let Some(dependencies) = dependencies {
943 for node in dependencies {
944 spec.args.push(
945 format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
946 .as_str()
947 .into(),
948 );
949 }
950 }
951 spec
952 }
953
954 fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
955 actual_levels
956 .iter()
957 .zip(expected_levels)
958 .for_each(|(actual_level, expected_level)| {
959 assert_eq!(actual_level.len(), expected_level.len());
960 actual_level
961 .iter()
962 .zip(expected_level.iter())
963 .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
964 });
965 }
966
967 #[tokio::test]
968 async fn valid_config_with_image() {
969 let network_config = generate(true, None).unwrap();
970 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
971 let caps = ProviderCapabilities {
972 requires_image: true,
973 has_resources: false,
974 prefix_with_full_path: false,
975 use_default_ports_in_cmd: false,
976 };
977
978 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
979 assert!(valid.is_ok())
980 }
981
982 #[tokio::test]
983 async fn invalid_config_without_image() {
984 let network_config = generate(false, None).unwrap();
985 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
986 let caps = ProviderCapabilities {
987 requires_image: true,
988 has_resources: false,
989 prefix_with_full_path: false,
990 use_default_ports_in_cmd: false,
991 };
992
993 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
994 assert!(valid.is_err())
995 }
996
997 #[tokio::test]
998 async fn invalid_config_missing_cmd() {
999 let network_config = generate(false, Some("other")).unwrap();
1000 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1001 let caps = ProviderCapabilities {
1002 requires_image: false,
1003 has_resources: false,
1004 prefix_with_full_path: false,
1005 use_default_ports_in_cmd: false,
1006 };
1007
1008 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1009 assert!(valid.is_err())
1010 }
1011
1012 #[tokio::test]
1013 async fn valid_config_present_cmd() {
1014 let network_config = generate(false, Some("cargo")).unwrap();
1015 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1016 let caps = ProviderCapabilities {
1017 requires_image: false,
1018 has_resources: false,
1019 prefix_with_full_path: false,
1020 use_default_ports_in_cmd: false,
1021 };
1022
1023 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1024 println!("{valid:?}");
1025 assert!(valid.is_ok())
1026 }
1027
1028 #[tokio::test]
1029 async fn default_spawn_concurrency() {
1030 let _g = ENV_MUTEX.lock().await;
1031 set_env(None);
1032 let network_config = generate(false, Some("cargo")).unwrap();
1033 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1034 let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1035 assert_eq!(concurrency, 100);
1036 }
1037
1038 #[tokio::test]
1039 async fn set_spawn_concurrency() {
1040 let _g = ENV_MUTEX.lock().await;
1041 set_env(None);
1042
1043 let network_config = generate(false, Some("cargo")).unwrap();
1044 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1045
1046 let global_settings = GlobalSettingsBuilder::new()
1047 .with_spawn_concurrency(4)
1048 .build()
1049 .unwrap();
1050
1051 spec.set_global_settings(global_settings);
1052 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1053 assert_eq!(concurrency, 4);
1054 assert!(!limited);
1055 }
1056
1057 #[tokio::test]
1058 async fn set_spawn_concurrency_but_limited() {
1059 let _g = ENV_MUTEX.lock().await;
1060 set_env(None);
1061
1062 let network_config = generate(false, Some("cargo")).unwrap();
1063 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1064
1065 let global_settings = GlobalSettingsBuilder::new()
1066 .with_spawn_concurrency(4)
1067 .build()
1068 .unwrap();
1069
1070 spec.set_global_settings(global_settings);
1071 let node = spec.relaychain.nodes.first_mut().unwrap();
1072 node.args
1073 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1074 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1075 assert_eq!(concurrency, 1);
1076 assert!(limited);
1077 }
1078
1079 #[tokio::test]
1080 async fn set_spawn_concurrency_from_env() {
1081 let _g = ENV_MUTEX.lock().await;
1082 set_env(Some(10));
1083
1084 let network_config = generate(false, Some("cargo")).unwrap();
1085 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1086 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1087 assert_eq!(concurrency, 10);
1088 assert!(!limited);
1089 }
1090
1091 #[tokio::test]
1092 async fn set_spawn_concurrency_from_env_but_limited() {
1093 let _g = ENV_MUTEX.lock().await;
1094 set_env(Some(12));
1095
1096 let network_config = generate(false, Some("cargo")).unwrap();
1097 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1098 let node = spec.relaychain.nodes.first_mut().unwrap();
1099 node.args
1100 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1101 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1102 assert_eq!(concurrency, 1);
1103 assert!(limited);
1104 }
1105
1106 #[test]
1107 fn dependency_levels_among_should_work() {
1108 assert!(dependency_levels_among(&[]).unwrap().is_empty());
1110
1111 let alice = get_node_with_dependencies("alice", None);
1113 let nodes = [&alice];
1114
1115 let levels = dependency_levels_among(&nodes).unwrap();
1116 let expected = vec![vec!["alice"]];
1117
1118 verify_levels(levels, expected);
1119
1120 let alice = get_node_with_dependencies("alice", None);
1122 let bob = get_node_with_dependencies("bob", None);
1123 let nodes = [&alice, &bob];
1124
1125 let levels = dependency_levels_among(&nodes).unwrap();
1126 let expected = vec![vec!["alice", "bob"]];
1127
1128 verify_levels(levels, expected);
1129
1130 let alice = get_node_with_dependencies("alice", None);
1132 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1133 let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
1134 let nodes = [&alice, &bob, &charlie];
1135
1136 let levels = dependency_levels_among(&nodes).unwrap();
1137 let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];
1138
1139 verify_levels(levels, expected);
1140
1141 let alice = get_node_with_dependencies("alice", None);
1145 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1146 let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1147 let nodes = [&alice, &bob, &charlie];
1148
1149 let levels = dependency_levels_among(&nodes).unwrap();
1150 let expected = vec![vec!["alice"], vec!["bob", "charlie"]];
1151
1152 verify_levels(levels, expected);
1153
1154 let alice = get_node_with_dependencies("alice", None);
1158 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1159 let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1160 let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
1161 let nodes = [&alice, &bob, &charlie, &dave];
1162
1163 let levels = dependency_levels_among(&nodes).unwrap();
1164 let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];
1165
1166 verify_levels(levels, expected);
1167 }
1168
1169 #[test]
1170 fn dependency_levels_among_should_detect_cycles() {
1171 let mut alice = get_node_with_dependencies("alice", None);
1172 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1173 alice.args.push("{{ZOMBIE:bob:someField}}".into());
1174
1175 assert!(dependency_levels_among(&[&alice, &bob]).is_err())
1176 }
1177}