mick_jaeger/lib.rs
1// Copyright (C) 2020 Pierre Krieger
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Jaeger client.
17//!
18//! # Overview
19//!
20//! In order to use this crate, you must be familiar with the concept of a *span*.
21//!
22//! A *span* covers a certain period of time, typically from the start of an operation to the end.
23//! In other words, you generally start a span at the beginning of a function or block, and end
24//! it at the end of the function/block.
25//!
26//! The purpose of this crate is to let you easily record spans and send them to a Jaeger server,
//! which will aggregate them and let you visualize them.
28//!
//! Each span belongs to a *trace*. A trace is identified by a 128-bit identifier. Jaeger lets
30//! you easily visualize all the spans belonging to the same trace, even if they come from
31//! different clients.
32//!
33//! As an example, imagine an HTTP frontend server receiving an HTTP request. It can generate a
34//! new trace id for this request, then pass this identifier around to other external processes
35//! that process parts of this request. These external processes, being all connected to the same
36//! Jaeger server, can report spans corresponding to this request.
37//!
38//! The easiest way to start a Jaeger server for quick experimentation is through Docker:
39//!
40//! ```notrust
41//! docker run -d --name jaeger \
42//! -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
43//! -p 5775:5775/udp \
44//! -p 6831:6831/udp \
45//! -p 6832:6832/udp \
46//! -p 5778:5778 \
47//! -p 16686:16686 \
48//! -p 14268:14268 \
49//! -p 14250:14250 \
50//! -p 9411:9411 \
51//! jaegertracing/all-in-one:1.20
52//! ```
53//!
54//! See also [the official documentation](https://www.jaegertracing.io/docs/1.20/getting-started/).
55//!
56//! # Usage: initialization
57//!
58//! First and foremost, call [`init`] in order to allocate all the necessary objects.
59//!
60//! This returns a combination of a [`TracesIn`] and [`TracesOut`]. Think of them as a sender and
61//! receiver. The [`TracesIn`] is used in order to send completed spans to the [`TracesOut`].
62//!
63//! Sending the traces to the server isn't covered by this library. The [`TracesOut`] must be
64//! polled using [`TracesOut::next`], and the data sent through UDP to the Jaeger server.
65//!
66//! ```
67//! # async fn foo() {
68//! let (traces_in, mut traces_out) = mick_jaeger::init(mick_jaeger::Config {
69//! service_name: "demo".to_string(),
70//! });
71//!
72//! let udp_socket = async_std::net::UdpSocket::bind("0.0.0.0:0").await.unwrap();
73//! udp_socket.connect("127.0.0.1:6831").await.unwrap();
74//!
75//! async_std::task::spawn(async move {
76//! loop {
77//! let buf = traces_out.next().await;
78//! udp_socket.send(&buf).await.unwrap();
79//! }
80//! });
81//! # }
82//! ```
83//!
84//! If [`TracesOut::next`] isn't called often enough, in other words if the background task is too
85//! slow, the spans sent on the [`TracesIn`] will be automatically and silently discarded. This
86//! isn't expected to happen under normal circumstances.
87//!
88//! # Usage: spans
89//!
90//! Use the [`TracesIn::span`] method to create spans.
91//!
92//! The basic way to use this library is to use [`TracesIn::span`]. This creates a [`Span`] object
93//! that, when destroyed, will send a report destined to the [`TracesOut`].
94//!
95//! > **Note**: As long as a [`Span`] is alive, it will not be visible on the Jaeger server. You
96//! > are encouraged to create short-lived spans and long-lived trace IDs.
97//!
98//! ```
99//! # use std::num::NonZeroU128;
100//! # let mut traces_in: std::sync::Arc<mick_jaeger::TracesIn> = return;
101//! let _span = traces_in.span(NonZeroU128::new(43).unwrap(), "something");
102//!
103//! // do something
104//!
105//! // The span is reported when it is destroyed at the end of the scope.
106//! ```
107//!
108//! > **Note**: Do not name your spans `_`, otherwise they will be destroyed immediately!
109//!
110//! It is possible, and encouraged, to add tags to spans.
111//!
112//! ```
113//! # use std::num::NonZeroU128;
114//! # let mut traces_in: std::sync::Arc<mick_jaeger::TracesIn> = return;
115//! let mut _span = traces_in.span(NonZeroU128::new(43).unwrap(), "something");
116//! _span.add_string_tag("key", "value");
117//! ```
118//!
119//! Spans can have children:
120//!
121//! ```
122//! # use std::num::NonZeroU128;
123//! fn my_function(traces_in: &std::sync::Arc<mick_jaeger::TracesIn>) {
124//! let mut _span = traces_in.span(NonZeroU128::new(43).unwrap(), "foo");
125//!
126//! // do something
127//!
128//! {
129//! let mut _span = _span.child("bar");
130//! // something expensive
131//! }
132//! }
133//! ```
134//!
135//! If an event happens at a precise point in time rather than over time, logs can also be added.
136//!
137//! ```
138//! # use std::num::NonZeroU128;
139//! # let mut traces_in: std::sync::Arc<mick_jaeger::TracesIn> = return;
140//! let mut _span = traces_in.span(NonZeroU128::new(43).unwrap(), "something");
141//! _span.log().with_string("key", "value");
142//! ```
143//!
144//! # Differences with other crates
145//!
146//! While there exists other crates that let you interface with *Jaeger*, they are all
147//! overcomplicated according to the author of `mick_jaeger`. Some are lossy abstractions: by
148//! trying to be easy to use, they hide important details (such as the trace ID), which causes
149//! more confusion than it helps.
150//!
151//! `mick_jaeger` tries to be simple. The fact that it doesn't handle sending to the server
152//! removes a lot of opinionated decisions concerning networking libraries and threading.
153//!
154//! `mick_jaeger` could theoretically be `no_std`-compatible (after a few tweaks), but can't
155//! because at the time of writing there is no no-std-compatible library for the *thrift*
156//! protocol.
157//!
158
159use futures::{channel::mpsc, prelude::*, stream::FusedStream as _};
160use protocol::agent::TAgentSyncClient as _;
161use std::{
162 convert::TryFrom as _,
163 mem,
164 num::{NonZeroU128, NonZeroU64},
165 sync::{Arc, Mutex},
166 time::{Duration, SystemTime},
167};
168use thrift::transport::TIoChannel as _;
169
170mod glue;
171mod protocol;
172
/// Configuration to pass to [`init`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Name of the service. Reported to the Jaeger server alongside every batch of spans.
    pub service_name: String,
}
178
179pub fn init(config: Config) -> (Arc<TracesIn>, TracesOut) {
180 let (tx, rx) = mpsc::channel(256);
181 let (buffer, write) = glue::TBufferChannel::with_capacity(512).split().unwrap();
182 let client = protocol::agent::AgentSyncClient::new(
183 thrift::protocol::TCompactInputProtocol::new(glue::TNoopChannel),
184 thrift::protocol::TCompactOutputProtocol::new(write),
185 );
186 let traces_out = TracesOut {
187 rx: rx.ready_chunks(64),
188 process: protocol::jaeger::Process {
189 service_name: config.service_name,
190 tags: Some(vec![]),
191 },
192 buffer,
193 client,
194 };
195 let traces_in = TracesIn {
196 sender: Mutex::new(tx),
197 };
198 (Arc::new(traces_in), traces_out)
199}
200
201pub struct TracesIn {
202 sender: Mutex<mpsc::Sender<protocol::jaeger::Span>>,
203}
204
205impl TracesIn {
206 /// Builds a new [`Span`].
207 ///
208 /// Must be passed a `trace_id` that is used to group spans together. Its meaning is
209 /// arbitrary.
210 pub fn span(
211 self: &Arc<Self>,
212 trace_id: NonZeroU128,
213 operation_name: impl Into<String>,
214 ) -> Span {
215 self.span_with_id_and_parent(
216 trace_id,
217 NonZeroU64::new(rand::random()).unwrap(),
218 None,
219 operation_name,
220 )
221 }
222
223 /// Builds a new [`Span`], using a specific span ID.
224 ///
225 /// Use this method when it is required to know the ID of a span, for example when building
226 /// links between spans across different services.
227 pub fn span_with_id(
228 self: &Arc<Self>,
229 trace_id: NonZeroU128,
230 span_id: NonZeroU64,
231 operation_name: impl Into<String>,
232 ) -> Span {
233 self.span_with_id_and_parent(trace_id, span_id, None, operation_name)
234 }
235
236 /// Builds a new [`Span`], whose parent uses a specific span ID.
237 ///
238 /// A `parent_id` equal to 0 means "no parent".
239 pub fn span_with_parent(
240 self: &Arc<Self>,
241 trace_id: NonZeroU128,
242 parent_id: Option<NonZeroU64>,
243 operation_name: impl Into<String>,
244 ) -> Span {
245 self.span_with_id_and_parent(
246 trace_id,
247 NonZeroU64::new(rand::random()).unwrap(),
248 parent_id,
249 operation_name,
250 )
251 }
252
253 /// Builds a new [`Span`], with a specific ID whose parent uses a specific span ID.
254 ///
255 /// A `parent_id` equal to 0 means "no parent".
256 pub fn span_with_id_and_parent(
257 self: &Arc<Self>,
258 trace_id: NonZeroU128,
259 span_id: NonZeroU64,
260 parent_id: Option<NonZeroU64>,
261 operation_name: impl Into<String>,
262 ) -> Span {
263 Span {
264 traces_in: self.clone(),
265 trace_id,
266 span_id,
267 parent_span_id: parent_id.map(|id| id.get()).unwrap_or(0),
268 operation_name: operation_name.into(),
269 references: Vec::new(),
270 start_time: SystemTime::now(),
271 tags: base_tags(),
272 logs: Vec::new(),
273 }
274 }
275}
276
277pub struct Span {
278 traces_in: Arc<TracesIn>,
279 trace_id: NonZeroU128,
280 span_id: NonZeroU64,
281 /// [`Span::span_id`] of the parent, or `0` if no parent.
282 parent_span_id: u64,
283 operation_name: String,
284 references: Vec<protocol::jaeger::SpanRef>,
285 start_time: SystemTime,
286 tags: Vec<protocol::jaeger::Tag>,
287 logs: Vec<protocol::jaeger::Log>,
288}
289
290impl Span {
291 /// Creates a new [`Span`], child of this one.
292 ///
293 /// > **Note**: There is no need to keep the parent [`Span`] alive while the children is
294 /// > alive. The protocol allows for parents that don't completely overlap their
295 /// > children.
296 pub fn child(&self, operation_name: impl Into<String>) -> Span {
297 self.child_with_id(NonZeroU64::new(rand::random()).unwrap(), operation_name)
298 }
299
300 /// Creates a new [`Span`], child of this one, with a specific ID.
301 pub fn child_with_id(&self, span_id: NonZeroU64, operation_name: impl Into<String>) -> Span {
302 Span {
303 traces_in: self.traces_in.clone(),
304 trace_id: self.trace_id,
305 span_id,
306 parent_span_id: self.span_id.get(),
307 operation_name: operation_name.into(),
308 references: Vec::new(),
309 start_time: SystemTime::now(),
310 tags: base_tags(),
311 logs: Vec::new(),
312 }
313 }
314
315 /// Returns the trace ID originally passed when building this span.
316 pub fn trace_id(&self) -> NonZeroU128 {
317 self.trace_id
318 }
319
320 /// Returns the span ID originally passed when building this span.
321 pub fn span_id(&self) -> NonZeroU64 {
322 self.span_id
323 }
324
325 /// Add a log entry to this span.
326 pub fn log(&mut self) -> Log {
327 let timestamp = i64::try_from(
328 SystemTime::now()
329 .duration_since(SystemTime::UNIX_EPOCH)
330 .unwrap_or(Duration::new(0, 0))
331 .as_micros(),
332 )
333 .unwrap_or(i64::max_value());
334
335 Log {
336 span: self,
337 timestamp,
338 fields: Vec::new(),
339 }
340 }
341
342 /// Adds a "followfrom" relation ship towards another span of (potentially) another trace.
343 pub fn add_follows_from(&mut self, other: &Span) {
344 self.add_follows_from_raw(other.trace_id(), other.span_id())
345 }
346
347 /// Adds a "followfrom" relation ship towards another span of (potentially) another trace.
348 pub fn add_follows_from_raw(&mut self, trace_id: NonZeroU128, span_id: NonZeroU64) {
349 self.references.push(protocol::jaeger::SpanRef {
350 ref_type: protocol::jaeger::SpanRefType::FollowsFrom,
351 trace_id_low: i64::from_be_bytes(
352 <[u8; 8]>::try_from(&trace_id.get().to_be_bytes()[8..]).unwrap(),
353 ),
354 trace_id_high: i64::from_be_bytes(
355 <[u8; 8]>::try_from(&trace_id.get().to_be_bytes()[..8]).unwrap(),
356 ),
357 span_id: i64::from_ne_bytes(span_id.get().to_ne_bytes()),
358 });
359 }
360
361 /// Add a new key-value tag to this span.
362 pub fn add_string_tag(&mut self, key: &str, value: &str) {
363 // TODO: check for duplicates?
364 self.tags.push(string_tag(key, value));
365 }
366
367 /// Add a new key-value tag to this span.
368 pub fn add_int_tag(&mut self, key: &str, value: i64) {
369 // TODO: check for duplicates?
370 self.tags.push(int_tag(key, value));
371 }
372
373 /// Modifies the start time of this span.
374 ///
375 /// > **Note**: This method can be useful in order to generate a span with a `trace_id` that
376 /// > is only know after the span should have started. To do so, call
377 /// > [`StartTime::now`] when the span should start, create the span once you know
378 /// > the ̀`trace_id`, then call this method.
379 pub fn override_start_time(&mut self, start_time: StartTime) {
380 self.start_time = start_time.0;
381 }
382
383 /// Modifies the start time of this span.
384 ///
385 /// > **Note**: This method can be useful in order to generate a span with a `trace_id` that
386 /// > is only know after the span should have started. To do so, call
387 /// > [`StartTime::now`] when the span should start, create the span once you know
388 /// > the ̀`trace_id`, then call this method.
389 pub fn with_start_time_override(mut self, start_time: StartTime) -> Self {
390 self.override_start_time(start_time);
391 self
392 }
393}
394
395impl Drop for Span {
396 fn drop(&mut self) {
397 let end_time = SystemTime::now();
398
399 // Try to send the span, but don't try too hard. If the channel is full, drop the tracing
400 // information.
401 let _ = self
402 .traces_in
403 .sender
404 .lock()
405 .unwrap()
406 .try_send(protocol::jaeger::Span {
407 trace_id_low: i64::from_be_bytes(
408 <[u8; 8]>::try_from(&self.trace_id.get().to_be_bytes()[8..]).unwrap(),
409 ),
410 trace_id_high: i64::from_be_bytes(
411 <[u8; 8]>::try_from(&self.trace_id.get().to_be_bytes()[..8]).unwrap(),
412 ),
413 span_id: i64::from_ne_bytes(self.span_id.get().to_ne_bytes()),
414 parent_span_id: i64::from_ne_bytes(self.parent_span_id.to_ne_bytes()),
415 operation_name: mem::take(&mut self.operation_name),
416 references: if self.references.is_empty() {
417 None
418 } else {
419 Some(mem::take(&mut self.references))
420 },
421 flags: 0,
422 start_time: i64::try_from(
423 self.start_time
424 .duration_since(SystemTime::UNIX_EPOCH)
425 .unwrap_or_else(|_| Duration::new(0, 0))
426 .as_micros(),
427 )
428 .unwrap_or(i64::max_value()),
429 duration: i64::try_from(
430 end_time
431 .duration_since(self.start_time)
432 .unwrap_or_else(|_| Duration::new(0, 0))
433 .as_micros(),
434 )
435 .unwrap_or(i64::max_value()),
436 tags: Some(mem::take(&mut self.tags)),
437 logs: if self.logs.is_empty() {
438 None
439 } else {
440 Some(mem::take(&mut self.logs))
441 },
442 });
443 }
444}
445
/// Point in time at which a [`Span`] started, captured before the span itself is created.
///
/// Pass it to [`Span::override_start_time`] or [`Span::with_start_time_override`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StartTime(SystemTime);

impl StartTime {
    /// Captures the current time.
    pub fn now() -> Self {
        StartTime(SystemTime::now())
    }
}
453
454pub struct Log<'a> {
455 span: &'a mut Span,
456 timestamp: i64,
457 fields: Vec<protocol::jaeger::Tag>,
458}
459
460impl<'a> Log<'a> {
461 /// Add a new key-value tag to this log.
462 pub fn with_string(mut self, key: &str, value: &str) -> Self {
463 self.fields.push(string_tag(key, value));
464 self
465 }
466
467 /// Add a new key-value tag to this log.
468 pub fn with_int(mut self, key: &str, value: i64) -> Self {
469 self.fields.push(int_tag(key, value));
470 self
471 }
472
473 // TODO: other methods
474}
475
476impl<'a> Drop for Log<'a> {
477 fn drop(&mut self) {
478 self.span.logs.push(protocol::jaeger::Log {
479 timestamp: self.timestamp,
480 fields: mem::replace(&mut self.fields, Vec::new()),
481 });
482 }
483}
484
485fn int_tag(key: &str, value: i64) -> protocol::jaeger::Tag {
486 protocol::jaeger::Tag {
487 key: key.to_string(),
488 v_type: protocol::jaeger::TagType::Long,
489 v_long: Some(value),
490 v_str: None,
491 v_double: None,
492 v_bool: None,
493 v_binary: None,
494 }
495}
496
497fn string_tag(key: &str, value: &str) -> protocol::jaeger::Tag {
498 protocol::jaeger::Tag {
499 key: key.to_string(),
500 v_type: protocol::jaeger::TagType::String,
501 v_str: Some(value.to_string()),
502 v_long: None,
503 v_double: None,
504 v_bool: None,
505 v_binary: None,
506 }
507}
508
509fn base_tags() -> Vec<protocol::jaeger::Tag> {
510 vec![
511 string_tag("otel.library.name", env!("CARGO_PKG_NAME")),
512 string_tag("otel.library.version", env!("CARGO_PKG_VERSION")),
513 ]
514}
515
516/// Receiving side for spans.
517///
518/// This object must be processed in order to send traces to the UDP server.
519pub struct TracesOut {
520 rx: stream::ReadyChunks<mpsc::Receiver<protocol::jaeger::Span>>,
521 process: protocol::jaeger::Process,
522 buffer: thrift::transport::ReadHalf<glue::TBufferChannel>,
523 client: protocol::agent::AgentSyncClient<
524 thrift::protocol::TCompactInputProtocol<glue::TNoopChannel>,
525 thrift::protocol::TCompactOutputProtocol<
526 thrift::transport::WriteHalf<glue::TBufferChannel>,
527 >,
528 >,
529}
530
531impl TracesOut {
532 /// Returns the next packet of data to send on the UDP socket.
533 pub async fn next(&mut self) -> Vec<u8> {
534 if self.rx.is_terminated() {
535 loop {
536 futures::pending!()
537 }
538 }
539
540 let spans = self.rx.select_next_some().await;
541
542 self.client
543 .emit_batch(protocol::jaeger::Batch {
544 spans,
545 process: self.process.clone(),
546 })
547 .unwrap();
548 self.buffer.take_bytes()
549 }
550
551 /// Add a new key-value tag to the process.
552 pub fn add_string_tag(&mut self, key: &str, value: &str) {
553 // TODO: check for duplicates?
554 self.process
555 .tags
556 .as_mut()
557 .unwrap()
558 .push(string_tag(key, value));
559 }
560
561 /// Add a new key-value tag to the process.
562 pub fn add_int_tag(&mut self, key: &str, value: i64) {
563 // TODO: check for duplicates?
564 self.process
565 .tags
566 .as_mut()
567 .unwrap()
568 .push(int_tag(key, value));
569 }
570}