sc_client_db/
parity_db.rs1use crate::{
19 columns,
20 utils::{DatabaseType, NUM_COLUMNS},
21};
22use sp_database::{error::DatabaseError, Change, ColumnId, Database, Transaction};
24
25struct DbAdapter(parity_db::Db);
26
27fn handle_err<T>(result: parity_db::Result<T>) -> T {
28 match result {
29 Ok(r) => r,
30 Err(e) => {
31 panic!("Critical database error: {:?}", e);
32 },
33 }
34}
35
36pub fn open<H: Clone + AsRef<[u8]>>(
38 path: &std::path::Path,
39 db_type: DatabaseType,
40 create: bool,
41 upgrade: bool,
42) -> parity_db::Result<std::sync::Arc<dyn Database<H>>> {
43 let mut config = parity_db::Options::with_columns(path, NUM_COLUMNS as u8);
44
45 match db_type {
46 DatabaseType::Full => {
47 let compressed = [
48 columns::STATE,
49 columns::HEADER,
50 columns::BODY,
51 columns::BODY_INDEX,
52 columns::TRANSACTION,
53 columns::JUSTIFICATIONS,
54 ];
55
56 for i in compressed {
57 let column = &mut config.columns[i as usize];
58 column.compression = parity_db::CompressionType::Lz4;
59 }
60
61 let state_col = &mut config.columns[columns::STATE as usize];
62 state_col.ref_counted = true;
63 state_col.preimage = true;
64 state_col.uniform = true;
65
66 let tx_col = &mut config.columns[columns::TRANSACTION as usize];
67 tx_col.ref_counted = true;
68 tx_col.preimage = true;
69 tx_col.uniform = true;
70 },
71 }
72
73 if upgrade {
74 log::info!("Upgrading database metadata.");
75 if let Some(meta) = parity_db::Options::load_metadata(path)? {
76 config.write_metadata_with_version(path, &meta.salt, Some(meta.version))?;
77 }
78 }
79
80 let db = if create {
81 parity_db::Db::open_or_create(&config)?
82 } else {
83 parity_db::Db::open(&config)?
84 };
85
86 Ok(std::sync::Arc::new(DbAdapter(db)))
87}
88
89fn ref_counted_column(col: u32) -> bool {
90 col == columns::TRANSACTION || col == columns::STATE
91}
92
93impl<H: Clone + AsRef<[u8]>> Database<H> for DbAdapter {
94 fn commit(&self, transaction: Transaction<H>) -> Result<(), DatabaseError> {
95 let mut not_ref_counted_column = Vec::new();
96 let result = self.0.commit(transaction.0.into_iter().filter_map(|change| {
97 Some(match change {
98 Change::Set(col, key, value) => (col as u8, key, Some(value)),
99 Change::Remove(col, key) => (col as u8, key, None),
100 Change::Store(col, key, value) =>
101 if ref_counted_column(col) {
102 (col as u8, key.as_ref().to_vec(), Some(value))
103 } else {
104 if !not_ref_counted_column.contains(&col) {
105 not_ref_counted_column.push(col);
106 }
107 return None
108 },
109 Change::Reference(col, key) => {
110 if ref_counted_column(col) {
111 let value = <Self as Database<H>>::get(self, col, key.as_ref());
113 (col as u8, key.as_ref().to_vec(), value)
114 } else {
115 if !not_ref_counted_column.contains(&col) {
116 not_ref_counted_column.push(col);
117 }
118 return None
119 }
120 },
121 Change::Release(col, key) =>
122 if ref_counted_column(col) {
123 (col as u8, key.as_ref().to_vec(), None)
124 } else {
125 if !not_ref_counted_column.contains(&col) {
126 not_ref_counted_column.push(col);
127 }
128 return None
129 },
130 })
131 }));
132
133 if not_ref_counted_column.len() > 0 {
134 return Err(DatabaseError(Box::new(parity_db::Error::InvalidInput(format!(
135 "Ref counted operation on non ref counted columns {:?}",
136 not_ref_counted_column
137 )))))
138 }
139
140 result.map_err(|e| DatabaseError(Box::new(e)))
141 }
142
143 fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
144 handle_err(self.0.get(col as u8, key))
145 }
146
147 fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
148 handle_err(self.0.get_size(col as u8, key)).is_some()
149 }
150
151 fn value_size(&self, col: ColumnId, key: &[u8]) -> Option<usize> {
152 handle_err(self.0.get_size(col as u8, key)).map(|s| s as usize)
153 }
154
155 fn supports_ref_counting(&self) -> bool {
156 true
157 }
158
159 fn sanitize_key(&self, key: &mut Vec<u8>) {
160 let _prefix = key.drain(0..key.len() - crate::DB_HASH_LEN);
161 }
162}