referrerpolicy=no-referrer-when-downgrade

sc_client_api/notifications/
registry.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use super::*;
20
21use sp_core::hexdisplay::HexDisplay;
22
23use fnv::{FnvHashMap, FnvHashSet};
24use prometheus_endpoint::{register, CounterVec, Opts, U64};
25
26use sc_utils::{
27	id_sequence::SeqID as SubscriberId,
28	pubsub::{Dispatch, Subscribe, Unsubscribe},
29};
30
31type SubscribersGauge = CounterVec<U64>;
32
33/// A command to subscribe with the specified filters.
34///
35/// Used by the implementation of [`Subscribe<Op>`] trait for [`Registry].
36pub(super) struct SubscribeOp<'a> {
37	pub filter_keys: Option<&'a [StorageKey]>,
38	pub filter_child_keys: Option<&'a [(StorageKey, Option<Vec<StorageKey>>)]>,
39}
40
41#[derive(Debug, Default)]
42pub(super) struct Registry {
43	pub(super) metrics: Option<SubscribersGauge>,
44	pub(super) wildcard_listeners: FnvHashSet<SubscriberId>,
45	pub(super) listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
46	pub(super) child_listeners: HashMap<
47		StorageKey,
48		(HashMap<StorageKey, FnvHashSet<SubscriberId>>, FnvHashSet<SubscriberId>),
49	>,
50	pub(super) sinks: FnvHashMap<SubscriberId, SubscriberSink>,
51}
52
53#[derive(Debug)]
54pub(super) struct SubscriberSink {
55	subs_id: SubscriberId,
56	keys: Keys,
57	child_keys: ChildKeys,
58	was_triggered: bool,
59}
60
61impl Drop for SubscriberSink {
62	fn drop(&mut self) {
63		if !self.was_triggered {
64			log::trace!(
65				target: "storage_notifications",
66				"Listener was never triggered: id={}, keys={:?}, child_keys={:?}",
67				self.subs_id,
68				PrintKeys(&self.keys),
69				PrintChildKeys(&self.child_keys),
70			);
71		}
72	}
73}
74
75impl SubscriberSink {
76	fn new(subs_id: SubscriberId, keys: Keys, child_keys: ChildKeys) -> Self {
77		Self { subs_id, keys, child_keys, was_triggered: false }
78	}
79}
80
81impl Registry {
82	pub(super) fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
83		let metrics = prometheus_registry.and_then(|r| {
84			CounterVec::new(
85				Opts::new(
86					"substrate_storage_notification_subscribers",
87					"Number of subscribers in storage notification sytem",
88				),
89				&["action"], // added | removed
90			)
91			.and_then(|g| register(g, &r))
92			.ok()
93		});
94
95		Registry { metrics, ..Default::default() }
96	}
97}
98
99impl Unsubscribe for Registry {
100	fn unsubscribe(&mut self, subs_id: SubscriberId) {
101		self.remove_subscriber(subs_id);
102	}
103}
104
105impl<'a> Subscribe<SubscribeOp<'a>> for Registry {
106	fn subscribe(&mut self, subs_op: SubscribeOp<'a>, subs_id: SubscriberId) {
107		let SubscribeOp { filter_keys, filter_child_keys } = subs_op;
108
109		let keys = Self::listen_from(
110			subs_id,
111			filter_keys.as_ref(),
112			&mut self.listeners,
113			&mut self.wildcard_listeners,
114		);
115
116		let child_keys = filter_child_keys.map(|filter_child_keys| {
117			filter_child_keys
118				.iter()
119				.map(|(c_key, o_keys)| {
120					let (c_listeners, c_wildcards) =
121						self.child_listeners.entry(c_key.clone()).or_default();
122
123					(
124						c_key.clone(),
125						Self::listen_from(
126							subs_id,
127							o_keys.as_ref(),
128							&mut *c_listeners,
129							&mut *c_wildcards,
130						),
131					)
132				})
133				.collect()
134		});
135
136		if let Some(m) = self.metrics.as_ref() {
137			m.with_label_values(&["added"]).inc();
138		}
139
140		if self
141			.sinks
142			.insert(subs_id, SubscriberSink::new(subs_id, keys, child_keys))
143			.is_some()
144		{
145			log::warn!("The `subscribe`-method has been passed a non-unique subs_id (in `sc-client-api::notifications`)");
146		}
147	}
148}
149
150impl<'a, Hash, CS, CCS, CCSI> Dispatch<(&'a Hash, CS, CCS)> for Registry
151where
152	Hash: Clone,
153	CS: Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
154	CCS: Iterator<Item = (Vec<u8>, CCSI)>,
155	CCSI: Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
156{
157	type Item = StorageNotification<Hash>;
158	type Ret = ();
159
160	fn dispatch<F>(&mut self, message: (&'a Hash, CS, CCS), dispatch: F) -> Self::Ret
161	where
162		F: FnMut(&SubscriberId, Self::Item),
163	{
164		let (hash, changeset, child_changeset) = message;
165		self.trigger(hash, changeset, child_changeset, dispatch);
166	}
167}
168
169impl Registry {
170	pub(super) fn trigger<Hash, F>(
171		&mut self,
172		hash: &Hash,
173		changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
174		child_changeset: impl Iterator<
175			Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
176		>,
177		mut dispatch: F,
178	) where
179		Hash: Clone,
180		F: FnMut(&SubscriberId, StorageNotification<Hash>),
181	{
182		let has_wildcard = !self.wildcard_listeners.is_empty();
183
184		// early exit if no listeners
185		if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() {
186			return
187		}
188
189		let mut subscribers = self.wildcard_listeners.clone();
190		let mut changes = Vec::new();
191		let mut child_changes = Vec::new();
192
193		// Collect subscribers and changes
194		for (k, v) in changeset {
195			let k = StorageKey(k);
196			let listeners = self.listeners.get(&k);
197
198			if let Some(listeners) = listeners {
199				subscribers.extend(listeners.iter());
200			}
201
202			if has_wildcard || listeners.is_some() {
203				changes.push((k, v.map(StorageData)));
204			}
205		}
206		for (sk, changeset) in child_changeset {
207			let sk = StorageKey(sk);
208			if let Some((cl, cw)) = self.child_listeners.get(&sk) {
209				let mut changes = Vec::new();
210				for (k, v) in changeset {
211					let k = StorageKey(k);
212					let listeners = cl.get(&k);
213
214					if let Some(listeners) = listeners {
215						subscribers.extend(listeners.iter());
216					}
217
218					subscribers.extend(cw.iter());
219
220					if !cw.is_empty() || listeners.is_some() {
221						changes.push((k, v.map(StorageData)));
222					}
223				}
224				if !changes.is_empty() {
225					child_changes.push((sk, changes));
226				}
227			}
228		}
229
230		// Don't send empty notifications
231		if changes.is_empty() && child_changes.is_empty() {
232			return
233		}
234
235		let changes = Arc::<[_]>::from(changes);
236		let child_changes = Arc::<[_]>::from(child_changes);
237
238		// Trigger the events
239		self.sinks.iter_mut().for_each(|(subs_id, sink)| {
240			if subscribers.contains(subs_id) {
241				sink.was_triggered = true;
242
243				let storage_change_set = StorageChangeSet {
244					changes: changes.clone(),
245					child_changes: child_changes.clone(),
246					filter: sink.keys.clone(),
247					child_filters: sink.child_keys.clone(),
248				};
249
250				let notification =
251					StorageNotification { block: hash.clone(), changes: storage_change_set };
252
253				dispatch(subs_id, notification);
254			}
255		});
256	}
257}
258
259impl Registry {
260	fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> {
261		let sink = self.sinks.remove(&subscriber)?;
262
263		Self::remove_subscriber_from(
264			subscriber,
265			&sink.keys,
266			&mut self.listeners,
267			&mut self.wildcard_listeners,
268		);
269		if let Some(child_filters) = &sink.child_keys {
270			for (c_key, filters) in child_filters {
271				if let Some((listeners, wildcards)) = self.child_listeners.get_mut(c_key) {
272					Self::remove_subscriber_from(
273						subscriber,
274						filters,
275						&mut *listeners,
276						&mut *wildcards,
277					);
278
279					if listeners.is_empty() && wildcards.is_empty() {
280						self.child_listeners.remove(c_key);
281					}
282				}
283			}
284		}
285		if let Some(m) = self.metrics.as_ref() {
286			m.with_label_values(&["removed"]).inc();
287		}
288
289		Some((sink.keys.clone(), sink.child_keys.clone()))
290	}
291
292	fn remove_subscriber_from(
293		subscriber: SubscriberId,
294		filters: &Keys,
295		listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
296		wildcards: &mut FnvHashSet<SubscriberId>,
297	) {
298		match filters {
299			None => {
300				wildcards.remove(&subscriber);
301			},
302			Some(filters) =>
303				for key in filters.iter() {
304					let remove_key = match listeners.get_mut(key) {
305						Some(ref mut set) => {
306							set.remove(&subscriber);
307							set.is_empty()
308						},
309						None => false,
310					};
311
312					if remove_key {
313						listeners.remove(key);
314					}
315				},
316		}
317	}
318
319	fn listen_from(
320		current_id: SubscriberId,
321		filter_keys: Option<impl AsRef<[StorageKey]>>,
322		listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
323		wildcards: &mut FnvHashSet<SubscriberId>,
324	) -> Keys {
325		match filter_keys {
326			None => {
327				wildcards.insert(current_id);
328				None
329			},
330			Some(keys) => Some(
331				keys.as_ref()
332					.iter()
333					.map(|key| {
334						listeners.entry(key.clone()).or_default().insert(current_id);
335						key.clone()
336					})
337					.collect(),
338			),
339		}
340	}
341}
342
343pub(super) struct PrintKeys<'a>(pub &'a Keys);
344impl<'a> std::fmt::Debug for PrintKeys<'a> {
345	fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
346		if let Some(keys) = self.0 {
347			fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish()
348		} else {
349			write!(fmt, "None")
350		}
351	}
352}
353
354pub(super) struct PrintChildKeys<'a>(pub &'a ChildKeys);
355impl<'a> std::fmt::Debug for PrintChildKeys<'a> {
356	fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
357		if let Some(map) = self.0 {
358			fmt.debug_map()
359				.entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values))))
360				.finish()
361		} else {
362			write!(fmt, "None")
363		}
364	}
365}