From d4d7cc655feea002f18850e17c18640dc4051f27 Mon Sep 17 00:00:00 2001
From: Julian Tescher
Date: Mon, 29 Mar 2021 09:09:35 -0700
Subject: [PATCH 1/2] Simple span processor perf

Currently the simple span processor is _highly_ sensitive to any latency in
its underlying exporter, as it will block the current thread for the full
export duration when each span ends.

This patch addresses this by moving span exporting to a separate thread and
communicating via channels. Spans are still exported as soon as they end, and
shutdown will wait for all spans to be successfully exported, preserving the
simple span processor semantics of immediate exports and never dropping data.
---
 opentelemetry/Cargo.toml                      |  3 +-
 opentelemetry/benches/trace.rs                | 39 +++++++-----
 opentelemetry/src/sdk/trace/span_processor.rs | 62 ++++++++++++-------
 3 files changed, 66 insertions(+), 38 deletions(-)

diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml
index 769847c2c4..4f5ca4bb99 100644
--- a/opentelemetry/Cargo.toml
+++ b/opentelemetry/Cargo.toml
@@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive", "rc"], optional = true }
 thiserror = "1"
 tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true }
 tokio-stream = { version = "0.1", optional = true }
+crossbeam-channel = { version = "0.5", optional = true }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 js-sys = "0.3"
@@ -47,7 +48,7 @@ tokio-stream = "0.1"
 
 [features]
 default = ["trace"]
-trace = ["rand", "pin-project", "async-trait", "percent-encoding"]
+trace = ["crossbeam-channel", "rand", "pin-project", "async-trait", "percent-encoding"]
 metrics = ["dashmap", "fnv"]
 serialize = ["serde"]
 testing = ["trace", "metrics", "rt-async-std", "rt-tokio", "rt-tokio-current-thread"]
diff --git a/opentelemetry/benches/trace.rs b/opentelemetry/benches/trace.rs
index cd17ffc773..42347340c7 100644
--- a/opentelemetry/benches/trace.rs
+++ b/opentelemetry/benches/trace.rs
@@ -1,6 +1,9 @@
 use criterion::{criterion_group, criterion_main, Criterion};
 use opentelemetry::{
-    sdk::trace as sdktrace,
+    sdk::{
+        export::trace::{ExportResult, SpanData, SpanExporter},
+        trace as sdktrace,
+    },
     trace::{Span, Tracer, TracerProvider},
     Key, KeyValue,
 };
@@ -94,29 +97,35 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) {
     }
 }
 
+#[derive(Debug)]
+struct VoidExporter;
+
+#[async_trait::async_trait]
+impl SpanExporter for VoidExporter {
+    async fn export(&mut self, _spans: Vec<SpanData>) -> ExportResult {
+        Ok(())
+    }
+}
+
 fn trace_benchmark_group<F: Fn(&sdktrace::Tracer)>(c: &mut Criterion, name: &str, f: F) {
     let mut group = c.benchmark_group(name);
 
     group.bench_function("always-sample", |b| {
-        let always_sample = sdktrace::TracerProvider::builder()
-            .with_config(sdktrace::Config {
-                default_sampler: Box::new(sdktrace::Sampler::AlwaysOn),
-                ..Default::default()
-            })
-            .build()
-            .get_tracer("always-sample", None);
+        let provider = sdktrace::TracerProvider::builder()
+            .with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn))
+            .with_simple_exporter(VoidExporter)
+            .build();
+        let always_sample = provider.get_tracer("always-sample", None);
 
         b.iter(|| f(&always_sample));
     });
 
     group.bench_function("never-sample", |b| {
-        let never_sample = sdktrace::TracerProvider::builder()
-            .with_config(sdktrace::Config {
-                default_sampler: Box::new(sdktrace::Sampler::AlwaysOff),
-                ..Default::default()
-            })
-            .build()
-            .get_tracer("never-sample", None);
+        let provider = sdktrace::TracerProvider::builder()
+            .with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOff))
+            .with_simple_exporter(VoidExporter)
+            .build();
+        let never_sample = provider.get_tracer("never-sample", None);
 
         b.iter(|| f(&never_sample));
     });
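Before the span_processor.rs changes that follow, a minimal standalone sketch of the thread-plus-channels pattern the commit message describes may help. It is illustrative only, not the SDK code: the `Span` struct, the `export` function, and the channel variable names are stand-ins, and the only assumed dependency is crossbeam-channel 0.5, the crate this patch adds.

```rust
// Stand-in for the SDK's `SpanData`.
#[derive(Debug)]
struct Span(&'static str);

// Stand-in for the exporter call; the real processor runs
// `executor::block_on(exporter.export(vec![span]))` at this point.
fn export(span: Span) {
    println!("exported {:?}", span);
}

fn main() {
    // Unbounded channel for finished spans; `None` doubles as the shutdown signal.
    let (span_tx, span_rx) = crossbeam_channel::unbounded::<Option<Span>>();
    // A zero-capacity channel: `send` rendezvouses with `recv`, so shutdown
    // cannot return before the worker has drained the queue and acknowledged.
    let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0);

    let worker = std::thread::Builder::new()
        .name("opentelemetry-exporter".to_string())
        .spawn(move || {
            // Export spans as they arrive; stop when `None` arrives or the
            // sending side is dropped.
            while let Ok(Some(span)) = span_rx.recv() {
                export(span);
            }
            // Acknowledge shutdown once everything has been exported.
            let _ = shutdown_tx.send(());
        })
        .expect("failed to spawn exporter thread");

    // "on_end": enqueue the span and return immediately instead of blocking
    // the caller for the full export duration.
    span_tx.send(Some(Span("operation"))).unwrap();

    // "shutdown": signal the worker, then wait for its acknowledgement.
    span_tx.send(None).unwrap();
    shutdown_rx.recv().unwrap();
    worker.join().unwrap();
}
```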
diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs
index b39b160c25..bf13ed801e 100644
--- a/opentelemetry/src/sdk/trace/span_processor.rs
+++ b/opentelemetry/src/sdk/trace/span_processor.rs
@@ -43,8 +43,7 @@ use crate::{
     Context,
 };
 use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt};
-use std::env;
-use std::{fmt, str::FromStr, sync::Mutex, time::Duration};
+use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration};
 
 /// Delay interval between two consecutive exports.
 const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
@@ -104,13 +103,37 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
 /// ```
 #[derive(Debug)]
 pub struct SimpleSpanProcessor {
-    exporter: Mutex<Box<dyn SpanExporter>>,
+    sender: crossbeam_channel::Sender<Option<SpanData>>,
+    shutdown: crossbeam_channel::Receiver<()>,
 }
 
 impl SimpleSpanProcessor {
-    pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
+    pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
+        let (span_tx, span_rx) = crossbeam_channel::unbounded();
+        let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);
+
+        let _ = thread::Builder::new()
+            .name("opentelemetry-exporter".to_string())
+            .spawn(move || {
+                while let Ok(Some(span)) = span_rx.recv() {
+                    if let Err(err) = executor::block_on(exporter.export(vec![span])) {
+                        global::handle_error(err);
+                    }
+                }
+
+                exporter.shutdown();
+
+                if let Err(err) = shutdown_tx.send(()) {
+                    global::handle_error(TraceError::from(format!(
+                        "could not send shutdown: {:?}",
+                        err
+                    )));
+                }
+            });
+
         SimpleSpanProcessor {
-            exporter: Mutex::new(exporter),
+            sender: span_tx,
+            shutdown: shutdown_rx,
         }
     }
 }
@@ -121,14 +144,8 @@ impl SpanProcessor for SimpleSpanProcessor {
     }
 
     fn on_end(&self, span: SpanData) {
-        let result = self
-            .exporter
-            .lock()
-            .map_err(|_| TraceError::Other("simple span processor mutex poisoned".into()))
-            .and_then(|mut exporter| executor::block_on(exporter.export(vec![span])));
-
-        if let Err(err) = result {
-            global::handle_error(err);
+        if let Err(err) = self.sender.send(Some(span)) {
+            global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
         }
     }
 
@@ -138,15 +155,16 @@ impl SpanProcessor for SimpleSpanProcessor {
     }
 
     fn shutdown(&mut self) -> TraceResult<()> {
-        if let Ok(mut exporter) = self.exporter.lock() {
-            exporter.shutdown();
-            Ok(())
-        } else {
-            Err(TraceError::Other(
-                "When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned"
-                    .into(),
-            ))
+        if let Ok(_) = self.sender.send(None) {
+            if let Err(err) = self.shutdown.recv() {
+                global::handle_error(TraceError::from(format!(
+                    "error shutting down span processor: {:?}",
+                    err
+                )))
+            }
         }
+
+        Ok(())
     }
 }
 
@@ -543,7 +561,7 @@ mod tests {
         let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
        let processor = SimpleSpanProcessor::new(Box::new(exporter));
         processor.on_end(new_test_export_span_data());
-        assert!(rx_export.try_recv().is_ok());
+        assert!(rx_export.recv().is_ok());
     }
 
     #[test]
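The final hunk above also changes the simple processor test from `try_recv()` to `recv()`. Because `on_end` now only enqueues the span, the actual export happens on the worker thread some time after `on_end` returns, so a non-blocking receive could still observe an empty channel. The sketch below illustrates that ordering with `std::sync::mpsc`; it is an analogy only, since the channel type used by `new_test_exporter()` is not shown in this patch.

```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel::<&'static str>();

    // Plays the role of the exporter worker thread: the "export" lands on the
    // channel shortly after the caller has already returned from "on_end".
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send("span exported").unwrap();
    });

    // A non-blocking check right away will usually report `Empty`; this is
    // the race the updated test avoids.
    println!("try_recv immediately: {:?}", rx.try_recv());

    // A blocking `recv()` waits until the worker has delivered the span.
    println!("recv: {:?}", rx.recv());
}
```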
From 60c46ee816c4253a620253ad4c557aa212f28ac6 Mon Sep 17 00:00:00 2001
From: Julian Tescher
Date: Mon, 29 Mar 2021 09:36:29 -0700
Subject: [PATCH 2/2] Clippy lints

---
 opentelemetry/src/sdk/trace/span_processor.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs
index bf13ed801e..6024c4d5cc 100644
--- a/opentelemetry/src/sdk/trace/span_processor.rs
+++ b/opentelemetry/src/sdk/trace/span_processor.rs
@@ -155,7 +155,7 @@ impl SpanProcessor for SimpleSpanProcessor {
     }
 
     fn shutdown(&mut self) -> TraceResult<()> {
-        if let Ok(_) = self.sender.send(None) {
+        if self.sender.send(None).is_ok() {
             if let Err(err) = self.shutdown.recv() {
                 global::handle_error(TraceError::from(format!(
                     "error shutting down span processor: {:?}",
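For reference, this follow-up commit addresses Clippy's `redundant_pattern_matching` lint, which fires when an `if let Ok(_)` match only tests for success without using the binding. A small sketch of the flagged form next to the preferred form used above:

```rust
fn main() {
    let result: Result<(), &str> = Ok(());

    // Flagged by `clippy::redundant_pattern_matching`: the `Ok` binding is
    // never used, so the pattern match only checks whether `result` is `Ok`.
    if let Ok(_) = result {
        println!("shutdown signal sent");
    }

    // Preferred form, as in the hunk above.
    if result.is_ok() {
        println!("shutdown signal sent");
    }
}
```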