use std::{collections::HashMap, sync::Arc};
use crate::{columns, Database, DbHash, Transaction};
use log::error;
use parking_lot::Mutex;
/// Offchain local storage backed by a database.
///
/// Both fields are `Arc`s, so clones of a `LocalStorage` share the same
/// database handle and the same per-key lock map.
#[derive(Clone)]
pub struct LocalStorage {
	/// Database handle; this file reads and writes entries in `columns::OFFCHAIN`.
	db: Arc<dyn Database<DbHash>>,
	/// Per-key mutexes, keyed by the concatenated prefix and key. Used by
	/// `compare_and_set` to serialize its read-then-write on a single key;
	/// an entry is removed again once no other caller holds it.
	locks: Arc<Mutex<HashMap<Vec<u8>, Arc<Mutex<()>>>>>,
}
/// Opaque `Debug` output: only the type name is printed, none of the fields.
impl std::fmt::Debug for LocalStorage {
	fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
		// Equivalent to an empty `debug_struct`: the bare name, no braces, no padding.
		fmt.write_str("LocalStorage")
	}
}
impl LocalStorage {
	#[cfg(any(feature = "test-helpers", test))]
	pub fn new_test() -> Self {
		let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
		let db = sp_database::as_database(db);
		Self::new(db as _)
	}
	pub fn new(db: Arc<dyn Database<DbHash>>) -> Self {
		Self { db, locks: Default::default() }
	}
}
impl sp_core::offchain::OffchainStorage for LocalStorage {
	fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
		let mut tx = Transaction::new();
		tx.set(columns::OFFCHAIN, &concatenate_prefix_and_key(prefix, key), value);
		if let Err(err) = self.db.commit(tx) {
			error!("Error setting on local storage: {}", err)
		}
	}
	fn remove(&mut self, prefix: &[u8], key: &[u8]) {
		let mut tx = Transaction::new();
		tx.remove(columns::OFFCHAIN, &concatenate_prefix_and_key(prefix, key));
		if let Err(err) = self.db.commit(tx) {
			error!("Error removing on local storage: {}", err)
		}
	}
	fn get(&self, prefix: &[u8], key: &[u8]) -> Option<Vec<u8>> {
		self.db.get(columns::OFFCHAIN, &concatenate_prefix_and_key(prefix, key))
	}
	fn compare_and_set(
		&mut self,
		prefix: &[u8],
		item_key: &[u8],
		old_value: Option<&[u8]>,
		new_value: &[u8],
	) -> bool {
		let key = concatenate_prefix_and_key(prefix, item_key);
		let key_lock = {
			let mut locks = self.locks.lock();
			locks.entry(key.clone()).or_default().clone()
		};
		let is_set;
		{
			let _key_guard = key_lock.lock();
			let val = self.db.get(columns::OFFCHAIN, &key);
			is_set = val.as_deref() == old_value;
			if is_set {
				self.set(prefix, item_key, new_value)
			}
		}
		let mut locks = self.locks.lock();
		{
			drop(key_lock);
			let key_lock = locks.get_mut(&key);
			if key_lock.and_then(Arc::get_mut).is_some() {
				locks.remove(&key);
			}
		}
		is_set
	}
}
/// Builds the database key for an offchain entry: the bytes of `prefix`
/// immediately followed by the bytes of `key`, with no separator in between.
pub(crate) fn concatenate_prefix_and_key(prefix: &[u8], key: &[u8]) -> Vec<u8> {
	[prefix, key].concat()
}
#[cfg(test)]
mod tests {
	use super::*;
	use sp_core::offchain::OffchainStorage;

	/// A matching `compare_and_set` replaces the value and leaves no per-key
	/// lock behind.
	#[test]
	fn should_compare_and_set_and_clear_the_locks_map() {
		let mut storage = LocalStorage::new_test();
		let prefix = b"prefix";
		let key = b"key";
		let value = b"value";

		storage.set(prefix, key, value);
		assert_eq!(storage.get(prefix, key), Some(value.to_vec()));

		assert!(storage.compare_and_set(prefix, key, Some(value), b"asd"));
		assert_eq!(storage.get(prefix, key), Some(b"asd".to_vec()));
		assert!(storage.locks.lock().is_empty(), "Locks map should be empty!");
	}

	/// `compare_and_set` with `None` as the old value succeeds when the entry
	/// does not exist yet.
	#[test]
	fn should_compare_and_set_on_empty_field() {
		let mut storage = LocalStorage::new_test();
		let prefix = b"prefix";
		let key = b"key";

		assert!(storage.compare_and_set(prefix, key, None, b"asd"));
		assert_eq!(storage.get(prefix, key), Some(b"asd".to_vec()));
		assert!(storage.locks.lock().is_empty(), "Locks map should be empty!");
	}

	/// A mismatching old value must leave the stored value untouched, report
	/// `false`, and still clean up the per-key lock.
	#[test]
	fn should_not_set_when_old_value_does_not_match() {
		let mut storage = LocalStorage::new_test();
		let prefix = b"prefix";
		let key = b"key";
		let value = b"value";

		storage.set(prefix, key, value);

		// Wrong expected value.
		assert!(!storage.compare_and_set(prefix, key, Some(&b"other"[..]), b"asd"));
		// Expecting "no entry" while one exists.
		assert!(!storage.compare_and_set(prefix, key, None, b"asd"));

		assert_eq!(storage.get(prefix, key), Some(value.to_vec()));
		assert!(storage.locks.lock().is_empty(), "Locks map should be empty!");
	}
}