1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod observability;
9pub mod tx_helper;
10
11mod network_spec;
12pub mod shared;
13mod spawner;
14mod utils;
15
16use std::{
17 collections::{HashMap, HashSet, VecDeque},
18 env,
19 net::IpAddr,
20 path::{Path, PathBuf},
21 time::{Duration, SystemTime},
22};
23
24use anyhow::anyhow;
25use configuration::{types::JsonOverrides, NetworkConfig, RegistrationStrategy};
26use errors::OrchestratorError;
27use generators::{core_assignment, errors::GeneratorError};
28use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
29pub use network_spec::NetworkSpec;
31use network_spec::{node::NodeSpec, parachain::ParachainSpec};
32use provider::{
33 types::{ProviderCapabilities, TransferedFile},
34 DynNamespace, DynProvider,
35};
36use serde_json::json;
37use support::{
38 constants::{
39 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
40 THIS_IS_A_BUG,
41 },
42 fs::{FileSystem, FileSystemError},
43 replacer::{get_tokens_to_replace, has_tokens},
44};
45use tokio::time::timeout;
46use tracing::{debug, info, trace, warn};
47
48use crate::{
49 network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
50 shared::types::RegisterParachainOptions,
51 spawner::SpawnNodeCtx,
52 utils::write_zombie_json,
53};
/// Orchestrates spawning a whole network (relaychain plus parachains)
/// on top of a concrete provider and filesystem implementation.
pub struct Orchestrator<T>
where
    T: FileSystem + Sync + Send,
{
    // Filesystem used for all artifact reads/writes (wrapped per-namespace
    // in a `ScopedFilesystem`).
    filesystem: T,
    // Provider backend used to create namespaces and spawn nodes.
    provider: DynProvider,
}
61
62impl<T> Orchestrator<T>
63where
64 T: FileSystem + Sync + Send + Clone,
65{
66 pub fn new(filesystem: T, provider: DynProvider) -> Self {
67 Self {
68 filesystem,
69 provider,
70 }
71 }
72
73 pub async fn spawn(
74 &self,
75 network_config: NetworkConfig,
76 ) -> Result<Network<T>, OrchestratorError> {
77 let global_timeout = network_config.global_settings().network_spawn_timeout();
78 let network_spec = NetworkSpec::from_config(&network_config).await?;
79
80 let res = timeout(
81 Duration::from_secs(global_timeout.into()),
82 self.spawn_inner(network_spec),
83 )
84 .await
85 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
86 res?
87 }
88
89 pub async fn spawn_from_spec(
90 &self,
91 network_spec: NetworkSpec,
92 ) -> Result<Network<T>, OrchestratorError> {
93 let global_timeout = network_spec.global_settings.network_spawn_timeout();
94 let res = timeout(
95 Duration::from_secs(global_timeout as u64),
96 self.spawn_inner(network_spec),
97 )
98 .await
99 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
100 res?
101 }
102
103 pub async fn attach_to_live(
104 &self,
105 zombie_json_path: &Path,
106 ) -> Result<Network<T>, OrchestratorError> {
107 info!("attaching to live network...");
108 info!("reading zombie.json from {:?}", zombie_json_path);
109
110 let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
111 let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
112
113 info!("recreating namespace...");
114 let ns: DynNamespace = self
115 .provider
116 .create_namespace_from_json(&zombie_json)
117 .await?;
118
119 info!("recreating relaychain...");
120 let (relay, initial_spec) =
121 recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
122 let relay_nodes = relay.nodes.clone();
123
124 let mut network =
125 Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
126
127 for node in relay_nodes {
128 if node.is_responsive().await {
129 node.set_is_running(true);
130 }
131 network.insert_node(node);
132 }
133
134 info!("recreating parachains...");
135 let parachains_map =
136 recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
137 let para_nodes = parachains_map
138 .values()
139 .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
140 .collect::<Vec<NetworkNode>>();
141
142 network.set_parachains(parachains_map);
143 for node in para_nodes {
144 if node.is_responsive().await {
145 node.set_is_running(true);
146 }
147 network.insert_node(node);
148 }
149
150 Ok(network)
151 }
152
    /// Core spawning routine.
    ///
    /// High-level flow:
    /// 1. validate the spec against provider capabilities;
    /// 2. create the namespace and build relay/parachain chain specs;
    /// 3. apply genesis / raw-spec overrides (cores, session 0);
    /// 4. spawn relaychain bootnodes, then remaining relay nodes, then each
    ///    parachain's bootnodes and collators — level by level according to
    ///    token dependencies, chunked by `spawn_concurrency`;
    /// 5. register extrinsic-based parachains, start observability and
    ///    custom processes, and persist `zombie.json`.
    async fn spawn_inner(
        &self,
        mut network_spec: NetworkSpec,
    ) -> Result<Network<T>, OrchestratorError> {
        debug!(network_spec = ?network_spec,"Network spec to spawn");

        // Fail early if the spec asks for something this provider can't do
        // (e.g. missing images on an image-requiring provider).
        validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
            .map_err(|err| {
                OrchestratorError::InvalidConfigForProvider(
                    self.provider.name().into(),
                    err.to_string(),
                )
            })?;

        let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
            self.provider
                .create_namespace_with_base_dir(base_dir)
                .await?
        } else {
            self.provider.create_namespace().await?
        };

        // Concurrency may be forced to 1 when the spec contains replacement
        // tokens (see `calculate_concurrency`).
        let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;

        let start_time = SystemTime::now();
        info!("🧰 ns: {}", ns.name());
        info!("🧰 base_dir: {:?}", ns.base_dir());
        info!("🕰 start time: {:?}", start_time);
        info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");

        network_spec
            .populate_nodes_available_args(ns.clone())
            .await?;

        // All artifact paths below are resolved relative to the namespace's
        // base dir through this scoped filesystem.
        let base_dir = ns.base_dir().to_string_lossy();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
        network_spec
            .relaychain
            .chain_spec
            .build(&ns, &scoped_fs)
            .await?;

        debug!("relaychain spec built!");
        let relay_chain_id = network_spec
            .relaychain
            .chain_spec
            .read_chain_id(&scoped_fs)
            .await?;

        let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
        let base_dir_exists = network_spec.global_settings.base_dir().is_some();
        network_spec
            .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
            .await?;

        // Split parachains by registration path (genesis vs extrinsic);
        // `Manual` registration is excluded from both.
        let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
            Vec<&ParachainSpec>,
            Vec<&ParachainSpec>,
        ) = network_spec
            .parachains
            .iter()
            .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
            .partition(|para| {
                matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
            });

        let mut para_artifacts = vec![];
        for para in &para_to_register_in_genesis {
            let genesis_config = para.get_genesis_config()?;
            para_artifacts.push(genesis_config)
        }

        // Inject genesis-registered paras and HRMP channels into the relay
        // chain spec.
        network_spec
            .relaychain
            .chain_spec
            .customize_relay(
                &network_spec.relaychain,
                &network_spec.hrmp_channels,
                para_artifacts,
                &scoped_fs,
            )
            .await?;

        if let Some(script_cmd) = network_spec.relaychain.post_process_script.as_deref() {
            network_spec
                .relaychain
                .chain_spec
                .run_post_process_script(script_cmd, &scoped_fs)
                .await?;
        }

        // Total cores: explicit `num_cores` per para, otherwise one implicit
        // core per genesis-registered para.
        let num_cores = network_spec.parachains_iter().fold(0u32, |mut acc, para| {
            if let Some(cores) = para.num_cores {
                acc += cores;
            } else if &RegistrationStrategy::InGenesis == para.registration_strategy() {
                acc += 1;
            }

            acc
        });

        // More cores requested than implicit ones: bump the scheduler's
        // `num_cores` in the (non-raw) genesis config.
        if num_cores > para_to_register_in_genesis.len() as u32 {
            let num_cores_to_set = num_cores - para_to_register_in_genesis.len() as u32;
            let overrides = json!({
                "configuration": {
                    "config": {
                        "scheduler_params": {
                            "num_cores": num_cores_to_set,
                            "max_validators_per_core": 1
                        },
                    }
                }
            });

            network_spec
                .relaychain
                .chain_spec
                .apply_genesis_override(&scoped_fs, &overrides)
                .await?;
        }

        network_spec
            .relaychain
            .chain_spec
            .build_raw(&ns, &scoped_fs, None)
            .await?;

        if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
            network_spec
                .relaychain
                .chain_spec
                .override_code(&scoped_fs, wasm_override)
                .await?;
        }

        if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
            network_spec
                .relaychain
                .chain_spec
                .override_raw_spec(&scoped_fs, raw_spec_override)
                .await?;
        }

        debug!("Raw overrides info: num_cores: {}, para_to_register_in_genesis_len: {:?}, override_session_0: {}", num_cores, para_to_register_in_genesis.len(), network_spec.relaychain().override_session_0);
        if num_cores > para_to_register_in_genesis.len() as u32
            || network_spec.relaychain().override_session_0
        {
            let mut core_index = 0u32;
            let scheduler_key = core_assignment::get_parascheduler_storage_key();
            // If the ParaScheduler storage key is absent from the raw spec,
            // assume the older runtime layout and use the per-core
            // CoretimeAssignmentProvider keys instead.
            let is_old = !network_spec
                .relaychain
                .chain_spec
                .find_raw_key(&scoped_fs, &scheduler_key)
                .await?;

            let mut para_scheduler_value_parts: Vec<String> = vec![];
            let mut raw_json_overrides = json!({});
            for para in &network_spec.parachains {
                // Cores assigned to this para in the raw spec: its explicit
                // `num_cores` if genesis-registered, or a single core when
                // session-0 override is active.
                let mut cores_for_para = 0_u32;
                if let Some(cores) = para.num_cores {
                    if &RegistrationStrategy::InGenesis == para.registration_strategy() {
                        cores_for_para = cores;
                    }
                } else {
                    if &RegistrationStrategy::InGenesis == para.registration_strategy()
                        && network_spec.relaychain().override_session_0
                    {
                        cores_for_para += 1;
                    }
                }

                debug!(
                    "Assigning {cores_for_para} cores in raw spec for para {}. Using pallet {}",
                    para.id,
                    if is_old {
                        "CoretimeAssignmentProvider"
                    } else {
                        "ParaScheduler"
                    }
                );
                for _core in 0..cores_for_para {
                    if is_old {
                        let (core_assign_key, core_assign_value) =
                            core_assignment::generate_old(core_index, para.id);
                        raw_json_overrides[core_assign_key] = json!(core_assign_value);
                    } else {
                        let part = core_assignment::generate(core_index, para.id);
                        para_scheduler_value_parts.push(part);
                    }
                    core_index += 1;
                }
            }

            if !is_old {
                // Length prefix (hex) for the joined assignment parts —
                // presumably a SCALE compact length; TODO confirm against
                // `core_assignment::generate`.
                let count_prefix = format!("{:02x}", para_scheduler_value_parts.len() * 4);
                let core_assign_value =
                    format!("{count_prefix}{}", para_scheduler_value_parts.join(""));
                raw_json_overrides[scheduler_key] = json!(core_assign_value);
            }

            if network_spec.relaychain().override_session_0 {
                trace!("Overriding pallet ParaSessionInfo.session (0) to allow paras to produce blocks at first session.");
                let raw_spec = network_spec
                    .relaychain
                    .chain_spec
                    .read_raw_spec(&scoped_fs)
                    .await?;
                let overrides = generators::generate_session_0_overrides(&raw_spec, num_cores)?;

                for (k, v) in overrides.as_object().ok_or(anyhow!(
                    "'generate_session_0_overrides' should be a valid json Object."
                ))? {
                    raw_json_overrides[k] = v.clone();
                }
            }

            debug!("Raw overrides keys: {:?}", raw_json_overrides);

            // Merge everything collected above under `genesis.raw.top`.
            network_spec
                .relaychain
                .chain_spec
                .override_raw_spec(
                    &scoped_fs,
                    &JsonOverrides::Json(json!({
                        "genesis": {
                            "raw": {
                                "top": raw_json_overrides
                            }
                        }
                    })),
                )
                .await?;
        }

        let (bootnodes, relaynodes) =
            split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);

        let mut ctx = SpawnNodeCtx {
            chain_id: &relay_chain_id,
            parachain_id: None,
            chain: relay_chain_name.as_str(),
            role: ZombieRole::Node,
            ns: &ns,
            scoped_fs: &scoped_fs,
            parachain: None,
            bootnodes_addr: &vec![],
            wait_ready: false,
            nodes_by_name: json!({}),
            global_settings: &network_spec.global_settings,
        };

        // The relay chain spec is injected into every node at /cfg.
        let global_files_to_inject = vec![TransferedFile::new(
            PathBuf::from(format!(
                "{}/{relay_chain_name}.json",
                ns.base_dir().to_string_lossy()
            )),
            PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
        )];

        let r = Relaychain::new(
            relay_chain_name.to_string(),
            relay_chain_id.clone(),
            PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
                OrchestratorError::InvariantError("chain-spec raw path should be set now"),
            )?),
        );
        let mut network =
            Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

        // ws url of the first spawned bootnode; used later to submit the
        // parachain registration extrinsics.
        let mut node_ws_url: String = "".to_string();

        let mut bootnodes_addr: Vec<String> = vec![];

        // Spawn relaychain bootnodes level-by-level (token dependency order),
        // chunked by `spawn_concurrency` within each level.
        for level in dependency_levels_among(&bootnodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    let bootnode_multiaddr = node.multiaddr();

                    bootnodes_addr.push(bootnode_multiaddr.to_string());

                    if node_ws_url.is_empty() {
                        node_ws_url.clone_from(&node.ws_uri)
                    }

                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.node_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        network_spec
            .relaychain
            .chain_spec
            .add_bootnodes(&scoped_fs, &bootnodes_addr)
            .await?;

        // From here on, spawned nodes get the collected bootnode addresses.
        ctx.bootnodes_addr = &bootnodes_addr;

        // Spawn the remaining relaychain nodes.
        for level in dependency_levels_among(&relaynodes)? {
            let mut running_nodes_per_level = vec![];
            for chunk in level.chunks(spawn_concurrency) {
                let spawning_tasks = chunk
                    .iter()
                    .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

                for node in futures::future::try_join_all(spawning_tasks).await? {
                    running_nodes_per_level.push(node);
                }
            }
            info!(
                "🕰 waiting for level: {:?} to be up...",
                level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
            );

            // NOTE(review): bootnodes above wait with `node_spawn_timeout`
            // but these (and the para loops below) use
            // `network_spawn_timeout` — confirm whether intentional.
            let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
            });

            let _ = futures::future::try_join_all(waiting_tasks).await?;

            for node in running_nodes_per_level {
                ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                network.add_running_node(node, None).await;
            }
        }

        // For each parachain: build artifacts, spawn its bootnodes, then the
        // rest of its collators (same level/chunk pattern as the relay).
        for para in network_spec.parachains.iter() {
            let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
            let parachain_id = parachain.chain_id.clone();

            let (bootnodes, collators) =
                split_nodes_by_bootnodes(&para.collators, para.no_default_bootnodes);

            // Clone the relay ctx, overriding the para-specific fields.
            let mut ctx_para = SpawnNodeCtx {
                parachain: Some(para),
                parachain_id: parachain_id.as_deref(),
                role: if para.is_cumulus_based {
                    ZombieRole::CumulusCollator
                } else {
                    ZombieRole::Collator
                },
                bootnodes_addr: &vec![],
                ..ctx.clone()
            };

            let mut bootnodes_addr: Vec<String> = vec![];
            let mut running_nodes: Vec<NetworkNode> = vec![];

            for level in dependency_levels_among(&bootnodes)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        let bootnode_multiaddr = node.multiaddr();

                        bootnodes_addr.push(bootnode_multiaddr.to_string());

                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            if let Some(para_chain_spec) = para.chain_spec.as_ref() {
                para_chain_spec
                    .add_bootnodes(&scoped_fs, &bootnodes_addr)
                    .await?;
            }

            ctx_para.bootnodes_addr = &bootnodes_addr;

            for level in dependency_levels_among(&collators)? {
                let mut running_nodes_per_level = vec![];
                for chunk in level.chunks(spawn_concurrency) {
                    let spawning_tasks = chunk.iter().map(|node| {
                        spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
                    });

                    for node in futures::future::try_join_all(spawning_tasks).await? {
                        running_nodes_per_level.push(node);
                    }
                }
                info!(
                    "🕰 waiting for level: {:?} to be up...",
                    level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
                );

                let waiting_tasks = running_nodes_per_level.iter().map(|node| {
                    node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
                });

                let _ = futures::future::try_join_all(waiting_tasks).await?;

                for node in running_nodes_per_level {
                    ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
                    running_nodes.push(node);
                }
            }

            let running_para_id = parachain.para_id;
            network.add_para(parachain);
            for node in running_nodes {
                network.add_running_node(node, Some(running_para_id)).await;
            }
        }

        // Register the remaining parachains via extrinsic, through the first
        // relay node's ws endpoint.
        for para in para_to_register_with_extrinsic {
            let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
                id: para.id,
                wasm_path: para
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para
                    .genesis_state
                    .artifact_path()
                    .ok_or(OrchestratorError::InvariantError(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: node_ws_url.clone(),
                onboard_as_para: para.onboard_as_parachain,
                seed: None, finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // Observability failures are non-fatal: log and continue.
        if network_spec.global_settings.observability().enabled() {
            match network
                .start_observability(network_spec.global_settings.observability())
                .await
            {
                Ok(obs) => {
                    info!("📊 Prometheus URL: {}", obs.prometheus_url);
                    info!("📊 Grafana URL: {}", obs.grafana_url);
                },
                Err(e) => {
                    warn!("⚠️ Failed to spawn observability stack: {e}");
                },
            }
        }

        // Custom process failures are non-fatal too.
        for cp in &network_spec.custom_processes {
            if let Err(e) = spawner::spawn_process(cp, ns.clone()).await {
                warn!("⚠️ Failed to spawn custom process {}, err: {e}", cp.name())
            }
        }

        network.set_start_time_ts(start_time);

        // Persist the network snapshot so `attach_to_live` can reconnect.
        write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;

        if network_spec.global_settings.tear_down_on_failure() {
            network.spawn_watching_task();
        }

        Ok(network)
    }
694}
695
696async fn recreate_network_nodes_from_json(
699 nodes_json: &serde_json::Value,
700 ns: DynNamespace,
701 provider_name: &str,
702) -> Result<Vec<NetworkNode>, OrchestratorError> {
703 let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
704
705 let mut nodes = Vec::with_capacity(raw_nodes.len());
706 for raw in raw_nodes {
707 let provider_tag = raw
709 .inner
710 .get("provider_tag")
711 .and_then(|v| v.as_str())
712 .ok_or_else(|| {
713 OrchestratorError::InvalidConfig(format!(
714 "Node '{}' is missing `provider_tag` in inner node JSON",
715 raw.name
716 ))
717 })?;
718
719 if provider_tag != provider_name {
720 return Err(OrchestratorError::InvalidConfigForProvider(
721 provider_name.to_string(),
722 provider_tag.to_string(),
723 ));
724 }
725 let inner = ns.spawn_node_from_json(&raw.inner).await?;
726 let relay_node = NetworkNode::new(
727 raw.name,
728 raw.ws_uri,
729 raw.prometheus_uri,
730 raw.multiaddr,
731 raw.spec,
732 inner,
733 raw.cmd_generator_opts,
734 raw.context,
735 );
736 nodes.push(relay_node);
737 }
738
739 Ok(nodes)
740}
741
742async fn recreate_relaychain_from_json(
743 zombie_json: &serde_json::Value,
744 ns: DynNamespace,
745 provider_name: &str,
746) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
747 let relay_json = zombie_json
748 .get("relay")
749 .ok_or(OrchestratorError::InvalidConfig(
750 "Missing `relay` field in zombie.json".into(),
751 ))?
752 .clone();
753
754 let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
755
756 let initial_spec: NetworkSpec = serde_json::from_value(
757 zombie_json
758 .get("initial_spec")
759 .ok_or(OrchestratorError::InvalidConfig(
760 "Missing `initial_spec` field in zombie.json".into(),
761 ))?
762 .clone(),
763 )?;
764
765 let nodes =
767 recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
768 relay_raw.inner.nodes = nodes;
769
770 Ok((relay_raw.inner, initial_spec))
771}
772
773async fn recreate_parachains_from_json(
774 zombie_json: &serde_json::Value,
775 ns: DynNamespace,
776 provider_name: &str,
777) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
778 let paras_json = zombie_json
779 .get("parachains")
780 .ok_or(OrchestratorError::InvalidConfig(
781 "Missing `parachains` field in zombie.json".into(),
782 ))?
783 .clone();
784
785 let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
786
787 let mut parachains_map = HashMap::new();
788
789 for (id, parachain_entries) in raw_paras {
790 let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
791
792 for raw_para in parachain_entries {
793 let mut para = raw_para.inner;
794 para.collators =
795 recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
796 .await?;
797 parsed_vec.push(para);
798 }
799
800 parachains_map.insert(id, parsed_vec);
801 }
802
803 Ok(parachains_map)
804}
805
806fn split_nodes_by_bootnodes(
809 nodes: &[NodeSpec],
810 no_default_bootnodes: bool,
811) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
812 let mut bootnodes = vec![];
814 let mut other_nodes = vec![];
815 nodes.iter().for_each(|node| {
816 if node.is_bootnode {
817 bootnodes.push(node)
818 } else {
819 other_nodes.push(node)
820 }
821 });
822
823 if bootnodes.is_empty() && !no_default_bootnodes {
824 bootnodes.push(other_nodes.remove(0))
825 }
826
827 (bootnodes, other_nodes)
828}
829
/// Build the bootnode address string for `node` at `ip`:`port`, delegating
/// to [`generators::generate_node_bootnode_addr`] with the node's peer id
/// and p2p cert hash from its spec.
// NOTE(review): the node's runtime args are forwarded too — presumably so
// the generator can pick the right transport; confirm in the generator.
fn generate_bootnode_addr(
    node: &NetworkNode,
    ip: &IpAddr,
    port: u16,
) -> Result<String, GeneratorError> {
    generators::generate_node_bootnode_addr(
        &node.spec.peer_id,
        ip,
        port,
        node.inner.args().as_ref(),
        &node.spec.p2p_cert_hash,
    )
}
844fn validate_spec_with_provider_capabilities(
846 network_spec: &NetworkSpec,
847 capabilities: &ProviderCapabilities,
848) -> Result<(), anyhow::Error> {
849 let mut errs: Vec<String> = vec![];
850
851 if capabilities.requires_image {
852 if network_spec.relaychain.default_image.is_none() {
854 let nodes = &network_spec.relaychain.nodes;
856 if nodes.iter().any(|node| node.image.is_none()) {
857 errs.push(String::from(
858 "Missing image for node, and not default is set at relaychain",
859 ));
860 }
861 };
862
863 for para in &network_spec.parachains {
865 if para.default_image.is_none() {
866 let nodes = ¶.collators;
867 if nodes.iter().any(|node| node.image.is_none()) {
868 errs.push(format!(
869 "Missing image for node, and not default is set at parachain {}",
870 para.id
871 ));
872 }
873 }
874 }
875 } else {
876 let mut cmds: HashSet<&str> = Default::default();
879 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
880 cmds.insert(cmd.as_str());
881 }
882 for node in network_spec.relaychain().nodes.iter() {
883 cmds.insert(node.command());
884 }
885
886 for para in &network_spec.parachains {
888 if let Some(cmd) = para.default_command.as_ref() {
889 cmds.insert(cmd.as_str());
890 }
891
892 for node in para.collators.iter() {
893 cmds.insert(node.command());
894 }
895 }
896
897 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
900 let parts: Vec<_> = path.split(":").collect();
901 for cmd in cmds {
902 let missing = if cmd.contains('/') {
903 trace!("checking {cmd}");
904 if std::fs::metadata(cmd).is_err() {
905 true
906 } else {
907 info!("🔎 We will use the full path {cmd} to spawn nodes.");
908 false
909 }
910 } else {
911 !parts.iter().any(|part| {
913 let path_to = format!("{part}/{cmd}");
914 trace!("checking {path_to}");
915 let check_result = std::fs::metadata(&path_to);
916 trace!("result {:?}", check_result);
917 if check_result.is_ok() {
918 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
919 true
920 } else {
921 false
922 }
923 })
924 };
925
926 if missing {
927 errs.push(help_msg(cmd));
928 }
929 }
930 }
931
932 if !errs.is_empty() {
933 let msg = errs.join("\n");
934 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
935 }
936
937 Ok(())
938}
939
/// Human-readable hint for a binary that could not be found, including
/// build instructions for well-known Polkadot SDK binaries.
fn help_msg(cmd: &str) -> String {
    let hint = match cmd {
        "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
            format!("compile by running: \n\tcargo build --package {cmd} --release")
        },
        "polkadot" => {
            format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
        },
        "polkadot-parachain" => {
            format!("compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
        },
        _ => String::from("please compile it."),
    };

    format!("Missing binary {cmd}, {hint}")
}
956
/// Read the spawn concurrency from the `ZOMBIE_SPAWN_CONCURRENCY` env var.
/// Returns `None` when the variable is unset or not a valid `usize`.
fn spawn_concurrency_from_env() -> Option<usize> {
    env::var("ZOMBIE_SPAWN_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}
965
966fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
967 let desired_spawn_concurrency = match (
968 spawn_concurrency_from_env(),
969 spec.global_settings.spawn_concurrency(),
970 ) {
971 (Some(n), _) => Some(n),
972 (None, Some(n)) => Some(n),
973 _ => None,
974 };
975
976 let (spawn_concurrency, limited_by_tokens) =
977 if let Some(spawn_concurrency) = desired_spawn_concurrency {
978 if spawn_concurrency == 1 {
979 (1, false)
980 } else if has_tokens(&serde_json::to_string(spec)?) {
981 (1, true)
982 } else {
983 (spawn_concurrency, false)
984 }
985 } else {
986 if has_tokens(&serde_json::to_string(spec)?) {
988 (1, true)
989 } else {
990 (100, false)
992 }
993 };
994
995 Ok((spawn_concurrency, limited_by_tokens))
996}
997
/// Group `nodes` into levels that can be spawned concurrently.
///
/// A node depends on another when its args contain a replacement token
/// referencing that other node's name (self-references and names not in
/// `nodes` are ignored). The result is a topological layering (Kahn's
/// BFS algorithm): every node in level `i` depends only on nodes from
/// earlier levels. Returns `InvalidConfig` if the references form a cycle.
fn dependency_levels_among<'a>(
    nodes: &'a [&'a NodeSpec],
) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
    // Name -> spec lookup, used to resolve token references to known nodes.
    let by_name = nodes
        .iter()
        .map(|n| (n.name.as_str(), *n))
        .collect::<HashMap<_, _>>();

    // Adjacency list (dependency -> dependents) and indegree per node.
    let mut graph = HashMap::with_capacity(nodes.len());
    let mut indegree = HashMap::with_capacity(nodes.len());

    for node in nodes {
        graph.insert(node.name.as_str(), Vec::new());
        indegree.insert(node.name.as_str(), 0);
    }

    // Extract dependencies from each node's serialized args.
    for &node in nodes {
        if let Ok(args_json) = serde_json::to_string(&node.args) {
            let unique_deps = get_tokens_to_replace(&args_json)
                .into_iter()
                .filter(|dep| dep != &node.name)
                .filter_map(|dep| by_name.get(dep.as_str()))
                .map(|&dep_node| dep_node.name.as_str())
                .collect::<HashSet<_>>();

            for dep_name in unique_deps {
                // Both maps were seeded with every node name above, so these
                // lookups can only fail on a bug.
                graph
                    .get_mut(dep_name)
                    .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
                    .push(node);
                *indegree
                    .get_mut(node.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
            }
        }
    }

    // Seed the queue with nodes that have no dependencies.
    let mut queue = nodes
        .iter()
        .filter(|n| {
            *indegree
                .get(n.name.as_str())
                .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
                == 0
        })
        .copied()
        .collect::<VecDeque<_>>();

    let mut processed_count = 0;
    let mut levels = Vec::new();

    // Each pass over the current queue contents forms one level.
    while !queue.is_empty() {
        let level_size = queue.len();
        let mut current_level = Vec::with_capacity(level_size);

        for _ in 0..level_size {
            let n = queue
                .pop_front()
                .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
            current_level.push(n);
            processed_count += 1;

            // Releasing this node may unblock its dependents.
            for &neighbour in graph
                .get(n.name.as_str())
                .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
            {
                let neighbour_indegree = indegree
                    .get_mut(neighbour.name.as_str())
                    .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
                *neighbour_indegree -= 1;

                if *neighbour_indegree == 0 {
                    queue.push_back(neighbour);
                }
            }
        }

        // Deterministic order inside a level (by node name).
        current_level.sort_by_key(|n| &n.name);
        levels.push(current_level);
    }

    // Any node left unprocessed must be part of a cycle.
    if processed_count != nodes.len() {
        return Err(OrchestratorError::InvalidConfig(
            "Tokens have cyclical dependencies".to_string(),
        ));
    }

    Ok(levels)
}
1096
/// Filesystem wrapper that resolves relative paths against a fixed
/// `base_dir` (the namespace's base directory).
#[derive(Clone, Debug)]
pub struct ScopedFilesystem<'a, FS: FileSystem> {
    // Underlying filesystem implementation.
    fs: &'a FS,
    // Directory every relative path is resolved against.
    base_dir: &'a str,
}
1109
1110impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
1111 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
1112 Self { fs, base_dir }
1113 }
1114
1115 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
1116 for file in files {
1117 let full_remote_path = PathBuf::from(format!(
1118 "{}/{}",
1119 self.base_dir,
1120 file.remote_path.to_string_lossy()
1121 ));
1122 trace!("coping file: {file}");
1123 self.fs
1124 .copy(file.local_path.as_path(), full_remote_path)
1125 .await?;
1126 }
1127 Ok(())
1128 }
1129
1130 async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
1131 let file = file.as_ref();
1132
1133 let full_path = if file.is_absolute() {
1134 file.to_owned()
1135 } else {
1136 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1137 };
1138 let content = self.fs.read(full_path).await?;
1139 Ok(content)
1140 }
1141
1142 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
1143 let file = file.as_ref();
1144
1145 let full_path = if file.is_absolute() {
1146 file.to_owned()
1147 } else {
1148 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1149 };
1150 let content = self.fs.read_to_string(full_path).await?;
1151 Ok(content)
1152 }
1153
1154 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1155 let path = PathBuf::from(format!(
1156 "{}/{}",
1157 self.base_dir,
1158 path.as_ref().to_string_lossy()
1159 ));
1160 self.fs.create_dir(path).await
1161 }
1162
1163 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1164 let path = PathBuf::from(format!(
1165 "{}/{}",
1166 self.base_dir,
1167 path.as_ref().to_string_lossy()
1168 ));
1169 self.fs.create_dir_all(path).await
1170 }
1171
1172 async fn write(
1173 &self,
1174 path: impl AsRef<Path>,
1175 contents: impl AsRef<[u8]> + Send,
1176 ) -> Result<(), FileSystemError> {
1177 let path = path.as_ref();
1178
1179 let full_path = if path.is_absolute() {
1180 path.to_owned()
1181 } else {
1182 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1183 };
1184
1185 self.fs.write(full_path, contents).await
1186 }
1187
1188 fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1190 let path = path.as_ref();
1191
1192 let full_path = if path.is_absolute() {
1193 path.to_owned()
1194 } else {
1195 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1196 };
1197
1198 full_path
1199 }
1200
1201 fn base_dir(&self) -> &str {
1203 self.base_dir
1204 }
1205}
1206
/// Role a spawned process plays in the network; carried in `SpawnNodeCtx`
/// (relay nodes use `Node`, parachain collators use `Collator` or
/// `CumulusCollator` depending on `is_cumulus_based`).
#[derive(Clone, Debug)]
pub enum ZombieRole {
    Temp,
    Node,
    Bootnode,
    Collator,
    CumulusCollator,
    Companion,
}
1216
1217pub use network::{AddCollatorOptions, AddNodeOptions};
1219pub use network_helper::metrics;
1220pub use sc_chain_spec;
1221
#[cfg(test)]
mod tests {
    use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
    use lazy_static::lazy_static;
    use tokio::sync::Mutex;

    use super::*;

    const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
    lazy_static! {
        // Tests mutate the process-wide env var, so they must not interleave.
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// Set (`Some`) or clear (`None`) the spawn-concurrency override env var.
    fn set_env(concurrency: Option<u32>) {
        if let Some(value) = concurrency {
            env::set_var(ENV_KEY, value.to_string());
        } else {
            env::remove_var(ENV_KEY);
        }
    }

    /// Build a minimal config: a rococo-local relaychain (alice, bob) plus one
    /// cumulus parachain (id 2000) with a single collator.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let mut relay = r
                    .with_chain("rococo-local")
                    .with_default_command(with_cmd.unwrap_or("polkadot"));
                if with_image {
                    relay = relay.with_default_image("docker.io/parity/polkadot")
                }

                relay
                    .with_validator(|node| node.with_name("alice"))
                    .with_validator(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let node = n
                        .with_name("collator")
                        .with_command(with_cmd.unwrap_or("polkadot-parachain"));
                    if with_image {
                        node.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        node
                    }
                })
            })
            .build()
    }

    /// Shorthand for the generate → `NetworkSpec::from_config` → unwrap chain
    /// repeated by every test below.
    async fn build_spec(with_image: bool, with_cmd: Option<&'static str>) -> NetworkSpec {
        let network_config = generate(with_image, with_cmd).unwrap();
        NetworkSpec::from_config(&network_config).await.unwrap()
    }

    /// Capabilities used by the validation tests; only `requires_image`
    /// varies between cases (previously this literal was duplicated 4x).
    fn caps(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Build a `NodeSpec` whose args contain a `{{ZOMBIE:<dep>:someField}}`
    /// replacement token for each given dependency node.
    fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
        let mut spec = NodeSpec {
            name: name.to_string(),
            ..Default::default()
        };
        if let Some(dependencies) = dependencies {
            for node in dependencies {
                spec.args.push(
                    format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
                        .as_str()
                        .into(),
                );
            }
        }
        spec
    }

    /// Assert computed dependency levels match the expected names, level by
    /// level and position by position.
    fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
        actual_levels
            .iter()
            .zip(expected_levels)
            .for_each(|(actual_level, expected_level)| {
                assert_eq!(actual_level.len(), expected_level.len());
                actual_level
                    .iter()
                    .zip(expected_level.iter())
                    .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
            });
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let spec = build_spec(true, None).await;
        let valid = validate_spec_with_provider_capabilities(&spec, &caps(true));
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let spec = build_spec(false, None).await;
        let valid = validate_spec_with_provider_capabilities(&spec, &caps(true));
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let spec = build_spec(false, Some("other")).await;
        let valid = validate_spec_with_provider_capabilities(&spec, &caps(false));
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        let spec = build_spec(false, Some("cargo")).await;
        let valid = validate_spec_with_provider_capabilities(&spec, &caps(false));
        println!("{valid:?}");
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn default_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);
        let spec = build_spec(false, Some("cargo")).await;
        let (concurrency, _) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 100);
    }

    #[tokio::test]
    async fn set_spawn_concurrency() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let mut spec = build_spec(false, Some("cargo")).await;

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 4);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(None);

        let mut spec = build_spec(false, Some("cargo")).await;

        let global_settings = GlobalSettingsBuilder::new()
            .with_spawn_concurrency(4)
            .build()
            .unwrap();

        spec.set_global_settings(global_settings);
        // A cross-node replacement token forces sequential spawning.
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(10));

        let spec = build_spec(false, Some("cargo")).await;
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 10);
        assert!(!limited);
    }

    #[tokio::test]
    async fn set_spawn_concurrency_from_env_but_limited() {
        let _g = ENV_MUTEX.lock().await;
        set_env(Some(12));

        let mut spec = build_spec(false, Some("cargo")).await;
        // The env override is still capped to 1 when a cross-node token exists.
        let node = spec.relaychain.nodes.first_mut().unwrap();
        node.args
            .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
        let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
        assert_eq!(concurrency, 1);
        assert!(limited);
    }

    #[test]
    fn dependency_levels_among_should_work() {
        // Empty input -> no levels.
        assert!(dependency_levels_among(&[]).unwrap().is_empty());

        // Single node with no dependencies -> one level.
        let alice = get_node_with_dependencies("alice", None);
        let nodes = [&alice];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"]];

        verify_levels(levels, expected);

        // Two independent nodes -> one shared level.
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", None);
        let nodes = [&alice, &bob];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice", "bob"]];

        verify_levels(levels, expected);

        // Linear chain alice <- bob <- charlie -> three levels.
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];

        verify_levels(levels, expected);

        // Fan-out: both bob and charlie depend on alice -> two levels.
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let nodes = [&alice, &bob, &charlie];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"]];

        verify_levels(levels, expected);

        // Diamond: dave depends on both bob and charlie -> three levels.
        let alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
        let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
        let nodes = [&alice, &bob, &charlie, &dave];

        let levels = dependency_levels_among(&nodes).unwrap();
        let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];

        verify_levels(levels, expected);
    }

    #[test]
    fn dependency_levels_among_should_detect_cycles() {
        // alice <-> bob reference each other's fields: topological sort fails.
        let mut alice = get_node_with_dependencies("alice", None);
        let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
        alice.args.push("{{ZOMBIE:bob:someField}}".into());

        assert!(dependency_levels_among(&[&alice, &bob]).is_err())
    }
}