1use crate::TelemetryPayload;
20use futures::{channel::mpsc, prelude::*};
21use libp2p::{core::transport::Transport, Multiaddr};
22use rand::Rng as _;
23use std::{
24 fmt, mem,
25 pin::Pin,
26 task::{Context, Poll},
27 time::Duration,
28};
29use wasm_timer::Delay;
30
31pub(crate) type ConnectionNotifierSender = mpsc::Sender<()>;
32pub(crate) type ConnectionNotifierReceiver = mpsc::Receiver<()>;
33
34pub(crate) fn connection_notifier_channel() -> (ConnectionNotifierSender, ConnectionNotifierReceiver)
35{
36 mpsc::channel(0)
37}
38
39#[derive(Debug)]
51pub(crate) struct Node<TTrans: Transport> {
52 addr: Multiaddr,
54 socket: NodeSocket<TTrans>,
56 transport: TTrans,
58 pub(crate) connection_messages: Vec<TelemetryPayload>,
60 pub(crate) telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
62}
63
64enum NodeSocket<TTrans: Transport> {
65 Connected(NodeSocketConnected<TTrans>),
67 Dialing(TTrans::Dial),
69 ReconnectNow,
71 WaitingReconnect(Delay),
73 Poisoned,
75}
76
77impl<TTrans: Transport> NodeSocket<TTrans> {
78 fn wait_reconnect() -> NodeSocket<TTrans> {
79 let random_delay = rand::thread_rng().gen_range(10..20);
80 let delay = Delay::new(Duration::from_secs(random_delay));
81 log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay);
82 NodeSocket::WaitingReconnect(delay)
83 }
84}
85
86struct NodeSocketConnected<TTrans: Transport> {
87 sink: TTrans::Output,
89 buf: Vec<Vec<u8>>,
91}
92
93impl<TTrans: Transport> Node<TTrans> {
94 pub(crate) fn new(
96 transport: TTrans,
97 addr: Multiaddr,
98 connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
99 telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
100 ) -> Self {
101 Node {
102 addr,
103 socket: NodeSocket::ReconnectNow,
104 transport,
105 connection_messages,
106 telemetry_connection_notifier,
107 }
108 }
109}
110
111impl<TTrans: Transport, TSinkErr> Node<TTrans>
112where
113 TTrans::Dial: Unpin,
114 TTrans::Output:
115 Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
116 TSinkErr: fmt::Debug,
117{
118 fn try_send_connection_messages(
121 self: Pin<&mut Self>,
122 cx: &mut Context<'_>,
123 conn: &mut NodeSocketConnected<TTrans>,
124 ) -> Poll<Result<(), TSinkErr>> {
125 while let Some(item) = conn.buf.pop() {
126 if let Err(e) = conn.sink.start_send_unpin(item) {
127 return Poll::Ready(Err(e))
128 }
129 futures::ready!(conn.sink.poll_ready_unpin(cx))?;
130 }
131 Poll::Ready(Ok(()))
132 }
133}
134
135pub(crate) enum Infallible {}
136
137impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
138where
139 TTrans: Unpin,
140 TTrans::Dial: Unpin,
141 TTrans::Output:
142 Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
143 TSinkErr: fmt::Debug,
144{
145 type Error = Infallible;
146
147 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
148 let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
149 self.socket = loop {
150 match socket {
151 NodeSocket::Connected(mut conn) => match conn.sink.poll_ready_unpin(cx) {
152 Poll::Ready(Ok(())) => {
153 match self.as_mut().try_send_connection_messages(cx, &mut conn) {
154 Poll::Ready(Err(err)) => {
155 log::warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
156 socket = NodeSocket::wait_reconnect();
157 },
158 Poll::Ready(Ok(())) => {
159 self.socket = NodeSocket::Connected(conn);
160 return Poll::Ready(Ok(()))
161 },
162 Poll::Pending => {
163 self.socket = NodeSocket::Connected(conn);
164 return Poll::Pending
165 },
166 }
167 },
168 Poll::Ready(Err(err)) => {
169 log::warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
170 socket = NodeSocket::wait_reconnect();
171 },
172 Poll::Pending => {
173 self.socket = NodeSocket::Connected(conn);
174 return Poll::Pending
175 },
176 },
177 NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
178 Poll::Ready(Ok(sink)) => {
179 log::debug!(target: "telemetry", "✅ Connected to {}", self.addr);
180
181 {
182 let mut index = 0;
183 while index < self.telemetry_connection_notifier.len() {
184 let sender = &mut self.telemetry_connection_notifier[index];
185 if let Err(error) = sender.try_send(()) {
186 if !error.is_disconnected() {
187 log::debug!(target: "telemetry", "Failed to send a telemetry connection notification: {}", error);
188 } else {
189 self.telemetry_connection_notifier.swap_remove(index);
190 continue
191 }
192 }
193 index += 1;
194 }
195 }
196
197 let buf = self
198 .connection_messages
199 .iter()
200 .map(|json| {
201 let mut json = json.clone();
202 json.insert(
203 "ts".to_string(),
204 chrono::Local::now().to_rfc3339().into(),
205 );
206 json
207 })
208 .filter_map(|json| match serde_json::to_vec(&json) {
209 Ok(message) => Some(message),
210 Err(err) => {
211 log::error!(
212 target: "telemetry",
213 "An error occurred while generating new connection \
214 messages: {}",
215 err,
216 );
217 None
218 },
219 })
220 .collect();
221
222 socket = NodeSocket::Connected(NodeSocketConnected { sink, buf });
223 },
224 Poll::Pending => break NodeSocket::Dialing(s),
225 Poll::Ready(Err(err)) => {
226 log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
227 socket = NodeSocket::wait_reconnect();
228 },
229 },
230 NodeSocket::ReconnectNow => {
231 let addr = self.addr.clone();
232 match self.transport.dial(addr) {
233 Ok(d) => {
234 log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
235 socket = NodeSocket::Dialing(d);
236 },
237 Err(err) => {
238 log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
239 socket = NodeSocket::wait_reconnect();
240 },
241 }
242 },
243 NodeSocket::WaitingReconnect(mut s) => {
244 if Future::poll(Pin::new(&mut s), cx).is_ready() {
245 socket = NodeSocket::ReconnectNow;
246 } else {
247 break NodeSocket::WaitingReconnect(s)
248 }
249 },
250 NodeSocket::Poisoned => {
251 log::error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
252 break NodeSocket::Poisoned
253 },
254 }
255 };
256
257 Poll::Ready(Ok(()))
261 }
262
263 fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> {
264 match &mut self.socket {
266 NodeSocket::Connected(conn) => match serde_json::to_vec(&item) {
267 Ok(data) => {
268 log::trace!(target: "telemetry", "Sending {} bytes", data.len());
269 let _ = conn.sink.start_send_unpin(data);
270 },
271 Err(err) => log::debug!(
272 target: "telemetry",
273 "Could not serialize payload: {}",
274 err,
275 ),
276 },
277 NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"),
279 NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"),
281 NodeSocket::WaitingReconnect(_) => {},
283 NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"),
285 }
286 Ok(())
287 }
288
289 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
290 match &mut self.socket {
291 NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
292 Poll::Ready(Err(e)) => {
293 log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e);
298 self.socket = NodeSocket::wait_reconnect();
299 Poll::Ready(Ok(()))
300 },
301 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
302 Poll::Pending => Poll::Pending,
303 },
304 _ => Poll::Ready(Ok(())),
305 }
306 }
307
308 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
309 match &mut self.socket {
310 NodeSocket::Connected(conn) => conn.sink.poll_close_unpin(cx).map(|_| Ok(())),
311 _ => Poll::Ready(Ok(())),
312 }
313 }
314}
315
316impl<TTrans: Transport> fmt::Debug for NodeSocket<TTrans> {
317 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
318 use NodeSocket::*;
319 f.write_str(match self {
320 Connected(_) => "Connected",
321 Dialing(_) => "Dialing",
322 ReconnectNow => "ReconnectNow",
323 WaitingReconnect(_) => "WaitingReconnect",
324 Poisoned => "Poisoned",
325 })
326 }
327}