1use crate::{fork_aware_txpool::stream_map_util::next_event, LOG_TARGET};
25use futures::{
26 channel::mpsc::{channel, Receiver as EventStream, Sender as ExternalSink},
27 stream::StreamExt,
28 Future, FutureExt,
29};
30use parking_lot::RwLock;
31use sc_utils::mpsc;
32use std::{
33 collections::HashSet,
34 fmt::{self, Debug, Formatter},
35 hash::Hash,
36 pin::Pin,
37 sync::Arc,
38};
39use tokio_stream::StreamMap;
40use tracing::trace;
41
42type StreamOf<I> = Pin<Box<dyn futures::Stream<Item = I> + Send>>;
48
49type Controller<T> = mpsc::TracingUnboundedSender<T>;
52
53type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
56
57enum Command<K, I: Send + Sync> {
62 AddView(K, StreamOf<I>),
64}
65
66impl<K, I: Send + Sync> Debug for Command<K, I> {
67 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
68 match self {
69 Command::AddView(..) => write!(f, "AddView"),
70 }
71 }
72}
73
74struct AggregatedStreamContext<K, I: Send + Sync> {
81 stream_map: StreamMap<K, StreamOf<I>>,
83 command_receiver: CommandReceiver<Command<K, I>>,
85}
86
87impl<K, I> AggregatedStreamContext<K, I>
88where
89 K: Send + Debug + Unpin + Clone + Default + Hash + Eq + 'static,
90 I: Send + Sync + 'static + PartialEq + Eq + Hash + Clone + Debug,
91{
92 fn event_stream() -> (StreamOf<I>, Controller<Command<K, I>>) {
100 let (sender, receiver) =
101 sc_utils::mpsc::tracing_unbounded::<Command<K, I>>("import-notification-sink", 16);
102
103 let ctx = Self { stream_map: StreamMap::new(), command_receiver: receiver };
104
105 let output_stream = futures::stream::unfold(ctx, |mut ctx| async move {
106 loop {
107 tokio::select! {
108 biased;
109 cmd = ctx.command_receiver.next() => {
110 match cmd? {
111 Command::AddView(key,stream) => {
112 trace!(
113 target: LOG_TARGET,
114 ?key,
115 "Command::AddView"
116 );
117 ctx.stream_map.insert(key,stream);
118 },
119 }
120 },
121
122 Some(event) = next_event(&mut ctx.stream_map) => {
123 trace!(
124 target: LOG_TARGET,
125 ?event,
126 "import_notification_sink: select_next_some"
127 );
128 return Some((event.1, ctx));
129 }
130 }
131 }
132 })
133 .boxed();
134
135 (output_stream, sender)
136 }
137}
138
139#[derive(Clone)]
145pub struct MultiViewImportNotificationSink<K, I: Send + Sync> {
146 controller: Controller<Command<K, I>>,
148 external_sinks: Arc<RwLock<Vec<ExternalSink<I>>>>,
151 already_notified_items: Arc<RwLock<HashSet<I>>>,
154}
155
156pub type ImportNotificationTask = Pin<Box<dyn Future<Output = ()> + Send>>;
159
160impl<K, I> MultiViewImportNotificationSink<K, I>
161where
162 K: 'static + Clone + Send + Debug + Default + Unpin + Eq + Hash,
163 I: 'static + Clone + Send + Debug + Sync + PartialEq + Eq + Hash,
164{
165 pub fn new_with_worker() -> (MultiViewImportNotificationSink<K, I>, ImportNotificationTask) {
174 let (output_stream, controller) = AggregatedStreamContext::<K, I>::event_stream();
175 let output_stream_controller = Self {
176 controller,
177 external_sinks: Default::default(),
178 already_notified_items: Default::default(),
179 };
180 let external_sinks = output_stream_controller.external_sinks.clone();
181 let already_notified_items = output_stream_controller.already_notified_items.clone();
182
183 let import_notifcation_task = output_stream
184 .for_each(move |event| {
185 let external_sinks = external_sinks.clone();
186 let already_notified_items = already_notified_items.clone();
187 async move {
188 if already_notified_items.write().insert(event.clone()) {
189 external_sinks.write().retain_mut(|sink| {
190 trace!(
191 target: LOG_TARGET,
192 ?event,
193 "import_sink_worker sending out imported"
194 );
195 if let Err(error) = sink.try_send(event.clone()) {
196 trace!(
197 target: LOG_TARGET,
198 %error,
199 "import_sink_worker sending message failed"
200 );
201 false
202 } else {
203 true
204 }
205 });
206 }
207 }
208 })
209 .boxed();
210 (output_stream_controller, import_notifcation_task)
211 }
212
213 pub fn add_view(&self, key: K, view: StreamOf<I>) {
218 let _ =
219 self.controller
220 .unbounded_send(Command::AddView(key.clone(), view))
221 .map_err(|error| {
222 trace!(
223 target: LOG_TARGET,
224 ?key,
225 %error,
226 "add_view send message failed"
227 );
228 });
229 }
230
231 pub fn event_stream(&self) -> EventStream<I> {
233 const CHANNEL_BUFFER_SIZE: usize = 1024;
234 let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
235 self.external_sinks.write().push(sender);
236 receiver
237 }
238
239 pub fn clean_notified_items(&self, items_to_be_removed: &[I]) {
243 let mut already_notified_items = self.already_notified_items.write();
244 items_to_be_removed.iter().for_each(|i| {
245 already_notified_items.remove(i);
246 });
247 }
248
249 pub fn notified_items_len(&self) -> usize {
253 self.already_notified_items.read().len()
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260 use core::time::Duration;
261 use tokio::task::JoinHandle;
262
263 #[derive(Debug, Clone)]
264 struct Event<I: Send> {
265 delay: u64,
266 value: I,
267 }
268
269 impl<I: Send> From<(u64, I)> for Event<I> {
270 fn from(event: (u64, I)) -> Self {
271 Self { delay: event.0, value: event.1 }
272 }
273 }
274
275 struct View<I: Send + Sync> {
276 scenario: Vec<Event<I>>,
277 sinks: Arc<RwLock<Vec<ExternalSink<I>>>>,
278 }
279
280 impl<I: Send + Sync + 'static + Clone + Debug> View<I> {
281 fn new(scenario: Vec<(u64, I)>) -> Self {
282 Self {
283 scenario: scenario.into_iter().map(Into::into).collect(),
284 sinks: Default::default(),
285 }
286 }
287
288 async fn event_stream(&self) -> EventStream<I> {
289 let (sender, receiver) = channel(32);
290 self.sinks.write().push(sender);
291 receiver
292 }
293
294 fn play(&mut self) -> JoinHandle<()> {
295 let mut scenario = self.scenario.clone();
296 let sinks = self.sinks.clone();
297 tokio::spawn(async move {
298 loop {
299 if scenario.is_empty() {
300 for sink in &mut *sinks.write() {
301 sink.close_channel();
302 }
303 break;
304 };
305 let x = scenario.remove(0);
306 tokio::time::sleep(Duration::from_millis(x.delay)).await;
307 for sink in &mut *sinks.write() {
308 sink.try_send(x.value.clone()).unwrap();
309 }
310 }
311 })
312 }
313 }
314
315 #[tokio::test]
316 async fn deduplicating_works() {
317 sp_tracing::try_init_simple();
318
319 let (ctrl, runnable) = MultiViewImportNotificationSink::<u64, i32>::new_with_worker();
320
321 let j0 = tokio::spawn(runnable);
322
323 let stream = ctrl.event_stream();
324
325 let mut v1 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
326 let mut v2 = View::new(vec![(0, 1), (0, 2), (0, 6)]);
327 let mut v3 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
328
329 let j1 = v1.play();
330 let j2 = v2.play();
331 let j3 = v3.play();
332
333 let o1 = v1.event_stream().await.boxed();
334 let o2 = v2.event_stream().await.boxed();
335 let o3 = v3.event_stream().await.boxed();
336
337 ctrl.add_view(1000, o1);
338 ctrl.add_view(2000, o2);
339 ctrl.add_view(3000, o3);
340
341 let out = stream.take(4).collect::<Vec<_>>().await;
342 assert!(out.iter().all(|v| vec![1, 2, 3, 6].contains(v)));
343 drop(ctrl);
344
345 futures::future::join_all(vec![j0, j1, j2, j3]).await;
346 }
347
348 #[tokio::test]
349 async fn dedup_filter_reset_works() {
350 sp_tracing::try_init_simple();
351
352 let (ctrl, runnable) = MultiViewImportNotificationSink::<u64, i32>::new_with_worker();
353
354 let j0 = tokio::spawn(runnable);
355
356 let stream = ctrl.event_stream();
357 let stream2 = ctrl.event_stream();
358
359 let mut v1 = View::new(vec![(10, 1), (10, 2), (10, 3)]);
360 let mut v2 = View::new(vec![(20, 1), (20, 2), (20, 6)]);
361 let mut v3 = View::new(vec![(20, 1), (20, 2), (20, 3)]);
362
363 let j1 = v1.play();
364 let j2 = v2.play();
365 let j3 = v3.play();
366
367 let o1 = v1.event_stream().await.boxed();
368 let o2 = v2.event_stream().await.boxed();
369 let o3 = v3.event_stream().await.boxed();
370
371 ctrl.add_view(1000, o1);
372 ctrl.add_view(2000, o2);
373
374 let out = stream.take(4).collect::<Vec<_>>().await;
375 assert_eq!(out, vec![1, 2, 3, 6]);
376
377 ctrl.clean_notified_items(&vec![1, 3]);
378 ctrl.add_view(3000, o3.boxed());
379 let out = stream2.take(6).collect::<Vec<_>>().await;
380 assert_eq!(out, vec![1, 2, 3, 6, 1, 3]);
381
382 drop(ctrl);
383 futures::future::join_all(vec![j0, j1, j2, j3]).await;
384 }
385
386 #[tokio::test]
387 async fn many_output_streams_are_supported() {
388 sp_tracing::try_init_simple();
389
390 let (ctrl, runnable) = MultiViewImportNotificationSink::<u64, i32>::new_with_worker();
391
392 let j0 = tokio::spawn(runnable);
393
394 let stream0 = ctrl.event_stream();
395 let stream1 = ctrl.event_stream();
396
397 let mut v1 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
398 let mut v2 = View::new(vec![(0, 1), (0, 2), (0, 6)]);
399 let mut v3 = View::new(vec![(0, 1), (0, 2), (0, 3)]);
400
401 let j1 = v1.play();
402 let j2 = v2.play();
403 let j3 = v3.play();
404
405 let o1 = v1.event_stream().await.boxed();
406 let o2 = v2.event_stream().await.boxed();
407 let o3 = v3.event_stream().await.boxed();
408
409 ctrl.add_view(1000, o1);
410 ctrl.add_view(2000, o2);
411 ctrl.add_view(3000, o3);
412
413 let out0 = stream0.take(4).collect::<Vec<_>>().await;
414 let out1 = stream1.take(4).collect::<Vec<_>>().await;
415 assert!(out0.iter().all(|v| vec![1, 2, 3, 6].contains(v)));
416 assert!(out1.iter().all(|v| vec![1, 2, 3, 6].contains(v)));
417 drop(ctrl);
418
419 futures::future::join_all(vec![j0, j1, j2, j3]).await;
420 }
421}