use futures::channel::oneshot;
use parking_lot::Mutex;
use sc_client_api::Backend;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use crate::chain_head::{
subscription::SubscriptionManagementError, FollowEventReceiver, FollowEventSender,
};
/// Receiving half of a tokio channel that is stored next to each operation's [`StopHandle`].
///
/// NOTE(review): nothing in this file ever sends on the channel; judging by the name, the
/// receiver exists so that dropping it closes the channel — confirm.
type NotifyOnDrop = tokio::sync::mpsc::Receiver<()>;
/// Map from operation ID to its `(receiver, stop handle)` pair, shared behind `Arc<Mutex<_>>`.
type SharedOperations = Arc<Mutex<HashMap<String, (NotifyOnDrop, StopHandle)>>>;
/// Buffer size passed to the response channel created for every inserted subscription.
const BUF_CAP_PER_SUBSCRIPTION: usize = 16;
/// Tracks which register / unpin events were observed for a single block.
///
/// A block starts as `Registered`. One more register event and one unpin
/// event may follow in either order; after both were seen the machine sits in
/// `FullyUnpinned`, which no further event can leave.
#[derive(Debug, Clone, PartialEq)]
enum BlockStateMachine {
    /// Registered once, not unpinned.
    Registered,
    /// Registered a second time, not unpinned.
    FullyRegistered,
    /// Unpinned after a single registration.
    Unpinned,
    /// Registered twice and unpinned.
    FullyUnpinned,
}

impl BlockStateMachine {
    /// Creates the machine in its initial `Registered` state.
    fn new() -> Self {
        Self::Registered
    }

    /// Records a register event.
    ///
    /// `Registered` moves to `FullyRegistered` and `Unpinned` moves to
    /// `FullyUnpinned`; the two remaining states are left as they are.
    fn advance_register(&mut self) {
        let next = match *self {
            Self::Registered => Self::FullyRegistered,
            Self::Unpinned => Self::FullyUnpinned,
            Self::FullyRegistered | Self::FullyUnpinned => return,
        };
        *self = next;
    }

    /// Records an unpin event.
    ///
    /// `Registered` moves to `Unpinned` and `FullyRegistered` moves to
    /// `FullyUnpinned`; the two remaining states are left as they are.
    fn advance_unpin(&mut self) {
        let next = match *self {
            Self::Registered => Self::Unpinned,
            Self::FullyRegistered => Self::FullyUnpinned,
            Self::Unpinned | Self::FullyUnpinned => return,
        };
        *self = next;
    }

    /// Returns `true` once an unpin event was recorded, i.e. in `Unpinned` or `FullyUnpinned`.
    fn was_unpinned(&self) -> bool {
        matches!(self, Self::Unpinned | Self::FullyUnpinned)
    }
}
struct LimitOperations {
semaphore: Arc<tokio::sync::Semaphore>,
}
impl LimitOperations {
fn new(max_operations: usize) -> Self {
LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
}
fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
if num_ops == 0 {
return None
}
let permits = Arc::clone(&self.semaphore)
.try_acquire_many_owned(num_ops.try_into().ok()?)
.ok()?;
Some(permits)
}
}
type PermitOperations = tokio::sync::OwnedSemaphorePermit;
/// Cloneable handle for observing whether an operation was stopped.
///
/// It wraps the sending half of a tokio channel and reports "stopped" once
/// that channel is closed.
#[derive(Clone)]
pub struct StopHandle(tokio::sync::mpsc::Sender<()>);

impl StopHandle {
    /// Completes once the wrapped channel is closed.
    pub async fn stopped(&self) {
        let Self(sender) = self;
        sender.closed().await;
    }

    /// Returns `true` if the wrapped channel is already closed.
    pub fn is_stopped(&self) -> bool {
        let Self(sender) = self;
        sender.is_closed()
    }
}
/// Handle to a registered operation, obtained by looking the operation up by ID.
#[derive(Clone)]
pub struct OperationState {
    /// Stop handle cloned from the operation's map entry.
    stop: StopHandle,
    /// Map the operation is registered in.
    operations: SharedOperations,
    /// Key of the operation inside `operations`.
    operation_id: String,
}

impl OperationState {
    /// Stops the operation by removing its entry from the shared map.
    ///
    /// Does nothing when the stop handle already reports the operation as
    /// stopped.
    pub fn stop(&mut self) {
        if self.stop.is_stopped() {
            return;
        }
        self.operations.lock().remove(&self.operation_id);
    }
}
/// An operation that was successfully registered.
///
/// The value owns the reserved permit; when it is dropped its entry is
/// removed from the shared operations map.
pub struct RegisteredOperation {
    /// Handle that reports whether the operation was stopped.
    stop_handle: StopHandle,
    /// Map the operation is registered in.
    operations: SharedOperations,
    /// Key of the operation inside `operations`.
    operation_id: String,
    /// Permit held for as long as the operation is alive.
    _permit: PermitOperations,
}

impl RegisteredOperation {
    /// Borrows the stop handle of this operation.
    pub fn stop_handle(&self) -> &StopHandle {
        &self.stop_handle
    }

    /// Returns an owned copy of the operation ID.
    pub fn operation_id(&self) -> String {
        self.operation_id.to_owned()
    }
}

impl Drop for RegisteredOperation {
    /// Unregisters the operation from the shared map.
    fn drop(&mut self) {
        let mut registered = self.operations.lock();
        registered.remove(&self.operation_id);
    }
}
struct Operations {
next_operation_id: usize,
limits: LimitOperations,
operations: SharedOperations,
}
impl Operations {
fn new(max_operations: usize) -> Self {
Operations {
next_operation_id: 0,
limits: LimitOperations::new(max_operations),
operations: Default::default(),
}
}
pub fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
let permit = self.limits.reserve_at_most(to_reserve)?;
let operation_id = self.next_operation_id();
let (tx, rx) = tokio::sync::mpsc::channel(1);
let stop_handle = StopHandle(tx);
let operations = self.operations.clone();
operations.lock().insert(operation_id.clone(), (rx, stop_handle.clone()));
Some(RegisteredOperation { stop_handle, operation_id, operations, _permit: permit })
}
pub fn get_operation(&self, id: &str) -> Option<OperationState> {
let stop = self.operations.lock().get(id).map(|(_, stop)| stop.clone())?;
Some(OperationState {
stop,
operations: self.operations.clone(),
operation_id: id.to_string(),
})
}
fn next_operation_id(&mut self) -> String {
let op_id = self.next_operation_id;
self.next_operation_id += 1;
op_id.to_string()
}
}
/// Per-block bookkeeping kept by a subscription.
struct BlockState {
/// Register / unpin progress of the block.
state_machine: BlockStateMachine,
/// Instant captured when the block entry was created; read when searching for the
/// oldest block of a subscription.
timestamp: Instant,
}
/// State kept for a single subscription.
struct SubscriptionState<Block: BlockT> {
    /// Flag copied into every `BlockGuard` created for this subscription.
    with_runtime: bool,
    /// Sender used to signal the stop of the subscription; taken on first use.
    tx_stop: Option<oneshot::Sender<()>>,
    /// Sending half of the subscription's response channel.
    response_sender: FollowEventSender<Block::Hash>,
    /// Operations registered by this subscription.
    operations: Operations,
    /// Blocks tracked by this subscription together with their state.
    blocks: HashMap<Block::Hash, BlockState>,
}

impl<Block: BlockT> SubscriptionState<Block> {
    /// Sends the stop signal, if it was not sent before.
    ///
    /// The sender is consumed by the first call; a failed send is ignored.
    fn stop(&mut self) {
        let Some(sender) = self.tx_stop.take() else { return };
        let _ = sender.send(());
    }

    /// Registers `hash` for this subscription.
    ///
    /// Returns `true` only when the block was not tracked before. For an
    /// already tracked block the state machine is advanced and the entry is
    /// dropped once it reaches `FullyUnpinned`.
    fn register_block(&mut self, hash: Block::Hash) -> bool {
        match self.blocks.entry(hash) {
            Entry::Vacant(slot) => {
                slot.insert(BlockState {
                    state_machine: BlockStateMachine::new(),
                    timestamp: Instant::now(),
                });
                true
            },
            Entry::Occupied(mut slot) => {
                slot.get_mut().state_machine.advance_register();
                if slot.get().state_machine == BlockStateMachine::FullyUnpinned {
                    slot.remove();
                }
                false
            },
        }
    }

    /// Records an unpin of `hash` for this subscription.
    ///
    /// Returns `false` when the block is not tracked or was unpinned already.
    /// Otherwise the state machine is advanced, the entry is dropped if it
    /// reached `FullyUnpinned`, and `true` is returned.
    fn unregister_block(&mut self, hash: Block::Hash) -> bool {
        let Entry::Occupied(mut slot) = self.blocks.entry(hash) else {
            return false;
        };

        let machine = &mut slot.get_mut().state_machine;
        if machine.was_unpinned() {
            return false;
        }
        machine.advance_unpin();
        let fully_unpinned = *machine == BlockStateMachine::FullyUnpinned;

        if fully_unpinned {
            slot.remove();
        }
        true
    }

    /// Returns `true` if `hash` is tracked and was not unpinned yet.
    fn contains_block(&self, hash: Block::Hash) -> bool {
        self.blocks
            .get(&hash)
            .map_or(false, |state| !state.state_machine.was_unpinned())
    }

    /// Returns the smallest timestamp among the tracked blocks.
    ///
    /// The search starts from `Instant::now()`, which is therefore the result
    /// when no block is tracked.
    fn find_oldest_block_timestamp(&self) -> Instant {
        self.blocks
            .values()
            .map(|state| state.timestamp)
            .fold(Instant::now(), std::cmp::min)
    }

    /// Registers an operation for this subscription, reserving up to `to_reserve` permits.
    fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
        let registry = &mut self.operations;
        registry.register_operation(to_reserve)
    }

    /// Looks up an operation of this subscription by its ID.
    pub fn get_operation(&self, id: &str) -> Option<OperationState> {
        let registry = &self.operations;
        registry.get_operation(id)
    }
}
pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
hash: Block::Hash,
with_runtime: bool,
response_sender: FollowEventSender<Block::Hash>,
operation: RegisteredOperation,
backend: Arc<BE>,
}
impl<Block: BlockT, BE: Backend<Block>> std::fmt::Debug for BlockGuard<Block, BE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BlockGuard hash {:?} with_runtime {:?}", self.hash, self.with_runtime)
}
}
impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
fn new(
hash: Block::Hash,
with_runtime: bool,
response_sender: FollowEventSender<Block::Hash>,
operation: RegisteredOperation,
backend: Arc<BE>,
) -> Result<Self, SubscriptionManagementError> {
backend
.pin_block(hash)
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
Ok(Self { hash, with_runtime, response_sender, operation, backend })
}
pub fn has_runtime(&self) -> bool {
self.with_runtime
}
pub fn response_sender(&self) -> FollowEventSender<Block::Hash> {
self.response_sender.clone()
}
pub fn operation(&mut self) -> &mut RegisteredOperation {
&mut self.operation
}
}
impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
fn drop(&mut self) {
self.backend.unpin_block(self.hash);
}
}
/// Receiving ends handed back to the caller when a subscription is inserted.
pub struct InsertedSubscriptionData<Block: BlockT> {
/// Receiver paired with the sender that the subscription's `stop` sends on.
pub rx_stop: oneshot::Receiver<()>,
/// Receiving half of the subscription's response channel.
pub response_receiver: FollowEventReceiver<Block::Hash>,
}
pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
global_blocks: HashMap<Block::Hash, usize>,
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
subs: HashMap<String, SubscriptionState<Block>>,
backend: Arc<BE>,
}
impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
backend: Arc<BE>,
) -> Self {
SubscriptionsInner {
global_blocks: Default::default(),
global_max_pinned_blocks,
local_max_pin_duration,
max_ongoing_operations,
subs: Default::default(),
backend,
}
}
pub fn insert_subscription(
&mut self,
sub_id: String,
with_runtime: bool,
) -> Option<InsertedSubscriptionData<Block>> {
if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
let (tx_stop, rx_stop) = oneshot::channel();
let (response_sender, response_receiver) =
futures::channel::mpsc::channel(BUF_CAP_PER_SUBSCRIPTION);
let state = SubscriptionState::<Block> {
with_runtime,
tx_stop: Some(tx_stop),
response_sender,
blocks: Default::default(),
operations: Operations::new(self.max_ongoing_operations),
};
entry.insert(state);
Some(InsertedSubscriptionData { rx_stop, response_receiver })
} else {
None
}
}
pub fn remove_subscription(&mut self, sub_id: &str) {
let Some(mut sub) = self.subs.remove(sub_id) else { return };
sub.stop();
for (hash, state) in sub.blocks.iter() {
if !state.state_machine.was_unpinned() {
self.global_unregister_block(*hash);
}
}
}
pub fn stop_all_subscriptions(&mut self) {
let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
for sub_id in to_remove {
self.remove_subscription(&sub_id);
}
}
fn ensure_block_space(&mut self, request_sub_id: &str) -> bool {
if self.global_blocks.len() < self.global_max_pinned_blocks {
return false
}
let now = Instant::now();
let to_remove: Vec<_> = self
.subs
.iter_mut()
.filter_map(|(sub_id, sub)| {
let sub_time = sub.find_oldest_block_timestamp();
let should_remove = match now.checked_duration_since(sub_time) {
Some(duration) => duration > self.local_max_pin_duration,
None => true,
};
should_remove.then(|| sub_id.clone())
})
.collect();
let mut is_terminated = false;
for sub_id in to_remove {
if sub_id == request_sub_id {
is_terminated = true;
}
self.remove_subscription(&sub_id);
}
if self.global_blocks.len() < self.global_max_pinned_blocks {
return is_terminated
}
let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
for sub_id in to_remove {
if sub_id == request_sub_id {
is_terminated = true;
}
self.remove_subscription(&sub_id);
}
return is_terminated
}
pub fn pin_block(
&mut self,
sub_id: &str,
hash: Block::Hash,
) -> Result<bool, SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
if !sub.register_block(hash) {
return Ok(false)
}
if !self.global_blocks.contains_key(&hash) {
if self.ensure_block_space(sub_id) {
return Err(SubscriptionManagementError::ExceededLimits)
}
}
self.global_register_block(hash)?;
Ok(true)
}
fn global_register_block(
&mut self,
hash: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
match self.global_blocks.entry(hash) {
Entry::Occupied(mut occupied) => {
*occupied.get_mut() += 1;
},
Entry::Vacant(vacant) => {
self.backend
.pin_block(hash)
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
vacant.insert(1);
},
};
Ok(())
}
fn global_unregister_block(&mut self, hash: Block::Hash) {
if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) {
let counter = occupied.get_mut();
if *counter == 1 {
self.backend.unpin_block(hash);
occupied.remove();
} else {
*counter -= 1;
}
}
}
fn ensure_hash_uniqueness(
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
) -> Result<(), SubscriptionManagementError> {
let mut set = HashSet::new();
hashes.into_iter().try_for_each(|hash| {
if !set.insert(hash) {
Err(SubscriptionManagementError::DuplicateHashes)
} else {
Ok(())
}
})
}
pub fn unpin_blocks(
&mut self,
sub_id: &str,
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
) -> Result<(), SubscriptionManagementError> {
Self::ensure_hash_uniqueness(hashes.clone())?;
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
for hash in hashes.clone() {
if !sub.contains_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
}
}
for hash in hashes.clone() {
sub.unregister_block(hash);
}
for hash in hashes {
self.global_unregister_block(hash);
}
Ok(())
}
pub fn lock_block(
&mut self,
sub_id: &str,
hash: Block::Hash,
to_reserve: usize,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
if !sub.contains_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
}
let Some(operation) = sub.register_operation(to_reserve) else {
return Err(SubscriptionManagementError::ExceededLimits)
};
BlockGuard::new(
hash,
sub.with_runtime,
sub.response_sender.clone(),
operation,
self.backend.clone(),
)
}
pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option<OperationState> {
let state = self.subs.get(sub_id)?;
state.get_operation(id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use jsonrpsee::ConnectionId;
use sc_block_builder::BlockBuilderBuilder;
use sc_service::client::new_with_backend;
use sp_consensus::BlockOrigin;
use sp_core::{testing::TaskExecutor, H256};
use substrate_test_runtime_client::{
prelude::*,
runtime::{Block, RuntimeApi},
Client, ClientBlockImportExt, GenesisInit,
};
/// Operation budget passed to the `SubscriptionsInner` and `Operations` values built by the tests.
const MAX_OPERATIONS_PER_SUB: usize = 16;
/// Builds an in-memory backend and a test client running on top of it.
///
/// Both handles are returned: the backend is what the tests hand to
/// `SubscriptionsInner`, the client is used to produce blocks.
fn init_backend() -> (
    Arc<sc_client_api::in_mem::Backend<Block>>,
    Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
) {
    let in_mem_backend = Arc::new(sc_client_api::in_mem::Backend::new());
    let wasm_executor = substrate_test_runtime_client::WasmExecutor::default();
    let config = sc_service::ClientConfig::default();

    let genesis_storage =
        substrate_test_runtime_client::GenesisParameters::default().genesis_storage();
    let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
        &genesis_storage,
        !config.no_genesis,
        in_mem_backend.clone(),
        wasm_executor.clone(),
    )
    .unwrap();

    let test_client = new_with_backend::<_, _, Block, _, RuntimeApi>(
        in_mem_backend.clone(),
        wasm_executor,
        genesis_block_builder,
        Box::new(TaskExecutor::new()),
        None,
        None,
        config,
    )
    .unwrap();

    (in_mem_backend, Arc::new(test_client))
}
/// Builds and imports a chain of `num_blocks` blocks on top of genesis.
///
/// Every block uses the previously produced block as its parent. The hashes
/// of the imported blocks are returned in production order.
fn produce_blocks(
    client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
    num_blocks: usize,
) -> Vec<<Block as BlockT>::Hash> {
    let mut hashes = Vec::with_capacity(num_blocks);
    let mut parent_hash = client.chain_info().genesis_hash;

    for parent_number in 0..num_blocks {
        let built = BlockBuilderBuilder::new(&*client)
            .on_parent_block(parent_hash)
            .with_parent_block_number(parent_number as u64)
            .build()
            .unwrap()
            .build()
            .unwrap();
        let block = built.block;

        // The hash is taken before the block is handed over to the import.
        let hash = block.header.hash();
        futures::executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();

        hashes.push(hash);
        parent_hash = hash;
    }

    hashes
}
/// Walks the state machine through register events first and unpin events second.
#[test]
fn block_state_machine_register_unpin() {
let mut state = BlockStateMachine::new();
assert_eq!(state, BlockStateMachine::Registered);
// A second register moves to `FullyRegistered`; further registers change nothing.
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyRegistered);
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyRegistered);
assert!(!state.was_unpinned());
// Unpinning a fully registered block reaches the terminal state.
state.advance_unpin();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
// No event leaves `FullyUnpinned`.
state.advance_unpin();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
}
/// Walks the state machine through an unpin event before the second register event.
#[test]
fn block_state_machine_unpin_register() {
let mut state = BlockStateMachine::new();
assert_eq!(state, BlockStateMachine::Registered);
assert!(!state.was_unpinned());
// Unpin after a single registration; repeated unpins change nothing.
state.advance_unpin();
assert_eq!(state, BlockStateMachine::Unpinned);
assert!(state.was_unpinned());
state.advance_unpin();
assert_eq!(state, BlockStateMachine::Unpinned);
assert!(state.was_unpinned());
// The late register completes the cycle.
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
// No event leaves `FullyUnpinned`.
state.advance_register();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
state.advance_unpin();
assert_eq!(state, BlockStateMachine::FullyUnpinned);
assert!(state.was_unpinned());
}
/// Registers the same block twice on a subscription and then unpins it.
#[test]
fn sub_state_register_twice() {
let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
let mut sub_state = SubscriptionState::<Block> {
with_runtime: false,
tx_stop: None,
response_sender,
operations: Operations::new(MAX_OPERATIONS_PER_SUB),
blocks: Default::default(),
};
let hash = H256::random();
// First registration creates the entry.
assert_eq!(sub_state.register_block(hash), true);
let block_state = sub_state.blocks.get(&hash).unwrap();
assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
// Second registration only advances the state machine.
assert_eq!(sub_state.register_block(hash), false);
let block_state = sub_state.blocks.get(&hash).unwrap();
assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered);
// Unpinning a fully registered block removes the entry.
assert_eq!(sub_state.unregister_block(hash), true);
let block_state = sub_state.blocks.get(&hash);
assert!(block_state.is_none());
}
/// Unpins a block after one registration and registers it again afterwards.
#[test]
fn sub_state_register_unregister() {
let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
let mut sub_state = SubscriptionState::<Block> {
with_runtime: false,
tx_stop: None,
response_sender,
blocks: Default::default(),
operations: Operations::new(MAX_OPERATIONS_PER_SUB),
};
let hash = H256::random();
// Unpinning an unknown block is rejected.
assert_eq!(sub_state.unregister_block(hash), false);
assert_eq!(sub_state.register_block(hash), true);
let block_state = sub_state.blocks.get(&hash).unwrap();
assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
// After a single registration the entry stays around in the `Unpinned` state.
assert_eq!(sub_state.unregister_block(hash), true);
let block_state = sub_state.blocks.get(&hash).unwrap();
assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned);
// The late registration completes the cycle and removes the entry.
assert_eq!(sub_state.register_block(hash), false);
let block_state = sub_state.blocks.get(&hash);
assert!(block_state.is_none());
// The block is unknown again, so unpinning is rejected.
assert_eq!(sub_state.unregister_block(hash), false);
let block_state = sub_state.blocks.get(&hash);
assert!(block_state.is_none());
}
/// Checks that an unpin request with repeated hashes is rejected without touching any state.
#[test]
fn unpin_duplicate_hashes() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
// The first subscription pins all three blocks, the second one only `hash_2`.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
// Duplicates are refused and the reference counts stay as they were.
let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::DuplicateHashes);
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
// The same hashes without duplicates are unpinned.
subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
assert_eq!(subs.global_blocks.get(&hash_1), None);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
}
/// Checks the errors returned by `lock_block` for unknown subscriptions and unknown blocks.
#[test]
fn subscription_lock_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let hash = H256::random();
// No subscription was inserted yet.
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
// Inserting the same ID twice is refused.
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
assert!(subs.insert_subscription(id.clone(), true).is_none());
// The subscription exists, but the block was never pinned.
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
// After removal the subscription is unknown again.
subs.remove_subscription(&id);
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
}
/// Pins, locks and unpins a single block and checks the reported errors.
#[test]
fn subscription_check_block() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 1);
let hash = hashes[0];
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
// The guard carries the `with_runtime` flag of the subscription.
let block = subs.lock_block(&id, hash, 1).unwrap();
assert_eq!(block.has_runtime(), true);
// Unpinning through an unknown subscription fails.
let invalid_id = "abc-invalid".to_string();
let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
// Once unpinned, the block can no longer be locked.
subs.unpin_blocks(&id, vec![hash]).unwrap();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}
/// Checks the global reference count of a block shared by two subscriptions.
#[test]
fn subscription_ref_count() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 1);
let hash = hashes[0];
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let _stop = subs.insert_subscription(id.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap();
// Pinning again from the same subscription does not add a reference.
assert_eq!(subs.pin_block(&id, hash).unwrap(), false);
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// A second subscription adds its own reference.
let id_second = "abcd".to_string();
let _stop = subs.insert_subscription(id_second.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2);
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
// Each subscription can release its reference exactly once.
subs.unpin_blocks(&id, vec![hash]).unwrap();
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
// The last release drops the global entry.
subs.unpin_blocks(&id_second, vec![hash]).unwrap();
assert!(subs.global_blocks.get(&hash).is_none());
}
/// Checks that removing a subscription releases the global references it held.
#[test]
fn subscription_remove_subscription() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
// The first subscription pins all three blocks, the second one only `hash_2`.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
// Only the block shared with the second subscription survives the removal.
subs.remove_subscription(&id_1);
assert!(subs.global_blocks.get(&hash_1).is_none());
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
assert!(subs.global_blocks.get(&hash_3).is_none());
subs.remove_subscription(&id_2);
assert!(subs.global_blocks.get(&hash_2).is_none());
assert_eq!(subs.global_blocks.len(), 0);
}
/// Checks that exceeding the global block limit removes every subscription when none of
/// them has exceeded the pin duration.
#[test]
fn subscription_check_limits() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
// At most two distinct blocks may be pinned globally.
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
// Blocks that are already pinned globally do not need additional space.
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
// A third distinct block exceeds the limit.
let err = subs.pin_block(&id_1, hash_3).unwrap_err();
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
// Both subscriptions were removed together with all of their pinned blocks.
let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
assert!(subs.global_blocks.get(&hash_1).is_none());
assert!(subs.global_blocks.get(&hash_2).is_none());
assert!(subs.global_blocks.get(&hash_3).is_none());
assert_eq!(subs.global_blocks.len(), 0);
}
/// Checks that, when the global limit is hit, subscriptions exceeding the pin duration are
/// removed before the younger ones.
#[test]
fn subscription_check_limits_with_duration() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
// At most two distinct blocks globally, with a pin duration of five seconds.
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
// Let the blocks of the first subscription age past the pin duration.
std::thread::sleep(std::time::Duration::from_secs(5));
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
// The limit is hit: the first subscription is removed, and since it is the requester
// the call fails.
let err = subs.pin_block(&id_1, hash_3).unwrap_err();
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
// The younger subscription survived and still holds `hash_1`.
let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert!(subs.global_blocks.get(&hash_2).is_none());
assert!(subs.global_blocks.get(&hash_3).is_none());
assert_eq!(subs.global_blocks.len(), 1);
// One slot is free again; filling it and asking for one more removes the remaining
// subscription as well.
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
let err = subs.pin_block(&id_2, hash_3).unwrap_err();
assert_eq!(err, SubscriptionManagementError::ExceededLimits);
assert!(subs.global_blocks.get(&hash_1).is_none());
assert!(subs.global_blocks.get(&hash_2).is_none());
assert!(subs.global_blocks.get(&hash_3).is_none());
assert_eq!(subs.global_blocks.len(), 0);
}
/// Checks that stopping a subscription delivers a message on its stop receiver.
#[test]
fn subscription_check_stop_event() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();
let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();
// Nothing was sent yet.
let res = sub_data.rx_stop.try_recv().unwrap();
assert!(res.is_none());
let sub = subs.subs.get_mut(&id).unwrap();
sub.stop();
// The stop signal is now available.
let res = sub_data.rx_stop.try_recv().unwrap();
assert!(res.is_some());
}
/// Checks that `LimitOperations` clamps reservations to the permits that are left.
#[test]
fn ongoing_operations() {
let ops = LimitOperations::new(2);
let permit_one = ops.reserve_at_most(1).unwrap();
assert_eq!(permit_one.num_permits(), 1);
// Two permits are requested but only one is left.
let permit_two = ops.reserve_at_most(2).unwrap();
assert_eq!(permit_two.num_permits(), 1);
// The budget is exhausted.
let permit = ops.reserve_at_most(1);
assert!(permit.is_none());
// Dropping a permit makes it available again.
drop(permit_two);
let permit_three = ops.reserve_at_most(1).unwrap();
assert_eq!(permit_three.num_permits(), 1);
}
/// Checks that `stop_all_subscriptions` leaves no globally pinned block behind.
#[test]
fn stop_all_subscriptions() {
let (backend, client) = init_backend();
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
// The first subscription pins all three blocks, the second one only `hash_2`.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
assert_eq!(subs.global_blocks.len(), 3);
subs.stop_all_subscriptions();
assert!(subs.global_blocks.is_empty());
}
/// Checks that dropping a reserved subscription removes the subscription it inserted and
/// frees its reservation.
#[test]
fn reserved_subscription_cleans_resources() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
backend,
)));
// NOTE(review): the `2` presumably caps the reservations per connection — the assertions
// below allow two reservations for connection 1 and refuse the third.
let rpc_connections = crate::common::connections::RpcConnections::new(2);
let subscription_management =
crate::chain_head::subscription::SubscriptionManagement::_from_inner(
subs.clone(),
rpc_connections.clone(),
);
let reserved_sub_first =
subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
let mut reserved_sub_second =
subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
// Reserving alone does not insert a subscription.
assert_eq!(subs.read().subs.len(), 0);
assert!(subscription_management.reserve_subscription(ConnectionId(1)).is_none());
// Dropping a reservation allows a new one to be made.
drop(reserved_sub_first);
let mut reserved_sub_first =
subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
let _sub_data_first =
reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap();
let _sub_data_second =
reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap();
assert_eq!(subs.read().subs.len(), 2);
// Dropping a reservation removes the subscription inserted through it.
drop(reserved_sub_first);
assert_eq!(subs.read().subs.len(), 1);
let reserved_sub_first =
subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
drop(reserved_sub_first);
drop(reserved_sub_second);
assert_eq!(subs.read().subs.len(), 0);
}
}