1use crate::TelemetryPayload;
20use futures::{channel::mpsc, prelude::*};
21use libp2p::{
22 core::{
23 transport::{DialOpts, PortUse, Transport},
24 Endpoint,
25 },
26 Multiaddr,
27};
28use rand::Rng as _;
29use std::{
30 fmt, mem,
31 pin::Pin,
32 task::{Context, Poll},
33 time::Duration,
34};
35use wasm_timer::Delay;
36
37pub(crate) type ConnectionNotifierSender = mpsc::Sender<()>;
38pub(crate) type ConnectionNotifierReceiver = mpsc::Receiver<()>;
39
40pub(crate) fn connection_notifier_channel() -> (ConnectionNotifierSender, ConnectionNotifierReceiver)
41{
42 mpsc::channel(0)
43}
44
45#[derive(Debug)]
57pub(crate) struct Node<TTrans: Transport> {
58 addr: Multiaddr,
60 socket: NodeSocket<TTrans>,
62 transport: TTrans,
64 pub(crate) connection_messages: Vec<TelemetryPayload>,
66 pub(crate) telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
68}
69
70enum NodeSocket<TTrans: Transport> {
71 Connected(NodeSocketConnected<TTrans>),
73 Dialing(TTrans::Dial),
75 ReconnectNow,
77 WaitingReconnect(Delay),
79 Poisoned,
81}
82
83impl<TTrans: Transport> NodeSocket<TTrans> {
84 fn wait_reconnect() -> NodeSocket<TTrans> {
85 let random_delay = rand::thread_rng().gen_range(10..20);
86 let delay = Delay::new(Duration::from_secs(random_delay));
87 log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay);
88 NodeSocket::WaitingReconnect(delay)
89 }
90}
91
92struct NodeSocketConnected<TTrans: Transport> {
93 sink: TTrans::Output,
95 buf: Vec<Vec<u8>>,
97}
98
99impl<TTrans: Transport> Node<TTrans> {
100 pub(crate) fn new(
102 transport: TTrans,
103 addr: Multiaddr,
104 connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
105 telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
106 ) -> Self {
107 Node {
108 addr,
109 socket: NodeSocket::ReconnectNow,
110 transport,
111 connection_messages,
112 telemetry_connection_notifier,
113 }
114 }
115}
116
117impl<TTrans: Transport, TSinkErr> Node<TTrans>
118where
119 TTrans::Dial: Unpin,
120 TTrans::Output:
121 Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
122 TSinkErr: fmt::Debug,
123{
124 fn try_send_connection_messages(
127 self: Pin<&mut Self>,
128 cx: &mut Context<'_>,
129 conn: &mut NodeSocketConnected<TTrans>,
130 ) -> Poll<Result<(), TSinkErr>> {
131 while let Some(item) = conn.buf.pop() {
132 if let Err(e) = conn.sink.start_send_unpin(item) {
133 return Poll::Ready(Err(e))
134 }
135 futures::ready!(conn.sink.poll_ready_unpin(cx))?;
136 }
137 Poll::Ready(Ok(()))
138 }
139}
140
141pub(crate) enum Infallible {}
142
143impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
144where
145 TTrans: Unpin,
146 TTrans::Dial: Unpin,
147 TTrans::Output:
148 Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
149 TSinkErr: fmt::Debug,
150{
151 type Error = Infallible;
152
153 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
154 let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
155 self.socket = loop {
156 match socket {
157 NodeSocket::Connected(mut conn) => match conn.sink.poll_ready_unpin(cx) {
158 Poll::Ready(Ok(())) => {
159 match self.as_mut().try_send_connection_messages(cx, &mut conn) {
160 Poll::Ready(Err(err)) => {
161 log::warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
162 socket = NodeSocket::wait_reconnect();
163 },
164 Poll::Ready(Ok(())) => {
165 self.socket = NodeSocket::Connected(conn);
166 return Poll::Ready(Ok(()))
167 },
168 Poll::Pending => {
169 self.socket = NodeSocket::Connected(conn);
170 return Poll::Pending
171 },
172 }
173 },
174 Poll::Ready(Err(err)) => {
175 log::warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
176 socket = NodeSocket::wait_reconnect();
177 },
178 Poll::Pending => {
179 self.socket = NodeSocket::Connected(conn);
180 return Poll::Pending
181 },
182 },
183 NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
184 Poll::Ready(Ok(sink)) => {
185 log::debug!(target: "telemetry", "✅ Connected to {}", self.addr);
186
187 {
188 let mut index = 0;
189 while index < self.telemetry_connection_notifier.len() {
190 let sender = &mut self.telemetry_connection_notifier[index];
191 if let Err(error) = sender.try_send(()) {
192 if !error.is_disconnected() {
193 log::debug!(target: "telemetry", "Failed to send a telemetry connection notification: {}", error);
194 } else {
195 self.telemetry_connection_notifier.swap_remove(index);
196 continue
197 }
198 }
199 index += 1;
200 }
201 }
202
203 let buf = self
204 .connection_messages
205 .iter()
206 .map(|json| {
207 let mut json = json.clone();
208 json.insert(
209 "ts".to_string(),
210 chrono::Local::now().to_rfc3339().into(),
211 );
212 json
213 })
214 .filter_map(|json| match serde_json::to_vec(&json) {
215 Ok(message) => Some(message),
216 Err(err) => {
217 log::error!(
218 target: "telemetry",
219 "An error occurred while generating new connection \
220 messages: {}",
221 err,
222 );
223 None
224 },
225 })
226 .collect();
227
228 socket = NodeSocket::Connected(NodeSocketConnected { sink, buf });
229 },
230 Poll::Pending => break NodeSocket::Dialing(s),
231 Poll::Ready(Err(err)) => {
232 log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
233 socket = NodeSocket::wait_reconnect();
234 },
235 },
236 NodeSocket::ReconnectNow => {
237 let addr = self.addr.clone();
238 match self
239 .transport
240 .dial(addr, DialOpts { role: Endpoint::Dialer, port_use: PortUse::New })
241 {
242 Ok(d) => {
243 log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
244 socket = NodeSocket::Dialing(d);
245 },
246 Err(err) => {
247 log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
248 socket = NodeSocket::wait_reconnect();
249 },
250 }
251 },
252 NodeSocket::WaitingReconnect(mut s) => {
253 if Future::poll(Pin::new(&mut s), cx).is_ready() {
254 socket = NodeSocket::ReconnectNow;
255 } else {
256 break NodeSocket::WaitingReconnect(s)
257 }
258 },
259 NodeSocket::Poisoned => {
260 log::error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
261 break NodeSocket::Poisoned
262 },
263 }
264 };
265
266 Poll::Ready(Ok(()))
270 }
271
272 fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> {
273 match &mut self.socket {
275 NodeSocket::Connected(conn) => match serde_json::to_vec(&item) {
276 Ok(data) => {
277 log::trace!(target: "telemetry", "Sending {} bytes", data.len());
278 let _ = conn.sink.start_send_unpin(data);
279 },
280 Err(err) => log::debug!(
281 target: "telemetry",
282 "Could not serialize payload: {}",
283 err,
284 ),
285 },
286 NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"),
288 NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"),
290 NodeSocket::WaitingReconnect(_) => {},
292 NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"),
294 }
295 Ok(())
296 }
297
298 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
299 match &mut self.socket {
300 NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
301 Poll::Ready(Err(e)) => {
302 log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e);
307 self.socket = NodeSocket::wait_reconnect();
308 Poll::Ready(Ok(()))
309 },
310 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
311 Poll::Pending => Poll::Pending,
312 },
313 _ => Poll::Ready(Ok(())),
314 }
315 }
316
317 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
318 match &mut self.socket {
319 NodeSocket::Connected(conn) => conn.sink.poll_close_unpin(cx).map(|_| Ok(())),
320 _ => Poll::Ready(Ok(())),
321 }
322 }
323}
324
325impl<TTrans: Transport> fmt::Debug for NodeSocket<TTrans> {
326 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
327 use NodeSocket::*;
328 f.write_str(match self {
329 Connected(_) => "Connected",
330 Dialing(_) => "Dialing",
331 ReconnectNow => "ReconnectNow",
332 WaitingReconnect(_) => "WaitingReconnect",
333 Poisoned => "Poisoned",
334 })
335 }
336}