1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{
7 cell::RefCell,
8 collections::HashMap,
9 path::PathBuf,
10 rc::Rc,
11 sync::Arc,
12 time::{Duration, SystemTime},
13};
14
15use configuration::{
16 para_states::{Initial, Running},
17 shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
18 types::{Arg, Command, Image, Port, ValidationContext},
19 ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
20};
21use provider::{types::TransferedFile, DynNamespace, ProviderError};
22use serde::Serialize;
23use support::fs::FileSystem;
24use tokio::sync::RwLock;
25use tracing::{error, warn};
26
27use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
28use crate::{
29 generators::chain_spec::ChainSpec,
30 network_spec::{self, NetworkSpec},
31 shared::{
32 constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
33 macros,
34 types::{ChainDefaultContext, RegisterParachainOptions},
35 },
36 spawner::{self, SpawnNodeCtx},
37 utils::write_zombie_json,
38 ScopedFilesystem, ZombieRole,
39};
40
41#[derive(Serialize)]
42pub struct Network<T: FileSystem> {
43 #[serde(skip)]
44 ns: DynNamespace,
45 #[serde(skip)]
46 filesystem: T,
47 relay: Relaychain,
48 initial_spec: NetworkSpec,
49 parachains: HashMap<u32, Vec<Parachain>>,
50 #[serde(skip)]
51 nodes_by_name: HashMap<String, NetworkNode>,
52 #[serde(skip)]
53 nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 start_time_ts: Option<String>,
56}
57
58impl<T: FileSystem> std::fmt::Debug for Network<T> {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("Network")
61 .field("ns", &"ns_skipped")
62 .field("relay", &self.relay)
63 .field("initial_spec", &self.initial_spec)
64 .field("parachains", &self.parachains)
65 .field("nodes_by_name", &self.nodes_by_name)
66 .finish()
67 }
68}
69
70macros::create_add_options!(AddNodeOptions {
71 chain_spec: Option<PathBuf>,
72 override_eth_key: Option<String>
73});
74
75macros::create_add_options!(AddCollatorOptions {
76 chain_spec: Option<PathBuf>,
77 chain_spec_relay: Option<PathBuf>,
78 override_eth_key: Option<String>
79});
80
81impl<T: FileSystem> Network<T> {
82 pub(crate) fn new_with_relay(
83 relay: Relaychain,
84 ns: DynNamespace,
85 fs: T,
86 initial_spec: NetworkSpec,
87 ) -> Self {
88 Self {
89 ns,
90 filesystem: fs,
91 relay,
92 initial_spec,
93 parachains: Default::default(),
94 nodes_by_name: Default::default(),
95 nodes_to_watch: Default::default(),
96 start_time_ts: Default::default(),
97 }
98 }
99
100 pub fn ns_name(&self) -> String {
102 self.ns.name().to_string()
103 }
104
105 pub fn base_dir(&self) -> Option<&str> {
106 self.ns.base_dir().to_str()
107 }
108
109 pub fn relaychain(&self) -> &Relaychain {
110 &self.relay
111 }
112
113 pub async fn destroy(self) -> Result<(), ProviderError> {
115 self.ns.destroy().await
116 }
117
118 pub async fn add_node(
144 &mut self,
145 name: impl Into<String>,
146 options: AddNodeOptions,
147 ) -> Result<(), anyhow::Error> {
148 let name = generate_unique_node_name_from_names(
149 name,
150 &mut self.nodes_by_name.keys().cloned().collect(),
151 );
152
153 let relaychain = self.relaychain();
154
155 let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
156 chain_spec_custom_path.clone()
157 } else {
158 PathBuf::from(format!(
159 "{}/{}.json",
160 self.ns.base_dir().to_string_lossy(),
161 relaychain.chain
162 ))
163 };
164
165 let chain_context = ChainDefaultContext {
166 default_command: self.initial_spec.relaychain.default_command.as_ref(),
167 default_image: self.initial_spec.relaychain.default_image.as_ref(),
168 default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
169 default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
170 default_args: self.initial_spec.relaychain.default_args.iter().collect(),
171 };
172
173 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
174 &name,
175 options.into(),
176 &chain_context,
177 false,
178 false,
179 )?;
180
181 node_spec.available_args_output = Some(
182 self.initial_spec
183 .node_available_args_output(&node_spec, self.ns.clone())
184 .await?,
185 );
186
187 let base_dir = self.ns.base_dir().to_string_lossy();
188 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
189
190 let ctx = SpawnNodeCtx {
191 chain_id: &relaychain.chain_id,
192 parachain_id: None,
193 chain: &relaychain.chain,
194 role: ZombieRole::Node,
195 ns: &self.ns,
196 scoped_fs: &scoped_fs,
197 parachain: None,
198 bootnodes_addr: &vec![],
199 wait_ready: true,
200 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
201 global_settings: &self.initial_spec.global_settings,
202 };
203
204 let global_files_to_inject = vec![TransferedFile::new(
205 chain_spec_path,
206 PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
207 )];
208
209 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
210
211 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
223 .await?;
224
225 self.add_running_node(node.clone(), None).await;
227
228 self.write_zombie_json().await?;
230
231 Ok(())
232 }
233
234 pub async fn add_collator(
261 &mut self,
262 name: impl Into<String>,
263 options: AddCollatorOptions,
264 para_id: u32,
265 ) -> Result<(), anyhow::Error> {
266 let name = generate_unique_node_name_from_names(
267 name,
268 &mut self.nodes_by_name.keys().cloned().collect(),
269 );
270 let spec = self
271 .initial_spec
272 .parachains
273 .iter()
274 .find(|para| para.id == para_id)
275 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
276 let role = if spec.is_cumulus_based {
277 ZombieRole::CumulusCollator
278 } else {
279 ZombieRole::Collator
280 };
281 let chain_context = ChainDefaultContext {
282 default_command: spec.default_command.as_ref(),
283 default_image: spec.default_image.as_ref(),
284 default_resources: spec.default_resources.as_ref(),
285 default_db_snapshot: spec.default_db_snapshot.as_ref(),
286 default_args: spec.default_args.iter().collect(),
287 };
288
289 let parachain = self
290 .parachains
291 .get_mut(¶_id)
292 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
293 .get_mut(0)
294 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
295
296 let base_dir = self.ns.base_dir().to_string_lossy();
297 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
298
299 let ctx = SpawnNodeCtx {
301 chain_id: &self.relay.chain_id,
302 parachain_id: parachain.chain_id.as_deref(),
303 chain: &self.relay.chain,
304 role,
305 ns: &self.ns,
306 scoped_fs: &scoped_fs,
307 parachain: Some(spec),
308 bootnodes_addr: &vec![],
309 wait_ready: true,
310 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
311 global_settings: &self.initial_spec.global_settings,
312 };
313
314 let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
315 chain_spec_custom_path.clone()
316 } else {
317 PathBuf::from(format!(
318 "{}/{}.json",
319 self.ns.base_dir().to_string_lossy(),
320 self.relay.chain
321 ))
322 };
323
324 let mut global_files_to_inject = vec![TransferedFile::new(
325 relaychain_spec_path,
326 PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
327 )];
328
329 let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
330 Some(para_chain_spec_custom.clone())
331 } else if let Some(para_spec_path) = ¶chain.chain_spec_path {
332 Some(PathBuf::from(format!(
333 "{}/{}",
334 self.ns.base_dir().to_string_lossy(),
335 para_spec_path.to_string_lossy()
336 )))
337 } else {
338 None
339 };
340
341 if let Some(para_spec_path) = para_chain_spec_local_path {
342 global_files_to_inject.push(TransferedFile::new(
343 para_spec_path,
344 PathBuf::from(format!("/cfg/{para_id}.json")),
345 ));
346 }
347
348 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
349 name,
350 options.into(),
351 &chain_context,
352 true,
353 spec.is_evm_based,
354 )?;
355
356 node_spec.available_args_output = Some(
357 self.initial_spec
358 .node_available_args_output(&node_spec, self.ns.clone())
359 .await?,
360 );
361
362 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
363
364 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
366 .await?;
367
368 parachain.collators.push(node.clone());
369 self.add_running_node(node, None).await;
370
371 self.write_zombie_json().await?;
373
374 Ok(())
375 }
376
377 pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
382 let used_ports = self
383 .nodes_iter()
384 .map(|node| node.spec())
385 .flat_map(|spec| {
386 [
387 spec.ws_port.0,
388 spec.rpc_port.0,
389 spec.prometheus_port.0,
390 spec.p2p_port.0,
391 ]
392 })
393 .collect();
394
395 let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
396
397 let used_para_ids = self
399 .parachains
400 .iter()
401 .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
402 .collect();
403
404 let context = ValidationContext {
405 used_ports,
406 used_nodes_names,
407 used_para_ids,
408 };
409 let context = Rc::new(RefCell::new(context));
410
411 ParachainConfigBuilder::new_with_running(context)
412 }
413
414 pub async fn add_parachain(
448 &mut self,
449 para_config: &ParachainConfig,
450 custom_relaychain_spec: Option<PathBuf>,
451 custom_parchain_fs_prefix: Option<String>,
452 ) -> Result<(), anyhow::Error> {
453 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
454 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
455
456 let mut global_files_to_inject = vec![];
457
458 let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
460 global_files_to_inject.push(TransferedFile::new(
462 custom_path.clone(),
463 PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
464 ));
465 let content = std::fs::read_to_string(custom_path)?;
466 ChainSpec::chain_id_from_spec(&content)?
467 } else {
468 global_files_to_inject.push(TransferedFile::new(
469 PathBuf::from(format!(
470 "{}/{}",
471 scoped_fs.base_dir,
472 self.relaychain().chain_spec_path.to_string_lossy()
473 )),
474 PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
475 ));
476 self.relay.chain_id.clone()
477 };
478
479 let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
480 para_config,
481 relay_chain_id.as_str().try_into()?,
482 )?;
483
484 let chain_spec_raw_path = para_spec
485 .build_chain_spec(&relay_chain_id, &self.ns, &scoped_fs)
486 .await?;
487
488 let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
490 custom_prefix
491 } else {
492 para_spec.id.to_string()
493 };
494
495 scoped_fs.create_dir(¶_path_prefix).await?;
496 para_spec
498 .genesis_state
499 .build(
500 chain_spec_raw_path.as_ref(),
501 format!("{}/genesis-state", ¶_path_prefix),
502 &self.ns,
503 &scoped_fs,
504 None,
505 )
506 .await?;
507 para_spec
508 .genesis_wasm
509 .build(
510 chain_spec_raw_path.as_ref(),
511 format!("{}/para_spec-wasm", ¶_path_prefix),
512 &self.ns,
513 &scoped_fs,
514 None,
515 )
516 .await?;
517
518 let parachain =
519 Parachain::from_spec(¶_spec, &global_files_to_inject, &scoped_fs).await?;
520 let parachain_id = parachain.chain_id.clone();
521
522 let ctx_para = SpawnNodeCtx {
524 parachain: Some(¶_spec),
525 parachain_id: parachain_id.as_deref(),
526 role: if para_spec.is_cumulus_based {
527 ZombieRole::CumulusCollator
528 } else {
529 ZombieRole::Collator
530 },
531 bootnodes_addr: ¶_config
532 .bootnodes_addresses()
533 .iter()
534 .map(|&a| a.to_string())
535 .collect(),
536 chain_id: &self.relaychain().chain_id,
537 chain: &self.relaychain().chain,
538 ns: &self.ns,
539 scoped_fs: &scoped_fs,
540 wait_ready: false,
541 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
542 global_settings: &self.initial_spec.global_settings,
543 };
544
545 let first_node_url = self
547 .relaychain()
548 .nodes
549 .first()
550 .ok_or(anyhow::anyhow!(
551 "At least one node of the relaychain should be running"
552 ))?
553 .ws_uri();
554
555 if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
556 let register_para_options = RegisterParachainOptions {
557 id: parachain.para_id,
558 wasm_path: para_spec
560 .genesis_wasm
561 .artifact_path()
562 .ok_or(anyhow::anyhow!(
563 "artifact path for wasm must be set at this point",
564 ))?
565 .to_path_buf(),
566 state_path: para_spec
567 .genesis_state
568 .artifact_path()
569 .ok_or(anyhow::anyhow!(
570 "artifact path for state must be set at this point",
571 ))?
572 .to_path_buf(),
573 node_ws_url: first_node_url.to_string(),
574 onboard_as_para: para_spec.onboard_as_parachain,
575 seed: None, finalization: false,
577 };
578
579 Parachain::register(register_para_options, &scoped_fs).await?;
580 }
581
582 let spawning_tasks = para_spec
584 .collators
585 .iter()
586 .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
587
588 let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
589
590 let waiting_tasks = running_nodes.iter().map(|node| {
592 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
593 });
594
595 let _ = futures::future::try_join_all(waiting_tasks).await?;
596
597 let running_para_id = parachain.para_id;
598 self.add_para(parachain);
599 for node in running_nodes {
600 self.add_running_node(node, Some(running_para_id)).await;
601 }
602
603 self.write_zombie_json().await?;
605
606 Ok(())
607 }
608
609 pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
648 let para = self
649 .initial_spec
650 .parachains
651 .iter()
652 .find(|p| p.id == para_id)
653 .ok_or(anyhow::anyhow!(
654 "no parachain with id = {para_id} available",
655 ))?;
656 let para_genesis_config = para.get_genesis_config()?;
657 let first_node_url = self
658 .relaychain()
659 .nodes
660 .first()
661 .ok_or(anyhow::anyhow!(
662 "At least one node of the relaychain should be running"
663 ))?
664 .ws_uri();
665 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
666 id: para_id,
667 wasm_path: para_genesis_config.wasm_path.clone(),
669 state_path: para_genesis_config.state_path.clone(),
670 node_ws_url: first_node_url.to_string(),
671 onboard_as_para: para_genesis_config.as_parachain,
672 seed: None, finalization: false,
674 };
675 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
676 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
677 Parachain::register(register_para_options, &scoped_fs).await?;
678
679 Ok(())
680 }
681
682 pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
686 let name = name.into();
687 if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
688 return Ok(node);
689 }
690
691 let list = self
692 .nodes_iter()
693 .map(|n| &n.name)
694 .cloned()
695 .collect::<Vec<_>>()
696 .join(", ");
697
698 Err(anyhow::anyhow!(
699 "can't find node with name: {name:?}, should be one of {list}"
700 ))
701 }
702
703 pub fn get_node_mut(
704 &mut self,
705 name: impl Into<String>,
706 ) -> Result<&mut NetworkNode, anyhow::Error> {
707 let name = name.into();
708 self.nodes_iter_mut()
709 .find(|n| n.name == name)
710 .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
711 }
712
713 pub fn nodes(&self) -> Vec<&NetworkNode> {
714 self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
715 }
716
717 pub async fn detach(&self) {
718 self.ns.detach().await
719 }
720
721 pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
723 if let Some(para_id) = para_id {
724 if let Some(para) = self.parachains.get_mut(¶_id).and_then(|p| p.get_mut(0)) {
725 para.collators.push(node.clone());
726 } else {
727 unreachable!()
729 }
730 } else {
731 self.relay.nodes.push(node.clone());
732 }
733 node.set_is_running(true);
735 let node_name = node.name.clone();
736 self.nodes_by_name.insert(node_name, node.clone());
737 self.nodes_to_watch.write().await.push(node);
738 }
739
740 pub(crate) fn add_para(&mut self, para: Parachain) {
741 self.parachains.entry(para.para_id).or_default().push(para);
742 }
743
744 pub(crate) async fn write_zombie_json(&self) -> Result<(), anyhow::Error> {
745 let base_dir = self.ns.base_dir().to_string_lossy();
746 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
747 let ns_name = self.ns.name();
748
749 write_zombie_json(serde_json::to_value(self)?, scoped_fs, ns_name).await?;
750 Ok(())
751 }
752
753 pub fn name(&self) -> &str {
754 self.ns.name()
755 }
756
757 pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
767 self.parachains.get(¶_id)?.first()
768 }
769
770 pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
778 self.parachains
779 .values()
780 .flat_map(|p| p.iter())
781 .find(|p| p.unique_id == unique_id.as_ref())
782 }
783
784 pub fn parachains(&self) -> Vec<&Parachain> {
785 self.parachains.values().flatten().collect()
786 }
787
788 pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
789 self.relay.nodes.iter().chain(
790 self.parachains
791 .values()
792 .flat_map(|p| p.iter())
793 .flat_map(|p| &p.collators),
794 )
795 }
796
797 pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
798 self.relay.nodes.iter_mut().chain(
799 self.parachains
800 .values_mut()
801 .flat_map(|p| p.iter_mut())
802 .flat_map(|p| &mut p.collators),
803 )
804 }
805
806 pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
816 let handles = self
817 .nodes_iter()
818 .map(|node| node.wait_until_is_up(timeout_secs));
819
820 futures::future::try_join_all(handles).await?;
821
822 Ok(())
823 }
824
825 pub(crate) fn spawn_watching_task(&self) {
826 let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
827 let ns = Arc::clone(&self.ns);
828
829 tokio::spawn(async move {
830 loop {
831 tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
832
833 let all_running = {
834 let guard = nodes_to_watch.read().await;
835 let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
836
837 let all_running =
838 futures::future::try_join_all(nodes.iter().map(|n| {
839 n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
840 }))
841 .await;
842
843 if nodes.iter().any(|n| !n.is_running()) {
845 continue;
846 } else {
847 all_running
848 }
849 };
850
851 if let Err(e) = all_running {
852 warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
853
854 if let Err(e) = ns.destroy().await {
855 error!("an error occurred during network teardown: {}", e);
856 }
857
858 std::process::exit(1);
859 }
860 }
861 });
862 }
863
864 pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
865 self.parachains = parachains;
866 }
867
868 pub(crate) fn insert_node(&mut self, node: NetworkNode) {
869 self.nodes_by_name.insert(node.name.clone(), node);
870 }
871
872 pub(crate) fn set_start_time_ts(&mut self, start_time: SystemTime) {
873 if let Ok(start_time_ts) = start_time.duration_since(SystemTime::UNIX_EPOCH) {
874 self.start_time_ts = Some(start_time_ts.as_millis().to_string());
875 } else {
876 warn!("⚠️ Error getting start_time timestamp");
878 }
879 }
880}