referrerpolicy=no-referrer-when-downgrade

sc_client_db/
utils.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Db-based backend utility structures and functions, used by both
20//! full and light storages.
21
22use std::{fmt, fs, io, path::Path, sync::Arc};
23
24use log::{debug, info};
25
26use crate::{Database, DatabaseSource, DbHash};
27use codec::Decode;
28use sc_client_api::blockchain::{BlockGap, BlockGapType};
29use sp_database::Transaction;
30use sp_runtime::{
31	generic::BlockId,
32	traits::{
33		Block as BlockT, Header as HeaderT, NumberFor, UniqueSaturatedFrom, UniqueSaturatedInto,
34		Zero,
35	},
36};
37use sp_trie::DBValue;
38
39/// Number of columns in the db. Must be the same for both full && light dbs.
40/// Otherwise RocksDb will fail to open database && check its type.
41pub const NUM_COLUMNS: u32 = 13;
42/// Meta column. The set of keys in the column is shared by full && light storages.
43pub const COLUMN_META: u32 = 0;
44
45/// Current block gap version.
46pub const BLOCK_GAP_CURRENT_VERSION: u32 = 1;
47
48/// Keys of entries in COLUMN_META.
49pub mod meta_keys {
50	/// Type of storage (full or light).
51	pub const TYPE: &[u8; 4] = b"type";
52	/// Best block key.
53	pub const BEST_BLOCK: &[u8; 4] = b"best";
54	/// Last finalized block key.
55	pub const FINALIZED_BLOCK: &[u8; 5] = b"final";
56	/// Last finalized state key.
57	pub const FINALIZED_STATE: &[u8; 6] = b"fstate";
58	/// Block gap.
59	pub const BLOCK_GAP: &[u8; 3] = b"gap";
60	/// Block gap version.
61	pub const BLOCK_GAP_VERSION: &[u8; 7] = b"gap_ver";
62	/// Genesis block hash.
63	pub const GENESIS_HASH: &[u8; 3] = b"gen";
64	/// Leaves prefix list key.
65	pub const LEAF_PREFIX: &[u8; 4] = b"leaf";
66	/// Children prefix list key.
67	pub const CHILDREN_PREFIX: &[u8; 8] = b"children";
68}
69
70/// Database metadata.
71#[derive(Debug)]
72pub struct Meta<N, H> {
73	/// Hash of the best known block.
74	pub best_hash: H,
75	/// Number of the best known block.
76	pub best_number: N,
77	/// Hash of the best finalized block.
78	pub finalized_hash: H,
79	/// Number of the best finalized block.
80	pub finalized_number: N,
81	/// Hash of the genesis block.
82	pub genesis_hash: H,
83	/// Finalized state, if any
84	pub finalized_state: Option<(H, N)>,
85	/// Block gap, if any.
86	pub block_gap: Option<BlockGap<N>>,
87}
88
89/// A block lookup key: used for canonical lookup from block number to hash
90pub type NumberIndexKey = [u8; 4];
91
92/// Database type.
93#[derive(Clone, Copy, Debug, PartialEq)]
94pub enum DatabaseType {
95	/// Full node database.
96	Full,
97}
98
99/// Convert block number into short lookup key (LE representation) for
100/// blocks that are in the canonical chain.
101///
102/// In the current database schema, this kind of key is only used for
103/// lookups into an index, NOT for storing header data or others.
104pub fn number_index_key<N: TryInto<u32>>(n: N) -> sp_blockchain::Result<NumberIndexKey> {
105	let n = n.try_into().map_err(|_| {
106		sp_blockchain::Error::Backend("Block number cannot be converted to u32".into())
107	})?;
108
109	Ok([(n >> 24) as u8, ((n >> 16) & 0xff) as u8, ((n >> 8) & 0xff) as u8, (n & 0xff) as u8])
110}
111
112/// Convert number and hash into long lookup key for blocks that are
113/// not in the canonical chain.
114pub fn number_and_hash_to_lookup_key<N, H>(number: N, hash: H) -> sp_blockchain::Result<Vec<u8>>
115where
116	N: TryInto<u32>,
117	H: AsRef<[u8]>,
118{
119	let mut lookup_key = number_index_key(number)?.to_vec();
120	lookup_key.extend_from_slice(hash.as_ref());
121	Ok(lookup_key)
122}
123
124/// Delete number to hash mapping in DB transaction.
125pub fn remove_number_to_key_mapping<N: TryInto<u32>>(
126	transaction: &mut Transaction<DbHash>,
127	key_lookup_col: u32,
128	number: N,
129) -> sp_blockchain::Result<()> {
130	transaction.remove(key_lookup_col, number_index_key(number)?.as_ref());
131	Ok(())
132}
133
134/// Place a number mapping into the database. This maps number to current perceived
135/// block hash at that position.
136pub fn insert_number_to_key_mapping<N: TryInto<u32> + Clone, H: AsRef<[u8]>>(
137	transaction: &mut Transaction<DbHash>,
138	key_lookup_col: u32,
139	number: N,
140	hash: H,
141) -> sp_blockchain::Result<()> {
142	transaction.set_from_vec(
143		key_lookup_col,
144		number_index_key(number.clone())?.as_ref(),
145		number_and_hash_to_lookup_key(number, hash)?,
146	);
147	Ok(())
148}
149
150/// Insert a hash to key mapping in the database.
151pub fn insert_hash_to_key_mapping<N: TryInto<u32>, H: AsRef<[u8]> + Clone>(
152	transaction: &mut Transaction<DbHash>,
153	key_lookup_col: u32,
154	number: N,
155	hash: H,
156) -> sp_blockchain::Result<()> {
157	transaction.set_from_vec(
158		key_lookup_col,
159		hash.as_ref(),
160		number_and_hash_to_lookup_key(number, hash.clone())?,
161	);
162	Ok(())
163}
164
165/// Convert block id to block lookup key.
166/// block lookup key is the DB-key header, block and justification are stored under.
167/// looks up lookup key by hash from DB as necessary.
168pub fn block_id_to_lookup_key<Block>(
169	db: &dyn Database<DbHash>,
170	key_lookup_col: u32,
171	id: BlockId<Block>,
172) -> Result<Option<Vec<u8>>, sp_blockchain::Error>
173where
174	Block: BlockT,
175	::sp_runtime::traits::NumberFor<Block>: UniqueSaturatedFrom<u64> + UniqueSaturatedInto<u64>,
176{
177	Ok(match id {
178		BlockId::Number(n) => db.get(key_lookup_col, number_index_key(n)?.as_ref()),
179		BlockId::Hash(h) => db.get(key_lookup_col, h.as_ref()),
180	})
181}
182
183/// Opens the configured database.
184pub fn open_database<Block: BlockT>(
185	db_source: &DatabaseSource,
186	db_type: DatabaseType,
187	create: bool,
188) -> OpenDbResult {
189	// Maybe migrate (copy) the database to a type specific subdirectory to make it
190	// possible that light and full databases coexist
191	// NOTE: This function can be removed in a few releases
192	maybe_migrate_to_type_subdir::<Block>(db_source, db_type)?;
193
194	open_database_at::<Block>(db_source, db_type, create)
195}
196
197fn open_database_at<Block: BlockT>(
198	db_source: &DatabaseSource,
199	db_type: DatabaseType,
200	create: bool,
201) -> OpenDbResult {
202	let db: Arc<dyn Database<DbHash>> = match &db_source {
203		DatabaseSource::ParityDb { path } => open_parity_db::<Block>(path, db_type, create)?,
204		#[cfg(feature = "rocksdb")]
205		DatabaseSource::RocksDb { path, cache_size } =>
206			open_kvdb_rocksdb::<Block>(path, db_type, create, *cache_size)?,
207		DatabaseSource::Custom { db, require_create_flag } => {
208			if *require_create_flag && !create {
209				return Err(OpenDbError::DoesNotExist);
210			}
211			db.clone()
212		},
213		DatabaseSource::Auto { paritydb_path, rocksdb_path, cache_size } => {
214			// check if rocksdb exists first, if not, open paritydb
215			match open_kvdb_rocksdb::<Block>(rocksdb_path, db_type, false, *cache_size) {
216				Ok(db) => db,
217				Err(OpenDbError::NotEnabled(_)) | Err(OpenDbError::DoesNotExist) =>
218					open_parity_db::<Block>(paritydb_path, db_type, create)?,
219				Err(as_is) => return Err(as_is),
220			}
221		},
222	};
223
224	check_database_type(&*db, db_type)?;
225	Ok(db)
226}
227
228#[derive(Debug)]
229pub enum OpenDbError {
230	// constructed only when rocksdb and paritydb are disabled
231	#[allow(dead_code)]
232	NotEnabled(&'static str),
233	DoesNotExist,
234	Internal(String),
235	DatabaseError(sp_database::error::DatabaseError),
236	UnexpectedDbType {
237		expected: DatabaseType,
238		found: Vec<u8>,
239	},
240}
241
242type OpenDbResult = Result<Arc<dyn Database<DbHash>>, OpenDbError>;
243
244impl fmt::Display for OpenDbError {
245	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
246		match self {
247			OpenDbError::Internal(e) => write!(f, "{}", e),
248			OpenDbError::DoesNotExist => write!(f, "Database does not exist at given location"),
249			OpenDbError::NotEnabled(feat) => {
250				write!(f, "`{}` feature not enabled, database can not be opened", feat)
251			},
252			OpenDbError::DatabaseError(db_error) => {
253				write!(f, "Database Error: {}", db_error)
254			},
255			OpenDbError::UnexpectedDbType { expected, found } => {
256				write!(
257					f,
258					"Unexpected DB-Type. Expected: {:?}, Found: {:?}",
259					expected.as_str().as_bytes(),
260					found
261				)
262			},
263		}
264	}
265}
266
267impl From<OpenDbError> for sp_blockchain::Error {
268	fn from(err: OpenDbError) -> Self {
269		sp_blockchain::Error::Backend(err.to_string())
270	}
271}
272
273impl From<parity_db::Error> for OpenDbError {
274	fn from(err: parity_db::Error) -> Self {
275		if matches!(err, parity_db::Error::DatabaseNotFound) {
276			OpenDbError::DoesNotExist
277		} else {
278			OpenDbError::Internal(err.to_string())
279		}
280	}
281}
282
283impl From<io::Error> for OpenDbError {
284	fn from(err: io::Error) -> Self {
285		if err.to_string().contains("create_if_missing is false") {
286			OpenDbError::DoesNotExist
287		} else {
288			OpenDbError::Internal(err.to_string())
289		}
290	}
291}
292
293fn open_parity_db<Block: BlockT>(path: &Path, db_type: DatabaseType, create: bool) -> OpenDbResult {
294	match crate::parity_db::open(path, db_type, create, false) {
295		Ok(db) => Ok(db),
296		Err(parity_db::Error::InvalidConfiguration(_)) => {
297			log::warn!("Invalid parity db configuration, attempting database metadata update.");
298			// Try to update the database with the new config
299			Ok(crate::parity_db::open(path, db_type, create, true)?)
300		},
301		Err(e) => Err(e.into()),
302	}
303}
304
305#[cfg(any(feature = "rocksdb", test))]
306fn open_kvdb_rocksdb<Block: BlockT>(
307	path: &Path,
308	db_type: DatabaseType,
309	create: bool,
310	cache_size: usize,
311) -> OpenDbResult {
312	// first upgrade database to required version
313	match crate::upgrade::upgrade_db::<Block>(path, db_type) {
314		// in case of missing version file, assume that database simply does not exist at given
315		// location
316		Ok(_) | Err(crate::upgrade::UpgradeError::MissingDatabaseVersionFile) => (),
317		Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()),
318	}
319
320	// and now open database assuming that it has the latest version
321	let mut db_config = kvdb_rocksdb::DatabaseConfig::with_columns(NUM_COLUMNS);
322	db_config.create_if_missing = create;
323
324	let mut memory_budget = std::collections::HashMap::new();
325	match db_type {
326		DatabaseType::Full => {
327			let state_col_budget = (cache_size as f64 * 0.9) as usize;
328			let other_col_budget = (cache_size - state_col_budget) / (NUM_COLUMNS as usize - 1);
329
330			for i in 0..NUM_COLUMNS {
331				if i == crate::columns::STATE {
332					memory_budget.insert(i, state_col_budget);
333				} else {
334					memory_budget.insert(i, other_col_budget);
335				}
336			}
337			log::trace!(
338				target: "db",
339				"Open RocksDB database at {:?}, state column budget: {} MiB, others({}) column cache: {} MiB",
340				path,
341				state_col_budget,
342				NUM_COLUMNS,
343				other_col_budget,
344			);
345		},
346	}
347	db_config.memory_budget = memory_budget;
348
349	let db = kvdb_rocksdb::Database::open(&db_config, path)?;
350	// write database version only after the database is successfully opened
351	crate::upgrade::update_version(path)?;
352	Ok(sp_database::as_database(db))
353}
354
355#[cfg(not(any(feature = "rocksdb", test)))]
356fn open_kvdb_rocksdb<Block: BlockT>(
357	_path: &Path,
358	_db_type: DatabaseType,
359	_create: bool,
360	_cache_size: usize,
361) -> OpenDbResult {
362	Err(OpenDbError::NotEnabled("with-kvdb-rocksdb"))
363}
364
365/// Check database type.
366pub fn check_database_type(
367	db: &dyn Database<DbHash>,
368	db_type: DatabaseType,
369) -> Result<(), OpenDbError> {
370	match db.get(COLUMN_META, meta_keys::TYPE) {
371		Some(stored_type) =>
372			if db_type.as_str().as_bytes() != &*stored_type {
373				return Err(OpenDbError::UnexpectedDbType {
374					expected: db_type,
375					found: stored_type.to_owned(),
376				});
377			},
378		None => {
379			let mut transaction = Transaction::new();
380			transaction.set(COLUMN_META, meta_keys::TYPE, db_type.as_str().as_bytes());
381			db.commit(transaction).map_err(OpenDbError::DatabaseError)?;
382		},
383	}
384
385	Ok(())
386}
387
388fn maybe_migrate_to_type_subdir<Block: BlockT>(
389	source: &DatabaseSource,
390	db_type: DatabaseType,
391) -> Result<(), OpenDbError> {
392	if let Some(p) = source.path() {
393		let mut basedir = p.to_path_buf();
394		basedir.pop();
395
396		// Do we have to migrate to a database-type-based subdirectory layout:
397		// See if there's a file identifying a rocksdb or paritydb folder in the parent dir and
398		// the target path ends in a role specific directory
399		if (basedir.join("db_version").exists() || basedir.join("metadata").exists()) &&
400			(p.ends_with(DatabaseType::Full.as_str()))
401		{
402			// Try to open the database to check if the current `DatabaseType` matches the type of
403			// database stored in the target directory and close the database on success.
404			let mut old_source = source.clone();
405			old_source.set_path(&basedir);
406			open_database_at::<Block>(&old_source, db_type, false)?;
407
408			info!(
409				"Migrating database to a database-type-based subdirectory: '{:?}' -> '{:?}'",
410				basedir,
411				basedir.join(db_type.as_str())
412			);
413
414			let mut tmp_dir = basedir.clone();
415			tmp_dir.pop();
416			tmp_dir.push("tmp");
417
418			fs::rename(&basedir, &tmp_dir)?;
419			fs::create_dir_all(&p)?;
420			fs::rename(tmp_dir, &p)?;
421		}
422	}
423
424	Ok(())
425}
426
427/// Read database column entry for the given block.
428pub fn read_db<Block>(
429	db: &dyn Database<DbHash>,
430	col_index: u32,
431	col: u32,
432	id: BlockId<Block>,
433) -> sp_blockchain::Result<Option<DBValue>>
434where
435	Block: BlockT,
436{
437	block_id_to_lookup_key(db, col_index, id).map(|key| match key {
438		Some(key) => db.get(col, key.as_ref()),
439		None => None,
440	})
441}
442
443/// Remove database column entry for the given block.
444pub fn remove_from_db<Block>(
445	transaction: &mut Transaction<DbHash>,
446	db: &dyn Database<DbHash>,
447	col_index: u32,
448	col: u32,
449	id: BlockId<Block>,
450) -> sp_blockchain::Result<()>
451where
452	Block: BlockT,
453{
454	block_id_to_lookup_key(db, col_index, id).map(|key| {
455		if let Some(key) = key {
456			transaction.remove(col, key.as_ref());
457		}
458	})
459}
460
461/// Read a header from the database.
462pub fn read_header<Block: BlockT>(
463	db: &dyn Database<DbHash>,
464	col_index: u32,
465	col: u32,
466	id: BlockId<Block>,
467) -> sp_blockchain::Result<Option<Block::Header>> {
468	match read_db(db, col_index, col, id)? {
469		Some(header) => match Block::Header::decode(&mut &header[..]) {
470			Ok(header) => Ok(Some(header)),
471			Err(_) => Err(sp_blockchain::Error::Backend("Error decoding header".into())),
472		},
473		None => Ok(None),
474	}
475}
476
477/// Read meta from the database.
478pub fn read_meta<Block>(
479	db: &dyn Database<DbHash>,
480	col_header: u32,
481) -> Result<Meta<<<Block as BlockT>::Header as HeaderT>::Number, Block::Hash>, sp_blockchain::Error>
482where
483	Block: BlockT,
484{
485	let genesis_hash: Block::Hash = match read_genesis_hash(db)? {
486		Some(genesis_hash) => genesis_hash,
487		None =>
488			return Ok(Meta {
489				best_hash: Default::default(),
490				best_number: Zero::zero(),
491				finalized_hash: Default::default(),
492				finalized_number: Zero::zero(),
493				genesis_hash: Default::default(),
494				finalized_state: None,
495				block_gap: None,
496			}),
497	};
498
499	let load_meta_block = |desc, key| -> Result<_, sp_blockchain::Error> {
500		if let Some(Some(header)) = db
501			.get(COLUMN_META, key)
502			.and_then(|id| db.get(col_header, &id).map(|b| Block::Header::decode(&mut &b[..]).ok()))
503		{
504			let hash = header.hash();
505			debug!(
506				target: "db",
507				"Opened blockchain db, fetched {} = {:?} ({})",
508				desc,
509				hash,
510				header.number(),
511			);
512			Ok((hash, *header.number()))
513		} else {
514			Ok((Default::default(), Zero::zero()))
515		}
516	};
517
518	let (best_hash, best_number) = load_meta_block("best", meta_keys::BEST_BLOCK)?;
519	let (finalized_hash, finalized_number) = load_meta_block("final", meta_keys::FINALIZED_BLOCK)?;
520	let (finalized_state_hash, finalized_state_number) =
521		load_meta_block("final_state", meta_keys::FINALIZED_STATE)?;
522	let finalized_state = if finalized_state_hash != Default::default() {
523		Some((finalized_state_hash, finalized_state_number))
524	} else {
525		None
526	};
527	let block_gap = match db
528		.get(COLUMN_META, meta_keys::BLOCK_GAP_VERSION)
529		.and_then(|d| u32::decode(&mut d.as_slice()).ok())
530	{
531		None => {
532			let old_block_gap: Option<(NumberFor<Block>, NumberFor<Block>)> = db
533				.get(COLUMN_META, meta_keys::BLOCK_GAP)
534				.and_then(|d| Decode::decode(&mut d.as_slice()).ok());
535
536			old_block_gap.map(|(start, end)| BlockGap {
537				start,
538				end,
539				gap_type: BlockGapType::MissingHeaderAndBody,
540			})
541		},
542		Some(version) => match version {
543			BLOCK_GAP_CURRENT_VERSION => db
544				.get(COLUMN_META, meta_keys::BLOCK_GAP)
545				.and_then(|d| Decode::decode(&mut d.as_slice()).ok()),
546			v =>
547				return Err(sp_blockchain::Error::Backend(format!(
548					"Unsupported block gap DB version: {v}"
549				))),
550		},
551	};
552	debug!(target: "db", "block_gap={:?}", block_gap);
553
554	Ok(Meta {
555		best_hash,
556		best_number,
557		finalized_hash,
558		finalized_number,
559		genesis_hash,
560		finalized_state,
561		block_gap,
562	})
563}
564
565/// Read genesis hash from database.
566pub fn read_genesis_hash<Hash: Decode>(
567	db: &dyn Database<DbHash>,
568) -> sp_blockchain::Result<Option<Hash>> {
569	match db.get(COLUMN_META, meta_keys::GENESIS_HASH) {
570		Some(h) => match Decode::decode(&mut &h[..]) {
571			Ok(h) => Ok(Some(h)),
572			Err(err) =>
573				Err(sp_blockchain::Error::Backend(format!("Error decoding genesis hash: {}", err))),
574		},
575		None => Ok(None),
576	}
577}
578
579impl DatabaseType {
580	/// Returns str representation of the type.
581	pub fn as_str(&self) -> &'static str {
582		match *self {
583			DatabaseType::Full => "full",
584		}
585	}
586}
587
588pub(crate) struct JoinInput<'a, 'b>(&'a [u8], &'b [u8]);
589
590pub(crate) fn join_input<'a, 'b>(i1: &'a [u8], i2: &'b [u8]) -> JoinInput<'a, 'b> {
591	JoinInput(i1, i2)
592}
593
594impl<'a, 'b> codec::Input for JoinInput<'a, 'b> {
595	fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
596		Ok(Some(self.0.len() + self.1.len()))
597	}
598
599	fn read(&mut self, into: &mut [u8]) -> Result<(), codec::Error> {
600		let mut read = 0;
601		if self.0.len() > 0 {
602			read = std::cmp::min(self.0.len(), into.len());
603			self.0.read(&mut into[..read])?;
604		}
605		if read < into.len() {
606			self.1.read(&mut into[read..])?;
607		}
608		Ok(())
609	}
610}
611
612#[cfg(test)]
613mod tests {
614	use super::*;
615	use codec::Input;
616	use sp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};
617
618	pub type UncheckedXt = TestXt<MockCallU64, ()>;
619	type Block = RawBlock<UncheckedXt>;
620
621	#[cfg(feature = "rocksdb")]
622	#[test]
623	fn database_type_subdir_migration() {
624		use std::path::PathBuf;
625		type Block = RawBlock<UncheckedXt>;
626
627		fn check_dir_for_db_type(
628			db_type: DatabaseType,
629			mut source: DatabaseSource,
630			db_check_file: &str,
631		) {
632			let base_path = tempfile::TempDir::new().unwrap();
633			let old_db_path = base_path.path().join("chains/dev/db");
634
635			source.set_path(&old_db_path);
636
637			{
638				let db_res = open_database::<Block>(&source, db_type, true);
639				assert!(db_res.is_ok(), "New database should be created.");
640				assert!(old_db_path.join(db_check_file).exists());
641				assert!(!old_db_path.join(db_type.as_str()).join("db_version").exists());
642			}
643
644			source.set_path(&old_db_path.join(db_type.as_str()));
645
646			let db_res = open_database::<Block>(&source, db_type, true);
647			assert!(db_res.is_ok(), "Reopening the db with the same role should work");
648			// check if the database dir had been migrated
649			assert!(!old_db_path.join(db_check_file).exists());
650			assert!(old_db_path.join(db_type.as_str()).join(db_check_file).exists());
651		}
652
653		check_dir_for_db_type(
654			DatabaseType::Full,
655			DatabaseSource::RocksDb { path: PathBuf::new(), cache_size: 128 },
656			"db_version",
657		);
658
659		check_dir_for_db_type(
660			DatabaseType::Full,
661			DatabaseSource::ParityDb { path: PathBuf::new() },
662			"metadata",
663		);
664
665		// check failure on reopening with wrong role
666		{
667			let base_path = tempfile::TempDir::new().unwrap();
668			let old_db_path = base_path.path().join("chains/dev/db");
669
670			let source = DatabaseSource::RocksDb { path: old_db_path.clone(), cache_size: 128 };
671			{
672				let db_res = open_database::<Block>(&source, DatabaseType::Full, true);
673				assert!(db_res.is_ok(), "New database should be created.");
674
675				// check if the database dir had been migrated
676				assert!(old_db_path.join("db_version").exists());
677				assert!(!old_db_path.join("light/db_version").exists());
678				assert!(!old_db_path.join("full/db_version").exists());
679			}
680			// assert nothing was changed
681			assert!(old_db_path.join("db_version").exists());
682			assert!(!old_db_path.join("full/db_version").exists());
683		}
684	}
685
686	#[test]
687	fn number_index_key_doesnt_panic() {
688		let id = BlockId::<Block>::Number(72340207214430721);
689		match id {
690			BlockId::Number(n) => number_index_key(n).expect_err("number should overflow u32"),
691			_ => unreachable!(),
692		};
693	}
694
695	#[test]
696	fn database_type_as_str_works() {
697		assert_eq!(DatabaseType::Full.as_str(), "full");
698	}
699
700	#[test]
701	fn join_input_works() {
702		let buf1 = [1, 2, 3, 4];
703		let buf2 = [5, 6, 7, 8];
704		let mut test = [0, 0, 0];
705		let mut joined = join_input(buf1.as_ref(), buf2.as_ref());
706		assert_eq!(joined.remaining_len().unwrap(), Some(8));
707
708		joined.read(&mut test).unwrap();
709		assert_eq!(test, [1, 2, 3]);
710		assert_eq!(joined.remaining_len().unwrap(), Some(5));
711
712		joined.read(&mut test).unwrap();
713		assert_eq!(test, [4, 5, 6]);
714		assert_eq!(joined.remaining_len().unwrap(), Some(2));
715
716		joined.read(&mut test[0..2]).unwrap();
717		assert_eq!(test, [7, 8, 6]);
718		assert_eq!(joined.remaining_len().unwrap(), Some(0));
719	}
720
721	#[cfg(feature = "rocksdb")]
722	#[test]
723	fn test_open_database_auto_new() {
724		let db_dir = tempfile::TempDir::new().unwrap();
725		let db_path = db_dir.path().to_owned();
726		let paritydb_path = db_path.join("paritydb");
727		let rocksdb_path = db_path.join("rocksdb_path");
728		let source = DatabaseSource::Auto {
729			paritydb_path: paritydb_path.clone(),
730			rocksdb_path: rocksdb_path.clone(),
731			cache_size: 128,
732		};
733
734		// it should create new auto (paritydb) database
735		{
736			let db_res = open_database::<Block>(&source, DatabaseType::Full, true);
737			assert!(db_res.is_ok(), "New database should be created.");
738		}
739
740		// it should reopen existing auto (pairtydb) database
741		{
742			let db_res = open_database::<Block>(&source, DatabaseType::Full, true);
743			assert!(db_res.is_ok(), "Existing parity database should be reopened");
744		}
745
746		// it should fail to open existing auto (pairtydb) database
747		{
748			let db_res = open_database::<Block>(
749				&DatabaseSource::RocksDb { path: rocksdb_path, cache_size: 128 },
750				DatabaseType::Full,
751				true,
752			);
753			assert!(db_res.is_ok(), "New database should be opened.");
754		}
755
756		// it should reopen existing auto (pairtydb) database
757		{
758			let db_res = open_database::<Block>(
759				&DatabaseSource::ParityDb { path: paritydb_path },
760				DatabaseType::Full,
761				true,
762			);
763			assert!(db_res.is_ok(), "Existing parity database should be reopened");
764		}
765	}
766
767	#[cfg(feature = "rocksdb")]
768	#[test]
769	fn test_open_database_rocksdb_new() {
770		let db_dir = tempfile::TempDir::new().unwrap();
771		let db_path = db_dir.path().to_owned();
772		let paritydb_path = db_path.join("paritydb");
773		let rocksdb_path = db_path.join("rocksdb_path");
774
775		let source = DatabaseSource::RocksDb { path: rocksdb_path.clone(), cache_size: 128 };
776
777		// it should create new rocksdb database
778		{
779			let db_res = open_database::<Block>(&source, DatabaseType::Full, true);
780			assert!(db_res.is_ok(), "New rocksdb database should be created");
781		}
782
783		// it should reopen existing auto (rocksdb) database
784		{
785			let db_res = open_database::<Block>(
786				&DatabaseSource::Auto {
787					paritydb_path: paritydb_path.clone(),
788					rocksdb_path: rocksdb_path.clone(),
789					cache_size: 128,
790				},
791				DatabaseType::Full,
792				true,
793			);
794			assert!(db_res.is_ok(), "Existing rocksdb database should be reopened");
795		}
796
797		// it should fail to open existing auto (rocksdb) database
798		{
799			let db_res = open_database::<Block>(
800				&DatabaseSource::ParityDb { path: paritydb_path },
801				DatabaseType::Full,
802				true,
803			);
804			assert!(db_res.is_ok(), "New paritydb database should be created");
805		}
806
807		// it should reopen existing auto (pairtydb) database
808		{
809			let db_res = open_database::<Block>(
810				&DatabaseSource::RocksDb { path: rocksdb_path, cache_size: 128 },
811				DatabaseType::Full,
812				true,
813			);
814			assert!(db_res.is_ok(), "Existing rocksdb database should be reopened");
815		}
816	}
817
818	#[cfg(feature = "rocksdb")]
819	#[test]
820	fn test_open_database_paritydb_new() {
821		let db_dir = tempfile::TempDir::new().unwrap();
822		let db_path = db_dir.path().to_owned();
823		let paritydb_path = db_path.join("paritydb");
824		let rocksdb_path = db_path.join("rocksdb_path");
825
826		let source = DatabaseSource::ParityDb { path: paritydb_path.clone() };
827
828		// it should create new paritydb database
829		{
830			let db_res = open_database::<Block>(&source, DatabaseType::Full, true);
831			assert!(db_res.is_ok(), "New database should be created.");
832		}
833
834		// it should reopen existing pairtydb database
835		{
836			let db_res = open_database::<Block>(&source, DatabaseType::Full, true);
837			assert!(db_res.is_ok(), "Existing parity database should be reopened");
838		}
839
840		// it should fail to open existing pairtydb database
841		{
842			let db_res = open_database::<Block>(
843				&DatabaseSource::RocksDb { path: rocksdb_path.clone(), cache_size: 128 },
844				DatabaseType::Full,
845				true,
846			);
847			assert!(db_res.is_ok(), "New rocksdb database should be created");
848		}
849
850		// it should reopen existing auto (pairtydb) database
851		{
852			let db_res = open_database::<Block>(
853				&DatabaseSource::Auto { paritydb_path, rocksdb_path, cache_size: 128 },
854				DatabaseType::Full,
855				true,
856			);
857			assert!(db_res.is_ok(), "Existing parity database should be reopened");
858		}
859	}
860}