Skip to main content

zombienet_orchestrator/generators/
db_snapshot.rs

//! Resolve `db_snapshot` `AssetLocation`s into local cache paths, once per
//! unique location, **before** the parallel spawn fanout.
//!
//! Cache layout is `{ns_base_dir}/{sha256(loc_str)}.tgz`. The provider's
//! `initialize_db_snapshot` takes a `&Path` to this already-resolved file
//! and only has to extract it.
7
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
};

use configuration::types::AssetLocation;
use provider::{DynNamespace, ProviderError};
use sha2::Digest;
use support::fs::FileSystem;
use tracing::{trace, warn};

use crate::network_spec::node::NodeSpec;
20
21/// Lookup map produced by [`resolve_db_snapshots`].
22pub type ResolvedDbSnapshots = HashMap<AssetLocation, PathBuf>;
23
24/// Walk every node's `db_snapshot`, deduplicate by `AssetLocation`, and
25/// fetch each unique location once into the namespace cache. Returns a
26/// map from the original `AssetLocation` to the local cache `PathBuf`.
27pub async fn resolve_db_snapshots<'a, FS, I>(
28    nodes: I,
29    ns: &DynNamespace,
30    filesystem: &FS,
31) -> Result<ResolvedDbSnapshots, ProviderError>
32where
33    FS: FileSystem,
34    I: IntoIterator<Item = &'a NodeSpec>,
35{
36    let ns_base_dir = ns.base_dir().to_string_lossy().to_string();
37    let mut resolved: ResolvedDbSnapshots = HashMap::new();
38
39    for loc in nodes.into_iter().filter_map(|n| n.db_snapshot.as_ref()) {
40        if resolved.contains_key(loc) {
41            continue;
42        }
43        let hashed = hex::encode(sha2::Sha256::digest(loc.to_string()));
44        let cache_path = PathBuf::from(format!("{ns_base_dir}/{hashed}.tgz"));
45        if !filesystem.exists(&cache_path).await {
46            fetch_into_cache(loc, &cache_path, filesystem).await?;
47        } else {
48            trace!("db_snapshot cache hit: {}", cache_path.display());
49        }
50        resolved.insert(loc.clone(), cache_path);
51    }
52
53    Ok(resolved)
54}
55
56/// Remove cache tarballs after all consumers have finished extracting them,
57/// gated by the `ZOMBIE_RM_TGZ_AFTER_EXTRACT` env var. Best-effort: errors
58/// are logged and swallowed (cleanup must never fail spawn).
59///
60/// Must be called only after every node that consumes the cache has
61/// completed its `initialize_db_snapshot` — otherwise concurrent readers
62/// will hit `ENOENT`.
63pub async fn cleanup_db_snapshot_cache(resolved: &ResolvedDbSnapshots) {
64    if std::env::var("ZOMBIE_RM_TGZ_AFTER_EXTRACT").is_err() {
65        return;
66    }
67    for cache_path in resolved.values() {
68        match tokio::fs::remove_file(cache_path).await {
69            Ok(()) => trace!("removed cache {}", cache_path.display()),
70            Err(err) => trace!("failed to remove cache {}: {err}", cache_path.display()),
71        }
72    }
73}
74
75async fn fetch_into_cache<FS: FileSystem>(
76    location: &AssetLocation,
77    cache_path: &Path,
78    filesystem: &FS,
79) -> Result<(), ProviderError> {
80    trace!(
81        "resolving db_snapshot {} -> {}",
82        location,
83        cache_path.display()
84    );
85    match location {
86        AssetLocation::Url(url) => {
87            let res = reqwest::get(url.as_ref())
88                .await
89                .map_err(|err| ProviderError::DownloadFile(url.to_string(), err.into()))?;
90            let bytes = res
91                .bytes()
92                .await
93                .map_err(|err| ProviderError::DownloadFile(url.to_string(), err.into()))?;
94            filesystem.write(cache_path, &bytes[..]).await?;
95        },
96        AssetLocation::FilePath(filepath) => {
97            filesystem.copy(filepath, cache_path).await?;
98        },
99    }
100    Ok(())
101}