1use crate::{
5 column::{ColId, IterState},
6 db::{CommitChangeSet, Db, IndexedChangeSet, Operation},
7 error::try_io,
8 options::Options,
9 Error, Result,
10};
11use std::path::Path;
13
14const COMMIT_SIZE: usize = 10240;
15const OVERWRITE_TMP_PATH: &str = "to_revert_overwrite";
16
17pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u8]) -> Result<()> {
26 let source_meta = Options::load_metadata(from)?
27 .ok_or_else(|| Error::Migration("Error loading source metadata".into()))?;
28
29 let mut to_migrate = source_meta.columns_to_migrate();
30 for force in force_migrate.iter() {
31 to_migrate.insert(*force);
32 }
33 if source_meta.columns.len() != to.columns.len() {
34 return Err(Error::Migration("Source and dest columns mismatch".into()))
35 }
36
37 to.salt = Some(source_meta.salt);
39
40 if (to.salt.is_none()) && overwrite {
41 return Err(Error::Migration("Changing salt need to update metadata at once.".into()))
42 }
43
44 let mut source_options = Options::with_columns(from, source_meta.columns.len() as u8);
45 source_options.salt = Some(source_meta.salt);
46 source_options.columns = source_meta.columns;
47
48 let mut source = Db::open(&source_options)?;
49 let mut dest = Db::open_or_create(&to)?;
50
51 let mut ncommits: u64 = 0;
52 let mut commit = CommitChangeSet::default();
53 let mut nb_commit = 0;
54 let mut last_time = std::time::Instant::now();
55 for c in 0..source_options.columns.len() as ColId {
56 if source_options.columns[c as usize] != to.columns[c as usize] {
57 to_migrate.insert(c);
58 }
59 }
60
61 for c in to_migrate.iter() {
62 if source_options.columns[*c as usize].btree_index || to.columns[*c as usize].btree_index {
63 return Err(Error::Migration(
64 "Migrate only implemented for hash indexed column to hash indexed column".into(),
65 ))
66 }
67 }
68
69 for c in 0..source_options.columns.len() as ColId {
70 if !to_migrate.contains(&c) {
71 if !overwrite {
72 drop(dest);
73 copy_column(c, from, &to.path)?;
74 dest = Db::open_or_create(&to)?;
75 }
76 continue
77 }
78 log::info!("Migrating col {}", c);
79 source.iter_column_index_while(
80 c,
81 |IterState { item_index: index, key, rc, mut value, .. }| {
82 for _ in 0..rc {
84 let value = std::mem::take(&mut value);
85 commit
86 .indexed
87 .entry(c)
88 .or_insert_with(|| IndexedChangeSet::new(c))
89 .changes
90 .push(Operation::Set(key, value.into()));
91 nb_commit += 1;
92 if nb_commit == COMMIT_SIZE {
93 ncommits += 1;
94 if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
95 log::warn!("Migration error: {:?}", e);
96 return false
97 }
98 nb_commit = 0;
99
100 if last_time.elapsed() > std::time::Duration::from_secs(3) {
101 last_time = std::time::Instant::now();
102 log::info!("Migrating {} #{}, commit {}", c, index, ncommits);
103 }
104 }
105 }
106 true
107 },
108 )?;
109 if overwrite {
110 dest.commit_raw(commit)?;
111 commit = Default::default();
112 nb_commit = 0;
113 drop(dest);
114 dest = Db::open_or_create(&to)?; log::info!("Collection migrated {}, imported", c);
116
117 drop(dest);
118 drop(source);
119 let mut tmp_dir = from.to_path_buf();
120 tmp_dir.push(OVERWRITE_TMP_PATH);
121 let remove_tmp_dir = || -> Result<()> {
122 if std::fs::metadata(&tmp_dir).is_ok() {
123 std::fs::remove_dir_all(&tmp_dir).map_err(|e| {
124 Error::Migration(format!("Error removing overwrite tmp dir: {e:?}"))
125 })?;
126 }
127 Ok(())
128 };
129 remove_tmp_dir()?;
130 std::fs::create_dir_all(&tmp_dir).map_err(|e| {
131 Error::Migration(format!("Error creating overwrite tmp dir: {e:?}"))
132 })?;
133
134 move_column(c, from, &tmp_dir)?;
135 move_column(c, &to.path, from)?;
136 source_options.columns[c as usize] = to.columns[c as usize].clone();
137 source_options
138 .write_metadata(from, &to.salt.expect("Migrate requires salt"))
139 .map_err(|e| {
140 Error::Migration(format!(
141 "Error {e:?}\nFail updating metadata of column {c:?} \
142 in source, please restore manually before restarting."
143 ))
144 })?;
145 remove_tmp_dir()?;
146 source = Db::open(&source_options)?;
147 dest = Db::open_or_create(&to)?;
148
149 log::info!("Collection migrated {}, migrated", c);
150 }
151 }
152 dest.commit_raw(commit)?;
153 Ok(())
154}
155
156pub fn clear_column(path: &Path, column: ColId) -> Result<()> {
159 let meta = Options::load_metadata(path)?
160 .ok_or_else(|| Error::Migration("Error loading source metadata".into()))?;
161
162 if (column as usize) >= meta.columns.len() {
163 return Err(Error::Migration("Invalid column index".into()))
164 }
165
166 crate::column::Column::drop_files(column, path.to_path_buf())?;
167
168 Ok(())
169}
170
171fn move_column(c: ColId, from: &Path, to: &Path) -> Result<()> {
172 deplace_column(c, from, to, false)
173}
174
175fn copy_column(c: ColId, from: &Path, to: &Path) -> Result<()> {
176 deplace_column(c, from, to, true)
177}
178
179fn deplace_column(c: ColId, from: &Path, to: &Path, copy: bool) -> Result<()> {
180 for entry in try_io!(std::fs::read_dir(from)) {
181 let entry = try_io!(entry);
182 if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
183 if crate::index::TableId::is_file_name(c, file) ||
184 crate::table::TableId::is_file_name(c, file)
185 {
186 let mut from = from.to_path_buf();
187 from.push(file);
188 let mut to = to.to_path_buf();
189 to.push(file);
190 if copy {
191 try_io!(std::fs::copy(from, to));
192 } else {
193 try_io!(std::fs::rename(from, to));
194 }
195 }
196 }
197 }
198 Ok(())
199}
200
201#[cfg(test)]
202mod test {
203 use crate::{migration, Db, Options};
204 use tempfile::tempdir;
205
206 #[test]
207 fn migrate_simple() {
208 let dir = tempdir().unwrap();
209 let source_dir = dir.path().join("source");
210 let dest_dir = dir.path().join("dest");
211 {
212 let source = Db::with_columns(&source_dir, 1).unwrap();
213 source.commit([(0, b"1".to_vec(), Some(b"value".to_vec()))]).unwrap();
214 }
215
216 let dest_opts = Options::with_columns(&dest_dir, 1);
217 migration::migrate(&source_dir, dest_opts, false, &[0]).unwrap();
218 let dest = Db::with_columns(&dest_dir, 1).unwrap();
219 assert_eq!(dest.get(0, b"1").unwrap(), Some("value".as_bytes().to_vec()));
220 }
221
222 #[test]
223 fn clear_column() {
224 let source_dir = tempdir().unwrap();
225 let mut options = Options::with_columns(source_dir.path(), 3);
226 options.columns = vec![Default::default(); 3];
227 options.columns[1].btree_index = true;
228 {
229 let db = Db::open_or_create(&options).unwrap();
230
231 db.commit(vec![
232 (0, b"0".to_vec(), Some(b"value0".to_vec())),
233 (1, b"1".to_vec(), Some(b"value1".to_vec())),
234 (2, b"2".to_vec(), Some(b"value2".to_vec())),
235 ])
236 .unwrap();
237 }
238
239 migration::clear_column(source_dir.path(), 1).unwrap();
240 let db = Db::open(&options).unwrap();
241 assert_eq!(db.get(0, b"0").unwrap(), Some("value0".as_bytes().to_vec()));
242 assert_eq!(db.get(1, b"1").unwrap(), None);
243 assert_eq!(db.get(2, b"2").unwrap(), Some("value2".as_bytes().to_vec()));
244 }
245}