//! Kubernetes client (zombienet_provider/kubernetes/client.rs).

1use std::{
2    collections::BTreeMap, fmt::Debug, os::unix::process::ExitStatusExt, process::ExitStatus,
3    time::Duration,
4};
5
6use anyhow::anyhow;
7use futures::{StreamExt, TryStreamExt};
8use k8s_openapi::api::core::v1::{
9    ConfigMap, Namespace, Pod, PodSpec, PodStatus, Service, ServiceSpec,
10};
11use kube::{
12    api::{AttachParams, DeleteParams, ListParams, LogParams, PostParams, WatchParams},
13    core::{DynamicObject, GroupVersionKind, ObjectMeta, TypeMeta, WatchEvent},
14    discovery::ApiResource,
15    runtime::{conditions, wait::await_condition},
16    Api, Resource,
17};
18use serde::de::DeserializeOwned;
19use support::constants::THIS_IS_A_BUG;
20use tokio::{
21    io::{AsyncRead, ErrorKind},
22    net::TcpListener,
23    task::JoinHandle,
24};
25use tokio_util::compat::FuturesAsyncReadCompatExt;
26use tracing::{debug, trace};
27
28use crate::{constants::LOCALHOST, types::ExecutionResult};
29
/// Error type for this Kubernetes client: a transparent newtype around
/// [`anyhow::Error`], so helpers can attach free-form context while callers
/// still see a single concrete error type.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct Error(#[from] anyhow::Error);

/// Result alias used by every method in this module.
pub type Result<T> = core::result::Result<T, Error>;
35
/// Wrapper around [`kube::Client`] exposing the namespace, pod, config-map,
/// service and port-forward operations the zombienet provider needs.
#[derive(Clone)]
pub struct KubernetesClient {
    // Underlying kube client handle; each operation clones it.
    inner: kube::Client,
}
40
41impl KubernetesClient {
42    pub(super) async fn new() -> Result<Self> {
43        Ok(Self {
44            // TODO: make it more flexible with path to kube config
45            inner: kube::Client::try_default()
46                .await
47                .map_err(|err| Error::from(anyhow!("error initializing kubers client: {err}")))?,
48        })
49    }
50
51    #[allow(dead_code)]
52    pub(super) async fn get_namespace(&self, name: &str) -> Result<Option<Namespace>> {
53        Api::<Namespace>::all(self.inner.clone())
54            .get_opt(name.as_ref())
55            .await
56            .map_err(|err| Error::from(anyhow!("error while getting namespace {name}: {err}")))
57    }
58
59    #[allow(dead_code)]
60    pub(super) async fn get_namespaces(&self) -> Result<Vec<Namespace>> {
61        Ok(Api::<Namespace>::all(self.inner.clone())
62            .list(&ListParams::default())
63            .await
64            .map_err(|err| Error::from(anyhow!("error while getting all namespaces: {err}")))?
65            .into_iter()
66            .filter(|ns| matches!(&ns.meta().name, Some(name) if name.starts_with("zombienet")))
67            .collect())
68    }
69
70    pub(super) async fn create_namespace(
71        &self,
72        name: &str,
73        labels: BTreeMap<String, String>,
74    ) -> Result<Namespace> {
75        let namespaces = Api::<Namespace>::all(self.inner.clone());
76
77        let namespace = Namespace {
78            metadata: ObjectMeta {
79                name: Some(name.to_string()),
80                labels: Some(labels),
81                ..Default::default()
82            },
83            ..Default::default()
84        };
85
86        namespaces
87            .create(&PostParams::default(), &namespace)
88            .await
89            .map_err(|err| Error::from(anyhow!("error while created namespace {name}: {err}")))?;
90
91        self.wait_created(namespaces, name).await?;
92
93        Ok(namespace)
94    }
95
96    pub(super) async fn delete_namespace(&self, name: &str) -> Result<()> {
97        let namespaces = Api::<Namespace>::all(self.inner.clone());
98
99        namespaces
100            .delete(name, &DeleteParams::default())
101            .await
102            .map_err(|err| Error::from(anyhow!("error while deleting namespace {name}: {err}")))?;
103
104        Ok(())
105    }
106
107    pub(super) async fn create_config_map_from_file(
108        &self,
109        namespace: &str,
110        name: &str,
111        file_name: &str,
112        file_contents: &str,
113        labels: BTreeMap<String, String>,
114    ) -> Result<ConfigMap> {
115        let config_maps = Api::<ConfigMap>::namespaced(self.inner.clone(), namespace);
116
117        let config_map = ConfigMap {
118            metadata: ObjectMeta {
119                name: Some(name.to_string()),
120                namespace: Some(namespace.to_string()),
121                labels: Some(labels),
122                ..Default::default()
123            },
124            data: Some(BTreeMap::from([(
125                file_name.to_string(),
126                file_contents.to_string(),
127            )])),
128            ..Default::default()
129        };
130
131        config_maps
132            .create(&PostParams::default(), &config_map)
133            .await
134            .map_err(|err| {
135                Error::from(anyhow!(
136                    "error while creating config map {name} for {file_name}: {err}"
137                ))
138            })?;
139
140        self.wait_created(config_maps, name).await?;
141
142        Ok(config_map)
143    }
144
145    pub(super) async fn create_pod(
146        &self,
147        namespace: &str,
148        name: &str,
149        spec: PodSpec,
150        labels: BTreeMap<String, String>,
151    ) -> Result<Pod> {
152        let pods = Api::<Pod>::namespaced(self.inner.clone(), namespace);
153
154        let pod = Pod {
155            metadata: ObjectMeta {
156                name: Some(name.to_string()),
157                namespace: Some(namespace.to_string()),
158                labels: Some(labels),
159                ..Default::default()
160            },
161            spec: Some(spec),
162            ..Default::default()
163        };
164
165        pods.create(&PostParams::default(), &pod)
166            .await
167            .map_err(|err| Error::from(anyhow!("error while creating pod {name}: {err}")))?;
168
169        trace!("Pod {name} checking for ready state!");
170        let wait_ready = await_condition(pods, name, helpers::is_pod_ready());
171        // TODO: we should use the `node_spawn_timeout` from global settings here.
172        let _ = tokio::time::timeout(Duration::from_secs(600), wait_ready)
173            .await
174            .map_err(|err| {
175                Error::from(anyhow!("error while awaiting pod {name} running: {err}"))
176            })?;
177
178        debug!("Pod {name} is Ready!");
179        Ok(pod)
180    }
181
182    pub(super) async fn pod_logs(&self, namespace: &str, name: &str) -> Result<String> {
183        Api::<Pod>::namespaced(self.inner.clone(), namespace)
184            .logs(
185                name,
186                &LogParams {
187                    pretty: true,
188                    timestamps: true,
189                    ..Default::default()
190                },
191            )
192            .await
193            .map_err(|err| Error::from(anyhow!("error while getting logs for pod {name}: {err}")))
194    }
195
196    pub(super) async fn pod_status(&self, namespace: &str, name: &str) -> Result<PodStatus> {
197        let pod = Api::<Pod>::namespaced(self.inner.clone(), namespace)
198            .get(name)
199            .await
200            .map_err(|err| Error::from(anyhow!("error while getting pod {name}: {err}")))?;
201
202        let status = pod.status.ok_or(Error::from(anyhow!(
203            "error while getting status for pod {name}"
204        )))?;
205        Ok(status)
206    }
207
208    #[allow(dead_code)]
209    pub(super) async fn create_pod_logs_stream(
210        &self,
211        namespace: &str,
212        name: &str,
213    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
214        Ok(Box::new(
215            Api::<Pod>::namespaced(self.inner.clone(), namespace)
216                .log_stream(
217                    name,
218                    &LogParams {
219                        follow: true,
220                        pretty: true,
221                        timestamps: true,
222                        ..Default::default()
223                    },
224                )
225                .await
226                .map_err(|err| {
227                    Error::from(anyhow!(
228                        "error while getting a log stream for {name}: {err}"
229                    ))
230                })?
231                .compat(),
232        ))
233    }
234
235    pub(super) async fn pod_exec<S>(
236        &self,
237        namespace: &str,
238        name: &str,
239        command: Vec<S>,
240    ) -> Result<ExecutionResult>
241    where
242        S: Into<String> + std::fmt::Debug + Send,
243    {
244        trace!("running command: {command:?} on pod {name} for ns {namespace}");
245        let mut process = Api::<Pod>::namespaced(self.inner.clone(), namespace)
246            .exec(
247                name,
248                command,
249                &AttachParams::default().stdout(true).stderr(true),
250            )
251            .await
252            .map_err(|err| Error::from(anyhow!("error while exec in the pod {name}: {err}")))?;
253
254        let stdout_stream = process.stdout().expect(&format!(
255            "stdout shouldn't be None when true passed to exec {THIS_IS_A_BUG}"
256        ));
257        let stdout = tokio_util::io::ReaderStream::new(stdout_stream)
258            .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
259            .collect::<Vec<_>>()
260            .await
261            .join("");
262        let stderr_stream = process.stderr().expect(&format!(
263            "stderr shouldn't be None when true passed to exec {THIS_IS_A_BUG}"
264        ));
265        let stderr = tokio_util::io::ReaderStream::new(stderr_stream)
266            .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
267            .collect::<Vec<_>>()
268            .await
269            .join("");
270
271        let status = process
272            .take_status()
273            .expect(&format!(
274                "first call to status shouldn't fail {THIS_IS_A_BUG}"
275            ))
276            .await;
277
278        // await process to finish
279        process.join().await.map_err(|err| {
280            Error::from(anyhow!(
281                "error while joining process during exec for {name}: {err}"
282            ))
283        })?;
284
285        match status {
286            // command succeeded with stdout
287            Some(status) if status.status.as_ref().is_some_and(|s| s == "Success") => {
288                Ok(Ok(stdout))
289            },
290            // command failed
291            Some(status) if status.status.as_ref().is_some_and(|s| s == "Failure") => {
292                match status.reason {
293                    // due to exit code
294                    Some(reason) if reason == "NonZeroExitCode" => {
295                        let exit_status = status
296                            .details
297                            .and_then(|details| {
298                                details.causes.and_then(|causes| {
299                                    causes.first().and_then(|cause| {
300                                        cause.message.as_deref().and_then(|message| {
301                                            message.parse::<i32>().ok().map(ExitStatus::from_raw)
302                                        })
303                                    })
304                                })
305                            })
306                            .expect(
307                                &format!("command with non-zero exit code should have exit code present {THIS_IS_A_BUG}")
308                            );
309
310                        Ok(Err((exit_status, stderr)))
311                    },
312                    // due to other unknown reason
313                    Some(ref reason) => Err(Error::from(anyhow!(
314                        format!("unhandled reason while exec for {name}: {reason}, stderr: {stderr}, status: {status:?}")
315                    ))),
316                    None => {
317                        panic!("command had status but no reason was present, this is a bug")
318                    },
319                }
320            },
321            Some(_) => {
322                unreachable!("command had status but it didn't matches either Success or Failure, this is a bug from the kube.rs library itself");
323            },
324            None => {
325                panic!("command has no status following its execution, this is a bug");
326            },
327        }
328    }
329
330    pub(super) async fn delete_pod(&self, namespace: &str, name: &str) -> Result<()> {
331        let pods = Api::<Pod>::namespaced(self.inner.clone(), namespace);
332
333        pods.delete(name, &DeleteParams::default())
334            .await
335            .map_err(|err| Error::from(anyhow!("error when deleting pod {name}: {err}")))?;
336
337        await_condition(pods, name, conditions::is_deleted(name))
338            .await
339            .map_err(|err| {
340                Error::from(anyhow!(
341                    "error when waiting for pod {name} to be deleted: {err}"
342                ))
343            })?;
344
345        Ok(())
346    }
347
348    pub(super) async fn create_service(
349        &self,
350        namespace: &str,
351        name: &str,
352        spec: ServiceSpec,
353        labels: BTreeMap<String, String>,
354    ) -> Result<Service> {
355        let services = Api::<Service>::namespaced(self.inner.clone(), namespace);
356
357        let service = Service {
358            metadata: ObjectMeta {
359                name: Some(name.to_string()),
360                namespace: Some(namespace.to_string()),
361                labels: Some(labels),
362                ..Default::default()
363            },
364            spec: Some(spec),
365            ..Default::default()
366        };
367
368        services
369            .create(&PostParams::default(), &service)
370            .await
371            .map_err(|err| Error::from(anyhow!("error while creating service {name}: {err}")))?;
372
373        Ok(service)
374    }
375
    /// Forwards a local TCP port to `remote_port` on pod `name`.
    ///
    /// Binds `local_port` on localhost (0 lets the OS pick a free port) and
    /// spawns a monitor task that accepts connections and relays each one
    /// through its own kube port-forward. Returns the actually-bound local
    /// port and the monitor task handle (abort it to stop forwarding).
    pub(super) async fn create_pod_port_forward(
        &self,
        namespace: &str,
        name: &str,
        local_port: u16,
        remote_port: u16,
    ) -> Result<(u16, JoinHandle<()>)> {
        let pods = Api::<Pod>::namespaced(self.inner.clone(), namespace);
        let bind = TcpListener::bind((LOCALHOST, local_port))
            .await
            .map_err(|err| {
                Error::from(anyhow!(
                    "error binding port {local_port} for  pod {name}: {err}"
                ))
            })?;
        // Re-read the port: if `local_port` was 0 the OS assigned a real one.
        let local_port = bind.local_addr().map_err(|err| Error(err.into()))?.port();
        let name = name.to_string();

        // Give up after this many consecutive errors (accept or forward).
        const MAX_FAILURES: usize = 5;
        let monitor_handle = tokio::spawn(async move {
            let mut consecutive_failures = 0;
            loop {
                // Accept the next client connection; transient accept errors
                // are retried with a 1s backoff.
                let (mut client_conn, _) = match bind.accept().await {
                    Ok(conn) => {
                        consecutive_failures = 0;
                        conn
                    },
                    Err(e) => {
                        if consecutive_failures < MAX_FAILURES {
                            trace!("Port-forward accept error: {e:?}, retrying in 1s");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                            consecutive_failures += 1;
                            continue;
                        } else {
                            trace!("Port-forward accept failed too many times, giving up");
                            break;
                        }
                    },
                };

                let peer = match client_conn.peer_addr() {
                    Ok(addr) => addr,
                    Err(e) => {
                        trace!("Failed to get peer address: {e:?}");
                        break;
                    },
                };

                trace!("new connection on local_port: {local_port}, peer: {peer}");
                let (name, pods) = (name.clone(), pods.clone());

                // One relay task per accepted connection.
                // NOTE(review): `consecutive_failures` is `Copy`, so this task
                // captures its own counter seeded from the outer value —
                // failures here never feed back into the accept loop's
                // counter. Confirm this is intended.
                tokio::spawn(async move {
                    loop {
                        // Try to establish port-forward
                        let mut forwarder = match pods.portforward(&name, &[remote_port]).await {
                            Ok(f) => {
                                consecutive_failures = 0;
                                f
                            },
                            Err(e) => {
                                consecutive_failures += 1;
                                if consecutive_failures < MAX_FAILURES {
                                    trace!("portforward failed to establish ({}/{}): {e:?}, retrying in 1s",
                                       consecutive_failures, MAX_FAILURES);
                                    tokio::time::sleep(Duration::from_secs(1)).await;
                                    continue;
                                } else {
                                    trace!("portforward failed to establish after {} attempts: {e:?}, closing connection",
                                       consecutive_failures);
                                    break;
                                }
                            },
                        };

                        // The forwarder multiplexes ports; grab the stream for ours.
                        let mut upstream_conn = match forwarder.take_stream(remote_port) {
                            Some(s) => s,
                            None => {
                                trace!("Failed to take stream for remote_port: {remote_port}, retrying in 1s");
                                tokio::time::sleep(Duration::from_secs(1)).await;
                                continue;
                            },
                        };

                        // Relay bytes in both directions until EOF or error.
                        match tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn)
                            .await
                        {
                            Ok((_n1, _n2)) => {
                                // EOF reached, close connection
                                trace!("copy_bidirectional finished (EOF), closing connection");

                                drop(upstream_conn);
                                let _ = forwarder.join().await;

                                break;
                            },
                            Err(e) => {
                                let kind = e.kind();
                                match kind {
                                    // Transient network failures: retry with a
                                    // fresh port-forward for this connection.
                                    ErrorKind::ConnectionReset
                                    | ErrorKind::ConnectionAborted
                                    | ErrorKind::ConnectionRefused
                                    | ErrorKind::TimedOut => {
                                        consecutive_failures += 1;
                                        if consecutive_failures < MAX_FAILURES {
                                            trace!("Network error ({kind:?}): {e:?}, retrying port-forward for this connection");
                                            tokio::time::sleep(Duration::from_secs(1)).await;
                                            continue;
                                        } else {
                                            trace!("portforward failed to establish after {} attempts: {e:?}, closing connection",
                                       consecutive_failures);
                                            break;
                                        }
                                    },
                                    // Anything else is fatal for this connection.
                                    _ => {
                                        trace!("Non-network error ({kind:?}): {e:?}, closing connection");
                                        break;
                                    },
                                }
                            },
                        }
                    }
                });

                trace!("finished forwarder process for local port: {local_port}, peer: {peer}");
            }
        });

        Ok((local_port, monitor_handle))
    }
505
506    /// Create resources from yamls in `static-configs` directory
507    pub(super) async fn create_static_resource(
508        &self,
509        namespace: &str,
510        raw_manifest: &str,
511    ) -> Result<()> {
512        let tm: TypeMeta = serde_yaml::from_str(raw_manifest).map_err(|err| {
513            Error::from(anyhow!(
514                "error while extracting TypeMeta from manifest: {raw_manifest}: {err}"
515            ))
516        })?;
517        let gvk = GroupVersionKind::try_from(&tm).map_err(|err| {
518            Error::from(anyhow!(
519                "error while extracting GroupVersionKind from manifest: {raw_manifest}: {err}"
520            ))
521        })?;
522
523        let ar = ApiResource::from_gvk(&gvk);
524        let api: Api<DynamicObject> = Api::namespaced_with(self.inner.clone(), namespace, &ar);
525
526        api.create(
527            &PostParams::default(),
528            &serde_yaml::from_str(raw_manifest).unwrap(),
529        )
530        .await
531        .map_err(|err| {
532            Error::from(anyhow!(
533                "error while creating static-config {raw_manifest}: {err}"
534            ))
535        })?;
536
537        Ok(())
538    }
539
    /// Blocks until a watch stream reports resource `name` as created (an
    /// `Added` event), or fails if the watch errors.
    ///
    /// NOTE(review): the watch starts at resourceVersion "0", so an `Added`
    /// event for an already-existing resource also terminates the wait —
    /// confirm that is acceptable for all callers.
    async fn wait_created<K>(&self, api: Api<K>, name: &str) -> Result<()>
    where
        K: Clone + DeserializeOwned + Debug,
    {
        // Narrow the watch to just this one resource by name.
        let params = &WatchParams::default().fields(&format!("metadata.name={name}"));
        let mut stream = api
            .watch(params, "0")
            .await
            .map_err(|err| {
                Error::from(anyhow!(
                    "error while awaiting first response when resource {name} is created: {err}"
                ))
            })?
            .boxed();

        while let Some(status) = stream.try_next().await.map_err(|err| {
            Error::from(anyhow!(
                "error while awaiting next change after resource {name} is created: {err}"
            ))
        })? {
            match status {
                // Resource exists: done waiting.
                WatchEvent::Added(_) => break,
                WatchEvent::Error(err) => Err(Error::from(anyhow!(
                    "error while awaiting resource {name} is created: {err}"
                )))?,
                WatchEvent::Bookmark(_) => {
                    // bookmark events are periodically sent as keep-alive/checkpoint, we should continue waiting
                }
                // Modified/Deleted should never be the first event for a
                // freshly-created resource.
                any_other_event => panic!("Unexpected event happened while creating '{name}' {THIS_IS_A_BUG}. Event: {any_other_event:?}"),
            }
        }

        Ok(())
    }
574}
575
576mod helpers {
577    use k8s_openapi::api::core::v1::Pod;
578    use kube::runtime::wait::Condition;
579    use tracing::trace;
580
581    /// An await condition for `Pod` that returns `true` once it is ready
582    /// based on [`kube::runtime::wait::conditions::is_pod_running`]
583    pub fn is_pod_ready() -> impl Condition<Pod> {
584        |obj: Option<&Pod>| {
585            if let Some(pod) = &obj {
586                if let Some(status) = &pod.status {
587                    if let Some(conditions) = &status.conditions {
588                        let ready = conditions
589                            .iter()
590                            .any(|cond| cond.status == "True" && cond.type_ == "Ready");
591
592                        if ready {
593                            trace!("{:#?}", status);
594                            return ready;
595                        }
596                    }
597                }
598            }
599            false
600        }
601    }
602}