1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{cell::RefCell, collections::HashMap, path::PathBuf, rc::Rc, sync::Arc, time::Duration};
7
8use configuration::{
9 para_states::{Initial, Running},
10 shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
11 types::{Arg, Command, Image, Port, ValidationContext},
12 ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
13};
14use provider::{types::TransferedFile, DynNamespace, ProviderError};
15use serde::Serialize;
16use support::fs::FileSystem;
17use tokio::sync::RwLock;
18use tracing::{error, warn};
19
20use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
21use crate::{
22 generators::chain_spec::ChainSpec,
23 network_spec::{self, NetworkSpec},
24 shared::{
25 constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
26 macros,
27 types::{ChainDefaultContext, RegisterParachainOptions},
28 },
29 spawner::{self, SpawnNodeCtx},
30 ScopedFilesystem, ZombieRole,
31};
32
/// Handle to a spawned network: the relaychain, its parachains and the nodes
/// running in a provider namespace.
///
/// Only `relay`, `initial_spec` and `parachains` are serialized; every other
/// field is marked `#[serde(skip)]`.
#[derive(Serialize)]
pub struct Network<T: FileSystem> {
    /// Provider namespace the network lives in (not serialized).
    #[serde(skip)]
    ns: DynNamespace,
    /// Filesystem implementation wrapped in a `ScopedFilesystem` whenever
    /// files must be created under the namespace base dir (not serialized).
    #[serde(skip)]
    filesystem: T,
    /// The relaychain and its nodes.
    relay: Relaychain,
    /// Spec the network was created from; used to look up defaults and
    /// global settings when adding nodes later.
    initial_spec: NetworkSpec,
    /// Parachains keyed by para id. Several parachains may share an id;
    /// id-based lookups in this file use the first entry of the list.
    parachains: HashMap<u32, Vec<Parachain>>,
    /// Nodes indexed by name (not serialized).
    #[serde(skip)]
    nodes_by_name: HashMap<String, NetworkNode>,
    /// Nodes polled by the task started in `spawn_watching_task`
    /// (not serialized).
    #[serde(skip)]
    nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
}
47
48impl<T: FileSystem> std::fmt::Debug for Network<T> {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("Network")
51 .field("ns", &"ns_skipped")
52 .field("relay", &self.relay)
53 .field("initial_spec", &self.initial_spec)
54 .field("parachains", &self.parachains)
55 .field("nodes_by_name", &self.nodes_by_name)
56 .finish()
57 }
58}
59
// Options accepted by `Network::add_node`.
// `chain_spec`: optional custom relaychain chain-spec path; when `None`,
// `add_node` falls back to `<base_dir>/<chain>.json`.
// `override_eth_key`: not read in this file.
// NOTE(review): `create_add_options!` presumably adds further common fields
// and the conversion used by `options.into()` — confirm in `shared::macros`.
macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>,
    override_eth_key: Option<String>
});
64
// Options accepted by `Network::add_collator`.
// `chain_spec`: optional custom parachain chain-spec path.
// `chain_spec_relay`: optional custom relaychain chain-spec path; when
// `None`, `add_collator` falls back to `<base_dir>/<relay chain>.json`.
// `override_eth_key`: not read in this file.
// NOTE(review): `create_add_options!` presumably adds further common fields
// and the conversion used by `options.into()` — confirm in `shared::macros`.
macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>,
    override_eth_key: Option<String>
});
70
71impl<T: FileSystem> Network<T> {
72 pub(crate) fn new_with_relay(
73 relay: Relaychain,
74 ns: DynNamespace,
75 fs: T,
76 initial_spec: NetworkSpec,
77 ) -> Self {
78 Self {
79 ns,
80 filesystem: fs,
81 relay,
82 initial_spec,
83 parachains: Default::default(),
84 nodes_by_name: Default::default(),
85 nodes_to_watch: Default::default(),
86 }
87 }
88
89 pub fn ns_name(&self) -> String {
91 self.ns.name().to_string()
92 }
93
94 pub fn base_dir(&self) -> Option<&str> {
95 self.ns.base_dir().to_str()
96 }
97
98 pub fn relaychain(&self) -> &Relaychain {
99 &self.relay
100 }
101
102 pub async fn destroy(self) -> Result<(), ProviderError> {
104 self.ns.destroy().await
105 }
106
107 pub async fn add_node(
133 &mut self,
134 name: impl Into<String>,
135 options: AddNodeOptions,
136 ) -> Result<(), anyhow::Error> {
137 let name = generate_unique_node_name_from_names(
138 name,
139 &mut self.nodes_by_name.keys().cloned().collect(),
140 );
141
142 let relaychain = self.relaychain();
143
144 let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
145 chain_spec_custom_path.clone()
146 } else {
147 PathBuf::from(format!(
148 "{}/{}.json",
149 self.ns.base_dir().to_string_lossy(),
150 relaychain.chain
151 ))
152 };
153
154 let chain_context = ChainDefaultContext {
155 default_command: self.initial_spec.relaychain.default_command.as_ref(),
156 default_image: self.initial_spec.relaychain.default_image.as_ref(),
157 default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
158 default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
159 default_args: self.initial_spec.relaychain.default_args.iter().collect(),
160 };
161
162 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
163 &name,
164 options.into(),
165 &chain_context,
166 false,
167 false,
168 )?;
169
170 node_spec.available_args_output = Some(
171 self.initial_spec
172 .node_available_args_output(&node_spec, self.ns.clone())
173 .await?,
174 );
175
176 let base_dir = self.ns.base_dir().to_string_lossy();
177 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
178
179 let ctx = SpawnNodeCtx {
180 chain_id: &relaychain.chain_id,
181 parachain_id: None,
182 chain: &relaychain.chain,
183 role: ZombieRole::Node,
184 ns: &self.ns,
185 scoped_fs: &scoped_fs,
186 parachain: None,
187 bootnodes_addr: &vec![],
188 wait_ready: true,
189 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
190 global_settings: &self.initial_spec.global_settings,
191 };
192
193 let global_files_to_inject = vec![TransferedFile::new(
194 chain_spec_path,
195 PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
196 )];
197
198 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
199
200 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
212 .await?;
213
214 self.add_running_node(node.clone(), None).await;
216
217 Ok(())
218 }
219
220 pub async fn add_collator(
247 &mut self,
248 name: impl Into<String>,
249 options: AddCollatorOptions,
250 para_id: u32,
251 ) -> Result<(), anyhow::Error> {
252 let name = generate_unique_node_name_from_names(
253 name,
254 &mut self.nodes_by_name.keys().cloned().collect(),
255 );
256 let spec = self
257 .initial_spec
258 .parachains
259 .iter()
260 .find(|para| para.id == para_id)
261 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
262 let role = if spec.is_cumulus_based {
263 ZombieRole::CumulusCollator
264 } else {
265 ZombieRole::Collator
266 };
267 let chain_context = ChainDefaultContext {
268 default_command: spec.default_command.as_ref(),
269 default_image: spec.default_image.as_ref(),
270 default_resources: spec.default_resources.as_ref(),
271 default_db_snapshot: spec.default_db_snapshot.as_ref(),
272 default_args: spec.default_args.iter().collect(),
273 };
274
275 let parachain = self
276 .parachains
277 .get_mut(¶_id)
278 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
279 .get_mut(0)
280 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
281
282 let base_dir = self.ns.base_dir().to_string_lossy();
283 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
284
285 let ctx = SpawnNodeCtx {
287 chain_id: &self.relay.chain_id,
288 parachain_id: parachain.chain_id.as_deref(),
289 chain: &self.relay.chain,
290 role,
291 ns: &self.ns,
292 scoped_fs: &scoped_fs,
293 parachain: Some(spec),
294 bootnodes_addr: &vec![],
295 wait_ready: true,
296 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
297 global_settings: &self.initial_spec.global_settings,
298 };
299
300 let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
301 chain_spec_custom_path.clone()
302 } else {
303 PathBuf::from(format!(
304 "{}/{}.json",
305 self.ns.base_dir().to_string_lossy(),
306 self.relay.chain
307 ))
308 };
309
310 let mut global_files_to_inject = vec![TransferedFile::new(
311 relaychain_spec_path,
312 PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
313 )];
314
315 let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
316 Some(para_chain_spec_custom.clone())
317 } else if let Some(para_spec_path) = ¶chain.chain_spec_path {
318 Some(PathBuf::from(format!(
319 "{}/{}",
320 self.ns.base_dir().to_string_lossy(),
321 para_spec_path.to_string_lossy()
322 )))
323 } else {
324 None
325 };
326
327 if let Some(para_spec_path) = para_chain_spec_local_path {
328 global_files_to_inject.push(TransferedFile::new(
329 para_spec_path,
330 PathBuf::from(format!("/cfg/{para_id}.json")),
331 ));
332 }
333
334 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
335 name,
336 options.into(),
337 &chain_context,
338 true,
339 spec.is_evm_based,
340 )?;
341
342 node_spec.available_args_output = Some(
343 self.initial_spec
344 .node_available_args_output(&node_spec, self.ns.clone())
345 .await?,
346 );
347
348 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
349
350 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
352 .await?;
353
354 parachain.collators.push(node.clone());
355 self.add_running_node(node, None).await;
356
357 Ok(())
358 }
359
360 pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
365 let used_ports = self
366 .nodes_iter()
367 .map(|node| node.spec())
368 .flat_map(|spec| {
369 [
370 spec.ws_port.0,
371 spec.rpc_port.0,
372 spec.prometheus_port.0,
373 spec.p2p_port.0,
374 ]
375 })
376 .collect();
377
378 let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
379
380 let used_para_ids = self
382 .parachains
383 .iter()
384 .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
385 .collect();
386
387 let context = ValidationContext {
388 used_ports,
389 used_nodes_names,
390 used_para_ids,
391 };
392 let context = Rc::new(RefCell::new(context));
393
394 ParachainConfigBuilder::new_with_running(context)
395 }
396
    /// Builds, optionally registers, and spawns a new parachain on the
    /// running network.
    ///
    /// * `para_config` - configuration of the parachain to add.
    /// * `custom_relaychain_spec` - optional relaychain chain-spec path. When
    ///   set, the relay chain id is read from that file instead of from the
    ///   tracked relaychain.
    /// * `custom_parchain_fs_prefix` - optional directory (relative to the
    ///   scoped filesystem) for the genesis artifacts; defaults to the para
    ///   id.
    ///
    /// # Errors
    ///
    /// Fails if any build, registration or spawn step fails, if no
    /// relaychain node is available, or if a collator does not come up
    /// within the network spawn timeout.
    pub async fn add_parachain(
        &mut self,
        para_config: &ParachainConfig,
        custom_relaychain_spec: Option<PathBuf>,
        custom_parchain_fs_prefix: Option<String>,
    ) -> Result<(), anyhow::Error> {
        let base_dir = self.ns.base_dir().to_string_lossy().to_string();
        let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

        let mut global_files_to_inject = vec![];

        // Either way the relaychain spec is injected at `/cfg/<chain>.json`.
        let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
            global_files_to_inject.push(TransferedFile::new(
                custom_path.clone(),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            // NOTE(review): blocking `std::fs` read inside an async fn, and
            // it bypasses `self.filesystem` — consider an async read.
            let content = std::fs::read_to_string(custom_path)?;
            ChainSpec::chain_id_from_spec(&content)?
        } else {
            global_files_to_inject.push(TransferedFile::new(
                PathBuf::from(format!(
                    "{}/{}",
                    scoped_fs.base_dir,
                    self.relaychain().chain_spec_path.to_string_lossy()
                )),
                PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
            ));
            self.relay.chain_id.clone()
        };

        let mut para_spec = network_spec::parachain::ParachainSpec::from_config(
            para_config,
            relay_chain_id.as_str().try_into()?,
        )?;

        let chain_spec_raw_path = para_spec
            .build_chain_spec(&relay_chain_id, &self.ns, &scoped_fs)
            .await?;

        // Directory that receives the genesis state and wasm artifacts.
        let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
            custom_prefix
        } else {
            para_spec.id.to_string()
        };

        scoped_fs.create_dir(&para_path_prefix).await?;
        para_spec
            .genesis_state
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/genesis-state", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;
        // NOTE(review): the `para_spec-wasm` file name looks like a rename
        // artifact (compare `genesis-state` above) — confirm before changing.
        para_spec
            .genesis_wasm
            .build(
                chain_spec_raw_path.as_ref(),
                format!("{}/para_spec-wasm", &para_path_prefix),
                &self.ns,
                &scoped_fs,
                None,
            )
            .await?;

        let parachain =
            Parachain::from_spec(&para_spec, &global_files_to_inject, &scoped_fs).await?;
        let parachain_id = parachain.chain_id.clone();

        // Collators are spawned with `wait_ready: false`; readiness is
        // awaited explicitly for all of them further down.
        let ctx_para = SpawnNodeCtx {
            parachain: Some(&para_spec),
            parachain_id: parachain_id.as_deref(),
            role: if para_spec.is_cumulus_based {
                ZombieRole::CumulusCollator
            } else {
                ZombieRole::Collator
            },
            bootnodes_addr: &para_config
                .bootnodes_addresses()
                .iter()
                .map(|&a| a.to_string())
                .collect(),
            chain_id: &self.relaychain().chain_id,
            chain: &self.relaychain().chain,
            ns: &self.ns,
            scoped_fs: &scoped_fs,
            wait_ready: false,
            nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
            global_settings: &self.initial_spec.global_settings,
        };

        // Required even when no registration happens: the function fails
        // here if the relaychain has no node.
        let first_node_url = self
            .relaychain()
            .nodes
            .first()
            .ok_or(anyhow::anyhow!(
                "At least one node of the relaychain should be running"
            ))?
            .ws_uri();

        // Register through the first relaychain node only when the config
        // asks for the extrinsic strategy.
        if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
            let register_para_options = RegisterParachainOptions {
                id: parachain.para_id,
                wasm_path: para_spec
                    .genesis_wasm
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for wasm must be set at this point",
                    ))?
                    .to_path_buf(),
                state_path: para_spec
                    .genesis_state
                    .artifact_path()
                    .ok_or(anyhow::anyhow!(
                        "artifact path for state must be set at this point",
                    ))?
                    .to_path_buf(),
                node_ws_url: first_node_url.to_string(),
                onboard_as_para: para_spec.onboard_as_parachain,
                seed: None,
                finalization: false,
            };

            Parachain::register(register_para_options, &scoped_fs).await?;
        }

        // Spawn all collators concurrently; the first failure aborts.
        let spawning_tasks = para_spec
            .collators
            .iter()
            .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));

        let running_nodes = futures::future::try_join_all(spawning_tasks).await?;

        let waiting_tasks = running_nodes.iter().map(|node| {
            node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
        });

        let _ = futures::future::try_join_all(waiting_tasks).await?;

        // The parachain must be stored before its nodes are added:
        // `add_running_node` expects an entry for the para id.
        // NOTE(review): `add_para` appends to the list for this id while
        // `add_running_node` attaches collators to the first entry, so with
        // a duplicate para id the collators appear to land on the older
        // parachain — confirm intended behavior.
        let running_para_id = parachain.para_id;
        self.add_para(parachain);
        for node in running_nodes {
            self.add_running_node(node, Some(running_para_id)).await;
        }

        Ok(())
    }
588
589 pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
628 let para = self
629 .initial_spec
630 .parachains
631 .iter()
632 .find(|p| p.id == para_id)
633 .ok_or(anyhow::anyhow!(
634 "no parachain with id = {para_id} available",
635 ))?;
636 let para_genesis_config = para.get_genesis_config()?;
637 let first_node_url = self
638 .relaychain()
639 .nodes
640 .first()
641 .ok_or(anyhow::anyhow!(
642 "At least one node of the relaychain should be running"
643 ))?
644 .ws_uri();
645 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
646 id: para_id,
647 wasm_path: para_genesis_config.wasm_path.clone(),
649 state_path: para_genesis_config.state_path.clone(),
650 node_ws_url: first_node_url.to_string(),
651 onboard_as_para: para_genesis_config.as_parachain,
652 seed: None, finalization: false,
654 };
655 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
656 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
657 Parachain::register(register_para_options, &scoped_fs).await?;
658
659 Ok(())
660 }
661
662 pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
666 let name = name.into();
667 if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
668 return Ok(node);
669 }
670
671 let list = self
672 .nodes_iter()
673 .map(|n| &n.name)
674 .cloned()
675 .collect::<Vec<_>>()
676 .join(", ");
677
678 Err(anyhow::anyhow!(
679 "can't find node with name: {name:?}, should be one of {list}"
680 ))
681 }
682
683 pub fn get_node_mut(
684 &mut self,
685 name: impl Into<String>,
686 ) -> Result<&mut NetworkNode, anyhow::Error> {
687 let name = name.into();
688 self.nodes_iter_mut()
689 .find(|n| n.name == name)
690 .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
691 }
692
693 pub fn nodes(&self) -> Vec<&NetworkNode> {
694 self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
695 }
696
697 pub async fn detach(&self) {
698 self.ns.detach().await
699 }
700
701 pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
703 if let Some(para_id) = para_id {
704 if let Some(para) = self.parachains.get_mut(¶_id).and_then(|p| p.get_mut(0)) {
705 para.collators.push(node.clone());
706 } else {
707 unreachable!()
709 }
710 } else {
711 self.relay.nodes.push(node.clone());
712 }
713 node.set_is_running(true);
715 let node_name = node.name.clone();
716 self.nodes_by_name.insert(node_name, node.clone());
717 self.nodes_to_watch.write().await.push(node);
718 }
719
720 pub(crate) fn add_para(&mut self, para: Parachain) {
721 self.parachains.entry(para.para_id).or_default().push(para);
722 }
723
724 pub fn name(&self) -> &str {
725 self.ns.name()
726 }
727
728 pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
738 self.parachains.get(¶_id)?.first()
739 }
740
741 pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
749 self.parachains
750 .values()
751 .flat_map(|p| p.iter())
752 .find(|p| p.unique_id == unique_id.as_ref())
753 }
754
755 pub fn parachains(&self) -> Vec<&Parachain> {
756 self.parachains.values().flatten().collect()
757 }
758
759 pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
760 self.relay.nodes.iter().chain(
761 self.parachains
762 .values()
763 .flat_map(|p| p.iter())
764 .flat_map(|p| &p.collators),
765 )
766 }
767
768 pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
769 self.relay.nodes.iter_mut().chain(
770 self.parachains
771 .values_mut()
772 .flat_map(|p| p.iter_mut())
773 .flat_map(|p| &mut p.collators),
774 )
775 }
776
777 pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
787 let handles = self
788 .nodes_iter()
789 .map(|node| node.wait_until_is_up(timeout_secs));
790
791 futures::future::try_join_all(handles).await?;
792
793 Ok(())
794 }
795
    /// Spawns a background tokio task that periodically checks the watched
    /// nodes and tears the network down when one of them fails.
    ///
    /// Every `NODE_MONITORING_INTERVAL_SECONDS` the task waits for all nodes
    /// currently flagged as running to be up, allowing
    /// `NODE_MONITORING_FAILURE_THRESHOLD_SECONDS` each. On failure it logs a
    /// warning, destroys the namespace and exits the process with status 1.
    pub(crate) fn spawn_watching_task(&self) {
        // Clone the shared handles so the task owns them.
        let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
        let ns = Arc::clone(&self.ns);

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;

                let all_running = {
                    // The read guard stays alive for the whole block,
                    // including the awaited checks below.
                    // NOTE(review): writers such as `add_running_node` would
                    // presumably wait for this guard — confirm acceptable.
                    let guard = nodes_to_watch.read().await;
                    let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();

                    let all_running =
                        futures::future::try_join_all(nodes.iter().map(|n| {
                            n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
                        }))
                        .await;

                    // If a node stopped being flagged as running while it
                    // was checked, discard this round's result and retry
                    // on the next tick.
                    if nodes.iter().any(|n| !n.is_running()) {
                        continue;
                    } else {
                        all_running
                    }
                };

                if let Err(e) = all_running {
                    warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");

                    // Teardown is best effort: log a failure, then exit.
                    if let Err(e) = ns.destroy().await {
                        error!("an error occurred during network teardown: {}", e);
                    }

                    std::process::exit(1);
                }
            }
        });
    }
834
835 pub(crate) fn set_parachains(&mut self, parachains: HashMap<u32, Vec<Parachain>>) {
836 self.parachains = parachains;
837 }
838
839 pub(crate) fn insert_node(&mut self, node: NetworkNode) {
840 self.nodes_by_name.insert(node.name.clone(), node);
841 }
842}