Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple span processor perf #502

Merged
merged 3 commits into from Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion opentelemetry/Cargo.toml
Expand Up @@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive", "rc"], optional = true }
thiserror = "1"
tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }
frigus02 marked this conversation as resolved.
Show resolved Hide resolved

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3"
Expand All @@ -47,7 +48,7 @@ tokio-stream = "0.1"

[features]
default = ["trace"]
trace = ["rand", "pin-project", "async-trait", "percent-encoding"]
trace = ["crossbeam-channel", "rand", "pin-project", "async-trait", "percent-encoding"]
metrics = ["dashmap", "fnv"]
serialize = ["serde"]
testing = ["trace", "metrics", "rt-async-std", "rt-tokio", "rt-tokio-current-thread"]
Expand Down
39 changes: 24 additions & 15 deletions opentelemetry/benches/trace.rs
@@ -1,6 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{
sdk::trace as sdktrace,
sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
trace as sdktrace,
},
trace::{Span, Tracer, TracerProvider},
Key, KeyValue,
};
Expand Down Expand Up @@ -94,29 +97,35 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) {
}
}

#[derive(Debug)]
struct VoidExporter;

#[async_trait::async_trait]
impl SpanExporter for VoidExporter {
async fn export(&mut self, _spans: Vec<SpanData>) -> ExportResult {
Ok(())
}
}

fn trace_benchmark_group<F: Fn(&sdktrace::Tracer)>(c: &mut Criterion, name: &str, f: F) {
let mut group = c.benchmark_group(name);

group.bench_function("always-sample", |b| {
let always_sample = sdktrace::TracerProvider::builder()
.with_config(sdktrace::Config {
default_sampler: Box::new(sdktrace::Sampler::AlwaysOn),
..Default::default()
})
.build()
.get_tracer("always-sample", None);
let provider = sdktrace::TracerProvider::builder()
.with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn))
.with_simple_exporter(VoidExporter)
.build();
let always_sample = provider.get_tracer("always-sample", None);

b.iter(|| f(&always_sample));
});

group.bench_function("never-sample", |b| {
let never_sample = sdktrace::TracerProvider::builder()
.with_config(sdktrace::Config {
default_sampler: Box::new(sdktrace::Sampler::AlwaysOff),
..Default::default()
})
.build()
.get_tracer("never-sample", None);
let provider = sdktrace::TracerProvider::builder()
.with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOff))
.with_simple_exporter(VoidExporter)
.build();
let never_sample = provider.get_tracer("never-sample", None);
b.iter(|| f(&never_sample));
});

Expand Down
62 changes: 40 additions & 22 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -43,8 +43,7 @@ use crate::{
Context,
};
use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt};
use std::env;
use std::{fmt, str::FromStr, sync::Mutex, time::Duration};
use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
Expand Down Expand Up @@ -104,13 +103,37 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// ```
#[derive(Debug)]
pub struct SimpleSpanProcessor {
exporter: Mutex<Box<dyn SpanExporter>>,
sender: crossbeam_channel::Sender<Option<SpanData>>,
shutdown: crossbeam_channel::Receiver<()>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(Some(span)) = span_rx.recv() {
if let Err(err) = executor::block_on(exporter.export(vec![span])) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider adding a timeout around here to prevent the exporter block for too long?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering that, but the spec doesn't have any config for it and if you are creating faster than you are exporting to the point where it would be a big issue you should probably just use the batch processor as your app may OOM or do other bad things as well with the simple processor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To that point we should probably warn users that using simple span processor may cause OOM.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think a docs PR followup on the difference between both and when to choose one vs the other would be a good idea 👍

global::handle_error(err);
}
}

exporter.shutdown();

if let Err(err) = shutdown_tx.send(()) {
global::handle_error(TraceError::from(format!(
"could not send shutdown: {:?}",
err
)));
}
});

SimpleSpanProcessor {
exporter: Mutex::new(exporter),
sender: span_tx,
shutdown: shutdown_rx,
}
}
}
Expand All @@ -121,14 +144,8 @@ impl SpanProcessor for SimpleSpanProcessor {
}

fn on_end(&self, span: SpanData) {
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("simple span processor mutex poisoned".into()))
.and_then(|mut exporter| executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);
if let Err(err) = self.sender.send(Some(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
}
}

Expand All @@ -138,15 +155,16 @@ impl SpanProcessor for SimpleSpanProcessor {
}

fn shutdown(&mut self) -> TraceResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned"
.into(),
))
if self.sender.send(None).is_ok() {
if let Err(err) = self.shutdown.recv() {
frigus02 marked this conversation as resolved.
Show resolved Hide resolved
global::handle_error(TraceError::from(format!(
"error shutting down span processor: {:?}",
err
)))
}
}

Ok(())
}
}

Expand Down Expand Up @@ -543,7 +561,7 @@ mod tests {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
processor.on_end(new_test_export_span_data());
assert!(rx_export.try_recv().is_ok());
assert!(rx_export.recv().is_ok());
}

#[test]
Expand Down