zombienet_provider/docker/
namespace.rs

1use std::{
2    collections::HashMap,
3    path::{Path, PathBuf},
4    sync::{Arc, Weak},
5    thread,
6};
7
8use async_trait::async_trait;
9use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
10use tokio::sync::{Mutex, RwLock};
11use tracing::{debug, trace, warn};
12use uuid::Uuid;
13
14use super::{
15    client::{ContainerRunOptions, DockerClient},
16    node::DockerNode,
17    DockerProvider,
18};
19use crate::{
20    constants::NAMESPACE_PREFIX,
21    docker::{
22        node::{DeserializableDockerNodeOptions, DockerNodeOptions},
23        provider,
24    },
25    shared::helpers::extract_execution_result,
26    types::{
27        GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
28        SpawnNodeOptions,
29    },
30    DynNode, ProviderError, ProviderNamespace, ProviderNode,
31};
32
/// Namespace implementation backed by docker containers and volumes.
///
/// Instances are always handed out as `Arc<DockerNamespace<FS>>` (see `new` and
/// `attach_to_live`), which is what allows the struct to keep a weak reference to itself.
pub struct DockerNamespace<FS>
where
    FS: FileSystem + Send + Sync + Clone,
{
    /// Weak self-reference set up through `Arc::new_cyclic`; passed to the nodes this
    /// namespace spawns.
    weak: Weak<DockerNamespace<FS>>,
    /// Weak handle to the owning provider, used to unregister this namespace from
    /// `provider.namespaces` on `destroy` and on drop.
    // NOTE(review): this field is read by `destroy` and by the `Drop` impl, so the
    // `dead_code` allowance below looks stale — confirm before removing it.
    #[allow(dead_code)]
    provider: Weak<DockerProvider<FS>>,
    /// Namespace name; also used as the prefix of the volume and container names.
    name: String,
    /// Local directory where helper scripts and generated files are written.
    base_dir: PathBuf,
    /// Capabilities reported through `ProviderNamespace::capabilities`.
    capabilities: ProviderCapabilities,
    /// Client used for every docker operation issued by this namespace.
    docker_client: DockerClient,
    /// Filesystem abstraction used for all local file operations.
    filesystem: FS,
    /// When `true`, dropping the namespace removes its containers.
    delete_on_drop: Arc<Mutex<bool>>,
    /// Nodes registered by `spawn_node` / `spawn_node_from_json`, keyed by node name.
    pub(super) nodes: RwLock<HashMap<String, Arc<DockerNode<FS>>>>,
}
48
49impl<FS> DockerNamespace<FS>
50where
51    FS: FileSystem + Send + Sync + Clone + 'static,
52{
53    pub(super) async fn new(
54        provider: &Weak<DockerProvider<FS>>,
55        tmp_dir: &PathBuf,
56        capabilities: &ProviderCapabilities,
57        docker_client: &DockerClient,
58        filesystem: &FS,
59        custom_base_dir: Option<&Path>,
60    ) -> Result<Arc<Self>, ProviderError> {
61        let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
62        let base_dir = if let Some(custom_base_dir) = custom_base_dir {
63            if !filesystem.exists(custom_base_dir).await {
64                filesystem.create_dir(custom_base_dir).await?;
65            } else {
66                warn!(
67                    "⚠️  Using and existing directory {} as base dir",
68                    custom_base_dir.to_string_lossy()
69                );
70            }
71            PathBuf::from(custom_base_dir)
72        } else {
73            let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
74            filesystem.create_dir(&base_dir).await?;
75            base_dir
76        };
77
78        let namespace = Arc::new_cyclic(|weak| DockerNamespace {
79            weak: weak.clone(),
80            provider: provider.clone(),
81            name,
82            base_dir,
83            capabilities: capabilities.clone(),
84            filesystem: filesystem.clone(),
85            docker_client: docker_client.clone(),
86            nodes: RwLock::new(HashMap::new()),
87            delete_on_drop: Arc::new(Mutex::new(true)),
88        });
89
90        namespace.initialize().await?;
91
92        Ok(namespace)
93    }
94
95    pub(super) async fn attach_to_live(
96        provider: &Weak<DockerProvider<FS>>,
97        capabilities: &ProviderCapabilities,
98        docker_client: &DockerClient,
99        filesystem: &FS,
100        custom_base_dir: &Path,
101        name: &str,
102    ) -> Result<Arc<Self>, ProviderError> {
103        let base_dir = custom_base_dir.to_path_buf();
104
105        let namespace = Arc::new_cyclic(|weak| DockerNamespace {
106            weak: weak.clone(),
107            provider: provider.clone(),
108            name: name.to_owned(),
109            base_dir,
110            capabilities: capabilities.clone(),
111            filesystem: filesystem.clone(),
112            docker_client: docker_client.clone(),
113            nodes: RwLock::new(HashMap::new()),
114            delete_on_drop: Arc::new(Mutex::new(false)),
115        });
116
117        Ok(namespace)
118    }
119
120    async fn initialize(&self) -> Result<(), ProviderError> {
121        // let ns_scripts_shared =  PathBuf::from_iter([&self.base_dir, &PathBuf::from("shared-scripts")]);
122        // self.filesystem.create_dir(&ns_scripts_shared).await?;
123        self.initialize_zombie_scripts_volume().await?;
124        self.initialize_helper_binaries_volume().await?;
125
126        Ok(())
127    }
128
129    async fn initialize_zombie_scripts_volume(&self) -> Result<(), ProviderError> {
130        let local_zombie_wrapper_path =
131            PathBuf::from_iter([&self.base_dir, &PathBuf::from("zombie-wrapper.sh")]);
132
133        self.filesystem
134            .write(
135                &local_zombie_wrapper_path,
136                include_str!("../shared/scripts/zombie-wrapper.sh"),
137            )
138            .await?;
139
140        let local_helper_binaries_downloader_path = PathBuf::from_iter([
141            &self.base_dir,
142            &PathBuf::from("helper-binaries-downloader.sh"),
143        ]);
144
145        self.filesystem
146            .write(
147                &local_helper_binaries_downloader_path,
148                include_str!("../shared/scripts/helper-binaries-downloader.sh"),
149            )
150            .await?;
151
152        let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
153        let zombie_wrapper_container_name = format!("{}-scripts", self.name);
154
155        self.docker_client
156            .create_volume(&zombie_wrapper_volume_name)
157            .await
158            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
159
160        self.docker_client
161            .container_create(
162                ContainerRunOptions::new("alpine:latest", vec!["tail", "-f", "/dev/null"])
163                    .volume_mounts(HashMap::from([(
164                        zombie_wrapper_volume_name.as_str(),
165                        "/scripts",
166                    )]))
167                    .name(&zombie_wrapper_container_name)
168                    .rm(),
169            )
170            .await
171            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
172
173        // copy the scripts
174        self.docker_client
175            .container_cp(
176                &zombie_wrapper_container_name,
177                &local_zombie_wrapper_path,
178                &PathBuf::from("/scripts/zombie-wrapper.sh"),
179            )
180            .await
181            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
182
183        self.docker_client
184            .container_cp(
185                &zombie_wrapper_container_name,
186                &local_helper_binaries_downloader_path,
187                &PathBuf::from("/scripts/helper-binaries-downloader.sh"),
188            )
189            .await
190            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
191
192        // set permissions for rwx on whole volume recursively
193        self.docker_client
194            .container_run(
195                ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/scripts"])
196                    .volume_mounts(HashMap::from([(
197                        zombie_wrapper_volume_name.as_ref(),
198                        "/scripts",
199                    )]))
200                    .rm(),
201            )
202            .await
203            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
204
205        Ok(())
206    }
207
208    async fn initialize_helper_binaries_volume(&self) -> Result<(), ProviderError> {
209        let helper_binaries_volume_name = format!("{}-helper-binaries", self.name);
210        let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
211
212        self.docker_client
213            .create_volume(&helper_binaries_volume_name)
214            .await
215            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
216
217        // download binaries to volume
218        self.docker_client
219            .container_run(
220                ContainerRunOptions::new(
221                    "alpine:latest",
222                    vec!["ash", "/scripts/helper-binaries-downloader.sh"],
223                )
224                .volume_mounts(HashMap::from([
225                    (
226                        helper_binaries_volume_name.as_str(),
227                        "/helpers",
228                    ),
229                    (
230                        zombie_wrapper_volume_name.as_ref(),
231                        "/scripts",
232                    )
233                ]))
234                // wait until complete
235                .detach(false)
236                .rm(),
237            )
238            .await
239            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
240
241        // set permissions for rwx on whole volume recursively
242        self.docker_client
243            .container_run(
244                ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/helpers"])
245                    .volume_mounts(HashMap::from([(
246                        helper_binaries_volume_name.as_ref(),
247                        "/helpers",
248                    )]))
249                    .rm(),
250            )
251            .await
252            .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
253
254        Ok(())
255    }
256
257    pub async fn set_delete_on_drop(&self, delete_on_drop: bool) {
258        *self.delete_on_drop.lock().await = delete_on_drop;
259    }
260
261    pub async fn delete_on_drop(&self) -> bool {
262        if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
263            *delete_on_drop
264        } else {
265            // if we can't lock just remove the ns
266            true
267        }
268    }
269}
270
271#[async_trait]
272impl<FS> ProviderNamespace for DockerNamespace<FS>
273where
274    FS: FileSystem + Send + Sync + Clone + 'static,
275{
    /// Returns the namespace name.
    fn name(&self) -> &str {
        &self.name
    }
279
    /// Returns the local base directory of the namespace.
    fn base_dir(&self) -> &PathBuf {
        &self.base_dir
    }
283
    /// Returns the capabilities this namespace was created with.
    fn capabilities(&self) -> &ProviderCapabilities {
        &self.capabilities
    }
287
    /// Returns the docker provider name (`provider::PROVIDER_NAME`).
    fn provider_name(&self) -> &str {
        provider::PROVIDER_NAME
    }
291
    /// Detaches the namespace by clearing the delete-on-drop flag, so dropping it no
    /// longer removes its containers.
    async fn detach(&self) {
        self.set_delete_on_drop(false).await;
    }
295
296    async fn is_detached(&self) -> bool {
297        self.delete_on_drop().await
298    }
299
300    async fn nodes(&self) -> HashMap<String, DynNode> {
301        self.nodes
302            .read()
303            .await
304            .iter()
305            .map(|(name, node)| (name.clone(), node.clone() as DynNode))
306            .collect()
307    }
308
309    async fn get_node_available_args(
310        &self,
311        (command, image): (String, Option<String>),
312    ) -> Result<String, ProviderError> {
313        let node_image = image.expect(&format!("image should be present when getting node available args with docker provider {THIS_IS_A_BUG}"));
314
315        let temp_node = self
316            .spawn_node(
317                &SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string())
318                    .image(node_image.clone()),
319            )
320            .await?;
321
322        let available_args_output = temp_node
323            .run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
324            .await?
325            .map_err(|(_exit, status)| {
326                ProviderError::NodeAvailableArgsError(node_image, command, status)
327            })?;
328
329        temp_node.destroy().await?;
330
331        Ok(available_args_output)
332    }
333
334    async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
335        debug!("spawn option {:?}", options);
336
337        let node = DockerNode::new(DockerNodeOptions {
338            namespace: &self.weak,
339            namespace_base_dir: &self.base_dir,
340            name: &options.name,
341            image: options.image.as_ref(),
342            program: &options.program,
343            args: &options.args,
344            env: &options.env,
345            startup_files: &options.injected_files,
346            db_snapshot: options.db_snapshot.as_ref(),
347            docker_client: &self.docker_client,
348            container_name: format!("{}-{}", self.name, options.name),
349            filesystem: &self.filesystem,
350            port_mapping: options.port_mapping.as_ref().unwrap_or(&HashMap::default()),
351        })
352        .await?;
353
354        self.nodes
355            .write()
356            .await
357            .insert(node.name().to_string(), node.clone());
358
359        Ok(node)
360    }
361
362    async fn spawn_node_from_json(
363        &self,
364        json_value: &serde_json::Value,
365    ) -> Result<DynNode, ProviderError> {
366        let deserializable: DeserializableDockerNodeOptions =
367            serde_json::from_value(json_value.clone())?;
368        let options = DockerNodeOptions::from_deserializable(
369            &deserializable,
370            &self.weak,
371            &self.base_dir,
372            &self.docker_client,
373            &self.filesystem,
374        );
375
376        let node = DockerNode::attach_to_live(options).await?;
377
378        self.nodes
379            .write()
380            .await
381            .insert(node.name().to_string(), node.clone());
382
383        Ok(node)
384    }
385
386    async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
387        debug!("generate files options {options:#?}");
388
389        let node_name = options
390            .temp_name
391            .unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
392        let node_image = options.image.expect(&format!(
393            "image should be present when generating files with docker provider {THIS_IS_A_BUG}"
394        ));
395
396        // run dummy command in a new container
397        let temp_node = self
398            .spawn_node(
399                &SpawnNodeOptions::new(node_name, "cat".to_string())
400                    .injected_files(options.injected_files)
401                    .image(node_image),
402            )
403            .await?;
404
405        for GenerateFileCommand {
406            program,
407            args,
408            env,
409            local_output_path,
410        } in options.commands
411        {
412            let local_output_full_path = format!(
413                "{}{}{}",
414                self.base_dir.to_string_lossy(),
415                if local_output_path.starts_with("/") {
416                    ""
417                } else {
418                    "/"
419                },
420                local_output_path.to_string_lossy()
421            );
422
423            let contents = extract_execution_result(
424                &temp_node,
425                RunCommandOptions { program, args, env },
426                options.expected_path.as_ref(),
427            )
428            .await?;
429            self.filesystem
430                .write(local_output_full_path, contents)
431                .await
432                .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
433        }
434
435        temp_node.destroy().await
436    }
437
    /// Not implemented for this namespace.
    ///
    /// # Panics
    ///
    /// Always panics through `todo!()`.
    async fn static_setup(&self) -> Result<(), ProviderError> {
        todo!()
    }
441
442    async fn destroy(&self) -> Result<(), ProviderError> {
443        let _ = self
444            .docker_client
445            .namespaced_containers_rm(&self.name)
446            .await
447            .map_err(|err| ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into()))?;
448
449        if let Some(provider) = self.provider.upgrade() {
450            provider.namespaces.write().await.remove(&self.name);
451        }
452
453        Ok(())
454    }
455}
456
457impl<FS> Drop for DockerNamespace<FS>
458where
459    FS: FileSystem + Send + Sync + Clone,
460{
461    fn drop(&mut self) {
462        let ns_name = self.name.clone();
463        if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
464            if *delete_on_drop {
465                let client = self.docker_client.clone();
466                let provider = self.provider.upgrade();
467
468                let handler = thread::spawn(move || {
469                    let rt = tokio::runtime::Runtime::new().unwrap();
470                    rt.block_on(async move {
471                        trace!("🧟 deleting ns {ns_name} from cluster");
472                        let _ = client.namespaced_containers_rm(&ns_name).await;
473                        trace!("✅ deleted");
474                    });
475                });
476
477                if handler.join().is_ok() {
478                    if let Some(provider) = provider {
479                        if let Ok(mut p) = provider.namespaces.try_write() {
480                            p.remove(&self.name);
481                        } else {
482                            warn!(
483                                "⚠️  Can not acquire write lock to the provider, ns {} not removed",
484                                self.name
485                            );
486                        }
487                    }
488                }
489            } else {
490                trace!("⚠️ leaking ns {ns_name} in cluster");
491            }
492        };
493    }
494}