#![allow(dead_code, clippy::expect_fun_call)]

pub mod errors;
pub mod generators;
pub mod network;
pub mod network_helper;
pub mod tx_helper;

mod network_spec;
pub mod shared;
mod spawner;
mod utils;

use std::{
    collections::{HashMap, HashSet, VecDeque},
    env,
    net::IpAddr,
    path::{Path, PathBuf},
    time::{Duration, SystemTime},
};

use configuration::{NetworkConfig, RegistrationStrategy};
use errors::OrchestratorError;
use generators::errors::GeneratorError;
use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
pub use network_spec::NetworkSpec;
use network_spec::{node::NodeSpec, parachain::ParachainSpec};
use provider::{
    types::{ProviderCapabilities, TransferedFile},
    DynNamespace, DynProvider,
};
use serde_json::json;
use support::{
    constants::{
        GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
        THIS_IS_A_BUG,
    },
    fs::{FileSystem, FileSystemError},
    replacer::{get_tokens_to_replace, has_tokens},
};
use tokio::time::timeout;
use tracing::{debug, info, trace};

use crate::{
    network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
    shared::types::RegisterParachainOptions,
    spawner::SpawnNodeCtx,
    utils::write_zombie_json,
};
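
/// Orchestrates the spawning of a whole network (relaychain plus parachains) through the
/// configured provider, using the given filesystem for chain specs and other generated
/// artifacts.
///
/// Illustrative usage; the concrete filesystem/provider values are assumptions and depend on
/// the provider crates enabled by the caller:
///
/// ```ignore
/// let orchestrator = Orchestrator::new(filesystem, provider);
/// let network = orchestrator.spawn(network_config).await?;
/// ```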
pub struct Orchestrator<T>
where
    T: FileSystem + Sync + Send,
{
    filesystem: T,
    provider: DynProvider,
}

impl<T> Orchestrator<T>
where
    T: FileSystem + Sync + Send + Clone,
{
    pub fn new(filesystem: T, provider: DynProvider) -> Self {
        Self {
            filesystem,
            provider,
        }
    }

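    /// Spawns the network described by `network_config`, bounded by the global
    /// `network_spawn_timeout` setting.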
    pub async fn spawn(
        &self,
        network_config: NetworkConfig,
    ) -> Result<Network<T>, OrchestratorError> {
        let global_timeout = network_config.global_settings().network_spawn_timeout();
        let network_spec = NetworkSpec::from_config(&network_config).await?;

        let res = timeout(
            Duration::from_secs(global_timeout.into()),
            self.spawn_inner(network_spec),
        )
        .await
        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
        res?
    }

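    /// Spawns the network from an already built [`NetworkSpec`], bounded by the global
    /// `network_spawn_timeout` setting.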
    pub async fn spawn_from_spec(
        &self,
        network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        let global_timeout = network_spec.global_settings.network_spawn_timeout();
        let res = timeout(
            Duration::from_secs(global_timeout as u64),
            self.spawn_inner(network_spec),
        )
        .await
        .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
        res?
    }

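    /// Attaches to an already running network described by a previously written
    /// `zombie.json` file: the namespace, relaychain and parachains are recreated from the
    /// stored state instead of being spawned again.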
    pub async fn attach_to_live(
        &self,
        zombie_json_path: &Path,
    ) -> Result<Network<T>, OrchestratorError> {
        info!("attaching to live network...");
        info!("reading zombie.json from {:?}", zombie_json_path);

        let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
        let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;

        info!("recreating namespace...");
        let ns: DynNamespace = self
            .provider
            .create_namespace_from_json(&zombie_json)
            .await?;

        info!("recreating relaychain...");
        let (relay, initial_spec) =
            recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
        let relay_nodes = relay.nodes.clone();

        let mut network =
            Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);

        for node in relay_nodes {
            network.insert_node(node);
        }

        info!("recreating parachains...");
        let parachains_map =
            recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
        let para_nodes = parachains_map
            .values()
            .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
            .collect::<Vec<NetworkNode>>();

        network.set_parachains(parachains_map);
        for node in para_nodes {
            network.insert_node(node);
        }

        Ok(network)
    }

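    /// Core spawning flow: validate the spec against the provider capabilities, create the
    /// namespace, build the relay and parachain chain specs, spawn bootnodes and then the
    /// remaining nodes level by level (respecting token dependencies), register parachains
    /// that use the extrinsic strategy, and finally persist `zombie.json`.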
    async fn spawn_inner(
        &self,
        mut network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        debug!(network_spec = ?network_spec, "Network spec to spawn");

        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
            .map_err(|err| {
                OrchestratorError::InvalidConfigForProvider(
                    self.provider.name().into(),
                    err.to_string(),
                )
            })?;

        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
            self.provider
                .create_namespace_with_base_dir(base_dir)
                .await?
        } else {
            self.provider.create_namespace().await?
        };

        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;

        let start_time = SystemTime::now();
        info!("🧰 ns: {}", ns.name());
        info!("🧰 base_dir: {:?}", ns.base_dir());
        info!("🕰 start time: {:?}", start_time);
        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");

        network_spec
            .populate_nodes_available_args(ns.clone())
            .await?;

        let base_dir = ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        network_spec
            .relaychain
            .chain_spec
            .build(&ns, &scoped_fs)
            .await?;

        debug!("relaychain spec built!");

        let relay_chain_id = network_spec
            .relaychain
            .chain_spec
            .read_chain_id(&scoped_fs)
            .await?;

        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
        network_spec
            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
            .await?;

        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
            Vec<&ParachainSpec>,
            Vec<&ParachainSpec>,
        ) = network_spec
            .parachains
            .iter()
            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
            .partition(|para| {
                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
            });

        let mut para_artifacts = vec![];
        for para in para_to_register_in_genesis {
            let genesis_config = para.get_genesis_config()?;
            para_artifacts.push(genesis_config)
        }

        network_spec
            .relaychain
            .chain_spec
            .customize_relay(
                &network_spec.relaychain,
                &network_spec.hrmp_channels,
                para_artifacts,
                &scoped_fs,
            )
            .await?;

        network_spec
            .relaychain
            .chain_spec
            .build_raw(&ns, &scoped_fs, None)
            .await?;

        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
            network_spec
                .relaychain
                .chain_spec
                .override_code(&scoped_fs, wasm_override)
                .await?;
        }

        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
            network_spec
                .relaychain
                .chain_spec
                .override_raw_spec(&scoped_fs, raw_spec_override)
                .await?;
        }

        let (bootnodes, relaynodes) =
            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);

        let mut ctx = SpawnNodeCtx {
            chain_id: &relay_chain_id,
            parachain_id: None,
            chain: relay_chain_name.as_str(),
            role: ZombieRole::Node,
            ns: &ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: false,
            nodes_by_name: json!({}),
            global_settings: &network_spec.global_settings,
        };

        let global_files_to_inject = vec![TransferedFile::new(
            PathBuf::from(format!(
                "{}/{relay_chain_name}.json",
                ns.base_dir().to_string_lossy()
            )),
            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
        )];

        let r = Relaychain::new(
            relay_chain_name.to_string(),
            relay_chain_id.clone(),
            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
            )?),
        );
        let mut network =
            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

        let mut node_ws_url: String = "".to_string();

        let mut bootnodes_addr: Vec<String> = vec![];

        for level in dependency_levels_among(&bootnodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    let bootnode_multiaddr = node.multiaddr();

                    bootnodes_addr.push(bootnode_multiaddr.to_string());

                    if node_ws_url.is_empty() {
                        node_ws_url.clone_from(&node.ws_uri)
                    }

                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        network_spec
            .relaychain
            .chain_spec
            .add_bootnodes(&scoped_fs, &bootnodes_addr)
            .await?;

        ctx.bootnodes_addr = &bootnodes_addr;

        for level in dependency_levels_among(&relaynodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        for para in network_spec.parachains.iter() {
            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
            let parachain_id = parachain.chain_id.clone();

            let (bootnodes, collators) =
                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);

            let mut ctx_para = SpawnNodeCtx {
                parachain: Some(para),
                parachain_id: parachain_id.as_deref(),
                role: if para.is_cumulus_based {
                    ZombieRole::CumulusCollator
                } else {
                    ZombieRole::Collator
                },
                bootnodes_addr: &vec![],
                ..ctx.clone()
            };

            let mut bootnodes_addr: Vec<String> = vec![];
            let mut running_nodes: Vec<NetworkNode> = vec![];

            for level in dependency_levels_among(&bootnodes)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        let bootnode_multiaddr = node.multiaddr();

                        bootnodes_addr.push(bootnode_multiaddr.to_string());

                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
                para_chain_spec
                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
                    .await?;
            }

            ctx_para.bootnodes_addr = &bootnodes_addr;

            for level in dependency_levels_among(&collators)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            let running_para_id = parachain.para_id;
            network.add_para(parachain);
            for node in running_nodes {
                network.add_running_node(node, Some(running_para_id)).await;
            }
        }

        for para in para_to_register_with_extrinsic {
            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
                id: para.id,
                wasm_path: para
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para
                    .genesis_state
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: node_ws_url.clone(),
                onboard_as_para: para.onboard_as_parachain,
                seed: None,
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        network.set_start_time_ts(start_time);

        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;

        if network_spec.global_settings.tear_down_on_failure() {
            network.spawn_watching_task();
        }

        Ok(network)
    }
}

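/// Rebuilds [`NetworkNode`]s from the raw node JSON stored in `zombie.json`, re-attaching to
/// the already running processes through the namespace. Fails if the `provider_tag` recorded
/// in the JSON does not match the current provider.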
async fn recreate_network_nodes_from_json(
    nodes_json: &serde_json::Value,
    ns: DynNamespace,
    provider_name: &str,
) -> Result<Vec<NetworkNode>, OrchestratorError> {
    let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;

    let mut nodes = Vec::with_capacity(raw_nodes.len());
    for raw in raw_nodes {
        let provider_tag = raw
            .inner
            .get("provider_tag")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                OrchestratorError::InvalidConfig("Missing `provider_tag` in inner node JSON".into())
            })?;

        if provider_tag != provider_name {
            return Err(OrchestratorError::InvalidConfigForProvider(
                provider_name.to_string(),
                provider_tag.to_string(),
            ));
        }
        let inner = ns.spawn_node_from_json(&raw.inner).await?;
        let relay_node = NetworkNode::new(
            raw.name,
            raw.ws_uri,
            raw.prometheus_uri,
            raw.multiaddr,
            raw.spec,
            inner,
        );
        nodes.push(relay_node);
    }

    Ok(nodes)
}

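/// Rebuilds the [`Relaychain`] (plus the initial [`NetworkSpec`]) from the `relay` and
/// `initial_spec` fields of `zombie.json`.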
async fn recreate_relaychain_from_json(
    zombie_json: &serde_json::Value,
    ns: DynNamespace,
    provider_name: &str,
) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
    let relay_json = zombie_json
        .get("relay")
        .ok_or(OrchestratorError::InvalidConfig(
            "Missing `relay` field in zombie.json".into(),
        ))?
        .clone();

    let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;

    let initial_spec: NetworkSpec = serde_json::from_value(
        zombie_json
            .get("initial_spec")
            .ok_or(OrchestratorError::InvalidConfig(
                "Missing `initial_spec` field in zombie.json".into(),
            ))?
            .clone(),
    )?;

    let nodes =
        recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
    relay_raw.inner.nodes = nodes;

    Ok((relay_raw.inner, initial_spec))
}

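/// Rebuilds the parachains map (para id to parachains) from the `parachains` field of
/// `zombie.json`, re-attaching each collator through the namespace.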
async fn recreate_parachains_from_json(
    zombie_json: &serde_json::Value,
    ns: DynNamespace,
    provider_name: &str,
) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
    let paras_json = zombie_json
        .get("parachains")
        .ok_or(OrchestratorError::InvalidConfig(
            "Missing `parachains` field in zombie.json".into(),
        ))?
        .clone();

    let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;

    let mut parachains_map = HashMap::new();

    for (id, parachain_entries) in raw_paras {
        let mut parsed_vec = Vec::with_capacity(parachain_entries.len());

        for raw_para in parachain_entries {
            let mut para = raw_para.inner;
            para.collators =
                recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
                    .await?;
            parsed_vec.push(para);
        }

        parachains_map.insert(id, parsed_vec);
    }

    Ok(parachains_map)
}

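/// Splits `nodes` into `(bootnodes, other_nodes)`. If no node is explicitly marked as a
/// bootnode and default bootnodes are not disabled, the first remaining node is promoted to
/// bootnode.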
fn split_nodes_by_bootnodes(
    nodes: &[NodeSpec],
    no_default_bootnodes: bool,
) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
    let mut bootnodes = vec![];
    let mut other_nodes = vec![];
    nodes.iter().for_each(|node| {
        if node.is_bootnode {
            bootnodes.push(node)
        } else {
            other_nodes.push(node)
        }
    });

    if bootnodes.is_empty() && !no_default_bootnodes {
        bootnodes.push(other_nodes.remove(0))
    }

    (bootnodes, other_nodes)
}

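/// Builds the multiaddr advertised for a bootnode from its peer id, IP, port, node args and
/// p2p certificate hash.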
fn generate_bootnode_addr(
    node: &NetworkNode,
    ip: &IpAddr,
    port: u16,
) -> Result<String, GeneratorError> {
    generators::generate_node_bootnode_addr(
        &node.spec.peer_id,
        ip,
        port,
        node.inner.args().as_ref(),
        &node.spec.p2p_cert_hash,
    )
}
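
/// Checks the network spec against the provider capabilities: providers that require an image
/// must have one set for every node (or a default at the relaychain/parachain level), while
/// command-based providers must be able to resolve every command, either as an existing path
/// or through the `PATH` environment variable.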
fn validate_spec_with_provider_capabilities(
    network_spec: &NetworkSpec,
    capabilities: &ProviderCapabilities,
) -> Result<(), anyhow::Error> {
    let mut errs: Vec<String> = vec![];

    if capabilities.requires_image {
        if network_spec.relaychain.default_image.is_none() {
            let nodes = &network_spec.relaychain.nodes;
            if nodes.iter().any(|node| node.image.is_none()) {
                errs.push(String::from(
                    "Missing image for node, and no default is set at relaychain",
                ));
            }
        };

        for para in &network_spec.parachains {
            if para.default_image.is_none() {
                let nodes = &para.collators;
                if nodes.iter().any(|node| node.image.is_none()) {
                    errs.push(format!(
                        "Missing image for node, and no default is set at parachain {}",
                        para.id
                    ));
                }
            }
        }
    } else {
        let mut cmds: HashSet<&str> = Default::default();
        if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
            cmds.insert(cmd.as_str());
        }
        for node in network_spec.relaychain().nodes.iter() {
            cmds.insert(node.command());
        }

        for para in &network_spec.parachains {
            if let Some(cmd) = para.default_command.as_ref() {
                cmds.insert(cmd.as_str());
            }

            for node in para.collators.iter() {
                cmds.insert(node.command());
            }
        }

        let path = std::env::var("PATH").unwrap_or_default();
        trace!("current PATH: {path}");
        let parts: Vec<_> = path.split(":").collect();
        for cmd in cmds {
            let missing = if cmd.contains('/') {
                trace!("checking {cmd}");
                if std::fs::metadata(cmd).is_err() {
                    true
                } else {
                    info!("🔎 We will use the full path {cmd} to spawn nodes.");
                    false
                }
            } else {
                !parts.iter().any(|part| {
                    let path_to = format!("{part}/{cmd}");
                    trace!("checking {path_to}");
                    let check_result = std::fs::metadata(&path_to);
                    trace!("result {:?}", check_result);
                    if check_result.is_ok() {
                        info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
                        true
                    } else {
                        false
                    }
                })
            };

            if missing {
                errs.push(help_msg(cmd));
            }
        }
    }

    if !errs.is_empty() {
        let msg = errs.join("\n");
        return Err(anyhow::anyhow!("Invalid configuration: \n {msg}"));
    }

    Ok(())
}

fn help_msg(cmd: &str) -> String {
    match cmd {
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
            format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
        },
        "polkadot" => {
            format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
        },
        "polkadot-parachain" => {
            format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
        },
        _ => {
            format!("Missing binary {cmd}, please compile it.")
        },
    }
}

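/// Reads the spawn concurrency override from the `ZOMBIE_SPAWN_CONCURRENCY` environment
/// variable, if it is set and parses as a `usize`.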
fn spawn_concurrency_from_env() -> Option<usize> {
    if let Ok(concurrency) = env::var("ZOMBIE_SPAWN_CONCURRENCY") {
        concurrency.parse::<usize>().ok()
    } else {
        None
    }
}

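/// Decides how many nodes to spawn in parallel: an explicit `ZOMBIE_SPAWN_CONCURRENCY` value
/// takes precedence over the `spawn_concurrency` global setting; if the spec contains
/// replacement tokens, concurrency is forced to 1 and the returned flag signals that it was
/// limited by tokens; with nothing configured it defaults to 100.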
fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
    let desired_spawn_concurrency = match (
        spawn_concurrency_from_env(),
        spec.global_settings.spawn_concurrency(),
    ) {
        (Some(n), _) => Some(n),
        (None, Some(n)) => Some(n),
        _ => None,
    };

    let (spawn_concurrency, limited_by_tokens) =
        if let Some(spawn_concurrency) = desired_spawn_concurrency {
            if spawn_concurrency == 1 {
                (1, false)
            } else if has_tokens(&serde_json::to_string(spec)?) {
                (1, true)
            } else {
                (spawn_concurrency, false)
            }
        } else if has_tokens(&serde_json::to_string(spec)?) {
            (1, true)
        } else {
            (100, false)
        };

    Ok((spawn_concurrency, limited_by_tokens))
}

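/// Groups `nodes` into spawn "levels" using a topological sort (Kahn's algorithm): a node
/// whose args reference another node through a `{{ZOMBIE:<node_name>:<field>}}` token is
/// placed in a later level than its dependency, so it is only spawned once that dependency is
/// already running. Returns an error if the token references form a cycle.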
fn dependency_levels_among<'a>(
    nodes: &'a [&'a NodeSpec],
) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
    let by_name = nodes
        .iter()
        .map(|n| (n.name.as_str(), *n))
        .collect::<HashMap<_, _>>();

    let mut graph = HashMap::with_capacity(nodes.len());
    let mut indegree = HashMap::with_capacity(nodes.len());

    for node in nodes {
        graph.insert(node.name.as_str(), Vec::new());
        indegree.insert(node.name.as_str(), 0);
    }

    for &node in nodes {
        if let Ok(args_json) = serde_json::to_string(&node.args) {
            let unique_deps = get_tokens_to_replace(&args_json)
                .into_iter()
                .filter(|dep| dep != &node.name)
                .filter_map(|dep| by_name.get(dep.as_str()))
                .map(|&dep_node| dep_node.name.as_str())
                .collect::<HashSet<_>>();

            for dep_name in unique_deps {
                graph
                    .get_mut(dep_name)
                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
                    .push(node);
                *indegree
                    .get_mut(node.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
            }
        }
    }

    let mut queue = nodes
        .iter()
        .filter(|n| {
            *indegree
                .get(n.name.as_str())
                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
                == 0
        })
        .copied()
        .collect::<VecDeque<_>>();

    let mut processed_count = 0;
    let mut levels = Vec::new();

    while !queue.is_empty() {
        let level_size = queue.len();
        let mut current_level = Vec::with_capacity(level_size);

        for _ in 0..level_size {
            let n = queue
                .pop_front()
                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
            current_level.push(n);
            processed_count += 1;

            for &neighbour in graph
                .get(n.name.as_str())
                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
            {
                let neighbour_indegree = indegree
                    .get_mut(neighbour.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
                *neighbour_indegree -= 1;

                if *neighbour_indegree == 0 {
                    queue.push_back(neighbour);
                }
            }
        }

        current_level.sort_by_key(|n| &n.name);
        levels.push(current_level);
    }

    if processed_count != nodes.len() {
        return Err(OrchestratorError::InvalidConfig(
            "Tokens have cyclical dependencies".to_string(),
        ));
    }

    Ok(levels)
}

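/// Wrapper around a [`FileSystem`] that resolves relative paths against a base directory (the
/// namespace base dir), so callers can use paths relative to the network working directory
/// while absolute paths are passed through untouched.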
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    fs: &'a FS,
    base_dir: &'a str,
}

impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
    pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
        Self { fs, base_dir }
    }

    async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
        for file in files {
            let full_remote_path = PathBuf::from(format!(
                "{}/{}",
                self.base_dir,
                file.remote_path.to_string_lossy()
            ));
            trace!("copying file: {file}");
            self.fs
                .copy(file.local_path.as_path(), full_remote_path)
                .await?;
        }
        Ok(())
    }

    async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
        let file = file.as_ref();

        let full_path = if file.is_absolute() {
            file.to_owned()
        } else {
            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
        };
        let content = self.fs.read(full_path).await?;
        Ok(content)
    }

    async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
        let file = file.as_ref();

        let full_path = if file.is_absolute() {
            file.to_owned()
        } else {
            PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
        };
        let content = self.fs.read_to_string(full_path).await?;
        Ok(content)
    }

    async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
        let path = PathBuf::from(format!(
            "{}/{}",
            self.base_dir,
            path.as_ref().to_string_lossy()
        ));
        self.fs.create_dir(path).await
    }

    async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
        let path = PathBuf::from(format!(
            "{}/{}",
            self.base_dir,
            path.as_ref().to_string_lossy()
        ));
        self.fs.create_dir_all(path).await
    }

    async fn write(
        &self,
        path: impl AsRef<Path>,
        contents: impl AsRef<[u8]> + Send,
    ) -> Result<(), FileSystemError> {
        let path = path.as_ref();

        let full_path = if path.is_absolute() {
            path.to_owned()
        } else {
            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
        };

        self.fs.write(full_path, contents).await
    }

    fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
        let path = path.as_ref();

        let full_path = if path.is_absolute() {
            path.to_owned()
        } else {
            PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
        };

        full_path
    }

    fn base_dir(&self) -> &str {
        self.base_dir
    }
}

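/// Role a spawned process plays in the network (relay node, collator, cumulus collator, ...).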
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}

pub use network::{AddCollatorOptions, AddNodeOptions};
pub use network_helper::metrics;
pub use sc_chain_spec;

#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";

    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        actual_levels
            .iter()
            .zip(expected_levels)
            .for_each(|(actual_level, expected_level)| {
                assert_eq!(actual_level.len(), expected_level.len());
                actual_level
                    .iter()
                    .zip(expected_level.iter())
                    .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
            });
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let network_config = generate(true, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: true,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let network_config = generate(false, None).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: true,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let network_config = generate(false, Some("other")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: false,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = ProviderCapabilities {
            requires_image: false,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        };

        let valid = validate_spec_with_provider_capabilities(&spec, &caps);
        println!("{valid:?}");
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let network_config = generate(false, Some("cargo")).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let network_config = generate(false, Some("cargo")).unwrap();
        let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}