// File: zombienet_provider/kubernetes/node.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    env,
4    net::IpAddr,
5    path::{Component, Path, PathBuf},
6    sync::{Arc, Weak},
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use async_trait::async_trait;
12use configuration::{shared::resources::Resources, types::AssetLocation};
13use futures::future::try_join_all;
14use k8s_openapi::api::core::v1::{ServicePort, ServiceSpec};
15use serde::{Deserialize, Serialize};
16use sha2::Digest;
17use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
18use tokio::{sync::RwLock, task::JoinHandle, time::sleep, try_join};
19use tracing::{debug, trace, warn};
20use url::Url;
21
22use super::{
23    client::KubernetesClient, namespace::KubernetesNamespace, pod_spec_builder::PodSpecBuilder,
24};
25use crate::{
26    constants::{
27        NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR, P2P_PORT,
28        PROMETHEUS_PORT, RPC_HTTP_PORT, RPC_WS_PORT,
29    },
30    kubernetes,
31    types::{ExecutionResult, RunCommandOptions, RunScriptOptions, TransferedFile},
32    ProviderError, ProviderNamespace, ProviderNode,
33};
34
35pub(super) struct KubernetesNodeOptions<'a, FS>
36where
37    FS: FileSystem + Send + Sync + Clone + 'static,
38{
39    pub(super) namespace: &'a Weak<KubernetesNamespace<FS>>,
40    pub(super) namespace_base_dir: &'a PathBuf,
41    pub(super) name: &'a str,
42    pub(super) image: Option<&'a String>,
43    pub(super) program: &'a str,
44    pub(super) args: &'a [String],
45    pub(super) env: &'a [(String, String)],
46    pub(super) startup_files: &'a [TransferedFile],
47    pub(super) resources: Option<&'a Resources>,
48    pub(super) db_snapshot: Option<&'a AssetLocation>,
49    pub(super) k8s_client: &'a KubernetesClient,
50    pub(super) filesystem: &'a FS,
51}
52
53impl<'a, FS> KubernetesNodeOptions<'a, FS>
54where
55    FS: FileSystem + Send + Sync + Clone + 'static,
56{
57    pub(super) fn from_deserializable(
58        deserializable: &'a DeserializableKubernetesNodeOptions,
59        namespace: &'a Weak<KubernetesNamespace<FS>>,
60        namespace_base_dir: &'a PathBuf,
61        k8s_client: &'a KubernetesClient,
62        filesystem: &'a FS,
63    ) -> KubernetesNodeOptions<'a, FS> {
64        KubernetesNodeOptions {
65            namespace,
66            namespace_base_dir,
67            name: &deserializable.name,
68            image: deserializable.image.as_ref(),
69            program: &deserializable.program,
70            args: &deserializable.args,
71            env: &deserializable.env,
72            startup_files: &[],
73            resources: deserializable.resources.as_ref(),
74            db_snapshot: None,
75            k8s_client,
76            filesystem,
77        }
78    }
79}
80
81#[derive(Deserialize)]
82pub(super) struct DeserializableKubernetesNodeOptions {
83    pub(super) name: String,
84    pub(super) image: Option<String>,
85    pub(super) program: String,
86    pub(super) args: Vec<String>,
87    pub(super) env: Vec<(String, String)>,
88    pub(super) resources: Option<Resources>,
89}
90
91type FwdInfo = (u16, JoinHandle<()>);
92
93#[derive(Serialize)]
94pub(super) struct KubernetesNode<FS>
95where
96    FS: FileSystem + Send + Sync + Clone,
97{
98    #[serde(skip)]
99    namespace: Weak<KubernetesNamespace<FS>>,
100    name: String,
101    image: String,
102    program: String,
103    args: Vec<String>,
104    env: Vec<(String, String)>,
105    resources: Option<Resources>,
106    base_dir: PathBuf,
107    config_dir: PathBuf,
108    data_dir: PathBuf,
109    relay_data_dir: PathBuf,
110    scripts_dir: PathBuf,
111    log_path: PathBuf,
112    #[serde(skip)]
113    k8s_client: KubernetesClient,
114    #[serde(skip)]
115    http_client: reqwest::Client,
116    #[serde(skip)]
117    filesystem: FS,
118    #[serde(skip)]
119    port_fwds: RwLock<HashMap<u16, FwdInfo>>,
120    provider_tag: String,
121}
122
123impl<FS> KubernetesNode<FS>
124where
125    FS: FileSystem + Send + Sync + Clone + 'static,
126{
127    pub(super) async fn new(
128        options: KubernetesNodeOptions<'_, FS>,
129    ) -> Result<Arc<Self>, ProviderError> {
130        let image = options.image.ok_or_else(|| {
131            ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
132        })?;
133
134        let filesystem = options.filesystem.clone();
135
136        let base_dir =
137            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
138        filesystem.create_dir_all(&base_dir).await?;
139
140        let base_dir_raw = base_dir.to_string_lossy();
141        let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
142        let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
143        let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
144        let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
145        let log_path = base_dir.join("node.log");
146
147        try_join!(
148            filesystem.create_dir(&config_dir),
149            filesystem.create_dir(&data_dir),
150            filesystem.create_dir(&relay_data_dir),
151            filesystem.create_dir(&scripts_dir),
152        )?;
153
154        let node = Arc::new(KubernetesNode {
155            namespace: options.namespace.clone(),
156            name: options.name.to_string(),
157            image: image.to_string(),
158            program: options.program.to_string(),
159            args: options.args.to_vec(),
160            env: options.env.to_vec(),
161            resources: options.resources.cloned(),
162            base_dir,
163            config_dir,
164            data_dir,
165            relay_data_dir,
166            scripts_dir,
167            log_path,
168            filesystem: filesystem.clone(),
169            k8s_client: options.k8s_client.clone(),
170            http_client: reqwest::Client::new(),
171            port_fwds: Default::default(),
172            provider_tag: kubernetes::provider::PROVIDER_NAME.to_string(),
173        });
174
175        node.initialize_k8s().await?;
176
177        if let Some(db_snap) = options.db_snapshot {
178            node.initialize_db_snapshot(db_snap).await?;
179        }
180
181        node.initialize_startup_files(options.startup_files).await?;
182
183        node.start().await?;
184
185        Ok(node)
186    }
187
188    pub(super) async fn attach_to_live(
189        options: KubernetesNodeOptions<'_, FS>,
190    ) -> Result<Arc<Self>, ProviderError> {
191        let image = options.image.ok_or_else(|| {
192            ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
193        })?;
194
195        let filesystem = options.filesystem.clone();
196
197        let base_dir =
198            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
199        filesystem.create_dir_all(&base_dir).await?;
200
201        let base_dir_raw = base_dir.to_string_lossy();
202        let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
203        let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
204        let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
205        let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
206        let log_path = base_dir.join("node.log");
207
208        let node = Arc::new(KubernetesNode {
209            namespace: options.namespace.clone(),
210            name: options.name.to_string(),
211            image: image.to_string(),
212            program: options.program.to_string(),
213            args: options.args.to_vec(),
214            env: options.env.to_vec(),
215            resources: options.resources.cloned(),
216            base_dir,
217            config_dir,
218            data_dir,
219            relay_data_dir,
220            scripts_dir,
221            log_path,
222            filesystem: filesystem.clone(),
223            k8s_client: options.k8s_client.clone(),
224            http_client: reqwest::Client::new(),
225            port_fwds: Default::default(),
226            provider_tag: kubernetes::provider::PROVIDER_NAME.to_string(),
227        });
228
229        Ok(node)
230    }
231
232    async fn initialize_k8s(&self) -> Result<(), ProviderError> {
233        let labels = BTreeMap::from([
234            (
235                "app.kubernetes.io/name".to_string(),
236                self.name().to_string(),
237            ),
238            (
239                "x-infra-instance".to_string(),
240                env::var("X_INFRA_INSTANCE").unwrap_or("ondemand".to_string()),
241            ),
242        ]);
243
244        // Create pod
245        let pod_spec = PodSpecBuilder::build(
246            &self.name,
247            &self.image,
248            self.resources.as_ref(),
249            &self.program,
250            &self.args,
251            &self.env,
252        );
253
254        let manifest = self
255            .k8s_client
256            .create_pod(&self.namespace_name(), &self.name, pod_spec, labels.clone())
257            .await
258            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
259
260        let serialized_manifest = serde_yaml::to_string(&manifest)
261            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.to_string(), err.into()))?;
262
263        let dest_path = PathBuf::from_iter([
264            &self.base_dir,
265            &PathBuf::from(format!("{}_manifest.yaml", &self.name)),
266        ]);
267
268        self.filesystem
269            .write(dest_path, serialized_manifest)
270            .await
271            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.to_string(), err.into()))?;
272
273        // Create service for pod
274        let service_spec = ServiceSpec {
275            selector: Some(labels.clone()),
276            ports: Some(vec![
277                ServicePort {
278                    port: P2P_PORT.into(),
279                    name: Some("p2p".into()),
280                    ..Default::default()
281                },
282                ServicePort {
283                    port: RPC_WS_PORT.into(),
284                    name: Some("rpc".into()),
285                    ..Default::default()
286                },
287                ServicePort {
288                    port: RPC_HTTP_PORT.into(),
289                    name: Some("rpc-http".into()),
290                    ..Default::default()
291                },
292                ServicePort {
293                    port: PROMETHEUS_PORT.into(),
294                    name: Some("prom".into()),
295                    ..Default::default()
296                },
297            ]),
298            ..Default::default()
299        };
300
301        let service_manifest = self
302            .k8s_client
303            .create_service(&self.namespace_name(), &self.name, service_spec, labels)
304            .await
305            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
306
307        let serialized_service_manifest = serde_yaml::to_string(&service_manifest)
308            .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
309
310        let service_dest_path = PathBuf::from_iter([
311            &self.base_dir,
312            &PathBuf::from(format!("{}_svc_manifest.yaml", &self.name)),
313        ]);
314
315        self.filesystem
316            .write(service_dest_path, serialized_service_manifest)
317            .await?;
318
319        Ok(())
320    }
321
322    async fn initialize_db_snapshot(
323        &self,
324        db_snapshot: &AssetLocation,
325    ) -> Result<(), ProviderError> {
326        trace!("snap: {db_snapshot}");
327        let url_of_snap = match db_snapshot {
328            AssetLocation::Url(location) => location.clone(),
329            AssetLocation::FilePath(filepath) => {
330                let (url, _) = self.upload_to_fileserver(filepath).await?;
331                url
332            },
333        };
334
335        // we need to get the snapshot from a public access
336        // and extract to /data
337        let opts = RunCommandOptions::new("mkdir").args([
338            "-p",
339            "/data/",
340            "&&",
341            "mkdir",
342            "-p",
343            "/relay-data/",
344            "&&",
345            // Use our version of curl
346            "/cfg/curl",
347            url_of_snap.as_ref(),
348            "--output",
349            "/data/db.tgz",
350            "&&",
351            "cd",
352            "/",
353            "&&",
354            "tar",
355            "--skip-old-files",
356            "-xzvf",
357            "/data/db.tgz",
358        ]);
359
360        trace!("cmd opts: {:#?}", opts);
361        let _ = self.run_command(opts).await?;
362
363        Ok(())
364    }
365
366    async fn initialize_startup_files(
367        &self,
368        startup_files: &[TransferedFile],
369    ) -> Result<(), ProviderError> {
370        try_join_all(
371            startup_files
372                .iter()
373                .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
374        )
375        .await?;
376
377        Ok(())
378    }
379
380    pub(super) async fn start(&self) -> Result<(), ProviderError> {
381        self.k8s_client
382            .pod_exec(
383                &self.namespace_name(),
384                &self.name,
385                vec!["sh", "-c", "echo start > /tmp/zombiepipe"],
386            )
387            .await
388            .map_err(|err| {
389                ProviderError::NodeSpawningFailed(
390                    format!("failed to start pod {} after spawning", self.name),
391                    err.into(),
392                )
393            })?
394            .map_err(|err| {
395                ProviderError::NodeSpawningFailed(
396                    format!("failed to start pod {} after spawning", self.name,),
397                    anyhow!("command failed in container: status {}: {}", err.0, err.1),
398                )
399            })?;
400
401        Ok(())
402    }
403
404    fn get_remote_parent_dir(&self, remote_file_path: &Path) -> Option<PathBuf> {
405        if let Some(remote_parent_dir) = remote_file_path.parent() {
406            if matches!(
407                remote_parent_dir.components().rev().peekable().peek(),
408                Some(Component::Normal(_))
409            ) {
410                return Some(remote_parent_dir.to_path_buf());
411            }
412        }
413
414        None
415    }
416
417    async fn create_remote_dir(&self, remote_dir: &Path) -> Result<(), ProviderError> {
418        let _ = self
419            .k8s_client
420            .pod_exec(
421                &self.namespace_name(),
422                &self.name,
423                vec!["mkdir", "-p", &remote_dir.to_string_lossy()],
424            )
425            .await
426            .map_err(|err| {
427                ProviderError::NodeSpawningFailed(
428                    format!(
429                        "failed to create dir {} for pod {}",
430                        remote_dir.to_string_lossy(),
431                        &self.name
432                    ),
433                    err.into(),
434                )
435            })?;
436
437        Ok(())
438    }
439
440    fn namespace_name(&self) -> String {
441        self.namespace
442            .upgrade()
443            .map(|namespace| namespace.name().to_string())
444            .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {THIS_IS_A_BUG}"))
445    }
446
447    async fn upload_to_fileserver(&self, location: &Path) -> Result<(Url, String), ProviderError> {
448        let file_name = if let Some(name) = location.file_name() {
449            name.to_string_lossy()
450        } else {
451            "unnamed".into()
452        };
453
454        let data = self.filesystem.read(location).await?;
455        let content_hashed = hex::encode(sha2::Sha256::digest(&data));
456        let req = self
457            .http_client
458            .head(format!(
459                "http://{}/{content_hashed}__{file_name}",
460                self.file_server_local_host().await?
461            ))
462            .build()
463            .map_err(|err| {
464                ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into())
465            })?;
466
467        let url = req.url().clone();
468        let res = self.http_client.execute(req).await.map_err(|err| {
469            ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into())
470        })?;
471
472        if res.status() != reqwest::StatusCode::OK {
473            // we need to upload the file
474            self.http_client
475                .post(url.as_ref())
476                .body(data)
477                .send()
478                .await
479                .map_err(|err| {
480                    ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into())
481                })?;
482        }
483
484        Ok((url, content_hashed))
485    }
486
487    async fn file_server_local_host(&self) -> Result<String, ProviderError> {
488        if let Some(namespace) = self.namespace.upgrade() {
489            if let Some(port) = *namespace.file_server_port.read().await {
490                return Ok(format!("localhost:{port}"));
491            }
492        }
493
494        Err(ProviderError::FileServerSetupError(anyhow!(
495            "file server port not bound locally"
496        )))
497    }
498
499    async fn download_file(
500        &self,
501        url: &str,
502        remote_file_path: &Path,
503        hash: Option<&str>,
504    ) -> Result<(), ProviderError> {
505        let r = self
506            .k8s_client
507            .pod_exec(
508                &self.namespace_name(),
509                &self.name,
510                vec![
511                    "/cfg/curl",
512                    url,
513                    "--output",
514                    &remote_file_path.to_string_lossy(),
515                ],
516            )
517            .await
518            .map_err(|err| {
519                ProviderError::DownloadFile(
520                    remote_file_path.to_string_lossy().to_string(),
521                    anyhow!(format!("node: {}, err: {}", self.name(), err)),
522                )
523            })?;
524
525        trace!("download url {} result: {:?}", url, r);
526
527        if r.is_err() {
528            return Err(ProviderError::DownloadFile(
529                remote_file_path.to_string_lossy().to_string(),
530                anyhow!(format!("node: {}, err downloading file", self.name())),
531            ));
532        }
533
534        if let Some(hash) = hash {
535            // check if the hash of the file is correct
536            let res = self
537                .k8s_client
538                .pod_exec(
539                    &self.namespace_name(),
540                    &self.name,
541                    vec![
542                        "/cfg/coreutils",
543                        "sha256sum",
544                        &remote_file_path.to_string_lossy(),
545                    ],
546                )
547                .await
548                .map_err(|err| {
549                    ProviderError::DownloadFile(
550                        remote_file_path.to_string_lossy().to_string(),
551                        anyhow!(format!("node: {}, err: {}", self.name(), err)),
552                    )
553                })?;
554
555            if let Ok(output) = res {
556                if !output.contains(hash) {
557                    return Err(ProviderError::DownloadFile(
558                        remote_file_path.to_string_lossy().to_string(),
559                        anyhow!(format!("node: {}, invalid sha256sum hash: {hash} for file, output was {output}", self.name())),
560                    ));
561                }
562            } else {
563                return Err(ProviderError::DownloadFile(
564                    remote_file_path.to_string_lossy().to_string(),
565                    anyhow!(format!(
566                        "node: {}, err calculating sha256sum for file {:?}",
567                        self.name(),
568                        res
569                    )),
570                ));
571            }
572        }
573
574        Ok(())
575    }
576}
577
578#[async_trait]
579impl<FS> ProviderNode for KubernetesNode<FS>
580where
581    FS: FileSystem + Send + Sync + Clone + 'static,
582{
583    fn name(&self) -> &str {
584        &self.name
585    }
586
587    fn args(&self) -> Vec<&str> {
588        self.args.iter().map(|arg| arg.as_str()).collect()
589    }
590
591    fn base_dir(&self) -> &PathBuf {
592        &self.base_dir
593    }
594
595    fn config_dir(&self) -> &PathBuf {
596        &self.config_dir
597    }
598
599    fn data_dir(&self) -> &PathBuf {
600        &self.data_dir
601    }
602
603    fn relay_data_dir(&self) -> &PathBuf {
604        &self.relay_data_dir
605    }
606
607    fn scripts_dir(&self) -> &PathBuf {
608        &self.scripts_dir
609    }
610
611    fn log_path(&self) -> &PathBuf {
612        &self.log_path
613    }
614
615    fn log_cmd(&self) -> String {
616        format!("kubectl -n {} logs {}", self.namespace_name(), self.name)
617    }
618
619    fn path_in_node(&self, file: &Path) -> PathBuf {
620        // here is just a noop op since we will receive the path
621        // for the file inside the pod
622        PathBuf::from(file)
623    }
624
625    // TODO: handle log rotation as we do in v1
626    async fn logs(&self) -> Result<String, ProviderError> {
627        self.k8s_client
628            .pod_logs(&self.namespace_name(), &self.name)
629            .await
630            .map_err(|err| ProviderError::GetLogsFailed(self.name.to_string(), err.into()))
631    }
632
633    async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
634        let logs = self.logs().await?;
635
636        self.filesystem
637            .write(local_dest, logs)
638            .await
639            .map_err(|err| ProviderError::DumpLogsFailed(self.name.to_string(), err.into()))?;
640
641        Ok(())
642    }
643
644    async fn create_port_forward(
645        &self,
646        local_port: u16,
647        remote_port: u16,
648    ) -> Result<Option<u16>, ProviderError> {
649        // If the fwd exist just return the local port
650        if let Some(fwd_info) = self.port_fwds.read().await.get(&remote_port) {
651            return Ok(Some(fwd_info.0));
652        };
653
654        let (port, task) = self
655            .k8s_client
656            .create_pod_port_forward(&self.namespace_name(), &self.name, local_port, remote_port)
657            .await
658            .map_err(|err| ProviderError::PortForwardError(local_port, remote_port, err.into()))?;
659
660        self.port_fwds
661            .write()
662            .await
663            .insert(remote_port, (port, task));
664
665        Ok(Some(port))
666    }
667
668    async fn run_command(
669        &self,
670        options: RunCommandOptions,
671    ) -> Result<ExecutionResult, ProviderError> {
672        let mut command = vec![];
673
674        for (name, value) in options.env {
675            command.push(format!("export {name}={value};"));
676        }
677
678        command.push(options.program);
679
680        for arg in options.args {
681            command.push(arg);
682        }
683
684        self.k8s_client
685            .pod_exec(
686                &self.namespace_name(),
687                &self.name,
688                vec!["sh", "-c", &command.join(" ")],
689            )
690            .await
691            .map_err(|err| {
692                ProviderError::RunCommandError(
693                    format!("sh -c {}", &command.join(" ")),
694                    format!("in pod {}", self.name),
695                    err.into(),
696                )
697            })
698    }
699
700    async fn run_script(
701        &self,
702        options: RunScriptOptions,
703    ) -> Result<ExecutionResult, ProviderError> {
704        let file_name = options
705            .local_script_path
706            .file_name()
707            .expect(&format!(
708                "file name should be present at this point {THIS_IS_A_BUG}"
709            ))
710            .to_string_lossy();
711
712        self.run_command(RunCommandOptions {
713            program: format!("/tmp/{file_name}"),
714            args: options.args,
715            env: options.env,
716        })
717        .await
718        .map_err(|err| ProviderError::RunScriptError(self.name.to_string(), err.into()))
719    }
720
721    async fn send_file(
722        &self,
723        local_file_path: &Path,
724        remote_file_path: &Path,
725        mode: &str,
726    ) -> Result<(), ProviderError> {
727        if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) {
728            self.create_remote_dir(&remote_parent_dir).await?;
729        }
730
731        debug!(
732            "Uploading file: {} IFF not present in the fileserver",
733            local_file_path.to_string_lossy()
734        );
735
736        // we need to override the url to use inside the pod
737        let (mut url, hash) = self.upload_to_fileserver(local_file_path).await?;
738        let _ = url.set_host(Some("fileserver"));
739        let _ = url.set_port(Some(80));
740
741        // Sometimes downloading the file fails (the file is corrupted)
742        // Add at most 5 retries
743        let mut last_err = None;
744        for i in 0..5 {
745            if i > 0 {
746                warn!("retrying number {i} download file {:?}", remote_file_path);
747                tokio::time::sleep(Duration::from_secs(i)).await;
748            }
749
750            let res = self
751                .download_file(url.as_ref(), remote_file_path, Some(&hash))
752                .await;
753
754            last_err = res.err();
755
756            if last_err.is_none() {
757                // ready to continue
758                break;
759            }
760        }
761
762        if let Some(last_err) = last_err {
763            return Err(last_err);
764        }
765
766        let _ = self
767            .k8s_client
768            .pod_exec(
769                &self.namespace_name(),
770                &self.name,
771                vec!["chmod", mode, &remote_file_path.to_string_lossy()],
772            )
773            .await
774            .map_err(|err| {
775                ProviderError::SendFile(
776                    self.name.clone(),
777                    local_file_path.to_string_lossy().to_string(),
778                    err.into(),
779                )
780            })?;
781
782        Ok(())
783    }
784
785    async fn receive_file(
786        &self,
787        _remote_src: &Path,
788        _local_dest: &Path,
789    ) -> Result<(), ProviderError> {
790        Ok(())
791    }
792
793    async fn ip(&self) -> Result<IpAddr, ProviderError> {
794        let status = self
795            .k8s_client
796            .pod_status(&self.namespace_name(), &self.name)
797            .await
798            .map_err(|_| ProviderError::MissingNode(self.name.clone()))?;
799
800        if let Some(ip) = status.pod_ip {
801            // Pod ip should be parseable
802            Ok(ip.parse::<IpAddr>().map_err(|err| {
803                ProviderError::InvalidConfig(format!("Can not parse the pod ip: {ip}, err: {err}"))
804            })?)
805        } else {
806            Err(ProviderError::InvalidConfig(format!(
807                "Can not find ip of pod: {}",
808                self.name()
809            )))
810        }
811    }
812
813    async fn pause(&self) -> Result<(), ProviderError> {
814        self.k8s_client
815            .pod_exec(
816                &self.namespace_name(),
817                &self.name,
818                vec!["sh", "-c", "echo pause > /tmp/zombiepipe"],
819            )
820            .await
821            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
822            .map_err(|err| {
823                ProviderError::PauseNodeFailed(
824                    self.name.to_string(),
825                    anyhow!("error when pausing node: status {}: {}", err.0, err.1),
826                )
827            })?;
828
829        Ok(())
830    }
831
832    async fn resume(&self) -> Result<(), ProviderError> {
833        self.k8s_client
834            .pod_exec(
835                &self.namespace_name(),
836                &self.name,
837                vec!["sh", "-c", "echo resume > /tmp/zombiepipe"],
838            )
839            .await
840            .map_err(|err| ProviderError::ResumeNodeFailed(self.name.to_string(), err.into()))?
841            .map_err(|err| {
842                ProviderError::ResumeNodeFailed(
843                    self.name.to_string(),
844                    anyhow!("error when pausing node: status {}: {}", err.0, err.1),
845                )
846            })?;
847
848        Ok(())
849    }
850
851    async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
852        if let Some(duration) = after {
853            sleep(duration).await;
854        }
855
856        self.k8s_client
857            .pod_exec(
858                &self.namespace_name(),
859                &self.name,
860                vec!["sh", "-c", "echo restart > /tmp/zombiepipe"],
861            )
862            .await
863            .map_err(|err| ProviderError::RestartNodeFailed(self.name.to_string(), err.into()))?
864            .map_err(|err| {
865                ProviderError::RestartNodeFailed(
866                    self.name.to_string(),
867                    anyhow!("error when restarting node: status {}: {}", err.0, err.1),
868                )
869            })?;
870
871        Ok(())
872    }
873
874    async fn destroy(&self) -> Result<(), ProviderError> {
875        self.k8s_client
876            .delete_pod(&self.namespace_name(), &self.name)
877            .await
878            .map_err(|err| ProviderError::KillNodeFailed(self.name.to_string(), err.into()))?;
879
880        if let Some(namespace) = self.namespace.upgrade() {
881            namespace.nodes.write().await.remove(&self.name);
882        }
883
884        Ok(())
885    }
886}