1use crate::{
61	protocol::notifications::upgrade::{
62		NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutError,
63		NotificationsOutSubstream, UpgradeCollec,
64	},
65	service::metrics::NotificationMetrics,
66	types::ProtocolName,
67};
68
69use bytes::BytesMut;
70use futures::{
71	channel::mpsc,
72	lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
73	prelude::*,
74};
75use libp2p::{
76	swarm::{
77		handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream,
78		SubstreamProtocol,
79	},
80	PeerId,
81};
82
83use parking_lot::{Mutex, RwLock};
84use std::{
85	collections::VecDeque,
86	mem,
87	pin::Pin,
88	sync::Arc,
89	task::{Context, Poll},
90	time::Duration,
91};
92
/// Log target passed to the `log` macros emitted from this file.
const LOG_TARGET: &str = "sub-libp2p::notification::handler";

/// Capacity given to the `mpsc` channel that backs the asynchronous side of a
/// [`NotificationsSink`].
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Capacity given to the `mpsc` channel that backs the synchronous side of a
/// [`NotificationsSink`].
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Timeout attached to every outbound substream request issued by the handler.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// Duration of the timer created in [`NotifsHandler::new`]; once that timer completes the
/// handler's `keep_alive` flag is set to `false`.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
111
/// Connection handler that tracks, for every configured notification protocol, the state of
/// the inbound and outbound substreams of one connection.
pub struct NotifsHandler {
	/// Configuration and substream state of each protocol, addressed by `protocol_index`.
	protocols: Vec<Protocol>,

	/// Value reported by `connection_keep_alive` when every protocol is in the `Closed` state.
	keep_alive: bool,

	/// Timer that, once completed, switches `keep_alive` to `false`. Becomes `None` afterwards.
	keep_alive_timeout_future: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

	/// Peer identifier stored into every [`NotificationsSink`] created by this handler.
	peer_id: PeerId,

	/// Events waiting to be returned from `poll`, in FIFO order.
	events_queue: VecDeque<ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut>>,

	/// Optional metrics handle, cloned into every [`NotificationsSink`] created by this handler.
	metrics: Option<Arc<NotificationMetrics>>,
}
135
136impl NotifsHandler {
137	pub fn new(
139		peer_id: PeerId,
140		protocols: Vec<ProtocolConfig>,
141		metrics: Option<NotificationMetrics>,
142	) -> Self {
143		Self {
144			protocols: protocols
145				.into_iter()
146				.map(|config| {
147					let in_upgrade = NotificationsIn::new(
148						config.name.clone(),
149						config.fallback_names.clone(),
150						config.max_notification_size,
151					);
152
153					Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
154				})
155				.collect(),
156			peer_id,
157			keep_alive: true,
159			keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))),
163			events_queue: VecDeque::with_capacity(16),
164			metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
165		}
166	}
167}
168
/// Configuration of a single notification protocol handled by [`NotifsHandler`].
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Main name of the protocol.
	pub name: ProtocolName,
	/// Alternative names, passed alongside `name` to the inbound and outbound upgrades.
	pub fallback_names: Vec<ProtocolName>,
	/// Handshake bytes; read (and cloned) each time a handshake has to be sent.
	pub handshake: Arc<RwLock<Vec<u8>>>,
	/// Maximum notification size, passed to the inbound and outbound upgrades.
	pub max_notification_size: u64,
}
181
/// Per-protocol bookkeeping kept by [`NotifsHandler`].
struct Protocol {
	/// Configuration this protocol was created from.
	config: ProtocolConfig,

	/// Inbound upgrade built once in `NotifsHandler::new` and cloned by `listen_protocol`.
	in_upgrade: NotificationsIn,

	/// Current state of the substreams of this protocol.
	state: State,
}
193
/// State of the substreams of one protocol.
enum State {
	/// No substream is open and none is being opened on request of the behaviour.
	Closed {
		/// `true` when an outbound substream request issued earlier has not been resolved yet;
		/// in that case a new `Open` request does not issue another outbound request.
		pending_opening: bool,
	},

	/// The remote opened an inbound substream and the behaviour has been notified of it.
	OpenDesiredByRemote {
		/// Inbound substream received from the remote; no handshake has been sent on it yet.
		in_substream: NotificationsInSubstream<Stream>,

		/// Same meaning as in [`State::Closed`].
		pending_opening: bool,
	},

	/// The behaviour asked to open the protocol and the outbound substream is not ready yet.
	Opening {
		/// Inbound substream, if any. Reset to `None` when processing it fails.
		in_substream: Option<NotificationsInSubstream<Stream>>,
		/// Forwarded as `inbound` in `NotifsHandlerOut::OpenResultOk`; `true` when the opening
		/// was requested while in `OpenDesiredByRemote`.
		inbound: bool,
	},

	/// The outbound substream has been negotiated.
	Open {
		/// Merged receiving side of the asynchronous and synchronous channels of the
		/// [`NotificationsSink`] handed to the behaviour.
		notifications_sink_rx: stream::Peekable<
			stream::Select<
				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
			>,
		>,

		/// Outbound substream; set to `None` once flushing it reports an error.
		out_substream: Option<NotificationsOutSubstream<Stream>>,

		/// Inbound substream; set to `None` once it ends or yields an error.
		in_substream: Option<NotificationsInSubstream<Stream>>,
	},
}
253
/// Reason attached to `NotifsHandlerOut::CloseDesired`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
	/// Reported for I/O errors and closed substreams, and when processing an inbound substream
	/// in the `OpenDesiredByRemote` state fails.
	RemoteRequest,

	/// Reported when the outbound substream fails with `NotificationsOutError::UnexpectedData`.
	ProtocolMisbehavior,
}
267
/// Messages sent by the behaviour to [`NotifsHandler`].
#[derive(Debug, Clone)]
pub enum NotifsHandlerIn {
	/// Asks the handler to open the given protocol.
	Open {
		/// Index of the protocol in the list given to `NotifsHandler::new`.
		protocol_index: usize,

		/// Peer identifier passed to the outbound upgrade (`NotificationsOut::new`).
		peer_id: PeerId,
	},

	/// Asks the handler to close the given protocol. Always answered with
	/// `NotifsHandlerOut::CloseResult`.
	Close {
		/// Index of the protocol in the list given to `NotifsHandler::new`.
		protocol_index: usize,
	},
}
295
/// Events reported by [`NotifsHandler`] to the behaviour.
#[derive(Debug)]
pub enum NotifsHandlerOut {
	/// The outbound substream was negotiated while the protocol was in the `Opening` state.
	OpenResultOk {
		/// Index of the protocol concerned.
		protocol_index: usize,
		/// Fallback name reported by the outbound upgrade, if any.
		negotiated_fallback: Option<ProtocolName>,
		/// Handshake reported by the outbound upgrade.
		received_handshake: Vec<u8>,
		/// Sink through which notifications for this protocol can be queued.
		notifications_sink: NotificationsSink,
		/// Value of the `inbound` flag of the `Opening` state at the time of completion.
		inbound: bool,
	},

	/// Opening failed: either the outbound upgrade errored while `Opening`, or a `Close`
	/// arrived while `Opening`.
	OpenResultErr {
		/// Index of the protocol concerned.
		protocol_index: usize,
	},

	/// Acknowledges a `NotifsHandlerIn::Close`.
	CloseResult {
		/// Index of the protocol concerned.
		protocol_index: usize,
	},

	/// The remote opened an inbound substream while the protocol was `Closed`.
	OpenDesiredByRemote {
		/// Index of the protocol concerned.
		protocol_index: usize,
		/// Handshake received together with the inbound substream.
		handshake: Vec<u8>,
	},

	/// The handler wants the protocol closed, e.g. because flushing the outbound substream
	/// failed.
	CloseDesired {
		/// Index of the protocol concerned.
		protocol_index: usize,

		/// Why the close is desired.
		reason: CloseReason,
	},

	/// A notification was read from the inbound substream of an open protocol.
	Notification {
		/// Index of the protocol concerned.
		protocol_index: usize,
		/// Payload of the notification.
		message: BytesMut,
	},

	/// A `NotificationsSinkMessage::ForceClose` reached the front of the sink's queue.
	Close {
		/// Index of the protocol concerned.
		protocol_index: usize,
	},
}
368
/// Cloneable handle used to queue notifications towards a peer.
///
/// Clones share the same channels through the inner `Arc`.
#[derive(Debug, Clone)]
pub struct NotificationsSink {
	/// Shared state: peer identifier plus the sending halves of the two channels.
	inner: Arc<NotificationsSinkInner>,
	/// Optional metrics handle, exposed through `metrics()`.
	metrics: Option<Arc<NotificationMetrics>>,
}
377
378impl NotificationsSink {
379	pub fn new(
382		peer_id: PeerId,
383	) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
384	{
385		let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
386		let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
387		(
388			NotificationsSink {
389				inner: Arc::new(NotificationsSinkInner {
390					peer_id,
391					async_channel: FuturesMutex::new(async_tx),
392					sync_channel: Mutex::new(Some(sync_tx)),
393				}),
394				metrics: None,
395			},
396			async_rx,
397			sync_rx,
398		)
399	}
400
401	pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
403		&self.metrics
404	}
405}
406
/// State shared between all clones of a [`NotificationsSink`].
#[derive(Debug)]
struct NotificationsSinkInner {
	/// Peer identifier returned by `NotificationsSink::peer_id`.
	peer_id: PeerId,
	/// Sender used by `reserve_notification`; the lock guard is held by the returned `Ready`.
	async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
	/// Sender used by `send_sync_notification`; replaced by `None` once a send fails.
	sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
}
421
/// Message travelling from a [`NotificationsSink`] to the handler.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
	/// Notification payload to write to the outbound substream.
	Notification { message: Vec<u8> },

	/// Sent after a synchronous notification could not be queued; makes the handler report
	/// `NotifsHandlerOut::Close`.
	ForceClose,
}
433
434impl NotificationsSink {
435	pub fn peer_id(&self) -> &PeerId {
437		&self.inner.peer_id
438	}
439
440	pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
450		let mut lock = self.inner.sync_channel.lock();
451
452		if let Some(tx) = lock.as_mut() {
453			let message = message.into();
454			let result = tx.try_send(NotificationsSinkMessage::Notification { message });
455
456			if result.is_err() {
457				let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
460				debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
461
462				*lock = None;
464			}
465		}
466	}
467
468	pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
475		let mut lock = self.inner.async_channel.lock().await;
476
477		let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
478		if poll_ready.is_ok() {
479			Ok(Ready { lock })
480		} else {
481			Err(())
482		}
483	}
484}
485
/// Reserved slot on the asynchronous channel of a [`NotificationsSink`], obtained from
/// `NotificationsSink::reserve_notification`.
///
/// Holds the lock on the asynchronous sender for as long as it lives.
#[must_use]
#[derive(Debug)]
pub struct Ready<'a> {
	/// Guard over the asynchronous sender whose readiness was awaited.
	lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
}
493
494impl<'a> Ready<'a> {
495	pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
499		self.lock
500			.start_send(NotificationsSinkMessage::Notification { message: notification.into() })
501			.map_err(|_| ())
502	}
503}
504
505impl ConnectionHandler for NotifsHandler {
506	type FromBehaviour = NotifsHandlerIn;
507	type ToBehaviour = NotifsHandlerOut;
508	type InboundProtocol = UpgradeCollec<NotificationsIn>;
509	type OutboundProtocol = NotificationsOut;
510	type OutboundOpenInfo = usize;
512	type InboundOpenInfo = ();
513
514	fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
515		let protocols = self
516			.protocols
517			.iter()
518			.map(|p| p.in_upgrade.clone())
519			.collect::<UpgradeCollec<_>>();
520
521		SubstreamProtocol::new(protocols, ())
522	}
523
	/// Reacts to substream-level events reported by the swarm and updates the per-protocol
	/// state machine accordingly.
	fn on_connection_event(
		&mut self,
		event: ConnectionEvent<
			'_,
			Self::InboundProtocol,
			Self::OutboundProtocol,
			Self::InboundOpenInfo,
			Self::OutboundOpenInfo,
		>,
	) {
		match event {
			// The remote opened a substream for the protocol at `protocol_index`.
			ConnectionEvent::FullyNegotiatedInbound(inbound) => {
				let (mut in_substream_open, protocol_index) = inbound.protocol;
				let protocol_info = &mut self.protocols[protocol_index];

				match protocol_info.state {
					// Nothing open yet: keep the substream and let the behaviour decide.
					State::Closed { pending_opening } => {
						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenDesiredByRemote {
								protocol_index,
								handshake: in_substream_open.handshake,
							},
						));

						protocol_info.state = State::OpenDesiredByRemote {
							in_substream: in_substream_open.substream,
							pending_opening,
						};
					},
					// An inbound substream is already awaiting a decision; returning here
					// drops the new one.
					State::OpenDesiredByRemote { .. } => {
						return
					},
					State::Opening { ref mut in_substream, .. } |
					State::Open { ref mut in_substream, .. } => {
						// An inbound substream is already tracked; returning drops the new one.
						if in_substream.is_some() {
							return
						}

						// The local side already wants the protocol open, so the handshake is
						// sent right away on the new inbound substream.
						let handshake_message = protocol_info.config.handshake.read().clone();
						in_substream_open.substream.send_handshake(handshake_message);
						*in_substream = Some(in_substream_open.substream);
					},
				}
			},
			// An outbound substream requested earlier has been negotiated.
			ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
				let (new_open, protocol_index) = (outbound.protocol, outbound.info);

				match self.protocols[protocol_index].state {
					// The protocol is no longer being opened: the request is marked as
					// resolved and the negotiated substream is dropped at the end of the arm.
					State::Closed { ref mut pending_opening } |
					State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
						debug_assert!(*pending_opening);
						*pending_opening = false;
					},
					State::Open { .. } => {
						log::error!(target: LOG_TARGET, "☎️ State mismatch in notifications handler");
						debug_assert!(false);
					},
					State::Opening { ref mut in_substream, inbound } => {
						// Create the sink handed to the behaviour together with the receiving
						// halves kept in the `Open` state.
						let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
						let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
						let notifications_sink = NotificationsSink {
							inner: Arc::new(NotificationsSinkInner {
								peer_id: self.peer_id,
								async_channel: FuturesMutex::new(async_tx),
								sync_channel: Mutex::new(Some(sync_tx)),
							}),
							metrics: self.metrics.clone(),
						};

						// Any inbound substream gathered while `Opening` is carried over.
						self.protocols[protocol_index].state = State::Open {
							notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
								.peekable(),
							out_substream: Some(new_open.substream),
							in_substream: in_substream.take(),
						};

						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenResultOk {
								protocol_index,
								negotiated_fallback: new_open.negotiated_fallback,
								received_handshake: new_open.handshake,
								notifications_sink,
								inbound,
							},
						));
					},
				}
			},
			ConnectionEvent::AddressChange(_address_change) => {},
			ConnectionEvent::LocalProtocolsChange(_) => {},
			ConnectionEvent::RemoteProtocolsChange(_) => {},
			// An outbound substream request failed; `info` carries the protocol index.
			ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
				[dial_upgrade_error.info]
				.state
			{
				// The protocol is no longer being opened: only mark the request as resolved.
				State::Closed { ref mut pending_opening } |
				State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
					debug_assert!(*pending_opening);
					*pending_opening = false;
				},

				// The opening requested by the behaviour failed: go back to `Closed` and
				// report the failure.
				State::Opening { .. } => {
					self.protocols[dial_upgrade_error.info].state =
						State::Closed { pending_opening: false };

					self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
						NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
					));
				},

				// Not expected: flagged in debug builds only.
				State::Open { .. } => debug_assert!(false),
			},
			ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
			// Any other event variant is only logged.
			event => {
				log::warn!(target: LOG_TARGET, "New unknown `ConnectionEvent` libp2p event: {event:?}");
			},
		}
	}
653
	/// Handles an `Open` or `Close` request coming from the behaviour.
	fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
		match message {
			NotifsHandlerIn::Open { protocol_index, peer_id } => {
				let protocol_info = &mut self.protocols[protocol_index];
				match &mut protocol_info.state {
					State::Closed { pending_opening } => {
						// Only request a new outbound substream when no earlier request is
						// still unresolved.
						if !*pending_opening {
							let proto = NotificationsOut::new(
								protocol_info.config.name.clone(),
								protocol_info.config.fallback_names.clone(),
								protocol_info.config.handshake.read().clone(),
								protocol_info.config.max_notification_size,
								peer_id,
							);

							self.events_queue.push_back(
								ConnectionHandlerEvent::OutboundSubstreamRequest {
									protocol: SubstreamProtocol::new(proto, protocol_index)
										.with_timeout(OPEN_TIMEOUT),
								},
							);
						}

						protocol_info.state = State::Opening { in_substream: None, inbound: false };
					},
					State::OpenDesiredByRemote { pending_opening, in_substream } => {
						let handshake_message = protocol_info.config.handshake.read().clone();

						// Same rule as above: reuse an unresolved outbound request if any.
						if !*pending_opening {
							let proto = NotificationsOut::new(
								protocol_info.config.name.clone(),
								protocol_info.config.fallback_names.clone(),
								handshake_message.clone(),
								protocol_info.config.max_notification_size,
								peer_id,
							);

							self.events_queue.push_back(
								ConnectionHandlerEvent::OutboundSubstreamRequest {
									protocol: SubstreamProtocol::new(proto, protocol_index)
										.with_timeout(OPEN_TIMEOUT),
								},
							);
						}

						// Accept the remote's substream by sending our handshake on it.
						in_substream.send_handshake(handshake_message);

						// Move the inbound substream out of the old state; the placeholder
						// written by `mem::replace` is overwritten right after.
						let in_substream = match mem::replace(
							&mut protocol_info.state,
							State::Opening { in_substream: None, inbound: false },
						) {
							State::OpenDesiredByRemote { in_substream, .. } => in_substream,
							_ => unreachable!(),
						};
						protocol_info.state =
							State::Opening { in_substream: Some(in_substream), inbound: true };
					},
					// Opening twice is not expected: logged, and flagged in debug builds.
					State::Opening { .. } | State::Open { .. } => {
						log::error!(target: LOG_TARGET, "opening already-opened handler");
						debug_assert!(false);
					},
				}
			},

			NotifsHandlerIn::Close { protocol_index } => {
				match self.protocols[protocol_index].state {
					State::Open { .. } => {
						self.protocols[protocol_index].state =
							State::Closed { pending_opening: false };
					},
					// An opening was in progress: remember the unresolved outbound request
					// and report the opening as failed.
					State::Opening { .. } => {
						self.protocols[protocol_index].state =
							State::Closed { pending_opening: true };

						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenResultErr { protocol_index },
						));
					},
					// Replacing the state drops the inbound substream it held.
					State::OpenDesiredByRemote { pending_opening, .. } => {
						self.protocols[protocol_index].state = State::Closed { pending_opening };
					},
					State::Closed { .. } => {},
				}

				// A `Close` is always acknowledged, whatever the previous state was.
				self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
					NotifsHandlerOut::CloseResult { protocol_index },
				));
			},
		}
	}
747
748	fn connection_keep_alive(&self) -> bool {
749		if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
751			return true;
752		}
753
754		self.keep_alive
755	}
756
	/// Drives the handler: advances the keep-alive timer, returns queued events, moves queued
	/// notifications into the outbound substreams, flushes them, and processes the inbound
	/// substreams. Returns `Poll::Pending` when no event is available.
	fn poll(
		&mut self,
		cx: &mut Context,
	) -> Poll<
		ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
	> {
		{
			// Once the initial keep-alive timer completes, drop it and clear the flag.
			let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future;
			if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future {
				if keep_alive_timeout_future.poll_unpin(cx).is_ready() {
					maybe_keep_alive_timeout_future.take();
					self.keep_alive = false;
				}
			}
		}

		// Queued events take precedence over any substream work.
		if let Some(ev) = self.events_queue.pop_front() {
			return Poll::Ready(ev)
		}

		// First pass: move notifications from the sinks into the outbound substreams.
		for protocol_index in 0..self.protocols.len() {
			if let State::Open {
				notifications_sink_rx, out_substream: Some(out_substream), ..
			} = &mut self.protocols[protocol_index].state
			{
				loop {
					// Peek first so that a notification is only removed from the queue once
					// the substream is known to be ready for it.
					match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
						Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) =>
							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
								NotifsHandlerOut::Close { protocol_index },
							)),
						Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
						Poll::Ready(None) | Poll::Pending => break,
					}

					// Both `Ok` and `Err` readiness results continue to the send below.
					match out_substream.poll_ready_unpin(cx) {
						Poll::Ready(_) => {},
						Poll::Pending => break,
					}

					// The peek above saw a notification, so anything else is unexpected.
					let message = match notifications_sink_rx.poll_next_unpin(cx) {
						Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
							message,
						Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
						Poll::Ready(None) |
						Poll::Pending => {
							debug_assert!(false);
							break
						},
					};

					// The result is discarded here; errors surfaced by `poll_flush` are
					// handled in the flush pass below.
					let _ = out_substream.start_send_unpin(message);
					}
			}
		}

		// Second pass: flush the outbound substreams and report failures.
		for protocol_index in 0..self.protocols.len() {
			match &mut self.protocols[protocol_index].state {
				State::Open { out_substream: out_substream @ Some(_), .. } => {
					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
						Poll::Pending | Poll::Ready(Ok(())) => {},
						Poll::Ready(Err(error)) => {
							// The outbound substream is unusable from now on.
							*out_substream = None;

							let reason = match error {
								NotificationsOutError::Io(_) | NotificationsOutError::Closed =>
									CloseReason::RemoteRequest,
								NotificationsOutError::UnexpectedData =>
									CloseReason::ProtocolMisbehavior,
							};

							let event = NotifsHandlerOut::CloseDesired { protocol_index, reason };
							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
						},
					};
				},

				State::Closed { .. } |
				State::Opening { .. } |
				State::Open { out_substream: None, .. } |
				State::OpenDesiredByRemote { .. } => {},
			}
		}

		// Third pass: make progress on the inbound substreams.
		for protocol_index in 0..self.protocols.len() {
			match &mut self.protocols[protocol_index].state {
				State::Closed { .. } |
				State::Open { in_substream: None, .. } |
				State::Opening { in_substream: None, .. } => {},

				// Open protocol: read notifications sent by the remote.
				State::Open { in_substream: in_substream @ Some(_), .. } =>
					match futures::prelude::stream::Stream::poll_next(
						Pin::new(in_substream.as_mut().unwrap()),
						cx,
					) {
						Poll::Pending => {},
						Poll::Ready(Some(Ok(message))) => {
							let event = NotifsHandlerOut::Notification { protocol_index, message };
							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
						},
						// End of stream or error: forget the inbound substream, no event.
						Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
					},

				// Awaiting the behaviour's decision: a failure of the inbound substream
				// closes the protocol and is reported as `CloseDesired`.
				State::OpenDesiredByRemote { in_substream, pending_opening } =>
					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
						Poll::Pending => {},
						Poll::Ready(Ok(())) => {},
						Poll::Ready(Err(_)) => {
							self.protocols[protocol_index].state =
								State::Closed { pending_opening: *pending_opening };
							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
								NotifsHandlerOut::CloseDesired {
									protocol_index,
									reason: CloseReason::RemoteRequest,
								},
							))
						},
					},

				// Still opening: a failing inbound substream is simply forgotten.
				State::Opening { in_substream: in_substream @ Some(_), .. } =>
					match NotificationsInSubstream::poll_process(
						Pin::new(in_substream.as_mut().unwrap()),
						cx,
					) {
						Poll::Pending => {},
						Poll::Ready(Ok(())) => {},
						Poll::Ready(Err(_)) => *in_substream = None,
					},
			}
		}

		Poll::Pending
	}
916}
917
918#[cfg(test)]
919pub mod tests {
920	use super::*;
921	use crate::protocol::notifications::upgrade::{
922		NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
923	};
924	use asynchronous_codec::Framed;
925	use libp2p::{
926		core::muxing::SubstreamBox,
927		swarm::handler::{self, StreamUpgradeError},
928	};
929	use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
930	use std::{
931		collections::HashMap,
932		io::{Error, IoSlice, IoSliceMut},
933	};
934	use tokio::sync::mpsc;
935	use unsigned_varint::codec::UviBytes;
936
	/// Receiving side of a fake open substream created by `ConnectionYielder::open_substream`.
	struct OpenSubstream {
		/// Merged receivers of the asynchronous and synchronous channels of the sink handed
		/// out for this substream.
		notifications: stream::Peekable<
			stream::Select<
				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
			>,
		>,
		/// One end of a `MockSubstream` pair; stored but never read.
		_in_substream: MockSubstream,
		/// The other end of the same `MockSubstream` pair; stored but never read.
		_out_substream: MockSubstream,
	}
947
	/// Test helper that hands out `OpenResultOk` events and keeps the receiving side of the
	/// corresponding sinks so that tests can read back what was sent.
	pub struct ConnectionYielder {
		/// Fake open substreams, keyed by peer and protocol index.
		connections: HashMap<(PeerId, usize), OpenSubstream>,
	}
951
952	impl ConnectionYielder {
953		pub fn new() -> Self {
955			Self { connections: HashMap::new() }
956		}
957
958		pub fn open_substream(
960			&mut self,
961			peer: PeerId,
962			protocol_index: usize,
963			received_handshake: Vec<u8>,
964		) -> NotifsHandlerOut {
965			let (async_tx, async_rx) =
966				futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
967			let (sync_tx, sync_rx) =
968				futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
969			let notifications_sink = NotificationsSink {
970				inner: Arc::new(NotificationsSinkInner {
971					peer_id: peer,
972					async_channel: FuturesMutex::new(async_tx),
973					sync_channel: Mutex::new(Some(sync_tx)),
974				}),
975				metrics: None,
976			};
977			let (in_substream, out_substream) = MockSubstream::new();
978
979			self.connections.insert(
980				(peer, protocol_index),
981				OpenSubstream {
982					notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
983					_in_substream: in_substream,
984					_out_substream: out_substream,
985				},
986			);
987
988			NotifsHandlerOut::OpenResultOk {
989				protocol_index,
990				negotiated_fallback: None,
991				received_handshake,
992				notifications_sink,
993				inbound: false,
994			}
995		}
996
997		pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
999			let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
1000				info
1001			} else {
1002				return None
1003			};
1004
1005			futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
1006				Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
1007					Poll::Ready(Some(message)),
1008				Poll::Pending => Poll::Ready(None),
1009				Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
1010					panic!("sink closed")
1011				},
1012			})
1013			.await
1014		}
1015	}
1016
	/// In-memory, channel-backed byte stream used in place of a real substream.
	struct MockSubstream {
		/// Receives the byte chunks written by the peer end.
		pub rx: mpsc::Receiver<Vec<u8>>,
		/// Sends the byte chunks written on this end.
		pub tx: mpsc::Sender<Vec<u8>>,
		/// Bytes received but not yet handed out by `poll_read`.
		rx_buffer: BytesMut,
	}
1022
	/// Stand-in for the stream counter field of `MockStream`.
	// NOTE(review): presumably mirrors a private counter type of libp2p's `Stream` — confirm
	// against the libp2p version in use.
	#[allow(dead_code)]
	struct MockActiveStreamCounter(Arc<()>);
1026
	/// Structure transmuted into a libp2p `Stream` by `MockSubstream::stream_new`.
	// NOTE(review): the fields are presumably meant to match the private fields of libp2p's
	// `Stream` — confirm against the libp2p version in use.
	#[allow(dead_code)]
	struct MockStream {
		/// Negotiated I/O object wrapped by the stream.
		stream: Negotiated<SubstreamBox>,
		/// Optional stream counter; always `None` in `stream_new`.
		counter: Option<MockActiveStreamCounter>,
	}
1033
	impl MockSubstream {
		/// Creates two connected ends: what is written on one end is read on the other.
		/// Each direction is backed by a channel with a capacity of 32 chunks.
		pub fn new() -> (Self, Self) {
			let (tx1, rx1) = mpsc::channel(32);
			let (tx2, rx2) = mpsc::channel(32);

			(
				Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
				Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
			)
		}

		/// Creates a connected pair, runs protocol negotiation between the two ends (dialer
		/// on the first, listener on the second) and returns both as libp2p `Stream`s.
		///
		/// Panics if either side of the negotiation fails.
		pub async fn negotiated() -> (Stream, Stream) {
			let (socket1, socket2) = Self::new();
			let socket1 = SubstreamBox::new(socket1);
			let socket2 = SubstreamBox::new(socket2);

			let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
			// Both sides must be driven concurrently for the negotiation to complete.
			let (res1, res2) = tokio::join!(
				dialer_select_proto(socket1, protos.clone(), Version::V1),
				listener_select_proto(socket2, protos),
			);

			(Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
		}

		/// Turns a negotiated I/O object into a libp2p `Stream` by transmuting a `MockStream`.
		fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
			let stream = MockStream { stream, counter: None };
			// Compile-time check that both types have the same size and alignment.
			const _: () = {
				assert!(core::mem::size_of::<Stream>() == core::mem::size_of::<MockStream>());
				assert!(core::mem::align_of::<Stream>() == core::mem::align_of::<MockStream>());
			};

			// NOTE(review): the assertions above only cover size and alignment; soundness
			// also relies on `MockStream` having the same field layout as libp2p's `Stream`
			// — confirm whenever libp2p is upgraded.
			unsafe { core::mem::transmute(stream) }
		}
	}
1073
1074	impl AsyncWrite for MockSubstream {
1075		fn poll_write<'a>(
1076			self: Pin<&mut Self>,
1077			_cx: &mut Context<'a>,
1078			buf: &[u8],
1079		) -> Poll<Result<usize, Error>> {
1080			match self.tx.try_send(buf.to_vec()) {
1081				Ok(_) => Poll::Ready(Ok(buf.len())),
1082				Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1083			}
1084		}
1085
1086		fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1087			Poll::Ready(Ok(()))
1088		}
1089
1090		fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1091			Poll::Ready(Ok(()))
1092		}
1093
1094		fn poll_write_vectored<'a, 'b>(
1095			self: Pin<&mut Self>,
1096			_cx: &mut Context<'a>,
1097			_bufs: &[IoSlice<'b>],
1098		) -> Poll<Result<usize, Error>> {
1099			unimplemented!();
1100		}
1101	}
1102
1103	impl AsyncRead for MockSubstream {
1104		fn poll_read<'a>(
1105			mut self: Pin<&mut Self>,
1106			cx: &mut Context<'a>,
1107			buf: &mut [u8],
1108		) -> Poll<Result<usize, Error>> {
1109			match self.rx.poll_recv(cx) {
1110				Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1111				Poll::Ready(None) =>
1112					return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1113				_ => {},
1114			}
1115
1116			let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1117			let data = self.rx_buffer.split_to(nsize);
1118			buf[..nsize].copy_from_slice(&data[..]);
1119
1120			if nsize > 0 {
1121				return Poll::Ready(Ok(nsize))
1122			}
1123
1124			Poll::Pending
1125		}
1126
1127		fn poll_read_vectored<'a, 'b>(
1128			self: Pin<&mut Self>,
1129			_cx: &mut Context<'a>,
1130			_bufs: &mut [IoSliceMut<'b>],
1131		) -> Poll<Result<usize, Error>> {
1132			unimplemented!();
1133		}
1134	}
1135
1136	fn notifs_handler() -> NotifsHandler {
1138		NotifsHandler::new(
1139			PeerId::random(),
1140			vec![ProtocolConfig {
1141				name: "/foo".into(),
1142				fallback_names: vec![],
1143				handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1144				max_notification_size: u64::MAX,
1145			}],
1146			None,
1147		)
1148	}
1149
	/// A second inbound substream arriving while the protocol is in `OpenDesiredByRemote` is
	/// dropped by the handler instead of replacing the first one.
	#[tokio::test]
	async fn second_open_desired_by_remote_rejected() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		// First inbound substream: accepted and parked until the behaviour decides.
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		// The remote end of the first substream must not observe a close or an error.
		// NOTE(review): `Vec::with_capacity(512)` has length 0, so the read buffer passed to
		// `poll_read` is empty — confirm this is intended.
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// Second inbound substream for the same protocol.
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// If the remote end of the second substream reports an error, it must be
		// `UnexpectedEof`; any other poll result is accepted here.
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
			}

			Poll::Ready(())
		})
		.await;
	}
1209
	/// While the protocol is `Opening` with an inbound substream already tracked, a further
	/// inbound substream is dropped and the state is left untouched.
	#[tokio::test]
	async fn open_rejected_if_substream_is_opening() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		// First inbound substream: accepted and parked until the behaviour decides.
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		// The remote end of the first substream must not observe a close or an error.
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// The behaviour accepts: the handler moves to `Opening` and keeps the substream.
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		// Second inbound substream for the same protocol.
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// The second substream was dropped, so its remote end must see `UnexpectedEof`.
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
			} else {
				panic!("unexpected result");
			}

			Poll::Ready(())
		})
		.await;
		// The handler still tracks an inbound substream and is still `Opening`.
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));
	}
1284
	/// Once the protocol is `Open` with an inbound substream already tracked, a further
	/// inbound substream is dropped and the state is left untouched.
	#[tokio::test]
	async fn open_rejected_if_substream_already_open() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		// First inbound substream: accepted and parked until the behaviour decides.
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		// The remote end of the first substream must not observe a close or an error.
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// The behaviour accepts: the handler moves to `Opening` and keeps the substream.
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		// Simulate the outbound substream completing its negotiation.
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		// The protocol is now open and still carries the first inbound substream.
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Open { in_substream: Some(_), .. }
		));

		// Second inbound substream for the same protocol.
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);
		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// The second substream was dropped, so its remote end must see `UnexpectedEof`.
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
			} else {
				panic!("unexpected result");
			}

			Poll::Ready(())
		})
		.await;
		// The handler is still `Open` and still tracks an inbound substream.
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Open { in_substream: Some(_), .. }
		));
	}
1375
1376	#[tokio::test]
1377	async fn fully_negotiated_resets_state_for_closed_substream() {
1378		let mut handler = notifs_handler();
1379		let (io, mut io2) = MockSubstream::negotiated().await;
1380		let mut codec = UviBytes::default();
1381		codec.set_max_len(usize::MAX);
1382
1383		let notif_in = NotificationsInOpen {
1384			handshake: b"hello, world".to_vec(),
1385			substream: NotificationsInSubstream::new(
1386				Framed::new(io, codec),
1387				NotificationsInSubstreamHandshake::NotSent,
1388			),
1389		};
1390		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1391			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1392		));
1393
1394		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1396		futures::future::poll_fn(|cx| {
1397			let mut buf = Vec::with_capacity(512);
1398			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1399			Poll::Ready(())
1400		})
1401		.await;
1402
1403		handler.on_behaviour_event(NotifsHandlerIn::Open {
1406			protocol_index: 0,
1407			peer_id: PeerId::random(),
1408		});
1409		assert!(std::matches!(
1410			handler.protocols[0].state,
1411			State::Opening { in_substream: Some(_), .. }
1412		));
1413
1414		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1415		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1416
1417		let (io, _io2) = MockSubstream::negotiated().await;
1420		let mut codec = UviBytes::default();
1421		codec.set_max_len(usize::MAX);
1422
1423		let notif_out = NotificationsOutOpen {
1424			handshake: b"hello, world".to_vec(),
1425			negotiated_fallback: None,
1426			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1427		};
1428		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1429			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1430		));
1431
1432		assert!(std::matches!(
1433			handler.protocols[0].state,
1434			State::Closed { pending_opening: false }
1435		));
1436	}
1437
1438	#[tokio::test]
1439	async fn fully_negotiated_resets_state_for_open_desired_substream() {
1440		let mut handler = notifs_handler();
1441		let (io, mut io2) = MockSubstream::negotiated().await;
1442		let mut codec = UviBytes::default();
1443		codec.set_max_len(usize::MAX);
1444
1445		let notif_in = NotificationsInOpen {
1446			handshake: b"hello, world".to_vec(),
1447			substream: NotificationsInSubstream::new(
1448				Framed::new(io, codec),
1449				NotificationsInSubstreamHandshake::NotSent,
1450			),
1451		};
1452		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1453			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1454		));
1455
1456		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1458		futures::future::poll_fn(|cx| {
1459			let mut buf = Vec::with_capacity(512);
1460			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1461			Poll::Ready(())
1462		})
1463		.await;
1464
1465		handler.on_behaviour_event(NotifsHandlerIn::Open {
1468			protocol_index: 0,
1469			peer_id: PeerId::random(),
1470		});
1471		assert!(std::matches!(
1472			handler.protocols[0].state,
1473			State::Opening { in_substream: Some(_), .. }
1474		));
1475
1476		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1477		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1478
1479		let (io, _io2) = MockSubstream::negotiated().await;
1481		let mut codec = UviBytes::default();
1482		codec.set_max_len(usize::MAX);
1483
1484		let notif_in = NotificationsInOpen {
1485			handshake: b"hello, world".to_vec(),
1486			substream: NotificationsInSubstream::new(
1487				Framed::new(io, codec),
1488				NotificationsInSubstreamHandshake::NotSent,
1489			),
1490		};
1491		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1492			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1493		));
1494
1495		assert!(std::matches!(
1496			handler.protocols[0].state,
1497			State::OpenDesiredByRemote { pending_opening: true, .. }
1498		));
1499
1500		let (io, _io2) = MockSubstream::negotiated().await;
1503		let mut codec = UviBytes::default();
1504		codec.set_max_len(usize::MAX);
1505
1506		let notif_out = NotificationsOutOpen {
1507			handshake: b"hello, world".to_vec(),
1508			negotiated_fallback: None,
1509			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1510		};
1511
1512		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1513			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1514		));
1515
1516		assert!(std::matches!(
1517			handler.protocols[0].state,
1518			State::OpenDesiredByRemote { pending_opening: false, .. }
1519		));
1520	}
1521
1522	#[tokio::test]
1523	async fn dial_upgrade_error_resets_closed_outbound_state() {
1524		let mut handler = notifs_handler();
1525		let (io, mut io2) = MockSubstream::negotiated().await;
1526		let mut codec = UviBytes::default();
1527		codec.set_max_len(usize::MAX);
1528
1529		let notif_in = NotificationsInOpen {
1530			handshake: b"hello, world".to_vec(),
1531			substream: NotificationsInSubstream::new(
1532				Framed::new(io, codec),
1533				NotificationsInSubstreamHandshake::NotSent,
1534			),
1535		};
1536		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1537			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1538		));
1539
1540		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1542		futures::future::poll_fn(|cx| {
1543			let mut buf = Vec::with_capacity(512);
1544			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1545			Poll::Ready(())
1546		})
1547		.await;
1548
1549		handler.on_behaviour_event(NotifsHandlerIn::Open {
1552			protocol_index: 0,
1553			peer_id: PeerId::random(),
1554		});
1555		assert!(std::matches!(
1556			handler.protocols[0].state,
1557			State::Opening { in_substream: Some(_), .. }
1558		));
1559
1560		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1561		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1562
1563		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1565			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1566		));
1567		assert!(std::matches!(
1568			handler.protocols[0].state,
1569			State::Closed { pending_opening: false }
1570		));
1571	}
1572
1573	#[tokio::test]
1574	async fn dial_upgrade_error_resets_open_desired_state() {
1575		let mut handler = notifs_handler();
1576		let (io, mut io2) = MockSubstream::negotiated().await;
1577		let mut codec = UviBytes::default();
1578		codec.set_max_len(usize::MAX);
1579
1580		let notif_in = NotificationsInOpen {
1581			handshake: b"hello, world".to_vec(),
1582			substream: NotificationsInSubstream::new(
1583				Framed::new(io, codec),
1584				NotificationsInSubstreamHandshake::NotSent,
1585			),
1586		};
1587		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1588			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1589		));
1590
1591		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1593		futures::future::poll_fn(|cx| {
1594			let mut buf = Vec::with_capacity(512);
1595			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1596			Poll::Ready(())
1597		})
1598		.await;
1599
1600		handler.on_behaviour_event(NotifsHandlerIn::Open {
1603			protocol_index: 0,
1604			peer_id: PeerId::random(),
1605		});
1606		assert!(std::matches!(
1607			handler.protocols[0].state,
1608			State::Opening { in_substream: Some(_), .. }
1609		));
1610
1611		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1612		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1613
1614		let (io, _io2) = MockSubstream::negotiated().await;
1615		let mut codec = UviBytes::default();
1616		codec.set_max_len(usize::MAX);
1617
1618		let notif_in = NotificationsInOpen {
1619			handshake: b"hello, world".to_vec(),
1620			substream: NotificationsInSubstream::new(
1621				Framed::new(io, codec),
1622				NotificationsInSubstreamHandshake::NotSent,
1623			),
1624		};
1625		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1626			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1627		));
1628
1629		assert!(std::matches!(
1630			handler.protocols[0].state,
1631			State::OpenDesiredByRemote { pending_opening: true, .. }
1632		));
1633
1634		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1636			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1637		));
1638		assert!(std::matches!(
1639			handler.protocols[0].state,
1640			State::OpenDesiredByRemote { pending_opening: false, .. }
1641		));
1642	}
1643
1644	#[tokio::test]
1645	async fn sync_notifications_clogged() {
1646		let mut handler = notifs_handler();
1647		let (io, _) = MockSubstream::negotiated().await;
1648		let codec = UviBytes::default();
1649
1650		let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1651		let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1652		let notifications_sink = NotificationsSink {
1653			inner: Arc::new(NotificationsSinkInner {
1654				peer_id: PeerId::random(),
1655				async_channel: FuturesMutex::new(async_tx),
1656				sync_channel: Mutex::new(Some(sync_tx)),
1657			}),
1658			metrics: None,
1659		};
1660
1661		handler.protocols[0].state = State::Open {
1662			notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1663			out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1664			in_substream: None,
1665		};
1666
1667		notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1668		notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1669		notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1670		notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1671
1672		futures::future::poll_fn(|cx| {
1673			assert!(std::matches!(
1674				handler.poll(cx),
1675				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1676					NotifsHandlerOut::Close { .. }
1677				))
1678			));
1679			Poll::Ready(())
1680		})
1681		.await;
1682	}
1683
1684	#[tokio::test]
1685	async fn close_desired_by_remote() {
1686		let mut handler = notifs_handler();
1687		let (io, io2) = MockSubstream::negotiated().await;
1688		let mut codec = UviBytes::default();
1689		codec.set_max_len(usize::MAX);
1690
1691		let notif_in = NotificationsInOpen {
1692			handshake: b"hello, world".to_vec(),
1693			substream: NotificationsInSubstream::new(
1694				Framed::new(io, codec),
1695				NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1696			),
1697		};
1698
1699		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1702			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1703		));
1704		drop(io2);
1705
1706		futures::future::poll_fn(|cx| {
1707			assert!(std::matches!(
1708				handler.poll(cx),
1709				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1710					NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1711				))
1712			));
1713			assert!(std::matches!(
1714				handler.poll(cx),
1715				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1716					NotifsHandlerOut::CloseDesired {
1717						protocol_index: 0,
1718						reason: CloseReason::RemoteRequest,
1719					},
1720				))
1721			));
1722			Poll::Ready(())
1723		})
1724		.await;
1725	}
1726}