/// Capacity of the bounded channel that feeds each matcher worker task.
const MATCHERS_TASK_CHANNEL_BUFFER_SIZE: usize = 80_000;

/// Capacity of the bounded channel that carries events to a single subscriber.
const SUBSCRIPTION_BUFFER_SIZE: usize = 128;
38
39use futures::{Stream, StreamExt};
40use itertools::Itertools;
41
42use crate::LOG_TARGET;
43use sc_utils::id_sequence::SeqID;
44use sp_core::{traits::SpawnNamed, Bytes, Encode};
45pub use sp_statement_store::StatementStore;
46use sp_statement_store::{
47 OptimizedTopicFilter, Result, Statement, StatementEvent, Topic, MAX_TOPICS,
48};
49use std::{
50 collections::{hash_map::Entry, HashMap, HashSet},
51 sync::atomic::AtomicU64,
52};
53
/// Entry point for creating statement subscriptions.
pub trait StatementStoreSubscriptionApi: Send + Sync {
    /// Creates a subscription for statements selected by `topic_filter`.
    ///
    /// On success returns a tuple of:
    /// - a list of byte vectors (presumably the encoded statements that already match the
    ///   filter — TODO confirm against the implementor),
    /// - a sender for the subscription's [`StatementEvent`] channel,
    /// - the [`SubscriptionStatementsStream`] from which the subscriber reads events.
    fn subscribe_statement(
        &self,
        topic_filter: OptimizedTopicFilter,
    ) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>;
}
65
/// Message sent to a matcher worker task.
#[derive(Clone, Debug)]
pub enum MatcherMessage {
    /// A new statement that must be matched against the worker's subscriptions.
    NewStatement(Statement),
    /// Register a new subscription with the worker.
    Subscribe(SubscriptionInfo),
    /// Remove the subscription with the given id from the worker.
    Unsubscribe(SeqID),
}
76
/// Handle used to register subscriptions and to publish new statements to the matcher tasks.
pub struct SubscriptionsHandle {
    /// Counter from which subscription ids are allocated.
    id_sequence: AtomicU64,
    /// Senders for the matcher worker tasks.
    matchers: SubscriptionsMatchersHandlers,
}
85
86impl SubscriptionsHandle {
87 pub(crate) fn new(
89 task_spawner: Box<dyn SpawnNamed>,
90 num_matcher_workers: usize,
91 ) -> SubscriptionsHandle {
92 let mut subscriptions_matchers_senders = Vec::with_capacity(num_matcher_workers);
93
94 for task in 0..num_matcher_workers {
95 let (subscription_matcher_sender, subscription_matcher_receiver) =
96 async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
97 subscriptions_matchers_senders.push(subscription_matcher_sender);
98 task_spawner.spawn_blocking(
99 "statement-store-subscription-filters",
100 Some("statement-store"),
101 Box::pin(async move {
102 let mut subscriptions = SubscriptionsInfo::new();
103 log::debug!(
104 target: LOG_TARGET,
105 "Started statement subscription matcher task: {task}"
106 );
107 loop {
108 let res = subscription_matcher_receiver.recv().await;
109 match res {
110 Ok(MatcherMessage::NewStatement(statement)) => {
111 subscriptions.notify_matching_filters(&statement);
112 },
113 Ok(MatcherMessage::Subscribe(info)) => {
114 subscriptions.subscribe(info);
115 },
116 Ok(MatcherMessage::Unsubscribe(seq_id)) => {
117 subscriptions.unsubscribe(seq_id);
118 },
119 Err(_) => {
120 log::error!(
122 target: LOG_TARGET,
123 "Statement subscription matcher channel closed: {task}"
124 );
125 break;
126 },
127 };
128 }
129 }),
130 );
131 }
132 SubscriptionsHandle {
133 id_sequence: AtomicU64::new(0),
134 matchers: SubscriptionsMatchersHandlers::new(subscriptions_matchers_senders),
135 }
136 }
137
138 fn next_id(&self) -> SeqID {
140 let id = self.id_sequence.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
141 SeqID::from(id)
142 }
143
144 pub(crate) fn subscribe(
146 &self,
147 topic_filter: OptimizedTopicFilter,
148 ) -> (async_channel::Sender<StatementEvent>, SubscriptionStatementsStream) {
149 let next_id = self.next_id();
150 let (tx, rx) = async_channel::bounded(SUBSCRIPTION_BUFFER_SIZE);
151 let subscription_info =
152 SubscriptionInfo { topic_filter: topic_filter.clone(), seq_id: next_id, tx };
153
154 let result = (
155 subscription_info.tx.clone(),
156 SubscriptionStatementsStream {
157 rx,
158 sub_id: subscription_info.seq_id,
159 matchers: self.matchers.clone(),
160 },
161 );
162
163 self.matchers
164 .send_by_seq_id(subscription_info.seq_id, MatcherMessage::Subscribe(subscription_info));
165 result
166 }
167
168 pub(crate) fn notify(&self, statement: Statement) {
169 self.matchers.send_all(MatcherMessage::NewStatement(statement));
170 }
171}
172
/// Subscription bookkeeping owned by a single matcher worker.
struct SubscriptionsInfo {
    /// Match-all subscriptions indexed by each topic of their filter. The array slot is the
    /// number of topics in the subscription's filter minus one.
    subscriptions_match_all_by_topic:
        HashMap<Topic, [HashMap<SeqID, SubscriptionInfo>; MAX_TOPICS]>,
    /// Match-any subscriptions indexed by each topic of their filter.
    subscriptions_match_any_by_topic: HashMap<Topic, HashMap<SeqID, SubscriptionInfo>>,
    /// Subscriptions using the `Any` filter.
    subscriptions_any: HashMap<SeqID, SubscriptionInfo>,
    /// Filter of every registered subscription, used to find its index entries on unsubscribe.
    by_sub_id: HashMap<SeqID, OptimizedTopicFilter>,
}
193
/// A single registered subscription.
#[derive(Clone, Debug)]
pub(crate) struct SubscriptionInfo {
    /// Filter that decides which statements are delivered.
    topic_filter: OptimizedTopicFilter,
    /// Id of the subscription.
    seq_id: SeqID,
    /// Sender used to deliver events to the subscriber.
    tx: async_channel::Sender<StatementEvent>,
}
204
205impl SubscriptionsInfo {
206 fn new() -> SubscriptionsInfo {
207 SubscriptionsInfo {
208 subscriptions_match_all_by_topic: HashMap::new(),
209 subscriptions_match_any_by_topic: HashMap::new(),
210 subscriptions_any: HashMap::new(),
211 by_sub_id: HashMap::new(),
212 }
213 }
214
215 fn subscribe(&mut self, subscription_info: SubscriptionInfo) {
217 self.by_sub_id
218 .insert(subscription_info.seq_id, subscription_info.topic_filter.clone());
219 match &subscription_info.topic_filter {
220 OptimizedTopicFilter::Any => {
221 self.subscriptions_any.insert(subscription_info.seq_id, subscription_info);
222 },
223 OptimizedTopicFilter::MatchAll(topics) => {
224 for topic in topics {
225 self.subscriptions_match_all_by_topic.entry(*topic).or_default()
226 [topics.len() - 1]
227 .insert(subscription_info.seq_id, subscription_info.clone());
228 }
229 },
230 OptimizedTopicFilter::MatchAny(topics) => {
231 for topic in topics {
232 self.subscriptions_match_any_by_topic
233 .entry(*topic)
234 .or_default()
235 .insert(subscription_info.seq_id, subscription_info.clone());
236 }
237 },
238 };
239 }
240
241 fn notify_subscriber(
243 &self,
244 subscription: &SubscriptionInfo,
245 bytes_to_send: Bytes,
246 needs_unsubscribing: &mut HashSet<SeqID>,
247 ) {
248 if let Err(err) = subscription.tx.try_send(StatementEvent::NewStatements {
249 statements: vec![bytes_to_send],
250 remaining: None,
251 }) {
252 log::debug!(
253 target: LOG_TARGET,
254 "Failed to send statement to subscriber {:?}: {:?} unsubscribing it", subscription.seq_id, err
255 );
256 needs_unsubscribing.insert(subscription.seq_id);
259 }
260 }
261
    /// Delivers `statement` to the subscribers of all three filter kinds, in the order
    /// match-all, match-any, any.
    fn notify_matching_filters(&mut self, statement: &Statement) {
        self.notify_match_all_subscribers_best(statement);
        self.notify_match_any_subscribers(statement);
        self.notify_any_subscribers(statement);
    }
267
268 fn notify_match_any_subscribers(&mut self, statement: &Statement) {
270 let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
271 let mut already_notified: HashSet<SeqID> = HashSet::new();
272
273 let bytes_to_send: Bytes = statement.encode().into();
274 for statement_topic in statement.topics() {
275 if let Some(subscriptions) = self.subscriptions_match_any_by_topic.get(statement_topic)
276 {
277 for subscription in subscriptions
278 .values()
279 .filter(|subscription| already_notified.insert(subscription.seq_id))
280 {
281 self.notify_subscriber(
282 subscription,
283 bytes_to_send.clone(),
284 &mut needs_unsubscribing,
285 );
286 }
287 }
288 }
289
290 for sub_id in needs_unsubscribing {
293 self.unsubscribe(sub_id);
294 }
295 }
296
297 fn notify_match_all_subscribers_best(&mut self, statement: &Statement) {
299 let bytes_to_send: Bytes = statement.encode().into();
300 let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
301 let num_topics = statement.topics().len();
302
303 for num_topics_to_check in 1..=num_topics {
306 for topics_combination in statement.topics().iter().combinations(num_topics_to_check) {
307 let Some(Some(topic_with_fewest)) = topics_combination
309 .iter()
310 .map(|topic| self.subscriptions_match_all_by_topic.get(*topic))
311 .min_by_key(|subscriptions| {
312 subscriptions.map_or(0, |subscriptions_by_length| {
313 subscriptions_by_length[num_topics_to_check - 1].len()
314 })
315 })
316 else {
317 continue;
318 };
319
320 for subscription in topic_with_fewest[num_topics_to_check - 1]
321 .values()
322 .filter(|subscription| subscription.topic_filter.matches(statement))
323 {
324 self.notify_subscriber(
325 subscription,
326 bytes_to_send.clone(),
327 &mut needs_unsubscribing,
328 );
329 }
330 }
331 }
332 for sub_id in needs_unsubscribing {
335 self.unsubscribe(sub_id);
336 }
337 }
338
339 fn notify_any_subscribers(&mut self, statement: &Statement) {
341 let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
342
343 let bytes_to_send: Bytes = statement.encode().into();
344 for subscription in self.subscriptions_any.values() {
345 self.notify_subscriber(subscription, bytes_to_send.clone(), &mut needs_unsubscribing);
346 }
347
348 for sub_id in needs_unsubscribing {
351 self.unsubscribe(sub_id);
352 }
353 }
354
355 fn unsubscribe(&mut self, id: SeqID) {
357 let Some(entry) = self.by_sub_id.remove(&id) else {
358 return;
359 };
360
361 let topics = match &entry {
362 OptimizedTopicFilter::Any => {
363 self.subscriptions_any.remove(&id);
364 return;
365 },
366 OptimizedTopicFilter::MatchAll(topics) => topics,
367 OptimizedTopicFilter::MatchAny(topics) => topics,
368 };
369
370 for topic in topics {
372 if let Entry::Occupied(mut entry) = self.subscriptions_match_any_by_topic.entry(*topic)
374 {
375 entry.get_mut().remove(&id);
376 if entry.get().is_empty() {
377 entry.remove();
378 }
379 }
380 if let Entry::Occupied(mut entry) = self.subscriptions_match_all_by_topic.entry(*topic)
382 {
383 for subscriptions in entry.get_mut().iter_mut() {
384 if subscriptions.remove(&id).is_some() {
385 break;
386 }
387 }
388 if entry.get().iter().all(|s| s.is_empty()) {
389 entry.remove();
390 }
391 }
392 }
393 }
394}
395
/// Cloneable set of senders, one per matcher worker task.
#[derive(Clone)]
pub struct SubscriptionsMatchersHandlers {
    /// Channel senders; the position in the vector identifies the worker.
    matchers: Vec<async_channel::Sender<MatcherMessage>>,
}
402
403impl SubscriptionsMatchersHandlers {
404 fn new(matchers: Vec<async_channel::Sender<MatcherMessage>>) -> SubscriptionsMatchersHandlers {
406 SubscriptionsMatchersHandlers { matchers }
407 }
408
409 fn send_by_seq_id(&self, id: SeqID, message: MatcherMessage) {
411 let index: u64 = id.into();
412 if let Err(err) = self.matchers[index as usize % self.matchers.len()].send_blocking(message)
415 {
416 log::error!(
417 target: LOG_TARGET,
418 "Failed to send statement to matcher task: {:?}", err
419 );
420 }
421 }
422
423 fn send_all(&self, message: MatcherMessage) {
425 for sender in &self.matchers {
426 if let Err(err) = sender.send_blocking(message.clone()) {
427 log::error!(
428 target: LOG_TARGET,
429 "Failed to send message to matcher task: {:?}", err
430 );
431 }
432 }
433 }
434}
435
/// Stream of [`StatementEvent`]s for one subscription.
///
/// Dropping the stream sends an unsubscribe message for `sub_id` to the matcher workers.
pub struct SubscriptionStatementsStream {
    /// Receiving end of the subscription's event channel.
    pub rx: async_channel::Receiver<StatementEvent>,
    /// Id of the subscription this stream belongs to.
    sub_id: SeqID,
    /// Senders used to reach the matcher workers on drop.
    matchers: SubscriptionsMatchersHandlers,
}
445
446impl Drop for SubscriptionStatementsStream {
448 fn drop(&mut self) {
449 self.matchers
450 .send_by_seq_id(self.sub_id, MatcherMessage::Unsubscribe(self.sub_id));
451 }
452}
453
/// Forwards polling to the underlying channel receiver.
impl Stream for SubscriptionStatementsStream {
    type Item = StatementEvent;

    /// Polls the inner receiver `rx` for the next event.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.rx.poll_next_unpin(cx)
    }
}
464
465#[cfg(test)]
466mod tests {
467
468 use crate::tests::signed_statement;
469
470 use super::*;
471 use sp_core::Decode;
472 use sp_statement_store::Topic;
473
474 fn unwrap_statement(item: StatementEvent) -> Bytes {
475 match item {
476 StatementEvent::NewStatements { mut statements, .. } => {
477 assert_eq!(statements.len(), 1, "Expected exactly one statement in batch");
478 statements.remove(0)
479 },
480 }
481 }
    // Subscribing with a match-all filter indexes the subscription under each topic, and
    // unsubscribing removes those topic entries again.
    #[test]
    fn test_subscribe_unsubscribe() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);
        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());
        assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
        assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
        assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
        assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));

        // The topic entries disappear once their last subscription is removed.
        subscriptions.unsubscribe(sub_info1.seq_id);
        assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
        assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
    }
506
    // An `Any` subscription is tracked in `subscriptions_any` and removed on unsubscribe.
    #[test]
    fn test_subscribe_any() {
        let mut subscriptions = SubscriptionsInfo::new();
        let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::Any,
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());
        assert!(subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
        assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
        subscriptions.unsubscribe(sub_info1.seq_id);
        assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
    }
522
523 #[test]
524 fn test_subscribe_match_any() {
525 let mut subscriptions = SubscriptionsInfo::new();
526
527 let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
528 let topic1 = Topic::from([8u8; 32]);
529 let topic2 = Topic::from([9u8; 32]);
530 let sub_info1 = SubscriptionInfo {
531 topic_filter: OptimizedTopicFilter::MatchAny(
532 vec![topic1, topic2].into_iter().collect(),
533 ),
534 seq_id: SeqID::from(1),
535 tx: tx1,
536 };
537 subscriptions.subscribe(sub_info1.clone());
538 assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
539 assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic2));
540 assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
541 assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
542
543 subscriptions.unsubscribe(sub_info1.seq_id);
544 assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
545 assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
546 }
547
    // An `Any` subscriber receives a statement exactly as it was encoded.
    #[test]
    fn test_notify_any_subscribers() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::Any,
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());

        let statement = signed_statement(1);
        subscriptions.notify_matching_filters(&statement);

        let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);
    }
568
    // A match-all subscriber is notified only when the statement carries all of its topics.
    #[test]
    fn test_notify_match_all_subscribers() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);
        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());

        // Only one of the two required topics is set: nothing may be delivered.
        let mut statement = signed_statement(1);
        statement.set_topic(0, topic2);
        subscriptions.notify_matching_filters(&statement);

        assert!(rx1.try_recv().is_err());

        // With both topics set the statement is delivered.
        statement.set_topic(1, topic1);
        subscriptions.notify_matching_filters(&statement);

        let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);
    }
600
    // Two match-any subscribers with overlapping topics both receive a statement that
    // carries those topics.
    #[test]
    fn test_notify_match_any_subscribers() {
        let mut subscriptions = SubscriptionsInfo::new();
        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);

        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);
        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAny(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx1,
        };

        let sub_info2 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAny(vec![topic2].into_iter().collect()),
            seq_id: SeqID::from(2),
            tx: tx2,
        };

        subscriptions.subscribe(sub_info1.clone());
        subscriptions.subscribe(sub_info2.clone());

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);
        statement.set_topic(1, topic2);
        subscriptions.notify_match_any_subscribers(&statement);

        let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);

        let received = unwrap_statement(rx2.try_recv().expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);
    }
641
642 #[tokio::test]
643 async fn test_subscription_handle_with_different_workers_number() {
644 for num_workers in 1..5 {
645 let subscriptions_handle = SubscriptionsHandle::new(
646 Box::new(sp_core::testing::TaskExecutor::new()),
647 num_workers,
648 );
649
650 let topic1 = Topic::from([8u8; 32]);
651 let topic2 = Topic::from([9u8; 32]);
652
653 let streams = (0..5)
654 .into_iter()
655 .map(|_| {
656 subscriptions_handle.subscribe(OptimizedTopicFilter::MatchAll(
657 vec![topic1, topic2].into_iter().collect(),
658 ))
659 })
660 .collect::<Vec<_>>();
661
662 let mut statement = signed_statement(1);
663 statement.set_topic(0, topic2);
664 subscriptions_handle.notify(statement.clone());
665
666 statement.set_topic(1, topic1);
667 subscriptions_handle.notify(statement.clone());
668
669 for (_tx, mut stream) in streams {
670 let received =
671 unwrap_statement(stream.next().await.expect("Should receive statement"));
672 let decoded_statement: Statement =
673 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
674 assert_eq!(decoded_statement, statement);
675 }
676 }
677 }
678
    // Dropping the stream unsubscribes; a later matching statement leaves the subscriber's
    // sender closed.
    #[tokio::test]
    async fn test_handle_unsubscribe() {
        let subscriptions_handle =
            SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);

        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        let (tx, mut stream) = subscriptions_handle
            .subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);
        statement.set_topic(1, topic2);

        subscriptions_handle.notify(statement.clone());

        // While subscribed, the statement is delivered.
        let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);

        // Dropping the stream sends the unsubscribe message.
        drop(stream);

        // Give the matcher task time to process the unsubscribe.
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        let mut statement2 = signed_statement(2);
        statement2.set_topic(0, topic1);
        statement2.set_topic(1, topic2);
        subscriptions_handle.notify(statement2.clone());

        // Give the matcher task time to process the second statement.
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        assert!(tx.is_closed(), "Sender should be closed after unsubscribe");
    }
722
    // Unsubscribing an unknown id is a no-op that leaves every index empty.
    #[test]
    fn test_unsubscribe_nonexistent() {
        let mut subscriptions = SubscriptionsInfo::new();
        subscriptions.unsubscribe(SeqID::from(999));
        assert!(subscriptions.by_sub_id.is_empty());
        assert!(subscriptions.subscriptions_any.is_empty());
        assert!(subscriptions.subscriptions_match_all_by_topic.is_empty());
        assert!(subscriptions.subscriptions_match_any_by_topic.is_empty());
    }
734
    // Two match-all subscriptions with identical topics are tracked independently:
    // removing one leaves the other registered and still notified.
    #[test]
    fn test_multiple_subscriptions_same_topic() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        let sub_info2 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(2),
            tx: tx2,
        };

        subscriptions.subscribe(sub_info1.clone());
        subscriptions.subscribe(sub_info2.clone());

        // Both subscriptions are indexed under each topic (summed over all length slots).
        assert_eq!(
            subscriptions
                .subscriptions_match_all_by_topic
                .get(&topic1)
                .unwrap()
                .iter()
                .map(|s| s.len())
                .sum::<usize>(),
            2
        );
        assert_eq!(
            subscriptions
                .subscriptions_match_all_by_topic
                .get(&topic2)
                .unwrap()
                .iter()
                .map(|s| s.len())
                .sum::<usize>(),
            2
        );

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);
        statement.set_topic(1, topic2);
        subscriptions.notify_matching_filters(&statement);

        // Both subscribers receive the statement.
        assert!(rx1.try_recv().is_ok());
        assert!(rx2.try_recv().is_ok());

        subscriptions.unsubscribe(sub_info1.seq_id);

        // Only the second subscription remains under each topic.
        assert_eq!(
            subscriptions
                .subscriptions_match_all_by_topic
                .get(&topic1)
                .unwrap()
                .iter()
                .map(|s| s.len())
                .sum::<usize>(),
            1
        );
        assert_eq!(
            subscriptions
                .subscriptions_match_all_by_topic
                .get(&topic2)
                .unwrap()
                .iter()
                .map(|s| s.len())
                .sum::<usize>(),
            1
        );
        assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
        assert!(subscriptions.by_sub_id.contains_key(&sub_info2.seq_id));

        subscriptions.notify_matching_filters(&statement);

        // Only the remaining subscriber is notified.
        assert!(rx2.try_recv().is_ok());
        assert!(rx1.try_recv().is_err());
    }
828
    // A subscriber whose channel is full is unsubscribed automatically.
    #[test]
    fn test_subscriber_auto_unsubscribe_on_channel_full() {
        let mut subscriptions = SubscriptionsInfo::new();

        // Capacity of one, so the second undelivered event overflows the channel.
        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(1);
        let topic1 = Topic::from([8u8; 32]);

        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);

        // The first statement fits and is consumed.
        subscriptions.notify_matching_filters(&statement);
        assert!(rx1.try_recv().is_ok());

        // The next one fills the channel; the one after cannot be sent.
        subscriptions.notify_matching_filters(&statement);
        subscriptions.notify_matching_filters(&statement);

        assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
        assert!(!subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
    }
862
    // A match-any subscriber is notified once even if several of its topics are present
    // in the statement.
    #[test]
    fn test_match_any_receives_once_per_statement() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAny(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);
        statement.set_topic(1, topic2);

        subscriptions.notify_match_any_subscribers(&statement);

        let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);

        // No second copy may be queued.
        assert!(rx1.try_recv().is_err());
    }
897
    // A match-all filter with one topic matches a statement that carries that topic plus
    // another one, and the statement is delivered exactly once.
    #[test]
    fn test_match_all_with_single_topic_matches_statement_with_two_topics() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(vec![topic1].into_iter().collect()),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);
        statement.set_topic(1, topic2);

        subscriptions.notify_matching_filters(&statement);

        let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);

        // No second copy may be queued.
        assert!(rx1.try_recv().is_err());
    }
930
    // A statement that carries none of the filter's topics is not delivered.
    #[test]
    fn test_match_all_no_matching_topics() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);
        let topic3 = Topic::from([10u8; 32]);

        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1.clone());

        let mut statement = signed_statement(1);
        statement.set_topic(0, topic3);

        subscriptions.notify_matching_filters(&statement);

        assert!(rx1.try_recv().is_err());
    }
958
    // Regression test: the statement's first topic has no subscriptions at all, which must
    // only skip the combinations containing it and not stop the search for the second topic.
    #[test]
    fn test_match_all_with_unsubscribed_topic_first_in_statement() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
        // Nobody subscribes to `topic1`.
        let topic1 = Topic::from([1u8; 32]);
        // The only subscription is on `topic2`.
        let topic2 = Topic::from([2u8; 32]);

        let sub_info1 = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(vec![topic2].into_iter().collect()),
            seq_id: SeqID::from(1),
            tx: tx1,
        };
        subscriptions.subscribe(sub_info1);

        // The unsubscribed topic comes first in the statement.
        let mut statement = signed_statement(1);
        statement.set_topic(0, topic1);
        statement.set_topic(1, topic2);

        subscriptions.notify_match_all_subscribers_best(&statement);

        let received = unwrap_statement(rx1.try_recv().expect(
            "Should receive statement - if this fails, the `return` bug in \
            notify_match_all_subscribers_best is present (should be `continue`)",
        ));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);
    }
1001
    // Through the handle, a match-any subscriber receives statements that carry either of
    // its topics.
    #[tokio::test]
    async fn test_handle_with_match_any_filter() {
        let subscriptions_handle =
            SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);

        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        let (_tx, mut stream) = subscriptions_handle
            .subscribe(OptimizedTopicFilter::MatchAny(vec![topic1, topic2].into_iter().collect()));

        // A statement with only `topic1` matches.
        let mut statement1 = signed_statement(1);
        statement1.set_topic(0, topic1);
        subscriptions_handle.notify(statement1.clone());

        let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement1);

        // A statement with only `topic2` matches as well.
        let mut statement2 = signed_statement(2);
        statement2.set_topic(0, topic2);
        subscriptions_handle.notify(statement2.clone());

        let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement2);
    }
1033
    // Through the handle, an `Any` subscriber receives statements with and without topics.
    #[tokio::test]
    async fn test_handle_with_any_filter() {
        let subscriptions_handle =
            SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);

        let (_tx, mut stream) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);

        // No topic is set on the first statement.
        let statement1 = signed_statement(1);
        subscriptions_handle.notify(statement1.clone());

        let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement1);

        // The second statement carries a topic.
        let mut statement2 = signed_statement(2);
        statement2.set_topic(0, Topic::from([99u8; 32]));
        subscriptions_handle.notify(statement2.clone());

        let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement2);
    }
1059
    // Subscribers with match-all, match-any and any filters each receive exactly the
    // statements their filter selects.
    #[tokio::test]
    async fn test_handle_multiple_subscribers_different_filters() {
        let subscriptions_handle =
            SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);

        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        // Subscriber 1: needs both topics.
        let (_tx1, mut stream1) = subscriptions_handle
            .subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));

        // Subscriber 2: needs `topic1`.
        let (_tx2, mut stream2) = subscriptions_handle
            .subscribe(OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()));

        // Subscriber 3: takes everything.
        let (_tx3, mut stream3) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);

        // Statement with only `topic1`: expected on streams 2 and 3. Stream 1 is not polled
        // here; its first event is checked after the next notification.
        let mut statement1 = signed_statement(1);
        statement1.set_topic(0, topic1);
        subscriptions_handle.notify(statement1.clone());

        let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
        let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
        assert_eq!(decoded2, statement1);

        let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
        let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
        assert_eq!(decoded3, statement1);

        // Statement with both topics: expected on all three streams.
        let mut statement2 = signed_statement(2);
        statement2.set_topic(0, topic1);
        statement2.set_topic(1, topic2);
        subscriptions_handle.notify(statement2.clone());

        // The first event on stream 1 is the two-topic statement, so the first one was
        // not delivered to it.
        let received1 = unwrap_statement(stream1.next().await.expect("stream1 should receive"));
        let decoded1: Statement = Statement::decode(&mut &received1.0[..]).unwrap();
        assert_eq!(decoded1, statement2);

        let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
        let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
        assert_eq!(decoded2, statement2);

        let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
        let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
        assert_eq!(decoded3, statement2);
    }
1115
    // A statement without topics reaches only the `Any` subscriber.
    #[test]
    fn test_statement_without_topics_matches_only_any_filter() {
        let mut subscriptions = SubscriptionsInfo::new();

        let (tx_match_all, rx_match_all) = async_channel::bounded::<StatementEvent>(10);
        let (tx_match_any, rx_match_any) = async_channel::bounded::<StatementEvent>(10);
        let (tx_any, rx_any) = async_channel::bounded::<StatementEvent>(10);

        let topic1 = Topic::from([8u8; 32]);
        let topic2 = Topic::from([9u8; 32]);

        // Match-all subscriber.
        let sub_match_all = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAll(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(1),
            tx: tx_match_all,
        };
        subscriptions.subscribe(sub_match_all);

        // Match-any subscriber.
        let sub_match_any = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::MatchAny(
                vec![topic1, topic2].into_iter().collect(),
            ),
            seq_id: SeqID::from(2),
            tx: tx_match_any,
        };
        subscriptions.subscribe(sub_match_any);

        // Any subscriber.
        let sub_any = SubscriptionInfo {
            topic_filter: OptimizedTopicFilter::Any,
            seq_id: SeqID::from(3),
            tx: tx_any,
        };
        subscriptions.subscribe(sub_any);

        // The statement is used as created, without setting any topic.
        let statement = signed_statement(1);
        assert!(statement.topics().is_empty(), "Statement should have no topics");

        subscriptions.notify_matching_filters(&statement);

        let received =
            unwrap_statement(rx_any.try_recv().expect("Any filter should receive statement"));
        let decoded_statement: Statement =
            Statement::decode(&mut &received.0[..]).expect("Should decode statement");
        assert_eq!(decoded_statement, statement);

        assert!(
            rx_match_all.try_recv().is_err(),
            "MatchAll should not receive statement without topics"
        );

        assert!(
            rx_match_any.try_recv().is_err(),
            "MatchAny should not receive statement without topics"
        );
    }
1181}