use super::{fs_write_atomic, CacheConfig};
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

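/// A cloneable handle to the background cache worker.
///
/// Events are pushed onto a bounded queue (sized by `worker_event_queue_size`)
/// and handled asynchronously by a dedicated thread; see `WorkerThread::run`.
///
/// A rough usage sketch (hypothetical; `cache_config` and `module_cache_path`
/// are assumed to exist in the caller):
///
/// ```ignore
/// let worker = Worker::start_new(&cache_config, None);
/// worker.on_cache_update_async(&module_cache_path); // new entry: init stats, maybe clean up
/// worker.on_cache_get_async(&module_cache_path);    // cache hit: bump stats, maybe recompress
/// ```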
#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

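/// State owned by the spawned worker thread: the receiving end of the event
/// queue plus its own copy of the cache configuration.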
struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

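/// Events sent from the `Worker` handle to the background thread.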
#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

impl Worker {
    pub(super) fn start_new(
        cache_config: &CacheConfig,
        init_file_per_thread_logger: Option<&'static str>,
    ) -> Self {
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::max_value() as u64 => num as usize,
            _ => usize::max_value(),
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

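        // The JoinHandle is dropped, so the thread runs detached. It exits once
        // every `Worker` clone (and thus every sender) has been dropped and
        // `receiver.iter()` in `run()` terminates.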
        thread::spawn(move || worker_thread.run(init_file_per_thread_logger));

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

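    /// Sends the event with a non-blocking `try_send`: if the bounded queue is
    /// full, the event is dropped and only reported at `info` level.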
    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send a message to the worker thread, \
                 event: {:?}, error: {}",
                event, err
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

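/// Per-module statistics kept in a `<cache file>.stats` sidecar, serialized as
/// TOML; `compression_level` is stored under the `optimized-compression` key.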
#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

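/// A cache directory entry as seen by the cleanup pass: `Recognized` module
/// files are ranked by mtime and counted against the soft limits, while
/// `Unrecognized` files and directories are removed whenever cleanup deletes.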
enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

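// Evaluates `$result`; on `Err` it logs a warning with `$err_msg` and the path,
// then executes `$cont` (typically `return` or `continue`).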
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self, init_file_per_thread_logger: Option<&'static str>) {
        if let Some(prefix) = init_file_per_thread_logger {
            file_per_thread_logger::initialize(prefix);
        }

        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        if unsafe {
            SetThreadPriority(
                GetCurrentThread(),
                THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
            )
        } == 0
        {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {}", current_nice);
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({:?}). It might affect application performance.",
                    err
                );
            }
        };
    }

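    /// Handles a cache hit: bumps the usage counter in the `.stats` sidecar and,
    /// once the configured usage threshold is reached while the stored compression
    /// level is still below the optimized one, recompresses the cache file with
    /// zstd at the optimized level under a `wip-*` filesystem lock.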
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{}.stats", filename));

        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

        trace!("Trying to recompress file: {}", path.display());

        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                debug!(
                    "DETECTED that the task was done more than once (or race with a new \
                     file): recompression of {}. Note: if the optimized compression level \
                     setting has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to the race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or race with counter \
                     increasing): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read stats file again to update compression level (it might have \
                 been cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

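    /// Handles a new cache entry: writes a fresh `.stats` sidecar with a single
    /// usage, then, if the cleanup interval has elapsed (tracked via the `.cleanup`
    /// lock file), scans the cache directory and deletes unrecognized entries and
    /// the oldest recognized ones once the soft size / file-count limits are hit.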
    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{}.stats", filename));

        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        let cleanup_file = self.cache_config.directory().join(".cleanup");

        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;

        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

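    /// Recursively lists the cache directory. The cache root is level 0; module
    /// files and their optional `.stats` sidecars are expected two directory
    /// levels below it. Non-expired `wip-*` and `.cleanup.*` lock files are
    /// skipped; anything else unexpected is reported as `Unrecognized`.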
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                    $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            continue;
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        #[allow(clippy::identity_conversion)]
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (),
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        #[allow(clippy::identity_conversion)]
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(
            &mut vec,
            self.cache_config.directory(),
            0,
            &self.cache_config,
        );
        vec
    }
}

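/// Reads and parses the TOML `.stats` sidecar; returns `None` (logging at `trace`
/// level) on any read or parse error.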
fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|bytes| {
            toml::from_slice::<ModuleCacheStatistics>(&bytes[..]).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

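/// Serializes the stats to TOML and writes them via `fs_write_atomic`; returns
/// `false` if either serialization or the write fails.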
fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            if fs_write_atomic(path, "stats", serialized.as_bytes()) {
                Ok(())
            } else {
                Err(())
            }
        })
        .is_ok()
}

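/// Tries to acquire a filesystem lock for a task rooted at `task_path` (which must
/// have a stem and no extension). Returns `None` if another non-expired
/// `<stem>.wip-*` lock file already exists; otherwise creates
/// `<stem>.wip-<current pid>` with `create_new` and returns its path. Callers may
/// also use the returned path as a temporary output file (see the recompression
/// task above).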
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        match path.extension() {
            None => continue,
            Some(ext) => {
                if let Some(ext_str) = ext.to_str() {
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

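/// A lock file is considered expired once its mtime is at least `threshold` old.
/// Metadata/mtime errors count as expired; an mtime in the future counts as
/// expired only if it lies further ahead than `allowed_future_drift`.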
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true;
        }
    };

    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as a not expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;