1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod tx_helper;
9
10mod network_spec;
11#[cfg(feature = "pjs")]
12pub mod pjs_helper;
13pub mod shared;
14mod spawner;
15
16use std::{
17 collections::HashSet,
18 net::IpAddr,
19 path::{Path, PathBuf},
20 time::Duration,
21};
22
23use configuration::{NetworkConfig, RegistrationStrategy};
24use errors::OrchestratorError;
25use generators::errors::GeneratorError;
26use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
27pub use network_spec::NetworkSpec;
29use network_spec::{node::NodeSpec, parachain::ParachainSpec};
30use provider::{
31 types::{ProviderCapabilities, TransferedFile},
32 DynProvider,
33};
34use serde_json::json;
35use support::fs::{FileSystem, FileSystemError};
36use tokio::time::timeout;
37use tracing::{debug, info, trace};
38
39use crate::{shared::types::RegisterParachainOptions, spawner::SpawnNodeCtx};
40pub struct Orchestrator<T>
41where
42 T: FileSystem + Sync + Send,
43{
44 filesystem: T,
45 provider: DynProvider,
46}
47
48impl<T> Orchestrator<T>
49where
50 T: FileSystem + Sync + Send + Clone,
51{
52 pub fn new(filesystem: T, provider: DynProvider) -> Self {
53 Self {
54 filesystem,
55 provider,
56 }
57 }
58
59 pub async fn spawn(
60 &self,
61 network_config: NetworkConfig,
62 ) -> Result<Network<T>, OrchestratorError> {
63 let global_timeout = network_config.global_settings().network_spawn_timeout();
64 let network_spec = NetworkSpec::from_config(&network_config).await?;
65
66 let res = timeout(
67 Duration::from_secs(global_timeout.into()),
68 self.spawn_inner(network_spec),
69 )
70 .await
71 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
72 res?
73 }
74
75 pub async fn spawn_from_spec(
76 &self,
77 network_spec: NetworkSpec,
78 ) -> Result<Network<T>, OrchestratorError> {
79 let global_timeout = network_spec.global_settings.network_spawn_timeout();
80 let res = timeout(
81 Duration::from_secs(global_timeout as u64),
82 self.spawn_inner(network_spec),
83 )
84 .await
85 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
86 res?
87 }
88
89 async fn spawn_inner(
90 &self,
91 mut network_spec: NetworkSpec,
92 ) -> Result<Network<T>, OrchestratorError> {
93 debug!(network_spec = ?network_spec,"Network spec to spawn");
95
96 validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
98 .map_err(|err| {
99 OrchestratorError::InvalidConfigForProvider(
100 self.provider.name().into(),
101 err.to_string(),
102 )
103 })?;
104
105 let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
107 self.provider
108 .create_namespace_with_base_dir(base_dir)
109 .await?
110 } else {
111 self.provider.create_namespace().await?
112 };
113
114 info!("🧰 ns: {}", ns.name());
115 info!("🧰 base_dir: {:?}", ns.base_dir());
116
117 network_spec
118 .populate_nodes_available_args(ns.clone())
119 .await?;
120
121 let base_dir = ns.base_dir().to_string_lossy();
122 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
123 network_spec
125 .relaychain
126 .chain_spec
127 .build(&ns, &scoped_fs)
128 .await?;
129
130 debug!("relaychain spec built!");
131 let relay_chain_id = network_spec
133 .relaychain
134 .chain_spec
135 .read_chain_id(&scoped_fs)
136 .await?;
137
138 let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
139 let base_dir_exists = network_spec.global_settings.base_dir().is_some();
140 network_spec
141 .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
142 .await?;
143
144 let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
146 Vec<&ParachainSpec>,
147 Vec<&ParachainSpec>,
148 ) = network_spec
149 .parachains
150 .iter()
151 .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
152 .partition(|para| {
153 matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
154 });
155
156 let mut para_artifacts = vec![];
157 for para in para_to_register_in_genesis {
158 let genesis_config = para.get_genesis_config()?;
159 para_artifacts.push(genesis_config)
160 }
161
162 network_spec
164 .relaychain
165 .chain_spec
166 .customize_relay(
167 &network_spec.relaychain,
168 &network_spec.hrmp_channels,
169 para_artifacts,
170 &scoped_fs,
171 )
172 .await?;
173
174 network_spec
176 .relaychain
177 .chain_spec
178 .build_raw(&ns, &scoped_fs)
179 .await?;
180
181 if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
183 network_spec
184 .relaychain
185 .chain_spec
186 .override_code(&scoped_fs, wasm_override)
187 .await?;
188 }
189
190 let (bootnodes, relaynodes) =
191 split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
192
193 let mut ctx = SpawnNodeCtx {
195 chain_id: &relay_chain_id,
196 parachain_id: None,
197 chain: relay_chain_name.as_str(),
198 role: ZombieRole::Node,
199 ns: &ns,
200 scoped_fs: &scoped_fs,
201 parachain: None,
202 bootnodes_addr: &vec![],
203 wait_ready: false,
204 nodes_by_name: json!({}),
205 };
206
207 let global_files_to_inject = vec![TransferedFile::new(
208 PathBuf::from(format!(
209 "{}/{relay_chain_name}.json",
210 ns.base_dir().to_string_lossy()
211 )),
212 PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
213 )];
214
215 let r = Relaychain::new(
216 relay_chain_name.to_string(),
217 relay_chain_id.clone(),
218 PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
219 OrchestratorError::InvariantError("chain-spec raw path should be set now"),
220 )?),
221 );
222 let mut network =
223 Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
224
225 let spawning_tasks = bootnodes
226 .iter()
227 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
228
229 let mut node_ws_url: String = "".to_string();
231
232 let mut bootnodes_addr: Vec<String> = vec![];
234 for node in futures::future::try_join_all(spawning_tasks).await? {
235 let bootnode_multiaddr = node.multiaddr();
236
237 bootnodes_addr.push(bootnode_multiaddr.to_string());
238
239 if node_ws_url.is_empty() {
241 node_ws_url.clone_from(&node.ws_uri)
242 }
243
244 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
246 network.add_running_node(node, None);
247 }
248
249 network_spec
251 .relaychain
252 .chain_spec
253 .add_bootnodes(&scoped_fs, &bootnodes_addr)
254 .await?;
255
256 ctx.bootnodes_addr = &bootnodes_addr;
257
258 let spawning_tasks = relaynodes
260 .iter()
261 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
262
263 for node in futures::future::try_join_all(spawning_tasks).await? {
264 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
266 network.add_running_node(node, None);
267 }
268
269 for para in network_spec.parachains.iter() {
271 let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
273 let parachain_id = parachain.chain_id.clone();
274
275 let (bootnodes, collators) =
276 split_nodes_by_bootnodes(¶.collators, para.no_default_bootnodes);
277
278 let mut ctx_para = SpawnNodeCtx {
280 parachain: Some(para),
281 parachain_id: parachain_id.as_deref(),
282 role: if para.is_cumulus_based {
283 ZombieRole::CumulusCollator
284 } else {
285 ZombieRole::Collator
286 },
287 bootnodes_addr: &vec![],
288 ..ctx.clone()
289 };
290
291 let spawning_tasks = bootnodes.iter().map(|node| {
292 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
293 });
294
295 let mut bootnodes_addr: Vec<String> = vec![];
297 let mut running_nodes: Vec<NetworkNode> = vec![];
298 for node in futures::future::try_join_all(spawning_tasks).await? {
299 let bootnode_multiaddr = node.multiaddr();
300
301 bootnodes_addr.push(bootnode_multiaddr.to_string());
302 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
303 running_nodes.push(node);
304 }
305
306 if let Some(para_chain_spec) = para.chain_spec.as_ref() {
307 para_chain_spec
308 .add_bootnodes(&scoped_fs, &bootnodes_addr)
309 .await?;
310 }
311
312 ctx_para.bootnodes_addr = &bootnodes_addr;
313
314 let spawning_tasks = collators.iter().map(|node| {
316 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
317 });
318
319 running_nodes.extend_from_slice(
321 futures::future::try_join_all(spawning_tasks)
322 .await?
323 .as_slice(),
324 );
325
326 let running_para_id = parachain.para_id;
327 network.add_para(parachain);
328 for node in running_nodes {
329 network.add_running_node(node, Some(running_para_id));
330 }
331 }
332
333 for para in para_to_register_with_extrinsic {
341 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
342 id: para.id,
343 wasm_path: para
345 .genesis_wasm
346 .artifact_path()
347 .ok_or(OrchestratorError::InvariantError(
348 "artifact path for wasm must be set at this point",
349 ))?
350 .to_path_buf(),
351 state_path: para
352 .genesis_state
353 .artifact_path()
354 .ok_or(OrchestratorError::InvariantError(
355 "artifact path for state must be set at this point",
356 ))?
357 .to_path_buf(),
358 node_ws_url: node_ws_url.clone(),
359 onboard_as_para: para.onboard_as_parachain,
360 seed: None, finalization: false,
362 };
363
364 Parachain::register(register_para_options, &scoped_fs).await?;
365 }
366
367 let mut zombie_json = serde_json::to_value(&network)?;
369 zombie_json["local_base_dir"] = serde_json::value::Value::String(base_dir.to_string());
370
371 scoped_fs
372 .write("zombie.json", serde_json::to_string_pretty(&zombie_json)?)
373 .await?;
374 Ok(network)
375 }
376}
377
378fn split_nodes_by_bootnodes(
383 nodes: &[NodeSpec],
384 no_default_bootnodes: bool,
385) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
386 let mut bootnodes = vec![];
388 let mut other_nodes = vec![];
389 nodes.iter().for_each(|node| {
390 if node.is_bootnode {
391 bootnodes.push(node)
392 } else {
393 other_nodes.push(node)
394 }
395 });
396
397 if bootnodes.is_empty() && !no_default_bootnodes {
398 bootnodes.push(other_nodes.remove(0))
399 }
400
401 (bootnodes, other_nodes)
402}
403
404fn generate_bootnode_addr(
406 node: &NetworkNode,
407 ip: &IpAddr,
408 port: u16,
409) -> Result<String, GeneratorError> {
410 generators::generate_node_bootnode_addr(
411 &node.spec.peer_id,
412 ip,
413 port,
414 node.inner.args().as_ref(),
415 &node.spec.p2p_cert_hash,
416 )
417}
418fn validate_spec_with_provider_capabilities(
420 network_spec: &NetworkSpec,
421 capabilities: &ProviderCapabilities,
422) -> Result<(), anyhow::Error> {
423 let mut errs: Vec<String> = vec![];
424
425 if capabilities.requires_image {
426 if network_spec.relaychain.default_image.is_none() {
428 let nodes = &network_spec.relaychain.nodes;
430 if nodes.iter().any(|node| node.image.is_none()) {
431 errs.push(String::from(
432 "Missing image for node, and not default is set at relaychain",
433 ));
434 }
435 };
436
437 for para in &network_spec.parachains {
439 if para.default_image.is_none() {
440 let nodes = ¶.collators;
441 if nodes.iter().any(|node| node.image.is_none()) {
442 errs.push(format!(
443 "Missing image for node, and not default is set at parachain {}",
444 para.id
445 ));
446 }
447 }
448 }
449 } else {
450 let mut cmds: HashSet<&str> = Default::default();
453 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
454 cmds.insert(cmd.as_str());
455 }
456 for node in network_spec.relaychain().nodes.iter() {
457 cmds.insert(node.command());
458 }
459
460 for para in &network_spec.parachains {
462 if let Some(cmd) = para.default_command.as_ref() {
463 cmds.insert(cmd.as_str());
464 }
465
466 for node in para.collators.iter() {
467 cmds.insert(node.command());
468 }
469 }
470
471 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
474 let parts: Vec<_> = path.split(":").collect();
475 for cmd in cmds {
476 let missing = if cmd.contains('/') {
477 trace!("checking {cmd}");
478 if std::fs::metadata(cmd).is_err() {
479 true
480 } else {
481 info!("🔎 We will use the full path {cmd} to spawn nodes.");
482 false
483 }
484 } else {
485 !parts.iter().any(|part| {
487 let path_to = format!("{}/{}", part, cmd);
488 trace!("checking {path_to}");
489 let check_result = std::fs::metadata(&path_to);
490 trace!("result {:?}", check_result);
491 if check_result.is_ok() {
492 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
493 true
494 } else {
495 false
496 }
497 })
498 };
499
500 if missing {
501 errs.push(help_msg(cmd));
502 }
503 }
504 }
505
506 if !errs.is_empty() {
507 let msg = errs.join("\n");
508 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
509 }
510
511 Ok(())
512}
513
/// Returns the message shown when the binary `cmd` can't be found, including
/// the cargo invocation to build it for the binaries known here.
fn help_msg(cmd: &str) -> String {
    // Template nodes all share the same build instructions.
    const TEMPLATE_NODES: [&str; 3] = [
        "parachain-template-node",
        "solochain-template-node",
        "minimal-template-node",
    ];

    if TEMPLATE_NODES.contains(&cmd) {
        return format!(
            "Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release"
        );
    }

    if cmd == "polkadot" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker");
    }

    if cmd == "polkadot-parachain" {
        return format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}");
    }

    // Unknown binary: no specific build instructions available.
    format!("Missing binary {cmd}, please compile it.")
}
530
531#[derive(Clone, Debug)]
539pub struct ScopedFilesystem<'a, FS: FileSystem> {
540 fs: &'a FS,
541 base_dir: &'a str,
542}
543
544impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
545 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
546 Self { fs, base_dir }
547 }
548
549 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
550 for file in files {
551 let full_remote_path = PathBuf::from(format!(
552 "{}/{}",
553 self.base_dir,
554 file.remote_path.to_string_lossy()
555 ));
556 trace!("coping file: {file}");
557 self.fs
558 .copy(file.local_path.as_path(), full_remote_path)
559 .await?;
560 }
561 Ok(())
562 }
563
564 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
565 let file = file.as_ref();
566
567 let full_path = if file.is_absolute() {
568 file.to_owned()
569 } else {
570 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
571 };
572 let content = self.fs.read_to_string(full_path).await?;
573 Ok(content)
574 }
575
576 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
577 let path = PathBuf::from(format!(
578 "{}/{}",
579 self.base_dir,
580 path.as_ref().to_string_lossy()
581 ));
582 self.fs.create_dir(path).await
583 }
584
585 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
586 let path = PathBuf::from(format!(
587 "{}/{}",
588 self.base_dir,
589 path.as_ref().to_string_lossy()
590 ));
591 self.fs.create_dir_all(path).await
592 }
593
594 async fn write(
595 &self,
596 path: impl AsRef<Path>,
597 contents: impl AsRef<[u8]> + Send,
598 ) -> Result<(), FileSystemError> {
599 let path = path.as_ref();
600
601 let full_path = if path.is_absolute() {
602 path.to_owned()
603 } else {
604 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
605 };
606
607 self.fs.write(full_path, contents).await
608 }
609}
610
/// Role a node plays in the network.
#[derive(Debug, Clone)]
pub enum ZombieRole {
    // NOTE(review): `Temp`, `Bootnode` and `Companion` are not used in this
    // file; their exact meaning should be confirmed against their users.
    Temp,
    /// Role set in the spawn context of the relay chain nodes.
    Node,
    Bootnode,
    /// Role set for collators of parachains that are not cumulus based.
    Collator,
    /// Role set for collators of cumulus based parachains.
    CumulusCollator,
    Companion,
}
620
621pub use network::{AddCollatorOptions, AddNodeOptions};
623pub use network_helper::metrics;
624#[cfg(feature = "pjs")]
625pub use pjs_helper::PjsResult;
626
#[cfg(test)]
mod tests {
    use configuration::NetworkConfigBuilder;

    use super::*;

    /// Builds a config with a two node relay chain and one cumulus based
    /// parachain (id 2000) with a single collator.
    ///
    /// `with_image` sets images for the relay chain (default) and the collator;
    /// `with_cmd` overrides the command used by both chains.
    fn generate(
        with_image: bool,
        with_cmd: Option<&'static str>,
    ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
        let relay_cmd = with_cmd.unwrap_or("polkadot");
        let collator_cmd = with_cmd.unwrap_or("polkadot-parachain");

        NetworkConfigBuilder::new()
            .with_relaychain(|r| {
                let relay = r
                    .with_chain("rococo-local")
                    .with_default_command(relay_cmd);
                let relay = if with_image {
                    relay.with_default_image("docker.io/parity/polkadot")
                } else {
                    relay
                };

                relay
                    .with_node(|node| node.with_name("alice"))
                    .with_node(|node| node.with_name("bob"))
            })
            .with_parachain(|p| {
                p.with_id(2000).cumulus_based(true).with_collator(|n| {
                    let collator = n.with_name("collator").with_command(collator_cmd);
                    if with_image {
                        collator.with_image("docker.io/paritypr/test-parachain")
                    } else {
                        collator
                    }
                })
            })
            .build()
    }

    /// Provider capabilities where only `requires_image` varies.
    fn capabilities(requires_image: bool) -> ProviderCapabilities {
        ProviderCapabilities {
            requires_image,
            has_resources: false,
            prefix_with_full_path: false,
            use_default_ports_in_cmd: false,
        }
    }

    /// Generates the config, builds the spec and validates it against the
    /// capabilities of a provider that does (or doesn't) require images.
    async fn validate(
        with_image: bool,
        with_cmd: Option<&'static str>,
        requires_image: bool,
    ) -> Result<(), anyhow::Error> {
        let network_config = generate(with_image, with_cmd).unwrap();
        let spec = NetworkSpec::from_config(&network_config).await.unwrap();
        let caps = capabilities(requires_image);

        validate_spec_with_provider_capabilities(&spec, &caps)
    }

    #[tokio::test]
    async fn valid_config_with_image() {
        let valid = validate(true, None, true).await;
        assert!(valid.is_ok())
    }

    #[tokio::test]
    async fn invalid_config_without_image() {
        let valid = validate(false, None, true).await;
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn invalid_config_missing_cmd() {
        let valid = validate(false, Some("other"), false).await;
        assert!(valid.is_err())
    }

    #[tokio::test]
    async fn valid_config_present_cmd() {
        // `cargo` is expected to be in PATH when the tests run.
        let valid = validate(false, Some("cargo"), false).await;
        println!("{:?}", valid);
        assert!(valid.is_ok())
    }
}
725}