Skip to content

Commit

Permalink
Simple span processor perf (#502)
Browse files Browse the repository at this point in the history
Currently the simple span processor his _highly_ sensitive to any
latency in its underlying exporter as it will block the current thread
for the full export duration when each span ends.

This patch addresses this by moving span exporting to a separate thread,
and communicating via channels.

Spans are still exported as soon as they end, and shutdown will wait for
all spans to be successfully exported, preserving the simple span
processor semantics of immediate exports, and never dropping data.
  • Loading branch information
jtescher committed Mar 31, 2021
1 parent e8878a5 commit 4af37e1
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 38 deletions.
3 changes: 2 additions & 1 deletion opentelemetry/Cargo.toml
Expand Up @@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive", "rc"], optional = true }
thiserror = "1"
tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3"
Expand All @@ -47,7 +48,7 @@ tokio-stream = "0.1"

[features]
default = ["trace"]
trace = ["rand", "pin-project", "async-trait", "percent-encoding"]
trace = ["crossbeam-channel", "rand", "pin-project", "async-trait", "percent-encoding"]
metrics = ["dashmap", "fnv"]
serialize = ["serde"]
testing = ["trace", "metrics", "rt-async-std", "rt-tokio", "rt-tokio-current-thread"]
Expand Down
39 changes: 24 additions & 15 deletions opentelemetry/benches/trace.rs
@@ -1,6 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{
sdk::trace as sdktrace,
sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
trace as sdktrace,
},
trace::{Span, Tracer, TracerProvider},
Key, KeyValue,
};
Expand Down Expand Up @@ -94,29 +97,35 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) {
}
}

#[derive(Debug)]
struct VoidExporter;

#[async_trait::async_trait]
impl SpanExporter for VoidExporter {
async fn export(&mut self, _spans: Vec<SpanData>) -> ExportResult {
Ok(())
}
}

fn trace_benchmark_group<F: Fn(&sdktrace::Tracer)>(c: &mut Criterion, name: &str, f: F) {
let mut group = c.benchmark_group(name);

group.bench_function("always-sample", |b| {
let always_sample = sdktrace::TracerProvider::builder()
.with_config(sdktrace::Config {
default_sampler: Box::new(sdktrace::Sampler::AlwaysOn),
..Default::default()
})
.build()
.get_tracer("always-sample", None);
let provider = sdktrace::TracerProvider::builder()
.with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn))
.with_simple_exporter(VoidExporter)
.build();
let always_sample = provider.get_tracer("always-sample", None);

b.iter(|| f(&always_sample));
});

group.bench_function("never-sample", |b| {
let never_sample = sdktrace::TracerProvider::builder()
.with_config(sdktrace::Config {
default_sampler: Box::new(sdktrace::Sampler::AlwaysOff),
..Default::default()
})
.build()
.get_tracer("never-sample", None);
let provider = sdktrace::TracerProvider::builder()
.with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOff))
.with_simple_exporter(VoidExporter)
.build();
let never_sample = provider.get_tracer("never-sample", None);
b.iter(|| f(&never_sample));
});

Expand Down
62 changes: 40 additions & 22 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -43,8 +43,7 @@ use crate::{
Context,
};
use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt};
use std::env;
use std::{fmt, str::FromStr, sync::Mutex, time::Duration};
use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
Expand Down Expand Up @@ -104,13 +103,37 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// ```
#[derive(Debug)]
pub struct SimpleSpanProcessor {
exporter: Mutex<Box<dyn SpanExporter>>,
sender: crossbeam_channel::Sender<Option<SpanData>>,
shutdown: crossbeam_channel::Receiver<()>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(Some(span)) = span_rx.recv() {
if let Err(err) = executor::block_on(exporter.export(vec![span])) {
global::handle_error(err);
}
}

exporter.shutdown();

if let Err(err) = shutdown_tx.send(()) {
global::handle_error(TraceError::from(format!(
"could not send shutdown: {:?}",
err
)));
}
});

SimpleSpanProcessor {
exporter: Mutex::new(exporter),
sender: span_tx,
shutdown: shutdown_rx,
}
}
}
Expand All @@ -121,14 +144,8 @@ impl SpanProcessor for SimpleSpanProcessor {
}

fn on_end(&self, span: SpanData) {
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("simple span processor mutex poisoned".into()))
.and_then(|mut exporter| executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);
if let Err(err) = self.sender.send(Some(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
}
}

Expand All @@ -138,15 +155,16 @@ impl SpanProcessor for SimpleSpanProcessor {
}

fn shutdown(&mut self) -> TraceResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned"
.into(),
))
if self.sender.send(None).is_ok() {
if let Err(err) = self.shutdown.recv() {
global::handle_error(TraceError::from(format!(
"error shutting down span processor: {:?}",
err
)))
}
}

Ok(())
}
}

Expand Down Expand Up @@ -543,7 +561,7 @@ mod tests {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
processor.on_end(new_test_export_span_data());
assert!(rx_export.try_recv().is_ok());
assert!(rx_export.recv().is_ok());
}

#[test]
Expand Down

0 comments on commit 4af37e1

Please sign in to comment.