referrerpolicy=no-referrer-when-downgrade

sc_client_api/
notifications.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Storage notifications
20
21use std::{
22	collections::{HashMap, HashSet},
23	pin::Pin,
24	sync::Arc,
25	task::Poll,
26};
27
28use futures::Stream;
29
30use prometheus_endpoint::Registry as PrometheusRegistry;
31
32use sc_utils::pubsub::{Hub, Receiver};
33use sp_core::storage::{StorageData, StorageKey};
34use sp_runtime::traits::Block as BlockT;
35
36mod registry;
37
38use registry::Registry;
39
40#[cfg(test)]
41mod tests;
42
/// A type of a message delivered to the subscribers
///
/// This is the `Item` yielded by [`StorageEventStream`]; it pairs a block hash with the
/// storage changes reported for that block.
#[derive(Debug)]
pub struct StorageNotification<Hash> {
	/// The hash of the block
	pub block: Hash,

	/// The set of changes
	///
	/// Use [`StorageChangeSet::iter`] to walk the entries; the iterator applies the filters
	/// stored inside the change set.
	pub changes: StorageChangeSet,
}
52
/// Storage change set
///
/// Holds reference-counted slices of top-level and child-storage changes together with the
/// key filters that [`StorageChangeSet::iter`] applies when yielding entries.
#[derive(Debug)]
pub struct StorageChangeSet {
	// Top-level `(key, value)` pairs.
	// NOTE(review): a `None` value presumably means the key was removed — confirm.
	changes: Arc<[(StorageKey, Option<StorageData>)]>,
	// Child-storage changes: `(child storage key, [(key, value)])`.
	child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option<StorageData>)>)]>,
	// Filter for top-level keys; `None` lets every top-level change through in `iter`.
	filter: Keys,
	// Per-child filters; `None` makes `iter` yield no child changes at all.
	child_filters: ChildKeys,
}
61
/// Manages storage listeners.
///
/// Thin wrapper around a pub-sub [`Hub`] that carries [`StorageNotification`] messages keyed by
/// the block's hash type and uses this module's `Registry` as the hub's registry.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT>(Hub<StorageNotification<Block::Hash>, Registry>);
65
/// Type that implements `futures::Stream` of storage change events.
///
/// Wraps the pub-sub [`Receiver`] obtained when subscribing; each item is a
/// [`StorageNotification`] for the hash type `H`.
pub struct StorageEventStream<H>(Receiver<StorageNotification<H>, Registry>);
68
// Top-level key filter: `None` matches every key, `Some(set)` matches only keys in `set`
// (see `StorageChangeSet::iter`).
type Keys = Option<HashSet<StorageKey>>;
// Child-storage filters, keyed by child storage key. An outer `None` matches no child changes;
// for a listed child, an inner `None` matches all of its keys and `Some(set)` only those in `set`
// (see `StorageChangeSet::iter`).
type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
71
72impl StorageChangeSet {
73	/// Convert the change set into iterator over storage items.
74	pub fn iter(
75		&self,
76	) -> impl Iterator<Item = (Option<&StorageKey>, &StorageKey, Option<&StorageData>)> + '_ {
77		let top = self
78			.changes
79			.iter()
80			.filter(move |&(key, _)| match self.filter {
81				Some(ref filter) => filter.contains(key),
82				None => true,
83			})
84			.map(move |(k, v)| (None, k, v.as_ref()));
85		let children = self
86			.child_changes
87			.iter()
88			.filter_map(move |(sk, changes)| {
89				self.child_filters.as_ref().and_then(|cf| {
90					cf.get(sk).map(|filter| {
91						changes
92							.iter()
93							.filter(move |&(key, _)| match filter {
94								Some(ref filter) => filter.contains(key),
95								None => true,
96							})
97							.map(move |(k, v)| (Some(sk), k, v.as_ref()))
98					})
99				})
100			})
101			.flatten();
102		top.chain(children)
103	}
104}
105
106impl<H> Stream for StorageEventStream<H> {
107	type Item = StorageNotification<H>;
108	fn poll_next(
109		self: Pin<&mut Self>,
110		cx: &mut std::task::Context<'_>,
111	) -> Poll<Option<Self::Item>> {
112		Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
113	}
114}
115
116impl<Block: BlockT> StorageNotifications<Block> {
117	/// Initialize a new StorageNotifications
118	/// optionally pass a prometheus registry to send subscriber metrics to
119	pub fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
120		let registry = Registry::new(prometheus_registry);
121		let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry);
122
123		StorageNotifications(hub)
124	}
125
126	/// Trigger notification to all listeners.
127	///
128	/// Note the changes are going to be filtered by listener's filter key.
129	/// In fact no event might be sent if clients are not interested in the changes.
130	pub fn trigger(
131		&self,
132		hash: &Block::Hash,
133		changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
134		child_changeset: impl Iterator<
135			Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
136		>,
137	) {
138		self.0.send((hash, changeset, child_changeset))
139	}
140
141	/// Start listening for particular storage keys.
142	pub fn listen(
143		&self,
144		filter_keys: Option<&[StorageKey]>,
145		filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
146	) -> StorageEventStream<Block::Hash> {
147		let receiver = self
148			.0
149			.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);
150
151		StorageEventStream(receiver)
152	}
153}