1#![allow(dead_code, clippy::expect_fun_call)]
3
4pub mod errors;
5pub mod generators;
6pub mod network;
7pub mod network_helper;
8pub mod observability;
9pub mod tx_helper;
10
11mod network_spec;
12pub mod shared;
13mod spawner;
14mod utils;
15
16use std::{
17 collections::{HashMap, HashSet, VecDeque},
18 env,
19 net::IpAddr,
20 path::{Path, PathBuf},
21 time::{Duration, SystemTime},
22};
23
24use anyhow::anyhow;
25use configuration::{types::JsonOverrides, NetworkConfig, RegistrationStrategy};
26use errors::OrchestratorError;
27use generators::{core_assignment, errors::GeneratorError};
28use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
29pub use network_spec::NetworkSpec;
31use network_spec::{node::NodeSpec, parachain::ParachainSpec};
32use provider::{
33 types::{ProviderCapabilities, TransferedFile},
34 DynNamespace, DynProvider,
35};
36use serde_json::json;
37use support::{
38 constants::{
39 GRAPH_CONTAINS_DEP, GRAPH_CONTAINS_NAME, INDEGREE_CONTAINS_NAME, QUEUE_NOT_EMPTY,
40 THIS_IS_A_BUG,
41 },
42 fs::{FileSystem, FileSystemError},
43 replacer::{get_tokens_to_replace, has_tokens},
44};
45use tokio::time::timeout;
46use tracing::{debug, info, trace, warn};
47
48use crate::{
49 network::{node::RawNetworkNode, parachain::RawParachain, relaychain::RawRelaychain},
50 shared::types::RegisterParachainOptions,
51 spawner::SpawnNodeCtx,
52 utils::write_zombie_json,
53};
54pub struct Orchestrator<T>
55where
56 T: FileSystem + Sync + Send,
57{
58 filesystem: T,
59 provider: DynProvider,
60}
61
62impl<T> Orchestrator<T>
63where
64 T: FileSystem + Sync + Send + Clone,
65{
66 pub fn new(filesystem: T, provider: DynProvider) -> Self {
67 Self {
68 filesystem,
69 provider,
70 }
71 }
72
73 pub async fn spawn(
74 &self,
75 network_config: NetworkConfig,
76 ) -> Result<Network<T>, OrchestratorError> {
77 let global_timeout = network_config.global_settings().network_spawn_timeout();
78 let network_spec = NetworkSpec::from_config(&network_config).await?;
79
80 let res = timeout(
81 Duration::from_secs(global_timeout.into()),
82 self.spawn_inner(network_spec),
83 )
84 .await
85 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
86 res?
87 }
88
89 pub async fn spawn_from_spec(
90 &self,
91 network_spec: NetworkSpec,
92 ) -> Result<Network<T>, OrchestratorError> {
93 let global_timeout = network_spec.global_settings.network_spawn_timeout();
94 let res = timeout(
95 Duration::from_secs(global_timeout as u64),
96 self.spawn_inner(network_spec),
97 )
98 .await
99 .map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
100 res?
101 }
102
103 pub async fn attach_to_live(
104 &self,
105 zombie_json_path: &Path,
106 ) -> Result<Network<T>, OrchestratorError> {
107 info!("attaching to live network...");
108 info!("reading zombie.json from {:?}", zombie_json_path);
109
110 let zombie_json_content = self.filesystem.read_to_string(zombie_json_path).await?;
111 let zombie_json: serde_json::Value = serde_json::from_str(&zombie_json_content)?;
112
113 info!("recreating namespace...");
114 let ns: DynNamespace = self
115 .provider
116 .create_namespace_from_json(&zombie_json)
117 .await?;
118
119 info!("recreating relaychain...");
120 let (relay, initial_spec) =
121 recreate_relaychain_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
122 let relay_nodes = relay.nodes.clone();
123
124 let mut network =
125 Network::new_with_relay(relay, ns.clone(), self.filesystem.clone(), initial_spec);
126
127 for node in relay_nodes {
128 if node.is_responsive().await {
129 node.set_is_running(true);
130 }
131 network.insert_node(node);
132 }
133
134 info!("recreating parachains...");
135 let parachains_map =
136 recreate_parachains_from_json(&zombie_json, ns.clone(), self.provider.name()).await?;
137 let para_nodes = parachains_map
138 .values()
139 .flat_map(|paras| paras.iter().flat_map(|para| para.collators.clone()))
140 .collect::<Vec<NetworkNode>>();
141
142 network.set_parachains(parachains_map);
143 for node in para_nodes {
144 if node.is_responsive().await {
145 node.set_is_running(true);
146 }
147 network.insert_node(node);
148 }
149
150 Ok(network)
151 }
152
153 async fn spawn_inner(
154 &self,
155 mut network_spec: NetworkSpec,
156 ) -> Result<Network<T>, OrchestratorError> {
157 debug!(network_spec = ?network_spec,"Network spec to spawn");
159
160 validate_spec_with_provider_capabilities(&network_spec, self.provider.capabilities())
162 .map_err(|err| {
163 OrchestratorError::InvalidConfigForProvider(
164 self.provider.name().into(),
165 err.to_string(),
166 )
167 })?;
168
169 let ns = if let Some(base_dir) = network_spec.global_settings.base_dir() {
171 self.provider
172 .create_namespace_with_base_dir(base_dir)
173 .await?
174 } else {
175 self.provider.create_namespace().await?
176 };
177
178 let (spawn_concurrency, limited_by_tokens) = calculate_concurrency(&network_spec)?;
180
181 let start_time = SystemTime::now();
182 info!("🧰 ns: {}", ns.name());
183 info!("🧰 base_dir: {:?}", ns.base_dir());
184 info!("🕰 start time: {:?}", start_time);
185 info!("⚙️ spawn concurrency: {spawn_concurrency} (limited by tokens: {limited_by_tokens})");
186
187 network_spec
188 .populate_nodes_available_args(ns.clone())
189 .await?;
190
191 let all_nodes: Vec<&NodeSpec> = network_spec
194 .relaychain()
195 .nodes
196 .iter()
197 .chain(
198 network_spec
199 .parachains_iter()
200 .flat_map(|p| p.collators.iter()),
201 )
202 .collect();
203 let resolved_db_snapshots =
204 generators::resolve_db_snapshots(all_nodes, &ns, &self.filesystem).await?;
205
206 let base_dir = ns.base_dir().to_string_lossy();
207 let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);
208 network_spec
210 .relaychain
211 .chain_spec
212 .build(&ns, &scoped_fs)
213 .await?;
214
215 debug!("relaychain spec built!");
216 let relay_chain_id = network_spec
218 .relaychain
219 .chain_spec
220 .read_chain_id(&scoped_fs)
221 .await?;
222
223 let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
224 let base_dir_exists = network_spec.global_settings.base_dir().is_some();
225 network_spec
226 .build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
227 .await?;
228
229 let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
231 Vec<&ParachainSpec>,
232 Vec<&ParachainSpec>,
233 ) = network_spec
234 .parachains
235 .iter()
236 .filter(|para| para.registration_strategy != RegistrationStrategy::Manual)
237 .partition(|para| {
238 matches!(para.registration_strategy, RegistrationStrategy::InGenesis)
239 });
240
241 let mut para_artifacts = vec![];
242 for para in ¶_to_register_in_genesis {
243 let genesis_config = para.get_genesis_config()?;
244 para_artifacts.push(genesis_config)
245 }
246
247 network_spec
249 .relaychain
250 .chain_spec
251 .customize_relay(
252 &network_spec.relaychain,
253 &network_spec.hrmp_channels,
254 para_artifacts,
255 &scoped_fs,
256 )
257 .await?;
258
259 if let Some(script_cmd) = network_spec.relaychain.post_process_script.as_deref() {
261 network_spec
262 .relaychain
263 .chain_spec
264 .run_post_process_script(script_cmd, &scoped_fs)
265 .await?;
266 }
267
268 let num_cores = network_spec.parachains_iter().fold(0u32, |mut acc, para| {
270 if let Some(cores) = para.num_cores {
271 acc += cores;
272 } else if &RegistrationStrategy::InGenesis == para.registration_strategy() {
273 acc += 1;
275 }
276
277 acc
278 });
279
280 if num_cores > para_to_register_in_genesis.len() as u32 {
281 let num_cores_to_set = num_cores - para_to_register_in_genesis.len() as u32;
282 let overrides = json!({
284 "configuration": {
285 "config": {
286 "scheduler_params": {
287 "num_cores": num_cores_to_set,
288 "max_validators_per_core": 1
289 },
290 }
291 }
292 });
293
294 network_spec
295 .relaychain
296 .chain_spec
297 .apply_genesis_override(&scoped_fs, &overrides)
298 .await?;
299 }
300
301 network_spec
303 .relaychain
304 .chain_spec
305 .build_raw(&ns, &scoped_fs, None)
306 .await?;
307
308 if let Some(ref wasm_override) = network_spec.relaychain.wasm_override {
310 network_spec
311 .relaychain
312 .chain_spec
313 .override_code(&scoped_fs, wasm_override)
314 .await?;
315 }
316
317 if let Some(ref raw_spec_override) = network_spec.relaychain.raw_spec_override {
319 network_spec
320 .relaychain
321 .chain_spec
322 .override_raw_spec(&scoped_fs, raw_spec_override)
323 .await?;
324 }
325
326 debug!("Raw overrides info: num_cores: {}, para_to_register_in_genesis_len: {:?}, override_session_0: {}", num_cores, para_to_register_in_genesis.len(), network_spec.relaychain().override_session_0);
328 if num_cores > para_to_register_in_genesis.len() as u32
329 || network_spec.relaychain().override_session_0
330 {
331 let mut core_index = 0u32;
332 let scheduler_key = core_assignment::get_parascheduler_storage_key();
335 let is_old = !network_spec
336 .relaychain
337 .chain_spec
338 .find_raw_key(&scoped_fs, &scheduler_key)
339 .await?;
340
341 let mut para_scheduler_value_parts: Vec<String> = vec![];
342 let mut raw_json_overrides = json!({});
343 for para in &network_spec.parachains {
345 let mut cores_for_para = 0_u32;
347 if let Some(cores) = para.num_cores {
348 if &RegistrationStrategy::InGenesis == para.registration_strategy() {
349 cores_for_para = cores;
350 }
351 } else {
352 if &RegistrationStrategy::InGenesis == para.registration_strategy()
355 && network_spec.relaychain().override_session_0
356 {
357 cores_for_para += 1;
358 }
359 }
360
361 debug!(
362 "Assigning {cores_for_para} cores in raw spec for para {}. Using pallet {}",
363 para.id,
364 if is_old {
365 "CoretimeAssignmentProvider"
366 } else {
367 "ParaScheduler"
368 }
369 );
370 for _core in 0..cores_for_para {
371 if is_old {
372 let (core_assign_key, core_assign_value) =
373 core_assignment::generate_old(core_index, para.id);
374 raw_json_overrides[core_assign_key] = json!(core_assign_value);
375 } else {
376 let part = core_assignment::generate(core_index, para.id);
377 para_scheduler_value_parts.push(part);
378 }
379 core_index += 1;
380 }
381 }
382
383 if !is_old {
385 let count_prefix = format!("{:02x}", para_scheduler_value_parts.len() * 4);
386 let core_assign_value =
387 format!("{count_prefix}{}", para_scheduler_value_parts.join(""));
388 raw_json_overrides[scheduler_key] = json!(core_assign_value);
389 }
390
391 if network_spec.relaychain().override_session_0 {
393 trace!("Overriding pallet ParaSessionInfo.session (0) to allow paras to produce blocks at first session.");
394 let raw_spec = network_spec
395 .relaychain
396 .chain_spec
397 .read_raw_spec(&scoped_fs)
398 .await?;
399 let overrides = generators::generate_session_0_overrides(&raw_spec, num_cores)?;
400
401 for (k, v) in overrides.as_object().ok_or(anyhow!(
402 "'generate_session_0_overrides' should be a valid json Object."
403 ))? {
404 raw_json_overrides[k] = v.clone();
405 }
406 }
407
408 debug!("Raw overrides keys: {:?}", raw_json_overrides);
409
410 network_spec
411 .relaychain
412 .chain_spec
413 .override_raw_spec(
414 &scoped_fs,
415 &JsonOverrides::Json(json!({
416 "genesis": {
417 "raw": {
418 "top": raw_json_overrides
419 }
420 }
421 })),
422 )
423 .await?;
424 }
425
426 let (bootnodes, relaynodes) =
427 split_nodes_by_bootnodes(&network_spec.relaychain.nodes, false);
428
429 let mut ctx = SpawnNodeCtx {
431 chain_id: &relay_chain_id,
432 parachain_id: None,
433 chain: relay_chain_name.as_str(),
434 role: ZombieRole::Node,
435 ns: &ns,
436 scoped_fs: &scoped_fs,
437 parachain: None,
438 bootnodes_addr: &vec![],
439 wait_ready: false,
440 nodes_by_name: json!({}),
441 global_settings: &network_spec.global_settings,
442 resolved_db_snapshots: &resolved_db_snapshots,
443 };
444
445 let global_files_to_inject = vec![TransferedFile::new(
446 PathBuf::from(format!(
447 "{}/{relay_chain_name}.json",
448 ns.base_dir().to_string_lossy()
449 )),
450 PathBuf::from(format!("/cfg/{relay_chain_name}.json")),
451 )];
452
453 let r = Relaychain::new(
454 relay_chain_name.to_string(),
455 relay_chain_id.clone(),
456 PathBuf::from(network_spec.relaychain.chain_spec.raw_path().ok_or(
457 OrchestratorError::InvariantError("chain-spec raw path should be set now"),
458 )?),
459 );
460 let mut network =
461 Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());
462
463 let mut node_ws_url: String = "".to_string();
465
466 let mut bootnodes_addr: Vec<String> = vec![];
468
469 for level in dependency_levels_among(&bootnodes)? {
470 let mut running_nodes_per_level = vec![];
471 for chunk in level.chunks(spawn_concurrency) {
472 let spawning_tasks = chunk
473 .iter()
474 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
475
476 for node in futures::future::try_join_all(spawning_tasks).await? {
477 let bootnode_multiaddr = node.multiaddr();
478
479 bootnodes_addr.push(bootnode_multiaddr.to_string());
480
481 if node_ws_url.is_empty() {
483 node_ws_url.clone_from(&node.ws_uri)
484 }
485
486 running_nodes_per_level.push(node);
487 }
488 }
489 info!(
490 "🕰 waiting for level: {:?} to be up...",
491 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
492 );
493
494 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
496 node.wait_until_is_up(network_spec.global_settings.node_spawn_timeout())
497 });
498
499 let _ = futures::future::try_join_all(waiting_tasks).await?;
500
501 for node in running_nodes_per_level {
502 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
504 network.add_running_node(node, None).await;
505 }
506 }
507
508 network_spec
510 .relaychain
511 .chain_spec
512 .add_bootnodes(&scoped_fs, &bootnodes_addr)
513 .await?;
514
515 ctx.bootnodes_addr = &bootnodes_addr;
516
517 for level in dependency_levels_among(&relaynodes)? {
518 let mut running_nodes_per_level = vec![];
519 for chunk in level.chunks(spawn_concurrency) {
520 let spawning_tasks = chunk
521 .iter()
522 .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));
523
524 for node in futures::future::try_join_all(spawning_tasks).await? {
525 running_nodes_per_level.push(node);
526 }
527 }
528 info!(
529 "🕰 waiting for level: {:?} to be up...",
530 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
531 );
532
533 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
535 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
536 });
537
538 let _ = futures::future::try_join_all(waiting_tasks).await?;
539
540 for node in running_nodes_per_level {
541 ctx.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
542 network.add_running_node(node, None).await;
543 }
544 }
545
546 for para in network_spec.parachains.iter() {
548 let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
550 let parachain_id = parachain.chain_id.clone();
551
552 let (bootnodes, collators) =
553 split_nodes_by_bootnodes(¶.collators, para.no_default_bootnodes);
554
555 let mut ctx_para = SpawnNodeCtx {
557 parachain: Some(para),
558 parachain_id: parachain_id.as_deref(),
559 role: if para.is_cumulus_based {
560 ZombieRole::CumulusCollator
561 } else {
562 ZombieRole::Collator
563 },
564 bootnodes_addr: &vec![],
565 ..ctx.clone()
566 };
567
568 let mut bootnodes_addr: Vec<String> = vec![];
570 let mut running_nodes: Vec<NetworkNode> = vec![];
571
572 for level in dependency_levels_among(&bootnodes)? {
573 let mut running_nodes_per_level = vec![];
574 for chunk in level.chunks(spawn_concurrency) {
575 let spawning_tasks = chunk.iter().map(|node| {
576 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
577 });
578
579 for node in futures::future::try_join_all(spawning_tasks).await? {
580 let bootnode_multiaddr = node.multiaddr();
581
582 bootnodes_addr.push(bootnode_multiaddr.to_string());
583
584 running_nodes_per_level.push(node);
585 }
586 }
587 info!(
588 "🕰 waiting for level: {:?} to be up...",
589 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
590 );
591
592 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
594 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
595 });
596
597 let _ = futures::future::try_join_all(waiting_tasks).await?;
598
599 for node in running_nodes_per_level {
600 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
601 running_nodes.push(node);
602 }
603 }
604
605 if let Some(para_chain_spec) = para.chain_spec.as_ref() {
606 para_chain_spec
607 .add_bootnodes(&scoped_fs, &bootnodes_addr)
608 .await?;
609 }
610
611 ctx_para.bootnodes_addr = &bootnodes_addr;
612
613 for level in dependency_levels_among(&collators)? {
615 let mut running_nodes_per_level = vec![];
616 for chunk in level.chunks(spawn_concurrency) {
617 let spawning_tasks = chunk.iter().map(|node| {
618 spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
619 });
620
621 for node in futures::future::try_join_all(spawning_tasks).await? {
622 running_nodes_per_level.push(node);
623 }
624 }
625 info!(
626 "🕰 waiting for level: {:?} to be up...",
627 level.iter().map(|n| n.name.clone()).collect::<Vec<_>>()
628 );
629
630 let waiting_tasks = running_nodes_per_level.iter().map(|node| {
632 node.wait_until_is_up(network_spec.global_settings.network_spawn_timeout())
633 });
634
635 let _ = futures::future::try_join_all(waiting_tasks).await?;
636
637 for node in running_nodes_per_level {
638 ctx_para.nodes_by_name[node.name().to_owned()] = serde_json::to_value(&node)?;
639 running_nodes.push(node);
640 }
641 }
642
643 let running_para_id = parachain.para_id;
644 network.add_para(parachain);
645 for node in running_nodes {
646 network.add_running_node(node, Some(running_para_id)).await;
647 }
648 }
649
650 for para in para_to_register_with_extrinsic {
652 let register_para_options: RegisterParachainOptions = RegisterParachainOptions {
653 id: para.id,
654 wasm_path: para
656 .genesis_wasm
657 .artifact_path()
658 .ok_or(OrchestratorError::InvariantError(
659 "artifact path for wasm must be set at this point",
660 ))?
661 .to_path_buf(),
662 state_path: para
663 .genesis_state
664 .artifact_path()
665 .ok_or(OrchestratorError::InvariantError(
666 "artifact path for state must be set at this point",
667 ))?
668 .to_path_buf(),
669 node_ws_url: node_ws_url.clone(),
670 onboard_as_para: para.onboard_as_parachain,
671 seed: None, finalization: false,
673 };
674
675 Parachain::register(register_para_options, &scoped_fs).await?;
676 }
677
678 if network_spec.global_settings.observability().enabled() {
679 match network
680 .start_observability(network_spec.global_settings.observability())
681 .await
682 {
683 Ok(obs) => {
684 info!("📊 Prometheus URL: {}", obs.prometheus_url);
685 info!("📊 Grafana URL: {}", obs.grafana_url);
686 },
687 Err(e) => {
688 warn!("⚠️ Failed to spawn observability stack: {e}");
689 },
690 }
691 }
692
693 for cp in &network_spec.custom_processes {
695 if let Err(e) = spawner::spawn_process(cp, ns.clone()).await {
696 warn!("⚠️ Failed to spawn custom process {}, err: {e}", cp.name())
697 }
698 }
699
700 network.set_start_time_ts(start_time);
701
702 write_zombie_json(serde_json::to_value(&network)?, scoped_fs, ns.name()).await?;
703
704 if network_spec.global_settings.tear_down_on_failure() {
705 network.spawn_watching_task();
706 }
707
708 generators::cleanup_db_snapshot_cache(&resolved_db_snapshots).await;
709
710 Ok(network)
711 }
712}
713
714async fn recreate_network_nodes_from_json(
717 nodes_json: &serde_json::Value,
718 ns: DynNamespace,
719 provider_name: &str,
720) -> Result<Vec<NetworkNode>, OrchestratorError> {
721 let raw_nodes: Vec<RawNetworkNode> = serde_json::from_value(nodes_json.clone())?;
722
723 let mut nodes = Vec::with_capacity(raw_nodes.len());
724 for raw in raw_nodes {
725 let provider_tag = raw
727 .inner
728 .get("provider_tag")
729 .and_then(|v| v.as_str())
730 .ok_or_else(|| {
731 OrchestratorError::InvalidConfig(format!(
732 "Node '{}' is missing `provider_tag` in inner node JSON",
733 raw.name
734 ))
735 })?;
736
737 if provider_tag != provider_name {
738 return Err(OrchestratorError::InvalidConfigForProvider(
739 provider_name.to_string(),
740 provider_tag.to_string(),
741 ));
742 }
743 let inner = ns.spawn_node_from_json(&raw.inner).await?;
744 let relay_node = NetworkNode::new(
745 raw.name,
746 raw.ws_uri,
747 raw.prometheus_uri,
748 raw.multiaddr,
749 raw.spec,
750 inner,
751 raw.cmd_generator_opts,
752 raw.context,
753 );
754 nodes.push(relay_node);
755 }
756
757 Ok(nodes)
758}
759
760async fn recreate_relaychain_from_json(
761 zombie_json: &serde_json::Value,
762 ns: DynNamespace,
763 provider_name: &str,
764) -> Result<(Relaychain, NetworkSpec), OrchestratorError> {
765 let relay_json = zombie_json
766 .get("relay")
767 .ok_or(OrchestratorError::InvalidConfig(
768 "Missing `relay` field in zombie.json".into(),
769 ))?
770 .clone();
771
772 let mut relay_raw: RawRelaychain = serde_json::from_value(relay_json)?;
773
774 let initial_spec: NetworkSpec = serde_json::from_value(
775 zombie_json
776 .get("initial_spec")
777 .ok_or(OrchestratorError::InvalidConfig(
778 "Missing `initial_spec` field in zombie.json".into(),
779 ))?
780 .clone(),
781 )?;
782
783 let nodes =
785 recreate_network_nodes_from_json(&relay_raw.nodes, ns.clone(), provider_name).await?;
786 relay_raw.inner.nodes = nodes;
787
788 Ok((relay_raw.inner, initial_spec))
789}
790
791async fn recreate_parachains_from_json(
792 zombie_json: &serde_json::Value,
793 ns: DynNamespace,
794 provider_name: &str,
795) -> Result<HashMap<u32, Vec<Parachain>>, OrchestratorError> {
796 let paras_json = zombie_json
797 .get("parachains")
798 .ok_or(OrchestratorError::InvalidConfig(
799 "Missing `parachains` field in zombie.json".into(),
800 ))?
801 .clone();
802
803 let raw_paras: HashMap<u32, Vec<RawParachain>> = serde_json::from_value(paras_json)?;
804
805 let mut parachains_map = HashMap::new();
806
807 for (id, parachain_entries) in raw_paras {
808 let mut parsed_vec = Vec::with_capacity(parachain_entries.len());
809
810 for raw_para in parachain_entries {
811 let mut para = raw_para.inner;
812 para.collators =
813 recreate_network_nodes_from_json(&raw_para.collators, ns.clone(), provider_name)
814 .await?;
815 parsed_vec.push(para);
816 }
817
818 parachains_map.insert(id, parsed_vec);
819 }
820
821 Ok(parachains_map)
822}
823
824fn split_nodes_by_bootnodes(
827 nodes: &[NodeSpec],
828 no_default_bootnodes: bool,
829) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
830 let mut bootnodes = vec![];
832 let mut other_nodes = vec![];
833 nodes.iter().for_each(|node| {
834 if node.is_bootnode {
835 bootnodes.push(node)
836 } else {
837 other_nodes.push(node)
838 }
839 });
840
841 if bootnodes.is_empty() && !no_default_bootnodes {
842 bootnodes.push(other_nodes.remove(0))
843 }
844
845 (bootnodes, other_nodes)
846}
847
848fn generate_bootnode_addr(
850 node: &NetworkNode,
851 ip: &IpAddr,
852 port: u16,
853) -> Result<String, GeneratorError> {
854 generators::generate_node_bootnode_addr(
855 &node.spec.peer_id,
856 ip,
857 port,
858 node.inner.args().as_ref(),
859 &node.spec.p2p_cert_hash,
860 )
861}
862fn validate_spec_with_provider_capabilities(
864 network_spec: &NetworkSpec,
865 capabilities: &ProviderCapabilities,
866) -> Result<(), anyhow::Error> {
867 let mut errs: Vec<String> = vec![];
868
869 if capabilities.requires_image {
870 if network_spec.relaychain.default_image.is_none() {
872 let nodes = &network_spec.relaychain.nodes;
874 if nodes.iter().any(|node| node.image.is_none()) {
875 errs.push(String::from(
876 "Missing image for node, and not default is set at relaychain",
877 ));
878 }
879 };
880
881 for para in &network_spec.parachains {
883 if para.default_image.is_none() {
884 let nodes = ¶.collators;
885 if nodes.iter().any(|node| node.image.is_none()) {
886 errs.push(format!(
887 "Missing image for node, and not default is set at parachain {}",
888 para.id
889 ));
890 }
891 }
892 }
893 } else {
894 let mut cmds: HashSet<&str> = Default::default();
897 if let Some(cmd) = network_spec.relaychain.default_command.as_ref() {
898 cmds.insert(cmd.as_str());
899 }
900 for node in network_spec.relaychain().nodes.iter() {
901 cmds.insert(node.command());
902 }
903
904 for para in &network_spec.parachains {
906 if let Some(cmd) = para.default_command.as_ref() {
907 cmds.insert(cmd.as_str());
908 }
909
910 for node in para.collators.iter() {
911 cmds.insert(node.command());
912 }
913 }
914
915 let path = std::env::var("PATH").unwrap_or_default(); trace!("current PATH: {path}");
918 let parts: Vec<_> = path.split(":").collect();
919 for cmd in cmds {
920 let missing = if cmd.contains('/') {
921 trace!("checking {cmd}");
922 if std::fs::metadata(cmd).is_err() {
923 true
924 } else {
925 info!("🔎 We will use the full path {cmd} to spawn nodes.");
926 false
927 }
928 } else {
929 !parts.iter().any(|part| {
931 let path_to = format!("{part}/{cmd}");
932 trace!("checking {path_to}");
933 let check_result = std::fs::metadata(&path_to);
934 trace!("result {:?}", check_result);
935 if check_result.is_ok() {
936 info!("🔎 We will use the cmd: '{cmd}' at path {path_to} to spawn nodes.");
937 true
938 } else {
939 false
940 }
941 })
942 };
943
944 if missing {
945 errs.push(help_msg(cmd));
946 }
947 }
948 }
949
950 if !errs.is_empty() {
951 let msg = errs.join("\n");
952 return Err(anyhow::anyhow!(format!("Invalid configuration: \n {msg}")));
953 }
954
955 Ok(())
956}
957
958fn help_msg(cmd: &str) -> String {
959 match cmd {
960 "parachain-template-node" | "solochain-template-node" | "minimal-template-node" => {
961 format!("Missing binary {cmd}, compile by running: \n\tcargo build --package {cmd} --release")
962 },
963 "polkadot" => {
964 format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --locked --release --features fast-runtime --bin {cmd} --bin polkadot-prepare-worker --bin polkadot-execute-worker")
965 },
966 "polkadot-parachain" => {
967 format!("Missing binary {cmd}, compile by running (in the polkadot-sdk repo): \n\t cargo build --release --locked -p {cmd}-bin --bin {cmd}")
968 },
969 _ => {
970 format!("Missing binary {cmd}, please compile it.")
971 },
972 }
973}
974
975fn spawn_concurrency_from_env() -> Option<usize> {
977 if let Ok(concurrency) = env::var("ZOMBIE_SPAWN_CONCURRENCY") {
978 concurrency.parse::<usize>().ok()
979 } else {
980 None
981 }
982}
983
984fn calculate_concurrency(spec: &NetworkSpec) -> Result<(usize, bool), anyhow::Error> {
985 let desired_spawn_concurrency = match (
986 spawn_concurrency_from_env(),
987 spec.global_settings.spawn_concurrency(),
988 ) {
989 (Some(n), _) => Some(n),
990 (None, Some(n)) => Some(n),
991 _ => None,
992 };
993
994 let (spawn_concurrency, limited_by_tokens) =
995 if let Some(spawn_concurrency) = desired_spawn_concurrency {
996 if spawn_concurrency == 1 {
997 (1, false)
998 } else if has_tokens(&serde_json::to_string(spec)?) {
999 (1, true)
1000 } else {
1001 (spawn_concurrency, false)
1002 }
1003 } else {
1004 if has_tokens(&serde_json::to_string(spec)?) {
1006 (1, true)
1007 } else {
1008 (100, false)
1010 }
1011 };
1012
1013 Ok((spawn_concurrency, limited_by_tokens))
1014}
1015
1016fn dependency_levels_among<'a>(
1021 nodes: &'a [&'a NodeSpec],
1022) -> Result<Vec<Vec<&'a NodeSpec>>, OrchestratorError> {
1023 let by_name = nodes
1024 .iter()
1025 .map(|n| (n.name.as_str(), *n))
1026 .collect::<HashMap<_, _>>();
1027
1028 let mut graph = HashMap::with_capacity(nodes.len());
1029 let mut indegree = HashMap::with_capacity(nodes.len());
1030
1031 for node in nodes {
1032 graph.insert(node.name.as_str(), Vec::new());
1033 indegree.insert(node.name.as_str(), 0);
1034 }
1035
1036 for &node in nodes {
1038 if let Ok(args_json) = serde_json::to_string(&node.args) {
1039 let unique_deps = get_tokens_to_replace(&args_json)
1041 .into_iter()
1042 .filter(|dep| dep != &node.name)
1043 .filter_map(|dep| by_name.get(dep.as_str()))
1044 .map(|&dep_node| dep_node.name.as_str())
1045 .collect::<HashSet<_>>();
1046
1047 for dep_name in unique_deps {
1048 graph
1049 .get_mut(dep_name)
1050 .expect(&format!("{GRAPH_CONTAINS_DEP} {THIS_IS_A_BUG}"))
1051 .push(node);
1052 *indegree
1053 .get_mut(node.name.as_str())
1054 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}")) += 1;
1055 }
1056 }
1057 }
1058
1059 let mut queue = nodes
1061 .iter()
1062 .filter(|n| {
1063 *indegree
1064 .get(n.name.as_str())
1065 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"))
1066 == 0
1067 })
1068 .copied()
1069 .collect::<VecDeque<_>>();
1070
1071 let mut processed_count = 0;
1072 let mut levels = Vec::new();
1073
1074 while !queue.is_empty() {
1076 let level_size = queue.len();
1077 let mut current_level = Vec::with_capacity(level_size);
1078
1079 for _ in 0..level_size {
1080 let n = queue
1081 .pop_front()
1082 .expect(&format!("{QUEUE_NOT_EMPTY} {THIS_IS_A_BUG}"));
1083 current_level.push(n);
1084 processed_count += 1;
1085
1086 for &neighbour in graph
1087 .get(n.name.as_str())
1088 .expect(&format!("{GRAPH_CONTAINS_NAME} {THIS_IS_A_BUG}"))
1089 {
1090 let neighbour_indegree = indegree
1091 .get_mut(neighbour.name.as_str())
1092 .expect(&format!("{INDEGREE_CONTAINS_NAME} {THIS_IS_A_BUG}"));
1093 *neighbour_indegree -= 1;
1094
1095 if *neighbour_indegree == 0 {
1096 queue.push_back(neighbour);
1097 }
1098 }
1099 }
1100
1101 current_level.sort_by_key(|n| &n.name);
1102 levels.push(current_level);
1103 }
1104
1105 if processed_count != nodes.len() {
1107 return Err(OrchestratorError::InvalidConfig(
1108 "Tokens have cyclical dependencies".to_string(),
1109 ));
1110 }
1111
1112 Ok(levels)
1113}
1114
1115#[derive(Clone, Debug)]
1123pub struct ScopedFilesystem<'a, FS: FileSystem> {
1124 fs: &'a FS,
1125 base_dir: &'a str,
1126}
1127
1128impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
1129 pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
1130 Self { fs, base_dir }
1131 }
1132
1133 async fn copy_files(&self, files: Vec<&TransferedFile>) -> Result<(), FileSystemError> {
1134 for file in files {
1135 let full_remote_path = PathBuf::from(format!(
1136 "{}/{}",
1137 self.base_dir,
1138 file.remote_path.to_string_lossy()
1139 ));
1140 trace!("coping file: {file}");
1141 self.fs
1142 .copy(file.local_path.as_path(), full_remote_path)
1143 .await?;
1144 }
1145 Ok(())
1146 }
1147
1148 async fn read(&self, file: impl AsRef<Path>) -> Result<Vec<u8>, FileSystemError> {
1149 let file = file.as_ref();
1150
1151 let full_path = if file.is_absolute() {
1152 file.to_owned()
1153 } else {
1154 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1155 };
1156 let content = self.fs.read(full_path).await?;
1157 Ok(content)
1158 }
1159
1160 async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
1161 let file = file.as_ref();
1162
1163 let full_path = if file.is_absolute() {
1164 file.to_owned()
1165 } else {
1166 PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
1167 };
1168 let content = self.fs.read_to_string(full_path).await?;
1169 Ok(content)
1170 }
1171
1172 async fn create_dir(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1173 let path = PathBuf::from(format!(
1174 "{}/{}",
1175 self.base_dir,
1176 path.as_ref().to_string_lossy()
1177 ));
1178 self.fs.create_dir(path).await
1179 }
1180
1181 async fn create_dir_all(&self, path: impl AsRef<Path>) -> Result<(), FileSystemError> {
1182 let path = PathBuf::from(format!(
1183 "{}/{}",
1184 self.base_dir,
1185 path.as_ref().to_string_lossy()
1186 ));
1187 self.fs.create_dir_all(path).await
1188 }
1189
1190 async fn write(
1191 &self,
1192 path: impl AsRef<Path>,
1193 contents: impl AsRef<[u8]> + Send,
1194 ) -> Result<(), FileSystemError> {
1195 let path = path.as_ref();
1196
1197 let full_path = if path.is_absolute() {
1198 path.to_owned()
1199 } else {
1200 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1201 };
1202
1203 self.fs.write(full_path, contents).await
1204 }
1205
1206 fn full_path(&self, path: impl AsRef<Path>) -> PathBuf {
1208 let path = path.as_ref();
1209
1210 let full_path = if path.is_absolute() {
1211 path.to_owned()
1212 } else {
1213 PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
1214 };
1215
1216 full_path
1217 }
1218
1219 fn base_dir(&self) -> &str {
1221 self.base_dir
1222 }
1223}
1224
1225#[derive(Clone, Debug)]
1226pub enum ZombieRole {
1227 Temp,
1228 Node,
1229 Bootnode,
1230 Collator,
1231 CumulusCollator,
1232 Companion,
1233}
1234
1235pub use network::{AddCollatorOptions, AddNodeOptions};
1237pub use network_helper::metrics;
1238pub use sc_chain_spec;
1239
1240#[cfg(test)]
1241mod tests {
1242 use configuration::{GlobalSettingsBuilder, NetworkConfigBuilder};
1243 use lazy_static::lazy_static;
1244 use tokio::sync::Mutex;
1245
1246 use super::*;
1247
1248 const ENV_KEY: &str = "ZOMBIE_SPAWN_CONCURRENCY";
1249 lazy_static! {
1251 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1252 }
1253
1254 fn set_env(concurrency: Option<u32>) {
1255 if let Some(value) = concurrency {
1256 env::set_var(ENV_KEY, value.to_string());
1257 } else {
1258 env::remove_var(ENV_KEY);
1259 }
1260 }
1261
1262 fn generate(
1263 with_image: bool,
1264 with_cmd: Option<&'static str>,
1265 ) -> Result<NetworkConfig, Vec<anyhow::Error>> {
1266 NetworkConfigBuilder::new()
1267 .with_relaychain(|r| {
1268 let mut relay = r
1269 .with_chain("rococo-local")
1270 .with_default_command(with_cmd.unwrap_or("polkadot"));
1271 if with_image {
1272 relay = relay.with_default_image("docker.io/parity/polkadot")
1273 }
1274
1275 relay
1276 .with_validator(|node| node.with_name("alice"))
1277 .with_validator(|node| node.with_name("bob"))
1278 })
1279 .with_parachain(|p| {
1280 p.with_id(2000).cumulus_based(true).with_collator(|n| {
1281 let node = n
1282 .with_name("collator")
1283 .with_command(with_cmd.unwrap_or("polkadot-parachain"));
1284 if with_image {
1285 node.with_image("docker.io/paritypr/test-parachain")
1286 } else {
1287 node
1288 }
1289 })
1290 })
1291 .build()
1292 }
1293
1294 fn get_node_with_dependencies(name: &str, dependencies: Option<Vec<&NodeSpec>>) -> NodeSpec {
1295 let mut spec = NodeSpec {
1296 name: name.to_string(),
1297 ..Default::default()
1298 };
1299 if let Some(dependencies) = dependencies {
1300 for node in dependencies {
1301 spec.args.push(
1302 format!("{{{{ZOMBIE:{}:someField}}}}", node.name)
1303 .as_str()
1304 .into(),
1305 );
1306 }
1307 }
1308 spec
1309 }
1310
1311 fn verify_levels(actual_levels: Vec<Vec<&NodeSpec>>, expected_levels: Vec<Vec<&str>>) {
1312 actual_levels
1313 .iter()
1314 .zip(expected_levels)
1315 .for_each(|(actual_level, expected_level)| {
1316 assert_eq!(actual_level.len(), expected_level.len());
1317 actual_level
1318 .iter()
1319 .zip(expected_level.iter())
1320 .for_each(|(node, expected_name)| assert_eq!(node.name, *expected_name));
1321 });
1322 }
1323
1324 #[tokio::test]
1325 async fn valid_config_with_image() {
1326 let network_config = generate(true, None).unwrap();
1327 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1328 let caps = ProviderCapabilities {
1329 requires_image: true,
1330 has_resources: false,
1331 prefix_with_full_path: false,
1332 use_default_ports_in_cmd: false,
1333 };
1334
1335 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1336 assert!(valid.is_ok())
1337 }
1338
1339 #[tokio::test]
1340 async fn invalid_config_without_image() {
1341 let network_config = generate(false, None).unwrap();
1342 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1343 let caps = ProviderCapabilities {
1344 requires_image: true,
1345 has_resources: false,
1346 prefix_with_full_path: false,
1347 use_default_ports_in_cmd: false,
1348 };
1349
1350 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1351 assert!(valid.is_err())
1352 }
1353
1354 #[tokio::test]
1355 async fn invalid_config_missing_cmd() {
1356 let network_config = generate(false, Some("other")).unwrap();
1357 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1358 let caps = ProviderCapabilities {
1359 requires_image: false,
1360 has_resources: false,
1361 prefix_with_full_path: false,
1362 use_default_ports_in_cmd: false,
1363 };
1364
1365 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1366 assert!(valid.is_err())
1367 }
1368
1369 #[tokio::test]
1370 async fn valid_config_present_cmd() {
1371 let network_config = generate(false, Some("cargo")).unwrap();
1372 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1373 let caps = ProviderCapabilities {
1374 requires_image: false,
1375 has_resources: false,
1376 prefix_with_full_path: false,
1377 use_default_ports_in_cmd: false,
1378 };
1379
1380 let valid = validate_spec_with_provider_capabilities(&spec, &caps);
1381 println!("{valid:?}");
1382 assert!(valid.is_ok())
1383 }
1384
1385 #[tokio::test]
1386 async fn default_spawn_concurrency() {
1387 let _g = ENV_MUTEX.lock().await;
1388 set_env(None);
1389 let network_config = generate(false, Some("cargo")).unwrap();
1390 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1391 let (concurrency, _) = calculate_concurrency(&spec).unwrap();
1392 assert_eq!(concurrency, 100);
1393 }
1394
1395 #[tokio::test]
1396 async fn set_spawn_concurrency() {
1397 let _g = ENV_MUTEX.lock().await;
1398 set_env(None);
1399
1400 let network_config = generate(false, Some("cargo")).unwrap();
1401 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1402
1403 let global_settings = GlobalSettingsBuilder::new()
1404 .with_spawn_concurrency(4)
1405 .build()
1406 .unwrap();
1407
1408 spec.set_global_settings(global_settings);
1409 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1410 assert_eq!(concurrency, 4);
1411 assert!(!limited);
1412 }
1413
1414 #[tokio::test]
1415 async fn set_spawn_concurrency_but_limited() {
1416 let _g = ENV_MUTEX.lock().await;
1417 set_env(None);
1418
1419 let network_config = generate(false, Some("cargo")).unwrap();
1420 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1421
1422 let global_settings = GlobalSettingsBuilder::new()
1423 .with_spawn_concurrency(4)
1424 .build()
1425 .unwrap();
1426
1427 spec.set_global_settings(global_settings);
1428 let node = spec.relaychain.nodes.first_mut().unwrap();
1429 node.args
1430 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1431 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1432 assert_eq!(concurrency, 1);
1433 assert!(limited);
1434 }
1435
1436 #[tokio::test]
1437 async fn set_spawn_concurrency_from_env() {
1438 let _g = ENV_MUTEX.lock().await;
1439 set_env(Some(10));
1440
1441 let network_config = generate(false, Some("cargo")).unwrap();
1442 let spec = NetworkSpec::from_config(&network_config).await.unwrap();
1443 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1444 assert_eq!(concurrency, 10);
1445 assert!(!limited);
1446 }
1447
1448 #[tokio::test]
1449 async fn set_spawn_concurrency_from_env_but_limited() {
1450 let _g = ENV_MUTEX.lock().await;
1451 set_env(Some(12));
1452
1453 let network_config = generate(false, Some("cargo")).unwrap();
1454 let mut spec = NetworkSpec::from_config(&network_config).await.unwrap();
1455 let node = spec.relaychain.nodes.first_mut().unwrap();
1456 node.args
1457 .push("--bootnodes {{ZOMBIE:bob:multiAddress')}}".into());
1458 let (concurrency, limited) = calculate_concurrency(&spec).unwrap();
1459 assert_eq!(concurrency, 1);
1460 assert!(limited);
1461 }
1462
1463 #[test]
1464 fn dependency_levels_among_should_work() {
1465 assert!(dependency_levels_among(&[]).unwrap().is_empty());
1467
1468 let alice = get_node_with_dependencies("alice", None);
1470 let nodes = [&alice];
1471
1472 let levels = dependency_levels_among(&nodes).unwrap();
1473 let expected = vec![vec!["alice"]];
1474
1475 verify_levels(levels, expected);
1476
1477 let alice = get_node_with_dependencies("alice", None);
1479 let bob = get_node_with_dependencies("bob", None);
1480 let nodes = [&alice, &bob];
1481
1482 let levels = dependency_levels_among(&nodes).unwrap();
1483 let expected = vec![vec!["alice", "bob"]];
1484
1485 verify_levels(levels, expected);
1486
1487 let alice = get_node_with_dependencies("alice", None);
1489 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1490 let charlie = get_node_with_dependencies("charlie", Some(vec![&bob]));
1491 let nodes = [&alice, &bob, &charlie];
1492
1493 let levels = dependency_levels_among(&nodes).unwrap();
1494 let expected = vec![vec!["alice"], vec!["bob"], vec!["charlie"]];
1495
1496 verify_levels(levels, expected);
1497
1498 let alice = get_node_with_dependencies("alice", None);
1502 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1503 let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1504 let nodes = [&alice, &bob, &charlie];
1505
1506 let levels = dependency_levels_among(&nodes).unwrap();
1507 let expected = vec![vec!["alice"], vec!["bob", "charlie"]];
1508
1509 verify_levels(levels, expected);
1510
1511 let alice = get_node_with_dependencies("alice", None);
1515 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1516 let charlie = get_node_with_dependencies("charlie", Some(vec![&alice]));
1517 let dave = get_node_with_dependencies("dave", Some(vec![&charlie, &bob]));
1518 let nodes = [&alice, &bob, &charlie, &dave];
1519
1520 let levels = dependency_levels_among(&nodes).unwrap();
1521 let expected = vec![vec!["alice"], vec!["bob", "charlie"], vec!["dave"]];
1522
1523 verify_levels(levels, expected);
1524 }
1525
1526 #[test]
1527 fn dependency_levels_among_should_detect_cycles() {
1528 let mut alice = get_node_with_dependencies("alice", None);
1529 let bob = get_node_with_dependencies("bob", Some(vec![&alice]));
1530 alice.args.push("{{ZOMBIE:bob:someField}}".into());
1531
1532 assert!(dependency_levels_among(&[&alice, &bob]).is_err())
1533 }
1534}