1use std::{
2 collections::HashMap,
3 path::{Path, PathBuf},
4 sync::{Arc, Weak},
5 thread,
6};
7
8use async_trait::async_trait;
9use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
10use tokio::sync::{Mutex, RwLock};
11use tracing::{debug, trace, warn};
12use uuid::Uuid;
13
14use super::{
15 client::{ContainerRunOptions, DockerClient},
16 node::DockerNode,
17 DockerProvider,
18};
19use crate::{
20 constants::NAMESPACE_PREFIX,
21 docker::{
22 node::{DeserializableDockerNodeOptions, DockerNodeOptions},
23 provider,
24 },
25 shared::helpers::extract_execution_result,
26 types::{
27 GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
28 SpawnNodeOptions,
29 },
30 DynNode, ProviderError, ProviderNamespace, ProviderNode,
31};
32
33pub struct DockerNamespace<FS>
34where
35 FS: FileSystem + Send + Sync + Clone,
36{
37 weak: Weak<DockerNamespace<FS>>,
38 #[allow(dead_code)]
39 provider: Weak<DockerProvider<FS>>,
40 name: String,
41 base_dir: PathBuf,
42 capabilities: ProviderCapabilities,
43 docker_client: DockerClient,
44 filesystem: FS,
45 delete_on_drop: Arc<Mutex<bool>>,
46 pub(super) nodes: RwLock<HashMap<String, Arc<DockerNode<FS>>>>,
47}
48
49impl<FS> DockerNamespace<FS>
50where
51 FS: FileSystem + Send + Sync + Clone + 'static,
52{
53 pub(super) async fn new(
54 provider: &Weak<DockerProvider<FS>>,
55 tmp_dir: &PathBuf,
56 capabilities: &ProviderCapabilities,
57 docker_client: &DockerClient,
58 filesystem: &FS,
59 custom_base_dir: Option<&Path>,
60 ) -> Result<Arc<Self>, ProviderError> {
61 let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
62 let base_dir = if let Some(custom_base_dir) = custom_base_dir {
63 if !filesystem.exists(custom_base_dir).await {
64 filesystem.create_dir(custom_base_dir).await?;
65 } else {
66 warn!(
67 "⚠️ Using and existing directory {} as base dir",
68 custom_base_dir.to_string_lossy()
69 );
70 }
71 PathBuf::from(custom_base_dir)
72 } else {
73 let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
74 filesystem.create_dir(&base_dir).await?;
75 base_dir
76 };
77
78 let namespace = Arc::new_cyclic(|weak| DockerNamespace {
79 weak: weak.clone(),
80 provider: provider.clone(),
81 name,
82 base_dir,
83 capabilities: capabilities.clone(),
84 filesystem: filesystem.clone(),
85 docker_client: docker_client.clone(),
86 nodes: RwLock::new(HashMap::new()),
87 delete_on_drop: Arc::new(Mutex::new(true)),
88 });
89
90 namespace.initialize().await?;
91
92 Ok(namespace)
93 }
94
95 pub(super) async fn attach_to_live(
96 provider: &Weak<DockerProvider<FS>>,
97 capabilities: &ProviderCapabilities,
98 docker_client: &DockerClient,
99 filesystem: &FS,
100 custom_base_dir: &Path,
101 name: &str,
102 ) -> Result<Arc<Self>, ProviderError> {
103 let base_dir = custom_base_dir.to_path_buf();
104
105 let namespace = Arc::new_cyclic(|weak| DockerNamespace {
106 weak: weak.clone(),
107 provider: provider.clone(),
108 name: name.to_owned(),
109 base_dir,
110 capabilities: capabilities.clone(),
111 filesystem: filesystem.clone(),
112 docker_client: docker_client.clone(),
113 nodes: RwLock::new(HashMap::new()),
114 delete_on_drop: Arc::new(Mutex::new(false)),
115 });
116
117 Ok(namespace)
118 }
119
120 async fn initialize(&self) -> Result<(), ProviderError> {
121 self.initialize_zombie_scripts_volume().await?;
124 self.initialize_helper_binaries_volume().await?;
125
126 Ok(())
127 }
128
129 async fn initialize_zombie_scripts_volume(&self) -> Result<(), ProviderError> {
130 let local_zombie_wrapper_path =
131 PathBuf::from_iter([&self.base_dir, &PathBuf::from("zombie-wrapper.sh")]);
132
133 self.filesystem
134 .write(
135 &local_zombie_wrapper_path,
136 include_str!("../shared/scripts/zombie-wrapper.sh"),
137 )
138 .await?;
139
140 let local_helper_binaries_downloader_path = PathBuf::from_iter([
141 &self.base_dir,
142 &PathBuf::from("helper-binaries-downloader.sh"),
143 ]);
144
145 self.filesystem
146 .write(
147 &local_helper_binaries_downloader_path,
148 include_str!("../shared/scripts/helper-binaries-downloader.sh"),
149 )
150 .await?;
151
152 let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
153 let zombie_wrapper_container_name = format!("{}-scripts", self.name);
154
155 self.docker_client
156 .create_volume(&zombie_wrapper_volume_name)
157 .await
158 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
159
160 self.docker_client
161 .container_create(
162 ContainerRunOptions::new("alpine:latest", vec!["tail", "-f", "/dev/null"])
163 .volume_mounts(HashMap::from([(
164 zombie_wrapper_volume_name.as_str(),
165 "/scripts",
166 )]))
167 .name(&zombie_wrapper_container_name)
168 .rm(),
169 )
170 .await
171 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
172
173 self.docker_client
175 .container_cp(
176 &zombie_wrapper_container_name,
177 &local_zombie_wrapper_path,
178 &PathBuf::from("/scripts/zombie-wrapper.sh"),
179 )
180 .await
181 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
182
183 self.docker_client
184 .container_cp(
185 &zombie_wrapper_container_name,
186 &local_helper_binaries_downloader_path,
187 &PathBuf::from("/scripts/helper-binaries-downloader.sh"),
188 )
189 .await
190 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
191
192 self.docker_client
194 .container_run(
195 ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/scripts"])
196 .volume_mounts(HashMap::from([(
197 zombie_wrapper_volume_name.as_ref(),
198 "/scripts",
199 )]))
200 .rm(),
201 )
202 .await
203 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
204
205 Ok(())
206 }
207
208 async fn initialize_helper_binaries_volume(&self) -> Result<(), ProviderError> {
209 let helper_binaries_volume_name = format!("{}-helper-binaries", self.name);
210 let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
211
212 self.docker_client
213 .create_volume(&helper_binaries_volume_name)
214 .await
215 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
216
217 self.docker_client
219 .container_run(
220 ContainerRunOptions::new(
221 "alpine:latest",
222 vec!["ash", "/scripts/helper-binaries-downloader.sh"],
223 )
224 .volume_mounts(HashMap::from([
225 (
226 helper_binaries_volume_name.as_str(),
227 "/helpers",
228 ),
229 (
230 zombie_wrapper_volume_name.as_ref(),
231 "/scripts",
232 )
233 ]))
234 .detach(false)
236 .rm(),
237 )
238 .await
239 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
240
241 self.docker_client
243 .container_run(
244 ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/helpers"])
245 .volume_mounts(HashMap::from([(
246 helper_binaries_volume_name.as_ref(),
247 "/helpers",
248 )]))
249 .rm(),
250 )
251 .await
252 .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
253
254 Ok(())
255 }
256
257 pub async fn set_delete_on_drop(&self, delete_on_drop: bool) {
258 *self.delete_on_drop.lock().await = delete_on_drop;
259 }
260
261 pub async fn delete_on_drop(&self) -> bool {
262 if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
263 *delete_on_drop
264 } else {
265 true
267 }
268 }
269}
270
271#[async_trait]
272impl<FS> ProviderNamespace for DockerNamespace<FS>
273where
274 FS: FileSystem + Send + Sync + Clone + 'static,
275{
276 fn name(&self) -> &str {
277 &self.name
278 }
279
280 fn base_dir(&self) -> &PathBuf {
281 &self.base_dir
282 }
283
284 fn capabilities(&self) -> &ProviderCapabilities {
285 &self.capabilities
286 }
287
288 fn provider_name(&self) -> &str {
289 provider::PROVIDER_NAME
290 }
291
292 async fn detach(&self) {
293 self.set_delete_on_drop(false).await;
294 }
295
296 async fn is_detached(&self) -> bool {
297 self.delete_on_drop().await
298 }
299
300 async fn nodes(&self) -> HashMap<String, DynNode> {
301 self.nodes
302 .read()
303 .await
304 .iter()
305 .map(|(name, node)| (name.clone(), node.clone() as DynNode))
306 .collect()
307 }
308
309 async fn get_node_available_args(
310 &self,
311 (command, image): (String, Option<String>),
312 ) -> Result<String, ProviderError> {
313 let node_image = image.expect(&format!("image should be present when getting node available args with docker provider {THIS_IS_A_BUG}"));
314
315 let temp_node = self
316 .spawn_node(
317 &SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string())
318 .image(node_image.clone()),
319 )
320 .await?;
321
322 let available_args_output = temp_node
323 .run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
324 .await?
325 .map_err(|(_exit, status)| {
326 ProviderError::NodeAvailableArgsError(node_image, command, status)
327 })?;
328
329 temp_node.destroy().await?;
330
331 Ok(available_args_output)
332 }
333
334 async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
335 debug!("spawn option {:?}", options);
336
337 let node = DockerNode::new(DockerNodeOptions {
338 namespace: &self.weak,
339 namespace_base_dir: &self.base_dir,
340 name: &options.name,
341 image: options.image.as_ref(),
342 program: &options.program,
343 args: &options.args,
344 env: &options.env,
345 startup_files: &options.injected_files,
346 db_snapshot: options.db_snapshot.as_ref(),
347 docker_client: &self.docker_client,
348 container_name: format!("{}-{}", self.name, options.name),
349 filesystem: &self.filesystem,
350 port_mapping: options.port_mapping.as_ref().unwrap_or(&HashMap::default()),
351 })
352 .await?;
353
354 self.nodes
355 .write()
356 .await
357 .insert(node.name().to_string(), node.clone());
358
359 Ok(node)
360 }
361
362 async fn spawn_node_from_json(
363 &self,
364 json_value: &serde_json::Value,
365 ) -> Result<DynNode, ProviderError> {
366 let deserializable: DeserializableDockerNodeOptions =
367 serde_json::from_value(json_value.clone())?;
368 let options = DockerNodeOptions::from_deserializable(
369 &deserializable,
370 &self.weak,
371 &self.base_dir,
372 &self.docker_client,
373 &self.filesystem,
374 );
375
376 let node = DockerNode::attach_to_live(options).await?;
377
378 self.nodes
379 .write()
380 .await
381 .insert(node.name().to_string(), node.clone());
382
383 Ok(node)
384 }
385
386 async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
387 debug!("generate files options {options:#?}");
388
389 let node_name = options
390 .temp_name
391 .unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
392 let node_image = options.image.expect(&format!(
393 "image should be present when generating files with docker provider {THIS_IS_A_BUG}"
394 ));
395
396 let temp_node = self
398 .spawn_node(
399 &SpawnNodeOptions::new(node_name, "cat".to_string())
400 .injected_files(options.injected_files)
401 .image(node_image),
402 )
403 .await?;
404
405 for GenerateFileCommand {
406 program,
407 args,
408 env,
409 local_output_path,
410 } in options.commands
411 {
412 let local_output_full_path = format!(
413 "{}{}{}",
414 self.base_dir.to_string_lossy(),
415 if local_output_path.starts_with("/") {
416 ""
417 } else {
418 "/"
419 },
420 local_output_path.to_string_lossy()
421 );
422
423 let contents = extract_execution_result(
424 &temp_node,
425 RunCommandOptions { program, args, env },
426 options.expected_path.as_ref(),
427 )
428 .await?;
429 self.filesystem
430 .write(local_output_full_path, contents)
431 .await
432 .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
433 }
434
435 temp_node.destroy().await
436 }
437
438 async fn static_setup(&self) -> Result<(), ProviderError> {
439 todo!()
440 }
441
442 async fn destroy(&self) -> Result<(), ProviderError> {
443 let _ = self
444 .docker_client
445 .namespaced_containers_rm(&self.name)
446 .await
447 .map_err(|err| ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into()))?;
448
449 if let Some(provider) = self.provider.upgrade() {
450 provider.namespaces.write().await.remove(&self.name);
451 }
452
453 Ok(())
454 }
455}
456
457impl<FS> Drop for DockerNamespace<FS>
458where
459 FS: FileSystem + Send + Sync + Clone,
460{
461 fn drop(&mut self) {
462 let ns_name = self.name.clone();
463 if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
464 if *delete_on_drop {
465 let client = self.docker_client.clone();
466 let provider = self.provider.upgrade();
467
468 let handler = thread::spawn(move || {
469 let rt = tokio::runtime::Runtime::new().unwrap();
470 rt.block_on(async move {
471 trace!("🧟 deleting ns {ns_name} from cluster");
472 let _ = client.namespaced_containers_rm(&ns_name).await;
473 trace!("✅ deleted");
474 });
475 });
476
477 if handler.join().is_ok() {
478 if let Some(provider) = provider {
479 if let Ok(mut p) = provider.namespaces.try_write() {
480 p.remove(&self.name);
481 } else {
482 warn!(
483 "⚠️ Can not acquire write lock to the provider, ns {} not removed",
484 self.name
485 );
486 }
487 }
488 }
489 } else {
490 trace!("⚠️ leaking ns {ns_name} in cluster");
491 }
492 };
493 }
494}