parity_db/
migration.rs

1// Copyright 2021-2022 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or MIT.
3
4use crate::{
5	column::{ColId, IterState},
6	db::{CommitChangeSet, Db, IndexedChangeSet, Operation},
7	error::try_io,
8	options::Options,
9	Error, Result,
10};
11/// Database migration.
12use std::path::Path;
13
/// Number of `Set` operations buffered before a batch is committed to the destination database.
const COMMIT_SIZE: usize = 10 * 1024;
/// Name of the sub-directory of the source database path that temporarily holds a column's old
/// files while an in-place (`overwrite`) migration swaps them out.
const OVERWRITE_TMP_PATH: &str = "to_revert_overwrite";
16
17/// Attempt to migrate a database to a new configuration with different column settings.
18/// `from` Source database path
19/// `to` New database configuration.
20/// `overwrite` Ignore path set in `to` and attempt to overwrite data in place. This may be faster
21/// but if migration fails data may be lost
22/// `force_migrate` Force column re-population even if its setting did not change.
23///
24/// Note that migration between hash to btree columns is not possible.
25pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u8]) -> Result<()> {
26	let source_meta = Options::load_metadata(from)?
27		.ok_or_else(|| Error::Migration("Error loading source metadata".into()))?;
28
29	let mut to_migrate = source_meta.columns_to_migrate();
30	for force in force_migrate.iter() {
31		to_migrate.insert(*force);
32	}
33	if source_meta.columns.len() != to.columns.len() {
34		return Err(Error::Migration("Source and dest columns mismatch".into()))
35	}
36
37	// Make sure we are using the same salt value.
38	to.salt = Some(source_meta.salt);
39
40	if (to.salt.is_none()) && overwrite {
41		return Err(Error::Migration("Changing salt need to update metadata at once.".into()))
42	}
43
44	let mut source_options = Options::with_columns(from, source_meta.columns.len() as u8);
45	source_options.salt = Some(source_meta.salt);
46	source_options.columns = source_meta.columns;
47
48	let mut source = Db::open(&source_options)?;
49	let mut dest = Db::open_or_create(&to)?;
50
51	let mut ncommits: u64 = 0;
52	let mut commit = CommitChangeSet::default();
53	let mut nb_commit = 0;
54	let mut last_time = std::time::Instant::now();
55	for c in 0..source_options.columns.len() as ColId {
56		if source_options.columns[c as usize] != to.columns[c as usize] {
57			to_migrate.insert(c);
58		}
59	}
60
61	for c in to_migrate.iter() {
62		if source_options.columns[*c as usize].btree_index || to.columns[*c as usize].btree_index {
63			return Err(Error::Migration(
64				"Migrate only implemented for hash indexed column to hash indexed column".into(),
65			))
66		}
67	}
68
69	for c in 0..source_options.columns.len() as ColId {
70		if !to_migrate.contains(&c) {
71			if !overwrite {
72				drop(dest);
73				copy_column(c, from, &to.path)?;
74				dest = Db::open_or_create(&to)?;
75			}
76			continue
77		}
78		log::info!("Migrating col {}", c);
79		source.iter_column_index_while(
80			c,
81			|IterState { item_index: index, key, rc, mut value, .. }| {
82				//TODO: more efficient ref migration
83				for _ in 0..rc {
84					let value = std::mem::take(&mut value);
85					commit
86						.indexed
87						.entry(c)
88						.or_insert_with(|| IndexedChangeSet::new(c))
89						.changes
90						.push(Operation::Set(key, value.into()));
91					nb_commit += 1;
92					if nb_commit == COMMIT_SIZE {
93						ncommits += 1;
94						if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
95							log::warn!("Migration error: {:?}", e);
96							return false
97						}
98						nb_commit = 0;
99
100						if last_time.elapsed() > std::time::Duration::from_secs(3) {
101							last_time = std::time::Instant::now();
102							log::info!("Migrating {} #{}, commit {}", c, index, ncommits);
103						}
104					}
105				}
106				true
107			},
108		)?;
109		if overwrite {
110			dest.commit_raw(commit)?;
111			commit = Default::default();
112			nb_commit = 0;
113			drop(dest);
114			dest = Db::open_or_create(&to)?; // This is needed to flush logs.
115			log::info!("Collection migrated {}, imported", c);
116
117			drop(dest);
118			drop(source);
119			let mut tmp_dir = from.to_path_buf();
120			tmp_dir.push(OVERWRITE_TMP_PATH);
121			let remove_tmp_dir = || -> Result<()> {
122				if std::fs::metadata(&tmp_dir).is_ok() {
123					std::fs::remove_dir_all(&tmp_dir).map_err(|e| {
124						Error::Migration(format!("Error removing overwrite tmp dir: {e:?}"))
125					})?;
126				}
127				Ok(())
128			};
129			remove_tmp_dir()?;
130			std::fs::create_dir_all(&tmp_dir).map_err(|e| {
131				Error::Migration(format!("Error creating overwrite tmp dir: {e:?}"))
132			})?;
133
134			move_column(c, from, &tmp_dir)?;
135			move_column(c, &to.path, from)?;
136			source_options.columns[c as usize] = to.columns[c as usize].clone();
137			source_options
138				.write_metadata(from, &to.salt.expect("Migrate requires salt"))
139				.map_err(|e| {
140					Error::Migration(format!(
141						"Error {e:?}\nFail updating metadata of column {c:?} \
142							in source, please restore manually before restarting."
143					))
144				})?;
145			remove_tmp_dir()?;
146			source = Db::open(&source_options)?;
147			dest = Db::open_or_create(&to)?;
148
149			log::info!("Collection migrated {}, migrated", c);
150		}
151	}
152	dest.commit_raw(commit)?;
153	Ok(())
154}
155
156/// Clear specified column. All data is removed and stats are reset.
157/// Database must be closed before calling this.
158pub fn clear_column(path: &Path, column: ColId) -> Result<()> {
159	let meta = Options::load_metadata(path)?
160		.ok_or_else(|| Error::Migration("Error loading source metadata".into()))?;
161
162	if (column as usize) >= meta.columns.len() {
163		return Err(Error::Migration("Invalid column index".into()))
164	}
165
166	crate::column::Column::drop_files(column, path.to_path_buf())?;
167
168	Ok(())
169}
170
171fn move_column(c: ColId, from: &Path, to: &Path) -> Result<()> {
172	deplace_column(c, from, to, false)
173}
174
175fn copy_column(c: ColId, from: &Path, to: &Path) -> Result<()> {
176	deplace_column(c, from, to, true)
177}
178
179fn deplace_column(c: ColId, from: &Path, to: &Path, copy: bool) -> Result<()> {
180	for entry in try_io!(std::fs::read_dir(from)) {
181		let entry = try_io!(entry);
182		if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
183			if crate::index::TableId::is_file_name(c, file) ||
184				crate::table::TableId::is_file_name(c, file)
185			{
186				let mut from = from.to_path_buf();
187				from.push(file);
188				let mut to = to.to_path_buf();
189				to.push(file);
190				if copy {
191					try_io!(std::fs::copy(from, to));
192				} else {
193					try_io!(std::fs::rename(from, to));
194				}
195			}
196		}
197	}
198	Ok(())
199}
200
#[cfg(test)]
mod test {
	use crate::{migration, Db, Options};
	use tempfile::tempdir;

	/// Forcing migration of the only column into a fresh directory keeps its value readable.
	#[test]
	fn migrate_simple() {
		let tmp = tempdir().unwrap();
		let source_path = tmp.path().join("source");
		let dest_path = tmp.path().join("dest");

		// Populate the source, then close it before migrating.
		let source_db = Db::with_columns(&source_path, 1).unwrap();
		source_db.commit([(0, b"1".to_vec(), Some(b"value".to_vec()))]).unwrap();
		drop(source_db);

		let dest_options = Options::with_columns(&dest_path, 1);
		migration::migrate(&source_path, dest_options, false, &[0]).unwrap();

		let dest_db = Db::with_columns(&dest_path, 1).unwrap();
		assert_eq!(dest_db.get(0, b"1").unwrap(), Some(b"value".to_vec()));
	}

	/// Clearing column 1 (a btree column here) removes only that column's data.
	#[test]
	fn clear_column() {
		let tmp = tempdir().unwrap();
		let mut options = Options::with_columns(tmp.path(), 3);
		options.columns = vec![Default::default(); 3];
		options.columns[1].btree_index = true;

		let entries = vec![
			(0, b"0".to_vec(), Some(b"value0".to_vec())),
			(1, b"1".to_vec(), Some(b"value1".to_vec())),
			(2, b"2".to_vec(), Some(b"value2".to_vec())),
		];
		let writer = Db::open_or_create(&options).unwrap();
		writer.commit(entries).unwrap();
		drop(writer);

		migration::clear_column(tmp.path(), 1).unwrap();

		let reader = Db::open(&options).unwrap();
		assert_eq!(reader.get(0, b"0").unwrap(), Some(b"value0".to_vec()));
		assert_eq!(reader.get(1, b"1").unwrap(), None);
		assert_eq!(reader.get(2, b"2").unwrap(), Some(b"value2".to_vec()));
	}
}