sc_network/protocol/notifications/upgrade/
notifications.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19/// Notifications protocol.
20///
21/// The Substrate notifications protocol consists in the following:
22///
23/// - Node A opens a substream to node B and sends a message which contains some
24///   protocol-specific higher-level logic. This message is prefixed with a variable-length
25///   integer message length. This message can be empty, in which case `0` is sent.
26/// - If node B accepts the substream, it sends back a message with the same properties.
27/// - If instead B refuses the connection (which typically happens because no empty slot is
28///   available), then it immediately closes the substream without sending back anything.
29/// - Node A can then send notifications to B, prefixed with a variable-length integer
30///   indicating the length of the message.
31/// - Either node A or node B can signal that it doesn't want this notifications substream
32///   anymore by closing its writing side. The other party should respond by also closing their
33///   own writing side soon after.
34///
35/// Notification substreams are unidirectional. If A opens a substream with B, then B is
36/// encouraged but not required to open a substream to A as well.
37use crate::types::ProtocolName;
38
39use asynchronous_codec::Framed;
40use bytes::BytesMut;
41use futures::prelude::*;
42use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
43use log::{error, warn};
44use unsigned_varint::codec::UviBytes;
45
46use std::{
47	io, mem,
48	pin::Pin,
49	task::{Context, Poll},
50	vec,
51};
52
/// Maximum allowed size, in bytes, of each of the two handshake messages (the initial message
/// sent by the dialer and the handshake sent back by the listener).
///
/// A length prefix above this limit read during an upgrade is rejected with
/// [`NotificationsHandshakeError::TooLarge`].
const MAX_HANDSHAKE_SIZE: usize = 1024;
55
/// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional
/// stream of messages.
///
/// Applying the upgrade reads the handshake sent by the remote and yields a
/// [`NotificationsInOpen`].
#[derive(Debug, Clone)]
pub struct NotificationsIn {
	/// Protocol names to use when negotiating the substream.
	/// The first one is the main name, while the other ones are fallbacks.
	protocol_names: Vec<ProtocolName>,
	/// Maximum allowed size for a single notification. Passed to the length-prefix codec of the
	/// substream as its maximum frame length.
	max_notification_size: u64,
}
66
/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
/// message, then becomes a unidirectional sink of data.
///
/// Applying the upgrade writes `initial_message`, reads the handshake sent back by the remote
/// and yields a [`NotificationsOutOpen`].
#[derive(Debug, Clone)]
pub struct NotificationsOut {
	/// Protocol names to use when negotiating the substream.
	/// The first one is the main name, while the other ones are fallbacks.
	protocol_names: Vec<ProtocolName>,
	/// Message to send when we start the handshake.
	initial_message: Vec<u8>,
	/// Maximum allowed size for a single notification. Passed to the length-prefix codec of the
	/// substream as its maximum frame length.
	max_notification_size: u64,
}
79
/// A substream for incoming notification messages.
///
/// When creating, this struct starts in a state in which we must first send back a handshake
/// message to the remote. No message will come before this has been done.
#[pin_project::pin_project]
pub struct NotificationsInSubstream<TSubstream> {
	/// Length-prefixed framing around the raw substream; used both to send the handshake back
	/// and to receive notifications.
	#[pin]
	socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
	/// Progress of sending our handshake back and, afterwards, of closing the substream.
	handshake: NotificationsInSubstreamHandshake,
}
90
/// State of the handshake sending back process.
///
/// The states are normally traversed in declaration order: `NotSent` becomes `PendingSend` when
/// the user provides the handshake, then `Flush` and `Sent` as the message is written out, and
/// finally `ClosingInResponseToRemote` and `BothSidesClosed` once the remote stops sending.
#[derive(Debug)]
pub enum NotificationsInSubstreamHandshake {
	/// Waiting for the user to give us the handshake message.
	NotSent,
	/// User gave us the handshake message. Trying to push it in the socket.
	PendingSend(Vec<u8>),
	/// Handshake message was pushed in the socket. Still need to flush.
	Flush,
	/// Handshake message successfully sent and flushed.
	Sent,
	/// Remote has closed their writing side. We close our own writing side in return.
	ClosingInResponseToRemote,
	/// Both our side and the remote have closed their writing side.
	BothSidesClosed,
}
107
/// A substream for outgoing notification messages.
///
/// Notifications are sent through its `Sink<Vec<u8>>` implementation.
#[pin_project::pin_project]
pub struct NotificationsOutSubstream<TSubstream> {
	/// Substream where to send messages, wrapped in a length-prefix framing codec.
	#[pin]
	socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
}
115
#[cfg(test)]
impl<TSubstream> NotificationsOutSubstream<TSubstream> {
	/// Test-only constructor wrapping an already framed socket.
	pub fn new(socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>) -> Self {
		NotificationsOutSubstream { socket }
	}
}
122
123impl NotificationsIn {
124	/// Builds a new potential upgrade.
125	pub fn new(
126		main_protocol_name: impl Into<ProtocolName>,
127		fallback_names: Vec<ProtocolName>,
128		max_notification_size: u64,
129	) -> Self {
130		let mut protocol_names = fallback_names;
131		protocol_names.insert(0, main_protocol_name.into());
132
133		Self { protocol_names, max_notification_size }
134	}
135}
136
137impl UpgradeInfo for NotificationsIn {
138	type Info = ProtocolName;
139	type InfoIter = vec::IntoIter<Self::Info>;
140
141	fn protocol_info(&self) -> Self::InfoIter {
142		self.protocol_names.clone().into_iter()
143	}
144}
145
146impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn
147where
148	TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
149{
150	type Output = NotificationsInOpen<TSubstream>;
151	type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
152	type Error = NotificationsHandshakeError;
153
154	fn upgrade_inbound(self, mut socket: TSubstream, _negotiated_name: Self::Info) -> Self::Future {
155		Box::pin(async move {
156			let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
157			if handshake_len > MAX_HANDSHAKE_SIZE {
158				return Err(NotificationsHandshakeError::TooLarge {
159					requested: handshake_len,
160					max: MAX_HANDSHAKE_SIZE,
161				})
162			}
163
164			let mut handshake = vec![0u8; handshake_len];
165			if !handshake.is_empty() {
166				socket.read_exact(&mut handshake).await?;
167			}
168
169			let mut codec = UviBytes::default();
170			codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::MAX));
171
172			let substream = NotificationsInSubstream {
173				socket: Framed::new(socket, codec),
174				handshake: NotificationsInSubstreamHandshake::NotSent,
175			};
176
177			Ok(NotificationsInOpen { handshake, substream })
178		})
179	}
180}
181
/// Yielded by the [`NotificationsIn`] after a successful upgrade.
pub struct NotificationsInOpen<TSubstream> {
	/// Handshake sent by the remote.
	pub handshake: Vec<u8>,
	/// Implementation of `Stream` that allows receiving messages from the substream.
	pub substream: NotificationsInSubstream<TSubstream>,
}
189
190impl<TSubstream> NotificationsInSubstream<TSubstream>
191where
192	TSubstream: AsyncRead + AsyncWrite + Unpin,
193{
194	#[cfg(test)]
195	pub fn new(
196		socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
197		handshake: NotificationsInSubstreamHandshake,
198	) -> Self {
199		Self { socket, handshake }
200	}
201
202	/// Sends the handshake in order to inform the remote that we accept the substream.
203	pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
204		if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
205			error!(target: "sub-libp2p", "Tried to send handshake twice");
206			return
207		}
208
209		self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
210	}
211
212	/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
213	/// guaranteed to not generate any notification.
214	pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
215		let mut this = self.project();
216
217		loop {
218			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
219				NotificationsInSubstreamHandshake::PendingSend(msg) => {
220					match Sink::poll_ready(this.socket.as_mut(), cx) {
221						Poll::Ready(_) => {
222							*this.handshake = NotificationsInSubstreamHandshake::Flush;
223							match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
224								Ok(()) => {},
225								Err(err) => return Poll::Ready(Err(err)),
226							}
227						},
228						Poll::Pending => {
229							*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
230							return Poll::Pending
231						},
232					}
233				},
234				NotificationsInSubstreamHandshake::Flush => {
235					match Sink::poll_flush(this.socket.as_mut(), cx)? {
236						Poll::Ready(()) => {
237							*this.handshake = NotificationsInSubstreamHandshake::Sent;
238							return Poll::Ready(Ok(()));
239						},
240						Poll::Pending => {
241							*this.handshake = NotificationsInSubstreamHandshake::Flush;
242							return Poll::Pending
243						},
244					}
245				},
246
247				st @ NotificationsInSubstreamHandshake::NotSent |
248				st @ NotificationsInSubstreamHandshake::Sent |
249				st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
250				st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
251					*this.handshake = st;
252					return Poll::Ready(Ok(()));
253				},
254			}
255		}
256	}
257}
258
259impl<TSubstream> Stream for NotificationsInSubstream<TSubstream>
260where
261	TSubstream: AsyncRead + AsyncWrite + Unpin,
262{
263	type Item = Result<BytesMut, io::Error>;
264
265	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
266		let mut this = self.project();
267
268		// This `Stream` implementation first tries to send back the handshake if necessary.
269		loop {
270			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
271				NotificationsInSubstreamHandshake::NotSent => {
272					*this.handshake = NotificationsInSubstreamHandshake::NotSent;
273					return Poll::Pending
274				},
275				NotificationsInSubstreamHandshake::PendingSend(msg) => {
276					match Sink::poll_ready(this.socket.as_mut(), cx) {
277						Poll::Ready(_) => {
278							*this.handshake = NotificationsInSubstreamHandshake::Flush;
279							match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
280								Ok(()) => {},
281								Err(err) => return Poll::Ready(Some(Err(err))),
282							}
283						},
284						Poll::Pending => {
285							*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
286							return Poll::Pending
287						},
288					}
289				},
290				NotificationsInSubstreamHandshake::Flush => {
291					match Sink::poll_flush(this.socket.as_mut(), cx)? {
292						Poll::Ready(()) =>
293							*this.handshake = NotificationsInSubstreamHandshake::Sent,
294						Poll::Pending => {
295							*this.handshake = NotificationsInSubstreamHandshake::Flush;
296							return Poll::Pending
297						},
298					}
299				},
300
301				NotificationsInSubstreamHandshake::Sent => {
302					match Stream::poll_next(this.socket.as_mut(), cx) {
303						Poll::Ready(None) =>
304							*this.handshake =
305								NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
306						Poll::Ready(Some(msg)) => {
307							*this.handshake = NotificationsInSubstreamHandshake::Sent;
308							return Poll::Ready(Some(msg))
309						},
310						Poll::Pending => {
311							*this.handshake = NotificationsInSubstreamHandshake::Sent;
312							return Poll::Pending
313						},
314					}
315				},
316
317				NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
318					match Sink::poll_close(this.socket.as_mut(), cx)? {
319						Poll::Ready(()) =>
320							*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
321						Poll::Pending => {
322							*this.handshake =
323								NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
324							return Poll::Pending
325						},
326					},
327
328				NotificationsInSubstreamHandshake::BothSidesClosed => return Poll::Ready(None),
329			}
330		}
331	}
332}
333
334impl NotificationsOut {
335	/// Builds a new potential upgrade.
336	pub fn new(
337		main_protocol_name: impl Into<ProtocolName>,
338		fallback_names: Vec<ProtocolName>,
339		initial_message: impl Into<Vec<u8>>,
340		max_notification_size: u64,
341	) -> Self {
342		let initial_message = initial_message.into();
343		if initial_message.len() > MAX_HANDSHAKE_SIZE {
344			error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
345		}
346
347		let mut protocol_names = fallback_names;
348		protocol_names.insert(0, main_protocol_name.into());
349
350		Self { protocol_names, initial_message, max_notification_size }
351	}
352}
353
354impl UpgradeInfo for NotificationsOut {
355	type Info = ProtocolName;
356	type InfoIter = vec::IntoIter<Self::Info>;
357
358	fn protocol_info(&self) -> Self::InfoIter {
359		self.protocol_names.clone().into_iter()
360	}
361}
362
363impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut
364where
365	TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
366{
367	type Output = NotificationsOutOpen<TSubstream>;
368	type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
369	type Error = NotificationsHandshakeError;
370
371	fn upgrade_outbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future {
372		Box::pin(async move {
373			upgrade::write_length_prefixed(&mut socket, &self.initial_message).await?;
374
375			// Reading handshake.
376			let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
377			if handshake_len > MAX_HANDSHAKE_SIZE {
378				return Err(NotificationsHandshakeError::TooLarge {
379					requested: handshake_len,
380					max: MAX_HANDSHAKE_SIZE,
381				})
382			}
383
384			let mut handshake = vec![0u8; handshake_len];
385			if !handshake.is_empty() {
386				socket.read_exact(&mut handshake).await?;
387			}
388
389			let mut codec = UviBytes::default();
390			codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::MAX));
391
392			Ok(NotificationsOutOpen {
393				handshake,
394				negotiated_fallback: if negotiated_name == self.protocol_names[0] {
395					None
396				} else {
397					Some(negotiated_name)
398				},
399				substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) },
400			})
401		})
402	}
403}
404
/// Yielded by the [`NotificationsOut`] after a successful upgrade.
pub struct NotificationsOutOpen<TSubstream> {
	/// Handshake returned by the remote.
	pub handshake: Vec<u8>,
	/// If the negotiated name is not the "main" protocol name but a fallback, contains the
	/// name of the negotiated fallback.
	pub negotiated_fallback: Option<ProtocolName>,
	/// Implementation of `Sink` that allows sending messages on the substream.
	pub substream: NotificationsOutSubstream<TSubstream>,
}
415
416impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
417where
418	TSubstream: AsyncRead + AsyncWrite + Unpin,
419{
420	type Error = NotificationsOutError;
421
422	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
423		let mut this = self.project();
424		Sink::poll_ready(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
425	}
426
427	fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
428		let mut this = self.project();
429		Sink::start_send(this.socket.as_mut(), io::Cursor::new(item))
430			.map_err(NotificationsOutError::Io)
431	}
432
433	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
434		let mut this = self.project();
435
436		// `Sink::poll_flush` does not expose stream closed error until we write something into
437		// the stream, so the code below makes sure we detect that the substream was closed
438		// even if we don't write anything into it.
439		match Stream::poll_next(this.socket.as_mut(), cx) {
440			Poll::Pending => {},
441			Poll::Ready(Some(_)) => {
442				error!(
443					target: "sub-libp2p",
444					"Unexpected incoming data in `NotificationsOutSubstream`",
445				);
446			},
447			Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
448		}
449
450		Sink::poll_flush(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
451	}
452
453	fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
454		let mut this = self.project();
455		Sink::poll_close(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
456	}
457}
458
/// Error generated while performing the handshake of a notifications substream, either as the
/// listener ([`NotificationsIn`]) or as the dialer ([`NotificationsOut`]).
#[derive(Debug, thiserror::Error)]
pub enum NotificationsHandshakeError {
	/// I/O error on the substream.
	#[error(transparent)]
	Io(#[from] io::Error),

	/// Initial message or handshake was too large.
	#[error("Initial message or handshake was too large: {requested}")]
	TooLarge {
		/// Size requested by the remote.
		requested: usize,
		/// Maximum allowed size.
		max: usize,
	},

	/// Error while decoding the variable-length integer.
	#[error(transparent)]
	VarintDecode(#[from] unsigned_varint::decode::Error),
}
479
480impl From<unsigned_varint::io::ReadError> for NotificationsHandshakeError {
481	fn from(err: unsigned_varint::io::ReadError) -> Self {
482		match err {
483			unsigned_varint::io::ReadError::Io(err) => Self::Io(err),
484			unsigned_varint::io::ReadError::Decode(err) => Self::VarintDecode(err),
485			_ => {
486				warn!("Unrecognized varint decoding error");
487				Self::Io(From::from(io::ErrorKind::InvalidData))
488			},
489		}
490	}
491}
492
/// Error generated by sending on a notifications out substream.
#[derive(Debug, thiserror::Error)]
pub enum NotificationsOutError {
	/// I/O error on the substream.
	#[error(transparent)]
	Io(#[from] io::Error),
	/// The reading side of the substream reported end-of-stream while flushing, meaning that
	/// the substream was closed or reset.
	#[error("substream was closed/reset")]
	Terminated,
}
502
503#[cfg(test)]
504mod tests {
505	use crate::ProtocolName;
506
507	use super::{
508		NotificationsHandshakeError, NotificationsIn, NotificationsInOpen,
509		NotificationsInSubstream, NotificationsOut, NotificationsOutError, NotificationsOutOpen,
510		NotificationsOutSubstream,
511	};
512	use futures::{channel::oneshot, future, prelude::*, SinkExt, StreamExt};
513	use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
514	use std::{pin::Pin, task::Poll};
515	use tokio::net::{TcpListener, TcpStream};
516	use tokio_util::compat::TokioAsyncReadCompatExt;
517
	/// Opens a substream to the given address, negotiates the protocol, and returns the substream
	/// along with the handshake message.
	///
	/// `handshake` is the initial message sent to the listener; the returned bytes are the
	/// handshake received back from it. Failures to connect or to negotiate the protocol panic,
	/// while a failure of the notifications upgrade itself is returned as an error.
	async fn dial(
		addr: std::net::SocketAddr,
		handshake: impl Into<Vec<u8>>,
	) -> Result<
		(
			Vec<u8>,
			NotificationsOutSubstream<
				multistream_select::Negotiated<tokio_util::compat::Compat<TcpStream>>,
			>,
		),
		NotificationsHandshakeError,
	> {
		let socket = TcpStream::connect(addr).await.unwrap();
		let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024);
		// Negotiate the protocol name first, as the dialer.
		let (_, substream) = multistream_select::dialer_select_proto(
			socket.compat(),
			notifs_out.protocol_info(),
			upgrade::Version::V1,
		)
		.await
		.unwrap();
		// Then run the notifications handshake on top of the negotiated substream.
		let NotificationsOutOpen { handshake, substream, .. } =
			<NotificationsOut as OutboundUpgrade<_>>::upgrade_outbound(
				notifs_out,
				substream,
				"/test/proto/1".into(),
			)
			.await?;
		Ok((handshake, substream))
	}
550
	/// Listens on a localhost, negotiates the protocol, and returns the substream along with the
	/// handshake message.
	///
	/// Also sends the listener address through the given channel.
	///
	/// Only a single connection is accepted. Failures to bind, accept or negotiate the protocol
	/// panic, while a failure of the notifications upgrade itself is returned as an error.
	async fn listen_on_localhost(
		listener_addr_tx: oneshot::Sender<std::net::SocketAddr>,
	) -> Result<
		(
			Vec<u8>,
			NotificationsInSubstream<
				multistream_select::Negotiated<tokio_util::compat::Compat<TcpStream>>,
			>,
		),
		NotificationsHandshakeError,
	> {
		// Port 0 lets the OS pick a free port; the actual address is reported to the dialer.
		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
		listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();

		let (socket, _) = listener.accept().await.unwrap();
		let notifs_in = NotificationsIn::new("/test/proto/1", Vec::new(), 1024 * 1024);
		// Negotiate the protocol name first, as the listener.
		let (_, substream) =
			multistream_select::listener_select_proto(socket.compat(), notifs_in.protocol_info())
				.await
				.unwrap();
		// Then read the dialer's initial message through the inbound upgrade.
		let NotificationsInOpen { handshake, substream, .. } =
			<NotificationsIn as InboundUpgrade<_>>::upgrade_inbound(
				notifs_in,
				substream,
				"/test/proto/1".into(),
			)
			.await?;
		Ok((handshake, substream))
	}
584
	/// Happy path: both handshake messages are exchanged, then a notification sent by the dialer
	/// is received by the listener.
	#[tokio::test]
	async fn basic_works() {
		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		// Dialer side, running concurrently with the listener below.
		let client = tokio::spawn(async move {
			let (handshake, mut substream) =
				dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await.unwrap();

			assert_eq!(handshake, b"hello world");
			substream.send(b"test message".to_vec()).await.unwrap();
		});

		let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();

		assert_eq!(handshake, b"initial message");
		substream.send_handshake(&b"hello world"[..]);

		// Polling the stream first sends the queued handshake, then yields the notification.
		let msg = substream.next().await.unwrap().unwrap();
		assert_eq!(msg.as_ref(), b"test message");

		client.await.unwrap();
	}
607
	/// Same exchange as `basic_works`, but with empty handshakes and an empty notification.
	#[tokio::test]
	async fn empty_handshake() {
		// Check that everything still works when the handshake messages are empty.

		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		// Dialer side, running concurrently with the listener below.
		let client = tokio::spawn(async move {
			let (handshake, mut substream) =
				dial(listener_addr_rx.await.unwrap(), vec![]).await.unwrap();

			assert!(handshake.is_empty());
			substream.send(Default::default()).await.unwrap();
		});

		let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();

		assert!(handshake.is_empty());
		substream.send_handshake(vec![]);

		// Polling the stream first sends the queued handshake, then yields the notification.
		let msg = substream.next().await.unwrap().unwrap();
		assert!(msg.as_ref().is_empty());

		client.await.unwrap();
	}
632
	/// The listener drops the substream without sending its handshake back; the dialer must see
	/// the upgrade fail.
	#[tokio::test]
	async fn refused() {
		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		let client = tokio::spawn(async move {
			let outcome = dial(listener_addr_rx.await.unwrap(), &b"hello"[..]).await;

			// Despite the protocol negotiation being successfully conducted on the listener
			// side, we have to receive an error here because the listener didn't send the
			// handshake.
			assert!(outcome.is_err());
		});

		let (handshake, substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
		assert_eq!(handshake, b"hello");

		// We successfully upgrade to the protocol, but then close the substream.
		drop(substream);

		client.await.unwrap();
	}
654
	/// The dialer sends a 32768-byte initial message, which is above the 1024-byte handshake
	/// limit; dialing must fail.
	#[tokio::test]
	async fn large_initial_message_refused() {
		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		let client = tokio::spawn(async move {
			let ret =
				dial(listener_addr_rx.await.unwrap(), (0..32768).map(|_| 0).collect::<Vec<_>>())
					.await;
			assert!(ret.is_err());
		});

		// The listener's own outcome is not asserted on; only the dialer's result is checked.
		let _ret = listen_on_localhost(listener_addr_tx).await;
		client.await.unwrap();
	}
669
	/// The listener answers with a 32768-byte handshake, which is above the 1024-byte handshake
	/// limit; dialing must fail.
	#[tokio::test]
	async fn large_handshake_refused() {
		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		let client = tokio::spawn(async move {
			let ret = dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await;
			assert!(ret.is_err());
		});

		let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
		assert_eq!(handshake, b"initial message");

		// We check that a handshake that is too large gets refused.
		substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>());
		// Polling the stream is what actually pushes the queued handshake to the socket; the
		// value it yields is irrelevant here.
		let _ = substream.next().await;

		client.await.unwrap();
	}
688
	/// The handshake must reach the dialer when the listener only drives the substream through
	/// `poll_process`, without ever polling it as a `Stream`.
	///
	/// The upgrades are applied directly on the TCP sockets, without protocol negotiation.
	#[tokio::test]
	async fn send_handshake_without_polling_for_incoming_data() {
		const PROTO_NAME: &str = "/test/proto/1";
		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		// Dialer side: completes once the listener's handshake has been received.
		let client = tokio::spawn(async move {
			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
			let NotificationsOutOpen { handshake, .. } = OutboundUpgrade::upgrade_outbound(
				NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
				socket.compat(),
				ProtocolName::Static(PROTO_NAME),
			)
			.await
			.unwrap();

			assert_eq!(handshake, b"hello world");
		});

		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
		listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();

		let (socket, _) = listener.accept().await.unwrap();
		let NotificationsInOpen { handshake, mut substream, .. } = InboundUpgrade::upgrade_inbound(
			NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
			socket.compat(),
			ProtocolName::Static(PROTO_NAME),
		)
		.await
		.unwrap();

		assert_eq!(handshake, b"initial message");
		substream.send_handshake(&b"hello world"[..]);

		// Actually send the handshake.
		future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();

		client.await.unwrap();
	}
727
	/// After the listener drops its substream, the dialer must observe
	/// `NotificationsOutError::Terminated` from `poll_flush` alone, without sending any
	/// notification.
	///
	/// The upgrades are applied directly on the TCP sockets, without protocol negotiation.
	#[tokio::test]
	async fn can_detect_dropped_out_substream_without_writing_data() {
		const PROTO_NAME: &str = "/test/proto/1";
		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();

		let client = tokio::spawn(async move {
			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
			let NotificationsOutOpen { handshake, mut substream, .. } =
				OutboundUpgrade::upgrade_outbound(
					NotificationsOut::new(
						PROTO_NAME,
						Vec::new(),
						&b"initial message"[..],
						1024 * 1024,
					),
					socket.compat(),
					ProtocolName::Static(PROTO_NAME),
				)
				.await
				.unwrap();

			assert_eq!(handshake, b"hello world");

			// Keep flushing until the flush reports an error, which must be `Terminated`.
			future::poll_fn(|cx| match Pin::new(&mut substream).poll_flush(cx) {
				Poll::Pending => Poll::Pending,
				Poll::Ready(Ok(())) => {
					// A successful flush is not the outcome under test: request another poll
					// right away and keep waiting.
					cx.waker().wake_by_ref();
					Poll::Pending
				},
				Poll::Ready(Err(e)) => {
					assert!(matches!(e, NotificationsOutError::Terminated));
					Poll::Ready(())
				},
			})
			.await;
		});

		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
		listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();

		let (socket, _) = listener.accept().await.unwrap();
		let NotificationsInOpen { handshake, mut substream, .. } = InboundUpgrade::upgrade_inbound(
			NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
			socket.compat(),
			ProtocolName::Static(PROTO_NAME),
		)
		.await
		.unwrap();

		assert_eq!(handshake, b"initial message");

		// Send the handshake.
		substream.send_handshake(&b"hello world"[..]);
		future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();

		// Dropping the inbound substream is what the dialer is expected to detect.
		drop(substream);

		client.await.unwrap();
	}
787}