referrerpolicy=no-referrer-when-downgrade

sc_telemetry/
transport.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use futures::{
20	prelude::*,
21	ready,
22	task::{Context, Poll},
23};
24use libp2p::{core::transport::timeout::TransportTimeout, Transport};
25use std::{io, pin::Pin, time::Duration};
26
/// Maximum duration (20 seconds) a connection attempt may take before it is considered failed.
///
/// The WebSocket HTTP upgrade is counted as part of the connection attempt.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
30
31pub(crate) fn initialize_transport() -> Result<WsTrans, io::Error> {
32	let transport = {
33		let tcp_transport = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new());
34		let inner = libp2p::dns::tokio::Transport::system(tcp_transport)?;
35		libp2p::websocket::framed::WsConfig::new(inner).and_then(|connec, _| {
36			let connec = connec
37				.with(|item| {
38					let item = libp2p::websocket::framed::OutgoingData::Binary(item);
39					future::ready(Ok::<_, io::Error>(item))
40				})
41				.try_filter_map(|item| async move {
42					if let libp2p::websocket::framed::Incoming::Data(data) = item {
43						Ok(Some(data.into_bytes()))
44					} else {
45						Ok(None)
46					}
47				});
48			future::ready(Ok::<_, io::Error>(connec))
49		})
50	};
51
52	Ok(TransportTimeout::new(
53		transport.map(|out, _| {
54			let out = out
55				.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
56				.sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
57			Box::pin(out) as Pin<Box<_>>
58		}),
59		CONNECT_TIMEOUT,
60	)
61	.boxed())
62}
63
64/// A trait that implements `Stream` and `Sink`.
65pub(crate) trait StreamAndSink<I>: Stream + Sink<I> {}
66impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
67
/// A type alias for the WebSocket transport.
///
/// It is a boxed libp2p transport whose output is a pinned, boxed, `Send` trait object that
/// is both a `Stream` yielding `Result<Vec<u8>, io::Error>` items and a `Sink` accepting
/// `Vec<u8>` items with `io::Error` as its error type.
pub(crate) type WsTrans = libp2p::core::transport::Boxed<
	Pin<
		Box<
			dyn StreamAndSink<Vec<u8>, Item = Result<Vec<u8>, io::Error>, Error = io::Error> + Send,
		>,
	>,
>;
76
/// Wraps around an `AsyncWrite` and implements `Sink`. Guarantees that each item being sent maps
/// to one call of `write`.
///
/// When the wrapped object is an `AsyncRead`, the wrapper also implements `Stream`, yielding
/// the bytes obtained from each read.
///
/// Fields:
/// - `0`: the wrapped I/O object (structurally pinned).
/// - `1`: the item handed to `start_send` that has not been written to the inner object yet,
///   or `None` when nothing is pending.
#[pin_project::pin_project]
pub(crate) struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);
81
82impl<T> From<T> for StreamSink<T> {
83	fn from(inner: T) -> StreamSink<T> {
84		StreamSink(inner, None)
85	}
86}
87
88impl<T: AsyncRead> Stream for StreamSink<T> {
89	type Item = Result<Vec<u8>, io::Error>;
90
91	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
92		let this = self.project();
93		let mut buf = vec![0; 128];
94		match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
95			Ok(0) => Poll::Ready(None),
96			Ok(n) => {
97				buf.truncate(n);
98				Poll::Ready(Some(Ok(buf)))
99			},
100			Err(err) => Poll::Ready(Some(Err(err))),
101		}
102	}
103}
104
105impl<T: AsyncWrite> StreamSink<T> {
106	fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
107		let this = self.project();
108
109		if let Some(buffer) = this.1 {
110			if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
111				log::error!(target: "telemetry",
112					"Detected some internal buffering happening in the telemetry");
113				let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
114				return Poll::Ready(Err(err))
115			}
116		}
117
118		*this.1 = None;
119		Poll::Ready(Ok(()))
120	}
121}
122
123impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
124	type Error = io::Error;
125
126	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
127		ready!(StreamSink::poll_flush_buffer(self, cx))?;
128		Poll::Ready(Ok(()))
129	}
130
131	fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
132		let this = self.project();
133		debug_assert!(this.1.is_none());
134		*this.1 = Some(item);
135		Ok(())
136	}
137
138	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
139		ready!(self.as_mut().poll_flush_buffer(cx))?;
140		let this = self.project();
141		AsyncWrite::poll_flush(this.0, cx)
142	}
143
144	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
145		ready!(self.as_mut().poll_flush_buffer(cx))?;
146		let this = self.project();
147		AsyncWrite::poll_close(this.0, cx)
148	}
149}