zombienet_provider/docker/
node.rs

1use std::{
2    collections::HashMap,
3    net::IpAddr,
4    path::{Component, Path, PathBuf},
5    sync::{Arc, Weak},
6    time::Duration,
7};
8
9use anyhow::anyhow;
10use async_trait::async_trait;
11use configuration::types::AssetLocation;
12use futures::future::try_join_all;
13use serde::{Deserialize, Serialize};
14use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
15use tokio::{time::sleep, try_join};
16use tracing::debug;
17
18use super::{
19    client::{ContainerRunOptions, DockerClient},
20    namespace::DockerNamespace,
21};
22use crate::{
23    constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR},
24    docker,
25    types::{ExecutionResult, Port, RunCommandOptions, RunScriptOptions, TransferedFile},
26    ProviderError, ProviderNamespace, ProviderNode,
27};
28
29pub(super) struct DockerNodeOptions<'a, FS>
30where
31    FS: FileSystem + Send + Sync + Clone + 'static,
32{
33    pub(super) namespace: &'a Weak<DockerNamespace<FS>>,
34    pub(super) namespace_base_dir: &'a PathBuf,
35    pub(super) name: &'a str,
36    pub(super) image: Option<&'a String>,
37    pub(super) program: &'a str,
38    pub(super) args: &'a [String],
39    pub(super) env: &'a [(String, String)],
40    pub(super) startup_files: &'a [TransferedFile],
41    pub(super) db_snapshot: Option<&'a AssetLocation>,
42    pub(super) docker_client: &'a DockerClient,
43    pub(super) container_name: String,
44    pub(super) filesystem: &'a FS,
45    pub(super) port_mapping: &'a HashMap<Port, Port>,
46}
47
48impl<'a, FS> DockerNodeOptions<'a, FS>
49where
50    FS: FileSystem + Send + Sync + Clone + 'static,
51{
52    pub fn from_deserializable(
53        deserializable: &'a DeserializableDockerNodeOptions,
54        namespace: &'a Weak<DockerNamespace<FS>>,
55        namespace_base_dir: &'a PathBuf,
56        docker_client: &'a DockerClient,
57        filesystem: &'a FS,
58    ) -> Self {
59        DockerNodeOptions {
60            namespace,
61            namespace_base_dir,
62            name: &deserializable.name,
63            image: deserializable.image.as_ref(),
64            program: &deserializable.program,
65            args: &deserializable.args,
66            env: &deserializable.env,
67            startup_files: &[],
68            db_snapshot: None,
69            docker_client,
70            container_name: deserializable.container_name.clone(),
71            filesystem,
72            port_mapping: &deserializable.port_mapping,
73        }
74    }
75}
76
77#[derive(Deserialize)]
78pub(super) struct DeserializableDockerNodeOptions {
79    pub(super) name: String,
80    pub(super) image: Option<String>,
81    pub(super) program: String,
82    pub(super) args: Vec<String>,
83    pub(super) env: Vec<(String, String)>,
84    pub(super) container_name: String,
85    pub(super) port_mapping: HashMap<Port, Port>,
86}
87
88#[derive(Serialize)]
89pub struct DockerNode<FS>
90where
91    FS: FileSystem + Send + Sync + Clone,
92{
93    #[serde(skip)]
94    namespace: Weak<DockerNamespace<FS>>,
95    name: String,
96    image: String,
97    program: String,
98    args: Vec<String>,
99    env: Vec<(String, String)>,
100    base_dir: PathBuf,
101    config_dir: PathBuf,
102    data_dir: PathBuf,
103    relay_data_dir: PathBuf,
104    scripts_dir: PathBuf,
105    log_path: PathBuf,
106    #[serde(skip)]
107    docker_client: DockerClient,
108    container_name: String,
109    port_mapping: HashMap<Port, Port>,
110    #[allow(dead_code)]
111    #[serde(skip)]
112    filesystem: FS,
113    provider_tag: String,
114}
115
116impl<FS> DockerNode<FS>
117where
118    FS: FileSystem + Send + Sync + Clone + 'static,
119{
120    pub(super) async fn new(
121        options: DockerNodeOptions<'_, FS>,
122    ) -> Result<Arc<Self>, ProviderError> {
123        let image = options.image.ok_or_else(|| {
124            ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
125        })?;
126
127        let filesystem = options.filesystem.clone();
128
129        let base_dir =
130            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
131        filesystem.create_dir_all(&base_dir).await?;
132
133        let base_dir_raw = base_dir.to_string_lossy();
134        let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
135        let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
136        let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
137        let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
138        let log_path = base_dir.join("node.log");
139
140        try_join!(
141            filesystem.create_dir_all(&config_dir),
142            filesystem.create_dir_all(&data_dir),
143            filesystem.create_dir_all(&relay_data_dir),
144            filesystem.create_dir_all(&scripts_dir),
145        )?;
146
147        let node = Arc::new(DockerNode {
148            namespace: options.namespace.clone(),
149            name: options.name.to_string(),
150            image: image.to_string(),
151            program: options.program.to_string(),
152            args: options.args.to_vec(),
153            env: options.env.to_vec(),
154            base_dir,
155            config_dir,
156            data_dir,
157            relay_data_dir,
158            scripts_dir,
159            log_path,
160            filesystem: filesystem.clone(),
161            docker_client: options.docker_client.clone(),
162            container_name: options.container_name,
163            port_mapping: options.port_mapping.clone(),
164            provider_tag: docker::provider::PROVIDER_NAME.to_string(),
165        });
166
167        node.initialize_docker().await?;
168
169        if let Some(db_snap) = options.db_snapshot {
170            node.initialize_db_snapshot(db_snap).await?;
171        }
172
173        node.initialize_startup_files(options.startup_files).await?;
174
175        node.start().await?;
176
177        Ok(node)
178    }
179
180    pub(super) async fn attach_to_live(
181        options: DockerNodeOptions<'_, FS>,
182    ) -> Result<Arc<Self>, ProviderError> {
183        let image = options.image.ok_or_else(|| {
184            ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
185        })?;
186
187        let filesystem = options.filesystem.clone();
188
189        let base_dir =
190            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
191        filesystem.create_dir_all(&base_dir).await?;
192
193        let base_dir_raw = base_dir.to_string_lossy();
194        let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
195        let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
196        let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
197        let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
198        let log_path = base_dir.join("node.log");
199
200        let node = Arc::new(DockerNode {
201            namespace: options.namespace.clone(),
202            name: options.name.to_string(),
203            image: image.to_string(),
204            program: options.program.to_string(),
205            args: options.args.to_vec(),
206            env: options.env.to_vec(),
207            base_dir,
208            config_dir,
209            data_dir,
210            relay_data_dir,
211            scripts_dir,
212            log_path,
213            filesystem: filesystem.clone(),
214            docker_client: options.docker_client.clone(),
215            container_name: options.container_name,
216            port_mapping: options.port_mapping.clone(),
217            provider_tag: docker::provider::PROVIDER_NAME.to_string(),
218        });
219
220        Ok(node)
221    }
222
223    async fn initialize_docker(&self) -> Result<(), ProviderError> {
224        let command = [vec![self.program.to_string()], self.args.to_vec()].concat();
225
226        self.docker_client
227            .container_run(
228                ContainerRunOptions::new(&self.image, command)
229                    .name(&self.container_name)
230                    .env(self.env.clone())
231                    .volume_mounts(HashMap::from([
232                        (
233                            format!("{}-zombie-wrapper", self.namespace_name(),),
234                            "/scripts".to_string(),
235                        ),
236                        (
237                            format!("{}-helper-binaries", self.namespace_name()),
238                            "/helpers".to_string(),
239                        ),
240                        (
241                            self.config_dir.to_string_lossy().into_owned(),
242                            "/cfg".to_string(),
243                        ),
244                        (
245                            self.data_dir.to_string_lossy().into_owned(),
246                            "/data".to_string(),
247                        ),
248                        (
249                            self.relay_data_dir.to_string_lossy().into_owned(),
250                            "/relay-data".to_string(),
251                        ),
252                    ]))
253                    .entrypoint("/scripts/zombie-wrapper.sh")
254                    .port_mapping(&self.port_mapping),
255            )
256            .await
257            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
258
259        // change dirs permission
260        let _ = self
261            .docker_client
262            .container_exec(
263                &self.container_name,
264                ["chmod", "777", "/cfg", "/data", "/relay-data"].into(),
265                None,
266                Some("root"),
267            )
268            .await
269            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
270
271        Ok(())
272    }
273
274    async fn initialize_db_snapshot(
275        &self,
276        _db_snapshot: &AssetLocation,
277    ) -> Result<(), ProviderError> {
278        todo!()
279        // trace!("snap: {db_snapshot}");
280        // let url_of_snap = match db_snapshot {
281        //     AssetLocation::Url(location) => location.clone(),
282        //     AssetLocation::FilePath(filepath) => self.upload_to_fileserver(filepath).await?,
283        // };
284
285        // // we need to get the snapshot from a public access
286        // // and extract to /data
287        // let opts = RunCommandOptions::new("mkdir").args([
288        //     "-p",
289        //     "/data/",
290        //     "&&",
291        //     "mkdir",
292        //     "-p",
293        //     "/relay-data/",
294        //     "&&",
295        //     // Use our version of curl
296        //     "/cfg/curl",
297        //     url_of_snap.as_ref(),
298        //     "--output",
299        //     "/data/db.tgz",
300        //     "&&",
301        //     "cd",
302        //     "/",
303        //     "&&",
304        //     "tar",
305        //     "--skip-old-files",
306        //     "-xzvf",
307        //     "/data/db.tgz",
308        // ]);
309
310        // trace!("cmd opts: {:#?}", opts);
311        // let _ = self.run_command(opts).await?;
312
313        // Ok(())
314    }
315
316    async fn initialize_startup_files(
317        &self,
318        startup_files: &[TransferedFile],
319    ) -> Result<(), ProviderError> {
320        try_join_all(
321            startup_files
322                .iter()
323                .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
324        )
325        .await?;
326
327        Ok(())
328    }
329
330    pub(super) async fn start(&self) -> Result<(), ProviderError> {
331        self.docker_client
332            .container_exec(
333                &self.container_name,
334                vec!["sh", "-c", "echo start > /tmp/zombiepipe"],
335                None,
336                None,
337            )
338            .await
339            .map_err(|err| {
340                ProviderError::NodeSpawningFailed(
341                    format!("failed to start pod {} after spawning", self.name),
342                    err.into(),
343                )
344            })?
345            .map_err(|err| {
346                ProviderError::NodeSpawningFailed(
347                    format!("failed to start pod {} after spawning", self.name,),
348                    anyhow!("command failed in container: status {}: {}", err.0, err.1),
349                )
350            })?;
351
352        Ok(())
353    }
354
355    fn get_remote_parent_dir(&self, remote_file_path: &Path) -> Option<PathBuf> {
356        if let Some(remote_parent_dir) = remote_file_path.parent() {
357            if matches!(
358                remote_parent_dir.components().rev().peekable().peek(),
359                Some(Component::Normal(_))
360            ) {
361                return Some(remote_parent_dir.to_path_buf());
362            }
363        }
364
365        None
366    }
367
368    async fn create_remote_dir(&self, remote_dir: &Path) -> Result<(), ProviderError> {
369        let _ = self
370            .docker_client
371            .container_exec(
372                &self.container_name,
373                vec!["mkdir", "-p", &remote_dir.to_string_lossy()],
374                None,
375                None,
376            )
377            .await
378            .map_err(|err| {
379                ProviderError::NodeSpawningFailed(
380                    format!(
381                        "failed to create dir {} for container {}",
382                        remote_dir.to_string_lossy(),
383                        &self.name
384                    ),
385                    err.into(),
386                )
387            })?;
388
389        Ok(())
390    }
391
392    fn namespace_name(&self) -> String {
393        self.namespace
394            .upgrade()
395            .map(|namespace| namespace.name().to_string())
396            .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {THIS_IS_A_BUG}"))
397    }
398}
399
400#[async_trait]
401impl<FS> ProviderNode for DockerNode<FS>
402where
403    FS: FileSystem + Send + Sync + Clone + 'static,
404{
405    fn name(&self) -> &str {
406        &self.name
407    }
408
409    fn args(&self) -> Vec<&str> {
410        self.args.iter().map(|arg| arg.as_str()).collect()
411    }
412
413    fn base_dir(&self) -> &PathBuf {
414        &self.base_dir
415    }
416
417    fn config_dir(&self) -> &PathBuf {
418        &self.config_dir
419    }
420
421    fn data_dir(&self) -> &PathBuf {
422        &self.data_dir
423    }
424
425    fn relay_data_dir(&self) -> &PathBuf {
426        &self.relay_data_dir
427    }
428
429    fn scripts_dir(&self) -> &PathBuf {
430        &self.scripts_dir
431    }
432
433    fn log_path(&self) -> &PathBuf {
434        &self.log_path
435    }
436
437    fn log_cmd(&self) -> String {
438        format!(
439            "{} logs -f {}",
440            self.docker_client.client_binary(),
441            self.container_name
442        )
443    }
444
445    fn path_in_node(&self, file: &Path) -> PathBuf {
446        // here is just a noop op since we will receive the path
447        // for the file inside the pod
448        PathBuf::from(file)
449    }
450
451    async fn logs(&self) -> Result<String, ProviderError> {
452        self.docker_client
453            .container_logs(&self.container_name)
454            .await
455            .map_err(|err| ProviderError::GetLogsFailed(self.name.to_string(), err.into()))
456    }
457
458    async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
459        let logs = self.logs().await?;
460
461        self.filesystem
462            .write(local_dest, logs)
463            .await
464            .map_err(|err| ProviderError::DumpLogsFailed(self.name.to_string(), err.into()))?;
465
466        Ok(())
467    }
468
469    async fn run_command(
470        &self,
471        options: RunCommandOptions,
472    ) -> Result<ExecutionResult, ProviderError> {
473        debug!(
474            "running command for {} with options {:?}",
475            self.name, options
476        );
477        let command = [vec![options.program], options.args].concat();
478
479        self.docker_client
480            .container_exec(
481                &self.container_name,
482                vec!["sh", "-c", &command.join(" ")],
483                Some(
484                    options
485                        .env
486                        .iter()
487                        .map(|(k, v)| (k.as_ref(), v.as_ref()))
488                        .collect(),
489                ),
490                None,
491            )
492            .await
493            .map_err(|err| {
494                ProviderError::RunCommandError(
495                    format!("sh -c {}", &command.join(" ")),
496                    format!("in pod {}", self.name),
497                    err.into(),
498                )
499            })
500    }
501
502    async fn run_script(
503        &self,
504        _options: RunScriptOptions,
505    ) -> Result<ExecutionResult, ProviderError> {
506        todo!()
507    }
508
509    async fn send_file(
510        &self,
511        local_file_path: &Path,
512        remote_file_path: &Path,
513        mode: &str,
514    ) -> Result<(), ProviderError> {
515        if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) {
516            self.create_remote_dir(&remote_parent_dir).await?;
517        }
518
519        debug!(
520            "starting sending file for {}: {} to {} with mode {}",
521            self.name,
522            local_file_path.to_string_lossy(),
523            remote_file_path.to_string_lossy(),
524            mode
525        );
526
527        let _ = self
528            .docker_client
529            .container_cp(&self.container_name, local_file_path, remote_file_path)
530            .await
531            .map_err(|err| {
532                ProviderError::SendFile(
533                    local_file_path.to_string_lossy().to_string(),
534                    self.name.clone(),
535                    err.into(),
536                )
537            });
538
539        let _ = self
540            .docker_client
541            .container_exec(
542                &self.container_name,
543                vec!["chmod", mode, &remote_file_path.to_string_lossy()],
544                None,
545                None,
546            )
547            .await
548            .map_err(|err| {
549                ProviderError::SendFile(
550                    self.name.clone(),
551                    local_file_path.to_string_lossy().to_string(),
552                    err.into(),
553                )
554            })?;
555
556        Ok(())
557    }
558
559    async fn receive_file(
560        &self,
561        _remote_src: &Path,
562        _local_dest: &Path,
563    ) -> Result<(), ProviderError> {
564        Ok(())
565    }
566
567    async fn ip(&self) -> Result<IpAddr, ProviderError> {
568        let ip = self
569            .docker_client
570            .container_ip(&self.container_name)
571            .await
572            .map_err(|err| {
573                ProviderError::InvalidConfig(format!("Error getting container ip, err: {err}"))
574            })?;
575
576        Ok(ip.parse::<IpAddr>().map_err(|err| {
577            ProviderError::InvalidConfig(format!(
578                "Can not parse the container ip: {ip}, err: {err}"
579            ))
580        })?)
581    }
582
583    async fn pause(&self) -> Result<(), ProviderError> {
584        self.docker_client
585            .container_exec(
586                &self.container_name,
587                vec!["sh", "-c", "echo pause > /tmp/zombiepipe"],
588                None,
589                None,
590            )
591            .await
592            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
593            .map_err(|err| {
594                ProviderError::PauseNodeFailed(
595                    self.name.to_string(),
596                    anyhow!("error when pausing node: status {}: {}", err.0, err.1),
597                )
598            })?;
599
600        Ok(())
601    }
602
603    async fn resume(&self) -> Result<(), ProviderError> {
604        self.docker_client
605            .container_exec(
606                &self.container_name,
607                vec!["sh", "-c", "echo resume > /tmp/zombiepipe"],
608                None,
609                None,
610            )
611            .await
612            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
613            .map_err(|err| {
614                ProviderError::PauseNodeFailed(
615                    self.name.to_string(),
616                    anyhow!("error when pausing node: status {}: {}", err.0, err.1),
617                )
618            })?;
619
620        Ok(())
621    }
622
623    async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
624        if let Some(duration) = after {
625            sleep(duration).await;
626        }
627
628        self.docker_client
629            .container_exec(
630                &self.container_name,
631                vec!["sh", "-c", "echo restart > /tmp/zombiepipe"],
632                None,
633                None,
634            )
635            .await
636            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
637            .map_err(|err| {
638                ProviderError::PauseNodeFailed(
639                    self.name.to_string(),
640                    anyhow!("error when pausing node: status {}: {}", err.0, err.1),
641                )
642            })?;
643
644        Ok(())
645    }
646
647    async fn destroy(&self) -> Result<(), ProviderError> {
648        self.docker_client
649            .container_rm(&self.container_name)
650            .await
651            .map_err(|err| ProviderError::KillNodeFailed(self.name.to_string(), err.into()))?;
652
653        if let Some(namespace) = self.namespace.upgrade() {
654            namespace.nodes.write().await.remove(&self.name);
655        }
656
657        Ok(())
658    }
659}