1use std::{
2 collections::{BTreeMap, HashMap},
3 env,
4 net::IpAddr,
5 path::{Component, Path, PathBuf},
6 sync::{Arc, Weak},
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use async_trait::async_trait;
12use configuration::{shared::resources::Resources, types::AssetLocation};
13use futures::future::try_join_all;
14use k8s_openapi::api::core::v1::{ServicePort, ServiceSpec};
15use serde::{Deserialize, Serialize};
16use sha2::Digest;
17use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
18use tokio::{sync::RwLock, task::JoinHandle, time::sleep, try_join};
19use tracing::{debug, trace, warn};
20use url::Url;
21
22use super::{
23 client::KubernetesClient, namespace::KubernetesNamespace, pod_spec_builder::PodSpecBuilder,
24};
25use crate::{
26 constants::{
27 NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR, P2P_PORT,
28 PROMETHEUS_PORT, RPC_HTTP_PORT, RPC_WS_PORT,
29 },
30 kubernetes,
31 types::{ExecutionResult, RunCommandOptions, RunScriptOptions, TransferedFile},
32 ProviderError, ProviderNamespace, ProviderNode,
33};
34
/// Borrowed parameters used to create a [`KubernetesNode`] or to re-attach to one.
pub(super) struct KubernetesNodeOptions<'a, FS>
where
    FS: FileSystem + Send + Sync + Clone + 'static,
{
    /// Weak handle to the namespace that owns the node.
    pub(super) namespace: &'a Weak<KubernetesNamespace<FS>>,
    /// Local directory of the namespace; the node directory is created below it.
    pub(super) namespace_base_dir: &'a PathBuf,
    /// Node name, also used as the pod and service name.
    pub(super) name: &'a str,
    /// Container image; node creation fails with `MissingNodeInfo` when absent.
    pub(super) image: Option<&'a String>,
    /// Program handed to the pod spec builder.
    pub(super) program: &'a str,
    /// Arguments handed to the pod spec builder.
    pub(super) args: &'a [String],
    /// Environment variables (name, value) handed to the pod spec builder.
    pub(super) env: &'a [(String, String)],
    /// Files sent to the node before it is started.
    pub(super) startup_files: &'a [TransferedFile],
    /// Optional resources handed to the pod spec builder.
    pub(super) resources: Option<&'a Resources>,
    /// Optional database snapshot restored before the node is started.
    pub(super) db_snapshot: Option<&'a AssetLocation>,
    /// Client used for every cluster operation; cloned into the node.
    pub(super) k8s_client: &'a KubernetesClient,
    /// Local filesystem abstraction; cloned into the node.
    pub(super) filesystem: &'a FS,
}
52
53impl<'a, FS> KubernetesNodeOptions<'a, FS>
54where
55 FS: FileSystem + Send + Sync + Clone + 'static,
56{
57 pub(super) fn from_deserializable(
58 deserializable: &'a DeserializableKubernetesNodeOptions,
59 namespace: &'a Weak<KubernetesNamespace<FS>>,
60 namespace_base_dir: &'a PathBuf,
61 k8s_client: &'a KubernetesClient,
62 filesystem: &'a FS,
63 ) -> KubernetesNodeOptions<'a, FS> {
64 KubernetesNodeOptions {
65 namespace,
66 namespace_base_dir,
67 name: &deserializable.name,
68 image: deserializable.image.as_ref(),
69 program: &deserializable.program,
70 args: &deserializable.args,
71 env: &deserializable.env,
72 startup_files: &[],
73 resources: deserializable.resources.as_ref(),
74 db_snapshot: None,
75 k8s_client,
76 filesystem,
77 }
78 }
79}
80
/// Owned, deserializable subset of the node options; see
/// `KubernetesNodeOptions::from_deserializable` for how it is turned back into options.
#[derive(Deserialize)]
pub(super) struct DeserializableKubernetesNodeOptions {
    /// Node name.
    pub(super) name: String,
    /// Container image, if one was recorded.
    pub(super) image: Option<String>,
    /// Program run by the node.
    pub(super) program: String,
    /// Program arguments.
    pub(super) args: Vec<String>,
    /// Environment variables as (name, value) pairs.
    pub(super) env: Vec<(String, String)>,
    /// Optional resources for the pod.
    pub(super) resources: Option<Resources>,
}
90
/// Port-forward bookkeeping: the port returned when the forward was created, paired with the
/// handle of the task returned alongside it.
type FwdInfo = (u16, JoinHandle<()>);
92
/// A node running as a pod in a Kubernetes namespace.
///
/// Runtime handles (namespace, clients, filesystem, port forwards) are skipped when the node
/// is serialized.
#[derive(Serialize)]
pub(super) struct KubernetesNode<FS>
where
    FS: FileSystem + Send + Sync + Clone,
{
    /// Weak handle to the owning namespace.
    #[serde(skip)]
    namespace: Weak<KubernetesNamespace<FS>>,
    /// Node name, also used as the pod and service name.
    name: String,
    /// Container image of the pod.
    image: String,
    /// Program handed to the pod spec builder.
    program: String,
    /// Arguments handed to the pod spec builder.
    args: Vec<String>,
    /// Environment variables handed to the pod spec builder.
    env: Vec<(String, String)>,
    /// Optional resources handed to the pod spec builder.
    resources: Option<Resources>,
    /// Local directory of this node (`<namespace base dir>/<name>`).
    base_dir: PathBuf,
    /// `base_dir` with `NODE_CONFIG_DIR` appended.
    config_dir: PathBuf,
    /// `base_dir` with `NODE_DATA_DIR` appended.
    data_dir: PathBuf,
    /// `base_dir` with `NODE_RELAY_DATA_DIR` appended.
    relay_data_dir: PathBuf,
    /// `base_dir` with `NODE_SCRIPTS_DIR` appended.
    scripts_dir: PathBuf,
    /// `node.log` inside `base_dir`.
    log_path: PathBuf,
    /// Client used for every cluster operation.
    #[serde(skip)]
    k8s_client: KubernetesClient,
    /// HTTP client used to talk to the file server.
    #[serde(skip)]
    http_client: reqwest::Client,
    /// Local filesystem abstraction.
    #[serde(skip)]
    filesystem: FS,
    /// Active port forwards, keyed by remote port.
    #[serde(skip)]
    port_fwds: RwLock<HashMap<u16, FwdInfo>>,
    /// Name of the provider that created the node.
    provider_tag: String,
}
122
123impl<FS> KubernetesNode<FS>
124where
125 FS: FileSystem + Send + Sync + Clone + 'static,
126{
127 pub(super) async fn new(
128 options: KubernetesNodeOptions<'_, FS>,
129 ) -> Result<Arc<Self>, ProviderError> {
130 let image = options.image.ok_or_else(|| {
131 ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
132 })?;
133
134 let filesystem = options.filesystem.clone();
135
136 let base_dir =
137 PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
138 filesystem.create_dir_all(&base_dir).await?;
139
140 let base_dir_raw = base_dir.to_string_lossy();
141 let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
142 let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
143 let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
144 let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
145 let log_path = base_dir.join("node.log");
146
147 try_join!(
148 filesystem.create_dir(&config_dir),
149 filesystem.create_dir(&data_dir),
150 filesystem.create_dir(&relay_data_dir),
151 filesystem.create_dir(&scripts_dir),
152 )?;
153
154 let node = Arc::new(KubernetesNode {
155 namespace: options.namespace.clone(),
156 name: options.name.to_string(),
157 image: image.to_string(),
158 program: options.program.to_string(),
159 args: options.args.to_vec(),
160 env: options.env.to_vec(),
161 resources: options.resources.cloned(),
162 base_dir,
163 config_dir,
164 data_dir,
165 relay_data_dir,
166 scripts_dir,
167 log_path,
168 filesystem: filesystem.clone(),
169 k8s_client: options.k8s_client.clone(),
170 http_client: reqwest::Client::new(),
171 port_fwds: Default::default(),
172 provider_tag: kubernetes::provider::PROVIDER_NAME.to_string(),
173 });
174
175 node.initialize_k8s().await?;
176
177 if let Some(db_snap) = options.db_snapshot {
178 node.initialize_db_snapshot(db_snap).await?;
179 }
180
181 node.initialize_startup_files(options.startup_files).await?;
182
183 node.start().await?;
184
185 Ok(node)
186 }
187
188 pub(super) async fn attach_to_live(
189 options: KubernetesNodeOptions<'_, FS>,
190 ) -> Result<Arc<Self>, ProviderError> {
191 let image = options.image.ok_or_else(|| {
192 ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
193 })?;
194
195 let filesystem = options.filesystem.clone();
196
197 let base_dir =
198 PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
199 filesystem.create_dir_all(&base_dir).await?;
200
201 let base_dir_raw = base_dir.to_string_lossy();
202 let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
203 let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
204 let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
205 let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
206 let log_path = base_dir.join("node.log");
207
208 let node = Arc::new(KubernetesNode {
209 namespace: options.namespace.clone(),
210 name: options.name.to_string(),
211 image: image.to_string(),
212 program: options.program.to_string(),
213 args: options.args.to_vec(),
214 env: options.env.to_vec(),
215 resources: options.resources.cloned(),
216 base_dir,
217 config_dir,
218 data_dir,
219 relay_data_dir,
220 scripts_dir,
221 log_path,
222 filesystem: filesystem.clone(),
223 k8s_client: options.k8s_client.clone(),
224 http_client: reqwest::Client::new(),
225 port_fwds: Default::default(),
226 provider_tag: kubernetes::provider::PROVIDER_NAME.to_string(),
227 });
228
229 Ok(node)
230 }
231
232 async fn initialize_k8s(&self) -> Result<(), ProviderError> {
233 let labels = BTreeMap::from([
234 (
235 "app.kubernetes.io/name".to_string(),
236 self.name().to_string(),
237 ),
238 (
239 "x-infra-instance".to_string(),
240 env::var("X_INFRA_INSTANCE").unwrap_or("ondemand".to_string()),
241 ),
242 ]);
243
244 let pod_spec = PodSpecBuilder::build(
246 &self.name,
247 &self.image,
248 self.resources.as_ref(),
249 &self.program,
250 &self.args,
251 &self.env,
252 );
253
254 let manifest = self
255 .k8s_client
256 .create_pod(&self.namespace_name(), &self.name, pod_spec, labels.clone())
257 .await
258 .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
259
260 let serialized_manifest = serde_yaml::to_string(&manifest)
261 .map_err(|err| ProviderError::NodeSpawningFailed(self.name.to_string(), err.into()))?;
262
263 let dest_path = PathBuf::from_iter([
264 &self.base_dir,
265 &PathBuf::from(format!("{}_manifest.yaml", &self.name)),
266 ]);
267
268 self.filesystem
269 .write(dest_path, serialized_manifest)
270 .await
271 .map_err(|err| ProviderError::NodeSpawningFailed(self.name.to_string(), err.into()))?;
272
273 let service_spec = ServiceSpec {
275 selector: Some(labels.clone()),
276 ports: Some(vec![
277 ServicePort {
278 port: P2P_PORT.into(),
279 name: Some("p2p".into()),
280 ..Default::default()
281 },
282 ServicePort {
283 port: RPC_WS_PORT.into(),
284 name: Some("rpc".into()),
285 ..Default::default()
286 },
287 ServicePort {
288 port: RPC_HTTP_PORT.into(),
289 name: Some("rpc-http".into()),
290 ..Default::default()
291 },
292 ServicePort {
293 port: PROMETHEUS_PORT.into(),
294 name: Some("prom".into()),
295 ..Default::default()
296 },
297 ]),
298 ..Default::default()
299 };
300
301 let service_manifest = self
302 .k8s_client
303 .create_service(&self.namespace_name(), &self.name, service_spec, labels)
304 .await
305 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
306
307 let serialized_service_manifest = serde_yaml::to_string(&service_manifest)
308 .map_err(|err| ProviderError::FileServerSetupError(err.into()))?;
309
310 let service_dest_path = PathBuf::from_iter([
311 &self.base_dir,
312 &PathBuf::from(format!("{}_svc_manifest.yaml", &self.name)),
313 ]);
314
315 self.filesystem
316 .write(service_dest_path, serialized_service_manifest)
317 .await?;
318
319 Ok(())
320 }
321
322 async fn initialize_db_snapshot(
323 &self,
324 db_snapshot: &AssetLocation,
325 ) -> Result<(), ProviderError> {
326 trace!("snap: {db_snapshot}");
327 let url_of_snap = match db_snapshot {
328 AssetLocation::Url(location) => location.clone(),
329 AssetLocation::FilePath(filepath) => {
330 let (url, _) = self.upload_to_fileserver(filepath).await?;
331 url
332 },
333 };
334
335 let opts = RunCommandOptions::new("mkdir").args([
338 "-p",
339 "/data/",
340 "&&",
341 "mkdir",
342 "-p",
343 "/relay-data/",
344 "&&",
345 "/cfg/curl",
347 url_of_snap.as_ref(),
348 "--output",
349 "/data/db.tgz",
350 "&&",
351 "cd",
352 "/",
353 "&&",
354 "tar",
355 "--skip-old-files",
356 "-xzvf",
357 "/data/db.tgz",
358 ]);
359
360 trace!("cmd opts: {:#?}", opts);
361 let _ = self.run_command(opts).await?;
362
363 Ok(())
364 }
365
366 async fn initialize_startup_files(
367 &self,
368 startup_files: &[TransferedFile],
369 ) -> Result<(), ProviderError> {
370 try_join_all(
371 startup_files
372 .iter()
373 .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
374 )
375 .await?;
376
377 Ok(())
378 }
379
380 pub(super) async fn start(&self) -> Result<(), ProviderError> {
381 self.k8s_client
382 .pod_exec(
383 &self.namespace_name(),
384 &self.name,
385 vec!["sh", "-c", "echo start > /tmp/zombiepipe"],
386 )
387 .await
388 .map_err(|err| {
389 ProviderError::NodeSpawningFailed(
390 format!("failed to start pod {} after spawning", self.name),
391 err.into(),
392 )
393 })?
394 .map_err(|err| {
395 ProviderError::NodeSpawningFailed(
396 format!("failed to start pod {} after spawning", self.name,),
397 anyhow!("command failed in container: status {}: {}", err.0, err.1),
398 )
399 })?;
400
401 Ok(())
402 }
403
404 fn get_remote_parent_dir(&self, remote_file_path: &Path) -> Option<PathBuf> {
405 if let Some(remote_parent_dir) = remote_file_path.parent() {
406 if matches!(
407 remote_parent_dir.components().rev().peekable().peek(),
408 Some(Component::Normal(_))
409 ) {
410 return Some(remote_parent_dir.to_path_buf());
411 }
412 }
413
414 None
415 }
416
417 async fn create_remote_dir(&self, remote_dir: &Path) -> Result<(), ProviderError> {
418 let _ = self
419 .k8s_client
420 .pod_exec(
421 &self.namespace_name(),
422 &self.name,
423 vec!["mkdir", "-p", &remote_dir.to_string_lossy()],
424 )
425 .await
426 .map_err(|err| {
427 ProviderError::NodeSpawningFailed(
428 format!(
429 "failed to create dir {} for pod {}",
430 remote_dir.to_string_lossy(),
431 &self.name
432 ),
433 err.into(),
434 )
435 })?;
436
437 Ok(())
438 }
439
440 fn namespace_name(&self) -> String {
441 self.namespace
442 .upgrade()
443 .map(|namespace| namespace.name().to_string())
444 .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {THIS_IS_A_BUG}"))
445 }
446
447 async fn upload_to_fileserver(&self, location: &Path) -> Result<(Url, String), ProviderError> {
448 let file_name = if let Some(name) = location.file_name() {
449 name.to_string_lossy()
450 } else {
451 "unnamed".into()
452 };
453
454 let data = self.filesystem.read(location).await?;
455 let content_hashed = hex::encode(sha2::Sha256::digest(&data));
456 let req = self
457 .http_client
458 .head(format!(
459 "http://{}/{content_hashed}__{file_name}",
460 self.file_server_local_host().await?
461 ))
462 .build()
463 .map_err(|err| {
464 ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into())
465 })?;
466
467 let url = req.url().clone();
468 let res = self.http_client.execute(req).await.map_err(|err| {
469 ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into())
470 })?;
471
472 if res.status() != reqwest::StatusCode::OK {
473 self.http_client
475 .post(url.as_ref())
476 .body(data)
477 .send()
478 .await
479 .map_err(|err| {
480 ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into())
481 })?;
482 }
483
484 Ok((url, content_hashed))
485 }
486
487 async fn file_server_local_host(&self) -> Result<String, ProviderError> {
488 if let Some(namespace) = self.namespace.upgrade() {
489 if let Some(port) = *namespace.file_server_port.read().await {
490 return Ok(format!("localhost:{port}"));
491 }
492 }
493
494 Err(ProviderError::FileServerSetupError(anyhow!(
495 "file server port not bound locally"
496 )))
497 }
498
499 async fn download_file(
500 &self,
501 url: &str,
502 remote_file_path: &Path,
503 hash: Option<&str>,
504 ) -> Result<(), ProviderError> {
505 let r = self
506 .k8s_client
507 .pod_exec(
508 &self.namespace_name(),
509 &self.name,
510 vec![
511 "/cfg/curl",
512 url,
513 "--output",
514 &remote_file_path.to_string_lossy(),
515 ],
516 )
517 .await
518 .map_err(|err| {
519 ProviderError::DownloadFile(
520 remote_file_path.to_string_lossy().to_string(),
521 anyhow!(format!("node: {}, err: {}", self.name(), err)),
522 )
523 })?;
524
525 trace!("download url {} result: {:?}", url, r);
526
527 if r.is_err() {
528 return Err(ProviderError::DownloadFile(
529 remote_file_path.to_string_lossy().to_string(),
530 anyhow!(format!("node: {}, err downloading file", self.name())),
531 ));
532 }
533
534 if let Some(hash) = hash {
535 let res = self
537 .k8s_client
538 .pod_exec(
539 &self.namespace_name(),
540 &self.name,
541 vec![
542 "/cfg/coreutils",
543 "sha256sum",
544 &remote_file_path.to_string_lossy(),
545 ],
546 )
547 .await
548 .map_err(|err| {
549 ProviderError::DownloadFile(
550 remote_file_path.to_string_lossy().to_string(),
551 anyhow!(format!("node: {}, err: {}", self.name(), err)),
552 )
553 })?;
554
555 if let Ok(output) = res {
556 if !output.contains(hash) {
557 return Err(ProviderError::DownloadFile(
558 remote_file_path.to_string_lossy().to_string(),
559 anyhow!(format!("node: {}, invalid sha256sum hash: {hash} for file, output was {output}", self.name())),
560 ));
561 }
562 } else {
563 return Err(ProviderError::DownloadFile(
564 remote_file_path.to_string_lossy().to_string(),
565 anyhow!(format!(
566 "node: {}, err calculating sha256sum for file {:?}",
567 self.name(),
568 res
569 )),
570 ));
571 }
572 }
573
574 Ok(())
575 }
576}
577
578#[async_trait]
579impl<FS> ProviderNode for KubernetesNode<FS>
580where
581 FS: FileSystem + Send + Sync + Clone + 'static,
582{
583 fn name(&self) -> &str {
584 &self.name
585 }
586
587 fn args(&self) -> Vec<&str> {
588 self.args.iter().map(|arg| arg.as_str()).collect()
589 }
590
/// Local base directory of the node.
fn base_dir(&self) -> &PathBuf {
    &self.base_dir
}
594
/// Local configuration directory of the node.
fn config_dir(&self) -> &PathBuf {
    &self.config_dir
}
598
/// Local data directory of the node.
fn data_dir(&self) -> &PathBuf {
    &self.data_dir
}
602
/// Local relay-data directory of the node.
fn relay_data_dir(&self) -> &PathBuf {
    &self.relay_data_dir
}
606
/// Local scripts directory of the node.
fn scripts_dir(&self) -> &PathBuf {
    &self.scripts_dir
}
610
/// Local path of the node's log file (`node.log` inside the base directory).
fn log_path(&self) -> &PathBuf {
    &self.log_path
}
614
615 fn log_cmd(&self) -> String {
616 format!("kubectl -n {} logs {}", self.namespace_name(), self.name)
617 }
618
619 fn path_in_node(&self, file: &Path) -> PathBuf {
620 PathBuf::from(file)
623 }
624
625 async fn logs(&self) -> Result<String, ProviderError> {
627 self.k8s_client
628 .pod_logs(&self.namespace_name(), &self.name)
629 .await
630 .map_err(|err| ProviderError::GetLogsFailed(self.name.to_string(), err.into()))
631 }
632
633 async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
634 let logs = self.logs().await?;
635
636 self.filesystem
637 .write(local_dest, logs)
638 .await
639 .map_err(|err| ProviderError::DumpLogsFailed(self.name.to_string(), err.into()))?;
640
641 Ok(())
642 }
643
644 async fn create_port_forward(
645 &self,
646 local_port: u16,
647 remote_port: u16,
648 ) -> Result<Option<u16>, ProviderError> {
649 if let Some(fwd_info) = self.port_fwds.read().await.get(&remote_port) {
651 return Ok(Some(fwd_info.0));
652 };
653
654 let (port, task) = self
655 .k8s_client
656 .create_pod_port_forward(&self.namespace_name(), &self.name, local_port, remote_port)
657 .await
658 .map_err(|err| ProviderError::PortForwardError(local_port, remote_port, err.into()))?;
659
660 self.port_fwds
661 .write()
662 .await
663 .insert(remote_port, (port, task));
664
665 Ok(Some(port))
666 }
667
668 async fn run_command(
669 &self,
670 options: RunCommandOptions,
671 ) -> Result<ExecutionResult, ProviderError> {
672 let mut command = vec![];
673
674 for (name, value) in options.env {
675 command.push(format!("export {name}={value};"));
676 }
677
678 command.push(options.program);
679
680 for arg in options.args {
681 command.push(arg);
682 }
683
684 self.k8s_client
685 .pod_exec(
686 &self.namespace_name(),
687 &self.name,
688 vec!["sh", "-c", &command.join(" ")],
689 )
690 .await
691 .map_err(|err| {
692 ProviderError::RunCommandError(
693 format!("sh -c {}", &command.join(" ")),
694 format!("in pod {}", self.name),
695 err.into(),
696 )
697 })
698 }
699
700 async fn run_script(
701 &self,
702 options: RunScriptOptions,
703 ) -> Result<ExecutionResult, ProviderError> {
704 let file_name = options
705 .local_script_path
706 .file_name()
707 .expect(&format!(
708 "file name should be present at this point {THIS_IS_A_BUG}"
709 ))
710 .to_string_lossy();
711
712 self.run_command(RunCommandOptions {
713 program: format!("/tmp/{file_name}"),
714 args: options.args,
715 env: options.env,
716 })
717 .await
718 .map_err(|err| ProviderError::RunScriptError(self.name.to_string(), err.into()))
719 }
720
721 async fn send_file(
722 &self,
723 local_file_path: &Path,
724 remote_file_path: &Path,
725 mode: &str,
726 ) -> Result<(), ProviderError> {
727 if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) {
728 self.create_remote_dir(&remote_parent_dir).await?;
729 }
730
731 debug!(
732 "Uploading file: {} IFF not present in the fileserver",
733 local_file_path.to_string_lossy()
734 );
735
736 let (mut url, hash) = self.upload_to_fileserver(local_file_path).await?;
738 let _ = url.set_host(Some("fileserver"));
739 let _ = url.set_port(Some(80));
740
741 let mut last_err = None;
744 for i in 0..5 {
745 if i > 0 {
746 warn!("retrying number {i} download file {:?}", remote_file_path);
747 tokio::time::sleep(Duration::from_secs(i)).await;
748 }
749
750 let res = self
751 .download_file(url.as_ref(), remote_file_path, Some(&hash))
752 .await;
753
754 last_err = res.err();
755
756 if last_err.is_none() {
757 break;
759 }
760 }
761
762 if let Some(last_err) = last_err {
763 return Err(last_err);
764 }
765
766 let _ = self
767 .k8s_client
768 .pod_exec(
769 &self.namespace_name(),
770 &self.name,
771 vec!["chmod", mode, &remote_file_path.to_string_lossy()],
772 )
773 .await
774 .map_err(|err| {
775 ProviderError::SendFile(
776 self.name.clone(),
777 local_file_path.to_string_lossy().to_string(),
778 err.into(),
779 )
780 })?;
781
782 Ok(())
783 }
784
/// Retrieves a file from the node.
///
/// NOTE(review): currently a no-op. Both arguments are ignored and `Ok(())` is returned
/// without transferring anything.
async fn receive_file(
    &self,
    _remote_src: &Path,
    _local_dest: &Path,
) -> Result<(), ProviderError> {
    Ok(())
}
792
793 async fn ip(&self) -> Result<IpAddr, ProviderError> {
794 let status = self
795 .k8s_client
796 .pod_status(&self.namespace_name(), &self.name)
797 .await
798 .map_err(|_| ProviderError::MissingNode(self.name.clone()))?;
799
800 if let Some(ip) = status.pod_ip {
801 Ok(ip.parse::<IpAddr>().map_err(|err| {
803 ProviderError::InvalidConfig(format!("Can not parse the pod ip: {ip}, err: {err}"))
804 })?)
805 } else {
806 Err(ProviderError::InvalidConfig(format!(
807 "Can not find ip of pod: {}",
808 self.name()
809 )))
810 }
811 }
812
813 async fn pause(&self) -> Result<(), ProviderError> {
814 self.k8s_client
815 .pod_exec(
816 &self.namespace_name(),
817 &self.name,
818 vec!["sh", "-c", "echo pause > /tmp/zombiepipe"],
819 )
820 .await
821 .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
822 .map_err(|err| {
823 ProviderError::PauseNodeFailed(
824 self.name.to_string(),
825 anyhow!("error when pausing node: status {}: {}", err.0, err.1),
826 )
827 })?;
828
829 Ok(())
830 }
831
832 async fn resume(&self) -> Result<(), ProviderError> {
833 self.k8s_client
834 .pod_exec(
835 &self.namespace_name(),
836 &self.name,
837 vec!["sh", "-c", "echo resume > /tmp/zombiepipe"],
838 )
839 .await
840 .map_err(|err| ProviderError::ResumeNodeFailed(self.name.to_string(), err.into()))?
841 .map_err(|err| {
842 ProviderError::ResumeNodeFailed(
843 self.name.to_string(),
844 anyhow!("error when pausing node: status {}: {}", err.0, err.1),
845 )
846 })?;
847
848 Ok(())
849 }
850
851 async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
852 if let Some(duration) = after {
853 sleep(duration).await;
854 }
855
856 self.k8s_client
857 .pod_exec(
858 &self.namespace_name(),
859 &self.name,
860 vec!["sh", "-c", "echo restart > /tmp/zombiepipe"],
861 )
862 .await
863 .map_err(|err| ProviderError::RestartNodeFailed(self.name.to_string(), err.into()))?
864 .map_err(|err| {
865 ProviderError::RestartNodeFailed(
866 self.name.to_string(),
867 anyhow!("error when restarting node: status {}: {}", err.0, err.1),
868 )
869 })?;
870
871 Ok(())
872 }
873
874 async fn destroy(&self) -> Result<(), ProviderError> {
875 self.k8s_client
876 .delete_pod(&self.namespace_name(), &self.name)
877 .await
878 .map_err(|err| ProviderError::KillNodeFailed(self.name.to_string(), err.into()))?;
879
880 if let Some(namespace) = self.namespace.upgrade() {
881 namespace.nodes.write().await.remove(&self.name);
882 }
883
884 Ok(())
885 }
886}