1use std::{
2 collections::HashMap,
3 env,
4 path::{Path, PathBuf},
5 process::Stdio,
6 sync::{Arc, Weak},
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use async_trait::async_trait;
12use configuration::types::AssetLocation;
13use flate2::read::GzDecoder;
14use futures::future::try_join_all;
15use nix::{
16 sys::signal::{kill, Signal},
17 unistd::Pid,
18};
19use serde::{ser::Error, Deserialize, Serialize, Serializer};
20use sha2::Digest;
21use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
22use tar::Archive;
23use tokio::{
24 fs,
25 io::{AsyncRead, AsyncReadExt, BufReader},
26 process::{Child, ChildStderr, ChildStdout, Command},
27 sync::{
28 mpsc::{self, Sender},
29 RwLock,
30 },
31 task::JoinHandle,
32 time::sleep,
33 try_join,
34};
35use tracing::trace;
36
37use super::namespace::NativeNamespace;
38use crate::{
39 constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR},
40 native,
41 types::{ExecutionResult, RunCommandOptions, RunScriptOptions, TransferedFile},
42 ProviderError, ProviderNamespace, ProviderNode,
43};
44
45pub(super) struct NativeNodeOptions<'a, FS>
46where
47 FS: FileSystem + Send + Sync + Clone + 'static,
48{
49 pub(super) namespace: &'a Weak<NativeNamespace<FS>>,
50 pub(super) namespace_base_dir: &'a PathBuf,
51 pub(super) name: &'a str,
52 pub(super) program: &'a str,
53 pub(super) args: &'a [String],
54 pub(super) env: &'a [(String, String)],
55 pub(super) startup_files: &'a [TransferedFile],
56 pub(super) created_paths: &'a [PathBuf],
57 pub(super) db_snapshot: Option<&'a AssetLocation>,
58 pub(super) filesystem: &'a FS,
59 pub(super) node_log_path: Option<&'a PathBuf>,
60}
61
62impl<'a, FS> NativeNodeOptions<'a, FS>
63where
64 FS: FileSystem + Send + Sync + Clone + 'static,
65{
66 pub(super) fn from_deserializable(
67 deserializable: &'a DeserializableNativeNodeOptions,
68 namespace: &'a Weak<NativeNamespace<FS>>,
69 namespace_base_dir: &'a PathBuf,
70 filesystem: &'a FS,
71 ) -> NativeNodeOptions<'a, FS> {
72 NativeNodeOptions {
73 namespace,
74 namespace_base_dir,
75 name: &deserializable.name,
76 program: &deserializable.program,
77 args: &deserializable.args,
78 env: &deserializable.env,
79 startup_files: &[],
80 created_paths: &[],
81 db_snapshot: None,
82 filesystem,
83 node_log_path: deserializable.node_log_path.as_ref(),
84 }
85 }
86}
87
88#[derive(Deserialize)]
89pub(super) struct DeserializableNativeNodeOptions {
90 pub name: String,
91 pub program: String,
92 pub args: Vec<String>,
93 pub env: Vec<(String, String)>,
94 pub node_log_path: Option<PathBuf>,
95}
96
97enum ProcessHandle {
98 Spawned(Child, Pid),
99 Attached(Pid),
100}
101
102#[derive(Serialize)]
103pub(super) struct NativeNode<FS>
104where
105 FS: FileSystem + Send + Sync + Clone,
106{
107 #[serde(skip)]
108 namespace: Weak<NativeNamespace<FS>>,
109 name: String,
110 program: String,
111 args: Vec<String>,
112 env: Vec<(String, String)>,
113 base_dir: PathBuf,
114 config_dir: PathBuf,
115 data_dir: PathBuf,
116 relay_data_dir: PathBuf,
117 scripts_dir: PathBuf,
118 log_path: PathBuf,
119 #[serde(serialize_with = "serialize_process_handle")]
120 process_handle: std::sync::RwLock<Option<ProcessHandle>>,
123 #[serde(skip)]
124 stdout_reading_task: RwLock<Option<JoinHandle<()>>>,
125 #[serde(skip)]
126 stderr_reading_task: RwLock<Option<JoinHandle<()>>>,
127 #[serde(skip)]
128 log_writing_task: RwLock<Option<JoinHandle<()>>>,
129 #[serde(skip)]
130 filesystem: FS,
131 provider_tag: String,
132}
133
134impl<FS> NativeNode<FS>
135where
136 FS: FileSystem + Send + Sync + Clone + 'static,
137{
138 pub(super) async fn new(
139 options: NativeNodeOptions<'_, FS>,
140 ) -> Result<Arc<Self>, ProviderError> {
141 let filesystem = options.filesystem.clone();
142
143 let base_dir =
144 PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
145 trace!("creating base_dir {:?}", base_dir);
146 options.filesystem.create_dir_all(&base_dir).await?;
147 trace!("created base_dir {:?}", base_dir);
148
149 let base_dir_raw = base_dir.to_string_lossy();
150 let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
151 let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
152 let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
153 let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
154 let log_path = options
155 .node_log_path
156 .cloned()
157 .unwrap_or_else(|| base_dir.join(format!("{}.log", options.name)));
158
159 trace!("creating dirs {:?}", config_dir);
160 try_join!(
161 filesystem.create_dir_all(&config_dir),
162 filesystem.create_dir_all(&data_dir),
163 filesystem.create_dir_all(&relay_data_dir),
164 filesystem.create_dir_all(&scripts_dir),
165 )?;
166 trace!("created!");
167
168 let node = Arc::new(NativeNode {
169 namespace: options.namespace.clone(),
170 name: options.name.to_string(),
171 program: options.program.to_string(),
172 args: options.args.to_vec(),
173 env: options.env.to_vec(),
174 base_dir,
175 config_dir,
176 data_dir,
177 relay_data_dir,
178 scripts_dir,
179 log_path,
180 process_handle: std::sync::RwLock::new(None),
181 stdout_reading_task: RwLock::new(None),
182 stderr_reading_task: RwLock::new(None),
183 log_writing_task: RwLock::new(None),
184 filesystem: filesystem.clone(),
185 provider_tag: native::provider::PROVIDER_NAME.to_string(),
186 });
187
188 node.initialize_startup_paths(options.created_paths).await?;
189 node.initialize_startup_files(options.startup_files).await?;
190
191 if let Some(db_snap) = options.db_snapshot {
192 node.initialize_db_snapshot(db_snap).await?;
193 }
194
195 let (stdout, stderr) = node.initialize_process().await?;
196
197 node.initialize_log_writing(stdout, stderr).await;
198
199 Ok(node)
200 }
201
202 pub(super) async fn attach_to_live(
203 options: NativeNodeOptions<'_, FS>,
204 pid: i32,
205 ) -> Result<Arc<Self>, ProviderError> {
206 let filesystem = options.filesystem.clone();
207
208 let base_dir =
209 PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
210 trace!("creating base_dir {:?}", base_dir);
211 options.filesystem.create_dir_all(&base_dir).await?;
212 trace!("created base_dir {:?}", base_dir);
213
214 let base_dir_raw = base_dir.to_string_lossy();
215 let config_dir = PathBuf::from(format!("{base_dir_raw}{NODE_CONFIG_DIR}"));
216 let data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_DATA_DIR}"));
217 let relay_data_dir = PathBuf::from(format!("{base_dir_raw}{NODE_RELAY_DATA_DIR}"));
218 let scripts_dir = PathBuf::from(format!("{base_dir_raw}{NODE_SCRIPTS_DIR}"));
219 let log_path = options
220 .node_log_path
221 .cloned()
222 .unwrap_or_else(|| base_dir.join(format!("{}.log", options.name)));
223
224 let pid = Pid::from_raw(pid);
225
226 let node = Arc::new(NativeNode {
227 namespace: options.namespace.clone(),
228 name: options.name.to_string(),
229 program: options.program.to_string(),
230 args: options.args.to_vec(),
231 env: options.env.to_vec(),
232 base_dir,
233 config_dir,
234 data_dir,
235 relay_data_dir,
236 scripts_dir,
237 log_path,
238 process_handle: std::sync::RwLock::new(Some(ProcessHandle::Attached(pid))),
239 stdout_reading_task: RwLock::new(None),
240 stderr_reading_task: RwLock::new(None),
241 log_writing_task: RwLock::new(None),
242 filesystem: filesystem.clone(),
243 provider_tag: native::provider::PROVIDER_NAME.to_string(),
244 });
245
246 Ok(node)
247 }
248
249 async fn initialize_startup_paths(&self, paths: &[PathBuf]) -> Result<(), ProviderError> {
250 trace!("creating paths {:?}", paths);
251 let base_dir_raw = self.base_dir.to_string_lossy();
252 try_join_all(paths.iter().map(|file| {
253 let full_path = format!("{base_dir_raw}{}", file.to_string_lossy());
254 self.filesystem.create_dir_all(full_path)
255 }))
256 .await?;
257 trace!("paths created!");
258
259 Ok(())
260 }
261
262 async fn initialize_startup_files(
263 &self,
264 startup_files: &[TransferedFile],
265 ) -> Result<(), ProviderError> {
266 trace!("creating files {:?}", startup_files);
267 try_join_all(
268 startup_files
269 .iter()
270 .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
271 )
272 .await?;
273 trace!("files created!");
274
275 Ok(())
276 }
277
278 async fn initialize_db_snapshot(
279 &self,
280 db_snapshot: &AssetLocation,
281 ) -> Result<(), ProviderError> {
282 trace!("snap: {db_snapshot}");
283
284 let ns_base_dir = self.namespace_base_dir();
286 let hashed_location = match db_snapshot {
287 AssetLocation::Url(location) => hex::encode(sha2::Sha256::digest(location.to_string())),
288 AssetLocation::FilePath(filepath) => {
289 hex::encode(sha2::Sha256::digest(filepath.to_string_lossy().to_string()))
290 },
291 };
292
293 let full_path = format!("{ns_base_dir}/{hashed_location}.tgz");
294 trace!("db_snap fullpath in ns: {full_path}");
295 if !self.filesystem.exists(&full_path).await {
296 self.get_db_snapshot(db_snapshot, &full_path).await?;
298 }
299
300 let contents = self.filesystem.read(&full_path).await.unwrap();
301 let gz = GzDecoder::new(&contents[..]);
302 let mut archive = Archive::new(gz);
303 archive
304 .unpack(self.base_dir.to_string_lossy().as_ref())
305 .unwrap();
306
307 if std::env::var("ZOMBIE_RM_TGZ_AFTER_EXTRACT").is_ok() {
308 let res = fs::remove_file(&full_path).await;
309 trace!("removing {}, result {:?}", full_path, res);
310 }
311
312 Ok(())
313 }
314
315 async fn get_db_snapshot(
316 &self,
317 location: &AssetLocation,
318 full_path: &str,
319 ) -> Result<(), ProviderError> {
320 trace!("getting db_snapshot from: {:?} to: {full_path}", location);
321 match location {
322 AssetLocation::Url(location) => {
323 let res = reqwest::get(location.as_ref())
324 .await
325 .map_err(|err| ProviderError::DownloadFile(location.to_string(), err.into()))?;
326
327 let contents: &[u8] = &res.bytes().await.unwrap();
328 trace!("writing: {full_path}");
329 self.filesystem.write(full_path, contents).await?;
330 },
331 AssetLocation::FilePath(filepath) => {
332 self.filesystem.copy(filepath, full_path).await?;
333 },
334 };
335
336 Ok(())
337 }
338
339 async fn initialize_process(&self) -> Result<(ChildStdout, ChildStderr), ProviderError> {
340 let filtered_env: HashMap<String, String> = env::vars()
341 .filter(|(k, _)| k == "TZ" || k == "LANG" || k == "PATH")
342 .collect();
343
344 let mut process = Command::new(&self.program)
345 .args(&self.args)
346 .env_clear()
347 .envs(&filtered_env) .envs(self.env.to_vec())
349 .stdin(Stdio::null())
350 .stdout(Stdio::piped())
351 .stderr(Stdio::piped())
352 .kill_on_drop(true)
353 .current_dir(&self.base_dir)
354 .spawn()
355 .map_err(|err| ProviderError::NodeSpawningFailed(self.name.to_string(), err.into()))?;
356 let stdout = process
357 .stdout
358 .take()
359 .expect(&format!("infaillible, stdout is piped {THIS_IS_A_BUG}"));
360 let stderr = process
361 .stderr
362 .take()
363 .expect(&format!("infaillible, stderr is piped {THIS_IS_A_BUG}"));
364
365 let pid = Pid::from_raw(
366 process
367 .id()
368 .ok_or_else(|| ProviderError::ProcessIdRetrievalFailed(self.name.to_string()))?
369 as i32,
370 );
371 self.process_handle
372 .write()
373 .map_err(|_e| ProviderError::FailedToAcquireLock(self.name.clone()))?
374 .replace(ProcessHandle::Spawned(process, pid));
375
376 Ok((stdout, stderr))
377 }
378
379 async fn initialize_log_writing(&self, stdout: ChildStdout, stderr: ChildStderr) {
380 let (stdout_tx, mut rx) = mpsc::channel(10);
381 let stderr_tx = stdout_tx.clone();
382
383 self.stdout_reading_task
384 .write()
385 .await
386 .replace(self.create_stream_polling_task(stdout, stdout_tx));
387 self.stderr_reading_task
388 .write()
389 .await
390 .replace(self.create_stream_polling_task(stderr, stderr_tx));
391
392 let filesystem = self.filesystem.clone();
393 let log_path = self.log_path.clone();
394
395 self.log_writing_task
396 .write()
397 .await
398 .replace(tokio::spawn(async move {
399 loop {
400 while let Some(Ok(data)) = rx.recv().await {
401 let _ = filesystem.append(&log_path, data).await;
403 }
404 sleep(Duration::from_millis(250)).await;
405 }
406 }));
407 }
408
409 fn create_stream_polling_task(
410 &self,
411 stream: impl AsyncRead + Unpin + Send + 'static,
412 tx: Sender<Result<Vec<u8>, std::io::Error>>,
413 ) -> JoinHandle<()> {
414 tokio::spawn(async move {
415 let mut reader = BufReader::new(stream);
416 let mut buffer = vec![0u8; 1024];
417
418 loop {
419 match reader.read(&mut buffer).await {
420 Ok(0) => {
421 let _ = tx.send(Ok(Vec::new())).await;
422 break;
423 },
424 Ok(n) => {
425 let _ = tx.send(Ok(buffer[..n].to_vec())).await;
426 },
427 Err(e) => {
428 let _ = tx.send(Err(e)).await;
429 break;
430 },
431 }
432 }
433 })
434 }
435
436 fn process_id(&self) -> Result<Pid, ProviderError> {
437 let pid = self
438 .process_handle
439 .read()
440 .map_err(|_e| ProviderError::FailedToAcquireLock(self.name.clone()))?
441 .as_ref()
442 .map(|handle| match handle {
443 ProcessHandle::Spawned(_, pid) => *pid,
444 ProcessHandle::Attached(pid) => *pid,
445 })
446 .ok_or_else(|| ProviderError::ProcessIdRetrievalFailed(self.name.to_string()))?;
447
448 Ok(pid)
449 }
450
451 pub(crate) async fn abort(&self) -> anyhow::Result<()> {
452 if let Some(task) = self.log_writing_task.write().await.take() {
453 task.abort();
454 }
455 if let Some(task) = self.stdout_reading_task.write().await.take() {
456 task.abort();
457 }
458 if let Some(task) = self.stderr_reading_task.write().await.take() {
459 task.abort();
460 }
461
462 let process_handle = {
463 let mut guard = self
464 .process_handle
465 .write()
466 .map_err(|_e| ProviderError::FailedToAcquireLock(self.name.clone()))?;
467 guard
468 .take()
469 .ok_or_else(|| anyhow!("no process was attached for the node"))?
470 };
471
472 match process_handle {
473 ProcessHandle::Spawned(mut child, _pid) => {
474 child.kill().await?;
475 },
476 ProcessHandle::Attached(pid) => {
477 kill(pid, Signal::SIGKILL)
478 .map_err(|err| anyhow!("Failed to kill attached process {pid}: {err}"))?;
479 },
480 }
481
482 Ok(())
483 }
484
485 fn namespace_base_dir(&self) -> String {
486 self.namespace
487 .upgrade()
488 .map(|namespace| namespace.base_dir().to_string_lossy().to_string())
489 .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {THIS_IS_A_BUG}"))
490 }
491}
492
493#[async_trait]
494impl<FS> ProviderNode for NativeNode<FS>
495where
496 FS: FileSystem + Send + Sync + Clone + 'static,
497{
498 fn name(&self) -> &str {
499 &self.name
500 }
501
502 fn args(&self) -> Vec<&str> {
503 self.args.iter().map(|arg| arg.as_str()).collect()
504 }
505
506 fn base_dir(&self) -> &PathBuf {
507 &self.base_dir
508 }
509
510 fn config_dir(&self) -> &PathBuf {
511 &self.config_dir
512 }
513
514 fn data_dir(&self) -> &PathBuf {
515 &self.data_dir
516 }
517
518 fn relay_data_dir(&self) -> &PathBuf {
519 &self.relay_data_dir
520 }
521
522 fn scripts_dir(&self) -> &PathBuf {
523 &self.scripts_dir
524 }
525
526 fn log_path(&self) -> &PathBuf {
527 &self.log_path
528 }
529
530 fn log_cmd(&self) -> String {
531 format!("tail -f {}", self.log_path().to_string_lossy())
532 }
533
534 fn path_in_node(&self, file: &Path) -> PathBuf {
535 let full_path = format!(
536 "{}/{}",
537 self.base_dir.to_string_lossy(),
538 file.to_string_lossy()
539 );
540 PathBuf::from(full_path)
541 }
542
543 async fn logs(&self) -> Result<String, ProviderError> {
544 Ok(self.filesystem.read_to_string(&self.log_path).await?)
545 }
546
547 async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
548 Ok(self.filesystem.copy(&self.log_path, local_dest).await?)
549 }
550
551 async fn run_command(
552 &self,
553 options: RunCommandOptions,
554 ) -> Result<ExecutionResult, ProviderError> {
555 let result = Command::new(options.program.clone())
556 .args(options.args.clone())
557 .envs(options.env)
558 .current_dir(&self.base_dir)
559 .output()
560 .await
561 .map_err(|err| {
562 ProviderError::RunCommandError(
563 format!("{} {}", &options.program, &options.args.join(" ")),
564 "locally".to_string(),
565 err.into(),
566 )
567 })?;
568
569 if result.status.success() {
570 Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
571 } else {
572 Ok(Err((
573 result.status,
574 String::from_utf8_lossy(&result.stderr).to_string(),
575 )))
576 }
577 }
578
579 async fn run_script(
580 &self,
581 options: RunScriptOptions,
582 ) -> Result<ExecutionResult, ProviderError> {
583 let local_script_path = PathBuf::from(&options.local_script_path);
584
585 if !self.filesystem.exists(&local_script_path).await {
586 return Err(ProviderError::ScriptNotFound(local_script_path));
587 }
588
589 let script_file_name = local_script_path
591 .file_name()
592 .map(|file_name| file_name.to_string_lossy().to_string())
593 .ok_or(ProviderError::InvalidScriptPath(anyhow!(
594 "Can't retrieve filename from script with path: {:?}",
595 options.local_script_path
596 )))?;
597 let remote_script_path = format!(
598 "{}/{}",
599 self.scripts_dir.to_string_lossy(),
600 script_file_name
601 );
602
603 self.filesystem
605 .copy(local_script_path, &remote_script_path)
606 .await?;
607 self.filesystem.set_mode(&remote_script_path, 0o744).await?;
608
609 self.run_command(RunCommandOptions {
611 program: remote_script_path,
612 args: options.args,
613 env: options.env,
614 })
615 .await
616 }
617
618 async fn send_file(
619 &self,
620 local_file_path: &Path,
621 remote_file_path: &Path,
622 mode: &str,
623 ) -> Result<(), ProviderError> {
624 let namespaced_remote_file_path = PathBuf::from(format!(
625 "{}{}",
626 &self.base_dir.to_string_lossy(),
627 remote_file_path.to_string_lossy()
628 ));
629
630 self.filesystem
631 .copy(local_file_path, &namespaced_remote_file_path)
632 .await?;
633
634 self.run_command(
635 RunCommandOptions::new("chmod")
636 .args(vec![mode, &namespaced_remote_file_path.to_string_lossy()]),
637 )
638 .await?
639 .map_err(|(_, err)| {
640 ProviderError::SendFile(
641 self.name.clone(),
642 local_file_path.to_string_lossy().to_string(),
643 anyhow!("{err}"),
644 )
645 })?;
646
647 Ok(())
648 }
649
650 async fn receive_file(
651 &self,
652 remote_file_path: &Path,
653 local_file_path: &Path,
654 ) -> Result<(), ProviderError> {
655 let namespaced_remote_file_path = PathBuf::from(format!(
656 "{}{}",
657 &self.base_dir.to_string_lossy(),
658 remote_file_path.to_string_lossy()
659 ));
660
661 self.filesystem
662 .copy(namespaced_remote_file_path, local_file_path)
663 .await?;
664
665 Ok(())
666 }
667
668 async fn pause(&self) -> Result<(), ProviderError> {
669 let process_id = self.process_id()?;
670
671 kill(process_id, Signal::SIGSTOP)
672 .map_err(|err| ProviderError::PauseNodeFailed(self.name.clone(), err.into()))?;
673
674 Ok(())
675 }
676
677 async fn resume(&self) -> Result<(), ProviderError> {
678 let process_id = self.process_id()?;
679
680 nix::sys::signal::kill(process_id, Signal::SIGCONT)
681 .map_err(|err| ProviderError::ResumeNodeFailed(self.name.clone(), err.into()))?;
682
683 Ok(())
684 }
685
686 async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
687 if let Some(duration) = after {
688 sleep(duration).await;
689 }
690
691 self.abort()
692 .await
693 .map_err(|err| ProviderError::RestartNodeFailed(self.name.clone(), err))?;
694
695 let (stdout, stderr) = self
696 .initialize_process()
697 .await
698 .map_err(|err| ProviderError::RestartNodeFailed(self.name.clone(), err.into()))?;
699
700 self.initialize_log_writing(stdout, stderr).await;
701
702 Ok(())
703 }
704
705 async fn destroy(&self) -> Result<(), ProviderError> {
706 self.abort()
707 .await
708 .map_err(|err| ProviderError::DestroyNodeFailed(self.name.clone(), err))?;
709
710 if let Some(namespace) = self.namespace.upgrade() {
711 namespace.nodes.write().await.remove(&self.name);
712 }
713
714 Ok(())
715 }
716}
717
718fn serialize_process_handle<S>(
719 process_handle: &std::sync::RwLock<Option<ProcessHandle>>,
720 serializer: S,
721) -> Result<S::Ok, S::Error>
722where
723 S: Serializer,
724{
725 let pid = process_handle
726 .read()
727 .map_err(|_e| S::Error::custom("failed to acquire read lock"))?
728 .as_ref()
729 .map(|handle| match handle {
730 ProcessHandle::Spawned(_, pid) => pid.as_raw(),
731 ProcessHandle::Attached(pid) => pid.as_raw(),
732 });
733 pid.serialize(serializer)
734}