zombienet_provider/docker/
client.rs

1use std::{collections::HashMap, path::Path, process::Stdio};
2
3use anyhow::anyhow;
4use futures::future::try_join_all;
5use serde::{Deserialize, Deserializer};
6use tokio::process::Command;
7use tracing::{info, trace};
8
9use crate::types::{ExecutionResult, Port};
10
/// Error type returned by this container client.
///
/// A transparent newtype over [`anyhow::Error`]: `#[error(transparent)]` forwards
/// `Display` and `source()` to the wrapped error, and `#[from]` lets `?` and
/// `.into()` convert an `anyhow::Error` into this type.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct Error(#[from] anyhow::Error);
14
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = core::result::Result<T, Error>;
16
/// Handle for driving a container engine through its command-line binary.
#[derive(Clone)]
pub struct DockerClient {
    /// `true` when the `podman` binary is used for commands, `false` for `docker`.
    using_podman: bool,
}
21
/// Options describing a container to run or create.
///
/// Built with [`ContainerRunOptions::new`] and the chained setter methods, then
/// translated into command-line arguments by the client.
#[derive(Debug)]
pub struct ContainerRunOptions {
    /// Image reference, passed after all flags.
    image: String,
    /// Arguments passed after the image.
    command: Vec<String>,
    /// Environment variables as `(name, value)` pairs, emitted as `-e name=value`.
    env: Option<Vec<(String, String)>>,
    /// Mounts as `source -> target`, emitted as `-v source:target`.
    volume_mounts: Option<HashMap<String, String>>,
    /// Container name, emitted as `--name`.
    name: Option<String>,
    /// Entrypoint override, emitted as `--entrypoint`.
    entrypoint: Option<String>,
    /// Published ports keyed by container port with the host port as value,
    /// emitted as `-p host:container`.
    port_mapping: HashMap<Port, Port>,
    /// When `true`, adds `--rm`.
    rm: bool,
    /// When `true`, `container_run` adds `-d`.
    detach: bool,
}
34
/// A container listing entry, tagged by the engine whose output it was parsed from.
enum Container {
    /// Entry parsed from `docker ps` output.
    Docker(DockerContainer),
    /// Entry parsed from `podman ps` output.
    Podman(PodmanContainer),
}
39
/// One container entry as deserialized from `docker ps --format json` output.
// TODO: we may not need this
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct DockerContainer {
    /// Container names; the `Names` value is a single comma-separated string
    /// that `deserialize_list` splits into items.
    #[serde(alias = "Names", deserialize_with = "deserialize_list")]
    names: Vec<String>,
    /// Port descriptions, split from the comma-separated `Ports` string.
    #[serde(alias = "Ports", deserialize_with = "deserialize_list")]
    ports: Vec<String>,
    /// Value of the `State` field as reported by the engine.
    #[serde(alias = "State")]
    state: String,
}
51
/// One element of the `Ports` list of a [`PodmanContainer`].
// TODO: we may not need this
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct PodmanPort {
    // NOTE(review): field meanings below are inferred from the names only;
    // confirm against podman's JSON output.
    /// Presumably the host address the port is bound to.
    host_ip: String,
    /// Presumably the port inside the container.
    container_port: u16,
    /// Presumably the port on the host.
    host_port: u16,
    /// Presumably the number of consecutive ports covered by this entry.
    range: u16,
    /// Presumably the transport protocol name (e.g. `tcp`).
    protocol: String,
}
62
/// One container entry as deserialized from `podman ps --format json` output.
// TODO: we may not need this
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct PodmanContainer {
    /// Value of the `Id` field.
    #[serde(alias = "Id")]
    id: String,
    /// Value of the `Image` field.
    #[serde(alias = "Image")]
    image: String,
    /// Value of the `Mounts` field.
    #[serde(alias = "Mounts")]
    mounts: Vec<String>,
    /// Container names; the first one is used when matching a namespace prefix.
    #[serde(alias = "Names")]
    names: Vec<String>,
    /// Port entries; a JSON `null` is deserialized as an empty list.
    #[serde(alias = "Ports", deserialize_with = "deserialize_null_as_default")]
    ports: Vec<PodmanPort>,
    /// Value of the `State` field.
    #[serde(alias = "State")]
    state: String,
}
80
81fn deserialize_list<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
82where
83    D: Deserializer<'de>,
84{
85    let str_sequence = String::deserialize(deserializer)?;
86    Ok(str_sequence
87        .split(',')
88        .filter(|item| !item.is_empty())
89        .map(|item| item.to_owned())
90        .collect())
91}
92
93fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> std::result::Result<T, D::Error>
94where
95    T: Default + Deserialize<'de>,
96    D: Deserializer<'de>,
97{
98    let opt = Option::deserialize(deserializer)?;
99    Ok(opt.unwrap_or_default())
100}
101
102impl ContainerRunOptions {
103    pub fn new<S>(image: &str, command: Vec<S>) -> Self
104    where
105        S: Into<String> + std::fmt::Debug + Send + Clone,
106    {
107        ContainerRunOptions {
108            image: image.to_string(),
109            command: command
110                .clone()
111                .into_iter()
112                .map(|s| s.into())
113                .collect::<Vec<_>>(),
114            env: None,
115            volume_mounts: None,
116            name: None,
117            entrypoint: None,
118            port_mapping: HashMap::default(),
119            rm: false,
120            detach: true, // add -d flag by default
121        }
122    }
123
124    pub fn env<S>(mut self, env: Vec<(S, S)>) -> Self
125    where
126        S: Into<String> + std::fmt::Debug + Send + Clone,
127    {
128        self.env = Some(
129            env.into_iter()
130                .map(|(name, value)| (name.into(), value.into()))
131                .collect(),
132        );
133        self
134    }
135
136    pub fn volume_mounts<S>(mut self, volume_mounts: HashMap<S, S>) -> Self
137    where
138        S: Into<String> + std::fmt::Debug + Send + Clone,
139    {
140        self.volume_mounts = Some(
141            volume_mounts
142                .into_iter()
143                .map(|(source, target)| (source.into(), target.into()))
144                .collect(),
145        );
146        self
147    }
148
149    pub fn name<S>(mut self, name: S) -> Self
150    where
151        S: Into<String> + std::fmt::Debug + Send + Clone,
152    {
153        self.name = Some(name.into());
154        self
155    }
156
157    pub fn entrypoint<S>(mut self, entrypoint: S) -> Self
158    where
159        S: Into<String> + std::fmt::Debug + Send + Clone,
160    {
161        self.entrypoint = Some(entrypoint.into());
162        self
163    }
164
165    pub fn port_mapping(mut self, port_mapping: &HashMap<Port, Port>) -> Self {
166        self.port_mapping.clone_from(port_mapping);
167        self
168    }
169
170    pub fn rm(mut self) -> Self {
171        self.rm = true;
172        self
173    }
174
175    pub fn detach(mut self, choice: bool) -> Self {
176        self.detach = choice;
177        self
178    }
179}
180
181impl DockerClient {
182    pub async fn new() -> Result<Self> {
183        let using_podman = Self::is_using_podman().await?;
184
185        Ok(DockerClient { using_podman })
186    }
187
188    pub fn client_binary(&self) -> String {
189        String::from(if self.using_podman {
190            "podman"
191        } else {
192            "docker"
193        })
194    }
195
196    async fn is_using_podman() -> Result<bool> {
197        if let Ok(output) = tokio::process::Command::new("docker")
198            .arg("version")
199            .output()
200            .await
201        {
202            // detect whether we're actually running podman with docker emulation
203            return Ok(String::from_utf8_lossy(&output.stdout)
204                .to_lowercase()
205                .contains("podman"));
206        }
207
208        tokio::process::Command::new("podman")
209            .arg("--version")
210            .output()
211            .await
212            .map_err(|err| anyhow!("Failed to detect container engine: {err}"))?;
213
214        Ok(true)
215    }
216}
217
218impl DockerClient {
219    fn client_command(&self) -> tokio::process::Command {
220        tokio::process::Command::new(self.client_binary())
221    }
222
223    pub async fn create_volume(&self, name: &str) -> Result<()> {
224        let result = self
225            .client_command()
226            .args(["volume", "create", name])
227            .output()
228            .await
229            .map_err(|err| anyhow!("Failed to create volume '{name}': {err}"))?;
230
231        if !result.status.success() {
232            return Err(anyhow!(
233                "Failed to create volume '{name}': {}",
234                String::from_utf8_lossy(&result.stderr)
235            )
236            .into());
237        }
238
239        Ok(())
240    }
241
242    pub async fn container_run(&self, options: ContainerRunOptions) -> Result<String> {
243        let mut cmd = self.client_command();
244        cmd.args(["run", "--platform", "linux/amd64"]);
245
246        if options.detach {
247            cmd.arg("-d");
248        }
249
250        Self::apply_cmd_options(&mut cmd, &options);
251
252        trace!("cmd: {:?}", cmd);
253
254        let result = cmd.output().await.map_err(|err| {
255            anyhow!(
256                "Failed to run container with image '{image}' and command '{command}': {err}",
257                image = options.image,
258                command = options.command.join(" "),
259            )
260        })?;
261
262        if !result.status.success() {
263            return Err(anyhow!(
264                "Failed to run container with image '{image}' and command '{command}': {err}",
265                image = options.image,
266                command = options.command.join(" "),
267                err = String::from_utf8_lossy(&result.stderr)
268            )
269            .into());
270        }
271
272        Ok(String::from_utf8_lossy(&result.stdout).to_string())
273    }
274
275    pub async fn container_create(&self, options: ContainerRunOptions) -> Result<String> {
276        let mut cmd = self.client_command();
277        cmd.args(["container", "create"]);
278
279        Self::apply_cmd_options(&mut cmd, &options);
280
281        trace!("cmd: {:?}", cmd);
282
283        let result = cmd.output().await.map_err(|err| {
284            anyhow!(
285                "Failed to run container with image '{image}' and command '{command}': {err}",
286                image = options.image,
287                command = options.command.join(" "),
288            )
289        })?;
290
291        if !result.status.success() {
292            return Err(anyhow!(
293                "Failed to run container with image '{image}' and command '{command}': {err}",
294                image = options.image,
295                command = options.command.join(" "),
296                err = String::from_utf8_lossy(&result.stderr)
297            )
298            .into());
299        }
300
301        Ok(String::from_utf8_lossy(&result.stdout).to_string())
302    }
303
304    pub async fn container_exec<S>(
305        &self,
306        name: &str,
307        command: Vec<S>,
308        env: Option<Vec<(S, S)>>,
309        as_user: Option<S>,
310    ) -> Result<ExecutionResult>
311    where
312        S: Into<String> + std::fmt::Debug + Send + Clone,
313    {
314        let mut cmd = self.client_command();
315        cmd.arg("exec");
316
317        if let Some(env) = env {
318            for env_var in env {
319                cmd.args(["-e", &format!("{}={}", env_var.0.into(), env_var.1.into())]);
320            }
321        }
322
323        if let Some(user) = as_user {
324            cmd.args(["-u", user.into().as_ref()]);
325        }
326
327        cmd.arg(name);
328
329        cmd.args(
330            command
331                .clone()
332                .into_iter()
333                .map(|s| <S as Into<String>>::into(s)),
334        );
335
336        trace!("cmd is : {:?}", cmd);
337
338        let result = cmd.output().await.map_err(|err| {
339            anyhow!(
340                "Failed to exec '{}' on '{}': {err}",
341                command
342                    .into_iter()
343                    .map(|s| <S as Into<String>>::into(s))
344                    .collect::<Vec<_>>()
345                    .join(" "),
346                name,
347            )
348        })?;
349
350        if !result.status.success() {
351            return Ok(Err((
352                result.status,
353                String::from_utf8_lossy(&result.stderr).to_string(),
354            )));
355        }
356
357        Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
358    }
359
360    pub async fn container_cp(
361        &self,
362        name: &str,
363        local_path: &Path,
364        remote_path: &Path,
365    ) -> Result<()> {
366        let result = self
367            .client_command()
368            .args([
369                "cp",
370                local_path.to_string_lossy().as_ref(),
371                &format!("{name}:{}", remote_path.to_string_lossy().as_ref()),
372            ])
373            .output()
374            .await
375            .map_err(|err| {
376                anyhow!(
377                    "Failed copy file '{file}' to container '{name}': {err}",
378                    file = local_path.to_string_lossy(),
379                )
380            })?;
381
382        if !result.status.success() {
383            return Err(anyhow!(
384                "Failed to copy file '{file}' to container '{name}': {err}",
385                file = local_path.to_string_lossy(),
386                err = String::from_utf8_lossy(&result.stderr)
387            )
388            .into());
389        }
390
391        Ok(())
392    }
393
394    pub async fn container_rm(&self, name: &str) -> Result<()> {
395        let result = self
396            .client_command()
397            .args(["rm", "--force", "--volumes", name])
398            .output()
399            .await
400            .map_err(|err| anyhow!("Failed do remove container '{name}: {err}"))?;
401
402        if !result.status.success() {
403            return Err(anyhow!(
404                "Failed to remove container '{name}': {err}",
405                err = String::from_utf8_lossy(&result.stderr)
406            )
407            .into());
408        }
409
410        Ok(())
411    }
412
413    pub async fn namespaced_containers_rm(&self, namespace: &str) -> Result<()> {
414        let container_names: Vec<String> = self
415            .get_containers()
416            .await?
417            .into_iter()
418            .filter_map(|container| match container {
419                Container::Docker(container) => {
420                    if let Some(name) = container.names.first() {
421                        if name.starts_with(namespace) {
422                            return Some(name.to_string());
423                        }
424                    }
425
426                    None
427                },
428                Container::Podman(container) => {
429                    if let Some(name) = container.names.first() {
430                        if name.starts_with(namespace) {
431                            return Some(name.to_string());
432                        }
433                    }
434
435                    None
436                },
437            })
438            .collect();
439
440        info!("{:?}", container_names);
441        let futures = container_names
442            .iter()
443            .map(|name| self.container_rm(name))
444            .collect::<Vec<_>>();
445        try_join_all(futures).await?;
446
447        Ok(())
448    }
449
450    pub async fn container_ip(&self, container_name: &str) -> Result<String> {
451        let ip = if self.using_podman {
452            "127.0.0.1".into()
453        } else {
454            let mut cmd = tokio::process::Command::new("docker");
455            cmd.args(vec![
456                "inspect",
457                "-f",
458                "{{ .NetworkSettings.IPAddress }}",
459                container_name,
460            ]);
461
462            trace!("CMD: {cmd:?}");
463
464            let res = cmd
465                .output()
466                .await
467                .map_err(|err| anyhow!("Failed to get docker container ip,  output: {err}"))?;
468
469            String::from_utf8(res.stdout)
470                .map_err(|err| anyhow!("Failed to get docker container ip,  output: {err}"))?
471                .trim()
472                .into()
473        };
474
475        trace!("IP: {ip}");
476        Ok(ip)
477    }
478
479    async fn get_containers(&self) -> Result<Vec<Container>> {
480        let containers = if self.using_podman {
481            self.get_podman_containers()
482                .await?
483                .into_iter()
484                .map(Container::Podman)
485                .collect()
486        } else {
487            self.get_docker_containers()
488                .await?
489                .into_iter()
490                .map(Container::Docker)
491                .collect()
492        };
493
494        Ok(containers)
495    }
496
497    async fn get_podman_containers(&self) -> Result<Vec<PodmanContainer>> {
498        let res = tokio::process::Command::new("podman")
499            .args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
500            .output()
501            .await
502            .map_err(|err| anyhow!("Failed to get podman containers output: {err}"))?;
503
504        let stdout = String::from_utf8_lossy(&res.stdout);
505
506        let containers = serde_json::from_str(&stdout)
507            .map_err(|err| anyhow!("Failed to parse podman containers output: {err}"))?;
508
509        Ok(containers)
510    }
511
512    async fn get_docker_containers(&self) -> Result<Vec<DockerContainer>> {
513        let res = tokio::process::Command::new("docker")
514            .args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
515            .output()
516            .await
517            .unwrap();
518
519        let stdout = String::from_utf8_lossy(&res.stdout);
520
521        let mut containers = vec![];
522        for line in stdout.lines() {
523            containers.push(
524                serde_json::from_str::<DockerContainer>(line)
525                    .map_err(|err| anyhow!("Failed to parse docker container output: {err}"))?,
526            );
527        }
528
529        Ok(containers)
530    }
531
532    pub(crate) async fn container_logs(&self, container_name: &str) -> Result<String> {
533        let output = Command::new("sh")
534            .arg("-c")
535            .arg(format!("docker logs -t '{container_name}' 2>&1"))
536            .stdout(Stdio::piped())
537            .output()
538            .await
539            .map_err(|err| {
540                anyhow!(
541                    "Failed to spawn docker logs command for container '{container_name}': {err}"
542                )
543            })?;
544
545        let logs = String::from_utf8_lossy(&output.stdout).to_string();
546
547        if !output.status.success() {
548            // stderr was redirected to stdout, so logs should contain the error message if any
549            return Err(anyhow!(
550                "Failed to get logs for container '{name}': {logs}",
551                name = container_name,
552                logs = &logs
553            )
554            .into());
555        }
556
557        Ok(logs)
558    }
559
560    fn apply_cmd_options(cmd: &mut Command, options: &ContainerRunOptions) {
561        if options.rm {
562            cmd.arg("--rm");
563        }
564
565        if let Some(entrypoint) = options.entrypoint.as_ref() {
566            cmd.args(["--entrypoint", entrypoint]);
567        }
568
569        if let Some(volume_mounts) = options.volume_mounts.as_ref() {
570            for (source, target) in volume_mounts {
571                cmd.args(["-v", &format!("{source}:{target}")]);
572            }
573        }
574
575        if let Some(env) = options.env.as_ref() {
576            for env_var in env {
577                cmd.args(["-e", &format!("{}={}", env_var.0, env_var.1)]);
578            }
579        }
580
581        // add published ports
582        for (container_port, host_port) in options.port_mapping.iter() {
583            cmd.args(["-p", &format!("{host_port}:{container_port}")]);
584        }
585
586        if let Some(name) = options.name.as_ref() {
587            cmd.args(["--name", name]);
588        }
589
590        cmd.arg(&options.image);
591
592        for arg in &options.command {
593            cmd.arg(arg);
594        }
595    }
596}