sc_client_api/
notifications.rs1use std::{
22	collections::{HashMap, HashSet},
23	pin::Pin,
24	sync::Arc,
25	task::Poll,
26};
27
28use futures::Stream;
29
30use prometheus_endpoint::Registry as PrometheusRegistry;
31
32use sc_utils::pubsub::{Hub, Receiver};
33use sp_core::storage::{StorageData, StorageKey};
34use sp_runtime::traits::Block as BlockT;
35
36mod registry;
37
38use registry::Registry;
39
40#[cfg(test)]
41mod tests;
42
43#[derive(Debug)]
45pub struct StorageNotification<Hash> {
46	pub block: Hash,
48
49	pub changes: StorageChangeSet,
51}
52
53#[derive(Debug)]
55pub struct StorageChangeSet {
56	changes: Arc<[(StorageKey, Option<StorageData>)]>,
57	child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option<StorageData>)>)]>,
58	filter: Keys,
59	child_filters: ChildKeys,
60}
61
62#[derive(Debug)]
64pub struct StorageNotifications<Block: BlockT>(Hub<StorageNotification<Block::Hash>, Registry>);
65
66pub struct StorageEventStream<H>(Receiver<StorageNotification<H>, Registry>);
68
69type Keys = Option<HashSet<StorageKey>>;
70type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
71
72impl StorageChangeSet {
73	pub fn iter(
75		&self,
76	) -> impl Iterator<Item = (Option<&StorageKey>, &StorageKey, Option<&StorageData>)> + '_ {
77		let top = self
78			.changes
79			.iter()
80			.filter(move |&(key, _)| match self.filter {
81				Some(ref filter) => filter.contains(key),
82				None => true,
83			})
84			.map(move |(k, v)| (None, k, v.as_ref()));
85		let children = self
86			.child_changes
87			.iter()
88			.filter_map(move |(sk, changes)| {
89				self.child_filters.as_ref().and_then(|cf| {
90					cf.get(sk).map(|filter| {
91						changes
92							.iter()
93							.filter(move |&(key, _)| match filter {
94								Some(ref filter) => filter.contains(key),
95								None => true,
96							})
97							.map(move |(k, v)| (Some(sk), k, v.as_ref()))
98					})
99				})
100			})
101			.flatten();
102		top.chain(children)
103	}
104}
105
106impl<H> Stream for StorageEventStream<H> {
107	type Item = StorageNotification<H>;
108	fn poll_next(
109		self: Pin<&mut Self>,
110		cx: &mut std::task::Context<'_>,
111	) -> Poll<Option<Self::Item>> {
112		Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
113	}
114}
115
116impl<Block: BlockT> StorageNotifications<Block> {
117	pub fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
120		let registry = Registry::new(prometheus_registry);
121		let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry);
122
123		StorageNotifications(hub)
124	}
125
126	pub fn trigger(
131		&self,
132		hash: &Block::Hash,
133		changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
134		child_changeset: impl Iterator<
135			Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
136		>,
137	) {
138		self.0.send((hash, changeset, child_changeset))
139	}
140
141	pub fn listen(
143		&self,
144		filter_keys: Option<&[StorageKey]>,
145		filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
146	) -> StorageEventStream<Block::Hash> {
147		let receiver = self
148			.0
149			.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);
150
151		StorageEventStream(receiver)
152	}
153}