1use std::{
2 collections::BTreeMap, fmt::Debug, os::unix::process::ExitStatusExt, process::ExitStatus,
3 time::Duration,
4};
5
6use anyhow::anyhow;
7use futures::{StreamExt, TryStreamExt};
8use k8s_openapi::api::core::v1::{
9 ConfigMap, Namespace, Pod, PodSpec, PodStatus, Service, ServiceSpec,
10};
11use kube::{
12 api::{AttachParams, DeleteParams, ListParams, LogParams, PostParams, WatchParams},
13 core::{DynamicObject, GroupVersionKind, ObjectMeta, TypeMeta, WatchEvent},
14 discovery::ApiResource,
15 runtime::{conditions, wait::await_condition},
16 Api, Resource,
17};
18use serde::de::DeserializeOwned;
19use support::constants::THIS_IS_A_BUG;
20use tokio::{
21 io::{AsyncRead, ErrorKind},
22 net::TcpListener,
23 task::JoinHandle,
24};
25use tokio_util::compat::FuturesAsyncReadCompatExt;
26use tracing::{debug, trace};
27
28use crate::{constants::LOCALHOST, types::ExecutionResult};
29
/// Error type returned by the [`KubernetesClient`] operations in this module.
///
/// A transparent newtype around [`anyhow::Error`]: `#[error(transparent)]` forwards
/// `Display`/`source` to the wrapped error and `#[from]` provides `From<anyhow::Error>`.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct Error(#[from] anyhow::Error);
33
/// Convenience alias for results whose error type is this module's [`Error`].
pub type Result<T> = core::result::Result<T, Error>;
35
/// Thin wrapper around a [`kube::Client`] exposing the cluster operations used by this crate.
///
/// Cloning the wrapper clones the inner `kube::Client`.
#[derive(Clone)]
pub struct KubernetesClient {
    /// Underlying `kube` client; cloned into every typed `Api` handle built by the methods.
    inner: kube::Client,
}
40
41impl KubernetesClient {
42 pub(super) async fn new() -> Result<Self> {
43 Ok(Self {
44 inner: kube::Client::try_default()
46 .await
47 .map_err(|err| Error::from(anyhow!("error initializing kubers client: {err}")))?,
48 })
49 }
50
51 #[allow(dead_code)]
52 pub(super) async fn get_namespace(&self, name: &str) -> Result<Option<Namespace>> {
53 Api::<Namespace>::all(self.inner.clone())
54 .get_opt(name.as_ref())
55 .await
56 .map_err(|err| Error::from(anyhow!("error while getting namespace {name}: {err}")))
57 }
58
59 #[allow(dead_code)]
60 pub(super) async fn get_namespaces(&self) -> Result<Vec<Namespace>> {
61 Ok(Api::<Namespace>::all(self.inner.clone())
62 .list(&ListParams::default())
63 .await
64 .map_err(|err| Error::from(anyhow!("error while getting all namespaces: {err}")))?
65 .into_iter()
66 .filter(|ns| matches!(&ns.meta().name, Some(name) if name.starts_with("zombienet")))
67 .collect())
68 }
69
70 pub(super) async fn create_namespace(
71 &self,
72 name: &str,
73 labels: BTreeMap<String, String>,
74 ) -> Result<Namespace> {
75 let namespaces = Api::<Namespace>::all(self.inner.clone());
76
77 let namespace = Namespace {
78 metadata: ObjectMeta {
79 name: Some(name.to_string()),
80 labels: Some(labels),
81 ..Default::default()
82 },
83 ..Default::default()
84 };
85
86 namespaces
87 .create(&PostParams::default(), &namespace)
88 .await
89 .map_err(|err| Error::from(anyhow!("error while created namespace {name}: {err}")))?;
90
91 self.wait_created(namespaces, name).await?;
92
93 Ok(namespace)
94 }
95
96 pub(super) async fn delete_namespace(&self, name: &str) -> Result<()> {
97 let namespaces = Api::<Namespace>::all(self.inner.clone());
98
99 namespaces
100 .delete(name, &DeleteParams::default())
101 .await
102 .map_err(|err| Error::from(anyhow!("error while deleting namespace {name}: {err}")))?;
103
104 Ok(())
105 }
106
107 pub(super) async fn create_config_map_from_file(
108 &self,
109 namespace: &str,
110 name: &str,
111 file_name: &str,
112 file_contents: &str,
113 labels: BTreeMap<String, String>,
114 ) -> Result<ConfigMap> {
115 let config_maps = Api::<ConfigMap>::namespaced(self.inner.clone(), namespace);
116
117 let config_map = ConfigMap {
118 metadata: ObjectMeta {
119 name: Some(name.to_string()),
120 namespace: Some(namespace.to_string()),
121 labels: Some(labels),
122 ..Default::default()
123 },
124 data: Some(BTreeMap::from([(
125 file_name.to_string(),
126 file_contents.to_string(),
127 )])),
128 ..Default::default()
129 };
130
131 config_maps
132 .create(&PostParams::default(), &config_map)
133 .await
134 .map_err(|err| {
135 Error::from(anyhow!(
136 "error while creating config map {name} for {file_name}: {err}"
137 ))
138 })?;
139
140 self.wait_created(config_maps, name).await?;
141
142 Ok(config_map)
143 }
144
145 pub(super) async fn create_pod(
146 &self,
147 namespace: &str,
148 name: &str,
149 spec: PodSpec,
150 labels: BTreeMap<String, String>,
151 ) -> Result<Pod> {
152 let pods = Api::<Pod>::namespaced(self.inner.clone(), namespace);
153
154 let pod = Pod {
155 metadata: ObjectMeta {
156 name: Some(name.to_string()),
157 namespace: Some(namespace.to_string()),
158 labels: Some(labels),
159 ..Default::default()
160 },
161 spec: Some(spec),
162 ..Default::default()
163 };
164
165 pods.create(&PostParams::default(), &pod)
166 .await
167 .map_err(|err| Error::from(anyhow!("error while creating pod {name}: {err}")))?;
168
169 trace!("Pod {name} checking for ready state!");
170 let wait_ready = await_condition(pods, name, helpers::is_pod_ready());
171 let _ = tokio::time::timeout(Duration::from_secs(600), wait_ready)
173 .await
174 .map_err(|err| {
175 Error::from(anyhow!("error while awaiting pod {name} running: {err}"))
176 })?;
177
178 debug!("Pod {name} is Ready!");
179 Ok(pod)
180 }
181
182 pub(super) async fn pod_logs(&self, namespace: &str, name: &str) -> Result<String> {
183 Api::<Pod>::namespaced(self.inner.clone(), namespace)
184 .logs(
185 name,
186 &LogParams {
187 pretty: true,
188 timestamps: true,
189 ..Default::default()
190 },
191 )
192 .await
193 .map_err(|err| Error::from(anyhow!("error while getting logs for pod {name}: {err}")))
194 }
195
196 pub(super) async fn pod_status(&self, namespace: &str, name: &str) -> Result<PodStatus> {
197 let pod = Api::<Pod>::namespaced(self.inner.clone(), namespace)
198 .get(name)
199 .await
200 .map_err(|err| Error::from(anyhow!("error while getting pod {name}: {err}")))?;
201
202 let status = pod.status.ok_or(Error::from(anyhow!(
203 "error while getting status for pod {name}"
204 )))?;
205 Ok(status)
206 }
207
208 #[allow(dead_code)]
209 pub(super) async fn create_pod_logs_stream(
210 &self,
211 namespace: &str,
212 name: &str,
213 ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
214 Ok(Box::new(
215 Api::<Pod>::namespaced(self.inner.clone(), namespace)
216 .log_stream(
217 name,
218 &LogParams {
219 follow: true,
220 pretty: true,
221 timestamps: true,
222 ..Default::default()
223 },
224 )
225 .await
226 .map_err(|err| {
227 Error::from(anyhow!(
228 "error while getting a log stream for {name}: {err}"
229 ))
230 })?
231 .compat(),
232 ))
233 }
234
235 pub(super) async fn pod_exec<S>(
236 &self,
237 namespace: &str,
238 name: &str,
239 command: Vec<S>,
240 ) -> Result<ExecutionResult>
241 where
242 S: Into<String> + std::fmt::Debug + Send,
243 {
244 trace!("running command: {command:?} on pod {name} for ns {namespace}");
245 let mut process = Api::<Pod>::namespaced(self.inner.clone(), namespace)
246 .exec(
247 name,
248 command,
249 &AttachParams::default().stdout(true).stderr(true),
250 )
251 .await
252 .map_err(|err| Error::from(anyhow!("error while exec in the pod {name}: {err}")))?;
253
254 let stdout_stream = process.stdout().expect(&format!(
255 "stdout shouldn't be None when true passed to exec {THIS_IS_A_BUG}"
256 ));
257 let stdout = tokio_util::io::ReaderStream::new(stdout_stream)
258 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
259 .collect::<Vec<_>>()
260 .await
261 .join("");
262 let stderr_stream = process.stderr().expect(&format!(
263 "stderr shouldn't be None when true passed to exec {THIS_IS_A_BUG}"
264 ));
265 let stderr = tokio_util::io::ReaderStream::new(stderr_stream)
266 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
267 .collect::<Vec<_>>()
268 .await
269 .join("");
270
271 let status = process
272 .take_status()
273 .expect(&format!(
274 "first call to status shouldn't fail {THIS_IS_A_BUG}"
275 ))
276 .await;
277
278 process.join().await.map_err(|err| {
280 Error::from(anyhow!(
281 "error while joining process during exec for {name}: {err}"
282 ))
283 })?;
284
285 match status {
286 Some(status) if status.status.as_ref().is_some_and(|s| s == "Success") => {
288 Ok(Ok(stdout))
289 },
290 Some(status) if status.status.as_ref().is_some_and(|s| s == "Failure") => {
292 match status.reason {
293 Some(reason) if reason == "NonZeroExitCode" => {
295 let exit_status = status
296 .details
297 .and_then(|details| {
298 details.causes.and_then(|causes| {
299 causes.first().and_then(|cause| {
300 cause.message.as_deref().and_then(|message| {
301 message.parse::<i32>().ok().map(ExitStatus::from_raw)
302 })
303 })
304 })
305 })
306 .expect(
307 &format!("command with non-zero exit code should have exit code present {THIS_IS_A_BUG}")
308 );
309
310 Ok(Err((exit_status, stderr)))
311 },
312 Some(ref reason) => Err(Error::from(anyhow!(
314 format!("unhandled reason while exec for {name}: {reason}, stderr: {stderr}, status: {status:?}")
315 ))),
316 None => {
317 panic!("command had status but no reason was present, this is a bug")
318 },
319 }
320 },
321 Some(_) => {
322 unreachable!("command had status but it didn't matches either Success or Failure, this is a bug from the kube.rs library itself");
323 },
324 None => {
325 panic!("command has no status following its execution, this is a bug");
326 },
327 }
328 }
329
330 pub(super) async fn delete_pod(&self, namespace: &str, name: &str) -> Result<()> {
331 let pods = Api::<Pod>::namespaced(self.inner.clone(), namespace);
332
333 pods.delete(name, &DeleteParams::default())
334 .await
335 .map_err(|err| Error::from(anyhow!("error when deleting pod {name}: {err}")))?;
336
337 await_condition(pods, name, conditions::is_deleted(name))
338 .await
339 .map_err(|err| {
340 Error::from(anyhow!(
341 "error when waiting for pod {name} to be deleted: {err}"
342 ))
343 })?;
344
345 Ok(())
346 }
347
348 pub(super) async fn create_service(
349 &self,
350 namespace: &str,
351 name: &str,
352 spec: ServiceSpec,
353 labels: BTreeMap<String, String>,
354 ) -> Result<Service> {
355 let services = Api::<Service>::namespaced(self.inner.clone(), namespace);
356
357 let service = Service {
358 metadata: ObjectMeta {
359 name: Some(name.to_string()),
360 namespace: Some(namespace.to_string()),
361 labels: Some(labels),
362 ..Default::default()
363 },
364 spec: Some(spec),
365 ..Default::default()
366 };
367
368 services
369 .create(&PostParams::default(), &service)
370 .await
371 .map_err(|err| Error::from(anyhow!("error while creating service {name}: {err}")))?;
372
373 Ok(service)
374 }
375
376 pub(super) async fn create_pod_port_forward(
377 &self,
378 namespace: &str,
379 name: &str,
380 local_port: u16,
381 remote_port: u16,
382 ) -> Result<(u16, JoinHandle<()>)> {
383 let pods = Api::<Pod>::namespaced(self.inner.clone(), namespace);
384 let bind = TcpListener::bind((LOCALHOST, local_port))
385 .await
386 .map_err(|err| {
387 Error::from(anyhow!(
388 "error binding port {local_port} for pod {name}: {err}"
389 ))
390 })?;
391 let local_port = bind.local_addr().map_err(|err| Error(err.into()))?.port();
392 let name = name.to_string();
393
394 const MAX_FAILURES: usize = 5;
395 let monitor_handle = tokio::spawn(async move {
396 let mut consecutive_failures = 0;
397 loop {
398 let (mut client_conn, _) = match bind.accept().await {
399 Ok(conn) => {
400 consecutive_failures = 0;
401 conn
402 },
403 Err(e) => {
404 if consecutive_failures < MAX_FAILURES {
405 trace!("Port-forward accept error: {e:?}, retrying in 1s");
406 tokio::time::sleep(Duration::from_secs(1)).await;
407 consecutive_failures += 1;
408 continue;
409 } else {
410 trace!("Port-forward accept failed too many times, giving up");
411 break;
412 }
413 },
414 };
415
416 let peer = match client_conn.peer_addr() {
417 Ok(addr) => addr,
418 Err(e) => {
419 trace!("Failed to get peer address: {e:?}");
420 break;
421 },
422 };
423
424 trace!("new connection on local_port: {local_port}, peer: {peer}");
425 let (name, pods) = (name.clone(), pods.clone());
426
427 tokio::spawn(async move {
428 loop {
429 let mut forwarder = match pods.portforward(&name, &[remote_port]).await {
431 Ok(f) => {
432 consecutive_failures = 0;
433 f
434 },
435 Err(e) => {
436 consecutive_failures += 1;
437 if consecutive_failures < MAX_FAILURES {
438 trace!("portforward failed to establish ({}/{}): {e:?}, retrying in 1s",
439 consecutive_failures, MAX_FAILURES);
440 tokio::time::sleep(Duration::from_secs(1)).await;
441 continue;
442 } else {
443 trace!("portforward failed to establish after {} attempts: {e:?}, closing connection",
444 consecutive_failures);
445 break;
446 }
447 },
448 };
449
450 let mut upstream_conn = match forwarder.take_stream(remote_port) {
451 Some(s) => s,
452 None => {
453 trace!("Failed to take stream for remote_port: {remote_port}, retrying in 1s");
454 tokio::time::sleep(Duration::from_secs(1)).await;
455 continue;
456 },
457 };
458
459 match tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn)
460 .await
461 {
462 Ok((_n1, _n2)) => {
463 trace!("copy_bidirectional finished (EOF), closing connection");
465
466 drop(upstream_conn);
467 let _ = forwarder.join().await;
468
469 break;
470 },
471 Err(e) => {
472 let kind = e.kind();
473 match kind {
474 ErrorKind::ConnectionReset
475 | ErrorKind::ConnectionAborted
476 | ErrorKind::ConnectionRefused
477 | ErrorKind::TimedOut => {
478 consecutive_failures += 1;
479 if consecutive_failures < MAX_FAILURES {
480 trace!("Network error ({kind:?}): {e:?}, retrying port-forward for this connection");
481 tokio::time::sleep(Duration::from_secs(1)).await;
482 continue;
483 } else {
484 trace!("portforward failed to establish after {} attempts: {e:?}, closing connection",
485 consecutive_failures);
486 break;
487 }
488 },
489 _ => {
490 trace!("Non-network error ({kind:?}): {e:?}, closing connection");
491 break;
492 },
493 }
494 },
495 }
496 }
497 });
498
499 trace!("finished forwarder process for local port: {local_port}, peer: {peer}");
500 }
501 });
502
503 Ok((local_port, monitor_handle))
504 }
505
506 pub(super) async fn create_static_resource(
508 &self,
509 namespace: &str,
510 raw_manifest: &str,
511 ) -> Result<()> {
512 let tm: TypeMeta = serde_yaml::from_str(raw_manifest).map_err(|err| {
513 Error::from(anyhow!(
514 "error while extracting TypeMeta from manifest: {raw_manifest}: {err}"
515 ))
516 })?;
517 let gvk = GroupVersionKind::try_from(&tm).map_err(|err| {
518 Error::from(anyhow!(
519 "error while extracting GroupVersionKind from manifest: {raw_manifest}: {err}"
520 ))
521 })?;
522
523 let ar = ApiResource::from_gvk(&gvk);
524 let api: Api<DynamicObject> = Api::namespaced_with(self.inner.clone(), namespace, &ar);
525
526 api.create(
527 &PostParams::default(),
528 &serde_yaml::from_str(raw_manifest).unwrap(),
529 )
530 .await
531 .map_err(|err| {
532 Error::from(anyhow!(
533 "error while creating static-config {raw_manifest}: {err}"
534 ))
535 })?;
536
537 Ok(())
538 }
539
540 async fn wait_created<K>(&self, api: Api<K>, name: &str) -> Result<()>
541 where
542 K: Clone + DeserializeOwned + Debug,
543 {
544 let params = &WatchParams::default().fields(&format!("metadata.name={name}"));
545 let mut stream = api
546 .watch(params, "0")
547 .await
548 .map_err(|err| {
549 Error::from(anyhow!(
550 "error while awaiting first response when resource {name} is created: {err}"
551 ))
552 })?
553 .boxed();
554
555 while let Some(status) = stream.try_next().await.map_err(|err| {
556 Error::from(anyhow!(
557 "error while awaiting next change after resource {name} is created: {err}"
558 ))
559 })? {
560 match status {
561 WatchEvent::Added(_) => break,
562 WatchEvent::Error(err) => Err(Error::from(anyhow!(
563 "error while awaiting resource {name} is created: {err}"
564 )))?,
565 WatchEvent::Bookmark(_) => {
566 }
568 any_other_event => panic!("Unexpected event happened while creating '{name}' {THIS_IS_A_BUG}. Event: {any_other_event:?}"),
569 }
570 }
571
572 Ok(())
573 }
574}
575
576mod helpers {
577 use k8s_openapi::api::core::v1::Pod;
578 use kube::runtime::wait::Condition;
579 use tracing::trace;
580
581 pub fn is_pod_ready() -> impl Condition<Pod> {
584 |obj: Option<&Pod>| {
585 if let Some(pod) = &obj {
586 if let Some(status) = &pod.status {
587 if let Some(conditions) = &status.conditions {
588 let ready = conditions
589 .iter()
590 .any(|cond| cond.status == "True" && cond.type_ == "Ready");
591
592 if ready {
593 trace!("{:#?}", status);
594 return ready;
595 }
596 }
597 }
598 }
599 false
600 }
601 }
602}