sc_telemetry/
node.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::TelemetryPayload;
20use futures::{channel::mpsc, prelude::*};
21use libp2p::{core::transport::Transport, Multiaddr};
22use rand::Rng as _;
23use std::{
24	fmt, mem,
25	pin::Pin,
26	task::{Context, Poll},
27	time::Duration,
28};
29use wasm_timer::Delay;
30
/// Sending half of the channel used to notify that a telemetry connection was (re-)established.
/// The notification carries no data (`()`).
pub(crate) type ConnectionNotifierSender = mpsc::Sender<()>;
/// Receiving half of the channel used to notify that a telemetry connection was
/// (re-)established. The notification carries no data (`()`).
pub(crate) type ConnectionNotifierReceiver = mpsc::Receiver<()>;
33
34pub(crate) fn connection_notifier_channel() -> (ConnectionNotifierSender, ConnectionNotifierReceiver)
35{
36	mpsc::channel(0)
37}
38
/// Handler for a single telemetry node.
///
/// This is a wrapper `Sink` around a network `Sink` with 3 particularities:
///  - It is infallible: if the connection stops, it will reconnect automatically when the server
///    becomes available again.
///  - It holds a list of "connection messages" which are sent automatically when the connection is
///    (re-)established. This is used for the "system.connected" message that needs to be sent for
///    every substrate node that connects.
///  - It doesn't stay in pending while waiting for connection. Instead, it moves data into the void
///    if the connection could not be established. This is important for the `Dispatcher` `Sink`
///    which we don't want to block if one connection is broken.
#[derive(Debug)]
pub(crate) struct Node<TTrans: Transport> {
	/// Address of the node.
	addr: Multiaddr,
	/// State of the connection.
	socket: NodeSocket<TTrans>,
	/// Transport used to establish new connections.
	transport: TTrans,
	/// Messages that are sent when the connection (re-)establishes.
	pub(crate) connection_messages: Vec<TelemetryPayload>,
	/// Notifier for when the connection (re-)establishes.
	pub(crate) telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
}
63
/// Connection state of a [`Node`].
enum NodeSocket<TTrans: Transport> {
	/// We're connected to the node. This is the normal state.
	Connected(NodeSocketConnected<TTrans>),
	/// We are currently dialing the node.
	Dialing(TTrans::Dial),
	/// A new connection should be started as soon as possible.
	ReconnectNow,
	/// Waiting before attempting to dial again.
	WaitingReconnect(Delay),
	/// Temporary transition state: `poll_ready` leaves this value in the node while it works on
	/// the real state it has taken out of it.
	Poisoned,
}
76
77impl<TTrans: Transport> NodeSocket<TTrans> {
78	fn wait_reconnect() -> NodeSocket<TTrans> {
79		let random_delay = rand::thread_rng().gen_range(10..20);
80		let delay = Delay::new(Duration::from_secs(random_delay));
81		log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay);
82		NodeSocket::WaitingReconnect(delay)
83	}
84}
85
/// Data held by [`NodeSocket::Connected`] while a connection to the node is open.
struct NodeSocketConnected<TTrans: Transport> {
	/// Where to send data.
	sink: TTrans::Output,
	/// Queue of packets to send before accepting new packets.
	buf: Vec<Vec<u8>>,
}
92
93impl<TTrans: Transport> Node<TTrans> {
94	/// Builds a new node handler.
95	pub(crate) fn new(
96		transport: TTrans,
97		addr: Multiaddr,
98		connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
99		telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
100	) -> Self {
101		Node {
102			addr,
103			socket: NodeSocket::ReconnectNow,
104			transport,
105			connection_messages,
106			telemetry_connection_notifier,
107		}
108	}
109}
110
111impl<TTrans: Transport, TSinkErr> Node<TTrans>
112where
113	TTrans::Dial: Unpin,
114	TTrans::Output:
115		Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
116	TSinkErr: fmt::Debug,
117{
118	// NOTE: this code has been inspired from `Buffer` (`futures_util::sink::Buffer`).
119	//       https://docs.rs/futures-util/0.3.8/src/futures_util/sink/buffer.rs.html#32
120	fn try_send_connection_messages(
121		self: Pin<&mut Self>,
122		cx: &mut Context<'_>,
123		conn: &mut NodeSocketConnected<TTrans>,
124	) -> Poll<Result<(), TSinkErr>> {
125		while let Some(item) = conn.buf.pop() {
126			if let Err(e) = conn.sink.start_send_unpin(item) {
127				return Poll::Ready(Err(e))
128			}
129			futures::ready!(conn.sink.poll_ready_unpin(cx))?;
130		}
131		Poll::Ready(Ok(()))
132	}
133}
134
/// Error type of the `Sink` implementation of [`Node`]. It has no variants, so no value of this
/// type can ever be constructed.
pub(crate) enum Infallible {}
136
137impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
138where
139	TTrans: Unpin,
140	TTrans::Dial: Unpin,
141	TTrans::Output:
142		Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
143	TSinkErr: fmt::Debug,
144{
145	type Error = Infallible;
146
147	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
148		let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
149		self.socket = loop {
150			match socket {
151				NodeSocket::Connected(mut conn) => match conn.sink.poll_ready_unpin(cx) {
152					Poll::Ready(Ok(())) => {
153						match self.as_mut().try_send_connection_messages(cx, &mut conn) {
154							Poll::Ready(Err(err)) => {
155								log::warn!(target: "telemetry", "⚠️  Disconnected from {}: {:?}", self.addr, err);
156								socket = NodeSocket::wait_reconnect();
157							},
158							Poll::Ready(Ok(())) => {
159								self.socket = NodeSocket::Connected(conn);
160								return Poll::Ready(Ok(()))
161							},
162							Poll::Pending => {
163								self.socket = NodeSocket::Connected(conn);
164								return Poll::Pending
165							},
166						}
167					},
168					Poll::Ready(Err(err)) => {
169						log::warn!(target: "telemetry", "⚠️  Disconnected from {}: {:?}", self.addr, err);
170						socket = NodeSocket::wait_reconnect();
171					},
172					Poll::Pending => {
173						self.socket = NodeSocket::Connected(conn);
174						return Poll::Pending
175					},
176				},
177				NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
178					Poll::Ready(Ok(sink)) => {
179						log::debug!(target: "telemetry", "✅ Connected to {}", self.addr);
180
181						{
182							let mut index = 0;
183							while index < self.telemetry_connection_notifier.len() {
184								let sender = &mut self.telemetry_connection_notifier[index];
185								if let Err(error) = sender.try_send(()) {
186									if !error.is_disconnected() {
187										log::debug!(target: "telemetry", "Failed to send a telemetry connection notification: {}", error);
188									} else {
189										self.telemetry_connection_notifier.swap_remove(index);
190										continue
191									}
192								}
193								index += 1;
194							}
195						}
196
197						let buf = self
198							.connection_messages
199							.iter()
200							.map(|json| {
201								let mut json = json.clone();
202								json.insert(
203									"ts".to_string(),
204									chrono::Local::now().to_rfc3339().into(),
205								);
206								json
207							})
208							.filter_map(|json| match serde_json::to_vec(&json) {
209								Ok(message) => Some(message),
210								Err(err) => {
211									log::error!(
212										target: "telemetry",
213										"An error occurred while generating new connection \
214										messages: {}",
215										err,
216									);
217									None
218								},
219							})
220							.collect();
221
222						socket = NodeSocket::Connected(NodeSocketConnected { sink, buf });
223					},
224					Poll::Pending => break NodeSocket::Dialing(s),
225					Poll::Ready(Err(err)) => {
226						log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
227						socket = NodeSocket::wait_reconnect();
228					},
229				},
230				NodeSocket::ReconnectNow => {
231					let addr = self.addr.clone();
232					match self.transport.dial(addr) {
233						Ok(d) => {
234							log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
235							socket = NodeSocket::Dialing(d);
236						},
237						Err(err) => {
238							log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
239							socket = NodeSocket::wait_reconnect();
240						},
241					}
242				},
243				NodeSocket::WaitingReconnect(mut s) => {
244					if Future::poll(Pin::new(&mut s), cx).is_ready() {
245						socket = NodeSocket::ReconnectNow;
246					} else {
247						break NodeSocket::WaitingReconnect(s)
248					}
249				},
250				NodeSocket::Poisoned => {
251					log::error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
252					break NodeSocket::Poisoned
253				},
254			}
255		};
256
257		// The Dispatcher blocks when the Node syncs blocks. This is why it is important that the
258		// Node sinks don't go into "Pending" state while waiting for reconnection but rather
259		// discard the excess of telemetry messages.
260		Poll::Ready(Ok(()))
261	}
262
263	fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> {
264		// Any buffered outgoing telemetry messages are discarded while (re-)connecting.
265		match &mut self.socket {
266			NodeSocket::Connected(conn) => match serde_json::to_vec(&item) {
267				Ok(data) => {
268					log::trace!(target: "telemetry", "Sending {} bytes", data.len());
269					let _ = conn.sink.start_send_unpin(data);
270				},
271				Err(err) => log::debug!(
272					target: "telemetry",
273					"Could not serialize payload: {}",
274					err,
275				),
276			},
277			// We are currently dialing the node.
278			NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"),
279			// A new connection should be started as soon as possible.
280			NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"),
281			// Waiting before attempting to dial again.
282			NodeSocket::WaitingReconnect(_) => {},
283			// Temporary transition state.
284			NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"),
285		}
286		Ok(())
287	}
288
289	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
290		match &mut self.socket {
291			NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
292				Poll::Ready(Err(e)) => {
293					// When `telemetry` closes the websocket connection we end
294					// up here, which is sub-optimal. See
295					// https://github.com/libp2p/rust-libp2p/issues/2021 for
296					// what we could do to improve this.
297					log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e);
298					self.socket = NodeSocket::wait_reconnect();
299					Poll::Ready(Ok(()))
300				},
301				Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
302				Poll::Pending => Poll::Pending,
303			},
304			_ => Poll::Ready(Ok(())),
305		}
306	}
307
308	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
309		match &mut self.socket {
310			NodeSocket::Connected(conn) => conn.sink.poll_close_unpin(cx).map(|_| Ok(())),
311			_ => Poll::Ready(Ok(())),
312		}
313	}
314}
315
316impl<TTrans: Transport> fmt::Debug for NodeSocket<TTrans> {
317	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
318		use NodeSocket::*;
319		f.write_str(match self {
320			Connected(_) => "Connected",
321			Dialing(_) => "Dialing",
322			ReconnectNow => "ReconnectNow",
323			WaitingReconnect(_) => "WaitingReconnect",
324			Poisoned => "Poisoned",
325		})
326	}
327}