referrerpolicy=no-referrer-when-downgrade

sc_statement_store/
subscription.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Subscription logic for statement store.
20//!
21//! Manages subscriptions to statement topics and notifies subscribers when new statements arrive.
22//! Uses multiple matcher tasks to handle subscriptions concurrently, each responsible for a subset
23//! of subscriptions. Each matcher task maintains its own list of subscriptions and matches incoming
24//! statements against them. When a new statement is submitted, it is sent to all matcher tasks for
25//! processing. If a statement matches a subscription's filter, it is sent to the subscriber via an
26//! async channel.
27//!
28//! This design allows for efficient handling of a large number of subscriptions and statements and
29//! can be scaled by adjusting the number of matcher tasks.
30
// Capacity of each matcher task's inbound channel.
//
// Messages are pushed with a blocking send, so once a channel holds this many pending messages
// the sender is backpressured. The value is generous so that bursts of statements are absorbed
// without dropping any and without backpressuring too early.
const MATCHERS_TASK_CHANNEL_BUFFER_SIZE: usize = 80_000;

// Capacity of the channel that delivers matched statements to a single subscriber.
//
// Delivery is non-blocking: a subscriber whose channel is full (or closed) is unsubscribed.
const SUBSCRIPTION_BUFFER_SIZE: usize = 128;
38
39use futures::{Stream, StreamExt};
40use itertools::Itertools;
41
42use crate::LOG_TARGET;
43use sc_utils::id_sequence::SeqID;
44use sp_core::{traits::SpawnNamed, Bytes, Encode};
45pub use sp_statement_store::StatementStore;
46use sp_statement_store::{
47	OptimizedTopicFilter, Result, Statement, StatementEvent, Topic, MAX_TOPICS,
48};
49use std::{
50	collections::{hash_map::Entry, HashMap, HashSet},
51	sync::atomic::AtomicU64,
52};
53
/// Trait for initiating statement store subscriptions from the RPC module.
pub trait StatementStoreSubscriptionApi: Send + Sync {
	/// Subscribe to statements matching the topic filter.
	///
	/// On success returns a tuple of, in order:
	/// - the existing matching statements, each as a `Vec<u8>`,
	/// - a sender channel to send matched statements,
	/// - a stream for receiving matched statements when they arrive.
	fn subscribe_statement(
		&self,
		topic_filter: OptimizedTopicFilter,
	) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>;
}
65
/// Messages sent to matcher tasks.
///
/// `NewStatement` is broadcast to every matcher task, while `Subscribe` and `Unsubscribe` are
/// routed to the single matcher task that owns the subscription ID.
#[derive(Clone, Debug)]
pub enum MatcherMessage {
	/// A new statement has been submitted.
	NewStatement(Statement),
	/// A new subscription has been created.
	Subscribe(SubscriptionInfo),
	/// Unsubscribe the subscription with the given ID.
	Unsubscribe(SeqID),
}
76
/// Handle to manage all subscriptions.
///
/// Hands out subscription IDs and forwards subscriptions and new statements to the matcher
/// tasks.
pub struct SubscriptionsHandle {
	// Sequence generator for subscription IDs, atomic for thread safety.
	// Subscription creation is expensive enough that we don't worry about overflow here.
	id_sequence: AtomicU64,
	// Senders used to reach the matcher tasks.
	matchers: SubscriptionsMatchersHandlers,
}
85
86impl SubscriptionsHandle {
87	/// Create a new SubscriptionsHandle with the given task spawner and number of filter workers.
88	pub(crate) fn new(
89		task_spawner: Box<dyn SpawnNamed>,
90		num_matcher_workers: usize,
91	) -> SubscriptionsHandle {
92		let mut subscriptions_matchers_senders = Vec::with_capacity(num_matcher_workers);
93
94		for task in 0..num_matcher_workers {
95			let (subscription_matcher_sender, subscription_matcher_receiver) =
96				async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
97			subscriptions_matchers_senders.push(subscription_matcher_sender);
98			task_spawner.spawn_blocking(
99				"statement-store-subscription-filters",
100				Some("statement-store"),
101				Box::pin(async move {
102					let mut subscriptions = SubscriptionsInfo::new();
103					log::debug!(
104						target: LOG_TARGET,
105						"Started statement subscription matcher task: {task}"
106					);
107					loop {
108						let res = subscription_matcher_receiver.recv().await;
109						match res {
110							Ok(MatcherMessage::NewStatement(statement)) => {
111								subscriptions.notify_matching_filters(&statement);
112							},
113							Ok(MatcherMessage::Subscribe(info)) => {
114								subscriptions.subscribe(info);
115							},
116							Ok(MatcherMessage::Unsubscribe(seq_id)) => {
117								subscriptions.unsubscribe(seq_id);
118							},
119							Err(_) => {
120								// Expected when the subscription manager is dropped at shutdown.
121								log::error!(
122									target: LOG_TARGET,
123									"Statement subscription matcher channel closed: {task}"
124								);
125								break;
126							},
127						};
128					}
129				}),
130			);
131		}
132		SubscriptionsHandle {
133			id_sequence: AtomicU64::new(0),
134			matchers: SubscriptionsMatchersHandlers::new(subscriptions_matchers_senders),
135		}
136	}
137
138	// Generate the next unique subscription ID.
139	fn next_id(&self) -> SeqID {
140		let id = self.id_sequence.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
141		SeqID::from(id)
142	}
143
144	/// Subscribe to statements matching the topic filter.
145	pub(crate) fn subscribe(
146		&self,
147		topic_filter: OptimizedTopicFilter,
148	) -> (async_channel::Sender<StatementEvent>, SubscriptionStatementsStream) {
149		let next_id = self.next_id();
150		let (tx, rx) = async_channel::bounded(SUBSCRIPTION_BUFFER_SIZE);
151		let subscription_info =
152			SubscriptionInfo { topic_filter: topic_filter.clone(), seq_id: next_id, tx };
153
154		let result = (
155			subscription_info.tx.clone(),
156			SubscriptionStatementsStream {
157				rx,
158				sub_id: subscription_info.seq_id,
159				matchers: self.matchers.clone(),
160			},
161		);
162
163		self.matchers
164			.send_by_seq_id(subscription_info.seq_id, MatcherMessage::Subscribe(subscription_info));
165		result
166	}
167
168	pub(crate) fn notify(&self, statement: Statement) {
169		self.matchers.send_all(MatcherMessage::NewStatement(statement));
170	}
171}
172
// Information about all subscriptions.
// Each matcher task will have its own instance of this struct.
struct SubscriptionsInfo {
	// Subscriptions organized by topic for MatchAll filters.
	//
	// Maps each topic to an array of HashMaps, where the array is indexed by
	// `(number_of_topics_in_filter - 1)`. For example, a subscription requiring
	// topics [A, B] (2 topics) will be stored at index 1 under both topic A and B.
	//
	// This structure allows efficient matching: when a statement arrives with N topics,
	// we only need to check subscriptions that require exactly N or fewer topics.
	subscriptions_match_all_by_topic:
		HashMap<Topic, [HashMap<SeqID, SubscriptionInfo>; MAX_TOPICS]>,
	// Subscriptions organized by topic for MatchAny filters. A subscription is stored under
	// every topic of its filter.
	subscriptions_match_any_by_topic: HashMap<Topic, HashMap<SeqID, SubscriptionInfo>>,
	// Subscriptions that listen with Any filter (i.e., no topic filtering).
	subscriptions_any: HashMap<SeqID, SubscriptionInfo>,
	// Mapping from subscription ID to topic filter, used to find the entries to remove when
	// unsubscribing.
	by_sub_id: HashMap<SeqID, OptimizedTopicFilter>,
}
193
// Information about a single subscription.
//
// Cloned into every per-topic map the subscription is registered under.
#[derive(Clone, Debug)]
pub(crate) struct SubscriptionInfo {
	// The filter used for this subscription.
	topic_filter: OptimizedTopicFilter,
	// The unique ID of this subscription.
	seq_id: SeqID,
	// Channel to send matched statements to the subscriber.
	tx: async_channel::Sender<StatementEvent>,
}
204
205impl SubscriptionsInfo {
206	fn new() -> SubscriptionsInfo {
207		SubscriptionsInfo {
208			subscriptions_match_all_by_topic: HashMap::new(),
209			subscriptions_match_any_by_topic: HashMap::new(),
210			subscriptions_any: HashMap::new(),
211			by_sub_id: HashMap::new(),
212		}
213	}
214
215	// Subscribe a new subscription.
216	fn subscribe(&mut self, subscription_info: SubscriptionInfo) {
217		self.by_sub_id
218			.insert(subscription_info.seq_id, subscription_info.topic_filter.clone());
219		match &subscription_info.topic_filter {
220			OptimizedTopicFilter::Any => {
221				self.subscriptions_any.insert(subscription_info.seq_id, subscription_info);
222			},
223			OptimizedTopicFilter::MatchAll(topics) => {
224				for topic in topics {
225					self.subscriptions_match_all_by_topic.entry(*topic).or_default()
226						[topics.len() - 1]
227						.insert(subscription_info.seq_id, subscription_info.clone());
228				}
229			},
230			OptimizedTopicFilter::MatchAny(topics) => {
231				for topic in topics {
232					self.subscriptions_match_any_by_topic
233						.entry(*topic)
234						.or_default()
235						.insert(subscription_info.seq_id, subscription_info.clone());
236				}
237			},
238		};
239	}
240
241	// Notify a single subscriber, marking it for unsubscribing if sending fails.
242	fn notify_subscriber(
243		&self,
244		subscription: &SubscriptionInfo,
245		bytes_to_send: Bytes,
246		needs_unsubscribing: &mut HashSet<SeqID>,
247	) {
248		if let Err(err) = subscription.tx.try_send(StatementEvent::NewStatements {
249			statements: vec![bytes_to_send],
250			remaining: None,
251		}) {
252			log::debug!(
253				target: LOG_TARGET,
254				"Failed to send statement to subscriber {:?}: {:?} unsubscribing it", subscription.seq_id, err
255			);
256			// Mark subscription for unsubscribing, to give it a chance to recover the buffers are
257			// generous enough, if subscription cannot keep up we unsubscribe it.
258			needs_unsubscribing.insert(subscription.seq_id);
259		}
260	}
261
262	fn notify_matching_filters(&mut self, statement: &Statement) {
263		self.notify_match_all_subscribers_best(statement);
264		self.notify_match_any_subscribers(statement);
265		self.notify_any_subscribers(statement);
266	}
267
268	// Notify all subscribers with MatchAny filters that match the given statement.
269	fn notify_match_any_subscribers(&mut self, statement: &Statement) {
270		let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
271		let mut already_notified: HashSet<SeqID> = HashSet::new();
272
273		let bytes_to_send: Bytes = statement.encode().into();
274		for statement_topic in statement.topics() {
275			if let Some(subscriptions) = self.subscriptions_match_any_by_topic.get(statement_topic)
276			{
277				for subscription in subscriptions
278					.values()
279					.filter(|subscription| already_notified.insert(subscription.seq_id))
280				{
281					self.notify_subscriber(
282						subscription,
283						bytes_to_send.clone(),
284						&mut needs_unsubscribing,
285					);
286				}
287			}
288		}
289
290		// Unsubscribe any subscriptions that failed to receive messages, to give them a chance to
291		// recover and not miss statements.
292		for sub_id in needs_unsubscribing {
293			self.unsubscribe(sub_id);
294		}
295	}
296
297	// Notify all subscribers with MatchAll filters that match the given statement.
298	fn notify_match_all_subscribers_best(&mut self, statement: &Statement) {
299		let bytes_to_send: Bytes = statement.encode().into();
300		let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
301		let num_topics = statement.topics().len();
302
303		// Check all combinations of topics in the statement to find matching subscriptions.
304		// This works well because the maximum allowed topics is small (MAX_TOPICS = 4).
305		for num_topics_to_check in 1..=num_topics {
306			for topics_combination in statement.topics().iter().combinations(num_topics_to_check) {
307				// Find the topic with the fewest subscriptions to minimize the number of checks.
308				let Some(Some(topic_with_fewest)) = topics_combination
309					.iter()
310					.map(|topic| self.subscriptions_match_all_by_topic.get(*topic))
311					.min_by_key(|subscriptions| {
312						subscriptions.map_or(0, |subscriptions_by_length| {
313							subscriptions_by_length[num_topics_to_check - 1].len()
314						})
315					})
316				else {
317					continue;
318				};
319
320				for subscription in topic_with_fewest[num_topics_to_check - 1]
321					.values()
322					.filter(|subscription| subscription.topic_filter.matches(statement))
323				{
324					self.notify_subscriber(
325						subscription,
326						bytes_to_send.clone(),
327						&mut needs_unsubscribing,
328					);
329				}
330			}
331		}
332		// Unsubscribe any subscriptions that failed to receive messages, to give them a chance to
333		// recover and not miss statements.
334		for sub_id in needs_unsubscribing {
335			self.unsubscribe(sub_id);
336		}
337	}
338
339	// Notify all subscribers that don't filter by topic and want to receive all statements.
340	fn notify_any_subscribers(&mut self, statement: &Statement) {
341		let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
342
343		let bytes_to_send: Bytes = statement.encode().into();
344		for subscription in self.subscriptions_any.values() {
345			self.notify_subscriber(subscription, bytes_to_send.clone(), &mut needs_unsubscribing);
346		}
347
348		// Unsubscribe any subscriptions that failed to receive messages, to give them a chance to
349		// recover and not miss statements.
350		for sub_id in needs_unsubscribing {
351			self.unsubscribe(sub_id);
352		}
353	}
354
355	// Unsubscribe a subscription by its ID.
356	fn unsubscribe(&mut self, id: SeqID) {
357		let Some(entry) = self.by_sub_id.remove(&id) else {
358			return;
359		};
360
361		let topics = match &entry {
362			OptimizedTopicFilter::Any => {
363				self.subscriptions_any.remove(&id);
364				return;
365			},
366			OptimizedTopicFilter::MatchAll(topics) => topics,
367			OptimizedTopicFilter::MatchAny(topics) => topics,
368		};
369
370		// Remove subscription from relevant maps.
371		for topic in topics {
372			// Check MatchAny map.
373			if let Entry::Occupied(mut entry) = self.subscriptions_match_any_by_topic.entry(*topic)
374			{
375				entry.get_mut().remove(&id);
376				if entry.get().is_empty() {
377					entry.remove();
378				}
379			}
380			// Check MatchAll map.
381			if let Entry::Occupied(mut entry) = self.subscriptions_match_all_by_topic.entry(*topic)
382			{
383				for subscriptions in entry.get_mut().iter_mut() {
384					if subscriptions.remove(&id).is_some() {
385						break;
386					}
387				}
388				if entry.get().iter().all(|s| s.is_empty()) {
389					entry.remove();
390				}
391			}
392		}
393	}
394}
395
/// Handlers to communicate with subscription matcher tasks.
///
/// Cloning clones the underlying channel senders.
#[derive(Clone)]
pub struct SubscriptionsMatchersHandlers {
	// Channels to send messages to matcher tasks, one per task.
	matchers: Vec<async_channel::Sender<MatcherMessage>>,
}
402
403impl SubscriptionsMatchersHandlers {
404	/// Create new SubscriptionsMatchersHandlers with the given matcher task senders.
405	fn new(matchers: Vec<async_channel::Sender<MatcherMessage>>) -> SubscriptionsMatchersHandlers {
406		SubscriptionsMatchersHandlers { matchers }
407	}
408
409	// Send a message to the matcher task responsible for the given subscription ID.
410	fn send_by_seq_id(&self, id: SeqID, message: MatcherMessage) {
411		let index: u64 = id.into();
412		// If matchers channels are full we backpressure the sender, in this case it will be the
413		// processing of new statements.
414		if let Err(err) = self.matchers[index as usize % self.matchers.len()].send_blocking(message)
415		{
416			log::error!(
417				target: LOG_TARGET,
418				"Failed to send statement to matcher task: {:?}", err
419			);
420		}
421	}
422
423	// Send a message to all matcher tasks.
424	fn send_all(&self, message: MatcherMessage) {
425		for sender in &self.matchers {
426			if let Err(err) = sender.send_blocking(message.clone()) {
427				log::error!(
428					target: LOG_TARGET,
429					"Failed to send message to matcher task: {:?}", err
430				);
431			}
432		}
433	}
434}
435
/// Stream of statements for a subscription.
///
/// Dropping the stream sends an unsubscribe message to the matcher task that owns the
/// subscription.
pub struct SubscriptionStatementsStream {
	/// Channel to receive statements.
	pub rx: async_channel::Receiver<StatementEvent>,
	// Subscription ID, used for cleanup on drop.
	sub_id: SeqID,
	// Reference to the matchers for cleanup.
	matchers: SubscriptionsMatchersHandlers,
}
445
446// When the stream is dropped, unsubscribe from the matchers.
447impl Drop for SubscriptionStatementsStream {
448	fn drop(&mut self) {
449		self.matchers
450			.send_by_seq_id(self.sub_id, MatcherMessage::Unsubscribe(self.sub_id));
451	}
452}
453
454impl Stream for SubscriptionStatementsStream {
455	type Item = StatementEvent;
456
457	fn poll_next(
458		mut self: std::pin::Pin<&mut Self>,
459		cx: &mut std::task::Context<'_>,
460	) -> std::task::Poll<Option<Self::Item>> {
461		self.rx.poll_next_unpin(cx)
462	}
463}
464
465#[cfg(test)]
466mod tests {
467
468	use crate::tests::signed_statement;
469
470	use super::*;
471	use sp_core::Decode;
472	use sp_statement_store::Topic;
473
474	fn unwrap_statement(item: StatementEvent) -> Bytes {
475		match item {
476			StatementEvent::NewStatements { mut statements, .. } => {
477				assert_eq!(statements.len(), 1, "Expected exactly one statement in batch");
478				statements.remove(0)
479			},
480		}
481	}
482	#[test]
483	fn test_subscribe_unsubscribe() {
484		let mut subscriptions = SubscriptionsInfo::new();
485
486		let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
487		let topic1 = Topic::from([8u8; 32]);
488		let topic2 = Topic::from([9u8; 32]);
489		let sub_info1 = SubscriptionInfo {
490			topic_filter: OptimizedTopicFilter::MatchAll(
491				vec![topic1, topic2].into_iter().collect(),
492			),
493			seq_id: SeqID::from(1),
494			tx: tx1,
495		};
496		subscriptions.subscribe(sub_info1.clone());
497		assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
498		assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
499		assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
500		assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
501
502		subscriptions.unsubscribe(sub_info1.seq_id);
503		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
504		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
505	}
506
507	#[test]
508	fn test_subscribe_any() {
509		let mut subscriptions = SubscriptionsInfo::new();
510		let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
511		let sub_info1 = SubscriptionInfo {
512			topic_filter: OptimizedTopicFilter::Any,
513			seq_id: SeqID::from(1),
514			tx: tx1,
515		};
516		subscriptions.subscribe(sub_info1.clone());
517		assert!(subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
518		assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
519		subscriptions.unsubscribe(sub_info1.seq_id);
520		assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
521	}
522
523	#[test]
524	fn test_subscribe_match_any() {
525		let mut subscriptions = SubscriptionsInfo::new();
526
527		let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
528		let topic1 = Topic::from([8u8; 32]);
529		let topic2 = Topic::from([9u8; 32]);
530		let sub_info1 = SubscriptionInfo {
531			topic_filter: OptimizedTopicFilter::MatchAny(
532				vec![topic1, topic2].into_iter().collect(),
533			),
534			seq_id: SeqID::from(1),
535			tx: tx1,
536		};
537		subscriptions.subscribe(sub_info1.clone());
538		assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
539		assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic2));
540		assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
541		assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
542
543		subscriptions.unsubscribe(sub_info1.seq_id);
544		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
545		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
546	}
547
548	#[test]
549	fn test_notify_any_subscribers() {
550		let mut subscriptions = SubscriptionsInfo::new();
551
552		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
553		let sub_info1 = SubscriptionInfo {
554			topic_filter: OptimizedTopicFilter::Any,
555			seq_id: SeqID::from(1),
556			tx: tx1,
557		};
558		subscriptions.subscribe(sub_info1.clone());
559
560		let statement = signed_statement(1);
561		subscriptions.notify_matching_filters(&statement);
562
563		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
564		let decoded_statement: Statement =
565			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
566		assert_eq!(decoded_statement, statement);
567	}
568
569	#[test]
570	fn test_notify_match_all_subscribers() {
571		let mut subscriptions = SubscriptionsInfo::new();
572
573		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
574		let topic1 = Topic::from([8u8; 32]);
575		let topic2 = Topic::from([9u8; 32]);
576		let sub_info1 = SubscriptionInfo {
577			topic_filter: OptimizedTopicFilter::MatchAll(
578				vec![topic1, topic2].into_iter().collect(),
579			),
580			seq_id: SeqID::from(1),
581			tx: tx1,
582		};
583		subscriptions.subscribe(sub_info1.clone());
584
585		let mut statement = signed_statement(1);
586		statement.set_topic(0, topic2);
587		subscriptions.notify_matching_filters(&statement);
588
589		// Should not receive yet, only one topic matched.
590		assert!(rx1.try_recv().is_err());
591
592		statement.set_topic(1, topic1);
593		subscriptions.notify_matching_filters(&statement);
594
595		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
596		let decoded_statement: Statement =
597			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
598		assert_eq!(decoded_statement, statement);
599	}
600
601	#[test]
602	fn test_notify_match_any_subscribers() {
603		let mut subscriptions = SubscriptionsInfo::new();
604		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
605		let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
606
607		let topic1 = Topic::from([8u8; 32]);
608		let topic2 = Topic::from([9u8; 32]);
609		let sub_info1 = SubscriptionInfo {
610			topic_filter: OptimizedTopicFilter::MatchAny(
611				vec![topic1, topic2].into_iter().collect(),
612			),
613			seq_id: SeqID::from(1),
614			tx: tx1,
615		};
616
617		let sub_info2 = SubscriptionInfo {
618			topic_filter: OptimizedTopicFilter::MatchAny(vec![topic2].into_iter().collect()),
619			seq_id: SeqID::from(2),
620			tx: tx2,
621		};
622
623		subscriptions.subscribe(sub_info1.clone());
624		subscriptions.subscribe(sub_info2.clone());
625
626		let mut statement = signed_statement(1);
627		statement.set_topic(0, topic1);
628		statement.set_topic(1, topic2);
629		subscriptions.notify_match_any_subscribers(&statement);
630
631		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
632		let decoded_statement: Statement =
633			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
634		assert_eq!(decoded_statement, statement);
635
636		let received = unwrap_statement(rx2.try_recv().expect("Should receive statement"));
637		let decoded_statement: Statement =
638			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
639		assert_eq!(decoded_statement, statement);
640	}
641
642	#[tokio::test]
643	async fn test_subscription_handle_with_different_workers_number() {
644		for num_workers in 1..5 {
645			let subscriptions_handle = SubscriptionsHandle::new(
646				Box::new(sp_core::testing::TaskExecutor::new()),
647				num_workers,
648			);
649
650			let topic1 = Topic::from([8u8; 32]);
651			let topic2 = Topic::from([9u8; 32]);
652
653			let streams = (0..5)
654				.into_iter()
655				.map(|_| {
656					subscriptions_handle.subscribe(OptimizedTopicFilter::MatchAll(
657						vec![topic1, topic2].into_iter().collect(),
658					))
659				})
660				.collect::<Vec<_>>();
661
662			let mut statement = signed_statement(1);
663			statement.set_topic(0, topic2);
664			subscriptions_handle.notify(statement.clone());
665
666			statement.set_topic(1, topic1);
667			subscriptions_handle.notify(statement.clone());
668
669			for (_tx, mut stream) in streams {
670				let received =
671					unwrap_statement(stream.next().await.expect("Should receive statement"));
672				let decoded_statement: Statement =
673					Statement::decode(&mut &received.0[..]).expect("Should decode statement");
674				assert_eq!(decoded_statement, statement);
675			}
676		}
677	}
678
	// Dropping the subscription stream must unsubscribe it from the matcher tasks.
	#[tokio::test]
	async fn test_handle_unsubscribe() {
		let subscriptions_handle =
			SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);

		let topic1 = Topic::from([8u8; 32]);
		let topic2 = Topic::from([9u8; 32]);

		let (tx, mut stream) = subscriptions_handle
			.subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));

		let mut statement = signed_statement(1);
		statement.set_topic(0, topic1);
		statement.set_topic(1, topic2);

		// Send a statement and verify it's received.
		subscriptions_handle.notify(statement.clone());

		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
		let decoded_statement: Statement =
			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
		assert_eq!(decoded_statement, statement);

		// Drop the stream to trigger unsubscribe.
		drop(stream);

		// Give some time for the unsubscribe message to be processed.
		tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

		// Send another statement after unsubscribe.
		let mut statement2 = signed_statement(2);
		statement2.set_topic(0, topic1);
		statement2.set_topic(1, topic2);
		subscriptions_handle.notify(statement2.clone());

		// The tx channel should be closed/disconnected since the subscription was removed.
		// Give some time for the notification to potentially arrive (it shouldn't).
		tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

		// The sender should fail to send since the subscription is gone.
		// We verify by checking that the tx channel is disconnected.
		// NOTE(review): `stream` owned the receiver, and async_channel presumably closes a
		// channel as soon as all receivers are dropped, so this may already hold right after
		// `drop(stream)` whether or not the matcher processed the unsubscribe. Confirm that
		// this assertion really exercises the matcher-side cleanup.
		assert!(tx.is_closed(), "Sender should be closed after unsubscribe");
	}
722
723	#[test]
724	fn test_unsubscribe_nonexistent() {
725		let mut subscriptions = SubscriptionsInfo::new();
726		// Unsubscribing a non-existent subscription should not panic.
727		subscriptions.unsubscribe(SeqID::from(999));
728		// Verify internal state is still valid.
729		assert!(subscriptions.by_sub_id.is_empty());
730		assert!(subscriptions.subscriptions_any.is_empty());
731		assert!(subscriptions.subscriptions_match_all_by_topic.is_empty());
732		assert!(subscriptions.subscriptions_match_any_by_topic.is_empty());
733	}
734
735	#[test]
736	fn test_multiple_subscriptions_same_topic() {
737		let mut subscriptions = SubscriptionsInfo::new();
738
739		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
740		let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
741		let topic1 = Topic::from([8u8; 32]);
742		let topic2 = Topic::from([9u8; 32]);
743
744		let sub_info1 = SubscriptionInfo {
745			topic_filter: OptimizedTopicFilter::MatchAll(
746				vec![topic1, topic2].into_iter().collect(),
747			),
748			seq_id: SeqID::from(1),
749			tx: tx1,
750		};
751		let sub_info2 = SubscriptionInfo {
752			topic_filter: OptimizedTopicFilter::MatchAll(
753				vec![topic1, topic2].into_iter().collect(),
754			),
755			seq_id: SeqID::from(2),
756			tx: tx2,
757		};
758
759		subscriptions.subscribe(sub_info1.clone());
760		subscriptions.subscribe(sub_info2.clone());
761
762		// Both subscriptions should be registered under each topic.
763		assert_eq!(
764			subscriptions
765				.subscriptions_match_all_by_topic
766				.get(&topic1)
767				.unwrap()
768				.iter()
769				.map(|s| s.len())
770				.sum::<usize>(),
771			2
772		);
773		assert_eq!(
774			subscriptions
775				.subscriptions_match_all_by_topic
776				.get(&topic2)
777				.unwrap()
778				.iter()
779				.map(|s| s.len())
780				.sum::<usize>(),
781			2
782		);
783
784		// Send a matching statement.
785		let mut statement = signed_statement(1);
786		statement.set_topic(0, topic1);
787		statement.set_topic(1, topic2);
788		subscriptions.notify_matching_filters(&statement);
789
790		// Both should receive.
791		assert!(rx1.try_recv().is_ok());
792		assert!(rx2.try_recv().is_ok());
793
794		// Unsubscribe one.
795		subscriptions.unsubscribe(sub_info1.seq_id);
796
797		// Only one subscription should remain.
798		assert_eq!(
799			subscriptions
800				.subscriptions_match_all_by_topic
801				.get(&topic1)
802				.unwrap()
803				.iter()
804				.map(|s| s.len())
805				.sum::<usize>(),
806			1
807		);
808		assert_eq!(
809			subscriptions
810				.subscriptions_match_all_by_topic
811				.get(&topic2)
812				.unwrap()
813				.iter()
814				.map(|s| s.len())
815				.sum::<usize>(),
816			1
817		);
818		assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
819		assert!(subscriptions.by_sub_id.contains_key(&sub_info2.seq_id));
820
821		// Send another statement.
822		subscriptions.notify_matching_filters(&statement);
823
824		// Only sub2 should receive.
825		assert!(rx2.try_recv().is_ok());
826		assert!(rx1.try_recv().is_err());
827	}
828
829	#[test]
830	fn test_subscriber_auto_unsubscribe_on_channel_full() {
831		let mut subscriptions = SubscriptionsInfo::new();
832
833		// Create a channel with capacity 1.
834		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(1);
835		let topic1 = Topic::from([8u8; 32]);
836
837		let sub_info1 = SubscriptionInfo {
838			topic_filter: OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()),
839			seq_id: SeqID::from(1),
840			tx: tx1,
841		};
842		subscriptions.subscribe(sub_info1.clone());
843
844		let mut statement = signed_statement(1);
845		statement.set_topic(0, topic1);
846
847		// First notification should succeed.
848		subscriptions.notify_matching_filters(&statement);
849		assert!(rx1.try_recv().is_ok());
850
851		// Fill the channel.
852		subscriptions.notify_matching_filters(&statement);
853		// Channel is now full.
854
855		// Next notification should trigger auto-unsubscribe.
856		subscriptions.notify_matching_filters(&statement);
857
858		// Subscription should be removed.
859		assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
860		assert!(!subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
861	}
862
863	#[test]
864	fn test_match_any_receives_once_per_statement() {
865		let mut subscriptions = SubscriptionsInfo::new();
866
867		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
868		let topic1 = Topic::from([8u8; 32]);
869		let topic2 = Topic::from([9u8; 32]);
870
871		// Subscribe to MatchAny with both topics.
872		let sub_info1 = SubscriptionInfo {
873			topic_filter: OptimizedTopicFilter::MatchAny(
874				vec![topic1, topic2].into_iter().collect(),
875			),
876			seq_id: SeqID::from(1),
877			tx: tx1,
878		};
879		subscriptions.subscribe(sub_info1.clone());
880
881		// Create a statement that matches BOTH topics.
882		let mut statement = signed_statement(1);
883		statement.set_topic(0, topic1);
884		statement.set_topic(1, topic2);
885
886		subscriptions.notify_match_any_subscribers(&statement);
887
888		// Should receive exactly once, not twice.
889		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
890		let decoded_statement: Statement =
891			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
892		assert_eq!(decoded_statement, statement);
893
894		// No more messages.
895		assert!(rx1.try_recv().is_err());
896	}
897
898	#[test]
899	fn test_match_all_with_single_topic_matches_statement_with_two_topics() {
900		let mut subscriptions = SubscriptionsInfo::new();
901
902		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
903		let topic1 = Topic::from([8u8; 32]);
904		let topic2 = Topic::from([9u8; 32]);
905
906		// Subscribe with MatchAll on only topic1.
907		let sub_info1 = SubscriptionInfo {
908			topic_filter: OptimizedTopicFilter::MatchAll(vec![topic1].into_iter().collect()),
909			seq_id: SeqID::from(1),
910			tx: tx1,
911		};
912		subscriptions.subscribe(sub_info1.clone());
913
914		// Create a statement that has BOTH topic1 and topic2.
915		let mut statement = signed_statement(1);
916		statement.set_topic(0, topic1);
917		statement.set_topic(1, topic2);
918
919		subscriptions.notify_matching_filters(&statement);
920
921		// Should receive because the statement contains topic1 (which is the only required topic).
922		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
923		let decoded_statement: Statement =
924			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
925		assert_eq!(decoded_statement, statement);
926
927		// No more messages.
928		assert!(rx1.try_recv().is_err());
929	}
930
931	#[test]
932	fn test_match_all_no_matching_topics() {
933		let mut subscriptions = SubscriptionsInfo::new();
934
935		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
936		let topic1 = Topic::from([8u8; 32]);
937		let topic2 = Topic::from([9u8; 32]);
938		let topic3 = Topic::from([10u8; 32]);
939
940		let sub_info1 = SubscriptionInfo {
941			topic_filter: OptimizedTopicFilter::MatchAll(
942				vec![topic1, topic2].into_iter().collect(),
943			),
944			seq_id: SeqID::from(1),
945			tx: tx1,
946		};
947		subscriptions.subscribe(sub_info1.clone());
948
949		// Statement with completely different topics.
950		let mut statement = signed_statement(1);
951		statement.set_topic(0, topic3);
952
953		subscriptions.notify_matching_filters(&statement);
954
955		// Should not receive anything.
956		assert!(rx1.try_recv().is_err());
957	}
958
959	#[test]
960	fn test_match_all_with_unsubscribed_topic_first_in_statement() {
961		// This test exposes a bug where `return` is used instead of `continue` in
962		// `notify_match_all_subscribers_best`. When a statement has a topic that has no
963		// subscriptions (not in the map), the function returns early instead of checking
964		// subsequent topic combinations.
965		let mut subscriptions = SubscriptionsInfo::new();
966
967		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
968		// topic1 will have NO subscriptions
969		let topic1 = Topic::from([1u8; 32]);
970		// topic2 WILL have a subscription
971		let topic2 = Topic::from([2u8; 32]);
972
973		// Subscribe only to topic2 with MatchAll filter.
974		let sub_info1 = SubscriptionInfo {
975			topic_filter: OptimizedTopicFilter::MatchAll(vec![topic2].into_iter().collect()),
976			seq_id: SeqID::from(1),
977			tx: tx1,
978		};
979		subscriptions.subscribe(sub_info1);
980
981		// Create a statement with BOTH topics. topic1 comes first (lower bytes).
982		// When iterating combinations(1), [topic1] is checked before [topic2].
983		// Since topic1 has no subscriptions, the buggy `return` exits early,
984		// preventing the [topic2] combination from being checked.
985		let mut statement = signed_statement(1);
986		statement.set_topic(0, topic1);
987		statement.set_topic(1, topic2);
988
989		subscriptions.notify_match_all_subscribers_best(&statement);
990
991		// With the bug: rx1.try_recv() fails because the function returned early.
992		// With the fix: rx1.try_recv() succeeds because [topic2] combination is checked.
993		let received = unwrap_statement(rx1.try_recv().expect(
994			"Should receive statement - if this fails, the `return` bug in \
995			 notify_match_all_subscribers_best is present (should be `continue`)",
996		));
997		let decoded_statement: Statement =
998			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
999		assert_eq!(decoded_statement, statement);
1000	}
1001
1002	#[tokio::test]
1003	async fn test_handle_with_match_any_filter() {
1004		let subscriptions_handle =
1005			SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);
1006
1007		let topic1 = Topic::from([8u8; 32]);
1008		let topic2 = Topic::from([9u8; 32]);
1009
1010		let (_tx, mut stream) = subscriptions_handle
1011			.subscribe(OptimizedTopicFilter::MatchAny(vec![topic1, topic2].into_iter().collect()));
1012
1013		// Statement matching only topic1.
1014		let mut statement1 = signed_statement(1);
1015		statement1.set_topic(0, topic1);
1016		subscriptions_handle.notify(statement1.clone());
1017
1018		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1019		let decoded_statement: Statement =
1020			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1021		assert_eq!(decoded_statement, statement1);
1022
1023		// Statement matching only topic2.
1024		let mut statement2 = signed_statement(2);
1025		statement2.set_topic(0, topic2);
1026		subscriptions_handle.notify(statement2.clone());
1027
1028		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1029		let decoded_statement: Statement =
1030			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1031		assert_eq!(decoded_statement, statement2);
1032	}
1033
1034	#[tokio::test]
1035	async fn test_handle_with_any_filter() {
1036		let subscriptions_handle =
1037			SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);
1038
1039		let (_tx, mut stream) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
1040
1041		// Send statements with various topics.
1042		let statement1 = signed_statement(1);
1043		subscriptions_handle.notify(statement1.clone());
1044
1045		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1046		let decoded_statement: Statement =
1047			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1048		assert_eq!(decoded_statement, statement1);
1049
1050		let mut statement2 = signed_statement(2);
1051		statement2.set_topic(0, Topic::from([99u8; 32]));
1052		subscriptions_handle.notify(statement2.clone());
1053
1054		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1055		let decoded_statement: Statement =
1056			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1057		assert_eq!(decoded_statement, statement2);
1058	}
1059
1060	#[tokio::test]
1061	async fn test_handle_multiple_subscribers_different_filters() {
1062		let subscriptions_handle =
1063			SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 2);
1064
1065		let topic1 = Topic::from([8u8; 32]);
1066		let topic2 = Topic::from([9u8; 32]);
1067
1068		// Subscriber 1: MatchAll on topic1 and topic2.
1069		let (_tx1, mut stream1) = subscriptions_handle
1070			.subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
1071
1072		// Subscriber 2: MatchAny on topic1.
1073		let (_tx2, mut stream2) = subscriptions_handle
1074			.subscribe(OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()));
1075
1076		// Subscriber 3: Any.
1077		let (_tx3, mut stream3) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
1078
1079		// Statement matching only topic1.
1080		let mut statement1 = signed_statement(1);
1081		statement1.set_topic(0, topic1);
1082		subscriptions_handle.notify(statement1.clone());
1083
1084		// stream1 should NOT receive (needs both topics).
1085		// stream2 should receive (MatchAny topic1).
1086		// stream3 should receive (Any).
1087
1088		let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
1089		let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
1090		assert_eq!(decoded2, statement1);
1091
1092		let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
1093		let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
1094		assert_eq!(decoded3, statement1);
1095
1096		// Statement matching both topics.
1097		let mut statement2 = signed_statement(2);
1098		statement2.set_topic(0, topic1);
1099		statement2.set_topic(1, topic2);
1100		subscriptions_handle.notify(statement2.clone());
1101
1102		// All should receive.
1103		let received1 = unwrap_statement(stream1.next().await.expect("stream1 should receive"));
1104		let decoded1: Statement = Statement::decode(&mut &received1.0[..]).unwrap();
1105		assert_eq!(decoded1, statement2);
1106
1107		let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
1108		let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
1109		assert_eq!(decoded2, statement2);
1110
1111		let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
1112		let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
1113		assert_eq!(decoded3, statement2);
1114	}
1115
1116	#[test]
1117	fn test_statement_without_topics_matches_only_any_filter() {
1118		let mut subscriptions = SubscriptionsInfo::new();
1119
1120		let (tx_match_all, rx_match_all) = async_channel::bounded::<StatementEvent>(10);
1121		let (tx_match_any, rx_match_any) = async_channel::bounded::<StatementEvent>(10);
1122		let (tx_any, rx_any) = async_channel::bounded::<StatementEvent>(10);
1123
1124		let topic1 = Topic::from([8u8; 32]);
1125		let topic2 = Topic::from([9u8; 32]);
1126
1127		// Subscribe with MatchAll filter.
1128		let sub_match_all = SubscriptionInfo {
1129			topic_filter: OptimizedTopicFilter::MatchAll(
1130				vec![topic1, topic2].into_iter().collect(),
1131			),
1132			seq_id: SeqID::from(1),
1133			tx: tx_match_all,
1134		};
1135		subscriptions.subscribe(sub_match_all);
1136
1137		// Subscribe with MatchAny filter.
1138		let sub_match_any = SubscriptionInfo {
1139			topic_filter: OptimizedTopicFilter::MatchAny(
1140				vec![topic1, topic2].into_iter().collect(),
1141			),
1142			seq_id: SeqID::from(2),
1143			tx: tx_match_any,
1144		};
1145		subscriptions.subscribe(sub_match_any);
1146
1147		// Subscribe with Any filter.
1148		let sub_any = SubscriptionInfo {
1149			topic_filter: OptimizedTopicFilter::Any,
1150			seq_id: SeqID::from(3),
1151			tx: tx_any,
1152		};
1153		subscriptions.subscribe(sub_any);
1154
1155		// Create a statement without any topics set.
1156		let statement = signed_statement(1);
1157		assert!(statement.topics().is_empty(), "Statement should have no topics");
1158
1159		// Notify all matching filters.
1160		subscriptions.notify_matching_filters(&statement);
1161
1162		// Any should receive (matches all statements regardless of topics).
1163		let received =
1164			unwrap_statement(rx_any.try_recv().expect("Any filter should receive statement"));
1165		let decoded_statement: Statement =
1166			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1167		assert_eq!(decoded_statement, statement);
1168
1169		// MatchAll should NOT receive (statement has no topics, filter requires topic1 AND topic2).
1170		assert!(
1171			rx_match_all.try_recv().is_err(),
1172			"MatchAll should not receive statement without topics"
1173		);
1174
1175		// MatchAny should NOT receive (statement has no topics, filter requires topic1 OR topic2).
1176		assert!(
1177			rx_match_any.try_recv().is_err(),
1178			"MatchAny should not receive statement without topics"
1179		);
1180	}
1181}