mick_jaeger/
lib.rs

1// Copyright (C) 2020 Pierre Krieger
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// 	http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Jaeger client.
17//!
18//! # Overview
19//!
20//! In order to use this crate, you must be familiar with the concept of a *span*.
21//!
22//! A *span* covers a certain period of time, typically from the start of an operation to the end.
23//! In other words, you generally start a span at the beginning of a function or block, and end
24//! it at the end of the function/block.
25//!
26//! The purpose of this crate is to let you easily record spans and send them to a Jaeger server,
//! which will aggregate them and let you visualize them.
28//!
//! Each span belongs to a *trace*. A trace is identified by a 128-bit identifier. Jaeger lets
30//! you easily visualize all the spans belonging to the same trace, even if they come from
31//! different clients.
32//!
33//! As an example, imagine an HTTP frontend server receiving an HTTP request. It can generate a
34//! new trace id for this request, then pass this identifier around to other external processes
35//! that process parts of this request. These external processes, being all connected to the same
36//! Jaeger server, can report spans corresponding to this request.
37//!
38//! The easiest way to start a Jaeger server for quick experimentation is through Docker:
39//!
40//! ```notrust
41//! docker run -d --name jaeger \
42//!   -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
43//!   -p 5775:5775/udp \
44//!   -p 6831:6831/udp \
45//!   -p 6832:6832/udp \
46//!   -p 5778:5778 \
47//!   -p 16686:16686 \
48//!   -p 14268:14268 \
49//!   -p 14250:14250 \
50//!   -p 9411:9411 \
51//!   jaegertracing/all-in-one:1.20
52//! ```
53//!
54//! See also [the official documentation](https://www.jaegertracing.io/docs/1.20/getting-started/).
55//!
56//! # Usage: initialization
57//!
58//! First and foremost, call [`init`] in order to allocate all the necessary objects.
59//!
60//! This returns a combination of a [`TracesIn`] and [`TracesOut`]. Think of them as a sender and
61//! receiver. The [`TracesIn`] is used in order to send completed spans to the [`TracesOut`].
62//!
63//! Sending the traces to the server isn't covered by this library. The [`TracesOut`] must be
64//! polled using [`TracesOut::next`], and the data sent through UDP to the Jaeger server.
65//!
66//! ```
67//! # async fn foo() {
68//! let (traces_in, mut traces_out) = mick_jaeger::init(mick_jaeger::Config {
69//!     service_name: "demo".to_string(),
70//! });
71//!
72//! let udp_socket = async_std::net::UdpSocket::bind("0.0.0.0:0").await.unwrap();
73//! udp_socket.connect("127.0.0.1:6831").await.unwrap();
74//!
75//! async_std::task::spawn(async move {
76//!     loop {
77//!         let buf = traces_out.next().await;
78//!         udp_socket.send(&buf).await.unwrap();
79//!     }
80//! });
81//! # }
82//! ```
83//!
84//! If [`TracesOut::next`] isn't called often enough, in other words if the background task is too
85//! slow, the spans sent on the [`TracesIn`] will be automatically and silently discarded. This
86//! isn't expected to happen under normal circumstances.
87//!
88//! # Usage: spans
89//!
90//! Use the [`TracesIn::span`] method to create spans.
91//!
92//! The basic way to use this library is to use [`TracesIn::span`]. This creates a [`Span`] object
93//! that, when destroyed, will send a report destined to the [`TracesOut`].
94//!
95//! > **Note**: As long as a [`Span`] is alive, it will not be visible on the Jaeger server. You
96//! >           are encouraged to create short-lived spans and long-lived trace IDs.
97//!
98//! ```
99//! # use std::num::NonZeroU128;
100//! # let mut traces_in: std::sync::Arc<mick_jaeger::TracesIn> = return;
101//! let _span = traces_in.span(NonZeroU128::new(43).unwrap(), "something");
102//!
103//! // do something
104//!
105//! // The span is reported when it is destroyed at the end of the scope.
106//! ```
107//!
108//! > **Note**: Do not name your spans `_`, otherwise they will be destroyed immediately!
109//!
110//! It is possible, and encouraged, to add tags to spans.
111//!
112//! ```
113//! # use std::num::NonZeroU128;
114//! # let mut traces_in: std::sync::Arc<mick_jaeger::TracesIn> = return;
115//! let mut _span = traces_in.span(NonZeroU128::new(43).unwrap(), "something");
116//! _span.add_string_tag("key", "value");
117//! ```
118//!
119//! Spans can have children:
120//!
121//! ```
122//! # use std::num::NonZeroU128;
123//! fn my_function(traces_in: &std::sync::Arc<mick_jaeger::TracesIn>) {
124//!     let mut _span = traces_in.span(NonZeroU128::new(43).unwrap(), "foo");
125//!
126//!     // do something
127//!
128//!     {
129//!         let mut _span = _span.child("bar");
130//!         // something expensive
131//!     }
132//! }
133//! ```
134//!
135//! If an event happens at a precise point in time rather than over time, logs can also be added.
136//!
137//! ```
138//! # use std::num::NonZeroU128;
139//! # let mut traces_in: std::sync::Arc<mick_jaeger::TracesIn> = return;
140//! let mut _span = traces_in.span(NonZeroU128::new(43).unwrap(), "something");
141//! _span.log().with_string("key", "value");
142//! ```
143//!
144//! # Differences with other crates
145//!
//! While there exist other crates that let you interface with *Jaeger*, they are all
147//! overcomplicated according to the author of `mick_jaeger`. Some are lossy abstractions: by
148//! trying to be easy to use, they hide important details (such as the trace ID), which causes
149//! more confusion than it helps.
150//!
151//! `mick_jaeger` tries to be simple. The fact that it doesn't handle sending to the server
152//! removes a lot of opinionated decisions concerning networking libraries and threading.
153//!
154//! `mick_jaeger` could theoretically be `no_std`-compatible (after a few tweaks), but can't
155//! because at the time of writing there is no no-std-compatible library for the *thrift*
156//! protocol.
157//!
158
159use futures::{channel::mpsc, prelude::*, stream::FusedStream as _};
160use protocol::agent::TAgentSyncClient as _;
161use std::{
162    convert::TryFrom as _,
163    mem,
164    num::{NonZeroU128, NonZeroU64},
165    sync::{Arc, Mutex},
166    time::{Duration, SystemTime},
167};
168use thrift::transport::TIoChannel as _;
169
170mod glue;
171mod protocol;
172
/// Configuration to pass to [`init`].
///
/// # Examples
///
/// ```
/// let config = mick_jaeger::Config {
///     service_name: "demo".to_string(),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct Config {
    /// Name of the service. Reported to the Jaeger server.
    pub service_name: String,
}
178
179pub fn init(config: Config) -> (Arc<TracesIn>, TracesOut) {
180    let (tx, rx) = mpsc::channel(256);
181    let (buffer, write) = glue::TBufferChannel::with_capacity(512).split().unwrap();
182    let client = protocol::agent::AgentSyncClient::new(
183        thrift::protocol::TCompactInputProtocol::new(glue::TNoopChannel),
184        thrift::protocol::TCompactOutputProtocol::new(write),
185    );
186    let traces_out = TracesOut {
187        rx: rx.ready_chunks(64),
188        process: protocol::jaeger::Process {
189            service_name: config.service_name,
190            tags: Some(vec![]),
191        },
192        buffer,
193        client,
194    };
195    let traces_in = TracesIn {
196        sender: Mutex::new(tx),
197    };
198    (Arc::new(traces_in), traces_out)
199}
200
201pub struct TracesIn {
202    sender: Mutex<mpsc::Sender<protocol::jaeger::Span>>,
203}
204
205impl TracesIn {
206    /// Builds a new [`Span`].
207    ///
208    /// Must be passed a `trace_id` that is used to group spans together. Its meaning is
209    /// arbitrary.
210    pub fn span(
211        self: &Arc<Self>,
212        trace_id: NonZeroU128,
213        operation_name: impl Into<String>,
214    ) -> Span {
215        self.span_with_id_and_parent(
216            trace_id,
217            NonZeroU64::new(rand::random()).unwrap(),
218            None,
219            operation_name,
220        )
221    }
222
223    /// Builds a new [`Span`], using a specific span ID.
224    ///
225    /// Use this method when it is required to know the ID of a span, for example when building
226    /// links between spans across different services.
227    pub fn span_with_id(
228        self: &Arc<Self>,
229        trace_id: NonZeroU128,
230        span_id: NonZeroU64,
231        operation_name: impl Into<String>,
232    ) -> Span {
233        self.span_with_id_and_parent(trace_id, span_id, None, operation_name)
234    }
235
236    /// Builds a new [`Span`], whose parent uses a specific span ID.
237    ///
238    /// A `parent_id` equal to 0 means "no parent".
239    pub fn span_with_parent(
240        self: &Arc<Self>,
241        trace_id: NonZeroU128,
242        parent_id: Option<NonZeroU64>,
243        operation_name: impl Into<String>,
244    ) -> Span {
245        self.span_with_id_and_parent(
246            trace_id,
247            NonZeroU64::new(rand::random()).unwrap(),
248            parent_id,
249            operation_name,
250        )
251    }
252
253    /// Builds a new [`Span`], with a specific ID whose parent uses a specific span ID.
254    ///
255    /// A `parent_id` equal to 0 means "no parent".
256    pub fn span_with_id_and_parent(
257        self: &Arc<Self>,
258        trace_id: NonZeroU128,
259        span_id: NonZeroU64,
260        parent_id: Option<NonZeroU64>,
261        operation_name: impl Into<String>,
262    ) -> Span {
263        Span {
264            traces_in: self.clone(),
265            trace_id,
266            span_id,
267            parent_span_id: parent_id.map(|id| id.get()).unwrap_or(0),
268            operation_name: operation_name.into(),
269            references: Vec::new(),
270            start_time: SystemTime::now(),
271            tags: base_tags(),
272            logs: Vec::new(),
273        }
274    }
275}
276
277pub struct Span {
278    traces_in: Arc<TracesIn>,
279    trace_id: NonZeroU128,
280    span_id: NonZeroU64,
281    /// [`Span::span_id`] of the parent, or `0` if no parent.
282    parent_span_id: u64,
283    operation_name: String,
284    references: Vec<protocol::jaeger::SpanRef>,
285    start_time: SystemTime,
286    tags: Vec<protocol::jaeger::Tag>,
287    logs: Vec<protocol::jaeger::Log>,
288}
289
290impl Span {
291    /// Creates a new [`Span`], child of this one.
292    ///
293    /// > **Note**: There is no need to keep the parent [`Span`] alive while the children is
294    /// >           alive. The protocol allows for parents that don't completely overlap their
295    /// >           children.
296    pub fn child(&self, operation_name: impl Into<String>) -> Span {
297        self.child_with_id(NonZeroU64::new(rand::random()).unwrap(), operation_name)
298    }
299
300    /// Creates a new [`Span`], child of this one, with a specific ID.
301    pub fn child_with_id(&self, span_id: NonZeroU64, operation_name: impl Into<String>) -> Span {
302        Span {
303            traces_in: self.traces_in.clone(),
304            trace_id: self.trace_id,
305            span_id,
306            parent_span_id: self.span_id.get(),
307            operation_name: operation_name.into(),
308            references: Vec::new(),
309            start_time: SystemTime::now(),
310            tags: base_tags(),
311            logs: Vec::new(),
312        }
313    }
314
315    /// Returns the trace ID originally passed when building this span.
316    pub fn trace_id(&self) -> NonZeroU128 {
317        self.trace_id
318    }
319
320    /// Returns the span ID originally passed when building this span.
321    pub fn span_id(&self) -> NonZeroU64 {
322        self.span_id
323    }
324
325    /// Add a log entry to this span.
326    pub fn log(&mut self) -> Log {
327        let timestamp = i64::try_from(
328            SystemTime::now()
329                .duration_since(SystemTime::UNIX_EPOCH)
330                .unwrap_or(Duration::new(0, 0))
331                .as_micros(),
332        )
333        .unwrap_or(i64::max_value());
334
335        Log {
336            span: self,
337            timestamp,
338            fields: Vec::new(),
339        }
340    }
341
342    /// Adds a "followfrom" relation ship towards another span of (potentially) another trace.
343    pub fn add_follows_from(&mut self, other: &Span) {
344        self.add_follows_from_raw(other.trace_id(), other.span_id())
345    }
346
347    /// Adds a "followfrom" relation ship towards another span of (potentially) another trace.
348    pub fn add_follows_from_raw(&mut self, trace_id: NonZeroU128, span_id: NonZeroU64) {
349        self.references.push(protocol::jaeger::SpanRef {
350            ref_type: protocol::jaeger::SpanRefType::FollowsFrom,
351            trace_id_low: i64::from_be_bytes(
352                <[u8; 8]>::try_from(&trace_id.get().to_be_bytes()[8..]).unwrap(),
353            ),
354            trace_id_high: i64::from_be_bytes(
355                <[u8; 8]>::try_from(&trace_id.get().to_be_bytes()[..8]).unwrap(),
356            ),
357            span_id: i64::from_ne_bytes(span_id.get().to_ne_bytes()),
358        });
359    }
360
361    /// Add a new key-value tag to this span.
362    pub fn add_string_tag(&mut self, key: &str, value: &str) {
363        // TODO: check for duplicates?
364        self.tags.push(string_tag(key, value));
365    }
366
367    /// Add a new key-value tag to this span.
368    pub fn add_int_tag(&mut self, key: &str, value: i64) {
369        // TODO: check for duplicates?
370        self.tags.push(int_tag(key, value));
371    }
372
373    /// Modifies the start time of this span.
374    ///
375    /// > **Note**: This method can be useful in order to generate a span with a `trace_id` that
376    /// >           is only know after the span should have started. To do so, call
377    /// >           [`StartTime::now`] when the span should start, create the span once you know
378    /// >           the ̀`trace_id`, then call this method.
379    pub fn override_start_time(&mut self, start_time: StartTime) {
380        self.start_time = start_time.0;
381    }
382
383    /// Modifies the start time of this span.
384    ///
385    /// > **Note**: This method can be useful in order to generate a span with a `trace_id` that
386    /// >           is only know after the span should have started. To do so, call
387    /// >           [`StartTime::now`] when the span should start, create the span once you know
388    /// >           the ̀`trace_id`, then call this method.
389    pub fn with_start_time_override(mut self, start_time: StartTime) -> Self {
390        self.override_start_time(start_time);
391        self
392    }
393}
394
395impl Drop for Span {
396    fn drop(&mut self) {
397        let end_time = SystemTime::now();
398
399        // Try to send the span, but don't try too hard. If the channel is full, drop the tracing
400        // information.
401        let _ = self
402            .traces_in
403            .sender
404            .lock()
405            .unwrap()
406            .try_send(protocol::jaeger::Span {
407                trace_id_low: i64::from_be_bytes(
408                    <[u8; 8]>::try_from(&self.trace_id.get().to_be_bytes()[8..]).unwrap(),
409                ),
410                trace_id_high: i64::from_be_bytes(
411                    <[u8; 8]>::try_from(&self.trace_id.get().to_be_bytes()[..8]).unwrap(),
412                ),
413                span_id: i64::from_ne_bytes(self.span_id.get().to_ne_bytes()),
414                parent_span_id: i64::from_ne_bytes(self.parent_span_id.to_ne_bytes()),
415                operation_name: mem::take(&mut self.operation_name),
416                references: if self.references.is_empty() {
417                    None
418                } else {
419                    Some(mem::take(&mut self.references))
420                },
421                flags: 0,
422                start_time: i64::try_from(
423                    self.start_time
424                        .duration_since(SystemTime::UNIX_EPOCH)
425                        .unwrap_or_else(|_| Duration::new(0, 0))
426                        .as_micros(),
427                )
428                .unwrap_or(i64::max_value()),
429                duration: i64::try_from(
430                    end_time
431                        .duration_since(self.start_time)
432                        .unwrap_or_else(|_| Duration::new(0, 0))
433                        .as_micros(),
434                )
435                .unwrap_or(i64::max_value()),
436                tags: Some(mem::take(&mut self.tags)),
437                logs: if self.logs.is_empty() {
438                    None
439                } else {
440                    Some(mem::take(&mut self.logs))
441                },
442            });
443    }
444}
445
/// Point in time that can later be used as the start of a [`Span`].
///
/// See [`Span::override_start_time`] and [`Span::with_start_time_override`].
#[derive(Debug, Clone, Copy)]
pub struct StartTime(SystemTime);

impl StartTime {
    /// Captures the current system time.
    pub fn now() -> Self {
        StartTime(SystemTime::now())
    }
}
453
454pub struct Log<'a> {
455    span: &'a mut Span,
456    timestamp: i64,
457    fields: Vec<protocol::jaeger::Tag>,
458}
459
460impl<'a> Log<'a> {
461    /// Add a new key-value tag to this log.
462    pub fn with_string(mut self, key: &str, value: &str) -> Self {
463        self.fields.push(string_tag(key, value));
464        self
465    }
466
467    /// Add a new key-value tag to this log.
468    pub fn with_int(mut self, key: &str, value: i64) -> Self {
469        self.fields.push(int_tag(key, value));
470        self
471    }
472
473    // TODO: other methods
474}
475
476impl<'a> Drop for Log<'a> {
477    fn drop(&mut self) {
478        self.span.logs.push(protocol::jaeger::Log {
479            timestamp: self.timestamp,
480            fields: mem::replace(&mut self.fields, Vec::new()),
481        });
482    }
483}
484
485fn int_tag(key: &str, value: i64) -> protocol::jaeger::Tag {
486    protocol::jaeger::Tag {
487        key: key.to_string(),
488        v_type: protocol::jaeger::TagType::Long,
489        v_long: Some(value),
490        v_str: None,
491        v_double: None,
492        v_bool: None,
493        v_binary: None,
494    }
495}
496
497fn string_tag(key: &str, value: &str) -> protocol::jaeger::Tag {
498    protocol::jaeger::Tag {
499        key: key.to_string(),
500        v_type: protocol::jaeger::TagType::String,
501        v_str: Some(value.to_string()),
502        v_long: None,
503        v_double: None,
504        v_bool: None,
505        v_binary: None,
506    }
507}
508
509fn base_tags() -> Vec<protocol::jaeger::Tag> {
510    vec![
511        string_tag("otel.library.name", env!("CARGO_PKG_NAME")),
512        string_tag("otel.library.version", env!("CARGO_PKG_VERSION")),
513    ]
514}
515
516/// Receiving side for spans.
517///
518/// This object must be processed in order to send traces to the UDP server.
519pub struct TracesOut {
520    rx: stream::ReadyChunks<mpsc::Receiver<protocol::jaeger::Span>>,
521    process: protocol::jaeger::Process,
522    buffer: thrift::transport::ReadHalf<glue::TBufferChannel>,
523    client: protocol::agent::AgentSyncClient<
524        thrift::protocol::TCompactInputProtocol<glue::TNoopChannel>,
525        thrift::protocol::TCompactOutputProtocol<
526            thrift::transport::WriteHalf<glue::TBufferChannel>,
527        >,
528    >,
529}
530
531impl TracesOut {
532    /// Returns the next packet of data to send on the UDP socket.
533    pub async fn next(&mut self) -> Vec<u8> {
534        if self.rx.is_terminated() {
535            loop {
536                futures::pending!()
537            }
538        }
539
540        let spans = self.rx.select_next_some().await;
541
542        self.client
543            .emit_batch(protocol::jaeger::Batch {
544                spans,
545                process: self.process.clone(),
546            })
547            .unwrap();
548        self.buffer.take_bytes()
549    }
550
551    /// Add a new key-value tag to the process.
552    pub fn add_string_tag(&mut self, key: &str, value: &str) {
553        // TODO: check for duplicates?
554        self.process
555            .tags
556            .as_mut()
557            .unwrap()
558            .push(string_tag(key, value));
559    }
560
561    /// Add a new key-value tag to the process.
562    pub fn add_int_tag(&mut self, key: &str, value: i64) {
563        // TODO: check for duplicates?
564        self.process
565            .tags
566            .as_mut()
567            .unwrap()
568            .push(int_tag(key, value));
569    }
570}