1// This file is part of Substrate.
23// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
56// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
1011// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
1516// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
1819//! Storage notifications
2021use std::{
22 collections::{HashMap, HashSet},
23 pin::Pin,
24 sync::Arc,
25 task::Poll,
26};
2728use futures::Stream;
2930use prometheus_endpoint::Registry as PrometheusRegistry;
3132use sc_utils::pubsub::{Hub, Receiver};
33use sp_core::storage::{StorageData, StorageKey};
34use sp_runtime::traits::Block as BlockT;
3536mod registry;
3738use registry::Registry;
3940#[cfg(test)]
41mod tests;
4243/// A type of a message delivered to the subscribers
44#[derive(Debug)]
45pub struct StorageNotification<Hash> {
46/// The hash of the block
47pub block: Hash,
4849/// The set of changes
50pub changes: StorageChangeSet,
51}
5253/// Storage change set
54#[derive(Debug)]
55pub struct StorageChangeSet {
56 changes: Arc<[(StorageKey, Option<StorageData>)]>,
57 child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option<StorageData>)>)]>,
58 filter: Keys,
59 child_filters: ChildKeys,
60}
6162/// Manages storage listeners.
63#[derive(Debug)]
64pub struct StorageNotifications<Block: BlockT>(Hub<StorageNotification<Block::Hash>, Registry>);
6566/// Type that implements `futures::Stream` of storage change events.
67pub struct StorageEventStream<H>(Receiver<StorageNotification<H>, Registry>);
6869type Keys = Option<HashSet<StorageKey>>;
70type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
7172impl StorageChangeSet {
73/// Convert the change set into iterator over storage items.
74pub fn iter(
75&self,
76 ) -> impl Iterator<Item = (Option<&StorageKey>, &StorageKey, Option<&StorageData>)> + '_ {
77let top = self
78.changes
79 .iter()
80 .filter(move |&(key, _)| match self.filter {
81Some(ref filter) => filter.contains(key),
82None => true,
83 })
84 .map(move |(k, v)| (None, k, v.as_ref()));
85let children = self
86.child_changes
87 .iter()
88 .filter_map(move |(sk, changes)| {
89self.child_filters.as_ref().and_then(|cf| {
90 cf.get(sk).map(|filter| {
91 changes
92 .iter()
93 .filter(move |&(key, _)| match filter {
94Some(ref filter) => filter.contains(key),
95None => true,
96 })
97 .map(move |(k, v)| (Some(sk), k, v.as_ref()))
98 })
99 })
100 })
101 .flatten();
102 top.chain(children)
103 }
104}
105106impl<H> Stream for StorageEventStream<H> {
107type Item = StorageNotification<H>;
108fn poll_next(
109self: Pin<&mut Self>,
110 cx: &mut std::task::Context<'_>,
111 ) -> Poll<Option<Self::Item>> {
112 Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
113 }
114}
115116impl<Block: BlockT> StorageNotifications<Block> {
117/// Initialize a new StorageNotifications
118 /// optionally pass a prometheus registry to send subscriber metrics to
119pub fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
120let registry = Registry::new(prometheus_registry);
121let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry);
122123 StorageNotifications(hub)
124 }
125126/// Trigger notification to all listeners.
127 ///
128 /// Note the changes are going to be filtered by listener's filter key.
129 /// In fact no event might be sent if clients are not interested in the changes.
130pub fn trigger(
131&self,
132 hash: &Block::Hash,
133 changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
134 child_changeset: impl Iterator<
135 Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
136 >,
137 ) {
138self.0.send((hash, changeset, child_changeset))
139 }
140141/// Start listening for particular storage keys.
142pub fn listen(
143&self,
144 filter_keys: Option<&[StorageKey]>,
145 filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
146 ) -> StorageEventStream<Block::Hash> {
147let receiver = self
148.0
149.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);
150151 StorageEventStream(receiver)
152 }
153}