referrerpolicy=no-referrer-when-downgrade

sc_telemetry/
node.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::TelemetryPayload;
20use futures::{channel::mpsc, prelude::*};
21use libp2p::{
22	core::{
23		transport::{DialOpts, PortUse, Transport},
24		Endpoint,
25	},
26	Multiaddr,
27};
28use rand::Rng as _;
29use std::{
30	fmt, mem,
31	pin::Pin,
32	task::{Context, Poll},
33	time::Duration,
34};
35use wasm_timer::Delay;
36
37pub(crate) type ConnectionNotifierSender = mpsc::Sender<()>;
38pub(crate) type ConnectionNotifierReceiver = mpsc::Receiver<()>;
39
40pub(crate) fn connection_notifier_channel() -> (ConnectionNotifierSender, ConnectionNotifierReceiver)
41{
42	mpsc::channel(0)
43}
44
45/// Handler for a single telemetry node.
46///
47/// This is a wrapper `Sink` around a network `Sink` with 3 particularities:
48///  - It is infallible: if the connection stops, it will reconnect automatically when the server
49///    becomes available again.
50///  - It holds a list of "connection messages" which are sent automatically when the connection is
51///    (re-)established. This is used for the "system.connected" message that needs to be send for
52///    every substrate node that connects.
53///  - It doesn't stay in pending while waiting for connection. Instead, it moves data into the void
54///    if the connection could not be established. This is important for the `Dispatcher` `Sink`
55///    which we don't want to block if one connection is broken.
56#[derive(Debug)]
57pub(crate) struct Node<TTrans: Transport> {
58	/// Address of the node.
59	addr: Multiaddr,
60	/// State of the connection.
61	socket: NodeSocket<TTrans>,
62	/// Transport used to establish new connections.
63	transport: TTrans,
64	/// Messages that are sent when the connection (re-)establishes.
65	pub(crate) connection_messages: Vec<TelemetryPayload>,
66	/// Notifier for when the connection (re-)establishes.
67	pub(crate) telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
68}
69
70enum NodeSocket<TTrans: Transport> {
71	/// We're connected to the node. This is the normal state.
72	Connected(NodeSocketConnected<TTrans>),
73	/// We are currently dialing the node.
74	Dialing(TTrans::Dial),
75	/// A new connection should be started as soon as possible.
76	ReconnectNow,
77	/// Waiting before attempting to dial again.
78	WaitingReconnect(Delay),
79	/// Temporary transition state.
80	Poisoned,
81}
82
83impl<TTrans: Transport> NodeSocket<TTrans> {
84	fn wait_reconnect() -> NodeSocket<TTrans> {
85		let random_delay = rand::thread_rng().gen_range(10..20);
86		let delay = Delay::new(Duration::from_secs(random_delay));
87		log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay);
88		NodeSocket::WaitingReconnect(delay)
89	}
90}
91
92struct NodeSocketConnected<TTrans: Transport> {
93	/// Where to send data.
94	sink: TTrans::Output,
95	/// Queue of packets to send before accepting new packets.
96	buf: Vec<Vec<u8>>,
97}
98
99impl<TTrans: Transport> Node<TTrans> {
100	/// Builds a new node handler.
101	pub(crate) fn new(
102		transport: TTrans,
103		addr: Multiaddr,
104		connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
105		telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
106	) -> Self {
107		Node {
108			addr,
109			socket: NodeSocket::ReconnectNow,
110			transport,
111			connection_messages,
112			telemetry_connection_notifier,
113		}
114	}
115}
116
117impl<TTrans: Transport, TSinkErr> Node<TTrans>
118where
119	TTrans::Dial: Unpin,
120	TTrans::Output:
121		Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
122	TSinkErr: fmt::Debug,
123{
124	// NOTE: this code has been inspired from `Buffer` (`futures_util::sink::Buffer`).
125	//       https://docs.rs/futures-util/0.3.8/src/futures_util/sink/buffer.rs.html#32
126	fn try_send_connection_messages(
127		self: Pin<&mut Self>,
128		cx: &mut Context<'_>,
129		conn: &mut NodeSocketConnected<TTrans>,
130	) -> Poll<Result<(), TSinkErr>> {
131		while let Some(item) = conn.buf.pop() {
132			if let Err(e) = conn.sink.start_send_unpin(item) {
133				return Poll::Ready(Err(e))
134			}
135			futures::ready!(conn.sink.poll_ready_unpin(cx))?;
136		}
137		Poll::Ready(Ok(()))
138	}
139}
140
/// Error type of the [`Node`] sink. It has no variants, so a value of it can never exist.
pub(crate) enum Infallible {}
142
143impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
144where
145	TTrans: Unpin,
146	TTrans::Dial: Unpin,
147	TTrans::Output:
148		Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
149	TSinkErr: fmt::Debug,
150{
151	type Error = Infallible;
152
153	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
154		let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
155		self.socket = loop {
156			match socket {
157				NodeSocket::Connected(mut conn) => match conn.sink.poll_ready_unpin(cx) {
158					Poll::Ready(Ok(())) => {
159						match self.as_mut().try_send_connection_messages(cx, &mut conn) {
160							Poll::Ready(Err(err)) => {
161								log::warn!(target: "telemetry", "⚠️  Disconnected from {}: {:?}", self.addr, err);
162								socket = NodeSocket::wait_reconnect();
163							},
164							Poll::Ready(Ok(())) => {
165								self.socket = NodeSocket::Connected(conn);
166								return Poll::Ready(Ok(()))
167							},
168							Poll::Pending => {
169								self.socket = NodeSocket::Connected(conn);
170								return Poll::Pending
171							},
172						}
173					},
174					Poll::Ready(Err(err)) => {
175						log::warn!(target: "telemetry", "⚠️  Disconnected from {}: {:?}", self.addr, err);
176						socket = NodeSocket::wait_reconnect();
177					},
178					Poll::Pending => {
179						self.socket = NodeSocket::Connected(conn);
180						return Poll::Pending
181					},
182				},
183				NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
184					Poll::Ready(Ok(sink)) => {
185						log::debug!(target: "telemetry", "✅ Connected to {}", self.addr);
186
187						{
188							let mut index = 0;
189							while index < self.telemetry_connection_notifier.len() {
190								let sender = &mut self.telemetry_connection_notifier[index];
191								if let Err(error) = sender.try_send(()) {
192									if !error.is_disconnected() {
193										log::debug!(target: "telemetry", "Failed to send a telemetry connection notification: {}", error);
194									} else {
195										self.telemetry_connection_notifier.swap_remove(index);
196										continue
197									}
198								}
199								index += 1;
200							}
201						}
202
203						let buf = self
204							.connection_messages
205							.iter()
206							.map(|json| {
207								let mut json = json.clone();
208								json.insert(
209									"ts".to_string(),
210									chrono::Local::now().to_rfc3339().into(),
211								);
212								json
213							})
214							.filter_map(|json| match serde_json::to_vec(&json) {
215								Ok(message) => Some(message),
216								Err(err) => {
217									log::error!(
218										target: "telemetry",
219										"An error occurred while generating new connection \
220										messages: {}",
221										err,
222									);
223									None
224								},
225							})
226							.collect();
227
228						socket = NodeSocket::Connected(NodeSocketConnected { sink, buf });
229					},
230					Poll::Pending => break NodeSocket::Dialing(s),
231					Poll::Ready(Err(err)) => {
232						log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
233						socket = NodeSocket::wait_reconnect();
234					},
235				},
236				NodeSocket::ReconnectNow => {
237					let addr = self.addr.clone();
238					match self
239						.transport
240						.dial(addr, DialOpts { role: Endpoint::Dialer, port_use: PortUse::New })
241					{
242						Ok(d) => {
243							log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
244							socket = NodeSocket::Dialing(d);
245						},
246						Err(err) => {
247							log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
248							socket = NodeSocket::wait_reconnect();
249						},
250					}
251				},
252				NodeSocket::WaitingReconnect(mut s) => {
253					if Future::poll(Pin::new(&mut s), cx).is_ready() {
254						socket = NodeSocket::ReconnectNow;
255					} else {
256						break NodeSocket::WaitingReconnect(s)
257					}
258				},
259				NodeSocket::Poisoned => {
260					log::error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
261					break NodeSocket::Poisoned
262				},
263			}
264		};
265
266		// The Dispatcher blocks when the Node syncs blocks. This is why it is important that the
267		// Node sinks don't go into "Pending" state while waiting for reconnection but rather
268		// discard the excess of telemetry messages.
269		Poll::Ready(Ok(()))
270	}
271
272	fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> {
273		// Any buffered outgoing telemetry messages are discarded while (re-)connecting.
274		match &mut self.socket {
275			NodeSocket::Connected(conn) => match serde_json::to_vec(&item) {
276				Ok(data) => {
277					log::trace!(target: "telemetry", "Sending {} bytes", data.len());
278					let _ = conn.sink.start_send_unpin(data);
279				},
280				Err(err) => log::debug!(
281					target: "telemetry",
282					"Could not serialize payload: {}",
283					err,
284				),
285			},
286			// We are currently dialing the node.
287			NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"),
288			// A new connection should be started as soon as possible.
289			NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"),
290			// Waiting before attempting to dial again.
291			NodeSocket::WaitingReconnect(_) => {},
292			// Temporary transition state.
293			NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"),
294		}
295		Ok(())
296	}
297
298	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
299		match &mut self.socket {
300			NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
301				Poll::Ready(Err(e)) => {
302					// When `telemetry` closes the websocket connection we end
303					// up here, which is sub-optimal. See
304					// https://github.com/libp2p/rust-libp2p/issues/2021 for
305					// what we could do to improve this.
306					log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e);
307					self.socket = NodeSocket::wait_reconnect();
308					Poll::Ready(Ok(()))
309				},
310				Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
311				Poll::Pending => Poll::Pending,
312			},
313			_ => Poll::Ready(Ok(())),
314		}
315	}
316
317	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
318		match &mut self.socket {
319			NodeSocket::Connected(conn) => conn.sink.poll_close_unpin(cx).map(|_| Ok(())),
320			_ => Poll::Ready(Ok(())),
321		}
322	}
323}
324
325impl<TTrans: Transport> fmt::Debug for NodeSocket<TTrans> {
326	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
327		use NodeSocket::*;
328		f.write_str(match self {
329			Connected(_) => "Connected",
330			Dialing(_) => "Dialing",
331			ReconnectNow => "ReconnectNow",
332			WaitingReconnect(_) => "WaitingReconnect",
333			Poisoned => "Poisoned",
334		})
335	}
336}