diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 769847c2c4..4f5ca4bb99 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive", "rc"], optional = true } thiserror = "1" tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true } tokio-stream = { version = "0.1", optional = true } +crossbeam-channel = { version = "0.5", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys = "0.3" @@ -47,7 +48,7 @@ tokio-stream = "0.1" [features] default = ["trace"] -trace = ["rand", "pin-project", "async-trait", "percent-encoding"] +trace = ["crossbeam-channel", "rand", "pin-project", "async-trait", "percent-encoding"] metrics = ["dashmap", "fnv"] serialize = ["serde"] testing = ["trace", "metrics", "rt-async-std", "rt-tokio", "rt-tokio-current-thread"] diff --git a/opentelemetry/benches/trace.rs b/opentelemetry/benches/trace.rs index cd17ffc773..42347340c7 100644 --- a/opentelemetry/benches/trace.rs +++ b/opentelemetry/benches/trace.rs @@ -1,6 +1,9 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::{ - sdk::trace as sdktrace, + sdk::{ + export::trace::{ExportResult, SpanData, SpanExporter}, + trace as sdktrace, + }, trace::{Span, Tracer, TracerProvider}, Key, KeyValue, }; @@ -94,29 +97,35 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) { } } +#[derive(Debug)] +struct VoidExporter; + +#[async_trait::async_trait] +impl SpanExporter for VoidExporter { + async fn export(&mut self, _spans: Vec) -> ExportResult { + Ok(()) + } +} + fn trace_benchmark_group(c: &mut Criterion, name: &str, f: F) { let mut group = c.benchmark_group(name); group.bench_function("always-sample", |b| { - let always_sample = sdktrace::TracerProvider::builder() - .with_config(sdktrace::Config { - default_sampler: Box::new(sdktrace::Sampler::AlwaysOn), - ..Default::default() - }) - .build() - .get_tracer("always-sample", None); + let provider = sdktrace::TracerProvider::builder() + .with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn)) + .with_simple_exporter(VoidExporter) + .build(); + let always_sample = provider.get_tracer("always-sample", None); b.iter(|| f(&always_sample)); }); group.bench_function("never-sample", |b| { - let never_sample = sdktrace::TracerProvider::builder() - .with_config(sdktrace::Config { - default_sampler: Box::new(sdktrace::Sampler::AlwaysOff), - ..Default::default() - }) - .build() - .get_tracer("never-sample", None); + let provider = sdktrace::TracerProvider::builder() + .with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOff)) + .with_simple_exporter(VoidExporter) + .build(); + let never_sample = provider.get_tracer("never-sample", None); b.iter(|| f(&never_sample)); }); diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index b39b160c25..6024c4d5cc 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -43,8 +43,7 @@ use crate::{ Context, }; use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt}; -use std::env; -use std::{fmt, str::FromStr, sync::Mutex, time::Duration}; +use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration}; /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; @@ -104,13 +103,37 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// ``` #[derive(Debug)] pub struct SimpleSpanProcessor { - exporter: Mutex>, + sender: crossbeam_channel::Sender>, + shutdown: crossbeam_channel::Receiver<()>, } impl SimpleSpanProcessor { - pub(crate) fn new(exporter: Box) -> Self { + pub(crate) fn new(mut exporter: Box) -> Self { + let (span_tx, span_rx) = crossbeam_channel::unbounded(); + let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0); + + let _ = thread::Builder::new() + .name("opentelemetry-exporter".to_string()) + .spawn(move || { + while let Ok(Some(span)) = span_rx.recv() { + if let Err(err) = executor::block_on(exporter.export(vec![span])) { + global::handle_error(err); + } + } + + exporter.shutdown(); + + if let Err(err) = shutdown_tx.send(()) { + global::handle_error(TraceError::from(format!( + "could not send shutdown: {:?}", + err + ))); + } + }); + SimpleSpanProcessor { - exporter: Mutex::new(exporter), + sender: span_tx, + shutdown: shutdown_rx, } } } @@ -121,14 +144,8 @@ impl SpanProcessor for SimpleSpanProcessor { } fn on_end(&self, span: SpanData) { - let result = self - .exporter - .lock() - .map_err(|_| TraceError::Other("simple span processor mutex poisoned".into())) - .and_then(|mut exporter| executor::block_on(exporter.export(vec![span]))); - - if let Err(err) = result { - global::handle_error(err); + if let Err(err) = self.sender.send(Some(span)) { + global::handle_error(TraceError::from(format!("error processing span {:?}", err))); } } @@ -138,15 +155,16 @@ impl SpanProcessor for SimpleSpanProcessor { } fn shutdown(&mut self) -> TraceResult<()> { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(); - Ok(()) - } else { - Err(TraceError::Other( - "When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned" - .into(), - )) + if self.sender.send(None).is_ok() { + if let Err(err) = self.shutdown.recv() { + global::handle_error(TraceError::from(format!( + "error shutting down span processor: {:?}", + err + ))) + } } + + Ok(()) } } @@ -543,7 +561,7 @@ mod tests { let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); let processor = SimpleSpanProcessor::new(Box::new(exporter)); processor.on_end(new_test_export_span_data()); - assert!(rx_export.try_recv().is_ok()); + assert!(rx_export.recv().is_ok()); } #[test]