1pub mod chain_upgrade;
2pub mod node;
3pub mod parachain;
4pub mod relaychain;
5
6use std::{cell::RefCell, collections::HashMap, path::PathBuf, rc::Rc, sync::Arc, time::Duration};
7
8use configuration::{
9 para_states::{Initial, Running},
10 shared::{helpers::generate_unique_node_name_from_names, node::EnvVar},
11 types::{Arg, Command, Image, Port, ValidationContext},
12 ParachainConfig, ParachainConfigBuilder, RegistrationStrategy,
13};
14use provider::{types::TransferedFile, DynNamespace, ProviderError};
15use serde::Serialize;
16use support::fs::FileSystem;
17use tokio::sync::RwLock;
18use tracing::{error, warn};
19
20use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
21use crate::{
22 generators::chain_spec::ChainSpec,
23 network_spec::{self, NetworkSpec},
24 shared::{
25 constants::{NODE_MONITORING_FAILURE_THRESHOLD_SECONDS, NODE_MONITORING_INTERVAL_SECONDS},
26 macros,
27 types::{ChainDefaultContext, RegisterParachainOptions},
28 },
29 spawner::{self, SpawnNodeCtx},
30 ScopedFilesystem, ZombieRole,
31};
32
33#[derive(Serialize)]
34pub struct Network<T: FileSystem> {
35 #[serde(skip)]
36 ns: DynNamespace,
37 #[serde(skip)]
38 filesystem: T,
39 relay: Relaychain,
40 initial_spec: NetworkSpec,
41 parachains: HashMap<u32, Vec<Parachain>>,
42 #[serde(skip)]
43 nodes_by_name: HashMap<String, NetworkNode>,
44 #[serde(skip)]
45 nodes_to_watch: Arc<RwLock<Vec<NetworkNode>>>,
46}
47
48impl<T: FileSystem> std::fmt::Debug for Network<T> {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("Network")
51 .field("ns", &"ns_skipped")
52 .field("relay", &self.relay)
53 .field("initial_spec", &self.initial_spec)
54 .field("parachains", &self.parachains)
55 .field("nodes_by_name", &self.nodes_by_name)
56 .finish()
57 }
58}
59
// Options type accepted by `Network::add_node`.
//
// Declared through `create_add_options!`; the only field listed here is an
// optional path to a custom chain-spec. Anything else the macro adds to the
// generated type is defined in `crate::shared::macros` (not visible here).
macros::create_add_options!(AddNodeOptions {
    chain_spec: Option<PathBuf>
});
63
// Options type accepted by `Network::add_collator`.
//
// `chain_spec` is an optional custom chain-spec for the parachain and
// `chain_spec_relay` an optional custom chain-spec for the relay chain.
// Anything else the macro adds to the generated type is defined in
// `crate::shared::macros` (not visible here).
macros::create_add_options!(AddCollatorOptions {
    chain_spec: Option<PathBuf>,
    chain_spec_relay: Option<PathBuf>
});
68
69impl<T: FileSystem> Network<T> {
70 pub(crate) fn new_with_relay(
71 relay: Relaychain,
72 ns: DynNamespace,
73 fs: T,
74 initial_spec: NetworkSpec,
75 ) -> Self {
76 Self {
77 ns,
78 filesystem: fs,
79 relay,
80 initial_spec,
81 parachains: Default::default(),
82 nodes_by_name: Default::default(),
83 nodes_to_watch: Default::default(),
84 }
85 }
86
87 pub fn ns_name(&self) -> String {
89 self.ns.name().to_string()
90 }
91
92 pub fn base_dir(&self) -> Option<&str> {
93 self.ns.base_dir().to_str()
94 }
95
96 pub fn relaychain(&self) -> &Relaychain {
97 &self.relay
98 }
99
100 pub async fn destroy(self) -> Result<(), ProviderError> {
102 self.ns.destroy().await
103 }
104
105 pub async fn add_node(
131 &mut self,
132 name: impl Into<String>,
133 options: AddNodeOptions,
134 ) -> Result<(), anyhow::Error> {
135 let name = generate_unique_node_name_from_names(
136 name,
137 &mut self.nodes_by_name.keys().cloned().collect(),
138 );
139
140 let relaychain = self.relaychain();
141
142 let chain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec {
143 chain_spec_custom_path.clone()
144 } else {
145 PathBuf::from(format!(
146 "{}/{}.json",
147 self.ns.base_dir().to_string_lossy(),
148 relaychain.chain
149 ))
150 };
151
152 let chain_context = ChainDefaultContext {
153 default_command: self.initial_spec.relaychain.default_command.as_ref(),
154 default_image: self.initial_spec.relaychain.default_image.as_ref(),
155 default_resources: self.initial_spec.relaychain.default_resources.as_ref(),
156 default_db_snapshot: self.initial_spec.relaychain.default_db_snapshot.as_ref(),
157 default_args: self.initial_spec.relaychain.default_args.iter().collect(),
158 };
159
160 let mut node_spec = network_spec::node::NodeSpec::from_ad_hoc(
161 &name,
162 options.into(),
163 &chain_context,
164 false,
165 )?;
166
167 node_spec.available_args_output = Some(
168 self.initial_spec
169 .node_available_args_output(&node_spec, self.ns.clone())
170 .await?,
171 );
172
173 let base_dir = self.ns.base_dir().to_string_lossy();
174 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
175
176 let ctx = SpawnNodeCtx {
177 chain_id: &relaychain.chain_id,
178 parachain_id: None,
179 chain: &relaychain.chain,
180 role: ZombieRole::Node,
181 ns: &self.ns,
182 scoped_fs: &scoped_fs,
183 parachain: None,
184 bootnodes_addr: &vec![],
185 wait_ready: true,
186 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
187 };
188
189 let global_files_to_inject = vec![TransferedFile::new(
190 chain_spec_path,
191 PathBuf::from(format!("/cfg/{}.json", relaychain.chain)),
192 )];
193
194 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
195
196 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
208 .await?;
209
210 self.add_running_node(node.clone(), None).await;
212
213 Ok(())
214 }
215
216 pub async fn add_collator(
243 &mut self,
244 name: impl Into<String>,
245 options: AddCollatorOptions,
246 para_id: u32,
247 ) -> Result<(), anyhow::Error> {
248 let name = generate_unique_node_name_from_names(
249 name,
250 &mut self.nodes_by_name.keys().cloned().collect(),
251 );
252 let spec = self
253 .initial_spec
254 .parachains
255 .iter()
256 .find(|para| para.id == para_id)
257 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
258 let role = if spec.is_cumulus_based {
259 ZombieRole::CumulusCollator
260 } else {
261 ZombieRole::Collator
262 };
263 let chain_context = ChainDefaultContext {
264 default_command: spec.default_command.as_ref(),
265 default_image: spec.default_image.as_ref(),
266 default_resources: spec.default_resources.as_ref(),
267 default_db_snapshot: spec.default_db_snapshot.as_ref(),
268 default_args: spec.default_args.iter().collect(),
269 };
270
271 let parachain = self
272 .parachains
273 .get_mut(¶_id)
274 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?
275 .get_mut(0)
276 .ok_or(anyhow::anyhow!(format!("parachain: {para_id} not found!")))?;
277
278 let base_dir = self.ns.base_dir().to_string_lossy();
279 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
280
281 let ctx = SpawnNodeCtx {
283 chain_id: &self.relay.chain_id,
284 parachain_id: parachain.chain_id.as_deref(),
285 chain: &self.relay.chain,
286 role,
287 ns: &self.ns,
288 scoped_fs: &scoped_fs,
289 parachain: Some(spec),
290 bootnodes_addr: &vec![],
291 wait_ready: true,
292 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
293 };
294
295 let relaychain_spec_path = if let Some(chain_spec_custom_path) = &options.chain_spec_relay {
296 chain_spec_custom_path.clone()
297 } else {
298 PathBuf::from(format!(
299 "{}/{}.json",
300 self.ns.base_dir().to_string_lossy(),
301 self.relay.chain
302 ))
303 };
304
305 let mut global_files_to_inject = vec![TransferedFile::new(
306 relaychain_spec_path,
307 PathBuf::from(format!("/cfg/{}.json", self.relay.chain)),
308 )];
309
310 let para_chain_spec_local_path = if let Some(para_chain_spec_custom) = &options.chain_spec {
311 Some(para_chain_spec_custom.clone())
312 } else if let Some(para_spec_path) = ¶chain.chain_spec_path {
313 Some(PathBuf::from(format!(
314 "{}/{}",
315 self.ns.base_dir().to_string_lossy(),
316 para_spec_path.to_string_lossy()
317 )))
318 } else {
319 None
320 };
321
322 if let Some(para_spec_path) = para_chain_spec_local_path {
323 global_files_to_inject.push(TransferedFile::new(
324 para_spec_path,
325 PathBuf::from(format!("/cfg/{para_id}.json")),
326 ));
327 }
328
329 let mut node_spec =
330 network_spec::node::NodeSpec::from_ad_hoc(name, options.into(), &chain_context, true)?;
331
332 node_spec.available_args_output = Some(
333 self.initial_spec
334 .node_available_args_output(&node_spec, self.ns.clone())
335 .await?,
336 );
337
338 let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
339
340 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
342 .await?;
343
344 parachain.collators.push(node.clone());
345 self.add_running_node(node, None).await;
346
347 Ok(())
348 }
349
350 pub fn para_config_builder(&self) -> ParachainConfigBuilder<Initial, Running> {
355 let used_ports = self
356 .nodes_iter()
357 .map(|node| node.spec())
358 .flat_map(|spec| {
359 [
360 spec.ws_port.0,
361 spec.rpc_port.0,
362 spec.prometheus_port.0,
363 spec.p2p_port.0,
364 ]
365 })
366 .collect();
367
368 let used_nodes_names = self.nodes_by_name.keys().cloned().collect();
369
370 let used_para_ids = self
372 .parachains
373 .iter()
374 .map(|(id, paras)| (*id, paras.len().saturating_sub(1) as u8))
375 .collect();
376
377 let context = ValidationContext {
378 used_ports,
379 used_nodes_names,
380 used_para_ids,
381 };
382 let context = Rc::new(RefCell::new(context));
383
384 ParachainConfigBuilder::new_with_running(context)
385 }
386
387 pub async fn add_parachain(
421 &mut self,
422 para_config: &ParachainConfig,
423 custom_relaychain_spec: Option<PathBuf>,
424 custom_parchain_fs_prefix: Option<String>,
425 ) -> Result<(), anyhow::Error> {
426 let mut para_spec = network_spec::parachain::ParachainSpec::from_config(para_config)?;
428 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
429 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
430
431 let mut global_files_to_inject = vec![];
432
433 let relay_chain_id = if let Some(custom_path) = custom_relaychain_spec {
435 global_files_to_inject.push(TransferedFile::new(
437 custom_path.clone(),
438 PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
439 ));
440 let content = std::fs::read_to_string(custom_path)?;
441 ChainSpec::chain_id_from_spec(&content)?
442 } else {
443 global_files_to_inject.push(TransferedFile::new(
444 PathBuf::from(format!(
445 "{}/{}",
446 scoped_fs.base_dir,
447 self.relaychain().chain_spec_path.to_string_lossy()
448 )),
449 PathBuf::from(format!("/cfg/{}.json", self.relaychain().chain)),
450 ));
451 self.relay.chain_id.clone()
452 };
453
454 let chain_spec_raw_path = para_spec
455 .build_chain_spec(&relay_chain_id, &self.ns, &scoped_fs)
456 .await?;
457
458 let para_path_prefix = if let Some(custom_prefix) = custom_parchain_fs_prefix {
460 custom_prefix
461 } else {
462 para_spec.id.to_string()
463 };
464
465 scoped_fs.create_dir(¶_path_prefix).await?;
466 para_spec
468 .genesis_state
469 .build(
470 chain_spec_raw_path.as_ref(),
471 format!("{}/genesis-state", ¶_path_prefix),
472 &self.ns,
473 &scoped_fs,
474 )
475 .await?;
476 para_spec
477 .genesis_wasm
478 .build(
479 chain_spec_raw_path.as_ref(),
480 format!("{}/para_spec-wasm", ¶_path_prefix),
481 &self.ns,
482 &scoped_fs,
483 )
484 .await?;
485
486 let parachain =
487 Parachain::from_spec(¶_spec, &global_files_to_inject, &scoped_fs).await?;
488 let parachain_id = parachain.chain_id.clone();
489
490 let ctx_para = SpawnNodeCtx {
492 parachain: Some(¶_spec),
493 parachain_id: parachain_id.as_deref(),
494 role: if para_spec.is_cumulus_based {
495 ZombieRole::CumulusCollator
496 } else {
497 ZombieRole::Collator
498 },
499 bootnodes_addr: &vec![],
500 chain_id: &self.relaychain().chain_id,
501 chain: &self.relaychain().chain,
502 ns: &self.ns,
503 scoped_fs: &scoped_fs,
504 wait_ready: false,
505 nodes_by_name: serde_json::to_value(&self.nodes_by_name)?,
506 };
507
508 let first_node_url = self
510 .relaychain()
511 .nodes
512 .first()
513 .ok_or(anyhow::anyhow!(
514 "At least one node of the relaychain should be running"
515 ))?
516 .ws_uri();
517
518 if para_config.registration_strategy() == Some(&RegistrationStrategy::UsingExtrinsic) {
519 let register_para_options = RegisterParachainOptions {
520 id: parachain.para_id,
521 wasm_path: para_spec
523 .genesis_wasm
524 .artifact_path()
525 .ok_or(anyhow::anyhow!(
526 "artifact path for wasm must be set at this point",
527 ))?
528 .to_path_buf(),
529 state_path: para_spec
530 .genesis_state
531 .artifact_path()
532 .ok_or(anyhow::anyhow!(
533 "artifact path for state must be set at this point",
534 ))?
535 .to_path_buf(),
536 node_ws_url: first_node_url.to_string(),
537 onboard_as_para: para_spec.onboard_as_parachain,
538 seed: None, finalization: false,
540 };
541
542 Parachain::register(register_para_options, &scoped_fs).await?;
543 }
544
545 let spawning_tasks = para_spec
547 .collators
548 .iter()
549 .map(|node| spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para));
550
551 let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
552
553 let waiting_tasks = running_nodes.iter().map(|node| {
555 node.wait_until_is_up(self.initial_spec.global_settings.network_spawn_timeout())
556 });
557
558 let _ = futures::future::try_join_all(waiting_tasks).await?;
559
560 let running_para_id = parachain.para_id;
561 self.add_para(parachain);
562 for node in running_nodes {
563 self.add_running_node(node, Some(running_para_id)).await;
564 }
565
566 Ok(())
567 }
568
569 pub async fn register_parachain(&mut self, para_id: u32) -> Result<(), anyhow::Error> {
608 let para = self
609 .initial_spec
610 .parachains
611 .iter()
612 .find(|p| p.id == para_id)
613 .ok_or(anyhow::anyhow!(
614 "no parachain with id = {para_id} available",
615 ))?;
616 let para_genesis_config = para.get_genesis_config()?;
617 let first_node_url = self
618 .relaychain()
619 .nodes
620 .first()
621 .ok_or(anyhow::anyhow!(
622 "At least one node of the relaychain should be running"
623 ))?
624 .ws_uri();
625 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
626 id: para_id,
627 wasm_path: para_genesis_config.wasm_path.clone(),
629 state_path: para_genesis_config.state_path.clone(),
630 node_ws_url: first_node_url.to_string(),
631 onboard_as_para: para_genesis_config.as_parachain,
632 seed: None, finalization: false,
634 };
635 let base_dir = self.ns.base_dir().to_string_lossy().to_string();
636 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
637 Parachain::register(register_para_options, &scoped_fs).await?;
638
639 Ok(())
640 }
641
642 pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
646 let name = name.into();
647 if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
648 return Ok(node);
649 }
650
651 let list = self
652 .nodes_iter()
653 .map(|n| &n.name)
654 .cloned()
655 .collect::<Vec<_>>()
656 .join(", ");
657
658 Err(anyhow::anyhow!(
659 "can't find node with name: {name:?}, should be one of {list}"
660 ))
661 }
662
663 pub fn get_node_mut(
664 &mut self,
665 name: impl Into<String>,
666 ) -> Result<&mut NetworkNode, anyhow::Error> {
667 let name = name.into();
668 self.nodes_iter_mut()
669 .find(|n| n.name == name)
670 .ok_or(anyhow::anyhow!("can't find node with name: {name:?}"))
671 }
672
673 pub fn nodes(&self) -> Vec<&NetworkNode> {
674 self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
675 }
676
677 pub async fn detach(&self) {
678 self.ns.detach().await
679 }
680
681 pub(crate) async fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
683 if let Some(para_id) = para_id {
684 if let Some(para) = self.parachains.get_mut(¶_id).and_then(|p| p.get_mut(0)) {
685 para.collators.push(node.clone());
686 } else {
687 unreachable!()
689 }
690 } else {
691 self.relay.nodes.push(node.clone());
692 }
693 node.set_is_running(true);
695 let node_name = node.name.clone();
696 self.nodes_by_name.insert(node_name, node.clone());
697 self.nodes_to_watch.write().await.push(node);
698 }
699
700 pub(crate) fn add_para(&mut self, para: Parachain) {
701 self.parachains.entry(para.para_id).or_default().push(para);
702 }
703
704 pub fn name(&self) -> &str {
705 self.ns.name()
706 }
707
708 pub fn parachain(&self, para_id: u32) -> Option<&Parachain> {
718 self.parachains.get(¶_id)?.first()
719 }
720
721 pub fn parachain_by_unique_id(&self, unique_id: impl AsRef<str>) -> Option<&Parachain> {
729 self.parachains
730 .values()
731 .flat_map(|p| p.iter())
732 .find(|p| p.unique_id == unique_id.as_ref())
733 }
734
735 pub fn parachains(&self) -> Vec<&Parachain> {
736 self.parachains.values().flatten().collect()
737 }
738
739 pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
740 self.relay.nodes.iter().chain(
741 self.parachains
742 .values()
743 .flat_map(|p| p.iter())
744 .flat_map(|p| &p.collators),
745 )
746 }
747
748 pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
749 self.relay.nodes.iter_mut().chain(
750 self.parachains
751 .values_mut()
752 .flat_map(|p| p.iter_mut())
753 .flat_map(|p| &mut p.collators),
754 )
755 }
756
757 pub async fn wait_until_is_up(&self, timeout_secs: u64) -> Result<(), anyhow::Error> {
767 let handles = self
768 .nodes_iter()
769 .map(|node| node.wait_until_is_up(timeout_secs));
770
771 futures::future::try_join_all(handles).await?;
772
773 Ok(())
774 }
775
776 pub(crate) fn spawn_watching_task(&self) {
777 let nodes_to_watch = Arc::clone(&self.nodes_to_watch);
778 let ns = Arc::clone(&self.ns);
779
780 tokio::spawn(async move {
781 loop {
782 tokio::time::sleep(Duration::from_secs(NODE_MONITORING_INTERVAL_SECONDS)).await;
783
784 let all_running = {
785 let guard = nodes_to_watch.read().await;
786 let nodes = guard.iter().filter(|n| n.is_running()).collect::<Vec<_>>();
787
788 let all_running =
789 futures::future::try_join_all(nodes.iter().map(|n| {
790 n.wait_until_is_up(NODE_MONITORING_FAILURE_THRESHOLD_SECONDS)
791 }))
792 .await;
793
794 if nodes.iter().any(|n| !n.is_running()) {
796 continue;
797 } else {
798 all_running
799 }
800 };
801
802 if let Err(e) = all_running {
803 warn!("\n\t🧟 One of the nodes crashed: {e}. tearing the network down...");
804
805 if let Err(e) = ns.destroy().await {
806 error!("an error occurred during network teardown: {}", e);
807 }
808
809 std::process::exit(1);
810 }
811 }
812 });
813 }
814}