sc_client_api/notifications/
registry.rs1use super::*;
20
21use sp_core::hexdisplay::HexDisplay;
22
23use fnv::{FnvHashMap, FnvHashSet};
24use prometheus_endpoint::{register, CounterVec, Opts, U64};
25
26use sc_utils::{
27 id_sequence::SeqID as SubscriberId,
28 pubsub::{Dispatch, Subscribe, Unsubscribe},
29};
30
31type SubscribersGauge = CounterVec<U64>;
32
33pub(super) struct SubscribeOp<'a> {
37 pub filter_keys: Option<&'a [StorageKey]>,
38 pub filter_child_keys: Option<&'a [(StorageKey, Option<Vec<StorageKey>>)]>,
39}
40
41#[derive(Debug, Default)]
42pub(super) struct Registry {
43 pub(super) metrics: Option<SubscribersGauge>,
44 pub(super) wildcard_listeners: FnvHashSet<SubscriberId>,
45 pub(super) listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
46 pub(super) child_listeners: HashMap<
47 StorageKey,
48 (HashMap<StorageKey, FnvHashSet<SubscriberId>>, FnvHashSet<SubscriberId>),
49 >,
50 pub(super) sinks: FnvHashMap<SubscriberId, SubscriberSink>,
51}
52
53#[derive(Debug)]
54pub(super) struct SubscriberSink {
55 subs_id: SubscriberId,
56 keys: Keys,
57 child_keys: ChildKeys,
58 was_triggered: bool,
59}
60
61impl Drop for SubscriberSink {
62 fn drop(&mut self) {
63 if !self.was_triggered {
64 log::trace!(
65 target: "storage_notifications",
66 "Listener was never triggered: id={}, keys={:?}, child_keys={:?}",
67 self.subs_id,
68 PrintKeys(&self.keys),
69 PrintChildKeys(&self.child_keys),
70 );
71 }
72 }
73}
74
75impl SubscriberSink {
76 fn new(subs_id: SubscriberId, keys: Keys, child_keys: ChildKeys) -> Self {
77 Self { subs_id, keys, child_keys, was_triggered: false }
78 }
79}
80
81impl Registry {
82 pub(super) fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
83 let metrics = prometheus_registry.and_then(|r| {
84 CounterVec::new(
85 Opts::new(
86 "substrate_storage_notification_subscribers",
87 "Number of subscribers in storage notification sytem",
88 ),
89 &["action"], )
91 .and_then(|g| register(g, &r))
92 .ok()
93 });
94
95 Registry { metrics, ..Default::default() }
96 }
97}
98
99impl Unsubscribe for Registry {
100 fn unsubscribe(&mut self, subs_id: SubscriberId) {
101 self.remove_subscriber(subs_id);
102 }
103}
104
105impl<'a> Subscribe<SubscribeOp<'a>> for Registry {
106 fn subscribe(&mut self, subs_op: SubscribeOp<'a>, subs_id: SubscriberId) {
107 let SubscribeOp { filter_keys, filter_child_keys } = subs_op;
108
109 let keys = Self::listen_from(
110 subs_id,
111 filter_keys.as_ref(),
112 &mut self.listeners,
113 &mut self.wildcard_listeners,
114 );
115
116 let child_keys = filter_child_keys.map(|filter_child_keys| {
117 filter_child_keys
118 .iter()
119 .map(|(c_key, o_keys)| {
120 let (c_listeners, c_wildcards) =
121 self.child_listeners.entry(c_key.clone()).or_default();
122
123 (
124 c_key.clone(),
125 Self::listen_from(
126 subs_id,
127 o_keys.as_ref(),
128 &mut *c_listeners,
129 &mut *c_wildcards,
130 ),
131 )
132 })
133 .collect()
134 });
135
136 if let Some(m) = self.metrics.as_ref() {
137 m.with_label_values(&["added"]).inc();
138 }
139
140 if self
141 .sinks
142 .insert(subs_id, SubscriberSink::new(subs_id, keys, child_keys))
143 .is_some()
144 {
145 log::warn!("The `subscribe`-method has been passed a non-unique subs_id (in `sc-client-api::notifications`)");
146 }
147 }
148}
149
150impl<'a, Hash, CS, CCS, CCSI> Dispatch<(&'a Hash, CS, CCS)> for Registry
151where
152 Hash: Clone,
153 CS: Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
154 CCS: Iterator<Item = (Vec<u8>, CCSI)>,
155 CCSI: Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
156{
157 type Item = StorageNotification<Hash>;
158 type Ret = ();
159
160 fn dispatch<F>(&mut self, message: (&'a Hash, CS, CCS), dispatch: F) -> Self::Ret
161 where
162 F: FnMut(&SubscriberId, Self::Item),
163 {
164 let (hash, changeset, child_changeset) = message;
165 self.trigger(hash, changeset, child_changeset, dispatch);
166 }
167}
168
169impl Registry {
170 pub(super) fn trigger<Hash, F>(
171 &mut self,
172 hash: &Hash,
173 changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
174 child_changeset: impl Iterator<
175 Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
176 >,
177 mut dispatch: F,
178 ) where
179 Hash: Clone,
180 F: FnMut(&SubscriberId, StorageNotification<Hash>),
181 {
182 let has_wildcard = !self.wildcard_listeners.is_empty();
183
184 if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() {
186 return
187 }
188
189 let mut subscribers = self.wildcard_listeners.clone();
190 let mut changes = Vec::new();
191 let mut child_changes = Vec::new();
192
193 for (k, v) in changeset {
195 let k = StorageKey(k);
196 let listeners = self.listeners.get(&k);
197
198 if let Some(listeners) = listeners {
199 subscribers.extend(listeners.iter());
200 }
201
202 if has_wildcard || listeners.is_some() {
203 changes.push((k, v.map(StorageData)));
204 }
205 }
206 for (sk, changeset) in child_changeset {
207 let sk = StorageKey(sk);
208 if let Some((cl, cw)) = self.child_listeners.get(&sk) {
209 let mut changes = Vec::new();
210 for (k, v) in changeset {
211 let k = StorageKey(k);
212 let listeners = cl.get(&k);
213
214 if let Some(listeners) = listeners {
215 subscribers.extend(listeners.iter());
216 }
217
218 subscribers.extend(cw.iter());
219
220 if !cw.is_empty() || listeners.is_some() {
221 changes.push((k, v.map(StorageData)));
222 }
223 }
224 if !changes.is_empty() {
225 child_changes.push((sk, changes));
226 }
227 }
228 }
229
230 if changes.is_empty() && child_changes.is_empty() {
232 return
233 }
234
235 let changes = Arc::<[_]>::from(changes);
236 let child_changes = Arc::<[_]>::from(child_changes);
237
238 self.sinks.iter_mut().for_each(|(subs_id, sink)| {
240 if subscribers.contains(subs_id) {
241 sink.was_triggered = true;
242
243 let storage_change_set = StorageChangeSet {
244 changes: changes.clone(),
245 child_changes: child_changes.clone(),
246 filter: sink.keys.clone(),
247 child_filters: sink.child_keys.clone(),
248 };
249
250 let notification =
251 StorageNotification { block: hash.clone(), changes: storage_change_set };
252
253 dispatch(subs_id, notification);
254 }
255 });
256 }
257}
258
259impl Registry {
260 fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> {
261 let sink = self.sinks.remove(&subscriber)?;
262
263 Self::remove_subscriber_from(
264 subscriber,
265 &sink.keys,
266 &mut self.listeners,
267 &mut self.wildcard_listeners,
268 );
269 if let Some(child_filters) = &sink.child_keys {
270 for (c_key, filters) in child_filters {
271 if let Some((listeners, wildcards)) = self.child_listeners.get_mut(c_key) {
272 Self::remove_subscriber_from(
273 subscriber,
274 filters,
275 &mut *listeners,
276 &mut *wildcards,
277 );
278
279 if listeners.is_empty() && wildcards.is_empty() {
280 self.child_listeners.remove(c_key);
281 }
282 }
283 }
284 }
285 if let Some(m) = self.metrics.as_ref() {
286 m.with_label_values(&["removed"]).inc();
287 }
288
289 Some((sink.keys.clone(), sink.child_keys.clone()))
290 }
291
292 fn remove_subscriber_from(
293 subscriber: SubscriberId,
294 filters: &Keys,
295 listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
296 wildcards: &mut FnvHashSet<SubscriberId>,
297 ) {
298 match filters {
299 None => {
300 wildcards.remove(&subscriber);
301 },
302 Some(filters) =>
303 for key in filters.iter() {
304 let remove_key = match listeners.get_mut(key) {
305 Some(ref mut set) => {
306 set.remove(&subscriber);
307 set.is_empty()
308 },
309 None => false,
310 };
311
312 if remove_key {
313 listeners.remove(key);
314 }
315 },
316 }
317 }
318
319 fn listen_from(
320 current_id: SubscriberId,
321 filter_keys: Option<impl AsRef<[StorageKey]>>,
322 listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
323 wildcards: &mut FnvHashSet<SubscriberId>,
324 ) -> Keys {
325 match filter_keys {
326 None => {
327 wildcards.insert(current_id);
328 None
329 },
330 Some(keys) => Some(
331 keys.as_ref()
332 .iter()
333 .map(|key| {
334 listeners.entry(key.clone()).or_default().insert(current_id);
335 key.clone()
336 })
337 .collect(),
338 ),
339 }
340 }
341}
342
343pub(super) struct PrintKeys<'a>(pub &'a Keys);
344impl<'a> std::fmt::Debug for PrintKeys<'a> {
345 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
346 if let Some(keys) = self.0 {
347 fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish()
348 } else {
349 write!(fmt, "None")
350 }
351 }
352}
353
354pub(super) struct PrintChildKeys<'a>(pub &'a ChildKeys);
355impl<'a> std::fmt::Debug for PrintChildKeys<'a> {
356 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
357 if let Some(map) = self.0 {
358 fmt.debug_map()
359 .entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values))))
360 .finish()
361 } else {
362 write!(fmt, "None")
363 }
364 }
365}