1use std::{
2 collections::{BTreeMap, HashMap},
3 env,
4 path::{Path, PathBuf},
5 sync::{Arc, Weak},
6};
7
8use async_trait::async_trait;
9use k8s_openapi::{
10 api::core::v1::{
11 Container, ContainerPort, HTTPGetAction, PodSpec, Probe, ServicePort, ServiceSpec,
12 },
13 apimachinery::pkg::util::intstr::IntOrString,
14};
15use support::{constants::THIS_IS_A_BUG, fs::FileSystem, replacer::apply_replacements};
16use tokio::sync::{Mutex, RwLock};
17use tracing::{debug, trace, warn};
18use uuid::Uuid;
19
20use super::{client::KubernetesClient, node::KubernetesNode};
21use crate::{
22 constants::NAMESPACE_PREFIX,
23 kubernetes::{
24 node::{DeserializableKubernetesNodeOptions, KubernetesNodeOptions},
25 provider,
26 },
27 shared::helpers::{extract_execution_result, running_in_ci},
28 types::{
29 GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
30 SpawnNodeOptions,
31 },
32 DynNode, KubernetesProvider, ProviderError, ProviderNamespace, ProviderNode,
33};
34
/// Container image used for the `fileserver` pod that is created in every namespace.
const FILE_SERVER_IMAGE: &str = "europe-west3-docker.pkg.dev/parity-zombienet/zombienet-public-images/zombienet-file-server:latest";

/// Name of the environment variable that, when set, supplies the namespace name to use
/// instead of a freshly generated one.
const ZOMBIE_K8S_CI_NAMESPACE: &str = "ZOMBIE_K8S_CI_NAMESPACE";
39
40pub(super) struct KubernetesNamespace<FS>
41where
42 FS: FileSystem + Send + Sync + Clone,
43{
44 weak: Weak<KubernetesNamespace<FS>>,
45 provider: Weak<KubernetesProvider<FS>>,
46 name: String,
47 base_dir: PathBuf,
48 capabilities: ProviderCapabilities,
49 k8s_client: KubernetesClient,
50 filesystem: FS,
51 file_server_fw_task: RwLock<Option<tokio::task::JoinHandle<()>>>,
52 delete_on_drop: Arc<Mutex<bool>>,
53 pub(super) file_server_port: RwLock<Option<u16>>,
54 pub(super) nodes: RwLock<HashMap<String, Arc<KubernetesNode<FS>>>>,
55}
56
57impl<FS> KubernetesNamespace<FS>
58where
59 FS: FileSystem + Send + Sync + Clone + 'static,
60{
61 pub(super) async fn new(
62 provider: &Weak<KubernetesProvider<FS>>,
63 tmp_dir: &PathBuf,
64 capabilities: &ProviderCapabilities,
65 k8s_client: &KubernetesClient,
66 filesystem: &FS,
67 custom_base_dir: Option<&Path>,
68 ) -> Result<Arc<Self>, ProviderError> {
69 let name = if let Ok(name) = env::var(ZOMBIE_K8S_CI_NAMESPACE) {
71 name
72 } else {
73 format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4())
74 };
75
76 let base_dir = if let Some(custom_base_dir) = custom_base_dir {
77 if !filesystem.exists(custom_base_dir).await {
78 filesystem.create_dir(custom_base_dir).await?;
79 } else {
80 warn!(
81 "⚠️ Using and existing directory {} as base dir",
82 custom_base_dir.to_string_lossy()
83 );
84 }
85 PathBuf::from(custom_base_dir)
86 } else {
87 let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
88 filesystem.create_dir(&base_dir).await?;
89 base_dir
90 };
91
92 let namespace = Arc::new_cyclic(|weak| KubernetesNamespace {
93 weak: weak.clone(),
94 provider: provider.clone(),
95 name,
96 base_dir,
97 capabilities: capabilities.clone(),
98 filesystem: filesystem.clone(),
99 k8s_client: k8s_client.clone(),
100 file_server_port: RwLock::new(None),
101 file_server_fw_task: RwLock::new(None),
102 nodes: RwLock::new(HashMap::new()),
103 delete_on_drop: Arc::new(Mutex::new(true)),
104 });
105
106 namespace.initialize().await?;
107
108 Ok(namespace)
109 }
110
111 pub(super) async fn attach_to_live(
112 provider: &Weak<KubernetesProvider<FS>>,
113 capabilities: &ProviderCapabilities,
114 k8s_client: &KubernetesClient,
115 filesystem: &FS,
116 custom_base_dir: &Path,
117 name: &str,
118 ) -> Result<Arc<Self>, ProviderError> {
119 let base_dir = custom_base_dir.to_path_buf();
120
121 let namespace = Arc::new_cyclic(|weak| KubernetesNamespace {
122 weak: weak.clone(),
123 provider: provider.clone(),
124 name: name.to_owned(),
125 base_dir,
126 capabilities: capabilities.clone(),
127 filesystem: filesystem.clone(),
128 k8s_client: k8s_client.clone(),
129 file_server_port: RwLock::new(None),
130 file_server_fw_task: RwLock::new(None),
131 nodes: RwLock::new(HashMap::new()),
132 delete_on_drop: Arc::new(Mutex::new(false)),
133 });
134
135 namespace.setup_file_server_port_fwd("fileserver").await?;
136
137 Ok(namespace)
138 }
139
140 async fn initialize(&self) -> Result<(), ProviderError> {
141 if env::var(ZOMBIE_K8S_CI_NAMESPACE).is_err() || !running_in_ci() {
144 self.initialize_k8s().await?;
145 }
146
147 if running_in_ci() {
149 self.initialize_static_resources().await?
150 }
151
152 self.initialize_file_server().await?;
153
154 self.setup_script_config_map(
155 "zombie-wrapper",
156 include_str!("../shared/scripts/zombie-wrapper.sh"),
157 "zombie_wrapper_config_map_manifest.yaml",
158 BTreeMap::new(),
160 )
161 .await?;
162
163 self.setup_script_config_map(
164 "helper-binaries-downloader",
165 include_str!("../shared/scripts/helper-binaries-downloader.sh"),
166 "helper_binaries_downloader_config_map_manifest.yaml",
167 BTreeMap::new(),
169 )
170 .await?;
171
172 Ok(())
173 }
174
175 async fn initialize_k8s(&self) -> Result<(), ProviderError> {
176 let labels = BTreeMap::from([
178 (
179 "jobId".to_string(),
180 env::var("CI_JOB_ID").unwrap_or("".to_string()),
181 ),
182 (
183 "projectName".to_string(),
184 env::var("CI_PROJECT_NAME").unwrap_or("".to_string()),
185 ),
186 (
187 "projectId".to_string(),
188 env::var("CI_PROJECT_ID").unwrap_or("".to_string()),
189 ),
190 ]);
191
192 let manifest = self
193 .k8s_client
194 .create_namespace(&self.name, labels)
195 .await
196 .map_err(|err| {
197 ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
198 })?;
199
200 let serialized_manifest = serde_yaml::to_string(&manifest).map_err(|err| {
201 ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
202 })?;
203
204 let dest_path =
205 PathBuf::from_iter([&self.base_dir, &PathBuf::from("namespace_manifest.yaml")]);
206
207 self.filesystem
208 .write(dest_path, serialized_manifest)
209 .await?;
210
211 Ok(())
212 }
213
214 async fn initialize_static_resources(&self) -> Result<(), ProviderError> {
215 let np_manifest = apply_replacements(
216 include_str!("./static-configs/namespace-network-policy.yaml"),
217 &HashMap::from([("namespace", self.name())]),
218 );
219
220 self.k8s_client
222 .create_static_resource(&self.name, &np_manifest)
223 .await
224 .map_err(|err| {
225 ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
226 })?;
227
228 self.k8s_client
230 .create_static_resource(
231 &self.name,
232 include_str!("./static-configs/baseline-resources.yaml"),
233 )
234 .await
235 .map_err(|err| {
236 ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
237 })?;
238 Ok(())
239 }
240
241 async fn initialize_file_server(&self) -> Result<(), ProviderError> {
242 let name = "fileserver".to_string();
243 let labels = BTreeMap::from([
244 ("app.kubernetes.io/name".to_string(), name.clone()),
245 (
246 "x-infra-instance".to_string(),
247 env::var("X_INFRA_INSTANCE").unwrap_or("ondemand".to_string()),
248 ),
249 ]);
250
251 let pod_spec = PodSpec {
252 hostname: Some(name.clone()),
253 containers: vec![Container {
254 name: name.clone(),
255 image: Some(FILE_SERVER_IMAGE.to_string()),
256 image_pull_policy: Some("Always".to_string()),
257 ports: Some(vec![ContainerPort {
258 container_port: 80,
259 ..Default::default()
260 }]),
261 startup_probe: Some(Probe {
262 http_get: Some(HTTPGetAction {
263 path: Some("/".to_string()),
264 port: IntOrString::Int(80),
265 ..Default::default()
266 }),
267 initial_delay_seconds: Some(1),
268 period_seconds: Some(2),
269 failure_threshold: Some(3),
270 ..Default::default()
271 }),
272 ..Default::default()
273 }],
274 restart_policy: Some("OnFailure".into()),
275 ..Default::default()
276 };
277
278 let pod_manifest = self
279 .k8s_client
280 .create_pod(&self.name, &name, pod_spec, labels.clone())
281 .await
282 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
283
284 let pod_serialized_manifest = serde_yaml::to_string(&pod_manifest)
286 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
287
288 let pod_dest_path = PathBuf::from_iter([
289 &self.base_dir,
290 &PathBuf::from("file_server_pod_manifest.yaml"),
291 ]);
292
293 self.filesystem
294 .write(pod_dest_path, pod_serialized_manifest)
295 .await?;
296
297 let service_spec = ServiceSpec {
298 selector: Some(labels.clone()),
299 ports: Some(vec![ServicePort {
300 port: 80,
301 ..Default::default()
302 }]),
303 ..Default::default()
304 };
305
306 let service_manifest = self
307 .k8s_client
308 .create_service(&self.name, &name, service_spec, labels)
309 .await
310 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
311
312 let serialized_service_manifest = serde_yaml::to_string(&service_manifest)
313 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
314
315 let service_dest_path = PathBuf::from_iter([
316 &self.base_dir,
317 &PathBuf::from("file_server_service_manifest.yaml"),
318 ]);
319
320 self.filesystem
321 .write(service_dest_path, serialized_service_manifest)
322 .await?;
323
324 self.setup_file_server_port_fwd(&name).await?;
325
326 Ok(())
327 }
328
329 async fn setup_file_server_port_fwd(&self, name: &str) -> Result<(), ProviderError> {
330 let (port, task) = self
331 .k8s_client
332 .create_pod_port_forward(&self.name, name, 0, 80)
333 .await
334 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
335
336 *self.file_server_port.write().await = Some(port);
337 *self.file_server_fw_task.write().await = Some(task);
338
339 Ok(())
340 }
341
342 async fn setup_script_config_map(
343 &self,
344 name: &str,
345 script_contents: &str,
346 local_manifest_name: &str,
347 labels: BTreeMap<String, String>,
348 ) -> Result<(), ProviderError> {
349 let manifest = self
350 .k8s_client
351 .create_config_map_from_file(
352 &self.name,
353 name,
354 &format!("{name}.sh"),
355 script_contents,
356 labels,
357 )
358 .await
359 .map_err(|err| {
360 ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
361 })?;
362
363 let serializer_manifest = serde_yaml::to_string(&manifest).map_err(|err| {
364 ProviderError::CreateNamespaceFailed(self.name.to_string(), err.into())
365 })?;
366
367 let dest_path = PathBuf::from_iter([&self.base_dir, &PathBuf::from(local_manifest_name)]);
368
369 self.filesystem
370 .write(dest_path, serializer_manifest)
371 .await?;
372
373 Ok(())
374 }
375
376 pub async fn set_delete_on_drop(&self, delete_on_drop: bool) {
377 *self.delete_on_drop.lock().await = delete_on_drop;
378 }
379
380 pub async fn delete_on_drop(&self) -> bool {
381 if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
382 *delete_on_drop
383 } else {
384 true
386 }
387 }
388}
389
390impl<FS> Drop for KubernetesNamespace<FS>
391where
392 FS: FileSystem + Send + Sync + Clone,
393{
394 fn drop(&mut self) {
395 let ns_name = self.name.clone();
396 if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
397 if *delete_on_drop {
398 let client = self.k8s_client.clone();
399 let provider = self.provider.upgrade();
400 futures::executor::block_on(async move {
401 trace!("🧟 deleting ns {ns_name} from cluster");
402 let _ = client.delete_namespace(&ns_name).await;
403 if let Some(provider) = provider {
404 provider.namespaces.write().await.remove(&ns_name);
405 }
406
407 trace!("✅ deleted");
408 });
409 } else {
410 trace!("⚠️ leaking ns {ns_name} in cluster");
411 }
412 };
413 }
414}
415
416#[async_trait]
417impl<FS> ProviderNamespace for KubernetesNamespace<FS>
418where
419 FS: FileSystem + Send + Sync + Clone + 'static,
420{
421 fn name(&self) -> &str {
422 &self.name
423 }
424
425 fn base_dir(&self) -> &PathBuf {
426 &self.base_dir
427 }
428
429 fn capabilities(&self) -> &ProviderCapabilities {
430 &self.capabilities
431 }
432
433 fn provider_name(&self) -> &str {
434 provider::PROVIDER_NAME
435 }
436
437 async fn detach(&self) {
438 self.set_delete_on_drop(false).await;
439 }
440
441 async fn is_detached(&self) -> bool {
442 self.delete_on_drop().await
443 }
444
445 async fn nodes(&self) -> HashMap<String, DynNode> {
446 self.nodes
447 .read()
448 .await
449 .iter()
450 .map(|(name, node)| (name.clone(), node.clone() as DynNode))
451 .collect()
452 }
453
454 async fn get_node_available_args(
455 &self,
456 (command, image): (String, Option<String>),
457 ) -> Result<String, ProviderError> {
458 let node_image = image.expect(&format!("image should be present when getting node available args with kubernetes provider {THIS_IS_A_BUG}"));
459
460 let temp_node = self
462 .spawn_node(
463 &SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string())
464 .image(node_image.clone()),
465 )
466 .await?;
467
468 let available_args_output = temp_node
469 .run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
470 .await?
471 .map_err(|(_exit, status)| {
472 ProviderError::NodeAvailableArgsError(node_image, command, status)
473 })?;
474
475 temp_node.destroy().await?;
476
477 Ok(available_args_output)
478 }
479
480 async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
481 trace!("spawn node options {options:?}");
482
483 let node = KubernetesNode::new(KubernetesNodeOptions {
484 namespace: &self.weak,
485 namespace_base_dir: &self.base_dir,
486 name: &options.name,
487 image: options.image.as_ref(),
488 program: &options.program,
489 args: &options.args,
490 env: &options.env,
491 startup_files: &options.injected_files,
492 resources: options.resources.as_ref(),
493 db_snapshot: options.db_snapshot.as_ref(),
494 k8s_client: &self.k8s_client,
495 filesystem: &self.filesystem,
496 })
497 .await?;
498
499 self.nodes
500 .write()
501 .await
502 .insert(node.name().to_string(), node.clone());
503
504 Ok(node)
505 }
506
507 async fn spawn_node_from_json(
508 &self,
509 json_value: &serde_json::Value,
510 ) -> Result<DynNode, ProviderError> {
511 let deserializable: DeserializableKubernetesNodeOptions =
512 serde_json::from_value(json_value.clone())?;
513 let options = KubernetesNodeOptions::from_deserializable(
514 &deserializable,
515 &self.weak,
516 &self.base_dir,
517 &self.k8s_client,
518 &self.filesystem,
519 );
520
521 let node = KubernetesNode::attach_to_live(options).await?;
522
523 self.nodes
524 .write()
525 .await
526 .insert(node.name().to_string(), node.clone());
527
528 Ok(node)
529 }
530
531 async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
532 debug!("generate files options {options:#?}");
533
534 let node_name = options
535 .temp_name
536 .unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
537 let node_image = options
538 .image
539 .expect(&format!("image should be present when generating files with kubernetes provider {THIS_IS_A_BUG}"));
540
541 let temp_node = self
543 .spawn_node(
544 &SpawnNodeOptions::new(node_name, "cat".to_string())
545 .injected_files(options.injected_files)
546 .image(node_image),
547 )
548 .await?;
549
550 for GenerateFileCommand {
551 program,
552 args,
553 env,
554 local_output_path,
555 } in options.commands
556 {
557 let local_output_full_path = format!(
558 "{}{}{}",
559 self.base_dir.to_string_lossy(),
560 if local_output_path.starts_with("/") {
561 ""
562 } else {
563 "/"
564 },
565 local_output_path.to_string_lossy()
566 );
567
568 let contents = extract_execution_result(
569 &temp_node,
570 RunCommandOptions { program, args, env },
571 options.expected_path.as_ref(),
572 )
573 .await?;
574 self.filesystem
575 .write(local_output_full_path, contents)
576 .await
577 .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
578 }
579
580 temp_node.destroy().await
581 }
582
583 async fn static_setup(&self) -> Result<(), ProviderError> {
584 todo!()
585 }
586
587 async fn destroy(&self) -> Result<(), ProviderError> {
588 let _ = self
589 .k8s_client
590 .delete_namespace(&self.name)
591 .await
592 .map_err(|err| ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into()))?;
593
594 if let Some(provider) = self.provider.upgrade() {
595 provider.namespaces.write().await.remove(&self.name);
596 }
597
598 Ok(())
599 }
600}