1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11#[cfg(feature = "pjs")]
12pub mod pjs_helper;
13pub mod shared;
14mod spawner;
15
16use std::{
17 collections::{HashMap, HashSet, VecDeque},
18 env,
19 net::IpAddr,
20 path::{Path, PathBuf},
21 time::{Duration, SystemTime},
22};
23
24use configuration::{NetworkConfig, RegistrationStrategy};
25use errors::OrchestratorError;
26use generators::errors::GeneratorError;
27use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
28pub use network_spec::NetworkSpec;
30use network_spec::{node::NodeSpec, parachain::ParachainSpec};
31use provider::{
32 types::{ProviderCapabilities, TransferedFile},
33 DynProvider,
34};
35use serde_json::json;
36use support::{
37 constants::{
38 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
39 THIS_IS_A_BUG,
40 },
41 fs::{FileSystem, FileSystemError},
42 replacer::{get_tokens_to_replace, has_tokens},
43};
44use tokio::time::timeout;
45use tracing::{debug, info, trace, warn};
46
47use crate::{
48 shared::{constants::DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS, types::RegisterParachainOptions},
49 spawner::SpawnNodeCtx,
50};
/// Entry point used to spawn a network.
///
/// Bundles the filesystem implementation used to write the generated
/// artifacts with the [`DynProvider`] that creates the namespace where the
/// nodes run.
pub struct Orchestrator<T>
where
    T: FileSystem + Sync + Send,
{
    /// Filesystem backend; while spawning it is wrapped in a
    /// [`ScopedFilesystem`] rooted at the namespace base dir.
    filesystem: T,
    /// Provider queried for its capabilities and used to create the namespace.
    provider: DynProvider,
}
58
59impl<T> Orchestrator<T>
60where
61 T: FileSystem + Sync + Send + Clone,
62{
63 pub fn new(filesystem: T, provider: DynProvider) -> Self {
64 Self {
65 filesystem,
66 provider,
67 }
68 }
69
70 pub async fn spawn(
71 &self,
72 network_config: NetworkConfig,
73 ) -> Result<Network<T>, OrchestratorError> {
74 let global_timeout = network_config.global_settings().network_spawn_timeout();
75 let network_spec = NetworkSpec::from_config(&network_config).await?;
76
77 let res = timeout(
78 Duration::from_secs(global_timeout.into()),
79 self.spawn_inner(network_spec),
80 )
81 .await
82 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
83 res?
84 }
85
86 pub async fn spawn_from_spec(
87 &self,
88 network_spec: NetworkSpec,
89 ) -> Result<Network<T>, OrchestratorError> {
90 let global_timeout = network_spec.global_settings.network_spawn_timeout();
91 let res = timeout(
92 Duration::from_secs(global_timeout as u64),
93 self.spawn_inner(network_spec),
94 )
95 .await
96 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
97 res?
98 }
99
100 async fn spawn_inner(
101 &self,
102 mut network_spec: NetworkSpec,
103 ) -> Result<Network<T>, OrchestratorError> {
104 debug!(network_spec = ?network_spec,"Network spec to spawn");
106
107 validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
109 .map_err(|err| {
110 OrchestratorError::InvalidConfigForProvider(
111 self.provider.name().into(),
112 err.to_string(),
113 )
114 })?;
115
116 let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
118 self.provider
119 .create_namespace_with_base_dir(base_dir)
120 .await?
121 } else {
122 self.provider.create_namespace().await?
123 };
124
125 let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
127
128 let start_time = SystemTime::now();
129 info!("🧰 ns: {}", ns.name());
130 info!("🧰 base_dir: {:?}", ns.base_dir());
131 info!("🕰 start time: {:?}", start_time);
132 info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
133
134 network_spec
135 .populate_nodes_available_args(ns.clone())
136 .await?;
137
138 let base_dir = ns.base_dir().to_string_lossy();
139 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
140 network_spec
142 .relaychain
143 .chain_spec
144 .build(&ns, &scoped_fs)
145 .await?;
146
147 debug!("relaychain spec built!");
148 let relay_chain_id = network_spec
150 .relaychain
151 .chain_spec
152 .read_chain_id(&scoped_fs)
153 .await?;
154
155 let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
156 let base_dir_exists = network_spec.global_settings.base_dir().is_some();
157 network_spec
158 .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
159 .await?;
160
161 let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
163 Vec<&ParachainSpec>,
164 Vec<&ParachainSpec>,
165 ) = network_spec
166 .parachains
167 .iter()
168 .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
169 .partition(|para| {
170 matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
171 });
172
173 let mut para_artifacts = vec![];
174 for para in para_to_register_in_genesis {
175 let genesis_config = para.get_genesis_config()?;
176 para_artifacts.push(genesis_config)
177 }
178
179 network_spec
181 .relaychain
182 .chain_spec
183 .customize_relay(
184 &network_spec.relaychain,
185 &network_spec.hrmp_channels,
186 para_artifacts,
187 &scoped_fs,
188 )
189 .await?;
190
191 network_spec
193 .relaychain
194 .chain_spec
195 .build_raw(&ns, &scoped_fs)
196 .await?;
197
198 if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
200 network_spec
201 .relaychain
202 .chain_spec
203 .override_code(&scoped_fs, wasm_override)
204 .await?;
205 }
206
207 let (bootnodes, relaynodes) =
208 split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
209
210 let mut ctx = SpawnNodeCtx {
212 chain_id: &relay_chain_id,
213 parachain_id: None,
214 chain: relay_chain_name.as_str(),
215 role: ZombieRole::Node,
216 ns: &ns,
217 scoped_fs: &scoped_fs,
218 parachain: None,
219 bootnodes_addr: &vec![],
220 wait_ready: false,
221 nodes_by_name: json!({}),
222 };
223
224 let global_files_to_inject = vec![TransferedFile::new(
225 PathBuf::from(format!(
226 "{}/{relay_chain_name}.json",
227 ns.base_dir().to_string_lossy()
228 )),
229 PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
230 )];
231
232 let r = Relaychain::new(
233 relay_chain_name.to_string(),
234 relay_chain_id.clone(),
235 PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
236 OrchestratorError::InvariantError("chain-spec raw path should be set now"),
237 )?),
238 );
239 let mut network =
240 Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
241
242 let mut node_ws_url: String = "".to_string();
244
245 let mut bootnodes_addr: Vec<String> = vec![];
247
248 for level in dependency_levels_among(&bootnodes)? {
249 let mut running_nodes_per_level = vec![];
250 for chunk in level.chunks(spawn_concurrency) {
251 let spawning_tasks = chunk
252 .iter()
253 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
254
255 for node in futures::future::try_join_all(spawning_tasks).await? {
256 let bootnode_multiaddr = node.multiaddr();
257
258 bootnodes_addr.push(bootnode_multiaddr.to_string());
259
260 if node_ws_url.is_empty() {
262 node_ws_url.clone_from(&node.ws_uri)
263 }
264
265 running_nodes_per_level.push(node);
266 }
267 }
268 info!(
269 "🕰 waiting for level: {:?} to be up...",
270 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
271 );
272
273 let waiting_tasks = running_nodes_per_level
275 .iter()
276 .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));
277
278 let _ = futures::future::try_join_all(waiting_tasks).await?;
279
280 for node in running_nodes_per_level {
281 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
283 network.add_running_node(node, None);
284 }
285 }
286
287 network_spec
289 .relaychain
290 .chain_spec
291 .add_bootnodes(&scoped_fs, &bootnodes_addr)
292 .await?;
293
294 ctx.bootnodes_addr = &bootnodes_addr;
295
296 for level in dependency_levels_among(&relaynodes)? {
297 let mut running_nodes_per_level = vec![];
298 for chunk in level.chunks(spawn_concurrency) {
299 let spawning_tasks = chunk
300 .iter()
301 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
302
303 for node in futures::future::try_join_all(spawning_tasks).await? {
304 running_nodes_per_level.push(node);
305 }
306 }
307 info!(
308 "🕰 waiting for level: {:?} to be up...",
309 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
310 );
311
312 let waiting_tasks = running_nodes_per_level
314 .iter()
315 .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));
316
317 let _ = futures::future::try_join_all(waiting_tasks).await?;
318
319 for node in running_nodes_per_level {
320 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
321 network.add_running_node(node, None);
322 }
323 }
324
325 for para in network_spec.parachains.iter() {
327 let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
329 let parachain_id = parachain.chain_id.clone();
330
331 let (bootnodes, collators) =
332 split_nodes_by_bootnodes(¶.collators, para.no_default_bootnodes);
333
334 let mut ctx_para = SpawnNodeCtx {
336 parachain: Some(para),
337 parachain_id: parachain_id.as_deref(),
338 role: if para.is_cumulus_based {
339 ZombieRole::CumulusCollator
340 } else {
341 ZombieRole::Collator
342 },
343 bootnodes_addr: &vec![],
344 ..ctx.clone()
345 };
346
347 let mut bootnodes_addr: Vec<String> = vec![];
349 let mut running_nodes: Vec<NetworkNode> = vec![];
350
351 for level in dependency_levels_among(&bootnodes)? {
352 let mut running_nodes_per_level = vec![];
353 for chunk in level.chunks(spawn_concurrency) {
354 let spawning_tasks = chunk.iter().map(|node| {
355 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
356 });
357
358 for node in futures::future::try_join_all(spawning_tasks).await? {
359 let bootnode_multiaddr = node.multiaddr();
360
361 bootnodes_addr.push(bootnode_multiaddr.to_string());
362
363 running_nodes_per_level.push(node);
364 }
365 }
366 info!(
367 "🕰 waiting for level: {:?} to be up...",
368 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
369 );
370
371 let waiting_tasks = running_nodes_per_level
373 .iter()
374 .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));
375
376 let _ = futures::future::try_join_all(waiting_tasks).await?;
377
378 for node in running_nodes_per_level {
379 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
380 running_nodes.push(node);
381 }
382 }
383
384 if let Some(para_chain_spec) = para.chain_spec.as_ref() {
385 para_chain_spec
386 .add_bootnodes(&scoped_fs, &bootnodes_addr)
387 .await?;
388 }
389
390 ctx_para.bootnodes_addr = &bootnodes_addr;
391
392 for level in dependency_levels_among(&collators)? {
394 let mut running_nodes_per_level = vec![];
395 for chunk in level.chunks(spawn_concurrency) {
396 let spawning_tasks = chunk.iter().map(|node| {
397 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
398 });
399
400 for node in futures::future::try_join_all(spawning_tasks).await? {
401 running_nodes_per_level.push(node);
402 }
403 }
404 info!(
405 "🕰 waiting for level: {:?} to be up...",
406 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
407 );
408
409 let waiting_tasks = running_nodes_per_level
411 .iter()
412 .map(|node| node.wait_until_is_up(DEFAULT_NODE_SPAWN_TIMEOUT_SECONDS));
413
414 let _ = futures::future::try_join_all(waiting_tasks).await?;
415
416 for node in running_nodes_per_level {
417 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
418 running_nodes.push(node);
419 }
420 }
421
422 let running_para_id = parachain.para_id;
423 network.add_para(parachain);
424 for node in running_nodes {
425 network.add_running_node(node, Some(running_para_id));
426 }
427 }
428
429 for para in para_to_register_with_extrinsic {
437 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
438 id: para.id,
439 wasm_path: para
441 .genesis_wasm
442 .artifact_path()
443 .ok_or(OrchestratorError::InvariantError(
444 "artifact path for wasm must be set at this point",
445 ))?
446 .to_path_buf(),
447 state_path: para
448 .genesis_state
449 .artifact_path()
450 .ok_or(OrchestratorError::InvariantError(
451 "artifact path for state must be set at this point",
452 ))?
453 .to_path_buf(),
454 node_ws_url: node_ws_url.clone(),
455 onboard_as_para: para.onboard_as_parachain,
456 seed: None, finalization: false,
458 };
459
460 Parachain::register(register_para_options, &scoped_fs).await?;
461 }
462
463 let mut zombie_json = serde_json::to_value(&network)?;
465 zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
466 zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());
467
468 if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
469 zombie_json["start_time_ts"] =
470 serde_json::value::Value::String(start_time_ts.as_millis().to_string());
471 } else {
472 warn!("⚠️ Error getting start_time timestamp");
474 }
475
476 scoped_fs
477 .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
478 .await?;
479 Ok(network)
480 }
481}
482
483fn split_nodes_by_bootnodes(
488 nodes: &[NodeSpec],
489 no_default_bootnodes: bool,
490) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
491 let mut bootnodes = vec![];
493 let mut other_nodes = vec![];
494 nodes.iter().for_each(|node| {
495 if node.is_bootnode {
496 bootnodes.push(node)
497 } else {
498 other_nodes.push(node)
499 }
500 });
501
502 if bootnodes.is_empty() && !no_default_bootnodes {
503 bootnodes.push(other_nodes.remove(0))
504 }
505
506 (bootnodes, other_nodes)
507}
508
509fn generate_bootnode_addr(
511 node: &NetworkNode,
512 ip: &IpAddr,
513 port: u16,
514) -> Result<String, GeneratorError> {
515 generators::generate_node_bootnode_addr(
516 &node.spec.peer_id,
517 ip,
518 port,
519 node.inner.args().as_ref(),
520 &node.spec.p2p_cert_hash,
521 )
522}
523fn validate_spec_with_provider_capabilities(
525 network_spec: &NetworkSpec,
526 capabilities: &ProviderCapabilities,
527) -> Result<(), anyhow::Error> {
528 let mut errs: Vec<String> = vec![];
529
530 if capabilities.requires_image {
531 if network_spec.relaychain.default_image.is_none() {
533 let nodes = &network_spec.relaychain.nodes;
535 if nodes.iter().any(|node| node.image.is_none()) {
536 errs.push(String::from(
537 "Missing image for node, and not default is set at relaychain",
538 ));
539 }
540 };
541
542 for para in &network_spec.parachains {
544 if para.default_image.is_none() {
545 let nodes = ¶.collators;
546 if nodes.iter().any(|node| node.image.is_none()) {
547 errs.push(format!(
548 "Missing image for node, and not default is set at parachain {}",
549 para.id
550 ));
551 }
552 }
553 }
554 } else {
555 let mut cmds: HashSet<&str> = Default::default();
558 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
559 cmds.insert(cmd.as_str());
560 }
561 for node in network_spec.relaychain().nodes.iter() {
562 cmds.insert(node.command());
563 }
564
565 for para in &network_spec.parachains {
567 if let Some(cmd) = para.default_command.as_ref() {
568 cmds.insert(cmd.as_str());
569 }
570
571 for node in para.collators.iter() {
572 cmds.insert(node.command());
573 }
574 }
575
576 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
579 let parts: Vec<_> = path.split(":").collect();
580 for cmd in cmds {
581 let missing = if cmd.contains('/') {
582 trace!("checking {cmd}");
583 if std::fs::metadata(cmd).is_err() {
584 true
585 } else {
586 info!("🔎 We will use the full path {cmd} to spawn nodes.");
587 false
588 }
589 } else {
590 !parts.iter().any(|part| {
592 let path_to = format!("{part}/{cmd}");
593 trace!("checking {path_to}");
594 let check_result = std::fs::metadata(&path_to);
595 trace!("result {:?}", check_result);
596 if check_result.is_ok() {
597 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
598 true
599 } else {
600 false
601 }
602 })
603 };
604
605 if missing {
606 errs.push(help_msg(cmd));
607 }
608 }
609 }
610
611 if !errs.is_empty() {
612 let msg = errs.join("\n");
613 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
614 }
615
616 Ok(())
617}
618
/// Returns the "missing binary" message for `cmd`, including the build
/// command for the binaries we know how to compile.
fn help_msg(cmd: &str) -> String {
    let is_template_node = matches!(
        cmd,
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node"
    );

    if is_template_node {
        format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
    } else if cmd == "polkadot" {
        format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
    } else if cmd == "polkadot-parachain" {
        format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
    } else {
        // Unknown binary: nothing more specific to suggest
        format!("Missing binary {cmd}, please compile it.")
    }
}
635
/// Reads the spawn concurrency from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable is unset, is not a valid `usize`
/// (surrounding whitespace is ignored) or is `0`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // The value ends up as a chunk size and `slice::chunks(0)` panics,
        // so zero is treated as "not set".
        .filter(|&concurrency| concurrency > 0)
}
644
645fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
646 let desired_spawn_concurrency = match (
647 spawn_concurrency_from_env(),
648 spec.global_settings.spawn_concurrency(),
649 ) {
650 (Some(n), _) => Some(n),
651 (None, Some(n)) => Some(n),
652 _ => None,
653 };
654
655 let (spawn_concurrency, limited_by_tokens) =
656 if let Some(spawn_concurrency) = desired_spawn_concurrency {
657 if spawn_concurrency == 1 {
658 (1, false)
659 } else if has_tokens(&serde_json::to_string(spec)?) {
660 (1, true)
661 } else {
662 (spawn_concurrency, false)
663 }
664 } else {
665 if has_tokens(&serde_json::to_string(spec)?) {
667 (1, true)
668 } else {
669 (100, false)
671 }
672 };
673
674 Ok((spawn_concurrency, limited_by_tokens))
675}
676
/// Groups `nodes` into dependency levels.
///
/// A node depends on another node of `nodes` when its serialized args contain
/// a replacement token (as returned by `get_tokens_to_replace`) matching that
/// node's name. Tokens that name the node itself, or a name not present in
/// `nodes`, are ignored.
///
/// The first level holds the nodes without dependencies; every following
/// level holds the nodes whose dependencies are all in previous levels. Each
/// level is sorted by node name.
///
/// # Errors
/// Returns [`OrchestratorError::InvalidConfig`] when the dependencies form a
/// cycle.
fn dependency_levels_among<'a>(
    nodes: &'a [&'a NodeSpec],
) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
    let by_name = nodes
        .iter()
        .map(|n| (n.name.as_str(), *n))
        .collect::<HashMap<_, _>>();

    // graph: node name -> nodes that depend on it
    // indegree: node name -> number of nodes it depends on
    let mut graph = HashMap::with_capacity(nodes.len());
    let mut indegree = HashMap::with_capacity(nodes.len());

    for node in nodes {
        graph.insert(node.name.as_str(), Vec::new());
        indegree.insert(node.name.as_str(), 0);
    }

    // Build the dependency graph
    for &node in nodes {
        // Args that fail to serialize contribute no dependencies
        if let Ok(args_json) = serde_json::to_string(&node.args) {
            // Collected into a set so each dependency is counted once
            let unique_deps = get_tokens_to_replace(&args_json)
                .into_iter()
                .filter(|dep| dep != &node.name)
                .filter_map(|dep| by_name.get(dep.as_str()))
                .map(|&dep_node| dep_node.name.as_str())
                .collect::<HashSet<_>>();

            for dep_name in unique_deps {
                graph
                    .get_mut(dep_name)
                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
                    .push(node);
                *indegree
                    .get_mut(node.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
            }
        }
    }

    // Start from the nodes that depend on nobody
    let mut queue = nodes
        .iter()
        .filter(|n| {
            *indegree
                .get(n.name.as_str())
                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
                == 0
        })
        .copied()
        .collect::<VecDeque<_>>();

    let mut processed_count = 0;
    let mut levels = Vec::new();

    // Each iteration of the outer loop drains exactly one level
    while !queue.is_empty() {
        let level_size = queue.len();
        let mut current_level = Vec::with_capacity(level_size);

        for _ in 0..level_size {
            let n = queue
                .pop_front()
                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
            current_level.push(n);
            processed_count += 1;

            // A dependent is queued once its last pending dependency is processed
            for &neighbour in graph
                .get(n.name.as_str())
                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
            {
                let neighbour_indegree = indegree
                    .get_mut(neighbour.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
                *neighbour_indegree -= 1;

                if *neighbour_indegree == 0 {
                    queue.push_back(neighbour);
                }
            }
        }

        current_level.sort_by_key(|n| &n.name);
        levels.push(current_level);
    }

    // Nodes never reaching indegree zero are part of (or behind) a cycle
    if processed_count != nodes.len() {
        return Err(OrchestratorError::InvalidConfig(
            "Tokens have cyclical dependencies".to_string(),
        ));
    }

    Ok(levels)
}
775
/// View over a [`FileSystem`] whose operations are resolved against
/// `base_dir`.
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    /// Underlying filesystem implementation.
    fs: &'a FS,
    /// Directory prepended to the paths handled by this scoped filesystem.
    base_dir: &'a str,
}
788
789impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
790 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
791 Self { fs, base_dir }
792 }
793
794 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
795 for file in files {
796 let full_remote_path = PathBuf::from(format!(
797 "{}/{}",
798 self.base_dir,
799 file.remote_path.to_string_lossy()
800 ));
801 trace!("coping file: {file}");
802 self.fs
803 .copy(file.local_path.as_path(), full_remote_path)
804 .await?;
805 }
806 Ok(())
807 }
808
809 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
810 let file = file.as_ref();
811
812 let full_path = if file.is_absolute() {
813 file.to_owned()
814 } else {
815 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
816 };
817 let content = self.fs.read_to_string(full_path).await?;
818 Ok(content)
819 }
820
821 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
822 let path = PathBuf::from(format!(
823 "{}/{}",
824 self.base_dir,
825 path.as_ref().to_string_lossy()
826 ));
827 self.fs.create_dir(path).await
828 }
829
830 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
831 let path = PathBuf::from(format!(
832 "{}/{}",
833 self.base_dir,
834 path.as_ref().to_string_lossy()
835 ));
836 self.fs.create_dir_all(path).await
837 }
838
839 async fn write(
840 &self,
841 path: impl AsRef<Path>,
842 contents: impl AsRef<[u8]> + Send,
843 ) -> Result<(), FileSystemError> {
844 let path = path.as_ref();
845
846 let full_path = if path.is_absolute() {
847 path.to_owned()
848 } else {
849 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
850 };
851
852 self.fs.write(full_path, contents).await
853 }
854}
855
/// Role of a node inside the network, carried in the spawn context.
///
/// In this file only `Node`, `Collator` and `CumulusCollator` are assigned;
/// the remaining variants are presumably used elsewhere (TODO confirm).
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    /// Role used in the context of the relaychain nodes.
    Node,
    Bootnode,
    /// Role used for the collators of a parachain that is not cumulus based.
    Collator,
    /// Role used for the collators of a cumulus based parachain.
    CumulusCollator,
    Companion,
}
865
866pub use network::{AddCollatorOptions, AddNodeOptions};
868pub use network_helper::metrics;
869#[cfg(feature = "pjs")]
870pub use pjs_helper::PjsResult;
871
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    lazy_static! {
        // Env vars are process wide: tests touching `ENV_KEY` hold this lock
        // so they don't step on each other when run in parallel.
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Sets `ENV_KEY` to `concurrency`, or removes it when `None`.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Builds a config with a two nodes relaychain and a one collator
    /// parachain, optionally with images and with `with_cmd` as command for
    /// every node.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_node(|node| node.with_name("alice"))
                    .with_node(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Provider capabilities where only `requires_image` varies.
    fn caps(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Builds a node named `name` whose args reference every node in
    /// `dependencies` through a `{{ZOMBIE:<name>:someField}}` token.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Asserts that `actual_levels` holds exactly the node names of
    /// `expected_levels`, level by level and in the same order.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        // `zip` stops at the shorter side, so a missing or extra level would
        // go unnoticed without this check.
        assert_eq!(actual_levels.len(), expected_levels.len());

        for (actual_level, expected_level) in actual_levels.iter().zip(expected_levels) {
            let actual_names: Vec<&str> = actual_level.iter().map(|n| n.name.as_str()).collect();
            assert_eq!(actual_names, expected_level);
        }
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(true));
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(true));
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(false));
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        // `cargo` is expected to be in PATH while running the tests
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let valid = validate_spec_with_provider_capabilities(&spec, &caps(false));
        assert!(valid.is_ok(), "{valid:?}")
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        // A token in the args must force sequential spawning
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        // A token in the args must force sequential spawning
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // no nodes
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // one node
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // two independent nodes
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // chain: alice <- bob <- charlie
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        // fan out: bob and charlie both depend on alice
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        // diamond: dave depends on bob and charlie, which depend on alice
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        // alice and bob reference each other
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}