diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index 79dfcdcdbf..ee765771f4 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -7,9 +7,9 @@ pub use model::Error; use async_trait::async_trait; use http::{Method, Request, Uri}; use itertools::Itertools; -use opentelemetry::runtime::Runtime; use opentelemetry::sdk::export::trace; use opentelemetry::sdk::export::trace::SpanData; +use opentelemetry::sdk::trace::TraceRuntime; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk, trace::TracerProvider}; use opentelemetry_http::{HttpClient, ResponseExt}; @@ -127,7 +127,7 @@ impl DatadogPipelineBuilder { /// Install the Datadog trace exporter pipeline using a batch span processor with the specified /// runtime. - pub fn install_batch( + pub fn install_batch( mut self, runtime: R, ) -> Result { diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 5333e2ef7e..58903ad2ef 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -23,10 +23,9 @@ use isahc::prelude::Configurable; use opentelemetry::sdk::export::ExportError; use opentelemetry::trace::TraceError; use opentelemetry::{ - global, - runtime::Runtime, - sdk, + global, sdk, sdk::export::trace, + sdk::trace::TraceRuntime, trace::{Event, Link, SpanKind, StatusCode, TracerProvider}, Key, KeyValue, }; @@ -268,7 +267,10 @@ impl PipelineBuilder { } /// Install a Jaeger pipeline with a batch span processor using the specified runtime. 
- pub fn install_batch(self, runtime: R) -> Result { + pub fn install_batch( + self, + runtime: R, + ) -> Result { let tracer_provider = self.build_batch(runtime)?; let tracer = tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION"))); @@ -290,7 +292,7 @@ impl PipelineBuilder { /// Build a configured `sdk::trace::TracerProvider` with a batch span processor using the /// specified runtime. - pub fn build_batch( + pub fn build_batch( mut self, runtime: R, ) -> Result { diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 7d8e780ee9..c989041932 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -146,7 +146,7 @@ #![cfg_attr(docsrs, feature(doc_cfg), deny(broken_intra_doc_links))] #![cfg_attr(test, deny(warnings))] -use opentelemetry::{global, runtime::Runtime, sdk, trace::TracerProvider}; +use opentelemetry::{global, sdk, sdk::trace::TraceRuntime, trace::TracerProvider}; #[cfg(any(feature = "grpc-sys", feature = "http-proto"))] use std::collections::HashMap; @@ -403,7 +403,10 @@ impl TonicPipelineBuilder { /// /// [`Tracer`]: opentelemetry::trace::Tracer /// [tonic]: https://github.com/hyperium/tonic - pub fn install_batch(self, runtime: R) -> Result { + pub fn install_batch( + self, + runtime: R, + ) -> Result { let exporter = match self.channel { Some(channel) => { TraceExporter::from_tonic_channel(self.exporter_config, self.tonic_config, channel) @@ -488,7 +491,10 @@ impl GrpcioPipelineBuilder { /// /// [`Tracer`]: opentelemetry::trace::Tracer /// [grpcio]: https://github.com/tikv/grpc-rs - pub fn install_batch(self, runtime: R) -> Result { + pub fn install_batch( + self, + runtime: R, + ) -> Result { let exporter = TraceExporter::new_grpcio(self.exporter_config, self.grpcio_config); Ok(build_batch_with_exporter( exporter, @@ -551,7 +557,10 @@ impl HttpPipelineBuilder { /// `install_batch` will panic if not called within a tokio runtime /// /// [`Tracer`]: 
opentelemetry::trace::Tracer - pub fn install_batch(self, runtime: R) -> Result { + pub fn install_batch( + self, + runtime: R, + ) -> Result { let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?; Ok(build_batch_with_exporter( exporter, @@ -575,7 +584,7 @@ fn build_simple_with_exporter( tracer } -fn build_batch_with_exporter( +fn build_batch_with_exporter( exporter: TraceExporter, trace_config: Option, runtime: R, diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index b21688ed69..5bdf612e39 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -5,10 +5,9 @@ use async_trait::async_trait; use http::Uri; use model::endpoint::Endpoint; use opentelemetry::{ - global, - runtime::Runtime, - sdk, + global, sdk, sdk::export::{trace, ExportError}, + sdk::trace::TraceRuntime, trace::{TraceError, TracerProvider}, }; use opentelemetry_http::HttpClient; @@ -120,7 +119,7 @@ impl ZipkinPipelineBuilder { /// Install the Zipkin trace exporter pipeline with a batch span processor using the specified /// runtime. 
- pub fn install_batch( + pub fn install_batch( mut self, runtime: R, ) -> Result { diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 7fdd79637b..4e525a5ab4 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -60,6 +60,11 @@ rt-async-std = ["async-std"] name = "trace" harness = false +[[bench]] +name = "batch_span_processor" +harness = false +required-features = ["rt-tokio"] + [[bench]] name = "metric" harness = false diff --git a/opentelemetry/benches/batch_span_processor.rs b/opentelemetry/benches/batch_span_processor.rs new file mode 100644 index 0000000000..5925502747 --- /dev/null +++ b/opentelemetry/benches/batch_span_processor.rs @@ -0,0 +1,82 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use opentelemetry::runtime::Tokio; +use opentelemetry::sdk::export::trace::SpanData; +use opentelemetry::sdk::trace::{BatchSpanProcessor, EvictedHashMap, EvictedQueue, SpanProcessor}; +use opentelemetry::trace::{ + NoopSpanExporter, SpanContext, SpanId, SpanKind, StatusCode, TraceId, TraceState, +}; +use std::sync::Arc; +use std::time::SystemTime; +use tokio::runtime::Runtime; + +fn get_span_data() -> Vec { + (0..200) + .into_iter() + .map(|_| SpanData { + span_context: SpanContext::new( + TraceId::from_u128(12), + SpanId::from_u64(12), + 0, + false, + TraceState::default(), + ), + parent_span_id: SpanId::from_u64(12), + span_kind: SpanKind::Client, + name: Default::default(), + start_time: SystemTime::now(), + end_time: SystemTime::now(), + attributes: EvictedHashMap::new(12, 12), + events: EvictedQueue::new(12), + links: EvictedQueue::new(12), + status_code: StatusCode::Unset, + status_message: Default::default(), + resource: None, + instrumentation_lib: Default::default(), + }) + .collect::>() +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("BatchSpanProcessor"); + group.sample_size(50); + + for task_num in [1, 2, 4, 8, 16, 32].iter() { + 
group.bench_with_input( + BenchmarkId::from_parameter(format!("with {} concurrent task", task_num)), + task_num, + |b, &task_num| { + b.iter(|| { + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + let span_processor = + BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio) + .with_max_queue_size(10_000) + .build(); + let mut shared_span_processor = Arc::new(span_processor); + let mut handles = Vec::with_capacity(10); + for _ in 0..task_num { + let span_processor = shared_span_processor.clone(); + let spans = get_span_data(); + handles.push(tokio::spawn(async move { + for span in spans { + span_processor.on_end(span); + tokio::task::yield_now().await; + } + })); + } + futures::future::join_all(handles).await; + let _ = + Arc::>::get_mut(&mut shared_span_processor) + .unwrap() + .shutdown(); + }); + }) + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/opentelemetry/src/global/trace.rs b/opentelemetry/src/global/trace.rs index 5a37d00c46..5dbeb649f4 100644 --- a/opentelemetry/src/global/trace.rs +++ b/opentelemetry/src/global/trace.rs @@ -328,8 +328,9 @@ pub fn force_flush_tracer_provider() { // threads Use cargo test -- --ignored --test-threads=1 to run those tests. 
mod tests { use super::*; + use crate::sdk::trace::TraceRuntime; use crate::{ - runtime::{self, Runtime}, + runtime, trace::{NoopTracer, Tracer}, }; use std::{ @@ -479,7 +480,7 @@ mod tests { assert!(second_resp.contains("thread 2")); } - fn build_batch_tracer_provider( + fn build_batch_tracer_provider( assert_writer: AssertWriter, runtime: R, ) -> crate::sdk::trace::TracerProvider { @@ -500,7 +501,7 @@ mod tests { .build() } - async fn test_set_provider_in_tokio(runtime: R) -> AssertWriter { + async fn test_set_provider_in_tokio(runtime: R) -> AssertWriter { let buffer = AssertWriter::new(); let _ = set_tracer_provider(build_batch_tracer_provider(buffer.clone(), runtime)); let tracer = tracer("opentelemetery"); diff --git a/opentelemetry/src/sdk/trace/mod.rs b/opentelemetry/src/sdk/trace/mod.rs index 5744518175..027b8301b2 100644 --- a/opentelemetry/src/sdk/trace/mod.rs +++ b/opentelemetry/src/sdk/trace/mod.rs @@ -11,6 +11,7 @@ mod evicted_hash_map; mod evicted_queue; mod id_generator; mod provider; +mod runtime; mod sampler; mod span; mod span_limit; @@ -22,10 +23,12 @@ pub use evicted_hash_map::EvictedHashMap; pub use evicted_queue::EvictedQueue; pub use id_generator::{aws::XrayIdGenerator, IdGenerator}; pub use provider::{Builder, TracerProvider}; +pub use runtime::{TraceRuntime, TrySend}; pub use sampler::{Sampler, SamplingDecision, SamplingResult, ShouldSample}; pub use span::Span; pub use span_limit::SpanLimits; pub use span_processor::{ - BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor, + BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, + SpanProcessor, }; pub use tracer::Tracer; diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index 7ed35eec14..d78d1c5e84 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -8,10 +8,10 @@ //! 
propagators) are provided by the `TracerProvider`. `Tracer` instances do //! not duplicate this data to avoid that different `Tracer` instances //! of the `TracerProvider` have different versions of these data. +use crate::sdk::trace::runtime::TraceRuntime; use crate::trace::TraceResult; use crate::{ global, - runtime::Runtime, sdk::{self, export::trace::SpanExporter, trace::SpanProcessor}, }; use std::sync::Arc; @@ -115,7 +115,7 @@ impl Builder { } /// The `SpanExporter` setup using a default `BatchSpanProcessor` that this provider should use. - pub fn with_batch_exporter( + pub fn with_batch_exporter( self, exporter: T, runtime: R, diff --git a/opentelemetry/src/sdk/trace/runtime.rs b/opentelemetry/src/sdk/trace/runtime.rs new file mode 100644 index 0000000000..118f59041a --- /dev/null +++ b/opentelemetry/src/sdk/trace/runtime.rs @@ -0,0 +1,119 @@ +//! # Trace Runtime +//! Trace runtime is an extension to [`Runtime`]. Currently it provides a channel that is used +//! by [`BatchSpanProcessor`]. +//! +//! [`BatchSpanProcessor`]: crate::sdk::trace::span_processor::BatchSpanProcessor +//! [`Runtime`]: crate::runtime::Runtime +#[cfg(feature = "rt-async-std")] +use crate::runtime::AsyncStd; +use crate::runtime::Runtime; +#[cfg(feature = "rt-tokio")] +use crate::runtime::Tokio; +#[cfg(feature = "rt-tokio-current-thread")] +use crate::runtime::TokioCurrentThread; +use crate::sdk::trace::BatchMessage; +use crate::trace::TraceError; +use futures::Stream; +use std::fmt::Debug; + +#[cfg(any( + feature = "rt-tokio", + feature = "rt-tokio-current-thread", + feature = "rt-async-std" ))] +const CHANNEL_FULL_ERROR: &str = + "cannot send span to the batch span processor because the channel is full"; +#[cfg(any( + feature = "rt-tokio", + feature = "rt-tokio-current-thread", + feature = "rt-async-std" ))] +const CHANNEL_CLOSED_ERROR: &str = + "cannot send span to the batch span processor because the channel is closed"; + +/// Trace runtime is an extension to [`Runtime`]. 
Currently it provides a channel that is used +/// by [`BatchSpanProcessor`]. +/// +/// [`BatchSpanProcessor`]: crate::sdk::trace::span_processor::BatchSpanProcessor +/// [`Runtime`]: crate::runtime::Runtime +pub trait TraceRuntime: Runtime { + /// A future stream to receive the batch messages from channels. + type Receiver: Stream + Send; + + /// A batch message sender that can be sent across threads safely. + type Sender: TrySend + Debug; + + /// Return the sender and receiver used to send batch messages between tasks. + fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver); +} + +/// TrySend is an abstraction of a sender that is capable of sending BatchMessage with a reference. +pub trait TrySend: Sync + Send { + /// Try to send one batch message to the worker thread. + /// + /// It can fail because either the receiver has closed or the buffer is full. + fn try_send(&self, item: BatchMessage) -> Result<(), TraceError>; +} + +#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] +impl TrySend for tokio::sync::mpsc::Sender { + fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> { + self.try_send(item).map_err(|err| match err { + tokio::sync::mpsc::error::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR), + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + TraceError::from(CHANNEL_CLOSED_ERROR) + } + }) + } +} + +#[cfg(feature = "rt-tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] +impl TraceRuntime for Tokio { + type Receiver = tokio_stream::wrappers::ReceiverStream; + type Sender = tokio::sync::mpsc::Sender; + + fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { + let (sender, receiver) = tokio::sync::mpsc::channel(capacity); + ( + sender, + tokio_stream::wrappers::ReceiverStream::new(receiver), + ) + } +} + +#[cfg(feature = "rt-tokio-current-thread")] +#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] +impl TraceRuntime for 
TokioCurrentThread { + type Receiver = tokio_stream::wrappers::ReceiverStream; + type Sender = tokio::sync::mpsc::Sender; + + fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { + let (sender, receiver) = tokio::sync::mpsc::channel(capacity); + ( + sender, + tokio_stream::wrappers::ReceiverStream::new(receiver), + ) + } +} + +#[cfg(feature = "rt-async-std")] +impl TrySend for async_std::channel::Sender { + fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> { + self.try_send(item).map_err(|err| match err { + async_std::channel::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR), + async_std::channel::TrySendError::Closed(_) => TraceError::from(CHANNEL_CLOSED_ERROR), + }) + } +} + +#[cfg(feature = "rt-async-std")] +#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))] +impl TraceRuntime for AsyncStd { + type Receiver = async_std::channel::Receiver; + type Sender = async_std::channel::Sender; + + fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { + async_std::channel::bounded(capacity) + } +} diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 1605587a0f..b77cba9ac7 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -35,15 +35,15 @@ //! 
[`TracerProvider`]: crate::trace::TracerProvider use crate::global; -use crate::runtime::Runtime; +use crate::sdk::trace::runtime::{TraceRuntime, TrySend}; use crate::sdk::trace::Span; use crate::{ sdk::export::trace::{ExportResult, SpanData, SpanExporter}, trace::{TraceError, TraceResult}, Context, }; -use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt}; -use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration}; +use futures::{channel::oneshot, executor, future::Either, pin_mut, StreamExt}; +use std::{env, fmt, str::FromStr, thread, time::Duration}; /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; @@ -211,11 +211,11 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html /// [`tokio`]: https://tokio.rs /// [`async-std`]: https://async.rs -pub struct BatchSpanProcessor { - message_sender: Mutex>, +pub struct BatchSpanProcessor { + message_sender: R::Sender, } -impl fmt::Debug for BatchSpanProcessor { +impl fmt::Debug for BatchSpanProcessor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BatchSpanProcessor") .field("message_sender", &self.message_sender) @@ -223,21 +223,13 @@ impl fmt::Debug for BatchSpanProcessor { } } -impl SpanProcessor for BatchSpanProcessor { +impl SpanProcessor for BatchSpanProcessor { fn on_start(&self, _span: &Span, _cx: &Context) { // Ignored } fn on_end(&self, span: SpanData) { - let result = self - .message_sender - .lock() - .map_err(|_| TraceError::Other("batch span processor mutex poisoned".into())) - .and_then(|mut sender| { - sender - .try_send(BatchMessage::ExportSpan(span)) - .map_err(|err| TraceError::Other(err.into())) - }); + let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); if let Err(err) = result { global::handle_error(err); @@ -245,9 +237,9 @@ impl SpanProcessor for BatchSpanProcessor { 
} fn force_flush(&self) -> TraceResult<()> { - let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned"))?; let (res_sender, res_receiver) = oneshot::channel(); - sender.try_send(BatchMessage::Flush(Some(res_sender)))?; + self.message_sender + .try_send(BatchMessage::Flush(Some(res_sender)))?; futures::executor::block_on(res_receiver) .map_err(|err| TraceError::Other(err.into())) @@ -255,9 +247,9 @@ impl SpanProcessor for BatchSpanProcessor { } fn shutdown(&mut self) -> TraceResult<()> { - let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned"))?; let (res_sender, res_receiver) = oneshot::channel(); - sender.try_send(BatchMessage::Shutdown(res_sender))?; + self.message_sender + .try_send(BatchMessage::Shutdown(res_sender))?; futures::executor::block_on(res_receiver) .map_err(|err| TraceError::Other(err.into())) @@ -265,23 +257,26 @@ impl SpanProcessor for BatchSpanProcessor { } } +/// Messages sent between application thread and batch span processor's worker thread. #[derive(Debug)] -enum BatchMessage { +pub enum BatchMessage { + /// Export spans, usually called when span ends ExportSpan(SpanData), + /// Flush the current buffer to the backend, it can be triggered by + /// preconfigured interval or a call to `force_flush` function. Flush(Option>), + /// Shut down the worker thread, push all spans in buffer to the backend. 
Shutdown(oneshot::Sender), } -impl BatchSpanProcessor { - pub(crate) fn new( +impl BatchSpanProcessor { + pub(crate) fn new( mut exporter: Box, config: BatchConfig, runtime: R, - ) -> Self - where - R: Runtime, - { - let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); + ) -> Self { + let (message_sender, message_receiver) = + runtime.batch_message_channel(config.max_queue_size); let ticker = runtime .interval(config.scheduled_delay) .map(|_| BatchMessage::Flush(None)); @@ -362,16 +357,13 @@ impl BatchSpanProcessor { })); // Return batch processor with link to worker - BatchSpanProcessor { - message_sender: Mutex::new(message_sender), - } + BatchSpanProcessor { message_sender } } /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder + pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder where E: SpanExporter, - R: Runtime, { BatchSpanProcessorBuilder { exporter, @@ -388,7 +380,7 @@ async fn export_with_timeout( batch: Vec, ) -> ExportResult where - R: Runtime, + R: TraceRuntime, E: SpanExporter + ?Sized, { if batch.is_empty() { @@ -487,7 +479,7 @@ pub struct BatchSpanProcessorBuilder { impl BatchSpanProcessorBuilder where E: SpanExporter + 'static, - R: Runtime, + R: TraceRuntime, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -528,7 +520,7 @@ where } /// Build a batch processor - pub fn build(self) -> BatchSpanProcessor { + pub fn build(self) -> BatchSpanProcessor { BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime) } }