1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11pub mod shared;
12mod spawner;
13mod utils;
14
15use std::{
16 collections::{HashMap, HashSet, VecDeque},
17 env,
18 net::IpAddr,
19 path::{Path, PathBuf},
20 time::{Duration, SystemTime},
21};
22
23use configuration::{NetworkConfig, RegistrationStrategy};
24use errors::OrchestratorError;
25use generators::errors::GeneratorError;
26use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
27pub use network_spec::NetworkSpec;
29use network_spec::{node::NodeSpec, parachain::ParachainSpec};
30use provider::{
31 types::{ProviderCapabilities, TransferedFile},
32 DynNamespace, DynProvider,
33};
34use serde_json::json;
35use support::{
36 constants::{
37 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
38 THIS_IS_A_BUG,
39 },
40 fs::{FileSystem, FileSystemError},
41 replacer::{get_tokens_to_replace, has_tokens},
42};
43use tokio::time::timeout;
44use tracing::{debug, info, trace, warn};
45
46use crate::{
47 network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
48 shared::types::RegisterParachainOptions,
49 spawner::SpawnNodeCtx,
50};
51pub struct Orchestrator<T>
52where
53 T: FileSystem + Sync + Send,
54{
55 filesystem: T,
56 provider: DynProvider,
57}
58
59impl<T> Orchestrator<T>
60where
61 T: FileSystem + Sync + Send + Clone,
62{
63 pub fn new(filesystem: T, provider: DynProvider) -> Self {
64 Self {
65 filesystem,
66 provider,
67 }
68 }
69
70 pub async fn spawn(
71 &self,
72 network_config: NetworkConfig,
73 ) -> Result<Network<T>, OrchestratorError> {
74 let global_timeout = network_config.global_settings().network_spawn_timeout();
75 let network_spec = NetworkSpec::from_config(&network_config).await?;
76
77 let res = timeout(
78 Duration::from_secs(global_timeout.into()),
79 self.spawn_inner(network_spec),
80 )
81 .await
82 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
83 res?
84 }
85
86 pub async fn spawn_from_spec(
87 &self,
88 network_spec: NetworkSpec,
89 ) -> Result<Network<T>, OrchestratorError> {
90 let global_timeout = network_spec.global_settings.network_spawn_timeout();
91 let res = timeout(
92 Duration::from_secs(global_timeout as u64),
93 self.spawn_inner(network_spec),
94 )
95 .await
96 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
97 res?
98 }
99
100 pub async fn attach_to_live(
101 &self,
102 zombie_json_path: &Path,
103 ) -> Result<Network<T>, OrchestratorError> {
104 info!("attaching to live network...");
105 info!("reading zombie.json from {:?}", zombie_json_path);
106
107 let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
108 let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
109
110 info!("recreating namespace...");
111 let ns: DynNamespace = self
112 .provider
113 .create_namespace_from_json(&zombie_json)
114 .await?;
115
116 info!("recreating relaychain...");
117 let (relay, initial_spec) =
118 recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
119 let relay_nodes = relay.nodes.clone();
120
121 let mut network =
122 Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
123
124 for node in relay_nodes {
125 network.insert_node(node);
126 }
127
128 info!("recreating parachains...");
129 let parachains_map =
130 recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
131 let para_nodes = parachains_map
132 .values()
133 .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
134 .collect::<Vec<NetworkNode>>();
135
136 network.set_parachains(parachains_map);
137 for node in para_nodes {
138 network.insert_node(node);
139 }
140
141 Ok(network)
142 }
143
144 async fn spawn_inner(
145 &self,
146 mut network_spec: NetworkSpec,
147 ) -> Result<Network<T>, OrchestratorError> {
148 debug!(network_spec = ?network_spec,"Network spec to spawn");
150
151 validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
153 .map_err(|err| {
154 OrchestratorError::InvalidConfigForProvider(
155 self.provider.name().into(),
156 err.to_string(),
157 )
158 })?;
159
160 let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
162 self.provider
163 .create_namespace_with_base_dir(base_dir)
164 .await?
165 } else {
166 self.provider.create_namespace().await?
167 };
168
169 let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
171
172 let start_time = SystemTime::now();
173 info!("🧰 ns: {}", ns.name());
174 info!("🧰 base_dir: {:?}", ns.base_dir());
175 info!("🕰 start time: {:?}", start_time);
176 info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
177
178 network_spec
179 .populate_nodes_available_args(ns.clone())
180 .await?;
181
182 let base_dir = ns.base_dir().to_string_lossy();
183 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
184 network_spec
186 .relaychain
187 .chain_spec
188 .build(&ns, &scoped_fs)
189 .await?;
190
191 debug!("relaychain spec built!");
192 let relay_chain_id = network_spec
194 .relaychain
195 .chain_spec
196 .read_chain_id(&scoped_fs)
197 .await?;
198
199 let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
200 let base_dir_exists = network_spec.global_settings.base_dir().is_some();
201 network_spec
202 .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
203 .await?;
204
205 let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
207 Vec<&ParachainSpec>,
208 Vec<&ParachainSpec>,
209 ) = network_spec
210 .parachains
211 .iter()
212 .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
213 .partition(|para| {
214 matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
215 });
216
217 let mut para_artifacts = vec![];
218 for para in para_to_register_in_genesis {
219 let genesis_config = para.get_genesis_config()?;
220 para_artifacts.push(genesis_config)
221 }
222
223 network_spec
225 .relaychain
226 .chain_spec
227 .customize_relay(
228 &network_spec.relaychain,
229 &network_spec.hrmp_channels,
230 para_artifacts,
231 &scoped_fs,
232 )
233 .await?;
234
235 network_spec
237 .relaychain
238 .chain_spec
239 .build_raw(&ns, &scoped_fs, None)
240 .await?;
241
242 if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
244 network_spec
245 .relaychain
246 .chain_spec
247 .override_code(&scoped_fs, wasm_override)
248 .await?;
249 }
250
251 if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
253 network_spec
254 .relaychain
255 .chain_spec
256 .override_raw_spec(&scoped_fs, raw_spec_override)
257 .await?;
258 }
259
260 let (bootnodes, relaynodes) =
261 split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
262
263 let mut ctx = SpawnNodeCtx {
265 chain_id: &relay_chain_id,
266 parachain_id: None,
267 chain: relay_chain_name.as_str(),
268 role: ZombieRole::Node,
269 ns: &ns,
270 scoped_fs: &scoped_fs,
271 parachain: None,
272 bootnodes_addr: &vec![],
273 wait_ready: false,
274 nodes_by_name: json!({}),
275 global_settings: &network_spec.global_settings,
276 };
277
278 let global_files_to_inject = vec![TransferedFile::new(
279 PathBuf::from(format!(
280 "{}/{relay_chain_name}.json",
281 ns.base_dir().to_string_lossy()
282 )),
283 PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
284 )];
285
286 let r = Relaychain::new(
287 relay_chain_name.to_string(),
288 relay_chain_id.clone(),
289 PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
290 OrchestratorError::InvariantError("chain-spec raw path should be set now"),
291 )?),
292 );
293 let mut network =
294 Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
295
296 let mut node_ws_url: String = "".to_string();
298
299 let mut bootnodes_addr: Vec<String> = vec![];
301
302 for level in dependency_levels_among(&bootnodes)? {
303 let mut running_nodes_per_level = vec![];
304 for chunk in level.chunks(spawn_concurrency) {
305 let spawning_tasks = chunk
306 .iter()
307 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
308
309 for node in futures::future::try_join_all(spawning_tasks).await? {
310 let bootnode_multiaddr = node.multiaddr();
311
312 bootnodes_addr.push(bootnode_multiaddr.to_string());
313
314 if node_ws_url.is_empty() {
316 node_ws_url.clone_from(&node.ws_uri)
317 }
318
319 running_nodes_per_level.push(node);
320 }
321 }
322 info!(
323 "🕰 waiting for level: {:?} to be up...",
324 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
325 );
326
327 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
329 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
330 });
331
332 let _ = futures::future::try_join_all(waiting_tasks).await?;
333
334 for node in running_nodes_per_level {
335 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
337 network.add_running_node(node, None).await;
338 }
339 }
340
341 network_spec
343 .relaychain
344 .chain_spec
345 .add_bootnodes(&scoped_fs, &bootnodes_addr)
346 .await?;
347
348 ctx.bootnodes_addr = &bootnodes_addr;
349
350 for level in dependency_levels_among(&relaynodes)? {
351 let mut running_nodes_per_level = vec![];
352 for chunk in level.chunks(spawn_concurrency) {
353 let spawning_tasks = chunk
354 .iter()
355 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
356
357 for node in futures::future::try_join_all(spawning_tasks).await? {
358 running_nodes_per_level.push(node);
359 }
360 }
361 info!(
362 "🕰 waiting for level: {:?} to be up...",
363 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
364 );
365
366 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
368 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
369 });
370
371 let _ = futures::future::try_join_all(waiting_tasks).await?;
372
373 for node in running_nodes_per_level {
374 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
375 network.add_running_node(node, None).await;
376 }
377 }
378
379 for para in network_spec.parachains.iter() {
381 let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
383 let parachain_id = parachain.chain_id.clone();
384
385 let (bootnodes, collators) =
386 split_nodes_by_bootnodes(¶.collators, para.no_default_bootnodes);
387
388 let mut ctx_para = SpawnNodeCtx {
390 parachain: Some(para),
391 parachain_id: parachain_id.as_deref(),
392 role: if para.is_cumulus_based {
393 ZombieRole::CumulusCollator
394 } else {
395 ZombieRole::Collator
396 },
397 bootnodes_addr: &vec![],
398 ..ctx.clone()
399 };
400
401 let mut bootnodes_addr: Vec<String> = vec![];
403 let mut running_nodes: Vec<NetworkNode> = vec![];
404
405 for level in dependency_levels_among(&bootnodes)? {
406 let mut running_nodes_per_level = vec![];
407 for chunk in level.chunks(spawn_concurrency) {
408 let spawning_tasks = chunk.iter().map(|node| {
409 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
410 });
411
412 for node in futures::future::try_join_all(spawning_tasks).await? {
413 let bootnode_multiaddr = node.multiaddr();
414
415 bootnodes_addr.push(bootnode_multiaddr.to_string());
416
417 running_nodes_per_level.push(node);
418 }
419 }
420 info!(
421 "🕰 waiting for level: {:?} to be up...",
422 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
423 );
424
425 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
427 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
428 });
429
430 let _ = futures::future::try_join_all(waiting_tasks).await?;
431
432 for node in running_nodes_per_level {
433 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
434 running_nodes.push(node);
435 }
436 }
437
438 if let Some(para_chain_spec) = para.chain_spec.as_ref() {
439 para_chain_spec
440 .add_bootnodes(&scoped_fs, &bootnodes_addr)
441 .await?;
442 }
443
444 ctx_para.bootnodes_addr = &bootnodes_addr;
445
446 for level in dependency_levels_among(&collators)? {
448 let mut running_nodes_per_level = vec![];
449 for chunk in level.chunks(spawn_concurrency) {
450 let spawning_tasks = chunk.iter().map(|node| {
451 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
452 });
453
454 for node in futures::future::try_join_all(spawning_tasks).await? {
455 running_nodes_per_level.push(node);
456 }
457 }
458 info!(
459 "🕰 waiting for level: {:?} to be up...",
460 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
461 );
462
463 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
465 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
466 });
467
468 let _ = futures::future::try_join_all(waiting_tasks).await?;
469
470 for node in running_nodes_per_level {
471 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
472 running_nodes.push(node);
473 }
474 }
475
476 let running_para_id = parachain.para_id;
477 network.add_para(parachain);
478 for node in running_nodes {
479 network.add_running_node(node, Some(running_para_id)).await;
480 }
481 }
482
483 for para in para_to_register_with_extrinsic {
491 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
492 id: para.id,
493 wasm_path: para
495 .genesis_wasm
496 .artifact_path()
497 .ok_or(OrchestratorError::InvariantError(
498 "artifact path for wasm must be set at this point",
499 ))?
500 .to_path_buf(),
501 state_path: para
502 .genesis_state
503 .artifact_path()
504 .ok_or(OrchestratorError::InvariantError(
505 "artifact path for state must be set at this point",
506 ))?
507 .to_path_buf(),
508 node_ws_url: node_ws_url.clone(),
509 onboard_as_para: para.onboard_as_parachain,
510 seed: None, finalization: false,
512 };
513
514 Parachain::register(register_para_options, &scoped_fs).await?;
515 }
516
517 let mut zombie_json = serde_json::to_value(&network)?;
519 zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
520 zombie_json["ns"] = serde_json::value::Value::String(ns.name().to_string());
521
522 if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
523 zombie_json["start_time_ts"] =
524 serde_json::value::Value::String(start_time_ts.as_millis().to_string());
525 } else {
526 warn!("⚠️ Error getting start_time timestamp");
528 }
529
530 scoped_fs
531 .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
532 .await?;
533
534 if network_spec.global_settings.tear_down_on_failure() {
535 network.spawn_watching_task();
536 }
537
538 Ok(network)
539 }
540}
541
542async fn recreate_network_nodes_from_json(
545 nodes_json: &serde_json::Value,
546 ns: DynNamespace,
547 provider_name: &str,
548) -> Result<Vec<NetworkNode>, OrchestratorError> {
549 let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
550
551 let mut nodes = Vec::with_capacity(raw_nodes.len());
552 for raw in raw_nodes {
553 let provider_tag = raw
555 .inner
556 .get("provider_tag")
557 .and_then(|v| v.as_str())
558 .ok_or_else(|| {
559 OrchestratorError::InvalidConfig("Missing `provider_tag` in inner node JSON".into())
560 })?;
561
562 if provider_tag != provider_name {
563 return Err(OrchestratorError::InvalidConfigForProvider(
564 provider_name.to_string(),
565 provider_tag.to_string(),
566 ));
567 }
568 let inner = ns.spawn_node_from_json(&raw.inner).await?;
569 let relay_node = NetworkNode::new(
570 raw.name,
571 raw.ws_uri,
572 raw.prometheus_uri,
573 raw.multiaddr,
574 raw.spec,
575 inner,
576 );
577 nodes.push(relay_node);
578 }
579
580 Ok(nodes)
581}
582
583async fn recreate_relaychain_from_json(
584 zombie_json: &serde_json::Value,
585 ns: DynNamespace,
586 provider_name: &str,
587) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
588 let relay_json = zombie_json
589 .get("relay")
590 .ok_or(OrchestratorError::InvalidConfig(
591 "Missing `relay` field in zombie.json".into(),
592 ))?
593 .clone();
594
595 let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
596
597 let initial_spec: NetworkSpec = serde_json::from_value(
598 zombie_json
599 .get("initial_spec")
600 .ok_or(OrchestratorError::InvalidConfig(
601 "Missing `initial_spec` field in zombie.json".into(),
602 ))?
603 .clone(),
604 )?;
605
606 let nodes =
608 recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
609 relay_raw.inner.nodes = nodes;
610
611 Ok((relay_raw.inner, initial_spec))
612}
613
614async fn recreate_parachains_from_json(
615 zombie_json: &serde_json::Value,
616 ns: DynNamespace,
617 provider_name: &str,
618) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
619 let paras_json = zombie_json
620 .get("parachains")
621 .ok_or(OrchestratorError::InvalidConfig(
622 "Missing `parachains` field in zombie.json".into(),
623 ))?
624 .clone();
625
626 let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
627
628 let mut parachains_map = HashMap::new();
629
630 for (id, parachain_entries) in raw_paras {
631 let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
632
633 for raw_para in parachain_entries {
634 let mut para = raw_para.inner;
635 para.collators =
636 recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
637 .await?;
638 parsed_vec.push(para);
639 }
640
641 parachains_map.insert(id, parsed_vec);
642 }
643
644 Ok(parachains_map)
645}
646
647fn split_nodes_by_bootnodes(
650 nodes: &[NodeSpec],
651 no_default_bootnodes: bool,
652) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
653 let mut bootnodes = vec![];
655 let mut other_nodes = vec![];
656 nodes.iter().for_each(|node| {
657 if node.is_bootnode {
658 bootnodes.push(node)
659 } else {
660 other_nodes.push(node)
661 }
662 });
663
664 if bootnodes.is_empty() && !no_default_bootnodes {
665 bootnodes.push(other_nodes.remove(0))
666 }
667
668 (bootnodes, other_nodes)
669}
670
671fn generate_bootnode_addr(
673 node: &NetworkNode,
674 ip: &IpAddr,
675 port: u16,
676) -> Result<String, GeneratorError> {
677 generators::generate_node_bootnode_addr(
678 &node.spec.peer_id,
679 ip,
680 port,
681 node.inner.args().as_ref(),
682 &node.spec.p2p_cert_hash,
683 )
684}
685fn validate_spec_with_provider_capabilities(
687 network_spec: &NetworkSpec,
688 capabilities: &ProviderCapabilities,
689) -> Result<(), anyhow::Error> {
690 let mut errs: Vec<String> = vec![];
691
692 if capabilities.requires_image {
693 if network_spec.relaychain.default_image.is_none() {
695 let nodes = &network_spec.relaychain.nodes;
697 if nodes.iter().any(|node| node.image.is_none()) {
698 errs.push(String::from(
699 "Missing image for node, and not default is set at relaychain",
700 ));
701 }
702 };
703
704 for para in &network_spec.parachains {
706 if para.default_image.is_none() {
707 let nodes = ¶.collators;
708 if nodes.iter().any(|node| node.image.is_none()) {
709 errs.push(format!(
710 "Missing image for node, and not default is set at parachain {}",
711 para.id
712 ));
713 }
714 }
715 }
716 } else {
717 let mut cmds: HashSet<&str> = Default::default();
720 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
721 cmds.insert(cmd.as_str());
722 }
723 for node in network_spec.relaychain().nodes.iter() {
724 cmds.insert(node.command());
725 }
726
727 for para in &network_spec.parachains {
729 if let Some(cmd) = para.default_command.as_ref() {
730 cmds.insert(cmd.as_str());
731 }
732
733 for node in para.collators.iter() {
734 cmds.insert(node.command());
735 }
736 }
737
738 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
741 let parts: Vec<_> = path.split(":").collect();
742 for cmd in cmds {
743 let missing = if cmd.contains('/') {
744 trace!("checking {cmd}");
745 if std::fs::metadata(cmd).is_err() {
746 true
747 } else {
748 info!("🔎 We will use the full path {cmd} to spawn nodes.");
749 false
750 }
751 } else {
752 !parts.iter().any(|part| {
754 let path_to = format!("{part}/{cmd}");
755 trace!("checking {path_to}");
756 let check_result = std::fs::metadata(&path_to);
757 trace!("result {:?}", check_result);
758 if check_result.is_ok() {
759 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
760 true
761 } else {
762 false
763 }
764 })
765 };
766
767 if missing {
768 errs.push(help_msg(cmd));
769 }
770 }
771 }
772
773 if !errs.is_empty() {
774 let msg = errs.join("\n");
775 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
776 }
777
778 Ok(())
779}
780
/// Builds the message shown when the binary `cmd` can not be found, including
/// the build command for the binaries we know how to compile.
fn help_msg(cmd: &str) -> String {
    const TEMPLATE_NODES: [&str; 3] = [
        "parachain-template-node",
        "solochain-template-node",
        "minimal-template-node",
    ];

    if TEMPLATE_NODES.contains(&cmd) {
        return format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release");
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    format!("Missing binary {cmd}, please compile it.")
}
797
/// Reads the spawn concurrency from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
///
/// Returns `None` when the variable is not set or is not a valid `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
806
807fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
808 let desired_spawn_concurrency = match (
809 spawn_concurrency_from_env(),
810 spec.global_settings.spawn_concurrency(),
811 ) {
812 (Some(n), _) => Some(n),
813 (None, Some(n)) => Some(n),
814 _ => None,
815 };
816
817 let (spawn_concurrency, limited_by_tokens) =
818 if let Some(spawn_concurrency) = desired_spawn_concurrency {
819 if spawn_concurrency == 1 {
820 (1, false)
821 } else if has_tokens(&serde_json::to_string(spec)?) {
822 (1, true)
823 } else {
824 (spawn_concurrency, false)
825 }
826 } else {
827 if has_tokens(&serde_json::to_string(spec)?) {
829 (1, true)
830 } else {
831 (100, false)
833 }
834 };
835
836 Ok((spawn_concurrency, limited_by_tokens))
837}
838
839fn dependency_levels_among<'a>(
844 nodes: &'a [&'a NodeSpec],
845) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
846 let by_name = nodes
847 .iter()
848 .map(|n| (n.name.as_str(), *n))
849 .collect::<HashMap<_, _>>();
850
851 let mut graph = HashMap::with_capacity(nodes.len());
852 let mut indegree = HashMap::with_capacity(nodes.len());
853
854 for node in nodes {
855 graph.insert(node.name.as_str(), Vec::new());
856 indegree.insert(node.name.as_str(), 0);
857 }
858
859 for &node in nodes {
861 if let Ok(args_json) = serde_json::to_string(&node.args) {
862 let unique_deps = get_tokens_to_replace(&args_json)
864 .into_iter()
865 .filter(|dep| dep != &node.name)
866 .filter_map(|dep| by_name.get(dep.as_str()))
867 .map(|&dep_node| dep_node.name.as_str())
868 .collect::<HashSet<_>>();
869
870 for dep_name in unique_deps {
871 graph
872 .get_mut(dep_name)
873 .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
874 .push(node);
875 *indegree
876 .get_mut(node.name.as_str())
877 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
878 }
879 }
880 }
881
882 let mut queue = nodes
884 .iter()
885 .filter(|n| {
886 *indegree
887 .get(n.name.as_str())
888 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
889 == 0
890 })
891 .copied()
892 .collect::<VecDeque<_>>();
893
894 let mut processed_count = 0;
895 let mut levels = Vec::new();
896
897 while !queue.is_empty() {
899 let level_size = queue.len();
900 let mut current_level = Vec::with_capacity(level_size);
901
902 for _ in 0..level_size {
903 let n = queue
904 .pop_front()
905 .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
906 current_level.push(n);
907 processed_count += 1;
908
909 for &neighbour in graph
910 .get(n.name.as_str())
911 .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
912 {
913 let neighbour_indegree = indegree
914 .get_mut(neighbour.name.as_str())
915 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
916 *neighbour_indegree -= 1;
917
918 if *neighbour_indegree == 0 {
919 queue.push_back(neighbour);
920 }
921 }
922 }
923
924 current_level.sort_by_key(|n| &n.name);
925 levels.push(current_level);
926 }
927
928 if processed_count != nodes.len() {
930 return Err(OrchestratorError::InvalidConfig(
931 "Tokens have cyclical dependencies".to_string(),
932 ));
933 }
934
935 Ok(levels)
936}
937
938#[derive(Clone, Debug)]
946pub struct ScopedFilesystem<'a, FS: FileSystem> {
947 fs: &'a FS,
948 base_dir: &'a str,
949}
950
951impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
952 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
953 Self { fs, base_dir }
954 }
955
956 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
957 for file in files {
958 let full_remote_path = PathBuf::from(format!(
959 "{}/{}",
960 self.base_dir,
961 file.remote_path.to_string_lossy()
962 ));
963 trace!("coping file: {file}");
964 self.fs
965 .copy(file.local_path.as_path(), full_remote_path)
966 .await?;
967 }
968 Ok(())
969 }
970
971 async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
972 let file = file.as_ref();
973
974 let full_path = if file.is_absolute() {
975 file.to_owned()
976 } else {
977 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
978 };
979 let content = self.fs.read(full_path).await?;
980 Ok(content)
981 }
982
983 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
984 let file = file.as_ref();
985
986 let full_path = if file.is_absolute() {
987 file.to_owned()
988 } else {
989 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
990 };
991 let content = self.fs.read_to_string(full_path).await?;
992 Ok(content)
993 }
994
995 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
996 let path = PathBuf::from(format!(
997 "{}/{}",
998 self.base_dir,
999 path.as_ref().to_string_lossy()
1000 ));
1001 self.fs.create_dir(path).await
1002 }
1003
1004 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1005 let path = PathBuf::from(format!(
1006 "{}/{}",
1007 self.base_dir,
1008 path.as_ref().to_string_lossy()
1009 ));
1010 self.fs.create_dir_all(path).await
1011 }
1012
1013 async fn write(
1014 &self,
1015 path: impl AsRef<Path>,
1016 contents: impl AsRef<[u8]> + Send,
1017 ) -> Result<(), FileSystemError> {
1018 let path = path.as_ref();
1019
1020 let full_path = if path.is_absolute() {
1021 path.to_owned()
1022 } else {
1023 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1024 };
1025
1026 self.fs.write(full_path, contents).await
1027 }
1028
1029 fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1031 let path = path.as_ref();
1032
1033 let full_path = if path.is_absolute() {
1034 path.to_owned()
1035 } else {
1036 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1037 };
1038
1039 full_path
1040 }
1041}
1042
/// Role of a node inside the network, carried by the spawn context.
#[derive(Clone, Debug)]
pub enum ZombieRole {
    // NOTE(review): not used in this file, presumably a short-lived helper node; confirm.
    Temp,
    /// Role used in the spawn context of the relaychain nodes.
    Node,
    // NOTE(review): not used in this file; confirm where it is set.
    Bootnode,
    /// Collator of a parachain that is not cumulus based.
    Collator,
    /// Collator of a cumulus based parachain.
    CumulusCollator,
    // NOTE(review): not used in this file; confirm its meaning.
    Companion,
}
1052
1053pub use network::{AddCollatorOptions, AddNodeOptions};
1055pub use network_helper::metrics;
1056pub use sc_chain_spec;
1057
1058#[cfg(test)]
1059mod tests {
1060 use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
1061 use lazy_static::lazy_static;
1062 use tokio::sync::Mutex;
1063
1064 use super::*;
1065
1066 const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
1067 lazy_static! {
1069 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1070 }
1071
1072 fn set_env(concurrency: Option<u32>) {
1073 if let Some(value) = concurrency {
1074 env::set_var(ENV_KEY, value.to_string());
1075 } else {
1076 env::remove_var(ENV_KEY);
1077 }
1078 }
1079
1080 fn generate(
1081 with_image: bool,
1082 with_cmd: Option<&'static str>,
1083 ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
1084 NetworkConfigBuilder::new()
1085 .with_relaychain(|r| {
1086 let mut relay = r
1087 .with_chain("rococo-local")
1088 .with_default_command(with_cmd.unwrap_or("polkadot"));
1089 if with_image {
1090 relay = relay.with_default_image("docker.io/parity/polkadot")
1091 }
1092
1093 relay
1094 .with_validator(|node| node.with_name("alice"))
1095 .with_validator(|node| node.with_name("bob"))
1096 })
1097 .with_parachain(|p| {
1098 p.with_id(2000).cumulus_based(true).with_collator(|n| {
1099 let node = n
1100 .with_name("collator")
1101 .with_command(with_cmd.unwrap_or("polkadot-parachain"));
1102 if with_image {
1103 node.with_image("docker.io/paritypr/test-parachain")
1104 } else {
1105 node
1106 }
1107 })
1108 })
1109 .build()
1110 }
1111
1112 fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
1113 let mut spec = NodeSpec {
1114 name: name.to_string(),
1115 ..Default::default()
1116 };
1117 if let Some(dependencies) = dependencies {
1118 for node in dependencies {
1119 spec.args.push(
1120 format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
1121 .as_str()
1122 .into(),
1123 );
1124 }
1125 }
1126 spec
1127 }
1128
1129 fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
1130 actual_levels
1131 .iter()
1132 .zip(expected_levels)
1133 .for_each(|(actual_level, expected_level)| {
1134 assert_eq!(actual_level.len(), expected_level.len());
1135 actual_level
1136 .iter()
1137 .zip(expected_level.iter())
1138 .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
1139 });
1140 }
1141
1142 #[tokio::test]
1143 async fn valid_config_with_image() {
1144 let network_config = generate(true, None).unwrap();
1145 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1146 let caps = ProviderCapabilities {
1147 requires_image: true,
1148 has_resources: false,
1149 prefix_with_full_path: false,
1150 use_default_ports_in_cmd: false,
1151 };
1152
1153 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1154 assert!(valid.is_ok())
1155 }
1156
1157 #[tokio::test]
1158 async fn invalid_config_without_image() {
1159 let network_config = generate(false, None).unwrap();
1160 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1161 let caps = ProviderCapabilities {
1162 requires_image: true,
1163 has_resources: false,
1164 prefix_with_full_path: false,
1165 use_default_ports_in_cmd: false,
1166 };
1167
1168 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1169 assert!(valid.is_err())
1170 }
1171
1172 #[tokio::test]
1173 async fn invalid_config_missing_cmd() {
1174 let network_config = generate(false, Some("other")).unwrap();
1175 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1176 let caps = ProviderCapabilities {
1177 requires_image: false,
1178 has_resources: false,
1179 prefix_with_full_path: false,
1180 use_default_ports_in_cmd: false,
1181 };
1182
1183 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1184 assert!(valid.is_err())
1185 }
1186
1187 #[tokio::test]
1188 async fn valid_config_present_cmd() {
1189 let network_config = generate(false, Some("cargo")).unwrap();
1190 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1191 let caps = ProviderCapabilities {
1192 requires_image: false,
1193 has_resources: false,
1194 prefix_with_full_path: false,
1195 use_default_ports_in_cmd: false,
1196 };
1197
1198 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1199 println!("{valid:?}");
1200 assert!(valid.is_ok())
1201 }
1202
1203 #[tokio::test]
1204 async fn default_spawn_concurrency() {
1205 let _g = ENV_MUTEX.lock().await;
1206 set_env(None);
1207 let network_config = generate(false, Some("cargo")).unwrap();
1208 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1209 let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1210 assert_eq!(concurrency, 100);
1211 }
1212
1213 #[tokio::test]
1214 async fn set_spawn_concurrency() {
1215 let _g = ENV_MUTEX.lock().await;
1216 set_env(None);
1217
1218 let network_config = generate(false, Some("cargo")).unwrap();
1219 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1220
1221 let global_settings = GlobalSettingsBuilder::new()
1222 .with_spawn_concurrency(4)
1223 .build()
1224 .unwrap();
1225
1226 spec.set_global_settings(global_settings);
1227 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1228 assert_eq!(concurrency, 4);
1229 assert!(!limited);
1230 }
1231
1232 #[tokio::test]
1233 async fn set_spawn_concurrency_but_limited() {
1234 let _g = ENV_MUTEX.lock().await;
1235 set_env(None);
1236
1237 let network_config = generate(false, Some("cargo")).unwrap();
1238 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1239
1240 let global_settings = GlobalSettingsBuilder::new()
1241 .with_spawn_concurrency(4)
1242 .build()
1243 .unwrap();
1244
1245 spec.set_global_settings(global_settings);
1246 let node = spec.relaychain.nodes.first_mut().unwrap();
1247 node.args
1248 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1249 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1250 assert_eq!(concurrency, 1);
1251 assert!(limited);
1252 }
1253
1254 #[tokio::test]
1255 async fn set_spawn_concurrency_from_env() {
1256 let _g = ENV_MUTEX.lock().await;
1257 set_env(Some(10));
1258
1259 let network_config = generate(false, Some("cargo")).unwrap();
1260 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1261 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1262 assert_eq!(concurrency, 10);
1263 assert!(!limited);
1264 }
1265
1266 #[tokio::test]
1267 async fn set_spawn_concurrency_from_env_but_limited() {
1268 let _g = ENV_MUTEX.lock().await;
1269 set_env(Some(12));
1270
1271 let network_config = generate(false, Some("cargo")).unwrap();
1272 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1273 let node = spec.relaychain.nodes.first_mut().unwrap();
1274 node.args
1275 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1276 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1277 assert_eq!(concurrency, 1);
1278 assert!(limited);
1279 }
1280
/// Checks the levels returned by `dependency_levels_among` for an empty input
/// and for five node sets of growing complexity, each in its own scope.
#[test]
fn dependency_levels_among_should_work() {
    // No nodes: no levels.
    assert!(dependency_levels_among(&[]).unwrap().is_empty());

    // One node built without dependencies: a single level.
    {
        let alice = get_node_with_dependencies("alice", None);
        let graph = [&alice];

        let computed = dependency_levels_among(&graph).unwrap();
        verify_levels(computed, vec![vec!["alice"]]);
    }

    // Two nodes built without dependencies: both in the same level.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let graph = [&alice, &bob];

        let computed = dependency_levels_among(&graph).unwrap();
        verify_levels(computed, vec![vec!["alice", "bob"]]);
    }

    // Chain: bob is built from alice, charlie from bob: one node per level.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let graph = [&alice, &bob, &charlie];

        let computed = dependency_levels_among(&graph).unwrap();
        verify_levels(
            computed,
            vec![vec!["alice"], vec!["bob"], vec!["charlie"]],
        );
    }

    // Fan-out: bob and charlie are both built from alice and share a level.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let graph = [&alice, &bob, &charlie];

        let computed = dependency_levels_among(&graph).unwrap();
        verify_levels(computed, vec![vec!["alice"], vec!["bob", "charlie"]]);
    }

    // Diamond: dave is built from charlie and bob, which are both built from alice.
    {
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let graph = [&alice, &bob, &charlie, &dave];

        let computed = dependency_levels_among(&graph).unwrap();
        verify_levels(
            computed,
            vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]],
        );
    }
}
1343
/// `dependency_levels_among` returns an error when bob is built from alice and
/// alice's args then gain a `{{ZOMBIE:bob:someField}}` token.
// NOTE(review): presumably the token makes alice depend on bob, closing the
// cycle - confirm against the dependency extraction.
#[test]
fn dependency_levels_among_should_detect_cycles() {
    let mut alice = get_node_with_dependencies("alice", None);
    let bob = get_node_with_dependencies("bob", Some(vec![&alice]));

    // Added only after bob was built, so alice can still be mutated here.
    alice.args.push("{{ZOMBIE:bob:someField}}".into());

    let cyclic = [&alice, &bob];
    let outcome = dependency_levels_among(&cyclic);
    assert!(outcome.is_err());
}
1352}