1use crate::types::ProtocolName;
38
39use asynchronous_codec::Framed;
40use bytes::BytesMut;
41use futures::prelude::*;
42use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
43use log::{error, warn};
44use unsigned_varint::codec::UviBytes;
45
46use std::{
47 io, mem,
48 pin::Pin,
49 task::{Context, Poll},
50 vec,
51};
52
53const MAX_HANDSHAKE_SIZE: usize = 1024;
55
56#[derive(Debug, Clone)]
59pub struct NotificationsIn {
60 protocol_names: Vec<ProtocolName>,
63 max_notification_size: u64,
65}
66
67#[derive(Debug, Clone)]
70pub struct NotificationsOut {
71 protocol_names: Vec<ProtocolName>,
74 initial_message: Vec<u8>,
76 max_notification_size: u64,
78}
79
80#[pin_project::pin_project]
85pub struct NotificationsInSubstream<TSubstream> {
86 #[pin]
87 socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
88 handshake: NotificationsInSubstreamHandshake,
89}
90
91#[derive(Debug)]
93pub enum NotificationsInSubstreamHandshake {
94 NotSent,
96 PendingSend(Vec<u8>),
98 Flush,
100 Sent,
102 ClosingInResponseToRemote,
104 BothSidesClosed,
106}
107
108#[pin_project::pin_project]
110pub struct NotificationsOutSubstream<TSubstream> {
111 #[pin]
113 socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
114}
115
116#[cfg(test)]
117impl<TSubstream> NotificationsOutSubstream<TSubstream> {
118 pub fn new(socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>) -> Self {
119 Self { socket }
120 }
121}
122
123impl NotificationsIn {
124 pub fn new(
126 main_protocol_name: impl Into<ProtocolName>,
127 fallback_names: Vec<ProtocolName>,
128 max_notification_size: u64,
129 ) -> Self {
130 let mut protocol_names = fallback_names;
131 protocol_names.insert(0, main_protocol_name.into());
132
133 Self { protocol_names, max_notification_size }
134 }
135}
136
137impl UpgradeInfo for NotificationsIn {
138 type Info = ProtocolName;
139 type InfoIter = vec::IntoIter<Self::Info>;
140
141 fn protocol_info(&self) -> Self::InfoIter {
142 self.protocol_names.clone().into_iter()
143 }
144}
145
146impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn
147where
148 TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
149{
150 type Output = NotificationsInOpen<TSubstream>;
151 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
152 type Error = NotificationsHandshakeError;
153
154 fn upgrade_inbound(self, mut socket: TSubstream, _negotiated_name: Self::Info) -> Self::Future {
155 Box::pin(async move {
156 let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
157 if handshake_len > MAX_HANDSHAKE_SIZE {
158 return Err(NotificationsHandshakeError::TooLarge {
159 requested: handshake_len,
160 max: MAX_HANDSHAKE_SIZE,
161 })
162 }
163
164 let mut handshake = vec![0u8; handshake_len];
165 if !handshake.is_empty() {
166 socket.read_exact(&mut handshake).await?;
167 }
168
169 let mut codec = UviBytes::default();
170 codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::MAX));
171
172 let substream = NotificationsInSubstream {
173 socket: Framed::new(socket, codec),
174 handshake: NotificationsInSubstreamHandshake::NotSent,
175 };
176
177 Ok(NotificationsInOpen { handshake, substream })
178 })
179 }
180}
181
182pub struct NotificationsInOpen<TSubstream> {
184 pub handshake: Vec<u8>,
186 pub substream: NotificationsInSubstream<TSubstream>,
188}
189
190impl<TSubstream> NotificationsInSubstream<TSubstream>
191where
192 TSubstream: AsyncRead + AsyncWrite + Unpin,
193{
194 #[cfg(test)]
195 pub fn new(
196 socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
197 handshake: NotificationsInSubstreamHandshake,
198 ) -> Self {
199 Self { socket, handshake }
200 }
201
202 pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
204 if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
205 error!(target: "sub-libp2p", "Tried to send handshake twice");
206 return
207 }
208
209 self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
210 }
211
212 pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
215 let mut this = self.project();
216
217 loop {
218 match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
219 NotificationsInSubstreamHandshake::PendingSend(msg) => {
220 match Sink::poll_ready(this.socket.as_mut(), cx) {
221 Poll::Ready(_) => {
222 *this.handshake = NotificationsInSubstreamHandshake::Flush;
223 match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
224 Ok(()) => {},
225 Err(err) => return Poll::Ready(Err(err)),
226 }
227 },
228 Poll::Pending => {
229 *this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
230 return Poll::Pending
231 },
232 }
233 },
234 NotificationsInSubstreamHandshake::Flush => {
235 match Sink::poll_flush(this.socket.as_mut(), cx)? {
236 Poll::Ready(()) => {
237 *this.handshake = NotificationsInSubstreamHandshake::Sent;
238 return Poll::Ready(Ok(()));
239 },
240 Poll::Pending => {
241 *this.handshake = NotificationsInSubstreamHandshake::Flush;
242 return Poll::Pending
243 },
244 }
245 },
246
247 st @ NotificationsInSubstreamHandshake::NotSent |
248 st @ NotificationsInSubstreamHandshake::Sent |
249 st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
250 st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
251 *this.handshake = st;
252 return Poll::Ready(Ok(()));
253 },
254 }
255 }
256 }
257}
258
259impl<TSubstream> Stream for NotificationsInSubstream<TSubstream>
260where
261 TSubstream: AsyncRead + AsyncWrite + Unpin,
262{
263 type Item = Result<BytesMut, io::Error>;
264
265 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
266 let mut this = self.project();
267
268 loop {
270 match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
271 NotificationsInSubstreamHandshake::NotSent => {
272 *this.handshake = NotificationsInSubstreamHandshake::NotSent;
273 return Poll::Pending
274 },
275 NotificationsInSubstreamHandshake::PendingSend(msg) => {
276 match Sink::poll_ready(this.socket.as_mut(), cx) {
277 Poll::Ready(_) => {
278 *this.handshake = NotificationsInSubstreamHandshake::Flush;
279 match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
280 Ok(()) => {},
281 Err(err) => return Poll::Ready(Some(Err(err))),
282 }
283 },
284 Poll::Pending => {
285 *this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
286 return Poll::Pending
287 },
288 }
289 },
290 NotificationsInSubstreamHandshake::Flush => {
291 match Sink::poll_flush(this.socket.as_mut(), cx)? {
292 Poll::Ready(()) =>
293 *this.handshake = NotificationsInSubstreamHandshake::Sent,
294 Poll::Pending => {
295 *this.handshake = NotificationsInSubstreamHandshake::Flush;
296 return Poll::Pending
297 },
298 }
299 },
300
301 NotificationsInSubstreamHandshake::Sent => {
302 match Stream::poll_next(this.socket.as_mut(), cx) {
303 Poll::Ready(None) =>
304 *this.handshake =
305 NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
306 Poll::Ready(Some(msg)) => {
307 *this.handshake = NotificationsInSubstreamHandshake::Sent;
308 return Poll::Ready(Some(msg))
309 },
310 Poll::Pending => {
311 *this.handshake = NotificationsInSubstreamHandshake::Sent;
312 return Poll::Pending
313 },
314 }
315 },
316
317 NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
318 match Sink::poll_close(this.socket.as_mut(), cx)? {
319 Poll::Ready(()) =>
320 *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
321 Poll::Pending => {
322 *this.handshake =
323 NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
324 return Poll::Pending
325 },
326 },
327
328 NotificationsInSubstreamHandshake::BothSidesClosed => return Poll::Ready(None),
329 }
330 }
331 }
332}
333
334impl NotificationsOut {
335 pub fn new(
337 main_protocol_name: impl Into<ProtocolName>,
338 fallback_names: Vec<ProtocolName>,
339 initial_message: impl Into<Vec<u8>>,
340 max_notification_size: u64,
341 ) -> Self {
342 let initial_message = initial_message.into();
343 if initial_message.len() > MAX_HANDSHAKE_SIZE {
344 error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
345 }
346
347 let mut protocol_names = fallback_names;
348 protocol_names.insert(0, main_protocol_name.into());
349
350 Self { protocol_names, initial_message, max_notification_size }
351 }
352}
353
354impl UpgradeInfo for NotificationsOut {
355 type Info = ProtocolName;
356 type InfoIter = vec::IntoIter<Self::Info>;
357
358 fn protocol_info(&self) -> Self::InfoIter {
359 self.protocol_names.clone().into_iter()
360 }
361}
362
363impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut
364where
365 TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
366{
367 type Output = NotificationsOutOpen<TSubstream>;
368 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
369 type Error = NotificationsHandshakeError;
370
371 fn upgrade_outbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future {
372 Box::pin(async move {
373 upgrade::write_length_prefixed(&mut socket, &self.initial_message).await?;
374
375 let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
377 if handshake_len > MAX_HANDSHAKE_SIZE {
378 return Err(NotificationsHandshakeError::TooLarge {
379 requested: handshake_len,
380 max: MAX_HANDSHAKE_SIZE,
381 })
382 }
383
384 let mut handshake = vec![0u8; handshake_len];
385 if !handshake.is_empty() {
386 socket.read_exact(&mut handshake).await?;
387 }
388
389 let mut codec = UviBytes::default();
390 codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::MAX));
391
392 Ok(NotificationsOutOpen {
393 handshake,
394 negotiated_fallback: if negotiated_name == self.protocol_names[0] {
395 None
396 } else {
397 Some(negotiated_name)
398 },
399 substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) },
400 })
401 })
402 }
403}
404
405pub struct NotificationsOutOpen<TSubstream> {
407 pub handshake: Vec<u8>,
409 pub negotiated_fallback: Option<ProtocolName>,
412 pub substream: NotificationsOutSubstream<TSubstream>,
414}
415
416impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
417where
418 TSubstream: AsyncRead + AsyncWrite + Unpin,
419{
420 type Error = NotificationsOutError;
421
422 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
423 let mut this = self.project();
424 Sink::poll_ready(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
425 }
426
427 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
428 let mut this = self.project();
429 Sink::start_send(this.socket.as_mut(), io::Cursor::new(item))
430 .map_err(NotificationsOutError::Io)
431 }
432
433 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
434 let mut this = self.project();
435
436 match Stream::poll_next(this.socket.as_mut(), cx) {
440 Poll::Pending => {},
441 Poll::Ready(Some(_)) => {
442 error!(
443 target: "sub-libp2p",
444 "Unexpected incoming data in `NotificationsOutSubstream`",
445 );
446 },
447 Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
448 }
449
450 Sink::poll_flush(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
451 }
452
453 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
454 let mut this = self.project();
455 Sink::poll_close(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
456 }
457}
458
459#[derive(Debug, thiserror::Error)]
461pub enum NotificationsHandshakeError {
462 #[error(transparent)]
464 Io(#[from] io::Error),
465
466 #[error("Initial message or handshake was too large: {requested}")]
468 TooLarge {
469 requested: usize,
471 max: usize,
473 },
474
475 #[error(transparent)]
477 VarintDecode(#[from] unsigned_varint::decode::Error),
478}
479
480impl From<unsigned_varint::io::ReadError> for NotificationsHandshakeError {
481 fn from(err: unsigned_varint::io::ReadError) -> Self {
482 match err {
483 unsigned_varint::io::ReadError::Io(err) => Self::Io(err),
484 unsigned_varint::io::ReadError::Decode(err) => Self::VarintDecode(err),
485 _ => {
486 warn!("Unrecognized varint decoding error");
487 Self::Io(From::from(io::ErrorKind::InvalidData))
488 },
489 }
490 }
491}
492
493#[derive(Debug, thiserror::Error)]
495pub enum NotificationsOutError {
496 #[error(transparent)]
498 Io(#[from] io::Error),
499 #[error("substream was closed/reset")]
500 Terminated,
501}
502
503#[cfg(test)]
504mod tests {
505 use crate::ProtocolName;
506
507 use super::{
508 NotificationsHandshakeError, NotificationsIn, NotificationsInOpen,
509 NotificationsInSubstream, NotificationsOut, NotificationsOutError, NotificationsOutOpen,
510 NotificationsOutSubstream,
511 };
512 use futures::{channel::oneshot, future, prelude::*, SinkExt, StreamExt};
513 use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
514 use std::{pin::Pin, task::Poll};
515 use tokio::net::{TcpListener, TcpStream};
516 use tokio_util::compat::TokioAsyncReadCompatExt;
517
518 async fn dial(
521 addr: std::net::SocketAddr,
522 handshake: impl Into<Vec<u8>>,
523 ) -> Result<
524 (
525 Vec<u8>,
526 NotificationsOutSubstream<
527 multistream_select::Negotiated<tokio_util::compat::Compat<TcpStream>>,
528 >,
529 ),
530 NotificationsHandshakeError,
531 > {
532 let socket = TcpStream::connect(addr).await.unwrap();
533 let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024);
534 let (_, substream) = multistream_select::dialer_select_proto(
535 socket.compat(),
536 notifs_out.protocol_info(),
537 upgrade::Version::V1,
538 )
539 .await
540 .unwrap();
541 let NotificationsOutOpen { handshake, substream, .. } =
542 <NotificationsOut as OutboundUpgrade<_>>::upgrade_outbound(
543 notifs_out,
544 substream,
545 "/test/proto/1".into(),
546 )
547 .await?;
548 Ok((handshake, substream))
549 }
550
551 async fn listen_on_localhost(
556 listener_addr_tx: oneshot::Sender<std::net::SocketAddr>,
557 ) -> Result<
558 (
559 Vec<u8>,
560 NotificationsInSubstream<
561 multistream_select::Negotiated<tokio_util::compat::Compat<TcpStream>>,
562 >,
563 ),
564 NotificationsHandshakeError,
565 > {
566 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
567 listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
568
569 let (socket, _) = listener.accept().await.unwrap();
570 let notifs_in = NotificationsIn::new("/test/proto/1", Vec::new(), 1024 * 1024);
571 let (_, substream) =
572 multistream_select::listener_select_proto(socket.compat(), notifs_in.protocol_info())
573 .await
574 .unwrap();
575 let NotificationsInOpen { handshake, substream, .. } =
576 <NotificationsIn as InboundUpgrade<_>>::upgrade_inbound(
577 notifs_in,
578 substream,
579 "/test/proto/1".into(),
580 )
581 .await?;
582 Ok((handshake, substream))
583 }
584
585 #[tokio::test]
586 async fn basic_works() {
587 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
588
589 let client = tokio::spawn(async move {
590 let (handshake, mut substream) =
591 dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await.unwrap();
592
593 assert_eq!(handshake, b"hello world");
594 substream.send(b"test message".to_vec()).await.unwrap();
595 });
596
597 let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
598
599 assert_eq!(handshake, b"initial message");
600 substream.send_handshake(&b"hello world"[..]);
601
602 let msg = substream.next().await.unwrap().unwrap();
603 assert_eq!(msg.as_ref(), b"test message");
604
605 client.await.unwrap();
606 }
607
608 #[tokio::test]
609 async fn empty_handshake() {
610 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
613
614 let client = tokio::spawn(async move {
615 let (handshake, mut substream) =
616 dial(listener_addr_rx.await.unwrap(), vec![]).await.unwrap();
617
618 assert!(handshake.is_empty());
619 substream.send(Default::default()).await.unwrap();
620 });
621
622 let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
623
624 assert!(handshake.is_empty());
625 substream.send_handshake(vec![]);
626
627 let msg = substream.next().await.unwrap().unwrap();
628 assert!(msg.as_ref().is_empty());
629
630 client.await.unwrap();
631 }
632
633 #[tokio::test]
634 async fn refused() {
635 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
636
637 let client = tokio::spawn(async move {
638 let outcome = dial(listener_addr_rx.await.unwrap(), &b"hello"[..]).await;
639
640 assert!(outcome.is_err());
644 });
645
646 let (handshake, substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
647 assert_eq!(handshake, b"hello");
648
649 drop(substream);
651
652 client.await.unwrap();
653 }
654
655 #[tokio::test]
656 async fn large_initial_message_refused() {
657 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
658
659 let client = tokio::spawn(async move {
660 let ret =
661 dial(listener_addr_rx.await.unwrap(), (0..32768).map(|_| 0).collect::<Vec<_>>())
662 .await;
663 assert!(ret.is_err());
664 });
665
666 let _ret = listen_on_localhost(listener_addr_tx).await;
667 client.await.unwrap();
668 }
669
670 #[tokio::test]
671 async fn large_handshake_refused() {
672 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
673
674 let client = tokio::spawn(async move {
675 let ret = dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await;
676 assert!(ret.is_err());
677 });
678
679 let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
680 assert_eq!(handshake, b"initial message");
681
682 substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>());
684 let _ = substream.next().await;
685
686 client.await.unwrap();
687 }
688
689 #[tokio::test]
690 async fn send_handshake_without_polling_for_incoming_data() {
691 const PROTO_NAME: &str = "/test/proto/1";
692 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
693
694 let client = tokio::spawn(async move {
695 let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
696 let NotificationsOutOpen { handshake, .. } = OutboundUpgrade::upgrade_outbound(
697 NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
698 socket.compat(),
699 ProtocolName::Static(PROTO_NAME),
700 )
701 .await
702 .unwrap();
703
704 assert_eq!(handshake, b"hello world");
705 });
706
707 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
708 listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
709
710 let (socket, _) = listener.accept().await.unwrap();
711 let NotificationsInOpen { handshake, mut substream, .. } = InboundUpgrade::upgrade_inbound(
712 NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
713 socket.compat(),
714 ProtocolName::Static(PROTO_NAME),
715 )
716 .await
717 .unwrap();
718
719 assert_eq!(handshake, b"initial message");
720 substream.send_handshake(&b"hello world"[..]);
721
722 future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();
724
725 client.await.unwrap();
726 }
727
728 #[tokio::test]
729 async fn can_detect_dropped_out_substream_without_writing_data() {
730 const PROTO_NAME: &str = "/test/proto/1";
731 let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
732
733 let client = tokio::spawn(async move {
734 let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
735 let NotificationsOutOpen { handshake, mut substream, .. } =
736 OutboundUpgrade::upgrade_outbound(
737 NotificationsOut::new(
738 PROTO_NAME,
739 Vec::new(),
740 &b"initial message"[..],
741 1024 * 1024,
742 ),
743 socket.compat(),
744 ProtocolName::Static(PROTO_NAME),
745 )
746 .await
747 .unwrap();
748
749 assert_eq!(handshake, b"hello world");
750
751 future::poll_fn(|cx| match Pin::new(&mut substream).poll_flush(cx) {
752 Poll::Pending => Poll::Pending,
753 Poll::Ready(Ok(())) => {
754 cx.waker().wake_by_ref();
755 Poll::Pending
756 },
757 Poll::Ready(Err(e)) => {
758 assert!(matches!(e, NotificationsOutError::Terminated));
759 Poll::Ready(())
760 },
761 })
762 .await;
763 });
764
765 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
766 listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
767
768 let (socket, _) = listener.accept().await.unwrap();
769 let NotificationsInOpen { handshake, mut substream, .. } = InboundUpgrade::upgrade_inbound(
770 NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
771 socket.compat(),
772 ProtocolName::Static(PROTO_NAME),
773 )
774 .await
775 .unwrap();
776
777 assert_eq!(handshake, b"initial message");
778
779 substream.send_handshake(&b"hello world"[..]);
781 future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();
782
783 drop(substream);
784
785 client.await.unwrap();
786 }
787}