From 4af37e150fb08eccdc86dc65c61541cc463daa57 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Tue, 30 Mar 2021 22:14:49 -0700 Subject: [PATCH] Simple span processor perf (#502) Currently the simple span processor is _highly_ sensitive to any latency in its underlying exporter as it will block the current thread for the full export duration when each span ends. This patch addresses this by moving span exporting to a separate thread, and communicating via channels. Spans are still exported as soon as they end, and shutdown will wait for all spans to be successfully exported, preserving the simple span processor semantics of immediate exports, and never dropping data. --- opentelemetry/Cargo.toml | 3 +- opentelemetry/benches/trace.rs | 39 +++++++----- opentelemetry/src/sdk/trace/span_processor.rs | 62 ++++++++++++------- 3 files changed, 66 insertions(+), 38 deletions(-) diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 769847c2c4..4f5ca4bb99 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive", "rc"], optional = true } thiserror = "1" tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true } tokio-stream = { version = "0.1", optional = true } +crossbeam-channel = { version = "0.5", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys = "0.3" @@ -47,7 +48,7 @@ tokio-stream = "0.1" [features] default = ["trace"] -trace = ["rand", "pin-project", "async-trait", "percent-encoding"] +trace = ["crossbeam-channel", "rand", "pin-project", "async-trait", "percent-encoding"] metrics = ["dashmap", "fnv"] serialize = ["serde"] testing = ["trace", "metrics", "rt-async-std", "rt-tokio", "rt-tokio-current-thread"] diff --git a/opentelemetry/benches/trace.rs b/opentelemetry/benches/trace.rs index cd17ffc773..42347340c7 100644 --- a/opentelemetry/benches/trace.rs +++ b/opentelemetry/benches/trace.rs @@ -1,6 +1,9 @@ 
use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::{ - sdk::trace as sdktrace, + sdk::{ + export::trace::{ExportResult, SpanData, SpanExporter}, + trace as sdktrace, + }, trace::{Span, Tracer, TracerProvider}, Key, KeyValue, }; @@ -94,29 +97,35 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) { } } +#[derive(Debug)] +struct VoidExporter; + +#[async_trait::async_trait] +impl SpanExporter for VoidExporter { + async fn export(&mut self, _spans: Vec) -> ExportResult { + Ok(()) + } +} + fn trace_benchmark_group(c: &mut Criterion, name: &str, f: F) { let mut group = c.benchmark_group(name); group.bench_function("always-sample", |b| { - let always_sample = sdktrace::TracerProvider::builder() - .with_config(sdktrace::Config { - default_sampler: Box::new(sdktrace::Sampler::AlwaysOn), - ..Default::default() - }) - .build() - .get_tracer("always-sample", None); + let provider = sdktrace::TracerProvider::builder() + .with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn)) + .with_simple_exporter(VoidExporter) + .build(); + let always_sample = provider.get_tracer("always-sample", None); b.iter(|| f(&always_sample)); }); group.bench_function("never-sample", |b| { - let never_sample = sdktrace::TracerProvider::builder() - .with_config(sdktrace::Config { - default_sampler: Box::new(sdktrace::Sampler::AlwaysOff), - ..Default::default() - }) - .build() - .get_tracer("never-sample", None); + let provider = sdktrace::TracerProvider::builder() + .with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOff)) + .with_simple_exporter(VoidExporter) + .build(); + let never_sample = provider.get_tracer("never-sample", None); b.iter(|| f(&never_sample)); }); diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index b39b160c25..6024c4d5cc 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -43,8 +43,7 @@ 
use crate::{ Context, }; use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt}; -use std::env; -use std::{fmt, str::FromStr, sync::Mutex, time::Duration}; +use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration}; /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; @@ -104,13 +103,37 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// ``` #[derive(Debug)] pub struct SimpleSpanProcessor { - exporter: Mutex>, + sender: crossbeam_channel::Sender>, + shutdown: crossbeam_channel::Receiver<()>, } impl SimpleSpanProcessor { - pub(crate) fn new(exporter: Box) -> Self { + pub(crate) fn new(mut exporter: Box) -> Self { + let (span_tx, span_rx) = crossbeam_channel::unbounded(); + let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0); + + let _ = thread::Builder::new() + .name("opentelemetry-exporter".to_string()) + .spawn(move || { + while let Ok(Some(span)) = span_rx.recv() { + if let Err(err) = executor::block_on(exporter.export(vec![span])) { + global::handle_error(err); + } + } + + exporter.shutdown(); + + if let Err(err) = shutdown_tx.send(()) { + global::handle_error(TraceError::from(format!( + "could not send shutdown: {:?}", + err + ))); + } + }); + SimpleSpanProcessor { - exporter: Mutex::new(exporter), + sender: span_tx, + shutdown: shutdown_rx, } } } @@ -121,14 +144,8 @@ impl SpanProcessor for SimpleSpanProcessor { } fn on_end(&self, span: SpanData) { - let result = self - .exporter - .lock() - .map_err(|_| TraceError::Other("simple span processor mutex poisoned".into())) - .and_then(|mut exporter| executor::block_on(exporter.export(vec![span]))); - - if let Err(err) = result { - global::handle_error(err); + if let Err(err) = self.sender.send(Some(span)) { + global::handle_error(TraceError::from(format!("error processing span {:?}", err))); } } @@ -138,15 +155,16 @@ impl SpanProcessor for SimpleSpanProcessor { } fn shutdown(&mut 
self) -> TraceResult<()> { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(); - Ok(()) - } else { - Err(TraceError::Other( - "When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned" - .into(), - )) + if self.sender.send(None).is_ok() { + if let Err(err) = self.shutdown.recv() { + global::handle_error(TraceError::from(format!( + "error shutting down span processor: {:?}", + err + ))) + } } + + Ok(()) } } @@ -543,7 +561,7 @@ mod tests { let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); let processor = SimpleSpanProcessor::new(Box::new(exporter)); processor.on_end(new_test_export_span_data()); - assert!(rx_export.try_recv().is_ok()); + assert!(rx_export.recv().is_ok()); } #[test]