referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/
import_notification_sink.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Multi view import notification sink. This module provides a unified stream of transactions that
20//! have been notified as ready by any of the active views maintained by the transaction pool. It
21//! combines streams (`import_notification_stream`) from multiple views into a single stream. Events
22//! coming from this stream are dynamically dispatched to many external watchers.
23
24use crate::{fork_aware_txpool::stream_map_util::next_event, LOG_TARGET};
25use futures::{
26	channel::mpsc::{channel, Receiver as EventStream, Sender as ExternalSink},
27	stream::StreamExt,
28	Future, FutureExt,
29};
30use parking_lot::RwLock;
31use sc_utils::mpsc;
32use std::{
33	collections::HashSet,
34	fmt::{self, Debug, Formatter},
35	hash::Hash,
36	pin::Pin,
37	sync::Arc,
38};
39use tokio_stream::StreamMap;
40use tracing::trace;
41
42/// A type alias for a pinned, boxed stream of items of type `I`.
43/// This alias is particularly useful for defining the types of the incoming streams from various
44/// views, and is intended to build the stream of transaction hashes that become ready.
45///
46/// Note: generic parameter allows better testing of all types involved.
47type StreamOf<I> = Pin<Box<dyn futures::Stream<Item = I> + Send>>;
48
49/// A type alias for a tracing unbounded sender used as the command channel controller.
50/// Used to send control commands to the [`AggregatedStreamContext`].
51type Controller<T> = mpsc::TracingUnboundedSender<T>;
52
53/// A type alias for a tracing unbounded receiver used as the command channel receiver.
54/// Used to receive control commands in the [`AggregatedStreamContext`].
55type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
56
57/// An enum representing commands that can be sent to the multi-sinks context.
58///
59/// This enum contains variants that encapsulate control commands used to manage multiple streams
60/// within the `AggregatedStreamContext`.
61enum Command<K, I: Send + Sync> {
62	///  Adds a new view with a unique key and a stream of items of type `I`.
63	AddView(K, StreamOf<I>),
64}
65
66impl<K, I: Send + Sync> Debug for Command<K, I> {
67	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
68		match self {
69			Command::AddView(..) => write!(f, "AddView"),
70		}
71	}
72}
73
74/// A context used to unfold the single stream of items aggregated from the multiple
75/// streams.
76///
77/// The `AggregatedStreamContext` continuously monitors both the command receiver and the stream
78/// map, ensuring new views can be dynamically added and events from any active view can be
79/// processed.
80struct AggregatedStreamContext<K, I: Send + Sync> {
81	/// A map of streams identified by unique keys,
82	stream_map: StreamMap<K, StreamOf<I>>,
83	/// A receiver for handling control commands, such as adding new views.
84	command_receiver: CommandReceiver<Command<K, I>>,
85}
86
87impl<K, I> AggregatedStreamContext<K, I>
88where
89	K: Send + Debug + Unpin + Clone + Default + Hash + Eq + 'static,
90	I: Send + Sync + 'static + PartialEq + Eq + Hash + Clone + Debug,
91{
92	/// Creates a new aggregated stream of items and its command controller.
93	///
94	/// This function sets up the initial context with an empty stream map. The aggregated output
95	/// stream of items (e.g. hashes of transactions that become ready) is unfolded.
96	///
97	/// It returns a tuple containing the output stream and the command controller, allowing
98	/// external components to control this stream.
99	fn event_stream() -> (StreamOf<I>, Controller<Command<K, I>>) {
100		let (sender, receiver) =
101			sc_utils::mpsc::tracing_unbounded::<Command<K, I>>("import-notification-sink", 16);
102
103		let ctx = Self { stream_map: StreamMap::new(), command_receiver: receiver };
104
105		let output_stream = futures::stream::unfold(ctx, |mut ctx| async move {
106			loop {
107				tokio::select! {
108					biased;
109					cmd = ctx.command_receiver.next() => {
110						match cmd? {
111							Command::AddView(key,stream) => {
112								trace!(
113									target: LOG_TARGET,
114									?key,
115									"Command::AddView"
116								);
117								ctx.stream_map.insert(key,stream);
118							},
119						}
120					},
121
122					Some(event) = next_event(&mut ctx.stream_map) => {
123						trace!(
124							target: LOG_TARGET,
125							?event,
126							"import_notification_sink: select_next_some"
127						);
128						return Some((event.1, ctx));
129					}
130				}
131			}
132		})
133		.boxed();
134
135		(output_stream, sender)
136	}
137}
138
139/// A struct that facilitates the relaying notifications of ready transactions from multiple views
140/// to many external sinks.
141///
142/// `MultiViewImportNotificationSink` provides mechanisms to dynamically add new views, filter
143/// notifications of imported transactions hashes and relay them to the multiple external sinks.
144#[derive(Clone)]
145pub struct MultiViewImportNotificationSink<K, I: Send + Sync> {
146	/// A controller used to send commands to the internal [`AggregatedStreamContext`].
147	controller: Controller<Command<K, I>>,
148	/// A vector of the external sinks, each receiving a copy of the merged stream of ready
149	/// transaction hashes.
150	external_sinks: Arc<RwLock<Vec<ExternalSink<I>>>>,
151	/// A set of already notified items, ensuring that each item (transaction hash) is only
152	/// sent out once.
153	already_notified_items: Arc<RwLock<HashSet<I>>>,
154}
155
156/// An asynchronous task responsible for dispatching aggregated import notifications to multiple
157/// sinks (created by [`MultiViewImportNotificationSink::event_stream`]).
158pub type ImportNotificationTask = Pin<Box<dyn Future<Output = ()> + Send>>;
159
160impl<K, I> MultiViewImportNotificationSink<K, I>
161where
162	K: 'static + Clone + Send + Debug + Default + Unpin + Eq + Hash,
163	I: 'static + Clone + Send + Debug + Sync + PartialEq + Eq + Hash,
164{
165	/// Creates a new [`MultiViewImportNotificationSink`] along with its associated worker task.
166	///
167	/// This function initializes the sink and provides the worker task that listens for events from
168	/// the aggregated stream, relaying them to the external sinks. The task shall be polled by
169	/// caller.
170	///
171	/// Returns a tuple containing the [`MultiViewImportNotificationSink`] and the
172	/// [`ImportNotificationTask`].
173	pub fn new_with_worker() -> (MultiViewImportNotificationSink<K, I>, ImportNotificationTask) {
174		let (output_stream, controller) = AggregatedStreamContext::<K, I>::event_stream();
175		let output_stream_controller = Self {
176			controller,
177			external_sinks: Default::default(),
178			already_notified_items: Default::default(),
179		};
180		let external_sinks = output_stream_controller.external_sinks.clone();
181		let already_notified_items = output_stream_controller.already_notified_items.clone();
182
183		let import_notifcation_task = output_stream
184			.for_each(move |event| {
185				let external_sinks = external_sinks.clone();
186				let already_notified_items = already_notified_items.clone();
187				async move {
188					if already_notified_items.write().insert(event.clone()) {
189						external_sinks.write().retain_mut(|sink| {
190							trace!(
191								target: LOG_TARGET,
192								?event,
193								"import_sink_worker sending out imported"
194							);
195							if let Err(error) = sink.try_send(event.clone()) {
196								trace!(
197									target: LOG_TARGET,
198									%error,
199									"import_sink_worker sending message failed"
200								);
201								false
202							} else {
203								true
204							}
205						});
206					}
207				}
208			})
209			.boxed();
210		(output_stream_controller, import_notifcation_task)
211	}
212
213	/// Adds a new stream associated with the view identified by specified key.
214	///
215	/// The new view's stream is added to the internal aggregated stream context by sending command
216	/// to its `command_receiver`.
217	pub fn add_view(&self, key: K, view: StreamOf<I>) {
218		let _ =
219			self.controller
220				.unbounded_send(Command::AddView(key.clone(), view))
221				.map_err(|error| {
222					trace!(
223						target: LOG_TARGET,
224						?key,
225						%error,
226						"add_view send message failed"
227					);
228				});
229	}
230
231	/// Creates and returns a new external stream of ready transactions hashes notifications.
232	pub fn event_stream(&self) -> EventStream<I> {
233		const CHANNEL_BUFFER_SIZE: usize = 1024;
234		let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
235		self.external_sinks.write().push(sender);
236		receiver
237	}
238
239	/// Removes specified items from the `already_notified_items` set.
240	///
241	/// Intended to be called once transactions are finalized.
242	pub fn clean_notified_items(&self, items_to_be_removed: &[I]) {
243		let mut already_notified_items = self.already_notified_items.write();
244		items_to_be_removed.iter().for_each(|i| {
245			already_notified_items.remove(i);
246		});
247	}
248
249	/// Lenght of the `already_notified_items` set.
250	///
251	/// Exposed for testing only.
252	pub fn notified_items_len(&self) -> usize {
253		self.already_notified_items.read().len()
254	}
255}
256
257#[cfg(test)]
258mod tests {
259	use super::*;
260	use core::time::Duration;
261	use tokio::task::JoinHandle;
262
263	#[derive(Debug, Clone)]
264	struct Event<I: Send> {
265		delay: u64,
266		value: I,
267	}
268
269	impl<I: Send> From<(u64, I)> for Event<I> {
270		fn from(event: (u64, I)) -> Self {
271			Self { delay: event.0, value: event.1 }
272		}
273	}
274
275	struct View<I: Send + Sync> {
276		scenario: Vec<Event<I>>,
277		sinks: Arc<RwLock<Vec<ExternalSink<I>>>>,
278	}
279
280	impl<I: Send + Sync + 'static + Clone + Debug> View<I> {
281		fn new(scenario: Vec<(u64, I)>) -> Self {
282			Self {
283				scenario: scenario.into_iter().map(Into::into).collect(),
284				sinks: Default::default(),
285			}
286		}
287
288		async fn event_stream(&self) -> EventStream<I> {
289			let (sender, receiver) = channel(32);
290			self.sinks.write().push(sender);
291			receiver
292		}
293
294		fn play(&mut self) -> JoinHandle<()> {
295			let mut scenario = self.scenario.clone();
296			let sinks = self.sinks.clone();
297			tokio::spawn(async move {
298				loop {
299					if scenario.is_empty() {
300						for sink in &mut *sinks.write() {
301							sink.close_channel();
302						}
303						break;
304					};
305					let x = scenario.remove(0);
306					tokio::time::sleep(Duration::from_millis(x.delay)).await;
307					for sink in &mut *sinks.write() {
308						sink.try_send(x.value.clone()).unwrap();
309					}
310				}
311			})
312		}
313	}
314
315	#[tokio::test]
316	async fn deduplicating_works() {
317		sp_tracing::try_init_simple();
318
319		let (ctrl, runnable) = MultiViewImportNotificationSink::<u64, i32>::new_with_worker();
320
321		let j0 = tokio::spawn(runnable);
322
323		let stream = ctrl.event_stream();
324
325		let mut v1 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
326		let mut v2 = View::new(vec![(0, 1), (0, 2), (0, 6)]);
327		let mut v3 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
328
329		let j1 = v1.play();
330		let j2 = v2.play();
331		let j3 = v3.play();
332
333		let o1 = v1.event_stream().await.boxed();
334		let o2 = v2.event_stream().await.boxed();
335		let o3 = v3.event_stream().await.boxed();
336
337		ctrl.add_view(1000, o1);
338		ctrl.add_view(2000, o2);
339		ctrl.add_view(3000, o3);
340
341		let out = stream.take(4).collect::<Vec<_>>().await;
342		assert!(out.iter().all(|v| vec![1, 2, 3, 6].contains(v)));
343		drop(ctrl);
344
345		futures::future::join_all(vec![j0, j1, j2, j3]).await;
346	}
347
348	#[tokio::test]
349	async fn dedup_filter_reset_works() {
350		sp_tracing::try_init_simple();
351
352		let (ctrl, runnable) = MultiViewImportNotificationSink::<u64, i32>::new_with_worker();
353
354		let j0 = tokio::spawn(runnable);
355
356		let stream = ctrl.event_stream();
357		let stream2 = ctrl.event_stream();
358
359		let mut v1 = View::new(vec![(10, 1), (10, 2), (10, 3)]);
360		let mut v2 = View::new(vec![(20, 1), (20, 2), (20, 6)]);
361		let mut v3 = View::new(vec![(20, 1), (20, 2), (20, 3)]);
362
363		let j1 = v1.play();
364		let j2 = v2.play();
365		let j3 = v3.play();
366
367		let o1 = v1.event_stream().await.boxed();
368		let o2 = v2.event_stream().await.boxed();
369		let o3 = v3.event_stream().await.boxed();
370
371		ctrl.add_view(1000, o1);
372		ctrl.add_view(2000, o2);
373
374		let out = stream.take(4).collect::<Vec<_>>().await;
375		assert_eq!(out, vec![1, 2, 3, 6]);
376
377		ctrl.clean_notified_items(&vec![1, 3]);
378		ctrl.add_view(3000, o3.boxed());
379		let out = stream2.take(6).collect::<Vec<_>>().await;
380		assert_eq!(out, vec![1, 2, 3, 6, 1, 3]);
381
382		drop(ctrl);
383		futures::future::join_all(vec![j0, j1, j2, j3]).await;
384	}
385
386	#[tokio::test]
387	async fn many_output_streams_are_supported() {
388		sp_tracing::try_init_simple();
389
390		let (ctrl, runnable) = MultiViewImportNotificationSink::<u64, i32>::new_with_worker();
391
392		let j0 = tokio::spawn(runnable);
393
394		let stream0 = ctrl.event_stream();
395		let stream1 = ctrl.event_stream();
396
397		let mut v1 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
398		let mut v2 = View::new(vec![(0, 1), (0, 2), (0, 6)]);
399		let mut v3 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
400
401		let j1 = v1.play();
402		let j2 = v2.play();
403		let j3 = v3.play();
404
405		let o1 = v1.event_stream().await.boxed();
406		let o2 = v2.event_stream().await.boxed();
407		let o3 = v3.event_stream().await.boxed();
408
409		ctrl.add_view(1000, o1);
410		ctrl.add_view(2000, o2);
411		ctrl.add_view(3000, o3);
412
413		let out0 = stream0.take(4).collect::<Vec<_>>().await;
414		let out1 = stream1.take(4).collect::<Vec<_>>().await;
415		assert!(out0.iter().all(|v| vec![1, 2, 3, 6].contains(v)));
416		assert!(out1.iter().all(|v| vec![1, 2, 3, 6].contains(v)));
417		drop(ctrl);
418
419		futures::future::join_all(vec![j0, j1, j2, j3]).await;
420	}
421}