1use crate::event::Event;
35
36use futures::{prelude::*, ready, stream::FusedStream};
37use log::{debug, error};
38use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
39use std::{
40 backtrace::Backtrace,
41 cell::RefCell,
42 fmt,
43 pin::Pin,
44 task::{Context, Poll},
45};
46
47pub const LOG_TARGET: &str = "sub-libp2p::out_events";
49
50pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
55 let (tx, rx) = async_channel::unbounded();
56 let tx = Sender {
57 inner: tx,
58 name,
59 queue_size_warning,
60 warning_fired: SenderWarningState::NotFired,
61 creation_backtrace: Backtrace::force_capture(),
62 metrics: None,
63 };
64 let rx = Receiver { inner: rx, name, metrics: None };
65 (tx, rx)
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70enum SenderWarningState {
71 NotFired,
73 FiredFull,
75 FiredFree,
77}
78
79pub struct Sender {
87 inner: async_channel::Sender<Event>,
88 name: &'static str,
90 queue_size_warning: usize,
92 warning_fired: SenderWarningState,
95 creation_backtrace: Backtrace,
97 metrics: Option<Metrics>,
100}
101
102impl fmt::Debug for Sender {
103 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
104 f.debug_tuple("Sender").finish()
105 }
106}
107
108impl Drop for Sender {
109 fn drop(&mut self) {
110 if let Some(metrics) = self.metrics.as_ref() {
111 metrics.num_channels.with_label_values(&[self.name]).dec();
112 }
113 }
114}
115
116pub struct Receiver {
118 inner: async_channel::Receiver<Event>,
119 name: &'static str,
120 metrics: Option<Metrics>,
123}
124
125impl Stream for Receiver {
126 type Item = Event;
127
128 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
129 if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
130 if let Some(metrics) = &self.metrics {
131 metrics.event_out(&ev, self.name);
132 }
133 Poll::Ready(Some(ev))
134 } else {
135 Poll::Ready(None)
136 }
137 }
138}
139
140impl fmt::Debug for Receiver {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 f.debug_tuple("Receiver").finish()
143 }
144}
145
146impl Drop for Receiver {
147 fn drop(&mut self) {
148 if !self.inner.is_terminated() {
149 while let Some(Some(_)) = self.next().now_or_never() {}
151 }
152 }
153}
154
155pub struct OutChannels {
157 event_streams: Vec<Sender>,
158 metrics: Option<Metrics>,
161}
162
163impl OutChannels {
164 pub fn new(registry: Option<&Registry>) -> Result<Self, PrometheusError> {
166 let metrics =
167 if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None };
168
169 Ok(Self { event_streams: Vec::new(), metrics })
170 }
171
172 pub fn push(&mut self, mut sender: Sender) {
174 debug_assert!(sender.metrics.is_none());
175 sender.metrics = self.metrics.clone();
176
177 if let Some(metrics) = &self.metrics {
178 metrics.num_channels.with_label_values(&[sender.name]).inc();
179 }
180
181 self.event_streams.push(sender);
182 }
183
184 pub fn send(&mut self, event: Event) {
186 self.event_streams.retain_mut(|sender| {
187 let current_pending = sender.inner.len();
188 if current_pending >= sender.queue_size_warning {
189 if sender.warning_fired == SenderWarningState::NotFired {
190 error!(
191 "The number of unprocessed events in channel `{}` exceeded {}.\n\
192 The channel was created at:\n{:}\n
193 The last event was sent from:\n{:}",
194 sender.name,
195 sender.queue_size_warning,
196 sender.creation_backtrace,
197 Backtrace::force_capture(),
198 );
199 } else if sender.warning_fired == SenderWarningState::FiredFree {
200 debug!(
202 target: LOG_TARGET,
203 "Channel `{}` is overflowed again. Number of events: {}",
204 sender.name, current_pending
205 );
206 }
207 sender.warning_fired = SenderWarningState::FiredFull;
208 } else if sender.warning_fired == SenderWarningState::FiredFull &&
209 current_pending < sender.queue_size_warning.wrapping_div(2)
210 {
211 sender.warning_fired = SenderWarningState::FiredFree;
212 debug!(
213 target: LOG_TARGET,
214 "Channel `{}` is no longer overflowed. Number of events: {}",
215 sender.name, current_pending
216 );
217 }
218
219 sender.inner.try_send(event.clone()).is_ok()
220 });
221
222 if let Some(metrics) = &self.metrics {
223 for ev in &self.event_streams {
224 metrics.event_in(&event, ev.name);
225 }
226 }
227 }
228}
229
230impl fmt::Debug for OutChannels {
231 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
232 f.debug_struct("OutChannels")
233 .field("num_channels", &self.event_streams.len())
234 .finish()
235 }
236}
237
238#[derive(Clone)]
239struct Metrics {
240 events_total: CounterVec<U64>,
242 notifications_sizes: CounterVec<U64>,
243 num_channels: GaugeVec<U64>,
244}
245
246thread_local! {
247 static LABEL_BUFFER: RefCell<String> = RefCell::new(String::new());
248}
249
250fn format_label(prefix: &str, protocol: &str, callback: impl FnOnce(&str)) {
251 LABEL_BUFFER.with(|label_buffer| {
252 let mut label_buffer = label_buffer.borrow_mut();
253 label_buffer.clear();
254 label_buffer.reserve(prefix.len() + protocol.len() + 2);
255 label_buffer.push_str(prefix);
256 label_buffer.push('"');
257 label_buffer.push_str(protocol);
258 label_buffer.push('"');
259 callback(&label_buffer);
260 });
261}
262
263impl Metrics {
264 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
265 Ok(Self {
266 events_total: register(CounterVec::new(
267 Opts::new(
268 "substrate_sub_libp2p_out_events_events_total",
269 "Number of broadcast network events that have been sent or received across all \
270 channels"
271 ),
272 &["event_name", "action", "name"]
273 )?, registry)?,
274 notifications_sizes: register(CounterVec::new(
275 Opts::new(
276 "substrate_sub_libp2p_out_events_notifications_sizes",
277 "Size of notification events that have been sent or received across all \
278 channels"
279 ),
280 &["protocol", "action", "name"]
281 )?, registry)?,
282 num_channels: register(GaugeVec::new(
283 Opts::new(
284 "substrate_sub_libp2p_out_events_num_channels",
285 "Number of internal active channels that broadcast network events",
286 ),
287 &["name"]
288 )?, registry)?,
289 })
290 }
291
292 fn event_in(&self, event: &Event, name: &str) {
293 match event {
294 Event::Dht(_) => {
295 self.events_total.with_label_values(&["dht", "sent", name]).inc();
296 },
297 Event::NotificationStreamOpened { protocol, .. } => {
298 format_label("notif-open-", protocol, |protocol_label| {
299 self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
300 });
301 },
302 Event::NotificationStreamClosed { protocol, .. } => {
303 format_label("notif-closed-", protocol, |protocol_label| {
304 self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
305 });
306 },
307 Event::NotificationsReceived { messages, .. } =>
308 for (protocol, message) in messages {
309 format_label("notif-", protocol, |protocol_label| {
310 self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
311 });
312 self.notifications_sizes
313 .with_label_values(&[protocol, "sent", name])
314 .inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX));
315 },
316 }
317 }
318
319 fn event_out(&self, event: &Event, name: &str) {
320 match event {
321 Event::Dht(_) => {
322 self.events_total.with_label_values(&["dht", "received", name]).inc();
323 },
324 Event::NotificationStreamOpened { protocol, .. } => {
325 format_label("notif-open-", protocol, |protocol_label| {
326 self.events_total.with_label_values(&[protocol_label, "received", name]).inc();
327 });
328 },
329 Event::NotificationStreamClosed { protocol, .. } => {
330 format_label("notif-closed-", protocol, |protocol_label| {
331 self.events_total.with_label_values(&[protocol_label, "received", name]).inc();
332 });
333 },
334 Event::NotificationsReceived { messages, .. } =>
335 for (protocol, message) in messages {
336 format_label("notif-", protocol, |protocol_label| {
337 self.events_total
338 .with_label_values(&[protocol_label, "received", name])
339 .inc();
340 });
341 self.notifications_sizes
342 .with_label_values(&[protocol, "received", name])
343 .inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX));
344 },
345 }
346 }
347}