zombienet_provider/native/
namespace.rs1use std::{
2 collections::HashMap,
3 path::{Path, PathBuf},
4 sync::{Arc, Weak},
5};
6
7use async_trait::async_trait;
8use support::fs::FileSystem;
9use tokio::sync::RwLock;
10use tracing::{trace, warn};
11use uuid::Uuid;
12
13use super::node::{NativeNode, NativeNodeOptions};
14use crate::{
15 constants::NAMESPACE_PREFIX,
16 native::{node::DeserializableNativeNodeOptions, provider},
17 shared::helpers::extract_execution_result,
18 types::{
19 GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
20 SpawnNodeOptions,
21 },
22 DynNode, NativeProvider, ProviderError, ProviderNamespace, ProviderNode,
23};
24
25pub(super) struct NativeNamespace<FS>
26where
27 FS: FileSystem + Send + Sync + Clone,
28{
29 weak: Weak<NativeNamespace<FS>>,
30 name: String,
31 provider: Weak<NativeProvider<FS>>,
32 base_dir: PathBuf,
33 capabilities: ProviderCapabilities,
34 filesystem: FS,
35 pub(super) nodes: RwLock<HashMap<String, Arc<NativeNode<FS>>>>,
36}
37
38impl<FS> NativeNamespace<FS>
39where
40 FS: FileSystem + Send + Sync + Clone + 'static,
41{
42 pub(super) async fn new(
43 provider: &Weak<NativeProvider<FS>>,
44 tmp_dir: &PathBuf,
45 capabilities: &ProviderCapabilities,
46 filesystem: &FS,
47 custom_base_dir: Option<&Path>,
48 ) -> Result<Arc<Self>, ProviderError> {
49 let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
50 let base_dir = if let Some(custom_base_dir) = custom_base_dir {
51 if !filesystem.exists(custom_base_dir).await {
52 filesystem.create_dir_all(custom_base_dir).await?;
53 } else {
54 warn!(
55 "⚠️ Using and existing directory {} as base dir",
56 custom_base_dir.to_string_lossy()
57 );
58 }
59 PathBuf::from(custom_base_dir)
60 } else {
61 let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
62 filesystem.create_dir(&base_dir).await?;
63 base_dir
64 };
65
66 Ok(Arc::new_cyclic(|weak| NativeNamespace {
67 weak: weak.clone(),
68 provider: provider.clone(),
69 name,
70 base_dir,
71 capabilities: capabilities.clone(),
72 filesystem: filesystem.clone(),
73 nodes: RwLock::new(HashMap::new()),
74 }))
75 }
76
77 pub(super) async fn attach_to_live(
78 provider: &Weak<NativeProvider<FS>>,
79 capabilities: &ProviderCapabilities,
80 filesystem: &FS,
81 custom_base_dir: &Path,
82 name: &str,
83 ) -> Result<Arc<Self>, ProviderError> {
84 let base_dir = custom_base_dir.to_path_buf();
85
86 Ok(Arc::new_cyclic(|weak| NativeNamespace {
87 weak: weak.clone(),
88 provider: provider.clone(),
89 name: name.to_string(),
90 base_dir,
91 capabilities: capabilities.clone(),
92 filesystem: filesystem.clone(),
93 nodes: RwLock::new(HashMap::new()),
94 }))
95 }
96}
97
98#[async_trait]
99impl<FS> ProviderNamespace for NativeNamespace<FS>
100where
101 FS: FileSystem + Send + Sync + Clone + 'static,
102{
103 fn name(&self) -> &str {
104 &self.name
105 }
106
107 fn base_dir(&self) -> &PathBuf {
108 &self.base_dir
109 }
110
111 fn capabilities(&self) -> &ProviderCapabilities {
112 &self.capabilities
113 }
114
115 fn provider_name(&self) -> &str {
116 provider::PROVIDER_NAME
117 }
118
119 async fn nodes(&self) -> HashMap<String, DynNode> {
120 self.nodes
121 .read()
122 .await
123 .iter()
124 .map(|(name, node)| (name.clone(), node.clone() as DynNode))
125 .collect()
126 }
127
128 async fn get_node_available_args(
129 &self,
130 (command, _image): (String, Option<String>),
131 ) -> Result<String, ProviderError> {
132 let temp_node = self
133 .spawn_node(
134 &SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "bash".to_string())
135 .args(vec!["-c", "while :; do sleep 1; done"]),
136 )
137 .await?;
138
139 let available_args_output = temp_node
140 .run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
141 .await?
142 .map_err(|(_exit, status)| {
143 ProviderError::NodeAvailableArgsError("".to_string(), command, status)
144 })?;
145
146 temp_node.destroy().await?;
147
148 Ok(available_args_output)
149 }
150
151 async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
152 trace!("spawn node options {options:?}");
153
154 let node = NativeNode::new(NativeNodeOptions {
155 namespace: &self.weak,
156 namespace_base_dir: &self.base_dir,
157 name: &options.name,
158 program: &options.program,
159 args: &options.args,
160 env: &options.env,
161 startup_files: &options.injected_files,
162 created_paths: &options.created_paths,
163 db_snapshot: options.db_snapshot.as_ref(),
164 filesystem: &self.filesystem,
165 node_log_path: options.node_log_path.as_ref(),
166 })
167 .await?;
168
169 self.nodes
170 .write()
171 .await
172 .insert(options.name.clone(), node.clone());
173
174 Ok(node)
175 }
176
177 async fn spawn_node_from_json(
178 &self,
179 json_value: &serde_json::Value,
180 ) -> Result<DynNode, ProviderError> {
181 let deserializable: DeserializableNativeNodeOptions =
182 serde_json::from_value(json_value.clone())?;
183 let options = NativeNodeOptions::from_deserializable(
184 &deserializable,
185 &self.weak,
186 &self.base_dir,
187 &self.filesystem,
188 );
189
190 let pid = json_value
191 .get("process_handle")
192 .and_then(|v| v.as_i64())
193 .ok_or_else(|| ProviderError::InvalidConfig("Missing pid field".to_string()))?
194 as i32;
195 let node = NativeNode::attach_to_live(options, pid).await?;
196
197 self.nodes
198 .write()
199 .await
200 .insert(node.name().to_string(), node.clone());
201
202 Ok(node)
203 }
204
205 async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
206 let node_name = if let Some(name) = options.temp_name {
207 name
208 } else {
209 format!("temp-{}", Uuid::new_v4())
210 };
211
212 let temp_node = self
214 .spawn_node(
215 &SpawnNodeOptions::new(node_name, "bash".to_string())
216 .args(vec!["-c", "while :; do sleep 1; done"])
217 .injected_files(options.injected_files),
218 )
219 .await?;
220
221 for GenerateFileCommand {
222 program,
223 args,
224 env,
225 local_output_path,
226 } in options.commands
227 {
228 trace!(
229 "🏗 building file {:?} in path {} with command {} {}",
230 local_output_path.as_os_str(),
231 self.base_dir.to_string_lossy(),
232 program,
233 args.join(" ")
234 );
235 let local_output_full_path = format!(
236 "{}{}{}",
237 self.base_dir.to_string_lossy(),
238 if local_output_path.starts_with("/") {
239 ""
240 } else {
241 "/"
242 },
243 local_output_path.to_string_lossy()
244 );
245
246 let contents = extract_execution_result(
247 &temp_node,
248 RunCommandOptions { program, args, env },
249 options.expected_path.as_ref(),
250 )
251 .await?;
252 self.filesystem
253 .write(local_output_full_path, contents)
254 .await
255 .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
256 }
257
258 temp_node.destroy().await
259 }
260
261 async fn static_setup(&self) -> Result<(), ProviderError> {
262 todo!()
264 }
265
266 async fn destroy(&self) -> Result<(), ProviderError> {
267 let mut names = vec![];
268
269 for node in self.nodes.read().await.values() {
270 node.abort()
271 .await
272 .map_err(|err| ProviderError::DestroyNodeFailed(node.name().to_string(), err))?;
273 names.push(node.name().to_string());
274 }
275
276 let mut nodes = self.nodes.write().await;
277 for name in names {
278 nodes.remove(&name);
279 }
280
281 if let Some(provider) = self.provider.upgrade() {
282 provider.namespaces.write().await.remove(&self.name);
283 }
284
285 Ok(())
286 }
287}
288
#[cfg(test)]
mod tests {
    use support::fs::local::LocalFileSystem;

    use super::*;
    use crate::{
        types::{GenerateFileCommand, GenerateFilesOptions},
        NativeProvider, Provider,
    };

    /// Returns a unique, not-yet-created directory path under the system
    /// temp dir.
    fn unique_temp_dir() -> PathBuf {
        std::env::temp_dir().join(format!("znet_native_ns_test_{}", Uuid::new_v4()))
    }

    /// Creates a namespace in a fresh base dir, runs `script` through
    /// `bash -lc` via `generate_files`, and returns the content written to
    /// `out_name` inside the base dir. The base dir is removed afterwards.
    async fn generate_and_read(
        script: String,
        out_name: &str,
        expected_path: Option<PathBuf>,
    ) -> String {
        let fs = LocalFileSystem;
        let provider = NativeProvider::new(fs.clone());
        let base_dir = unique_temp_dir();
        let ns = provider
            .create_namespace_with_base_dir(&base_dir)
            .await
            .expect("namespace should be created");

        let out_name = PathBuf::from(out_name);
        let args: Vec<String> = vec!["-lc".into(), script];
        let cmd = GenerateFileCommand::new("bash".to_string(), out_name.clone()).args(args);
        let options = GenerateFilesOptions::new(vec![cmd], None, expected_path);

        ns.generate_files(options)
            .await
            .expect("generation should succeed");

        let produced_path = base_dir.join(out_name);
        let produced = fs
            .read_to_string(&produced_path)
            .await
            .expect("should read produced file");

        // Best-effort cleanup so repeated runs do not pile up temp dirs.
        let _ = std::fs::remove_dir_all(&base_dir);

        produced
    }

    #[tokio::test]
    async fn generate_files_uses_expected_path_when_provided() {
        let expected_path =
            std::env::temp_dir().join(format!("znet_expected_{}.json", Uuid::new_v4()));

        // The script writes the real payload to `expected_path` and prints a
        // decoy on stdout, which must be ignored.
        let script = format!(
            "echo -n '{{\"hello\":\"world\"}}' > {} && echo should_not_be_used",
            expected_path.to_string_lossy()
        );

        let produced = generate_and_read(
            script,
            "result_expected.json",
            Some(expected_path.clone()),
        )
        .await;

        // Best-effort cleanup of the side file created by the script.
        let _ = std::fs::remove_file(&expected_path);

        assert_eq!(produced, "{\"hello\":\"world\"}");
    }

    #[tokio::test]
    async fn generate_files_uses_stdout_when_expected_path_absent() {
        let produced = generate_and_read("echo -n 42".into(), "result_stdout.txt", None).await;

        assert_eq!(produced, "42");
    }
}