litep2p/transport/tcp/
substream.rs1use crate::{protocol::Permit, BandwidthSink};
22
23use tokio::io::{AsyncRead, AsyncWrite};
24use tokio_util::compat::Compat;
25
26use std::{
27 io,
28 pin::Pin,
29 task::{Context, Poll},
30};
31
32#[derive(Debug)]
37pub struct Substream {
38 io: Compat<crate::yamux::Stream>,
40
41 bandwidth_sink: BandwidthSink,
43
44 _permit: Permit,
46}
47
48impl Substream {
49 pub fn new(
51 io: Compat<crate::yamux::Stream>,
52 bandwidth_sink: BandwidthSink,
53 _permit: Permit,
54 ) -> Self {
55 Self {
56 io,
57 bandwidth_sink,
58 _permit,
59 }
60 }
61}
62
63impl AsyncRead for Substream {
64 fn poll_read(
65 mut self: Pin<&mut Self>,
66 cx: &mut Context<'_>,
67 buf: &mut tokio::io::ReadBuf<'_>,
68 ) -> Poll<io::Result<()>> {
69 match futures::ready!(Pin::new(&mut self.io).poll_read(cx, buf)) {
70 Err(error) => Poll::Ready(Err(error)),
71 Ok(res) => {
72 self.bandwidth_sink.increase_inbound(buf.filled().len());
73 Poll::Ready(Ok(res))
74 }
75 }
76 }
77}
78
79impl AsyncWrite for Substream {
80 fn poll_write(
81 mut self: Pin<&mut Self>,
82 cx: &mut Context<'_>,
83 buf: &[u8],
84 ) -> Poll<Result<usize, io::Error>> {
85 match futures::ready!(Pin::new(&mut self.io).poll_write(cx, buf)) {
86 Err(error) => Poll::Ready(Err(error)),
87 Ok(nwritten) => {
88 self.bandwidth_sink.increase_outbound(nwritten);
89 Poll::Ready(Ok(nwritten))
90 }
91 }
92 }
93
94 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
95 Pin::new(&mut self.io).poll_flush(cx)
96 }
97
98 fn poll_shutdown(
99 mut self: Pin<&mut Self>,
100 cx: &mut Context<'_>,
101 ) -> Poll<Result<(), io::Error>> {
102 Pin::new(&mut self.io).poll_shutdown(cx)
103 }
104
105 fn poll_write_vectored(
106 mut self: Pin<&mut Self>,
107 cx: &mut Context<'_>,
108 bufs: &[io::IoSlice<'_>],
109 ) -> Poll<Result<usize, io::Error>> {
110 match futures::ready!(Pin::new(&mut self.io).poll_write_vectored(cx, bufs)) {
111 Err(error) => Poll::Ready(Err(error)),
112 Ok(nwritten) => {
113 self.bandwidth_sink.increase_outbound(nwritten);
114 Poll::Ready(Ok(nwritten))
115 }
116 }
117 }
118
119 fn is_write_vectored(&self) -> bool {
120 self.io.is_write_vectored()
121 }
122}