sc_telemetry/
transport.rs1use futures::{
20 prelude::*,
21 ready,
22 task::{Context, Poll},
23};
24use libp2p::{core::transport::timeout::TransportTimeout, Transport};
25use std::{io, pin::Pin, time::Duration};
26
27const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
30
31pub(crate) fn initialize_transport() -> Result<WsTrans, io::Error> {
32 let transport = {
33 let tcp_transport = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new());
34 let inner = libp2p::dns::tokio::Transport::system(tcp_transport)?;
35 libp2p::websocket::framed::WsConfig::new(inner).and_then(|connec, _| {
36 let connec = connec
37 .with(|item| {
38 let item = libp2p::websocket::framed::OutgoingData::Binary(item);
39 future::ready(Ok::<_, io::Error>(item))
40 })
41 .try_filter_map(|item| async move {
42 if let libp2p::websocket::framed::Incoming::Data(data) = item {
43 Ok(Some(data.into_bytes()))
44 } else {
45 Ok(None)
46 }
47 });
48 future::ready(Ok::<_, io::Error>(connec))
49 })
50 };
51
52 Ok(TransportTimeout::new(
53 transport.map(|out, _| {
54 let out = out
55 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
56 .sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
57 Box::pin(out) as Pin<Box<_>>
58 }),
59 CONNECT_TIMEOUT,
60 )
61 .boxed())
62}
63
64pub(crate) trait StreamAndSink<I>: Stream + Sink<I> {}
66impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
67
68pub(crate) type WsTrans = libp2p::core::transport::Boxed<
70 Pin<
71 Box<
72 dyn StreamAndSink<Vec<u8>, Item = Result<Vec<u8>, io::Error>, Error = io::Error> + Send,
73 >,
74 >,
75>;
76
77#[pin_project::pin_project]
80pub(crate) struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);
81
82impl<T> From<T> for StreamSink<T> {
83 fn from(inner: T) -> StreamSink<T> {
84 StreamSink(inner, None)
85 }
86}
87
88impl<T: AsyncRead> Stream for StreamSink<T> {
89 type Item = Result<Vec<u8>, io::Error>;
90
91 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
92 let this = self.project();
93 let mut buf = vec![0; 128];
94 match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
95 Ok(0) => Poll::Ready(None),
96 Ok(n) => {
97 buf.truncate(n);
98 Poll::Ready(Some(Ok(buf)))
99 },
100 Err(err) => Poll::Ready(Some(Err(err))),
101 }
102 }
103}
104
105impl<T: AsyncWrite> StreamSink<T> {
106 fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
107 let this = self.project();
108
109 if let Some(buffer) = this.1 {
110 if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
111 log::error!(target: "telemetry",
112 "Detected some internal buffering happening in the telemetry");
113 let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
114 return Poll::Ready(Err(err))
115 }
116 }
117
118 *this.1 = None;
119 Poll::Ready(Ok(()))
120 }
121}
122
123impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
124 type Error = io::Error;
125
126 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
127 ready!(StreamSink::poll_flush_buffer(self, cx))?;
128 Poll::Ready(Ok(()))
129 }
130
131 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
132 let this = self.project();
133 debug_assert!(this.1.is_none());
134 *this.1 = Some(item);
135 Ok(())
136 }
137
138 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
139 ready!(self.as_mut().poll_flush_buffer(cx))?;
140 let this = self.project();
141 AsyncWrite::poll_flush(this.0, cx)
142 }
143
144 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
145 ready!(self.as_mut().poll_flush_buffer(cx))?;
146 let this = self.project();
147 AsyncWrite::poll_close(this.0, cx)
148 }
149}