1use std::{
2 collections::HashMap,
3 net::IpAddr,
4 path::{Component, Path, PathBuf},
5 sync::{Arc, Weak},
6 time::Duration,
7};
8
9use anyhow::anyhow;
10use async_trait::async_trait;
11use configuration::types::AssetLocation;
12use futures::future::try_join_all;
13use serde::{Deserialize, Serialize};
14use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
15use tokio::{time::sleep, try_join};
16use tracing::debug;
17
18use super::{
19 client::{ContainerRunOptions, DockerClient},
20 namespace::DockerNamespace,
21};
22use crate::{
23 constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR},
24 docker,
25 types::{ExecutionResult, Port, RunCommandOptions, RunScriptOptions, TransferedFile},
26 ProviderError, ProviderNamespace, ProviderNode,
27};
28
29pub(super) struct DockerNodeOptions<'a, FS>
30where
31 FS: FileSystem + Send + Sync + Clone + 'static,
32{
33 pub(super) namespace: &'a Weak<DockerNamespace<FS>>,
34 pub(super) namespace_base_dir: &'a PathBuf,
35 pub(super) name: &'a str,
36 pub(super) image: Option<&'a String>,
37 pub(super) program: &'a str,
38 pub(super) args: &'a [String],
39 pub(super) env: &'a [(String, String)],
40 pub(super) startup_files: &'a [TransferedFile],
41 pub(super) db_snapshot: Option<&'a AssetLocation>,
42 pub(super) docker_client: &'a DockerClient,
43 pub(super) container_name: String,
44 pub(super) filesystem: &'a FS,
45 pub(super) port_mapping: &'a HashMap<Port, Port>,
46}
47
48impl<'a, FS> DockerNodeOptions<'a, FS>
49where
50 FS: FileSystem + Send + Sync + Clone + 'static,
51{
52 pub fn from_deserializable(
53 deserializable: &'a DeserializableDockerNodeOptions,
54 namespace: &'a Weak<DockerNamespace<FS>>,
55 namespace_base_dir: &'a PathBuf,
56 docker_client: &'a DockerClient,
57 filesystem: &'a FS,
58 ) -> Self {
59 DockerNodeOptions {
60 namespace,
61 namespace_base_dir,
62 name: &deserializable.name,
63 image: deserializable.image.as_ref(),
64 program: &deserializable.program,
65 args: &deserializable.args,
66 env: &deserializable.env,
67 startup_files: &[],
68 db_snapshot: None,
69 docker_client,
70 container_name: deserializable.container_name.clone(),
71 filesystem,
72 port_mapping: &deserializable.port_mapping,
73 }
74 }
75}
76
/// Owned, deserializable subset of the data needed to rebuild a [`DockerNodeOptions`].
///
/// `DockerNodeOptions::from_deserializable` borrows every field from this struct;
/// startup files and database snapshot are not part of it.
#[derive(Deserialize)]
pub(super) struct DeserializableDockerNodeOptions {
    /// Node name.
    pub(super) name: String,
    /// Container image, if one was recorded.
    pub(super) image: Option<String>,
    /// Program run as the container command.
    pub(super) program: String,
    /// Arguments passed to `program`.
    pub(super) args: Vec<String>,
    /// Environment variables for the container.
    pub(super) env: Vec<(String, String)>,
    /// Name of the container backing the node.
    pub(super) container_name: String,
    /// Port mapping of the container.
    pub(super) port_mapping: HashMap<Port, Port>,
}
87
/// A node backed by a single container managed through [`DockerClient`].
///
/// The struct derives `Serialize`; the namespace handle, the docker client and
/// the filesystem are skipped and therefore never appear in serialized output.
#[derive(Serialize)]
pub struct DockerNode<FS>
where
    FS: FileSystem + Send + Sync + Clone,
{
    /// Weak handle to the owning namespace (not serialized).
    #[serde(skip)]
    namespace: Weak<DockerNamespace<FS>>,
    /// Node name.
    name: String,
    /// Container image the node runs.
    image: String,
    /// Program run as the first element of the container command.
    program: String,
    /// Arguments appended to `program`.
    args: Vec<String>,
    /// Environment variables passed to the container.
    env: Vec<(String, String)>,
    /// Host directory of the node: `<namespace base dir>/<name>`.
    base_dir: PathBuf,
    /// `base_dir` followed by `NODE_CONFIG_DIR`; mounted at `/cfg` in the container.
    config_dir: PathBuf,
    /// `base_dir` followed by `NODE_DATA_DIR`; mounted at `/data` in the container.
    data_dir: PathBuf,
    /// `base_dir` followed by `NODE_RELAY_DATA_DIR`; mounted at `/relay-data` in the container.
    relay_data_dir: PathBuf,
    /// `base_dir` followed by `NODE_SCRIPTS_DIR`.
    scripts_dir: PathBuf,
    /// `base_dir/node.log`.
    log_path: PathBuf,
    /// Client used for every container operation (not serialized).
    #[serde(skip)]
    docker_client: DockerClient,
    /// Name of the container backing this node.
    container_name: String,
    /// Port mapping passed to the container run options.
    port_mapping: HashMap<Port, Port>,
    /// Host filesystem abstraction (not serialized).
    // NOTE(review): `dump_logs` reads this field, so the `dead_code` allowance looks stale.
    #[allow(dead_code)]
    #[serde(skip)]
    filesystem: FS,
    /// Set to `docker::provider::PROVIDER_NAME` by the constructors.
    provider_tag: String,
}
115
116impl<FS> DockerNode<FS>
117where
118 FS: FileSystem + Send + Sync + Clone + 'static,
119{
120 pub(super) async fn new(
121 options: DockerNodeOptions<'_, FS>,
122 ) -> Result<Arc<Self>, ProviderError> {
123 let image = options.image.ok_or_else(|| {
124 ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
125 })?;
126
127 let filesystem = options.filesystem.clone();
128
129 let base_dir =
130 PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
131 filesystem.create_dir_all(&base_dir).await?;
132
133 let base_dir_raw = base_dir.to_string_lossy();
134 let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
135 let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
136 let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
137 let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
138 let log_path = base_dir.join("node.log");
139
140 try_join!(
141 filesystem.create_dir_all(&config_dir),
142 filesystem.create_dir_all(&data_dir),
143 filesystem.create_dir_all(&relay_data_dir),
144 filesystem.create_dir_all(&scripts_dir),
145 )?;
146
147 let node = Arc::new(DockerNode {
148 namespace: options.namespace.clone(),
149 name: options.name.to_string(),
150 image: image.to_string(),
151 program: options.program.to_string(),
152 args: options.args.to_vec(),
153 env: options.env.to_vec(),
154 base_dir,
155 config_dir,
156 data_dir,
157 relay_data_dir,
158 scripts_dir,
159 log_path,
160 filesystem: filesystem.clone(),
161 docker_client: options.docker_client.clone(),
162 container_name: options.container_name,
163 port_mapping: options.port_mapping.clone(),
164 provider_tag: docker::provider::PROVIDER_NAME.to_string(),
165 });
166
167 node.initialize_docker().await?;
168
169 if let Some(db_snap) = options.db_snapshot {
170 node.initialize_db_snapshot(db_snap).await?;
171 }
172
173 node.initialize_startup_files(options.startup_files).await?;
174
175 node.start().await?;
176
177 Ok(node)
178 }
179
180 pub(super) async fn attach_to_live(
181 options: DockerNodeOptions<'_, FS>,
182 ) -> Result<Arc<Self>, ProviderError> {
183 let image = options.image.ok_or_else(|| {
184 ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
185 })?;
186
187 let filesystem = options.filesystem.clone();
188
189 let base_dir =
190 PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
191 filesystem.create_dir_all(&base_dir).await?;
192
193 let base_dir_raw = base_dir.to_string_lossy();
194 let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
195 let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
196 let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
197 let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
198 let log_path = base_dir.join("node.log");
199
200 let node = Arc::new(DockerNode {
201 namespace: options.namespace.clone(),
202 name: options.name.to_string(),
203 image: image.to_string(),
204 program: options.program.to_string(),
205 args: options.args.to_vec(),
206 env: options.env.to_vec(),
207 base_dir,
208 config_dir,
209 data_dir,
210 relay_data_dir,
211 scripts_dir,
212 log_path,
213 filesystem: filesystem.clone(),
214 docker_client: options.docker_client.clone(),
215 container_name: options.container_name,
216 port_mapping: options.port_mapping.clone(),
217 provider_tag: docker::provider::PROVIDER_NAME.to_string(),
218 });
219
220 Ok(node)
221 }
222
223 async fn initialize_docker(&self) -> Result<(), ProviderError> {
224 let command = [vec![self.program.to_string()], self.args.to_vec()].concat();
225
226 self.docker_client
227 .container_run(
228 ContainerRunOptions::new(&self.image, command)
229 .name(&self.container_name)
230 .env(self.env.clone())
231 .volume_mounts(HashMap::from([
232 (
233 format!("{}-zombie-wrapper", self.namespace_name(),),
234 "/scripts".to_string(),
235 ),
236 (
237 format!("{}-helper-binaries", self.namespace_name()),
238 "/helpers".to_string(),
239 ),
240 (
241 self.config_dir.to_string_lossy().into_owned(),
242 "/cfg".to_string(),
243 ),
244 (
245 self.data_dir.to_string_lossy().into_owned(),
246 "/data".to_string(),
247 ),
248 (
249 self.relay_data_dir.to_string_lossy().into_owned(),
250 "/relay-data".to_string(),
251 ),
252 ]))
253 .entrypoint("/scripts/zombie-wrapper.sh")
254 .port_mapping(&self.port_mapping),
255 )
256 .await
257 .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
258
259 let _ = self
261 .docker_client
262 .container_exec(
263 &self.container_name,
264 ["chmod", "777", "/cfg", "/data", "/relay-data"].into(),
265 None,
266 Some("root"),
267 )
268 .await
269 .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
270
271 Ok(())
272 }
273
/// Initializes the node from a database snapshot.
///
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `todo!()`. `new` calls this whenever `db_snapshot` is
/// `Some`, so creating a node with a snapshot currently panics.
async fn initialize_db_snapshot(
    &self,
    _db_snapshot: &AssetLocation,
) -> Result<(), ProviderError> {
    todo!()
}
315
316 async fn initialize_startup_files(
317 &self,
318 startup_files: &[TransferedFile],
319 ) -> Result<(), ProviderError> {
320 try_join_all(
321 startup_files
322 .iter()
323 .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
324 )
325 .await?;
326
327 Ok(())
328 }
329
330 pub(super) async fn start(&self) -> Result<(), ProviderError> {
331 self.docker_client
332 .container_exec(
333 &self.container_name,
334 vec!["sh", "-c", "echo start > /tmp/zombiepipe"],
335 None,
336 None,
337 )
338 .await
339 .map_err(|err| {
340 ProviderError::NodeSpawningFailed(
341 format!("failed to start pod {} after spawning", self.name),
342 err.into(),
343 )
344 })?
345 .map_err(|err| {
346 ProviderError::NodeSpawningFailed(
347 format!("failed to start pod {} after spawning", self.name,),
348 anyhow!("command failed in container: status {}: {}", err.0, err.1),
349 )
350 })?;
351
352 Ok(())
353 }
354
355 fn get_remote_parent_dir(&self, remote_file_path: &Path) -> Option<PathBuf> {
356 if let Some(remote_parent_dir) = remote_file_path.parent() {
357 if matches!(
358 remote_parent_dir.components().rev().peekable().peek(),
359 Some(Component::Normal(_))
360 ) {
361 return Some(remote_parent_dir.to_path_buf());
362 }
363 }
364
365 None
366 }
367
368 async fn create_remote_dir(&self, remote_dir: &Path) -> Result<(), ProviderError> {
369 let _ = self
370 .docker_client
371 .container_exec(
372 &self.container_name,
373 vec!["mkdir", "-p", &remote_dir.to_string_lossy()],
374 None,
375 None,
376 )
377 .await
378 .map_err(|err| {
379 ProviderError::NodeSpawningFailed(
380 format!(
381 "failed to create dir {} for container {}",
382 remote_dir.to_string_lossy(),
383 &self.name
384 ),
385 err.into(),
386 )
387 })?;
388
389 Ok(())
390 }
391
392 fn namespace_name(&self) -> String {
393 self.namespace
394 .upgrade()
395 .map(|namespace| namespace.name().to_string())
396 .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {THIS_IS_A_BUG}"))
397 }
398}
399
400#[async_trait]
401impl<FS> ProviderNode for DockerNode<FS>
402where
403 FS: FileSystem + Send + Sync + Clone + 'static,
404{
405 fn name(&self) -> &str {
406 &self.name
407 }
408
409 fn args(&self) -> Vec<&str> {
410 self.args.iter().map(|arg| arg.as_str()).collect()
411 }
412
413 fn base_dir(&self) -> &PathBuf {
414 &self.base_dir
415 }
416
417 fn config_dir(&self) -> &PathBuf {
418 &self.config_dir
419 }
420
421 fn data_dir(&self) -> &PathBuf {
422 &self.data_dir
423 }
424
425 fn relay_data_dir(&self) -> &PathBuf {
426 &self.relay_data_dir
427 }
428
429 fn scripts_dir(&self) -> &PathBuf {
430 &self.scripts_dir
431 }
432
433 fn log_path(&self) -> &PathBuf {
434 &self.log_path
435 }
436
437 fn log_cmd(&self) -> String {
438 format!(
439 "{} logs -f {}",
440 self.docker_client.client_binary(),
441 self.container_name
442 )
443 }
444
445 fn path_in_node(&self, file: &Path) -> PathBuf {
446 PathBuf::from(file)
449 }
450
451 async fn logs(&self) -> Result<String, ProviderError> {
452 self.docker_client
453 .container_logs(&self.container_name)
454 .await
455 .map_err(|err| ProviderError::GetLogsFailed(self.name.to_string(), err.into()))
456 }
457
458 async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
459 let logs = self.logs().await?;
460
461 self.filesystem
462 .write(local_dest, logs)
463 .await
464 .map_err(|err| ProviderError::DumpLogsFailed(self.name.to_string(), err.into()))?;
465
466 Ok(())
467 }
468
469 async fn run_command(
470 &self,
471 options: RunCommandOptions,
472 ) -> Result<ExecutionResult, ProviderError> {
473 debug!(
474 "running command for {} with options {:?}",
475 self.name, options
476 );
477 let command = [vec![options.program], options.args].concat();
478
479 self.docker_client
480 .container_exec(
481 &self.container_name,
482 vec!["sh", "-c", &command.join(" ")],
483 Some(
484 options
485 .env
486 .iter()
487 .map(|(k, v)| (k.as_ref(), v.as_ref()))
488 .collect(),
489 ),
490 None,
491 )
492 .await
493 .map_err(|err| {
494 ProviderError::RunCommandError(
495 format!("sh -c {}", &command.join(" ")),
496 format!("in pod {}", self.name),
497 err.into(),
498 )
499 })
500 }
501
/// Runs a script inside the node.
///
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `todo!()`.
async fn run_script(
    &self,
    _options: RunScriptOptions,
) -> Result<ExecutionResult, ProviderError> {
    todo!()
}
508
509 async fn send_file(
510 &self,
511 local_file_path: &Path,
512 remote_file_path: &Path,
513 mode: &str,
514 ) -> Result<(), ProviderError> {
515 if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) {
516 self.create_remote_dir(&remote_parent_dir).await?;
517 }
518
519 debug!(
520 "starting sending file for {}: {} to {} with mode {}",
521 self.name,
522 local_file_path.to_string_lossy(),
523 remote_file_path.to_string_lossy(),
524 mode
525 );
526
527 let _ = self
528 .docker_client
529 .container_cp(&self.container_name, local_file_path, remote_file_path)
530 .await
531 .map_err(|err| {
532 ProviderError::SendFile(
533 local_file_path.to_string_lossy().to_string(),
534 self.name.clone(),
535 err.into(),
536 )
537 });
538
539 let _ = self
540 .docker_client
541 .container_exec(
542 &self.container_name,
543 vec!["chmod", mode, &remote_file_path.to_string_lossy()],
544 None,
545 None,
546 )
547 .await
548 .map_err(|err| {
549 ProviderError::SendFile(
550 self.name.clone(),
551 local_file_path.to_string_lossy().to_string(),
552 err.into(),
553 )
554 })?;
555
556 Ok(())
557 }
558
/// Copies a file from the node to the local filesystem.
///
/// NOTE(review): currently a no-op — both arguments are ignored, nothing is
/// copied, and `Ok(())` is returned regardless.
async fn receive_file(
    &self,
    _remote_src: &Path,
    _local_dest: &Path,
) -> Result<(), ProviderError> {
    Ok(())
}
566
567 async fn ip(&self) -> Result<IpAddr, ProviderError> {
568 let ip = self
569 .docker_client
570 .container_ip(&self.container_name)
571 .await
572 .map_err(|err| {
573 ProviderError::InvalidConfig(format!("Error getting container ip, err: {err}"))
574 })?;
575
576 Ok(ip.parse::<IpAddr>().map_err(|err| {
577 ProviderError::InvalidConfig(format!(
578 "Can not parse the container ip: {ip}, err: {err}"
579 ))
580 })?)
581 }
582
583 async fn pause(&self) -> Result<(), ProviderError> {
584 self.docker_client
585 .container_exec(
586 &self.container_name,
587 vec!["sh", "-c", "echo pause > /tmp/zombiepipe"],
588 None,
589 None,
590 )
591 .await
592 .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
593 .map_err(|err| {
594 ProviderError::PauseNodeFailed(
595 self.name.to_string(),
596 anyhow!("error when pausing node: status {}: {}", err.0, err.1),
597 )
598 })?;
599
600 Ok(())
601 }
602
603 async fn resume(&self) -> Result<(), ProviderError> {
604 self.docker_client
605 .container_exec(
606 &self.container_name,
607 vec!["sh", "-c", "echo resume > /tmp/zombiepipe"],
608 None,
609 None,
610 )
611 .await
612 .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
613 .map_err(|err| {
614 ProviderError::PauseNodeFailed(
615 self.name.to_string(),
616 anyhow!("error when pausing node: status {}: {}", err.0, err.1),
617 )
618 })?;
619
620 Ok(())
621 }
622
623 async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
624 if let Some(duration) = after {
625 sleep(duration).await;
626 }
627
628 self.docker_client
629 .container_exec(
630 &self.container_name,
631 vec!["sh", "-c", "echo restart > /tmp/zombiepipe"],
632 None,
633 None,
634 )
635 .await
636 .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
637 .map_err(|err| {
638 ProviderError::PauseNodeFailed(
639 self.name.to_string(),
640 anyhow!("error when pausing node: status {}: {}", err.0, err.1),
641 )
642 })?;
643
644 Ok(())
645 }
646
647 async fn destroy(&self) -> Result<(), ProviderError> {
648 self.docker_client
649 .container_rm(&self.container_name)
650 .await
651 .map_err(|err| ProviderError::KillNodeFailed(self.name.to_string(), err.into()))?;
652
653 if let Some(namespace) = self.namespace.upgrade() {
654 namespace.nodes.write().await.remove(&self.name);
655 }
656
657 Ok(())
658 }
659}