wasmtime_cache/worker.rs

//! Background worker that watches over the cache.
//!
//! It cleans up old cache entries, updates statistics, and optimizes the cache.
//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
//! Background tasks can be CPU intensive, but the worker thread has low priority.
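//!
//! Events flow one way: the cache sends `CacheEvent`s through a bounded
//! channel to a dedicated worker thread, which handles them in order.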

use super::{fs_write_atomic, CacheConfig};
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

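/// Events handled asynchronously by the worker thread.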
#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

impl Worker {
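    /// Spawns the low-priority background worker thread and returns
    /// a handle for sending events to it.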
    pub(super) fn start_new(
        cache_config: &CacheConfig,
        init_file_per_thread_logger: Option<&'static str>,
    ) -> Self {
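        // The configured queue size is a u64; clamp it to usize::MAX so the
        // cast below is lossless even on 32-bit targets.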
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::max_value() as u64 => num as usize,
            _ => usize::max_value(),
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

        // When self is dropped, the sender is dropped too, which causes the
        // channel to hang up and the worker thread to exit -- this happens in
        // the tests. A non-test binary has only a static worker, so Rust never drops it.
        thread::spawn(move || worker_thread.run(init_file_per_thread_logger));

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

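    /// Enqueues the event without blocking; if the queue is full, the event
    /// is dropped (per the module's lossy-messaging policy) and the failure
    /// is only logged.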
    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send a message to the worker thread, \
                 event: {:?}, error: {}",
                event, err
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

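/// An item found while scanning the cache directory tree. Recognized entries
/// are module files (sorted by mtime during cleanup); everything else is
/// unrecognized and becomes a removal candidate.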
enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

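/// Unwraps a Result; on error, logs a warning with the given message and
/// path, then evaluates `$cont` (typically `return` or `continue`).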
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self, init_file_per_thread_logger: Option<&'static str>) {
        if let Some(prefix) = init_file_per_thread_logger {
            file_per_thread_logger::initialize(prefix);
        }

        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        // TODO: This needs to use Fuchsia thread profiles:
        // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
        warn!(
            "Lowering thread priority on Fuchsia is currently a no-op. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities

        if unsafe {
            SetThreadPriority(
                GetCurrentThread(),
                THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
            )
        } == 0
        {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        // http://man7.org/linux/man-pages/man7/sched.7.html

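        // A positive delta raises the thread's nice value, i.e. lowers its
        // scheduling priority relative to the rest of the process.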
        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {}", current_nice);
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({:?}). It might affect application performance.",
                    err
                );
            }
        };
    }

    /// Increases the usage counter and recompresses the file
    /// if the usage counter reaches the configurable threshold.
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        // construct the .stats file path
        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{}.stats", filename));

        // load the .stats file (fall back to defaults if it's missing or unreadable)
        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        // step 1: update the usage counter & write to the disk
        //         it's racy, but that's fine (the counter will just be smaller,
        //         which sometimes retriggers recompression)
        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        // step 2: recompress if there's a need
        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

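        // The lock file (named `<file>.wip-<pid>`) doubles as the temporary
        // file for the recompressed bytes: we write into it, then atomically
        // rename it over the original cache file below.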
        trace!("Trying to recompress file: {}", path.display());

        // recompress, write to another file, rename (an atomic content swap),
        // and update the stats file
        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path: {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        // update the stats file (reload it! recompression can take some time)
        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                // Rare race:
                //    two instances with different opt_compr_lvl: we don't know in which order they updated
                //    the cache file and the stats file (they are not updated together atomically)
                // A possible solution is to use a directory per cache entry, but that complicates
                // the system and is not worth it.
                debug!(
                    "DETECTED task done more than once (or a race with a new file): \
                     recompression of {}. Note: if the optimized compression level setting \
                     has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to the race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or race with counter \
                     increasing): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read stats file again to update compression level (it might have \
                 been cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        // ---------------------- step 1: create .stats file

        // construct .stats file path
        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{}.stats", filename));

        // create and write stats file
        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        // ---------------------- step 2: perform cleanup task if needed

        // acquire a lock for the cleanup task
        // The lock is proof of a recent cleanup task, so we don't want to delete it.
        // Expired locks will be deleted by the cleanup task.
        let cleanup_file = self.cache_config.directory().join(".cleanup"); // a marker file that normally doesn't exist
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            // sort by age
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        // later == younger
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        // files from the far future are treated as the oldest recognized files:
                        // we want to delete them so that the cache keeps track of recent files.
                        // However, we don't delete them unconditionally,
                        // because a .stats file can be overwritten with a meaningful mtime
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                // unrecognized entries sort as infinitely old
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        // find the "cut" boundary:
        // - remove unrecognized files unconditionally,
        // - remove some cache files if a quota has been exceeded
        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

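        // If a soft limit is exceeded, we delete down to a configured
        // percentage of that limit rather than to the limit itself, so
        // cleanup doesn't immediately retrigger on the next update.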
        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;

        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

    // Be fault tolerant: list as much as you can, and ignore the rest
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
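            // Expected layout: level 0 is the cache root, level 1 its
            // subdirectories, and level 2 the module and .stats files
            // (see the match on `(level, path.is_dir())` below).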
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                    $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            // If we fail to list a directory, something bad is happening anyway
            // (something is touching our cache, or we have a disk failure).
            // Try to delete it so that we stay within the soft limits of the cache size.
            // This comment applies later in this function, too.
            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                // read_dir() returns an iterator over results; for the erroneous entries
                // we don't know their names, so we can't delete them. We don't want to
                // delete the whole directory with the good entries too, so we just ignore
                // the erroneous ones.
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            // assume it's a cleanup lock
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            continue; // skip the active lock
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            // mod or stats file
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                // check if it's a valid lock
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            // associate each module with its stats file & handle them together
            // assumption: the directory contains just module and stats files
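            // A module file has no extension; its stats file shares the stem and
            // has a ".stats" extension. Each pair is pushed once, from the module
            // file's branch (is_mod == true).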
            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                // construct a cache entry
                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        // .into() is needed for the SystemTimeStub when cfg(test)
                        #[allow(clippy::identity_conversion)]
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (), // was or will be handled by the branch above
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        // .into() is needed for the SystemTimeStub when cfg(test)
                        #[allow(clippy::identity_conversion)]
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found an orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(
            &mut vec,
            self.cache_config.directory(),
            0,
            &self.cache_config,
        );
        vec
    }
}

fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|bytes| {
            toml::from_slice::<ModuleCacheStatistics>(&bytes[..]).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            if fs_write_atomic(path, "stats", serialized.as_bytes()) {
                Ok(())
            } else {
                Err(())
            }
        })
        .is_ok()
}

/// Tries to acquire a lock for a specific task.
///
/// Returns Some(path) to the lock on success. The task path must have
/// a file stem and must not have an extension.
///
/// To release the lock, either rename or remove it manually,
/// or wait until it expires and the cleanup task removes it.
///
/// Note: this function is racy. The main idea is: be fault tolerant and
///       never block a task. The price is that we occasionally do some task
///       more than once.
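/// e.g. for the task path `foo/bar`, the created lock is `foo/bar.wip-<pid>`.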
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    // list the directory
    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    // look for existing locks
    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        // check the extension and mtime
        match path.extension() {
            None => continue,
            Some(ext) => {
                if let Some(ext_str) = ext.to_str() {
                    // if it's None, i.e. not a valid UTF-8 string, then it's surely not our lock
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    // create the lock
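    // create_new() fails if the file already exists; if a leftover lock with
    // our own pid is somehow present, we log and give up rather than clobber
    // it. Races with other processes remain possible and are accepted (see
    // the doc comment above).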
    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

// We take either both or just the path; the dir entry is preferred since on some
// platforms we can get metadata without extra syscalls.
// Furthermore, it's better to use a path we already have instead of allocating
// a new one from the dir entry.
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true; // can't read mtime, treat as expired, so this task will not be starved
        }
    };

    // DON'T use mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as a not-expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            // The lock is expired if its mtime is too far in the future.
            // It's fine to have a network share and unsynchronized clocks,
            // but it's not good when the user changes their system clock.
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;