zombienet_provider/kubernetes/
namespace.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    env,
4    path::{Path, PathBuf},
5    sync::{Arc, Weak},
6};
7
8use async_trait::async_trait;
9use k8s_openapi::{
10    api::core::v1::{
11        Container, ContainerPort, HTTPGetAction, PodSpec, Probe, ServicePort, ServiceSpec,
12    },
13    apimachinery::pkg::util::intstr::IntOrString,
14};
15use support::{constants::THIS_IS_A_BUG, fs::FileSystem, replacer::apply_replacements};
16use tokio::sync::{Mutex, RwLock};
17use tracing::{debug, trace, warn};
18use uuid::Uuid;
19
20use super::{client::KubernetesClient, node::KubernetesNode};
21use crate::{
22    constants::NAMESPACE_PREFIX,
23    kubernetes::{
24        node::{DeserializableKubernetesNodeOptions, KubernetesNodeOptions},
25        provider,
26    },
27    shared::helpers::{extract_execution_result, running_in_ci},
28    types::{
29        GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
30        SpawnNodeOptions,
31    },
32    DynNode, KubernetesProvider, ProviderError, ProviderNamespace, ProviderNode,
33};
34
/// Container image used for the file server pod deployed in every namespace.
const FILE_SERVER_IMAGE: &str = "europe-west3-docker.pkg.dev/parity-zombienet/zombienet-public-images/zombienet-file-server:latest";
36
/// Name of the environment variable our internal CI uses to hand over a
/// namespace that is already created and ready to use.
const ZOMBIE_K8S_CI_NAMESPACE: &str = "ZOMBIE_K8S_CI_NAMESPACE";
39
40pub(super) struct KubernetesNamespace<FS>
41where
42    FS: FileSystem + Send + Sync + Clone,
43{
44    weak: Weak<KubernetesNamespace<FS>>,
45    provider: Weak<KubernetesProvider<FS>>,
46    name: String,
47    base_dir: PathBuf,
48    capabilities: ProviderCapabilities,
49    k8s_client: KubernetesClient,
50    filesystem: FS,
51    file_server_fw_task: RwLock<Option<tokio::task::JoinHandle<()>>>,
52    delete_on_drop: Arc<Mutex<bool>>,
53    pub(super) file_server_port: RwLock<Option<u16>>,
54    pub(super) nodes: RwLock<HashMap<String, Arc<KubernetesNode<FS>>>>,
55}
56
57impl<FS> KubernetesNamespace<FS>
58where
59    FS: FileSystem + Send + Sync + Clone + 'static,
60{
61    pub(super) async fn new(
62        provider: &Weak<KubernetesProvider<FS>>,
63        tmp_dir: &PathBuf,
64        capabilities: &ProviderCapabilities,
65        k8s_client: &KubernetesClient,
66        filesystem: &FS,
67        custom_base_dir: Option<&Path>,
68    ) -> Result<Arc<Self>, ProviderError> {
69        // If the namespace is already provided
70        let name = if let Ok(name) = env::var(ZOMBIE_K8S_CI_NAMESPACE) {
71            name
72        } else {
73            format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4())
74        };
75
76        let base_dir = if let Some(custom_base_dir) = custom_base_dir {
77            if !filesystem.exists(custom_base_dir).await {
78                filesystem.create_dir(custom_base_dir).await?;
79            } else {
80                warn!(
81                    "⚠️ Using and existing directory {} as base dir",
82                    custom_base_dir.to_string_lossy()
83                );
84            }
85            PathBuf::from(custom_base_dir)
86        } else {
87            let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
88            filesystem.create_dir(&base_dir).await?;
89            base_dir
90        };
91
92        let namespace = Arc::new_cyclic(|weak| KubernetesNamespace {
93            weak: weak.clone(),
94            provider: provider.clone(),
95            name,
96            base_dir,
97            capabilities: capabilities.clone(),
98            filesystem: filesystem.clone(),
99            k8s_client: k8s_client.clone(),
100            file_server_port: RwLock::new(None),
101            file_server_fw_task: RwLock::new(None),
102            nodes: RwLock::new(HashMap::new()),
103            delete_on_drop: Arc::new(Mutex::new(true)),
104        });
105
106        namespace.initialize().await?;
107
108        Ok(namespace)
109    }
110
111    pub(super) async fn attach_to_live(
112        provider: &Weak<KubernetesProvider<FS>>,
113        capabilities: &ProviderCapabilities,
114        k8s_client: &KubernetesClient,
115        filesystem: &FS,
116        custom_base_dir: &Path,
117        name: &str,
118    ) -> Result<Arc<Self>, ProviderError> {
119        let base_dir = custom_base_dir.to_path_buf();
120
121        let namespace = Arc::new_cyclic(|weak| KubernetesNamespace {
122            weak: weak.clone(),
123            provider: provider.clone(),
124            name: name.to_owned(),
125            base_dir,
126            capabilities: capabilities.clone(),
127            filesystem: filesystem.clone(),
128            k8s_client: k8s_client.clone(),
129            file_server_port: RwLock::new(None),
130            file_server_fw_task: RwLock::new(None),
131            nodes: RwLock::new(HashMap::new()),
132            delete_on_drop: Arc::new(Mutex::new(false)),
133        });
134
135        namespace.setup_file_server_port_fwd("fileserver").await?;
136
137        Ok(namespace)
138    }
139
140    async fn initialize(&self) -> Result<(), ProviderError> {
141        // Initialize the namespace IFF
142        // we are not in CI or we don't have the env `ZOMBIE_NAMESPACE` set
143        if env::var(ZOMBIE_K8S_CI_NAMESPACE).is_err() || !running_in_ci() {
144            self.initialize_k8s().await?;
145        }
146
147        // Ensure namespace isolation and minimal resources IFF we are running in CI
148        if running_in_ci() {
149            self.initialize_static_resources().await?
150        }
151
152        self.initialize_file_server().await?;
153
154        self.setup_script_config_map(
155            "zombie-wrapper",
156            include_str!("../shared/scripts/zombie-wrapper.sh"),
157            "zombie_wrapper_config_map_manifest.yaml",
158            // TODO: add correct labels
159            BTreeMap::new(),
160        )
161        .await?;
162
163        self.setup_script_config_map(
164            "helper-binaries-downloader",
165            include_str!("../shared/scripts/helper-binaries-downloader.sh"),
166            "helper_binaries_downloader_config_map_manifest.yaml",
167            // TODO: add correct labels
168            BTreeMap::new(),
169        )
170        .await?;
171
172        Ok(())
173    }
174
175    async fn initialize_k8s(&self) -> Result<(), ProviderError> {
176        // TODO (javier): check with Hamid if we are using this labels in any scheduling logic.
177        let labels = BTreeMap::from([
178            (
179                "jobId".to_string(),
180                env::var("CI_JOB_ID").unwrap_or("".to_string()),
181            ),
182            (
183                "projectName".to_string(),
184                env::var("CI_PROJECT_NAME").unwrap_or("".to_string()),
185            ),
186            (
187                "projectId".to_string(),
188                env::var("CI_PROJECT_ID").unwrap_or("".to_string()),
189            ),
190        ]);
191
192        let manifest = self
193            .k8s_client
194            .create_namespace(&self.name, labels)
195            .await
196            .map_err(|err| {
197                ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
198            })?;
199
200        let serialized_manifest = serde_yaml::to_string(&manifest).map_err(|err| {
201            ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
202        })?;
203
204        let dest_path =
205            PathBuf::from_iter([&self.base_dir, &PathBuf::from("namespace_manifest.yaml")]);
206
207        self.filesystem
208            .write(dest_path, serialized_manifest)
209            .await?;
210
211        Ok(())
212    }
213
214    async fn initialize_static_resources(&self) -> Result<(), ProviderError> {
215        let np_manifest = apply_replacements(
216            include_str!("./static-configs/namespace-network-policy.yaml"),
217            &HashMap::from([("namespace", self.name())]),
218        );
219
220        // Apply NetworkPolicy manifest
221        self.k8s_client
222            .create_static_resource(&self.name, &np_manifest)
223            .await
224            .map_err(|err| {
225                ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
226            })?;
227
228        // Apply LimitRange manifest
229        self.k8s_client
230            .create_static_resource(
231                &self.name,
232                include_str!("./static-configs/baseline-resources.yaml"),
233            )
234            .await
235            .map_err(|err| {
236                ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
237            })?;
238        Ok(())
239    }
240
241    async fn initialize_file_server(&self) -> Result<(), ProviderError> {
242        let name = "fileserver".to_string();
243        let labels = BTreeMap::from([
244            ("app.kubernetes.io/name".to_string(), name.clone()),
245            (
246                "x-infra-instance".to_string(),
247                env::var("X_INFRA_INSTANCE").unwrap_or("ondemand".to_string()),
248            ),
249        ]);
250
251        let pod_spec = PodSpec {
252            hostname: Some(name.clone()),
253            containers: vec![Container {
254                name: name.clone(),
255                image: Some(FILE_SERVER_IMAGE.to_string()),
256                image_pull_policy: Some("Always".to_string()),
257                ports: Some(vec![ContainerPort {
258                    container_port: 80,
259                    ..Default::default()
260                }]),
261                startup_probe: Some(Probe {
262                    http_get: Some(HTTPGetAction {
263                        path: Some("/".to_string()),
264                        port: IntOrString::Int(80),
265                        ..Default::default()
266                    }),
267                    initial_delay_seconds: Some(1),
268                    period_seconds: Some(2),
269                    failure_threshold: Some(3),
270                    ..Default::default()
271                }),
272                ..Default::default()
273            }],
274            restart_policy: Some("OnFailure".into()),
275            ..Default::default()
276        };
277
278        let pod_manifest = self
279            .k8s_client
280            .create_pod(&self.name, &name, pod_spec, labels.clone())
281            .await
282            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
283
284        // TODO: remove duplication across methods
285        let pod_serialized_manifest = serde_yaml::to_string(&pod_manifest)
286            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
287
288        let pod_dest_path = PathBuf::from_iter([
289            &self.base_dir,
290            &PathBuf::from("file_server_pod_manifest.yaml"),
291        ]);
292
293        self.filesystem
294            .write(pod_dest_path, pod_serialized_manifest)
295            .await?;
296
297        let service_spec = ServiceSpec {
298            selector: Some(labels.clone()),
299            ports: Some(vec![ServicePort {
300                port: 80,
301                ..Default::default()
302            }]),
303            ..Default::default()
304        };
305
306        let service_manifest = self
307            .k8s_client
308            .create_service(&self.name, &name, service_spec, labels)
309            .await
310            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
311
312        let serialized_service_manifest = serde_yaml::to_string(&service_manifest)
313            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
314
315        let service_dest_path = PathBuf::from_iter([
316            &self.base_dir,
317            &PathBuf::from("file_server_service_manifest.yaml"),
318        ]);
319
320        self.filesystem
321            .write(service_dest_path, serialized_service_manifest)
322            .await?;
323
324        self.setup_file_server_port_fwd(&name).await?;
325
326        Ok(())
327    }
328
329    async fn setup_file_server_port_fwd(&self, name: &str) -> Result<(), ProviderError> {
330        let (port, task) = self
331            .k8s_client
332            .create_pod_port_forward(&self.name, name, 0, 80)
333            .await
334            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
335
336        *self.file_server_port.write().await = Some(port);
337        *self.file_server_fw_task.write().await = Some(task);
338
339        Ok(())
340    }
341
342    async fn setup_script_config_map(
343        &self,
344        name: &str,
345        script_contents: &str,
346        local_manifest_name: &str,
347        labels: BTreeMap<String, String>,
348    ) -> Result<(), ProviderError> {
349        let manifest = self
350            .k8s_client
351            .create_config_map_from_file(
352                &self.name,
353                name,
354                &format!("{name}.sh"),
355                script_contents,
356                labels,
357            )
358            .await
359            .map_err(|err| {
360                ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
361            })?;
362
363        let serializer_manifest = serde_yaml::to_string(&manifest).map_err(|err| {
364            ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
365        })?;
366
367        let dest_path = PathBuf::from_iter([&self.base_dir, &PathBuf::from(local_manifest_name)]);
368
369        self.filesystem
370            .write(dest_path, serializer_manifest)
371            .await?;
372
373        Ok(())
374    }
375
376    pub async fn set_delete_on_drop(&self, delete_on_drop: bool) {
377        *self.delete_on_drop.lock().await = delete_on_drop;
378    }
379
380    pub async fn delete_on_drop(&self) -> bool {
381        if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
382            *delete_on_drop
383        } else {
384            // if we can't lock just remove the ns
385            true
386        }
387    }
388}
389
390impl<FS> Drop for KubernetesNamespace<FS>
391where
392    FS: FileSystem + Send + Sync + Clone,
393{
394    fn drop(&mut self) {
395        let ns_name = self.name.clone();
396        if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
397            if *delete_on_drop {
398                let client = self.k8s_client.clone();
399                let provider = self.provider.upgrade();
400                futures::executor::block_on(async move {
401                    trace!("🧟 deleting ns {ns_name} from cluster");
402                    let _ = client.delete_namespace(&ns_name).await;
403                    if let Some(provider) = provider {
404                        provider.namespaces.write().await.remove(&ns_name);
405                    }
406
407                    trace!("✅ deleted");
408                });
409            } else {
410                trace!("⚠️ leaking ns {ns_name} in cluster");
411            }
412        };
413    }
414}
415
416#[async_trait]
417impl<FS> ProviderNamespace for KubernetesNamespace<FS>
418where
419    FS: FileSystem + Send + Sync + Clone + 'static,
420{
421    fn name(&self) -> &str {
422        &self.name
423    }
424
425    fn base_dir(&self) -> &PathBuf {
426        &self.base_dir
427    }
428
429    fn capabilities(&self) -> &ProviderCapabilities {
430        &self.capabilities
431    }
432
433    fn provider_name(&self) -> &str {
434        provider::PROVIDER_NAME
435    }
436
437    async fn detach(&self) {
438        self.set_delete_on_drop(false).await;
439    }
440
441    async fn is_detached(&self) -> bool {
442        self.delete_on_drop().await
443    }
444
445    async fn nodes(&self) -> HashMap<String, DynNode> {
446        self.nodes
447            .read()
448            .await
449            .iter()
450            .map(|(name, node)| (name.clone(), node.clone() as DynNode))
451            .collect()
452    }
453
454    async fn get_node_available_args(
455        &self,
456        (command, image): (String, Option<String>),
457    ) -> Result<String, ProviderError> {
458        let node_image = image.expect(&format!("image should be present when getting node available args with kubernetes provider {THIS_IS_A_BUG}"));
459
460        // run dummy command in new pod
461        let temp_node = self
462            .spawn_node(
463                &SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string())
464                    .image(node_image.clone()),
465            )
466            .await?;
467
468        let available_args_output = temp_node
469            .run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
470            .await?
471            .map_err(|(_exit, status)| {
472                ProviderError::NodeAvailableArgsError(node_image, command, status)
473            })?;
474
475        temp_node.destroy().await?;
476
477        Ok(available_args_output)
478    }
479
480    async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
481        trace!("spawn node options {options:?}");
482
483        let node = KubernetesNode::new(KubernetesNodeOptions {
484            namespace: &self.weak,
485            namespace_base_dir: &self.base_dir,
486            name: &options.name,
487            image: options.image.as_ref(),
488            program: &options.program,
489            args: &options.args,
490            env: &options.env,
491            startup_files: &options.injected_files,
492            resources: options.resources.as_ref(),
493            db_snapshot: options.db_snapshot.as_ref(),
494            k8s_client: &self.k8s_client,
495            filesystem: &self.filesystem,
496        })
497        .await?;
498
499        self.nodes
500            .write()
501            .await
502            .insert(node.name().to_string(), node.clone());
503
504        Ok(node)
505    }
506
507    async fn spawn_node_from_json(
508        &self,
509        json_value: &serde_json::Value,
510    ) -> Result<DynNode, ProviderError> {
511        let deserializable: DeserializableKubernetesNodeOptions =
512            serde_json::from_value(json_value.clone())?;
513        let options = KubernetesNodeOptions::from_deserializable(
514            &deserializable,
515            &self.weak,
516            &self.base_dir,
517            &self.k8s_client,
518            &self.filesystem,
519        );
520
521        let node = KubernetesNode::attach_to_live(options).await?;
522
523        self.nodes
524            .write()
525            .await
526            .insert(node.name().to_string(), node.clone());
527
528        Ok(node)
529    }
530
531    async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
532        debug!("generate files options {options:#?}");
533
534        let node_name = options
535            .temp_name
536            .unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
537        let node_image = options
538            .image
539            .expect(&format!("image should be present when generating files with kubernetes provider {THIS_IS_A_BUG}"));
540
541        // run dummy command in new pod
542        let temp_node = self
543            .spawn_node(
544                &SpawnNodeOptions::new(node_name, "cat".to_string())
545                    .injected_files(options.injected_files)
546                    .image(node_image),
547            )
548            .await?;
549
550        for GenerateFileCommand {
551            program,
552            args,
553            env,
554            local_output_path,
555        } in options.commands
556        {
557            let local_output_full_path = format!(
558                "{}{}{}",
559                self.base_dir.to_string_lossy(),
560                if local_output_path.starts_with("/") {
561                    ""
562                } else {
563                    "/"
564                },
565                local_output_path.to_string_lossy()
566            );
567
568            let contents = extract_execution_result(
569                &temp_node,
570                RunCommandOptions { program, args, env },
571                options.expected_path.as_ref(),
572            )
573            .await?;
574            self.filesystem
575                .write(local_output_full_path, contents)
576                .await
577                .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
578        }
579
580        temp_node.destroy().await
581    }
582
583    async fn static_setup(&self) -> Result<(), ProviderError> {
584        todo!()
585    }
586
587    async fn destroy(&self) -> Result<(), ProviderError> {
588        let _ = self
589            .k8s_client
590            .delete_namespace(&self.name)
591            .await
592            .map_err(|err| ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into()))?;
593
594        if let Some(provider) = self.provider.upgrade() {
595            provider.namespaces.write().await.remove(&self.name);
596        }
597
598        Ok(())
599    }
600}